diff --git a/x/contracts/transactor/transactor.go b/x/contracts/transactor/transactor.go index b7f2fc1da..8bd1bbfa2 100644 --- a/x/contracts/transactor/transactor.go +++ b/x/contracts/transactor/transactor.go @@ -2,6 +2,7 @@ package transactor import ( "context" + "log/slog" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -11,6 +12,14 @@ import ( const ( txnRetriesLimit = 3 + + // defaultMaxNonceDrift is the maximum allowed difference between the + // local nonce counter and the chain's pending nonce before the transactor + // resets to the chain nonce. Since sends are serialized via a size-1 + // channel, under normal operation the drift should be 0. A drift larger + // than this threshold indicates that previously sent transactions were + // lost (e.g. mempool cleared after a node restart). + defaultMaxNonceDrift uint64 = 5 ) // Watcher is an interface that is used to manage the lifecycle of a transaction. @@ -37,24 +46,52 @@ type Watcher interface { // of an error, the nonce is put back into the channel so that it can be reused. type Transactor struct { bind.ContractBackend - nonceChan chan uint64 - watcher Watcher + nonceChan chan uint64 + watcher Watcher + maxNonceDrift uint64 + logger *slog.Logger +} + +// Option is a functional option for configuring the Transactor. +type Option func(*Transactor) + +// WithMaxNonceDrift sets the maximum allowed drift between the local nonce +// counter and the chain's pending nonce before the transactor self-heals by +// resetting to the chain nonce. +func WithMaxNonceDrift(n uint64) Option { + return func(t *Transactor) { + t.maxNonceDrift = n + } +} + +// WithLogger sets the logger for the transactor. 
+func WithLogger(l *slog.Logger) Option { + return func(t *Transactor) { + t.logger = l + } } func NewTransactor( backend bind.ContractBackend, watcher Watcher, + opts ...Option, ) *Transactor { nonceChan := make(chan uint64, 1) // We need to send a value to the channel so that the first transaction // can be sent. The value is not important as the first transaction will // get the nonce from the blockchain. nonceChan <- 0 - return &Transactor{ + t := &Transactor{ ContractBackend: backend, watcher: watcher, nonceChan: nonceChan, + maxNonceDrift: defaultMaxNonceDrift, + logger: slog.Default(), + } + for _, opt := range opts { + opt(t) } + return t } func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { @@ -73,6 +110,19 @@ func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address) if pendingNonce > nonce { return pendingNonce, nil } + // Self-healing: if local nonce drifts too far ahead of the chain's + // pending nonce, transactions were likely lost (e.g. mempool cleared + // after a node restart). Reset to chain nonce to close the gap. This + // adds no extra RPC call — the pending nonce was already queried above. + // Drift is nonce-pendingNonce (nonce >= pendingNonce here): no uint64 wrap. 
+ if nonce-pendingNonce > t.maxNonceDrift { + t.logger.Warn("nonce drift detected, resetting to chain pending nonce", + "localNonce", nonce, + "chainPendingNonce", pendingNonce, + "drift", nonce-pendingNonce, + ) + return pendingNonce, nil + } return nonce, nil } } @@ -90,28 +140,34 @@ func (t *Transactor) SendTransaction(ctx context.Context, tx *types.Transaction) return ctx.Err() } + var sendErr error delay := 1 * time.Second for tries := 0; tries <= txnRetriesLimit; tries++ { cctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() + sendErr = t.ContractBackend.SendTransaction(cctx, tx) + cancel() - if err := t.ContractBackend.SendTransaction(cctx, tx); err != nil { - if err == context.DeadlineExceeded { + if sendErr != nil { + if sendErr == context.DeadlineExceeded { delay *= 2 retryTimer := time.NewTimer(delay) select { case <-ctx.Done(): + retryTimer.Stop() return ctx.Err() case <-retryTimer.C: - _ = retryTimer.Stop() } continue } - return err + return sendErr } break } + if sendErr != nil { + return sendErr + } + // If the transaction is successful, we need to update the nonce and notify the // watcher. t.watcher.Sent(ctx, tx) diff --git a/x/contracts/transactor/transactor_test.go b/x/contracts/transactor/transactor_test.go index 895748e2d..18c86dcb1 100644 --- a/x/contracts/transactor/transactor_test.go +++ b/x/contracts/transactor/transactor_test.go @@ -134,6 +134,159 @@ func TestTrasactor(t *testing.T) { } } +// autoAllowWatcher is a watcher that always allows and records sent txs. 
+type autoAllowWatcher struct { + txnChan chan *types.Transaction +} + +func (w *autoAllowWatcher) Allow(_ context.Context, _ uint64) bool { + return true +} + +func (w *autoAllowWatcher) Sent(_ context.Context, tx *types.Transaction) { + if w.txnChan != nil { + w.txnChan <- tx + } +} + +func TestNonceDriftSelfHealing(t *testing.T) { + t.Parallel() + + backend := &testBackend{ + nonce: 100, // chain pending nonce + } + watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 16)} + // maxNonceDrift = 5: drift of 6+ triggers reset + txnSender := transactor.NewTransactor(backend, watcher, transactor.WithMaxNonceDrift(5)) + + // First call: local nonce is 0 (initial), chain says 100 → returns 100 + nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + if nonce != 100 { + t.Fatalf("expected nonce 100, got %d", nonce) + } + + // Send nonces 100-109 to advance local nonce to 110 + for i := uint64(100); i <= 109; i++ { + backend.nonce = i + 1 // chain keeps up + if i > 100 { + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + } + err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil)) + if err != nil { + t.Fatal(err) + } + <-watcher.txnChan + nonce = i + 1 + } + + // Local nonce is now 110. Simulate mempool wipe: chain nonce drops to 100. + // This simulates the node restart scenario where all pending txs are lost. 
+ backend.nonce = 100 + + // Drift = 110 - 100 = 10 > maxNonceDrift(5) → should reset to 100 + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + if nonce != 100 { + t.Fatalf("expected nonce to reset to 100 (chain nonce), got %d", nonce) + } +} + +func TestNonceDriftWithinThreshold(t *testing.T) { + t.Parallel() + + backend := &testBackend{ + nonce: 100, + } + watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 16)} + txnSender := transactor.NewTransactor(backend, watcher, transactor.WithMaxNonceDrift(5)) + + // Get initial nonce from chain (100) + nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + + // Send txs 100-104 to advance local nonce to 105 + for i := uint64(100); i <= 104; i++ { + backend.nonce = i // chain nonce stays behind (simulating pending txs) + if i > 100 { + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + } + err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil)) + if err != nil { + t.Fatal(err) + } + <-watcher.txnChan + nonce = i + 1 + } + + // Local nonce = 105. Chain nonce is set back to 100 below, so drift = 5, + // exactly at the threshold (not above it). Should use local nonce, NOT reset. 
+ backend.nonce = 100 + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + if nonce != 105 { + t.Fatalf("expected local nonce 105 (within drift threshold), got %d", nonce) + } +} + +func TestSendTransactionRetriesExhausted(t *testing.T) { + t.Parallel() + + backend := &testBackend{ + nonce: 10, + sendTxErr: context.DeadlineExceeded, // all sends timeout + } + watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 1)} + txnSender := transactor.NewTransactor(backend, watcher) + + // Get initial nonce + nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + if nonce != 10 { + t.Fatalf("expected nonce 10, got %d", nonce) + } + + // SendTransaction should fail after exhausting all retries + err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil)) + if err == nil { + t.Fatal("expected error when all retries exhausted, got nil") + } + + // The nonce should NOT have been incremented — it should be reusable. + // Since the defer puts the nonce back, the next PendingNonceAt should return 10. 
+ backend.sendTxErr = nil // clear the error so future sends work + nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{}) + if err != nil { + t.Fatal(err) + } + if nonce != 10 { + t.Fatalf("expected nonce to remain 10 after failed retries, got %d", nonce) + } + + // Verify no transaction was reported as sent + select { + case <-watcher.txnChan: + t.Fatal("watcher.Sent should not have been called for failed transaction") + default: + } +} + type testWatcher struct { allowChan chan uint64 txnChan chan *types.Transaction @@ -157,6 +310,7 @@ type testBackend struct { nonce uint64 errNonce uint64 pendingNonceErr error + sendTxErr error // if set, SendTransaction always returns this error } func (b *testBackend) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { @@ -167,6 +321,9 @@ func (b *testBackend) PendingNonceAt(ctx context.Context, account common.Address } func (b *testBackend) SendTransaction(ctx context.Context, tx *types.Transaction) error { + if b.sendTxErr != nil { + return b.sendTxErr + } if b.errNonce == tx.Nonce() { return errors.New("nonce error") }