Skip to content

Commit 69f86b3

Browse files
committed
perf(db,index): migrate fully to uid-first indexing and drop key-based path
Upgrade protofilters and switch index integration to UID transactions end-to-end. Remove unused key-based store/tx implementation, and route indexed tx.Get through UID find options (offset/limit/reverse) to reduce allocations and latency. Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
1 parent 9f1cecd commit 69f86b3

5 files changed

Lines changed: 149 additions & 160 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ require (
2828
go.linka.cloud/protoc-gen-defaults v0.4.0
2929
go.linka.cloud/protoc-gen-go-fields v0.4.0
3030
go.linka.cloud/protoc-gen-proxy v0.0.0-20230802234945-cc173b85cf13
31-
go.linka.cloud/protofilters v0.9.1-0.20260305145701-8dffea6caa89
31+
go.linka.cloud/protofilters v0.9.1-0.20260308143537-aeb5a53b4953
3232
go.linka.cloud/pubsub v0.0.0-20220728154114-8213058139f3
3333
go.opentelemetry.io/otel v1.36.0
3434
go.opentelemetry.io/otel/trace v1.36.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -900,6 +900,8 @@ go.linka.cloud/protofilters v0.9.1-0.20260226141520-e8496a545e49 h1:WgE+7/KFse9I
900900
go.linka.cloud/protofilters v0.9.1-0.20260226141520-e8496a545e49/go.mod h1:CTyTQrkSBCYR2Yn/mifQsP1c7KDmSxv/Kdh5DGgqWnc=
901901
go.linka.cloud/protofilters v0.9.1-0.20260305145701-8dffea6caa89 h1:gug/7sP8vr479qR/eCK1S6tMwsx3WqlG8zqIyBeEl/A=
902902
go.linka.cloud/protofilters v0.9.1-0.20260305145701-8dffea6caa89/go.mod h1:CTyTQrkSBCYR2Yn/mifQsP1c7KDmSxv/Kdh5DGgqWnc=
903+
go.linka.cloud/protofilters v0.9.1-0.20260308143537-aeb5a53b4953 h1:53Lr9sLzHHWGamDfz0//8K47Vy5fSCw/L2j5k5XFtmM=
904+
go.linka.cloud/protofilters v0.9.1-0.20260308143537-aeb5a53b4953/go.mod h1:CTyTQrkSBCYR2Yn/mifQsP1c7KDmSxv/Kdh5DGgqWnc=
903905
go.linka.cloud/pubsub v0.0.0-20220728154114-8213058139f3 h1:8qfogYXX5OrCrDa8CI9w6K1NnEYCz27n32bDveCoCvM=
904906
go.linka.cloud/pubsub v0.0.0-20220728154114-8213058139f3/go.mod h1:n1VFgi7RahshaOFJYKThBv/LwaMxKoBTfymxVWRJHJk=
905907
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=

internal/db/tx.go

Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"encoding/base64"
2222
"errors"
2323
"fmt"
24-
"sort"
2524
"strings"
2625
"sync"
2726
"time"
@@ -36,6 +35,7 @@ import (
3635
"google.golang.org/protobuf/reflect/protoreflect"
3736

3837
"go.linka.cloud/protodb/internal/badgerd"
38+
idxstore "go.linka.cloud/protodb/internal/index"
3939
"go.linka.cloud/protodb/internal/protodb"
4040
"go.linka.cloud/protodb/internal/token"
4141
)
@@ -219,56 +219,66 @@ func (tx *tx) get(ctx context.Context, m proto.Message, opts ...protodb.GetOptio
219219
}
220220
span.SetAttributes(attribute.Bool("use_index", useIndex))
221221
if useIndex {
222-
keys, _, err := idxr.FindKeys(ctx, tx, m.ProtoReflect().Descriptor().FullName(), o.Filter)
222+
windowLimit := o.Paging.GetLimit()
223+
if o.One && (windowLimit == 0 || windowLimit > 1) {
224+
windowLimit = 1
225+
}
226+
findOpts := idxstore.FindOpts{Reverse: o.Reverse}
227+
if !hasContinuationToken {
228+
findOpts.Offset = o.Paging.GetOffset()
229+
if windowLimit != 0 {
230+
findOpts.Limit = windowLimit + 1
231+
}
232+
}
233+
uids, err := idxr.FindUIDs(ctx, tx, m.ProtoReflect().Descriptor().FullName(), o.Filter, findOpts)
223234
if err != nil {
224235
return nil, nil, err
225236
}
226-
sort.Strings(keys)
227-
if o.Reverse {
228-
for i, j := 0, len(keys)-1; i < j; i, j = i+1, j-1 {
229-
keys[i], keys[j] = keys[j], keys[i]
230-
}
237+
hasNext := false
238+
if !hasContinuationToken && windowLimit != 0 && uint64(len(uids)) > windowLimit {
239+
hasNext = true
240+
uids = uids[:windowLimit]
231241
}
232242
var (
233-
hasNext = false
234-
matched = uint64(0)
235243
reads int
236244
readBytes int
237245
resultsBytes int
238246
)
239-
windowStart := uint64(0)
240-
windowLimit := o.Paging.GetLimit()
241-
if !hasContinuationToken {
242-
windowStart = o.Paging.GetOffset()
243-
}
244-
if o.One && (windowLimit == 0 || windowLimit > 1) {
245-
windowLimit = 1
246-
}
247-
for _, key := range keys {
247+
for _, uid := range uids {
248248
if err := ctx.Err(); err != nil {
249249
return nil, nil, err
250250
}
251-
fullKey := stringToBytesNoCopy(key)
252-
if !bytes.HasPrefix(fullKey, prefix) {
251+
uidItem, err := tx.txn.Get(ctx, protodb.UIDKey(uid))
252+
if err != nil {
253+
if errors.Is(err, badger.ErrKeyNotFound) {
254+
continue
255+
}
256+
return nil, nil, err
257+
}
258+
var fullKey string
259+
if err := uidItem.Value(func(val []byte) error {
260+
fullKey = string(val)
261+
return nil
262+
}); err != nil {
263+
return nil, nil, err
264+
}
265+
fullKeyBytes := stringToBytesNoCopy(fullKey)
266+
if !bytes.HasPrefix(fullKeyBytes, prefix) {
253267
continue
254268
}
255269
if hasContinuationToken {
256-
if !o.Reverse && bytes.Compare(fullKey, inToken.GetLastPrefix()) <= 0 {
270+
if !o.Reverse && bytes.Compare(fullKeyBytes, inToken.GetLastPrefix()) <= 0 {
257271
continue
258272
}
259-
if o.Reverse && bytes.Compare(fullKey, inToken.GetLastPrefix()) >= 0 {
273+
if o.Reverse && bytes.Compare(fullKeyBytes, inToken.GetLastPrefix()) >= 0 {
260274
continue
261275
}
276+
if windowLimit != 0 && uint64(len(out)) >= windowLimit {
277+
hasNext = true
278+
break
279+
}
262280
}
263-
matched++
264-
if matched <= windowStart {
265-
continue
266-
}
267-
if windowLimit != 0 && uint64(len(out)) >= windowLimit {
268-
hasNext = true
269-
break
270-
}
271-
item, err := tx.txn.Get(ctx, fullKey)
281+
item, err := tx.txn.Get(ctx, fullKeyBytes)
272282
if err != nil {
273283
if errors.Is(err, badger.ErrKeyNotFound) {
274284
continue
@@ -288,13 +298,13 @@ func (tx *tx) get(ctx context.Context, m proto.Message, opts ...protodb.GetOptio
288298
}); err != nil {
289299
return nil, nil, err
290300
}
291-
outToken.LastPrefix = append(outToken.LastPrefix[:0], fullKey...)
301+
outToken.LastPrefix = append(outToken.LastPrefix[:0], fullKeyBytes...)
292302
if o.FieldMask != nil {
293303
if err := FilterFieldMask(v, o.FieldMask); err != nil {
294304
return nil, nil, err
295305
}
296306
}
297-
tx.txn.AddReadKey(fullKey)
307+
tx.txn.AddReadKey(fullKeyBytes)
298308
out = append(out, v)
299309
resultsBytes += int(size)
300310
if o.One {

internal/index/indexer.go

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ type Indexer struct {
6161
entries map[protoreflect.FullName]entryCache
6262
}
6363

64+
type FindOpts struct {
65+
Offset uint64
66+
Limit uint64
67+
Reverse bool
68+
}
69+
6470
type entryCache struct {
6571
sig string
6672
entries []string
@@ -270,9 +276,43 @@ func (idx *Indexer) FindKeys(ctx context.Context, tx Tx, name protoreflect.FullN
270276
ctx, span := idxTracer.Start(ctx, "Indexer.FindKeys")
271277
defer span.End()
272278
span.SetAttributes(attribute.String("index.type", string(name)))
273-
ix := pfindex.New(newTxStore(tx.Txn(), idx.reg), idx.Selector)
274-
keys, collisions, err := ix.Find(ctx, name, f)
275-
return keys, collisions, err
279+
uids, err := idx.FindUIDs(ctx, tx, name, f, FindOpts{})
280+
if err != nil {
281+
return nil, nil, err
282+
}
283+
keys := make([]string, 0)
284+
for _, uid := range uids {
285+
item, err := tx.Txn().Get(ctx, protodb.UIDKey(uid))
286+
if err != nil {
287+
if errors.Is(err, badger.ErrKeyNotFound) {
288+
continue
289+
}
290+
return nil, nil, err
291+
}
292+
var key string
293+
if err := item.Value(func(val []byte) error {
294+
key = string(val)
295+
return nil
296+
}); err != nil {
297+
return nil, nil, err
298+
}
299+
if key != "" {
300+
keys = append(keys, key)
301+
}
302+
}
303+
return keys, nil, nil
304+
}
305+
306+
func (idx *Indexer) FindUIDs(ctx context.Context, tx Tx, name protoreflect.FullName, f protodb.Filter, opts FindOpts) ([]uint64, error) {
307+
ix := pfindex.NewUID(newUIDTxStore(tx.Txn(), idx.reg), idx.Selector)
308+
out := make([]uint64, 0)
309+
for uid, err := range ix.Find(ctx, name, f, pfindex.FindOptions{Offset: opts.Offset, Limit: opts.Limit, Reverse: opts.Reverse}) {
310+
if err != nil {
311+
return nil, err
312+
}
313+
out = append(out, uid)
314+
}
315+
return out, nil
276316
}
277317

278318
func (idx *Indexer) Insert(ctx context.Context, tx Tx, key []byte, m proto.Message) error {
@@ -281,8 +321,15 @@ func (idx *Indexer) Insert(ctx context.Context, tx Tx, key []byte, m proto.Messa
281321
if m != nil {
282322
span.SetAttributes(attribute.String("index.type", string(m.ProtoReflect().Descriptor().FullName())))
283323
}
284-
ix := pfindex.New(newTxStore(tx.Txn(), idx.reg), idx.Selector)
285-
return ix.Insert(ctx, string(key), m)
324+
uid, ok, err := tx.UID(ctx, key, false)
325+
if err != nil {
326+
return err
327+
}
328+
if !ok {
329+
return badger.ErrKeyNotFound
330+
}
331+
ix := pfindex.NewUID(newUIDTxStore(tx.Txn(), idx.reg), idx.Selector)
332+
return ix.Insert(ctx, uid, m)
286333
}
287334

288335
func (idx *Indexer) Update(ctx context.Context, tx Tx, key []byte, oldMsg, newMsg proto.Message) error {
@@ -295,25 +342,31 @@ func (idx *Indexer) Update(ctx context.Context, tx Tx, key []byte, oldMsg, newMs
295342
attribute.Bool("index.has_old", oldMsg != nil),
296343
attribute.Bool("index.has_new", newMsg != nil),
297344
)
298-
ix := pfindex.New(newTxStore(tx.Txn(), idx.reg), idx.Selector)
345+
uid, ok, err := tx.UID(ctx, key, false)
346+
if err != nil {
347+
return err
348+
}
349+
if !ok {
350+
return badger.ErrKeyNotFound
351+
}
352+
ix := pfindex.NewUID(newUIDTxStore(tx.Txn(), idx.reg), idx.Selector)
299353
if oldMsg != nil {
300-
txv, err := newTxStore(tx.Txn(), idx.reg).Tx(ctx)
354+
txv, err := newUIDTxStore(tx.Txn(), idx.reg).Tx(ctx)
301355
if err != nil {
302356
return err
303357
}
304-
if err := idx.removeValues(ctx, txv, key, oldMsg.ProtoReflect()); err != nil {
358+
if err := idx.removeValues(ctx, txv, uid, oldMsg.ProtoReflect()); err != nil {
305359
return err
306360
}
307361
}
308362
if newMsg == nil {
309363
return nil
310364
}
311-
return ix.Insert(ctx, string(key), newMsg)
365+
return ix.Insert(ctx, uid, newMsg)
312366
}
313367

314-
func (idx *Indexer) removeValues(ctx context.Context, tx pfindex.Tx, key []byte, msg protoreflect.Message, fds ...protoreflect.FieldDescriptor) error {
368+
func (idx *Indexer) removeValues(ctx context.Context, tx pfindex.UIDTx, uid uint64, msg protoreflect.Message, fds ...protoreflect.FieldDescriptor) error {
315369
fields := msg.Descriptor().Fields()
316-
keyStr := string(key)
317370
for i := 0; i < fields.Len(); i++ {
318371
fd := fields.Get(i)
319372
path := append(fds, fd)
@@ -326,7 +379,7 @@ func (idx *Indexer) removeValues(ctx context.Context, tx pfindex.Tx, key []byte,
326379
list := rval.List()
327380
for j := 0; j < list.Len(); j++ {
328381
child := list.Get(j).Message()
329-
if err := idx.removeValues(ctx, tx, key, child, path...); err != nil {
382+
if err := idx.removeValues(ctx, tx, uid, child, path...); err != nil {
330383
return err
331384
}
332385
}
@@ -341,7 +394,7 @@ func (idx *Indexer) removeValues(ctx context.Context, tx pfindex.Tx, key []byte,
341394
}
342395
list := rval.List()
343396
for j := 0; j < list.Len(); j++ {
344-
if err := tx.Remove(ctx, keyStr, fd, list.Get(j)); err != nil {
397+
if err := tx.RemoveUID(ctx, uid, fd, list.Get(j)); err != nil {
345398
return err
346399
}
347400
}
@@ -351,7 +404,7 @@ func (idx *Indexer) removeValues(ctx context.Context, tx pfindex.Tx, key []byte,
351404
if !rval.Message().IsValid() {
352405
continue
353406
}
354-
if err := idx.removeValues(ctx, tx, key, rval.Message(), path...); err != nil {
407+
if err := idx.removeValues(ctx, tx, uid, rval.Message(), path...); err != nil {
355408
return err
356409
}
357410
continue
@@ -366,7 +419,7 @@ func (idx *Indexer) removeValues(ctx context.Context, tx pfindex.Tx, key []byte,
366419
if !ok {
367420
continue
368421
}
369-
if err := tx.Remove(ctx, keyStr, fd, rval); err != nil {
422+
if err := tx.RemoveUID(ctx, uid, fd, rval); err != nil {
370423
return err
371424
}
372425
}
@@ -520,7 +573,7 @@ func (idx *Indexer) addIndexEntries(ctx context.Context, tx Tx, md protoreflect.
520573
_, ok := allowed[path]
521574
return ok, nil
522575
}
523-
partial := pfindex.New(newTxStore(tx.Txn(), idx.reg), selector)
576+
partial := pfindex.NewUID(newUIDTxStore(tx.Txn(), idx.reg), selector)
524577
dataPrefix := fmt.Appendf(nil, "%s/%s/", protodb.Data, md.FullName())
525578
dataIt := tx.Txn().Iterator(badger.IteratorOptions{Prefix: dataPrefix, PrefetchValues: false})
526579
defer dataIt.Close()
@@ -550,7 +603,7 @@ func (idx *Indexer) addIndexEntries(ctx context.Context, tx Tx, md protoreflect.
550603
return err
551604
}
552605
}
553-
if err := partial.Insert(ctx, string(key), msg); err != nil {
606+
if err := partial.Insert(ctx, uid, msg); err != nil {
554607
return err
555608
}
556609
}

0 commit comments

Comments
 (0)