From 706dae44626bed365656f0f7e061732df3786a56 Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Fri, 26 Sep 2025 11:51:38 -0700 Subject: [PATCH 01/10] Fixed passing of tokens to proxy process. --- src/protocol_proxy/proxy/launch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/protocol_proxy/proxy/launch.py b/src/protocol_proxy/proxy/launch.py index a8fbcf1..85a6664 100644 --- a/src/protocol_proxy/proxy/launch.py +++ b/src/protocol_proxy/proxy/launch.py @@ -26,8 +26,8 @@ def launch(launcher_func: Callable): parser = proxy_command_parser() parser, proxy_runner = launcher_func(parser) opts = parser.parse_args() - proxy_token = UUID(hex=sys.stdin.buffer.read(32).hex()) - manager_token = UUID(hex=sys.stdin.buffer.read(32).hex()) + proxy_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8')) + manager_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8')) if iscoroutinefunction(proxy_runner): run(proxy_runner(token=proxy_token, manager_token=manager_token, **vars(opts))) else: From 562326214e8cb2fd94415f2b5e5768e60559a50f Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Sat, 27 Sep 2025 13:56:09 -0700 Subject: [PATCH 02/10] Comments and typing. --- src/protocol_proxy/ipc/asyncio.py | 4 ++-- src/protocol_proxy/ipc/decorator.py | 4 +++- src/protocol_proxy/manager/asyncio.py | 1 - src/protocol_proxy/manager/base.py | 11 ++--------- src/protocol_proxy/manager/gevent.py | 3 +-- src/protocol_proxy/proxy/asyncio.py | 3 ++- src/protocol_proxy/proxy/gevent.py | 3 ++- 7 files changed, 12 insertions(+), 17 deletions(-) diff --git a/src/protocol_proxy/ipc/asyncio.py b/src/protocol_proxy/ipc/asyncio.py index eaa5761..8945de7 100644 --- a/src/protocol_proxy/ipc/asyncio.py +++ b/src/protocol_proxy/ipc/asyncio.py @@ -108,7 +108,7 @@ def __init__(self, connector: AsyncioIPCConnector, on_lost_connection=None, buff minimum_read_size: int = 76, outgoing_message=None, protocol_version: int = 1): self.buffer_size = buffer_size self.connector: AsyncioIPCConnector = connector - # _log.debug(f'{self.connector.proxy_name} INBOUND AIPC PROTOCOL: IN PROTOCOL INIT') + # _log.debug(f'{self.connector.proxy_name} INBOUND ASyncioIPC PROTOCOL: IN PROTOCOL INIT') self.minimum_read_size = minimum_read_size # TODO: Default is V1 header length. Is this appropriate? self.outgoing_message = outgoing_message self.on_con_lost = on_lost_connection @@ -139,7 +139,7 @@ def buffer_updated(self, n_bytes: int) -> None: # if len(self.received_data) > version_end: # if not (protocol := self.connector.PROTOCOL_VERSION.get(struct.unpack('>H', self.received_data[:2])[0])): # raise NotImplementedError(f'Unknown protocol version ({protocol.VERSION})' - # f' received from: {self.transport.get_extra_info("peername")}') + # f' received from: {self.transport.get_extra_info("peer_name")}') # header_end = version_end + protocol.HEADER_LENGTH if self.count >= self.header_length: diff --git a/src/protocol_proxy/ipc/decorator.py b/src/protocol_proxy/ipc/decorator.py index d244c86..552ff58 100644 --- a/src/protocol_proxy/ipc/decorator.py +++ b/src/protocol_proxy/ipc/decorator.py @@ -1,12 +1,14 @@ import logging +from typing import Any + _log = logging.getLogger(__name__) # TODO: Do we need an Asyncio version of this? # TODO: Did this work with the AsyncResult removed (just returns, possibly within greenlet)? def callback(func): - def verify(self, ipc, headers, raw_message: any): + def verify(self, ipc, headers, raw_message: Any): if peer := ipc.peers.get(headers.sender_id): if headers.sender_token == peer.token: return func(self, headers, raw_message) diff --git a/src/protocol_proxy/manager/asyncio.py b/src/protocol_proxy/manager/asyncio.py index 8eee00a..736358c 100644 --- a/src/protocol_proxy/manager/asyncio.py +++ b/src/protocol_proxy/manager/asyncio.py @@ -1,7 +1,6 @@ import asyncio import atexit import logging -import os import signal from abc import ABC diff --git a/src/protocol_proxy/manager/base.py b/src/protocol_proxy/manager/base.py index 3c34de4..8ad32bf 100644 --- a/src/protocol_proxy/manager/base.py +++ b/src/protocol_proxy/manager/base.py @@ -45,16 +45,9 @@ def _setup_proxy_process_command(self, unique_remote_id: tuple, **kwargs) -> tup command = [sys.executable, '-m', module, '--proxy-id', proxy_id.hex, '--proxy-name', proxy_name, '--manager-id', self.proxy_id.hex, '--manager-address', self.inbound_params.address, '--manager-port', str(self.inbound_params.port), *protocol_specific_params] - - # # TODO: Discuss with Riley why/whether this block was necessary and/or helpful: - # # Set PYTHONPATH so the proxy subprocess can import protocol_proxy - # proxy_env = os.environ.copy() - # src_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '../..', 'src')) - # proxy_env['PYTHONPATH'] = src_dir + os.pathsep + proxy_env.get('PYTHONPATH', '') - # # TODO: END block to discuss. else: - command = None #, proxy_env = None, None - return command, proxy_id, proxy_name # , proxy_env + command = None + return command, proxy_id, proxy_name @classmethod @abstractmethod diff --git a/src/protocol_proxy/manager/gevent.py b/src/protocol_proxy/manager/gevent.py index 46a2986..cb5e728 100644 --- a/src/protocol_proxy/manager/gevent.py +++ b/src/protocol_proxy/manager/gevent.py @@ -79,5 +79,4 @@ def _cleanup_proxy_process(self, process: Popen, timeout: float = 5.0): except Timeout: process.kill() except Exception as e: - _log.warning(f'Exception encountered attempting to terminate proxy process: {process.pid}') - + _log.warning(f'Exception encountered attempting to terminate proxy process ({process.pid}): {e}') diff --git a/src/protocol_proxy/proxy/asyncio.py b/src/protocol_proxy/proxy/asyncio.py index 4243deb..7ecb18e 100644 --- a/src/protocol_proxy/proxy/asyncio.py +++ b/src/protocol_proxy/proxy/asyncio.py @@ -2,6 +2,7 @@ import logging from abc import ABC +from typing import cast from uuid import UUID from ..ipc.asyncio import AsyncioIPCConnector, Future, AsyncioProtocolProxyPeer, SocketParams @@ -43,7 +44,7 @@ async def send_registration(self, remote: AsyncioProtocolProxyPeer): async def start(self): await super(AsyncioProtocolProxy, self).start() - await self.send_registration(self.peers[self.manager]) + await self.send_registration(cast(AsyncioProtocolProxyPeer, self.peers[self.manager])) async with self.inbound_server: await self.inbound_server.serve_forever() diff --git a/src/protocol_proxy/proxy/gevent.py b/src/protocol_proxy/proxy/gevent.py index a8aa339..5bd7e00 100644 --- a/src/protocol_proxy/proxy/gevent.py +++ b/src/protocol_proxy/proxy/gevent.py @@ -3,6 +3,7 @@ from abc import ABC from gevent import sleep, spawn from gevent.event import AsyncResult +from typing import cast from uuid import UUID from ..ipc.gevent import GeventIPCConnector, GeventProtocolProxyPeer, SocketParams @@ -22,7 +23,7 @@ def __init__(self, *, proxy_id: UUID, token: UUID, manager_address: str, manager registration_retry_delay=registration_retry_delay, **kwargs) self.peers[manager_id] = GeventProtocolProxyPeer(proxy_id=manager_id, socket_params=self.manager_params, token=manager_token) - spawn(self.send_registration, self.peers[manager_id]) + spawn(self.send_registration, cast(GeventProtocolProxyPeer, self.peers[manager_id])) def get_local_socket_params(self) -> SocketParams: return self.inbound_server_socket.getsockname() From 86ea92275c32619190ce89f5bc2cd3d4bb2511b2 Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Sat, 27 Sep 2025 13:58:09 -0700 Subject: [PATCH 03/10] Adjustments to abstract classes. --- src/protocol_proxy/ipc/base.py | 4 ++-- src/protocol_proxy/ipc/headers.py | 12 ++++++------ src/protocol_proxy/proxy/base.py | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/protocol_proxy/ipc/base.py b/src/protocol_proxy/ipc/base.py index 675e0c4..fe264ef 100644 --- a/src/protocol_proxy/ipc/base.py +++ b/src/protocol_proxy/ipc/base.py @@ -1,6 +1,6 @@ import logging -from abc import abstractmethod, ABC +from abc import abstractmethod, ABC, ABCMeta from collections.abc import Generator from dataclasses import dataclass from itertools import cycle @@ -50,7 +50,7 @@ class ProtocolProxyPeer(ABC): socket_params: SocketParams = None -class IPCConnector: +class IPCConnector(metaclass=ABCMeta): PROTOCOL_VERSION = {1: HeadersV1} def __init__(self, *, proxy_id: UUID, token: UUID, proxy_name: str = None, inbound_params: SocketParams = None, diff --git a/src/protocol_proxy/ipc/headers.py b/src/protocol_proxy/ipc/headers.py index fa94930..801d8fa 100644 --- a/src/protocol_proxy/ipc/headers.py +++ b/src/protocol_proxy/ipc/headers.py @@ -1,19 +1,19 @@ -import abc import logging import struct +from abc import abstractmethod, ABCMeta from uuid import UUID _log = logging.getLogger(__name__) -class ProtocolHeaders: +class ProtocolHeaders(metaclass=ABCMeta): # The endian_indicator and version number are added where needed and not included in the portion of the format here. FORMAT = 'Q32sH16s16s' HEADER_LENGTH = struct.calcsize(FORMAT) VERSION = 0 # 2 byte (16 bit) integer (H) - @abc.abstractmethod + @abstractmethod def __init__(self, data_length: int, method_name: str, request_id: int, sender_id, sender_token, **kwargs): if kwargs: _log.warning(f'Received extra kwargs for Proxy Protocol version {self.VERSION}: {list(kwargs.keys())}') @@ -28,7 +28,7 @@ def __init__(self, data_length: int, method_name: str, request_id: int, sender_i def bitflag_is_set(bit_position, byte_value): return bool((byte_value & (1 << bit_position)) >> bit_position) - @abc.abstractmethod + @abstractmethod def pack(self): # TODO: The sender_id is currently the proxy_id, which is a tuple. This should probably become a UUID? # (Need to figure out how/where to map one to the other.) @@ -36,12 +36,12 @@ def pack(self): self.method_name.encode('utf8'), self.request_id, self.sender_id.bytes, self.sender_token.bytes) - @abc.abstractmethod + @abstractmethod def unpack(self, header_bytes): pass def __repr__(self): - return f'ProtocolHeaders(FORMAT={self.FORMAT}, VERISON={self.VERSION},' \ + return f'ProtocolHeaders(FORMAT={self.FORMAT}, VERSION={self.VERSION},' \ f'data_length={self.data_length}, method_name={self.method_name},' \ f' request_id={self.request_id}, sender_id={self.sender_id}, sender_token={self.sender_token}' diff --git a/src/protocol_proxy/proxy/base.py b/src/protocol_proxy/proxy/base.py index 29619ed..882296f 100644 --- a/src/protocol_proxy/proxy/base.py +++ b/src/protocol_proxy/proxy/base.py @@ -10,8 +10,8 @@ # noinspection PyMissingConstructor -class ProtocolProxy(IPCConnector): - def __init__(self, *, manager_address: str, manager_port: int, manager_id: UUID, manager_token: UUID, +class ProtocolProxy(IPCConnector, metaclass=abc.ABCMeta): + def __init__(self, *, manager_address: str, manager_port: int, manager_id: UUID, registration_retry_delay: float = 20.0, **kwargs): """NOTE: Proxy implementations MUST: 1. Subclass a multitasking subclass of IPCConnector (gevent, asyncio, etc.) From c3b1b1b24fa0927454ac78793448d9dd5d87d98c Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Sat, 27 Sep 2025 14:00:13 -0700 Subject: [PATCH 04/10] Added explicit None returns for clarity. --- src/protocol_proxy/ipc/decorator.py | 2 ++ src/protocol_proxy/manager/base.py | 1 + 2 files changed, 3 insertions(+) diff --git a/src/protocol_proxy/ipc/decorator.py b/src/protocol_proxy/ipc/decorator.py index 552ff58..0288805 100644 --- a/src/protocol_proxy/ipc/decorator.py +++ b/src/protocol_proxy/ipc/decorator.py @@ -14,6 +14,8 @@ def verify(self, ipc, headers, raw_message: Any): return func(self, headers, raw_message) else: _log.warning(f'Unable to authenticate caller: {headers.sender_id}') + return None else: _log.warning(f'Request from unknown party: {headers.sender_id}') + return None return verify diff --git a/src/protocol_proxy/manager/base.py b/src/protocol_proxy/manager/base.py index 8ad32bf..4a8754c 100644 --- a/src/protocol_proxy/manager/base.py +++ b/src/protocol_proxy/manager/base.py @@ -69,6 +69,7 @@ def get_proxy(cls, unique_remote_id: tuple, **kwargs) -> tuple[Self, ProtocolPro return manager, manager.get_proxy(unique_remote_id, **kwargs) except (ImportError, ValueError) as e: _log.warning(f'Unable to find a manager for get_proxy call: {e}') + return None def get_proxy_id(self, unique_remote_id: tuple | str) -> UUID: """Lookup or create a UUID for the proxy server From 4b0f0dd51a62be246cda5bf97e58afe317ef259b Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Sat, 27 Sep 2025 14:00:54 -0700 Subject: [PATCH 05/10] Fix to timeout counter. --- src/protocol_proxy/ipc/gevent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol_proxy/ipc/gevent.py b/src/protocol_proxy/ipc/gevent.py index 2096a79..4d8a0c1 100644 --- a/src/protocol_proxy/ipc/gevent.py +++ b/src/protocol_proxy/ipc/gevent.py @@ -167,7 +167,7 @@ def _receive_socket(self, s: socket): remaining = headers.data_length buffer = b'' done = False - io_wait_time = 0.0 + io_wait_time = self.max_io_wait_seconds while not done: try: while chunk := s.recv(read_length := max(0, remaining if remaining < self.chunk_size else self.chunk_size)): From b7f7deba38ef7a1f6a64eae545172bb0986973eb Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Sat, 27 Sep 2025 14:02:58 -0700 Subject: [PATCH 06/10] Updated variable names and import style for clarity. --- src/protocol_proxy/manager/asyncio.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/protocol_proxy/manager/asyncio.py b/src/protocol_proxy/manager/asyncio.py index 736358c..70a9ca8 100644 --- a/src/protocol_proxy/manager/asyncio.py +++ b/src/protocol_proxy/manager/asyncio.py @@ -4,7 +4,7 @@ import signal from abc import ABC -from asyncio.subprocess import Process +from asyncio import StreamReader, subprocess from uuid import uuid4 from typing import Type @@ -67,19 +67,19 @@ async def handle_peer_registration(self, headers: ProtocolHeaders, raw_message: return success @staticmethod - def _setup_exit(process: Process): + def _setup_exit(process: subprocess.Process): """Set up cleanup for the proxy process on exit.""" - def cleanup_func(process): - if process.returncode is None: + def cleanup_func(proc): + if proc.returncode is None: try: - process.terminate() + proc.terminate() except ProcessLookupError: pass asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, cleanup_func, process) asyncio.get_event_loop().add_signal_handler(signal.SIGINT, cleanup_func, process) @staticmethod - def finalize_process(process: Process): + def finalize_process(process: subprocess.Process): try: process.kill() except ProcessLookupError: From 6b7fc6ffc1b5cf8f3b90e6c0ff04cc5df2dd4e87 Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Sat, 27 Sep 2025 14:04:13 -0700 Subject: [PATCH 07/10] Implemented log forwarding from proxy to manager. --- src/protocol_proxy/manager/asyncio.py | 14 ++++++++++---- src/protocol_proxy/manager/base.py | 21 +++++++++++++++++++++ src/protocol_proxy/manager/gevent.py | 18 ++++++++++++------ src/protocol_proxy/proxy/launch.py | 7 +++++++ 4 files changed, 50 insertions(+), 10 deletions(-) diff --git a/src/protocol_proxy/manager/asyncio.py b/src/protocol_proxy/manager/asyncio.py index 70a9ca8..63e5251 100644 --- a/src/protocol_proxy/manager/asyncio.py +++ b/src/protocol_proxy/manager/asyncio.py @@ -40,21 +40,27 @@ async def wait_peer_registered(self, peer, timeout, func=None, *args, **kwargs): async def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer: command, proxy_id, proxy_name = self._setup_proxy_process_command(unique_remote_id, **kwargs) # , proxy_env if command: - proxy_process = await asyncio.create_subprocess_exec(*command, stdin=asyncio.subprocess.PIPE) - # , stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) - # TODO: Implement logging along lines of AIP.start_agent() (uncomment PIPES above too). + proxy_process = await asyncio.create_subprocess_exec(*command, stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + self.loop.create_task(self.log_subprocess_output(proxy_process.stdout)) + self.loop.create_task(self.log_subprocess_output(proxy_process.stderr)) _log.info(f"PPM: Created new ProtocolProxy {proxy_name} with ID {str(proxy_id)}, pid: {proxy_process.pid}") peer_token = uuid4() proxy_process.stdin.write(peer_token.hex.encode()) proxy_process.stdin.write(self.token.hex.encode()) await proxy_process.stdin.drain() proxy_process.stdin.close() - proxy_process.stdin = open(os.devnull) + proxy_process.stdin = None self.peers[proxy_id] = AsyncioProtocolProxyPeer(process=proxy_process, proxy_id=proxy_id, token=peer_token) self._setup_exit(proxy_process) atexit.register(self.finalize_process, proxy_process) return self.peers[proxy_id] + async def log_subprocess_output(self, stream: StreamReader): + while stream: + raw_line = await stream.readline() + self.log_subprocess_output_line(raw_line) + @callback async def handle_peer_registration(self, headers: ProtocolHeaders, raw_message: bytes): proxy: AsyncioProtocolProxyPeer | None = self.peers.get(headers.sender_id) diff --git a/src/protocol_proxy/manager/base.py b/src/protocol_proxy/manager/base.py index 4a8754c..dc1b238 100644 --- a/src/protocol_proxy/manager/base.py +++ b/src/protocol_proxy/manager/base.py @@ -133,3 +133,24 @@ def get_by_proxy_id(cls, proxy_id: UUID) -> tuple[Self | None, ProtocolProxyPeer if proxy_id in manager.peers: return manager, manager.peers[proxy_id] return None, None + + @abstractmethod + def log_subprocess_output(self, stream): + pass + + @staticmethod + def log_subprocess_output_line(raw_line): + line = raw_line.decode().strip() + try: + try: + entry = json.loads(line) + log = logging.getLogger(entry['name']) + log.log(logging.getLevelName(entry['level'].upper()), f":{entry['lineno']} {entry['message']}") + except (json.JSONDecodeError, KeyError, TypeError): + log = logging.getLogger(__name__) + log.log(logging.ERROR, line) + except UnicodeDecodeError: + log = logging.getLogger(__name__) + log.log(logging.ERROR, raw_line) + except Exception as e: + _log.error(f'Encountered unknown exception parsing logs from proxy: {e}') diff --git a/src/protocol_proxy/manager/gevent.py b/src/protocol_proxy/manager/gevent.py index cb5e728..633f741 100644 --- a/src/protocol_proxy/manager/gevent.py +++ b/src/protocol_proxy/manager/gevent.py @@ -3,11 +3,11 @@ import os from abc import ABC -from gevent import sleep, with_timeout +from gevent import sleep, spawn, with_timeout from gevent.subprocess import Popen, PIPE from gevent.timeout import Timeout from uuid import uuid4 -from typing import Type +from typing import IO, Type from ..ipc import callback, ProtocolHeaders, ProtocolProxyPeer from ..ipc.gevent import GeventIPCConnector, GeventProtocolProxyPeer @@ -43,10 +43,11 @@ def wait_for_peer_registration(): def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer: command, proxy_id, proxy_name = self._setup_proxy_process_command(unique_remote_id, **kwargs) # , proxy_env if command: - # TODO: proxy_env parameter was added with block to discuss in super._setup_proxy_process_command(). Remove if that is. - proxy_process = Popen(command, stdin=PIPE) #, env=proxy_env) - # , stdout=PIPE, stderr=PIPE) - # TODO: Implement logging along lines of AIP.start_agent() (uncomment PIPES above too). + proxy_process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) + _log.info("proxy %s has PID %s", self.proxy_name, proxy_process.pid) + spawn(self.log_subprocess_output, proxy_process.stdout) + spawn(self.log_subprocess_output, proxy_process.stderr) + # TODO: Ensure that logging as implemented fits with VOLTTRON logging once that is fixed.. _log.info(f"PPM: Created new ProtocolProxy {proxy_name} with ID {str(proxy_id)}, pid: {proxy_process.pid}") new_peer_token = uuid4() proxy_process.stdin.write(new_peer_token.hex.encode()) @@ -61,6 +62,11 @@ def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer: _log.debug(f"PPM: Proxy {proxy_id} created, waiting for registration before sending.") return self.peers[proxy_id] + def log_subprocess_output(self, stream: IO[bytes]): + """Reads lines from a pipe and logs them.""" + for raw_line in iter(stream.readline, b''): + self.log_subprocess_output_line(raw_line) + @callback def handle_peer_registration(self, headers: ProtocolHeaders, raw_message: bytes): return super().handle_peer_registration(headers, raw_message) diff --git a/src/protocol_proxy/proxy/launch.py b/src/protocol_proxy/proxy/launch.py index 85a6664..7b1b554 100644 --- a/src/protocol_proxy/proxy/launch.py +++ b/src/protocol_proxy/proxy/launch.py @@ -1,3 +1,4 @@ +import logging import sys from argparse import ArgumentParser @@ -5,6 +6,12 @@ from typing import Callable from uuid import UUID +logging.basicConfig( + level=logging.DEBUG, stream=sys.stdout, + format='{"name": "%(name)s", "lineno": "%(lineno)d", "level": "%(levelname)s", "message": "%(message)s"}' +) +_log = logging.getLogger(__name__) + def proxy_command_parser(parser: ArgumentParser = None): parser = parser if parser else ArgumentParser() parser.add_argument('--proxy-id', type=UUID, required=True) From e7f6e12b76f61f6c98a9bcd484cd7cfd6db3c28e Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Sat, 27 Sep 2025 14:04:44 -0700 Subject: [PATCH 08/10] Removed unused parameter. --- src/protocol_proxy/proxy/asyncio.py | 2 +- src/protocol_proxy/proxy/gevent.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/protocol_proxy/proxy/asyncio.py b/src/protocol_proxy/proxy/asyncio.py index 7ecb18e..ea15b27 100644 --- a/src/protocol_proxy/proxy/asyncio.py +++ b/src/protocol_proxy/proxy/asyncio.py @@ -15,7 +15,7 @@ class AsyncioProtocolProxy(AsyncioIPCConnector, ProtocolProxy, ABC): def __init__(self, manager_address: str, manager_port: int, manager_id: UUID, manager_token: UUID, token: UUID, proxy_id: UUID, proxy_name: str = None, registration_retry_delay: float = 20.0, **kwargs): super(AsyncioProtocolProxy, self).__init__(manager_address=manager_address, manager_port=manager_port, - manager_id=manager_id, manager_token=manager_token, proxy_id=proxy_id, + manager_id=manager_id, proxy_id=proxy_id, registration_retry_delay=registration_retry_delay, token=token, proxy_name=proxy_name, **kwargs) self.peers[manager_id] = AsyncioProtocolProxyPeer(proxy_id=manager_id, socket_params=self.manager_params, diff --git a/src/protocol_proxy/proxy/gevent.py b/src/protocol_proxy/proxy/gevent.py index 5bd7e00..9879613 100644 --- a/src/protocol_proxy/proxy/gevent.py +++ b/src/protocol_proxy/proxy/gevent.py @@ -19,7 +19,7 @@ def __init__(self, *, proxy_id: UUID, token: UUID, manager_address: str, manager """ super(GeventProtocolProxy, self).__init__(proxy_id=proxy_id, token=token, proxy_name=proxy_name, manager_address=manager_address, manager_port=manager_port, - manager_id=manager_id, manager_token=manager_token, + manager_id=manager_id, registration_retry_delay=registration_retry_delay, **kwargs) self.peers[manager_id] = GeventProtocolProxyPeer(proxy_id=manager_id, socket_params=self.manager_params, token=manager_token) From d319d5c5a9fd8a2a6d7b9ad300d2b624bb78072b Mon Sep 17 00:00:00 2001 From: David Raker Date: Sat, 27 Sep 2025 14:54:01 -0700 Subject: [PATCH 09/10] Update src/protocol_proxy/ipc/decorator.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/protocol_proxy/ipc/decorator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol_proxy/ipc/decorator.py b/src/protocol_proxy/ipc/decorator.py index 0288805..0d57db7 100644 --- a/src/protocol_proxy/ipc/decorator.py +++ b/src/protocol_proxy/ipc/decorator.py @@ -8,7 +8,7 @@ # TODO: Do we need an Asyncio version of this? # TODO: Did this work with the AsyncResult removed (just returns, possibly within greenlet)? def callback(func): - def verify(self, ipc, headers, raw_message: Any): + def verify(self, ipc, headers, raw_message: bytes): if peer := ipc.peers.get(headers.sender_id): if headers.sender_token == peer.token: return func(self, headers, raw_message) From e9a167fb08a33c44383b4b56253dc12558004640 Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Sat, 27 Sep 2025 14:59:26 -0700 Subject: [PATCH 10/10] Corrected loop on StreamReader to end at_eof(). StreamReader itself is truthy. --- src/protocol_proxy/manager/asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol_proxy/manager/asyncio.py b/src/protocol_proxy/manager/asyncio.py index 63e5251..ba04271 100644 --- a/src/protocol_proxy/manager/asyncio.py +++ b/src/protocol_proxy/manager/asyncio.py @@ -57,7 +57,7 @@ async def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPee return self.peers[proxy_id] async def log_subprocess_output(self, stream: StreamReader): - while stream: + while not stream.at_eof(): raw_line = await stream.readline() self.log_subprocess_output_line(raw_line)