feat: Bitswap improvements for Kubo compatibility #1321
Conversation
…trieval process for compatibility with kubo
…compatibility with ipfs kubo
- Added support for signed records in the DHT by introducing `make_signed_put_record` function.
- Updated `ValueStore` to create signed records when storing values.
- Enhanced `Envelope` class to handle raw payload types for peer records.
- Introduced utility functions for signing and verifying DHT records.
- Updated protobuf definitions to include author and signature fields in records.
- Improved logging and debug messages for better traceability.
…ce DAG-PB encoding Co-authored-by: Copilot <copilot@github.com>
… layout Co-authored-by: Copilot <copilot@github.com>
… in MerkleDag Co-authored-by: Copilot <copilot@github.com>
…s and implement add_stream method in MerkleDag for handling io.IOBase streams Co-authored-by: Copilot <copilot@github.com>
…improve chunk_stream documentation, and add Wantlist functionality Co-authored-by: Copilot <copilot@github.com>
- Introduced `test_block_service.py` to validate BlockService behavior including local hits, network fetches, auto-caching, and block storage.
- Created `test_filesystem_blockstore.py` to manually test FilesystemBlockStore for basic operations, persistence, and directory structure.
- Added `test_io_stream.py` to verify io.IOBase input support with chunk_stream and MerkleDag.add_stream functionalities.
- Implemented `test_unixfs_encoding.py` to ensure add_file and add_bytes produce dag-pb leaf blocks and validate balanced layout tree structures.
- Developed `test_wantlist.py` to test Wantlist and Message dataclasses, including backward compatibility and public API exports.
- Updated type hints in `make_service` function to allow for None.
- Specified type hints for lists of bytes in block retrieval tests.
- Added assertions to check for non-null `unixfs` in various tests to ensure proper decoding of DAG PB blocks.
- Enhanced type hints for observer and subscriber peers in Gossipsub tests.
- Improved type hints for candidate lists in opportunistic grafting tests.
- Added type ignore comments for factory Meta classes to suppress type checker warnings.
- Updated import statements for ID to include type ignore comments in interop utilities.
475086a to 6acceb2
…handling, and update tests for dag-pb leaf blocks Co-authored-by: Copilot <copilot@github.com>
…ndling in DAG fetching Co-authored-by: Copilot <copilot@github.com>
… and MerkleDag implementations
…leDag Co-authored-by: Copilot <copilot@github.com>
…aching in Bitswap Co-authored-by: Copilot <copilot@github.com>
…st files and factories
Co-authored-by: Copilot <copilot@github.com>
@sumanjeet0012 Strong PR overall — the encoding root-causes are correct and well-tested (verified locally: 238/238 bitswap tests pass, wire format confirmed Links-before-Data). The PR description is exemplary.
Leaving a few comments inline. Three I'd flag as blocking before merge:
- Breaking API change in the `add_file` default (`wrap_with_directory=True`) — old callers get a directory CID instead of a file CID for the same call. Either revert the default or call this out loudly in the newsfragment.
- `verify_record` only handles Ed25519 — it silently fails for RSA/Secp256k1 peers, breaking DHT interop with non-Ed25519 nodes.
- `DEFAULT_CHUNK_SIZE = 63 KiB - 32` doesn't match Kubo's 256 KiB default — files added by py-libp2p won't have the same root CID as `ipfs add file.bin` unless Kubo is told `--chunker=size-65504`. The PR header claims "Kubo CID compatibility"; this caveat needs to be documented.
Medium-priority items (perf and hygiene) inline. Everything else is comfortable post-merge cleanup.
Nice work on the manual DAG-PB outer envelope — limiting hand-rolling to 0x12 <varlen> <linkbytes> then 0x0a <varlen> <unixfs> while still using protobuf for the inner messages is the minimal, correct approach.
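For readers following the encoding discussion, that outer envelope can be sketched roughly as below. The helper names (`encode_varint`, `encode_outer_node`) are illustrative stand-ins, not the PR's actual functions; only the byte layout (`0x12` link entries before the `0x0a` data entry, each length-prefixed with a protobuf varint) is what the comment describes.

```python
def encode_varint(n: int) -> bytes:
    """Unsigned LEB128 varint, as used for protobuf length prefixes."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_outer_node(link_bytes_list: list[bytes], unixfs_bytes: bytes) -> bytes:
    """Emit Links (field 2, tag 0x12) before Data (field 1, tag 0x0a)."""
    parts = []
    for link_bytes in link_bytes_list:  # each entry is a serialized PBLink
        parts.append(b"\x12" + encode_varint(len(link_bytes)) + link_bytes)
    parts.append(b"\x0a" + encode_varint(len(unixfs_bytes)) + unixfs_bytes)
    return b"".join(parts)
```

The inner `PBLink` and UnixFS messages can still come from generated protobuf code; only the top-level field order is hand-rolled.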
```diff
@@ -187,7 +272,7 @@ async def add_file(
     dir_data = create_directory_node([(filename, cid, file_size)])
     dir_cid = compute_cid_v1(dir_data, codec=CODEC_DAG_PB)
```
Breaking API change (re: the wrap_with_directory=True default a few lines up at the parameter decl): defaulting this to True silently changes the behavior of add_file(path) for every existing caller — they'll now get a directory CID where they previously got a file CID, and fetch_file returns a (bytes, filename) tuple where it returned bytes before. This is invisible in the new tests because they all pass wrap_with_directory=False.
Suggest defaulting to False for back-compat, or version-bump and add a clear migration note to newsfragments/1321.feature.rst (currently doesn't flag this as breaking).
```python
    """
    try:
        public_key = Ed25519PublicKey.from_bytes(author_public_key)
```
Interop bug for non-Ed25519 peers. This hard-codes Ed25519 deserialization, so DHT records signed by RSA or Secp256k1 peers will silently fail verification (caught by the try/except and returned as False, indistinguishable from a genuinely invalid signature).
Compare with libp2p/peer/envelope.py:198-216 (pub_key_from_protobuf) which dispatches on KeyType for all three types. Either dispatch the same way here, or accept a PublicKey object and let the caller deserialize.
Kubo defaults to Ed25519 today, so this won't surface in basic interop tests, but it's a real correctness gap.
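A sketch of the suggested dispatch shape. The `KeyType` enum values and the stub deserializers below are stand-ins so the example runs; in the real code the loaders would be the `from_bytes` constructors of the corresponding public-key classes, as in `pub_key_from_protobuf`.

```python
from enum import Enum


class KeyType(Enum):  # illustrative; the real enum lives in libp2p's crypto layer
    RSA = 0
    ED25519 = 1
    SECP256K1 = 2


# Stub deserializers standing in for the real PublicKey.from_bytes constructors.
def _load_rsa(b: bytes) -> str:
    return f"rsa:{b.hex()}"


def _load_ed25519(b: bytes) -> str:
    return f"ed25519:{b.hex()}"


def _load_secp256k1(b: bytes) -> str:
    return f"secp256k1:{b.hex()}"


_DESERIALIZERS = {
    KeyType.RSA: _load_rsa,
    KeyType.ED25519: _load_ed25519,
    KeyType.SECP256K1: _load_secp256k1,
}


def deserialize_public_key(key_type, key_bytes: bytes):
    """Dispatch on key type instead of hard-coding Ed25519.

    Unknown types fail loudly rather than being swallowed as a
    False verification result.
    """
    try:
        loader = _DESERIALIZERS[key_type]
    except KeyError:
        raise ValueError(f"unsupported key type: {key_type!r}")
    return loader(key_bytes)
```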
```diff
-DEFAULT_CHUNK_SIZE = 63 * 1024
+# 63 KB minus 32 bytes to leave room for the dag-pb leaf envelope overhead,
+# ensuring wrapped blocks never exceed MAX_BLOCK_SIZE (63 * 1024).
+DEFAULT_CHUNK_SIZE = 63 * 1024 - 32
```
Mismatch with Kubo's default chunk size. Kubo's ipfs add uses size-262144 (256 KiB) by default. With this 63 KiB default, py-libp2p will produce a different root CID than ipfs add file.bin for the same content — contradicting the PR's "Kubo CID compatibility" headline.
Leaf CIDs match Kubo (because of RawLeaves=false + dag-pb wrapping), but the root over a multi-chunk file won't. Either:
- Document this clearly in `newsfragments/1321.feature.rst` ("CIDs match `ipfs add --chunker=size-65504`"), or
- Match Kubo's 256 KiB default and split large messages at the wire layer instead of capping the chunk size.
```python
chunk = leaf_raw
logger.debug(f"[DAG] Leaf {idx + 1}: raw block {len(chunk)} bytes")

file_data += chunk
```
O(n²) bytes concat. file_data += chunk over potentially thousands of leaves means each += allocates a new bytes object and copies all previous data. For a 100 MB file at 63 KB chunks (~1626 leaves), this allocates roughly 80 GB of intermediate strings.
The fix is one line — accumulate into a bytearray (or list + b"".join(parts) at the end). This is the single biggest perf win available in the PR.
Same pattern in _read_message (client.py:1017-1047) and encode_dag_pb (dag_pb.py:125-149).
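A minimal sketch of the suggested accumulation pattern (illustrative function name):

```python
def reassemble(chunks: list[bytes]) -> bytes:
    """Accumulate into a bytearray: O(total) copying instead of O(n^2)."""
    buf = bytearray()
    for chunk in chunks:
        buf += chunk  # bytearray extend is amortized O(len(chunk))
    return bytes(buf)
```

`b"".join(parts)` over a list of chunks achieves the same asymptotics.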
```python
await trio.to_thread.run_sync(
    lambda: path.parent.mkdir(parents=True, exist_ok=True)
)
await trio.to_thread.run_sync(path.write_bytes, data)
```
Non-atomic writes. path.write_bytes(data) writes in place. If the process crashes mid-write, the next startup finds a truncated file at a CID path. get_block will then return corrupted bytes that fail verification only if the caller checks.
Standard fix: write to path.with_suffix('.tmp'), then os.replace(tmp, path) — atomic on POSIX, durable on most filesystems.
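A sketch of that pattern; `write_block_atomic` is a hypothetical helper name, and in the async store it would run under `trio.to_thread.run_sync` like the current write does.

```python
import os
from pathlib import Path


def write_block_atomic(path: Path, data: bytes) -> None:
    """Write so readers never observe a truncated block file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(".tmp")
    tmp.write_bytes(data)  # a crash here leaves only the .tmp file behind
    os.replace(tmp, path)  # atomic rename on POSIX when on the same filesystem
```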
```python
# Ensure the result is a plain dict (not a coroutine from a mock)
if isinstance(result, dict):
    return result
except Exception:
```
Test infrastructure leaking into production code. The getattr probe + isinstance(result, dict) check + bare except Exception: pass is a workaround for MagicMock returning a coroutine object. This silently masks real failures in production.
Suggest defining a Protocol for the batch interface, typing self.bitswap with it, and removing the runtime probing entirely. Tests can then mock the protocol explicitly.
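A sketch of what that protocol could look like (the names and signature are assumptions, not the PR's actual types), together with an explicit test double:

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class BatchFetcher(Protocol):
    """Hypothetical batch interface for the bitswap dependency."""

    async def get_blocks_batch(self, cids: list[bytes]) -> dict[bytes, bytes]: ...


class InMemoryFetcher:
    """Test double that satisfies the protocol explicitly, no MagicMock probing."""

    def __init__(self, blocks: dict[bytes, bytes]) -> None:
        self._blocks = blocks

    async def get_blocks_batch(self, cids: list[bytes]) -> dict[bytes, bytes]:
        # Return only the CIDs we actually hold, mirroring a partial batch hit.
        return {cid: self._blocks[cid] for cid in cids if cid in self._blocks}
```

With `self.bitswap: BatchFetcher`, the type checker enforces the contract and the runtime `getattr`/`isinstance` probing can be deleted.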
```python
# Send all CIDs in a single wantlist to the peer
if peer_id:
    await self._send_wantlist_to_peer(peer_id, batch)
```
Swallowed exception in _send_wantlist_to_peer causes hangs. That helper (further down in this file) catches all exceptions, logs "Failed to send wantlist to peer", and returns. When called from this batch path, if host.new_stream or _write_message fails for one CID in the batch, the corresponding _pending_requests[cid].wait() below will block until trio.fail_after(timeout) cancels it — adding the full timeout (default 30s) to every per-batch failure.
Fix: propagate the failure from _send_wantlist_to_peer, or event.set() with a sentinel so the waiter can fail fast.
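A sketch of the sentinel variant, using `asyncio.Event` in place of `trio.Event` so the example is self-contained; the attribute names are illustrative, not the client's actual fields.

```python
import asyncio

FAILED = object()  # sentinel: "send failed", distinct from any real block data


async def request_with_failfast(send, pending: dict, cid: bytes):
    """Resolve the pending entry immediately on send failure.

    The waiter wakes right away instead of sitting out the full timeout.
    """
    event = asyncio.Event()  # trio.Event in the real client
    pending[cid] = {"event": event, "result": None}
    try:
        await send(cid)
    except Exception:
        pending[cid]["result"] = FAILED
        event.set()  # fail fast: wake the waiter now
    # On success, the message receiver would store the block and set the event.
    await event.wait()
    if pending[cid]["result"] is FAILED:
        raise RuntimeError(f"wantlist send failed for {cid!r}")
    return pending[cid]["result"]
```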
```python
# We manually construct the wire format to enforce the correct ordering.

# Add links
result = b""
```
Minor: same O(n²) concat pattern as fetch_file reassembly. With 174 links on an internal node and large CIDs, each result += ... allocates a fresh bytes. Trivial fix: build into a bytearray and convert to bytes at return.
```python
    yield chunk


def chunk_stream(
```
Nit: this new chunk_stream should also be added to the module's __all__ (further down in this file). Doesn't affect direct imports, but it breaks from chunker import * and confuses some IDE auto-import tools.
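For context, the shape of such a generator, reading one chunk at a time in constant memory (illustrative signature and default, not necessarily the PR's exact ones):

```python
import io


def chunk_stream(stream: io.IOBase, chunk_size: int = 64 * 1024):
    """Yield successive chunks from any readable stream."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:  # b"" signals EOF
            return
        yield chunk
```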
```python
ordered_leaf_cids: list[bytes] = []


def _collect_leaves_local(cid_bytes: bytes, depth: int = 1) -> None:
    """Traverse locally-fetched blocks to collect leaf CIDs."""
```
Minor: _collect_leaves_local is unbounded recursion. For a balanced 174-fanout DAG over a 100 MB file the depth is ~2, so this never trips Python's default 1000-frame limit in practice. But a maliciously crafted DAG (e.g. a chain of single-link nodes) would crash with RecursionError. Convert to an explicit stack if you want to harden this.
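A sketch of the explicit-stack variant; `get_links` stands in for however the real code reads a node's child CIDs, and the function name is illustrative.

```python
def collect_leaves(root: bytes, get_links) -> list[bytes]:
    """Depth-first traversal with an explicit stack instead of recursion."""
    ordered: list[bytes] = []
    stack: list[bytes] = [root]
    while stack:
        cid = stack.pop()
        links = get_links(cid)
        if not links:
            ordered.append(cid)  # no children: this is a leaf
        else:
            stack.extend(reversed(links))  # reversed keeps left-to-right order
    return ordered
```

Memory is bounded by the widest frontier rather than Python's call stack, so a chain of single-link nodes can no longer raise `RecursionError`.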
PR #1321 AI Review
1. Summary of Changes
2. Strengths
3. Issues Found
Critical
Major
Minor
4. Security Review
5. Documentation and Examples
6. Newsfragment Requirement
7. Tests and Validation
8. Recommendations for Improvement
9. Questions for the Author
10. Overall Assessment
… lookups and always send signed records. Co-authored-by: Copilot <copilot@github.com>
…rove unmarshal_public_key functionality
Bitswap + DHT improvements for Kubo compatibility
What was wrong?
Issue: py-libp2p's Bitswap file operations were not compatible with Kubo (Go-IPFS)
and the broader IPFS network. Files added by py-libp2p could not be fetched by Kubo
nodes, and vice versa. Several root causes were identified:
1. Wrong leaf block encoding (`CODEC_RAW` instead of `dag-pb` + UnixFS)
`MerkleDag.add_file()` and `add_bytes()` stored leaf chunks as raw blocks. Kubo wraps every leaf in `UnixFS Data(type=File, data=chunk)` inside a `dag-pb` `PBNode`. Using `CODEC_RAW` produced completely different CIDs for the same content.

2. Flat DAG structure (no balanced layout)
`create_file_node()` put all chunks as direct links on the root node — a flat 1-level tree. Kubo uses a balanced tree with a maximum of 174 links per node (`balanced.Layout`). For a 100 MB file at 63 KB chunks (~1,626 chunks), the root block was megabytes in size, exceeded `MAX_BLOCK_SIZE`, and produced wrong CIDs.

3. Wrong DAG-PB wire encoding (field ordering)
`encode_dag_pb()` used `PBNode.SerializeToString()`, which emits `Data` (field 1) before `Links` (field 2). The DAG-PB spec requires Links before Data for canonical encoding. Wrong byte order → wrong CID for every node, even when leaf encoding was correct.

4. No persistent block storage
`MemoryBlockStore` was the only `BlockStore` implementation. All fetched blocks were lost when the process exited — not usable for any production or long-running node.

5. No transparent caching layer
`MerkleDag` called `bitswap.get_block()`/`bitswap.add_block()` directly with no service layer. Network-fetched blocks were not auto-cached locally, and newly stored blocks did not announce to waiting peers.

6. No stream input support
`add_file()` only accepted a file path string. There was no way to add a `BytesIO`, `GzipFile`, network pipe, or any `io.IOBase` stream without loading everything into memory first.

7. Magic integers in Bitswap message construction
`create_wantlist_entry(cid, want_type=1)` used raw integers with no type safety. No `WantType` enum, no `BlockPresence` type, no typed `BitswapMessage` class.

8. DHT record signing/verification not compatible with Kubo
DHT value records lacked proper signing and verification, causing incompatibility with Kubo's DHT implementation.
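The balanced 174-link layout mentioned in item 2 can be sketched as follows. `MAX_LINKS` and `balanced_root` are illustrative names, and `make_parent` is a hypothetical stand-in for building and storing an internal dag-pb node over its children; only the grouping scheme mirrors the description above.

```python
MAX_LINKS = 174  # maximum links per node, matching Go's balanced.Layout


def balanced_root(leaves: list[bytes], make_parent) -> bytes:
    """Group nodes into batches of MAX_LINKS, level by level, until one root remains."""
    if not leaves:
        raise ValueError("need at least one leaf")
    level = leaves
    while len(level) > 1:
        level = [
            make_parent(level[i : i + MAX_LINKS])
            for i in range(0, len(level), MAX_LINKS)
        ]
    return level[0]
```

For 1,626 leaves this produces one intermediate level of 10 parents plus a single root, so no node ever exceeds 174 links.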
How was it fixed?
Bitswap: Kubo CID compatibility
New `create_leaf_node(data)` in `dag_pb.py`:
Wraps each chunk in `UnixFS Data(type=File)` + `PBNode`, matching Kubo's `RawLeaves=false` default. Leaf CIDs are now byte-identical to Kubo.

New `balanced_layout(leaves)` in `dag_pb.py`:
Groups leaves into batches of 174, builds a tree level by level — exactly matching Go's `balanced.Layout`. Files of any size now produce correct CIDs.

Fixed `encode_dag_pb()` in `dag_pb.py`:
Manually constructs wire bytes with Links (field 2, tag `0x12`) before Data (field 1, tag `0x0a`) — DAG-PB canonical ordering.

Updated `add_file()` and `add_bytes()` in `dag.py`:
Both methods now use `create_leaf_node()` + `balanced_layout()` instead of `CODEC_RAW` + `create_file_node()`.

Bitswap: batch fetching

Enhanced `get_blocks_batch()` in `client.py`:
Sends all CIDs in a single wantlist message per batch, waits for all responses on the same stream. Avoids opening hundreds of individual streams (which caused Kubo to send `GO_AWAY`).

New: FilesystemBlockStore

New `FilesystemBlockStore` in `block_store.py`:
Stores each block as a file at `<base>/<cid[:2]>/<cid[2:]>`. Uses `trio.to_thread.run_sync` for non-blocking disk I/O. Drop-in replacement for `MemoryBlockStore` — blocks now survive process restarts.

New: BlockService

New `block_service.py`:
Transparent local→network fallback layer between `MerkleDag` and `BitswapClient`:

- `get_block()`: checks local store first (free), falls back to network, auto-caches result
- `put_block()`: stores locally + calls `bitswap.add_block()` to announce to waiting peers
- `get_blocks_batch()`: local hits skip the network entirely

`MerkleDag` now accepts an optional `block_service: BlockService` parameter. All block access routes through `_put_block`/`_get_block`/`_get_blocks_batch` helpers. No regression — `MerkleDag(bitswap)` without `block_service` still works unchanged.

New: `add_stream()` + `chunk_stream()`

New `chunk_stream(stream: io.IOBase)` in `chunker.py`:
Reads one chunk at a time from any `io.IOBase` — `BytesIO`, `open()` handles, `GzipFile`, `BZ2File`, network streams.

New `add_stream()` method on `MerkleDag`:
Accepts any `io.IOBase` with constant memory usage (O(chunk_size) regardless of file size). Produces the same CID as `add_file()` for the same content.

New: Wantlist / Message dataclasses

New `wantlist.py` with 6 typed dataclasses:

- `WantType` enum (`Block=0`, `Have=1`) replaces magic integers
- `BlockPresenceType` enum (`Have=0`, `DontHave=1`)
- `WantlistEntry`, `Wantlist`: typed wantlist with `add()`, `cancel()`, `contains()`
- `BlockPresence`: typed HAVE/DONT_HAVE response
- `BitswapMessage`: full message builder with `to_proto()`/`from_proto()`

`create_wantlist_entry()` updated to accept `WantType | int`, fully backward compatible.

DHT: record signing and verification

Enhanced DHT record handling with proper signing and verification for compatibility with Kubo's DHT implementation. Updated
`kademlia.proto`, `value_store.py`, `envelope.py`, `peer_record.py`, and added `records/record.py` + `records/utils.py`.

Files changed
- `libp2p/bitswap/dag_pb.py`: `create_leaf_node()`, `balanced_layout()`, fixed `encode_dag_pb()` canonical ordering
- `libp2p/bitswap/dag.py`: `add_file()`, `add_bytes()`, new `add_stream()`, `BlockService` routing
- `libp2p/bitswap/client.py`: `get_blocks_batch()` with single-wantlist-per-batch strategy
- `libp2p/bitswap/chunker.py`: `chunk_stream(stream: io.IOBase)`
- `libp2p/bitswap/block_store.py`: `FilesystemBlockStore`
- `libp2p/bitswap/block_service.py`: `BlockService`
- `libp2p/bitswap/wantlist.py`
- `libp2p/bitswap/messages.py`: `create_wantlist_entry()` accepts `WantType | int`
- `libp2p/bitswap/__init__.py`
- `libp2p/kad_dht/`
- `libp2p/records/record.py`, `utils.py`
- `libp2p/peer/envelope.py`, `peer_record.py`
- `examples/bitswap/bitswap.py`

Test files added
- `test_unixfs_encoding.py`: `dag-pb` leaf encoding + balanced DAG layout
- `test_canonical_dag_pb.py`
- `test_filesystem_blockstore.py`: `FilesystemBlockStore` persistence + round-trip
- `test_block_service.py`: `BlockService` local hit / miss / auto-cache / announce
- `test_io_stream.py`: `chunk_stream()` + `add_stream()` with BytesIO, GzipFile, file handles
- `test_wantlist.py`: `WantType`, `Wantlist`, `BitswapMessage`, `to_proto()`/`from_proto()`

To-Do
Cute Animal Picture