From 194fd788d300c20e9b49c894894be2fedab5d82d Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Thu, 26 Feb 2026 12:09:54 +0100 Subject: [PATCH 1/7] Do not replace __dict__ when creating a message Also introduced `from_string` and deprecates the `rawstr` argument of Message instantiation --- posttroll/message.py | 63 ++++++++++++++++++++++++++++----- posttroll/tests/test_message.py | 22 ++++++------ 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/posttroll/message.py b/posttroll/message.py index 5c66250..5906124 100644 --- a/posttroll/message.py +++ b/posttroll/message.py @@ -18,6 +18,7 @@ import re from functools import partial from typing import Any, Callable +from warnings import warn from posttroll import config @@ -98,9 +99,11 @@ class Message: def __init__(self, subject:str="", atype:str="", data:str|dict[str, Any]="", binary:bool=False, rawstr:str|bytes|None=None, version:str|None=None): - """Initialize a Message from a subject, type and data, or from a raw string.""" + """Initialize a Message from a subject, type and data.""" if rawstr: - self.__dict__ = _decode(rawstr) + warn("The `rawstr` argument of `Message` instantiation is being depracated in favour of the class method" + "`Message.from_string`.", PendingDeprecationWarning) + self._decode_string(rawstr) else: self.subject:str = subject self.type:str = atype @@ -136,7 +139,7 @@ def head(self): @staticmethod def decode(rawstr:str|bytes): """Decode a raw string into a Message.""" - return Message(rawstr=rawstr) + return Message.from_string(rawstr) def encode(self) -> str: """Encode a Message to a raw string.""" @@ -167,14 +170,58 @@ def _validate(self): raise MessageError("Invalid data: data is not JSON serializable: %s" % str(self.data)) - def __getstate__(self): + def __getstate__(self) -> str: """Get the Message state for pickle().""" return self.encode() - def __setstate__(self, state): + def __setstate__(self, state: str): """Set the Message when unpickling.""" - self.__dict__.clear() - self.__dict__ = _decode(state) + self._decode_string(state) + + def _decode_string(self, rawstr:str|bytes): + """Convert a raw string to a Message.""" + rawstr = _check_for_magic_word(rawstr) + + raw = _check_for_element_count(rawstr) + version = _check_for_version(raw) + + # Start to build message + self.subject = raw[0].strip() + self.type = raw[1].strip() + self.sender = raw[2].strip() + self.time = dt.datetime.fromisoformat(raw[3].strip()) + self.version = version + + # Data part + try: + mimetype = raw[5].lower() + except IndexError: + mimetype = None + self.data = "" + self.binary = False + return + else: + data = raw[6] + + if mimetype == "application/json": + try: + self.data = json.loads(raw[6], object_hook=datetime_decoder) + self.binary = False + except ValueError: + raise MessageError("JSON decode failed on '%s ...'" % raw[6][:36]) + elif mimetype == "text/ascii": + self.data = str(data) + self.binary = False + elif mimetype == "binary/octet-stream": + self.data = data + self.binary = True + else: + raise MessageError("Unknown mime-type '%s'" % mimetype) + + @classmethod + def from_string(cls, rawstr:str|bytes): + """Create a message from string.""" + return cls(rawstr=rawstr) # ----------------------------------------------------------------------------- @@ -184,7 +231,7 @@ def __setstate__(self, state): # ----------------------------------------------------------------------------- -def _is_valid_version(version): +def _is_valid_version(version: str) -> bool: """Check version.""" return version <= CURRENT_MESSAGE_VERSION diff --git a/posttroll/tests/test_message.py b/posttroll/tests/test_message.py index 2cbdaa5..8e7a410 100644 --- a/posttroll/tests/test_message.py +++ b/posttroll/tests/test_message.py @@ -39,7 +39,7 @@ def test_encode_decode(): @pytest.mark.parametrize("dstr", [r"2008-04-11T22:13:22.123000", r"2008-04-11T22:13:22.123000+00:00"]) -def test_decode(dstr): +def test_decode(dstr: str): """Test the decoding of a message.""" rawstr = (_MAGICK + r"/test/1/2/3 info ras@hawaii " + dstr + r" v1.2" + @@ -62,26 +62,26 @@ def test_encode(): @pytest.mark.parametrize("dstr", [r"2008-04-11T22:13:22.123000", r"2008-04-11T22:13:22.123000+00:00"]) -def test_unicode(dstr): +def test_unicode(dstr: str): """Test handling of unicode.""" msg = ("pytroll://PPS-monitorplot/3/norrköping/utv/polar/direct_readout/ file " "safusr.u@lxserv1096.smhi.se " + dstr + ' v1.2 application/json' ' {"start_time": "' + dstr + '"}') - assert msg == str(Message(rawstr=msg)) + assert msg == str(Message.from_string(msg)) msg = (u"pytroll://oper/polar/direct_readout/norrköping pong sat@MERLIN " + dstr + r' v1.2 application/json {"station": "norrk\u00f6ping"}') - assert msg == str(Message(rawstr=msg)) + assert msg == str(Message.from_string(msg)) @pytest.mark.parametrize("dstr", [r"2008-04-11T22:13:22.123000", r"2008-04-11T22:13:22.123000+00:00"]) -def test_iso(dstr): +def test_iso(dstr: str): """Test handling of iso-8859-1.""" msg = ("pytroll://oper/polar/direct_readout/norrköping pong sat@MERLIN " + dstr + ' v1.01 application/json {"station": "norrköping"}') iso_msg = msg.encode("iso-8859-1") - Message(rawstr=iso_msg) + Message.from_string(iso_msg) def test_pickle(): @@ -105,7 +105,7 @@ def test_pickle(): @pytest.mark.parametrize("mda", [TZ_UNAWARE_METADATA, TZ_AWARE_METADATA]) -def test_metadata(mda): +def test_metadata(mda: dict[str, dt.datetime|str|int|float]): """Test metadata encoding/decoding.""" metadata = copy.copy(mda) msg = Message.decode(Message("/sat/polar/smb/level1", "file", @@ -117,7 +117,7 @@ def test_metadata(mda): @pytest.mark.parametrize(("mda", "compare_file"), [(TZ_UNAWARE_METADATA, "/message_metadata_unaware.dumps"), (TZ_AWARE_METADATA, "/message_metadata_aware.dumps")]) -def test_serialization(mda, compare_file): +def test_serialization(mda: dict[str, dt.datetime|str|int|float], compare_file: str): """Test json serialization.""" import json metadata = copy.copy(mda) @@ -142,7 +142,7 @@ def test_message_can_take_version(): assert msg.version == version rawmsg = str(msg) assert version in rawmsg - msg = Message(rawstr=rawmsg) + msg = Message.from_string(rawmsg) assert msg.version == version @@ -154,7 +154,7 @@ def test_message_can_generate_v1_01(): version=version) rawmsg = str(msg) assert "+00:00" not in rawmsg.split(" ", 6)[-1] - msg = Message(rawstr=rawmsg) + msg = Message.from_string(rawmsg) assert "+00:00" not in rawmsg.split(" ", 6)[-1] assert str(msg) == rawmsg @@ -165,7 +165,7 @@ def test_message_has_timezone_by_default(): data=dict(start_time=dt.datetime.now(dt.timezone.utc))) rawmsg = str(msg) assert "+00:00" in rawmsg - msg = Message(rawstr=rawmsg) + msg = Message.from_string(rawmsg) assert "+00:00" in str(msg) assert str(msg) == rawmsg From b13c3edc3ac14d6899a7cebaf31ae2cc0db353e1 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Thu, 26 Feb 2026 12:14:36 +0100 Subject: [PATCH 2/7] Make keepalive options universal --- posttroll/backends/zmq/__init__.py | 13 +++---------- posttroll/backends/zmq/publisher.py | 9 ++++----- posttroll/backends/zmq/socket.py | 21 +++++++++------------ posttroll/backends/zmq/subscriber.py | 12 ++++-------- posttroll/publisher.py | 6 +++--- 5 files changed, 23 insertions(+), 38 deletions(-) diff --git a/posttroll/backends/zmq/__init__.py b/posttroll/backends/zmq/__init__.py index 1e6876f..ebcc7ba 100644 --- a/posttroll/backends/zmq/__init__.py +++ b/posttroll/backends/zmq/__init__.py @@ -25,22 +25,15 @@ def get_context() -> zmq.Context: return context[pid] -def destroy_context(linger=None): +def destroy_context(linger: int|None = None): """Destroy the context.""" pid = os.getpid() context.pop(pid).destroy(linger) -def _set_tcp_keepalive(socket): - """Set the tcp keepalive parameters on *socket*.""" - keepalive_options = get_tcp_keepalive_options() - for param, value in keepalive_options.items(): - socket.setsockopt(param, value) - - -def get_tcp_keepalive_options(): +def get_tcp_keepalive_options() -> dict[int, int]: """Get the tcp_keepalive options from config.""" - keepalive_options = dict() + keepalive_options: dict[int, int] = dict() for opt in ("tcp_keepalive", "tcp_keepalive_cnt", "tcp_keepalive_idle", diff --git a/posttroll/backends/zmq/publisher.py b/posttroll/backends/zmq/publisher.py index 47cc0ae..7f9a441 100644 --- a/posttroll/backends/zmq/publisher.py +++ b/posttroll/backends/zmq/publisher.py @@ -6,7 +6,6 @@ import zmq -from posttroll.backends.zmq import get_tcp_keepalive_options from posttroll.backends.zmq.socket import close_socket, set_up_server_socket LOGGER = logging.getLogger(__name__) @@ -42,12 +41,12 @@ def start(self): return self def _create_socket(self): - options = get_tcp_keepalive_options() - self.publish_socket, port, self._authenticator = set_up_server_socket(zmq.PUB, self.destination, options, - (self.min_port, self.max_port)) + self.publish_socket, port, self._authenticator = set_up_server_socket(zmq.PUB, self.destination, + port_interval=(self.min_port, + self.max_port)) self.port_number = port - def send(self, msg): + def send(self, msg: str): """Send the given message.""" with self._pub_lock: self.publish_socket.send_string(msg) diff --git a/posttroll/backends/zmq/socket.py b/posttroll/backends/zmq/socket.py index f115bb8..0cab4f6 100644 --- a/posttroll/backends/zmq/socket.py +++ b/posttroll/backends/zmq/socket.py @@ -12,7 +12,7 @@ from zmq.auth.thread import ThreadAuthenticator from posttroll import config -from posttroll.backends.zmq import get_context +from posttroll.backends.zmq import get_context, get_tcp_keepalive_options from posttroll.message import Message, MessageError authenticator_lock = Lock() @@ -26,9 +26,8 @@ def close_socket(sock: zmq.Socket[int]): def set_up_client_socket(socket_type: int, address: str, - options: dict[int|str, str]|None = None, backend: str|None = None) -> zmq.Socket[int]: + options: dict[int, int]|None = None, backend: str|None = None) -> zmq.Socket[int]: """Set up a client (connecting) zmq socket.""" - options = options or dict() backend = backend or config["backend"] # Skip secure setup for inproc (internal thread communication) if address.startswith("inproc://"): @@ -39,7 +38,7 @@ def set_up_client_socket(socket_type: int, address: str, sock = create_secure_client_socket(socket_type) else: raise NotImplementedError() - add_options(sock, options) + _add_options(sock, options) sock.connect(address) return sock @@ -49,11 +48,11 @@ def create_unsecure_client_socket(socket_type: int) -> zmq.Socket[int]: return get_context().socket(socket_type) -def add_options(sock, options=None): +def _add_options(sock: zmq.Socket[int], options: dict[int, int]|None = None): """Add options to a socket.""" - if not options: - return - for param, val in options.items(): + combined_options = get_tcp_keepalive_options() + combined_options.update(options or {}) + for param, val in combined_options.items(): sock.setsockopt(param, val) @@ -82,12 +81,10 @@ def create_secure_client_socket(socket_type: int) -> zmq.Socket[int]: return subscriber -def set_up_server_socket(socket_type: int, destination: str, options: dict[int, str]|None = None, +def set_up_server_socket(socket_type: int, destination: str, options: dict[int, int]|None = None, port_interval: tuple[int|None, int|None] = (None, None), backend: str|None = None) -> tuple[zmq.Socket[int], int, ThreadAuthenticator|None]: """Set up a server (binding) socket.""" - if options is None: - options = {} _backend:str = backend or config["backend"] # Skip ZAP for inproc (internal thread communication) enable_zap = not destination.startswith("inproc://") @@ -98,7 +95,7 @@ def set_up_server_socket(socket_type: int, destination: str, options: dict[int, else: raise NotImplementedError() - add_options(sock, options) + _add_options(sock, options) port = bind(sock, destination, port_interval) return sock, port, authenticator diff --git a/posttroll/backends/zmq/subscriber.py b/posttroll/backends/zmq/subscriber.py index 5c3b767..3ac172e 100644 --- a/posttroll/backends/zmq/subscriber.py +++ b/posttroll/backends/zmq/subscriber.py @@ -8,7 +8,6 @@ from zmq import PULL, SUB, SUBSCRIBE, ContextTerminated, ZMQError from posttroll import config -from posttroll.backends.zmq import get_tcp_keepalive_options from posttroll.backends.zmq.socket import SocketReceiver, close_socket, set_up_client_socket from posttroll.message import CURRENT_MESSAGE_VERSION @@ -116,8 +115,7 @@ def add_hook_pull(self, address, callback): specified subscription. Good for pushed 'inproc' messages from another thread. """ LOGGER.info("Subscriber adding PULL hook %s", str(address)) - options = get_tcp_keepalive_options() - socket = self._create_socket(PULL, address, options) + socket = self._create_socket(PULL, address) if self._sock_receiver: self._sock_receiver.register(socket) self._add_hook(socket, callback) @@ -200,8 +198,6 @@ def __del__(self): pass def _add_sub_socket(self, address: dict[str, str], topics): - - options = get_tcp_keepalive_options() try: backend = address.get("backend", "unsecure_zmq") uri = address["URI"] @@ -209,14 +205,14 @@ def _add_sub_socket(self, address: dict[str, str], topics): backend = config["backend"] uri = address - subscriber = self._create_socket(SUB, uri, options, backend) + subscriber = self._create_socket(SUB, uri, backend=backend) add_subscriptions(subscriber, topics) if self._sock_receiver: self._sock_receiver.register(subscriber) return subscriber - def _create_socket(self, socket_type: int, address: str, options, backend: str|None = None): + def _create_socket(self, socket_type: int, address: str, options: dict[int,int]|None, backend: str|None = None) return set_up_client_socket(socket_type, address, options, backend) @@ -227,7 +223,7 @@ def ensure_address_is_dict(addr: dict[str, str]|str) -> dict[str, str]: elif isinstance(addr, str): res = dict(URI=addr) else: - NotImplementedError(f"Don't know how to handle {type(addr)} addresses") + raise NotImplementedError(f"Don't know how to handle {type(addr)} addresses") res.setdefault("backend", config["backend"]) return res diff --git a/posttroll/publisher.py b/posttroll/publisher.py index 959944b..0f40fd8 100644 --- a/posttroll/publisher.py +++ b/posttroll/publisher.py @@ -82,7 +82,7 @@ def start(self): self._publisher.start() return self - def send(self, msg): + def send(self, msg: str): """Send the given message.""" return self._publisher.send(msg) @@ -181,7 +181,7 @@ def start(self): self._broadcaster.start() return self - def send(self, msg): + def send(self, msg: str): """Send a *msg*.""" return self._publisher.send(msg) @@ -241,7 +241,7 @@ class Publish: """ - def __init__(self, name, port=0, aliases=None, broadcast_interval=2, nameservers=None, + def __init__(self, name: str, port=0, aliases=None, broadcast_interval=2, nameservers=None, min_port=None, max_port=None): """Initialize the class.""" settings = {"name": name, "port": port, "min_port": min_port, "max_port": max_port, From 033b49973a688f378b44a22ba3551963c64f4ba2 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Wed, 4 Mar 2026 15:25:45 +0100 Subject: [PATCH 3/7] Fix typo --- posttroll/backends/zmq/subscriber.py | 2 +- posttroll/message.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/posttroll/backends/zmq/subscriber.py b/posttroll/backends/zmq/subscriber.py index 3ac172e..89dd560 100644 --- a/posttroll/backends/zmq/subscriber.py +++ b/posttroll/backends/zmq/subscriber.py @@ -212,7 +212,7 @@ def _add_sub_socket(self, address: dict[str, str], topics): self._sock_receiver.register(subscriber) return subscriber - def _create_socket(self, socket_type: int, address: str, options: dict[int,int]|None, backend: str|None = None) + def _create_socket(self, socket_type: int, address: str, options: dict[int,int]|None, backend: str|None = None): return set_up_client_socket(socket_type, address, options, backend) diff --git a/posttroll/message.py b/posttroll/message.py index 5906124..7fca854 100644 --- a/posttroll/message.py +++ b/posttroll/message.py @@ -136,10 +136,10 @@ def head(self): self._validate() return _encode(self, head=True) - @staticmethod - def decode(rawstr:str|bytes): + @classmethod + def decode(cls, rawstr:str|bytes): """Decode a raw string into a Message.""" - return Message.from_string(rawstr) + return cls.from_string(rawstr) def encode(self) -> str: """Encode a Message to a raw string.""" From f5796de9b29dbf8810ccd0a6c17070feb0d88bc5 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Wed, 4 Mar 2026 15:42:10 +0100 Subject: [PATCH 4/7] Make options a kwarg --- posttroll/backends/zmq/subscriber.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posttroll/backends/zmq/subscriber.py b/posttroll/backends/zmq/subscriber.py index 89dd560..5a9f125 100644 --- a/posttroll/backends/zmq/subscriber.py +++ b/posttroll/backends/zmq/subscriber.py @@ -212,7 +212,7 @@ def _add_sub_socket(self, address: dict[str, str], topics): self._sock_receiver.register(subscriber) return subscriber - def _create_socket(self, socket_type: int, address: str, options: dict[int,int]|None, backend: str|None = None): + def _create_socket(self, socket_type: int, address: str, options: dict[int,int]|None=None, backend: str|None = None): return set_up_client_socket(socket_type, address, options, backend) From fd9843642dcff2ab7369be6e5afe418c6350c7a7 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Wed, 4 Mar 2026 15:55:36 +0100 Subject: [PATCH 5/7] Add -v flag to pytest --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 2f05f80..0ebe84b 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -27,7 +27,7 @@ jobs: pip install --no-deps -e . - name: Run tests run: | - pytest --cov=posttroll posttroll/tests --cov-report=xml + pytest -v --cov=posttroll posttroll/tests --cov-report=xml - name: Upload coverage to Codecov uses: codecov/codecov-action@v5 with: From e5d8f24768bce56628b529cee78d73b629aa53f1 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Thu, 5 Mar 2026 12:01:39 +0100 Subject: [PATCH 6/7] Fix message encoding for v1.01 --- posttroll/message.py | 2 +- posttroll/tests/test_message.py | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/posttroll/message.py b/posttroll/message.py index 7fca854..2404b31 100644 --- a/posttroll/message.py +++ b/posttroll/message.py @@ -369,7 +369,7 @@ def _encode(msg:Message, head:bool=False, binary:bool=False) -> str: version = render_version(msg.version, msg.data, binary) rawstr = str(_MAGICK) + u"{0:s} {1:s} {2:s} {3:s} {4:s}".format( - msg.subject, msg.type, msg.sender, msg.time.isoformat(), version) + msg.subject, msg.type, msg.sender, create_datetime_encoder_for_version(version)(msg.time), version) if not head and msg.data: mimetype, data = _encode_data(msg.data, binary, version) return " ".join((rawstr, mimetype, data)) diff --git a/posttroll/tests/test_message.py b/posttroll/tests/test_message.py index 8e7a410..a90eace 100644 --- a/posttroll/tests/test_message.py +++ b/posttroll/tests/test_message.py @@ -57,7 +57,7 @@ def test_encode(): msg1 = Message(subject, atype, data=data) sender = "%s@%s" % (msg1.user, msg1.host) full_message = (_MAGICK + subject + " " + atype + " " + sender + " " + - str(msg1.time.isoformat()) + " " + "v1.01" + " " + "text/ascii" + " " + data) + msg1.time.replace(tzinfo=None).isoformat() + " " + "v1.01" + " " + "text/ascii" + " " + data) assert full_message == msg1.encode() @@ -159,6 +159,15 @@ def test_message_can_generate_v1_01(): assert str(msg) == rawmsg +def test_v1_01_header_timestamp_has_no_timezone(): + """Ensure the header timestamp has no timezone info for v1.01 messages.""" + version = "v1.01" + msg = Message("a", "b", data="hello", version=version) + rawmsg = str(msg) + header_timestamp = rawmsg.split(" ")[3] + assert "+00:00" not in header_timestamp + + def test_message_has_timezone_by_default(): """Ensure message contain time zone info.""" msg = Message("a", "b", From c5bdb94521586eb027fbb886ca5b748c3966979d Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Thu, 5 Mar 2026 12:02:15 +0100 Subject: [PATCH 7/7] Do not let nameserver resent v1.2 is not needed --- posttroll/address_receiver.py | 3 ++- posttroll/tests/test_nameserver.py | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/posttroll/address_receiver.py b/posttroll/address_receiver.py index 88615f8..6952afe 100644 --- a/posttroll/address_receiver.py +++ b/posttroll/address_receiver.py @@ -16,7 +16,7 @@ from posttroll import config from posttroll.bbmcast import MulticastReceiver, get_configured_broadcast_port -from posttroll.message import Message +from posttroll.message import Message, version_needed from posttroll.publisher import Publish __all__ = ("AddressReceiver", "getaddress") @@ -178,6 +178,7 @@ def process_address_message(self, data, pub): if addr not in self._addresses: logger.info("nameserver: publish add '%s'", str(msg)) + msg.version = version_needed(msg.data, False) pub.send(msg.encode()) self._add(addr, metadata) diff --git a/posttroll/tests/test_nameserver.py b/posttroll/tests/test_nameserver.py index f03fa07..e306194 100644 --- a/posttroll/tests/test_nameserver.py +++ b/posttroll/tests/test_nameserver.py @@ -354,6 +354,26 @@ def test_unsecure_tcp_nameserver(tmp_path): thr.join() +@pytest.mark.parametrize("received_version", ["v1.01", "v1.2"]) +def test_address_receiver_republishes_as_v1_01(received_version): + """Address re-publications must be v1.01 so old posttroll clients can decode the header timestamp.""" + from unittest.mock import Mock + + from posttroll.address_receiver import AddressReceiver + + adr = AddressReceiver() + pub = Mock() + incoming = Message("/address/myservice", "info", + {"URI": "tcp://host:1234", "service": ["myservice"]}, + version=received_version) + adr.process_address_message(incoming.encode(), pub) + + published_raw = pub.send.call_args[0][0] + published = Message.decode(published_raw) + assert published.version == "v1.01" + assert "+00:00" not in published_raw.split(" ")[3] + + @pytest.mark.parametrize("version", ["v1.01", "v1.2"]) def test_message_version_compatibility(tmp_path, version): """Ensure the message version of nameserver responses."""