From 56fb218d83066e2421d19ac63e6ced548e628042 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Wed, 16 Apr 2025 09:44:51 +0200 Subject: [PATCH 1/5] deduplication cache with additional state --- provider/internal/queue/queue.go | 9 +++++++-- provider/internal/queue/queue_test.go | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 7144141c3..4f804b502 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -9,6 +9,7 @@ import ( "time" "github.com/gammazero/deque" + lru "github.com/hashicorp/golang-lru/v2" cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" namespace "github.com/ipfs/go-datastore/namespace" @@ -127,12 +128,13 @@ func (q *Queue) worker(ctx context.Context) { var ( c cid.Cid counter uint64 - k datastore.Key = datastore.Key{} inBuf deque.Deque[cid.Cid] ) const baseCap = 1024 inBuf.SetBaseCap(baseCap) + k := datastore.Key{} + dedupCache, _ := lru.New[cid.Cid, struct{}](baseCap) defer func() { if c != cid.Undef { @@ -206,6 +208,9 @@ func (q *Queue) worker(ctx context.Context) { if !ok { return } + if found, _ := dedupCache.ContainsOrAdd(toQueue, struct{}{}); found { + continue + } idle = false if c == cid.Undef { @@ -283,7 +288,7 @@ func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deq cstr := makeCidString(cids.Front()) n := cids.Len() - for i := 0; i < n; i++ { + for i := range n { c := cids.At(i) key := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr)) if err = b.Put(ctx, key, c.Bytes()); err != nil { diff --git a/provider/internal/queue/queue_test.go b/provider/internal/queue/queue_test.go index d0250e995..be2874669 100644 --- a/provider/internal/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -118,3 +118,21 @@ func TestInitializationWithManyCids(t *testing.T) { assertOrdered(cids, queue, t) } + +func TestDeduplicateCids(t *testing.T) { + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue := New(ds) + defer queue.Close() + + cids := random.Cids(5) + queue.Enqueue(cids[0]) + queue.Enqueue(cids[0]) + queue.Enqueue(cids[1]) + queue.Enqueue(cids[2]) + queue.Enqueue(cids[1]) + queue.Enqueue(cids[3]) + queue.Enqueue(cids[0]) + queue.Enqueue(cids[4]) + + assertOrdered(cids, queue, t) +} From 5955d27f01d98983579334344afeb683ac917350 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Wed, 16 Apr 2025 09:52:10 +0200 Subject: [PATCH 2/5] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9509bdc8..5da07dd3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ The following emojis are used to highlight certain changes: - upgrade to `go-libp2p` [v0.41.1](https://github.com/libp2p/go-libp2p/releases/tag/v0.41.1) - `bitswap/network`: Add a new `requests_in_flight` metric gauge that measures how many bitswap streams are being written or read at a given time. - improve speed of data onboarding by batching/bufering provider queue writes [#888](https://github.com/ipfs/boxo/pull/888) +- `provider/queue` deduplicates CIDs [#910](https://github.com/ipfs/boxo/pull/910) ### Removed From 969cfb5fe5a189ee38f0d3fbc52f66d49d5b2f2b Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Tue, 22 Apr 2025 13:55:24 +0200 Subject: [PATCH 3/5] address review --- provider/internal/queue/queue.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 4f804b502..4ad59f357 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -23,6 +23,9 @@ const ( // batchSize is the limit on number of CIDs kept in memory at which ther // are all written to the datastore. batchSize = 16 * 1024 + // dedupCacheSize is the size of the LRU cache used to deduplicate CIDs in + // the queue. + dedupCacheSize = 1024 // idleWriteTime is the amout of time to check if the queue has been idle // (no input or output). If the queue has been idle since the last check, // then write all buffered CIDs to the datastore. @@ -134,7 +137,7 @@ func (q *Queue) worker(ctx context.Context) { const baseCap = 1024 inBuf.SetBaseCap(baseCap) k := datastore.Key{} - dedupCache, _ := lru.New[cid.Cid, struct{}](baseCap) + dedupCache, _ := lru.New[cid.Cid, struct{}](dedupCacheSize) defer func() { if c != cid.Undef { @@ -287,8 +290,7 @@ func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deq } cstr := makeCidString(cids.Front()) - n := cids.Len() - for i := range n { + for i := range cids.Len() { c := cids.At(i) key := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr)) if err = b.Put(ctx, key, c.Bytes()); err != nil { From c745bf7100713c5a59c821931cff90a47ecfdfcf Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Wed, 23 Apr 2025 09:56:56 +0200 Subject: [PATCH 4/5] update recentness on cache hit --- provider/internal/queue/queue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 4ad59f357..8c4f137ce 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -212,6 +212,8 @@ func (q *Queue) worker(ctx context.Context) { return } if found, _ := dedupCache.ContainsOrAdd(toQueue, struct{}{}); found { + // update recentness in LRU cache + dedupCache.Add(toQueue, struct{}{}) continue } idle = false From 968098c5698669701eba0924e23628e6364af44f Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Wed, 23 Apr 2025 10:00:51 +0200 Subject: [PATCH 5/5] increase dedupCacheSize --- provider/internal/queue/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 8c4f137ce..fbda5d7c4 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -25,7 +25,7 @@ const ( batchSize = 16 * 1024 // dedupCacheSize is the size of the LRU cache used to deduplicate CIDs in // the queue. - dedupCacheSize = 1024 + dedupCacheSize = 2 * 1024 // idleWriteTime is the amout of time to check if the queue has been idle // (no input or output). If the queue has been idle since the last check, // then write all buffered CIDs to the datastore.