Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
strategy:
fail-fast: true
matrix:
python-version: ["3.10", "3.11", "3.12"]
python-version: ["3.12", "3.13", "3.14"]
experimental: [false]
steps:
- name: Checkout source
Expand Down
3 changes: 3 additions & 0 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ These settings can also be set using the posttroll config object, for example::

The posttroll configuration uses donfig, for more information, check https://donfig.readthedocs.io/en/latest/.

Nameserver also now use both a secure and unsecure port for communicating. The port for secure connection can be set with
the "secure_zmq_nameserver_port" config item, while the unsecure one use the known "nameserver_port" config item.


Generating the public and secret key pairs
******************************************
Expand Down
4 changes: 2 additions & 2 deletions posttroll/address_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def get_local_ips():
class AddressReceiver:
"""General thread to receive broadcast addresses."""

def __init__(self, max_age=ten_minutes, port=None,
do_heartbeat=True, multicast_enabled=True, restrict_to_localhost=False):
def __init__(self, max_age: dt.timedelta = ten_minutes, port: int|None =None,
do_heartbeat: bool = True, multicast_enabled: bool = True, restrict_to_localhost: bool = False):
"""Set up the address receiver."""
self._max_age = max_age
self._port = port or get_configured_address_port()
Expand Down
2 changes: 1 addition & 1 deletion posttroll/backends/zmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
context = {}


def get_context():
def get_context() -> zmq.Context:
"""Provide the context to use.

This function takes care of creating new contexts in case of forks.
Expand Down
22 changes: 11 additions & 11 deletions posttroll/backends/zmq/address_receiver.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
"""ZMQ implementation of the the simple receiver."""
"""ZMQ implementation of the simple receiver."""

from zmq import REP
import zmq

from posttroll.address_receiver import get_configured_address_port
from posttroll.backends.zmq.socket import close_socket, set_up_server_socket


class SimpleReceiver(object):
class SimpleReceiver:
"""Simple listing on port for address messages."""

def __init__(self, port=None, timeout=2):
"""Set up the receiver."""
self._port = port or get_configured_address_port()
address = "tcp://*:" + str(port)
self._socket, _, self._authenticator = set_up_server_socket(REP, address)
address = "tcp://*:" + str(self._port)
self._socket, _, self._authenticator = set_up_server_socket(zmq.REP, address)
self._socket.setsockopt(zmq.RCVTIMEO, timeout * 1000) # timeout in milliseconds
self._running = True
self.timeout = timeout

def __call__(self):
"""Receive a message."""
while self._running:
try:
message = self._socket.recv_string(self.timeout)
except TimeoutError:
continue
else:
self._socket.send_string("ok")
return message, None
message = self._socket.recv_string()
except zmq.Again:
raise TimeoutError("Receive timed out")
self._socket.send_string("ok")
return message, None

def close(self):
"""Close the receiver."""
Expand Down
8 changes: 4 additions & 4 deletions posttroll/backends/zmq/message_broadcaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@
class ZMQDesignatedReceiversSender:
"""Sends message to multiple *receivers* on *port*."""

def __init__(self, default_port, receivers):
def __init__(self, default_port: int, receivers: list[str]):
"""Set up the sender."""
self.default_port = default_port
self.receivers = receivers
self.receivers: list[str] = receivers
self._shutdown_event = threading.Event()

def __call__(self, data):
def __call__(self, data: str):
"""Send data."""
for receiver in self.receivers:
self._send_to_address(receiver, data)

def _send_to_address(self, address, data, timeout=10):
def _send_to_address(self, address: str, data: str, timeout: int = 10):
"""Send data to *address* and *port* without verification of response."""
# Socket to talk to server
if address.find(":") == -1:
Expand Down
82 changes: 63 additions & 19 deletions posttroll/backends/zmq/ns.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""ZMQ implexentation of ns."""
"""ZMQ implementation of ns."""

import logging
from contextlib import suppress
Expand All @@ -7,11 +7,23 @@

from zmq import LINGER, REP, REQ

from posttroll.backends.zmq.socket import SocketReceiver, close_socket, set_up_client_socket, set_up_server_socket
from posttroll import config
from posttroll.address_receiver import AddressReceiver
from posttroll.backends.zmq.socket import (
ConfigurationError,
SocketReceiver,
close_socket,
set_up_client_socket,
set_up_server_socket,
)
from posttroll.message import Message
from posttroll.ns import get_active_address, get_configured_nameserver_port
from posttroll.ns import (
get_active_address,
get_configured_secure_zmq_nameserver_port,
get_configured_unsecure_zmq_nameserver_port,
)

logger = logging.getLogger("__name__")
logger = logging.getLogger(__name__)

nslock = Lock()

Expand All @@ -21,18 +33,26 @@ def zmq_get_pub_address(name: str, timeout: float | int = 10, nameserver: str =

For a given publisher *name* from the nameserver on *nameserver* (localhost by default).
"""
nameserver_address = create_nameserver_address(nameserver)
backend = config["backend"]
if backend == "unsecure_zmq":
nameserver_address = create_unsecure_zmq_nameserver_address(nameserver)
elif backend == "secure_zmq":
nameserver_address = create_secure_zmq_nameserver_address(nameserver)
else:
raise NotImplementedError()
return _fetch_address_using_socket(nameserver_address, name, timeout)


def create_nameserver_address(nameserver:str):
def create_unsecure_zmq_nameserver_address(nameserver:str):
"""Create the nameserver address.

If `nameserver` is already preformatted and complete, the address is returned without change.
"""
url_parts = urlsplit(nameserver)
port = get_configured_nameserver_port()
port = get_configured_unsecure_zmq_nameserver_port()
return _create_nameserver_address(nameserver, port)

def _create_nameserver_address(nameserver:str, port:int):
url_parts = urlsplit(nameserver)
if not url_parts.scheme:
nameserver_address = "tcp://" + nameserver + ":" + str(port)
elif url_parts.scheme == "tcp" and url_parts.port is None:
Expand All @@ -42,6 +62,15 @@ def create_nameserver_address(nameserver:str):
return nameserver_address


def create_secure_zmq_nameserver_address(nameserver:str):
"""Create the nameserver address.

If `nameserver` is already preformatted and complete, the address is returned without change.
"""
port = get_configured_secure_zmq_nameserver_port()
return _create_nameserver_address(nameserver, port)


def _fetch_address_using_socket(nameserver_address, name, timeout):
try:
request = Message("/oper/ns", "request", {"service": name})
Expand Down Expand Up @@ -83,44 +112,59 @@ class ZMQNameServer:
def __init__(self):
"""Set up the nameserver."""
self.running: bool = True
self.listener: SocketReceiver | None = None
self.unsecure_listener: SocketReceiver | None = None
self.secure_listener: SocketReceiver | None = None
self._authenticator = None

def run(self, address_receiver, address:str|None=None):
def run(self, address_receiver: AddressReceiver, address:str|None=None):
"""Run the listener and answer to requests."""
port = get_configured_nameserver_port()
unsecure_port = get_configured_unsecure_zmq_nameserver_port()

try:
# stop was called before we could start running, exit
if not self.running:
return
if address is None:
address = "*"
address = create_nameserver_address(address)
self.listener, _, self._authenticator = set_up_server_socket(REP, address)
logger.debug(f"Nameserver listening on port {port}")
unsecure_address = create_unsecure_zmq_nameserver_address(address)
self.unsecure_listener, _, self._authenticator = set_up_server_socket(REP, unsecure_address, backend="unsecure_zmq")
socks = [self.unsecure_listener]
ports = [unsecure_port]
try:
secure_port = get_configured_secure_zmq_nameserver_port()
secure_address = create_secure_zmq_nameserver_address(address)
self.secure_listener, _, self._authenticator = set_up_server_socket(REP, secure_address, backend="secure_zmq")
socks.append(self.secure_listener)
ports.append(secure_port)
except ConfigurationError as err:
logger.warning(f"Cannot create secure access to nameserver: {str(err)}")

logger.debug(f"Nameserver listening on ports {ports}")
socket_receiver = SocketReceiver()
socket_receiver.register(self.listener)
for sock in socks:
socket_receiver.register(sock)
while self.running:
try:
for msg, _ in socket_receiver.receive(self.listener, timeout=1):
for msg, sock in socket_receiver.receive(*socks, timeout=1):
logger.debug("Replying to request: " + str(msg))
active_address = get_active_address(msg.data["service"],
address_receiver, msg.version)
self.listener.send_unicode(str(active_address))
sock.send_unicode(str(active_address))
except TimeoutError:
continue
except KeyboardInterrupt:
# Needed to stop the nameserver.
pass
finally:
socket_receiver.unregister(self.listener)
with suppress(UnboundLocalError):
for sock in socks:
socket_receiver.unregister(sock)
self.close_sockets_and_threads()

def close_sockets_and_threads(self):
"""Close all sockets and threads."""
with suppress(AttributeError):
close_socket(self.listener)
close_socket(self.unsecure_listener)
with suppress(AttributeError):
self._authenticator.stop()

Expand Down
2 changes: 1 addition & 1 deletion posttroll/backends/zmq/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
class ZMQPublisher:
"""Unsecure ZMQ implementation of the publisher class."""

def __init__(self, address, name="", min_port=None, max_port=None):
def __init__(self, address:str, name="", min_port=None, max_port=None):
"""Set up the publisher.

Args:
Expand Down
Loading
Loading