Commit b5b87a7

feat(dag/walker): opt-in BloomTracker to avoid duplicated walks (#1124)
* feat(dag/walker): add VisitedTracker with BloomTracker and MapTracker

  VisitedTracker interface for memory-efficient DAG traversal dedup. BloomTracker uses a scalable bloom filter chain (~4 bytes/CID vs ~75 for a map), enabling dedup on repos with tens of millions of CIDs.

  - BloomTracker: auto-scaling chain, configurable FP rate via BloomParams, unique random SipHash keys per instance (uncorrelated FPs across nodes)
  - MapTracker: exact dedup for tests and small datasets
  - *cid.Set satisfies the interface for drop-in compatibility
  - go.mod: update ipfs/bbloom to master (for NewWithKeys)

* feat(dag/walker): add WalkDAG with codec-agnostic link extraction

  iterative DFS walker that integrates VisitedTracker dedup directly into the traversal loop, skipping entire subtrees in O(1).

  - LinksFetcherFromBlockstore: extracts links from any codec registered in the global multicodec registry (dag-pb, dag-cbor, raw, etc.)
  - ~2x faster than legacy go-ipld-prime selector traversal (no selector machinery, simpler decoding, fewer allocations)
  - WithLocality option for MFS providers to skip non-local blocks
  - best-effort error handling: fetch failures log and skip, do not mark the CID as visited (allows retry via another pin or next cycle)
  - benchmarks comparing BlockAll vs WalkDAG across dag-pb, dag-cbor, and mixed-codec DAGs

* feat(dag/walker): add WalkEntityRoots for entity-aware traversal

  emits entity roots (files, directories, HAMT shards) skipping internal file chunks. core of the +entities provide strategy.

  - NodeFetcherFromBlockstore: detects UnixFS entity type from the ipld-prime decoded node's Data field
  - directories and HAMT shards: emit and recurse into children
  - non-UnixFS codecs (dag-cbor, dag-json): emit and follow links
  - same options as WalkDAG: WithVisitedTracker, WithLocality
  - tests: dag-pb, raw, dag-cbor, mixed codecs, HAMT, dedup, error handling, stop conditions

* test(dag/walker): add BloomTracker FP rate regression tests

  catch unexpected regressions in ipfs/bbloom behavior or BloomParams derivation that would silently degrade the false positive rate.

  - measurable rate (1/1000): 100K probes produce observable FPs, asserts rate is within 5x of target
  - default rate (1/4.75M): 100K probes must produce exactly 0 FPs

* fix(provider): stream error continues to next, add NewConcatProvider

  - NewPrioritizedProvider: stream init error no longer stops remaining streams (e.g. MFS flush error does not prevent pinned content from being provided)
  - NewConcatProvider: concatenates pre-deduplicated streams without its own visited set, for use with shared VisitedTracker

* feat(pinner): add NewUniquePinnedProvider and NewPinnedEntityRootsProvider

  NewUniquePinnedProvider: emits all pinned blocks with cross-pin dedup via shared VisitedTracker (bloom or map). walks recursive pin DAGs first, then direct pins.

  NewPinnedEntityRootsProvider: same structure but uses WalkEntityRoots, emitting only entity roots and skipping internal file chunks.

  existing NewPinnedProvider is unchanged.

* test: add PrioritizedProvider error-continue regression test

  - remove unused daggen variable in uniquepinprovider_test.go

* refactor(provider): use labeled break in NewConcatProvider for consistency

  match the defensive read-side ctx.Done select pattern already used by NewPrioritizedProvider in the same file

* refactor(dag/walker): extract shared linkSystemForBlockstore helper

  - deduplicate LinkSystem construction used by both LinksFetcherFromBlockstore and NodeFetcherFromBlockstore
  - wrap blockstore with NewIdStore so identity CIDs (multihash 0x00, data inline in the CID) are decoded without a datastore lookup

* fix(dag/walker): skip emitting identity CIDs, add tests

  identity CIDs (multihash 0x00) embed data inline, so providing them to the DHT is wasteful. the walker now traverses through identity CIDs (following their links) but never emits them.

  - add isIdentityCID check to WalkDAG and WalkEntityRoots
  - simplify WalkEntityRoots emit/descend logic
  - tests for identity raw leaf, identity dag-pb directory with normal children, normal directory with identity child

* test(dag/walker): add symlink entity detection tests

* refactor: consolidate identity CID tests, filter direct pins

  - inline identity CID check (c.Prefix().MhType == mh.IDENTITY) in all emit paths: WalkDAG, WalkEntityRoots, and direct pin loops in both NewUniquePinnedProvider and NewPinnedEntityRootsProvider
  - move all identity CID tests to dag/walker/identity_test.go
  - add provider-level identity tests for direct pins and recursive DAGs

* fix(dag/walker): visit siblings in left-to-right link order

  the stack-based DFS was pushing children in link order, causing the last child to be popped first (right-to-left). reverse children before pushing so the first link is on top and gets visited first. this matches the legacy fetcherhelpers.BlockAll selector traversal (ipld-prime iterates list/map entries in insertion order) and the conventional DFS order described in IPIP-0412.

  - walker.go, entity.go: slices.Reverse(children) before stack push
  - walker.go: document traversal order in WalkDAG godoc
  - entity.go: document order parity in WalkEntityRoots godoc
  - walker_test.go, entity_test.go: add sibling order regression tests

* fix(pinner): continue on pin iteration error in unique providers

  a corrupted pin entry was stopping the entire provide cycle because the goroutine returned on RecursiveKeys/DirectKeys error. change to continue so remaining pins are still provided (best-effort).

  the error from the pinner iterator already contains context (bad CID bytes, datastore key, etc.) -- sc.Pin.Key is zero-value on error so including it in the log would be noise.

  matches the best-effort pattern used in WalkDAG/WalkEntityRoots where fetch errors are logged and skipped.

* docs(dag/walker): document implicit behaviors

  - collectLinks: note that map keys are not recursed (no known codec uses link-typed map keys)
  - detectEntityType: extract c.Prefix() once for readability
  - grow: document MinBloomCapacity invariant that prevents small-bitset FP rate issues in grown blooms

* fix: address review feedback from gammazero

  uniquepinprovider: use skip-early style for tracker.Visit in direct pin loops (clearer control flow)

  visited.go: document that VisitedTracker implementations may be probabilistic, and must keep FP rate negligible or allow callers to adjust it

* feat(walker): log bloom tracker creation and autoscaling

  log capacity, FP rate, and hash parameters on creation. log previous/new capacity and chain length on autoscale. helps operators understand bloom sizing and detect unexpected growth during reprovide cycles.

* feat(walker): add Deduplicated() to BloomTracker and MapTracker

  counts Visit() calls that returned false (CID already seen). callers can log this after a reprovide cycle to show how much dedup the bloom filter achieved.

* chore: update ipfs/bbloom to v0.1.0

* docs(walker): document bloom iteration order tradeoff

  Addresses review feedback: #1124 (comment) #1124 (comment)

* docs(walker): remove Kubo-specific 22h reprovide interval

  The reprovide interval is configured by the caller, not by boxo.

  Addresses review feedback: #1124 (comment)

* refactor(walker): consolidate WalkDAG and WalkEntityRoots

  Extract shared walkLoop for the common iterative DFS logic (stack management, tracker dedup, locality check, identity CID skip, emit). WalkDAG and WalkEntityRoots now differ only in their fetch callback.

  Addresses review feedback: #1124 (comment)

* refactor(pinner): consolidate pin provider functions

  Extract shared newPinnedProvider for the common goroutine, emit, pin iteration, and direct-pin logic. NewUniquePinnedProvider and NewPinnedEntityRootsProvider now differ only in the walk callback.

  Addresses review feedback: #1124 (comment)

* docs: move pin provider entry to Added in CHANGELOG

  These are new functions, not fixes to existing ones.

  Addresses review feedback: #1124 (comment)
1 parent c0f7759 commit b5b87a7

18 files changed

Lines changed: 3331 additions & 8 deletions

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -16,12 +16,15 @@ The following emojis are used to highlight certain changes:
 
 ### Added
 
+- `dag/walker`: new package for memory-efficient DAG traversal with deduplication. `VisitedTracker` interface with `BloomTracker` (scalable bloom filter chain, ~4 bytes/CID vs ~75 bytes for a map) and `MapTracker` (exact, for tests). `WalkDAG` provides iterative DFS traversal with integrated dedup, supporting dag-pb, dag-cbor, raw, and other registered codecs. ~2x faster than the legacy go-ipld-prime selector-based traversal. `WalkEntityRoots` emits only entity roots (files, directories, HAMT shards) instead of every block, skipping internal file chunks. [#1124](https://github.com/ipfs/boxo/pull/1124)
+- `pinner`: `NewUniquePinnedProvider` and `NewPinnedEntityRootsProvider` log and skip corrupted pin entries instead of aborting the provide cycle, allowing remaining pins to still be provided. [#1124](https://github.com/ipfs/boxo/pull/1124)
 - `routing/http/client`: `WithProviderInfoFunc` option resolves provider addresses at provide-time instead of client construction time. This only impacts legacy HTTP-only custom routing setups that depend on [IPIP-526](https://github.com/ipfs/specs/pull/526) and were sending unresolved `0.0.0.0` addresses in provider records instead of actual interface addresses. [#1115](https://github.com/ipfs/boxo/pull/1115)
 - `chunker`: added `Register` function to allow custom chunkers to be registered for use with `FromString`.
 - `mfs`: added `Directory.Mode()` and `Directory.ModTime()` getters to match the existing `File.Mode()` and `File.ModTime()` API. [#1131](https://github.com/ipfs/boxo/pull/1131)
 
 ### Changed
 
+- `provider`: `NewPrioritizedProvider` now continues to the next stream when one fails instead of stopping all streams. `NewConcatProvider` added for pre-deduplicated streams. [#1124](https://github.com/ipfs/boxo/pull/1124)
 - `chunker`: `FromString` now rejects malformed `size-` strings with extra parameters (e.g. `size-123-extra` was previously silently accepted).
 - `gateway`: compliance with gateway-conformance [v0.13](https://github.com/ipfs/gateway-conformance/releases/tag/v0.13)
 - upgrade to `go-libp2p` [v0.48.0](https://github.com/libp2p/go-libp2p/releases/tag/v0.48.0)

dag/walker/doc.go

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+// Package walker provides memory-efficient DAG traversal with
+// deduplication. Optimized for the IPFS provide system, but useful
+// anywhere repeated DAG walks need to skip already-visited subtrees.
+//
+// The primary entry point is [WalkDAG], which walks a DAG rooted at a
+// given CID, emitting each visited CID to a callback. When combined
+// with a [VisitedTracker] (e.g. [BloomTracker]), entire subtrees
+// already seen are skipped in O(1).
+//
+// For entity-aware traversal that only emits file/directory/HAMT roots
+// instead of every block, see [WalkEntityRoots].
+//
+// Blocks are decoded using the codecs registered in the process via
+// the global multicodec registry. In a standard kubo build this
+// includes dag-pb, dag-cbor, dag-json, cbor, json, and raw.
+//
+// Use [LinksFetcherFromBlockstore] to create a fetcher backed by a
+// local blockstore. For custom link extraction (e.g. a different codec
+// registry or non-blockstore storage), pass your own [LinksFetcher]
+// function directly to [WalkDAG].
+package walker

dag/walker/entity.go

Lines changed: 145 additions & 0 deletions
@@ -0,0 +1,145 @@
+package walker
+
+import (
+	"context"
+
+	blockstore "github.com/ipfs/boxo/blockstore"
+	"github.com/ipfs/boxo/ipld/unixfs"
+	cid "github.com/ipfs/go-cid"
+	ipld "github.com/ipld/go-ipld-prime"
+	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
+	basicnode "github.com/ipld/go-ipld-prime/node/basic"
+)
+
+// EntityType represents the semantic type of a DAG entity.
+type EntityType int
+
+const (
+	EntityUnknown   EntityType = iota
+	EntityFile                 // UnixFS file root (not its chunks)
+	EntityDirectory            // UnixFS flat directory
+	EntityHAMTShard            // UnixFS HAMT sharded directory bucket
+	EntitySymlink              // UnixFS symbolic link
+)
+
+// NodeFetcher returns child link CIDs and entity type for a given CID.
+// Used by [WalkEntityRoots] which needs UnixFS type detection to decide
+// whether to descend into children (directories, HAMT shards) or stop
+// (files, symlinks).
+type NodeFetcher func(ctx context.Context, c cid.Cid) (linkCIDs []cid.Cid, entityType EntityType, err error)
+
+// NodeFetcherFromBlockstore creates a [NodeFetcher] backed by a local
+// blockstore. Like [LinksFetcherFromBlockstore], it decodes blocks via
+// ipld-prime's global multicodec registry (dag-pb, dag-cbor, raw, etc.)
+// and handles identity CIDs transparently via [blockstore.NewIdStore].
+//
+// Entity type detection:
+//   - dag-pb with valid UnixFS Data: file, directory, HAMT shard, or symlink
+//   - dag-pb without valid UnixFS Data: EntityUnknown
+//   - raw codec: EntityFile (small file stored as a single raw block)
+//   - all other codecs (dag-cbor, dag-json, etc.): EntityUnknown
+func NodeFetcherFromBlockstore(bs blockstore.Blockstore) NodeFetcher {
+	ls := linkSystemForBlockstore(bs)
+
+	return func(ctx context.Context, c cid.Cid) ([]cid.Cid, EntityType, error) {
+		lnk := cidlink.Link{Cid: c}
+		nd, err := ls.Load(ipld.LinkContext{Ctx: ctx}, lnk, basicnode.Prototype.Any)
+		if err != nil {
+			return nil, EntityUnknown, err
+		}
+
+		links := collectLinks(c, nd)
+		entityType := detectEntityType(c, nd)
+		return links, entityType, nil
+	}
+}
+
+// detectEntityType infers the UnixFS entity type from an ipld-prime
+// decoded node. For dag-pb nodes, it reads the "Data" field and parses
+// it as UnixFS protobuf. For raw codec nodes, it returns EntityFile.
+// For everything else, it returns EntityUnknown.
+func detectEntityType(c cid.Cid, nd ipld.Node) EntityType {
+	codec := c.Prefix().Codec
+
+	// raw codec: small file stored as a single block
+	if codec == cid.Raw {
+		return EntityFile
+	}
+
+	// only dag-pb has UnixFS semantics; other codecs are unknown
+	if codec != cid.DagProtobuf {
+		return EntityUnknown
+	}
+
+	// dag-pb: try to read the "Data" field for UnixFS type
+	dataField, err := nd.LookupByString("Data")
+	if err != nil || dataField.IsAbsent() || dataField.IsNull() {
+		return EntityUnknown
+	}
+
+	dataBytes, err := dataField.AsBytes()
+	if err != nil {
+		return EntityUnknown
+	}
+
+	fsn, err := unixfs.FSNodeFromBytes(dataBytes)
+	if err != nil {
+		return EntityUnknown
+	}
+
+	switch fsn.Type() {
+	case unixfs.TFile, unixfs.TRaw:
+		return EntityFile
+	case unixfs.TDirectory:
+		return EntityDirectory
+	case unixfs.THAMTShard:
+		return EntityHAMTShard
+	case unixfs.TSymlink:
+		return EntitySymlink
+	default:
+		return EntityUnknown
+	}
+}
+
+// WalkEntityRoots traverses a DAG calling emit for each entity root.
+//
+// Entity roots are semantic boundaries in the DAG:
+//   - File/symlink roots: emitted, children (chunks) NOT traversed
+//   - Directory roots: emitted, children recursed
+//   - HAMT shard nodes: emitted (needed for directory enumeration),
+//     children recursed
+//   - Non-UnixFS nodes (dag-cbor, dag-json, etc.): emitted AND children
+//     recursed to discover further content. The +entities optimization
+//     (skip chunks) only applies to UnixFS files; for all other codecs,
+//     every reachable CID is emitted.
+//   - Raw leaf nodes: emitted (no children to recurse)
+//
+// Same traversal order as [WalkDAG]: pre-order DFS with left-to-right
+// sibling visiting. Uses the same option types: [WithVisitedTracker]
+// for bloom/map dedup across walks, [WithLocality] for MFS locality
+// checks.
+func WalkEntityRoots(
+	ctx context.Context,
+	root cid.Cid,
+	fetch NodeFetcher,
+	emit func(cid.Cid) bool,
+	opts ...Option,
+) error {
+	cfg := &walkConfig{}
+	for _, o := range opts {
+		o(cfg)
+	}
+	return walkLoop(ctx, root, func(ctx context.Context, c cid.Cid) ([]cid.Cid, error) {
+		children, entityType, err := fetch(ctx, c)
+		if err != nil {
+			return nil, err
+		}
+		// Only descend into directories, HAMT shards, and unknown
+		// node types. File and symlink children (chunks) are not
+		// entity roots, so we stop here.
+		if entityType == EntityFile || entityType == EntitySymlink {
+			return nil, nil
+		}
+		return children, nil
+	}, emit, cfg)
+}
