189 changes: 178 additions & 11 deletions state/execution.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"sync"
"time"

abci "github.com/cometbft/cometbft/abci/types"
@@ -43,6 +44,17 @@ type BlockExecutor struct {
// 1-element cache of validated blocks
lastValidatedBlock *types.Block

// cache for validators to avoid repeated DB lookups
validatorCache map[int64]*types.ValidatorSet
validatorCacheMutex sync.RWMutex

// cache for ABCI validators to avoid repeated conversions
abciValidatorCache map[string]abci.Validator
abciValidatorCacheMutex sync.RWMutex

// cache management
maxCacheSize int

logger log.Logger

metrics *Metrics
@@ -68,14 +80,17 @@ func NewBlockExecutor(
options ...BlockExecutorOption,
) *BlockExecutor {
res := &BlockExecutor{
store: stateStore,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
metrics: NopMetrics(),
blockStore: blockStore,
store: stateStore,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
metrics: NopMetrics(),
blockStore: blockStore,
validatorCache: make(map[int64]*types.ValidatorSet),
abciValidatorCache: make(map[string]abci.Validator),
Comment on lines +91 to +92 (Collaborator):

In real execution, no values are ever removed from these caches, so they will grow forever. We will need some way to periodically remove elements from them.

maxCacheSize: 2000, // Limit cache size to prevent memory leaks
}

for _, option := range options {
@@ -89,6 +104,62 @@ func (blockExec *BlockExecutor) Store() Store {
return blockExec.store
}

// GetCacheSize returns the current cache sizes for testing
func (blockExec *BlockExecutor) GetCacheSize() (validatorCacheSize, abciValidatorCacheSize int) {
blockExec.validatorCacheMutex.RLock()
validatorCacheSize = len(blockExec.validatorCache)
blockExec.validatorCacheMutex.RUnlock()

blockExec.abciValidatorCacheMutex.RLock()
abciValidatorCacheSize = len(blockExec.abciValidatorCache)
blockExec.abciValidatorCacheMutex.RUnlock()
return
}

// SetMaxCacheSize sets the maximum cache size for testing
func (blockExec *BlockExecutor) SetMaxCacheSize(size int) {
blockExec.maxCacheSize = size
}
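
GetCacheSize and SetMaxCacheSize exist only so tests can observe and bound the caches. A minimal sketch of how an internal (package state) test might exercise the cap, assuming a hypothetical makeTestBlockExecutor helper that is not part of this diff:

func TestCleanupClearsOversizedValidatorCache(t *testing.T) {
	// makeTestBlockExecutor is a hypothetical helper that wires up a
	// BlockExecutor against in-memory components; it does not exist in this
	// PR. Because the test lives in package state, it can touch the
	// unexported cache fields directly.
	blockExec := makeTestBlockExecutor(t)
	blockExec.SetMaxCacheSize(5)

	// Overfill the validator cache past the configured bound.
	for h := int64(1); h <= 6; h++ {
		blockExec.validatorCache[h] = &types.ValidatorSet{}
	}
	blockExec.cleanupOldCacheEntries()

	// The current cleanup strategy drops the whole map once it exceeds
	// maxCacheSize, so the cache should now be empty.
	valSize, _ := blockExec.GetCacheSize()
	if valSize != 0 {
		t.Fatalf("expected cleared validator cache, got %d entries", valSize)
	}
}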

// cleanupOldCacheEntries removes old entries from caches to prevent memory leaks
func (blockExec *BlockExecutor) cleanupOldCacheEntries() {
// Check validator cache size with read lock first
blockExec.validatorCacheMutex.RLock()
validatorCacheSize := len(blockExec.validatorCache)
blockExec.validatorCacheMutex.RUnlock()

if validatorCacheSize > blockExec.maxCacheSize {
// Only acquire write lock when we actually need to clean up
blockExec.validatorCacheMutex.Lock()
// Double-check in case another goroutine cleaned it up
if len(blockExec.validatorCache) > blockExec.maxCacheSize {
// Simple cleanup: clear the entire cache
// Since Go maps don't guarantee iteration order, we'll clear the entire cache
// and let it rebuild naturally. This is simpler and avoids the FIFO issue.
blockExec.validatorCache = make(map[int64]*types.ValidatorSet)
}
blockExec.validatorCacheMutex.Unlock()
}

// Check ABCI validator cache size with read lock first
blockExec.abciValidatorCacheMutex.RLock()
abciValidatorCacheSize := len(blockExec.abciValidatorCache)
blockExec.abciValidatorCacheMutex.RUnlock()

if abciValidatorCacheSize > blockExec.maxCacheSize {
// Only acquire write lock when we actually need to clean up
blockExec.abciValidatorCacheMutex.Lock()
// Double-check in case another goroutine cleaned it up
if len(blockExec.abciValidatorCache) > blockExec.maxCacheSize {
// Simple cleanup: clear the entire cache
// Since Go maps don't guarantee iteration order, we'll clear the entire cache
// and let it rebuild naturally. This is simpler and avoids the FIFO issue.
blockExec.abciValidatorCache = make(map[string]abci.Validator)
}
blockExec.abciValidatorCacheMutex.Unlock()
}
}
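
The collaborator comment on the constructor above asks for a way to periodically remove entries, and the cleanup above answers it by discarding the entire map once it exceeds maxCacheSize, which also evicts heights that are still hot. A minimal sketch, not part of this PR, of height-based pruning that keeps only a sliding window of recent heights instead; the type, field, and function names are illustrative, and it reuses the sync and types imports already present in execution.go:

// prunedValidatorCache keeps validator sets for the most recent heights and
// drops anything older than a fixed retention window on every insert.
type prunedValidatorCache struct {
	mtx       sync.RWMutex
	entries   map[int64]*types.ValidatorSet
	retention int64 // number of recent heights to keep
}

func newPrunedValidatorCache(retention int64) *prunedValidatorCache {
	return &prunedValidatorCache{
		entries:   make(map[int64]*types.ValidatorSet),
		retention: retention,
	}
}

func (c *prunedValidatorCache) put(height int64, vals *types.ValidatorSet) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.entries[height] = vals
	// Delete only entries that fell out of the retention window, instead of
	// clearing the whole map; recent heights stay cached.
	for h := range c.entries {
		if h < height-c.retention {
			delete(c.entries, h)
		}
	}
}

func (c *prunedValidatorCache) get(height int64) (*types.ValidatorSet, bool) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	vals, ok := c.entries[height]
	return vals, ok
}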

Bug: Cache Size Data Race and Incorrect Documentation

The maxCacheSize field has a data race due to unsynchronized reads in cleanupOldCacheEntries() and writes in SetMaxCacheSize(). Additionally, cleanupOldCacheEntries() comments (lines 128, 137) misleadingly state it clears half the cache, but the implementation clears the entire cache, potentially impacting performance.
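
One way to remove the flagged race, sketched under the assumption that the module builds with Go 1.19+ and adds a sync/atomic import; this is illustrative, not the fix adopted by the PR:

// Storing the limit in an atomic.Int64 lets SetMaxCacheSize run concurrently
// with cleanupOldCacheEntries without taking either cache mutex.
type cacheBound struct {
	maxCacheSize atomic.Int64
}

func (b *cacheBound) SetMaxCacheSize(n int) {
	b.maxCacheSize.Store(int64(n))
}

func (b *cacheBound) exceeded(size int) bool {
	return int64(size) > b.maxCacheSize.Load()
}

cleanupOldCacheEntries would then compare cache sizes through exceeded instead of reading the plain int field directly.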



// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
@@ -170,7 +241,7 @@ func (blockExec *BlockExecutor) ProcessProposal(
Height: block.Height,
Time: block.Time,
Txs: block.Txs.ToSliceOfBytes(),
ProposedLastCommit: buildLastCommitInfoFromStore(block, blockExec.store, state.InitialHeight),
ProposedLastCommit: blockExec.BuildLastCommitInfoFromStoreWithCache(block, state.InitialHeight),
Misbehavior: block.Evidence.Evidence.ToABCI(),
ProposerAddress: block.ProposerAddress,
NextValidatorsHash: block.NextValidatorsHash,
@@ -232,7 +303,7 @@ func (blockExec *BlockExecutor) applyBlock(state State, blockID types.BlockID, b
ProposerAddress: block.ProposerAddress,
Height: block.Height,
Time: block.Time,
DecidedLastCommit: buildLastCommitInfoFromStore(block, blockExec.store, state.InitialHeight),
DecidedLastCommit: blockExec.BuildLastCommitInfoFromStoreWithCache(block, state.InitialHeight),
Misbehavior: block.Evidence.Evidence.ToABCI(),
Txs: block.Txs.ToSliceOfBytes(),
})
@@ -345,7 +416,7 @@ func (blockExec *BlockExecutor) ExtendVote(
Height: vote.Height,
Time: block.Time,
Txs: block.Txs.ToSliceOfBytes(),
ProposedLastCommit: buildLastCommitInfoFromStore(block, blockExec.store, state.InitialHeight),
ProposedLastCommit: blockExec.BuildLastCommitInfoFromStoreWithCache(block, state.InitialHeight),
Misbehavior: block.Evidence.Evidence.ToABCI(),
NextValidatorsHash: block.NextValidatorsHash,
ProposerAddress: block.ProposerAddress,
@@ -474,6 +545,41 @@ func buildLastCommitInfoFromStore(block *types.Block, store Store, initialHeight
return BuildLastCommitInfo(block, lastValSet, initialHeight)
}

// BuildLastCommitInfoFromStoreWithCache is an optimized version that uses caching
func (blockExec *BlockExecutor) BuildLastCommitInfoFromStoreWithCache(block *types.Block, initialHeight int64) abci.CommitInfo {
if block.Height == initialHeight { // check for initial height before loading validators
// there is no last commit for the initial height.
// return an empty value.
return abci.CommitInfo{}
}

height := block.Height - 1

// Try to get validators from cache first
blockExec.validatorCacheMutex.RLock()
lastValSet, found := blockExec.validatorCache[height]
blockExec.validatorCacheMutex.RUnlock()

if !found {
// Load from store
var err error
lastValSet, err = blockExec.store.LoadValidators(height)
if err != nil {
panic(fmt.Errorf("failed to load validator set at height %d: %w", height, err))
}

// Store in cache
blockExec.validatorCacheMutex.Lock()
blockExec.validatorCache[height] = lastValSet
blockExec.validatorCacheMutex.Unlock()

// Cleanup old cache entries if needed (outside of lock to avoid deadlock)
blockExec.cleanupOldCacheEntries()
}

return blockExec.BuildLastCommitInfoWithCache(block, lastValSet, initialHeight)
}

// BuildLastCommitInfo builds a CommitInfo from the given block and validator set.
// If you want to load the validator set from the store instead of providing it,
// use buildLastCommitInfoFromStore.
@@ -513,6 +619,67 @@ func BuildLastCommitInfo(block *types.Block, lastValSet *types.ValidatorSet, ini
}
}

// BuildLastCommitInfoWithCache is an optimized version that uses caching for ABCI validators
func (blockExec *BlockExecutor) BuildLastCommitInfoWithCache(block *types.Block, lastValSet *types.ValidatorSet, initialHeight int64) abci.CommitInfo {
if block.Height == initialHeight {
// there is no last commit for the initial height.
// return an empty value.
return abci.CommitInfo{}
}

var (
commitSize = block.LastCommit.Size()
valSetLen = len(lastValSet.Validators)
)

// ensure that the size of the validator set in the last commit matches
// the size of the validator set in the state store.
if commitSize != valSetLen {
panic(fmt.Sprintf(
"commit size (%d) doesn't match validator set length (%d) at height %d\n\n%v\n\n%v",
commitSize, valSetLen, block.Height, block.LastCommit.Signatures, lastValSet.Validators,
))
}

votes := make([]abci.VoteInfo, block.LastCommit.Size())
for i, val := range lastValSet.Validators {
commitSig := block.LastCommit.Signatures[i]

// Use validator address as cache key (already computed)
cacheKey := string(val.Address)

// Try to get ABCI validator from cache
blockExec.abciValidatorCacheMutex.RLock()
abciVal, found := blockExec.abciValidatorCache[cacheKey]
blockExec.abciValidatorCacheMutex.RUnlock()

if !found {
// Convert to ABCI validator using the canonical helper to ensure
// all fields (e.g. PubKey) are populated identically to the
// non-cached path.
abciVal = types.TM2PB.Validator(val)

// Store in cache
blockExec.abciValidatorCacheMutex.Lock()
blockExec.abciValidatorCache[cacheKey] = abciVal
blockExec.abciValidatorCacheMutex.Unlock()

// Cleanup old cache entries if needed (outside of lock to avoid deadlock)
blockExec.cleanupOldCacheEntries()
}

Bug: Validator Cache Issues in ABCI

The abciValidatorCache in BuildLastCommitInfoWithCache has a few problems: it uses a manual abci.Validator conversion that may miss important logic, its cache key (validator address) can lead to stale voting power if power changes, and it has a race condition allowing redundant conversions and cache writes.



votes[i] = abci.VoteInfo{
Validator: abciVal,
BlockIdFlag: cmtproto.BlockIDFlag(commitSig.BlockIDFlag),
}
}

return abci.CommitInfo{
Round: block.LastCommit.Round,
Votes: votes,
}
}
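
The review comment above notes that keying abciValidatorCache by address alone can return a stale voting power after a power change. A sketch, not part of this PR, of a key that also encodes the voting power, so an entry is only reused while both address and power match; locking is omitted for brevity (the map would stay guarded by abciValidatorCacheMutex as in the diff), and it reuses the fmt, abci, and types imports already in execution.go:

// abciValidatorCacheKey folds the voting power into the key so a power change
// never hits a stale cached abci.Validator.
func abciValidatorCacheKey(val *types.Validator) string {
	return fmt.Sprintf("%X/%d", val.Address, val.VotingPower)
}

// lookupOrConvertABCIValidator shows the intended read-through use of the key.
func lookupOrConvertABCIValidator(cache map[string]abci.Validator, val *types.Validator) abci.Validator {
	if abciVal, ok := cache[abciValidatorCacheKey(val)]; ok {
		return abciVal
	}
	abciVal := types.TM2PB.Validator(val)
	cache[abciValidatorCacheKey(val)] = abciVal
	return abciVal
}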

// buildExtendedCommitInfoFromStore populates an ABCI extended commit from the
// corresponding CometBFT extended commit ec, using the stored validator set
// from ec. It requires ec to include the original precommit votes along with