Commit 7f14c5f
fix: recover on restart (#2313)
Resolves #2220

tl;dr: Payloads must have sequential heights when they are sent to the pubsub topics; after a restart, gaps in the heights caused failures.

* Buffers in `HeaderCh` and `DataCh` were not persisted.
* The `syncService.Close` method never completed successfully because of a timeout and non-empty topic subscribers. This led to data loss.
* Any error during `WriteToStoreAndBroadcast` closed only the consumer loop, not the app.
* On graceful shutdown, the goroutines (producers/consumers) were not awaited, which caused data loss.

`TestSlowConsumers` replicates the behaviour I have seen on my box: when the heights have gaps after a restart, the channel consumers in `node/full.go` stop their work and everything freezes. `TestHeaderSyncServiceRestart` confirms that the go-header store persists the data when it is submitted.

## Summary by CodeRabbit

- **New Features**
  - Improved block broadcasting with enhanced modularity and error handling.
  - Added robust tests simulating slow consumers and verifying persistence across restarts.
  - Enhanced synchronization service with explicit subscription management and restart tests.
  - Improved node process management in end-to-end tests, including command-based tracking and graceful shutdown.
- **Bug Fixes**
  - Ensured all background tasks complete before node shutdown for safer operation.
  - Fixed potential issues with process cleanup and synchronization during tests.
- **Refactor**
  - Replaced direct channel usage with broadcaster interfaces for block publishing.
  - Streamlined goroutine management and error handling in node operations.
  - Simplified and improved test setup and teardown procedures.
- **Chores**
  - Updated dependencies to explicitly require key libraries.
  - Reorganized import statements for clarity.
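The sequential-height invariant is easiest to see in miniature: a consumer that insists on strictly increasing, gap-free heights stalls as soon as a gap appears. The sketch below is illustrative only (not rollkit code) and shows why unpersisted buffers plus a restart freeze the consumer:

```go
package main

import "fmt"

func main() {
	heights := make(chan uint64, 8)
	// The producer restarts after height 3 and resumes at 6, so the
	// payloads that were buffered only in memory (4 and 5) are lost.
	for _, h := range []uint64{1, 2, 3, 6, 7} {
		heights <- h
	}
	close(heights)

	next := uint64(1)
	for h := range heights {
		if h != next {
			// A real sync service would wait here for height 4 forever;
			// persisting pending payloads across restarts closes the gap.
			fmt.Printf("gap: got %d, want %d; consumer stops\n", h, next)
			return
		}
		fmt.Println("delivered", h)
		next++
	}
}
```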
1 parent: 03baaf6

10 files changed: 640 additions & 231 deletions

block/manager.go

Lines changed: 22 additions & 28 deletions
@@ -17,6 +17,7 @@ import (
 	goheader "github.com/celestiaorg/go-header"
 	ds "github.com/ipfs/go-datastore"
 	"github.com/libp2p/go-libp2p/core/crypto"
+	"golang.org/x/sync/errgroup"

 	coreda "github.com/rollkit/rollkit/core/da"
 	coreexecutor "github.com/rollkit/rollkit/core/execution"
@@ -46,9 +47,6 @@ const (
 	// This is temporary solution. It will be removed in future versions.
 	maxSubmitAttempts = 30

-	// Applies to most channels, 100 is a large enough buffer to avoid blocking
-	channelLength = 100
-
 	// Applies to the headerInCh and dataInCh, 10000 is a large enough number for headers per DA block.
 	eventInChLength = 10000

@@ -90,6 +88,10 @@ type BatchData struct {
 	Data [][]byte
 }

+type broadcaster[T any] interface {
+	WriteToStoreAndBroadcast(ctx context.Context, payload T) error
+}
+
 // Manager is responsible for aggregating transactions into blocks.
 type Manager struct {
 	lastState types.State
@@ -104,8 +106,8 @@ type Manager struct {

 	daHeight *atomic.Uint64

-	HeaderCh chan *types.SignedHeader
-	DataCh   chan *types.Data
+	headerBroadcaster broadcaster[*types.SignedHeader]
+	dataBroadcaster   broadcaster[*types.Data]

 	headerInCh  chan NewHeaderEvent
 	headerStore goheader.Store[*types.SignedHeader]
@@ -268,6 +270,8 @@ func NewManager(
 	logger log.Logger,
 	headerStore goheader.Store[*types.SignedHeader],
 	dataStore goheader.Store[*types.Data],
+	headerBroadcaster broadcaster[*types.SignedHeader],
+	dataBroadcaster broadcaster[*types.Data],
 	seqMetrics *Metrics,
 	gasPrice float64,
 	gasMultiplier float64,
@@ -326,15 +330,15 @@ func NewManager(
 	daH.Store(s.DAHeight)

 	m := &Manager{
-		signer:    signer,
-		config:    config,
-		genesis:   genesis,
-		lastState: s,
-		store:     store,
-		daHeight:  &daH,
+		signer:            signer,
+		config:            config,
+		genesis:           genesis,
+		lastState:         s,
+		store:             store,
+		daHeight:          &daH,
+		headerBroadcaster: headerBroadcaster,
+		dataBroadcaster:   dataBroadcaster,
 		// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
-		HeaderCh: make(chan *types.SignedHeader, channelLength),
-		DataCh:   make(chan *types.Data, channelLength),
 		headerInCh:    make(chan NewHeaderEvent, eventInChLength),
 		dataInCh:      make(chan NewDataEvent, eventInChLength),
 		headerStoreCh: make(chan struct{}, 1),
@@ -654,24 +658,14 @@ func (m *Manager) publishBlockInternal(ctx context.Context) error {

 	m.recordMetrics(data)

-	// Check for shut down event prior to sending the header and block to
-	// their respective channels. The reason for checking for the shutdown
-	// event separately is due to the inconsistent nature of the select
-	// statement when multiple cases are satisfied.
-	select {
-	case <-ctx.Done():
-		return fmt.Errorf("unable to send header and block, context done: %w", ctx.Err())
-	default:
+	g, ctx := errgroup.WithContext(ctx)
+	g.Go(func() error { return m.headerBroadcaster.WriteToStoreAndBroadcast(ctx, header) })
+	g.Go(func() error { return m.dataBroadcaster.WriteToStoreAndBroadcast(ctx, data) })
+	if err := g.Wait(); err != nil {
+		return err
 	}

-	// Publish header to channel so that header exchange service can broadcast
-	m.HeaderCh <- header
-
-	// Publish block to channel so that block exchange service can broadcast
-	m.DataCh <- data
-
 	m.logger.Debug("successfully proposed header", "proposer", hex.EncodeToString(header.ProposerAddress), "height", headerHeight)
-
 	return nil
 }

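The `errgroup` fan-out above gives `publishBlockInternal` two properties the old channel sends lacked: the header and data broadcasts run concurrently, and the first failure cancels the shared context and is returned from `g.Wait()`, so a broken broadcast now fails the publish call instead of silently stopping a consumer loop. A minimal standalone sketch of those semantics:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	// The first goroutine fails immediately...
	g.Go(func() error { return errors.New("broadcast failed") })

	// ...which cancels ctx, so the sibling can bail out instead of
	// blocking forever the way an unconsumed channel send could.
	g.Go(func() error {
		<-ctx.Done()
		return ctx.Err()
	})

	// Wait returns the first non-nil error: "broadcast failed".
	fmt.Println(g.Wait())
}
```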
block/publish_block2_test.go

Lines changed: 245 additions & 0 deletions
package block

import (
	"context"
	cryptoRand "crypto/rand"
	"errors"
	"fmt"
	"math/rand"
	"path/filepath"
	"sync"
	"testing"
	"time"

	"cosmossdk.io/log"
	ds "github.com/ipfs/go-datastore"
	ktds "github.com/ipfs/go-datastore/keytransform"
	syncdb "github.com/ipfs/go-datastore/sync"
	logging "github.com/ipfs/go-log/v2"
	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	coresequencer "github.com/rollkit/rollkit/core/sequencer"
	"github.com/rollkit/rollkit/pkg/config"
	genesispkg "github.com/rollkit/rollkit/pkg/genesis"
	"github.com/rollkit/rollkit/pkg/p2p"
	"github.com/rollkit/rollkit/pkg/p2p/key"
	"github.com/rollkit/rollkit/pkg/signer"
	"github.com/rollkit/rollkit/pkg/signer/noop"
	"github.com/rollkit/rollkit/pkg/store"
	rollkitSync "github.com/rollkit/rollkit/pkg/sync"
	"github.com/rollkit/rollkit/types"
)

func TestSlowConsumers(t *testing.T) {
	logging.SetDebugLogging()
	blockTime := 100 * time.Millisecond
	specs := map[string]struct {
		headerConsumerDelay time.Duration
		dataConsumerDelay   time.Duration
	}{
		"slow header consumer": {
			headerConsumerDelay: blockTime * 2,
			dataConsumerDelay:   0,
		},
		"slow data consumer": {
			headerConsumerDelay: 0,
			dataConsumerDelay:   blockTime * 2,
		},
		"both slow": {
			headerConsumerDelay: blockTime,
			dataConsumerDelay:   blockTime,
		},
		"both fast": {
			headerConsumerDelay: 0,
			dataConsumerDelay:   0,
		},
	}
	for name, spec := range specs {
		t.Run(name, func(t *testing.T) {
			workDir := t.TempDir()
			dbm := syncdb.MutexWrap(ds.NewMapDatastore())
			ctx, cancel := context.WithCancel(t.Context())

			pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader)
			require.NoError(t, err)
			noopSigner, err := noop.NewNoopSigner(pk)
			require.NoError(t, err)

			manager, headerSync, dataSync := setupBlockManager(t, ctx, workDir, dbm, blockTime, noopSigner)
			var lastCapturedDataPayload *types.Data
			var lastCapturedHeaderPayload *types.SignedHeader
			manager.dataBroadcaster = capturingTailBroadcaster[*types.Data](spec.dataConsumerDelay, &lastCapturedDataPayload, dataSync)
			manager.headerBroadcaster = capturingTailBroadcaster[*types.SignedHeader](spec.headerConsumerDelay, &lastCapturedHeaderPayload, headerSync)

			aggCtx, aggCancel := context.WithCancel(ctx)
			errChan := make(chan error, 1)
			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				defer wg.Done()
				manager.AggregationLoop(aggCtx, errChan)
			}()

			// wait for messages to pile up
			select {
			case err := <-errChan:
				require.NoError(t, err)
			case <-time.After(spec.dataConsumerDelay + spec.headerConsumerDelay + 3*blockTime):
			}
			aggCancel()
			wg.Wait() // wait for the aggregation loop to finish
			t.Log("shutting down block manager")
			require.NoError(t, dataSync.Stop(ctx))
			require.NoError(t, headerSync.Stop(ctx))
			cancel()
			require.NotNil(t, lastCapturedHeaderPayload)
			require.NotNil(t, lastCapturedDataPayload)

			t.Log("restart with new block manager")
			ctx, cancel = context.WithCancel(t.Context())
			manager, headerSync, dataSync = setupBlockManager(t, ctx, workDir, dbm, blockTime, noopSigner)

			var firstCapturedDataPayload *types.Data
			var firstCapturedHeaderPayload *types.SignedHeader
			manager.dataBroadcaster = capturingHeadBroadcaster[*types.Data](0, &firstCapturedDataPayload, dataSync)
			manager.headerBroadcaster = capturingHeadBroadcaster[*types.SignedHeader](0, &firstCapturedHeaderPayload, headerSync)
			go manager.AggregationLoop(ctx, errChan)
			select {
			case err := <-errChan:
				require.NoError(t, err)
			case <-time.After(spec.dataConsumerDelay + spec.headerConsumerDelay + 3*blockTime):
			}
			cancel()
			require.NotNil(t, firstCapturedHeaderPayload)
			require.NotNil(t, firstCapturedDataPayload)
			assert.InDelta(t, lastCapturedDataPayload.Height(), firstCapturedDataPayload.Height(), 1)
			assert.InDelta(t, lastCapturedHeaderPayload.Height(), firstCapturedHeaderPayload.Height(), 1)
		})
	}
}

// capturingTailBroadcaster stores the most recent payload in target, panics on
// non-sequential heights, and then forwards the payload to the next broadcasters.
func capturingTailBroadcaster[T interface{ Height() uint64 }](waitDuration time.Duration, target *T, next ...broadcaster[T]) broadcaster[T] {
	var lastHeight uint64
	return broadcasterFn[T](func(ctx context.Context, payload T) error {
		if payload.Height() <= lastHeight {
			panic(fmt.Sprintf("got height %d, want %d", payload.Height(), lastHeight+1))
		}

		time.Sleep(waitDuration)
		lastHeight = payload.Height()
		*target = payload
		var err error
		for _, n := range next {
			err = errors.Join(err, n.WriteToStoreAndBroadcast(ctx, payload))
		}
		return err
	})
}

// capturingHeadBroadcaster stores only the first payload in target and then
// forwards every payload to the next broadcasters.
func capturingHeadBroadcaster[T interface{ Height() uint64 }](waitDuration time.Duration, target *T, next ...broadcaster[T]) broadcaster[T] {
	var once sync.Once
	return broadcasterFn[T](func(ctx context.Context, payload T) error {
		once.Do(func() {
			*target = payload
		})
		var err error
		for _, n := range next {
			err = errors.Join(err, n.WriteToStoreAndBroadcast(ctx, payload))
		}
		time.Sleep(waitDuration)
		return err
	})
}

type broadcasterFn[T any] func(ctx context.Context, payload T) error

func (b broadcasterFn[T]) WriteToStoreAndBroadcast(ctx context.Context, payload T) error {
	return b(ctx, payload)
}

func setupBlockManager(t *testing.T, ctx context.Context, workDir string, mainKV ds.Batching, blockTime time.Duration, signer signer.Signer) (*Manager, *rollkitSync.HeaderSyncService, *rollkitSync.DataSyncService) {
	t.Helper()
	nodeConfig := config.DefaultConfig
	nodeConfig.Node.BlockTime = config.DurationWrapper{Duration: blockTime}
	nodeConfig.RootDir = workDir
	nodeKey, err := key.LoadOrGenNodeKey(filepath.Dir(nodeConfig.ConfigPath()))
	require.NoError(t, err)

	proposerAddr, err := signer.GetAddress()
	require.NoError(t, err)
	genesisDoc := genesispkg.Genesis{
		ChainID:            "test-chain-id",
		GenesisDAStartTime: time.Now(),
		InitialHeight:      1,
		ProposerAddress:    proposerAddr,
	}

	logger := log.NewTestLogger(t)
	p2pClient, err := p2p.NewClient(nodeConfig, nodeKey, mainKV, logger, p2p.NopMetrics())
	require.NoError(t, err)

	// Start p2p client before creating sync services
	err = p2pClient.Start(ctx)
	require.NoError(t, err)

	const RollkitPrefix = "0"
	ktds.Wrap(mainKV, ktds.PrefixTransform{Prefix: ds.NewKey(RollkitPrefix)})
	headerSyncService, err := rollkitSync.NewHeaderSyncService(mainKV, nodeConfig, genesisDoc, p2pClient, logger.With("module", "HeaderSyncService"))
	require.NoError(t, err)
	require.NoError(t, headerSyncService.Start(ctx))
	dataSyncService, err := rollkitSync.NewDataSyncService(mainKV, nodeConfig, genesisDoc, p2pClient, logger.With("module", "DataSyncService"))
	require.NoError(t, err)
	require.NoError(t, dataSyncService.Start(ctx))

	result, err := NewManager(
		ctx,
		signer,
		nodeConfig,
		genesisDoc,
		store.New(mainKV),
		&mockExecutor{},
		coresequencer.NewDummySequencer(),
		nil,
		logger.With("module", "BlockManager"),
		headerSyncService.Store(),
		dataSyncService.Store(),
		nil,
		nil,
		NopMetrics(),
		1.,
		1.,
	)
	require.NoError(t, err)
	return result, headerSyncService, dataSyncService
}

type mockExecutor struct{}

func (m mockExecutor) InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) (stateRoot []byte, maxBytes uint64, err error) {
	return bytesN(32), 10_000, nil
}

func (m mockExecutor) GetTxs(ctx context.Context) ([][]byte, error) {
	panic("implement me")
}

func (m mockExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error) {
	return bytesN(32), 10_000, nil
}

func (m mockExecutor) SetFinal(ctx context.Context, blockHeight uint64) error {
	return nil
}

var rnd = rand.New(rand.NewSource(1)) //nolint:gosec // test code only

func bytesN(n int) []byte {
	data := make([]byte, n)
	_, _ = rnd.Read(data)
	return data
}
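The `broadcasterFn` adapter is the piece that makes the capturing decorators possible: any function with the matching signature satisfies the generic `broadcaster` interface, so a test can wrap the real sync service with extra behaviour. Below is a standalone sketch of the same pattern; the `logging` decorator is a hypothetical example, not part of this commit:

```go
package main

import (
	"context"
	"fmt"
)

// Mirrors the test file's types: a generic broadcaster interface and a
// func adapter that satisfies it, so decorators can wrap a real service.
type broadcaster[T any] interface {
	WriteToStoreAndBroadcast(ctx context.Context, payload T) error
}

type broadcasterFn[T any] func(ctx context.Context, payload T) error

func (b broadcasterFn[T]) WriteToStoreAndBroadcast(ctx context.Context, payload T) error {
	return b(ctx, payload)
}

// logging decorates any broadcaster with a trace line, the same shape as
// the capturing broadcasters in the test above.
func logging[T any](next broadcaster[T]) broadcaster[T] {
	return broadcasterFn[T](func(ctx context.Context, payload T) error {
		fmt.Printf("broadcasting %v\n", payload)
		return next.WriteToStoreAndBroadcast(ctx, payload)
	})
}

func main() {
	sink := broadcasterFn[string](func(ctx context.Context, p string) error { return nil })
	_ = logging[string](sink).WriteToStoreAndBroadcast(context.Background(), "header@1")
}
```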
