From 70d0f2b5a6ca81d94d62da3d704857b3b6b067d8 Mon Sep 17 00:00:00 2001
From: "David M. Raker"
Date: Wed, 1 Oct 2025 13:57:56 -0700
Subject: [PATCH 01/11] Updates to README.md.

---
 README.md | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 59 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 7361f0f..6a2f6a3 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,59 @@
-# lib-protocol-proxy
\ No newline at end of file
+# Protocol Proxy
+![Python 3.10](https://img.shields.io/badge/python-3.10-blue.svg)
+![Python 3.11](https://img.shields.io/badge/python-3.11-blue.svg)
+[![Passing?](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml/badge.svg)](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml)
+[![pypi version](https://img.shields.io/pypi/v/protocol-proxy.svg)](https://pypi.org/project/protocol-proxy/)
+
+This library provides the user with the ability to automatically deploy and manage proxy processes for handling
+network communication with remote devices using various protocols. A proxy to each remote peer is established in
+a separate process from the managing application. A manager class handles socket communication between the proxy
+subprocess and its owner. Individual protocols are implemented as plugins to this library. Integration with
+gevent and asyncio event loops is supported for both the proxy and manager processes.
+
+
+## Automatically installed dependencies
+- python = ">=3.10,<4.0"
+
+[//]: # (# Documentation)
+
+[//]: # (More detailed documentation can be found on [ReadTheDocs](https://eclipse-volttron.readthedocs.io/en/latest/external-docs/lib-protocol-proxy/index.html. The RST source)
+
+[//]: # (of the documentation for this component is located in the "docs" directory of this repository.)
+
+# Installation
+This library can be installed using pip:
+
+```shell
+pip install lib-protocol-proxy
+```
+
+Protocol Proxy plugins should include "protocol-proxy" as a requirement, so users of existing
+plugins are encouraged to install that plugin package directly instead.
+
+# Development
+This library is maintained by the VOLTTRON Development Team.
+
+Please see the following [guidelines](https://github.com/eclipse-volttron/volttron-core/blob/develop/CONTRIBUTING.md)
+for contributing to this and/or other VOLTTRON repositories.
+
+[//]: # (Please see the following helpful guide about [using the Protocol Proxy](https://github.com/eclipse-volttron/lib-protocol-proxy/blob/develop/developing_with_protocol_proxy.md))
+
+[//]: # (in your VOLTTRON agent or other applications.)
+
+# Disclaimer Notice
+
+This material was prepared as an account of work sponsored by an agency of the
+United States Government. Neither the United States Government nor the United
+States Department of Energy, nor Battelle, nor any of their employees, nor any
+jurisdiction or organization that has cooperated in the development of these
+materials, makes any warranty, express or implied, or assumes any legal
+liability or responsibility for the accuracy, completeness, or usefulness of any
+information, apparatus, product, software, or process disclosed, or represents
+that its use would not infringe privately owned rights.
+ +Reference herein to any specific commercial product, process, or service by +trade name, trademark, manufacturer, or otherwise does not necessarily +constitute or imply its endorsement, recommendation, or favoring by the United +States Government or any agency thereof, or Battelle Memorial Institute. The +views and opinions of authors expressed herein do not necessarily state or +reflect those of the United States Government or any agency thereof. From 7487dac4cc797ede8c177ba6b99dd6ce6435e8c7 Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Wed, 22 Oct 2025 08:29:53 -0700 Subject: [PATCH 02/11] Added try blocks around use of getpeername in ipc.gevent. --- src/protocol_proxy/ipc/gevent.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/protocol_proxy/ipc/gevent.py b/src/protocol_proxy/ipc/gevent.py index 4d8a0c1..293e987 100644 --- a/src/protocol_proxy/ipc/gevent.py +++ b/src/protocol_proxy/ipc/gevent.py @@ -146,12 +146,19 @@ def _receive_headers(self, s: socket) -> ProtocolHeaders | None: try: received = s.recv(2) if len(received) == 0: - _log.warning(f'{self.proxy_name} received closed socket from ({s.getpeername()}.') + try: + peer_name = f' from {s.getpeername()}.' + except OSError: + peer_name = '.' + _log.warning(f'{self.proxy_name} received closed socket {peer_name}') return None version_num = struct.unpack('>H', received)[0] if not (protocol := self.PROTOCOL_VERSION.get(version_num)): - raise NotImplementedError(f'Unknown protocol version ({version_num})' - f' received from: {s.getpeername()}') + try: + peer_name = f' received from: {s.getpeername()}.' + except OSError: + peer_name = '.' + raise NotImplementedError(f'Unknown protocol version ({version_num}){peer_name}') header_bytes = s.recv(protocol.HEADER_LENGTH) if len(header_bytes) == protocol.HEADER_LENGTH: return protocol.unpack(header_bytes) @@ -194,11 +201,19 @@ def _receive_socket(self, s: socket): s.close() done = True elif headers: + try: + peer_name = f' from {s.getpeername()}' + except OSError: + peer_name = '' _log.warning(f'{self.proxy_name}: Received unknown method name: {headers.method_name}' - f' from {s.getpeername()} with request ID: {headers.request_id}') + f' {peer_name} with request ID: {headers.request_id}') s.close() else: - _log.warning(f'{self.proxy_name}: Unable to read headers from socket: {s.getpeername()}') + try: + peer_name = f': {s.getpeername()}.' + except OSError: + peer_name = '.' 
+ _log.warning(f'{self.proxy_name}: Unable to read headers from socket: {peer_name}') s.close() def _send_headers(self, s: socket, data_length: int, request_id: int, response_expected: bool, method_name: str, @@ -216,7 +231,11 @@ def _send_headers(self, s: socket, data_length: int, request_id: int, response_e def _send_socket(self, s: socket): _log.debug(f'{self.proxy_name}: IN SEND SOCKET') if not (message := self.outbound_messages.get(s)): - _log.warning(f'Outbound socket to {s.getpeername()} was ready, but no outbound message was found.') + try: + peer_name = f'to {s.getpeername()}' + except OSError: + peer_name = '' + _log.warning(f'Outbound socket to {peer_name} was ready, but no outbound message was found.') elif isinstance(message.payload, AsyncResult) and not message.payload.ready(): self.outbounds.add(s) _log.debug('IN SEND SOCKET, WAS ADDED BACK TO OUTBOUND BECAUSE ASYNC_RESULT WAS NOT READY.') From 4751e96c691d09a91bc8ce9c7e914346e8a75f1b Mon Sep 17 00:00:00 2001 From: riley206-pnnl Date: Wed, 12 Nov 2025 15:50:24 -0800 Subject: [PATCH 03/11] Refine inbound parameters extraction to handle IPv6 socket tuples correctly --- src/protocol_proxy/ipc/asyncio.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/protocol_proxy/ipc/asyncio.py b/src/protocol_proxy/ipc/asyncio.py index 8945de7..8878cd4 100644 --- a/src/protocol_proxy/ipc/asyncio.py +++ b/src/protocol_proxy/ipc/asyncio.py @@ -91,7 +91,10 @@ async def _setup_inbound_server(self, socket_params: SocketParams = None): f' on any port in range: {self.min_port} - {self.max_port}.') break else: - self.inbound_params = SocketParams(*self.inbound_server.sockets[0].getsockname()) + # Only take first 2 elements (host, port) from getsockname() + # IPv6 sockets return 4-tuple (host, port, flowinfo, scope_id) + sockname = self.inbound_server.sockets[0].getsockname() + self.inbound_params = SocketParams(sockname[0], sockname[1]) break async def start(self, *_, **__): From fbf255f466451c5d9fc2198f0c66cbac35f6f7d9 Mon Sep 17 00:00:00 2001 From: riley206-pnnl Date: Wed, 12 Nov 2025 16:32:56 -0800 Subject: [PATCH 04/11] Fix formatting in pyproject.toml and update version to 2.0.0rc2 --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d58ffb3..099a91c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api" profile = "black" [tool.mypy] -python_version = 3.10 +python_version = "3.10" show_error_context = true pretty = true show_column_numbers = true @@ -29,7 +29,7 @@ ignore_missing_imports = true [tool.poetry] name = "protocol-proxy" -version = "2.0.0rc0" +version = "2.0.0rc2" description = "A system for launching and communicating with a proxy application for network communication which runs in a separate process.." 
authors = ["The VOLTTRON Development Team "] license = "Apache License 2.0" From 148f2cb1fc267ce1b3e9b3940e16f8d88bc10805 Mon Sep 17 00:00:00 2001 From: riley206-pnnl Date: Wed, 12 Nov 2025 16:38:55 -0800 Subject: [PATCH 05/11] Refactor get_local_socket_params to handle IPv6 socket tuples correctly --- src/protocol_proxy/proxy/asyncio.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/protocol_proxy/proxy/asyncio.py b/src/protocol_proxy/proxy/asyncio.py index ea15b27..f1ca147 100644 --- a/src/protocol_proxy/proxy/asyncio.py +++ b/src/protocol_proxy/proxy/asyncio.py @@ -22,7 +22,10 @@ def __init__(self, manager_address: str, manager_port: int, manager_id: UUID, ma token=manager_token) def get_local_socket_params(self) -> SocketParams: - return self.inbound_server.sockets[0].getsockname() + # Only take first 2 elements (host, port) from getsockname() + # IPv6 sockets return 4-tuple (host, port, flowinfo, scope_id) + sockname = self.inbound_server.sockets[0].getsockname() + return SocketParams(sockname[0], sockname[1]) async def send_registration(self, remote: AsyncioProtocolProxyPeer): _log.debug(f"[send_registration] Attempting to register with manager at: {remote} (type={type(remote)})") From daa20dba1ac576e7a808be0748f6a6de24b96cfa Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Wed, 22 Oct 2025 10:16:48 -0700 Subject: [PATCH 06/11] Stability improvements and fixes for multiple instantiations. --- src/protocol_proxy/ipc/base.py | 8 ++- src/protocol_proxy/ipc/gevent.py | 74 ++++++++++++++++++++-------- src/protocol_proxy/manager/base.py | 7 ++- src/protocol_proxy/manager/gevent.py | 6 ++- 4 files changed, 69 insertions(+), 26 deletions(-) diff --git a/src/protocol_proxy/ipc/base.py b/src/protocol_proxy/ipc/base.py index fe264ef..2403ea6 100644 --- a/src/protocol_proxy/ipc/base.py +++ b/src/protocol_proxy/ipc/base.py @@ -105,8 +105,12 @@ def next_request_id(self): return next(self._request_id) def register_callback(self, cb_method, method_name, provides_response=False, timeout=30.0): - _log.info(f'{self.proxy_name} registered callback: {method_name}') - self.callbacks[method_name] = ProtocolProxyCallback(cb_method, method_name, provides_response, timeout=timeout) + if not self.callbacks.get(method_name): + _log.info(f'{self.proxy_name} registered callback: {method_name}') + self.callbacks[method_name] = ProtocolProxyCallback(cb_method, method_name, provides_response, + timeout=timeout) + else: + _log.info(f'{self.proxy_name} confirmed callback: {method_name} is registered.') @abstractmethod def start(self, *_, **__): diff --git a/src/protocol_proxy/ipc/gevent.py b/src/protocol_proxy/ipc/gevent.py index 293e987..e4f0871 100644 --- a/src/protocol_proxy/ipc/gevent.py +++ b/src/protocol_proxy/ipc/gevent.py @@ -1,6 +1,7 @@ import logging import struct +from contextlib import contextmanager from dataclasses import dataclass from gevent import select, sleep, spawn from gevent.event import AsyncResult @@ -42,6 +43,9 @@ def _get_ip_addresses(self, host_name: str) -> set[str]: return {ai[4][0] for ai in getaddrinfo(host_name, None)} def _setup_inbound_server(self, socket_params: SocketParams = None): + if self.inbound_server_socket: + _log.debug('@@@@@@@ Using existing inbound server socket.') + return inbound_socket: socket = socket(AF_INET, SOCK_STREAM) inbound_socket.setblocking(False) if socket_params: @@ -73,6 +77,7 @@ def _setup_inbound_server(self, socket_params: SocketParams = None): _log.warning(f'{self.proxy_name}: Socket error listening on 
{self.inbound_params}: {e}') self.inbound_server_socket = inbound_socket self.inbounds.add(self.inbound_server_socket) + _log.info(f'@@@@@@@ Created new inbound server socket: {self.inbound_params}.') return @callback @@ -102,8 +107,8 @@ def send(self, remote: ProtocolProxyPeer, message: ProtocolProxyMessage) -> bool return False if message.request_id is None: message.request_id = self.next_request_id - self.outbounds.add(outbound) self.outbound_messages[outbound] = message + self.outbounds.add(outbound) if message.response_expected: async_result = AsyncResult() self.response_results[message.request_id] = async_result @@ -122,9 +127,12 @@ def select_loop(self): else: for s in readable: # Handle incoming sockets. if s is self.inbound_server_socket: # The server socket is ready to accept a connection - client_socket, client_address = s.accept() - client_socket.setblocking(0) - self.inbounds.add(client_socket) + try: + client_socket, client_address = s.accept() + client_socket.setblocking(0) + self.inbounds.add(client_socket) + except BlockingIOError: + pass else: self.inbounds.discard(s) spawn(self._receive_socket, s) @@ -142,39 +150,63 @@ def select_loop(self): finally: s.close() - def _receive_headers(self, s: socket) -> ProtocolHeaders | None: + @contextmanager + def _non_blocking_socket(self, func, io_wait_time, *args, **kwargs): + _log.debug(f'NEW CALL TO _NON_BLOCKING_SOCKET: FUNC: "{func}", IO_WAIT_TIME: {io_wait_time}, ARGS: {args}, KWARGS: {kwargs}') + done = False + while not done: + try: + _log.debug(f'CALLING FUNC "{func}" with ARGS: {args} and KWARGS: {kwargs}') + ret_val = func(*args, **kwargs) + _log.debug(f'RETURNING: {ret_val}') + done = True + yield ret_val, io_wait_time + break + except BlockingIOError as e: + io_wait_time -= 0.1 + sleep(0.1) + if io_wait_time <= 0: + _log.info(f'Timed out after {self.max_io_wait_seconds} seconds with BlockingIOError: {e}') + done = True + finally: + _log.debug('IN FINALLY OF _NON_BLOCKING_SOCKET') + + def _receive_headers(self, s: socket) -> tuple[ProtocolHeaders | None, float]: try: - received = s.recv(2) - if len(received) == 0: - try: - peer_name = f' from {s.getpeername()}.' - except OSError: - peer_name = '.' - _log.warning(f'{self.proxy_name} received closed socket {peer_name}') - return None - version_num = struct.unpack('>H', received)[0] + with self._non_blocking_socket(s.recv, self.max_io_wait_seconds, 2) as (version_bytes, remaining_time): + if len(version_bytes) == 0: + try: + peer_name = f' from {s.getpeername()}.' + except OSError: + peer_name = '.' + _log.warning(f'{self.proxy_name} received closed socket {peer_name}') + return None, remaining_time + version_num = struct.unpack('>H', version_bytes)[0] if not (protocol := self.PROTOCOL_VERSION.get(version_num)): try: peer_name = f' received from: {s.getpeername()}.' except OSError: peer_name = '.' raise NotImplementedError(f'Unknown protocol version ({version_num}){peer_name}') - header_bytes = s.recv(protocol.HEADER_LENGTH) - if len(header_bytes) == protocol.HEADER_LENGTH: - return protocol.unpack(header_bytes) - else: - _log.warning(f'Failed to read headers. Received {len(header_bytes)} bytes: {header_bytes}') + with self._non_blocking_socket(s.recv, remaining_time, protocol.HEADER_LENGTH + ) as (header_bytes, remaining_time): + if len(header_bytes) == protocol.HEADER_LENGTH: + return protocol.unpack(header_bytes), remaining_time + else: + _log.warning(f'Failed to unpack headers. 
For header length of {protocol.HEADER_LENGTH},' + f' received {len(header_bytes)} bytes: {header_bytes}') + return None, remaining_time except (OSError, Exception) as e: _log.warning(f'{self.proxy_name}: Socket exception reading headers: {e}') + return None, remaining_time if 'remaining_time' in locals() else self.max_io_wait_seconds def _receive_socket(self, s: socket): _log.debug(f'{self.proxy_name}: IN RECEIVE SOCKET') - headers = self._receive_headers(s) + headers, io_wait_time = self._receive_headers(s) if headers is not None and (cb_info := self.callbacks.get(headers.method_name)): remaining = headers.data_length buffer = b'' done = False - io_wait_time = self.max_io_wait_seconds while not done: try: while chunk := s.recv(read_length := max(0, remaining if remaining < self.chunk_size else self.chunk_size)): diff --git a/src/protocol_proxy/manager/base.py b/src/protocol_proxy/manager/base.py index dc1b238..4aaf05f 100644 --- a/src/protocol_proxy/manager/base.py +++ b/src/protocol_proxy/manager/base.py @@ -33,12 +33,14 @@ def wait_peer_registered(self, peer, timeout, func=None, *args, **kwargs): """ def _setup_proxy_process_command(self, unique_remote_id: tuple, **kwargs) -> tuple: - _log.debug(f'UAI is: {unique_remote_id}') + _log.debug(f'@@@@@@@@@ PASSED UAI is: {unique_remote_id}') unique_remote_id = self.proxy_class.get_unique_remote_id(unique_remote_id) - _log.debug(f'UAI is: {unique_remote_id}') + _log.debug(f'@@@@@@@@@ UAI AFTER LOOKUP is: {unique_remote_id}') proxy_id = self.get_proxy_id(unique_remote_id) + _log.debug('@@@@@@@@@ PROXY ID is: {}'.format(proxy_id)) proxy_name = str(unique_remote_id) if proxy_id not in self.peers: + _log.debug(f'@@@@@@@@@ PROXY_ID IS NOT IN PEERS. SETTING UP COMMAND.') module, func = self.proxy_class.__module__, self.proxy_class.__name__ protocol_specific_params = [i for pair in [(f"--{k.replace('_', '-')}", v) for k, v in kwargs.items()] for i in pair] @@ -46,6 +48,7 @@ def _setup_proxy_process_command(self, unique_remote_id: tuple, **kwargs) -> tup '--manager-id', self.proxy_id.hex, '--manager-address', self.inbound_params.address, '--manager-port', str(self.inbound_params.port), *protocol_specific_params] else: + _log.debug(f'@@@@@@@@@ PROXY_ID IS IN PEERS. 
NOT SETTING UP COMMAND.') command = None return command, proxy_id, proxy_name diff --git a/src/protocol_proxy/manager/gevent.py b/src/protocol_proxy/manager/gevent.py index 633f741..49285e5 100644 --- a/src/protocol_proxy/manager/gevent.py +++ b/src/protocol_proxy/manager/gevent.py @@ -28,6 +28,7 @@ def wait_peer_registered(self, peer, timeout, func=None, *args, **kwargs): def wait_for_peer_registration(): while peer.socket_params is None: sleep(0.1) + try: with peer.ready: with_timeout(timeout, wait_for_peer_registration) @@ -41,10 +42,12 @@ def wait_for_peer_registration(): del self.peers[peer.proxy_id] def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer: + _log.debug('@@@@@@@ IN GET_PROXY @@@@@@@') command, proxy_id, proxy_name = self._setup_proxy_process_command(unique_remote_id, **kwargs) # , proxy_env + _log.debug(f'@@@@@@@ AFTER SETUP_PROXY_PROCESS_COMMAND: {(command, proxy_id, proxy_name)} @@@@@@@') if command: proxy_process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) - _log.info("proxy %s has PID %s", self.proxy_name, proxy_process.pid) + _log.info(f'proxy {proxy_name} has PID {proxy_process.pid}') spawn(self.log_subprocess_output, proxy_process.stdout) spawn(self.log_subprocess_output, proxy_process.stderr) # TODO: Ensure that logging as implemented fits with VOLTTRON logging once that is fixed.. @@ -60,6 +63,7 @@ def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer: atexit.register(self._cleanup_proxy_process, proxy_process) # Do NOT send to the proxy until it has registered and socket_params is set! _log.debug(f"PPM: Proxy {proxy_id} created, waiting for registration before sending.") + _log.debug(f'@@@@@@@ GET_PROXY WILL RETURN PEER: {self.peers[proxy_id]} @@@@@@@') return self.peers[proxy_id] def log_subprocess_output(self, stream: IO[bytes]): From de6f1cd10862cefdc6089d1199cec02d79960711 Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Mon, 24 Nov 2025 08:25:49 -0800 Subject: [PATCH 07/11] Added option to separately log proxy to aid in debugging scenarios. 
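
The new option is driven by a PROTOCOL_PROXY_LOG environment variable read in proxy/launch.py. Since
Popen inherits the manager's environment by default, exporting the variable before the manager starts
is enough for the spawned proxy subprocesses to see it. Below is a minimal, self-contained sketch of
the underlying pattern (an optional logging destination chosen from an environment variable); the
variable name is taken from this patch, everything else is illustrative, and the sketch supplies the
stream argument only when no file is configured because logging.basicConfig does not accept stream
and filename together.

```python
import logging
import sys
from os import environ

# Send log records to a file when PROTOCOL_PROXY_LOG is set, otherwise to stdout.
proxy_log = environ.get('PROTOCOL_PROXY_LOG')
optional_log_params = {'filename': proxy_log} if proxy_log else {'stream': sys.stdout}
logging.basicConfig(
    level=logging.DEBUG,
    format='{"name": "%(name)s", "lineno": "%(lineno)d", "level": "%(levelname)s", "message": "%(message)s"}',
    **optional_log_params
)

logging.getLogger(__name__).debug('proxy logging configured')
```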
--- src/protocol_proxy/proxy/launch.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/protocol_proxy/proxy/launch.py b/src/protocol_proxy/proxy/launch.py index 7b1b554..f124565 100644 --- a/src/protocol_proxy/proxy/launch.py +++ b/src/protocol_proxy/proxy/launch.py @@ -1,4 +1,5 @@ import logging +from os import environ import sys from argparse import ArgumentParser @@ -6,9 +7,11 @@ from typing import Callable from uuid import UUID +optional_log_params = {'filename': proxy_log} if (proxy_log := environ.get('PROTOCOL_PROXY_LOG')) else {} logging.basicConfig( level=logging.DEBUG, stream=sys.stdout, - format='{"name": "%(name)s", "lineno": "%(lineno)d", "level": "%(levelname)s", "message": "%(message)s"}' + format='{"name": "%(name)s", "lineno": "%(lineno)d", "level": "%(levelname)s", "message": "%(message)s"}', + **optional_log_params ) _log = logging.getLogger(__name__) @@ -30,12 +33,16 @@ def proxy_command_parser(parser: ArgumentParser = None): return parser def launch(launcher_func: Callable): - parser = proxy_command_parser() - parser, proxy_runner = launcher_func(parser) - opts = parser.parse_args() - proxy_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8')) - manager_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8')) - if iscoroutinefunction(proxy_runner): - run(proxy_runner(token=proxy_token, manager_token=manager_token, **vars(opts))) - else: - proxy_runner(token=proxy_token, manager_token=manager_token, **vars(opts)) + try: + parser = proxy_command_parser() + parser, proxy_runner = launcher_func(parser) + opts = parser.parse_args() + _log.info(f'Launching Proxy with parameters: {opts}') + proxy_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8')) + manager_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8')) + if iscoroutinefunction(proxy_runner): + run(proxy_runner(token=proxy_token, manager_token=manager_token, **vars(opts))) + else: + proxy_runner(token=proxy_token, manager_token=manager_token, **vars(opts)) + except BaseException as e: + _log.debug(f'Proxy Launch: Launcher caught exception: {e}') From 72bb24fcb2417fed440110e8a3e77da8919e7afc Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Tue, 30 Dec 2025 13:27:50 -0800 Subject: [PATCH 08/11] Commented warnings triggered at the end of every read. Replaced with TODOs. These seem to be triggered at times that are not really errors, but it is not clear yet why. --- src/protocol_proxy/ipc/gevent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/protocol_proxy/ipc/gevent.py b/src/protocol_proxy/ipc/gevent.py index e4f0871..176941f 100644 --- a/src/protocol_proxy/ipc/gevent.py +++ b/src/protocol_proxy/ipc/gevent.py @@ -197,7 +197,7 @@ def _receive_headers(self, s: socket) -> tuple[ProtocolHeaders | None, float]: f' received {len(header_bytes)} bytes: {header_bytes}') return None, remaining_time except (OSError, Exception) as e: - _log.warning(f'{self.proxy_name}: Socket exception reading headers: {e}') + # TODO: Why is this getting triggered at end of transmissions? _log.warning(f'{self.proxy_name}: Socket exception reading headers: {e}') return None, remaining_time if 'remaining_time' in locals() else self.max_io_wait_seconds def _receive_socket(self, s: socket): @@ -245,7 +245,7 @@ def _receive_socket(self, s: socket): peer_name = f': {s.getpeername()}.' except OSError: peer_name = '.' 
- _log.warning(f'{self.proxy_name}: Unable to read headers from socket: {peer_name}') + # TODO: Why is this getting triggered at end of transmissions? _log.warning(f'{self.proxy_name}: Unable to read headers from socket: {peer_name}') s.close() def _send_headers(self, s: socket, data_length: int, request_id: int, response_expected: bool, method_name: str, From 453f6e6b76bfd4dd03e8951a3d9cb994a67f3238 Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Tue, 6 Jan 2026 11:15:39 -0800 Subject: [PATCH 09/11] Commented warning at end of every write. --- src/protocol_proxy/ipc/gevent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol_proxy/ipc/gevent.py b/src/protocol_proxy/ipc/gevent.py index 176941f..763d0f4 100644 --- a/src/protocol_proxy/ipc/gevent.py +++ b/src/protocol_proxy/ipc/gevent.py @@ -267,7 +267,7 @@ def _send_socket(self, s: socket): peer_name = f'to {s.getpeername()}' except OSError: peer_name = '' - _log.warning(f'Outbound socket to {peer_name} was ready, but no outbound message was found.') + # TODO: Why is this getting triggered (apparently on every send)? _log.warning(f'Outbound socket to {peer_name} was ready, but no outbound message was found.') elif isinstance(message.payload, AsyncResult) and not message.payload.ready(): self.outbounds.add(s) _log.debug('IN SEND SOCKET, WAS ADDED BACK TO OUTBOUND BECAUSE ASYNC_RESULT WAS NOT READY.') From 2a714cfe7f0278de9113232314fbf609ab0ac88b Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Thu, 8 Jan 2026 10:48:55 -0800 Subject: [PATCH 10/11] Commented extraneous debug statements. --- src/protocol_proxy/ipc/asyncio.py | 6 +++--- src/protocol_proxy/ipc/gevent.py | 22 +++++++++++----------- src/protocol_proxy/manager/base.py | 10 +++++----- src/protocol_proxy/manager/gevent.py | 8 ++++---- src/protocol_proxy/proxy/asyncio.py | 4 ++-- src/protocol_proxy/proxy/base.py | 2 +- src/protocol_proxy/proxy/launch.py | 2 +- 7 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/protocol_proxy/ipc/asyncio.py b/src/protocol_proxy/ipc/asyncio.py index 8878cd4..2f6685d 100644 --- a/src/protocol_proxy/ipc/asyncio.py +++ b/src/protocol_proxy/ipc/asyncio.py @@ -83,7 +83,7 @@ async def _setup_inbound_server(self, socket_params: SocketParams = None): next_port = next(self.unused_ports(await self._get_ip_addresses(socket_params.address))) self.inbound_server = await self.loop.create_server(factory, socket_params.address, next_port, start_serving=True) - _log.debug(f'{self.proxy_name} AFTER START SERVING. Server is: {self.inbound_server}') + #_log.debug(f'{self.proxy_name} AFTER START SERVING. 
Server is: {self.inbound_server}') except OSError: continue except StopIteration: @@ -99,7 +99,7 @@ async def _setup_inbound_server(self, socket_params: SocketParams = None): async def start(self, *_, **__): await self._setup_inbound_server(self.inbound_params) - _log.debug(f' {self.proxy_name} STARTED with INBOUND PARAMS SENT AS: {self.inbound_params}.') + #_log.debug(f' {self.proxy_name} STARTED with INBOUND PARAMS SENT AS: {self.inbound_params}.') async def stop(self): self.inbound_server.close() @@ -185,7 +185,7 @@ async def _run_callback(self, callback_info: ProtocolProxyCallback, headers, dat self.transport.close() def connection_made(self, transport: Transport): - _log.debug(f"[IPCProtocol] connection_made: transport={transport}") + #_log.debug(f"[IPCProtocol] connection_made: transport={transport}") try: self.transport = transport if self.outgoing_message: diff --git a/src/protocol_proxy/ipc/gevent.py b/src/protocol_proxy/ipc/gevent.py index 763d0f4..8e65a15 100644 --- a/src/protocol_proxy/ipc/gevent.py +++ b/src/protocol_proxy/ipc/gevent.py @@ -44,7 +44,7 @@ def _get_ip_addresses(self, host_name: str) -> set[str]: def _setup_inbound_server(self, socket_params: SocketParams = None): if self.inbound_server_socket: - _log.debug('@@@@@@@ Using existing inbound server socket.') + #_log.debug('@@@@@@@ Using existing inbound server socket.') return inbound_socket: socket = socket(AF_INET, SOCK_STREAM) inbound_socket.setblocking(False) @@ -152,13 +152,13 @@ def select_loop(self): @contextmanager def _non_blocking_socket(self, func, io_wait_time, *args, **kwargs): - _log.debug(f'NEW CALL TO _NON_BLOCKING_SOCKET: FUNC: "{func}", IO_WAIT_TIME: {io_wait_time}, ARGS: {args}, KWARGS: {kwargs}') + #_log.debug(f'NEW CALL TO _NON_BLOCKING_SOCKET: FUNC: "{func}", IO_WAIT_TIME: {io_wait_time}, ARGS: {args}, KWARGS: {kwargs}') done = False while not done: try: - _log.debug(f'CALLING FUNC "{func}" with ARGS: {args} and KWARGS: {kwargs}') + #_log.debug(f'CALLING FUNC "{func}" with ARGS: {args} and KWARGS: {kwargs}') ret_val = func(*args, **kwargs) - _log.debug(f'RETURNING: {ret_val}') + #_log.debug(f'RETURNING: {ret_val}') done = True yield ret_val, io_wait_time break @@ -168,8 +168,8 @@ def _non_blocking_socket(self, func, io_wait_time, *args, **kwargs): if io_wait_time <= 0: _log.info(f'Timed out after {self.max_io_wait_seconds} seconds with BlockingIOError: {e}') done = True - finally: - _log.debug('IN FINALLY OF _NON_BLOCKING_SOCKET') + #finally: + #_log.debug('IN FINALLY OF _NON_BLOCKING_SOCKET') def _receive_headers(self, s: socket) -> tuple[ProtocolHeaders | None, float]: try: @@ -201,7 +201,7 @@ def _receive_headers(self, s: socket) -> tuple[ProtocolHeaders | None, float]: return None, remaining_time if 'remaining_time' in locals() else self.max_io_wait_seconds def _receive_socket(self, s: socket): - _log.debug(f'{self.proxy_name}: IN RECEIVE SOCKET') + #_log.debug(f'{self.proxy_name}: IN RECEIVE SOCKET') headers, io_wait_time = self._receive_headers(s) if headers is not None and (cb_info := self.callbacks.get(headers.method_name)): remaining = headers.data_length @@ -261,7 +261,7 @@ def _send_headers(self, s: socket, data_length: int, request_id: int, response_e f' (request_id: {request_id}): {e}') def _send_socket(self, s: socket): - _log.debug(f'{self.proxy_name}: IN SEND SOCKET') + #_log.debug(f'{self.proxy_name}: IN SEND SOCKET') if not (message := self.outbound_messages.get(s)): try: peer_name = f'to {s.getpeername()}' @@ -270,12 +270,12 @@ def _send_socket(self, s: socket): # TODO: Why 
is this getting triggered (apparently on every send)? _log.warning(f'Outbound socket to {peer_name} was ready, but no outbound message was found.') elif isinstance(message.payload, AsyncResult) and not message.payload.ready(): self.outbounds.add(s) - _log.debug('IN SEND SOCKET, WAS ADDED BACK TO OUTBOUND BECAUSE ASYNC_RESULT WAS NOT READY.') + #_log.debug('IN SEND SOCKET, WAS ADDED BACK TO OUTBOUND BECAUSE ASYNC_RESULT WAS NOT READY.') else: payload = message.payload.get() if isinstance(message.payload, Greenlet) else message.payload self._send_headers(s, len(payload), message.request_id, message.response_expected, message.method_name) try: - _log.debug('REACHED SENDALL IN GEVENT IPC SEND') + #_log.debug('REACHED SENDALL IN GEVENT IPC SEND') s.sendall(payload) # TODO: Should we send in chunks and sleep in between? if message.response_expected: self.inbounds.add(s) @@ -306,7 +306,7 @@ def _handle_exceptional_socket(self, s: socket): def start(self, *_, **__): self._setup_inbound_server(self.inbound_params) - _log.debug(f'{self.proxy_name} STARTED.') + #_log.debug(f'{self.proxy_name} STARTED.') def stop(self): self._stop = True diff --git a/src/protocol_proxy/manager/base.py b/src/protocol_proxy/manager/base.py index 4aaf05f..ac347f4 100644 --- a/src/protocol_proxy/manager/base.py +++ b/src/protocol_proxy/manager/base.py @@ -33,14 +33,14 @@ def wait_peer_registered(self, peer, timeout, func=None, *args, **kwargs): """ def _setup_proxy_process_command(self, unique_remote_id: tuple, **kwargs) -> tuple: - _log.debug(f'@@@@@@@@@ PASSED UAI is: {unique_remote_id}') + #_log.debug(f'@@@@@@@@@ PASSED UAI is: {unique_remote_id}') unique_remote_id = self.proxy_class.get_unique_remote_id(unique_remote_id) - _log.debug(f'@@@@@@@@@ UAI AFTER LOOKUP is: {unique_remote_id}') + #_log.debug(f'@@@@@@@@@ UAI AFTER LOOKUP is: {unique_remote_id}') proxy_id = self.get_proxy_id(unique_remote_id) - _log.debug('@@@@@@@@@ PROXY ID is: {}'.format(proxy_id)) + #_log.debug('@@@@@@@@@ PROXY ID is: {}'.format(proxy_id)) proxy_name = str(unique_remote_id) if proxy_id not in self.peers: - _log.debug(f'@@@@@@@@@ PROXY_ID IS NOT IN PEERS. SETTING UP COMMAND.') + #_log.debug(f'@@@@@@@@@ PROXY_ID IS NOT IN PEERS. SETTING UP COMMAND.') module, func = self.proxy_class.__module__, self.proxy_class.__name__ protocol_specific_params = [i for pair in [(f"--{k.replace('_', '-')}", v) for k, v in kwargs.items()] for i in pair] @@ -48,7 +48,7 @@ def _setup_proxy_process_command(self, unique_remote_id: tuple, **kwargs) -> tup '--manager-id', self.proxy_id.hex, '--manager-address', self.inbound_params.address, '--manager-port', str(self.inbound_params.port), *protocol_specific_params] else: - _log.debug(f'@@@@@@@@@ PROXY_ID IS IN PEERS. NOT SETTING UP COMMAND.') + #_log.debug(f'@@@@@@@@@ PROXY_ID IS IN PEERS. 
NOT SETTING UP COMMAND.') command = None return command, proxy_id, proxy_name diff --git a/src/protocol_proxy/manager/gevent.py b/src/protocol_proxy/manager/gevent.py index 49285e5..9c63548 100644 --- a/src/protocol_proxy/manager/gevent.py +++ b/src/protocol_proxy/manager/gevent.py @@ -42,9 +42,9 @@ def wait_for_peer_registration(): del self.peers[peer.proxy_id] def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer: - _log.debug('@@@@@@@ IN GET_PROXY @@@@@@@') + #_log.debug('@@@@@@@ IN GET_PROXY @@@@@@@') command, proxy_id, proxy_name = self._setup_proxy_process_command(unique_remote_id, **kwargs) # , proxy_env - _log.debug(f'@@@@@@@ AFTER SETUP_PROXY_PROCESS_COMMAND: {(command, proxy_id, proxy_name)} @@@@@@@') + #_log.debug(f'@@@@@@@ AFTER SETUP_PROXY_PROCESS_COMMAND: {(command, proxy_id, proxy_name)} @@@@@@@') if command: proxy_process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) _log.info(f'proxy {proxy_name} has PID {proxy_process.pid}') @@ -62,8 +62,8 @@ def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer: token=new_peer_token) atexit.register(self._cleanup_proxy_process, proxy_process) # Do NOT send to the proxy until it has registered and socket_params is set! - _log.debug(f"PPM: Proxy {proxy_id} created, waiting for registration before sending.") - _log.debug(f'@@@@@@@ GET_PROXY WILL RETURN PEER: {self.peers[proxy_id]} @@@@@@@') + _log.info(f"PPM: Proxy {proxy_id} created, waiting for registration before sending.") + #_log.debug(f'@@@@@@@ GET_PROXY WILL RETURN PEER: {self.peers[proxy_id]} @@@@@@@') return self.peers[proxy_id] def log_subprocess_output(self, stream: IO[bytes]): diff --git a/src/protocol_proxy/proxy/asyncio.py b/src/protocol_proxy/proxy/asyncio.py index f1ca147..a17c733 100644 --- a/src/protocol_proxy/proxy/asyncio.py +++ b/src/protocol_proxy/proxy/asyncio.py @@ -28,12 +28,12 @@ def get_local_socket_params(self) -> SocketParams: return SocketParams(sockname[0], sockname[1]) async def send_registration(self, remote: AsyncioProtocolProxyPeer): - _log.debug(f"[send_registration] Attempting to register with manager at: {remote} (type={type(remote)})") + _log.info(f"Attempting to register with manager at: {remote} (type={type(remote)})") message = self._get_registration_message() manager_response = await self.send(remote, message) success_bytes = await manager_response if isinstance(manager_response, Future) else manager_response success = json.loads(success_bytes.decode('utf8')) - _log.debug(f'{self.proxy_name} IN SEND REGISTRATION, FUTURE.RESULT() IS : {success}') + #_log.debug(f'{self.proxy_name} IN SEND REGISTRATION, FUTURE.RESULT() IS : {success}') # TODO: Implement error handling and failure (along lines of copy below from gevent version (but working): # tries_remaining = 2 # if not success: diff --git a/src/protocol_proxy/proxy/base.py b/src/protocol_proxy/proxy/base.py index 882296f..d5fd9f8 100644 --- a/src/protocol_proxy/proxy/base.py +++ b/src/protocol_proxy/proxy/base.py @@ -20,7 +20,7 @@ def __init__(self, *, manager_address: str, manager_port: int, manager_id: UUID, 4. Create a ProtocolProxyPeer subclass for the manager and store it in self.peers. 5. Call send_registration asynchronously in their constructor after super calls. 
""" - _log.debug('PP: IN INIT.') + #_log.debug('PP: IN INIT.') super(ProtocolProxy, self).__init__(**kwargs) self.registration_retry_delay: float = registration_retry_delay self.manager_params = SocketParams(manager_address, manager_port) diff --git a/src/protocol_proxy/proxy/launch.py b/src/protocol_proxy/proxy/launch.py index f124565..c914cf8 100644 --- a/src/protocol_proxy/proxy/launch.py +++ b/src/protocol_proxy/proxy/launch.py @@ -45,4 +45,4 @@ def launch(launcher_func: Callable): else: proxy_runner(token=proxy_token, manager_token=manager_token, **vars(opts)) except BaseException as e: - _log.debug(f'Proxy Launch: Launcher caught exception: {e}') + _log.warning(f'Proxy Launch: Launcher caught exception: {e}') From f0ded7f01ba509b747a15afc4ff41968f29de1d2 Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Thu, 8 Jan 2026 16:22:52 -0800 Subject: [PATCH 11/11] Commented extraneous debug statements. --- src/protocol_proxy/ipc/asyncio.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/protocol_proxy/ipc/asyncio.py b/src/protocol_proxy/ipc/asyncio.py index 2f6685d..b438320 100644 --- a/src/protocol_proxy/ipc/asyncio.py +++ b/src/protocol_proxy/ipc/asyncio.py @@ -205,8 +205,9 @@ def _message_to_bytes(self, message: ProtocolProxyMessage): def connection_lost(self, exc): try: + pass # _log.debug(f'{self.connector.proxy_name} -- Connection lost, exc: "{exc}"') - _log.debug(f'self.on_con_lost is a {type(self.on_con_lost)} with value: {self.on_con_lost}') + # _log.debug(f'self.on_con_lost is a {type(self.on_con_lost)} with value: {self.on_con_lost}') # if self.on_con_lost is not None: # self.on_con_lost.set_result(True) # TODO: What is using the on_con_lost thing? except Exception as e: