Skip to content

Commit ad0baae

Browse files
kant777 Alok
authored and committed
fix: monitor-driven nonce reset for stuck transactions (#899)
* fix: replace maxNonceDrift with monitor-driven nonce reset

  Instead of the transactor detecting nonce drift on every PendingNonceAt call (which conflicts with maxPendingTxs=2048), move stuck detection to the Monitor. When the confirmed nonce doesn't advance for 30s while there are pending txs, the Monitor signals the transactor to reset its local nonce via a NonceOverride channel.

  This cleanly separates concerns: the Monitor already tracks confirmed nonces and pending transactions, making it the right place for stuck detection. The transactor simply reacts to the override signal.

  Replaces the maxNonceDrift approach from PR #896 (bug 2 fix). Retains the retry loop fall-through fix (bug 1) from PR #896.

* review: fix data race in log, add monitor stuck detection tests

  - Fix data race: len(m.waitMap) was read without lock in the stuck detection log. Replaced hasPendingTxs() with pendingTxCount() that returns the count under lock, reused in both the condition and log.
  - Add SetStuckDuration() for testing with short durations.
  - Add TestStuckDetectionSendsNonceOverride: verifies monitor sends override when confirmed nonce doesn't advance with pending txs.
  - Add TestNoOverrideWhenNonceAdvances: verifies no override fires when the confirmed nonce advances before stuckDuration.
1 parent 2a8b4d0 commit ad0baae

4 files changed

Lines changed: 275 additions & 66 deletions

File tree

x/contracts/transactor/transactor.go

Lines changed: 19 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -12,23 +12,19 @@ import (
1212

1313
const (
1414
txnRetriesLimit = 3
15-
16-
// defaultMaxNonceDrift is the maximum allowed difference between the
17-
// local nonce counter and the chain's pending nonce before the transactor
18-
// resets to the chain nonce. Since sends are serialized via a size-1
19-
// channel, under normal operation the drift should be 0. A drift larger
20-
// than this threshold indicates that previously sent transactions were
21-
// lost (e.g. mempool cleared after a node restart).
22-
defaultMaxNonceDrift uint64 = 5
2315
)
2416

2517
// Watcher is an interface that is used to manage the lifecycle of a transaction.
2618
// The Allow method is used to determine if a transaction should be sent. The context
2719
// is passed to the method so that the watcher can determine this based on the context.
28-
// The Sent method is is used to notify the watcher that the transaction has been sent.
20+
// The Sent method is used to notify the watcher that the transaction has been sent.
21+
// The NonceOverride method returns a channel that signals the transactor to reset its
22+
// local nonce to the provided value. This is used by the monitor to recover from stuck
23+
// states where transactions were lost (e.g. mempool cleared after a node restart).
2924
type Watcher interface {
3025
Allow(ctx context.Context, nonce uint64) bool
3126
Sent(ctx context.Context, tx *types.Transaction)
27+
NonceOverride() <-chan uint64
3228
}
3329

3430
// Transactor is a wrapper around a bind.ContractBackend that ensures that
@@ -46,24 +42,14 @@ type Watcher interface {
4642
// of an error, the nonce is put back into the channel so that it can be reused.
4743
type Transactor struct {
4844
bind.ContractBackend
49-
nonceChan chan uint64
50-
watcher Watcher
51-
maxNonceDrift uint64
52-
logger *slog.Logger
45+
nonceChan chan uint64
46+
watcher Watcher
47+
logger *slog.Logger
5348
}
5449

5550
// Option is a functional option for configuring the Transactor.
5651
type Option func(*Transactor)
5752

58-
// WithMaxNonceDrift sets the maximum allowed drift between the local nonce
59-
// counter and the chain's pending nonce before the transactor self-heals by
60-
// resetting to the chain nonce.
61-
func WithMaxNonceDrift(n uint64) Option {
62-
return func(t *Transactor) {
63-
t.maxNonceDrift = n
64-
}
65-
}
66-
6753
// WithLogger sets the logger for the transactor.
6854
func WithLogger(l *slog.Logger) Option {
6955
return func(t *Transactor) {
@@ -85,7 +71,6 @@ func NewTransactor(
8571
ContractBackend: backend,
8672
watcher: watcher,
8773
nonceChan: nonceChan,
88-
maxNonceDrift: defaultMaxNonceDrift,
8974
logger: slog.Default(),
9075
}
9176
for _, opt := range opts {
@@ -99,6 +84,17 @@ func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address)
9984
case <-ctx.Done():
10085
return 0, ctx.Err()
10186
case nonce := <-t.nonceChan:
87+
// Check if the monitor has signaled a nonce override (stuck detection).
88+
select {
89+
case override := <-t.watcher.NonceOverride():
90+
t.logger.Warn("nonce override from monitor, resetting",
91+
"localNonce", nonce,
92+
"overrideNonce", override,
93+
)
94+
nonce = override
95+
default:
96+
}
97+
10298
pendingNonce, err := t.ContractBackend.PendingNonceAt(ctx, account)
10399
if err != nil {
104100
// this naked write is safe as only the SendTransaction writes to
@@ -110,19 +106,6 @@ func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address)
110106
if pendingNonce > nonce {
111107
return pendingNonce, nil
112108
}
113-
// Self-healing: if local nonce drifts too far ahead of the chain's
114-
// pending nonce, transactions were likely lost (e.g. mempool cleared
115-
// after a node restart). Reset to chain nonce to close the gap.
116-
// This does not add any extra RPC calls — we already query the
117-
// chain's pending nonce above.
118-
if nonce > pendingNonce+t.maxNonceDrift {
119-
t.logger.Warn("nonce drift detected, resetting to chain pending nonce",
120-
"localNonce", nonce,
121-
"chainPendingNonce", pendingNonce,
122-
"drift", nonce-pendingNonce,
123-
)
124-
return pendingNonce, nil
125-
}
126109
return nonce, nil
127110
}
128111
}

x/contracts/transactor/transactor_test.go

Lines changed: 57 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -149,17 +149,44 @@ func (w *autoAllowWatcher) Sent(_ context.Context, tx *types.Transaction) {
149149
}
150150
}
151151

152-
func TestNonceDriftSelfHealing(t *testing.T) {
152+
func (w *autoAllowWatcher) NonceOverride() <-chan uint64 {
153+
return make(chan uint64)
154+
}
155+
156+
// nonceOverrideWatcher is a watcher that allows all txs and supports nonce override.
157+
type nonceOverrideWatcher struct {
158+
txnChan chan *types.Transaction
159+
overrideChan chan uint64
160+
}
161+
162+
func (w *nonceOverrideWatcher) Allow(_ context.Context, _ uint64) bool {
163+
return true
164+
}
165+
166+
func (w *nonceOverrideWatcher) Sent(_ context.Context, tx *types.Transaction) {
167+
if w.txnChan != nil {
168+
w.txnChan <- tx
169+
}
170+
}
171+
172+
func (w *nonceOverrideWatcher) NonceOverride() <-chan uint64 {
173+
return w.overrideChan
174+
}
175+
176+
func TestNonceOverrideFromMonitor(t *testing.T) {
153177
t.Parallel()
154178

155179
backend := &testBackend{
156-
nonce: 100, // chain pending nonce
180+
nonce: 100,
181+
}
182+
overrideChan := make(chan uint64, 1)
183+
watcher := &nonceOverrideWatcher{
184+
txnChan: make(chan *types.Transaction, 16),
185+
overrideChan: overrideChan,
157186
}
158-
watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 16)}
159-
// maxNonceDrift = 5: drift of 6+ triggers reset
160-
txnSender := transactor.NewTransactor(backend, watcher, transactor.WithMaxNonceDrift(5))
187+
txnSender := transactor.NewTransactor(backend, watcher)
161188

162-
// First call: local nonce is 0 (initial), chain says 100 → returns 100
189+
// First call: local nonce is 0, chain says 100 → returns 100
163190
nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
164191
if err != nil {
165192
t.Fatal(err)
@@ -170,7 +197,7 @@ func TestNonceDriftSelfHealing(t *testing.T) {
170197

171198
// Send nonces 100-109 to advance local nonce to 110
172199
for i := uint64(100); i <= 109; i++ {
173-
backend.nonce = i + 1 // chain keeps up
200+
backend.nonce = i + 1
174201
if i > 100 {
175202
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
176203
if err != nil {
@@ -185,28 +212,32 @@ func TestNonceDriftSelfHealing(t *testing.T) {
185212
nonce = i + 1
186213
}
187214

188-
// Local nonce is now 110. Simulate mempool wipe: chain nonce drops to 100.
189-
// This simulates the node restart scenario where all pending txs are lost.
215+
// Local nonce is now 110. Simulate monitor detecting stuck state and
216+
// sending a nonce override to reset to the confirmed nonce (100).
190217
backend.nonce = 100
218+
overrideChan <- 100
191219

192-
// Drift = 110 - 100 = 10 > maxNonceDrift(5) → should reset to 100
193220
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
194221
if err != nil {
195222
t.Fatal(err)
196223
}
197224
if nonce != 100 {
198-
t.Fatalf("expected nonce to reset to 100 (chain nonce), got %d", nonce)
225+
t.Fatalf("expected nonce to reset to 100 via monitor override, got %d", nonce)
199226
}
200227
}
201228

202-
func TestNonceDriftWithinThreshold(t *testing.T) {
229+
func TestNonceOverrideNotTriggeredWithoutSignal(t *testing.T) {
203230
t.Parallel()
204231

205232
backend := &testBackend{
206233
nonce: 100,
207234
}
208-
watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 16)}
209-
txnSender := transactor.NewTransactor(backend, watcher, transactor.WithMaxNonceDrift(5))
235+
overrideChan := make(chan uint64, 1)
236+
watcher := &nonceOverrideWatcher{
237+
txnChan: make(chan *types.Transaction, 16),
238+
overrideChan: overrideChan,
239+
}
240+
txnSender := transactor.NewTransactor(backend, watcher)
210241

211242
// Get initial nonce from chain (100)
212243
nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
@@ -216,7 +247,7 @@ func TestNonceDriftWithinThreshold(t *testing.T) {
216247

217248
// Send txs 100-104 to advance local nonce to 105
218249
for i := uint64(100); i <= 104; i++ {
219-
backend.nonce = i // chain nonce stays behind (simulating pending txs)
250+
backend.nonce = i + 1
220251
if i > 100 {
221252
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
222253
if err != nil {
@@ -231,15 +262,18 @@ func TestNonceDriftWithinThreshold(t *testing.T) {
231262
nonce = i + 1
232263
}
233264

234-
// Local nonce = 105. Chain nonce = 104 (drift = 1, within threshold).
235-
// Should use local nonce, NOT reset.
265+
// No override signal sent — local nonce should be used even if
266+
// chain nonce is lower, because the monitor hasn't detected a stuck state.
267+
// After sending 100-104, local nonce is 105. The last PendingNonceAt in the
268+
// loop saw chain=105, so transactor internal nonce is 105+1=106 after the
269+
// last SendTransaction.
236270
backend.nonce = 100
237271
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
238272
if err != nil {
239273
t.Fatal(err)
240274
}
241-
if nonce != 105 {
242-
t.Fatalf("expected local nonce 105 (within drift threshold), got %d", nonce)
275+
if nonce != 106 {
276+
t.Fatalf("expected local nonce 106 (no override), got %d", nonce)
243277
}
244278
}
245279

@@ -305,6 +339,10 @@ func (w *testWatcher) Sent(ctx context.Context, tx *types.Transaction) {
305339
w.txnChan <- tx
306340
}
307341

342+
func (w *testWatcher) NonceOverride() <-chan uint64 {
343+
return make(chan uint64)
344+
}
345+
308346
type testBackend struct {
309347
bind.ContractBackend
310348
nonce uint64

x/contracts/txmonitor/txmonitor.go

Lines changed: 66 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,13 @@ var (
2626
ErrMonitorClosed = errors.New("monitor was closed")
2727
)
2828

29+
const (
30+
// defaultStuckDuration is how long the confirmed nonce must remain
31+
// unchanged (while there are pending txs) before the monitor signals
32+
// the transactor to reset its nonce.
33+
defaultStuckDuration = 30 * time.Second
34+
)
35+
2936
type TxnDetails struct {
3037
Hash common.Hash
3138
Nonce uint64
@@ -68,9 +75,11 @@ type Monitor struct {
6875
newTxAdded chan struct{}
6976
nonceUpdate chan struct{}
7077
blockUpdate chan waitCheck
78+
nonceOverrideChan chan uint64
7179
logger *slog.Logger
7280
lastConfirmedNonce atomic.Uint64
7381
maxPendingTxs uint64
82+
stuckDuration time.Duration
7483
metrics *metrics
7584
}
7685

@@ -86,17 +95,19 @@ func New(
8695
saver = noopSaver{}
8796
}
8897
m := &Monitor{
89-
owner: owner,
90-
client: client,
91-
logger: logger,
92-
helper: helper,
93-
saver: saver,
94-
maxPendingTxs: maxPendingTxs,
95-
metrics: newMetrics(),
96-
waitMap: make(map[uint64]map[common.Hash][]chan Result),
97-
newTxAdded: make(chan struct{}),
98-
nonceUpdate: make(chan struct{}),
99-
blockUpdate: make(chan waitCheck),
98+
owner: owner,
99+
client: client,
100+
logger: logger,
101+
helper: helper,
102+
saver: saver,
103+
maxPendingTxs: maxPendingTxs,
104+
stuckDuration: defaultStuckDuration,
105+
metrics: newMetrics(),
106+
waitMap: make(map[uint64]map[common.Hash][]chan Result),
107+
newTxAdded: make(chan struct{}),
108+
nonceUpdate: make(chan struct{}),
109+
blockUpdate: make(chan waitCheck),
110+
nonceOverrideChan: make(chan uint64, 1),
100111
}
101112

102113
pending, err := saver.PendingTxns()
@@ -111,10 +122,30 @@ func New(
111122
return m
112123
}
113124

125+
// SetStuckDuration overrides the default stuck detection duration.
126+
// This is intended for testing.
127+
func (m *Monitor) SetStuckDuration(d time.Duration) {
128+
m.stuckDuration = d
129+
}
130+
114131
func (m *Monitor) Metrics() []prometheus.Collector {
115132
return m.metrics.Metrics()
116133
}
117134

135+
// NonceOverride returns a channel that the transactor reads from to detect
136+
// nonce resets. When the monitor detects that the confirmed nonce has not
137+
// advanced for stuckDuration despite having pending transactions, it sends
138+
// the confirmed nonce on this channel to tell the transactor to reset.
139+
func (m *Monitor) NonceOverride() <-chan uint64 {
140+
return m.nonceOverrideChan
141+
}
142+
143+
func (m *Monitor) pendingTxCount() int {
144+
m.mtx.Lock()
145+
defer m.mtx.Unlock()
146+
return len(m.waitMap)
147+
}
148+
118149
func (m *Monitor) Start(ctx context.Context) <-chan struct{} {
119150
wg := sync.WaitGroup{}
120151
done := make(chan struct{})
@@ -142,6 +173,8 @@ func (m *Monitor) Start(ctx context.Context) <-chan struct{} {
142173

143174
m.logger.Info("monitor started")
144175
lastBlock := uint64(0)
176+
var lastNonceAdvance time.Time
177+
var lastSeenNonce uint64
145178
for {
146179
newTx := false
147180
select {
@@ -177,6 +210,28 @@ func (m *Monitor) Start(ctx context.Context) <-chan struct{} {
177210
m.metrics.lastBlockNumber.Set(float64(currentBlock))
178211
m.triggerNonceUpdate()
179212

213+
// Stuck detection: if the confirmed nonce hasn't advanced
214+
// for stuckDuration while we have pending txs, signal
215+
// the transactor to reset its local nonce.
216+
if lastNonce > lastSeenNonce {
217+
lastSeenNonce = lastNonce
218+
lastNonceAdvance = time.Now()
219+
} else if pendingCount := m.pendingTxCount(); pendingCount > 0 && !lastNonceAdvance.IsZero() &&
220+
time.Since(lastNonceAdvance) >= m.stuckDuration {
221+
m.logger.Warn("stuck detected: confirmed nonce not advancing, signaling nonce reset",
222+
"confirmedNonce", lastNonce,
223+
"stuckFor", time.Since(lastNonceAdvance).String(),
224+
"pendingTxs", pendingCount,
225+
)
226+
select {
227+
case m.nonceOverrideChan <- lastNonce:
228+
default:
229+
// channel already has a pending override
230+
}
231+
// Reset the timer so we don't spam overrides
232+
lastNonceAdvance = time.Now()
233+
}
234+
180235
select {
181236
case m.blockUpdate <- waitCheck{lastNonce, currentBlock}:
182237
default:

0 commit comments

Comments
 (0)