Skip to content

Commit 9a157c0

Browse files
committed
more clean-ups
1 parent a376b8f commit 9a157c0

3 files changed

Lines changed: 36 additions & 71 deletions

File tree

block/internal/syncing/syncer.go

Lines changed: 25 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ type Syncer struct {
5252
// State management
5353
lastState *atomic.Pointer[types.State]
5454

55-
// DA state
56-
daHeight *atomic.Uint64
55+
// DA retriever height
56+
daRetrieverHeight *atomic.Uint64
5757

5858
// P2P stores
5959
headerStore common.Broadcaster[*types.SignedHeader]
@@ -95,21 +95,21 @@ func NewSyncer(
9595
errorCh chan<- error,
9696
) *Syncer {
9797
return &Syncer{
98-
store: store,
99-
exec: exec,
100-
da: da,
101-
cache: cache,
102-
metrics: metrics,
103-
config: config,
104-
genesis: genesis,
105-
options: options,
106-
headerStore: headerStore,
107-
dataStore: dataStore,
108-
lastState: &atomic.Pointer[types.State]{},
109-
daHeight: &atomic.Uint64{},
110-
heightInCh: make(chan common.DAHeightEvent, 1_000),
111-
errorCh: errorCh,
112-
logger: logger.With().Str("component", "syncer").Logger(),
98+
store: store,
99+
exec: exec,
100+
da: da,
101+
cache: cache,
102+
metrics: metrics,
103+
config: config,
104+
genesis: genesis,
105+
options: options,
106+
headerStore: headerStore,
107+
dataStore: dataStore,
108+
lastState: &atomic.Pointer[types.State]{},
109+
daRetrieverHeight: &atomic.Uint64{},
110+
heightInCh: make(chan common.DAHeightEvent, 1_000),
111+
errorCh: errorCh,
112+
logger: logger.With().Str("component", "syncer").Logger(),
113113
}
114114
}
115115

@@ -175,16 +175,6 @@ func (s *Syncer) SetLastState(state types.State) {
175175
s.lastState.Store(&state)
176176
}
177177

178-
// GetDAHeight returns the current DA height
179-
func (s *Syncer) GetDAHeight() uint64 {
180-
return s.daHeight.Load()
181-
}
182-
183-
// SetDAHeight updates the DA height
184-
func (s *Syncer) SetDAHeight(height uint64) {
185-
s.daHeight.Store(height)
186-
}
187-
188178
// initializeState loads the current sync state
189179
func (s *Syncer) initializeState() error {
190180
// Load state from store
@@ -218,11 +208,11 @@ func (s *Syncer) initializeState() error {
218208

219209
// Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height.
220210
// This ensures we resume from the highest known DA height, even if the cache is cleared on restart. If the DA height is too high because of a user error, reset it with --evnode.clear_cache. The DA height will be back to the last highest known executed DA height for a height.
221-
s.SetDAHeight(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight))
211+
s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight))
222212

223213
s.logger.Info().
224214
Uint64("height", state.LastBlockHeight).
225-
Uint64("da_height", s.GetDAHeight()).
215+
Uint64("da_height", s.daRetrieverHeight.Load()).
226216
Str("chain_id", state.ChainID).
227217
Msg("initialized syncer state")
228218

@@ -296,13 +286,13 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
296286
default:
297287
}
298288

299-
daHeight := max(s.GetDAHeight(), s.cache.DaHeight())
289+
daHeight := max(s.daRetrieverHeight.Load(), s.cache.DaHeight())
300290

301291
events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight)
302292
if err != nil {
303293
switch {
304294
case errors.Is(err, coreda.ErrBlobNotFound):
305-
s.SetDAHeight(daHeight + 1)
295+
s.daRetrieverHeight.Store(daHeight + 1)
306296
continue // Fetch next height immediately
307297
case errors.Is(err, coreda.ErrHeightFromFuture):
308298
s.logger.Debug().Err(err).Uint64("da_height", daHeight).Msg("DA is ahead of local target; backing off future height requests")
@@ -327,8 +317,8 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
327317
}
328318
}
329319

330-
// increment DA height on successful retrieval
331-
s.SetDAHeight(daHeight + 1)
320+
// increment DA retrieval height on successful retrieval
321+
s.daRetrieverHeight.Store(daHeight + 1)
332322
}
333323
}
334324

@@ -534,13 +524,14 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
534524
return err
535525
}
536526

537-
// Apply block
538527
newState, err := s.applyBlock(header.Header, data, currentState)
539528
if err != nil {
540529
return fmt.Errorf("failed to apply block: %w", err)
541530
}
542531

543532
// Update DA height if needed
533+
// This height is only updated when an height is process from DA as P2P
534+
// events do not contain DA height information
544535
if event.DaHeight > newState.DAHeight {
545536
newState.DAHeight = event.DaHeight
546537
}
@@ -666,15 +657,6 @@ func (s *Syncer) sendCriticalError(err error) {
666657
}
667658
}
668659

669-
// sendNonBlockingSignal sends a signal without blocking
670-
func (s *Syncer) sendNonBlockingSignal(ch chan struct{}, name string) {
671-
select {
672-
case ch <- struct{}{}:
673-
default:
674-
s.logger.Debug().Str("channel", name).Msg("channel full, signal dropped")
675-
}
676-
}
677-
678660
// processPendingEvents fetches and processes pending events from cache
679661
// optimistically fetches the next events from cache until no matching heights are found
680662
func (s *Syncer) processPendingEvents() {

block/internal/syncing/syncer_benchmark_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func BenchmarkSyncerIO(b *testing.B) {
5858
}
5959
require.Len(b, fixt.s.heightInCh, 0)
6060

61-
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daHeight)
61+
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
6262
gotStoreHeight, err := fixt.s.store.Height(b.Context())
6363
require.NoError(b, err)
6464
assert.Equal(b, spec.heights, gotStoreHeight)

block/internal/syncing/syncer_test.go

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -274,23 +274,6 @@ func TestSequentialBlockSync(t *testing.T) {
274274
requireEmptyChan(t, errChan)
275275
}
276276

277-
func TestSyncer_sendNonBlockingSignal(t *testing.T) {
278-
s := &Syncer{logger: zerolog.Nop()}
279-
ch := make(chan struct{}, 1)
280-
ch <- struct{}{}
281-
done := make(chan struct{})
282-
go func() {
283-
s.sendNonBlockingSignal(ch, "test")
284-
close(done)
285-
}()
286-
select {
287-
case <-done:
288-
// ok
289-
case <-time.After(200 * time.Millisecond):
290-
t.Fatal("sendNonBlockingSignal blocked unexpectedly")
291-
}
292-
}
293-
294277
func TestSyncer_processPendingEvents(t *testing.T) {
295278
ds := dssync.MutexWrap(datastore.NewMapDatastore())
296279
st := store.New(ds)
@@ -432,7 +415,7 @@ func TestSyncLoopPersistState(t *testing.T) {
432415
requireEmptyChan(t, errorCh)
433416

434417
t.Log("sync workers on instance1 completed")
435-
require.Equal(t, myFutureDAHeight, syncerInst1.GetDAHeight())
418+
require.Equal(t, myFutureDAHeight, syncerInst1.daRetrieverHeight.Load())
436419

437420
// wait for all events consumed
438421
require.NoError(t, cacheMgr.SaveToDisk())
@@ -482,7 +465,7 @@ func TestSyncLoopPersistState(t *testing.T) {
482465
Run(func(arg mock.Arguments) {
483466
cancel()
484467
// retrieve last one again
485-
assert.Equal(t, syncerInst2.GetDAHeight(), arg.Get(1).(uint64))
468+
assert.Equal(t, syncerInst2.daRetrieverHeight.Load(), arg.Get(1).(uint64))
486469
}).
487470
Return(nil, nil)
488471

@@ -623,14 +606,14 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) {
623606

624607
// Create syncer with minimal dependencies
625608
syncer := &Syncer{
626-
store: mockStore,
627-
exec: mockExec,
628-
genesis: gen,
629-
lastState: &atomic.Pointer[types.State]{},
630-
daHeight: &atomic.Uint64{},
631-
logger: zerolog.Nop(),
632-
ctx: context.Background(),
633-
cache: cm,
609+
store: mockStore,
610+
exec: mockExec,
611+
genesis: gen,
612+
lastState: &atomic.Pointer[types.State]{},
613+
daRetrieverHeight: &atomic.Uint64{},
614+
logger: zerolog.Nop(),
615+
ctx: context.Background(),
616+
cache: cm,
634617
}
635618

636619
// Initialize state - this should call Replayer

0 commit comments

Comments
 (0)