Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,12 @@ func (db *DB) WriteTxn(tables ...TableMeta) WriteTxn {
txn.smus.Lock()
acquiredAt := time.Now()

txn.tableEntries = slices.Clone(*db.root.Load())
txn.oldRoot = db.root.Load()

// Clone the root. This new allocation will become the new root when
// we commit.
txn.tableEntries = slices.Clone(*txn.oldRoot)

txn.handle = db.handleName
txn.acquiredAt = acquiredAt

Expand Down
23 changes: 23 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,29 @@ func TestDB_Changes(t *testing.T) {

assert.EqualValues(t, 0, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
assert.EqualValues(t, 0, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount")

// Create another iterator and test observing changes using a WriteTxn
// that is mutating the table. This will observe the changes up to the
// point WriteTxn() was called, but not changes made in the WriteTxn.
wtxn = db.WriteTxn(table)
iter3, err := table.Changes(wtxn)
require.NoError(t, err, "failed to create ChangeIterator")
_, _, err = table.Insert(wtxn, &testObject{ID: 1})
require.NoError(t, err, "Insert failed")
wtxn.Commit()

wtxn = db.WriteTxn(table)
_, _, err = table.Insert(wtxn, &testObject{ID: 2})
require.NoError(t, err, "Insert failed")
changes, _ = iter3.Next(wtxn)
// We don't observe the insert of ID 2
count = 0
for change := range changes {
require.EqualValues(t, 1, change.Object.ID)
count++
}
require.Equal(t, 1, count)
wtxn.Abort()
}

func TestDB_Observable(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ type changeIterator[Obj any] struct {
}

func (it *changeIterator[Obj]) refresh(txn ReadTxn) {
tableEntry := txn.root()[it.table.tablePos()]
tableEntry := txn.committedRoot()[it.table.tablePos()]
if it.iter != nil && tableEntry.locked {
var obj Obj
panic(fmt.Sprintf("Table[%T].Changes().Next() called with the target table locked. This is not supported.", obj))
Expand Down
5 changes: 5 additions & 0 deletions read_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func (r *readTxn) root() dbRoot {
return dbRoot(*r)
}

// committedRoot implements ReadTxn.
func (r *readTxn) committedRoot() dbRoot {
return dbRoot(*r)
}

// WriteJSON marshals out the database as JSON into the given writer.
// If tables are given then only these tables are written.
func (r *readTxn) WriteJSON(w io.Writer, tables ...string) error {
Expand Down
17 changes: 16 additions & 1 deletion types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ type Table[Obj any] interface {
//
// If an object is created and deleted before the observer has iterated
// over the creation then only the deletion is seen.
//
// If [ChangeIterator.Next] is called with a [WriteTxn] targeting the
// table being observed then only the changes prior to that [WriteTxn]
// are observed.
Changes(WriteTxn) (ChangeIterator[Obj], error)
}

Expand Down Expand Up @@ -108,7 +112,9 @@ type ChangeIterator[Obj any] interface {
// The returned sequence is a single-use sequence and subsequent calls will return
// an empty sequence.
//
// Next will panic if called with a WriteTxn that has locked the target table.
// If Next is called with a [WriteTxn] targeting the table being observed then only
// the changes made prior to that [WriteTxn] are observed, e.g. we can only observe
// committed changes.
Next(ReadTxn) (iter.Seq2[Change[Obj], Revision], <-chan struct{})

// Close the change iterator. Once all change iterators for a given table are closed
Expand Down Expand Up @@ -254,8 +260,17 @@ type ReadTxn interface {
indexReadTxn(meta TableMeta, indexPos int) (tableIndexReader, error)
mustIndexReadTxn(meta TableMeta, indexPos int) tableIndexReader
getTableEntry(meta TableMeta) *tableEntry

// root returns the database root. If this is a WriteTxn it returns
// the current modified root.
root() dbRoot

// committedRoot returns the committed database root. If this is a
// WriteTxn it returns the root snapshotted at the time the WriteTxn
// was constructed and thus does not reflect any changes made in the
// transaction.
committedRoot() dbRoot

// WriteJSON writes the contents of the database as JSON.
WriteJSON(w io.Writer, tables ...string) error
}
Expand Down
6 changes: 6 additions & 0 deletions write_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type writeTxnState struct {
acquiredAt time.Time // the time at which the transaction acquired the locks
duration atomic.Uint64 // the transaction duration after it finished

oldRoot *dbRoot // snapshot of the root at the time WriteTxn was called
tableEntries []*tableEntry // table entries being modified
numTxns int // number of index transactions opened
smus internal.SortableMutexes // the (sorted) table locks
Expand All @@ -44,6 +45,10 @@ func (txn *writeTxnState) unwrap() *writeTxnState {
return txn
}

func (txn *writeTxnState) committedRoot() dbRoot {
return *txn.oldRoot
}

func (txn *writeTxnState) root() dbRoot {
return txn.tableEntries
}
Expand Down Expand Up @@ -286,6 +291,7 @@ func (txn *writeTxnState) delete(meta TableMeta, guardRevision Revision, data an
// and returns it to the pool.
func (handle *writeTxnHandle) returnToPool() {
txn := handle.writeTxnState
txn.oldRoot = nil
txn.tableEntries = nil
txn.numTxns = 0
clear(txn.smus)
Expand Down