Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,59 @@
# lib-protocol-proxy
# Protocol Proxy
![Python 3.10](https://img.shields.io/badge/python-3.10-blue.svg)
![Python 3.11](https://img.shields.io/badge/python-3.11-blue.svg)
[![Passing?](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml/badge.svg)](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml)
[![pypi version](https://img.shields.io/pypi/v/protocol-proxy.svg)](https://pypi.org/project/protocol-proxy/)

This library provides the user with the ability to automatically deploy and manager proxy processes for handling
network communication with remote devices using various protocols. A proxy to each remote peer is established in
a separate process from the managing application. A manager class handles socket communication between the proxy
subprocess and its owner. Individual protocols are implemented as plugins to this library. Integration with
event and asyncio event loops are supported for each of the proxy and manager processes.


## Automatically installed dependencies
- python = ">=3.10,<4.0"

[//]: # (# Documentation)

[//]: # (More detailed documentation can be found on [ReadTheDocs]&#40;https://eclipse-volttron.readthedocs.io/en/latest/external-docs/lib-protocol-proxy/index.html. The RST source)

[//]: # (of the documentation for this component is located in the "docs" directory of this repository.)

# Installation
This library can be installed using pip:

```shell
pip install lib-protocol-proxy
```

Protocol Proxy plugins should include "protocol-proxy" as a requirement, so users of existing
plugins are encouraged to instead install the plugin for that pacakge directly.

# Development
This library is maintained by the VOLTTRON Development Team.

Please see the following [guidelines](https://github.com/eclipse-volttron/volttron-core/blob/develop/CONTRIBUTING.md)
for contributing to this and/or other VOLTTRON repositories.

[//]: # (Please see the following helpful guide about [using the Protocol Proxy]&#40;https://github.com/eclipse-volttron/lib-protocol-proxy/blob/develop/developing_with_protocol_proxy.md&#41;)

[//]: # (in your VOLTTRON agent or other applications.)

# Disclaimer Notice

This material was prepared as an account of work sponsored by an agency of the
United States Government. Neither the United States Government nor the United
States Department of Energy, nor Battelle, nor any of their employees, nor any
jurisdiction or organization that has cooperated in the development of these
materials, makes any warranty, express or implied, or assumes any legal
liability or responsibility for the accuracy, completeness, or usefulness or any
information, apparatus, product, software, or process disclosed, or represents
that its use would not infringe privately owned rights.

Reference herein to any specific commercial product, process, or service by
trade name, trademark, manufacturer, or otherwise does not necessarily
constitute or imply its endorsement, recommendation, or favoring by the United
States Government or any agency thereof, or Battelle Memorial Institute. The
views and opinions of authors expressed herein do not necessarily state or
reflect those of the United States Government or any agency thereof.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
profile = "black"

[tool.mypy]
python_version = 3.10
python_version = "3.10"
show_error_context = true
pretty = true
show_column_numbers = true
Expand All @@ -29,7 +29,7 @@ ignore_missing_imports = true

[tool.poetry]
name = "protocol-proxy"
version = "2.0.0rc1"
version = "2.0.0rc2"
description = "A system for launching and communicating with a proxy application for network communication which runs in a separate process.."
authors = ["The VOLTTRON Development Team <volttron@pnnl.gov>"]
license = "Apache License 2.0"
Expand Down
14 changes: 9 additions & 5 deletions src/protocol_proxy/ipc/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,23 @@ async def _setup_inbound_server(self, socket_params: SocketParams = None):
next_port = next(self.unused_ports(await self._get_ip_addresses(socket_params.address)))
self.inbound_server = await self.loop.create_server(factory, socket_params.address, next_port,
start_serving=True)
_log.debug(f'{self.proxy_name} AFTER START SERVING. Server is: {self.inbound_server}')
#_log.debug(f'{self.proxy_name} AFTER START SERVING. Server is: {self.inbound_server}')
except OSError:
continue
except StopIteration:
_log.error(f'Unable to bind inbound socket to {socket_params.address}'
f' on any port in range: {self.min_port} - {self.max_port}.')
break
else:
self.inbound_params = SocketParams(*self.inbound_server.sockets[0].getsockname())
# Only take first 2 elements (host, port) from getsockname()
# IPv6 sockets return 4-tuple (host, port, flowinfo, scope_id)
sockname = self.inbound_server.sockets[0].getsockname()
self.inbound_params = SocketParams(sockname[0], sockname[1])
break

async def start(self, *_, **__):
await self._setup_inbound_server(self.inbound_params)
_log.debug(f' {self.proxy_name} STARTED with INBOUND PARAMS SENT AS: {self.inbound_params}.')
#_log.debug(f' {self.proxy_name} STARTED with INBOUND PARAMS SENT AS: {self.inbound_params}.')

async def stop(self):
self.inbound_server.close()
Expand Down Expand Up @@ -182,7 +185,7 @@ async def _run_callback(self, callback_info: ProtocolProxyCallback, headers, dat
self.transport.close()

def connection_made(self, transport: Transport):
_log.debug(f"[IPCProtocol] connection_made: transport={transport}")
#_log.debug(f"[IPCProtocol] connection_made: transport={transport}")
try:
self.transport = transport
if self.outgoing_message:
Expand All @@ -202,8 +205,9 @@ def _message_to_bytes(self, message: ProtocolProxyMessage):

def connection_lost(self, exc):
try:
pass
# _log.debug(f'{self.connector.proxy_name} -- Connection lost, exc: "{exc}"')
_log.debug(f'self.on_con_lost is a {type(self.on_con_lost)} with value: {self.on_con_lost}')
# _log.debug(f'self.on_con_lost is a {type(self.on_con_lost)} with value: {self.on_con_lost}')
# if self.on_con_lost is not None:
# self.on_con_lost.set_result(True) # TODO: What is using the on_con_lost thing?
except Exception as e:
Expand Down
8 changes: 6 additions & 2 deletions src/protocol_proxy/ipc/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ def next_request_id(self):
return next(self._request_id)

def register_callback(self, cb_method, method_name, provides_response=False, timeout=30.0):
_log.info(f'{self.proxy_name} registered callback: {method_name}')
self.callbacks[method_name] = ProtocolProxyCallback(cb_method, method_name, provides_response, timeout=timeout)
if not self.callbacks.get(method_name):
_log.info(f'{self.proxy_name} registered callback: {method_name}')
self.callbacks[method_name] = ProtocolProxyCallback(cb_method, method_name, provides_response,
timeout=timeout)
else:
_log.info(f'{self.proxy_name} confirmed callback: {method_name} is registered.')

@abstractmethod
def start(self, *_, **__):
Expand Down
107 changes: 79 additions & 28 deletions src/protocol_proxy/ipc/gevent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import struct

from contextlib import contextmanager
from dataclasses import dataclass
from gevent import select, sleep, spawn
from gevent.event import AsyncResult
Expand Down Expand Up @@ -42,6 +43,9 @@ def _get_ip_addresses(self, host_name: str) -> set[str]:
return {ai[4][0] for ai in getaddrinfo(host_name, None)}

def _setup_inbound_server(self, socket_params: SocketParams = None):
if self.inbound_server_socket:
#_log.debug('@@@@@@@ Using existing inbound server socket.')
return
inbound_socket: socket = socket(AF_INET, SOCK_STREAM)
inbound_socket.setblocking(False)
if socket_params:
Expand Down Expand Up @@ -73,6 +77,7 @@ def _setup_inbound_server(self, socket_params: SocketParams = None):
_log.warning(f'{self.proxy_name}: Socket error listening on {self.inbound_params}: {e}')
self.inbound_server_socket = inbound_socket
self.inbounds.add(self.inbound_server_socket)
_log.info(f'@@@@@@@ Created new inbound server socket: {self.inbound_params}.')
return

@callback
Expand Down Expand Up @@ -102,8 +107,8 @@ def send(self, remote: ProtocolProxyPeer, message: ProtocolProxyMessage) -> bool
return False
if message.request_id is None:
message.request_id = self.next_request_id
self.outbounds.add(outbound)
self.outbound_messages[outbound] = message
self.outbounds.add(outbound)
if message.response_expected:
async_result = AsyncResult()
self.response_results[message.request_id] = async_result
Expand All @@ -122,9 +127,12 @@ def select_loop(self):
else:
for s in readable: # Handle incoming sockets.
if s is self.inbound_server_socket: # The server socket is ready to accept a connection
client_socket, client_address = s.accept()
client_socket.setblocking(0)
self.inbounds.add(client_socket)
try:
client_socket, client_address = s.accept()
client_socket.setblocking(0)
self.inbounds.add(client_socket)
except BlockingIOError:
pass
else:
self.inbounds.discard(s)
spawn(self._receive_socket, s)
Expand All @@ -142,32 +150,63 @@ def select_loop(self):
finally:
s.close()

def _receive_headers(self, s: socket) -> ProtocolHeaders | None:
@contextmanager
def _non_blocking_socket(self, func, io_wait_time, *args, **kwargs):
#_log.debug(f'NEW CALL TO _NON_BLOCKING_SOCKET: FUNC: "{func}", IO_WAIT_TIME: {io_wait_time}, ARGS: {args}, KWARGS: {kwargs}')
done = False
while not done:
try:
#_log.debug(f'CALLING FUNC "{func}" with ARGS: {args} and KWARGS: {kwargs}')
ret_val = func(*args, **kwargs)
#_log.debug(f'RETURNING: {ret_val}')
done = True
yield ret_val, io_wait_time
break
except BlockingIOError as e:
io_wait_time -= 0.1
sleep(0.1)
if io_wait_time <= 0:
_log.info(f'Timed out after {self.max_io_wait_seconds} seconds with BlockingIOError: {e}')
done = True
#finally:
#_log.debug('IN FINALLY OF _NON_BLOCKING_SOCKET')

def _receive_headers(self, s: socket) -> tuple[ProtocolHeaders | None, float]:
try:
received = s.recv(2)
if len(received) == 0:
_log.warning(f'{self.proxy_name} received closed socket from ({s.getpeername()}.')
return None
version_num = struct.unpack('>H', received)[0]
with self._non_blocking_socket(s.recv, self.max_io_wait_seconds, 2) as (version_bytes, remaining_time):
if len(version_bytes) == 0:
try:
peer_name = f' from {s.getpeername()}.'
except OSError:
peer_name = '.'
_log.warning(f'{self.proxy_name} received closed socket {peer_name}')
return None, remaining_time
version_num = struct.unpack('>H', version_bytes)[0]
if not (protocol := self.PROTOCOL_VERSION.get(version_num)):
raise NotImplementedError(f'Unknown protocol version ({version_num})'
f' received from: {s.getpeername()}')
header_bytes = s.recv(protocol.HEADER_LENGTH)
if len(header_bytes) == protocol.HEADER_LENGTH:
return protocol.unpack(header_bytes)
else:
_log.warning(f'Failed to read headers. Received {len(header_bytes)} bytes: {header_bytes}')
try:
peer_name = f' received from: {s.getpeername()}.'
except OSError:
peer_name = '.'
raise NotImplementedError(f'Unknown protocol version ({version_num}){peer_name}')
with self._non_blocking_socket(s.recv, remaining_time, protocol.HEADER_LENGTH
) as (header_bytes, remaining_time):
if len(header_bytes) == protocol.HEADER_LENGTH:
return protocol.unpack(header_bytes), remaining_time
else:
_log.warning(f'Failed to unpack headers. For header length of {protocol.HEADER_LENGTH},'
f' received {len(header_bytes)} bytes: {header_bytes}')
return None, remaining_time
except (OSError, Exception) as e:
_log.warning(f'{self.proxy_name}: Socket exception reading headers: {e}')
# TODO: Why is this getting triggered at end of transmissions? _log.warning(f'{self.proxy_name}: Socket exception reading headers: {e}')
return None, remaining_time if 'remaining_time' in locals() else self.max_io_wait_seconds

def _receive_socket(self, s: socket):
_log.debug(f'{self.proxy_name}: IN RECEIVE SOCKET')
headers = self._receive_headers(s)
#_log.debug(f'{self.proxy_name}: IN RECEIVE SOCKET')
headers, io_wait_time = self._receive_headers(s)
if headers is not None and (cb_info := self.callbacks.get(headers.method_name)):
remaining = headers.data_length
buffer = b''
done = False
io_wait_time = self.max_io_wait_seconds
while not done:
try:
while chunk := s.recv(read_length := max(0, remaining if remaining < self.chunk_size else self.chunk_size)):
Expand All @@ -194,11 +233,19 @@ def _receive_socket(self, s: socket):
s.close()
done = True
elif headers:
try:
peer_name = f' from {s.getpeername()}'
except OSError:
peer_name = ''
_log.warning(f'{self.proxy_name}: Received unknown method name: {headers.method_name}'
f' from {s.getpeername()} with request ID: {headers.request_id}')
f' {peer_name} with request ID: {headers.request_id}')
s.close()
else:
_log.warning(f'{self.proxy_name}: Unable to read headers from socket: {s.getpeername()}')
try:
peer_name = f': {s.getpeername()}.'
except OSError:
peer_name = '.'
# TODO: Why is this getting triggered at end of transmissions? _log.warning(f'{self.proxy_name}: Unable to read headers from socket: {peer_name}')
s.close()

def _send_headers(self, s: socket, data_length: int, request_id: int, response_expected: bool, method_name: str,
Expand All @@ -214,17 +261,21 @@ def _send_headers(self, s: socket, data_length: int, request_id: int, response_e
f' (request_id: {request_id}): {e}')

def _send_socket(self, s: socket):
_log.debug(f'{self.proxy_name}: IN SEND SOCKET')
#_log.debug(f'{self.proxy_name}: IN SEND SOCKET')
if not (message := self.outbound_messages.get(s)):
_log.warning(f'Outbound socket to {s.getpeername()} was ready, but no outbound message was found.')
try:
peer_name = f'to {s.getpeername()}'
except OSError:
peer_name = ''
# TODO: Why is this getting triggered (apparently on every send)? _log.warning(f'Outbound socket to {peer_name} was ready, but no outbound message was found.')
elif isinstance(message.payload, AsyncResult) and not message.payload.ready():
self.outbounds.add(s)
_log.debug('IN SEND SOCKET, WAS ADDED BACK TO OUTBOUND BECAUSE ASYNC_RESULT WAS NOT READY.')
#_log.debug('IN SEND SOCKET, WAS ADDED BACK TO OUTBOUND BECAUSE ASYNC_RESULT WAS NOT READY.')
else:
payload = message.payload.get() if isinstance(message.payload, Greenlet) else message.payload
self._send_headers(s, len(payload), message.request_id, message.response_expected, message.method_name)
try:
_log.debug('REACHED SENDALL IN GEVENT IPC SEND')
#_log.debug('REACHED SENDALL IN GEVENT IPC SEND')
s.sendall(payload) # TODO: Should we send in chunks and sleep in between?
if message.response_expected:
self.inbounds.add(s)
Expand Down Expand Up @@ -255,7 +306,7 @@ def _handle_exceptional_socket(self, s: socket):

def start(self, *_, **__):
self._setup_inbound_server(self.inbound_params)
_log.debug(f'{self.proxy_name} STARTED.')
#_log.debug(f'{self.proxy_name} STARTED.')

def stop(self):
self._stop = True
Expand Down
7 changes: 5 additions & 2 deletions src/protocol_proxy/manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,22 @@ def wait_peer_registered(self, peer, timeout, func=None, *args, **kwargs):
"""

def _setup_proxy_process_command(self, unique_remote_id: tuple, **kwargs) -> tuple:
_log.debug(f'UAI is: {unique_remote_id}')
#_log.debug(f'@@@@@@@@@ PASSED UAI is: {unique_remote_id}')
unique_remote_id = self.proxy_class.get_unique_remote_id(unique_remote_id)
_log.debug(f'UAI is: {unique_remote_id}')
#_log.debug(f'@@@@@@@@@ UAI AFTER LOOKUP is: {unique_remote_id}')
proxy_id = self.get_proxy_id(unique_remote_id)
#_log.debug('@@@@@@@@@ PROXY ID is: {}'.format(proxy_id))
proxy_name = str(unique_remote_id)
if proxy_id not in self.peers:
#_log.debug(f'@@@@@@@@@ PROXY_ID IS NOT IN PEERS. SETTING UP COMMAND.')
module, func = self.proxy_class.__module__, self.proxy_class.__name__
protocol_specific_params = [i for pair in [(f"--{k.replace('_', '-')}", v)
for k, v in kwargs.items()] for i in pair]
command = [sys.executable, '-m', module, '--proxy-id', proxy_id.hex, '--proxy-name', proxy_name,
'--manager-id', self.proxy_id.hex, '--manager-address', self.inbound_params.address,
'--manager-port', str(self.inbound_params.port), *protocol_specific_params]
else:
#_log.debug(f'@@@@@@@@@ PROXY_ID IS IN PEERS. NOT SETTING UP COMMAND.')
command = None
return command, proxy_id, proxy_name

Expand Down
Loading