Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 61 additions & 288 deletions libp2p/pubsub/extensions.py

Large diffs are not rendered by default.

73 changes: 71 additions & 2 deletions libp2p/pubsub/gossipsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@
PeerExtensions,
TopicObservationState,
)
from .segmentation import (
DEFAULT_MAX_MESSAGE_SIZE,
DEFAULT_SEGMENT_SIZE,
DEFAULT_REASSEMBLY_TIMEOUT,
ReassemblyBuffer,
compute_message_id,
segment_message,
should_segment,
)
from .mcache import (
MessageCache,
)
Expand Down Expand Up @@ -193,6 +202,13 @@ def __init__(
my_extensions: PeerExtensions | None = None,
max_pending_messages_per_peer: int = 100,
pending_messages_ttl: float = 30.0,
# --- Large Message Segmentation (experimental) ---
# Messages whose serialized size exceeds *max_msg_size* are automatically
# split into segments of *segment_size* bytes and reassembled on the
# receiving side. Set to 0 to disable segmentation entirely.
max_msg_size: int = DEFAULT_MAX_MESSAGE_SIZE,
segment_size: int = DEFAULT_SEGMENT_SIZE,
reassembly_timeout: float = DEFAULT_REASSEMBLY_TIMEOUT,
) -> None:
self.protocols = list(protocols)
self.pubsub = None
Expand Down Expand Up @@ -257,6 +273,15 @@ def __init__(
# Tracks observers (inbound) and topics we are observing (outbound).
self.topic_observation = TopicObservationState()

# --- Large Message Segmentation ----------------------------------
self.max_msg_size = max_msg_size
self.segment_size = segment_size
self.reassembly_timeout = reassembly_timeout
self.reassembly_buffer = ReassemblyBuffer(
timeout=reassembly_timeout,
on_complete=self._on_segments_reassembled,
)

# Gossipsub v2.0 adaptive features
self.adaptive_gossip_enabled = adaptive_gossip_enabled
self.network_health_score = 1.0 # Start optimistic
Expand Down Expand Up @@ -316,6 +341,16 @@ def __init__(
# v1.4 adaptive gossip parameters
self.opportunistic_graft_threshold: float = 0.5

# ------------------------------------------------------------------
# Large Message Segmentation: reassembly complete callback
# ------------------------------------------------------------------
async def _on_segments_reassembled(self, data: bytes) -> None:
msg = rpc_pb2.Message()
msg.ParseFromString(data)
from_id = ID(msg.from_id) if msg.from_id else None
if self.pubsub is not None and from_id is not None:
await self.pubsub.push_msg(from_id, msg)

def supports_scoring(self, peer_id: ID) -> bool:
"""
Check if peer supports Gossipsub v1.1+ scoring features.
Expand Down Expand Up @@ -833,6 +868,17 @@ async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None:
:param rpc: RPC message
:param sender_peer_id: id of the peer who sent the message
"""
# -- Large Message Segmentation: intercept segments --------------
if rpc.HasField("largeMessageSegmentation"):
seg = rpc.largeMessageSegmentation
data = await self.reassembly_buffer.add_segment(seg)
if data is not None and self.pubsub is not None:
msg = rpc_pb2.Message()
msg.ParseFromString(data)
forwarder_id = ID(msg.from_id) if msg.from_id else sender_peer_id
await self.pubsub.push_msg(forwarder_id, msg)
return

# Process the senderRecord if sent
if isinstance(self.pubsub, Pubsub):
if not maybe_consume_signed_record(rpc, self.pubsub.host, sender_peer_id):
Expand Down Expand Up @@ -923,17 +969,40 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
# a message so observers get near-real-time awareness of new messages.
await self._notify_observers(pubsub_msg.topicIDs, msg_id)

# -- Large Message Segmentation ----------------------------------
pubsub_bytes = pubsub_msg.SerializeToString()
needs_segmentation = (
self.segment_size > 0
and len(pubsub_bytes) > self.segment_size
)
segments = []
if needs_segmentation:
from .segmentation import segment_message
import uuid
nonce = pubsub_msg.seqno or uuid.uuid4().bytes
topic = pubsub_msg.topicIDs[0] if pubsub_msg.topicIDs else ""
seg_msg_id = compute_message_id(
pubsub_msg.from_id, topic, nonce
)
segments = segment_message(seg_msg_id, pubsub_bytes, self.segment_size)

for peer_id in peers_gen:
if self.pubsub is None:
raise NoPubsubAttached
if peer_id not in self.pubsub.peers:
continue
# Publish gate
if self.scorer is not None and not self.scorer.allow_publish(
peer_id, list(pubsub_msg.topicIDs)
):
continue
self.send_rpc(peer_id, rpc_msg)

if needs_segmentation and self.extensions_state.both_support_large_message_segmentation(peer_id):
for seg in segments:
seg_rpc = rpc_pb2.RPC()
seg_rpc.largeMessageSegmentation.CopyFrom(seg)
self.send_rpc(peer_id, seg_rpc)
else:
self.send_rpc(peer_id, rpc_msg)

# Queue messages for peers whose subscriptions we haven't received yet.
# This handles two cases:
Expand Down
176 changes: 88 additions & 88 deletions libp2p/pubsub/pb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,143 +2,143 @@
// Updated with GossipSub v1.3 Extensions Control Message support.
// Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md
// extensions.proto: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/extensions/extensions.proto
//
// Interop note: The Topic Observation extension (observe/unobserve, topicObservation)
// is not yet in the upstream libp2p/specs extensions.proto. Field numbers
// follow the go-libp2p reference implementation for cross-client interop.
// See: https://ethresear.ch/t/gossipsub-topic-observation-proposed-gossipsub-1-3/20907

syntax = "proto2";

package pubsub.pb;

message RPC {
repeated SubOpts subscriptions = 1;
repeated Message publish = 2;
repeated SubOpts subscriptions = 1;
repeated Message publish = 2;

message SubOpts {
optional bool subscribe = 1; // subscribe or unsubscribe
optional string topicid = 2;
}
message SubOpts {
optional bool subscribe = 1;
optional string topicid = 2;
}

optional ControlMessage control = 3;
optional bytes senderRecord = 4;
optional ControlMessage control = 3;
optional bytes senderRecord = 4;

// Canonical Extensions register their top-level RPC messages here.
// Canonical Extensions register their top-level RPC messages here.
// Experimental Extensions MUST use field numbers larger than 0x200000
// so they are encoded with at least 4 bytes (per GossipSub v1.3 spec).
optional TestExtension testExtension = 6492434;

// Experimental Extensions MUST use field numbers larger than 0x200000
// so they are encoded with at least 4 bytes (per GossipSub v1.3 spec).
optional TestExtension testExtension = 6492434;
// Large Message Segmentation Extension (experimental).
// Spec: ../gossipsub/extensions/experimental/large-message-segmentation.md
optional LargeMessageSegmentationExtension largeMessageSegmentation = 6492435;
}

message Message {
optional bytes from_id = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4;
optional bytes signature = 5;
optional bytes key = 6;
optional bytes from_id = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4;
optional bytes signature = 5;
optional bytes key = 6;
}

message ControlMessage {
repeated ControlIHave ihave = 1;
repeated ControlIWant iwant = 2;
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
repeated ControlIDontWant idontwant = 5;

// GossipSub v1.3: Extensions control message (MUST be in first message,
// MUST NOT be sent more than once per peer).
// Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md
optional ControlExtensions extensions = 6;

// Topic Observation extension control messages.
repeated ControlObserve observe = 7;
repeated ControlUnobserve unobserve = 8;
repeated ControlIHave ihave = 1;
repeated ControlIWant iwant = 2;
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
repeated ControlIDontWant idontwant = 5;
optional ControlExtensions extensions = 6; // GossipSub v1.3
repeated ControlObserve observe = 7; // Topic Observation extension
repeated ControlUnobserve unobserve = 8; // Topic Observation extension
}

// ControlExtensions advertises which v1.3 extensions the sending peer supports.
// Peers MUST ignore unknown fields (forward-compatible per spec).
// Field numbers for experimental extensions MUST be > 0x200000.
message ControlExtensions {
// Set to true if the peer supports the Topic Observation extension.
optional bool topicObservation = 1;
optional bool topicObservation = 1;
optional bool testExtension = 6492434;

// Large Message Segmentation Extension capability flag.
optional bool largeMessageSegmentation = 6492435;
}

// Experimental extensions use field numbers > 0x200000.
// testExtension: used for cross-implementation interop testing (go-libp2p compat).
optional bool testExtension = 6492434;
// LargeMessageSegmentationExtension carries a single segment of a large
// message that was split across multiple RPC frames. The receiver buffers
// segments by messageID and reassembles when all are received.
//
// Fields:
// messageID - Opaque identifier linking segments of the same original message.
// Must be unique per (sender, topic) scope to avoid collisions.
// segmentIndex - Zero-based position of this segment in the ordered sequence.
// totalSegments - Total number of segments comprising the original message.
// payload - Raw bytes for this segment.
// checksum - Optional SHA-256 hash of the full pre-segmentation payload.
// When present, receivers MUST verify after reassembly.
message LargeMessageSegmentationExtension {
optional bytes messageID = 1;
optional uint32 segmentIndex = 2;
optional uint32 totalSegments = 3;
optional bytes payload = 4;
optional bytes checksum = 5;
}

message ControlIHave {
optional string topicID = 1;
repeated string messageIDs = 2;
optional string topicID = 1;
repeated string messageIDs = 2;
}

message ControlIWant {
repeated string messageIDs = 1;
repeated string messageIDs = 1;
}

message ControlGraft {
optional string topicID = 1;
optional string topicID = 1;
}

message ControlPrune {
optional string topicID = 1;
repeated PeerInfo peers = 2;
optional uint64 backoff = 3;
optional string topicID = 1;
repeated PeerInfo peers = 2;
optional uint64 backoff = 3;
}

message ControlIDontWant {
repeated bytes messageIDs = 1;
repeated bytes messageIDs = 1;
}

// ControlObserve: Topic Observation extension.
// Sent by an observer to start receiving IHAVE notifications for a topic
// without being a full subscriber. (GossipSub v1.3 Topic Observation extension)
message ControlObserve {
optional string topicID = 1;
optional string topicID = 1;
}

// ControlUnobserve: Topic Observation extension.
// Sent by an observer to stop receiving IHAVE notifications for a topic.
message ControlUnobserve {
optional string topicID = 1;
optional string topicID = 1;
}

// TestExtension: used for interoperability testing of the v1.3 extension
// mechanism between implementations (go-libp2p, rust-libp2p, py-libp2p).
// An empty message — its presence on the wire is the signal.
message TestExtension {}

message PeerInfo {
optional bytes peerID = 1;
optional bytes signedPeerRecord = 2;
optional bytes peerID = 1;
optional bytes signedPeerRecord = 2;
}

message TopicDescriptor {
optional string name = 1;
optional AuthOpts auth = 2;
optional EncOpts enc = 3;

message AuthOpts {
optional AuthMode mode = 1;
repeated bytes keys = 2; // root keys to trust

enum AuthMode {
NONE = 0; // no authentication, anyone can publish
KEY = 1; // only messages signed by keys in the topic descriptor are accepted
WOT = 2; // web of trust, certificates can allow publisher set to grow
}
}

message EncOpts {
optional EncMode mode = 1;
repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted)

enum EncMode {
NONE = 0; // no encryption, anyone can read
SHAREDKEY = 1; // messages are encrypted with shared key
WOT = 2; // web of trust, certificates can allow publisher set to grow
}
}
optional string name = 1;
optional AuthOpts auth = 2;
optional EncOpts enc = 3;

message AuthOpts {
optional AuthMode mode = 1;
repeated bytes keys = 2;
enum AuthMode {
NONE = 0;
KEY = 1;
WOT = 2;
}
}

message EncOpts {
optional EncMode mode = 1;
repeated bytes keyHashes = 2;
enum EncMode {
NONE = 0;
SHAREDKEY = 1;
WOT = 2;
}
}
}
Loading