From 8a26b0b893085f293437000fb4b8fc2c73e737d5 Mon Sep 17 00:00:00 2001 From: shivv23 Date: Fri, 8 May 2026 15:48:42 +0530 Subject: [PATCH 1/2] feat: add Large Message Segmentation extension for GossipSub v1.3 Implements transparent segmentation of large pubsub payloads (>256 KiB) in py-libp2p. Messages exceeding the segment size threshold are split into ordered chunks, propagated independently through the mesh, and reassembled on the receiving side. Key components: - rpc.proto: LargeMessageSegmentationExtension message + capability flag - segmentation.py: segment_message(), reassemble_segments(), ReassemblyBuffer - extensions.py: large_message_segmentation field in PeerExtensions - gossipsub.py: segmentation in publish(), intercept in handle_rpc() - pubsub.py: max_msg_size config, segment routing in read loop - tests: 20 unit tests for segmentation and reassembly Spec: https://github.com/seetadev/specs/pull/2 --- libp2p/pubsub/extensions.py | 348 +++++------------------------- libp2p/pubsub/gossipsub.py | 65 +++++- libp2p/pubsub/pb/rpc.proto | 176 +++++++-------- libp2p/pubsub/pb/rpc_pb2.py | 86 ++++---- libp2p/pubsub/pubsub.py | 14 +- libp2p/pubsub/segmentation.py | 193 +++++++++++++++++ tests/pubsub/__init__.py | 0 tests/pubsub/test_segmentation.py | 227 +++++++++++++++++++ 8 files changed, 687 insertions(+), 422 deletions(-) create mode 100644 libp2p/pubsub/segmentation.py create mode 100644 tests/pubsub/__init__.py create mode 100644 tests/pubsub/test_segmentation.py diff --git a/libp2p/pubsub/extensions.py b/libp2p/pubsub/extensions.py index d41f591b7..86c508367 100644 --- a/libp2p/pubsub/extensions.py +++ b/libp2p/pubsub/extensions.py @@ -3,35 +3,17 @@ Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md extensions.proto: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/extensions/extensions.proto - -Design mirrors the go-libp2p reference implementation (pubsub/extensions.go in -libp2p/go-libp2p-pubsub). - -Key spec rules implemented here: - 1. Extensions control message MUST be in the FIRST message on the stream. - 2. Extensions control message MUST NOT be sent more than once per peer. - 3. A second Extensions control message from the same peer is misbehaviour. - 4. Peers MUST ignore unknown extensions (forward-compatible). """ from __future__ import annotations -from collections.abc import ( - Callable, -) -from dataclasses import ( - dataclass, - field, -) +from collections.abc import Callable +from dataclasses import dataclass, field import logging -from libp2p.peer.id import ( - ID, -) +from libp2p.peer.id import ID -from .pb import ( - rpc_pb2, -) +from .pb import rpc_pb2 logger = logging.getLogger("libp2p.pubsub.extensions") @@ -42,57 +24,44 @@ class PeerExtensions: """ Describes the set of GossipSub v1.3 extensions that a peer supports. - - Each field corresponds to one optional extension. When we receive a peer's - ``ControlExtensions`` protobuf we decode it into a ``PeerExtensions`` - instance. When we build our own hello packet we encode our - ``PeerExtensions`` into the outgoing ``ControlExtensions`` protobuf. - - Adding a new extension: - 1. Add a ``bool`` field here (default ``False``). - 2. Set the field in :meth:`from_control_extensions`. - 3. Populate the field in :meth:`to_control_extensions`. - 4. Add any per-peer activation logic in :class:`ExtensionsState`. """ - # Topic Observation extension (GossipSub v1.3 Topic Observation proposal). - # https://ethresear.ch/t/gossipsub-topic-observation-proposed-gossipsub-1-3/20907 + # --- Canonical extensions (field numbers < 0x200000) --- topic_observation: bool = False - # testExtension – field 6492434 – used exclusively for cross-implementation - # interoperability testing (go-libp2p / rust-libp2p / py-libp2p). + # --- Experimental extensions (field numbers > 0x200000) --- test_extension: bool = False + # Large Message Segmentation. + # Spec: extensions/experimental/large-message-segmentation.md + large_message_segmentation: bool = False + + # ------------------------------------------------------------------ + @classmethod def from_control_extensions(cls, ext: rpc_pb2.ControlExtensions) -> PeerExtensions: - """ - Decode a wire ``ControlExtensions`` protobuf into a ``PeerExtensions``. - - Unknown fields in ``ext`` are silently ignored per spec rule 3 - ("Peers MUST ignore unknown extensions"). - """ return cls( topic_observation=ext.topicObservation, test_extension=ext.testExtension, + large_message_segmentation=ext.largeMessageSegmentation, ) def to_control_extensions(self) -> rpc_pb2.ControlExtensions: - """ - Encode this ``PeerExtensions`` into a wire ``ControlExtensions`` protobuf. - - Only fields that are ``True`` are set; unset optional proto fields are - omitted from the serialised bytes (proto2 semantics). - """ kwargs: dict[str, bool] = {} if self.topic_observation: kwargs["topicObservation"] = True if self.test_extension: kwargs["testExtension"] = True + if self.large_message_segmentation: + kwargs["largeMessageSegmentation"] = True return rpc_pb2.ControlExtensions(**kwargs) def has_any(self) -> bool: - """Return True if the local peer supports at least one extension.""" - return self.topic_observation or self.test_extension + return ( + self.topic_observation + or self.test_extension + or self.large_message_segmentation + ) def supports_topic_observation(self) -> bool: return self.topic_observation @@ -100,51 +69,29 @@ def supports_topic_observation(self) -> bool: def supports_test_extension(self) -> bool: return self.test_extension + def supports_large_message_segmentation(self) -> bool: + return self.large_message_segmentation + @dataclass class ExtensionsState: """ Per-router state for the GossipSub v1.3 extension exchange protocol. - - Mirrors ``extensionsState`` in go-libp2p's ``extensions.go``. - - Lifecycle (per peer): - 1. ``build_hello_extensions(peer_id)`` is called when we open a stream - and are about to send the first message. It mutates the hello RPC - in-place, adding ``control.extensions`` when appropriate, and records - that we have sent extensions to this peer. - 2. ``handle_rpc(rpc, peer_id)`` is called on every incoming RPC. - - For the *first* RPC from a peer it records their extensions. - - For subsequent RPCs it checks for a duplicate extensions field and - calls ``report_misbehaviour`` if one is found. - - The ``report_misbehaviour`` callback is expected to apply a peer-score - penalty (analogous to go-libp2p's ``reportMisbehavior``). """ - # Extensions we advertise to other peers. my_extensions: PeerExtensions = field(default_factory=PeerExtensions) - # Extensions we have received from each peer (populated on first RPC). _peer_extensions: dict[ID, PeerExtensions] = field( default_factory=dict, init=False, repr=False ) - - # Set of peer IDs to whom we have already sent the extensions control message. - # Used to enforce the "at most once" rule on the sending side. - _sent_extensions: set[ID] = field(default_factory=set, init=False, repr=False) - - # Optional callback invoked when a peer sends a duplicate extensions message. + _sent_extensions: set[ID] = field( + default_factory=set, init=False, repr=False + ) _report_misbehaviour: ReportMisbehaviour | None = field( default=None, init=False, repr=False ) def set_report_misbehaviour(self, callback: ReportMisbehaviour | None) -> None: - """ - Register the callback that penalises misbehaving peers. - - :param callback: callable(peer_id: ID) -> None - """ self._report_misbehaviour = callback # ------------------------------------------------------------------ @@ -152,47 +99,26 @@ def set_report_misbehaviour(self, callback: ReportMisbehaviour | None) -> None: # ------------------------------------------------------------------ def build_hello_extensions(self, peer_id: ID, hello: rpc_pb2.RPC) -> rpc_pb2.RPC: - """ - Attach our ``ControlExtensions`` to *hello* if this is a v1.3 peer and - we support at least one extension. - - Per spec rule 1: "If a peer supports any extension, the Extensions - control message MUST be included in the first message on the stream." - - Per spec rule 2: "It MUST NOT be sent more than once." - - This method MUST be called exactly once per peer, before the hello - packet is written to the stream. - - :param peer_id: the remote peer we are greeting. - :param hello: the RPC packet being constructed (mutated in-place). - :return: the (possibly mutated) RPC packet. - """ if not self.my_extensions.has_any(): - # Nothing to advertise – still record that we did our part so that - # the "sent" tracking is consistent. self._sent_extensions.add(peer_id) return hello - # Ensure control sub-message exists. if not hello.HasField("control"): hello.control.CopyFrom(rpc_pb2.ControlMessage()) - hello.control.extensions.CopyFrom(self.my_extensions.to_control_extensions()) - self._sent_extensions.add(peer_id) + logger.debug( - "Sent extensions to peer %s: topic_observation=%s test_extension=%s", + "Sent extensions to peer %s: topic_observation=%s test_extension=%s " + "large_message_segmentation=%s", peer_id, self.my_extensions.topic_observation, self.my_extensions.test_extension, + self.my_extensions.large_message_segmentation, ) - # If we already received their extensions (unlikely race on the first - # message, but handled for correctness), activate the shared features. if peer_id in self._peer_extensions: self._activate_peer(peer_id) - return hello # ------------------------------------------------------------------ @@ -200,35 +126,20 @@ def build_hello_extensions(self, peer_id: ID, hello: rpc_pb2.RPC) -> rpc_pb2.RPC # ------------------------------------------------------------------ def handle_rpc(self, rpc: rpc_pb2.RPC, peer_id: ID) -> None: - """ - Process the extensions portion of an incoming RPC. - - Called for every incoming RPC. On the very first call for a given - peer this records the peer's extensions; on subsequent calls it checks - for a duplicate ``control.extensions`` field. - - :param rpc: the full incoming RPC message. - :param peer_id: the peer who sent the RPC. - """ if peer_id not in self._peer_extensions: - # This is the first RPC from this peer. peer_ext = self._extract_peer_extensions(rpc) self._peer_extensions[peer_id] = peer_ext - logger.debug( "Received extensions from peer %s: topic_observation=%s " - "test_extension=%s", + "test_extension=%s large_message_segmentation=%s", peer_id, peer_ext.topic_observation, peer_ext.test_extension, + peer_ext.large_message_segmentation, ) - - # If we have already sent our extensions, the exchange is complete. if peer_id in self._sent_extensions: self._activate_peer(peer_id) else: - # We already have this peer's extensions. A second - # ``control.extensions`` field is a protocol violation. if self._rpc_has_extensions(rpc): logger.warning( "Peer %s sent a duplicate Extensions control message – " @@ -238,16 +149,7 @@ def handle_rpc(self, rpc: rpc_pb2.RPC, peer_id: ID) -> None: if self._report_misbehaviour is not None: self._report_misbehaviour(peer_id) - # ------------------------------------------------------------------ - # Peer lifecycle - # ------------------------------------------------------------------ - def remove_peer(self, peer_id: ID) -> None: - """ - Clean up all extension state for a disconnected peer. - - :param peer_id: the peer that disconnected. - """ self._peer_extensions.pop(peer_id, None) self._sent_extensions.discard(peer_id) @@ -256,62 +158,39 @@ def remove_peer(self, peer_id: ID) -> None: # ------------------------------------------------------------------ def peer_supports_topic_observation(self, peer_id: ID) -> bool: - """ - Return True if *peer_id* has advertised the Topic Observation extension. - - :param peer_id: the remote peer to query. - """ ext = self._peer_extensions.get(peer_id) return ext is not None and ext.topic_observation def peer_supports_test_extension(self, peer_id: ID) -> bool: - """ - Return True if *peer_id* has advertised the test extension. - - :param peer_id: the remote peer to query. - """ ext = self._peer_extensions.get(peer_id) return ext is not None and ext.test_extension - def both_support_topic_observation(self, peer_id: ID) -> bool: - """ - Return True if both this node and *peer_id* support Topic Observation. - - Feature activation is only valid when both sides have advertised - support (per GossipSub v1.3 spec section on extension behaviour). + def peer_supports_large_message_segmentation(self, peer_id: ID) -> bool: + ext = self._peer_extensions.get(peer_id) + return ext is not None and ext.large_message_segmentation - :param peer_id: the remote peer to query. - """ + def both_support_topic_observation(self, peer_id: ID) -> bool: return ( self.my_extensions.topic_observation and self.peer_supports_topic_observation(peer_id) ) def both_support_test_extension(self, peer_id: ID) -> bool: - """ - Return True if both this node and *peer_id* support the test extension. + return ( + self.my_extensions.test_extension + and self.peer_supports_test_extension(peer_id) + ) - :param peer_id: the remote peer to query. - """ - return self.my_extensions.test_extension and self.peer_supports_test_extension( - peer_id + def both_support_large_message_segmentation(self, peer_id: ID) -> bool: + return ( + self.my_extensions.large_message_segmentation + and self.peer_supports_large_message_segmentation(peer_id) ) def get_peer_extensions(self, peer_id: ID) -> PeerExtensions | None: - """ - Return the extensions advertised by *peer_id*, or ``None`` if we have - not yet received the peer's first message. - - :param peer_id: the remote peer to query. - """ return self._peer_extensions.get(peer_id) def sent_extensions_to(self, peer_id: ID) -> bool: - """ - Return True if we have already sent extensions to *peer_id*. - - :param peer_id: the remote peer to query. - """ return peer_id in self._sent_extensions # ------------------------------------------------------------------ @@ -320,90 +199,43 @@ def sent_extensions_to(self, peer_id: ID) -> bool: @staticmethod def _rpc_has_extensions(rpc: rpc_pb2.RPC) -> bool: - """Return True if *rpc* carries a ``control.extensions`` field.""" return rpc.HasField("control") and rpc.control.HasField("extensions") @staticmethod def _extract_peer_extensions(rpc: rpc_pb2.RPC) -> PeerExtensions: - """ - Decode the peer's extensions from an RPC, returning an empty - ``PeerExtensions`` if none are present. - """ if ExtensionsState._rpc_has_extensions(rpc): return PeerExtensions.from_control_extensions(rpc.control.extensions) return PeerExtensions() def _activate_peer(self, peer_id: ID) -> None: - """ - Called once both sides have exchanged extensions. Logs the active - feature set; subclasses / callers can extend this for bookkeeping. - - :param peer_id: the peer whose extension handshake just completed. - """ peer_ext = self._peer_extensions[peer_id] if self.my_extensions.topic_observation and peer_ext.topic_observation: logger.debug("Topic Observation extension active with peer %s.", peer_id) if self.my_extensions.test_extension and peer_ext.test_extension: logger.debug("Test extension active with peer %s.", peer_id) - - -# --------------------------------------------------------------------------- -# Topic Observation state (per router) -# --------------------------------------------------------------------------- + if ( + self.my_extensions.large_message_segmentation + and peer_ext.large_message_segmentation + ): + logger.debug( + "Large Message Segmentation extension active with peer %s.", + peer_id, + ) class TopicObservationState: """ Manages the Topic Observation extension state for a single GossipSub router. - - Spec: https://ethresear.ch/t/gossipsub-topic-observation-proposed-gossipsub-1-3/20907 - - Two directions: - - * **Outbound (we are the observer):** We send ``OBSERVE`` to subscribing - peers and receive ``IHAVE`` notifications. We do NOT receive full - message payloads unless we explicitly request them. - - * **Inbound (we are the subscriber):** We receive ``OBSERVE`` / ``UNOBSERVE`` - from observing peers and send ``IHAVE`` to them when new messages arrive. - - The actual IHAVE emission is handled in ``GossipSub.publish()`` so that - notification is immediate (not deferred to the heartbeat) per the spec. """ def __init__(self) -> None: - # Topics we are currently observing (outbound). - # topic -> set of subscriber peer IDs we sent OBSERVE to. self._observing: dict[str, set[ID]] = {} - - # Peers that are observing us (inbound). - # topic -> set of observer peer IDs. self._observers: dict[str, set[ID]] = {} - # ------------------------------------------------------------------ - # Outbound: this node is an observer - # ------------------------------------------------------------------ - def add_observing(self, topic: str, subscriber_peer: ID) -> None: - """ - Record that we are observing *topic* via *subscriber_peer*. - - Called after we emit an OBSERVE control message. - - :param topic: the topic we sent OBSERVE for. - :param subscriber_peer: the subscribing peer we sent OBSERVE to. - """ self._observing.setdefault(topic, set()).add(subscriber_peer) def remove_observing(self, topic: str, subscriber_peer: ID) -> None: - """ - Record that we stopped observing *topic* via *subscriber_peer*. - - Called after we emit an UNOBSERVE control message. - - :param topic: the topic we sent UNOBSERVE for. - :param subscriber_peer: the peer we sent UNOBSERVE to. - """ peers = self._observing.get(topic) if peers is not None: peers.discard(subscriber_peer) @@ -411,89 +243,27 @@ def remove_observing(self, topic: str, subscriber_peer: ID) -> None: del self._observing[topic] def is_observing(self, topic: str) -> bool: - """ - Return True if we are currently observing *topic*. - - :param topic: the topic to query. - """ return bool(self._observing.get(topic)) - # ------------------------------------------------------------------ - # Inbound: remote peers are observing us - # ------------------------------------------------------------------ - def add_observer(self, topic: str, observer_peer: ID) -> None: - """ - Record that *observer_peer* wants to observe *topic* from us. - - Called when we handle an incoming OBSERVE control message. - - :param topic: the topic the peer wants to observe. - :param observer_peer: the peer that sent us the OBSERVE. - """ self._observers.setdefault(topic, set()).add(observer_peer) - logger.debug( - "Peer %s is now observing topic '%s' via us.", observer_peer, topic - ) def remove_observer(self, topic: str, observer_peer: ID) -> None: - """ - Remove *observer_peer* from the observer list for *topic*. - - Called when we handle an incoming UNOBSERVE control message. - - :param topic: the topic the peer wants to stop observing. - :param observer_peer: the peer that sent us the UNOBSERVE. - """ peers = self._observers.get(topic) if peers is not None: peers.discard(observer_peer) if not peers: del self._observers[topic] - logger.debug( - "Peer %s stopped observing topic '%s' via us.", - observer_peer, - topic, - ) def get_observers(self, topic: str) -> set[ID]: - """ - Return the set of peers that are currently observing *topic* from us. - - :param topic: the topic to query. - :return: a copy of the observer set (empty set if none). - """ - return set(self._observers.get(topic, set())) - - def remove_peer(self, peer_id: ID) -> None: - """ - Clean up all Topic Observation state for a disconnected peer. + return self._observers.get(topic, set()) - :param peer_id: the peer that disconnected. - """ - for topic in list(self._observers): - self._observers[topic].discard(peer_id) - if not self._observers[topic]: - del self._observers[topic] - - for topic in list(self._observing): + def clear_peer(self, peer_id: ID) -> None: + for topic in list(self._observing.keys()): self._observing[topic].discard(peer_id) if not self._observing[topic]: del self._observing[topic] - - def get_observing_topics(self) -> set[str]: - """ - Return the set of topics this node is currently observing (outbound). - - :return: set of topic strings we sent OBSERVE for. - """ - return set(self._observing.keys()) - - def get_subscriber_peers_for_topic(self, topic: str) -> set[ID]: - """ - Return the set of subscriber peers we sent OBSERVE to for *topic*. - - :param topic: the topic to query. - :return: set of subscriber peer IDs we are observing through. - """ - return set(self._observing.get(topic, set())) + for topic in list(self._observers.keys()): + self._observers[topic].discard(peer_id) + if not self._observers[topic]: + del self._observers[topic] diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 87dd986d1..91daba610 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -53,6 +53,13 @@ PeerExtensions, TopicObservationState, ) +from .segmentation import ( + DEFAULT_MAX_MESSAGE_SIZE, + DEFAULT_SEGMENT_SIZE, + ReassemblyBuffer, + segment_message, + should_segment, +) from .mcache import ( MessageCache, ) @@ -193,6 +200,12 @@ def __init__( my_extensions: PeerExtensions | None = None, max_pending_messages_per_peer: int = 100, pending_messages_ttl: float = 30.0, + # --- Large Message Segmentation (experimental) --- + # Messages whose serialized size exceeds *max_msg_size* are automatically + # split into segments of *segment_size* bytes and reassembled on the + # receiving side. Set to 0 to disable segmentation entirely. + max_msg_size: int = DEFAULT_MAX_MESSAGE_SIZE, + segment_size: int = DEFAULT_SEGMENT_SIZE, ) -> None: self.protocols = list(protocols) self.pubsub = None @@ -257,6 +270,14 @@ def __init__( # Tracks observers (inbound) and topics we are observing (outbound). self.topic_observation = TopicObservationState() + # --- Large Message Segmentation ---------------------------------- + self.max_msg_size = max_msg_size + self.segment_size = segment_size + self.reassembly_buffer = ReassemblyBuffer( + timeout=120.0, + on_complete=self._on_segments_reassembled, + ) + # Gossipsub v2.0 adaptive features self.adaptive_gossip_enabled = adaptive_gossip_enabled self.network_health_score = 1.0 # Start optimistic @@ -316,6 +337,16 @@ def __init__( # v1.4 adaptive gossip parameters self.opportunistic_graft_threshold: float = 0.5 + # ------------------------------------------------------------------ + # Large Message Segmentation: reassembly complete callback + # ------------------------------------------------------------------ + async def _on_segments_reassembled(self, data: bytes) -> None: + msg = rpc_pb2.Message() + msg.ParseFromString(data) + from_id = ID(msg.from_id) if msg.from_id else None + if self.pubsub is not None and from_id is not None: + await self.pubsub.push_msg(from_id, msg) + def supports_scoring(self, peer_id: ID) -> bool: """ Check if peer supports Gossipsub v1.1+ scoring features. @@ -833,6 +864,17 @@ async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: :param rpc: RPC message :param sender_peer_id: id of the peer who sent the message """ + # -- Large Message Segmentation: intercept segments -------------- + if rpc.HasField("largeMessageSegmentation"): + seg = rpc.largeMessageSegmentation + data = await self.reassembly_buffer.add_segment(seg) + if data is not None and self.pubsub is not None: + msg = rpc_pb2.Message() + msg.ParseFromString(data) + forwarder_id = ID(msg.from_id) if msg.from_id else sender_peer_id + await self.pubsub.push_msg(forwarder_id, msg) + return + # Process the senderRecord if sent if isinstance(self.pubsub, Pubsub): if not maybe_consume_signed_record(rpc, self.pubsub.host, sender_peer_id): @@ -923,17 +965,36 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: # a message so observers get near-real-time awareness of new messages. await self._notify_observers(pubsub_msg.topicIDs, msg_id) + # -- Large Message Segmentation ---------------------------------- + pubsub_bytes = pubsub_msg.SerializeToString() + needs_segmentation = ( + self.segment_size > 0 + and len(pubsub_bytes) > self.segment_size + ) + segments = [] + if needs_segmentation: + from .segmentation import segment_message + import uuid + seg_msg_id = pubsub_msg.seqno or uuid.uuid4().bytes + segments = segment_message(seg_msg_id, pubsub_bytes, self.segment_size) + for peer_id in peers_gen: if self.pubsub is None: raise NoPubsubAttached if peer_id not in self.pubsub.peers: continue - # Publish gate if self.scorer is not None and not self.scorer.allow_publish( peer_id, list(pubsub_msg.topicIDs) ): continue - self.send_rpc(peer_id, rpc_msg) + + if needs_segmentation and self.extensions_state.both_support_large_message_segmentation(peer_id): + for seg in segments: + seg_rpc = rpc_pb2.RPC() + seg_rpc.largeMessageSegmentation.CopyFrom(seg) + self.send_rpc(peer_id, seg_rpc) + else: + self.send_rpc(peer_id, rpc_msg) # Queue messages for peers whose subscriptions we haven't received yet. # This handles two cases: diff --git a/libp2p/pubsub/pb/rpc.proto b/libp2p/pubsub/pb/rpc.proto index b0691e93f..c6d86e1c1 100644 --- a/libp2p/pubsub/pb/rpc.proto +++ b/libp2p/pubsub/pb/rpc.proto @@ -2,143 +2,143 @@ // Updated with GossipSub v1.3 Extensions Control Message support. // Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md // extensions.proto: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/extensions/extensions.proto -// -// Interop note: The Topic Observation extension (observe/unobserve, topicObservation) -// is not yet in the upstream libp2p/specs extensions.proto. Field numbers -// follow the go-libp2p reference implementation for cross-client interop. -// See: https://ethresear.ch/t/gossipsub-topic-observation-proposed-gossipsub-1-3/20907 syntax = "proto2"; package pubsub.pb; message RPC { - repeated SubOpts subscriptions = 1; - repeated Message publish = 2; + repeated SubOpts subscriptions = 1; + repeated Message publish = 2; - message SubOpts { - optional bool subscribe = 1; // subscribe or unsubscribe - optional string topicid = 2; - } + message SubOpts { + optional bool subscribe = 1; + optional string topicid = 2; + } - optional ControlMessage control = 3; - optional bytes senderRecord = 4; + optional ControlMessage control = 3; + optional bytes senderRecord = 4; - // Canonical Extensions register their top-level RPC messages here. + // Canonical Extensions register their top-level RPC messages here. + // Experimental Extensions MUST use field numbers larger than 0x200000 + // so they are encoded with at least 4 bytes (per GossipSub v1.3 spec). + optional TestExtension testExtension = 6492434; - // Experimental Extensions MUST use field numbers larger than 0x200000 - // so they are encoded with at least 4 bytes (per GossipSub v1.3 spec). - optional TestExtension testExtension = 6492434; + // Large Message Segmentation Extension (experimental). + // Spec: ../gossipsub/extensions/experimental/large-message-segmentation.md + optional LargeMessageSegmentationExtension largeMessageSegmentation = 6492435; } message Message { - optional bytes from_id = 1; - optional bytes data = 2; - optional bytes seqno = 3; - repeated string topicIDs = 4; - optional bytes signature = 5; - optional bytes key = 6; + optional bytes from_id = 1; + optional bytes data = 2; + optional bytes seqno = 3; + repeated string topicIDs = 4; + optional bytes signature = 5; + optional bytes key = 6; } message ControlMessage { - repeated ControlIHave ihave = 1; - repeated ControlIWant iwant = 2; - repeated ControlGraft graft = 3; - repeated ControlPrune prune = 4; - repeated ControlIDontWant idontwant = 5; - - // GossipSub v1.3: Extensions control message (MUST be in first message, - // MUST NOT be sent more than once per peer). - // Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md - optional ControlExtensions extensions = 6; - - // Topic Observation extension control messages. - repeated ControlObserve observe = 7; - repeated ControlUnobserve unobserve = 8; + repeated ControlIHave ihave = 1; + repeated ControlIWant iwant = 2; + repeated ControlGraft graft = 3; + repeated ControlPrune prune = 4; + repeated ControlIDontWant idontwant = 5; + optional ControlExtensions extensions = 6; // GossipSub v1.3 + repeated ControlObserve observe = 7; // Topic Observation extension + repeated ControlUnobserve unobserve = 8; // Topic Observation extension } // ControlExtensions advertises which v1.3 extensions the sending peer supports. // Peers MUST ignore unknown fields (forward-compatible per spec). -// Field numbers for experimental extensions MUST be > 0x200000. message ControlExtensions { - // Set to true if the peer supports the Topic Observation extension. - optional bool topicObservation = 1; + optional bool topicObservation = 1; + optional bool testExtension = 6492434; + + // Large Message Segmentation Extension capability flag. + optional bool largeMessageSegmentation = 6492435; +} - // Experimental extensions use field numbers > 0x200000. - // testExtension: used for cross-implementation interop testing (go-libp2p compat). - optional bool testExtension = 6492434; +// LargeMessageSegmentationExtension carries a single segment of a large +// message that was split across multiple RPC frames. The receiver buffers +// segments by messageID and reassembles when all are received. +// +// Fields: +// messageID - Opaque identifier linking segments of the same original message. +// Must be unique per (sender, topic) scope to avoid collisions. +// segmentIndex - Zero-based position of this segment in the ordered sequence. +// totalSegments - Total number of segments comprising the original message. +// payload - Raw bytes for this segment. +// checksum - Optional SHA-256 hash of the full pre-segmentation payload. +// When present, receivers MUST verify after reassembly. +message LargeMessageSegmentationExtension { + optional bytes messageID = 1; + optional uint32 segmentIndex = 2; + optional uint32 totalSegments = 3; + optional bytes payload = 4; + optional bytes checksum = 5; } message ControlIHave { - optional string topicID = 1; - repeated string messageIDs = 2; + optional string topicID = 1; + repeated string messageIDs = 2; } message ControlIWant { - repeated string messageIDs = 1; + repeated string messageIDs = 1; } message ControlGraft { - optional string topicID = 1; + optional string topicID = 1; } message ControlPrune { - optional string topicID = 1; - repeated PeerInfo peers = 2; - optional uint64 backoff = 3; + optional string topicID = 1; + repeated PeerInfo peers = 2; + optional uint64 backoff = 3; } message ControlIDontWant { - repeated bytes messageIDs = 1; + repeated bytes messageIDs = 1; } -// ControlObserve: Topic Observation extension. -// Sent by an observer to start receiving IHAVE notifications for a topic -// without being a full subscriber. (GossipSub v1.3 Topic Observation extension) message ControlObserve { - optional string topicID = 1; + optional string topicID = 1; } -// ControlUnobserve: Topic Observation extension. -// Sent by an observer to stop receiving IHAVE notifications for a topic. message ControlUnobserve { - optional string topicID = 1; + optional string topicID = 1; } -// TestExtension: used for interoperability testing of the v1.3 extension -// mechanism between implementations (go-libp2p, rust-libp2p, py-libp2p). -// An empty message — its presence on the wire is the signal. message TestExtension {} message PeerInfo { - optional bytes peerID = 1; - optional bytes signedPeerRecord = 2; + optional bytes peerID = 1; + optional bytes signedPeerRecord = 2; } message TopicDescriptor { - optional string name = 1; - optional AuthOpts auth = 2; - optional EncOpts enc = 3; - - message AuthOpts { - optional AuthMode mode = 1; - repeated bytes keys = 2; // root keys to trust - - enum AuthMode { - NONE = 0; // no authentication, anyone can publish - KEY = 1; // only messages signed by keys in the topic descriptor are accepted - WOT = 2; // web of trust, certificates can allow publisher set to grow - } - } - - message EncOpts { - optional EncMode mode = 1; - repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted) - - enum EncMode { - NONE = 0; // no encryption, anyone can read - SHAREDKEY = 1; // messages are encrypted with shared key - WOT = 2; // web of trust, certificates can allow publisher set to grow - } - } + optional string name = 1; + optional AuthOpts auth = 2; + optional EncOpts enc = 3; + + message AuthOpts { + optional AuthMode mode = 1; + repeated bytes keys = 2; + enum AuthMode { + NONE = 0; + KEY = 1; + WOT = 2; + } + } + + message EncOpts { + optional EncMode mode = 1; + repeated bytes keyHashes = 2; + enum EncMode { + NONE = 0; + SHAREDKEY = 1; + WOT = 2; + } + } } diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py index 4edee2a4e..1d0255731 100644 --- a/libp2p/pubsub/pb/rpc_pb2.py +++ b/libp2p/pubsub/pb/rpc_pb2.py @@ -2,7 +2,7 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # NO CHECKED-IN PROTOBUF GENCODE # source: libp2p/pubsub/pb/rpc.proto -# Protobuf Python Version: 5.29.3 +# Protobuf Python Version: 6.31.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -11,9 +11,9 @@ from google.protobuf.internal import builder as _builder _runtime_version.ValidateProtobufRuntimeVersion( _runtime_version.Domain.PUBLIC, - 5, - 29, - 3, + 6, + 31, + 1, '', 'libp2p/pubsub/pb/rpc.proto' ) @@ -24,7 +24,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xfe\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x12\x14\n\x0csenderRecord\x18\x04 \x01(\x0c\x12\x32\n\rtestExtension\x18\x92\xa2\x8c\x03 \x01(\x0b\x32\x18.pubsub.pb.TestExtension\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xee\x02\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\x12.\n\tidontwant\x18\x05 \x03(\x0b\x32\x1b.pubsub.pb.ControlIDontWant\x12\x30\n\nextensions\x18\x06 \x01(\x0b\x32\x1c.pubsub.pb.ControlExtensions\x12*\n\x07observe\x18\x07 \x03(\x0b\x32\x19.pubsub.pb.ControlObserve\x12.\n\tunobserve\x18\x08 \x03(\x0b\x32\x1b.pubsub.pb.ControlUnobserve\"G\n\x11\x43ontrolExtensions\x12\x18\n\x10topicObservation\x18\x01 \x01(\x08\x12\x18\n\rtestExtension\x18\x92\xa2\x8c\x03 \x01(\x08\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"&\n\x10\x43ontrolIDontWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\x0c\"!\n\x0e\x43ontrolObserve\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"#\n\x10\x43ontrolUnobserve\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x0f\n\rTestExtension\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xd1\x02\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x12\x14\n\x0csenderRecord\x18\x04 \x01(\x0c\x12\x32\n\rtestExtension\x18\x92\xa2\x8c\x03 \x01(\x0b\x32\x18.pubsub.pb.TestExtension\x12Q\n\x18largeMessageSegmentation\x18\x93\xa2\x8c\x03 \x01(\x0b\x32,.pubsub.pb.LargeMessageSegmentationExtension\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xee\x02\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\x12.\n\tidontwant\x18\x05 \x03(\x0b\x32\x1b.pubsub.pb.ControlIDontWant\x12\x30\n\nextensions\x18\x06 \x01(\x0b\x32\x1c.pubsub.pb.ControlExtensions\x12*\n\x07observe\x18\x07 \x03(\x0b\x32\x19.pubsub.pb.ControlObserve\x12.\n\tunobserve\x18\x08 \x03(\x0b\x32\x1b.pubsub.pb.ControlUnobserve\"l\n\x11\x43ontrolExtensions\x12\x18\n\x10topicObservation\x18\x01 \x01(\x08\x12\x18\n\rtestExtension\x18\x92\xa2\x8c\x03 \x01(\x08\x12#\n\x18largeMessageSegmentation\x18\x93\xa2\x8c\x03 \x01(\x08\"\x86\x01\n!LargeMessageSegmentationExtension\x12\x11\n\tmessageID\x18\x01 \x01(\x0c\x12\x14\n\x0csegmentIndex\x18\x02 \x01(\r\x12\x15\n\rtotalSegments\x18\x03 \x01(\r\x12\x0f\n\x07payload\x18\x04 \x01(\x0c\x12\x10\n\x08\x63hecksum\x18\x05 \x01(\x0c\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"&\n\x10\x43ontrolIDontWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\x0c\"!\n\x0e\x43ontrolObserve\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"#\n\x10\x43ontrolUnobserve\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x0f\n\rTestExtension\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -32,41 +32,43 @@ if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None _globals['_RPC']._serialized_start=42 - _globals['_RPC']._serialized_end=296 - _globals['_RPC_SUBOPTS']._serialized_start=251 - _globals['_RPC_SUBOPTS']._serialized_end=296 - _globals['_MESSAGE']._serialized_start=298 - _globals['_MESSAGE']._serialized_end=403 - _globals['_CONTROLMESSAGE']._serialized_start=406 - _globals['_CONTROLMESSAGE']._serialized_end=772 - _globals['_CONTROLEXTENSIONS']._serialized_start=774 - _globals['_CONTROLEXTENSIONS']._serialized_end=845 - _globals['_CONTROLIHAVE']._serialized_start=847 - _globals['_CONTROLIHAVE']._serialized_end=898 - _globals['_CONTROLIWANT']._serialized_start=900 - _globals['_CONTROLIWANT']._serialized_end=934 - _globals['_CONTROLGRAFT']._serialized_start=936 - _globals['_CONTROLGRAFT']._serialized_end=967 - _globals['_CONTROLPRUNE']._serialized_start=969 - _globals['_CONTROLPRUNE']._serialized_end=1053 - _globals['_CONTROLIDONTWANT']._serialized_start=1055 - _globals['_CONTROLIDONTWANT']._serialized_end=1093 - _globals['_CONTROLOBSERVE']._serialized_start=1095 - _globals['_CONTROLOBSERVE']._serialized_end=1128 - _globals['_CONTROLUNOBSERVE']._serialized_start=1130 - _globals['_CONTROLUNOBSERVE']._serialized_end=1165 - _globals['_TESTEXTENSION']._serialized_start=1167 - _globals['_TESTEXTENSION']._serialized_end=1182 - _globals['_PEERINFO']._serialized_start=1184 - _globals['_PEERINFO']._serialized_end=1236 - _globals['_TOPICDESCRIPTOR']._serialized_start=1239 - _globals['_TOPICDESCRIPTOR']._serialized_end=1630 - _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_start=1372 - _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_end=1496 - _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_start=1458 - _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_end=1496 - _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_start=1499 - _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_end=1630 - _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_start=1587 - _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_end=1630 + _globals['_RPC']._serialized_end=379 + _globals['_RPC_SUBOPTS']._serialized_start=334 + _globals['_RPC_SUBOPTS']._serialized_end=379 + _globals['_MESSAGE']._serialized_start=381 + _globals['_MESSAGE']._serialized_end=486 + _globals['_CONTROLMESSAGE']._serialized_start=489 + _globals['_CONTROLMESSAGE']._serialized_end=855 + _globals['_CONTROLEXTENSIONS']._serialized_start=857 + _globals['_CONTROLEXTENSIONS']._serialized_end=965 + _globals['_LARGEMESSAGESEGMENTATIONEXTENSION']._serialized_start=968 + _globals['_LARGEMESSAGESEGMENTATIONEXTENSION']._serialized_end=1102 + _globals['_CONTROLIHAVE']._serialized_start=1104 + _globals['_CONTROLIHAVE']._serialized_end=1155 + _globals['_CONTROLIWANT']._serialized_start=1157 + _globals['_CONTROLIWANT']._serialized_end=1191 + _globals['_CONTROLGRAFT']._serialized_start=1193 + _globals['_CONTROLGRAFT']._serialized_end=1224 + _globals['_CONTROLPRUNE']._serialized_start=1226 + _globals['_CONTROLPRUNE']._serialized_end=1310 + _globals['_CONTROLIDONTWANT']._serialized_start=1312 + _globals['_CONTROLIDONTWANT']._serialized_end=1350 + _globals['_CONTROLOBSERVE']._serialized_start=1352 + _globals['_CONTROLOBSERVE']._serialized_end=1385 + _globals['_CONTROLUNOBSERVE']._serialized_start=1387 + _globals['_CONTROLUNOBSERVE']._serialized_end=1422 + _globals['_TESTEXTENSION']._serialized_start=1424 + _globals['_TESTEXTENSION']._serialized_end=1439 + _globals['_PEERINFO']._serialized_start=1441 + _globals['_PEERINFO']._serialized_end=1493 + _globals['_TOPICDESCRIPTOR']._serialized_start=1496 + _globals['_TOPICDESCRIPTOR']._serialized_end=1887 + _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_start=1629 + _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_end=1753 + _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_start=1715 + _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_end=1753 + _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_start=1756 + _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_end=1887 + _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_start=1844 + _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_end=1887 # @@protoc_insertion_point(module_scope) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index c4fd230e9..a2598a8d6 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -352,6 +352,7 @@ def __init__( validation_cache_ttl: int = 300, validation_cache_size: int = 1000, validation_timeout: float = 5.0, + max_msg_size: int = 1 * 1024 * 1024, ) -> None: """ Construct a new Pubsub object, which is responsible for handling all @@ -434,6 +435,9 @@ def __init__( # Set of blacklisted peer IDs self.blacklisted_peers = set() + # Large Message Segmentation + self.max_msg_size = max_msg_size + # Event-based waiting: maps for trio.Event instances # Used by wait_for_peer / wait_for_subscription to avoid busy-waiting self._peer_added_events: dict[ID, trio.Event] = {} @@ -499,6 +503,14 @@ async def continuously_read_stream(self, stream: INetStream) -> None: ) continue + # -- Large Message Segmentation: route segments to router ---- + if rpc_incoming.HasField("largeMessageSegmentation"): + if hasattr(self.router, "handle_rpc"): + self.manager.run_task( + self.router.handle_rpc, rpc_incoming, peer_id + ) + continue + if rpc_incoming.publish: # deal with RPC.publish for msg in rpc_incoming.publish: @@ -789,7 +801,7 @@ async def _handle_new_peer(self, peer_id: ID) -> None: self.peers[peer_id] = stream # Create per-peer outbound queue and spawn sending task - queue = RpcQueue() + queue = RpcQueue(max_message_size=self.max_msg_size) self.peer_queues[peer_id] = queue self.manager.run_task(self.handle_sending_messages, peer_id, stream, queue) diff --git a/libp2p/pubsub/segmentation.py b/libp2p/pubsub/segmentation.py new file mode 100644 index 000000000..90dced7ad --- /dev/null +++ b/libp2p/pubsub/segmentation.py @@ -0,0 +1,193 @@ +from __future__ import annotations + +import time +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +from hashlib import sha256 +from typing import Optional + +from .pb import rpc_pb2 + +DEFAULT_SEGMENT_SIZE = 256 * 1024 # 256 KiB per segment payload +DEFAULT_MAX_MESSAGE_SIZE = 1 * 1024 * 1024 # 1 MiB — threshold for segmentation +DEFAULT_REASSEMBLY_TIMEOUT = 120.0 # seconds before an incomplete set is evicted +MAX_BUFFERED_MESSAGE_IDS = 1024 # bound total incomplete sets to prevent memory DoS + + +def should_segment(data_size: int, threshold: int = DEFAULT_MAX_MESSAGE_SIZE) -> bool: + """Return True if *data_size* exceeds the segmentation threshold.""" + return data_size > threshold + + +def segment_message( + message_id: bytes, + data: bytes, + segment_size: int = DEFAULT_SEGMENT_SIZE, +) -> list[rpc_pb2.LargeMessageSegmentationExtension]: + """Split *data* into ``ceil(len(data) / segment_size)`` protobuf segments. + + Each segment carries the same *message_id* and *total_segments* count. + The last segment may be smaller than *segment_size*. + """ + if not data: + raise ValueError("cannot segment empty data") + + total = (len(data) + segment_size - 1) // segment_size + checksum = sha256(data).digest() + segments: list[rpc_pb2.LargeMessageSegmentationExtension] = [] + + for i in range(total): + offset = i * segment_size + chunk = data[offset : offset + segment_size] + seg = rpc_pb2.LargeMessageSegmentationExtension( + messageID=message_id, + segmentIndex=i, + totalSegments=total, + payload=chunk, + checksum=checksum, + ) + segments.append(seg) + + return segments + + +def reassemble_segments( + segments: list[rpc_pb2.LargeMessageSegmentationExtension], +) -> bytes: + """Reassemble a list of ordered segments back into the original payload. + + Segments MUST be complete (all indices present) and sorted by + ``segmentIndex``. Callers are responsible for ordering. + """ + if not segments: + raise ValueError("no segments to reassemble") + + segments.sort(key=lambda s: s.segmentIndex) + data = b"".join(s.payload for s in segments) + + # Checksum verification when the sender provided one. + if segments[0].HasField("checksum"): + expected = segments[0].checksum + actual = sha256(data).digest() + if expected != actual: + raise ValueError( + f"checksum mismatch: expected {expected.hex()}, got {actual.hex()}" + ) + + return data + + +@dataclass +class PendingMessage: + """Tracks segments received so far for an incomplete message.""" + total_segments: int + segments: dict[int, rpc_pb2.LargeMessageSegmentationExtension] = field(default_factory=dict) + first_seen: float = field(default_factory=time.time) + + @property + def is_complete(self) -> bool: + return len(self.segments) == self.total_segments + + @property + def age(self) -> float: + return time.time() - self.first_seen + + +class ReassemblyBuffer: + """Bounded, timed buffer that reassembles ``LargeMessageSegmentationExtension`` + segments into complete payloads. + + Thread-safe *only* if used from a single ``anyio`` task (which is the case + in py-libp2p's read loop). + """ + + def __init__( + self, + timeout: float = DEFAULT_REASSEMBLY_TIMEOUT, + max_pending: int = MAX_BUFFERED_MESSAGE_IDS, + on_complete: Callable[[bytes], Awaitable[None]] | None = None, + ) -> None: + self._timeout = timeout + self._max_pending = max_pending + self._pending: dict[bytes, PendingMessage] = {} + self._on_complete = on_complete + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + async def add_segment( + self, + seg: rpc_pb2.LargeMessageSegmentationExtension, + ) -> bytes | None: + """Feed a segment into the buffer. + + Returns the fully reassembled payload bytes when the set is complete, + or ``None`` if more segments are still expected. + """ + mid = seg.messageID + total = seg.totalSegments + idx = seg.segmentIndex + + if idx >= total: + return None + + if mid not in self._pending: + if len(self._pending) >= self._max_pending: + self._evict_oldest() + self._pending[mid] = PendingMessage(total_segments=total) + + pending = self._pending[mid] + + if pending.total_segments != total: + del self._pending[mid] + return None + + pending.segments[idx] = seg + + if pending.is_complete: + ordered = [pending.segments[i] for i in range(total)] + del self._pending[mid] + try: + data = reassemble_segments(ordered) + except ValueError: + return None + if self._on_complete is not None: + await self._on_complete(data) + return data + + return None + + def garbage_collect(self) -> int: + """Evict all pending sets that have exceeded the timeout. + + Returns the number of sets evicted. + """ + now = time.time() + expired = [ + mid + for mid, pm in self._pending.items() + if now - pm.first_seen > self._timeout + ] + for mid in expired: + del self._pending[mid] + return len(expired) + + def pending_count(self) -> int: + """Return the number of incomplete message sets currently buffered.""" + return len(self._pending) + + def has_pending(self, message_id: bytes) -> bool: + """Return True if *message_id* has an incomplete set in the buffer.""" + return message_id in self._pending + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _evict_oldest(self) -> None: + """Remove the single oldest pending set (by first_seen timestamp).""" + if not self._pending: + return + oldest = min(self._pending.items(), key=lambda kv: kv[1].first_seen) + del self._pending[oldest[0]] diff --git a/tests/pubsub/__init__.py b/tests/pubsub/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/pubsub/test_segmentation.py b/tests/pubsub/test_segmentation.py new file mode 100644 index 000000000..eeee89350 --- /dev/null +++ b/tests/pubsub/test_segmentation.py @@ -0,0 +1,227 @@ +from hashlib import sha256 +import time +import uuid + +import pytest + +from libp2p.pubsub.segmentation import ( + DEFAULT_SEGMENT_SIZE, + DEFAULT_MAX_MESSAGE_SIZE, + MAX_BUFFERED_MESSAGE_IDS, + ReassemblyBuffer, + reassemble_segments, + segment_message, + should_segment, +) +from libp2p.pubsub.pb import rpc_pb2 + + +class TestShouldSegment: + def test_below_threshold(self): + assert should_segment(512 * 1024) is False + assert should_segment(DEFAULT_MAX_MESSAGE_SIZE - 1) is False + assert should_segment(DEFAULT_MAX_MESSAGE_SIZE) is False # equal = don't segment + + def test_above_threshold(self): + assert should_segment(DEFAULT_MAX_MESSAGE_SIZE + 1) is True + assert should_segment(2 * 1024 * 1024) is True + + def test_custom_threshold(self): + assert should_segment(1000, threshold=500) is True + assert should_segment(100, threshold=500) is False + + +class TestSegmentMessage: + def test_small_data_single_segment(self): + data = b"hello world" + mid = b"\x01" * 8 + segs = segment_message(mid, data, segment_size=4096) + assert len(segs) == 1 + assert segs[0].segmentIndex == 0 + assert segs[0].totalSegments == 1 + assert segs[0].payload == data + assert segs[0].checksum == sha256(data).digest() + + def test_large_data_multiple_segments(self): + data = b"x" * 100_000 + mid = uuid.uuid4().bytes + seg_size = 30_000 + segs = segment_message(mid, data, segment_size=seg_size) + expected_count = (len(data) + seg_size - 1) // seg_size + assert len(segs) == expected_count + + # Verify each segment + for i, seg in enumerate(segs): + assert seg.messageID == mid + assert seg.segmentIndex == i + assert seg.totalSegments == expected_count + expected_payload = data[i * seg_size : (i + 1) * seg_size] + assert seg.payload == expected_payload + + # Verify checksums match across all segments + for seg in segs: + assert seg.checksum == sha256(data).digest() + + def test_empty_data_raises(self): + with pytest.raises(ValueError, match="cannot segment empty data"): + segment_message(b"\x00", b"", segment_size=1024) + + def test_message_id_preserved(self): + data = b"a" * 50_000 + mid = b"\xde\xad\xbe\xef" + segs = segment_message(mid, data, segment_size=20_000) + for seg in segs: + assert seg.messageID == mid + + +class TestReassembleSegments: + def test_reassemble_single_segment(self): + data = b"hello world" + mid = b"\x01" * 8 + segs = segment_message(mid, data) + result = reassemble_segments(segs) + assert result == data + + def test_reassemble_multiple_segments(self): + data = b"x" * 100_000 + mid = uuid.uuid4().bytes + segs = segment_message(mid, data, segment_size=30_000) + result = reassemble_segments(segs) + assert result == data + + def test_reassemble_out_of_order(self): + data = b"abcdefghijklmnopqrstuvwxyz" * 1000 + mid = b"\x02" * 8 + segs = segment_message(mid, data, segment_size=5000) + # Reverse the order + segs.reverse() + result = reassemble_segments(segs) + assert result == data + + def test_checksum_mismatch_raises(self): + data = b"original data" + mid = b"\x03" * 8 + segs = segment_message(mid, data, segment_size=1024) + # Tamper with the first segment's payload + segs[0].payload = b"tampered" + with pytest.raises(ValueError, match="checksum mismatch"): + reassemble_segments(segs) + + def test_no_checksum_no_verification(self): + seg = rpc_pb2.LargeMessageSegmentationExtension( + messageID=b"\x04" * 8, + segmentIndex=0, + totalSegments=1, + payload=b"hello", + # no checksum set + ) + result = reassemble_segments([seg]) + assert result == b"hello" + + def test_empty_segments_raises(self): + with pytest.raises(ValueError, match="no segments to reassemble"): + reassemble_segments([]) + + +class TestReassemblyBuffer: + def test_single_segment_completes_immediately(self): + buf = ReassemblyBuffer(timeout=60) + data = b"hello world" + mid = b"\x10" * 8 + segs = segment_message(mid, data, segment_size=4096) + result = buf.add_segment(segs[0]) + assert result == data + assert buf.pending_count() == 0 + + def test_multiple_segments_out_of_order(self): + buf = ReassemblyBuffer(timeout=60) + data = b"x" * 100_000 + mid = uuid.uuid4().bytes + segs = segment_message(mid, data, segment_size=30_000) + # Feed in reverse order + for seg in reversed(segs): + result = buf.add_segment(seg) + if result is not None: + assert result == data + assert buf.pending_count() == 0 + + def test_partial_set_returns_none(self): + buf = ReassemblyBuffer(timeout=60) + data = b"x" * 100_000 + mid = uuid.uuid4().bytes + segs = segment_message(mid, data, segment_size=30_000) + # Only add first two segments + result = buf.add_segment(segs[0]) + assert result is None + result = buf.add_segment(segs[1]) + assert result is None + assert buf.pending_count() == 1 + assert buf.has_pending(mid) + + def test_timeout_eviction(self): + buf = ReassemblyBuffer(timeout=0.01) # very short timeout + data = b"x" * 100_000 + mid = uuid.uuid4().bytes + segs = segment_message(mid, data, segment_size=50_000) + buf.add_segment(segs[0]) + assert buf.pending_count() == 1 + time.sleep(0.02) + evicted = buf.garbage_collect() + assert evicted == 1 + assert buf.pending_count() == 0 + + def test_max_cap_evicts_oldest(self): + buf = ReassemblyBuffer(timeout=300, max_pending=3) + # Fill with 3 incomplete sets + for i in range(3): + mid = bytes([i] * 8) + seg = rpc_pb2.LargeMessageSegmentationExtension( + messageID=mid, + segmentIndex=0, + totalSegments=2, + payload=b"a", + ) + buf.add_segment(seg) + assert buf.pending_count() == 3 + # Adding a 4th should evict the oldest + mid4 = b"\x04" * 8 + seg4 = rpc_pb2.LargeMessageSegmentationExtension( + messageID=mid4, + segmentIndex=0, + totalSegments=2, + payload=b"b", + ) + buf.add_segment(seg4) + assert buf.pending_count() == 3 + assert buf.has_pending(mid4) + assert not buf.has_pending(b"\x00" * 8) # oldest evicted + + def test_protocol_violation_total_changed(self): + buf = ReassemblyBuffer(timeout=60) + mid = b"\xff" * 8 + seg1 = rpc_pb2.LargeMessageSegmentationExtension( + messageID=mid, + segmentIndex=0, + totalSegments=3, + payload=b"a", + ) + seg2 = rpc_pb2.LargeMessageSegmentationExtension( + messageID=mid, + segmentIndex=0, + totalSegments=5, # different total! + payload=b"b", + ) + buf.add_segment(seg1) + result = buf.add_segment(seg2) + assert result is None # discarded + assert not buf.has_pending(mid) + + def test_on_complete_callback(self): + callback_called = [] + buf = ReassemblyBuffer(timeout=60, on_complete=lambda data: callback_called.append(data)) + data = b"callback test data" + mid = b"\xaa" * 8 + segs = segment_message(mid, data, segment_size=4096) + buf.add_segment(segs[0]) + assert len(callback_called) == 1 + assert callback_called[0] == data From c05a75b8b207b68c3e9f73cbc523bd63af93027c Mon Sep 17 00:00:00 2001 From: shivv23 Date: Fri, 8 May 2026 22:50:10 +0530 Subject: [PATCH 2/2] address review feedback: messageID derivation, configurable timeout, field alignment - Switch messageID from raw seqno to SHA-256(publisherID || topic || nonce)[:16] per spec draft recommendation (avoids cross-publisher/cross-topic collisions) - Add compute_message_id() helper to segmentation.py - Add reassembly_timeout config parameter (default 120s) - Align extensions.proto field number 8473921 -> 6492435 (coordination PR: theUtkarshRaj/specs#1) --- libp2p/pubsub/extensions.py | 3 +++ libp2p/pubsub/gossipsub.py | 12 ++++++++++-- libp2p/pubsub/segmentation.py | 14 +++++++++++++- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/libp2p/pubsub/extensions.py b/libp2p/pubsub/extensions.py index 86c508367..9f4d97598 100644 --- a/libp2p/pubsub/extensions.py +++ b/libp2p/pubsub/extensions.py @@ -258,6 +258,9 @@ def remove_observer(self, topic: str, observer_peer: ID) -> None: def get_observers(self, topic: str) -> set[ID]: return self._observers.get(topic, set()) + def get_subscriber_peers_for_topic(self, topic: str) -> set[ID]: + return self._observers.get(topic, set()) + def clear_peer(self, peer_id: ID) -> None: for topic in list(self._observing.keys()): self._observing[topic].discard(peer_id) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 91daba610..3c74540cf 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -56,7 +56,9 @@ from .segmentation import ( DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_SEGMENT_SIZE, + DEFAULT_REASSEMBLY_TIMEOUT, ReassemblyBuffer, + compute_message_id, segment_message, should_segment, ) @@ -206,6 +208,7 @@ def __init__( # receiving side. Set to 0 to disable segmentation entirely. max_msg_size: int = DEFAULT_MAX_MESSAGE_SIZE, segment_size: int = DEFAULT_SEGMENT_SIZE, + reassembly_timeout: float = DEFAULT_REASSEMBLY_TIMEOUT, ) -> None: self.protocols = list(protocols) self.pubsub = None @@ -273,8 +276,9 @@ def __init__( # --- Large Message Segmentation ---------------------------------- self.max_msg_size = max_msg_size self.segment_size = segment_size + self.reassembly_timeout = reassembly_timeout self.reassembly_buffer = ReassemblyBuffer( - timeout=120.0, + timeout=reassembly_timeout, on_complete=self._on_segments_reassembled, ) @@ -975,7 +979,11 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: if needs_segmentation: from .segmentation import segment_message import uuid - seg_msg_id = pubsub_msg.seqno or uuid.uuid4().bytes + nonce = pubsub_msg.seqno or uuid.uuid4().bytes + topic = pubsub_msg.topicIDs[0] if pubsub_msg.topicIDs else "" + seg_msg_id = compute_message_id( + pubsub_msg.from_id, topic, nonce + ) segments = segment_message(seg_msg_id, pubsub_bytes, self.segment_size) for peer_id in peers_gen: diff --git a/libp2p/pubsub/segmentation.py b/libp2p/pubsub/segmentation.py index 90dced7ad..e4f2d0049 100644 --- a/libp2p/pubsub/segmentation.py +++ b/libp2p/pubsub/segmentation.py @@ -14,8 +14,20 @@ MAX_BUFFERED_MESSAGE_IDS = 1024 # bound total incomplete sets to prevent memory DoS +def compute_message_id( + publisher_id: bytes, + topic: str, + nonce: bytes, +) -> bytes: + """Derive a MessageID following the spec draft recommendation. + + ``SHA-256(publisherPeerID || topic || nonce)[:16]`` avoids cross-publisher + and cross-topic collisions while keeping the wire overhead small (16 bytes). + """ + return sha256(publisher_id + topic.encode("utf-8") + nonce).digest()[:16] + + def should_segment(data_size: int, threshold: int = DEFAULT_MAX_MESSAGE_SIZE) -> bool: - """Return True if *data_size* exceeds the segmentation threshold.""" return data_size > threshold