Skip to content

Commit d8d470e

Browse files
kant777Alok
authored and committed
fix: transactor nonce gaps causing stale queued txs (#896)
* fix: self-healing nonce drift detection in transactor

  When transactions are lost from the mempool (e.g. after an Erigon node restart), the transactor's local nonce counter can drift permanently ahead of the chain's pending nonce, creating an unrecoverable nonce gap. All subsequent transactions pile up in Erigon's queued pool and never execute.

  Add drift detection in PendingNonceAt: if the local nonce exceeds the chain's pending nonce by more than maxNonceDrift (default: 5), reset to the chain nonce. This uses no extra RPC calls — the chain's pending nonce is already queried on every call.

  Also fix a context leak where defer cancel() inside the SendTransaction retry loop caused all context cancellations to pile up until function return instead of being released per iteration.

* fix: prevent nonce gap when SendTransaction retries are exhausted

  When all retry attempts fail with DeadlineExceeded, the for loop exits and falls through to the success path, incorrectly calling watcher.Sent() and incrementing the nonce for a transaction that was never actually sent. This creates a permanent nonce gap that leads to stale queued transactions in the mempool.

  Track the send error and check it after the loop exits. If all retries failed, return the error so the defer puts the nonce back into the channel for reuse.
1 parent 6f86e2a commit d8d470e

2 files changed

Lines changed: 221 additions & 8 deletions

File tree

x/contracts/transactor/transactor.go

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package transactor
22

33
import (
44
"context"
5+
"log/slog"
56
"time"
67

78
"github.com/ethereum/go-ethereum/accounts/abi/bind"
@@ -11,6 +12,14 @@ import (
1112

1213
const (
1314
txnRetriesLimit = 3
15+
16+
// defaultMaxNonceDrift is the maximum allowed difference between the
17+
// local nonce counter and the chain's pending nonce before the transactor
18+
// resets to the chain nonce. Since sends are serialized via a size-1
19+
// channel, under normal operation the drift should be 0. A drift larger
20+
// than this threshold indicates that previously sent transactions were
21+
// lost (e.g. mempool cleared after a node restart).
22+
defaultMaxNonceDrift uint64 = 5
1423
)
1524

1625
// Watcher is an interface that is used to manage the lifecycle of a transaction.
@@ -37,24 +46,52 @@ type Watcher interface {
3746
// of an error, the nonce is put back into the channel so that it can be reused.
3847
type Transactor struct {
3948
bind.ContractBackend
40-
nonceChan chan uint64
41-
watcher Watcher
49+
nonceChan chan uint64
50+
watcher Watcher
51+
maxNonceDrift uint64
52+
logger *slog.Logger
53+
}
54+
55+
// Option is a functional option for configuring the Transactor.
56+
type Option func(*Transactor)
57+
58+
// WithMaxNonceDrift sets the maximum allowed drift between the local nonce
59+
// counter and the chain's pending nonce before the transactor self-heals by
60+
// resetting to the chain nonce.
61+
func WithMaxNonceDrift(n uint64) Option {
62+
return func(t *Transactor) {
63+
t.maxNonceDrift = n
64+
}
65+
}
66+
67+
// WithLogger sets the logger for the transactor.
68+
func WithLogger(l *slog.Logger) Option {
69+
return func(t *Transactor) {
70+
t.logger = l
71+
}
4272
}
4373

4474
func NewTransactor(
4575
backend bind.ContractBackend,
4676
watcher Watcher,
77+
opts ...Option,
4778
) *Transactor {
4879
nonceChan := make(chan uint64, 1)
4980
// We need to send a value to the channel so that the first transaction
5081
// can be sent. The value is not important as the first transaction will
5182
// get the nonce from the blockchain.
5283
nonceChan <- 0
53-
return &Transactor{
84+
t := &Transactor{
5485
ContractBackend: backend,
5586
watcher: watcher,
5687
nonceChan: nonceChan,
88+
maxNonceDrift: defaultMaxNonceDrift,
89+
logger: slog.Default(),
90+
}
91+
for _, opt := range opts {
92+
opt(t)
5793
}
94+
return t
5895
}
5996

6097
func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
@@ -73,6 +110,19 @@ func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address)
73110
if pendingNonce > nonce {
74111
return pendingNonce, nil
75112
}
113+
// Self-healing: if local nonce drifts too far ahead of the chain's
114+
// pending nonce, transactions were likely lost (e.g. mempool cleared
115+
// after a node restart). Reset to chain nonce to close the gap.
116+
// This does not add any extra RPC calls — we already query the
117+
// chain's pending nonce above.
118+
if nonce > pendingNonce+t.maxNonceDrift {
119+
t.logger.Warn("nonce drift detected, resetting to chain pending nonce",
120+
"localNonce", nonce,
121+
"chainPendingNonce", pendingNonce,
122+
"drift", nonce-pendingNonce,
123+
)
124+
return pendingNonce, nil
125+
}
76126
return nonce, nil
77127
}
78128
}
@@ -90,28 +140,34 @@ func (t *Transactor) SendTransaction(ctx context.Context, tx *types.Transaction)
90140
return ctx.Err()
91141
}
92142

143+
var sendErr error
93144
delay := 1 * time.Second
94145
for tries := 0; tries <= txnRetriesLimit; tries++ {
95146
cctx, cancel := context.WithTimeout(ctx, 5*time.Second)
96-
defer cancel()
147+
sendErr = t.ContractBackend.SendTransaction(cctx, tx)
148+
cancel()
97149

98-
if err := t.ContractBackend.SendTransaction(cctx, tx); err != nil {
99-
if err == context.DeadlineExceeded {
150+
if sendErr != nil {
151+
if sendErr == context.DeadlineExceeded {
100152
delay *= 2
101153
retryTimer := time.NewTimer(delay)
102154
select {
103155
case <-ctx.Done():
156+
retryTimer.Stop()
104157
return ctx.Err()
105158
case <-retryTimer.C:
106-
_ = retryTimer.Stop()
107159
}
108160
continue
109161
}
110-
return err
162+
return sendErr
111163
}
112164
break
113165
}
114166

167+
if sendErr != nil {
168+
return sendErr
169+
}
170+
115171
// If the transaction is successful, we need to update the nonce and notify the
116172
// watcher.
117173
t.watcher.Sent(ctx, tx)

x/contracts/transactor/transactor_test.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,159 @@ func TestTrasactor(t *testing.T) {
134134
}
135135
}
136136

137+
// autoAllowWatcher is a watcher that always allows and records sent txs.
138+
type autoAllowWatcher struct {
139+
txnChan chan *types.Transaction
140+
}
141+
142+
func (w *autoAllowWatcher) Allow(_ context.Context, _ uint64) bool {
143+
return true
144+
}
145+
146+
func (w *autoAllowWatcher) Sent(_ context.Context, tx *types.Transaction) {
147+
if w.txnChan != nil {
148+
w.txnChan <- tx
149+
}
150+
}
151+
152+
func TestNonceDriftSelfHealing(t *testing.T) {
153+
t.Parallel()
154+
155+
backend := &testBackend{
156+
nonce: 100, // chain pending nonce
157+
}
158+
watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 16)}
159+
// maxNonceDrift = 5: drift of 6+ triggers reset
160+
txnSender := transactor.NewTransactor(backend, watcher, transactor.WithMaxNonceDrift(5))
161+
162+
// First call: local nonce is 0 (initial), chain says 100 → returns 100
163+
nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
164+
if err != nil {
165+
t.Fatal(err)
166+
}
167+
if nonce != 100 {
168+
t.Fatalf("expected nonce 100, got %d", nonce)
169+
}
170+
171+
// Send nonces 100-109 to advance local nonce to 110
172+
for i := uint64(100); i <= 109; i++ {
173+
backend.nonce = i + 1 // chain keeps up
174+
if i > 100 {
175+
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
176+
if err != nil {
177+
t.Fatal(err)
178+
}
179+
}
180+
err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil))
181+
if err != nil {
182+
t.Fatal(err)
183+
}
184+
<-watcher.txnChan
185+
nonce = i + 1
186+
}
187+
188+
// Local nonce is now 110. Simulate mempool wipe: chain nonce drops to 100.
189+
// This simulates the node restart scenario where all pending txs are lost.
190+
backend.nonce = 100
191+
192+
// Drift = 110 - 100 = 10 > maxNonceDrift(5) → should reset to 100
193+
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
194+
if err != nil {
195+
t.Fatal(err)
196+
}
197+
if nonce != 100 {
198+
t.Fatalf("expected nonce to reset to 100 (chain nonce), got %d", nonce)
199+
}
200+
}
201+
202+
func TestNonceDriftWithinThreshold(t *testing.T) {
203+
t.Parallel()
204+
205+
backend := &testBackend{
206+
nonce: 100,
207+
}
208+
watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 16)}
209+
txnSender := transactor.NewTransactor(backend, watcher, transactor.WithMaxNonceDrift(5))
210+
211+
// Get initial nonce from chain (100)
212+
nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
213+
if err != nil {
214+
t.Fatal(err)
215+
}
216+
217+
// Send txs 100-104 to advance local nonce to 105
218+
for i := uint64(100); i <= 104; i++ {
219+
backend.nonce = i // chain nonce stays behind (simulating pending txs)
220+
if i > 100 {
221+
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
222+
if err != nil {
223+
t.Fatal(err)
224+
}
225+
}
226+
err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil))
227+
if err != nil {
228+
t.Fatal(err)
229+
}
230+
<-watcher.txnChan
231+
nonce = i + 1
232+
}
233+
234+
// Local nonce = 105. The chain nonce is rolled back to 100 below, so the
235+
// drift is exactly maxNonceDrift (5): at, not over, the threshold. Should use local nonce, NOT reset.
236+
backend.nonce = 100
237+
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
238+
if err != nil {
239+
t.Fatal(err)
240+
}
241+
if nonce != 105 {
242+
t.Fatalf("expected local nonce 105 (within drift threshold), got %d", nonce)
243+
}
244+
}
245+
246+
func TestSendTransactionRetriesExhausted(t *testing.T) {
247+
t.Parallel()
248+
249+
backend := &testBackend{
250+
nonce: 10,
251+
sendTxErr: context.DeadlineExceeded, // all sends timeout
252+
}
253+
watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 1)}
254+
txnSender := transactor.NewTransactor(backend, watcher)
255+
256+
// Get initial nonce
257+
nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
258+
if err != nil {
259+
t.Fatal(err)
260+
}
261+
if nonce != 10 {
262+
t.Fatalf("expected nonce 10, got %d", nonce)
263+
}
264+
265+
// SendTransaction should fail after exhausting all retries
266+
err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil))
267+
if err == nil {
268+
t.Fatal("expected error when all retries exhausted, got nil")
269+
}
270+
271+
// The nonce should NOT have been incremented — it should be reusable.
272+
// Since the defer puts the nonce back, the next PendingNonceAt should return 10.
273+
backend.sendTxErr = nil // clear the error so future sends work
274+
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
275+
if err != nil {
276+
t.Fatal(err)
277+
}
278+
if nonce != 10 {
279+
t.Fatalf("expected nonce to remain 10 after failed retries, got %d", nonce)
280+
}
281+
282+
// Verify no transaction was reported as sent
283+
select {
284+
case <-watcher.txnChan:
285+
t.Fatal("watcher.Sent should not have been called for failed transaction")
286+
default:
287+
}
288+
}
289+
137290
type testWatcher struct {
138291
allowChan chan uint64
139292
txnChan chan *types.Transaction
@@ -157,6 +310,7 @@ type testBackend struct {
157310
nonce uint64
158311
errNonce uint64
159312
pendingNonceErr error
313+
sendTxErr error // if set, SendTransaction always returns this error
160314
}
161315

162316
func (b *testBackend) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
@@ -167,6 +321,9 @@ func (b *testBackend) PendingNonceAt(ctx context.Context, account common.Address
167321
}
168322

169323
func (b *testBackend) SendTransaction(ctx context.Context, tx *types.Transaction) error {
324+
if b.sendTxErr != nil {
325+
return b.sendTxErr
326+
}
170327
if b.errNonce == tx.Nonce() {
171328
return errors.New("nonce error")
172329
}

0 commit comments

Comments
 (0)