Skip to content

Commit aca2ec7

Browse files
committed
feedback
1 parent 6c53bc4 commit aca2ec7

2 files changed

Lines changed: 58 additions & 35 deletions

File tree

pkg/store/cached_store.go

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package store
33
import (
44
"context"
55
"sync"
6+
"time"
67

78
lru "github.com/hashicorp/golang-lru/v2"
89
"github.com/rs/zerolog"
@@ -18,6 +19,11 @@ const (
1819
DefaultBlockDataCacheSize = 200_000
1920

2021
asyncWriteBufferSize = 8192
22+
23+
// batchWindow is the time the write goroutine waits after receiving the first
24+
// op before flushing. This allows bursts of metadata writes (e.g. 3-4 per
25+
// height in the submitter) to be coalesced into a single Badger WriteBatch.
26+
batchWindow = 100 * time.Microsecond
2127
)
2228

2329
type asyncWriteOp struct {
@@ -113,13 +119,19 @@ func (cs *CachedStore) startWriteLoop() {
113119
defer close(cs.done)
114120
for op := range cs.writeCh {
115121
ops := []asyncWriteOp{op}
116-
drain:
122+
123+
timer := time.NewTimer(batchWindow)
124+
collect:
117125
for {
118126
select {
119-
case op := <-cs.writeCh:
127+
case op, ok := <-cs.writeCh:
128+
if !ok {
129+
timer.Stop()
130+
break collect
131+
}
120132
ops = append(ops, op)
121-
default:
122-
break drain
133+
case <-timer.C:
134+
break collect
123135
}
124136
}
125137

@@ -234,40 +246,33 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error {
234246
}
235247

236248
// SetMetadata queues an asynchronous metadata write. The write is persisted
237-
// by the background goroutine. If the buffer is full or the store has been
238-
// stopped, the write falls back to synchronous execution on the underlying store.
249+
// by the background goroutine via BatchMetadata. If the store has been stopped,
250+
// the write falls back to synchronous execution on the underlying store.
239251
func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte) error {
240252
cs.stopMu.RLock()
241253
if cs.stopped {
242254
cs.stopMu.RUnlock()
243255
return cs.Store.SetMetadata(ctx, key, value)
244256
}
245-
select {
246-
case cs.writeCh <- asyncWriteOp{key: key, value: value}:
247-
cs.stopMu.RUnlock()
248-
return nil
249-
default:
250-
cs.stopMu.RUnlock()
251-
return cs.Store.SetMetadata(ctx, key, value)
252-
}
257+
cs.stopMu.RUnlock()
258+
259+
valueCopy := append([]byte(nil), value...)
260+
cs.writeCh <- asyncWriteOp{key: key, value: valueCopy}
261+
return nil
253262
}
254263

255-
// DeleteMetadata queues an asynchronous metadata delete. If the buffer is full
256-
// or the store has been stopped, the delete falls back to synchronous execution.
264+
// DeleteMetadata queues an asynchronous metadata delete. If the store has been
265+
// stopped, the delete falls back to synchronous execution.
257266
func (cs *CachedStore) DeleteMetadata(ctx context.Context, key string) error {
258267
cs.stopMu.RLock()
259268
if cs.stopped {
260269
cs.stopMu.RUnlock()
261270
return cs.Store.DeleteMetadata(ctx, key)
262271
}
263-
select {
264-
case cs.writeCh <- asyncWriteOp{key: key, isDelete: true}:
265-
cs.stopMu.RUnlock()
266-
return nil
267-
default:
268-
cs.stopMu.RUnlock()
269-
return cs.Store.DeleteMetadata(ctx, key)
270-
}
272+
cs.stopMu.RUnlock()
273+
274+
cs.writeCh <- asyncWriteOp{key: key, isDelete: true}
275+
return nil
271276
}
272277

273278
// Close drains pending async writes, then closes the underlying store.

pkg/store/cached_store_test.go

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package store
22

33
import (
44
"context"
5+
"fmt"
56
"testing"
67
"time"
78

9+
"github.com/rs/zerolog"
810
"github.com/stretchr/testify/assert"
911
"github.com/stretchr/testify/require"
1012

@@ -315,27 +317,43 @@ func TestCachedStore_AsyncDeleteMetadata(t *testing.T) {
315317
}
316318

317319
func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) {
320+
ctx := context.Background()
321+
318322
kv, err := NewTestInMemoryKVStore()
319323
require.NoError(t, err)
320324

321325
base := New(kv)
322-
cs, err := NewCachedStore(base)
323-
require.NoError(t, err)
324326

325-
ctx := context.Background()
327+
writeCh := make(chan asyncWriteOp, asyncWriteBufferSize)
328+
done := make(chan struct{})
329+
330+
cs := &CachedStore{
331+
Store: base,
332+
writeCh: writeCh,
333+
done: done,
334+
logger: zerolog.Nop(),
335+
}
336+
337+
cs.startWriteLoop()
338+
326339
const n = 100
327340
for i := 0; i < n; i++ {
328-
k := []byte{byte(i)}
329-
require.NoError(t, cs.SetMetadata(ctx, string(k), k))
341+
k := fmt.Sprintf("key-%d", i)
342+
require.NoError(t, cs.SetMetadata(ctx, k, []byte(k)))
330343
}
331344

332-
// Wait for the last key to land via pass-through read
333-
require.Eventually(t, func() bool {
334-
v, err := cs.GetMetadata(ctx, string([]byte{byte(n - 1)}))
335-
return err == nil && len(v) == 1 && v[0] == byte(n-1)
336-
}, 2*time.Second, 10*time.Millisecond)
345+
cs.stopMu.Lock()
346+
cs.stopped = true
347+
close(writeCh)
348+
cs.stopMu.Unlock()
349+
<-done
337350

338-
require.NoError(t, cs.Close())
351+
for i := 0; i < n; i++ {
352+
k := fmt.Sprintf("key-%d", i)
353+
v, err := base.GetMetadata(ctx, k)
354+
require.NoError(t, err)
355+
require.Equal(t, []byte(k), v)
356+
}
339357
}
340358

341359
func TestCachedStore_WriteAfterClose_FallsBack(t *testing.T) {

0 commit comments

Comments
 (0)