Skip to content

Commit 56bdbe2

Browse files
Add TestTxGoMultipleDAIncluded
1 parent 5d71ce6 commit 56bdbe2

11 files changed

Lines changed: 135 additions & 39 deletions

File tree

core/da/dummy.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func (d *DummyDA) SubmitWithOptions(ctx context.Context, blobs []Blob, gasPrice
162162
d.mu.Lock()
163163
defer d.mu.Unlock()
164164

165-
height := d.currentHeight
165+
height := d.currentHeight + 1
166166
ids := make([]ID, 0, len(blobs))
167167
var currentSize uint64
168168

@@ -201,7 +201,12 @@ func (d *DummyDA) SubmitWithOptions(ctx context.Context, blobs []Blob, gasPrice
201201
ids = append(ids, id)
202202
}
203203

204-
d.blobsByHeight[height] = ids
204+
// Add the IDs to the blobsByHeight map if they don't already exist
205+
if existingIDs, exists := d.blobsByHeight[height]; exists {
206+
d.blobsByHeight[height] = append(existingIDs, ids...)
207+
} else {
208+
d.blobsByHeight[height] = ids
209+
}
205210
d.timestampsByHeight[height] = time.Now()
206211

207212
return ids, nil

core/da/dummy_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ import (
77
)
88

99
func TestDummyDA(t *testing.T) {
10+
testDABlockTime := 100 * time.Millisecond
1011
// Create a new DummyDA instance with a max blob size of 1024 bytes
11-
dummyDA := NewDummyDA(1024, 0, 0, 10*time.Second)
12+
dummyDA := NewDummyDA(1024, 0, 0, testDABlockTime)
13+
dummyDA.StartHeightTicker()
14+
defer dummyDA.StopHeightTicker()
1215
// Height is always 0
1316
ctx := context.Background()
1417

@@ -27,6 +30,7 @@ func TestDummyDA(t *testing.T) {
2730
[]byte("test blob 2"),
2831
}
2932
ids, err := dummyDA.Submit(ctx, blobs, 0, nil)
33+
time.Sleep(testDABlockTime)
3034
if err != nil {
3135
t.Fatalf("Submit failed: %v", err)
3236
}
@@ -49,7 +53,7 @@ func TestDummyDA(t *testing.T) {
4953
}
5054

5155
// Test GetIDs
52-
result, err := dummyDA.GetIDs(ctx, 0, nil)
56+
result, err := dummyDA.GetIDs(ctx, 1, nil)
5357
if err != nil {
5458
t.Fatalf("GetIDs failed: %v", err)
5559
}

core/sequencer/dummy_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ func TestDummySequencer_BatchOverwrite(t *testing.T) {
316316

317317
// Get the second batch and verify it's batch2
318318
getResp, err = seq.GetNextBatch(ctx, GetNextBatchRequest{
319-
RollupId: rollupID,
319+
Id: ID,
320320
})
321321

322322
if err != nil {

da/jsonrpc/client.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"errors"
77
"fmt"
88
"net/http"
9-
"strings"
109

1110
"cosmossdk.io/log"
1211
"github.com/filecoin-project/go-jsonrpc"
@@ -61,7 +60,7 @@ func (api *API) GetIDs(ctx context.Context, height uint64, _ []byte) (*da.GetIDs
6160
api.Logger.Debug("RPC call indicates blobs not found", "method", "GetIDs", "height", height)
6261
return nil, err // Return the specific ErrBlobNotFound
6362
}
64-
if strings.Contains(err.Error(), da.ErrHeightFromFuture.Error()) {
63+
if errors.Is(err, da.ErrHeightFromFuture) {
6564
api.Logger.Debug("RPC call indicates height from future", "method", "GetIDs", "height", height)
6665
return nil, err // Return the specific ErrHeightFromFuture
6766
}

da/jsonrpc/proxy_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func TestProxy(t *testing.T) {
7373
t.Run("Given height is from the future", func(t *testing.T) {
7474
HeightFromFutureTest(t, &client.DA)
7575
})
76+
dummy.StopHeightTicker()
7677
}
7778

7879
// BasicDATest tests round trip of messages to DA and back.
@@ -137,6 +138,7 @@ func GetIDsTest(t *testing.T, d coreda.DA) {
137138

138139
ctx := t.Context()
139140
ids, err := d.Submit(ctx, msgs, 0, testNamespace)
141+
time.Sleep(getTestDABlockTime())
140142
assert.NoError(t, err)
141143
assert.Len(t, ids, len(msgs))
142144
found := false
@@ -145,7 +147,7 @@ func GetIDsTest(t *testing.T, d coreda.DA) {
145147
// To Keep It Simple: we assume working with DA used exclusively for this test (mock, devnet, etc)
146148
// As we're the only user, we don't need to handle external data (that could be submitted in real world).
147149
// There is no notion of height, so we need to scan the DA to get test data back.
148-
for i := uint64(0); !found && !time.Now().After(end); i++ {
150+
for i := uint64(1); !found && !time.Now().After(end); i++ {
149151
ret, err := d.GetIDs(ctx, i, []byte{})
150152
if err != nil {
151153
if strings.Contains(err.Error(), coreda.ErrHeightFromFuture.Error()) {
@@ -161,11 +163,19 @@ func GetIDsTest(t *testing.T, d coreda.DA) {
161163

162164
// Submit ensures atomicity of batch, so it makes sense to compare actual blobs (bodies) only when lengths
163165
// of slices is the same.
164-
if len(blobs) == len(msgs) {
166+
if len(blobs) >= len(msgs) {
165167
found = true
166-
for b := 0; b < len(blobs); b++ {
167-
if !bytes.Equal(blobs[b], msgs[b]) {
168+
for _, msg := range msgs {
169+
msgFound := false
170+
for _, blob := range blobs {
171+
if bytes.Equal(blob, msg) {
172+
msgFound = true
173+
break
174+
}
175+
}
176+
if !msgFound {
168177
found = false
178+
break
169179
}
170180
}
171181
}

node/full_node_integration_test.go

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,61 @@ import (
1111
rollkitconfig "github.com/rollkit/rollkit/pkg/config"
1212
)
1313

14-
// TestTxGossipingAndAggregation tests that transactions are gossiped and blocks are aggregated and synced across multiple nodes.
14+
// TestTxGossipingMultipleNodesNoDA tests that transactions are gossiped and blocks are sequenced and synced across multiple nodes without the DA layer over P2P.
15+
// It creates 4 nodes (1 sequencer, 3 full nodes), injects a transaction, waits for all nodes to sync, and asserts block equality.
16+
func TestTxGossipingMultipleNodesNoDA(t *testing.T) {
17+
require := require.New(t)
18+
config := getTestConfig(t, 1)
19+
// Set the DA block time to a very large value to ensure that the DA layer is not used
20+
config.DA.BlockTime = rollkitconfig.DurationWrapper{Duration: 100 * time.Second}
21+
numNodes := 4
22+
nodes, cleanups := createNodesWithCleanup(t, numNodes, config)
23+
for _, cleanup := range cleanups {
24+
defer cleanup()
25+
}
26+
27+
ctxs, cancels := createNodeContexts(numNodes)
28+
var runningWg sync.WaitGroup
29+
30+
// Start only the sequencer first
31+
startNodeInBackground(t, nodes, ctxs, &runningWg, 0)
32+
33+
// Wait for the first block to be produced by the sequencer
34+
err := waitForFirstBlock(nodes[0], Header)
35+
require.NoError(err)
36+
37+
// Verify block manager is properly initialized
38+
require.NotNil(nodes[0].blockManager, "Block manager should be initialized")
39+
40+
// Start the other nodes
41+
for i := 1; i < numNodes; i++ {
42+
startNodeInBackground(t, nodes, ctxs, &runningWg, i)
43+
}
44+
45+
// Inject a transaction into the sequencer's executor
46+
executor := nodes[0].blockManager.GetExecutor().(*coreexecutor.DummyExecutor)
47+
executor.InjectTx([]byte("test tx"))
48+
49+
blocksToWaitFor := uint64(5)
50+
// Wait for all nodes to reach at least 5 blocks with DA inclusion
51+
for _, node := range nodes {
52+
require.NoError(waitForAtLeastNBlocks(node, blocksToWaitFor, Store))
53+
}
54+
55+
// Shutdown all nodes and wait
56+
shutdownAndWait(t, cancels, &runningWg, 5*time.Second)
57+
58+
// Assert that all nodes have the same block up to height blocksToWaitFor
59+
assertAllNodesSynced(t, nodes, blocksToWaitFor)
60+
}
61+
62+
// TestTxGossipingMultipleNodesDAIncluded tests that transactions are gossiped and blocks are sequenced and synced across multiple nodes only using DA. P2P gossiping is disabled.
1563
// It creates 4 nodes (1 sequencer, 3 full nodes), injects a transaction, waits for all nodes to sync with DA inclusion, and asserts block equality.
16-
func TestTxGossipingAndAggregation(t *testing.T) {
64+
func TestTxGossipingMultipleNodesDAIncluded(t *testing.T) {
1765
require := require.New(t)
1866
config := getTestConfig(t, 1)
67+
// Disable P2P gossiping
68+
config.P2P.Peers = "none"
1969

2070
numNodes := 4
2171
nodes, cleanups := createNodesWithCleanup(t, numNodes, config)
@@ -68,8 +118,10 @@ func TestFastDASync(t *testing.T) {
68118

69119
// Set up two nodes with different block and DA block times
70120
config := getTestConfig(t, 1)
71-
config.Node.BlockTime = rollkitconfig.DurationWrapper{Duration: 200 * time.Millisecond}
72-
config.DA.BlockTime = rollkitconfig.DurationWrapper{Duration: 50 * time.Millisecond}
121+
// Set the block time to 2 seconds and the DA block time to 1 second
122+
// Note: these are large values to avoid test failures due to slow CI machines
123+
config.Node.BlockTime = rollkitconfig.DurationWrapper{Duration: 2 * time.Second}
124+
config.DA.BlockTime = rollkitconfig.DurationWrapper{Duration: 1 * time.Second}
73125

74126
nodes, cleanups := createNodesWithCleanup(t, 2, config)
75127
for _, cleanup := range cleanups {
@@ -83,7 +135,7 @@ func TestFastDASync(t *testing.T) {
83135
startNodeInBackground(t, nodes, ctxs, &runningWg, 0)
84136

85137
// Wait for the first node to produce a few blocks
86-
blocksToWaitFor := uint64(5)
138+
blocksToWaitFor := uint64(2)
87139
require.NoError(waitForAtLeastNDAIncludedHeight(nodes[0], blocksToWaitFor))
88140

89141
// Now start the second node and time its sync
@@ -94,7 +146,7 @@ func TestFastDASync(t *testing.T) {
94146
syncDuration := time.Since(start)
95147

96148
// Ensure node syncs within a small delta of DA block time
97-
delta := 75 * time.Millisecond
149+
delta := 250 * time.Millisecond
98150
require.Less(syncDuration, config.DA.BlockTime.Duration+delta, "Block sync should be faster than DA block time")
99151

100152
// Verify both nodes are synced and that the synced block is DA-included
@@ -276,7 +328,7 @@ func testSingleSequencerTwoFullNodes(t *testing.T, source Source) {
276328
shutdownAndWait(t, cancels, &runningWg, 5*time.Second)
277329
}
278330

279-
// testSingleSequencerSingleFullNode sets up a single sequencer and a single full node with a trusted hash, starts the sequencer, waits for it to produce a block, then starts the full node with the trusted hash.
331+
// testSingleSequencerSingleFullNodeTrustedHash sets up a single sequencer and a single full node with a trusted hash, starts the sequencer, waits for it to produce a block, then starts the full node with the trusted hash.
280332
// It waits for both nodes to reach a target block height (using the provided 'source' to determine block inclusion), verifies that both nodes are fully synced, and then shuts them down.
281333
func testSingleSequencerSingleFullNodeTrustedHash(t *testing.T, source Source) {
282334
require := require.New(t)

node/helpers_test.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,16 @@ const (
4242
)
4343

4444
// createTestComponents creates test components for node initialization
45-
func createTestComponents(t *testing.T, config rollkitconfig.Config) (coreexecutor.Executor, coresequencer.Sequencer, coreda.DA, *p2p.Client, datastore.Batching, *key.NodeKey) {
45+
func createTestComponents(t *testing.T, config rollkitconfig.Config) (coreexecutor.Executor, coresequencer.Sequencer, coreda.DA, *p2p.Client, datastore.Batching, *key.NodeKey, func()) {
4646
executor := coreexecutor.NewDummyExecutor()
4747
sequencer := coresequencer.NewDummySequencer()
4848
dummyDA := coreda.NewDummyDA(100_000, 0, 0, config.DA.BlockTime.Duration)
4949
dummyDA.StartHeightTicker()
5050

51+
stopDAHeightTicker := func() {
52+
dummyDA.StopHeightTicker()
53+
}
54+
5155
// Create genesis and keys for P2P client
5256
_, genesisValidatorKey, _ := types.GetGenesisWithPrivkey("test-chain")
5357
p2pKey := &key.NodeKey{
@@ -59,7 +63,7 @@ func createTestComponents(t *testing.T, config rollkitconfig.Config) (coreexecut
5963
require.NotNil(t, p2pClient)
6064
ds := dssync.MutexWrap(datastore.NewMapDatastore())
6165

62-
return executor, sequencer, dummyDA, p2pClient, ds, p2pKey
66+
return executor, sequencer, dummyDA, p2pClient, ds, p2pKey, stopDAHeightTicker
6367
}
6468

6569
func getTestConfig(t *testing.T, n int) rollkitconfig.Config {
@@ -97,8 +101,7 @@ func createNodeWithCleanup(t *testing.T, config rollkitconfig.Config) (*FullNode
97101
remoteSigner, err := remote_signer.NewNoopSigner(genesisValidatorKey)
98102
require.NoError(t, err)
99103

100-
executor, sequencer, dac, p2pClient, ds, _ := createTestComponents(t, config)
101-
104+
executor, sequencer, dac, p2pClient, ds, _, stopDAHeightTicker := createTestComponents(t, config)
102105
require.NoError(t, err)
103106

104107
node, err := NewNode(
@@ -120,6 +123,7 @@ func createNodeWithCleanup(t *testing.T, config rollkitconfig.Config) (*FullNode
120123
cleanup := func() {
121124
// Cancel the context to stop the node
122125
cancel()
126+
stopDAHeightTicker()
123127
}
124128

125129
return node.(*FullNode), cleanup
@@ -141,7 +145,8 @@ func createNodesWithCleanup(t *testing.T, num int, config rollkitconfig.Config)
141145
require.NoError(err)
142146

143147
aggListenAddress := config.P2P.ListenAddress
144-
executor, sequencer, dac, p2pClient, ds, aggP2PKey := createTestComponents(t, config)
148+
aggPeers := config.P2P.Peers
149+
executor, sequencer, dac, p2pClient, ds, aggP2PKey, stopDAHeightTicker := createTestComponents(t, config)
145150
aggPeerID, err := peer.IDFromPrivateKey(aggP2PKey.PrivKey)
146151
require.NoError(err)
147152

@@ -164,18 +169,24 @@ func createNodesWithCleanup(t *testing.T, num int, config rollkitconfig.Config)
164169
cleanup := func() {
165170
// Cancel the context to stop the node
166171
aggCancel()
172+
stopDAHeightTicker()
167173
}
168174

169175
nodes[0], cleanups[0] = aggNode.(*FullNode), cleanup
170176
config.Node.Aggregator = false
171-
aggPeerAddress := fmt.Sprintf("%s/p2p/%s", aggListenAddress, aggPeerID.Loggable()["peerID"].(string))
172-
peersList := []string{aggPeerAddress}
177+
peersList := []string{}
178+
if aggPeers != "none" {
179+
aggPeerAddress := fmt.Sprintf("%s/p2p/%s", aggListenAddress, aggPeerID.Loggable()["peerID"].(string))
180+
peersList = append(peersList, aggPeerAddress)
181+
}
173182
for i := 1; i < num; i++ {
174183
ctx, cancel := context.WithCancel(context.Background())
175-
config.P2P.Peers = strings.Join(peersList, ",")
184+
if aggPeers != "none" {
185+
config.P2P.Peers = strings.Join(peersList, ",")
186+
}
176187
config.P2P.ListenAddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 40001+i)
177188
config.RPC.Address = fmt.Sprintf("127.0.0.1:%d", 8001+i)
178-
executor, sequencer, _, p2pClient, _, nodeP2PKey := createTestComponents(t, config)
189+
executor, sequencer, _, p2pClient, _, nodeP2PKey, stopDAHeightTicker := createTestComponents(t, config)
179190
node, err := NewNode(
180191
ctx,
181192
config,
@@ -194,6 +205,7 @@ func createNodesWithCleanup(t *testing.T, num int, config rollkitconfig.Config)
194205
cleanup := func() {
195206
// Cancel the context to stop the node
196207
cancel()
208+
stopDAHeightTicker()
197209
}
198210
nodes[i], cleanups[i] = node.(*FullNode), cleanup
199211
nodePeerID, err := peer.IDFromPrivateKey(nodeP2PKey.PrivKey)

pkg/cmd/run_node_test.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ import (
2424

2525
const MockDANamespace = "test"
2626

27-
func createTestComponents(_ context.Context, t *testing.T) (coreexecutor.Executor, coresequencer.Sequencer, coreda.DA, signer.Signer, *p2p.Client, datastore.Batching) {
27+
func createTestComponents(_ context.Context, t *testing.T) (coreexecutor.Executor, coresequencer.Sequencer, coreda.DA, signer.Signer, *p2p.Client, datastore.Batching, func()) {
2828
executor := coreexecutor.NewDummyExecutor()
2929
sequencer := coresequencer.NewDummySequencer()
3030
dummyDA := coreda.NewDummyDA(100_000, 0, 0, 10*time.Second)
3131
dummyDA.StartHeightTicker()
32+
stopDAHeightTicker := func() {
33+
dummyDA.StopHeightTicker()
34+
}
3235
tmpDir := t.TempDir()
3336
keyProvider, err := filesigner.CreateFileSystemSigner(filepath.Join(tmpDir, "config"), []byte{})
3437
if err != nil {
@@ -38,7 +41,7 @@ func createTestComponents(_ context.Context, t *testing.T) (coreexecutor.Executo
3841
p2pClient := &p2p.Client{}
3942
ds := datastore.NewMapDatastore()
4043

41-
return executor, sequencer, dummyDA, keyProvider, p2pClient, ds
44+
return executor, sequencer, dummyDA, keyProvider, p2pClient, ds, stopDAHeightTicker
4245
}
4346

4447
func TestParseFlags(t *testing.T) {
@@ -78,7 +81,8 @@ func TestParseFlags(t *testing.T) {
7881

7982
args := append([]string{"start"}, flags...)
8083

81-
executor, sequencer, dac, keyProvider, p2pClient, ds := createTestComponents(context.Background(), t)
84+
executor, sequencer, dac, keyProvider, p2pClient, ds, stopDAHeightTicker := createTestComponents(context.Background(), t)
85+
defer stopDAHeightTicker()
8286

8387
nodeConfig := rollconf.DefaultConfig
8488
nodeConfig.RootDir = t.TempDir()
@@ -154,7 +158,8 @@ func TestAggregatorFlagInvariants(t *testing.T) {
154158
for i, flags := range flagVariants {
155159
args := append([]string{"start"}, flags...)
156160

157-
executor, sequencer, dac, keyProvider, p2pClient, ds := createTestComponents(context.Background(), t)
161+
executor, sequencer, dac, keyProvider, p2pClient, ds, stopDAHeightTicker := createTestComponents(context.Background(), t)
162+
defer stopDAHeightTicker()
158163

159164
nodeConfig := rollconf.DefaultConfig
160165
nodeConfig.RootDir = t.TempDir()
@@ -189,7 +194,8 @@ func TestDefaultAggregatorValue(t *testing.T) {
189194
}
190195
for _, tc := range testCases {
191196
t.Run(tc.name, func(t *testing.T) {
192-
executor, sequencer, dac, keyProvider, p2pClient, ds := createTestComponents(context.Background(), t)
197+
executor, sequencer, dac, keyProvider, p2pClient, ds, stopDAHeightTicker := createTestComponents(context.Background(), t)
198+
defer stopDAHeightTicker()
193199

194200
nodeConfig := rollconf.DefaultConfig
195201

@@ -268,7 +274,8 @@ func TestCentralizedAddresses(t *testing.T) {
268274
"--rollkit.da.address=http://central-da:26657",
269275
}
270276

271-
executor, sequencer, dac, keyProvider, p2pClient, ds := createTestComponents(context.Background(), t)
277+
executor, sequencer, dac, keyProvider, p2pClient, ds, stopDAHeightTicker := createTestComponents(context.Background(), t)
278+
defer stopDAHeightTicker()
272279

273280
cmd := newRunNodeCmd(t.Context(), executor, sequencer, dac, keyProvider, p2pClient, ds, nodeConfig)
274281
_ = cmd.Flags().Set(rollconf.FlagRootDir, "custom/root/dir")
@@ -291,7 +298,9 @@ func TestStartNodeErrors(t *testing.T) {
291298
logger := log.NewNopLogger() // Use NopLogger for tests unless specific logging output is needed
292299

293300
// Common setup
294-
executor, sequencer, dac, _, p2pClient, ds := createTestComponents(baseCtx, t)
301+
executor, sequencer, dac, _, p2pClient, ds, stopDAHeightTicker := createTestComponents(baseCtx, t)
302+
defer stopDAHeightTicker()
303+
295304
tmpDir := t.TempDir()
296305

297306
// Create a dummy genesis file for successful load cases

0 commit comments

Comments
 (0)