Skip to content

Commit 41fc2b9

Browse files
committed
rebase from merge problem
1 parent 502deae commit 41fc2b9

1 file changed

Lines changed: 71 additions & 57 deletions

File tree

block/internal/syncing/syncer.go

Lines changed: 71 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package syncing
33
import (
44
"bytes"
55
"context"
6+
"encoding/binary"
67
"errors"
78
"fmt"
89
"sync"
@@ -52,8 +53,8 @@ type Syncer struct {
5253
// State management
5354
lastState *atomic.Pointer[types.State]
5455

55-
// DA state
56-
daHeight *atomic.Uint64
56+
// DA retriever
57+
daRetrieverHeight *atomic.Uint64
5758

5859
// P2P stores
5960
headerStore common.Broadcaster[*types.SignedHeader]
@@ -64,8 +65,8 @@ type Syncer struct {
6465
errorCh chan<- error // Channel to report critical execution client failures
6566

6667
// Handlers
67-
daRetriever daRetriever
68-
p2pHandler p2pHandler
68+
daRetrieverHandler daRetriever
69+
p2pHandler p2pHandler
6970

7071
// Logging
7172
logger zerolog.Logger
@@ -95,42 +96,45 @@ func NewSyncer(
9596
errorCh chan<- error,
9697
) *Syncer {
9798
return &Syncer{
98-
store: store,
99-
exec: exec,
100-
da: da,
101-
cache: cache,
102-
metrics: metrics,
103-
config: config,
104-
genesis: genesis,
105-
options: options,
106-
headerStore: headerStore,
107-
dataStore: dataStore,
108-
lastState: &atomic.Pointer[types.State]{},
109-
daHeight: &atomic.Uint64{},
110-
heightInCh: make(chan common.DAHeightEvent, 1_000),
111-
errorCh: errorCh,
112-
logger: logger.With().Str("component", "syncer").Logger(),
99+
store: store,
100+
exec: exec,
101+
da: da,
102+
cache: cache,
103+
metrics: metrics,
104+
config: config,
105+
genesis: genesis,
106+
options: options,
107+
headerStore: headerStore,
108+
dataStore: dataStore,
109+
lastState: &atomic.Pointer[types.State]{},
110+
daRetrieverHeight: &atomic.Uint64{},
111+
heightInCh: make(chan common.DAHeightEvent, 1_000),
112+
errorCh: errorCh,
113+
logger: logger.With().Str("component", "syncer").Logger(),
113114
}
114115
}
115116

116117
// Start begins the syncing component
117118
func (s *Syncer) Start(ctx context.Context) error {
118119
s.ctx, s.cancel = context.WithCancel(ctx)
119120

120-
// Initialize state
121121
if err := s.initializeState(); err != nil {
122122
return fmt.Errorf("failed to initialize syncer state: %w", err)
123123
}
124124

125125
// Initialize handlers
126-
s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.logger)
126+
s.daRetrieverHandler = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.logger)
127127
s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger)
128128
if currentHeight, err := s.store.Height(s.ctx); err != nil {
129129
s.logger.Error().Err(err).Msg("failed to set initial processed height for p2p handler")
130130
} else {
131131
s.p2pHandler.SetProcessedHeight(currentHeight)
132132
}
133133

134+
if !s.waitForGenesis() {
135+
return nil
136+
}
137+
134138
// Start main processing loop
135139
s.wg.Add(1)
136140
go func() {
@@ -175,16 +179,6 @@ func (s *Syncer) SetLastState(state types.State) {
175179
s.lastState.Store(&state)
176180
}
177181

178-
// GetDAHeight returns the current DA height
179-
func (s *Syncer) GetDAHeight() uint64 {
180-
return s.daHeight.Load()
181-
}
182-
183-
// SetDAHeight updates the DA height
184-
func (s *Syncer) SetDAHeight(height uint64) {
185-
s.daHeight.Store(height)
186-
}
187-
188182
// initializeState loads the current sync state
189183
func (s *Syncer) initializeState() error {
190184
// Load state from store
@@ -216,12 +210,13 @@ func (s *Syncer) initializeState() error {
216210
}
217211
s.SetLastState(state)
218212

219-
// Set DA height
220-
s.SetDAHeight(state.DAHeight)
213+
// Set DA height to the maximum of the genesis start height, the state's DA height, the cached DA height, and the highest stored included DA height.
214+
// This ensures we resume from the highest known DA height, even if the cache is cleared on restart. If the DA height is too high because of a user error, reset it with --evnode.clear_cache. The DA height will be back to the last highest known executed DA height for a height.
215+
s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight, s.getHighestStoredDAHeight()))
221216

222217
s.logger.Info().
223218
Uint64("height", state.LastBlockHeight).
224-
Uint64("da_height", s.GetDAHeight()).
219+
Uint64("da_height", s.daRetrieverHeight.Load()).
225220
Str("chain_id", state.ChainID).
226221
Msg("initialized syncer state")
227222

@@ -259,10 +254,6 @@ func (s *Syncer) startSyncWorkers() {
259254
func (s *Syncer) daWorkerLoop() {
260255
defer s.wg.Done()
261256

262-
if !s.waitForGenesis() {
263-
return
264-
}
265-
266257
s.logger.Info().Msg("starting DA worker")
267258
defer s.logger.Info().Msg("DA worker stopped")
268259

@@ -297,17 +288,13 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
297288
default:
298289
}
299290

300-
daHeight := s.GetDAHeight()
301-
302-
// Create a new context with a timeout for the DA call
303-
ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
304-
defer cancel()
291+
daHeight := max(s.daRetrieverHeight.Load(), s.cache.DaHeight())
305292

306-
events, err := s.daRetriever.RetrieveFromDA(ctx, daHeight)
293+
events, err := s.daRetrieverHandler.RetrieveFromDA(s.ctx, daHeight)
307294
if err != nil {
308295
switch {
309296
case errors.Is(err, da.ErrBlobNotFound):
310-
s.SetDAHeight(daHeight + 1)
297+
s.daRetrieverHeight.Store(daHeight + 1)
311298
continue // Fetch next height immediately
312299
case errors.Is(err, da.ErrHeightFromFuture):
313300
s.logger.Debug().Err(err).Uint64("da_height", daHeight).Msg("DA is ahead of local target; backing off future height requests")
@@ -332,18 +319,14 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
332319
}
333320
}
334321

335-
// increment DA height on successful retrieval
336-
s.SetDAHeight(daHeight + 1)
322+
// increment DA retrieval height on successful retrieval
323+
s.daRetrieverHeight.Store(daHeight + 1)
337324
}
338325
}
339326

340327
func (s *Syncer) pendingWorkerLoop() {
341328
defer s.wg.Done()
342329

343-
if !s.waitForGenesis() {
344-
return
345-
}
346-
347330
s.logger.Info().Msg("starting pending worker")
348331
defer s.logger.Info().Msg("pending worker stopped")
349332

@@ -363,10 +346,6 @@ func (s *Syncer) pendingWorkerLoop() {
363346
func (s *Syncer) p2pWorkerLoop() {
364347
defer s.wg.Done()
365348

366-
if !s.waitForGenesis() {
367-
return
368-
}
369-
370349
logger := s.logger.With().Str("worker", "p2p").Logger()
371350
logger.Info().Msg("starting P2P worker")
372351
defer logger.Info().Msg("P2P worker stopped")
@@ -547,13 +526,14 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
547526
return err
548527
}
549528

550-
// Apply block
551529
newState, err := s.applyBlock(header.Header, data, currentState)
552530
if err != nil {
553531
return fmt.Errorf("failed to apply block: %w", err)
554532
}
555533

556534
// Update DA height if needed
535+
// This height is only updated when a height is processed from DA as P2P
536+
// events do not contain DA height information
557537
if event.DaHeight > newState.DAHeight {
558538
newState.DAHeight = event.DaHeight
559539
}
@@ -621,7 +601,8 @@ func (s *Syncer) applyBlock(header types.Header, data *types.Data, currentState
621601
return newState, nil
622602
}
623603

624-
// executeTxsWithRetry executes transactions with retry logic
604+
// executeTxsWithRetry executes transactions with retry logic.
605+
// NOTE: the function retries the execution client call regardless of the error. Some execution clients errors are irrecoverable, and will eventually halt the node, as expected.
625606
func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, header types.Header, currentState types.State) ([]byte, error) {
626607
for attempt := 1; attempt <= common.MaxRetriesBeforeHalt; attempt++ {
627608
newAppHash, _, err := s.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), currentState.AppHash)
@@ -750,6 +731,39 @@ func (s *Syncer) sleepOrDone(duration time.Duration) bool {
750731
}
751732
}
752733

734+
// getHighestStoredDAHeight retrieves the highest DA height from the store by checking
735+
// the DA heights stored for the last DA included height
736+
// this relies on the node syncing with DA and setting included heights.
737+
func (s *Syncer) getHighestStoredDAHeight() uint64 {
738+
// Get the DA included height from store
739+
daIncludedHeightBytes, err := s.store.GetMetadata(s.ctx, store.DAIncludedHeightKey)
740+
if err != nil || len(daIncludedHeightBytes) != 8 {
741+
return 0
742+
}
743+
daIncludedHeight := binary.LittleEndian.Uint64(daIncludedHeightBytes)
744+
if daIncludedHeight == 0 {
745+
return 0
746+
}
747+
748+
var highestDAHeight uint64
749+
750+
// Get header DA height for the last included height
751+
headerKey := store.GetHeightToDAHeightHeaderKey(daIncludedHeight)
752+
if headerBytes, err := s.store.GetMetadata(s.ctx, headerKey); err == nil && len(headerBytes) == 8 {
753+
headerDAHeight := binary.LittleEndian.Uint64(headerBytes)
754+
highestDAHeight = max(highestDAHeight, headerDAHeight)
755+
}
756+
757+
// Get data DA height for the last included height
758+
dataKey := store.GetHeightToDAHeightDataKey(daIncludedHeight)
759+
if dataBytes, err := s.store.GetMetadata(s.ctx, dataKey); err == nil && len(dataBytes) == 8 {
760+
dataDAHeight := binary.LittleEndian.Uint64(dataBytes)
761+
highestDAHeight = max(highestDAHeight, dataDAHeight)
762+
}
763+
764+
return highestDAHeight
765+
}
766+
753767
type p2pWaitState struct {
754768
height uint64
755769
cancel context.CancelFunc

0 commit comments

Comments
 (0)