Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
52c188d
feat(bitswap): implement batch fetching of blocks and enhance file re…
sumanjeet0012 Apr 14, 2026
c7d74b0
feat: enhance DHT record handling with signing and verification for i…
sumanjeet0012 Apr 18, 2026
ea01b07
Merge remote-tracking branch 'origin/main' into improvement/bitswap
sumanjeet0012 May 2, 2026
5637cc3
feat: add FilesystemBlockStore for persistent block storage and enhan…
sumanjeet0012 May 3, 2026
c5982e7
feat: enhance Merkle DAG handling with DAG-PB leaf nodes and balanced…
sumanjeet0012 May 3, 2026
5f4e18a
feat: introduce BlockService for enhanced block retrieval and caching…
sumanjeet0012 May 3, 2026
796b5d5
feat: add chunk_stream function for efficient streaming of file chunk…
sumanjeet0012 May 3, 2026
3e8b881
feat: enhance BlockService and FilesystemBlockStore with type hints, …
sumanjeet0012 May 3, 2026
5df7ca8
Add comprehensive tests for Bitswap functionality
sumanjeet0012 May 3, 2026
58719a7
refactor: clean up imports and improve code formatting across multipl…
sumanjeet0012 May 3, 2026
ead47b0
Refactor type hints and add assertions in tests
sumanjeet0012 May 3, 2026
6acceb2
removed logs file
sumanjeet0012 May 3, 2026
a1456f5
fix: adjust DEFAULT_CHUNK_SIZE for DAG-PB overhead, enhance wantType …
sumanjeet0012 May 3, 2026
f7d27b6
feat: implement batch sending for Bitswap blocks and enhance error ha…
sumanjeet0012 May 3, 2026
54d7ebf
refactor: clean up whitespace and improve code readability in Bitswap…
sumanjeet0012 May 3, 2026
fe1a152
refactor: enhance type hints for batch processing in Bitswap and Merk…
sumanjeet0012 May 3, 2026
49ad3ef
feat: add ProviderQueryManager for DHT-based provider discovery and c…
sumanjeet0012 May 4, 2026
e88f3dc
refactor: clean up type hints and remove unnecessary whitespace in te…
sumanjeet0012 May 4, 2026
f65ca73
refactor: remove unnecessary whitespace in Gossipsub test files
sumanjeet0012 May 4, 2026
55a91e0
newsfragment added
sumanjeet0012 May 4, 2026
21fb316
refactor: improve formatting of docstring in add_block method
sumanjeet0012 May 4, 2026
0cbec92
refactor: improve docstring clarity for add_block method parameters
sumanjeet0012 May 4, 2026
0f0b6bb
refactor: update provider_query_manager to use find_providers for DHT…
sumanjeet0012 May 6, 2026
4d1137a
refactor: enhance verify_record to support multiple key types and imp…
sumanjeet0012 May 6, 2026
c0c9207
Merge branch 'main' into improvement/bitswap
acul71 May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions examples/bitswap/bitswap.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ async def run_provider(file_path: str, port: int = 0):
# Create host
host = new_host()

async with host.run(listen_addrs=listen_addrs):
peer_id = host.get_id()
logger.info(f"Peer ID: {peer_id}")
async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
logger.info(f"Peer ID: {host.get_id()}")

# Get actual listening addresses
addrs = host.get_addrs()
Expand All @@ -91,7 +90,8 @@ async def run_provider(file_path: str, port: int = 0):
await bitswap.start()
logger.info("✓ Bitswap started")

# Create Merkle DAG
# Set nursery so bitswap can spawn background tasks
bitswap.set_nursery(nursery)
dag = MerkleDag(bitswap)

logger.info("")
Expand Down Expand Up @@ -198,13 +198,14 @@ async def run_client(
# Create host
host = new_host()

async with host.run(listen_addrs=listen_addrs):
async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
logger.info(f"Client Peer ID: {host.get_id()}")

# Start Bitswap
bitswap = BitswapClient(host)
await bitswap.start()
logger.info("✓ Bitswap started")
bitswap.set_nursery(nursery)

try:
# Connect to provider
Expand All @@ -214,7 +215,6 @@ async def run_client(
await host.connect(peer_info)
logger.info("✓ Connected")

# Create Merkle DAG
dag = MerkleDag(bitswap)

logger.info("")
Expand All @@ -232,7 +232,7 @@ def progress_callback(current: int, total: int, status: str):
# Fetch file with automatic filename extraction
try:
file_data, filename = await dag.fetch_file(
root_cid, progress_callback=progress_callback
root_cid, progress_callback=progress_callback, timeout=120.0
)

# Show fetch statistics
Expand Down Expand Up @@ -284,18 +284,18 @@ def progress_callback(current: int, total: int, status: str):
logger.info("=" * 70)
logger.info(f"Size: {format_size(len(file_data))}")

# Determine output filename
# Determine output filename (priority: metadata > generated)
if filename:
output_filename = filename
logger.info(f"Filename: {output_filename} (from metadata)")
final_filename = filename
logger.info(f"Filename: {final_filename} (from metadata)")
else:
output_filename = (
final_filename = (
f"file_{format_cid_for_display(root_cid, max_len=16)}.bin"
)
logger.info(f"Filename: {output_filename} (no metadata)")
logger.info(f"Filename: {final_filename} (generated from CID)")

# Handle filename conflicts
output_file = output_path / output_filename
output_file = output_path / final_filename
if output_file.exists():
stem = output_file.stem
suffix = output_file.suffix
Expand All @@ -315,7 +315,9 @@ def progress_callback(current: int, total: int, status: str):
except Exception as e:
logger.error(f"Failed: {e}")
logger.exception("Full traceback:")
raise
finally:
pass # Nursery will cleanup background tasks
await bitswap.stop()


Expand Down
20 changes: 19 additions & 1 deletion libp2p/bitswap/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
New code should prefer the object-returning variants above.
"""

from .block_store import BlockStore, MemoryBlockStore
from .block_service import BlockService
from .block_store import BlockStore, FilesystemBlockStore, MemoryBlockStore
from .cid import (
CID_V0,
CID_V1,
Expand Down Expand Up @@ -65,12 +66,29 @@
MessageTooLargeError,
TimeoutError,
)
from .wantlist import (
BitswapMessage,
BlockPresence,
BlockPresenceType,
Wantlist,
WantlistEntry,
WantType,
)

__all__ = [
# Core
"BitswapClient",
"BlockService",
"BlockStore",
"MemoryBlockStore",
"FilesystemBlockStore",
# Messages
"BitswapMessage",
"BlockPresence",
"BlockPresenceType",
"Wantlist",
"WantlistEntry",
"WantType",
# CID types
"CIDInput",
"CIDObject",
Expand Down
196 changes: 196 additions & 0 deletions libp2p/bitswap/block_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
"""
BlockService: transparent local→network fallback for block retrieval.

Sits between MerkleDag and BitswapClient, providing:
- Local-first lookup (no network cost if block is already stored)
- Automatic caching of network-fetched blocks into the local store
- Peer announcement when new blocks are stored locally
- A clean abstraction so MerkleDag is not hardwired to BitswapClient
"""

from __future__ import annotations

from collections.abc import Sequence
import logging
from typing import TYPE_CHECKING

from .block_store import BlockStore
from .cid import CIDInput, cid_to_bytes, format_cid_for_display, parse_cid

if TYPE_CHECKING:
from libp2p.peer.id import ID as PeerID

from .client import BitswapClient

# Module-level logger named after this module, per stdlib convention.
logger = logging.getLogger(__name__)


class BlockService:
    """
    Combines a local BlockStore with a BitswapClient into one unified interface.

    get_block() flow:
        1. Check local BlockStore → return immediately if found (no network)
        2. Fetch via BitswapClient → goes to the network
        3. Auto-cache the result → store locally so next call is free

    put_block() flow:
        1. Write to local BlockStore
        2. Call bitswap.add_block() so peers who have this CID in their
           wantlist are notified and can receive it

    Error contract: network failures never propagate out of the retrieval
    methods. ``get_block`` returns ``None`` and ``get_blocks_batch`` returns
    whatever subset was found (locally or before the failure), so callers
    only ever have to handle "block not found", not transport errors.

    This is a drop-in wrapper: MerkleDag can use BlockService instead of
    calling bitswap directly, and the behaviour is identical but with the
    caching and announcement benefits added transparently.

    Example:
        >>> store = FilesystemBlockStore("./blocks")
        >>> service = BlockService(store, bitswap)
        >>> dag = MerkleDag(bitswap, block_service=service)

    """

    def __init__(self, store: BlockStore, bitswap: BitswapClient) -> None:
        # Local store consulted before any network traffic.
        self.store = store
        # Network-facing client used on local misses.
        self.bitswap = bitswap

    async def get_block(
        self,
        cid: CIDInput,
        peer_id: PeerID | None = None,
        timeout: float = 30.0,
    ) -> bytes | None:
        """
        Get a block. Checks local store first, then fetches from network.
        Any block fetched from the network is automatically cached locally.

        Args:
            cid: The CID of the block to retrieve
            peer_id: Optional specific peer to fetch from (passed to bitswap)
            timeout: Network timeout in seconds

        Returns:
            Block data bytes, or None if not found anywhere (including on
            network errors, which are logged and swallowed)

        """
        cid_bytes = cid_to_bytes(cid)
        cid_obj = parse_cid(cid_bytes)

        # 1. Local lookup — instant, no network cost
        data = await self.store.get_block(cid_obj)
        if data is not None:
            logger.debug(
                "BlockService: local hit %s",
                format_cid_for_display(cid_obj, max_len=12),
            )
            return data

        # 2. Network fetch via Bitswap
        logger.debug(
            "BlockService: local miss, fetching from network %s",
            format_cid_for_display(cid_obj, max_len=12),
        )
        try:
            data = await self.bitswap.get_block(cid_bytes, peer_id, timeout)
        except Exception as e:
            # Degrade gracefully: a transport/timeout error is reported to
            # the caller as "not found", not as an exception.
            logger.warning("BlockService: network fetch failed: %s", e)
            return None

        if data is not None:
            # 3. Auto-cache locally — future requests for this block are free
            await self.store.put_block(cid_obj, data)
            logger.debug(
                "BlockService: cached fetched block %s",
                format_cid_for_display(cid_obj, max_len=12),
            )

        return data

    async def put_block(self, cid: CIDInput, data: bytes) -> None:
        """
        Store a block locally and announce it to waiting peers.

        Calling bitswap.add_block() both writes to bitswap's own store AND
        notifies any peers who have this CID in their pending wantlist.
        We also write to our own store so get_block() local-hits on it.

        Args:
            cid: The CID of the block
            data: The block data bytes

        """
        cid_obj = parse_cid(cid_to_bytes(cid))

        # Write to our local store first so a concurrent get_block()
        # can local-hit as soon as possible.
        await self.store.put_block(cid_obj, data)

        # add_block() writes to bitswap's internal store AND calls
        # _notify_peers_about_block() for any peers waiting on this CID
        await self.bitswap.add_block(cid_obj, data)

        logger.debug(
            "BlockService: stored and announced %s",
            format_cid_for_display(cid_obj, max_len=12),
        )

    async def get_blocks_batch(
        self,
        cids: Sequence[CIDInput],
        peer_id: PeerID | None = None,
        timeout: float = 30.0,
        batch_size: int = 32,
    ) -> dict[bytes, bytes]:
        """
        Batch-fetch multiple blocks. Local hits are returned immediately;
        only missing blocks go to the network. All network-fetched blocks
        are auto-cached locally.

        Args:
            cids: List of CIDs to fetch
            peer_id: Optional specific peer to fetch from
            timeout: Network timeout in seconds
            batch_size: Wantlist batch size passed to bitswap

        Returns:
            Dict mapping cid_bytes -> block_data for all found blocks.
            On a network error the locally-found subset is returned
            (mirrors get_block()'s swallow-and-degrade contract).

        """
        results: dict[bytes, bytes] = {}
        missing_cids: list[CIDInput] = []

        # Local pass first
        for cid in cids:
            cid_bytes = cid_to_bytes(cid)
            cid_obj = parse_cid(cid_bytes)
            data = await self.store.get_block(cid_obj)
            if data is not None:
                results[cid_bytes] = data
            else:
                missing_cids.append(cid)

        if not missing_cids:
            logger.debug(
                "BlockService.get_blocks_batch: all %d blocks local", len(cids)
            )
            return results

        local_hits = len(cids) - len(missing_cids)
        logger.debug(
            "BlockService.get_blocks_batch: %d local hits, %d fetching from network",
            local_hits,
            len(missing_cids),
        )

        # Network pass for missing blocks. Consistent with get_block():
        # a transport failure yields partial results, not an exception.
        try:
            network_results = await self.bitswap.get_blocks_batch(
                missing_cids, peer_id=peer_id, timeout=timeout, batch_size=batch_size
            )
        except Exception as e:
            logger.warning("BlockService: batch network fetch failed: %s", e)
            return results

        # Auto-cache all network-fetched blocks
        for cid_bytes, data in network_results.items():
            cid_obj = parse_cid(cid_bytes)
            await self.store.put_block(cid_obj, data)
            results[cid_bytes] = data

        return results

    @property
    def block_store(self) -> BlockStore:
        """Expose the underlying BlockStore (used by MerkleDag internals)."""
        return self.store
Loading
Loading