Skip to content

Commit 03baaf6

Browse files
test: Add back missing node integration tests (#2280)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview This PR adds/updates the following integration tests for full node and sequencer behavior in Rollkit: **TestTxGossipingMultipleNodesNoDA** Validates transaction gossiping and block synchronization across multiple nodes (1 sequencer, 3 full nodes) over P2P, with the DA layer disabled. **TestTxGossipingMultipleNodesDAIncluded** Ensures transaction gossiping and block synchronization across multiple nodes (1 sequencer, 3 full nodes) using only the DA layer, with P2P gossiping disabled. **TestFastDASync** Verifies that a new node can quickly synchronize with the DA layer using fast sync, ensuring the sync speed is within a small delta of the DA block time. **TestSingleSequencerTwoFullNodesBlockSyncSpeed** Ensures that block synchronization among nodes is not bottlenecked by DA block time, using a fast block time and a slow DA block time. **TestDataExchange** Verifies data exchange and synchronization between nodes in various network topologies, including single and multiple full nodes, and trusted hash scenarios. **TestHeaderExchange** Verifies header exchange and synchronization between nodes in various network topologies, including single and multiple full nodes, and trusted hash scenarios. **TestTwoChainsInOneNamespace** Verifies that two chains in the same namespace (with same or different chain IDs) can coexist and synchronize independently without issues. Also, fixes **TestMaxPendingHeaders** behavior. <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> --> <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added height simulation and multi-batch handling in dummy DA and sequencer components for more realistic testing. - Introduced multi-node orchestration helpers to facilitate complex integration tests and synchronization verification. - Expanded full node integration tests to verify transaction propagation, block production, and DA inclusion across various scenarios. - Enhanced node shutdown logging for better observability during tests. - **Bug Fixes** - Corrected data inclusion checks to account for current sync height, preventing false positives. - Improved error handling for future height queries in DA clients and JSON-RPC, providing clearer error messages. - **Documentation** - Updated comments and terminology to reflect new data sync naming conventions and reference cleanup. - **Tests** - Extended testing framework with multi-node setup, synchronization, and timing controls. - Added tests for fast sync, block speed, and multiple chain coexistence. - Refined test timing, context management, and error handling for robustness. - **Chores** - Updated dependency versions, especially core module, to support new features. - Removed obsolete test files and simplified test helper functions for maintainability. - **Style** - Improved debug logs for node shutdown and data retrieval, aiding troubleshooting and observability. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent baad1ec commit 03baaf6

29 files changed

Lines changed: 1140 additions & 531 deletions

apps/evm/based/cmd/run.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func NewExtendedRunNodeCmd(ctx context.Context) *cobra.Command {
117117
}
118118
rollDA = &client.DA
119119
} else {
120-
rollDA = coreda.NewDummyDA(100_000, 0, 0)
120+
rollDA = coreda.NewDummyDA(100_000, 0, 0, nodeConfig.DA.BlockTime.Duration)
121121
}
122122

123123
var basedDA coreda.DA
@@ -128,7 +128,7 @@ func NewExtendedRunNodeCmd(ctx context.Context) *cobra.Command {
128128
}
129129
basedDA = &client.DA
130130
} else {
131-
basedDA = coreda.NewDummyDA(100_000, 0, 0)
131+
basedDA = coreda.NewDummyDA(100_000, 0, 0, nodeConfig.DA.BlockTime.Duration)
132132
}
133133

134134
datastore, err := store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, "based")

block/aggregation.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ func (m *Manager) AggregationLoop(ctx context.Context, errCh chan<- error) {
3434
blockTimer := time.NewTimer(0)
3535
defer blockTimer.Stop()
3636

37-
// Lazy Aggregator mode.
38-
// In Lazy Aggregator mode, blocks are built only when there are
37+
// Lazy Sequencer mode.
38+
// In Lazy Sequencer mode, blocks are built only when there are
3939
// transactions or every LazyBlockTime.
4040
if m.config.Node.LazyMode {
4141
if err := m.lazyAggregationLoop(ctx, blockTimer); err != nil {

block/da_includer_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ func newTestManager(t *testing.T) (*Manager, *mocks.Store, *mocks.Executor, *Moc
2424
logger.On("Info", mock.Anything, mock.Anything).Maybe()
2525
logger.On("Warn", mock.Anything, mock.Anything).Maybe()
2626
logger.On("Error", mock.Anything, mock.Anything).Maybe()
27+
// Mock Height to always return a high value so IsDAIncluded works
28+
store.On("Height", mock.Anything).Return(uint64(100), nil).Maybe()
2729
m := &Manager{
2830
store: store,
2931
headerCache: cache.NewCache[types.SignedHeader](),

block/manager.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,13 @@ func (m *Manager) IsBlockHashSeen(blockHash string) bool {
442442
// IsDAIncluded returns true if the block with the given hash has been seen on DA.
443443
// TODO(tac0turtle): should we use this for pending header system to verify how far ahead a chain is?
444444
func (m *Manager) IsDAIncluded(ctx context.Context, height uint64) (bool, error) {
445+
syncedHeight, err := m.store.Height(ctx)
446+
if err != nil {
447+
return false, err
448+
}
449+
if syncedHeight < height {
450+
return false, nil
451+
}
445452
header, data, err := m.store.GetBlockData(ctx, height)
446453
if err != nil {
447454
return false, err

block/manager_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func getManager(t *testing.T, da da.DA, gasPrice float64, gasMultiplier float64)
6161
// TestInitialStateClean verifies that getInitialState initializes state correctly when no state is stored.
6262
func TestInitialStateClean(t *testing.T) {
6363
require := require.New(t)
64-
ctx := context.TODO()
64+
ctx := t.Context()
6565

6666
// Create genesis document
6767
genesisData, _, _ := types.GetGenesisWithPrivkey("TestInitialStateClean")
@@ -186,6 +186,7 @@ func TestIsDAIncluded(t *testing.T) {
186186
height := uint64(1)
187187
header, data := types.GetRandomBlock(height, 5, "TestIsDAIncluded")
188188
mockStore.On("GetBlockData", mock.Anything, height).Return(header, data, nil).Times(3)
189+
mockStore.On("Height", mock.Anything).Return(uint64(100), nil).Maybe()
189190
ctx := context.Background()
190191
// IsDAIncluded should return false for unseen hash
191192
require.False(m.IsDAIncluded(ctx, height))

block/retriever.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (m *Manager) processNextDAHeaderAndData(ctx context.Context) error {
7676
m.logger.Debug("no blob data found", "daHeight", daHeight, "reason", blobsResp.Message)
7777
return nil
7878
}
79-
m.logger.Debug("retrieved potential data", "n", len(blobsResp.Data), "daHeight", daHeight)
79+
m.logger.Debug("retrieved potential blob data", "n", len(blobsResp.Data), "daHeight", daHeight)
8080
for _, bz := range blobsResp.Data {
8181
if len(bz) == 0 {
8282
m.logger.Debug("ignoring nil or empty blob", "daHeight", daHeight)

block/sync.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,6 @@ func (m *Manager) SyncLoop(ctx context.Context, errCh chan<- error) {
4646
}
4747
m.headerCache.SetItem(headerHeight, header)
4848

49-
m.sendNonBlockingSignalToHeaderStoreCh()
50-
m.sendNonBlockingSignalToRetrieveCh()
51-
5249
// check if the dataHash is dataHashForEmptyTxs
5350
// no need to wait for syncing Data, instead prepare now and set
5451
// so that trySyncNextBlock can progress
@@ -70,12 +67,22 @@ func (m *Manager) SyncLoop(ctx context.Context, errCh chan<- error) {
7067
m.headerCache.SetSeen(headerHash)
7168
case dataEvent := <-m.dataInCh:
7269
data := dataEvent.Data
70+
if len(data.Txs) == 0 {
71+
continue
72+
}
73+
7374
daHeight := dataEvent.DAHeight
7475
dataHash := data.DACommitment().String()
76+
dataHeight := uint64(0)
77+
if data.Metadata != nil {
78+
dataHeight = data.Metadata.Height
79+
}
7580
m.logger.Debug("data retrieved",
7681
"daHeight", daHeight,
7782
"hash", dataHash,
83+
"height", dataHeight,
7884
)
85+
7986
if m.dataCache.IsSeen(dataHash) {
8087
m.logger.Debug("data already seen", "data hash", dataHash)
8188
continue
@@ -87,13 +94,12 @@ func (m *Manager) SyncLoop(ctx context.Context, errCh chan<- error) {
8794
}
8895
if data.Metadata != nil {
8996
// Data was sent via the P2P network
90-
dataHeight := data.Metadata.Height
97+
dataHeight = data.Metadata.Height
9198
if dataHeight <= height {
9299
m.logger.Debug("data already seen", "height", dataHeight, "data hash", dataHash)
93100
continue
94101
}
95102
m.dataCache.SetItem(dataHeight, data)
96-
m.dataCache.SetItemByHash(dataHash, data)
97103
}
98104
// If the header is synced already, the data commitment should be associated with a height
99105
if val, ok := m.dataCommitmentToHeight.Load(dataHash); ok {
@@ -109,9 +115,6 @@ func (m *Manager) SyncLoop(ctx context.Context, errCh chan<- error) {
109115

110116
m.dataCache.SetItemByHash(dataHash, data)
111117

112-
m.sendNonBlockingSignalToDataStoreCh()
113-
m.sendNonBlockingSignalToRetrieveCh()
114-
115118
err = m.trySyncNextBlock(ctx, daHeight)
116119
if err != nil {
117120
errCh <- fmt.Errorf("failed to sync next block: %w", err)
@@ -164,6 +167,10 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
164167
return fmt.Errorf("failed to apply block: %w", err)
165168
}
166169

170+
if err = m.updateState(ctx, newState); err != nil {
171+
return fmt.Errorf("failed to save updated state: %w", err)
172+
}
173+
167174
if err = m.store.SaveBlockData(ctx, h, d, &h.Signature); err != nil {
168175
return fmt.Errorf("failed to save block: %w", err)
169176
}
@@ -177,10 +184,6 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
177184
newState.DAHeight = daHeight
178185
}
179186

180-
if err = m.updateState(ctx, newState); err != nil {
181-
return fmt.Errorf("failed to save updated state: %w", err)
182-
}
183-
184187
m.headerCache.DeleteItem(currentHeight + 1)
185188
m.dataCache.DeleteItem(currentHeight + 1)
186189
m.dataCache.DeleteItemByHash(h.DataHash.String())

core/da/dummy.go

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/sha256"
66
"errors"
7+
"fmt"
78
"sync"
89
"time"
910
)
@@ -19,10 +20,17 @@ type DummyDA struct {
1920
maxBlobSize uint64
2021
gasPrice float64
2122
gasMultiplier float64
23+
24+
// DA height simulation
25+
currentHeight uint64
26+
blockTime time.Duration
27+
stopCh chan struct{}
2228
}
2329

24-
// NewDummyDA creates a new instance of DummyDA with the specified maximum blob size.
25-
func NewDummyDA(maxBlobSize uint64, gasPrice float64, gasMultiplier float64) *DummyDA {
30+
var ErrHeightFromFutureStr = fmt.Errorf("given height is from the future")
31+
32+
// NewDummyDA creates a new instance of DummyDA with the specified maximum blob size and block time.
33+
func NewDummyDA(maxBlobSize uint64, gasPrice float64, gasMultiplier float64, blockTime time.Duration) *DummyDA {
2634
return &DummyDA{
2735
blobs: make(map[string]Blob),
2836
commitments: make(map[string]Commitment),
@@ -32,9 +40,34 @@ func NewDummyDA(maxBlobSize uint64, gasPrice float64, gasMultiplier float64) *Du
3240
maxBlobSize: maxBlobSize,
3341
gasPrice: gasPrice,
3442
gasMultiplier: gasMultiplier,
43+
blockTime: blockTime,
44+
stopCh: make(chan struct{}),
3545
}
3646
}
3747

48+
// StartHeightTicker starts a goroutine that increments currentHeight every blockTime.
49+
func (d *DummyDA) StartHeightTicker() {
50+
go func() {
51+
ticker := time.NewTicker(d.blockTime)
52+
defer ticker.Stop()
53+
for {
54+
select {
55+
case <-ticker.C:
56+
d.mu.Lock()
57+
d.currentHeight++
58+
d.mu.Unlock()
59+
case <-d.stopCh:
60+
return
61+
}
62+
}
63+
}()
64+
}
65+
66+
// StopHeightTicker stops the height ticker goroutine.
67+
func (d *DummyDA) StopHeightTicker() {
68+
close(d.stopCh)
69+
}
70+
3871
// MaxBlobSize returns the maximum blob size.
3972
func (d *DummyDA) MaxBlobSize(ctx context.Context) (uint64, error) {
4073
return d.maxBlobSize, nil
@@ -71,6 +104,10 @@ func (d *DummyDA) GetIDs(ctx context.Context, height uint64, namespace []byte) (
71104
d.mu.RLock()
72105
defer d.mu.RUnlock()
73106

107+
if height > d.currentHeight {
108+
return nil, fmt.Errorf("%w: requested %d, current %d", ErrHeightFromFutureStr, height, d.currentHeight)
109+
}
110+
74111
ids, exists := d.blobsByHeight[height]
75112
if !exists {
76113
return &GetIDsResult{
@@ -125,7 +162,7 @@ func (d *DummyDA) SubmitWithOptions(ctx context.Context, blobs []Blob, gasPrice
125162
d.mu.Lock()
126163
defer d.mu.Unlock()
127164

128-
height := uint64(len(d.blobsByHeight))
165+
height := d.currentHeight + 1
129166
ids := make([]ID, 0, len(blobs))
130167
var currentSize uint64
131168

@@ -164,7 +201,12 @@ func (d *DummyDA) SubmitWithOptions(ctx context.Context, blobs []Blob, gasPrice
164201
ids = append(ids, id)
165202
}
166203

167-
d.blobsByHeight[height] = ids
204+
// Add the IDs to the blobsByHeight map if they don't already exist
205+
if existingIDs, exists := d.blobsByHeight[height]; exists {
206+
d.blobsByHeight[height] = append(existingIDs, ids...)
207+
} else {
208+
d.blobsByHeight[height] = ids
209+
}
168210
d.timestampsByHeight[height] = time.Now()
169211

170212
return ids, nil

core/da/dummy_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@ package da
33
import (
44
"context"
55
"testing"
6+
"time"
67
)
78

89
func TestDummyDA(t *testing.T) {
10+
testDABlockTime := 100 * time.Millisecond
911
// Create a new DummyDA instance with a max blob size of 1024 bytes
10-
dummyDA := NewDummyDA(1024, 0, 0)
12+
dummyDA := NewDummyDA(1024, 0, 0, testDABlockTime)
13+
dummyDA.StartHeightTicker()
14+
defer dummyDA.StopHeightTicker()
15+
// Height is always 0
1116
ctx := context.Background()
1217

1318
// Test MaxBlobSize
@@ -25,6 +30,7 @@ func TestDummyDA(t *testing.T) {
2530
[]byte("test blob 2"),
2631
}
2732
ids, err := dummyDA.Submit(ctx, blobs, 0, nil)
33+
time.Sleep(testDABlockTime)
2834
if err != nil {
2935
t.Fatalf("Submit failed: %v", err)
3036
}
@@ -47,7 +53,7 @@ func TestDummyDA(t *testing.T) {
4753
}
4854

4955
// Test GetIDs
50-
result, err := dummyDA.GetIDs(ctx, 0, nil)
56+
result, err := dummyDA.GetIDs(ctx, 1, nil)
5157
if err != nil {
5258
t.Fatalf("GetIDs failed: %v", err)
5359
}

core/da/errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ var (
1111
ErrTxAlreadyInMempool = errors.New("tx already in mempool")
1212
ErrTxIncorrectAccountSequence = errors.New("incorrect account sequence")
1313
ErrContextDeadline = errors.New("context deadline")
14-
ErrFutureHeight = errors.New("future height")
14+
ErrHeightFromFuture = errors.New("given height is from the future")
1515
)

0 commit comments

Comments
 (0)