diff --git a/CHANGELOG.md b/CHANGELOG.md index c9509bdc8..5da07dd3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ The following emojis are used to highlight certain changes: - upgrade to `go-libp2p` [v0.41.1](https://github.com/libp2p/go-libp2p/releases/tag/v0.41.1) - `bitswap/network`: Add a new `requests_in_flight` metric gauge that measures how many bitswap streams are being written or read at a given time. - improve speed of data onboarding by batching/buffering provider queue writes [#888](https://github.com/ipfs/boxo/pull/888) +- `provider/queue` deduplicates CIDs [#910](https://github.com/ipfs/boxo/pull/910) ### Removed diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 7144141c3..fbda5d7c4 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -9,6 +9,7 @@ import ( "time" "github.com/gammazero/deque" + lru "github.com/hashicorp/golang-lru/v2" cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" namespace "github.com/ipfs/go-datastore/namespace" @@ -22,6 +23,9 @@ const ( // batchSize is the limit on number of CIDs kept in memory at which they // are all written to the datastore. batchSize = 16 * 1024 + // dedupCacheSize is the size of the LRU cache used to deduplicate CIDs in + // the queue. + dedupCacheSize = 2 * 1024 // idleWriteTime is the amount of time to check if the queue has been idle // (no input or output). If the queue has been idle since the last check, // then write all buffered CIDs to the datastore. 
@@ -127,12 +131,13 @@ func (q *Queue) worker(ctx context.Context) { var ( c cid.Cid counter uint64 - k datastore.Key = datastore.Key{} inBuf deque.Deque[cid.Cid] ) const baseCap = 1024 inBuf.SetBaseCap(baseCap) + k := datastore.Key{} + dedupCache, _ := lru.New[cid.Cid, struct{}](dedupCacheSize) defer func() { if c != cid.Undef { @@ -206,6 +211,11 @@ func (q *Queue) worker(ctx context.Context) { if !ok { return } + if found, _ := dedupCache.ContainsOrAdd(toQueue, struct{}{}); found { + // update recentness in LRU cache + dedupCache.Add(toQueue, struct{}{}) + continue + } idle = false if c == cid.Undef { @@ -282,8 +292,7 @@ func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deq } cstr := makeCidString(cids.Front()) - n := cids.Len() - for i := 0; i < n; i++ { + for i := range cids.Len() { c := cids.At(i) key := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr)) if err = b.Put(ctx, key, c.Bytes()); err != nil { diff --git a/provider/internal/queue/queue_test.go b/provider/internal/queue/queue_test.go index d0250e995..be2874669 100644 --- a/provider/internal/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -118,3 +118,21 @@ func TestInitializationWithManyCids(t *testing.T) { assertOrdered(cids, queue, t) } + +func TestDeduplicateCids(t *testing.T) { + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue := New(ds) + defer queue.Close() + + cids := random.Cids(5) + queue.Enqueue(cids[0]) + queue.Enqueue(cids[0]) + queue.Enqueue(cids[1]) + queue.Enqueue(cids[2]) + queue.Enqueue(cids[1]) + queue.Enqueue(cids[3]) + queue.Enqueue(cids[0]) + queue.Enqueue(cids[4]) + + assertOrdered(cids, queue, t) +}