From 46948371771091a9e00390689484cf3ca9ec55c9 Mon Sep 17 00:00:00 2001 From: superzordon Date: Thu, 8 Aug 2024 18:53:43 -0400 Subject: [PATCH 1/8] Update ancestral record routine for mempool syncing --- lib/db_utils.go | 54 +++++++++++++++++++++++++++----------- lib/event_manager.go | 10 +++++-- lib/server.go | 12 ++++----- lib/state_change_syncer.go | 51 ++++++++++++++++++++++++++++++++++- 4 files changed, 103 insertions(+), 24 deletions(-) diff --git a/lib/db_utils.go b/lib/db_utils.go index 5657e8404..13a042756 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -1107,12 +1107,23 @@ func DBSetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, value []byte, eve var ancestralValue []byte var getError error + isCoreState := isCoreStateKey(key) + // If snapshot was provided, we will need to load the current value of the record // so that we can later write it in the ancestral record. We first lookup cache. - if isState { - // We check if we've already read this key and stored it in the cache. - // Otherwise, we fetch the current value of this record from the DB. - ancestralValue, getError = DBGetWithTxn(txn, snap, key) + if isState || (isCoreState && eventManager != nil && eventManager.isMempoolManager) { + + // When we are syncing state from the mempool, we need to read the last committed view txn. + // This is because we will be querying the badger DB, and during the flush loop, every entry that is + // updated will first be deleted. In order to counteract this, we reference a badger transaction that was + // initiated before the flush loop started. + if eventManager.isMempoolManager && eventManager.lastCommittedViewTxn != nil { + ancestralValue, getError = DBGetWithTxn(eventManager.lastCommittedViewTxn, snap, key) + } else { + // We check if we've already read this key and stored it in the cache. + // Otherwise, we fetch the current value of this record from the DB. + ancestralValue, getError = DBGetWithTxn(txn, snap, key) + } // If there is some error with the DB read, other than non-existent key, we return. if getError != nil && getError != badger.ErrKeyNotFound { @@ -1120,6 +1131,7 @@ func DBSetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, value []byte, eve "from DB with key: %v", key) } } + // TODO: Do the same thing for deletes. // We update the DB record with the intended value. err := txn.Set(key, value) @@ -1183,6 +1195,8 @@ func DBGetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte) ([]byte, error) { } } + fmt.Printf("Key string: %v\n", string(key)) + // If record doesn't exist in cache, we get it from the DB. item, err := txn.Get(key) if err != nil { @@ -1203,15 +1217,25 @@ func DBDeleteWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, eventManager * var getError error isState := snap != nil && snap.isState(key) + isCoreState := isCoreStateKey(key) + // If snapshot was provided, we will need to load the current value of the record // so that we can later write it in the ancestral record. We first lookup cache. - if isState { - // We check if we've already read this key and stored it in the cache. - // Otherwise, we fetch the current value of this record from the DB. - ancestralValue, getError = DBGetWithTxn(txn, snap, key) - // If the key doesn't exist then there is no point in deleting this entry. - if getError == badger.ErrKeyNotFound { - return nil + if isState || (isCoreState && eventManager != nil && eventManager.isMempoolManager) { + // When we are syncing state from the mempool, we need to read the last committed view txn. + // This is because we will be querying the badger DB, and during the flush loop, every entry that is + // updated will first be deleted. In order to counteract this, we reference a badger transaction that was + // initiated before the flush loop started. + if eventManager.isMempoolManager && eventManager.lastCommittedViewTxn != nil { + ancestralValue, getError = DBGetWithTxn(eventManager.lastCommittedViewTxn, snap, key) + } else { + // We check if we've already read this key and stored it in the cache. + // Otherwise, we fetch the current value of this record from the DB. + ancestralValue, getError = DBGetWithTxn(txn, snap, key) + // If the key doesn't exist then there is no point in deleting this entry. + if getError == badger.ErrKeyNotFound { + return nil + } } // If there is some error with the DB read, other than non-existent key, we return. @@ -5305,8 +5329,8 @@ func InitDbWithDeSoGenesisBlock(params *DeSoParams, handle *badger.DB, blockHash, 0, // Height diffTarget, - BytesToBigint(ExpectedWorkForBlockHash(diffTarget)[:]), // CumWork - genesisBlock.Header, // Header + BytesToBigint(ExpectedWorkForBlockHash(diffTarget)[:]), // CumWork + genesisBlock.Header, // Header StatusHeaderValidated|StatusBlockProcessed|StatusBlockStored|StatusBlockValidated, // Status ) @@ -9489,7 +9513,7 @@ func DBGetPaginatedPostsOrderedByTime( postIndexKeys, _, err := DBGetPaginatedKeysAndValuesForPrefix( db, startPostPrefix, Prefixes.PrefixTstampNanosPostHash, /*validForPrefix*/ len(Prefixes.PrefixTstampNanosPostHash)+len(maxUint64Tstamp)+HashSizeBytes, /*keyLen*/ - numToFetch, reverse /*reverse*/, false /*fetchValues*/) + numToFetch, reverse /*reverse*/, false /*fetchValues*/) if err != nil { return nil, nil, nil, fmt.Errorf("DBGetPaginatedPostsOrderedByTime: %v", err) } @@ -9616,7 +9640,7 @@ func DBGetPaginatedProfilesByDeSoLocked( profileIndexKeys, _, err := DBGetPaginatedKeysAndValuesForPrefix( db, startProfilePrefix, Prefixes.PrefixCreatorDeSoLockedNanosCreatorPKID, /*validForPrefix*/ keyLen /*keyLen*/, numToFetch, - true /*reverse*/, false /*fetchValues*/) + true /*reverse*/, false /*fetchValues*/) if err != nil { return nil, nil, fmt.Errorf("DBGetPaginatedProfilesByDeSoLocked: %v", err) } diff --git a/lib/event_manager.go b/lib/event_manager.go index 4584b0101..486f7390e 100644 --- a/lib/event_manager.go +++ b/lib/event_manager.go @@ -1,6 +1,9 @@ package lib -import "github.com/google/uuid" +import ( + "github.com/dgraph-io/badger/v3" + "github.com/google/uuid" +) type TransactionEventFunc func(event *TransactionEvent) type StateSyncerOperationEventFunc func(event *StateSyncerOperationEvent) @@ -59,7 +62,10 @@ type EventManager struct { blockCommittedHandlers []BlockEventFunc blockAcceptedHandlers []BlockEventFunc snapshotCompletedHandlers []SnapshotCompletedEventFunc - isMempoolManager bool + // A transaction used by the state syncer mempool routine to reference the state of the badger db + // prior to flushing mempool transactions. This represents the last committed view of the db. + lastCommittedViewTxn *badger.Txn + isMempoolManager bool } func NewEventManager() *EventManager { diff --git a/lib/server.go b/lib/server.go index d54f56d1e..bdd1fc098 100644 --- a/lib/server.go +++ b/lib/server.go @@ -165,7 +165,7 @@ type Server struct { // It can be used to find computational bottlenecks. timer *Timer - stateChangeSyncer *StateChangeSyncer + StateChangeSyncer *StateChangeSyncer // DbMutex protects the badger database from concurrent access when it's being closed & re-opened. // This is necessary because the database is closed & re-opened when the node finishes hypersyncing in order // to change the database options from Default options to Performance options. @@ -484,7 +484,7 @@ func NewServer( } if stateChangeSyncer != nil { - srv.stateChangeSyncer = stateChangeSyncer + srv.StateChangeSyncer = stateChangeSyncer } // The same timesource is used in the chain data structure and in the connection @@ -556,8 +556,8 @@ func NewServer( _connectIps, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP, _peerConnectionRefreshIntervalMillis, _minFeeRateNanosPerKB, nodeServices) - if srv.stateChangeSyncer != nil { - srv.stateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height) + if srv.StateChangeSyncer != nil { + srv.StateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height) } // Create a mempool to store transactions until they're ready to be mined into @@ -3292,8 +3292,8 @@ func (srv *Server) Start() { } // Initialize state syncer mempool job, if needed. - if srv.stateChangeSyncer != nil { - srv.stateChangeSyncer.StartMempoolSyncRoutine(srv) + if srv.StateChangeSyncer != nil { + srv.StateChangeSyncer.StartMempoolSyncRoutine(srv) } // Start the network manager's internal event loop to open and close connections to peers. diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 7498690b7..45a9bb38f 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -178,6 +178,9 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding ancestral record") } + // Store the ancestral record bytes. + stateChangeEntry.AncestralRecordBytes = EncodeToBytes(blockHeight, ancestralRecord) + // Decode the flush UUID. flushIdBytes := make([]byte, 16) _, err = rr.Read(flushIdBytes) @@ -294,6 +297,9 @@ type StateChangeSyncer struct { BlocksyncCompleteEntriesFlushed bool MempoolTxnSyncLimit uint64 + + PauseMempoolSync bool + PauseBlocksync bool } // Open a file, create if it doesn't exist. @@ -390,6 +396,16 @@ func (stateChangeSyncer *StateChangeSyncer) Reset() { // It also writes the offset of the entry in the file to a separate index file, such that a consumer can look up a // particular entry index in the state change file. func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *StateSyncerOperationEvent) { + for stateChangeSyncer.PauseBlocksync && !event.IsMempoolTxn { + time.Sleep(1 * time.Second) + fmt.Printf("Pausing blocksync\n") + } + + for stateChangeSyncer.PauseMempoolSync && event.IsMempoolTxn { + time.Sleep(1 * time.Second) + fmt.Printf("Pausing mempool sync\n") + } + // If we're in blocksync mode, we only want to flush entries once the sync is complete. if !stateChangeSyncer.BlocksyncCompleteEntriesFlushed && stateChangeSyncer.SyncType == NodeSyncTypeBlockSync { return @@ -415,7 +431,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S if event.IsMempoolTxn { // Set the flushId to the mempool flush ID. - //flushId = stateChangeSyncer.BlockSyncFlushI + //flushId = StateChangeSyncer.BlockSyncFlushI // If the event flush ID is nil, then we need to use the global mempool flush ID. if flushId == uuid.Nil { @@ -445,6 +461,22 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S } encoderType = encoder.GetEncoderType() + + //if encoderType == EncoderTypeProfileEntry { + // fmt.Printf("\nHandling profile entry\n") + // // Decode the profile entry to get the public key. + // profileEntry := &ProfileEntry{} + // _, err := DecodeFromBytes(profileEntry, bytes.NewReader(event.StateChangeEntry.EncoderBytes)) + // if err != nil { + // fmt.Printf("Error decoding profile entry: %v\n", err) + // return + // } + // fmt.Printf("\n\nHere is the event: %+v\n", event) + // fmt.Printf("Here is the profile entry: %v: %+v\n", string(profileEntry.Username[:]), profileEntry) + // fmt.Printf("Here is the ancestral record: %+v\n", event.StateChangeEntry.AncestralRecord) + // fmt.Printf("Here is the ancestral record bytes: %+v\n", event.StateChangeEntry.AncestralRecordBytes) + //} + } else { // If the value associated with the key is not an encoder, then we decode the encoder entirely from the key bytes. // Examples of this are FollowEntry, LikeEntry, DeSoBalanceEntry, etc. @@ -506,6 +538,16 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S // _handleStateSyncerFlush is called when a Badger db flush takes place. It calls a helper function that takes the bytes that // have been cached on the StateChangeSyncer and writes them to the state change file. func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *StateSyncerFlushedEvent) { + for stateChangeSyncer.PauseBlocksync && !event.IsMempoolFlush { + time.Sleep(1 * time.Second) + fmt.Printf("Pausing blocksync\n") + } + + for stateChangeSyncer.PauseMempoolSync && event.IsMempoolFlush { + time.Sleep(1 * time.Second) + fmt.Printf("Pausing mempool sync\n") + } + stateChangeSyncer.StateSyncerMutex.Lock() defer stateChangeSyncer.StateSyncerMutex.Unlock() @@ -813,6 +855,12 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // more than once in the mempool transactions. txn := server.blockchain.db.NewTransaction(true) defer txn.Discard() + + // Create a read-only view of the badger DB prior to the mempool flush. This view will be used to get the ancestral + // records of entries that are being modified in the mempool. + mempoolEventManager.lastCommittedViewTxn = server.blockchain.db.NewTransaction(false) + defer mempoolEventManager.lastCommittedViewTxn.Discard() + glog.V(2).Infof("Time since mempool sync start: %v", time.Since(startTime)) startTime = time.Now() err = mempoolUtxoView.FlushToDbWithTxn(txn, uint64(server.blockchain.bestChain[len(server.blockchain.bestChain)-1].Height)) @@ -1033,6 +1081,7 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv // Sleep for a short while to avoid a tight loop. time.Sleep(100 * time.Millisecond) var err error + // If the mempool is not empty, sync the mempool to the state syncer. mempoolClosed, err = stateChangeSyncer.SyncMempoolToStateSyncer(server) if err != nil { From 338bebd29d2545608d5f6023818652ef48d582bb Mon Sep 17 00:00:00 2001 From: superzordon Date: Thu, 8 Aug 2024 18:56:11 -0400 Subject: [PATCH 2/8] Remove unused code --- lib/db_utils.go | 11 ++++------- lib/state_change_syncer.go | 40 +------------------------------------- 2 files changed, 5 insertions(+), 46 deletions(-) diff --git a/lib/db_utils.go b/lib/db_utils.go index 13a042756..e14051ce7 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -1131,7 +1131,6 @@ func DBSetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, value []byte, eve "from DB with key: %v", key) } } - // TODO: Do the same thing for deletes. // We update the DB record with the intended value. err := txn.Set(key, value) @@ -1195,8 +1194,6 @@ func DBGetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte) ([]byte, error) { } } - fmt.Printf("Key string: %v\n", string(key)) - // If record doesn't exist in cache, we get it from the DB. item, err := txn.Get(key) if err != nil { @@ -5329,8 +5326,8 @@ func InitDbWithDeSoGenesisBlock(params *DeSoParams, handle *badger.DB, blockHash, 0, // Height diffTarget, - BytesToBigint(ExpectedWorkForBlockHash(diffTarget)[:]), // CumWork - genesisBlock.Header, // Header + BytesToBigint(ExpectedWorkForBlockHash(diffTarget)[:]), // CumWork + genesisBlock.Header, // Header StatusHeaderValidated|StatusBlockProcessed|StatusBlockStored|StatusBlockValidated, // Status ) @@ -9513,7 +9510,7 @@ func DBGetPaginatedPostsOrderedByTime( postIndexKeys, _, err := DBGetPaginatedKeysAndValuesForPrefix( db, startPostPrefix, Prefixes.PrefixTstampNanosPostHash, /*validForPrefix*/ len(Prefixes.PrefixTstampNanosPostHash)+len(maxUint64Tstamp)+HashSizeBytes, /*keyLen*/ - numToFetch, reverse /*reverse*/, false /*fetchValues*/) + numToFetch, reverse /*reverse*/, false /*fetchValues*/) if err != nil { return nil, nil, nil, fmt.Errorf("DBGetPaginatedPostsOrderedByTime: %v", err) } @@ -9640,7 +9637,7 @@ func DBGetPaginatedProfilesByDeSoLocked( profileIndexKeys, _, err := DBGetPaginatedKeysAndValuesForPrefix( db, startProfilePrefix, Prefixes.PrefixCreatorDeSoLockedNanosCreatorPKID, /*validForPrefix*/ keyLen /*keyLen*/, numToFetch, - true /*reverse*/, false /*fetchValues*/) + true /*reverse*/, false /*fetchValues*/) if err != nil { return nil, nil, fmt.Errorf("DBGetPaginatedProfilesByDeSoLocked: %v", err) } diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 45a9bb38f..45a80679e 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -297,9 +297,6 @@ type StateChangeSyncer struct { BlocksyncCompleteEntriesFlushed bool MempoolTxnSyncLimit uint64 - - PauseMempoolSync bool - PauseBlocksync bool } // Open a file, create if it doesn't exist. @@ -396,16 +393,6 @@ func (stateChangeSyncer *StateChangeSyncer) Reset() { // It also writes the offset of the entry in the file to a separate index file, such that a consumer can look up a // particular entry index in the state change file. func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *StateSyncerOperationEvent) { - for stateChangeSyncer.PauseBlocksync && !event.IsMempoolTxn { - time.Sleep(1 * time.Second) - fmt.Printf("Pausing blocksync\n") - } - - for stateChangeSyncer.PauseMempoolSync && event.IsMempoolTxn { - time.Sleep(1 * time.Second) - fmt.Printf("Pausing mempool sync\n") - } - // If we're in blocksync mode, we only want to flush entries once the sync is complete. if !stateChangeSyncer.BlocksyncCompleteEntriesFlushed && stateChangeSyncer.SyncType == NodeSyncTypeBlockSync { return @@ -461,22 +448,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S } encoderType = encoder.GetEncoderType() - - //if encoderType == EncoderTypeProfileEntry { - // fmt.Printf("\nHandling profile entry\n") - // // Decode the profile entry to get the public key. - // profileEntry := &ProfileEntry{} - // _, err := DecodeFromBytes(profileEntry, bytes.NewReader(event.StateChangeEntry.EncoderBytes)) - // if err != nil { - // fmt.Printf("Error decoding profile entry: %v\n", err) - // return - // } - // fmt.Printf("\n\nHere is the event: %+v\n", event) - // fmt.Printf("Here is the profile entry: %v: %+v\n", string(profileEntry.Username[:]), profileEntry) - // fmt.Printf("Here is the ancestral record: %+v\n", event.StateChangeEntry.AncestralRecord) - // fmt.Printf("Here is the ancestral record bytes: %+v\n", event.StateChangeEntry.AncestralRecordBytes) - //} - + } else { // If the value associated with the key is not an encoder, then we decode the encoder entirely from the key bytes. // Examples of this are FollowEntry, LikeEntry, DeSoBalanceEntry, etc. @@ -538,16 +510,6 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S // _handleStateSyncerFlush is called when a Badger db flush takes place. It calls a helper function that takes the bytes that // have been cached on the StateChangeSyncer and writes them to the state change file. func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *StateSyncerFlushedEvent) { - for stateChangeSyncer.PauseBlocksync && !event.IsMempoolFlush { - time.Sleep(1 * time.Second) - fmt.Printf("Pausing blocksync\n") - } - - for stateChangeSyncer.PauseMempoolSync && event.IsMempoolFlush { - time.Sleep(1 * time.Second) - fmt.Printf("Pausing mempool sync\n") - } - stateChangeSyncer.StateSyncerMutex.Lock() defer stateChangeSyncer.StateSyncerMutex.Unlock() From bd85e3efbe56731e1318b668909411b8cf05a836 Mon Sep 17 00:00:00 2001 From: superzordon Date: Thu, 8 Aug 2024 18:58:31 -0400 Subject: [PATCH 3/8] Remove test utils change --- lib/state_change_syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 45a80679e..85e9faf1e 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -448,7 +448,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S } encoderType = encoder.GetEncoderType() - + } else { // If the value associated with the key is not an encoder, then we decode the encoder entirely from the key bytes. // Examples of this are FollowEntry, LikeEntry, DeSoBalanceEntry, etc. From e40536480af3f73f8c88cf8470b1dd2cb3cbc214 Mon Sep 17 00:00:00 2001 From: superzordon Date: Thu, 8 Aug 2024 19:28:16 -0400 Subject: [PATCH 4/8] Fix broken check --- lib/db_utils.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/db_utils.go b/lib/db_utils.go index e14051ce7..153e9db24 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -1117,7 +1117,7 @@ func DBSetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, value []byte, eve // This is because we will be querying the badger DB, and during the flush loop, every entry that is // updated will first be deleted. In order to counteract this, we reference a badger transaction that was // initiated before the flush loop started. - if eventManager.isMempoolManager && eventManager.lastCommittedViewTxn != nil { + if eventManager != nil && eventManager.isMempoolManager && eventManager.lastCommittedViewTxn != nil { ancestralValue, getError = DBGetWithTxn(eventManager.lastCommittedViewTxn, snap, key) } else { // We check if we've already read this key and stored it in the cache. @@ -1223,7 +1223,7 @@ func DBDeleteWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, eventManager * // This is because we will be querying the badger DB, and during the flush loop, every entry that is // updated will first be deleted. In order to counteract this, we reference a badger transaction that was // initiated before the flush loop started. - if eventManager.isMempoolManager && eventManager.lastCommittedViewTxn != nil { + if eventManager != nil && eventManager.isMempoolManager && eventManager.lastCommittedViewTxn != nil { ancestralValue, getError = DBGetWithTxn(eventManager.lastCommittedViewTxn, snap, key) } else { // We check if we've already read this key and stored it in the cache. @@ -5326,8 +5326,8 @@ func InitDbWithDeSoGenesisBlock(params *DeSoParams, handle *badger.DB, blockHash, 0, // Height diffTarget, - BytesToBigint(ExpectedWorkForBlockHash(diffTarget)[:]), // CumWork - genesisBlock.Header, // Header + BytesToBigint(ExpectedWorkForBlockHash(diffTarget)[:]), // CumWork + genesisBlock.Header, // Header StatusHeaderValidated|StatusBlockProcessed|StatusBlockStored|StatusBlockValidated, // Status ) @@ -9510,7 +9510,7 @@ func DBGetPaginatedPostsOrderedByTime( postIndexKeys, _, err := DBGetPaginatedKeysAndValuesForPrefix( db, startPostPrefix, Prefixes.PrefixTstampNanosPostHash, /*validForPrefix*/ len(Prefixes.PrefixTstampNanosPostHash)+len(maxUint64Tstamp)+HashSizeBytes, /*keyLen*/ - numToFetch, reverse /*reverse*/, false /*fetchValues*/) + numToFetch, reverse /*reverse*/, false /*fetchValues*/) if err != nil { return nil, nil, nil, fmt.Errorf("DBGetPaginatedPostsOrderedByTime: %v", err) } @@ -9637,7 +9637,7 @@ func DBGetPaginatedProfilesByDeSoLocked( profileIndexKeys, _, err := DBGetPaginatedKeysAndValuesForPrefix( db, startProfilePrefix, Prefixes.PrefixCreatorDeSoLockedNanosCreatorPKID, /*validForPrefix*/ keyLen /*keyLen*/, numToFetch, - true /*reverse*/, false /*fetchValues*/) + true /*reverse*/, false /*fetchValues*/) if err != nil { return nil, nil, fmt.Errorf("DBGetPaginatedProfilesByDeSoLocked: %v", err) } From 522761855258bf0ce002a06bb2e04d6764fd7c98 Mon Sep 17 00:00:00 2001 From: superzordon Date: Thu, 8 Aug 2024 19:31:40 -0400 Subject: [PATCH 5/8] Go fmt --- lib/db_utils.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/db_utils.go b/lib/db_utils.go index 153e9db24..571e4ce60 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -5326,8 +5326,8 @@ func InitDbWithDeSoGenesisBlock(params *DeSoParams, handle *badger.DB, blockHash, 0, // Height diffTarget, - BytesToBigint(ExpectedWorkForBlockHash(diffTarget)[:]), // CumWork - genesisBlock.Header, // Header + BytesToBigint(ExpectedWorkForBlockHash(diffTarget)[:]), // CumWork + genesisBlock.Header, // Header StatusHeaderValidated|StatusBlockProcessed|StatusBlockStored|StatusBlockValidated, // Status ) @@ -9510,7 +9510,7 @@ func DBGetPaginatedPostsOrderedByTime( postIndexKeys, _, err := DBGetPaginatedKeysAndValuesForPrefix( db, startPostPrefix, Prefixes.PrefixTstampNanosPostHash, /*validForPrefix*/ len(Prefixes.PrefixTstampNanosPostHash)+len(maxUint64Tstamp)+HashSizeBytes, /*keyLen*/ - numToFetch, reverse /*reverse*/, false /*fetchValues*/) + numToFetch, reverse /*reverse*/, false /*fetchValues*/) if err != nil { return nil, nil, nil, fmt.Errorf("DBGetPaginatedPostsOrderedByTime: %v", err) } @@ -9637,7 +9637,7 @@ func DBGetPaginatedProfilesByDeSoLocked( profileIndexKeys, _, err := DBGetPaginatedKeysAndValuesForPrefix( db, startProfilePrefix, Prefixes.PrefixCreatorDeSoLockedNanosCreatorPKID, /*validForPrefix*/ keyLen /*keyLen*/, numToFetch, - true /*reverse*/, false /*fetchValues*/) + true /*reverse*/, false /*fetchValues*/) if err != nil { return nil, nil, fmt.Errorf("DBGetPaginatedProfilesByDeSoLocked: %v", err) } From 515221046088045254862460177b5c6a0e3400f5 Mon Sep 17 00:00:00 2001 From: superzordon Date: Fri, 9 Aug 2024 15:45:14 -0400 Subject: [PATCH 6/8] Fix delete errors --- lib/db_utils.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/db_utils.go b/lib/db_utils.go index 571e4ce60..869b3b95a 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -1243,7 +1243,10 @@ func DBDeleteWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, eventManager * } err := txn.Delete(key) - if err != nil { + if err != nil && err == badger.ErrKeyNotFound && eventManager != nil && eventManager.isMempoolManager { + // If the key doesn't exist then there is no point in deleting this entry. + return nil + } else if err != nil { return errors.Wrapf(err, "DBDeleteWithTxn: Problem deleting record "+ "from DB with key: %v", key) } From 981305a139cbdc6082761e8ce353ff71c85de0ba Mon Sep 17 00:00:00 2001 From: superzordon Date: Fri, 9 Aug 2024 16:10:45 -0400 Subject: [PATCH 7/8] Fix mempool sync error --- lib/db_utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/db_utils.go b/lib/db_utils.go index 869b3b95a..e5b3e0b52 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -1236,7 +1236,7 @@ func DBDeleteWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, eventManager * } // If there is some error with the DB read, other than non-existent key, we return. - if getError != nil { + if getError != nil && getError != badger.ErrKeyNotFound { return errors.Wrapf(getError, "DBDeleteWithTxn: problem checking for DB record "+ "with key: %v", key) } From ee96f77593b86463da15d42d4c1c610363824a83 Mon Sep 17 00:00:00 2001 From: superzordon Date: Fri, 16 Aug 2024 14:04:33 -0400 Subject: [PATCH 8/8] Fix state syncer ancestral record decode --- cmd/config.go | 2 ++ cmd/node.go | 2 +- integration_testing/tools.go | 1 + lib/db_utils.go | 2 +- lib/state_change_syncer.go | 18 ++++++++++++++---- 5 files changed, 19 insertions(+), 6 deletions(-) diff --git a/cmd/config.go b/cmd/config.go index 77ce94606..220ef7dbb 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -79,6 +79,7 @@ type Config struct { LogDBSummarySnapshots bool DatadogProfiler bool TimeEvents bool + LogToStdErr bool // State Syncer StateChangeDir string @@ -192,6 +193,7 @@ func LoadConfig() *Config { config.LogDBSummarySnapshots = viper.GetBool("log-db-summary-snapshots") config.DatadogProfiler = viper.GetBool("datadog-profiler") config.TimeEvents = viper.GetBool("time-events") + config.LogToStdErr = true // State Syncer config.StateChangeDir = viper.GetString("state-change-dir") diff --git a/cmd/node.go b/cmd/node.go index bed017139..45518d297 100644 --- a/cmd/node.go +++ b/cmd/node.go @@ -66,7 +66,7 @@ func (node *Node) Start(exitChannels ...*chan struct{}) { flag.Set("log_dir", node.Config.LogDirectory) flag.Set("v", fmt.Sprintf("%d", node.Config.GlogV)) flag.Set("vmodule", node.Config.GlogVmodule) - flag.Set("alsologtostderr", "true") + flag.Set("alsologtostderr", fmt.Sprintf("%t", node.Config.LogToStdErr)) flag.Parse() glog.CopyStandardLogTo("INFO") node.runningMutex.Lock() diff --git a/integration_testing/tools.go b/integration_testing/tools.go index 94617bd18..613a93947 100644 --- a/integration_testing/tools.go +++ b/integration_testing/tools.go @@ -82,6 +82,7 @@ func _generateConfig(t *testing.T, config *cmd.Config, port uint32, dataDir stri config.ConnectIPs = []string{} config.PrivateMode = true config.GlogV = 0 + config.LogToStdErr = true config.GlogVmodule = "*bitcoin_manager*=0,*balance*=0,*view*=0,*frontend*=0,*peer*=0,*addr*=0,*network*=0,*utils*=0,*connection*=0,*main*=0,*server*=0,*mempool*=0,*miner*=0,*blockchain*=0" config.MaxInboundPeers = maxPeers config.TargetOutboundPeers = maxPeers diff --git a/lib/db_utils.go b/lib/db_utils.go index e5b3e0b52..8263881d9 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -1118,7 +1118,7 @@ func DBSetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, value []byte, eve // updated will first be deleted. In order to counteract this, we reference a badger transaction that was // initiated before the flush loop started. if eventManager != nil && eventManager.isMempoolManager && eventManager.lastCommittedViewTxn != nil { - ancestralValue, getError = DBGetWithTxn(eventManager.lastCommittedViewTxn, snap, key) + ancestralValue, getError = DBGetWithTxn(eventManager.lastCommittedViewTxn, nil, key) } else { // We check if we've already read this key and stored it in the cache. // Otherwise, we fetch the current value of this record from the DB. diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 85e9faf1e..080c31a14 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -174,13 +174,13 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u ancestralRecord := stateChangeEntry.EncoderType.New() if exist, err := DecodeFromBytes(ancestralRecord, rr); exist && err == nil { stateChangeEntry.AncestralRecord = ancestralRecord + stateChangeEntry.AncestralRecordBytes = EncodeToBytes(blockHeight, ancestralRecord) } else if err != nil { return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding ancestral record") + } else { + stateChangeEntry.AncestralRecordBytes = EncodeToBytes(blockHeight, nil) } - // Store the ancestral record bytes. - stateChangeEntry.AncestralRecordBytes = EncodeToBytes(blockHeight, ancestralRecord) - // Decode the flush UUID. flushIdBytes := make([]byte, 16) _, err = rr.Read(flushIdBytes) @@ -459,8 +459,18 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S encoderType = keyEncoder.GetEncoderType() stateChangeEntry.Encoder = keyEncoder stateChangeEntry.EncoderBytes = nil - } + if stateChangeEntry.AncestralRecordBytes != nil && len(stateChangeEntry.AncestralRecordBytes) > 0 { + // Decode the ancestral record. + ancestralRecord, err := DecodeStateKey(stateChangeEntry.KeyBytes, stateChangeEntry.AncestralRecordBytes) + if err != nil { + glog.Fatalf("Server._handleStateSyncerOperation: Error decoding ancestral record: %v", err) + } + stateChangeEntry.AncestralRecord = ancestralRecord + stateChangeEntry.AncestralRecordBytes = nil + } + } + // Set the encoder type. stateChangeEntry.EncoderType = encoderType