Skip to content

Commit 38532d8

Browse files
committed
perf(pending): reduce WAL allocation churn and add safe raw replay for replication
Store WAL pointers by value, add ReplayRaw to avoid per-entry replay allocations, and update replication commit paths to copy replayed key/value bytes before handing them to Badger to prevent mmap-backed slice faults. Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
1 parent 51836fa commit 38532d8

9 files changed

Lines changed: 77 additions & 41 deletions

File tree

internal/badgerd/pending/memory.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,15 @@ func (m *mem) Replay(fn func(e *badger.Entry) error) error {
117117
return nil
118118
}
119119

120+
func (m *mem) ReplayRaw(fn func(key, value []byte, userMeta byte, expiresAt uint64) error) error {
121+
for _, e := range m.m {
122+
if err := fn(e.Key, e.Value, e.UserMeta, e.ExpiresAt); err != nil {
123+
return err
124+
}
125+
}
126+
return nil
127+
}
128+
120129
func (m *mem) Close() error {
121130
return nil
122131
}

internal/badgerd/pending/wal.go

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ func newWal(path string, tx *badger.Txn, m *mem, size int64) *wal {
8888
y.Check(err)
8989
}
9090
if m == nil {
91-
return &wal{f: f, m: make(map[uint64]*pointer), dirty: true}
91+
return &wal{f: f, m: make(map[uint64]pointer), dirty: true}
9292
}
93-
w := &wal{f: f, tx: tx, m: make(map[uint64]*pointer, len(m.m)), dirty: true}
93+
w := &wal{f: f, tx: tx, m: make(map[uint64]pointer, len(m.m)), dirty: true}
9494
for _, e := range m.m {
9595
y.Check(w.append(e))
9696
}
@@ -116,7 +116,7 @@ func walInitSize(max int64) int {
116116
type wal struct {
117117
tx *badger.Txn
118118
f *z.MmapFile
119-
m map[uint64]*pointer
119+
m map[uint64]pointer
120120
pos int64
121121
o sync.Once
122122

@@ -190,7 +190,7 @@ func (w *wal) append(e *badger.Entry) error {
190190
if err != nil {
191191
return err
192192
}
193-
p := &pointer{
193+
p := pointer{
194194
key: e.Key,
195195
offset: uint32(w.pos),
196196
len: l,
@@ -219,35 +219,47 @@ func (w *wal) read(key []byte, cpy bool) (*badger.Entry, error) {
219219
return w.readPtr(p, cpy)
220220
}
221221

222-
func (w *wal) readPtr(p *pointer, cpy bool) (*badger.Entry, error) {
222+
func (w *wal) readPtrRaw(p pointer) (k, v []byte, userMeta byte, expiresAt uint64) {
223223
buf := w.f.Slice(int(p.offset))
224-
var e walEntry
225-
e.h.decode(buf)
226-
e.k = buf[headerSize : headerSize+e.h.klen]
227-
e.v = buf[headerSize+e.h.klen : headerSize+e.h.klen+e.h.vlen]
224+
var h header
225+
h.decode(buf)
226+
k = buf[headerSize : headerSize+h.klen]
227+
v = buf[headerSize+h.klen : headerSize+h.klen+h.vlen]
228+
return k, v, h.userMeta, h.expiresAt
229+
}
230+
231+
func (w *wal) readPtr(p pointer, cpy bool) (*badger.Entry, error) {
232+
k, v, userMeta, expiresAt := w.readPtrRaw(p)
228233
if cpy {
229-
e.k = y.SafeCopy(nil, e.k)
230-
e.v = y.SafeCopy(nil, e.v)
234+
k = y.SafeCopy(nil, k)
235+
v = y.SafeCopy(nil, v)
231236
}
232237
return &badger.Entry{
233-
Key: e.k,
234-
Value: e.v,
235-
UserMeta: e.h.userMeta,
236-
ExpiresAt: e.h.expiresAt,
238+
Key: k,
239+
Value: v,
240+
UserMeta: userMeta,
241+
ExpiresAt: expiresAt,
237242
}, nil
238243
}
239244

240-
func (w *wal) Replay(fn func(e *badger.Entry) error) error {
241-
for _, v := range w.m {
242-
e, err := w.readPtr(v, true)
243-
if err != nil {
245+
func (w *wal) ReplayRaw(fn func(key, value []byte, userMeta byte, expiresAt uint64) error) error {
246+
for _, p := range w.m {
247+
k, v, userMeta, expiresAt := w.readPtrRaw(p)
248+
if err := fn(k, v, userMeta, expiresAt); err != nil {
244249
return err
245250
}
251+
}
252+
return nil
253+
}
254+
255+
func (w *wal) Replay(fn func(e *badger.Entry) error) error {
256+
return w.ReplayRaw(func(key, value []byte, userMeta byte, expiresAt uint64) error {
257+
e := &badger.Entry{Key: y.SafeCopy(nil, key), Value: y.SafeCopy(nil, value), UserMeta: userMeta, ExpiresAt: expiresAt}
246258
if err := fn(e); err != nil {
247259
return err
248260
}
249-
}
250-
return nil
261+
return nil
262+
})
251263
}
252264

253265
func (w *wal) newIterator(prefix []byte, readTs uint64, reversed bool) iterator {
@@ -287,7 +299,7 @@ func (w *wal) rebuildEntries() {
287299
type entry struct {
288300
h uint64
289301
key []byte
290-
p *pointer
302+
p pointer
291303
}
292304

293305
type walIterator struct {

internal/badgerd/pending/wal_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func TestWal(t *testing.T) {
6363
t.Run("Set", func(t *testing.T) {
6464
w.Set(e)
6565
assert.Len(t, w.m, 1)
66-
assert.Equal(t, &pointer{key: e.Key, len: s}, w.m[z.MemHash(e.Key)])
66+
assert.Equal(t, pointer{key: e.Key, len: s}, w.m[z.MemHash(e.Key)])
6767
require.NoError(t, w.Replay(func(e *badger.Entry) error {
6868
assert.Equal(t, entries[0].Key, e.Key)
6969
assert.Equal(t, entries[0].Value, e.Value)
@@ -74,7 +74,7 @@ func TestWal(t *testing.T) {
7474
t.Run("Delete", func(t *testing.T) {
7575
w.Delete(e.Key)
7676
// memfile.AllocateSlice adds 4 bytes, in which it stores the slice length
77-
assert.Equal(t, &pointer{key: e.Key, offset: s + 4, len: uint32(headerSize + len(e.Key)), deleted: true}, w.m[z.MemHash(e.Key)])
77+
assert.Equal(t, pointer{key: e.Key, offset: s + 4, len: uint32(headerSize + len(e.Key)), deleted: true}, w.m[z.MemHash(e.Key)])
7878
require.NoError(t, w.Replay(func(e *badger.Entry) error {
7979
assert.Equal(t, entries[0].Key, e.Key)
8080
assert.Equal(t, BitDelete, e.UserMeta)

internal/badgerd/pending/writes.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type Writes interface {
2323
Get(key []byte) (Item, error)
2424
Set(e *badger.Entry)
2525
Delete(key []byte)
26+
ReplayRaw(fn func(key, value []byte, userMeta byte, expiresAt uint64) error) error
2627
Replay(fn func(e *badger.Entry) error) error
2728
Close() error
2829
}
@@ -96,6 +97,10 @@ func (w *writes) Replay(fn func(e *badger.Entry) error) error {
9697
return w.c.Replay(fn)
9798
}
9899

100+
func (w *writes) ReplayRaw(fn func(key, value []byte, userMeta byte, expiresAt uint64) error) error {
101+
return w.c.ReplayRaw(fn)
102+
}
103+
99104
func (w *writes) Close() error {
100105
return w.c.Close()
101106
}

internal/badgerd/replication/gossip/service.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,13 @@ func (r *Gossip) Replicate(ss pb2.ReplicationService_ReplicateServer) error {
105105
defer r.txnMark.Done(cmsg.Commit.At)
106106
log.Infof("commit transaction at %d", cmsg.Commit.At)
107107
batch = r.db.NewWriteBatchAt(cmsg.Commit.At)
108-
y.Check(w.Replay(func(e *badger.Entry) error {
109-
if e.UserMeta != 0 {
110-
return batch.DeleteAt(e.Key, cmsg.Commit.At)
108+
y.Check(w.ReplayRaw(func(key, value []byte, userMeta byte, expiresAt uint64) error {
109+
k := y.SafeCopy(nil, key)
110+
if userMeta != 0 {
111+
return batch.DeleteAt(k, cmsg.Commit.At)
111112
}
112-
return batch.SetEntryAt(e, cmsg.Commit.At)
113+
v := y.SafeCopy(nil, value)
114+
return batch.SetEntryAt(&badger.Entry{Key: k, Value: v, ExpiresAt: expiresAt, UserMeta: userMeta}, cmsg.Commit.At)
113115
}))
114116
y.Check(batch.Flush())
115117
r.db.SetVersion(cmsg.Commit.At)

internal/badgerd/replication/gossip/tx.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"sync/atomic"
2525

2626
"github.com/dgraph-io/badger/v3"
27+
"github.com/dgraph-io/badger/v3/y"
2728
"go.linka.cloud/grpc-toolkit/logger"
2829
"go.uber.org/multierr"
2930
"google.golang.org/protobuf/encoding/protowire"
@@ -147,11 +148,13 @@ func (r *tx) Commit(ctx context.Context, at uint64) error {
147148
}
148149
b := r.db.NewWriteBatchAt(r.readTs)
149150
defer b.Cancel()
150-
if err := r.w.Replay(func(e *badger.Entry) error {
151-
if e.UserMeta != 0 {
152-
return b.DeleteAt(e.Key, at)
151+
if err := r.w.ReplayRaw(func(key, value []byte, userMeta byte, expiresAt uint64) error {
152+
k := y.SafeCopy(nil, key)
153+
if userMeta != 0 {
154+
return b.DeleteAt(k, at)
153155
}
154-
return b.SetEntryAt(e, at)
156+
v := y.SafeCopy(nil, value)
157+
return b.SetEntryAt(&badger.Entry{Key: k, Value: v, ExpiresAt: expiresAt, UserMeta: userMeta}, at)
155158
}); err != nil {
156159
return err
157160
}
@@ -277,7 +280,7 @@ func deleteOpWireSize(key []byte) int {
277280
func (r *tx) send(ctx context.Context, req *pb.ReplicateRequest) error {
278281
if r.mode == replication.ModeAsync {
279282
for _, v := range r.streams() {
280-
a := v.q.Send(req)
283+
a := v.q.Send(req.CloneVT())
281284
r.swg.Add(1)
282285
go func(v *stream, a async.Async[*pb.Ack]) {
283286
defer r.swg.Done()

internal/badgerd/replication/maybe.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919

2020
"github.com/dgraph-io/badger/v3"
21+
"github.com/dgraph-io/badger/v3/y"
2122

2223
"go.linka.cloud/protodb/internal/badgerd/pending"
2324
)
@@ -80,11 +81,13 @@ func (s *Maybe) CommitAt(ctx context.Context, at uint64) error {
8081
}
8182
b := s.DB.NewWriteBatchAt(s.readTs)
8283
defer b.Cancel()
83-
if err := s.w.Replay(func(e *badger.Entry) error {
84-
if e.UserMeta != 0 {
85-
return b.DeleteAt(e.Key, at)
84+
if err := s.w.ReplayRaw(func(key, value []byte, userMeta byte, expiresAt uint64) error {
85+
k := y.SafeCopy(nil, key)
86+
if userMeta != 0 {
87+
return b.DeleteAt(k, at)
8688
}
87-
return b.SetEntryAt(e, at)
89+
v := y.SafeCopy(nil, value)
90+
return b.SetEntryAt(&badger.Entry{Key: k, Value: v, ExpiresAt: expiresAt, UserMeta: userMeta}, at)
8891
}); err != nil {
8992
return err
9093
}

internal/db/db.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ func (db *db) Watch(ctx context.Context, m proto.Message, opts ...protodb.GetOpt
185185
}
186186

187187
o := makeGetOpts(opts...)
188+
matcher := pf.NewMatcher()
188189

189190
k, _, _, _ := protodb.DataPrefix(m)
190191
log := logger.C(ctx).WithFields("service", "protodb", "action", "watch", "key", string(k))
@@ -255,14 +256,14 @@ func (db *db) Watch(ctx context.Context, m proto.Message, opts ...protodb.GetOpt
255256
}
256257
var was bool
257258
if old != nil {
258-
was, err = pf.Match(old, o.Filter)
259+
was, err = matcher.Match(old, o.Filter)
259260
if err != nil {
260261
return err
261262
}
262263
}
263264
var is bool
264265
if new != nil {
265-
is, err = pf.Match(new, o.Filter)
266+
is, err = matcher.Match(new, o.Filter)
266267
if err != nil {
267268
return err
268269
}

internal/db/tx.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ func (tx *tx) get(ctx context.Context, m proto.Message, opts ...protodb.GetOptio
119119
return nil, nil, badger.ErrDBClosed
120120
}
121121
o := makeGetOpts(opts...)
122+
matcher := pf.NewMatcher()
122123
prefix, field, value, _ := protodb.DataPrefix(m)
123124
span.SetAttributes(
124125
attribute.String("prefix", string(prefix)),
@@ -149,7 +150,7 @@ func (tx *tx) get(ctx context.Context, m proto.Message, opts ...protodb.GetOptio
149150
return nil, nil, err
150151
}
151152
if o.Filter != nil {
152-
match, err := pf.Match(v, o.Filter)
153+
match, err := matcher.Match(v, o.Filter)
153154
if err != nil {
154155
return nil, nil, err
155156
}
@@ -345,7 +346,7 @@ func (tx *tx) get(ctx context.Context, m proto.Message, opts ...protodb.GetOptio
345346
return err
346347
}
347348
if o.Filter != nil {
348-
match, err = pf.Match(v, o.Filter)
349+
match, err = matcher.Match(v, o.Filter)
349350
if err != nil {
350351
return err
351352
}

0 commit comments

Comments
 (0)