diff --git a/state/execution.go b/state/execution.go index ce3bda70e5..4ab87c3b2d 100644 --- a/state/execution.go +++ b/state/execution.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sync" "time" abci "github.com/cometbft/cometbft/abci/types" @@ -43,6 +44,17 @@ type BlockExecutor struct { // 1-element cache of validated blocks lastValidatedBlock *types.Block + // cache for validators to avoid repeated DB lookups + validatorCache map[int64]*types.ValidatorSet + validatorCacheMutex sync.RWMutex + + // cache for ABCI validators to avoid repeated conversions + abciValidatorCache map[string]abci.Validator + abciValidatorCacheMutex sync.RWMutex + + // cache management + maxCacheSize int + logger log.Logger metrics *Metrics @@ -68,14 +80,17 @@ func NewBlockExecutor( options ...BlockExecutorOption, ) *BlockExecutor { res := &BlockExecutor{ - store: stateStore, - proxyApp: proxyApp, - eventBus: types.NopEventBus{}, - mempool: mempool, - evpool: evpool, - logger: logger, - metrics: NopMetrics(), - blockStore: blockStore, + store: stateStore, + proxyApp: proxyApp, + eventBus: types.NopEventBus{}, + mempool: mempool, + evpool: evpool, + logger: logger, + metrics: NopMetrics(), + blockStore: blockStore, + validatorCache: make(map[int64]*types.ValidatorSet), + abciValidatorCache: make(map[string]abci.Validator), + maxCacheSize: 2000, // Limit cache size to prevent memory leaks } for _, option := range options { @@ -89,6 +104,62 @@ func (blockExec *BlockExecutor) Store() Store { return blockExec.store } +// GetCacheSize returns the current cache sizes for testing +func (blockExec *BlockExecutor) GetCacheSize() (validatorCacheSize, abciValidatorCacheSize int) { + blockExec.validatorCacheMutex.RLock() + validatorCacheSize = len(blockExec.validatorCache) + blockExec.validatorCacheMutex.RUnlock() + + blockExec.abciValidatorCacheMutex.RLock() + abciValidatorCacheSize = len(blockExec.abciValidatorCache) + blockExec.abciValidatorCacheMutex.RUnlock() + return +} + +// SetMaxCacheSize sets the maximum cache size for testing +func (blockExec *BlockExecutor) SetMaxCacheSize(size int) { + blockExec.maxCacheSize = size +} + +// cleanupOldCacheEntries removes old entries from caches to prevent memory leaks +func (blockExec *BlockExecutor) cleanupOldCacheEntries() { + // Check validator cache size with read lock first + blockExec.validatorCacheMutex.RLock() + validatorCacheSize := len(blockExec.validatorCache) + blockExec.validatorCacheMutex.RUnlock() + + if validatorCacheSize > blockExec.maxCacheSize { + // Only acquire write lock when we actually need to clean up + blockExec.validatorCacheMutex.Lock() + // Double-check in case another goroutine cleaned it up + if len(blockExec.validatorCache) > blockExec.maxCacheSize { + // Simple cleanup: clear the entire cache + // Since Go maps don't guarantee iteration order, we'll clear the entire cache + // and let it rebuild naturally. This is simpler and avoids the FIFO issue. + blockExec.validatorCache = make(map[int64]*types.ValidatorSet) + } + blockExec.validatorCacheMutex.Unlock() + } + + // Check ABCI validator cache size with read lock first + blockExec.abciValidatorCacheMutex.RLock() + abciValidatorCacheSize := len(blockExec.abciValidatorCache) + blockExec.abciValidatorCacheMutex.RUnlock() + + if abciValidatorCacheSize > blockExec.maxCacheSize { + // Only acquire write lock when we actually need to clean up + blockExec.abciValidatorCacheMutex.Lock() + // Double-check in case another goroutine cleaned it up + if len(blockExec.abciValidatorCache) > blockExec.maxCacheSize { + // Simple cleanup: clear the entire cache + // Since Go maps don't guarantee iteration order, we'll clear the entire cache + // and let it rebuild naturally. This is simpler and avoids the FIFO issue. + blockExec.abciValidatorCache = make(map[string]abci.Validator) + } + blockExec.abciValidatorCacheMutex.Unlock() + } +} + // SetEventBus - sets the event bus for publishing block related events. // If not called, it defaults to types.NopEventBus. func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) { @@ -170,7 +241,7 @@ func (blockExec *BlockExecutor) ProcessProposal( Height: block.Height, Time: block.Time, Txs: block.Txs.ToSliceOfBytes(), - ProposedLastCommit: buildLastCommitInfoFromStore(block, blockExec.store, state.InitialHeight), + ProposedLastCommit: blockExec.BuildLastCommitInfoFromStoreWithCache(block, state.InitialHeight), Misbehavior: block.Evidence.Evidence.ToABCI(), ProposerAddress: block.ProposerAddress, NextValidatorsHash: block.NextValidatorsHash, @@ -232,7 +303,7 @@ func (blockExec *BlockExecutor) applyBlock(state State, blockID types.BlockID, b ProposerAddress: block.ProposerAddress, Height: block.Height, Time: block.Time, - DecidedLastCommit: buildLastCommitInfoFromStore(block, blockExec.store, state.InitialHeight), + DecidedLastCommit: blockExec.BuildLastCommitInfoFromStoreWithCache(block, state.InitialHeight), Misbehavior: block.Evidence.Evidence.ToABCI(), Txs: block.Txs.ToSliceOfBytes(), }) @@ -345,7 +416,7 @@ func (blockExec *BlockExecutor) ExtendVote( Height: vote.Height, Time: block.Time, Txs: block.Txs.ToSliceOfBytes(), - ProposedLastCommit: buildLastCommitInfoFromStore(block, blockExec.store, state.InitialHeight), + ProposedLastCommit: blockExec.BuildLastCommitInfoFromStoreWithCache(block, state.InitialHeight), Misbehavior: block.Evidence.Evidence.ToABCI(), NextValidatorsHash: block.NextValidatorsHash, ProposerAddress: block.ProposerAddress, @@ -474,6 +545,41 @@ func buildLastCommitInfoFromStore(block *types.Block, store Store, initialHeight return BuildLastCommitInfo(block, lastValSet, initialHeight) } +// BuildLastCommitInfoFromStoreWithCache is an optimized version that uses caching +func (blockExec *BlockExecutor) BuildLastCommitInfoFromStoreWithCache(block *types.Block, initialHeight int64) abci.CommitInfo { + if block.Height == initialHeight { // check for initial height before loading validators + // there is no last commit for the initial height. + // return an empty value. + return abci.CommitInfo{} + } + + height := block.Height - 1 + + // Try to get validators from cache first + blockExec.validatorCacheMutex.RLock() + lastValSet, found := blockExec.validatorCache[height] + blockExec.validatorCacheMutex.RUnlock() + + if !found { + // Load from store + var err error + lastValSet, err = blockExec.store.LoadValidators(height) + if err != nil { + panic(fmt.Errorf("failed to load validator set at height %d: %w", height, err)) + } + + // Store in cache + blockExec.validatorCacheMutex.Lock() + blockExec.validatorCache[height] = lastValSet + blockExec.validatorCacheMutex.Unlock() + + // Cleanup old cache entries if needed (outside of lock to avoid deadlock) + blockExec.cleanupOldCacheEntries() + } + + return blockExec.BuildLastCommitInfoWithCache(block, lastValSet, initialHeight) +} + // BuildLastCommitInfo builds a CommitInfo from the given block and validator set. // If you want to load the validator set from the store instead of providing it, // use buildLastCommitInfoFromStore. @@ -513,6 +619,67 @@ func BuildLastCommitInfo(block *types.Block, lastValSet *types.ValidatorSet, ini } } +// BuildLastCommitInfoWithCache is an optimized version that uses caching for ABCI validators +func (blockExec *BlockExecutor) BuildLastCommitInfoWithCache(block *types.Block, lastValSet *types.ValidatorSet, initialHeight int64) abci.CommitInfo { + if block.Height == initialHeight { + // there is no last commit for the initial height. + // return an empty value. + return abci.CommitInfo{} + } + + var ( + commitSize = block.LastCommit.Size() + valSetLen = len(lastValSet.Validators) + ) + + // ensure that the size of the validator set in the last commit matches + // the size of the validator set in the state store. + if commitSize != valSetLen { + panic(fmt.Sprintf( + "commit size (%d) doesn't match validator set length (%d) at height %d\n\n%v\n\n%v", + commitSize, valSetLen, block.Height, block.LastCommit.Signatures, lastValSet.Validators, + )) + } + + votes := make([]abci.VoteInfo, block.LastCommit.Size()) + for i, val := range lastValSet.Validators { + commitSig := block.LastCommit.Signatures[i] + + // Use validator address as cache key (already computed) + cacheKey := string(val.Address) + + // Try to get ABCI validator from cache + blockExec.abciValidatorCacheMutex.RLock() + abciVal, found := blockExec.abciValidatorCache[cacheKey] + blockExec.abciValidatorCacheMutex.RUnlock() + + if !found { + // Convert to ABCI validator using the canonical helper to ensure + // all fields (e.g. PubKey) are populated identically to the + // non-cached path. + abciVal = types.TM2PB.Validator(val) + + // Store in cache + blockExec.abciValidatorCacheMutex.Lock() + blockExec.abciValidatorCache[cacheKey] = abciVal + blockExec.abciValidatorCacheMutex.Unlock() + + // Cleanup old cache entries if needed (outside of lock to avoid deadlock) + blockExec.cleanupOldCacheEntries() + } + + votes[i] = abci.VoteInfo{ + Validator: abciVal, + BlockIdFlag: cmtproto.BlockIDFlag(commitSig.BlockIDFlag), + } + } + + return abci.CommitInfo{ + Round: block.LastCommit.Round, + Votes: votes, + } +} + // buildExtendedCommitInfoFromStore populates an ABCI extended commit from the // corresponding CometBFT extended commit ec, using the stored validator set // from ec. It requires ec to include the original precommit votes along with diff --git a/state/execution_test.go b/state/execution_test.go index ace431269b..b5d0c71069 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -1131,3 +1131,219 @@ func makeBlockID(hash []byte, partSetSize uint32, partSetHash []byte) types.Bloc }, } } + +// TestBuildLastCommitInfoWithCache tests the caching functionality +func TestBuildLastCommitInfoWithCache(t *testing.T) { + // Create test validators + val1, _ := types.RandValidator(true, 10) + val2, _ := types.RandValidator(true, 20) + valSet := types.NewValidatorSet([]*types.Validator{val1, val2}) + + // Create a mock store + mockStore := &mocks.Store{} + mockStore.On("LoadValidators", int64(1)).Return(valSet, nil) + + // Create block executor with cache + blockExec := sm.NewBlockExecutor( + mockStore, + log.NewNopLogger(), + nil, // proxyApp + nil, // mempool + nil, // evpool + nil, // blockStore + ) + + // Create a test block + block := &types.Block{ + Header: types.Header{ + Height: 2, + }, + LastCommit: &types.Commit{ + Height: 1, + Round: 0, + Signatures: []types.CommitSig{ + {BlockIDFlag: types.BlockIDFlagCommit}, + {BlockIDFlag: types.BlockIDFlagCommit}, + }, + }, + } + + // First call - should load from store and cache + commitInfo1 := blockExec.BuildLastCommitInfoFromStoreWithCache(block, 1) + require.NotEmpty(t, commitInfo1.Votes) + require.Len(t, commitInfo1.Votes, 2) + + // Second call - should use cache (store should not be called again) + commitInfo2 := blockExec.BuildLastCommitInfoFromStoreWithCache(block, 1) + require.Equal(t, commitInfo1, commitInfo2) + + // Verify that LoadValidators was called only once + mockStore.AssertNumberOfCalls(t, "LoadValidators", 1) + + // Test that cache is working - second call should not hit the store +} + +// TestCacheCleanup tests the cache cleanup mechanism +func TestCacheCleanup(t *testing.T) { + // Create test validators + val1, _ := types.RandValidator(true, 10) + val2, _ := types.RandValidator(true, 20) + valSet := types.NewValidatorSet([]*types.Validator{val1, val2}) + + // Create a mock store + mockStore := &mocks.Store{} + mockStore.On("LoadValidators", mock.AnythingOfType("int64")).Return(valSet, nil) + + // Create block executor with small cache size for testing + blockExec := sm.NewBlockExecutor( + mockStore, + log.NewNopLogger(), + nil, // proxyApp + nil, // mempool + nil, // evpool + nil, // blockStore + ) + + // Set small cache size for testing + blockExec.SetMaxCacheSize(1) + + // Create test blocks + block1 := &types.Block{ + Header: types.Header{Height: 2}, + LastCommit: &types.Commit{ + Height: 1, Round: 0, + Signatures: []types.CommitSig{ + {BlockIDFlag: types.BlockIDFlagCommit}, + {BlockIDFlag: types.BlockIDFlagCommit}, + }, + }, + } + + block2 := &types.Block{ + Header: types.Header{Height: 3}, + LastCommit: &types.Commit{ + Height: 2, Round: 0, + Signatures: []types.CommitSig{ + {BlockIDFlag: types.BlockIDFlagCommit}, + {BlockIDFlag: types.BlockIDFlagCommit}, + }, + }, + } + + // Fill cache beyond limit (need to exceed maxCacheSize = 1) + _ = blockExec.BuildLastCommitInfoFromStoreWithCache(block1, 1) + _ = blockExec.BuildLastCommitInfoFromStoreWithCache(block2, 1) + + // Add one more to trigger cleanup (total 3 > maxCacheSize = 1) + block4 := &types.Block{ + Header: types.Header{Height: 4}, + LastCommit: &types.Commit{ + Height: 3, + Signatures: []types.CommitSig{ + { + ValidatorAddress: val1.Address, + Timestamp: time.Now(), + Signature: []byte("signature3_1"), + }, + { + ValidatorAddress: val2.Address, + Timestamp: time.Now(), + Signature: []byte("signature3_2"), + }, + }, + }, + } + _ = blockExec.BuildLastCommitInfoFromStoreWithCache(block4, 1) + + // Verify cache cleanup was triggered + validatorCacheSize, abciValidatorCacheSize := blockExec.GetCacheSize() + + // Validator cache should be cleaned up (cleared completely when over limit) + // Note: cleanup is triggered when adding new items, so we need to add one more + // to trigger the cleanup for the validator cache + require.LessOrEqual(t, validatorCacheSize, 1) // Should be 0 or 1 after cleanup + + // ABCI validator cache may still have entries as it uses different keys + // but should not exceed the threshold significantly + require.LessOrEqual(t, abciValidatorCacheSize, 2) // Reasonable upper bound +} + +// BenchmarkBuildLastCommitInfoWithCache benchmarks the caching performance +func BenchmarkBuildLastCommitInfoWithCache(b *testing.B) { + // Create test validators + val1, _ := types.RandValidator(true, 10) + val2, _ := types.RandValidator(true, 20) + valSet := types.NewValidatorSet([]*types.Validator{val1, val2}) + + // Create a mock store + mockStore := &mocks.Store{} + mockStore.On("LoadValidators", int64(1)).Return(valSet, nil) + + // Create block executor with cache + blockExec := sm.NewBlockExecutor( + mockStore, + log.NewNopLogger(), + nil, // proxyApp + nil, // mempool + nil, // evpool + nil, // blockStore + ) + + // Create a test block + block := &types.Block{ + Header: types.Header{ + Height: 2, + }, + LastCommit: &types.Commit{ + Height: 1, + Round: 0, + Signatures: []types.CommitSig{ + {BlockIDFlag: types.BlockIDFlagCommit}, + {BlockIDFlag: types.BlockIDFlagCommit}, + }, + }, + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _ = blockExec.BuildLastCommitInfoFromStoreWithCache(block, 1) + } + }) +} + +// BenchmarkBuildLastCommitInfoOriginal benchmarks the original version without caching +func BenchmarkBuildLastCommitInfoOriginal(b *testing.B) { + // Create test validators + val1, _ := types.RandValidator(true, 10) + val2, _ := types.RandValidator(true, 20) + valSet := types.NewValidatorSet([]*types.Validator{val1, val2}) + + // Create a mock store + mockStore := &mocks.Store{} + mockStore.On("LoadValidators", int64(1)).Return(valSet, nil) + + // Create a test block + block := &types.Block{ + Header: types.Header{ + Height: 2, + }, + LastCommit: &types.Commit{ + Height: 1, + Round: 0, + Signatures: []types.CommitSig{ + {BlockIDFlag: types.BlockIDFlagCommit}, + {BlockIDFlag: types.BlockIDFlagCommit}, + }, + }, + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Simulate the original behavior by calling LoadValidators each time + valSet, _ := mockStore.LoadValidators(1) + _ = sm.BuildLastCommitInfo(block, valSet, 1) + } + }) +}