Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions app/c_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Async connection wrapper for TCP sessions.

Wraps asyncio StreamReader/StreamWriter to provide a simple async
send/recv interface, replacing the deprecated use of raw sockets
obtained via writer.get_extra_info('socket') which returns a
TransportSocket that lacks send()/recv() methods.

See: https://github.com/mitre/manx/issues/51
"""

from app.utility.base_object import BaseObject


class Connection(BaseObject):

def __init__(self, reader, writer):
super().__init__()
self.reader = reader
self.writer = writer

async def recv(self, num_bytes):
return await self.reader.read(num_bytes)

async def send(self, data):
self.writer.write(data)
await self.writer.drain()
29 changes: 29 additions & 0 deletions app/c_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""TCP session model for the manx plugin.

Imported by caldera's contact_tcp.py (v5.0.0) as:
from plugins.manx.app.c_session import Session

See: https://github.com/mitre/manx/issues/51
"""

from app.utility.base_object import BaseObject


class Session(BaseObject):

@property
def unique(self):
return self.hash('%s' % self.paw)

def __init__(self, id, paw, connection):
super().__init__()
self.id = id
self.paw = paw
self.connection = connection

def store(self, ram):
existing = self.retrieve(ram['sessions'], self.unique)
if not existing:
ram['sessions'].append(self)
return self.retrieve(ram['sessions'], self.unique)
return existing
127 changes: 127 additions & 0 deletions app/tcp_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Patch for caldera's TcpSessionHandler to fix deprecated socket API.

Caldera 5.0.0's contact_tcp.py uses writer.get_extra_info('socket') which
returns a TransportSocket object that lacks send()/recv() methods, causing:

AttributeError: 'TransportSocket' object has no attribute 'send'

This module patches the TCP handler at runtime to use asyncio
StreamReader/StreamWriter via the Connection wrapper instead.

If caldera has already been updated to use its own TCPSession class (main
branch), the patch detects this and is a no-op.

See: https://github.com/mitre/manx/issues/51
"""

import json
import socket
import time

from typing import Tuple

from plugins.manx.app.c_connection import Connection
from plugins.manx.app.c_session import Session


def _needs_patching(handler):
"""Check if the handler uses the old broken socket API.

Returns False if the handler already uses modern stream-based sessions
(e.g., caldera main branch with TCPSession).
"""
# If the accept method references 'get_extra_info', it needs patching.
# If it's already been patched or uses TCPSession, skip.
import inspect
try:
source = inspect.getsource(handler.accept)
return 'get_extra_info' in source
except (OSError, TypeError):
# Can't inspect, assume patching is safe (it's idempotent)
return True
Comment on lines +33 to +41


async def _patched_accept(self, reader, writer):
"""Replacement for TcpSessionHandler.accept that uses Connection wrapper."""
try:
profile = await self._handshake(reader)
except Exception as e:
self.log.debug('Handshake failed: %s' % e)
return
connection = Connection(reader, writer)
profile['executors'] = [e for e in profile['executors'].split(',') if e]
profile['contact'] = 'tcp'
agent, _ = await self.services.get('contact_svc').handle_heartbeat(**profile)
new_session = Session(id=self.generate_number(size=6), paw=agent.paw, connection=connection)
self.sessions.append(new_session)
await self.send(new_session.id, agent.paw, timeout=5)


async def _patched_refresh(self):
"""Replacement for TcpSessionHandler.refresh that uses async send."""
index = 0
while index < len(self.sessions):
session = self.sessions[index]
try:
await session.connection.send(str.encode(' '))
except (socket.error, OSError, ConnectionError):
self.log.debug(
'Error occurred when refreshing session %s. Removing from session pool.',
session.id
)
del self.sessions[index]
else:
index += 1


async def _patched_send(self, session_id: int, cmd: str, timeout: int = 60) -> Tuple[int, str, str, str]:
"""Replacement for TcpSessionHandler.send that uses async Connection."""
try:
conn = next(i.connection for i in self.sessions if i.id == int(session_id))
await conn.send(str.encode(' '))
time.sleep(0.01)
await conn.send(str.encode('%s\n' % cmd))
Comment on lines +80 to +83
response = await _patched_attempt_connection(self, session_id, conn, timeout=timeout)
response = json.loads(response)
return response['status'], response['pwd'], response['response'], response.get('agent_reported_time', '')
except Exception as e:
self.log.exception(e)
return 1, '~$ ', str(e), ''


async def _patched_attempt_connection(self, session_id, connection, timeout):
"""Replacement for TcpSessionHandler._attempt_connection using async recv."""
buffer = 4096
data = b''
waited_seconds = 0
time.sleep(0.1) # initial wait for fast operations.
while True:
try:
part = await connection.recv(buffer)
data += part
if len(part) < buffer:
break
Comment on lines +101 to +103
except BlockingIOError as err:
if waited_seconds < timeout:
time.sleep(1)
waited_seconds += 1
else:
self.log.error("Timeout reached for session %s", session_id)
return json.dumps(dict(status=1, pwd='~$ ', response=str(err)))
Comment on lines +92 to +110
return str(data, 'utf-8')
Comment on lines +92 to +111


def patch_tcp_handler(handler):
"""Apply the modern async stream patches to a TcpSessionHandler instance.

This is safe to call even if the handler has already been updated:
it checks whether patching is needed first.
"""
if not _needs_patching(handler):
return

import types
handler.accept = types.MethodType(_patched_accept, handler)
handler.refresh = types.MethodType(_patched_refresh, handler)
handler.send = types.MethodType(_patched_send, handler)
handler._attempt_connection = types.MethodType(_patched_attempt_connection, handler)
9 changes: 9 additions & 0 deletions hook.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from app.utility.base_world import BaseWorld
from plugins.manx.app.h_terminal import Handle
from plugins.manx.app.term_api import TermApi
from plugins.manx.app.tcp_patch import patch_tcp_handler

name = 'Terminal'
description = 'A toolset which supports terminal access'
Expand All @@ -13,6 +14,14 @@ async def enable(services):
app = services.get('app_svc').application
term_api = TermApi(services)

# Patch the TCP handler to fix deprecated socket API (issue #51).
# Caldera 5.0.0 uses writer.get_extra_info('socket') which returns a
# TransportSocket that lacks send()/recv(). This patch replaces the
# handler methods to use asyncio StreamReader/StreamWriter instead.
tcp_contacts = [c for c in services.get('contact_svc').contacts if c.name == 'tcp']
if tcp_contacts:
patch_tcp_handler(tcp_contacts[0].tcp_handler)

udp_contact = [c for c in services.get('contact_svc').contacts if c.name == 'websocket']
udp_contact[0].handler.handles.append(Handle(tag='manx'))

Expand Down
Empty file added tests/__init__.py
Empty file.
55 changes: 55 additions & 0 deletions tests/test_c_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Tests for the Connection wrapper class."""

import asyncio
from unittest import mock

import pytest

from plugins.manx.app.c_connection import Connection


@pytest.fixture
def mock_reader():
reader = mock.AsyncMock()
return reader


@pytest.fixture
def mock_writer():
writer = mock.Mock()
writer.write = mock.Mock()
writer.drain = mock.AsyncMock()
return writer


@pytest.fixture
def connection(mock_reader, mock_writer):
return Connection(mock_reader, mock_writer)


class TestConnection:

@pytest.mark.asyncio
async def test_recv_delegates_to_reader(self, connection, mock_reader):
mock_reader.read.return_value = b'hello'
result = await connection.recv(1024)
assert result == b'hello'
mock_reader.read.assert_awaited_once_with(1024)

@pytest.mark.asyncio
async def test_send_writes_and_drains(self, connection, mock_writer):
await connection.send(b'world')
mock_writer.write.assert_called_once_with(b'world')
mock_writer.drain.assert_awaited_once()

@pytest.mark.asyncio
async def test_send_empty_data(self, connection, mock_writer):
await connection.send(b'')
mock_writer.write.assert_called_once_with(b'')
mock_writer.drain.assert_awaited_once()

@pytest.mark.asyncio
async def test_recv_empty(self, connection, mock_reader):
mock_reader.read.return_value = b''
result = await connection.recv(4096)
assert result == b''
27 changes: 27 additions & 0 deletions tests/test_c_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Tests for the Session class."""

from unittest import mock

from plugins.manx.app.c_session import Session


class TestSession:

def test_session_creation(self):
conn = mock.Mock()
session = Session(id=123, paw='test_paw', connection=conn)
assert session.id == 123
assert session.paw == 'test_paw'
assert session.connection is conn

def test_unique_property(self):
conn = mock.Mock()
session = Session(id=1, paw='abc', connection=conn)
# unique should be deterministic for the same paw
assert session.unique == session.unique

def test_different_paws_different_unique(self):
conn = mock.Mock()
s1 = Session(id=1, paw='paw1', connection=conn)
s2 = Session(id=2, paw='paw2', connection=conn)
assert s1.unique != s2.unique
Loading
Loading