Skip to content

Commit b01b838

Browse files
committed
Merge remote-tracking branch 'origin/main' into add-da-blob-api-client
2 parents 9a18a5e + 0fa3370 commit b01b838

22 files changed

Lines changed: 1917 additions & 826 deletions

File tree

.github/workflows/lint.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
**/**.go
2727
go.mod
2828
go.sum
29-
- uses: golangci/golangci-lint-action@v9.1.0
29+
- uses: golangci/golangci-lint-action@v9.2.0
3030
with:
3131
version: latest
3232
args: --timeout 10m

apps/evm/cmd/run.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,10 @@ func createSequencer(
169169
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
170170
}
171171

172-
basedSeq := based.NewBasedSequencer(fiRetriever, da, nodeConfig, genesis, logger)
172+
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
173+
if err != nil {
174+
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
175+
}
173176

174177
logger.Info().
175178
Str("forced_inclusion_namespace", nodeConfig.DA.GetForcedInclusionNamespace()).

apps/grpc/cmd/run.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,10 @@ func createSequencer(
131131
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
132132
}
133133

134-
basedSeq := based.NewBasedSequencer(fiRetriever, da, nodeConfig, genesis, logger)
134+
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
135+
if err != nil {
136+
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
137+
}
135138

136139
logger.Info().
137140
Str("forced_inclusion_namespace", nodeConfig.DA.GetForcedInclusionNamespace()).

apps/testapp/cmd/run.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,10 @@ func createSequencer(
131131
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
132132
}
133133

134-
basedSeq := based.NewBasedSequencer(fiRetriever, da, nodeConfig, genesis, logger)
134+
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
135+
if err != nil {
136+
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
137+
}
135138

136139
logger.Info().
137140
Str("forced_inclusion_namespace", nodeConfig.DA.GetForcedInclusionNamespace()).

block/internal/da/client_test.go

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,3 +523,226 @@ func TestClient_Retrieve_Timeout(t *testing.T) {
523523
assert.Assert(t, result.Message != "")
524524
})
525525
}
526+
527+
func TestClient_RetrieveHeaders(t *testing.T) {
528+
logger := zerolog.Nop()
529+
dataLayerHeight := uint64(100)
530+
mockIDs := [][]byte{[]byte("id1")}
531+
mockBlobs := [][]byte{[]byte("header-blob")}
532+
mockTimestamp := time.Now()
533+
534+
mockDAInstance := &mockDA{
535+
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
536+
return &coreda.GetIDsResult{
537+
IDs: mockIDs,
538+
Timestamp: mockTimestamp,
539+
}, nil
540+
},
541+
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
542+
return mockBlobs, nil
543+
},
544+
}
545+
546+
client := NewClient(Config{
547+
DA: mockDAInstance,
548+
Logger: logger,
549+
Namespace: "test-header-ns",
550+
DataNamespace: "test-data-ns",
551+
})
552+
553+
result := client.RetrieveHeaders(context.Background(), dataLayerHeight)
554+
555+
assert.Equal(t, coreda.StatusSuccess, result.Code)
556+
assert.Equal(t, dataLayerHeight, result.Height)
557+
assert.Equal(t, len(mockBlobs), len(result.Data))
558+
}
559+
560+
func TestClient_RetrieveData(t *testing.T) {
561+
logger := zerolog.Nop()
562+
dataLayerHeight := uint64(200)
563+
mockIDs := [][]byte{[]byte("id1"), []byte("id2")}
564+
mockBlobs := [][]byte{[]byte("data-blob-1"), []byte("data-blob-2")}
565+
mockTimestamp := time.Now()
566+
567+
mockDAInstance := &mockDA{
568+
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
569+
return &coreda.GetIDsResult{
570+
IDs: mockIDs,
571+
Timestamp: mockTimestamp,
572+
}, nil
573+
},
574+
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
575+
return mockBlobs, nil
576+
},
577+
}
578+
579+
client := NewClient(Config{
580+
DA: mockDAInstance,
581+
Logger: logger,
582+
Namespace: "test-header-ns",
583+
DataNamespace: "test-data-ns",
584+
})
585+
586+
result := client.RetrieveData(context.Background(), dataLayerHeight)
587+
588+
assert.Equal(t, coreda.StatusSuccess, result.Code)
589+
assert.Equal(t, dataLayerHeight, result.Height)
590+
assert.Equal(t, len(mockBlobs), len(result.Data))
591+
}
592+
593+
func TestClient_RetrieveBatched(t *testing.T) {
594+
logger := zerolog.Nop()
595+
dataLayerHeight := uint64(100)
596+
597+
// Create 200 IDs to exceed default batch size
598+
numIDs := 200
599+
mockIDs := make([][]byte, numIDs)
600+
for i := range numIDs {
601+
mockIDs[i] = []byte{byte(i)}
602+
}
603+
604+
// Track which batches were requested
605+
batchCalls := []int{}
606+
607+
mockDAInstance := &mockDA{
608+
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
609+
return &coreda.GetIDsResult{
610+
IDs: mockIDs,
611+
Timestamp: time.Now(),
612+
}, nil
613+
},
614+
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
615+
batchCalls = append(batchCalls, len(ids))
616+
// Return a blob for each ID in the batch
617+
blobs := make([][]byte, len(ids))
618+
for i := range ids {
619+
blobs[i] = []byte("blob")
620+
}
621+
return blobs, nil
622+
},
623+
}
624+
625+
client := NewClient(Config{
626+
DA: mockDAInstance,
627+
Logger: logger,
628+
Namespace: "test-ns",
629+
DataNamespace: "test-data-ns",
630+
RetrieveBatchSize: 50, // Set smaller batch size for testing
631+
})
632+
633+
encodedNamespace := coreda.NamespaceFromString("test-ns")
634+
result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())
635+
636+
assert.Equal(t, coreda.StatusSuccess, result.Code)
637+
assert.Equal(t, numIDs, len(result.Data))
638+
639+
// Should have made 4 batches: 50 + 50 + 50 + 50 = 200
640+
assert.Equal(t, 4, len(batchCalls))
641+
assert.Equal(t, 50, batchCalls[0])
642+
assert.Equal(t, 50, batchCalls[1])
643+
assert.Equal(t, 50, batchCalls[2])
644+
assert.Equal(t, 50, batchCalls[3])
645+
}
646+
647+
func TestClient_RetrieveBatched_PartialBatch(t *testing.T) {
648+
logger := zerolog.Nop()
649+
dataLayerHeight := uint64(100)
650+
651+
// Create 175 IDs to test partial batch at the end
652+
numIDs := 175
653+
mockIDs := make([][]byte, numIDs)
654+
for i := range numIDs {
655+
mockIDs[i] = []byte{byte(i)}
656+
}
657+
658+
batchCalls := []int{}
659+
660+
mockDAInstance := &mockDA{
661+
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
662+
return &coreda.GetIDsResult{
663+
IDs: mockIDs,
664+
Timestamp: time.Now(),
665+
}, nil
666+
},
667+
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
668+
batchCalls = append(batchCalls, len(ids))
669+
blobs := make([][]byte, len(ids))
670+
for i := range ids {
671+
blobs[i] = []byte("blob")
672+
}
673+
return blobs, nil
674+
},
675+
}
676+
677+
client := NewClient(Config{
678+
DA: mockDAInstance,
679+
Logger: logger,
680+
Namespace: "test-ns",
681+
DataNamespace: "test-data-ns",
682+
RetrieveBatchSize: 50,
683+
})
684+
685+
encodedNamespace := coreda.NamespaceFromString("test-ns")
686+
result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())
687+
688+
assert.Equal(t, coreda.StatusSuccess, result.Code)
689+
assert.Equal(t, numIDs, len(result.Data))
690+
691+
// Should have made 4 batches: 50 + 50 + 50 + 25 = 175
692+
assert.Equal(t, 4, len(batchCalls))
693+
assert.Equal(t, 50, batchCalls[0])
694+
assert.Equal(t, 50, batchCalls[1])
695+
assert.Equal(t, 50, batchCalls[2])
696+
assert.Equal(t, 25, batchCalls[3]) // Partial batch
697+
}
698+
699+
func TestClient_RetrieveBatched_ErrorInSecondBatch(t *testing.T) {
700+
logger := zerolog.Nop()
701+
dataLayerHeight := uint64(100)
702+
703+
// Create 200 IDs to require multiple batches
704+
numIDs := 200
705+
mockIDs := make([][]byte, numIDs)
706+
for i := range numIDs {
707+
mockIDs[i] = []byte{byte(i)}
708+
}
709+
710+
batchCallCount := 0
711+
712+
mockDAInstance := &mockDA{
713+
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
714+
return &coreda.GetIDsResult{
715+
IDs: mockIDs,
716+
Timestamp: time.Now(),
717+
}, nil
718+
},
719+
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
720+
batchCallCount++
721+
// Fail on second batch
722+
if batchCallCount == 2 {
723+
return nil, errors.New("network error in batch 2")
724+
}
725+
blobs := make([][]byte, len(ids))
726+
for i := range ids {
727+
blobs[i] = []byte("blob")
728+
}
729+
return blobs, nil
730+
},
731+
}
732+
733+
client := NewClient(Config{
734+
DA: mockDAInstance,
735+
Logger: logger,
736+
Namespace: "test-ns",
737+
DataNamespace: "test-data-ns",
738+
RetrieveBatchSize: 50,
739+
})
740+
741+
encodedNamespace := coreda.NamespaceFromString("test-ns")
742+
result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())
743+
744+
assert.Equal(t, coreda.StatusError, result.Code)
745+
assert.Assert(t, result.Message != "")
746+
// Error message should mention the batch range
747+
assert.Assert(t, len(result.Message) > 0)
748+
}

block/internal/da/forced_inclusion_retriever.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"time"
78

89
"github.com/rs/zerolog"
910

@@ -25,6 +26,7 @@ type ForcedInclusionRetriever struct {
2526

2627
// ForcedInclusionEvent contains forced inclusion transactions retrieved from DA.
2728
type ForcedInclusionEvent struct {
29+
Timestamp time.Time
2830
StartDaHeight uint64
2931
EndDaHeight uint64
3032
Txs [][]byte
@@ -158,6 +160,10 @@ func (r *ForcedInclusionRetriever) processForcedInclusionBlobs(
158160
}
159161
}
160162

163+
if result.Timestamp.After(event.Timestamp) {
164+
event.Timestamp = result.Timestamp
165+
}
166+
161167
r.logger.Debug().
162168
Uint64("height", height).
163169
Int("blob_count", len(result.Data)).

block/internal/da/forced_inclusion_retriever_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t *
244244
assert.Assert(t, event != nil)
245245
assert.Equal(t, event.StartDaHeight, uint64(100))
246246
assert.Equal(t, event.EndDaHeight, uint64(102))
247+
assert.Assert(t, event.Timestamp.After(time.Time{}))
247248

248249
// Should have collected all txs from all heights
249250
expectedTxCount := len(testBlobsByHeight[100]) + len(testBlobsByHeight[101]) + len(testBlobsByHeight[102])
@@ -334,6 +335,7 @@ func TestForcedInclusionRetriever_processForcedInclusionBlobs(t *testing.T) {
334335
} else {
335336
assert.NilError(t, err)
336337
assert.Equal(t, len(event.Txs), tt.expectedTxCount)
338+
assert.Equal(t, event.Timestamp, time.Time{})
337339
}
338340
})
339341
}

da/jsonrpc/blob/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@
33
This package is a **trimmed copy** of code from `celestia-node` to stay JSON-compatible with the blob RPC without importing the full Cosmos/Celestia dependency set.
44

55
## Upstream source
6+
67
- `blob.go` comes from `celestia-node/blob/blob.go` @ tag `v0.28.4` (release v0.28.4), with unused pieces removed (blob v1, proof helpers, share length calc, appconsts dependency, etc.).
78
- `submit_options.go` mirrors the exported JSON fields of `celestia-node/state/tx_config.go` @ the same tag, leaving out functional options, defaults, and Cosmos keyring helpers.
89

910
## Why copy instead of import?
11+
1012
- Avoids pulling Cosmos SDK / celestia-app dependencies into ev-node for the small surface we need (blob JSON and commitment for v0).
1113
- Keeps binary size and module graph smaller while remaining wire-compatible with celestia-node's blob service.
1214

1315
## Keeping it in sync
16+
1417
- When celestia-node changes blob JSON or tx config fields, update this package manually:
1518
1. `diff -u pkg/blob/blob.go ../Celestia/celestia-node/blob/blob.go`
1619
2. `diff -u pkg/blob/submit_options.go ../Celestia/celestia-node/state/tx_config.go`

proto/evnode/v1/state.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,11 @@ message State {
1919

2020
reserved 7;
2121
}
22+
23+
// SequencerDACheckpoint tracks the position in the DA where transactions were last processed
24+
message SequencerDACheckpoint {
25+
// DA block height being processed
26+
uint64 da_height = 1;
27+
// Index of the next transaction to process within the DA block's forced inclusion batch
28+
uint64 tx_index = 2;
29+
}

0 commit comments

Comments (0)