Skip to content

Commit 47bfd38

Browse files
committed
add size filtering in da retriever
1 parent bb9284d commit 47bfd38

1 file changed

Lines changed: 154 additions & 17 deletions

File tree

block/internal/syncing/da_retriever.go

Lines changed: 154 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ import (
2222
// defaultDATimeout is the default timeout for DA retrieval operations
2323
const defaultDATimeout = 10 * time.Second
2424

25+
// pendingForcedInclusionTx represents a forced inclusion transaction that couldn't fit in the current epoch
26+
// and needs to be retried in future epochs.
27+
type pendingForcedInclusionTx struct {
28+
Data []byte // The transaction data
29+
OriginalHeight uint64 // Original DA height where this transaction was found
30+
}
31+
2532
// DARetriever defines the interface for retrieving events from the DA layer
2633
type DARetriever interface {
2734
RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
@@ -47,6 +54,10 @@ type daRetriever struct {
4754
// on restart, will be refetch as da height is updated by syncer
4855
pendingHeaders map[uint64]*types.SignedHeader
4956
pendingData map[uint64]*types.Data
57+
58+
// Forced inclusion transactions that couldn't fit in the current epoch
59+
// and need to be retried in future epochs.
60+
pendingForcedInclusionTxs []pendingForcedInclusionTx
5061
}
5162

5263
// NewDARetriever creates a new DA retriever
@@ -77,6 +88,7 @@ func NewDARetriever(
7788
daEpochSize: genesis.DAEpochForcedInclusion,
7889
pendingHeaders: make(map[uint64]*types.SignedHeader),
7990
pendingData: make(map[uint64]*types.Data),
91+
pendingForcedInclusionTxs: make([]pendingForcedInclusionTx, 0),
8092
}
8193
}
8294

@@ -106,8 +118,18 @@ var (
106118
)
107119

108120
// RetrieveForcedIncludedTxsFromDA retrieves forced inclusion transactions from the DA layer.
109-
// It only fetches when daHeight is at the start of an epoch to prevent redundant fetching.
110-
// Returns an error if the epoch start height is not yet available on DA (caller should backoff).
121+
//
122+
// Behavior:
123+
// - At epoch boundaries (when daHeight == epochStart): fetches new forced-inclusion transactions
124+
// from the DA layer for the entire epoch range, processes them, and returns all that fit within
125+
// the max blob size limit. Transactions that don't fit are stored in the pending queue for retry.
126+
// - Outside epoch boundaries (when daHeight != epochStart): returns any pending transactions from
127+
// the queue that were deferred from previous epochs.
128+
// - Pending transactions are kept in-memory only and will be lost on node restart.
129+
//
130+
// Returns:
131+
// - ForcedIncludedEvent with transactions that should be included in the next block (may be empty)
132+
// - Error if forced inclusion is not configured or DA layer is unavailable
111133
func (r *daRetriever) RetrieveForcedIncludedTxsFromDA(ctx context.Context, daHeight uint64) (*common.ForcedIncludedEvent, error) {
112134
if !r.hasForcedInclusionNs {
113135
return nil, ErrForceInclusionNotConfigured
@@ -116,19 +138,42 @@ func (r *daRetriever) RetrieveForcedIncludedTxsFromDA(ctx context.Context, daHei
116138
// Calculate deterministic epoch boundaries
117139
epochStart, epochEnd := types.CalculateEpochBoundaries(daHeight, r.genesis.DAStartHeight, r.daEpochSize)
118140

119-
// Only fetch at epoch start to prevent double fetching as DA height progresses
141+
// If we're not at epoch start, return pending transactions only (if any)
120142
if daHeight != epochStart {
121143
r.logger.Debug().
122144
Uint64("da_height", daHeight).
123145
Uint64("epoch_start", epochStart).
124-
Msg("skipping forced inclusion fetch - not at epoch start")
125-
return &common.ForcedIncludedEvent{
146+
Int("pending_count", len(r.pendingForcedInclusionTxs)).
147+
Msg("not at epoch start - returning pending transactions only")
148+
149+
event := &common.ForcedIncludedEvent{
126150
StartDaHeight: daHeight,
127151
EndDaHeight: daHeight,
128152
Txs: [][]byte{},
129-
}, nil
153+
}
154+
155+
// Return pending txs if any exist
156+
if len(r.pendingForcedInclusionTxs) > 0 {
157+
pendingTxs, indicesToRemove, _ := r.processPendingForcedInclusionTxs()
158+
event.Txs = pendingTxs
159+
160+
// Remove successfully included pending transactions
161+
if len(indicesToRemove) > 0 {
162+
r.removePendingForcedInclusionTxs(indicesToRemove)
163+
r.logger.Debug().
164+
Int("included_count", len(indicesToRemove)).
165+
Int("remaining_count", len(r.pendingForcedInclusionTxs)).
166+
Msg("included pending forced inclusion transactions")
167+
}
168+
}
169+
170+
return event, nil
130171
}
131172

173+
// We're at epoch start - fetch new transactions from DA
174+
175+
currentEpochNumber := types.CalculateEpochNumber(daHeight, r.genesis.DAStartHeight, r.daEpochSize)
176+
132177
event := &common.ForcedIncludedEvent{
133178
StartDaHeight: epochStart,
134179
}
@@ -137,7 +182,7 @@ func (r *daRetriever) RetrieveForcedIncludedTxsFromDA(ctx context.Context, daHei
137182
Uint64("da_height", daHeight).
138183
Uint64("epoch_start", epochStart).
139184
Uint64("epoch_end", epochEnd).
140-
Uint64("epoch_num", types.CalculateEpochNumber(daHeight, r.genesis.DAStartHeight, r.daEpochSize)).
185+
Uint64("epoch_num", currentEpochNumber).
141186
Msg("retrieving forced included transactions from DA")
142187

143188
// Check if both epoch start and end are available before fetching
@@ -161,11 +206,24 @@ func (r *daRetriever) RetrieveForcedIncludedTxsFromDA(ctx context.Context, daHei
161206
}
162207
}
163208

164-
var currentSize int
165209
lastProcessedHeight := epochStart
210+
newPendingTxs := []pendingForcedInclusionTx{}
211+
212+
// Prepend pending transactions from previous epochs at the start of this epoch
213+
pendingTxs, indicesToRemove, currentSize := r.processPendingForcedInclusionTxs()
214+
event.Txs = pendingTxs
215+
216+
// Remove successfully included pending transactions
217+
if len(indicesToRemove) > 0 {
218+
r.removePendingForcedInclusionTxs(indicesToRemove)
219+
r.logger.Debug().
220+
Int("included_count", len(indicesToRemove)).
221+
Int("remaining_count", len(r.pendingForcedInclusionTxs)).
222+
Msg("included pending forced inclusion transactions")
223+
}
166224

167225
// Process epoch start
168-
if err := r.processForcedInclusionBlobs(event, &currentSize, &lastProcessedHeight, epochStartResult, epochStart); err != nil {
226+
if err := r.processForcedInclusionBlobs(event, &currentSize, &lastProcessedHeight, &newPendingTxs, epochStartResult, epochStart); err != nil {
169227
return nil, err
170228
}
171229

@@ -182,30 +240,41 @@ func (r *daRetriever) RetrieveForcedIncludedTxsFromDA(ctx context.Context, daHei
182240
break
183241
}
184242

185-
if err := r.processForcedInclusionBlobs(event, &currentSize, &lastProcessedHeight, result, epochHeight); err != nil {
243+
if err := r.processForcedInclusionBlobs(event, &currentSize, &lastProcessedHeight, &newPendingTxs, result, epochHeight); err != nil {
186244
return nil, err
187245
}
188246
}
189247

190248
// Process epoch end (only if different from start)
191249
if epochEnd != epochStart {
192-
if err := r.processForcedInclusionBlobs(event, &currentSize, &lastProcessedHeight, epochEndResult, epochEnd); err != nil {
250+
if err := r.processForcedInclusionBlobs(event, &currentSize, &lastProcessedHeight, &newPendingTxs, epochEndResult, epochEnd); err != nil {
193251
return nil, err
194252
}
195253
}
196254

255+
// Store any new pending transactions that couldn't fit in this epoch
256+
if len(newPendingTxs) > 0 {
257+
r.pendingForcedInclusionTxs = append(r.pendingForcedInclusionTxs, newPendingTxs...)
258+
r.logger.Info().
259+
Int("new_pending_count", len(newPendingTxs)).
260+
Int("total_pending_count", len(r.pendingForcedInclusionTxs)).
261+
Msg("stored pending forced inclusion transactions for next epoch")
262+
}
263+
197264
// Set the DA height range based on what we actually processed
198265
event.StartDaHeight = epochStart
199266
event.EndDaHeight = lastProcessedHeight
200267

201268
return event, nil
202269
}
203270

204-
// processForcedInclusionBlobs processes blobs from a DA retrieval result and adds them to the event
271+
// processForcedInclusionBlobs processes forced inclusion blobs from a single DA height.
272+
// It accumulates transactions that fit within maxBlobSize and stores excess in newPendingTxs.
205273
func (r *daRetriever) processForcedInclusionBlobs(
206274
event *common.ForcedIncludedEvent,
207275
currentSize *int,
208276
lastProcessedHeight *uint64,
277+
newPendingTxs *[]pendingForcedInclusionTx,
209278
result coreda.ResultRetrieve,
210279
daHeight uint64,
211280
) error {
@@ -230,15 +299,33 @@ func (r *daRetriever) processForcedInclusionBlobs(
230299
// Calculate size of this specific data item
231300
dataSize := len(data)
232301

233-
// Check if adding this data would exceed max blob size
234-
if *currentSize+dataSize > common.DefaultMaxBlobSize {
235-
r.logger.Warn().Msg("forced inclusion data exceeds maximum blob size - reduce DAEpochForcedInclusion if always often")
236-
237-
// TODO(@julienrbrt): we need to keep track of which that haven't been included, so they are retried in the next epoch
302+
// Check if individual blob exceeds max size
303+
if dataSize > int(common.DefaultMaxBlobSize) {
304+
r.logger.Warn().
305+
Uint64("da_height", daHeight).
306+
Int("blob_size", dataSize).
307+
Float64("max_size", common.DefaultMaxBlobSize).
308+
Msg("forced inclusion blob exceeds maximum size - skipping")
309+
return fmt.Errorf("%w: blob size %d exceeds maximum %f", ErrForcedInclusionDataTooLarge, dataSize, common.DefaultMaxBlobSize)
310+
}
238311

312+
// Check if adding this blob would exceed the current epoch's max size
313+
if *currentSize+dataSize > int(common.DefaultMaxBlobSize) {
314+
r.logger.Debug().
315+
Uint64("da_height", daHeight).
316+
Int("current_size", *currentSize).
317+
Int("blob_size", dataSize).
318+
Msg("blob would exceed max size for this epoch - deferring to pending queue")
319+
320+
// Store for next epoch
321+
*newPendingTxs = append(*newPendingTxs, pendingForcedInclusionTx{
322+
Data: data,
323+
OriginalHeight: daHeight,
324+
})
239325
continue
240326
}
241327

328+
// Include this transaction
242329
event.Txs = append(event.Txs, data)
243330
*currentSize += dataSize
244331
*lastProcessedHeight = daHeight
@@ -513,3 +600,53 @@ func createEmptyDataForHeader(ctx context.Context, header *types.SignedHeader) *
513600
},
514601
}
515602
}
603+
604+
// processPendingForcedInclusionTxs processes pending transactions and returns those that fit within the max blob size.
605+
// Returns the transactions to include, the indices of transactions to remove, and the total size used.
606+
func (r *daRetriever) processPendingForcedInclusionTxs() ([][]byte, []int, int) {
607+
var (
608+
currentSize int
609+
txs [][]byte
610+
indicesToRemove []int
611+
)
612+
613+
for i, pendingTx := range r.pendingForcedInclusionTxs {
614+
dataSize := len(pendingTx.Data)
615+
if currentSize+dataSize > int(common.DefaultMaxBlobSize) {
616+
r.logger.Debug().
617+
Int("current_size", currentSize).
618+
Int("data_size", dataSize).
619+
Msg("pending transaction would exceed max blob size, will retry later")
620+
break
621+
}
622+
623+
txs = append(txs, pendingTx.Data)
624+
currentSize += dataSize
625+
indicesToRemove = append(indicesToRemove, i)
626+
}
627+
628+
return txs, indicesToRemove, currentSize
629+
}
630+
631+
// removePendingForcedInclusionTxs removes pending transactions at the specified indices.
632+
// Indices must be sorted in ascending order.
633+
func (r *daRetriever) removePendingForcedInclusionTxs(indices []int) {
634+
if len(indices) == 0 {
635+
return
636+
}
637+
638+
// Create a new slice without the removed elements
639+
newPending := make([]pendingForcedInclusionTx, 0, len(r.pendingForcedInclusionTxs)-len(indices))
640+
removeMap := make(map[int]bool, len(indices))
641+
for _, idx := range indices {
642+
removeMap[idx] = true
643+
}
644+
645+
for i, tx := range r.pendingForcedInclusionTxs {
646+
if !removeMap[i] {
647+
newPending = append(newPending, tx)
648+
}
649+
}
650+
651+
r.pendingForcedInclusionTxs = newPending
652+
}

0 commit comments

Comments
 (0)