55 "context"
66 "errors"
77 "fmt"
8- "sync"
8+ "sync/atomic "
99 "time"
1010
1111 ds "github.com/ipfs/go-datastore"
@@ -54,8 +54,7 @@ type Sequencer struct {
5454 // Forced inclusion support
5555 daRetriever DARetriever
5656 genesis genesis.Genesis
57- mu sync.RWMutex
58- daHeight uint64
57+ daHeight atomic.Uint64
5958}
6059
6160// NewSequencer creates a new Single Sequencer
@@ -82,8 +81,8 @@ func NewSequencer(
8281 proposer : proposer ,
8382 daRetriever : daRetriever ,
8483 genesis : gen ,
85- daHeight : gen .DAStartHeight ,
8684 }
85+ s .SetDAHeight (gen .DAStartHeight ) // will be overridden by the executor
8786
8887 loadCtx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
8988 defer cancel ()
@@ -131,55 +130,49 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB
131130
132131 // Retrieve forced inclusion transactions if DARetriever is configured
133132 var forcedTxs [][]byte
134- if c .daRetriever != nil {
135- c .mu .Lock ()
136- currentDAHeight := c .daHeight
137- c .mu .Unlock ()
133+ currentDAHeight := c .daHeight .Load ()
138134
139- forcedEvent , err := c .daRetriever .RetrieveForcedIncludedTxsFromDA (ctx , currentDAHeight )
140- if err != nil {
141- // If we get a height from future error, keep the current DA height and return batch
142- // We'll retry the same height on the next call until DA produces that block
143- if errors .Is (err , coreda .ErrHeightFromFuture ) {
144- c .logger .Debug ().
145- Uint64 ("da_height" , currentDAHeight ).
146- Msg ("DA height from future, waiting for DA to produce block" )
147-
148- batch , err := c .queue .Next (ctx )
149- if err != nil {
150- return nil , err
151- }
152-
153- return & coresequencer.GetNextBatchResponse {
154- Batch : batch ,
155- Timestamp : time .Now (),
156- BatchData : req .LastBatchData ,
157- }, nil
135+ forcedEvent , err := c .daRetriever .RetrieveForcedIncludedTxsFromDA (ctx , currentDAHeight )
136+ if err != nil {
137+ // If we get a height from future error, keep the current DA height and return batch
138+ // We'll retry the same height on the next call until DA produces that block
139+ if errors .Is (err , coreda .ErrHeightFromFuture ) {
140+ c .logger .Debug ().
141+ Uint64 ("da_height" , currentDAHeight ).
142+ Msg ("DA height from future, waiting for DA to produce block" )
143+
144+ batch , err := c .queue .Next (ctx )
145+ if err != nil {
146+ return nil , err
158147 }
159148
160- // If forced inclusion is not configured, continue without forced txs
161- if ! errors .Is (err , block .ErrForceInclusionNotConfigured ) {
162- c .logger .Error ().Err (err ).Uint64 ("da_height" , currentDAHeight ).Msg ("failed to retrieve forced inclusion transactions" )
163- // Continue without forced txs on other errors
164- }
165- } else {
166- forcedTxs = forcedEvent .Txs
167-
168- // Update DA height based on the retrieved event
169- c .mu .Lock ()
170- if forcedEvent .EndDaHeight > c .daHeight {
171- c .daHeight = forcedEvent .EndDaHeight
172- } else if forcedEvent .StartDaHeight > c .daHeight {
173- c .daHeight = forcedEvent .StartDaHeight
174- }
175- c .mu .Unlock ()
149+ return & coresequencer.GetNextBatchResponse {
150+ Batch : batch ,
151+ Timestamp : time .Now (),
152+ BatchData : req .LastBatchData ,
153+ }, nil
154+ }
176155
177- c .logger .Info ().
178- Int ("tx_count" , len (forcedEvent .Txs )).
179- Uint64 ("da_height_start" , forcedEvent .StartDaHeight ).
180- Uint64 ("da_height_end" , forcedEvent .EndDaHeight ).
181- Msg ("retrieved forced inclusion transactions from DA" )
156+ // If forced inclusion is not configured, continue without forced txs
157+ if ! errors .Is (err , block .ErrForceInclusionNotConfigured ) {
158+ c .logger .Error ().Err (err ).Uint64 ("da_height" , currentDAHeight ).Msg ("failed to retrieve forced inclusion transactions" )
159+ // Continue without forced txs on other errors
160+ }
161+ } else {
162+ forcedTxs = forcedEvent .Txs
163+
164+ // Update DA height based on the retrieved event
165+ if forcedEvent .EndDaHeight > currentDAHeight {
166+ c .SetDAHeight (forcedEvent .EndDaHeight )
167+ } else if forcedEvent .StartDaHeight > currentDAHeight {
168+ c .SetDAHeight (forcedEvent .StartDaHeight )
182169 }
170+
171+ c .logger .Info ().
172+ Int ("tx_count" , len (forcedEvent .Txs )).
173+ Uint64 ("da_height_start" , forcedEvent .StartDaHeight ).
174+ Uint64 ("da_height_end" , forcedEvent .EndDaHeight ).
175+ Msg ("retrieved forced inclusion transactions from DA" )
183176 }
184177
185178 batch , err := c .queue .Next (ctx )
@@ -250,15 +243,11 @@ func (c *Sequencer) isValid(Id []byte) bool {
250243// SetDAHeight sets the current DA height for the sequencer
251244// This should be called when the sequencer needs to sync to a specific DA height
252245func (c * Sequencer ) SetDAHeight (height uint64 ) {
253- c .mu .Lock ()
254- defer c .mu .Unlock ()
255- c .daHeight = height
246+ c .daHeight .Store (height )
256247 c .logger .Debug ().Uint64 ("da_height" , height ).Msg ("DA height updated" )
257248}
258249
259250// GetDAHeight returns the current DA height
260251func (c * Sequencer ) GetDAHeight () uint64 {
261- c .mu .RLock ()
262- defer c .mu .RUnlock ()
263- return c .daHeight
252+ return c .daHeight .Load ()
264253}
0 commit comments