Skip to content

fix: replace deprecated socket send/recv with modern asyncio streams (#51)#59

Open
deacon-mp wants to merge 1 commit intomitre:masterfrom
deacon-mp:fix/issue-51-transport-socket-v2
Open

fix: replace deprecated socket send/recv with modern asyncio streams (#51)#59
deacon-mp wants to merge 1 commit intomitre:masterfrom
deacon-mp:fix/issue-51-transport-socket-v2

Conversation

@deacon-mp
Copy link
Copy Markdown
Contributor

Summary

  • Fixes TransportSocket object has no attribute 'send' #51 -- TransportSocket object has no attribute 'send'
  • Adds c_connection.py (async Connection wrapper around StreamReader/StreamWriter) and c_session.py (Session class that caldera 5.0.0's contact_tcp.py imports from plugins.manx.app)
  • Adds tcp_patch.py runtime patch applied at plugin load that replaces the broken TcpSessionHandler methods (accept/refresh/send) to use the async Connection wrapper instead of the raw TransportSocket
  • The patch auto-detects whether caldera already uses the modern API (main branch TCPSession) and is a no-op in that case
  • Supersedes stale PR [VIRTS-4793] Fix deprecated socket issues #42

Changes

  • app/c_connection.py -- Async wrapper providing send()/recv() over StreamWriter.write()/StreamReader.read()
  • app/c_session.py -- Session model class imported by caldera's contact_tcp.py
  • app/tcp_patch.py -- Runtime monkey-patch for TcpSessionHandler to fix the deprecated API
  • hook.py -- Apply the patch at plugin enable time
  • tests/ -- Unit tests for Connection, Session, and the TCP patch

Root cause

Caldera 5.0.0's contact_tcp.py calls writer.get_extra_info('socket') which returns an asyncio.TransportSocket that lacks send()/recv() methods. The fix wraps the StreamReader/StreamWriter pair in a Connection object that provides async send()/recv().

Test plan

  • Start Caldera with manx plugin enabled
  • Connect a manx TCP agent
  • Verify terminal session works without TransportSocket errors
  • Verify agent stays alive and trusted while beaconing
  • Run unit tests: pytest plugins/manx/tests/

…itre#51)

Caldera 5.0.0's contact_tcp.py calls writer.get_extra_info('socket')
which returns a TransportSocket object that lacks send()/recv() methods,
causing 'TransportSocket object has no attribute send'.

This adds:
- c_connection.py: async Connection wrapper around StreamReader/StreamWriter
- c_session.py: Session class that caldera's contact_tcp.py imports
- tcp_patch.py: runtime patch applied at plugin load to fix the TCP
  handler's accept/refresh/send methods to use the async Connection
  wrapper instead of the broken TransportSocket

The patch auto-detects whether caldera has already been updated (main
branch uses its own TCPSession) and is a no-op in that case.
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses Caldera 5.0.0 incompatibilities in the Manx TCP contact path by introducing stream-based abstractions and a runtime patch to replace deprecated socket send/recv usage that triggers TransportSocket errors.

Changes:

  • Added Connection (StreamReader/StreamWriter wrapper) and Session model to match Caldera’s import expectations.
  • Introduced tcp_patch.py monkey-patch to replace TcpSessionHandler methods (accept/refresh/send/_attempt_connection) when the legacy API is detected.
  • Added unit tests covering the new wrapper/model and basic patch behavior.

Reviewed changes

Copilot reviewed 7 out of 8 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
app/c_connection.py Adds async send()/recv() wrapper over asyncio streams.
app/c_session.py Adds Session model class (expected import path for Caldera TCP contact).
app/tcp_patch.py Adds runtime patch to swap TcpSessionHandler methods to stream-based I/O.
hook.py Applies the TCP handler patch during plugin enable.
tests/test_c_connection.py Unit tests for Connection wrapper behavior.
tests/test_c_session.py Unit tests for Session model behavior.
tests/test_tcp_patch.py Unit tests for patching logic and patched refresh/send behaviors.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +33 to +41
# If the accept method references 'get_extra_info', it needs patching.
# If it's already been patched or uses TCPSession, skip.
import inspect
try:
source = inspect.getsource(handler.accept)
return 'get_extra_info' in source
except (OSError, TypeError):
# Can't inspect, assume patching is safe (it's idempotent)
return True
Comment on lines +80 to +83
conn = next(i.connection for i in self.sessions if i.id == int(session_id))
await conn.send(str.encode(' '))
time.sleep(0.01)
await conn.send(str.encode('%s\n' % cmd))
async def _patched_send(self, session_id: int, cmd: str, timeout: int = 60) -> Tuple[int, str, str, str]:
"""Replacement for TcpSessionHandler.send that uses async Connection."""
try:
conn = next(i.connection for i in self.sessions if i.id == int(session_id))
Comment on lines +92 to +110
async def _patched_attempt_connection(self, session_id, connection, timeout):
"""Replacement for TcpSessionHandler._attempt_connection using async recv."""
buffer = 4096
data = b''
waited_seconds = 0
time.sleep(0.1) # initial wait for fast operations.
while True:
try:
part = await connection.recv(buffer)
data += part
if len(part) < buffer:
break
except BlockingIOError as err:
if waited_seconds < timeout:
time.sleep(1)
waited_seconds += 1
else:
self.log.error("Timeout reached for session %s", session_id)
return json.dumps(dict(status=1, pwd='~$ ', response=str(err)))
Comment on lines +101 to +103
data += part
if len(part) < buffer:
break
Comment on lines +92 to +111
async def _patched_attempt_connection(self, session_id, connection, timeout):
"""Replacement for TcpSessionHandler._attempt_connection using async recv."""
buffer = 4096
data = b''
waited_seconds = 0
time.sleep(0.1) # initial wait for fast operations.
while True:
try:
part = await connection.recv(buffer)
data += part
if len(part) < buffer:
break
except BlockingIOError as err:
if waited_seconds < timeout:
time.sleep(1)
waited_seconds += 1
else:
self.log.error("Timeout reached for session %s", session_id)
return json.dumps(dict(status=1, pwd='~$ ', response=str(err)))
return str(data, 'utf-8')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

TransportSocket object has no attribute 'send'

2 participants