4 changes: 2 additions & 2 deletions src/protocol_proxy/ipc/asyncio.py
@@ -108,7 +108,7 @@ def __init__(self, connector: AsyncioIPCConnector, on_lost_connection=None, buff
minimum_read_size: int = 76, outgoing_message=None, protocol_version: int = 1):
self.buffer_size = buffer_size
self.connector: AsyncioIPCConnector = connector
-# _log.debug(f'{self.connector.proxy_name} INBOUND AIPC PROTOCOL: IN PROTOCOL INIT')
+# _log.debug(f'{self.connector.proxy_name} INBOUND ASyncioIPC PROTOCOL: IN PROTOCOL INIT')
self.minimum_read_size = minimum_read_size # TODO: Default is V1 header length. Is this appropriate?
self.outgoing_message = outgoing_message
self.on_con_lost = on_lost_connection
@@ -139,7 +139,7 @@ def buffer_updated(self, n_bytes: int) -> None:
# if len(self.received_data) > version_end:
# if not (protocol := self.connector.PROTOCOL_VERSION.get(struct.unpack('>H', self.received_data[:2])[0])):
# raise NotImplementedError(f'Unknown protocol version ({protocol.VERSION})'
-# f' received from: {self.transport.get_extra_info("peername")}')
+# f' received from: {self.transport.get_extra_info("peer_name")}')
# header_end = version_end + protocol.HEADER_LENGTH

if self.count >= self.header_length:
4 changes: 2 additions & 2 deletions src/protocol_proxy/ipc/base.py
@@ -1,6 +1,6 @@
import logging

-from abc import abstractmethod, ABC
+from abc import abstractmethod, ABC, ABCMeta
from collections.abc import Generator
from dataclasses import dataclass
from itertools import cycle
@@ -50,7 +50,7 @@ class ProtocolProxyPeer(ABC):
socket_params: SocketParams = None


-class IPCConnector:
+class IPCConnector(metaclass=ABCMeta):
PROTOCOL_VERSION = {1: HeadersV1}

def __init__(self, *, proxy_id: UUID, token: UUID, proxy_name: str = None, inbound_params: SocketParams = None,
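Note: declaring `IPCConnector` with `ABCMeta` means a subclass that fails to implement the abstract methods can no longer be instantiated. A minimal sketch of that generic Python behavior (placeholder names, not classes from this PR):

```python
# Minimal sketch of the ABCMeta behavior the change above relies on;
# Connector/Incomplete are placeholders, not protocol_proxy classes.
from abc import ABCMeta, abstractmethod

class Connector(metaclass=ABCMeta):
    @abstractmethod
    def start(self):
        ...

class Incomplete(Connector):
    pass  # does not override start()

try:
    Incomplete()
except TypeError as e:
    print(e)  # can't instantiate abstract class Incomplete with abstract method start
```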
6 changes: 5 additions & 1 deletion src/protocol_proxy/ipc/decorator.py
@@ -1,17 +1,21 @@
import logging

+from typing import Any

_log = logging.getLogger(__name__)


# TODO: Do we need an Asyncio version of this?
# TODO: Did this work with the AsyncResult removed (just returns, possibly within greenlet)?
def callback(func):
-def verify(self, ipc, headers, raw_message: any):
+def verify(self, ipc, headers, raw_message: bytes):
if peer := ipc.peers.get(headers.sender_id):
if headers.sender_token == peer.token:
return func(self, headers, raw_message)
else:
_log.warning(f'Unable to authenticate caller: {headers.sender_id}')
+return None
else:
_log.warning(f'Request from unknown party: {headers.sender_id}')
+return None
return verify
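The `verify` wrapper is invoked with the connector as an extra leading argument (`ipc`) so it can consult `ipc.peers`; the wrapped handler only runs when the sender's token matches the registered peer. A rough usage sketch with stand-in peer and header objects (these are not classes from this package):

```python
# Hypothetical usage sketch of a @callback-decorated handler; SimpleNamespace
# objects stand in for the real peer table and header objects.
from types import SimpleNamespace
from uuid import uuid4

from protocol_proxy.ipc.decorator import callback  # assumes the package is importable

class Manager:
    @callback
    def handle_ping(self, headers, raw_message: bytes):
        return b'pong'

sender_id, token = uuid4(), uuid4()
ipc = SimpleNamespace(peers={sender_id: SimpleNamespace(token=token)})   # stand-in connector
good = SimpleNamespace(sender_id=sender_id, sender_token=token)
bad = SimpleNamespace(sender_id=sender_id, sender_token=uuid4())

manager = Manager()
assert manager.handle_ping(ipc, good, b'ping') == b'pong'   # token matches: handler runs
assert manager.handle_ping(ipc, bad, b'ping') is None       # mismatch: rejected with a warning
```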
2 changes: 1 addition & 1 deletion src/protocol_proxy/ipc/gevent.py
@@ -167,7 +167,7 @@ def _receive_socket(self, s: socket):
remaining = headers.data_length
buffer = b''
done = False
-io_wait_time = 0.0
+io_wait_time = self.max_io_wait_seconds
while not done:
try:
while chunk := s.recv(read_length := max(0, remaining if remaining < self.chunk_size else self.chunk_size)):
12 changes: 6 additions & 6 deletions src/protocol_proxy/ipc/headers.py
@@ -1,19 +1,19 @@
-import abc
import logging
import struct

+from abc import abstractmethod, ABCMeta
from uuid import UUID

_log = logging.getLogger(__name__)


-class ProtocolHeaders:
+class ProtocolHeaders(metaclass=ABCMeta):
# The endian_indicator and version number are added where needed and not included in the portion of the format here.
FORMAT = 'Q32sH16s16s'
HEADER_LENGTH = struct.calcsize(FORMAT)
VERSION = 0 # 2 byte (16 bit) integer (H)

-@abc.abstractmethod
+@abstractmethod
def __init__(self, data_length: int, method_name: str, request_id: int, sender_id, sender_token, **kwargs):
if kwargs:
_log.warning(f'Received extra kwargs for Proxy Protocol version {self.VERSION}: {list(kwargs.keys())}')
@@ -28,20 +28,20 @@ def __init__(self, data_length: int, method_name: str, request_id: int, sender_i
def bitflag_is_set(bit_position, byte_value):
return bool((byte_value & (1 << bit_position)) >> bit_position)

-@abc.abstractmethod
+@abstractmethod
def pack(self):
# TODO: The sender_id is currently the proxy_id, which is a tuple. This should probably become a UUID?
# (Need to figure out how/where to map one to the other.)
return struct.pack(f'>H{ProtocolHeaders.FORMAT}', self.VERSION, self.data_length,
self.method_name.encode('utf8'), self.request_id, self.sender_id.bytes,
self.sender_token.bytes)

-@abc.abstractmethod
+@abstractmethod
def unpack(self, header_bytes):
pass

def __repr__(self):
-return f'ProtocolHeaders(FORMAT={self.FORMAT}, VERISON={self.VERSION},' \
+return f'ProtocolHeaders(FORMAT={self.FORMAT}, VERSION={self.VERSION},' \
f'data_length={self.data_length}, method_name={self.method_name},' \
f' request_id={self.request_id}, sender_id={self.sender_id}, sender_token={self.sender_token}'

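For reference, the packed V1 header is the 2-byte big-endian version followed by `ProtocolHeaders.FORMAT`, which works out to 76 bytes, the default `minimum_read_size` seen in `ipc/asyncio.py` above. A small sketch with made-up field values:

```python
# Packs a header the same way ProtocolHeaders.pack() does (version prefix + FORMAT);
# the field values below are invented for illustration.
import struct
from uuid import uuid4

FORMAT = 'Q32sH16s16s'   # data_length, method_name, request_id, sender_id, sender_token
packed = struct.pack(f'>H{FORMAT}',
                     1,                   # VERSION (2-byte int)
                     128,                 # data_length
                     b'register_peer',    # method_name, null-padded to 32 bytes by struct
                     7,                   # request_id
                     uuid4().bytes,       # sender_id
                     uuid4().bytes)       # sender_token
print(len(packed))       # 76
```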
27 changes: 16 additions & 11 deletions src/protocol_proxy/manager/asyncio.py
@@ -1,11 +1,10 @@
import asyncio
import atexit
import logging
-import os
import signal

from abc import ABC
-from asyncio.subprocess import Process
+from asyncio import StreamReader, subprocess
from uuid import uuid4
from typing import Type

@@ -41,21 +40,27 @@ async def wait_peer_registered(self, peer, timeout, func=None, *args, **kwargs):
async def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer:
command, proxy_id, proxy_name = self._setup_proxy_process_command(unique_remote_id, **kwargs) # , proxy_env
if command:
-proxy_process = await asyncio.create_subprocess_exec(*command, stdin=asyncio.subprocess.PIPE)
-# , stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
-# TODO: Implement logging along lines of AIP.start_agent() (uncomment PIPES above too).
+proxy_process = await asyncio.create_subprocess_exec(*command, stdin=asyncio.subprocess.PIPE,
+stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
+self.loop.create_task(self.log_subprocess_output(proxy_process.stdout))
+self.loop.create_task(self.log_subprocess_output(proxy_process.stderr))
_log.info(f"PPM: Created new ProtocolProxy {proxy_name} with ID {str(proxy_id)}, pid: {proxy_process.pid}")
peer_token = uuid4()
proxy_process.stdin.write(peer_token.hex.encode())
proxy_process.stdin.write(self.token.hex.encode())
await proxy_process.stdin.drain()
proxy_process.stdin.close()
-proxy_process.stdin = open(os.devnull)
+proxy_process.stdin = None
self.peers[proxy_id] = AsyncioProtocolProxyPeer(process=proxy_process, proxy_id=proxy_id, token=peer_token)
self._setup_exit(proxy_process)
atexit.register(self.finalize_process, proxy_process)
return self.peers[proxy_id]

+async def log_subprocess_output(self, stream: StreamReader):
+while not stream.at_eof():
+raw_line = await stream.readline()
+self.log_subprocess_output_line(raw_line)

@callback
async def handle_peer_registration(self, headers: ProtocolHeaders, raw_message: bytes):
proxy: AsyncioProtocolProxyPeer | None = self.peers.get(headers.sender_id)
@@ -68,19 +73,19 @@ async def handle_peer_registration(self, headers: ProtocolHeaders, raw_message:
return success

@staticmethod
-def _setup_exit(process: Process):
+def _setup_exit(process: subprocess.Process):
"""Set up cleanup for the proxy process on exit."""
-def cleanup_func(process):
-if process.returncode is None:
+def cleanup_func(proc):
+if proc.returncode is None:
try:
-process.terminate()
+proc.terminate()
except ProcessLookupError:
pass
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, cleanup_func, process)
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, cleanup_func, process)

@staticmethod
-def finalize_process(process: Process):
+def finalize_process(process: subprocess.Process):
try:
process.kill()
except ProcessLookupError:
33 changes: 24 additions & 9 deletions src/protocol_proxy/manager/base.py
@@ -45,16 +45,9 @@ def _setup_proxy_process_command(self, unique_remote_id: tuple, **kwargs) -> tup
command = [sys.executable, '-m', module, '--proxy-id', proxy_id.hex, '--proxy-name', proxy_name,
'--manager-id', self.proxy_id.hex, '--manager-address', self.inbound_params.address,
'--manager-port', str(self.inbound_params.port), *protocol_specific_params]

-# # TODO: Discuss with Riley why/whether this block was necessary and/or helpful:
-# # Set PYTHONPATH so the proxy subprocess can import protocol_proxy
-# proxy_env = os.environ.copy()
-# src_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '../..', 'src'))
-# proxy_env['PYTHONPATH'] = src_dir + os.pathsep + proxy_env.get('PYTHONPATH', '')
-# # TODO: END block to discuss.
else:
-command = None #, proxy_env = None, None
-return command, proxy_id, proxy_name # , proxy_env
+command = None
+return command, proxy_id, proxy_name

@classmethod
@abstractmethod
@@ -76,6 +69,7 @@ def get_proxy(cls, unique_remote_id: tuple, **kwargs) -> tuple[Self, ProtocolPro
return manager, manager.get_proxy(unique_remote_id, **kwargs)
except (ImportError, ValueError) as e:
_log.warning(f'Unable to find a manager for get_proxy call: {e}')
+return None

def get_proxy_id(self, unique_remote_id: tuple | str) -> UUID:
"""Lookup or create a UUID for the proxy server
@@ -139,3 +133,24 @@ def get_by_proxy_id(cls, proxy_id: UUID) -> tuple[Self | None, ProtocolProxyPeer
if proxy_id in manager.peers:
return manager, manager.peers[proxy_id]
return None, None

+@abstractmethod
+def log_subprocess_output(self, stream):
+pass
+
+@staticmethod
+def log_subprocess_output_line(raw_line):
+line = raw_line.decode().strip()
+try:
+try:
+entry = json.loads(line)
+log = logging.getLogger(entry['name'])
+log.log(logging.getLevelName(entry['level'].upper()), f":{entry['lineno']} {entry['message']}")
+except (json.JSONDecodeError, KeyError, TypeError):
+log = logging.getLogger(__name__)
+log.log(logging.ERROR, line)
+except UnicodeDecodeError:
+log = logging.getLogger(__name__)
+log.log(logging.ERROR, raw_line)
+except Exception as e:
+_log.error(f'Encountered unknown exception parsing logs from proxy: {e}')
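`log_subprocess_output_line` assumes each line on the child's stdout/stderr is the one-line JSON record produced by the `logging.basicConfig` format string added in `proxy/launch.py` further down. A minimal round-trip sketch (the record contents are made up):

```python
# What a proxy log line looks like on the wire and how the manager re-logs it;
# the logger name, line number, and message here are invented for illustration.
import json
import logging

logging.basicConfig(level=logging.DEBUG)

raw_line = b'{"name": "example.proxy", "lineno": "42", "level": "INFO", "message": "started"}\n'

entry = json.loads(raw_line.decode().strip())
log = logging.getLogger(entry['name'])
log.log(logging.getLevelName(entry['level'].upper()),      # 'INFO' -> 20
        f":{entry['lineno']} {entry['message']}")
```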
21 changes: 13 additions & 8 deletions src/protocol_proxy/manager/gevent.py
@@ -3,11 +3,11 @@
import os

from abc import ABC
-from gevent import sleep, with_timeout
+from gevent import sleep, spawn, with_timeout
from gevent.subprocess import Popen, PIPE
from gevent.timeout import Timeout
from uuid import uuid4
-from typing import Type
+from typing import IO, Type

from ..ipc import callback, ProtocolHeaders, ProtocolProxyPeer
from ..ipc.gevent import GeventIPCConnector, GeventProtocolProxyPeer
@@ -43,10 +43,11 @@ def wait_for_peer_registration():
def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer:
command, proxy_id, proxy_name = self._setup_proxy_process_command(unique_remote_id, **kwargs) # , proxy_env
if command:
-# TODO: proxy_env parameter was added with block to discuss in super._setup_proxy_process_command(). Remove if that is.
-proxy_process = Popen(command, stdin=PIPE) #, env=proxy_env)
-# , stdout=PIPE, stderr=PIPE)
-# TODO: Implement logging along lines of AIP.start_agent() (uncomment PIPES above too).
+proxy_process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+_log.info("proxy %s has PID %s", self.proxy_name, proxy_process.pid)
+spawn(self.log_subprocess_output, proxy_process.stdout)
+spawn(self.log_subprocess_output, proxy_process.stderr)
+# TODO: Ensure that logging as implemented fits with VOLTTRON logging once that is fixed..
_log.info(f"PPM: Created new ProtocolProxy {proxy_name} with ID {str(proxy_id)}, pid: {proxy_process.pid}")
new_peer_token = uuid4()
proxy_process.stdin.write(new_peer_token.hex.encode())
@@ -61,6 +62,11 @@ def get_proxy(self, unique_remote_id: tuple, **kwargs) -> ProtocolProxyPeer:
_log.debug(f"PPM: Proxy {proxy_id} created, waiting for registration before sending.")
return self.peers[proxy_id]

+def log_subprocess_output(self, stream: IO[bytes]):
+"""Reads lines from a pipe and logs them."""
+for raw_line in iter(stream.readline, b''):
+self.log_subprocess_output_line(raw_line)

@callback
def handle_peer_registration(self, headers: ProtocolHeaders, raw_message: bytes):
return super().handle_peer_registration(headers, raw_message)
@@ -79,5 +85,4 @@ def _cleanup_proxy_process(self, process: Popen, timeout: float = 5.0):
except Timeout:
process.kill()
except Exception as e:
-_log.warning(f'Exception encountered attempting to terminate proxy process: {process.pid}')
-
+_log.warning(f'Exception encountered attempting to terminate proxy process ({process.pid}): {e}')
5 changes: 3 additions & 2 deletions src/protocol_proxy/proxy/asyncio.py
@@ -2,6 +2,7 @@
import logging

from abc import ABC
+from typing import cast
from uuid import UUID

from ..ipc.asyncio import AsyncioIPCConnector, Future, AsyncioProtocolProxyPeer, SocketParams
@@ -14,7 +15,7 @@ class AsyncioProtocolProxy(AsyncioIPCConnector, ProtocolProxy, ABC):
def __init__(self, manager_address: str, manager_port: int, manager_id: UUID, manager_token: UUID, token: UUID,
proxy_id: UUID, proxy_name: str = None, registration_retry_delay: float = 20.0, **kwargs):
super(AsyncioProtocolProxy, self).__init__(manager_address=manager_address, manager_port=manager_port,
-manager_id=manager_id, manager_token=manager_token, proxy_id=proxy_id,
+manager_id=manager_id, proxy_id=proxy_id,
registration_retry_delay=registration_retry_delay,
token=token, proxy_name=proxy_name, **kwargs)
self.peers[manager_id] = AsyncioProtocolProxyPeer(proxy_id=manager_id, socket_params=self.manager_params,
@@ -43,7 +44,7 @@ async def send_registration(self, remote: AsyncioProtocolProxyPeer):

async def start(self):
await super(AsyncioProtocolProxy, self).start()
-await self.send_registration(self.peers[self.manager])
+await self.send_registration(cast(AsyncioProtocolProxyPeer, self.peers[self.manager]))

async with self.inbound_server:
await self.inbound_server.serve_forever()
4 changes: 2 additions & 2 deletions src/protocol_proxy/proxy/base.py
@@ -10,8 +10,8 @@


# noinspection PyMissingConstructor
-class ProtocolProxy(IPCConnector):
-def __init__(self, *, manager_address: str, manager_port: int, manager_id: UUID, manager_token: UUID,
+class ProtocolProxy(IPCConnector, metaclass=abc.ABCMeta):
+def __init__(self, *, manager_address: str, manager_port: int, manager_id: UUID,
registration_retry_delay: float = 20.0, **kwargs):
"""NOTE: Proxy implementations MUST:
1. Subclass a multitasking subclass of IPCConnector (gevent, asyncio, etc.)
5 changes: 3 additions & 2 deletions src/protocol_proxy/proxy/gevent.py
@@ -3,6 +3,7 @@
from abc import ABC
from gevent import sleep, spawn
from gevent.event import AsyncResult
+from typing import cast
from uuid import UUID

from ..ipc.gevent import GeventIPCConnector, GeventProtocolProxyPeer, SocketParams
@@ -18,11 +19,11 @@ def __init__(self, *, proxy_id: UUID, token: UUID, manager_address: str, manager
"""
super(GeventProtocolProxy, self).__init__(proxy_id=proxy_id, token=token, proxy_name=proxy_name,
manager_address=manager_address, manager_port=manager_port,
-manager_id=manager_id, manager_token=manager_token,
+manager_id=manager_id,
registration_retry_delay=registration_retry_delay, **kwargs)
self.peers[manager_id] = GeventProtocolProxyPeer(proxy_id=manager_id, socket_params=self.manager_params,
token=manager_token)
-spawn(self.send_registration, self.peers[manager_id])
+spawn(self.send_registration, cast(GeventProtocolProxyPeer, self.peers[manager_id]))

def get_local_socket_params(self) -> SocketParams:
return self.inbound_server_socket.getsockname()
11 changes: 9 additions & 2 deletions src/protocol_proxy/proxy/launch.py
@@ -1,10 +1,17 @@
+import logging
import sys

from argparse import ArgumentParser
from asyncio import iscoroutinefunction, run
from typing import Callable
from uuid import UUID

+logging.basicConfig(
+level=logging.DEBUG, stream=sys.stdout,
+format='{"name": "%(name)s", "lineno": "%(lineno)d", "level": "%(levelname)s", "message": "%(message)s"}'
+)
+_log = logging.getLogger(__name__)

def proxy_command_parser(parser: ArgumentParser = None):
parser = parser if parser else ArgumentParser()
parser.add_argument('--proxy-id', type=UUID, required=True)
@@ -26,8 +33,8 @@ def launch(launcher_func: Callable):
parser = proxy_command_parser()
parser, proxy_runner = launcher_func(parser)
opts = parser.parse_args()
-proxy_token = UUID(hex=sys.stdin.buffer.read(32).hex())
-manager_token = UUID(hex=sys.stdin.buffer.read(32).hex())
+proxy_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8'))
+manager_token = UUID(hex=sys.stdin.buffer.read(32).decode('utf8'))
if iscoroutinefunction(proxy_runner):
run(proxy_runner(token=proxy_token, manager_token=manager_token, **vars(opts)))
else:
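The decode change matches what the managers write in `get_proxy()`: two UUIDs sent as 32-character hex strings over the child's stdin, which `launch()` now decodes as text instead of re-hexing the raw bytes. A handshake sketch with `io.BytesIO` standing in for the pipe:

```python
# Sketch of the stdin token handshake; io.BytesIO stands in for the subprocess pipe.
import io
from uuid import UUID, uuid4

peer_token, manager_token = uuid4(), uuid4()

pipe = io.BytesIO()
pipe.write(peer_token.hex.encode())      # manager side: proxy_process.stdin.write(...)
pipe.write(manager_token.hex.encode())
pipe.seek(0)

proxy_token = UUID(hex=pipe.read(32).decode('utf8'))    # proxy side: launch()
mgr_token = UUID(hex=pipe.read(32).decode('utf8'))
assert (proxy_token, mgr_token) == (peer_token, manager_token)
```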