Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Config struct {
LogDBSummarySnapshots bool
DatadogProfiler bool
TimeEvents bool
LogToStdErr bool

// State Syncer
StateChangeDir string
Expand Down Expand Up @@ -192,6 +193,7 @@ func LoadConfig() *Config {
config.LogDBSummarySnapshots = viper.GetBool("log-db-summary-snapshots")
config.DatadogProfiler = viper.GetBool("datadog-profiler")
config.TimeEvents = viper.GetBool("time-events")
config.LogToStdErr = true

// State Syncer
config.StateChangeDir = viper.GetString("state-change-dir")
Expand Down
2 changes: 1 addition & 1 deletion cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (node *Node) Start(exitChannels ...*chan struct{}) {
flag.Set("log_dir", node.Config.LogDirectory)
flag.Set("v", fmt.Sprintf("%d", node.Config.GlogV))
flag.Set("vmodule", node.Config.GlogVmodule)
flag.Set("alsologtostderr", "true")
flag.Set("alsologtostderr", fmt.Sprintf("%t", node.Config.LogToStdErr))
flag.Parse()
glog.CopyStandardLogTo("INFO")
node.runningMutex.Lock()
Expand Down
1 change: 1 addition & 0 deletions integration_testing/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func _generateConfig(t *testing.T, config *cmd.Config, port uint32, dataDir stri
config.ConnectIPs = []string{}
config.PrivateMode = true
config.GlogV = 0
config.LogToStdErr = true
config.GlogVmodule = "*bitcoin_manager*=0,*balance*=0,*view*=0,*frontend*=0,*peer*=0,*addr*=0,*network*=0,*utils*=0,*connection*=0,*main*=0,*server*=0,*mempool*=0,*miner*=0,*blockchain*=0"
config.MaxInboundPeers = maxPeers
config.TargetOutboundPeers = maxPeers
Expand Down
50 changes: 37 additions & 13 deletions lib/db_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,12 +1107,23 @@ func DBSetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, value []byte, eve
var ancestralValue []byte
var getError error

isCoreState := isCoreStateKey(key)

// If snapshot was provided, we will need to load the current value of the record
// so that we can later write it in the ancestral record. We first lookup cache.
if isState {
// We check if we've already read this key and stored it in the cache.
// Otherwise, we fetch the current value of this record from the DB.
ancestralValue, getError = DBGetWithTxn(txn, snap, key)
if isState || (isCoreState && eventManager != nil && eventManager.isMempoolManager) {

// When we are syncing state from the mempool, we need to read the last committed view txn.
// This is because we will be querying the badger DB, and during the flush loop, every entry that is
// updated will first be deleted. In order to counteract this, we reference a badger transaction that was
// initiated before the flush loop started.
if eventManager != nil && eventManager.isMempoolManager && eventManager.lastCommittedViewTxn != nil {
ancestralValue, getError = DBGetWithTxn(eventManager.lastCommittedViewTxn, nil, key)
} else {
// We check if we've already read this key and stored it in the cache.
// Otherwise, we fetch the current value of this record from the DB.
ancestralValue, getError = DBGetWithTxn(txn, snap, key)
}

// If there is some error with the DB read, other than non-existent key, we return.
if getError != nil && getError != badger.ErrKeyNotFound {
Expand Down Expand Up @@ -1203,26 +1214,39 @@ func DBDeleteWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, eventManager *
var getError error
isState := snap != nil && snap.isState(key)

isCoreState := isCoreStateKey(key)

// If snapshot was provided, we will need to load the current value of the record
// so that we can later write it in the ancestral record. We first lookup cache.
if isState {
// We check if we've already read this key and stored it in the cache.
// Otherwise, we fetch the current value of this record from the DB.
ancestralValue, getError = DBGetWithTxn(txn, snap, key)
// If the key doesn't exist then there is no point in deleting this entry.
if getError == badger.ErrKeyNotFound {
return nil
if isState || (isCoreState && eventManager != nil && eventManager.isMempoolManager) {
// When we are syncing state from the mempool, we need to read the last committed view txn.
// This is because we will be querying the badger DB, and during the flush loop, every entry that is
// updated will first be deleted. In order to counteract this, we reference a badger transaction that was
// initiated before the flush loop started.
if eventManager != nil && eventManager.isMempoolManager && eventManager.lastCommittedViewTxn != nil {
ancestralValue, getError = DBGetWithTxn(eventManager.lastCommittedViewTxn, snap, key)
} else {
// We check if we've already read this key and stored it in the cache.
// Otherwise, we fetch the current value of this record from the DB.
ancestralValue, getError = DBGetWithTxn(txn, snap, key)
// If the key doesn't exist then there is no point in deleting this entry.
if getError == badger.ErrKeyNotFound {
return nil
}
}

// If there is some error with the DB read, other than non-existent key, we return.
if getError != nil {
if getError != nil && getError != badger.ErrKeyNotFound {
return errors.Wrapf(getError, "DBDeleteWithTxn: problem checking for DB record "+
"with key: %v", key)
}
}

err := txn.Delete(key)
if err != nil {
if err != nil && err == badger.ErrKeyNotFound && eventManager != nil && eventManager.isMempoolManager {
// If the key doesn't exist then there is no point in deleting this entry.
return nil
} else if err != nil {
return errors.Wrapf(err, "DBDeleteWithTxn: Problem deleting record "+
"from DB with key: %v", key)
}
Expand Down
10 changes: 8 additions & 2 deletions lib/event_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package lib

import "github.com/google/uuid"
import (
"github.com/dgraph-io/badger/v3"
"github.com/google/uuid"
)

type TransactionEventFunc func(event *TransactionEvent)
type StateSyncerOperationEventFunc func(event *StateSyncerOperationEvent)
Expand Down Expand Up @@ -59,7 +62,10 @@ type EventManager struct {
blockCommittedHandlers []BlockEventFunc
blockAcceptedHandlers []BlockEventFunc
snapshotCompletedHandlers []SnapshotCompletedEventFunc
isMempoolManager bool
// A transaction used by the state syncer mempool routine to reference the state of the badger db
// prior to flushing mempool transactions. This represents the last committed view of the db.
lastCommittedViewTxn *badger.Txn
isMempoolManager bool
}

func NewEventManager() *EventManager {
Expand Down
12 changes: 6 additions & 6 deletions lib/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ type Server struct {
// It can be used to find computational bottlenecks.
timer *Timer

stateChangeSyncer *StateChangeSyncer
StateChangeSyncer *StateChangeSyncer
// DbMutex protects the badger database from concurrent access when it's being closed & re-opened.
// This is necessary because the database is closed & re-opened when the node finishes hypersyncing in order
// to change the database options from Default options to Performance options.
Expand Down Expand Up @@ -484,7 +484,7 @@ func NewServer(
}

if stateChangeSyncer != nil {
srv.stateChangeSyncer = stateChangeSyncer
srv.StateChangeSyncer = stateChangeSyncer
}

// The same timesource is used in the chain data structure and in the connection
Expand Down Expand Up @@ -556,8 +556,8 @@ func NewServer(
_connectIps, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP,
_peerConnectionRefreshIntervalMillis, _minFeeRateNanosPerKB, nodeServices)

if srv.stateChangeSyncer != nil {
srv.stateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height)
if srv.StateChangeSyncer != nil {
srv.StateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height)
}

// Create a mempool to store transactions until they're ready to be mined into
Expand Down Expand Up @@ -3292,8 +3292,8 @@ func (srv *Server) Start() {
}

// Initialize state syncer mempool job, if needed.
if srv.stateChangeSyncer != nil {
srv.stateChangeSyncer.StartMempoolSyncRoutine(srv)
if srv.StateChangeSyncer != nil {
srv.StateChangeSyncer.StartMempoolSyncRoutine(srv)
}

// Start the network manager's internal event loop to open and close connections to peers.
Expand Down
25 changes: 23 additions & 2 deletions lib/state_change_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,11 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u
ancestralRecord := stateChangeEntry.EncoderType.New()
if exist, err := DecodeFromBytes(ancestralRecord, rr); exist && err == nil {
stateChangeEntry.AncestralRecord = ancestralRecord
stateChangeEntry.AncestralRecordBytes = EncodeToBytes(blockHeight, ancestralRecord)
} else if err != nil {
return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding ancestral record")
} else {
stateChangeEntry.AncestralRecordBytes = EncodeToBytes(blockHeight, nil)
}

// Decode the flush UUID.
Expand Down Expand Up @@ -415,7 +418,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S

if event.IsMempoolTxn {
// Set the flushId to the mempool flush ID.
//flushId = stateChangeSyncer.BlockSyncFlushI
//flushId = StateChangeSyncer.BlockSyncFlushI

// If the event flush ID is nil, then we need to use the global mempool flush ID.
if flushId == uuid.Nil {
Expand Down Expand Up @@ -445,6 +448,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S
}

encoderType = encoder.GetEncoderType()

} else {
// If the value associated with the key is not an encoder, then we decode the encoder entirely from the key bytes.
// Examples of this are FollowEntry, LikeEntry, DeSoBalanceEntry, etc.
Expand All @@ -455,8 +459,18 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S
encoderType = keyEncoder.GetEncoderType()
stateChangeEntry.Encoder = keyEncoder
stateChangeEntry.EncoderBytes = nil
}

if stateChangeEntry.AncestralRecordBytes != nil && len(stateChangeEntry.AncestralRecordBytes) > 0 {
// Decode the ancestral record.
ancestralRecord, err := DecodeStateKey(stateChangeEntry.KeyBytes, stateChangeEntry.AncestralRecordBytes)
if err != nil {
glog.Fatalf("Server._handleStateSyncerOperation: Error decoding ancestral record: %v", err)
}
stateChangeEntry.AncestralRecord = ancestralRecord
stateChangeEntry.AncestralRecordBytes = nil
}
}

// Set the encoder type.
stateChangeEntry.EncoderType = encoderType

Expand Down Expand Up @@ -813,6 +827,12 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser
// more than once in the mempool transactions.
txn := server.blockchain.db.NewTransaction(true)
defer txn.Discard()

// Create a read-only view of the badger DB prior to the mempool flush. This view will be used to get the ancestral
// records of entries that are being modified in the mempool.
mempoolEventManager.lastCommittedViewTxn = server.blockchain.db.NewTransaction(false)
defer mempoolEventManager.lastCommittedViewTxn.Discard()

glog.V(2).Infof("Time since mempool sync start: %v", time.Since(startTime))
startTime = time.Now()
err = mempoolUtxoView.FlushToDbWithTxn(txn, uint64(server.blockchain.bestChain[len(server.blockchain.bestChain)-1].Height))
Expand Down Expand Up @@ -1033,6 +1053,7 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv
// Sleep for a short while to avoid a tight loop.
time.Sleep(100 * time.Millisecond)
var err error

// If the mempool is not empty, sync the mempool to the state syncer.
mempoolClosed, err = stateChangeSyncer.SyncMempoolToStateSyncer(server)
if err != nil {
Expand Down