Skip to content

Commit fd2d51e

Browse files
committed
consolidate
1 parent f2a5390 commit fd2d51e

File tree

8 files changed

+668
-245
lines changed

8 files changed

+668
-245
lines changed

apps/evm/cmd/run.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ var RunCmd = &cobra.Command{
7777
return err
7878
}
7979

80+
// Attach store to the EVM engine client for ExecMeta tracking (idempotent execution)
81+
if ec, ok := executor.(*evm.EngineClient); ok {
82+
ec.SetStore(store.New(datastore))
83+
}
84+
8085
genesisPath := filepath.Join(filepath.Dir(nodeConfig.ConfigPath()), "genesis.json")
8186
genesis, err := genesispkg.LoadGenesis(genesisPath)
8287
if err != nil {
@@ -250,7 +255,7 @@ func createExecutionClient(cmd *cobra.Command) (execution.Executor, error) {
250255
genesisHash := common.HexToHash(genesisHashStr)
251256
feeRecipient := common.HexToAddress(feeRecipientStr)
252257

253-
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, genesisHash, feeRecipient)
258+
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, genesisHash, feeRecipient, nil)
254259
}
255260

256261
// addFlags adds flags related to the EVM execution client

block/internal/executing/executor.go

Lines changed: 4 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package executing
33
import (
44
"bytes"
55
"context"
6-
"crypto/sha256"
76
"errors"
87
"fmt"
98
"sync"
@@ -26,15 +25,6 @@ import (
2625
"github.com/evstack/ev-node/types"
2726
)
2827

29-
// payloadResumer is an optional interface that EVM execution clients can implement
30-
// to support resuming in-progress payload builds after crashes.
31-
// This is defined locally to avoid coupling the core interface to EVM-specific concepts.
32-
type payloadResumer interface {
33-
// ResumePayload resumes an in-progress payload build using a stored payloadID.
34-
// This allows crash recovery without creating sibling blocks.
35-
ResumePayload(ctx context.Context, payloadID []byte) (stateRoot []byte, err error)
36-
}
37-
3828
// Executor handles block production, transaction processing, and state management
3929
type Executor struct {
4030
// Core components
@@ -380,25 +370,6 @@ func (e *Executor) produceBlock() error {
380370
return fmt.Errorf("failed to save block data: %w", err)
381371
}
382372

383-
// Save ExecMeta with Stage="started" for crash recovery and idempotent execution
384-
execMeta := &store.ExecMeta{
385-
Height: newHeight,
386-
Timestamp: header.Time().Unix(),
387-
Stage: store.ExecStageStarted,
388-
UpdatedAtUnix: time.Now().Unix(),
389-
}
390-
// Compute tx hash for sanity checks on retry
391-
if len(data.Txs) > 0 {
392-
h := sha256.New()
393-
for _, tx := range data.Txs {
394-
h.Write(tx)
395-
}
396-
execMeta.TxHash = h.Sum(nil)
397-
}
398-
if err = batch.SaveExecMeta(execMeta); err != nil {
399-
return fmt.Errorf("failed to save exec meta: %w", err)
400-
}
401-
402373
if err = batch.Commit(); err != nil {
403374
return fmt.Errorf("failed to commit early save batch: %w", err)
404375
}
@@ -452,18 +423,6 @@ func (e *Executor) produceBlock() error {
452423
return fmt.Errorf("failed to update state: %w", err)
453424
}
454425

455-
// Update ExecMeta to Stage="promoted" after successful execution
456-
execMeta := &store.ExecMeta{
457-
Height: newHeight,
458-
Timestamp: header.Time().Unix(),
459-
StateRoot: newState.AppHash,
460-
Stage: store.ExecStagePromoted,
461-
UpdatedAtUnix: time.Now().Unix(),
462-
}
463-
if err := batch.SaveExecMeta(execMeta); err != nil {
464-
return fmt.Errorf("failed to update exec meta to promoted: %w", err)
465-
}
466-
467426
if err := batch.Commit(); err != nil {
468427
return fmt.Errorf("failed to commit batch: %w", err)
469428
}
@@ -666,63 +625,11 @@ func (e *Executor) signHeader(header types.Header) (types.Signature, error) {
666625
}
667626

668627
// executeTxsWithRetry executes transactions with retry logic.
669-
// It first checks ExecMeta for idempotent execution - if a block was already built
670-
// at this height, it returns the stored StateRoot instead of rebuilding.
671-
// If a payloadID exists (started but not promoted), it attempts to resume the payload
672-
// using the PayloadResumer interface if available.
673-
// NOTE: the function retries the execution client call regardless of the error. Some execution clients errors are irrecoverable, and will eventually halt the node, as expected.
628+
// Idempotent execution and crash recovery are handled by the execution client
629+
// (e.g., EngineClient) via ExecMeta tracking.
630+
// NOTE: the function retries the execution client call regardless of the error.
631+
// Some execution client errors are irrecoverable, and will eventually halt the node, as expected.
674632
func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, header types.Header, currentState types.State) ([]byte, error) {
675-
height := header.Height()
676-
677-
// Check ExecMeta for idempotent execution
678-
// If we already have a promoted block at this height, return the stored StateRoot
679-
execMeta, err := e.store.GetExecMeta(ctx, height)
680-
if err == nil && execMeta != nil {
681-
if execMeta.Stage == store.ExecStagePromoted && len(execMeta.StateRoot) > 0 {
682-
e.logger.Info().
683-
Uint64("height", height).
684-
Str("stage", execMeta.Stage).
685-
Msg("executeTxsWithRetry: reusing already-promoted execution (idempotent)")
686-
return execMeta.StateRoot, nil
687-
}
688-
689-
// If we have a started execution with a payloadID, try to resume
690-
// This handles crash recovery where we got a payloadID but didn't complete the build
691-
if execMeta.Stage == store.ExecStageStarted && len(execMeta.PayloadID) > 0 {
692-
e.logger.Info().
693-
Uint64("height", height).
694-
Str("stage", execMeta.Stage).
695-
Msg("executeTxsWithRetry: found in-progress execution with payloadID, attempting resume")
696-
697-
// Check if the executor implements payloadResumer (EVM-specific)
698-
if resumer, ok := e.exec.(payloadResumer); ok {
699-
stateRoot, err := resumer.ResumePayload(ctx, execMeta.PayloadID)
700-
if err == nil {
701-
e.logger.Info().
702-
Uint64("height", height).
703-
Msg("executeTxsWithRetry: successfully resumed payload")
704-
return stateRoot, nil
705-
}
706-
// Resume failed - log and fall through to normal execution
707-
// The EL-level idempotency check will handle if the block was already built
708-
e.logger.Warn().Err(err).
709-
Uint64("height", height).
710-
Msg("executeTxsWithRetry: failed to resume payload, falling back to normal execution")
711-
} else {
712-
e.logger.Debug().
713-
Uint64("height", height).
714-
Msg("executeTxsWithRetry: executor does not support PayloadResumer, using normal execution")
715-
}
716-
} else if execMeta.Stage == store.ExecStageStarted {
717-
// Started but no payloadID - log and proceed normally
718-
// The EL-level idempotency check in ExecuteTxs will handle reusing the block
719-
e.logger.Debug().
720-
Uint64("height", height).
721-
Str("stage", execMeta.Stage).
722-
Msg("executeTxsWithRetry: found in-progress execution without payloadID, will attempt EL-level idempotency")
723-
}
724-
}
725-
726633
for attempt := 1; attempt <= common.MaxRetriesBeforeHalt; attempt++ {
727634
newAppHash, _, err := e.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), currentState.AppHash)
728635
if err != nil {

execution/evm/execution.go

Lines changed: 128 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package evm
22

33
import (
44
"context"
5+
"crypto/sha256"
56
"encoding/hex"
67
"errors"
78
"fmt"
@@ -20,6 +21,7 @@ import (
2021
"github.com/rs/zerolog"
2122

2223
"github.com/evstack/ev-node/core/execution"
24+
"github.com/evstack/ev-node/pkg/store"
2325
)
2426

2527
const (
@@ -125,6 +127,10 @@ type EngineClient struct {
125127
initialHeight uint64
126128
feeRecipient common.Address // Address to receive transaction fees
127129

130+
// store provides persistence for ExecMeta to enable idempotent execution
131+
// and crash recovery. Optional - if nil, ExecMeta tracking is disabled.
132+
store store.Store
133+
128134
mu sync.Mutex // Mutex to protect concurrent access to block hashes
129135
currentHeadBlockHash common.Hash // Store last non-finalized HeadBlockHash
130136
currentHeadHeight uint64 // Height of the current head block (for safe lag calculation)
@@ -139,13 +145,16 @@ type EngineClient struct {
139145
logger zerolog.Logger
140146
}
141147

142-
// NewEngineExecutionClient creates a new instance of EngineAPIExecutionClient
148+
// NewEngineExecutionClient creates a new instance of EngineAPIExecutionClient.
149+
// The store parameter is optional - if provided, ExecMeta tracking is enabled
150+
// for idempotent execution and crash recovery.
143151
func NewEngineExecutionClient(
144152
ethURL,
145153
engineURL string,
146154
jwtSecret string,
147155
genesisHash common.Hash,
148156
feeRecipient common.Address,
157+
evStore store.Store,
149158
) (*EngineClient, error) {
150159
ethClient, err := ethclient.Dial(ethURL)
151160
if err != nil {
@@ -178,6 +187,7 @@ func NewEngineExecutionClient(
178187
ethClient: ethClient,
179188
genesisHash: genesisHash,
180189
feeRecipient: feeRecipient,
190+
store: evStore,
181191
currentHeadBlockHash: genesisHash,
182192
currentSafeBlockHash: genesisHash,
183193
currentFinalizedBlockHash: genesisHash,
@@ -190,6 +200,13 @@ func (c *EngineClient) SetLogger(l zerolog.Logger) {
190200
c.logger = l
191201
}
192202

203+
// SetStore allows callers to attach a store for ExecMeta tracking.
204+
// This enables idempotent execution and crash recovery features.
205+
// Must be called before ExecuteTxs if ExecMeta tracking is desired.
206+
func (c *EngineClient) SetStore(s store.Store) {
207+
c.store = s
208+
}
209+
193210
// InitChain initializes the blockchain with the given genesis parameters
194211
func (c *EngineClient) InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) ([]byte, uint64, error) {
195212
if initialHeight != 1 {
@@ -263,11 +280,51 @@ func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) {
263280
// ExecuteTxs executes the given transactions at the specified block height and timestamp.
264281
// This method is serialized via executeMu to prevent concurrent block builds that could
265282
// create sibling blocks (fork explosion).
283+
//
284+
// ExecMeta tracking (if store is configured):
285+
// - Checks for already-promoted blocks to enable idempotent execution
286+
// - Saves ExecMeta with payloadID after forkchoiceUpdatedV3 for crash recovery
287+
// - Updates ExecMeta to "promoted" after successful execution
266288
func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error) {
267289
// Serialize all ExecuteTxs calls to prevent concurrent sibling block creation
268290
c.executeMu.Lock()
269291
defer c.executeMu.Unlock()
270292

293+
// Check ExecMeta for idempotent execution (if store is configured)
294+
if c.store != nil {
295+
execMeta, err := c.store.GetExecMeta(ctx, blockHeight)
296+
if err == nil && execMeta != nil {
297+
// If we already have a promoted block at this height, return the stored StateRoot
298+
if execMeta.Stage == store.ExecStagePromoted && len(execMeta.StateRoot) > 0 {
299+
c.logger.Info().
300+
Uint64("height", blockHeight).
301+
Str("stage", execMeta.Stage).
302+
Msg("ExecuteTxs: reusing already-promoted execution (idempotent)")
303+
return execMeta.StateRoot, 0, nil
304+
}
305+
306+
// If we have a started execution with a payloadID, try to resume
307+
if execMeta.Stage == store.ExecStageStarted && len(execMeta.PayloadID) > 0 {
308+
c.logger.Info().
309+
Uint64("height", blockHeight).
310+
Str("stage", execMeta.Stage).
311+
Msg("ExecuteTxs: found in-progress execution with payloadID, attempting resume")
312+
313+
stateRoot, err := c.ResumePayload(ctx, execMeta.PayloadID)
314+
if err == nil {
315+
c.logger.Info().
316+
Uint64("height", blockHeight).
317+
Msg("ExecuteTxs: successfully resumed payload")
318+
return stateRoot, 0, nil
319+
}
320+
// Resume failed - log and fall through to EL-level idempotency check
321+
c.logger.Warn().Err(err).
322+
Uint64("height", blockHeight).
323+
Msg("ExecuteTxs: failed to resume payload, falling back to EL-level check")
324+
}
325+
}
326+
}
327+
271328
// Idempotency check: if EL already has a block at this height with matching
272329
// timestamp, return its state root instead of building a new block.
273330
// This handles retries and crash recovery without creating sibling blocks.
@@ -279,14 +336,19 @@ func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight
279336
Uint64("height", blockHeight).
280337
Str("blockHash", existingBlockHash.Hex()).
281338
Str("stateRoot", existingStateRoot.Hex()).
282-
Msg("ExecuteTxs: reusing existing block at height (idempotency)")
339+
Msg("ExecuteTxs: reusing existing block at height (EL idempotency)")
283340

284341
// Update head to point to this existing block
285342
if err := c.setHead(ctx, existingBlockHash); err != nil {
286343
c.logger.Warn().Err(err).Msg("ExecuteTxs: failed to update head to existing block")
287344
// Continue anyway - the block exists and we can return its state root
288345
}
289346

347+
// Update ExecMeta to promoted if store is configured
348+
if c.store != nil {
349+
c.saveExecMeta(ctx, blockHeight, timestamp.Unix(), nil, existingBlockHash[:], existingStateRoot.Bytes(), txs, store.ExecStagePromoted)
350+
}
351+
290352
return existingStateRoot.Bytes(), 0, nil
291353
}
292354
// Timestamp mismatch - this is unexpected, log a warning but proceed
@@ -416,6 +478,12 @@ func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight
416478
return nil, 0, err
417479
}
418480

481+
// Save ExecMeta with payloadID for crash recovery (Stage="started")
482+
// This allows resuming the payload build if we crash before completing
483+
if c.store != nil {
484+
c.saveExecMeta(ctx, blockHeight, timestamp.Unix(), payloadID[:], nil, nil, txs, store.ExecStageStarted)
485+
}
486+
419487
// get payload
420488
var payloadResult engine.ExecutionPayloadEnvelope
421489
err = c.engineClient.CallContext(ctx, &payloadResult, "engine_getPayloadV4", *payloadID)
@@ -459,6 +527,11 @@ func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight
459527
return nil, 0, err
460528
}
461529

530+
// Update ExecMeta to "promoted" after successful execution
531+
if c.store != nil {
532+
c.saveExecMeta(ctx, blockHeight, timestamp.Unix(), payloadID[:], blockHash[:], payloadResult.ExecutionPayload.StateRoot.Bytes(), txs, store.ExecStagePromoted)
533+
}
534+
462535
return payloadResult.ExecutionPayload.StateRoot.Bytes(), payloadResult.ExecutionPayload.GasUsed, nil
463536
}
464537

@@ -689,6 +762,11 @@ func (c *EngineClient) ResumePayload(ctx context.Context, payloadIDBytes []byte)
689762
Uint64("blockHeight", blockHeight).
690763
Msg("ResumePayload: successfully resumed payload")
691764

765+
// Update ExecMeta to "promoted" after successful resume
766+
if c.store != nil {
767+
c.saveExecMeta(ctx, blockHeight, int64(payloadResult.ExecutionPayload.Timestamp), payloadIDBytes, blockHash[:], payloadResult.ExecutionPayload.StateRoot.Bytes(), nil, store.ExecStagePromoted)
768+
}
769+
692770
return payloadResult.ExecutionPayload.StateRoot.Bytes(), nil
693771
}
694772

@@ -706,6 +784,54 @@ func (c *EngineClient) getBlockInfo(ctx context.Context, height uint64) (common.
706784
return header.Hash(), header.Root, header.GasLimit, header.Time, nil
707785
}
708786

787+
// saveExecMeta persists execution metadata to the store for crash recovery.
788+
// This is a best-effort operation - failures are logged but don't fail the execution.
789+
func (c *EngineClient) saveExecMeta(ctx context.Context, height uint64, timestamp int64, payloadID []byte, blockHash []byte, stateRoot []byte, txs [][]byte, stage string) {
790+
if c.store == nil {
791+
return
792+
}
793+
794+
execMeta := &store.ExecMeta{
795+
Height: height,
796+
Timestamp: timestamp,
797+
PayloadID: payloadID,
798+
BlockHash: blockHash,
799+
StateRoot: stateRoot,
800+
Stage: stage,
801+
UpdatedAtUnix: time.Now().Unix(),
802+
}
803+
804+
// Compute tx hash for sanity checks on retry
805+
if len(txs) > 0 {
806+
h := sha256.New()
807+
for _, tx := range txs {
808+
h.Write(tx)
809+
}
810+
execMeta.TxHash = h.Sum(nil)
811+
}
812+
813+
batch, err := c.store.NewBatch(ctx)
814+
if err != nil {
815+
c.logger.Warn().Err(err).Uint64("height", height).Msg("saveExecMeta: failed to create batch")
816+
return
817+
}
818+
819+
if err := batch.SaveExecMeta(execMeta); err != nil {
820+
c.logger.Warn().Err(err).Uint64("height", height).Msg("saveExecMeta: failed to save exec meta")
821+
return
822+
}
823+
824+
if err := batch.Commit(); err != nil {
825+
c.logger.Warn().Err(err).Uint64("height", height).Msg("saveExecMeta: failed to commit batch")
826+
return
827+
}
828+
829+
c.logger.Debug().
830+
Uint64("height", height).
831+
Str("stage", stage).
832+
Msg("saveExecMeta: saved execution metadata")
833+
}
834+
709835
// GetLatestHeight returns the current block height of the execution layer
710836
func (c *EngineClient) GetLatestHeight(ctx context.Context) (uint64, error) {
711837
header, err := c.ethClient.HeaderByNumber(ctx, nil) // nil = latest block

0 commit comments

Comments
 (0)