feat: add Large Message Segmentation extension for GossipSub v1.3 #1323
shivv23 wants to merge 2 commits into
Implements transparent segmentation of large pubsub payloads (>256 KiB) in py-libp2p. Messages exceeding the segment size threshold are split into ordered chunks, propagated independently through the mesh, and reassembled on the receiving side.
Key components:
- rpc.proto: LargeMessageSegmentationExtension message + capability flag
- segmentation.py: segment_message(), reassemble_segments(), ReassemblyBuffer
- extensions.py: large_message_segmentation field in PeerExtensions
- gossipsub.py: segmentation in publish(), intercept in handle_rpc()
- pubsub.py: max_msg_size config, segment routing in read loop
- tests: 20 unit tests for segmentation and reassembly
Spec: seetadev/specs#2
@shivv23, C4GT DMP 2026 contributor, working on GossipSub 1.4 Large Message Handling
👋 Hey @seetadev, @johannamoran, @theUtkarshRaj, @mhchia, @ralexstokes: this PR implements the Large Message Segmentation extension for py-libp2p's GossipSub v1.3.
Context: This is part of the GossipSub 1.4 spec effort (experimental draft at seetadev/specs#2). The spec defines the wire format and protocol semantics; this PR is the py-libp2p reference implementation.
Design decisions I'd particularly like feedback on:
Happy to iterate on any of this: design, style, test coverage, whatever makes sense for the project.
Hey @shivv23, this is great to see. The spec PR was sitting in a vacuum without an implementation tracking it, so this helps a lot. Your approach (handshake-gated fallback through v1.3 ControlExtensions) is basically what I had in mind when drafting it.
Couple of things on the spec side worth aligning on:
Field numbers: I went with
Segment size: you defaulted to 256 KiB, I have 1 MiB as the max in the spec. These don't actually conflict: 256 KiB is fine as an implementation default under a 1 MiB ceiling, but I should probably make that more explicit in the spec text so it doesn't look like we disagree.
On your open questions:
I'll push a small spec update soon to clarify the segment-size point and tighten the messageID wording.
For interop testing: once you have the test plan landing, happy to coordinate on the nim-libp2p side.
Thanks @theUtkarshRaj, this is exactly the kind of alignment I was hoping for. Let me lock down the action items:
Field numbers: Let's go with
messageID derivation: You're right that seqno alone is too narrow. I'll switch to
Segment size / ceiling: Agreed. I'll leave the code default at 256 KiB and note in a comment that the spec ceiling is 1 MiB. Your clarification in the spec text would be helpful for other implementers reading it.
Interop: Once the next iteration is pushed (messageID fix + any review feedback), I'll draft the interop test plan. Would be good to set up a quick sync; if there's a channel or meeting cadence for this project, loop me in.
One thing I'd like your eyes on specifically: the
Thanks @shivv23, sounds good. This is roughly how I was hoping the spec and implementation would converge.
Yes, please go ahead and open the PR against my spec draft with the field number update (8473921 → 6492435) and the segment-size ceiling note. Saves me a cycle and it's cleaner for reviewers to see the spec and code align in the same iteration.
On the messageID change: should be a small diff in your publish path. Once it lands I'll cross-reference your implementation from the spec text as the reference derivation.
On the reassembly timeout: I think this is worth pinning down in the spec rather than leaving fully per-implementation. 120s is a reasonable default but the right number depends on expected payload size and mesh propagation latency. My lean is to parameterize it as a configurable per-topic value with a recommended default in the 60–120s range: implementations expose the knob, applications tune it. nim-libp2p's prototype hasn't standardized this either as far as I've seen, so we have room to set the precedent. I'll add a reassembly lifecycle section to the spec in the next push covering this alongside the buffer caps from the security considerations.
On sync: there's no formal channel set up that I'm aware of. For now I'd suggest using the PR comment threads as the primary surface and pinging @seetadev / @johannamoran when we hit decisions that need mentor input. Closer to interop testing it'll probably make sense to set up a real-time sync, happy to organize that when we get there.
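To make the per-topic timeout idea concrete, here is a rough sketch of a reassembly buffer with a configurable timeout per topic and a 120s fallback. This is not the PR's ReassemblyBuffer: the class name ReassemblyBufferSketch, the topic_timeouts mapping, the injected clock, and the choice to key entries by (topic, messageID) are all assumptions made for illustration.

```python
import time
from dataclasses import dataclass, field


@dataclass
class _Pending:
    total: int                      # expected number of segments
    first_seen: float               # clock value when the first segment arrived
    chunks: dict[int, bytes] = field(default_factory=dict)


class ReassemblyBufferSketch:
    """Hypothetical buffer; timeouts are looked up per topic."""

    def __init__(self, default_timeout=120.0, topic_timeouts=None, clock=time.monotonic):
        self.default_timeout = default_timeout
        self.topic_timeouts = dict(topic_timeouts or {})
        self.clock = clock          # injectable so tests can control time
        self._pending: dict[tuple[str, bytes], _Pending] = {}

    def timeout_for(self, topic: str) -> float:
        return self.topic_timeouts.get(topic, self.default_timeout)

    def add(self, topic, message_id, index, total, chunk):
        """Store one segment; return the joined payload once all have arrived."""
        self.evict_expired()
        key = (topic, message_id)
        entry = self._pending.setdefault(key, _Pending(total, self.clock()))
        if total != entry.total or not 0 <= index < entry.total:
            return None             # inconsistent segment: ignore it
        entry.chunks[index] = chunk
        if len(entry.chunks) < entry.total:
            return None             # still waiting for more segments
        del self._pending[key]
        return b"".join(entry.chunks[i] for i in range(entry.total))

    def evict_expired(self) -> int:
        """Drop partial messages older than their topic's timeout."""
        now = self.clock()
        expired = [
            key for key, entry in self._pending.items()
            if now - entry.first_seen > self.timeout_for(key[0])
        ]
        for key in expired:
            del self._pending[key]
        return len(expired)
```

An application would tune the knob with something like ReassemblyBufferSketch(topic_timeouts={"checkpoints": 60.0}), leaving other topics on the 120s default. Checksum verification and the buffer caps from the security considerations are left out here.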
…field alignment
- Switch messageID from raw seqno to SHA-256(publisherID || topic || nonce)[:16] per spec draft recommendation (avoids cross-publisher/cross-topic collisions)
- Add compute_message_id() helper to segmentation.py
- Add reassembly_timeout config parameter (default 120s)
- Align extensions.proto field number 8473921 -> 6492435 (coordination PR: theUtkarshRaj/specs#1)
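For readers who have not opened the diff, the derivation named in this commit could look roughly like the sketch below. Only the formula SHA-256(publisherID || topic || nonce)[:16] and the helper name compute_message_id() come from the commit message; the parameter names, the UTF-8 encoding of the topic, and plain byte concatenation without length prefixes are assumptions.

```python
import hashlib


def compute_message_id(publisher_id: bytes, topic: str, nonce: bytes) -> bytes:
    """Return the first 16 bytes of SHA-256(publisherID || topic || nonce).

    Assumption: fields are concatenated as raw bytes, topic encoded as UTF-8.
    """
    digest = hashlib.sha256(publisher_id + topic.encode("utf-8") + nonce).digest()
    return digest[:16]


# The same publisher and nonce on two different topics give distinct IDs,
# which is the cross-topic collision case the commit message mentions.
nonce = (1).to_bytes(8, "big")
id_blocks = compute_message_id(b"peer-A", "blocks", nonce)
id_blobs = compute_message_id(b"peer-A", "blobs", nonce)
```

Plain concatenation is ambiguous when field lengths vary (for example, publisher b"ab" with topic "c" hashes the same bytes as publisher b"a" with topic "bc"), so a real implementation may want length-prefixed fields.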
All aligned and pushed:
Spec PR: theUtkarshRaj/specs#1, field number updated to
py-libp2p update (c05a75b):
- Change largeMessageSegmentation field from 8473921 to 6492435 in both ControlExtensions and RPC to match py-libp2p (PR libp2p/py-libp2p#1323)
- Rename RPC.largeSegmentation to RPC.largeMessageSegmentation for consistency
- Note py-libp2p's 256 KiB default segment size under Open Question 2
Summary
Implement the Large Message Segmentation extension for GossipSub v1.3 in py-libp2p. This allows nodes to transparently split oversize pubsub payloads (>256 KiB) into ordered segments, propagate them independently through the mesh, and reassemble them on the receiving side — without any changes to the application-layer API.
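As a rough illustration of the split and reassemble round trip described above, here is a minimal sketch. The names Segment, segment_message, and reassemble_segments appear in this PR, but the field layout, the signatures, and carrying a SHA-256 checksum of the full payload on every segment are assumptions, not the actual code in segmentation.py.

```python
import hashlib
from dataclasses import dataclass

SEGMENT_SIZE = 256 * 1024  # 256 KiB default mentioned in this PR


@dataclass(frozen=True)
class Segment:
    message_id: bytes
    index: int
    total_segments: int
    checksum: bytes  # SHA-256 of the full payload (placement is an assumption)
    data: bytes


def segment_message(data: bytes, message_id: bytes,
                    segment_size: int = SEGMENT_SIZE) -> list[Segment]:
    """Split data into ceil(len(data) / segment_size) ordered segments."""
    if segment_size <= 0:
        raise ValueError("segment_size must be positive")
    total = -(-len(data) // segment_size)  # ceiling division
    checksum = hashlib.sha256(data).digest()
    return [
        Segment(message_id, i, total, checksum,
                data[i * segment_size:(i + 1) * segment_size])
        for i in range(total)
    ]


def reassemble_segments(segments: list[Segment]) -> bytes:
    """Sort by index, concatenate, and verify the checksum."""
    if not segments:
        raise ValueError("no segments")
    ordered = sorted(segments, key=lambda s: s.index)
    total = ordered[0].total_segments
    if [s.index for s in ordered] != list(range(total)):
        raise ValueError("missing or duplicate segments")
    payload = b"".join(s.data for s in ordered)
    if hashlib.sha256(payload).digest() != ordered[0].checksum:
        raise ValueError("checksum mismatch")
    return payload


# A 600 KiB payload becomes three segments; delivery order does not matter.
payload = bytes(range(256)) * 2400
segments = segment_message(payload, b"example-id")
restored = reassemble_segments(list(reversed(segments)))
```

In this sketch the last segment is simply shorter than the others, and a payload at or below segment_size yields a single segment.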
The implementation follows the experimental spec draft from seetadev/specs#2 and integrates with the existing GossipSub v1.3 Extensions Control Message framework (gossipsub-v1.3.md).
Why this matters: Several emerging libp2p workloads, such as distributed ML training checkpoints, Ethereum blob propagation (EIP-7594 / PeerDAS), and large event logs, routinely produce pubsub messages in the megabyte range. Currently, py-libp2p silently drops any message whose serialized RPC exceeds 1 MiB (see RpcQueue.split_rpc). This PR closes that gap.
Design
Wire format
A new experimental protobuf message is added to rpc.proto:
Registered on the RPC message at field 6492435 (experimental range, >0x200000 per v1.3 spec).
Extension advertisement
Peers advertise support via the v1.3 ControlExtensions handshake. PeerExtensions gains a large_message_segmentation: bool field. The ExtensionsState.both_support_large_message_segmentation(peer_id) query gates segmentation to peers that have mutually negotiated the extension; unmodified peers receive the full message as before.
Segmentation (sender side, in GossipSub.publish)
- Serialize the message with pubsub_msg.SerializeToString(). If the serialized size ≤ segment_size (default 256 KiB), send as-is.
- Otherwise, generate a messageID (reuses seqno when available) and split the serialized message into ceiling(len(data)/segment_size) chunks.
- Send each chunk in its own RPC containing only the LargeMessageSegmentationExtension, with no publish, control, or subscriptions fields.
- Send segments only to peers with largeMessageSegmentation support. All other peers receive the original message (which may be dropped at the transport if >1 MiB, but the protocol remains correct).
Reassembly (receiver side, via ReassemblyBuffer in segmentation.py)
- continuously_read_stream detects the largeMessageSegmentation field on the incoming RPC and routes it to the ReassemblyBuffer.
- The buffer groups segments by messageID. It tracks total expected count, received indices, and a first-seen timestamp.
- When len(received) == totalSegments, the segments are sorted by index, concatenated, and the SHA-256 checksum is verified.
- The reassembled payload is parsed as an rpc_pb2.Message and injected into the normal push_msg validation pipeline. The application sees a normal pubsub message, so no API changes are required.
Backward compatibility
Files changed
- libp2p/pubsub/pb/rpc.proto: LargeMessageSegmentationExtension message + largeMessageSegmentation field on RPC and ControlExtensions
- libp2p/pubsub/segmentation.py: Segment, ReassemblyBuffer, segment_message(), reassemble_segments(), should_segment()
- libp2p/pubsub/extensions.py: large_message_segmentation field on PeerExtensions + query methods
- libp2p/pubsub/gossipsub.py: segmentation in publish(); segment intercept in handle_rpc(); max_msg_size / segment_size params
- libp2p/pubsub/pubsub.py: max_msg_size plumbing → RpcQueue; segment intercept in continuously_read_stream
Test plan
- segment_message / reassemble_segments (happy path, checksum mismatch, empty input)
- ReassemblyBuffer (single segment, out-of-order delivery, partial set, timeout eviction, max cap eviction)
Open questions for reviewers
- messageID reuses seqno, which is (peer_id, counter). Is this sufficient, or should we include a topic hash to avoid cross-topic collisions?