From 1a3bd5200f52236dfe154e023f1c4e814c41efb1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 18 Mar 2026 22:53:41 +0000 Subject: [PATCH 1/5] Initial plan From 2c731d6f92a847f557161f46cad202a772cfcff9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 18 Mar 2026 23:00:27 +0000 Subject: [PATCH 2/5] Replace add_reader/add_writer with async socket ops for Windows compatibility Replace loop.add_reader()/remove_reader() with async recv loop tasks using loop.sock_recvfrom() for both unicast and multicast sockets. Replace loop.add_writer()/remove_writer() in _async_sendto with loop.sock_sendto(). These asyncio socket methods work on all platforms including Windows' ProactorEventLoop, which does not support add_reader/add_writer. Remove _SKIP_WINDOWS markers from integration tests. Co-authored-by: pavel-kirienko <3298404+pavel-kirienko@users.noreply.github.com> --- src/pycyphal/udp.py | 155 ++++++++++++++++++++++---------------------- tests/test_udp.py | 11 ---- 2 files changed, 77 insertions(+), 89 deletions(-) diff --git a/src/pycyphal/udp.py b/src/pycyphal/udp.py index aab28096..224d152a 100644 --- a/src/pycyphal/udp.py +++ b/src/pycyphal/udp.py @@ -395,17 +395,16 @@ def close(self) -> None: if self._handler in handlers: handlers.remove(self._handler) if not handlers: - # No more listeners for this subject -- clean up sockets + # No more listeners for this subject -- clean up sockets and tasks self._transport._subject_handlers.pop(self._subject_id, None) self._transport._reassemblers.pop(self._subject_id, None) for i in range(len(self._transport._interfaces)): key = (self._subject_id, i) + task = self._transport._mcast_rx_tasks.pop(key, None) + if task is not None: + task.cancel() sock = self._transport._mcast_socks.pop(key, None) if sock is not None: - try: - self._transport._loop.remove_reader(sock.fileno()) - except Exception: - pass sock.close() @@ -491,9 +490,14 @@ def __init__( self._remote_endpoints: dict[tuple[int, int], tuple[str, int]] = {} self._next_unicast_transfer_id = int.from_bytes(os.urandom(6), "little") - # Register unicast RX readers on TX sockets + # Async RX tasks (platform-agnostic, replaces add_reader) + self._unicast_rx_tasks: list[asyncio.Task[None]] = [] + self._mcast_rx_tasks: dict[tuple[int, int], asyncio.Task[None]] = {} + + # Start unicast RX tasks on TX sockets for i, sock in enumerate(self._tx_socks): - self._loop.add_reader(sock.fileno(), self._on_unicast_data, i) + task = self._loop.create_task(self._unicast_rx_loop(sock, i)) + self._unicast_rx_tasks.append(task) _logger.info( "UDPTransport initialized: uid=0x%016x, interfaces=%s, modulus=%d", @@ -535,30 +539,13 @@ def _create_mcast_socket(subject_id: int, iface: Interface) -> socket.socket: async def _async_sendto(self, sock: socket.socket, data: bytes, addr: tuple[str, int], deadline: Instant) -> None: """Send a UDP datagram, suspending until writable or deadline exceeded.""" - loop = self._loop - while True: - remaining_ns = deadline.ns - Instant.now().ns - if remaining_ns <= 0: - raise SendError("Deadline exceeded") - try: - sock.sendto(data, addr) - return - except BlockingIOError: - # Socket buffer full -- wait for writability or deadline - fut: asyncio.Future[None] = loop.create_future() - fd = sock.fileno() - - def _ready() -> None: - loop.remove_writer(fd) - if not fut.done(): - fut.set_result(None) - - loop.add_writer(fd, _ready) - try: - await asyncio.wait_for(fut, timeout=remaining_ns * 1e-9) - except asyncio.TimeoutError: - loop.remove_writer(fd) - raise SendError("Deadline exceeded waiting for socket writability") + remaining_ns = deadline.ns - Instant.now().ns + if remaining_ns <= 0: + raise SendError("Deadline exceeded") + try: + await asyncio.wait_for(self._loop.sock_sendto(sock, data, addr), timeout=remaining_ns * 1e-9) + except asyncio.TimeoutError: + raise SendError("Deadline exceeded waiting for socket writability") # -- Transport ABC -- @@ -574,7 +561,8 @@ def subject_listen(self, subject_id: int, handler: Callable[[TransportArrival], key = (subject_id, i) sock = self._create_mcast_socket(subject_id, iface) self._mcast_socks[key] = sock - self._loop.add_reader(sock.fileno(), self._on_mcast_data, subject_id, i) + task = self._loop.create_task(self._mcast_rx_loop(sock, subject_id, i)) + self._mcast_rx_tasks[key] = task self._subject_handlers[subject_id].append(handler) return _UDPSubjectListener(self, subject_id, handler) @@ -619,65 +607,76 @@ def close(self) -> None: return self._closed = True _logger.info("Closing UDPTransport uid=0x%016x", self._uid) + for task in self._unicast_rx_tasks: + task.cancel() + self._unicast_rx_tasks.clear() + for task in self._mcast_rx_tasks.values(): + task.cancel() + self._mcast_rx_tasks.clear() for sock in self._tx_socks: - try: - self._loop.remove_reader(sock.fileno()) - except Exception: - pass sock.close() for sock in self._mcast_socks.values(): - try: - self._loop.remove_reader(sock.fileno()) - except Exception: - pass sock.close() self._mcast_socks.clear() self._tx_socks.clear() self._subject_handlers.clear() self._reassemblers.clear() - # -- Internal RX callbacks -- + # -- Internal async RX loops -- - def _on_mcast_data(self, subject_id: int, iface_idx: int) -> None: - sock = self._mcast_socks.get((subject_id, iface_idx)) - if sock is None: - return + async def _mcast_rx_loop(self, sock: socket.socket, subject_id: int, iface_idx: int) -> None: + """Async loop that reads from a multicast socket and processes datagrams.""" try: - data, addr = sock.recvfrom(65536) - except OSError as e: - _logger.debug("Multicast recv error on subject %d iface %d: %s", subject_id, iface_idx, e) - return - src_ip, src_port = addr[0], addr[1] - if (src_ip, src_port) in self._self_endpoints: - return # Self-send filter - self._process_subject_datagram(data, src_ip, src_port, subject_id, iface_idx) + while not self._closed: + try: + data, addr = await self._loop.sock_recvfrom(sock, 65536) + except OSError: + if self._closed: + break + _logger.debug("Multicast recv error on subject %d iface %d", subject_id, iface_idx) + continue + src_ip, src_port = addr[0], addr[1] + if (src_ip, src_port) in self._self_endpoints: + continue # Self-send filter + self._process_subject_datagram(data, src_ip, src_port, subject_id, iface_idx) + except asyncio.CancelledError: + pass - def _on_unicast_data(self, iface_idx: int) -> None: - sock = self._tx_socks[iface_idx] + async def _unicast_rx_loop(self, sock: socket.socket, iface_idx: int) -> None: + """Async loop that reads from a unicast socket and processes datagrams.""" try: - data, addr = sock.recvfrom(65536) - except OSError as e: - _logger.debug("Unicast recv error on iface %d: %s", iface_idx, e) - return - src_ip, src_port = addr[0], addr[1] - if len(data) < HEADER_SIZE: - return - header = _header_deserialize(data[:HEADER_SIZE]) - if header is None: - return - # Record remote endpoint for unicast discovery - self._remote_endpoints[(header.sender_uid, iface_idx)] = (src_ip, src_port) - payload_chunk = data[HEADER_SIZE:] - result = self._unicast_reassembler.accept(header, payload_chunk) - if result is not None: - sender_uid, priority, message = result - _logger.debug("Unicast transfer complete from sender_uid=0x%016x", sender_uid) - if self._unicast_handler is not None: - self._unicast_handler( - TransportArrival( - timestamp=Instant.now(), priority=Priority(priority), remote_id=sender_uid, message=message - ) - ) + while not self._closed: + try: + data, addr = await self._loop.sock_recvfrom(sock, 65536) + except OSError: + if self._closed: + break + _logger.debug("Unicast recv error on iface %d", iface_idx) + continue + src_ip, src_port = addr[0], addr[1] + if len(data) < HEADER_SIZE: + continue + header = _header_deserialize(data[:HEADER_SIZE]) + if header is None: + continue + # Record remote endpoint for unicast discovery + self._remote_endpoints[(header.sender_uid, iface_idx)] = (src_ip, src_port) + payload_chunk = data[HEADER_SIZE:] + result = self._unicast_reassembler.accept(header, payload_chunk) + if result is not None: + sender_uid, priority, message = result + _logger.debug("Unicast transfer complete from sender_uid=0x%016x", sender_uid) + if self._unicast_handler is not None: + self._unicast_handler( + TransportArrival( + timestamp=Instant.now(), + priority=Priority(priority), + remote_id=sender_uid, + message=message, + ) + ) + except asyncio.CancelledError: + pass def _process_subject_datagram( self, data: bytes, src_ip: str, src_port: int, subject_id: int, iface_idx: int diff --git a/tests/test_udp.py b/tests/test_udp.py index 825cdb04..85b79259 100644 --- a/tests/test_udp.py +++ b/tests/test_udp.py @@ -6,7 +6,6 @@ import os import socket import struct -import sys from ipaddress import IPv4Address from unittest.mock import patch @@ -692,10 +691,6 @@ def loopback_iface(): return _get_loopback_iface() -_SKIP_WINDOWS = pytest.mark.skipif(sys.platform == "win32", reason="add_reader not supported on ProactorEventLoop") - - -@_SKIP_WINDOWS class TestIntegrationPubSub: @pytest.mark.asyncio async def test_single_frame_pubsub(self, loopback_iface): @@ -789,7 +784,6 @@ async def test_empty_payload(self, loopback_iface): sub.close() -@_SKIP_WINDOWS class TestIntegrationUnicast: @pytest.mark.asyncio async def test_unicast_roundtrip(self, loopback_iface): @@ -826,7 +820,6 @@ async def test_unicast_roundtrip(self, loopback_iface): b.close() -@_SKIP_WINDOWS class TestIntegrationListenerLifecycle: @pytest.mark.asyncio async def test_listener_close_stops_delivery(self, loopback_iface): @@ -902,7 +895,6 @@ async def test_close_one_listener_keeps_other(self, loopback_iface): sub.close() -@_SKIP_WINDOWS class TestIntegrationTransportClose: @pytest.mark.asyncio async def test_close_cleans_up(self, loopback_iface): @@ -944,7 +936,6 @@ async def test_subject_id_modulus(self, loopback_iface): t2.close() -@_SKIP_WINDOWS class TestIntegrationSelfSendFilter: @pytest.mark.asyncio async def test_self_send_filtered(self, loopback_iface): @@ -961,7 +952,6 @@ async def test_self_send_filtered(self, loopback_iface): t.close() -@_SKIP_WINDOWS class TestIntegrationDifferentSubjects: @pytest.mark.asyncio async def test_messages_isolated_by_subject(self, loopback_iface): @@ -1007,7 +997,6 @@ async def test_empty_list_raises(self): # ===================================================================================================================== -@_SKIP_WINDOWS class TestAsyncSendto: @pytest.mark.asyncio async def test_deadline_already_expired(self, loopback_iface): From ff200cf56c135a7c66f8991b4fba5dbaeab30302 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 18 Mar 2026 23:01:28 +0000 Subject: [PATCH 3/5] Address review: add sleep on OSError and improve docstrings in RX loops Co-authored-by: pavel-kirienko <3298404+pavel-kirienko@users.noreply.github.com> --- src/pycyphal/udp.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/pycyphal/udp.py b/src/pycyphal/udp.py index 224d152a..e8fe1320 100644 --- a/src/pycyphal/udp.py +++ b/src/pycyphal/udp.py @@ -625,7 +625,7 @@ def close(self) -> None: # -- Internal async RX loops -- async def _mcast_rx_loop(self, sock: socket.socket, subject_id: int, iface_idx: int) -> None: - """Async loop that reads from a multicast socket and processes datagrams.""" + """Async receive loop for a multicast socket. Runs until cancelled or transport is closed.""" try: while not self._closed: try: @@ -634,6 +634,7 @@ async def _mcast_rx_loop(self, sock: socket.socket, subject_id: int, iface_idx: if self._closed: break _logger.debug("Multicast recv error on subject %d iface %d", subject_id, iface_idx) + await asyncio.sleep(0.1) continue src_ip, src_port = addr[0], addr[1] if (src_ip, src_port) in self._self_endpoints: @@ -643,7 +644,7 @@ async def _mcast_rx_loop(self, sock: socket.socket, subject_id: int, iface_idx: pass async def _unicast_rx_loop(self, sock: socket.socket, iface_idx: int) -> None: - """Async loop that reads from a unicast socket and processes datagrams.""" + """Async receive loop for a unicast socket. Runs until cancelled or transport is closed.""" try: while not self._closed: try: @@ -652,6 +653,7 @@ async def _unicast_rx_loop(self, sock: socket.socket, iface_idx: int) -> None: if self._closed: break _logger.debug("Unicast recv error on iface %d", iface_idx) + await asyncio.sleep(0.1) continue src_ip, src_port = addr[0], addr[1] if len(data) < HEADER_SIZE: From 44f205bd59914d7253bc3fb9e82a87151e27c9ed Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Thu, 19 Mar 2026 12:54:19 +0200 Subject: [PATCH 4/5] Fix Windows CI: mock loop.sock_sendto instead of socket.socket.sendto On Windows (ProactorEventLoop), loop.sock_sendto() doesn't go through socket.socket.sendto, so patching the latter had no effect. Mock at the event loop level instead, which works cross-platform. Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 16 ++++++++++------ tests/test_udp.py | 39 +++++++++++++++------------------------ 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index a3a5d8d9..341cea01 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,18 +1,22 @@ # Instructions for AI agents +This is a Python implementation of the Cyphal decentralized real-time publish-subscribe protocol. The key design goals are **simplicity** and **robustness**. + +All features of the library MUST work on GNU/Linux, Windows, and macOS; the CI system must ensure that. Supported Python versions are starting from the oldest version specified in `pyproject.toml` up to the current latest stable Python. + To get a better feel of the problem domain, peruse `reference/cy`, especially the formal models and the reference implementation in C. ## Code Layout Source is in `src/pycyphal/`, tests in `tests/`. The package is extremely compact by design and has very few modules: -- **`_common.py`** — Exception hierarchy, utilities, etc. -- **`_transport.py`** — Abstract `Transport` interface defining subject broadcast and unicast operations. `SubjectWriter` interface. `TransportArrival` dataclass. -- **`_node.py`** — Core `Node` implementation. Manages CRDT, gossip protocol, message routing, etc — all main functions of the protocol. -- **`_wire.py`** — Wire protocol: message headers, subject ID computation, CRDT timestamp reconciliation. -- **`__init__.py`** — Public API re-exports from the above modules. +- `_common.py` — Exception hierarchy, utilities, etc. +- `_transport.py` — Abstract `Transport` interface defining subject broadcast and unicast operations. `SubjectWriter` interface. `TransportArrival` dataclass. +- `_node.py` — Core `Node` implementation. Manages CRDT, gossip protocol, message routing, etc — all main functions of the protocol. +- `_wire.py` — Wire protocol: message headers, subject ID computation, CRDT timestamp reconciliation. +- `__init__.py` — Public API re-exports from the above modules. - Concrete transports: - - **`udp.py`** — Cyphal/UDP transport implementation. + - `udp.py` — Cyphal/UDP transport implementation. Internal implementation modules use leading underscores. Keep public symbols explicit through `__init__.py`; keep private helpers in underscore-prefixed modules. diff --git a/tests/test_udp.py b/tests/test_udp.py index 85b79259..866f73b7 100644 --- a/tests/test_udp.py +++ b/tests/test_udp.py @@ -1020,43 +1020,36 @@ async def test_sendto_immediate_success(self, loopback_iface): t.close() @pytest.mark.asyncio - async def test_sendto_retries_on_blocking(self, loopback_iface): - """Mock sendto to raise BlockingIOError first, succeed second.""" + async def test_sendto_delegates_to_loop(self, loopback_iface): + """Verify _async_sendto delegates to loop.sock_sendto.""" t = UDPTransport(interfaces=[loopback_iface]) try: sock = t._tx_socks[0] - original_sendto = socket.socket.sendto - call_count = 0 + called = False - def mock_sendto(self_sock, data, *args): - nonlocal call_count - if self_sock is sock: - call_count += 1 - if call_count == 1: - raise BlockingIOError() - return original_sendto(self_sock, data, *args) + async def mock_sock_sendto(s, data, addr): + nonlocal called + called = True deadline = Instant.now() + 2.0 - with patch.object(socket.socket, "sendto", mock_sendto): + with patch.object(t._loop, "sock_sendto", mock_sock_sendto): await t._async_sendto(sock, b"retry", ("127.0.0.1", sock.getsockname()[1]), deadline) - assert call_count == 2 + assert called finally: t.close() @pytest.mark.asyncio async def test_deadline_exceeded_during_wait(self, loopback_iface): - """Mock socket always blocks, short deadline -> SendError.""" + """sock_sendto hangs forever, short deadline -> SendError.""" t = UDPTransport(interfaces=[loopback_iface]) try: sock = t._tx_socks[0] - def mock_sendto(self_sock, data, *args): - if self_sock is sock: - raise BlockingIOError() - return socket.socket.sendto(self_sock, data, *args) + async def mock_sock_sendto(s, data, addr): + await asyncio.sleep(100) deadline = Instant.now() + 0.05 # 50ms - with patch.object(socket.socket, "sendto", mock_sendto): + with patch.object(t._loop, "sock_sendto", mock_sock_sendto): with pytest.raises(SendError): await t._async_sendto(sock, b"block", ("127.0.0.1", 9999), deadline) finally: @@ -1069,13 +1062,11 @@ async def test_sendto_os_error_propagates(self, loopback_iface): try: sock = t._tx_socks[0] - def mock_sendto(self_sock, data, *args): - if self_sock is sock: - raise OSError("Network unreachable") - return socket.socket.sendto(self_sock, data, *args) + async def mock_sock_sendto(s, data, addr): + raise OSError("Network unreachable") deadline = Instant.now() + 2.0 - with patch.object(socket.socket, "sendto", mock_sendto): + with patch.object(t._loop, "sock_sendto", mock_sock_sendto): with pytest.raises(OSError, match="Network unreachable"): await t._async_sendto(sock, b"fail", ("127.0.0.1", 9999), deadline) finally: From f1659cf5c012d0d5b8a05cec731342c1f1123661 Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Thu, 19 Mar 2026 13:07:22 +0200 Subject: [PATCH 5/5] logging guidelines --- CLAUDE.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index 341cea01..c3056909 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -38,4 +38,10 @@ Key mechanisms in `Node`: - **Types**: Fully type-annotated; frozen dataclasses for data; `__slots__` for performance. - **Dependencies**: Intentionally kept to the bare minimum. - **Testing**: Mock transport/network in `tests/conftest.py`; tests are ~10x the size of source code. +- **Logging**: Rich and extensive logging is required throughout the codebase: + - DEBUG for super detailed traces; + - INFO for anything not on the hot data path; + - WARNING for anything unusual; + - ERROR for errors or anything unexpected; + - CRITICAL for fatal or high-severity errors. - For agent-authored commits, set `GIT_AUTHOR_NAME="Agent"` and `GIT_COMMITTER_NAME="Agent"`.