From 148c6c152d9dce9c90056672b72c744e67489939 Mon Sep 17 00:00:00 2001 From: kant Date: Thu, 5 Mar 2026 00:27:08 -0800 Subject: [PATCH 1/2] fix: self-healing nonce drift detection in transactor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When transactions are lost from the mempool (e.g. after an Erigon node restart), the transactor's local nonce counter can drift permanently ahead of the chain's pending nonce, creating an unrecoverable nonce gap. All subsequent transactions pile up in Erigon's queued pool and never execute. Add drift detection in PendingNonceAt: if the local nonce exceeds the chain's pending nonce by more than maxNonceDrift (default: 5), reset to the chain nonce. This uses no extra RPC calls — the chain's pending nonce is already queried on every call. Also fix a context leak where defer cancel() inside the SendTransaction retry loop caused all context cancellations to pile up until function return instead of being released per iteration. --- x/contracts/transactor/transactor.go | 63 +++++++++++-- x/contracts/transactor/transactor_test.go | 109 ++++++++++++++++++++++ 2 files changed, 166 insertions(+), 6 deletions(-) diff --git a/x/contracts/transactor/transactor.go b/x/contracts/transactor/transactor.go index b7f2fc1da..568cba968 100644 --- a/x/contracts/transactor/transactor.go +++ b/x/contracts/transactor/transactor.go @@ -2,6 +2,7 @@ package transactor import ( "context" + "log/slog" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -11,6 +12,14 @@ import ( const ( txnRetriesLimit = 3 + + // defaultMaxNonceDrift is the maximum allowed difference between the + // local nonce counter and the chain's pending nonce before the transactor + // resets to the chain nonce. Since sends are serialized via a size-1 + // channel, under normal operation the drift should be 0. A drift larger + // than this threshold indicates that previously sent transactions were + // lost (e.g. 
mempool cleared after a node restart). + defaultMaxNonceDrift uint64 = 5 ) // Watcher is an interface that is used to manage the lifecycle of a transaction. @@ -37,24 +46,52 @@ type Watcher interface { // of an error, the nonce is put back into the channel so that it can be reused. type Transactor struct { bind.ContractBackend - nonceChan chan uint64 - watcher Watcher + nonceChan chan uint64 + watcher Watcher + maxNonceDrift uint64 + logger *slog.Logger +} + +// Option is a functional option for configuring the Transactor. +type Option func(*Transactor) + +// WithMaxNonceDrift sets the maximum allowed drift between the local nonce +// counter and the chain's pending nonce before the transactor self-heals by +// resetting to the chain nonce. +func WithMaxNonceDrift(n uint64) Option { + return func(t *Transactor) { + t.maxNonceDrift = n + } +} + +// WithLogger sets the logger for the transactor. +func WithLogger(l *slog.Logger) Option { + return func(t *Transactor) { + t.logger = l + } } func NewTransactor( backend bind.ContractBackend, watcher Watcher, + opts ...Option, ) *Transactor { nonceChan := make(chan uint64, 1) // We need to send a value to the channel so that the first transaction // can be sent. The value is not important as the first transaction will // get the nonce from the blockchain. nonceChan <- 0 - return &Transactor{ + t := &Transactor{ ContractBackend: backend, watcher: watcher, nonceChan: nonceChan, + maxNonceDrift: defaultMaxNonceDrift, + logger: slog.Default(), + } + for _, opt := range opts { + opt(t) } + return t } func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { @@ -73,6 +110,19 @@ func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address) if pendingNonce > nonce { return pendingNonce, nil } + // Self-healing: if local nonce drifts too far ahead of the chain's + // pending nonce, transactions were likely lost (e.g. mempool cleared + // after a node restart). 
Reset to chain nonce to close the gap. + // This does not add any extra RPC calls — we already query the + // chain's pending nonce above. + if nonce > pendingNonce+t.maxNonceDrift { + t.logger.Warn("nonce drift detected, resetting to chain pending nonce", + "localNonce", nonce, + "chainPendingNonce", pendingNonce, + "drift", nonce-pendingNonce, + ) + return pendingNonce, nil + } return nonce, nil } } @@ -93,17 +143,18 @@ func (t *Transactor) SendTransaction(ctx context.Context, tx *types.Transaction) delay := 1 * time.Second for tries := 0; tries <= txnRetriesLimit; tries++ { cctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() + err := t.ContractBackend.SendTransaction(cctx, tx) + cancel() - if err := t.ContractBackend.SendTransaction(cctx, tx); err != nil { + if err != nil { if err == context.DeadlineExceeded { delay *= 2 retryTimer := time.NewTimer(delay) select { case <-ctx.Done(): + retryTimer.Stop() return ctx.Err() case <-retryTimer.C: - _ = retryTimer.Stop() } continue } diff --git a/x/contracts/transactor/transactor_test.go b/x/contracts/transactor/transactor_test.go index 895748e2d..b334e0d59 100644 --- a/x/contracts/transactor/transactor_test.go +++ b/x/contracts/transactor/transactor_test.go @@ -134,6 +134,115 @@ func TestTrasactor(t *testing.T) { } } +// autoAllowWatcher is a watcher that always allows and records sent txs. 
+type autoAllowWatcher struct { + txnChan chan *types.Transaction +} + +func (w *autoAllowWatcher) Allow(_ context.Context, _ uint64) bool { + return true +} + +func (w *autoAllowWatcher) Sent(_ context.Context, tx *types.Transaction) { + if w.txnChan != nil { + w.txnChan <- tx + } +} + +func TestNonceDriftSelfHealing(t *testing.T) { + t.Parallel() + + backend := &testBackend{ + nonce: 100, // chain pending nonce + } + watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 16)} + // maxNonceDrift = 5: drift of 6+ triggers reset + txnSender := transactor.NewTransactor(backend, watcher, transactor.WithMaxNonceDrift(5)) + + // First call: local nonce is 0 (initial), chain says 100 → returns 100 + nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + if nonce != 100 { + t.Fatalf("expected nonce 100, got %d", nonce) + } + + // Send nonces 100-109 to advance local nonce to 110 + for i := uint64(100); i <= 109; i++ { + backend.nonce = i + 1 // chain keeps up + if i > 100 { + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + } + err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil)) + if err != nil { + t.Fatal(err) + } + <-watcher.txnChan + nonce = i + 1 + } + + // Local nonce is now 110. Simulate mempool wipe: chain nonce drops to 100. + // This simulates the node restart scenario where all pending txs are lost. 
+ backend.nonce = 100 + // Drift = 110 - 100 = 10 > maxNonceDrift(5) → should reset to 100 + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + if nonce != 100 { + t.Fatalf("expected nonce to reset to 100 (chain nonce), got %d", nonce) + } +} + +func TestNonceDriftWithinThreshold(t *testing.T) { + t.Parallel() + + backend := &testBackend{ + nonce: 100, + } + watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 16)} + txnSender := transactor.NewTransactor(backend, watcher, transactor.WithMaxNonceDrift(5)) + + // Get initial nonce from chain (100) + nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + + // Send txs 100-104 to advance local nonce to 105 + for i := uint64(100); i <= 104; i++ { + backend.nonce = i // chain nonce stays behind (simulating pending txs) + if i > 100 { + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + } + err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil)) + if err != nil { + t.Fatal(err) + } + <-watcher.txnChan + nonce = i + 1 + } + + // Local nonce = 105. Chain nonce is set to 100 below (drift = 5, exactly at the threshold). + // A reset requires drift > maxNonceDrift, so this should use local nonce, NOT reset. 
+ backend.nonce = 100 + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + if nonce != 105 { + t.Fatalf("expected local nonce 105 (within drift threshold), got %d", nonce) + } +} + type testWatcher struct { allowChan chan uint64 txnChan chan *types.Transaction From 8234d8d8c75619bbb57a9c377f5a5e0345db4f21 Mon Sep 17 00:00:00 2001 From: kant Date: Thu, 5 Mar 2026 00:37:28 -0800 Subject: [PATCH 2/2] fix: prevent nonce gap when SendTransaction retries exhaust When all retry attempts fail with DeadlineExceeded, the for loop exits and falls through to the success path, incorrectly calling watcher.Sent() and incrementing the nonce for a transaction that was never actually sent. This creates a permanent nonce gap that leads to stale queued transactions in the mempool. Track the send error and check it after the loop exits. If all retries failed, return the error so the defer puts the nonce back into the channel for reuse. --- x/contracts/transactor/transactor.go | 13 ++++-- x/contracts/transactor/transactor_test.go | 48 +++++++++++++++++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/x/contracts/transactor/transactor.go b/x/contracts/transactor/transactor.go index 568cba968..8bd1bbfa2 100644 --- a/x/contracts/transactor/transactor.go +++ b/x/contracts/transactor/transactor.go @@ -140,14 +140,15 @@ func (t *Transactor) SendTransaction(ctx context.Context, tx *types.Transaction) return ctx.Err() } + var sendErr error delay := 1 * time.Second for tries := 0; tries <= txnRetriesLimit; tries++ { cctx, cancel := context.WithTimeout(ctx, 5*time.Second) - err := t.ContractBackend.SendTransaction(cctx, tx) + sendErr = t.ContractBackend.SendTransaction(cctx, tx) cancel() - if err != nil { - if err == context.DeadlineExceeded { + if sendErr != nil { + if sendErr == context.DeadlineExceeded { delay *= 2 retryTimer := time.NewTimer(delay) select { @@ -158,11 +159,15 @@ func (t *Transactor) 
SendTransaction(ctx context.Context, tx *types.Transaction) } continue } - return err + return sendErr } break } + if sendErr != nil { + return sendErr + } + // If the transaction is successful, we need to update the nonce and notify the // watcher. t.watcher.Sent(ctx, tx) diff --git a/x/contracts/transactor/transactor_test.go b/x/contracts/transactor/transactor_test.go index b334e0d59..18c86dcb1 100644 --- a/x/contracts/transactor/transactor_test.go +++ b/x/contracts/transactor/transactor_test.go @@ -243,6 +243,50 @@ func TestNonceDriftWithinThreshold(t *testing.T) { } } +func TestSendTransactionRetriesExhausted(t *testing.T) { + t.Parallel() + + backend := &testBackend{ + nonce: 10, + sendTxErr: context.DeadlineExceeded, // all sends timeout + } + watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 1)} + txnSender := transactor.NewTransactor(backend, watcher) + + // Get initial nonce + nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + if nonce != 10 { + t.Fatalf("expected nonce 10, got %d", nonce) + } + + // SendTransaction should fail after exhausting all retries + err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil)) + if err == nil { + t.Fatal("expected error when all retries exhausted, got nil") + } + + // The nonce should NOT have been incremented — it should be reusable. + // Since the defer puts the nonce back, the next PendingNonceAt should return 10. 
+ backend.sendTxErr = nil // clear the error so future sends work + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + if nonce != 10 { + t.Fatalf("expected nonce to remain 10 after failed retries, got %d", nonce) + } + + // Verify no transaction was reported as sent + select { + case <-watcher.txnChan: + t.Fatal("watcher.Sent should not have been called for failed transaction") + default: + } +} + type testWatcher struct { allowChan chan uint64 txnChan chan *types.Transaction @@ -266,6 +310,7 @@ type testBackend struct { nonce uint64 errNonce uint64 pendingNonceErr error + sendTxErr error // if set, SendTransaction always returns this error } func (b *testBackend) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { @@ -276,6 +321,9 @@ func (b *testBackend) PendingNonceAt(ctx context.Context, account common.Address } func (b *testBackend) SendTransaction(ctx context.Context, tx *types.Transaction) error { + if b.sendTxErr != nil { + return b.sendTxErr + } if b.errNonce == tx.Nonce() { return errors.New("nonce error") }