Skip to content

Commit 4c31a05

Browse files
authored
Merge pull request #3 from davidraker/bus_adapter_changes
Updates to README.md.
2 parents 67763e5 + f0ded7f commit 4c31a05

10 files changed

Lines changed: 190 additions & 56 deletions

File tree

README.md

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,59 @@
1-
# lib-protocol-proxy
1+
# Protocol Proxy
2+
![Python 3.10](https://img.shields.io/badge/python-3.10-blue.svg)
3+
![Python 3.11](https://img.shields.io/badge/python-3.11-blue.svg)
4+
[![Passing?](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml/badge.svg)](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml)
5+
[![pypi version](https://img.shields.io/pypi/v/protocol-proxy.svg)](https://pypi.org/project/protocol-proxy/)
6+
7+
This library provides the user with the ability to automatically deploy and manager proxy processes for handling
8+
network communication with remote devices using various protocols. A proxy to each remote peer is established in
9+
a separate process from the managing application. A manager class handles socket communication between the proxy
10+
subprocess and its owner. Individual protocols are implemented as plugins to this library. Integration with
11+
event and asyncio event loops are supported for each of the proxy and manager processes.
12+
13+
14+
## Automatically installed dependencies
15+
- python = ">=3.10,<4.0"
16+
17+
[//]: # (# Documentation)
18+
19+
[//]: # (More detailed documentation can be found on [ReadTheDocs]&#40;https://eclipse-volttron.readthedocs.io/en/latest/external-docs/lib-protocol-proxy/index.html. The RST source)
20+
21+
[//]: # (of the documentation for this component is located in the "docs" directory of this repository.)
22+
23+
# Installation
24+
This library can be installed using pip:
25+
26+
```shell
27+
pip install lib-protocol-proxy
28+
```
29+
30+
Protocol Proxy plugins should include "protocol-proxy" as a requirement, so users of existing
31+
plugins are encouraged to instead install the plugin for that pacakge directly.
32+
33+
# Development
34+
This library is maintained by the VOLTTRON Development Team.
35+
36+
Please see the following [guidelines](https://github.com/eclipse-volttron/volttron-core/blob/develop/CONTRIBUTING.md)
37+
for contributing to this and/or other VOLTTRON repositories.
38+
39+
[//]: # (Please see the following helpful guide about [using the Protocol Proxy]&#40;https://github.com/eclipse-volttron/lib-protocol-proxy/blob/develop/developing_with_protocol_proxy.md&#41;)
40+
41+
[//]: # (in your VOLTTRON agent or other applications.)
42+
43+
# Disclaimer Notice
44+
45+
This material was prepared as an account of work sponsored by an agency of the
46+
United States Government. Neither the United States Government nor the United
47+
States Department of Energy, nor Battelle, nor any of their employees, nor any
48+
jurisdiction or organization that has cooperated in the development of these
49+
materials, makes any warranty, express or implied, or assumes any legal
50+
liability or responsibility for the accuracy, completeness, or usefulness or any
51+
information, apparatus, product, software, or process disclosed, or represents
52+
that its use would not infringe privately owned rights.
53+
54+
Reference herein to any specific commercial product, process, or service by
55+
trade name, trademark, manufacturer, or otherwise does not necessarily
56+
constitute or imply its endorsement, recommendation, or favoring by the United
57+
States Government or any agency thereof, or Battelle Memorial Institute. The
58+
views and opinions of authors expressed herein do not necessarily state or
59+
reflect those of the United States Government or any agency thereof.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
66
profile = "black"
77

88
[tool.mypy]
9-
python_version = 3.10
9+
python_version = "3.10"
1010
show_error_context = true
1111
pretty = true
1212
show_column_numbers = true
@@ -29,7 +29,7 @@ ignore_missing_imports = true
2929

3030
[tool.poetry]
3131
name = "protocol-proxy"
32-
version = "2.0.0rc1"
32+
version = "2.0.0rc2"
3333
description = "A system for launching and communicating with a proxy application for network communication which runs in a separate process.."
3434
authors = ["The VOLTTRON Development Team <volttron@pnnl.gov>"]
3535
license = "Apache License 2.0"

src/protocol_proxy/ipc/asyncio.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,20 +83,23 @@ async def _setup_inbound_server(self, socket_params: SocketParams = None):
8383
next_port = next(self.unused_ports(await self._get_ip_addresses(socket_params.address)))
8484
self.inbound_server = await self.loop.create_server(factory, socket_params.address, next_port,
8585
start_serving=True)
86-
_log.debug(f'{self.proxy_name} AFTER START SERVING. Server is: {self.inbound_server}')
86+
#_log.debug(f'{self.proxy_name} AFTER START SERVING. Server is: {self.inbound_server}')
8787
except OSError:
8888
continue
8989
except StopIteration:
9090
_log.error(f'Unable to bind inbound socket to {socket_params.address}'
9191
f' on any port in range: {self.min_port} - {self.max_port}.')
9292
break
9393
else:
94-
self.inbound_params = SocketParams(*self.inbound_server.sockets[0].getsockname())
94+
# Only take first 2 elements (host, port) from getsockname()
95+
# IPv6 sockets return 4-tuple (host, port, flowinfo, scope_id)
96+
sockname = self.inbound_server.sockets[0].getsockname()
97+
self.inbound_params = SocketParams(sockname[0], sockname[1])
9598
break
9699

97100
async def start(self, *_, **__):
98101
await self._setup_inbound_server(self.inbound_params)
99-
_log.debug(f' {self.proxy_name} STARTED with INBOUND PARAMS SENT AS: {self.inbound_params}.')
102+
#_log.debug(f' {self.proxy_name} STARTED with INBOUND PARAMS SENT AS: {self.inbound_params}.')
100103

101104
async def stop(self):
102105
self.inbound_server.close()
@@ -182,7 +185,7 @@ async def _run_callback(self, callback_info: ProtocolProxyCallback, headers, dat
182185
self.transport.close()
183186

184187
def connection_made(self, transport: Transport):
185-
_log.debug(f"[IPCProtocol] connection_made: transport={transport}")
188+
#_log.debug(f"[IPCProtocol] connection_made: transport={transport}")
186189
try:
187190
self.transport = transport
188191
if self.outgoing_message:
@@ -202,8 +205,9 @@ def _message_to_bytes(self, message: ProtocolProxyMessage):
202205

203206
def connection_lost(self, exc):
204207
try:
208+
pass
205209
# _log.debug(f'{self.connector.proxy_name} -- Connection lost, exc: "{exc}"')
206-
_log.debug(f'self.on_con_lost is a {type(self.on_con_lost)} with value: {self.on_con_lost}')
210+
# _log.debug(f'self.on_con_lost is a {type(self.on_con_lost)} with value: {self.on_con_lost}')
207211
# if self.on_con_lost is not None:
208212
# self.on_con_lost.set_result(True) # TODO: What is using the on_con_lost thing?
209213
except Exception as e:

src/protocol_proxy/ipc/base.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,12 @@ def next_request_id(self):
105105
return next(self._request_id)
106106

107107
def register_callback(self, cb_method, method_name, provides_response=False, timeout=30.0):
108-
_log.info(f'{self.proxy_name} registered callback: {method_name}')
109-
self.callbacks[method_name] = ProtocolProxyCallback(cb_method, method_name, provides_response, timeout=timeout)
108+
if not self.callbacks.get(method_name):
109+
_log.info(f'{self.proxy_name} registered callback: {method_name}')
110+
self.callbacks[method_name] = ProtocolProxyCallback(cb_method, method_name, provides_response,
111+
timeout=timeout)
112+
else:
113+
_log.info(f'{self.proxy_name} confirmed callback: {method_name} is registered.')
110114

111115
@abstractmethod
112116
def start(self, *_, **__):

src/protocol_proxy/ipc/gevent.py

Lines changed: 79 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
import struct
33

4+
from contextlib import contextmanager
45
from dataclasses import dataclass
56
from gevent import select, sleep, spawn
67
from gevent.event import AsyncResult
@@ -42,6 +43,9 @@ def _get_ip_addresses(self, host_name: str) -> set[str]:
4243
return {ai[4][0] for ai in getaddrinfo(host_name, None)}
4344

4445
def _setup_inbound_server(self, socket_params: SocketParams = None):
46+
if self.inbound_server_socket:
47+
#_log.debug('@@@@@@@ Using existing inbound server socket.')
48+
return
4549
inbound_socket: socket = socket(AF_INET, SOCK_STREAM)
4650
inbound_socket.setblocking(False)
4751
if socket_params:
@@ -73,6 +77,7 @@ def _setup_inbound_server(self, socket_params: SocketParams = None):
7377
_log.warning(f'{self.proxy_name}: Socket error listening on {self.inbound_params}: {e}')
7478
self.inbound_server_socket = inbound_socket
7579
self.inbounds.add(self.inbound_server_socket)
80+
_log.info(f'@@@@@@@ Created new inbound server socket: {self.inbound_params}.')
7681
return
7782

7883
@callback
@@ -102,8 +107,8 @@ def send(self, remote: ProtocolProxyPeer, message: ProtocolProxyMessage) -> bool
102107
return False
103108
if message.request_id is None:
104109
message.request_id = self.next_request_id
105-
self.outbounds.add(outbound)
106110
self.outbound_messages[outbound] = message
111+
self.outbounds.add(outbound)
107112
if message.response_expected:
108113
async_result = AsyncResult()
109114
self.response_results[message.request_id] = async_result
@@ -122,9 +127,12 @@ def select_loop(self):
122127
else:
123128
for s in readable: # Handle incoming sockets.
124129
if s is self.inbound_server_socket: # The server socket is ready to accept a connection
125-
client_socket, client_address = s.accept()
126-
client_socket.setblocking(0)
127-
self.inbounds.add(client_socket)
130+
try:
131+
client_socket, client_address = s.accept()
132+
client_socket.setblocking(0)
133+
self.inbounds.add(client_socket)
134+
except BlockingIOError:
135+
pass
128136
else:
129137
self.inbounds.discard(s)
130138
spawn(self._receive_socket, s)
@@ -142,32 +150,63 @@ def select_loop(self):
142150
finally:
143151
s.close()
144152

145-
def _receive_headers(self, s: socket) -> ProtocolHeaders | None:
153+
@contextmanager
154+
def _non_blocking_socket(self, func, io_wait_time, *args, **kwargs):
155+
#_log.debug(f'NEW CALL TO _NON_BLOCKING_SOCKET: FUNC: "{func}", IO_WAIT_TIME: {io_wait_time}, ARGS: {args}, KWARGS: {kwargs}')
156+
done = False
157+
while not done:
158+
try:
159+
#_log.debug(f'CALLING FUNC "{func}" with ARGS: {args} and KWARGS: {kwargs}')
160+
ret_val = func(*args, **kwargs)
161+
#_log.debug(f'RETURNING: {ret_val}')
162+
done = True
163+
yield ret_val, io_wait_time
164+
break
165+
except BlockingIOError as e:
166+
io_wait_time -= 0.1
167+
sleep(0.1)
168+
if io_wait_time <= 0:
169+
_log.info(f'Timed out after {self.max_io_wait_seconds} seconds with BlockingIOError: {e}')
170+
done = True
171+
#finally:
172+
#_log.debug('IN FINALLY OF _NON_BLOCKING_SOCKET')
173+
174+
def _receive_headers(self, s: socket) -> tuple[ProtocolHeaders | None, float]:
146175
try:
147-
received = s.recv(2)
148-
if len(received) == 0:
149-
_log.warning(f'{self.proxy_name} received closed socket from ({s.getpeername()}.')
150-
return None
151-
version_num = struct.unpack('>H', received)[0]
176+
with self._non_blocking_socket(s.recv, self.max_io_wait_seconds, 2) as (version_bytes, remaining_time):
177+
if len(version_bytes) == 0:
178+
try:
179+
peer_name = f' from {s.getpeername()}.'
180+
except OSError:
181+
peer_name = '.'
182+
_log.warning(f'{self.proxy_name} received closed socket {peer_name}')
183+
return None, remaining_time
184+
version_num = struct.unpack('>H', version_bytes)[0]
152185
if not (protocol := self.PROTOCOL_VERSION.get(version_num)):
153-
raise NotImplementedError(f'Unknown protocol version ({version_num})'
154-
f' received from: {s.getpeername()}')
155-
header_bytes = s.recv(protocol.HEADER_LENGTH)
156-
if len(header_bytes) == protocol.HEADER_LENGTH:
157-
return protocol.unpack(header_bytes)
158-
else:
159-
_log.warning(f'Failed to read headers. Received {len(header_bytes)} bytes: {header_bytes}')
186+
try:
187+
peer_name = f' received from: {s.getpeername()}.'
188+
except OSError:
189+
peer_name = '.'
190+
raise NotImplementedError(f'Unknown protocol version ({version_num}){peer_name}')
191+
with self._non_blocking_socket(s.recv, remaining_time, protocol.HEADER_LENGTH
192+
) as (header_bytes, remaining_time):
193+
if len(header_bytes) == protocol.HEADER_LENGTH:
194+
return protocol.unpack(header_bytes), remaining_time
195+
else:
196+
_log.warning(f'Failed to unpack headers. For header length of {protocol.HEADER_LENGTH},'
197+
f' received {len(header_bytes)} bytes: {header_bytes}')
198+
return None, remaining_time
160199
except (OSError, Exception) as e:
161-
_log.warning(f'{self.proxy_name}: Socket exception reading headers: {e}')
200+
# TODO: Why is this getting triggered at end of transmissions? _log.warning(f'{self.proxy_name}: Socket exception reading headers: {e}')
201+
return None, remaining_time if 'remaining_time' in locals() else self.max_io_wait_seconds
162202

163203
def _receive_socket(self, s: socket):
164-
_log.debug(f'{self.proxy_name}: IN RECEIVE SOCKET')
165-
headers = self._receive_headers(s)
204+
#_log.debug(f'{self.proxy_name}: IN RECEIVE SOCKET')
205+
headers, io_wait_time = self._receive_headers(s)
166206
if headers is not None and (cb_info := self.callbacks.get(headers.method_name)):
167207
remaining = headers.data_length
168208
buffer = b''
169209
done = False
170-
io_wait_time = self.max_io_wait_seconds
171210
while not done:
172211
try:
173212
while chunk := s.recv(read_length := max(0, remaining if remaining < self.chunk_size else self.chunk_size)):
@@ -194,11 +233,19 @@ def _receive_socket(self, s: socket):
194233
s.close()
195234
done = True
196235
elif headers:
236+
try:
237+
peer_name = f' from {s.getpeername()}'
238+
except OSError:
239+
peer_name = ''
197240
_log.warning(f'{self.proxy_name}: Received unknown method name: {headers.method_name}'
198-
f' from {s.getpeername()} with request ID: {headers.request_id}')
241+
f' {peer_name} with request ID: {headers.request_id}')
199242
s.close()
200243
else:
201-
_log.warning(f'{self.proxy_name}: Unable to read headers from socket: {s.getpeername()}')
244+
try:
245+
peer_name = f': {s.getpeername()}.'
246+
except OSError:
247+
peer_name = '.'
248+
# TODO: Why is this getting triggered at end of transmissions? _log.warning(f'{self.proxy_name}: Unable to read headers from socket: {peer_name}')
202249
s.close()
203250

204251
def _send_headers(self, s: socket, data_length: int, request_id: int, response_expected: bool, method_name: str,
@@ -214,17 +261,21 @@ def _send_headers(self, s: socket, data_length: int, request_id: int, response_e
214261
f' (request_id: {request_id}): {e}')
215262

216263
def _send_socket(self, s: socket):
217-
_log.debug(f'{self.proxy_name}: IN SEND SOCKET')
264+
#_log.debug(f'{self.proxy_name}: IN SEND SOCKET')
218265
if not (message := self.outbound_messages.get(s)):
219-
_log.warning(f'Outbound socket to {s.getpeername()} was ready, but no outbound message was found.')
266+
try:
267+
peer_name = f'to {s.getpeername()}'
268+
except OSError:
269+
peer_name = ''
270+
# TODO: Why is this getting triggered (apparently on every send)? _log.warning(f'Outbound socket to {peer_name} was ready, but no outbound message was found.')
220271
elif isinstance(message.payload, AsyncResult) and not message.payload.ready():
221272
self.outbounds.add(s)
222-
_log.debug('IN SEND SOCKET, WAS ADDED BACK TO OUTBOUND BECAUSE ASYNC_RESULT WAS NOT READY.')
273+
#_log.debug('IN SEND SOCKET, WAS ADDED BACK TO OUTBOUND BECAUSE ASYNC_RESULT WAS NOT READY.')
223274
else:
224275
payload = message.payload.get() if isinstance(message.payload, Greenlet) else message.payload
225276
self._send_headers(s, len(payload), message.request_id, message.response_expected, message.method_name)
226277
try:
227-
_log.debug('REACHED SENDALL IN GEVENT IPC SEND')
278+
#_log.debug('REACHED SENDALL IN GEVENT IPC SEND')
228279
s.sendall(payload) # TODO: Should we send in chunks and sleep in between?
229280
if message.response_expected:
230281
self.inbounds.add(s)
@@ -255,7 +306,7 @@ def _handle_exceptional_socket(self, s: socket):
255306

256307
def start(self, *_, **__):
257308
self._setup_inbound_server(self.inbound_params)
258-
_log.debug(f'{self.proxy_name} STARTED.')
309+
#_log.debug(f'{self.proxy_name} STARTED.')
259310

260311
def stop(self):
261312
self._stop = True

src/protocol_proxy/manager/base.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,22 @@ def wait_peer_registered(self, peer, timeout, func=None, *args, **kwargs):
3333
"""
3434

3535
def _setup_proxy_process_command(self, unique_remote_id: tuple, **kwargs) -> tuple:
36-
_log.debug(f'UAI is: {unique_remote_id}')
36+
#_log.debug(f'@@@@@@@@@ PASSED UAI is: {unique_remote_id}')
3737
unique_remote_id = self.proxy_class.get_unique_remote_id(unique_remote_id)
38-
_log.debug(f'UAI is: {unique_remote_id}')
38+
#_log.debug(f'@@@@@@@@@ UAI AFTER LOOKUP is: {unique_remote_id}')
3939
proxy_id = self.get_proxy_id(unique_remote_id)
40+
#_log.debug('@@@@@@@@@ PROXY ID is: {}'.format(proxy_id))
4041
proxy_name = str(unique_remote_id)
4142
if proxy_id not in self.peers:
43+
#_log.debug(f'@@@@@@@@@ PROXY_ID IS NOT IN PEERS. SETTING UP COMMAND.')
4244
module, func = self.proxy_class.__module__, self.proxy_class.__name__
4345
protocol_specific_params = [i for pair in [(f"--{k.replace('_', '-')}", v)
4446
for k, v in kwargs.items()] for i in pair]
4547
command = [sys.executable, '-m', module, '--proxy-id', proxy_id.hex, '--proxy-name', proxy_name,
4648
'--manager-id', self.proxy_id.hex, '--manager-address', self.inbound_params.address,
4749
'--manager-port', str(self.inbound_params.port), *protocol_specific_params]
4850
else:
51+
#_log.debug(f'@@@@@@@@@ PROXY_ID IS IN PEERS. NOT SETTING UP COMMAND.')
4952
command = None
5053
return command, proxy_id, proxy_name
5154

0 commit comments

Comments
 (0)