diff --git a/README.md b/README.md
index 7361f0f..6a2f6a3 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,59 @@
-# lib-protocol-proxy
\ No newline at end of file
+# Protocol Proxy
+![Python 3.10](https://img.shields.io/badge/python-3.10-blue.svg)
+![Python 3.11](https://img.shields.io/badge/python-3.11-blue.svg)
+[![Passing?](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml/badge.svg)](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml)
+[![pypi version](https://img.shields.io/pypi/v/protocol-proxy.svg)](https://pypi.org/project/protocol-proxy/)
+
+This library automatically deploys and manages proxy processes for handling network
+communication with remote devices using various protocols. A proxy to each remote peer is established in
+a separate process from the managing application. A manager class handles socket communication between the proxy
+subprocess and its owner. Individual protocols are implemented as plugins to this library. Integration with
+gevent and asyncio event loops is supported for both the proxy and manager processes.
+
+
+## Automatically installed dependencies
+- python = ">=3.10,<4.0"
+
+[//]: # (# Documentation)
+
+[//]: # (More detailed documentation can be found on [ReadTheDocs](https://eclipse-volttron.readthedocs.io/en/latest/external-docs/lib-protocol-proxy/index.html). The RST source)
+
+[//]: # (of the documentation for this component is located in the "docs" directory of this repository.)
+
+# Installation
+This library can be installed using pip:
+
+```shell
+pip install protocol-proxy
+```
+
+Protocol Proxy plugins should include "protocol-proxy" as a requirement, so users of existing
+plugins are encouraged to install the relevant plugin package directly instead.
+
+# Development
+This library is maintained by the VOLTTRON Development Team.
+
+Please see the following [guidelines](https://github.com/eclipse-volttron/volttron-core/blob/develop/CONTRIBUTING.md)
+for contributing to this and/or other VOLTTRON repositories.
+
+[//]: # (Please see the following helpful guide about [using the Protocol Proxy](https://github.com/eclipse-volttron/lib-protocol-proxy/blob/develop/developing_with_protocol_proxy.md))
+
+[//]: # (in your VOLTTRON agent or other applications.)
+
+# Disclaimer Notice
+
+This material was prepared as an account of work sponsored by an agency of the
+United States Government. Neither the United States Government nor the United
+States Department of Energy, nor Battelle, nor any of their employees, nor any
+jurisdiction or organization that has cooperated in the development of these
+materials, makes any warranty, express or implied, or assumes any legal
+liability or responsibility for the accuracy, completeness, or usefulness of any
+information, apparatus, product, software, or process disclosed, or represents
+that its use would not infringe privately owned rights.
+
+Reference herein to any specific commercial product, process, or service by
+trade name, trademark, manufacturer, or otherwise does not necessarily
+constitute or imply its endorsement, recommendation, or favoring by the United
+States Government or any agency thereof, or Battelle Memorial Institute. The
+views and opinions of authors expressed herein do not necessarily state or
+reflect those of the United States Government or any agency thereof.
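To make the manager/proxy workflow described in the README concrete, here is a minimal manager-side sketch. The `get_proxy`, `wait_peer_registered`, `send`, and `ProtocolProxyMessage` names come from the modules changed in this diff; the manager class name, its constructor arguments, the import paths, the remote identifier, and the method/payload values are assumptions for illustration only.

```python
# Hedged sketch of the manager-side workflow; import paths, the manager class
# name, and the remote details below are assumptions, not part of this diff.
from protocol_proxy.ipc import ProtocolProxyMessage                    # assumed import path
from protocol_proxy.manager.gevent import GeventProtocolProxyManager  # hypothetical class name

# The manager owns the IPC sockets and launches one proxy subprocess per remote peer.
manager = GeventProtocolProxyManager()   # constructor arguments are protocol-specific (assumed none here)
manager.start()

# get_proxy() launches (or reuses) a proxy subprocess for the given remote
# and returns a ProtocolProxyPeer handle for it.
peer = manager.get_proxy(unique_remote_id=('192.0.2.10', 502))   # hypothetical remote

# Do NOT send to the proxy until it has registered back with the manager
# and its socket parameters are known.
manager.wait_peer_registered(peer, timeout=30.0)

# Messages are dispatched by method name to callbacks registered on the peer
# side with register_callback(); payloads travel as bytes over the IPC sockets.
sent = manager.send(peer, ProtocolProxyMessage(
    method_name='READ_PROPERTIES',            # hypothetical callback name
    payload=b'{"point": "analog-input-1"}',   # hypothetical payload
    response_expected=True))
# In the gevent implementation send() returns a bool; response handling is omitted here.
```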
diff --git a/pyproject.toml b/pyproject.toml index c258a95..099a91c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api" profile = "black" [tool.mypy] -python_version = 3.10 +python_version = "3.10" show_error_context = true pretty = true show_column_numbers = true @@ -29,7 +29,7 @@ ignore_missing_imports = true [tool.poetry] name = "protocol-proxy" -version = "2.0.0rc1" +version = "2.0.0rc2" description = "A system for launching and communicating with a proxy application for network communication which runs in a separate process.." authors = ["The VOLTTRON Development Team "] license = "Apache License 2.0" diff --git a/src/protocol_proxy/ipc/asyncio.py b/src/protocol_proxy/ipc/asyncio.py index 8945de7..b438320 100644 --- a/src/protocol_proxy/ipc/asyncio.py +++ b/src/protocol_proxy/ipc/asyncio.py @@ -83,7 +83,7 @@ async def _setup_inbound_server(self, socket_params: SocketParams = None): next_port = next(self.unused_ports(await self._get_ip_addresses(socket_params.address))) self.inbound_server = await self.loop.create_server(factory, socket_params.address, next_port, start_serving=True) - _log.debug(f'{self.proxy_name} AFTER START SERVING. Server is: {self.inbound_server}') + #_log.debug(f'{self.proxy_name} AFTER START SERVING. Server is: {self.inbound_server}') except OSError: continue except StopIteration: @@ -91,12 +91,15 @@ async def _setup_inbound_server(self, socket_params: SocketParams = None): f' on any port in range: {self.min_port} - {self.max_port}.') break else: - self.inbound_params = SocketParams(*self.inbound_server.sockets[0].getsockname()) + # Only take first 2 elements (host, port) from getsockname() + # IPv6 sockets return 4-tuple (host, port, flowinfo, scope_id) + sockname = self.inbound_server.sockets[0].getsockname() + self.inbound_params = SocketParams(sockname[0], sockname[1]) break async def start(self, *_, **__): await self._setup_inbound_server(self.inbound_params) - _log.debug(f' {self.proxy_name} STARTED with INBOUND PARAMS SENT AS: {self.inbound_params}.') + #_log.debug(f' {self.proxy_name} STARTED with INBOUND PARAMS SENT AS: {self.inbound_params}.') async def stop(self): self.inbound_server.close() @@ -182,7 +185,7 @@ async def _run_callback(self, callback_info: ProtocolProxyCallback, headers, dat self.transport.close() def connection_made(self, transport: Transport): - _log.debug(f"[IPCProtocol] connection_made: transport={transport}") + #_log.debug(f"[IPCProtocol] connection_made: transport={transport}") try: self.transport = transport if self.outgoing_message: @@ -202,8 +205,9 @@ def _message_to_bytes(self, message: ProtocolProxyMessage): def connection_lost(self, exc): try: + pass # _log.debug(f'{self.connector.proxy_name} -- Connection lost, exc: "{exc}"') - _log.debug(f'self.on_con_lost is a {type(self.on_con_lost)} with value: {self.on_con_lost}') + # _log.debug(f'self.on_con_lost is a {type(self.on_con_lost)} with value: {self.on_con_lost}') # if self.on_con_lost is not None: # self.on_con_lost.set_result(True) # TODO: What is using the on_con_lost thing? 
except Exception as e: diff --git a/src/protocol_proxy/ipc/base.py b/src/protocol_proxy/ipc/base.py index fe264ef..2403ea6 100644 --- a/src/protocol_proxy/ipc/base.py +++ b/src/protocol_proxy/ipc/base.py @@ -105,8 +105,12 @@ def next_request_id(self): return next(self._request_id) def register_callback(self, cb_method, method_name, provides_response=False, timeout=30.0): - _log.info(f'{self.proxy_name} registered callback: {method_name}') - self.callbacks[method_name] = ProtocolProxyCallback(cb_method, method_name, provides_response, timeout=timeout) + if not self.callbacks.get(method_name): + _log.info(f'{self.proxy_name} registered callback: {method_name}') + self.callbacks[method_name] = ProtocolProxyCallback(cb_method, method_name, provides_response, + timeout=timeout) + else: + _log.info(f'{self.proxy_name} confirmed callback: {method_name} is registered.') @abstractmethod def start(self, *_, **__): diff --git a/src/protocol_proxy/ipc/gevent.py b/src/protocol_proxy/ipc/gevent.py index 4d8a0c1..8e65a15 100644 --- a/src/protocol_proxy/ipc/gevent.py +++ b/src/protocol_proxy/ipc/gevent.py @@ -1,6 +1,7 @@ import logging import struct +from contextlib import contextmanager from dataclasses import dataclass from gevent import select, sleep, spawn from gevent.event import AsyncResult @@ -42,6 +43,9 @@ def _get_ip_addresses(self, host_name: str) -> set[str]: return {ai[4][0] for ai in getaddrinfo(host_name, None)} def _setup_inbound_server(self, socket_params: SocketParams = None): + if self.inbound_server_socket: + #_log.debug('@@@@@@@ Using existing inbound server socket.') + return inbound_socket: socket = socket(AF_INET, SOCK_STREAM) inbound_socket.setblocking(False) if socket_params: @@ -73,6 +77,7 @@ def _setup_inbound_server(self, socket_params: SocketParams = None): _log.warning(f'{self.proxy_name}: Socket error listening on {self.inbound_params}: {e}') self.inbound_server_socket = inbound_socket self.inbounds.add(self.inbound_server_socket) + _log.info(f'@@@@@@@ Created new inbound server socket: {self.inbound_params}.') return @callback @@ -102,8 +107,8 @@ def send(self, remote: ProtocolProxyPeer, message: ProtocolProxyMessage) -> bool return False if message.request_id is None: message.request_id = self.next_request_id - self.outbounds.add(outbound) self.outbound_messages[outbound] = message + self.outbounds.add(outbound) if message.response_expected: async_result = AsyncResult() self.response_results[message.request_id] = async_result @@ -122,9 +127,12 @@ def select_loop(self): else: for s in readable: # Handle incoming sockets. 
if s is self.inbound_server_socket: # The server socket is ready to accept a connection - client_socket, client_address = s.accept() - client_socket.setblocking(0) - self.inbounds.add(client_socket) + try: + client_socket, client_address = s.accept() + client_socket.setblocking(0) + self.inbounds.add(client_socket) + except BlockingIOError: + pass else: self.inbounds.discard(s) spawn(self._receive_socket, s) @@ -142,32 +150,63 @@ def select_loop(self): finally: s.close() - def _receive_headers(self, s: socket) -> ProtocolHeaders | None: + @contextmanager + def _non_blocking_socket(self, func, io_wait_time, *args, **kwargs): + #_log.debug(f'NEW CALL TO _NON_BLOCKING_SOCKET: FUNC: "{func}", IO_WAIT_TIME: {io_wait_time}, ARGS: {args}, KWARGS: {kwargs}') + done = False + while not done: + try: + #_log.debug(f'CALLING FUNC "{func}" with ARGS: {args} and KWARGS: {kwargs}') + ret_val = func(*args, **kwargs) + #_log.debug(f'RETURNING: {ret_val}') + done = True + yield ret_val, io_wait_time + break + except BlockingIOError as e: + io_wait_time -= 0.1 + sleep(0.1) + if io_wait_time <= 0: + _log.info(f'Timed out after {self.max_io_wait_seconds} seconds with BlockingIOError: {e}') + done = True + #finally: + #_log.debug('IN FINALLY OF _NON_BLOCKING_SOCKET') + + def _receive_headers(self, s: socket) -> tuple[ProtocolHeaders | None, float]: try: - received = s.recv(2) - if len(received) == 0: - _log.warning(f'{self.proxy_name} received closed socket from ({s.getpeername()}.') - return None - version_num = struct.unpack('>H', received)[0] + with self._non_blocking_socket(s.recv, self.max_io_wait_seconds, 2) as (version_bytes, remaining_time): + if len(version_bytes) == 0: + try: + peer_name = f' from {s.getpeername()}.' + except OSError: + peer_name = '.' + _log.warning(f'{self.proxy_name} received closed socket {peer_name}') + return None, remaining_time + version_num = struct.unpack('>H', version_bytes)[0] if not (protocol := self.PROTOCOL_VERSION.get(version_num)): - raise NotImplementedError(f'Unknown protocol version ({version_num})' - f' received from: {s.getpeername()}') - header_bytes = s.recv(protocol.HEADER_LENGTH) - if len(header_bytes) == protocol.HEADER_LENGTH: - return protocol.unpack(header_bytes) - else: - _log.warning(f'Failed to read headers. Received {len(header_bytes)} bytes: {header_bytes}') + try: + peer_name = f' received from: {s.getpeername()}.' + except OSError: + peer_name = '.' + raise NotImplementedError(f'Unknown protocol version ({version_num}){peer_name}') + with self._non_blocking_socket(s.recv, remaining_time, protocol.HEADER_LENGTH + ) as (header_bytes, remaining_time): + if len(header_bytes) == protocol.HEADER_LENGTH: + return protocol.unpack(header_bytes), remaining_time + else: + _log.warning(f'Failed to unpack headers. For header length of {protocol.HEADER_LENGTH},' + f' received {len(header_bytes)} bytes: {header_bytes}') + return None, remaining_time except (OSError, Exception) as e: - _log.warning(f'{self.proxy_name}: Socket exception reading headers: {e}') + # TODO: Why is this getting triggered at end of transmissions? 
_log.warning(f'{self.proxy_name}: Socket exception reading headers: {e}') + return None, remaining_time if 'remaining_time' in locals() else self.max_io_wait_seconds def _receive_socket(self, s: socket): - _log.debug(f'{self.proxy_name}: IN RECEIVE SOCKET') - headers = self._receive_headers(s) + #_log.debug(f'{self.proxy_name}: IN RECEIVE SOCKET') + headers, io_wait_time = self._receive_headers(s) if headers is not None and (cb_info := self.callbacks.get(headers.method_name)): remaining = headers.data_length buffer = b'' done = False - io_wait_time = self.max_io_wait_seconds while not done: try: while chunk := s.recv(read_length := max(0, remaining if remaining < self.chunk_size else self.chunk_size)): @@ -194,11 +233,19 @@ def _receive_socket(self, s: socket): s.close() done = True elif headers: + try: + peer_name = f' from {s.getpeername()}' + except OSError: + peer_name = '' _log.warning(f'{self.proxy_name}: Received unknown method name: {headers.method_name}' - f' from {s.getpeername()} with request ID: {headers.request_id}') + f' {peer_name} with request ID: {headers.request_id}') s.close() else: - _log.warning(f'{self.proxy_name}: Unable to read headers from socket: {s.getpeername()}') + try: + peer_name = f': {s.getpeername()}.' + except OSError: + peer_name = '.' + # TODO: Why is this getting triggered at end of transmissions? _log.warning(f'{self.proxy_name}: Unable to read headers from socket: {peer_name}') s.close() def _send_headers(self, s: socket, data_length: int, request_id: int, response_expected: bool, method_name: str, @@ -214,17 +261,21 @@ def _send_headers(self, s: socket, data_length: int, request_id: int, response_e f' (request_id: {request_id}): {e}') def _send_socket(self, s: socket): - _log.debug(f'{self.proxy_name}: IN SEND SOCKET') + #_log.debug(f'{self.proxy_name}: IN SEND SOCKET') if not (message := self.outbound_messages.get(s)): - _log.warning(f'Outbound socket to {s.getpeername()} was ready, but no outbound message was found.') + try: + peer_name = f'to {s.getpeername()}' + except OSError: + peer_name = '' + # TODO: Why is this getting triggered (apparently on every send)? _log.warning(f'Outbound socket to {peer_name} was ready, but no outbound message was found.') elif isinstance(message.payload, AsyncResult) and not message.payload.ready(): self.outbounds.add(s) - _log.debug('IN SEND SOCKET, WAS ADDED BACK TO OUTBOUND BECAUSE ASYNC_RESULT WAS NOT READY.') + #_log.debug('IN SEND SOCKET, WAS ADDED BACK TO OUTBOUND BECAUSE ASYNC_RESULT WAS NOT READY.') else: payload = message.payload.get() if isinstance(message.payload, Greenlet) else message.payload self._send_headers(s, len(payload), message.request_id, message.response_expected, message.method_name) try: - _log.debug('REACHED SENDALL IN GEVENT IPC SEND') + #_log.debug('REACHED SENDALL IN GEVENT IPC SEND') s.sendall(payload) # TODO: Should we send in chunks and sleep in between? 
if message.response_expected: self.inbounds.add(s) @@ -255,7 +306,7 @@ def _handle_exceptional_socket(self, s: socket): def start(self, *_, **__): self._setup_inbound_server(self.inbound_params) - _log.debug(f'{self.proxy_name} STARTED.') + #_log.debug(f'{self.proxy_name} STARTED.') def stop(self): self._stop = True diff --git a/src/protocol_proxy/manager/base.py b/src/protocol_proxy/manager/base.py index dc1b238..ac347f4 100644 --- a/src/protocol_proxy/manager/base.py +++ b/src/protocol_proxy/manager/base.py @@ -33,12 +33,14 @@ def wait_peer_registered(self, peer, timeout, func=None, *args, **kwargs): """ def _setup_proxy_process_command(self, unique_remote_id: tuple, **kwargs) -> tuple: - _log.debug(f'UAI is: {unique_remote_id}') + #_log.debug(f'@@@@@@@@@ PASSED UAI is: {unique_remote_id}') unique_remote_id = self.proxy_class.get_unique_remote_id(unique_remote_id) - _log.debug(f'UAI is: {unique_remote_id}') + #_log.debug(f'@@@@@@@@@ UAI AFTER LOOKUP is: {unique_remote_id}') proxy_id = self.get_proxy_id(unique_remote_id) + #_log.debug('@@@@@@@@@ PROXY ID is: {}'.format(proxy_id)) proxy_name = str(unique_remote_id) if proxy_id not in self.peers: + #_log.debug(f'@@@@@@@@@ PROXY_ID IS NOT IN PEERS. SETTING UP COMMAND.') module, func = self.proxy_class.__module__, self.proxy_class.__name__ protocol_specific_params = [i for pair in [(f"--{k.replace('_', '-')}", v) for k, v in kwargs.items()] for i in pair] @@ -46,6 +48,7 @@ def _setup_proxy_process_command(self, unique_remote_id: tuple, **kwargs) -> tup '--manager-id', self.proxy_id.hex, '--manager-address', self.inbound_params.address, '--manager-port', str(self.inbound_params.port), *protocol_specific_params] else: + #_log.debug(f'@@@@@@@@@ PROXY_ID IS IN PEERS. NOT SETTING UP COMMAND.') command = None return command, proxy_id, proxy_name diff --git a/src/protocol_proxy/manager/gevent.py b/src/protocol_proxy/manager/gevent.py index 633f741..9c63548 100644 --- a/src/protocol_proxy/manager/gevent.py +++ b/src/protocol_proxy/manager/gevent.py @@ -28,6 +28,7 @@ def wait_peer_registered(self, peer, timeout, func=None, *args, **kwargs): def wait_for_peer_registration(): while peer.socket_params is None: sleep(0.1) + try: with peer.ready: with_timeout(timeout, wait_for_peer_registration) @@ -41,10 +42,12 @@ def wait_for_peer_registration(): del self.peers[peer.proxy_id] def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer: + #_log.debug('@@@@@@@ IN GET_PROXY @@@@@@@') command, proxy_id, proxy_name = self._setup_proxy_process_command(unique_remote_id, **kwargs) # , proxy_env + #_log.debug(f'@@@@@@@ AFTER SETUP_PROXY_PROCESS_COMMAND: {(command, proxy_id, proxy_name)} @@@@@@@') if command: proxy_process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) - _log.info("proxy %s has PID %s", self.proxy_name, proxy_process.pid) + _log.info(f'proxy {proxy_name} has PID {proxy_process.pid}') spawn(self.log_subprocess_output, proxy_process.stdout) spawn(self.log_subprocess_output, proxy_process.stderr) # TODO: Ensure that logging as implemented fits with VOLTTRON logging once that is fixed.. @@ -59,7 +62,8 @@ def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer: token=new_peer_token) atexit.register(self._cleanup_proxy_process, proxy_process) # Do NOT send to the proxy until it has registered and socket_params is set! 
- _log.debug(f"PPM: Proxy {proxy_id} created, waiting for registration before sending.") + _log.info(f"PPM: Proxy {proxy_id} created, waiting for registration before sending.") + #_log.debug(f'@@@@@@@ GET_PROXY WILL RETURN PEER: {self.peers[proxy_id]} @@@@@@@') return self.peers[proxy_id] def log_subprocess_output(self, stream: IO[bytes]): diff --git a/src/protocol_proxy/proxy/asyncio.py b/src/protocol_proxy/proxy/asyncio.py index ea15b27..a17c733 100644 --- a/src/protocol_proxy/proxy/asyncio.py +++ b/src/protocol_proxy/proxy/asyncio.py @@ -22,15 +22,18 @@ def __init__(self, manager_address: str, manager_port: int, manager_id: UUID, ma token=manager_token) def get_local_socket_params(self) -> SocketParams: - return self.inbound_server.sockets[0].getsockname() + # Only take first 2 elements (host, port) from getsockname() + # IPv6 sockets return 4-tuple (host, port, flowinfo, scope_id) + sockname = self.inbound_server.sockets[0].getsockname() + return SocketParams(sockname[0], sockname[1]) async def send_registration(self, remote: AsyncioProtocolProxyPeer): - _log.debug(f"[send_registration] Attempting to register with manager at: {remote} (type={type(remote)})") + _log.info(f"Attempting to register with manager at: {remote} (type={type(remote)})") message = self._get_registration_message() manager_response = await self.send(remote, message) success_bytes = await manager_response if isinstance(manager_response, Future) else manager_response success = json.loads(success_bytes.decode('utf8')) - _log.debug(f'{self.proxy_name} IN SEND REGISTRATION, FUTURE.RESULT() IS : {success}') + #_log.debug(f'{self.proxy_name} IN SEND REGISTRATION, FUTURE.RESULT() IS : {success}') # TODO: Implement error handling and failure (along lines of copy below from gevent version (but working): # tries_remaining = 2 # if not success: diff --git a/src/protocol_proxy/proxy/base.py b/src/protocol_proxy/proxy/base.py index 882296f..d5fd9f8 100644 --- a/src/protocol_proxy/proxy/base.py +++ b/src/protocol_proxy/proxy/base.py @@ -20,7 +20,7 @@ def __init__(self, *, manager_address: str, manager_port: int, manager_id: UUID, 4. Create a ProtocolProxyPeer subclass for the manager and store it in self.peers. 5. Call send_registration asynchronously in their constructor after super calls. 
""" - _log.debug('PP: IN INIT.') + #_log.debug('PP: IN INIT.') super(ProtocolProxy, self).__init__(**kwargs) self.registration_retry_delay: float = registration_retry_delay self.manager_params = SocketParams(manager_address, manager_port) diff --git a/src/protocol_proxy/proxy/launch.py b/src/protocol_proxy/proxy/launch.py index 7b1b554..c914cf8 100644 --- a/src/protocol_proxy/proxy/launch.py +++ b/src/protocol_proxy/proxy/launch.py @@ -1,4 +1,5 @@ import logging +from os import environ import sys from argparse import ArgumentParser @@ -6,9 +7,11 @@ from typing import Callable from uuid import UUID +optional_log_params = {'filename': proxy_log} if (proxy_log := environ.get('PROTOCOL_PROXY_LOG')) else {} logging.basicConfig( level=logging.DEBUG, stream=sys.stdout, - format='{"name": "%(name)s", "lineno": "%(lineno)d", "level": "%(levelname)s", "message": "%(message)s"}' + format='{"name": "%(name)s", "lineno": "%(lineno)d", "level": "%(levelname)s", "message": "%(message)s"}', + **optional_log_params ) _log = logging.getLogger(__name__) @@ -30,12 +33,16 @@ def proxy_command_parser(parser: ArgumentParser = None): return parser def launch(launcher_func: Callable): - parser = proxy_command_parser() - parser, proxy_runner = launcher_func(parser) - opts = parser.parse_args() - proxy_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8')) - manager_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8')) - if iscoroutinefunction(proxy_runner): - run(proxy_runner(token=proxy_token, manager_token=manager_token, **vars(opts))) - else: - proxy_runner(token=proxy_token, manager_token=manager_token, **vars(opts)) + try: + parser = proxy_command_parser() + parser, proxy_runner = launcher_func(parser) + opts = parser.parse_args() + _log.info(f'Launching Proxy with parameters: {opts}') + proxy_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8')) + manager_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8')) + if iscoroutinefunction(proxy_runner): + run(proxy_runner(token=proxy_token, manager_token=manager_token, **vars(opts))) + else: + proxy_runner(token=proxy_token, manager_token=manager_token, **vars(opts)) + except BaseException as e: + _log.warning(f'Proxy Launch: Launcher caught exception: {e}')