Skip to content
14 changes: 12 additions & 2 deletions pyleco/actors/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@

import zmq

from pyleco.core.message import Message

from ..utils.message_handler import MessageHandler
from ..utils.data_publisher import DataPublisher
from ..utils.extended_data_publisher import ExtendedDataPublisher as DataPublisher
from ..utils.timers import RepeatingTimer


Expand Down Expand Up @@ -101,7 +103,9 @@ def __init__(
self.pipeL.connect(f"inproc://listenerPipe:{pipe_port}")

self.timer = RepeatingTimer(interval=periodic_reading, function=self.queue_readout)
self.publisher = DataPublisher(full_name=name, log=self.root_logger)
self.publisher = DataPublisher(
full_name=name, log=self.root_logger, send_message_method=self.send_message
)

if auto_connect:
self.connect(**auto_connect)
Expand All @@ -118,13 +122,19 @@ def register_rpc_methods(self) -> None:
self.register_rpc_method(self.set_polling_interval)
self.register_rpc_method(self.connect)
self.register_rpc_method(self.disconnect)
self.register_rpc_method(self.publisher.register_subscriber)
self.register_rpc_method(self.publisher.unregister_subscriber)

def register_device_method(self, method: Callable) -> None:
"""Make a device method available via RPC. The method name is prefixed with `device.`."""
# TODO TBD how to call a device method?
name = method.__name__
self.register_rpc_method(method=method, name="device." + name)

def handle_json_error(self, message: Message) -> None:
self.publisher.handle_json_error(message=message)
return super().handle_json_error(message)

def __del__(self) -> None:
self.disconnect()

Expand Down
2 changes: 2 additions & 0 deletions pyleco/json_utils/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
class JSONRPCError(Exception):
"""Base error that all JSON RPC exceptions extend."""

rpc_error: JsonRpcError

def __init__(self, error: JsonRpcError) -> None:
msg = f"{error.code}: {error.message}"
self.rpc_error = error
Expand Down
110 changes: 110 additions & 0 deletions pyleco/utils/extended_data_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#
# This file is part of the PyLECO package.
#
# Copyright (c) 2023-2024 PyLECO Developers
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Copyright (c) 2023-2024 PyLECO Developers
# Copyright (c) 2023-2025 PyLECO Developers

#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#

from __future__ import annotations
from json import JSONDecodeError
from typing import Any, cast, Callable, Generator, Union

from ..json_utils.errors import JSONRPCError, NODE_UNKNOWN, RECEIVER_UNKNOWN, METHOD_NOT_FOUND
from ..json_utils.json_objects import Notification
from ..core.message import Message, MessageTypes
from ..core.data_message import DataMessage
from ..json_utils.rpc_generator import RPCGenerator
from .data_publisher import DataPublisher

class ExtendedDataPublisher(DataPublisher):
"""A DataPublisher, which sends the data also via the control protocol.

Handle unsolicited error messages, e.g. unavailable subscribers or not implemented receiving
method, with :meth:`handle_json_error` to remove these subscribers from the list of subscribers.
"""

def __init__(
self, full_name: str, send_message_method: Callable[[Message], None], **kwargs
) -> None:
super().__init__(full_name, **kwargs)
self.send_control_message = send_message_method
self.subscribers: set[bytes] = set()
self.rpc_generator = RPCGenerator()

def register_subscriber(self, subscriber: Union[bytes, str]) -> None:
"""Register a subscriber, that it may receive data messages via command protocol."""
if isinstance(subscriber, str):
subscriber = subscriber.encode()
self.subscribers.add(subscriber)

def unregister_subscriber(self, subscriber: Union[bytes, str]) -> None:
"""Unregister a subscriber, that it may not receive data messages via command protocol."""
if isinstance(subscriber, str):
subscriber = subscriber.encode()
self.subscribers.discard(subscriber)

def convert_data_message_to_messages(
self, data_message: DataMessage, receivers: Union[set[Union[bytes, str]], set[bytes]],
) -> Generator[Message, Any, Any]:
cid = data_message.conversation_id
raw_message = Message(
receiver="dummy",
data=Notification("add_subscription_message"),
conversation_id=cid,
additional_payload=data_message.payload,
message_type=MessageTypes.JSON,
)
for receiver in receivers:
raw_message.receiver = receiver.encode() if isinstance(receiver, str) else receiver
yield raw_message

def send_message(self, message: DataMessage) -> None:
super().send_message(message)
for msg in self.convert_data_message_to_messages(message, self.subscribers):
self.send_control_message(msg)

def handle_json_error(self, message: Message) -> None:
"""Unregister unavailable subscribers in an error message.

Call this method from wherever you handle incoming json errors, for example in the
message handler.
"""
try:
data: dict[str, Any] = message.data # type: ignore
except JSONDecodeError as exc:
self.log.exception(f"Could not decode json message {message}", exc_info=exc)
return
try:
self.rpc_generator.get_result_from_response(data)
except JSONRPCError as exc:
error_code = exc.rpc_error.code
try:
error_data = cast(str, exc.rpc_error.data) # type: ignore
except AttributeError:
return
if error_code in (RECEIVER_UNKNOWN.code, METHOD_NOT_FOUND.code):
self.unregister_subscriber(error_data)
if error_code == NODE_UNKNOWN.code:
if isinstance(error_data, str):
error_data = error_data.encode()
for subscriber in self.subscribers:
if subscriber.startswith(error_data):
self.unregister_subscriber(subscriber)

22 changes: 21 additions & 1 deletion pyleco/utils/extended_message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from __future__ import annotations
import json
import pickle
from typing import Optional
from typing import Optional, Union

import zmq

Expand Down Expand Up @@ -59,6 +59,7 @@ def register_rpc_methods(self) -> None:
self.register_rpc_method(self.subscribe)
self.register_rpc_method(self.unsubscribe)
self.register_rpc_method(self.unsubscribe_all)
self.register_rpc_method(self.add_subscription_message)

def close(self) -> None:
self.subscriber.close(1)
Expand Down Expand Up @@ -126,3 +127,22 @@ def unsubscribe_all(self) -> None:
"""Unsubscribe from all subscriptions."""
while self._subscriptions:
self.unsubscribe_single(self._subscriptions.pop())

# methods for data protocol via control protocol
def subscribe_via_control(self, topic: Union[bytes, str]) -> None:
"""Subscribe to a topic via the control protocol."""
self.ask_rpc(receiver=topic, method="register_subscriber")

def unsubscribe_via_control(self, topic: Union[bytes, str]) -> None:
"""Unsubscribe to a topic via the control protocol."""
self.ask_rpc(receiver=topic, method="unregister_subscriber")

def add_subscription_message(self) -> None:
"""Set a subscription message as if it had been received via data protocol."""
msg = self.current_message
dm = DataMessage(
topic=msg.sender,
conversation_id=msg.conversation_id,
additional_payload=msg.payload[1:],
)
self.handle_subscription_message(dm)
18 changes: 18 additions & 0 deletions tests/acceptance_tests/test_director_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,14 @@ def binary_method_created(additional_payload: list[bytes]) -> tuple[None, list[b
"""Receive binary data and return it. Create binary method by registering it."""
return None, [additional_payload[0] * 2]

def publish():
actor.publisher.send_data("super content")

actor.register_rpc_method(binary_method_manually)
actor.register_binary_rpc_method(
binary_method_created, accept_binary_input=True, return_binary_output=True
)
actor.register_rpc_method(publish)
actor.connect()
actor.rpc.method()(actor.device.triple)
actor.register_device_method(actor.device.triple)
Expand Down Expand Up @@ -165,3 +169,17 @@ def test_binary_data_transfer_created(director: Director):
assert director.ask_rpc(
method="binary_method_created", additional_payload=[b"123"], extract_additional_payload=True
) == (None, [b"123123"])


def test_data_via_control_protocol(director: Director):
# act
director.ask_rpc("register_subscriber")
director.ask_rpc("publish")

msg = director.communicator.read_message(conversation_id=None)

# teardown
director.ask_rpc("unregister_subscriber")

assert msg.data == {"jsonrpc": "2.0", "method": "add_subscription_message"}
assert msg.payload[1:] == [b'super content']
118 changes: 118 additions & 0 deletions tests/utils/test_extended_data_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#
# This file is part of the PyLECO package.
#
# Copyright (c) 2023-2024 PyLECO Developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#

import pytest

from pyleco.test import FakeContext
from pyleco.core.message import Message, MessageTypes
from pyleco.core.data_message import DataMessage
from pyleco.json_utils.json_objects import Notification
from pyleco.utils.extended_data_publisher import ExtendedDataPublisher


CID = b"conversation_id;"
messages = [] # for tests

@pytest.fixture
def fake_send_message():
global messages
messages = []
def _fsm(message: Message):
global messages
messages.append(message)
return _fsm


@pytest.fixture
def publisher(fake_send_message) -> ExtendedDataPublisher:
publisher = ExtendedDataPublisher(
"fn", send_message_method=fake_send_message,
context=FakeContext(), # type: ignore
)
return publisher


@pytest.fixture
def data_message() -> DataMessage:
return DataMessage(
topic="topic", conversation_id=CID, data=b"0", additional_payload=[b"1", b"2"]
)


def test_register_subscribers(publisher: ExtendedDataPublisher):
# act
publisher.register_subscriber("abcdef")
assert b"abcdef" in publisher.subscribers

publisher.register_subscriber(b"ghi")
assert b"ghi" in publisher.subscribers


def test_unregister_subscribers(publisher: ExtendedDataPublisher):
# arrange
publisher.subscribers.add(b"abc")
publisher.subscribers.add(b"def")
# act
# str
publisher.unregister_subscriber("abc")
assert b"abc" not in publisher.subscribers
# bytes
publisher.unregister_subscriber(b"def")
assert b"def" not in publisher.subscribers
# assert that no error is raised at repeated unregistering
publisher.unregister_subscriber(b"def")


@pytest.mark.parametrize("receivers", (set(), {b"abc"}, {b"abc", b"def"}, {"string"}))
def test_convert(publisher: ExtendedDataPublisher, receivers, data_message: DataMessage):
msgs = list(publisher.convert_data_message_to_messages(data_message, receivers=receivers))
assert len(msgs) == len(receivers)
for rec, msg in zip(receivers, msgs):
assert msg == Message(
receiver=rec,
data=Notification(method="add_subscription_message"),
conversation_id=CID,
message_type=MessageTypes.JSON,
additional_payload=data_message.payload,
)


def test_send_message(publisher: ExtendedDataPublisher, data_message: DataMessage):
# arrange
publisher.register_subscriber("abc")
# act
publisher.send_message(data_message)
# assert that the data message is sent
assert publisher.socket._s == [data_message.to_frames()]
# assert that the control message is sent
global messages
assert messages == [
Message(
"abc",
data=Notification(method="add_subscription_message"),
conversation_id=CID,
message_type=MessageTypes.JSON,
additional_payload=data_message.payload,
)
]
Loading
Loading