Skip to content

Commit 3ca1936

Browse files
committed
Fix DA submission tracking and shutdown ordering
1 parent 1daa429 commit 3ca1936

2 files changed

Lines changed: 24 additions & 6 deletions

File tree

block/internal/submitting/da_submitter.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,14 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed
223223

224224
postSubmit := s.makeHeaderPostSubmit(ctx, cache)
225225
namespace := s.client.GetHeaderNamespace()
226+
submittedOffset := 0
226227

227228
s.wg.Go(func() {
228229
s.submitWithRetry(ctx, envelopes, namespace, func(submittedCount int, daHeight uint64) {
229230
if submittedCount > 0 {
230-
postSubmit(headers[:submittedCount], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(submittedCount), Height: daHeight}})
231+
end := submittedOffset + submittedCount
232+
postSubmit(headers[submittedOffset:end], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(submittedCount), Height: daHeight}})
233+
submittedOffset = end
231234
}
232235
if onSubmitSuccess != nil {
233236
onSubmitSuccess()
@@ -280,11 +283,14 @@ func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types.
280283

281284
postSubmit := s.makeDataPostSubmit(ctx, cache)
282285
namespace := s.client.GetDataNamespace()
286+
submittedOffset := 0
283287

284288
s.wg.Go(func() {
285289
s.submitWithRetry(ctx, signedDataListBz, namespace, func(submittedCount int, daHeight uint64) {
286290
if submittedCount > 0 {
287-
postSubmit(signedDataList[:submittedCount], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(submittedCount), Height: daHeight}})
291+
end := submittedOffset + submittedCount
292+
postSubmit(signedDataList[submittedOffset:end], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(submittedCount), Height: daHeight}})
293+
submittedOffset = end
288294
}
289295
if onSubmitSuccess != nil {
290296
onSubmitSuccess()
@@ -383,6 +389,15 @@ func (s *DASubmitter) submitWithRetry(
383389
switch res.Code {
384390
case datypes.StatusSuccess:
385391
submitted := int(res.SubmittedCount)
392+
if submitted <= 0 || submitted > len(marshaled) {
393+
err := fmt.Errorf("invalid submitted count %d for batch size %d", submitted, len(marshaled))
394+
s.recordFailure(common.DASubmitterFailureReasonUnknown)
395+
s.logger.Error().Err(err).Str("itemType", itemType).Msg("DA layer returned invalid submitted count")
396+
if onError != nil {
397+
onError(err)
398+
}
399+
return
400+
}
386401
if onSuccess != nil {
387402
onSuccess(submitted, res.Height)
388403
}

block/internal/submitting/submitter.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ func (s *Submitter) Stop() error {
154154
if s.cancel != nil {
155155
s.cancel()
156156
}
157-
s.daSubmitter.Close()
158157
// Wait for goroutines to finish with a timeout to prevent hanging
159158
done := make(chan struct{})
160159
go func() {
@@ -163,10 +162,10 @@ func (s *Submitter) Stop() error {
163162
}()
164163
select {
165164
case <-done:
166-
// All goroutines finished cleanly
167165
case <-time.After(5 * time.Second):
168166
s.logger.Warn().Msg("submitter shutdown timed out waiting for goroutines, proceeding anyway")
169167
}
168+
s.daSubmitter.Close()
170169
s.logger.Info().Msg("submitter stopped")
171170
return nil
172171
}
@@ -256,7 +255,9 @@ func (s *Submitter) daSubmissionLoop() {
256255
s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err))
257256
return
258257
}
259-
s.logger.Error().Err(err).Msg("failed to submit headers")
258+
if err != nil {
259+
s.logger.Error().Err(err).Msg("failed to submit headers")
260+
}
260261
}
261262
if err := s.daSubmitter.SubmitHeaders(s.ctx, headers, marshalledHeaders, s.cache, s.signer, onSuccess, onError); err != nil {
262263
if len(headers) > 0 {
@@ -334,7 +335,9 @@ func (s *Submitter) daSubmissionLoop() {
334335
s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err))
335336
return
336337
}
337-
s.logger.Error().Err(err).Msg("failed to submit data")
338+
if err != nil {
339+
s.logger.Error().Err(err).Msg("failed to submit data")
340+
}
338341
}
339342
if err := s.daSubmitter.SubmitData(s.ctx, signedDataList, marshalledData, s.cache, s.signer, s.genesis, onSuccess, onError); err != nil {
340343
if len(signedDataList) > 0 {

0 commit comments

Comments
 (0)