Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
# Instructions for AI agents

This is a Python implementation of the Cyphal decentralized real-time publish-subscribe protocol. The key design goals are **simplicity** and **robustness**.

All features of the library MUST work on GNU/Linux, Windows, and macOS; the CI system must ensure that. Supported Python versions are starting from the oldest version specified in `pyproject.toml` up to the current latest stable Python.

To get a better feel of the problem domain, peruse `reference/cy`, especially the formal models and the reference implementation in C.

## Code Layout

Source is in `src/pycyphal/`, tests in `tests/`. The package is extremely compact by design and has very few modules:

- **`_common.py`** — Exception hierarchy, utilities, etc.
- **`_transport.py`** — Abstract `Transport` interface defining subject broadcast and unicast operations. `SubjectWriter` interface. `TransportArrival` dataclass.
- **`_node.py`** — Core `Node` implementation. Manages CRDT, gossip protocol, message routing, etc — all main functions of the protocol.
- **`_wire.py`** — Wire protocol: message headers, subject ID computation, CRDT timestamp reconciliation.
- **`__init__.py`** — Public API re-exports from the above modules.
- `_common.py` — Exception hierarchy, utilities, etc.
- `_transport.py` — Abstract `Transport` interface defining subject broadcast and unicast operations. `SubjectWriter` interface. `TransportArrival` dataclass.
- `_node.py` — Core `Node` implementation. Manages CRDT, gossip protocol, message routing, etc — all main functions of the protocol.
- `_wire.py` — Wire protocol: message headers, subject ID computation, CRDT timestamp reconciliation.
- `__init__.py` — Public API re-exports from the above modules.
- Concrete transports:
- **`udp.py`** — Cyphal/UDP transport implementation.
- `udp.py` — Cyphal/UDP transport implementation.

Internal implementation modules use leading underscores. Keep public symbols explicit through `__init__.py`; keep private helpers in underscore-prefixed modules.

Expand All @@ -34,4 +38,10 @@ Key mechanisms in `Node`:
- **Types**: Fully type-annotated; frozen dataclasses for data; `__slots__` for performance.
- **Dependencies**: Intentionally kept to the bare minimum.
- **Testing**: Mock transport/network in `tests/conftest.py`; tests are ~10x the size of source code.
- **Logging**: Rich and extensive logging is required throughout the codebase:
- DEBUG for super detailed traces;
- INFO for anything not on the hot data path;
- WARNING for anything unusual;
- ERROR for errors or anything unexpected;
- CRITICAL for fatal or high-severity errors.
- For agent-authored commits, set `GIT_AUTHOR_NAME="Agent"` and `GIT_COMMITTER_NAME="Agent"`.
157 changes: 79 additions & 78 deletions src/pycyphal/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,17 +395,16 @@ def close(self) -> None:
if self._handler in handlers:
handlers.remove(self._handler)
if not handlers:
# No more listeners for this subject -- clean up sockets
# No more listeners for this subject -- clean up sockets and tasks
self._transport._subject_handlers.pop(self._subject_id, None)
self._transport._reassemblers.pop(self._subject_id, None)
for i in range(len(self._transport._interfaces)):
key = (self._subject_id, i)
task = self._transport._mcast_rx_tasks.pop(key, None)
if task is not None:
task.cancel()
sock = self._transport._mcast_socks.pop(key, None)
if sock is not None:
try:
self._transport._loop.remove_reader(sock.fileno())
except Exception:
pass
sock.close()


Expand Down Expand Up @@ -491,9 +490,14 @@ def __init__(
self._remote_endpoints: dict[tuple[int, int], tuple[str, int]] = {}
self._next_unicast_transfer_id = int.from_bytes(os.urandom(6), "little")

# Register unicast RX readers on TX sockets
# Async RX tasks (platform-agnostic, replaces add_reader)
self._unicast_rx_tasks: list[asyncio.Task[None]] = []
self._mcast_rx_tasks: dict[tuple[int, int], asyncio.Task[None]] = {}

# Start unicast RX tasks on TX sockets
for i, sock in enumerate(self._tx_socks):
self._loop.add_reader(sock.fileno(), self._on_unicast_data, i)
task = self._loop.create_task(self._unicast_rx_loop(sock, i))
self._unicast_rx_tasks.append(task)

_logger.info(
"UDPTransport initialized: uid=0x%016x, interfaces=%s, modulus=%d",
Expand Down Expand Up @@ -535,30 +539,13 @@ def _create_mcast_socket(subject_id: int, iface: Interface) -> socket.socket:

async def _async_sendto(self, sock: socket.socket, data: bytes, addr: tuple[str, int], deadline: Instant) -> None:
"""Send a UDP datagram, suspending until writable or deadline exceeded."""
loop = self._loop
while True:
remaining_ns = deadline.ns - Instant.now().ns
if remaining_ns <= 0:
raise SendError("Deadline exceeded")
try:
sock.sendto(data, addr)
return
except BlockingIOError:
# Socket buffer full -- wait for writability or deadline
fut: asyncio.Future[None] = loop.create_future()
fd = sock.fileno()

def _ready() -> None:
loop.remove_writer(fd)
if not fut.done():
fut.set_result(None)

loop.add_writer(fd, _ready)
try:
await asyncio.wait_for(fut, timeout=remaining_ns * 1e-9)
except asyncio.TimeoutError:
loop.remove_writer(fd)
raise SendError("Deadline exceeded waiting for socket writability")
remaining_ns = deadline.ns - Instant.now().ns
if remaining_ns <= 0:
raise SendError("Deadline exceeded")
try:
await asyncio.wait_for(self._loop.sock_sendto(sock, data, addr), timeout=remaining_ns * 1e-9)
except asyncio.TimeoutError:
raise SendError("Deadline exceeded waiting for socket writability")

# -- Transport ABC --

Expand All @@ -574,7 +561,8 @@ def subject_listen(self, subject_id: int, handler: Callable[[TransportArrival],
key = (subject_id, i)
sock = self._create_mcast_socket(subject_id, iface)
self._mcast_socks[key] = sock
self._loop.add_reader(sock.fileno(), self._on_mcast_data, subject_id, i)
task = self._loop.create_task(self._mcast_rx_loop(sock, subject_id, i))
self._mcast_rx_tasks[key] = task
self._subject_handlers[subject_id].append(handler)
return _UDPSubjectListener(self, subject_id, handler)

Expand Down Expand Up @@ -619,65 +607,78 @@ def close(self) -> None:
return
self._closed = True
_logger.info("Closing UDPTransport uid=0x%016x", self._uid)
for task in self._unicast_rx_tasks:
task.cancel()
self._unicast_rx_tasks.clear()
for task in self._mcast_rx_tasks.values():
task.cancel()
self._mcast_rx_tasks.clear()
for sock in self._tx_socks:
try:
self._loop.remove_reader(sock.fileno())
except Exception:
pass
sock.close()
for sock in self._mcast_socks.values():
try:
self._loop.remove_reader(sock.fileno())
except Exception:
pass
sock.close()
self._mcast_socks.clear()
self._tx_socks.clear()
self._subject_handlers.clear()
self._reassemblers.clear()

# -- Internal RX callbacks --
# -- Internal async RX loops --

def _on_mcast_data(self, subject_id: int, iface_idx: int) -> None:
sock = self._mcast_socks.get((subject_id, iface_idx))
if sock is None:
return
async def _mcast_rx_loop(self, sock: socket.socket, subject_id: int, iface_idx: int) -> None:
"""Async receive loop for a multicast socket. Runs until cancelled or transport is closed."""
try:
data, addr = sock.recvfrom(65536)
except OSError as e:
_logger.debug("Multicast recv error on subject %d iface %d: %s", subject_id, iface_idx, e)
return
src_ip, src_port = addr[0], addr[1]
if (src_ip, src_port) in self._self_endpoints:
return # Self-send filter
self._process_subject_datagram(data, src_ip, src_port, subject_id, iface_idx)
while not self._closed:
try:
data, addr = await self._loop.sock_recvfrom(sock, 65536)
except OSError:
if self._closed:
break
_logger.debug("Multicast recv error on subject %d iface %d", subject_id, iface_idx)
await asyncio.sleep(0.1)
continue
src_ip, src_port = addr[0], addr[1]
if (src_ip, src_port) in self._self_endpoints:
continue # Self-send filter
self._process_subject_datagram(data, src_ip, src_port, subject_id, iface_idx)
except asyncio.CancelledError:
pass

def _on_unicast_data(self, iface_idx: int) -> None:
sock = self._tx_socks[iface_idx]
async def _unicast_rx_loop(self, sock: socket.socket, iface_idx: int) -> None:
"""Async receive loop for a unicast socket. Runs until cancelled or transport is closed."""
try:
data, addr = sock.recvfrom(65536)
except OSError as e:
_logger.debug("Unicast recv error on iface %d: %s", iface_idx, e)
return
src_ip, src_port = addr[0], addr[1]
if len(data) < HEADER_SIZE:
return
header = _header_deserialize(data[:HEADER_SIZE])
if header is None:
return
# Record remote endpoint for unicast discovery
self._remote_endpoints[(header.sender_uid, iface_idx)] = (src_ip, src_port)
payload_chunk = data[HEADER_SIZE:]
result = self._unicast_reassembler.accept(header, payload_chunk)
if result is not None:
sender_uid, priority, message = result
_logger.debug("Unicast transfer complete from sender_uid=0x%016x", sender_uid)
if self._unicast_handler is not None:
self._unicast_handler(
TransportArrival(
timestamp=Instant.now(), priority=Priority(priority), remote_id=sender_uid, message=message
)
)
while not self._closed:
try:
data, addr = await self._loop.sock_recvfrom(sock, 65536)
except OSError:
if self._closed:
break
_logger.debug("Unicast recv error on iface %d", iface_idx)
await asyncio.sleep(0.1)
continue
src_ip, src_port = addr[0], addr[1]
if len(data) < HEADER_SIZE:
continue
header = _header_deserialize(data[:HEADER_SIZE])
if header is None:
continue
# Record remote endpoint for unicast discovery
self._remote_endpoints[(header.sender_uid, iface_idx)] = (src_ip, src_port)
payload_chunk = data[HEADER_SIZE:]
result = self._unicast_reassembler.accept(header, payload_chunk)
if result is not None:
sender_uid, priority, message = result
_logger.debug("Unicast transfer complete from sender_uid=0x%016x", sender_uid)
if self._unicast_handler is not None:
self._unicast_handler(
TransportArrival(
timestamp=Instant.now(),
priority=Priority(priority),
remote_id=sender_uid,
message=message,
)
)
except asyncio.CancelledError:
pass

def _process_subject_datagram(
self, data: bytes, src_ip: str, src_port: int, subject_id: int, iface_idx: int
Expand Down
50 changes: 15 additions & 35 deletions tests/test_udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import os
import socket
import struct
import sys
from ipaddress import IPv4Address
from unittest.mock import patch

Expand Down Expand Up @@ -692,10 +691,6 @@ def loopback_iface():
return _get_loopback_iface()


_SKIP_WINDOWS = pytest.mark.skipif(sys.platform == "win32", reason="add_reader not supported on ProactorEventLoop")


@_SKIP_WINDOWS
class TestIntegrationPubSub:
@pytest.mark.asyncio
async def test_single_frame_pubsub(self, loopback_iface):
Expand Down Expand Up @@ -789,7 +784,6 @@ async def test_empty_payload(self, loopback_iface):
sub.close()


@_SKIP_WINDOWS
class TestIntegrationUnicast:
@pytest.mark.asyncio
async def test_unicast_roundtrip(self, loopback_iface):
Expand Down Expand Up @@ -826,7 +820,6 @@ async def test_unicast_roundtrip(self, loopback_iface):
b.close()


@_SKIP_WINDOWS
class TestIntegrationListenerLifecycle:
@pytest.mark.asyncio
async def test_listener_close_stops_delivery(self, loopback_iface):
Expand Down Expand Up @@ -902,7 +895,6 @@ async def test_close_one_listener_keeps_other(self, loopback_iface):
sub.close()


@_SKIP_WINDOWS
class TestIntegrationTransportClose:
@pytest.mark.asyncio
async def test_close_cleans_up(self, loopback_iface):
Expand Down Expand Up @@ -944,7 +936,6 @@ async def test_subject_id_modulus(self, loopback_iface):
t2.close()


@_SKIP_WINDOWS
class TestIntegrationSelfSendFilter:
@pytest.mark.asyncio
async def test_self_send_filtered(self, loopback_iface):
Expand All @@ -961,7 +952,6 @@ async def test_self_send_filtered(self, loopback_iface):
t.close()


@_SKIP_WINDOWS
class TestIntegrationDifferentSubjects:
@pytest.mark.asyncio
async def test_messages_isolated_by_subject(self, loopback_iface):
Expand Down Expand Up @@ -1007,7 +997,6 @@ async def test_empty_list_raises(self):
# =====================================================================================================================


@_SKIP_WINDOWS
class TestAsyncSendto:
@pytest.mark.asyncio
async def test_deadline_already_expired(self, loopback_iface):
Expand All @@ -1031,43 +1020,36 @@ async def test_sendto_immediate_success(self, loopback_iface):
t.close()

@pytest.mark.asyncio
async def test_sendto_retries_on_blocking(self, loopback_iface):
"""Mock sendto to raise BlockingIOError first, succeed second."""
async def test_sendto_delegates_to_loop(self, loopback_iface):
"""Verify _async_sendto delegates to loop.sock_sendto."""
t = UDPTransport(interfaces=[loopback_iface])
try:
sock = t._tx_socks[0]
original_sendto = socket.socket.sendto
call_count = 0
called = False

def mock_sendto(self_sock, data, *args):
nonlocal call_count
if self_sock is sock:
call_count += 1
if call_count == 1:
raise BlockingIOError()
return original_sendto(self_sock, data, *args)
async def mock_sock_sendto(s, data, addr):
nonlocal called
called = True

deadline = Instant.now() + 2.0
with patch.object(socket.socket, "sendto", mock_sendto):
with patch.object(t._loop, "sock_sendto", mock_sock_sendto):
await t._async_sendto(sock, b"retry", ("127.0.0.1", sock.getsockname()[1]), deadline)
assert call_count == 2
assert called
finally:
t.close()

@pytest.mark.asyncio
async def test_deadline_exceeded_during_wait(self, loopback_iface):
"""Mock socket always blocks, short deadline -> SendError."""
"""sock_sendto hangs forever, short deadline -> SendError."""
t = UDPTransport(interfaces=[loopback_iface])
try:
sock = t._tx_socks[0]

def mock_sendto(self_sock, data, *args):
if self_sock is sock:
raise BlockingIOError()
return socket.socket.sendto(self_sock, data, *args)
async def mock_sock_sendto(s, data, addr):
await asyncio.sleep(100)

deadline = Instant.now() + 0.05 # 50ms
with patch.object(socket.socket, "sendto", mock_sendto):
with patch.object(t._loop, "sock_sendto", mock_sock_sendto):
with pytest.raises(SendError):
await t._async_sendto(sock, b"block", ("127.0.0.1", 9999), deadline)
finally:
Expand All @@ -1080,13 +1062,11 @@ async def test_sendto_os_error_propagates(self, loopback_iface):
try:
sock = t._tx_socks[0]

def mock_sendto(self_sock, data, *args):
if self_sock is sock:
raise OSError("Network unreachable")
return socket.socket.sendto(self_sock, data, *args)
async def mock_sock_sendto(s, data, addr):
raise OSError("Network unreachable")

deadline = Instant.now() + 2.0
with patch.object(socket.socket, "sendto", mock_sendto):
with patch.object(t._loop, "sock_sendto", mock_sock_sendto):
with pytest.raises(OSError, match="Network unreachable"):
await t._async_sendto(sock, b"fail", ("127.0.0.1", 9999), deadline)
finally:
Expand Down
Loading