Skip to content

Commit 2d3dfd7

Browse files
committed
De-duplicate batched writes by key in cached store
1 parent aca2ec7 commit 2d3dfd7

2 files changed

Lines changed: 46 additions & 5 deletions

File tree

pkg/store/cached_store.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (cs *CachedStore) startWriteLoop() {
121121
ops := []asyncWriteOp{op}
122122

123123
timer := time.NewTimer(batchWindow)
124-
collect:
124+
collect:
125125
for {
126126
select {
127127
case op, ok := <-cs.writeCh:
@@ -135,17 +135,22 @@ func (cs *CachedStore) startWriteLoop() {
135135
}
136136
}
137137

138+
last := make(map[string]asyncWriteOp, len(ops))
139+
for _, o := range ops {
140+
last[o.key] = o
141+
}
142+
138143
var puts []MetadataKV
139144
var deletes []string
140-
for _, o := range ops {
145+
for _, o := range last {
141146
if o.isDelete {
142147
deletes = append(deletes, o.key)
143148
} else {
144149
puts = append(puts, MetadataKV{Key: o.key, Value: o.value})
145150
}
146151
}
147152

148-
if err := cs.Store.BatchMetadata(context.Background(), puts, deletes); err != nil {
153+
if err := cs.BatchMetadata(context.Background(), puts, deletes); err != nil {
149154
for _, o := range ops {
150155
cs.logger.Error().Err(err).Str("key", o.key).
151156
Bool("delete", o.isDelete).

pkg/store/cached_store_test.go

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77
"time"
88

9+
ds "github.com/ipfs/go-datastore"
910
"github.com/rs/zerolog"
1011
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/require"
@@ -337,7 +338,7 @@ func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) {
337338
cs.startWriteLoop()
338339

339340
const n = 100
340-
for i := 0; i < n; i++ {
341+
for i := range n {
341342
k := fmt.Sprintf("key-%d", i)
342343
require.NoError(t, cs.SetMetadata(ctx, k, []byte(k)))
343344
}
@@ -348,7 +349,7 @@ func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) {
348349
cs.stopMu.Unlock()
349350
<-done
350351

351-
for i := 0; i < n; i++ {
352+
for i := range n {
352353
k := fmt.Sprintf("key-%d", i)
353354
v, err := base.GetMetadata(ctx, k)
354355
require.NoError(t, err)
@@ -372,3 +373,38 @@ func TestCachedStore_WriteAfterClose_FallsBack(t *testing.T) {
372373
err = cs.SetMetadata(ctx, "after", []byte("sync"))
373374
require.Error(t, err)
374375
}
376+
377+
func TestCachedStore_CoalescesSameKeyOps(t *testing.T) {
378+
ctx := context.Background()
379+
380+
kv, err := NewTestInMemoryKVStore()
381+
require.NoError(t, err)
382+
383+
require.NoError(t, kv.Put(ctx, ds.NewKey(GetMetaKey("k")), []byte("original")))
384+
385+
base := New(kv)
386+
387+
writeCh := make(chan asyncWriteOp, asyncWriteBufferSize)
388+
done := make(chan struct{})
389+
cs := &CachedStore{
390+
Store: base,
391+
writeCh: writeCh,
392+
done: done,
393+
logger: zerolog.Nop(),
394+
}
395+
cs.startWriteLoop()
396+
397+
require.NoError(t, cs.SetMetadata(ctx, "k", []byte("v1")))
398+
require.NoError(t, cs.DeleteMetadata(ctx, "k"))
399+
require.NoError(t, cs.SetMetadata(ctx, "k", []byte("v2")))
400+
401+
cs.stopMu.Lock()
402+
cs.stopped = true
403+
close(writeCh)
404+
cs.stopMu.Unlock()
405+
<-done
406+
407+
v, err := base.GetMetadata(ctx, "k")
408+
require.NoError(t, err)
409+
require.Equal(t, []byte("v2"), v, "last write (Set) should win over delete")
410+
}

0 commit comments

Comments
 (0)