diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index b1fe4916..df02a474 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -176,6 +176,7 @@ func (db *Database) Close() error { } func (db *Database) SetLatestVersion(version int64) error { + fmt.Printf("[%d] SSDEBUG - SetLatestVersion version %d\n", time.Now().UnixNano(), version) var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts) @@ -183,6 +184,7 @@ func (db *Database) SetLatestVersion(version int64) error { } func (db *Database) GetLatestVersion() (int64, error) { + fmt.Printf("[%d] SSDEBUG - GetLatestVersion\n", time.Now().UnixNano()) bz, closer, err := db.storage.Get([]byte(latestVersionKey)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { @@ -201,6 +203,7 @@ func (db *Database) GetLatestVersion() (int64, error) { } func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { + fmt.Printf("[%d] SSDEBUG - SetEarliestVersion version %d ignoreVersion %t\n", time.Now().UnixNano(), version, ignoreVersion) if version > db.earliestVersion || ignoreVersion { db.earliestVersion = version @@ -212,10 +215,12 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error } func (db *Database) GetEarliestVersion() (int64, error) { + fmt.Printf("[%d] SSDEBUG - GetEarliestVersion\n", time.Now().UnixNano()) return db.earliestVersion, nil } func (db *Database) SetLastRangeHashed(latestHashed int64) error { + fmt.Printf("[%d] SSDEBUG - SetLastRangeHashed latestHashed %d\n", time.Now().UnixNano(), latestHashed) var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(latestHashed)) @@ -229,6 +234,7 @@ func (db *Database) SetLastRangeHashed(latestHashed int64) error { // GetLastRangeHashed returns the highest block that has been fully hashed in ranges. func (db *Database) GetLastRangeHashed() (int64, error) { + fmt.Printf("[%d] SSDEBUG - GetLastRangeHashed\n", time.Now().UnixNano()) // Return the cached value db.lastRangeHashedMu.RLock() cachedValue := db.lastRangeHashedCache @@ -258,11 +264,13 @@ func retrieveEarliestVersion(db *pebble.DB) (int64, error) { // SetLatestKey sets the latest key processed during migration. func (db *Database) SetLatestMigratedKey(key []byte) error { + fmt.Printf("[%d] SSDEBUG - SetLatestMigratedKey key %s (length %d)\n", time.Now().UnixNano(), string(key), len(key)) return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts) } // GetLatestKey retrieves the latest key processed during migration. func (db *Database) GetLatestMigratedKey() ([]byte, error) { + fmt.Printf("[%d] SSDEBUG - GetLatestMigratedKey\n", time.Now().UnixNano()) bz, closer, err := db.storage.Get([]byte(latestMigratedKeyMetadata)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { @@ -276,11 +284,13 @@ func (db *Database) GetLatestMigratedKey() ([]byte, error) { // SetLatestModule sets the latest module processed during migration. func (db *Database) SetLatestMigratedModule(module string) error { + fmt.Printf("[%d] SSDEBUG - SetLatestMigratedModule module %s\n", time.Now().UnixNano(), module) return db.storage.Set([]byte(latestMigratedModuleMetadata), []byte(module), defaultWriteOpts) } // GetLatestModule retrieves the latest module processed during migration. func (db *Database) GetLatestMigratedModule() (string, error) { + fmt.Printf("[%d] SSDEBUG - GetLatestMigratedModule\n", time.Now().UnixNano()) bz, closer, err := db.storage.Get([]byte(latestMigratedModuleMetadata)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { @@ -293,6 +303,7 @@ func (db *Database) GetLatestMigratedModule() (string, error) { } func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) { + fmt.Printf("[%d] SSDEBUG - Has storeKey %s version %d key %s (length %d)\n", time.Now().UnixNano(), storeKey, version, string(key), len(key)) if version < db.earliestVersion { return false, nil } @@ -306,6 +317,7 @@ func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error } func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byte, error) { + fmt.Printf("[%d] SSDEBUG - Get storeKey %s targetVersion %d key %s (length %d)\n", time.Now().UnixNano(), storeKey, targetVersion, string(key), len(key)) if targetVersion < db.earliestVersion { return nil, nil } @@ -346,6 +358,7 @@ func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byt } func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) error { + fmt.Printf("[%d] SSDEBUG - ApplyChangeset version %d store %s pairs length %d\n", time.Now().UnixNano(), version, cs.Name, len(cs.Changeset.Pairs)) // Check if version is 0 and change it to 1 // We do this specifically since keys written as part of genesis state come in as version 0 // But pebbledb treats version 0 as special, so apply the changeset at version 1 instead @@ -377,6 +390,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro } func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { + fmt.Printf("[%d] SSDEBUG - ApplyChangesetAsync version %d changesets length %d\n", time.Now().UnixNano(), version, len(changesets)) // Write to WAL first if db.streamHandler != nil { entry := proto.ChangelogEntry{ @@ -413,6 +427,7 @@ func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.Named } func (db *Database) computeMissingRanges(latestVersion int64) error { + fmt.Printf("[%d] SSDEBUG - computeMissingRanges latestVersion %d\n", time.Now().UnixNano(), latestVersion) lastHashed, err := db.GetLastRangeHashed() if err != nil { return fmt.Errorf("failed to get last hashed range: %w", err) @@ -446,6 +461,7 @@ func (db *Database) computeMissingRanges(latestVersion int64) error { } func (db *Database) computeHashForRange(beginBlock, endBlock int64) error { + fmt.Printf("[%d] SSDEBUG - computeHashForRange beginBlock %d endBlock %d\n", time.Now().UnixNano(), beginBlock, endBlock) chunkSize := endBlock - beginBlock + 1 if chunkSize <= 0 { // Nothing to do @@ -534,6 +550,7 @@ func (db *Database) writeAsyncInBackground() { // it has been updated. This occurs when that module's keys are updated in between pruning runs, the node after is restarted. // This is not a large issue given the next time that module is updated, it will be properly pruned thereafter. func (db *Database) Prune(version int64) error { + fmt.Printf("[%d] SSDEBUG - Prune version %d\n", time.Now().UnixNano(), version) earliestVersion := version + 1 // we increment by 1 to include the provided version itr, err := db.storage.NewIter(nil) @@ -639,6 +656,14 @@ func (db *Database) Prune(version int64) error { } func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + var startStr, endStr string + if start != nil { + startStr = string(start) + } + if end != nil { + endStr = string(end) + } + fmt.Printf("[%d] SSDEBUG - Iterator storeKey %s version %d start %s (length %d) end %s (length %d)\n", time.Now().UnixNano(), storeKey, version, startStr, len(start), endStr, len(end)) if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { return nil, errorutils.ErrKeyEmpty } @@ -677,6 +702,14 @@ func prefixEnd(b []byte) []byte { } func (db *Database) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + var startStr, endStr string + if start != nil { + startStr = string(start) + } + if end != nil { + endStr = string(end) + } + fmt.Printf("[%d] SSDEBUG - ReverseIterator storeKey %s version %d start %s (length %d) end %s (length %d)\n", time.Now().UnixNano(), storeKey, version, startStr, len(start), endStr, len(end)) if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { return nil, errorutils.ErrKeyEmpty } @@ -705,6 +738,7 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [ // Import loads the initial version of the state in parallel with numWorkers goroutines // TODO: Potentially add retries instead of panics func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error { + fmt.Printf("[%d] SSDEBUG - Import version %d\n", time.Now().UnixNano(), version) var wg sync.WaitGroup worker := func() { @@ -752,6 +786,7 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error { } func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { + fmt.Printf("[%d] SSDEBUG - RawImport\n", time.Now().UnixNano()) var wg sync.WaitGroup worker := func() { @@ -834,6 +869,7 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { // RawIterate iterates over all keys and values for a store func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte, version int64) bool) (bool, error) { + fmt.Printf("[%d] SSDEBUG - RawIterate storeKey %s\n", time.Now().UnixNano(), storeKey) // Iterate through all keys and values for a store lowerBound := MVCCEncode(prependStoreKey(storeKey, nil), 0) prefix := storePrefix(storeKey) @@ -892,6 +928,7 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte } func (db *Database) DeleteKeysAtVersion(module string, version int64) error { + fmt.Printf("[%d] SSDEBUG - DeleteKeysAtVersion module %s version %d\n", time.Now().UnixNano(), module, version) batch, err := NewBatch(db.storage, version) if err != nil { @@ -1024,6 +1061,7 @@ func valTombstoned(value []byte) bool { // WriteBlockRangeHash writes a hash for a range of blocks to the database func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error { + fmt.Printf("[%d] SSDEBUG - WriteBlockRangeHash storeKey %s beginBlockRange %d endBlockRange %d hash %s (length %d)\n", time.Now().UnixNano(), storeKey, beginBlockRange, endBlockRange, string(hash), len(hash)) key := []byte(fmt.Sprintf(HashTpl, storeKey, beginBlockRange, endBlockRange)) err := db.storage.Set(key, hash, defaultWriteOpts) if err != nil { diff --git a/ss/pebbledb/iterator.go b/ss/pebbledb/iterator.go index 36ca8f53..05a80d17 100644 --- a/ss/pebbledb/iterator.go +++ b/ss/pebbledb/iterator.go @@ -3,6 +3,7 @@ package pebbledb import ( "bytes" "fmt" + "time" "github.com/cockroachdb/pebble" "github.com/sei-protocol/sei-db/ss/types" @@ -29,6 +30,14 @@ type iterator struct { } func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte, version int64, earliestVersion int64, reverse bool) *iterator { + var startStr, endStr string + if mvccStart != nil { + startStr = string(mvccStart) + } + if mvccEnd != nil { + endStr = string(mvccEnd) + } + fmt.Printf("[%d] SSDEBUG - newPebbleDBIterator prefix %s (length %d) mvccStart %s (length %d) mvccEnd %s (length %d) version %d earliestVersion %d reverse %t\n", time.Now().UnixNano(), string(prefix), len(prefix), startStr, len(mvccStart), endStr, len(mvccEnd), version, earliestVersion, reverse) // Return invalid iterator if requested iterator height is lower than earliest version after pruning if version < earliestVersion { return &iterator{ @@ -100,10 +109,12 @@ func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte // Domain returns the domain of the iterator. The caller must not modify the // return values. func (itr *iterator) Domain() ([]byte, []byte) { + fmt.Printf("[%d] SSDEBUG - iterator.Domain\n", time.Now().UnixNano()) return itr.start, itr.end } func (itr *iterator) Key() []byte { + fmt.Printf("[%d] SSDEBUG - iterator.Key\n", time.Now().UnixNano()) itr.assertIsValid() key, _, ok := SplitMVCCKey(itr.source.Key()) @@ -114,10 +125,13 @@ func (itr *iterator) Key() []byte { } keyCopy := slices.Clone(key) - return keyCopy[len(itr.prefix):] + result := keyCopy[len(itr.prefix):] + fmt.Printf("[%d] SSDEBUG - iterator.Key returning key %s (length %d)\n", time.Now().UnixNano(), string(result), len(result)) + return result } func (itr *iterator) Value() []byte { + fmt.Printf("[%d] SSDEBUG - iterator.Value\n", time.Now().UnixNano()) itr.assertIsValid() val, _, ok := SplitMVCCKey(itr.source.Value()) @@ -127,10 +141,13 @@ func (itr *iterator) Value() []byte { panic(fmt.Sprintf("invalid PebbleDB MVCC value: %s", itr.source.Key())) } - return slices.Clone(val) + result := slices.Clone(val) + fmt.Printf("[%d] SSDEBUG - iterator.Value returning value %s (length %d)\n", time.Now().UnixNano(), string(result), len(result)) + return result } func (itr *iterator) nextForward() { + fmt.Printf("[%d] SSDEBUG - iterator.nextForward\n", time.Now().UnixNano()) if !itr.source.Valid() { itr.valid = false return @@ -220,6 +237,7 @@ func (itr *iterator) nextForward() { } func (itr *iterator) nextReverse() { + fmt.Printf("[%d] SSDEBUG - iterator.nextReverse\n", time.Now().UnixNano()) if !itr.source.Valid() { itr.valid = false return @@ -289,6 +307,7 @@ func (itr *iterator) nextReverse() { } func (itr *iterator) Next() { + fmt.Printf("[%d] SSDEBUG - iterator.Next reverse %t\n", time.Now().UnixNano(), itr.reverse) if itr.reverse { itr.nextReverse() } else { @@ -297,6 +316,7 @@ func (itr *iterator) Next() { } func (itr *iterator) Valid() bool { + fmt.Printf("[%d] SSDEBUG - iterator.Valid\n", time.Now().UnixNano()) // once invalid, forever invalid if !itr.valid || !itr.source.Valid() { itr.valid = false @@ -321,10 +341,12 @@ func (itr *iterator) Valid() bool { } func (itr *iterator) Error() error { + fmt.Printf("[%d] SSDEBUG - iterator.Error\n", time.Now().UnixNano()) return itr.source.Error() } func (itr *iterator) Close() error { + fmt.Printf("[%d] SSDEBUG - iterator.Close\n", time.Now().UnixNano()) _ = itr.source.Close() itr.source = nil itr.valid = false @@ -343,6 +365,7 @@ func (itr *iterator) assertIsValid() { // pair is tombstoned, the caller should call Next(). Note, this method assumes // the caller assures the iterator is valid first! func (itr *iterator) cursorTombstoned() bool { + fmt.Printf("[%d] SSDEBUG - iterator.cursorTombstoned\n", time.Now().UnixNano()) _, tombBz, ok := SplitMVCCKey(itr.source.Value()) if !ok { // XXX: This should not happen as that would indicate we have a malformed @@ -370,6 +393,7 @@ func (itr *iterator) cursorTombstoned() bool { } func (itr *iterator) DebugRawIterate() { + fmt.Printf("[%d] SSDEBUG - iterator.DebugRawIterate\n", time.Now().UnixNano()) valid := itr.source.Valid() if valid { // The first key may not represent the desired target version, so move the