Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ func example() {
...
}

// Iterate over all objects in reverse order
for obj := range myObjects.AllReverse() {
...
}

// Iterate with revision
for obj, revision := range myObjects.All() {
...
Expand All @@ -147,6 +152,9 @@ func example() {
// Iterate objects where ID is between 0x1000_0000 and 0x1fff_ffff
objs, watch = myObjects.PrefixWatch(txn, IDIndex.Query(0x1000_0000))
for obj := range objs { ... }

// Iterate objects where ID is between 0x1000_0000 and 0x1fff_ffff in reverse order
for obj := range myObjects.PrefixReverse(txn, IDIndex.Query(0x1000_0000)) { ... }
}
```

Expand Down
88 changes: 88 additions & 0 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"iter"
"log/slog"
"math/rand"
"net/netip"
"slices"
"testing"
"time"
Expand Down Expand Up @@ -461,6 +462,7 @@ func BenchmarkDB_Prefix_SecondaryIndex(b *testing.B) {
}

const numObjectsIteration = 100000
const numLPMObjectsIteration = 50000

func BenchmarkDB_FullIteration_All(b *testing.B) {
db, table := newTestDBWithMetrics(b, &NopMetrics{})
Expand All @@ -487,6 +489,32 @@ func BenchmarkDB_FullIteration_All(b *testing.B) {
b.ReportMetric(float64(numObjectsIteration*b.N)/b.Elapsed().Seconds(), "objects/sec")
}

func BenchmarkDB_FullIteration_AllReverse(b *testing.B) {
db, table := newTestDBWithMetrics(b, &NopMetrics{})
wtxn := db.WriteTxn(table)
for i := range numObjectsIteration {
_, _, err := table.Insert(wtxn, &testObject{ID: uint64(i)})
require.NoError(b, err)
}
wtxn.Commit()

for b.Loop() {
txn := db.ReadTxn()
i := uint64(0)
for obj := range table.AllReverse(txn) {
expected := uint64(numObjectsIteration - 1 - i)
if obj.ID != expected {
b.Fatalf("expected ID %d, got %d", expected, obj.ID)
}
i++
}
if numObjectsIteration != i {
b.Fatalf("expected to iterate %d objects, got %d", numObjectsIteration, i)
}
}
b.ReportMetric(float64(numObjectsIteration*b.N)/b.Elapsed().Seconds(), "objects/sec")
}

func BenchmarkDB_FullIteration_Prefix(b *testing.B) {
db, table := newTestDBWithMetrics(b, &NopMetrics{})
wtxn := db.WriteTxn(table)
Expand Down Expand Up @@ -514,6 +542,66 @@ func BenchmarkDB_FullIteration_Prefix(b *testing.B) {
b.ReportMetric(float64(numObjectsIteration*b.N)/b.Elapsed().Seconds(), "objects/sec")
}

func BenchmarkDB_FullIteration_PrefixReverse(b *testing.B) {
db, table := newTestDBWithMetrics(b, &NopMetrics{})
wtxn := db.WriteTxn(table)
for i := range numObjectsIteration {
_, _, err := table.Insert(wtxn, &testObject{ID: uint64(i)})
require.NoError(b, err)
}
wtxn.Commit()

query := Query[*testObject]{index: idIndex.indexName()}

for b.Loop() {
txn := db.ReadTxn()
i := uint64(0)
for obj := range table.PrefixReverse(txn, query) {
expected := uint64(numObjectsIteration - 1 - i)
if obj.ID != expected {
b.Fatalf("expected ID %d, got %d", expected, obj.ID)
}
i++
}
if numObjectsIteration != i {
b.Fatalf("expected to iterate %d objects, got %d", numObjectsIteration, i)
}
}
b.ReportMetric(float64(numObjectsIteration*b.N)/b.Elapsed().Seconds(), "objects/sec")
}

func BenchmarkDB_LPM_PrefixReverse(b *testing.B) {
db := New()
table := newLPMTestTable(db)

wtxn := db.WriteTxn(table)
for i := 0; i < numLPMObjectsIteration; i++ {
addr := netip.AddrFrom4([4]byte{10, byte(i >> 8), byte(i), 1})
prefix := netip.PrefixFrom(addr, 32)
_, _, err := table.Insert(wtxn, lpmTestObject{
ID: uint16(i),
Prefix: prefix,
PortPrefixLen: 16,
})
require.NoError(b, err)
}
txn := wtxn.Commit()

query := lpmPrefixIndex.QueryPrefix(netip.MustParsePrefix("10.0.0.0/8"))
b.ResetTimer()

for b.Loop() {
count := 0
for range table.PrefixReverse(txn, query) {
count++
}
if numLPMObjectsIteration != count {
b.Fatalf("expected to iterate %d objects, got %d", numLPMObjectsIteration, count)
}
}
b.ReportMetric(float64(numLPMObjectsIteration*b.N)/b.Elapsed().Seconds(), "objects/sec")
}

func BenchmarkDB_FullIteration_Get(b *testing.B) {
db, table := newTestDBWithMetrics(b, &NopMetrics{})
wtxn := db.WriteTxn(table)
Expand Down
73 changes: 73 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,31 @@ func TestDB_Prefix(t *testing.T) {
require.Equal(t, Collect(Map(iter, (*testObject).getID)), []uint64{71, 82, 99})
}

func TestDB_PrefixReverse(t *testing.T) {
t.Parallel()

db, table := newTestDBWithMetrics(t, &NopMetrics{}, tagsIndex)

{
txn := db.WriteTxn(table)
_, _, err := table.Insert(txn, &testObject{ID: 42, Tags: part.NewSet("a", "b")})
require.NoError(t, err, "Insert failed")
_, _, err = table.Insert(txn, &testObject{ID: 82, Tags: part.NewSet("abc")})
require.NoError(t, err, "Insert failed")
_, _, err = table.Insert(txn, &testObject{ID: 71, Tags: part.NewSet("ab")})
require.NoError(t, err, "Insert failed")
_, _, err = table.Insert(txn, &testObject{ID: 99, Tags: part.NewSet("abcd")})
require.NoError(t, err, "Insert failed")
txn.Commit()
}

txn := db.ReadTxn()
forward := Collect(Map(table.Prefix(txn, tagsIndex.Query("ab")), (*testObject).getID))
reverse := Collect(Map(table.PrefixReverse(txn, tagsIndex.Query("ab")), (*testObject).getID))
slices.Reverse(forward)
require.Equal(t, forward, reverse)
}

func TestDB_Changes(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -717,6 +742,29 @@ func TestDB_All(t *testing.T) {
}
}

func TestDB_AllReverse(t *testing.T) {
t.Parallel()

db, table, _ := newTestDB(t)

{
txn := db.WriteTxn(table)
_, _, err := table.Insert(txn, &testObject{ID: uint64(1)})
require.NoError(t, err, "Insert failed")
_, _, err = table.Insert(txn, &testObject{ID: uint64(2)})
require.NoError(t, err, "Insert failed")
_, _, err = table.Insert(txn, &testObject{ID: uint64(3)})
require.NoError(t, err, "Insert failed")
txn.Commit()
}

txn := db.ReadTxn()
forward := Collect(Map(table.All(txn), (*testObject).getID))
reverse := Collect(Map(table.AllReverse(txn), (*testObject).getID))
slices.Reverse(forward)
require.Equal(t, forward, reverse)
}

func TestDB_Modify(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -889,6 +937,31 @@ func TestDB_GetList(t *testing.T) {
}
}

func TestDB_ListReverse(t *testing.T) {
t.Parallel()

db, table, _ := newTestDB(t, tagsIndex)

{
txn := db.WriteTxn(table)
for i := 1; i <= 10; i++ {
tag := "odd"
if i%2 == 0 {
tag = "even"
}
_, _, err := table.Insert(txn, &testObject{ID: uint64(i), Tags: part.NewSet(tag)})
require.NoError(t, err)
}
txn.Commit()
}

txn := db.ReadTxn()
forward := Collect(Map(table.List(txn, tagsIndex.Query("odd")), (*testObject).getID))
reverse := Collect(Map(table.ListReverse(txn, tagsIndex.Query("odd")), (*testObject).getID))
slices.Reverse(forward)
require.Equal(t, forward, reverse)
}

func TestDB_CommitAbort(t *testing.T) {
t.Parallel()

Expand Down
91 changes: 91 additions & 0 deletions lpm/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ type Iterator[T any] struct {
stack []*lpmNode[T]
}

type reverseFrame[T any] struct {
node *lpmNode[T]
visited bool
}

type ReverseIterator[T any] struct {
start *lpmNode[T]
stack []reverseFrame[T]
}

func (it *Iterator[T]) All(yield func([]byte, T) bool) {
if it == nil {
return
Expand Down Expand Up @@ -74,3 +84,84 @@ func (it *Iterator[T]) Next() (key []byte, value T, ok bool) {
}
return
}

func (it *ReverseIterator[T]) All(yield func([]byte, T) bool) {
if it == nil {
return
}
var (
// Use a stack allocated array for holding the next nodes
// to explore. If this isn't large enough [append] will heap
// allocate.
stackArray [32]reverseFrame[T]

stack []reverseFrame[T]
)

if it.start != nil {
stack = stackArray[0:1:32]
stack[0] = reverseFrame[T]{node: it.start}
} else if len(it.stack) < cap(stackArray) {
stack = stackArray[:len(it.stack)]
copy(stack, it.stack)
} else {
stack = slices.Clone(it.stack)
}

for len(stack) > 0 {
frame := stack[len(stack)-1]
stack = stack[:len(stack)-1]
if frame.node == nil {
continue
}
if frame.visited {
if !frame.node.imaginary {
if !yield(frame.node.key, frame.node.value) {
return
}
}
continue
}

stack = append(stack, reverseFrame[T]{node: frame.node, visited: true})
if frame.node.children[0] != nil {
stack = append(stack, reverseFrame[T]{node: frame.node.children[0]})
}
if frame.node.children[1] != nil {
stack = append(stack, reverseFrame[T]{node: frame.node.children[1]})
}
}
}

func (it *ReverseIterator[T]) Next() (key []byte, value T, ok bool) {
if it == nil {
return
}
if it.start != nil {
it.stack = []reverseFrame[T]{{node: it.start}}
it.start = nil
}

for len(it.stack) > 0 {
frame := it.stack[len(it.stack)-1]
it.stack = it.stack[:len(it.stack)-1]
if frame.node == nil {
continue
}
if frame.visited {
if !frame.node.imaginary {
return frame.node.key, frame.node.value, true
}
continue
}

it.stack = append(it.stack, reverseFrame[T]{node: frame.node, visited: true})
if frame.node.children[0] != nil {
it.stack = append(it.stack, reverseFrame[T]{node: frame.node.children[0]})
}
if frame.node.children[1] != nil {
it.stack = append(it.stack, reverseFrame[T]{node: frame.node.children[1]})
}
}
return
}
Loading