From f7d1c68396761d963cc7cc4d84d3ecd14fa1d4c1 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Fri, 29 Aug 2025 10:50:08 -0400 Subject: [PATCH 1/3] Dense Logging --- ss/pebbledb/db.go | 24 ++++++++++++++++++++++++ ss/pebbledb/iterator.go | 12 ++++++++++++ 2 files changed, 36 insertions(+) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index b1fe4916..957885a3 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -176,6 +176,7 @@ func (db *Database) Close() error { } func (db *Database) SetLatestVersion(version int64) error { + fmt.Printf("SSDEBUG - SetLatestVersion version %d\n", version) var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts) @@ -183,6 +184,7 @@ func (db *Database) SetLatestVersion(version int64) error { } func (db *Database) GetLatestVersion() (int64, error) { + fmt.Printf("SSDEBUG - GetLatestVersion\n") bz, closer, err := db.storage.Get([]byte(latestVersionKey)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { @@ -201,6 +203,7 @@ func (db *Database) GetLatestVersion() (int64, error) { } func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { + fmt.Printf("SSDEBUG - SetEarliestVersion version %d ignoreVersion %t\n", version, ignoreVersion) if version > db.earliestVersion || ignoreVersion { db.earliestVersion = version @@ -212,10 +215,12 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error } func (db *Database) GetEarliestVersion() (int64, error) { + fmt.Printf("SSDEBUG - GetEarliestVersion\n") return db.earliestVersion, nil } func (db *Database) SetLastRangeHashed(latestHashed int64) error { + fmt.Printf("SSDEBUG - SetLastRangeHashed latestHashed %d\n", latestHashed) var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(latestHashed)) @@ -229,6 +234,7 @@ func (db *Database) SetLastRangeHashed(latestHashed int64) error { // GetLastRangeHashed returns the highest block that has been fully hashed in ranges. func (db *Database) GetLastRangeHashed() (int64, error) { + fmt.Printf("SSDEBUG - GetLastRangeHashed\n") // Return the cached value db.lastRangeHashedMu.RLock() cachedValue := db.lastRangeHashedCache @@ -258,11 +264,13 @@ func retrieveEarliestVersion(db *pebble.DB) (int64, error) { // SetLatestKey sets the latest key processed during migration. func (db *Database) SetLatestMigratedKey(key []byte) error { + fmt.Printf("SSDEBUG - SetLatestMigratedKey key length %d\n", len(key)) return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts) } // GetLatestKey retrieves the latest key processed during migration. func (db *Database) GetLatestMigratedKey() ([]byte, error) { + fmt.Printf("SSDEBUG - GetLatestMigratedKey\n") bz, closer, err := db.storage.Get([]byte(latestMigratedKeyMetadata)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { @@ -276,11 +284,13 @@ func (db *Database) GetLatestMigratedKey() ([]byte, error) { // SetLatestModule sets the latest module processed during migration. func (db *Database) SetLatestMigratedModule(module string) error { + fmt.Printf("SSDEBUG - SetLatestMigratedModule module %s\n", module) return db.storage.Set([]byte(latestMigratedModuleMetadata), []byte(module), defaultWriteOpts) } // GetLatestModule retrieves the latest module processed during migration. func (db *Database) GetLatestMigratedModule() (string, error) { + fmt.Printf("SSDEBUG - GetLatestMigratedModule\n") bz, closer, err := db.storage.Get([]byte(latestMigratedModuleMetadata)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { @@ -293,6 +303,7 @@ func (db *Database) GetLatestMigratedModule() (string, error) { } func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) { + fmt.Printf("SSDEBUG - Has storeKey %s version %d key length %d\n", storeKey, version, len(key)) if version < db.earliestVersion { return false, nil } @@ -306,6 +317,7 @@ func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error } func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byte, error) { + fmt.Printf("SSDEBUG - Get storeKey %s targetVersion %d key length %d\n", storeKey, targetVersion, len(key)) if targetVersion < db.earliestVersion { return nil, nil } @@ -346,6 +358,7 @@ func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byt } func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) error { + fmt.Printf("SSDEBUG - ApplyChangeset version %d store %s pairs length %d\n", version, cs.Name, len(cs.Changeset.Pairs)) // Check if version is 0 and change it to 1 // We do this specifically since keys written as part of genesis state come in as version 0 // But pebbledb treats version 0 as special, so apply the changeset at version 1 instead @@ -377,6 +390,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro } func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { + fmt.Printf("SSDEBUG - ApplyChangesetAsync version %d changesets length %d\n", version, len(changesets)) // Write to WAL first if db.streamHandler != nil { entry := proto.ChangelogEntry{ @@ -413,6 +427,7 @@ func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.Named } func (db *Database) computeMissingRanges(latestVersion int64) error { + fmt.Printf("SSDEBUG - computeMissingRanges latestVersion %d\n", latestVersion) lastHashed, err := db.GetLastRangeHashed() if err != nil { return fmt.Errorf("failed to get last hashed range: %w", err) @@ -446,6 +461,7 @@ func (db *Database) computeMissingRanges(latestVersion int64) error { } func (db *Database) computeHashForRange(beginBlock, endBlock int64) error { + fmt.Printf("SSDEBUG - computeHashForRange beginBlock %d endBlock %d\n", beginBlock, endBlock) chunkSize := endBlock - beginBlock + 1 if chunkSize <= 0 { // Nothing to do @@ -534,6 +550,7 @@ func (db *Database) writeAsyncInBackground() { // it has been updated. This occurs when that module's keys are updated in between pruning runs, the node after is restarted. // This is not a large issue given the next time that module is updated, it will be properly pruned thereafter. func (db *Database) Prune(version int64) error { + fmt.Printf("SSDEBUG - Prune version %d\n", version) earliestVersion := version + 1 // we increment by 1 to include the provided version itr, err := db.storage.NewIter(nil) @@ -639,6 +656,7 @@ func (db *Database) Prune(version int64) error { } func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + fmt.Printf("SSDEBUG - Iterator storeKey %s version %d start length %d end length %d\n", storeKey, version, len(start), len(end)) if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { return nil, errorutils.ErrKeyEmpty } @@ -677,6 +695,7 @@ func prefixEnd(b []byte) []byte { } func (db *Database) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + fmt.Printf("SSDEBUG - ReverseIterator storeKey %s version %d start length %d end length %d\n", storeKey, version, len(start), len(end)) if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { return nil, errorutils.ErrKeyEmpty } @@ -705,6 +724,7 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [ // Import loads the initial version of the state in parallel with numWorkers goroutines // TODO: Potentially add retries instead of panics func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error { + fmt.Printf("SSDEBUG - Import version %d\n", version) var wg sync.WaitGroup worker := func() { @@ -752,6 +772,7 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error { } func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { + fmt.Printf("SSDEBUG - RawImport\n") var wg sync.WaitGroup worker := func() { @@ -834,6 +855,7 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { // RawIterate iterates over all keys and values for a store func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte, version int64) bool) (bool, error) { + fmt.Printf("SSDEBUG - RawIterate storeKey %s\n", storeKey) // Iterate through all keys and values for a store lowerBound := MVCCEncode(prependStoreKey(storeKey, nil), 0) prefix := storePrefix(storeKey) @@ -892,6 +914,7 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte } func (db *Database) DeleteKeysAtVersion(module string, version int64) error { + fmt.Printf("SSDEBUG - DeleteKeysAtVersion module %s version %d\n", module, version) batch, err := NewBatch(db.storage, version) if err != nil { @@ -1024,6 +1047,7 @@ func valTombstoned(value []byte) bool { // WriteBlockRangeHash writes a hash for a range of blocks to the database func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error { + fmt.Printf("SSDEBUG - WriteBlockRangeHash storeKey %s beginBlockRange %d endBlockRange %d hash length %d\n", storeKey, beginBlockRange, endBlockRange, len(hash)) key := []byte(fmt.Sprintf(HashTpl, storeKey, beginBlockRange, endBlockRange)) err := db.storage.Set(key, hash, defaultWriteOpts) if err != nil { diff --git a/ss/pebbledb/iterator.go b/ss/pebbledb/iterator.go index 36ca8f53..a542fd9e 100644 --- a/ss/pebbledb/iterator.go +++ b/ss/pebbledb/iterator.go @@ -29,6 +29,7 @@ type iterator struct { } func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte, version int64, earliestVersion int64, reverse bool) *iterator { + fmt.Printf("SSDEBUG - newPebbleDBIterator prefix length %d mvccStart length %d mvccEnd length %d version %d earliestVersion %d reverse %t\n", len(prefix), len(mvccStart), len(mvccEnd), version, earliestVersion, reverse) // Return invalid iterator if requested iterator height is lower than earliest version after pruning if version < earliestVersion { return &iterator{ @@ -100,10 +101,12 @@ func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte // Domain returns the domain of the iterator. The caller must not modify the // return values. func (itr *iterator) Domain() ([]byte, []byte) { + fmt.Printf("SSDEBUG - iterator.Domain\n") return itr.start, itr.end } func (itr *iterator) Key() []byte { + fmt.Printf("SSDEBUG - iterator.Key\n") itr.assertIsValid() key, _, ok := SplitMVCCKey(itr.source.Key()) @@ -118,6 +121,7 @@ func (itr *iterator) Key() []byte { } func (itr *iterator) Value() []byte { + fmt.Printf("SSDEBUG - iterator.Value\n") itr.assertIsValid() val, _, ok := SplitMVCCKey(itr.source.Value()) @@ -131,6 +135,7 @@ func (itr *iterator) Value() []byte { } func (itr *iterator) nextForward() { + fmt.Printf("SSDEBUG - iterator.nextForward\n") if !itr.source.Valid() { itr.valid = false return @@ -220,6 +225,7 @@ func (itr *iterator) nextForward() { } func (itr *iterator) nextReverse() { + fmt.Printf("SSDEBUG - iterator.nextReverse\n") if !itr.source.Valid() { itr.valid = false return @@ -289,6 +295,7 @@ func (itr *iterator) nextReverse() { } func (itr *iterator) Next() { + fmt.Printf("SSDEBUG - iterator.Next reverse %t\n", itr.reverse) if itr.reverse { itr.nextReverse() } else { @@ -297,6 +304,7 @@ func (itr *iterator) Next() { } func (itr *iterator) Valid() bool { + fmt.Printf("SSDEBUG - iterator.Valid\n") // once invalid, forever invalid if !itr.valid || !itr.source.Valid() { itr.valid = false @@ -321,10 +329,12 @@ func (itr *iterator) Valid() bool { } func (itr *iterator) Error() error { + fmt.Printf("SSDEBUG - iterator.Error\n") return itr.source.Error() } func (itr *iterator) Close() error { + fmt.Printf("SSDEBUG - iterator.Close\n") _ = itr.source.Close() itr.source = nil itr.valid = false @@ -343,6 +353,7 @@ func (itr *iterator) assertIsValid() { // pair is tombstoned, the caller should call Next(). Note, this method assumes // the caller assures the iterator is valid first! func (itr *iterator) cursorTombstoned() bool { + fmt.Printf("SSDEBUG - iterator.cursorTombstoned\n") _, tombBz, ok := SplitMVCCKey(itr.source.Value()) if !ok { // XXX: This should not happen as that would indicate we have a malformed @@ -370,6 +381,7 @@ func (itr *iterator) cursorTombstoned() bool { } func (itr *iterator) DebugRawIterate() { + fmt.Printf("SSDEBUG - iterator.DebugRawIterate\n") valid := itr.source.Valid() if valid { // The first key may not represent the desired target version, so move the From 71197294d8c2de4fb1e00f3d9f8f0577b982dfc9 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Fri, 29 Aug 2025 10:55:29 -0400 Subject: [PATCH 2/3] String key --- ss/pebbledb/db.go | 26 ++++++++++++++++++++------ ss/pebbledb/iterator.go | 17 ++++++++++++++--- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 957885a3..1b2e638a 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -264,7 +264,7 @@ func retrieveEarliestVersion(db *pebble.DB) (int64, error) { // SetLatestKey sets the latest key processed during migration. func (db *Database) SetLatestMigratedKey(key []byte) error { - fmt.Printf("SSDEBUG - SetLatestMigratedKey key length %d\n", len(key)) + fmt.Printf("SSDEBUG - SetLatestMigratedKey key %s (length %d)\n", string(key), len(key)) return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts) } @@ -303,7 +303,7 @@ func (db *Database) GetLatestMigratedModule() (string, error) { } func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) { - fmt.Printf("SSDEBUG - Has storeKey %s version %d key length %d\n", storeKey, version, len(key)) + fmt.Printf("SSDEBUG - Has storeKey %s version %d key %s (length %d)\n", storeKey, version, string(key), len(key)) if version < db.earliestVersion { return false, nil } @@ -317,7 +317,7 @@ func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error } func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byte, error) { - fmt.Printf("SSDEBUG - Get storeKey %s targetVersion %d key length %d\n", storeKey, targetVersion, len(key)) + fmt.Printf("SSDEBUG - Get storeKey %s targetVersion %d key %s (length %d)\n", storeKey, targetVersion, string(key), len(key)) if targetVersion < db.earliestVersion { return nil, nil } @@ -656,7 +656,14 @@ func (db *Database) Prune(version int64) error { } func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { - fmt.Printf("SSDEBUG - Iterator storeKey %s version %d start length %d end length %d\n", storeKey, version, len(start), len(end)) + var startStr, endStr string + if start != nil { + startStr = string(start) + } + if end != nil { + endStr = string(end) + } + fmt.Printf("SSDEBUG - Iterator storeKey %s version %d start %s (length %d) end %s (length %d)\n", storeKey, version, startStr, len(start), endStr, len(end)) if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { return nil, errorutils.ErrKeyEmpty } @@ -695,7 +702,14 @@ func prefixEnd(b []byte) []byte { } func (db *Database) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { - fmt.Printf("SSDEBUG - ReverseIterator storeKey %s version %d start length %d end length %d\n", storeKey, version, len(start), len(end)) + var startStr, endStr string + if start != nil { + startStr = string(start) + } + if end != nil { + endStr = string(end) + } + fmt.Printf("SSDEBUG - ReverseIterator storeKey %s version %d start %s (length %d) end %s (length %d)\n", storeKey, version, startStr, len(start), endStr, len(end)) if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { return nil, errorutils.ErrKeyEmpty } @@ -1047,7 +1061,7 @@ func valTombstoned(value []byte) bool { // WriteBlockRangeHash writes a hash for a range of blocks to the database func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error { - fmt.Printf("SSDEBUG - WriteBlockRangeHash storeKey %s beginBlockRange %d endBlockRange %d hash length %d\n", storeKey, beginBlockRange, endBlockRange, len(hash)) + fmt.Printf("SSDEBUG - WriteBlockRangeHash storeKey %s beginBlockRange %d endBlockRange %d hash %s (length %d)\n", storeKey, beginBlockRange, endBlockRange, string(hash), len(hash)) key := []byte(fmt.Sprintf(HashTpl, storeKey, beginBlockRange, endBlockRange)) err := db.storage.Set(key, hash, defaultWriteOpts) if err != nil { diff --git a/ss/pebbledb/iterator.go b/ss/pebbledb/iterator.go index a542fd9e..34009932 100644 --- a/ss/pebbledb/iterator.go +++ b/ss/pebbledb/iterator.go @@ -29,7 +29,14 @@ type iterator struct { } func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte, version int64, earliestVersion int64, reverse bool) *iterator { - fmt.Printf("SSDEBUG - newPebbleDBIterator prefix length %d mvccStart length %d mvccEnd length %d version %d earliestVersion %d reverse %t\n", len(prefix), len(mvccStart), len(mvccEnd), version, earliestVersion, reverse) + var startStr, endStr string + if mvccStart != nil { + startStr = string(mvccStart) + } + if mvccEnd != nil { + endStr = string(mvccEnd) + } + fmt.Printf("SSDEBUG - newPebbleDBIterator prefix %s (length %d) mvccStart %s (length %d) mvccEnd %s (length %d) version %d earliestVersion %d reverse %t\n", string(prefix), len(prefix), startStr, len(mvccStart), endStr, len(mvccEnd), version, earliestVersion, reverse) // Return invalid iterator if requested iterator height is lower than earliest version after pruning if version < earliestVersion { return &iterator{ @@ -117,7 +124,9 @@ func (itr *iterator) Key() []byte { } keyCopy := slices.Clone(key) - return keyCopy[len(itr.prefix):] + result := keyCopy[len(itr.prefix):] + fmt.Printf("SSDEBUG - iterator.Key returning key %s (length %d)\n", string(result), len(result)) + return result } func (itr *iterator) Value() []byte { @@ -131,7 +140,9 @@ func (itr *iterator) Value() []byte { panic(fmt.Sprintf("invalid PebbleDB MVCC value: %s", itr.source.Key())) } - return slices.Clone(val) + result := slices.Clone(val) + fmt.Printf("SSDEBUG - iterator.Value returning value %s (length %d)\n", string(result), len(result)) + return result } func (itr *iterator) nextForward() { From 0850b72b2b09dda0b64fe0dd49c0cdfc00792b52 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Wed, 3 Sep 2025 11:37:09 -0400 Subject: [PATCH 3/3] nano seconds --- ss/pebbledb/db.go | 48 ++++++++++++++++++++--------------------- ss/pebbledb/iterator.go | 29 +++++++++++++------------ 2 files changed, 39 insertions(+), 38 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 1b2e638a..df02a474 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -176,7 +176,7 @@ func (db *Database) Close() error { } func (db *Database) SetLatestVersion(version int64) error { - fmt.Printf("SSDEBUG - SetLatestVersion version %d\n", version) + fmt.Printf("[%d] SSDEBUG - SetLatestVersion version %d\n", time.Now().UnixNano(), version) var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts) @@ -184,7 +184,7 @@ func (db *Database) SetLatestVersion(version int64) error { } func (db *Database) GetLatestVersion() (int64, error) { - fmt.Printf("SSDEBUG - GetLatestVersion\n") + fmt.Printf("[%d] SSDEBUG - GetLatestVersion\n", time.Now().UnixNano()) bz, closer, err := db.storage.Get([]byte(latestVersionKey)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { @@ -203,7 +203,7 @@ func (db *Database) GetLatestVersion() (int64, error) { } func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { - fmt.Printf("SSDEBUG - SetEarliestVersion version %d ignoreVersion %t\n", version, ignoreVersion) + fmt.Printf("[%d] SSDEBUG - SetEarliestVersion version %d ignoreVersion %t\n", time.Now().UnixNano(), version, ignoreVersion) if version > db.earliestVersion || ignoreVersion { db.earliestVersion = version @@ -215,12 +215,12 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error } func (db *Database) GetEarliestVersion() (int64, error) { - fmt.Printf("SSDEBUG - GetEarliestVersion\n") + fmt.Printf("[%d] SSDEBUG - GetEarliestVersion\n", time.Now().UnixNano()) return db.earliestVersion, nil } func (db *Database) SetLastRangeHashed(latestHashed int64) error { - fmt.Printf("SSDEBUG - SetLastRangeHashed latestHashed %d\n", latestHashed) + fmt.Printf("[%d] SSDEBUG - SetLastRangeHashed latestHashed %d\n", time.Now().UnixNano(), latestHashed) var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(latestHashed)) @@ -234,7 +234,7 @@ func (db *Database) SetLastRangeHashed(latestHashed int64) error { // GetLastRangeHashed returns the highest block that has been fully hashed in ranges. func (db *Database) GetLastRangeHashed() (int64, error) { - fmt.Printf("SSDEBUG - GetLastRangeHashed\n") + fmt.Printf("[%d] SSDEBUG - GetLastRangeHashed\n", time.Now().UnixNano()) // Return the cached value db.lastRangeHashedMu.RLock() cachedValue := db.lastRangeHashedCache @@ -264,13 +264,13 @@ func retrieveEarliestVersion(db *pebble.DB) (int64, error) { // SetLatestKey sets the latest key processed during migration. func (db *Database) SetLatestMigratedKey(key []byte) error { - fmt.Printf("SSDEBUG - SetLatestMigratedKey key %s (length %d)\n", string(key), len(key)) + fmt.Printf("[%d] SSDEBUG - SetLatestMigratedKey key %s (length %d)\n", time.Now().UnixNano(), string(key), len(key)) return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts) } // GetLatestKey retrieves the latest key processed during migration. func (db *Database) GetLatestMigratedKey() ([]byte, error) { - fmt.Printf("SSDEBUG - GetLatestMigratedKey\n") + fmt.Printf("[%d] SSDEBUG - GetLatestMigratedKey\n", time.Now().UnixNano()) bz, closer, err := db.storage.Get([]byte(latestMigratedKeyMetadata)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { @@ -284,13 +284,13 @@ func (db *Database) GetLatestMigratedKey() ([]byte, error) { // SetLatestModule sets the latest module processed during migration. func (db *Database) SetLatestMigratedModule(module string) error { - fmt.Printf("SSDEBUG - SetLatestMigratedModule module %s\n", module) + fmt.Printf("[%d] SSDEBUG - SetLatestMigratedModule module %s\n", time.Now().UnixNano(), module) return db.storage.Set([]byte(latestMigratedModuleMetadata), []byte(module), defaultWriteOpts) } // GetLatestModule retrieves the latest module processed during migration. func (db *Database) GetLatestMigratedModule() (string, error) { - fmt.Printf("SSDEBUG - GetLatestMigratedModule\n") + fmt.Printf("[%d] SSDEBUG - GetLatestMigratedModule\n", time.Now().UnixNano()) bz, closer, err := db.storage.Get([]byte(latestMigratedModuleMetadata)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { @@ -303,7 +303,7 @@ func (db *Database) GetLatestMigratedModule() (string, error) { } func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) { - fmt.Printf("SSDEBUG - Has storeKey %s version %d key %s (length %d)\n", storeKey, version, string(key), len(key)) + fmt.Printf("[%d] SSDEBUG - Has storeKey %s version %d key %s (length %d)\n", time.Now().UnixNano(), storeKey, version, string(key), len(key)) if version < db.earliestVersion { return false, nil } @@ -317,7 +317,7 @@ func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error } func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byte, error) { - fmt.Printf("SSDEBUG - Get storeKey %s targetVersion %d key %s (length %d)\n", storeKey, targetVersion, string(key), len(key)) + fmt.Printf("[%d] SSDEBUG - Get storeKey %s targetVersion %d key %s (length %d)\n", time.Now().UnixNano(), storeKey, targetVersion, string(key), len(key)) if targetVersion < db.earliestVersion { return nil, nil } @@ -358,7 +358,7 @@ func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byt } func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) error { - fmt.Printf("SSDEBUG - ApplyChangeset version %d store %s pairs length %d\n", version, cs.Name, len(cs.Changeset.Pairs)) + fmt.Printf("[%d] SSDEBUG - ApplyChangeset version %d store %s pairs length %d\n", time.Now().UnixNano(), version, cs.Name, len(cs.Changeset.Pairs)) // Check if version is 0 and change it to 1 // We do this specifically since keys written as part of genesis state come in as version 0 // But pebbledb treats version 0 as special, so apply the changeset at version 1 instead @@ -390,7 +390,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro } func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { - fmt.Printf("SSDEBUG - ApplyChangesetAsync version %d changesets length %d\n", version, len(changesets)) + fmt.Printf("[%d] SSDEBUG - ApplyChangesetAsync version %d changesets length %d\n", time.Now().UnixNano(), version, len(changesets)) // Write to WAL first if db.streamHandler != nil { entry := proto.ChangelogEntry{ @@ -427,7 +427,7 @@ func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.Named } func (db *Database) computeMissingRanges(latestVersion int64) error { - fmt.Printf("SSDEBUG - computeMissingRanges latestVersion %d\n", latestVersion) + fmt.Printf("[%d] SSDEBUG - computeMissingRanges latestVersion %d\n", time.Now().UnixNano(), latestVersion) lastHashed, err := db.GetLastRangeHashed() if err != nil { return fmt.Errorf("failed to get last hashed range: %w", err) @@ -461,7 +461,7 @@ func (db *Database) computeMissingRanges(latestVersion int64) error { } func (db *Database) computeHashForRange(beginBlock, endBlock int64) error { - fmt.Printf("SSDEBUG - computeHashForRange beginBlock %d endBlock %d\n", beginBlock, endBlock) + fmt.Printf("[%d] SSDEBUG - computeHashForRange beginBlock %d endBlock %d\n", time.Now().UnixNano(), beginBlock, endBlock) chunkSize := endBlock - beginBlock + 1 if chunkSize <= 0 { // Nothing to do @@ -550,7 +550,7 @@ func (db *Database) writeAsyncInBackground() { // it has been updated. This occurs when that module's keys are updated in between pruning runs, the node after is restarted. // This is not a large issue given the next time that module is updated, it will be properly pruned thereafter. func (db *Database) Prune(version int64) error { - fmt.Printf("SSDEBUG - Prune version %d\n", version) + fmt.Printf("[%d] SSDEBUG - Prune version %d\n", time.Now().UnixNano(), version) earliestVersion := version + 1 // we increment by 1 to include the provided version itr, err := db.storage.NewIter(nil) @@ -663,7 +663,7 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte) if end != nil { endStr = string(end) } - fmt.Printf("SSDEBUG - Iterator storeKey %s version %d start %s (length %d) end %s (length %d)\n", storeKey, version, startStr, len(start), endStr, len(end)) + fmt.Printf("[%d] SSDEBUG - Iterator storeKey %s version %d start %s (length %d) end %s (length %d)\n", time.Now().UnixNano(), storeKey, version, startStr, len(start), endStr, len(end)) if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { return nil, errorutils.ErrKeyEmpty } @@ -709,7 +709,7 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [ if end != nil { endStr = string(end) } - fmt.Printf("SSDEBUG - ReverseIterator storeKey %s version %d start %s (length %d) end %s (length %d)\n", storeKey, version, startStr, len(start), endStr, len(end)) + fmt.Printf("[%d] SSDEBUG - ReverseIterator storeKey %s version %d start %s (length %d) end %s (length %d)\n", time.Now().UnixNano(), storeKey, version, startStr, len(start), endStr, len(end)) if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { return nil, errorutils.ErrKeyEmpty } @@ -738,7 +738,7 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [ // Import loads the initial version of the state in parallel with numWorkers goroutines // TODO: Potentially add retries instead of panics func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error { - fmt.Printf("SSDEBUG - Import version %d\n", version) + fmt.Printf("[%d] SSDEBUG - Import version %d\n", time.Now().UnixNano(), version) var wg sync.WaitGroup worker := func() { @@ -786,7 +786,7 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error { } func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { - fmt.Printf("SSDEBUG - RawImport\n") + fmt.Printf("[%d] SSDEBUG - RawImport\n", time.Now().UnixNano()) var wg sync.WaitGroup worker := func() { @@ -869,7 +869,7 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { // RawIterate iterates over all keys and values for a store func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte, version int64) bool) (bool, error) { - fmt.Printf("SSDEBUG - RawIterate storeKey %s\n", storeKey) + fmt.Printf("[%d] SSDEBUG - RawIterate storeKey %s\n", time.Now().UnixNano(), storeKey) // Iterate through all keys and values for a store lowerBound := MVCCEncode(prependStoreKey(storeKey, nil), 0) prefix := storePrefix(storeKey) @@ -928,7 +928,7 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte } func (db *Database) DeleteKeysAtVersion(module string, version int64) error { - fmt.Printf("SSDEBUG - DeleteKeysAtVersion module %s version %d\n", module, version) + fmt.Printf("[%d] SSDEBUG - DeleteKeysAtVersion module %s version %d\n", time.Now().UnixNano(), module, version) batch, err := NewBatch(db.storage, version) if err != nil { @@ -1061,7 +1061,7 @@ func valTombstoned(value []byte) bool { // WriteBlockRangeHash writes a hash for a range of blocks to the database func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error { - fmt.Printf("SSDEBUG - WriteBlockRangeHash storeKey %s beginBlockRange %d endBlockRange %d hash %s (length %d)\n", storeKey, beginBlockRange, endBlockRange, string(hash), len(hash)) + fmt.Printf("[%d] SSDEBUG - WriteBlockRangeHash storeKey %s beginBlockRange %d endBlockRange %d hash %s (length %d)\n", time.Now().UnixNano(), storeKey, beginBlockRange, endBlockRange, string(hash), len(hash)) key := []byte(fmt.Sprintf(HashTpl, storeKey, beginBlockRange, endBlockRange)) err := db.storage.Set(key, hash, defaultWriteOpts) if err != nil { diff --git a/ss/pebbledb/iterator.go b/ss/pebbledb/iterator.go index 34009932..05a80d17 100644 --- a/ss/pebbledb/iterator.go +++ b/ss/pebbledb/iterator.go @@ -3,6 +3,7 @@ package pebbledb import ( "bytes" "fmt" + "time" "github.com/cockroachdb/pebble" "github.com/sei-protocol/sei-db/ss/types" @@ -36,7 +37,7 @@ func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte if mvccEnd != nil { endStr = string(mvccEnd) } - fmt.Printf("SSDEBUG - newPebbleDBIterator prefix %s (length %d) mvccStart %s (length %d) mvccEnd %s (length %d) version %d earliestVersion %d reverse %t\n", string(prefix), len(prefix), startStr, len(mvccStart), endStr, len(mvccEnd), version, earliestVersion, reverse) + fmt.Printf("[%d] SSDEBUG - newPebbleDBIterator prefix %s (length %d) mvccStart %s (length %d) mvccEnd %s (length %d) version %d earliestVersion %d reverse %t\n", time.Now().UnixNano(), string(prefix), len(prefix), startStr, len(mvccStart), endStr, len(mvccEnd), version, earliestVersion, reverse) // Return invalid iterator if requested iterator height is lower than earliest version after pruning if version < earliestVersion { return &iterator{ @@ -108,12 +109,12 @@ func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte // Domain returns the domain of the iterator. The caller must not modify the // return values. func (itr *iterator) Domain() ([]byte, []byte) { - fmt.Printf("SSDEBUG - iterator.Domain\n") + fmt.Printf("[%d] SSDEBUG - iterator.Domain\n", time.Now().UnixNano()) return itr.start, itr.end } func (itr *iterator) Key() []byte { - fmt.Printf("SSDEBUG - iterator.Key\n") + fmt.Printf("[%d] SSDEBUG - iterator.Key\n", time.Now().UnixNano()) itr.assertIsValid() key, _, ok := SplitMVCCKey(itr.source.Key()) @@ -125,12 +126,12 @@ func (itr *iterator) Key() []byte { keyCopy := slices.Clone(key) result := keyCopy[len(itr.prefix):] - fmt.Printf("SSDEBUG - iterator.Key returning key %s (length %d)\n", string(result), len(result)) + fmt.Printf("[%d] SSDEBUG - iterator.Key returning key %s (length %d)\n", time.Now().UnixNano(), string(result), len(result)) return result } func (itr *iterator) Value() []byte { - fmt.Printf("SSDEBUG - iterator.Value\n") + fmt.Printf("[%d] SSDEBUG - iterator.Value\n", time.Now().UnixNano()) itr.assertIsValid() val, _, ok := SplitMVCCKey(itr.source.Value()) @@ -141,12 +142,12 @@ func (itr *iterator) Value() []byte { } result := slices.Clone(val) - fmt.Printf("SSDEBUG - iterator.Value returning value %s (length %d)\n", string(result), len(result)) + fmt.Printf("[%d] SSDEBUG - iterator.Value returning value %s (length %d)\n", time.Now().UnixNano(), string(result), len(result)) return result } func (itr *iterator) nextForward() { - fmt.Printf("SSDEBUG - iterator.nextForward\n") + fmt.Printf("[%d] SSDEBUG - iterator.nextForward\n", time.Now().UnixNano()) if !itr.source.Valid() { itr.valid = false return @@ -236,7 +237,7 @@ func (itr *iterator) nextForward() { } func (itr *iterator) nextReverse() { - fmt.Printf("SSDEBUG - iterator.nextReverse\n") + fmt.Printf("[%d] SSDEBUG - iterator.nextReverse\n", time.Now().UnixNano()) if !itr.source.Valid() { itr.valid = false return @@ -306,7 +307,7 @@ func (itr *iterator) nextReverse() { } func (itr *iterator) Next() { - fmt.Printf("SSDEBUG - iterator.Next reverse %t\n", itr.reverse) + fmt.Printf("[%d] SSDEBUG - iterator.Next reverse %t\n", time.Now().UnixNano(), itr.reverse) if itr.reverse { itr.nextReverse() } else { @@ -315,7 +316,7 @@ func (itr *iterator) Next() { } func (itr *iterator) Valid() bool { - fmt.Printf("SSDEBUG - iterator.Valid\n") + fmt.Printf("[%d] SSDEBUG - iterator.Valid\n", time.Now().UnixNano()) // once invalid, forever invalid if !itr.valid || !itr.source.Valid() { itr.valid = false @@ -340,12 +341,12 @@ func (itr *iterator) Valid() bool { } func (itr *iterator) Error() error { - fmt.Printf("SSDEBUG - iterator.Error\n") + fmt.Printf("[%d] SSDEBUG - iterator.Error\n", time.Now().UnixNano()) return itr.source.Error() } func (itr *iterator) Close() error { - fmt.Printf("SSDEBUG - iterator.Close\n") + fmt.Printf("[%d] SSDEBUG - iterator.Close\n", time.Now().UnixNano()) _ = itr.source.Close() itr.source = nil itr.valid = false @@ -364,7 +365,7 @@ func (itr *iterator) assertIsValid() { // pair is tombstoned, the caller should call Next(). Note, this method assumes // the caller assures the iterator is valid first! func (itr *iterator) cursorTombstoned() bool { - fmt.Printf("SSDEBUG - iterator.cursorTombstoned\n") + fmt.Printf("[%d] SSDEBUG - iterator.cursorTombstoned\n", time.Now().UnixNano()) _, tombBz, ok := SplitMVCCKey(itr.source.Value()) if !ok { // XXX: This should not happen as that would indicate we have a malformed @@ -392,7 +393,7 @@ func (itr *iterator) cursorTombstoned() bool { } func (itr *iterator) DebugRawIterate() { - fmt.Printf("SSDEBUG - iterator.DebugRawIterate\n") + fmt.Printf("[%d] SSDEBUG - iterator.DebugRawIterate\n", time.Now().UnixNano()) valid := itr.source.Valid() if valid { // The first key may not represent the desired target version, so move the