Commit ce4eadd
fix: apply per-request timeout in DA retriever instead of per-height
Previously, the timeout for DA retrieval was derived from the parent context, which could cause the overall operation to time out even when each individual request completed within its own timeout. This was problematic for DA heights with thousands of blocks, where multiple batches of 100 blobs each need to be retrieved.

Changes:

- Use context.Background() for each GetIDs and Get batch request to ensure each gets a fresh, independent timeout
- Check parent context cancellation between requests to still respect graceful shutdown
- Add comprehensive tests verifying per-request timeout behavior
1 parent b5f5640 commit ce4eadd
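In outline, the fix follows a common Go pattern: derive each request's timeout from `context.Background()` so it is independent of any parent deadline, and poll the parent context between requests so cancellation still short-circuits the loop. The following is a minimal, self-contained sketch of that pattern only — `retrieveAll` and `doRequest` are hypothetical names for illustration, not code from this commit:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// retrieveAll sketches the per-request timeout pattern: the parent ctx is
// consulted only for cancellation between requests, while each request runs
// against a fresh timeout that is independent of any parent deadline.
func retrieveAll(ctx context.Context, batches [][]byte, timeout time.Duration,
	doRequest func(context.Context, []byte) error) error {
	for _, batch := range batches {
		// Respect graceful shutdown: bail out between requests if cancelled.
		if err := ctx.Err(); err != nil {
			return err
		}
		// Fresh, independent timeout per request.
		reqCtx, cancel := context.WithTimeout(context.Background(), timeout)
		err := doRequest(reqCtx, batch)
		cancel() // release the timer promptly; a deferred cancel would pile up in a loop
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// Three slow batches, each well under its own 100ms timeout but 150ms in
	// total — this succeeds, whereas a single shared deadline could fail.
	batches := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	err := retrieveAll(context.Background(), batches, 100*time.Millisecond,
		func(ctx context.Context, _ []byte) error {
			select {
			case <-time.After(50 * time.Millisecond):
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	fmt.Println("err:", err) // err: <nil>
}
```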

File tree

3 files changed: +250 -14 lines changed

block/internal/da/client.go

Lines changed: 46 additions & 4 deletions
```diff
@@ -149,11 +149,25 @@ func (c *client) Submit(ctx context.Context, data [][]byte, gasPrice float64, na
 }
 
 // Retrieve retrieves blobs from the DA layer at the specified height and namespace.
+// Each request (GetIDs and each batch of Get) has its own independent timeout,
+// ensuring that retrieval of heights with many blobs doesn't fail due to an overall timeout.
 func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) coreda.ResultRetrieve {
-	// 1. Get IDs
-	getIDsCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
-	defer cancel()
+	// Check for parent context cancellation before starting
+	if err := ctx.Err(); err != nil {
+		return coreda.ResultRetrieve{
+			BaseResult: coreda.BaseResult{
+				Code:      coreda.StatusError,
+				Message:   fmt.Sprintf("context cancelled before retrieval: %s", err.Error()),
+				Height:    height,
+				Timestamp: time.Now(),
+			},
+		}
+	}
+
+	// 1. Get IDs with per-request timeout (independent of parent context deadline)
+	getIDsCtx, cancel := context.WithTimeout(context.Background(), c.defaultTimeout)
 	idsResult, err := c.da.GetIDs(getIDsCtx, height, namespace)
+	cancel()
 	if err != nil {
 		// Handle specific "not found" error
 		if strings.Contains(err.Error(), coreda.ErrBlobNotFound.Error()) {
@@ -202,13 +216,41 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
 			},
 		}
 	}
+	// Check for parent context cancellation after GetIDs
+	if err := ctx.Err(); err != nil {
+		return coreda.ResultRetrieve{
+			BaseResult: coreda.BaseResult{
+				Code:      coreda.StatusError,
+				Message:   fmt.Sprintf("context cancelled after GetIDs: %s", err.Error()),
+				Height:    height,
+				Timestamp: time.Now(),
+			},
+		}
+	}
+
 	// 2. Get Blobs using the retrieved IDs in batches
+	// Each batch has its own independent timeout to ensure large retrievals don't timeout
 	batchSize := 100
 	blobs := make([][]byte, 0, len(idsResult.IDs))
 	for i := 0; i < len(idsResult.IDs); i += batchSize {
+		// Check for parent context cancellation before each batch
+		if err := ctx.Err(); err != nil {
+			c.logger.Debug().Uint64("height", height).Int("batch_start", i).Err(err).Msg("Context cancelled during batch retrieval")
+			return coreda.ResultRetrieve{
+				BaseResult: coreda.BaseResult{
+					Code:      coreda.StatusError,
+					Message:   fmt.Sprintf("context cancelled during batch retrieval at %d: %s", i, err.Error()),
+					Height:    height,
+					Timestamp: time.Now(),
+				},
+			}
+		}
+
 		end := min(i+batchSize, len(idsResult.IDs))
 
-		getBlobsCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
+		// Use context.Background() for timeout to ensure each batch gets a fresh timeout
+		// independent of any parent context deadline
+		getBlobsCtx, cancel := context.WithTimeout(context.Background(), c.defaultTimeout)
 		batchBlobs, err := c.da.Get(getBlobsCtx, idsResult.IDs[i:end], namespace)
 		cancel()
 		if err != nil {
```
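Two details of the diff above are worth noting. First, detaching the request contexts from the parent via `context.Background()` means parent cancellation no longer propagates into an in-flight call; the explicit `ctx.Err()` checks before retrieval, after GetIDs, and before each batch are what restore graceful-shutdown behavior, at the cost of waiting out at most one in-flight request. Second, the switch from `defer cancel()` to an immediate `cancel()` after each call matters inside the loop: deferred cancels would all run only when Retrieve returns, keeping every per-batch timer alive until then.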

block/internal/da/client_test.go

Lines changed: 188 additions & 0 deletions
```diff
@@ -3,6 +3,7 @@ package da
 import (
 	"context"
 	"errors"
+	"fmt"
 	"testing"
 	"time"
 
@@ -456,3 +457,190 @@ func TestClient_Retrieve_Timeout(t *testing.T) {
 		assert.Assert(t, result.Message != "")
 	})
 }
+
+func TestClient_Retrieve_PerRequestTimeout(t *testing.T) {
+	logger := zerolog.Nop()
+	dataLayerHeight := uint64(100)
+	encodedNamespace := coreda.NamespaceFromString("test-namespace")
+
+	t.Run("each batch gets independent timeout", func(t *testing.T) {
+		// Create 250 IDs to force 3 batches (100, 100, 50)
+		mockIDs := make([][]byte, 250)
+		for i := range mockIDs {
+			mockIDs[i] = []byte(fmt.Sprintf("id%d", i))
+		}
+		mockTimestamp := time.Now()
+
+		batchCount := 0
+		batchTimeout := 50 * time.Millisecond
+
+		mockDAInstance := &mockDA{
+			getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
+				return &coreda.GetIDsResult{
+					IDs:       mockIDs,
+					Timestamp: mockTimestamp,
+				}, nil
+			},
+			getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
+				batchCount++
+				// Simulate some delay for each batch (less than timeout)
+				time.Sleep(20 * time.Millisecond)
+
+				// Verify each batch's context has its own deadline
+				deadline, ok := ctx.Deadline()
+				assert.Assert(t, ok, "batch should have a deadline")
+				// The deadline should be roughly batchTimeout from now (within tolerance)
+				remaining := time.Until(deadline)
+				assert.Assert(t, remaining > 0, "deadline should be in the future")
+				assert.Assert(t, remaining <= batchTimeout, "deadline should be at most batchTimeout")
+
+				// Return mock blobs
+				blobs := make([][]byte, len(ids))
+				for i := range blobs {
+					blobs[i] = []byte("blob")
+				}
+				return blobs, nil
+			},
+		}
+
+		client := NewClient(Config{
+			DA:             mockDAInstance,
+			Logger:         logger,
+			Namespace:      "test-namespace",
+			DataNamespace:  "test-data-namespace",
+			DefaultTimeout: batchTimeout,
+		})
+
+		result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())
+
+		assert.Equal(t, coreda.StatusSuccess, result.Code)
+		assert.Equal(t, 3, batchCount, "should have made 3 batch requests")
+		assert.Equal(t, 250, len(result.Data), "should have retrieved all blobs")
+	})
+
+	t.Run("succeeds even when total time exceeds single timeout", func(t *testing.T) {
+		// This test verifies that even if the total operation takes longer than
+		// a single timeout period, it succeeds because each individual request
+		// gets its own fresh timeout.
+		mockIDs := make([][]byte, 300) // 3 batches
+		for i := range mockIDs {
+			mockIDs[i] = []byte(fmt.Sprintf("id%d", i))
+		}
+		mockTimestamp := time.Now()
+
+		perRequestTimeout := 100 * time.Millisecond
+		delayPerBatch := 40 * time.Millisecond // Each batch takes 40ms
+
+		mockDAInstance := &mockDA{
+			getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
+				time.Sleep(delayPerBatch) // GetIDs also takes time
+				return &coreda.GetIDsResult{
+					IDs:       mockIDs,
+					Timestamp: mockTimestamp,
+				}, nil
+			},
+			getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
+				time.Sleep(delayPerBatch)
+				blobs := make([][]byte, len(ids))
+				for i := range blobs {
+					blobs[i] = []byte("blob")
+				}
+				return blobs, nil
+			},
+		}
+
+		client := NewClient(Config{
+			DA:             mockDAInstance,
+			Logger:         logger,
+			Namespace:      "test-namespace",
+			DataNamespace:  "test-data-namespace",
+			DefaultTimeout: perRequestTimeout,
+		})
+
+		start := time.Now()
+		result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())
+		elapsed := time.Since(start)
+
+		// Total time: GetIDs (40ms) + 3 batches * 40ms = 160ms
+		// This is greater than perRequestTimeout (100ms), but should still succeed
+		// because each individual request completes within its timeout
+		assert.Equal(t, coreda.StatusSuccess, result.Code)
+		assert.Assert(t, elapsed > perRequestTimeout, "total time should exceed single timeout")
+		assert.Equal(t, 300, len(result.Data))
+	})
+
+	t.Run("respects parent context cancellation", func(t *testing.T) {
+		// Use 5 batches to ensure we have enough time to cancel mid-operation
+		mockIDs := make([][]byte, 500) // 5 batches of 100
+		for i := range mockIDs {
+			mockIDs[i] = []byte(fmt.Sprintf("id%d", i))
+		}
+		mockTimestamp := time.Now()
+		batchCount := 0
+
+		mockDAInstance := &mockDA{
+			getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
+				return &coreda.GetIDsResult{
+					IDs:       mockIDs,
+					Timestamp: mockTimestamp,
+				}, nil
+			},
+			getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
+				batchCount++
+				time.Sleep(50 * time.Millisecond) // Each batch takes 50ms
+				blobs := make([][]byte, len(ids))
+				for i := range blobs {
+					blobs[i] = []byte("blob")
+				}
+				return blobs, nil
+			},
+		}
+
+		client := NewClient(Config{
+			DA:             mockDAInstance,
+			Logger:         logger,
+			Namespace:      "test-namespace",
+			DataNamespace:  "test-data-namespace",
+			DefaultTimeout: 1 * time.Second,
+		})
+
+		// Create a context that will be cancelled after the second batch completes
+		// but before all batches finish
+		ctx, cancel := context.WithCancel(context.Background())
+		go func() {
+			time.Sleep(120 * time.Millisecond) // Cancel after ~2 batches (2 * 50ms = 100ms)
+			cancel()
+		}()
+
+		result := client.Retrieve(ctx, dataLayerHeight, encodedNamespace.Bytes())
+
+		// Should fail due to context cancellation
+		assert.Equal(t, coreda.StatusError, result.Code)
+		assert.Assert(t, batchCount < 5, "should not complete all batches, got %d", batchCount)
+	})
+
+	t.Run("returns early if parent context already cancelled", func(t *testing.T) {
+		mockDAInstance := &mockDA{
+			getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
+				t.Fatal("GetIDs should not be called if context is already cancelled")
+				return nil, nil
+			},
+		}
+
+		client := NewClient(Config{
+			DA:             mockDAInstance,
+			Logger:         logger,
+			Namespace:      "test-namespace",
+			DataNamespace:  "test-data-namespace",
+			DefaultTimeout: 1 * time.Second,
+		})
+
+		ctx, cancel := context.WithCancel(context.Background())
+		cancel() // Cancel immediately
+
+		result := client.Retrieve(ctx, dataLayerHeight, encodedNamespace.Bytes())
+
+		assert.Equal(t, coreda.StatusError, result.Code)
+		assert.Assert(t, result.Message != "")
+	})
+}
```

block/internal/syncing/da_retriever_test.go

Lines changed: 16 additions & 10 deletions
```diff
@@ -27,6 +27,11 @@ import (
 
 // newTestDARetriever creates a DA retriever for testing with the given DA implementation
 func newTestDARetriever(t *testing.T, mockDA coreda.DA, cfg config.Config, gen genesis.Genesis) *daRetriever {
+	return newTestDARetrieverWithTimeout(t, mockDA, cfg, gen, 0)
+}
+
+// newTestDARetrieverWithTimeout creates a DA retriever for testing with a custom timeout
+func newTestDARetrieverWithTimeout(t *testing.T, mockDA coreda.DA, cfg config.Config, gen genesis.Genesis, timeout time.Duration) *daRetriever {
 	t.Helper()
 	if cfg.DA.Namespace == "" {
 		cfg.DA.Namespace = "test-ns"
@@ -39,10 +44,11 @@ func newTestDARetriever(t *testing.T, mockDA coreda.DA, cfg config.Config, gen g
 	require.NoError(t, err)
 
 	daClient := da.NewClient(da.Config{
-		DA:            mockDA,
-		Logger:        zerolog.Nop(),
-		Namespace:     cfg.DA.Namespace,
-		DataNamespace: cfg.DA.DataNamespace,
+		DA:             mockDA,
+		Logger:         zerolog.Nop(),
+		DefaultTimeout: timeout, // 0 means use default (30s)
+		Namespace:      cfg.DA.Namespace,
+		DataNamespace:  cfg.DA.DataNamespace,
 	})
 
 	return NewDARetriever(daClient, cm, gen, zerolog.Nop())
@@ -111,18 +117,19 @@ func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) {
 }
 
 func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) {
-	t.Skip("Skipping flaky timeout test - timing is now controlled by DA client")
-
 	mockDA := testmocks.NewMockDA(t)
 
+	// Use a short timeout for testing
+	testTimeout := 50 * time.Millisecond
+
 	// Mock GetIDs to hang longer than the timeout
 	mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
 		Run(func(ctx context.Context, height uint64, namespace []byte) {
 			<-ctx.Done()
 		}).
 		Return(nil, context.DeadlineExceeded).Maybe()
 
-	r := newTestDARetriever(t, mockDA, config.DefaultConfig(), genesis.Genesis{})
+	r := newTestDARetrieverWithTimeout(t, mockDA, config.DefaultConfig(), genesis.Genesis{}, testTimeout)
 
 	start := time.Now()
 	events, err := r.RetrieveFromDA(context.Background(), 42)
@@ -135,9 +142,8 @@ func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) {
 	assert.Len(t, events, 0)
 
 	// Verify timeout occurred approximately at expected time (with some tolerance)
-	// DA client has a 30-second default timeout
-	assert.Greater(t, duration, 29*time.Second, "should timeout after approximately 30 seconds")
-	assert.Less(t, duration, 35*time.Second, "should not take much longer than timeout")
+	assert.GreaterOrEqual(t, duration, testTimeout, "should timeout after approximately the configured timeout")
+	assert.Less(t, duration, testTimeout*3, "should not take much longer than timeout")
 }
 
 func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) {
```
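From a caller's perspective, the context passed to `Retrieve` now governs cancellation only; per-request deadlines come from the client's `DefaultTimeout`. A hedged sketch of a caller inside package `da` follows — the `fetchHeight` helper is hypothetical, `Retrieve` and the `ResultRetrieve` fields (`Code`, `Data`, `Message`) are taken from the diffs above, and `Data` is assumed to be `[][]byte` as the tests suggest:

```go
// Hypothetical helper, assumed to live in package da so it can use the
// unexported *client type. Cancelling ctx stops Retrieve between requests;
// it does not shorten the per-request timeouts.
func fetchHeight(ctx context.Context, c *client, height uint64, ns []byte) ([][]byte, error) {
	result := c.Retrieve(ctx, height, ns)
	if result.Code != coreda.StatusSuccess {
		return nil, fmt.Errorf("retrieve at height %d: %s", height, result.Message)
	}
	return result.Data, nil
}
```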
