Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 64 additions & 8 deletions x/contracts/transactor/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transactor

import (
"context"
"log/slog"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -11,6 +12,14 @@ import (

const (
txnRetriesLimit = 3

// defaultMaxNonceDrift is the maximum allowed difference between the
// local nonce counter and the chain's pending nonce before the transactor
// resets to the chain nonce. Since sends are serialized via a size-1
// channel, under normal operation the drift should be 0. A drift larger
// than this threshold indicates that previously sent transactions were
// lost (e.g. mempool cleared after a node restart).
defaultMaxNonceDrift uint64 = 5
)

// Watcher is an interface that is used to manage the lifecycle of a transaction.
Expand All @@ -37,24 +46,52 @@ type Watcher interface {
// of an error, the nonce is put back into the channel so that it can be reused.
type Transactor struct {
bind.ContractBackend
nonceChan chan uint64
watcher Watcher
nonceChan chan uint64
watcher Watcher
maxNonceDrift uint64
logger *slog.Logger
}

// Option is a functional option for configuring the Transactor.
type Option func(*Transactor)

// WithMaxNonceDrift sets the maximum allowed drift between the local nonce
// counter and the chain's pending nonce before the transactor self-heals by
// resetting to the chain nonce.
func WithMaxNonceDrift(n uint64) Option {
return func(t *Transactor) {
t.maxNonceDrift = n
}
}

// WithLogger sets the logger for the transactor.
func WithLogger(l *slog.Logger) Option {
return func(t *Transactor) {
t.logger = l
}
}

func NewTransactor(
backend bind.ContractBackend,
watcher Watcher,
opts ...Option,
) *Transactor {
nonceChan := make(chan uint64, 1)
// We need to send a value to the channel so that the first transaction
// can be sent. The value is not important as the first transaction will
// get the nonce from the blockchain.
nonceChan <- 0
return &Transactor{
t := &Transactor{
ContractBackend: backend,
watcher: watcher,
nonceChan: nonceChan,
maxNonceDrift: defaultMaxNonceDrift,
logger: slog.Default(),
}
for _, opt := range opts {
opt(t)
}
return t
}

func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
Expand All @@ -73,6 +110,19 @@ func (t *Transactor) PendingNonceAt(ctx context.Context, account common.Address)
if pendingNonce > nonce {
return pendingNonce, nil
}
// Self-healing: if local nonce drifts too far ahead of the chain's
// pending nonce, transactions were likely lost (e.g. mempool cleared
// after a node restart). Reset to chain nonce to close the gap.
// This does not add any extra RPC calls — we already query the
// chain's pending nonce above.
if nonce > pendingNonce+t.maxNonceDrift {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically during heavy load (especially in stress test) this will replace existing transactions. The Watcher.Allow allows certain number of transactions to be enqueued beyond the backend nonce. This number is much higher (around 1024 I think). This nonceDrift is overloading this behaviour.

In order to do this type of healing, we should also keep track of when last successful txn was sent and when context.Deadline is occurring (on line 100). We need to ensure we are trying to send transactions but we are stuck for sometime before we can heal.

t.logger.Warn("nonce drift detected, resetting to chain pending nonce",
"localNonce", nonce,
"chainPendingNonce", pendingNonce,
"drift", nonce-pendingNonce,
)
return pendingNonce, nil
}
return nonce, nil
}
}
Expand All @@ -90,28 +140,34 @@ func (t *Transactor) SendTransaction(ctx context.Context, tx *types.Transaction)
return ctx.Err()
}

var sendErr error
delay := 1 * time.Second
for tries := 0; tries <= txnRetriesLimit; tries++ {
cctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
sendErr = t.ContractBackend.SendTransaction(cctx, tx)
cancel()

if err := t.ContractBackend.SendTransaction(cctx, tx); err != nil {
if err == context.DeadlineExceeded {
if sendErr != nil {
if sendErr == context.DeadlineExceeded {
delay *= 2
retryTimer := time.NewTimer(delay)
select {
case <-ctx.Done():
retryTimer.Stop()
return ctx.Err()
case <-retryTimer.C:
_ = retryTimer.Stop()
}
continue
}
return err
return sendErr
}
break
}

if sendErr != nil {
return sendErr
}

// If the transaction is successful, we need to update the nonce and notify the
// watcher.
t.watcher.Sent(ctx, tx)
Expand Down
157 changes: 157 additions & 0 deletions x/contracts/transactor/transactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,159 @@ func TestTrasactor(t *testing.T) {
}
}

// autoAllowWatcher is a watcher that always allows and records sent txs.
type autoAllowWatcher struct {
txnChan chan *types.Transaction
}

func (w *autoAllowWatcher) Allow(_ context.Context, _ uint64) bool {
return true
}

func (w *autoAllowWatcher) Sent(_ context.Context, tx *types.Transaction) {
if w.txnChan != nil {
w.txnChan <- tx
}
}

func TestNonceDriftSelfHealing(t *testing.T) {
t.Parallel()

backend := &testBackend{
nonce: 100, // chain pending nonce
}
watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 16)}
// maxNonceDrift = 5: drift of 6+ triggers reset
txnSender := transactor.NewTransactor(backend, watcher, transactor.WithMaxNonceDrift(5))

// First call: local nonce is 0 (initial), chain says 100 → returns 100
nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}
if nonce != 100 {
t.Fatalf("expected nonce 100, got %d", nonce)
}

// Send nonces 100-109 to advance local nonce to 110
for i := uint64(100); i <= 109; i++ {
backend.nonce = i + 1 // chain keeps up
if i > 100 {
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}
}
err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil))
if err != nil {
t.Fatal(err)
}
<-watcher.txnChan
nonce = i + 1
}

// Local nonce is now 110. Simulate mempool wipe: chain nonce drops to 100.
// This simulates the node restart scenario where all pending txs are lost.
backend.nonce = 100

// Drift = 110 - 100 = 10 > maxNonceDrift(5) → should reset to 100
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}
if nonce != 100 {
t.Fatalf("expected nonce to reset to 100 (chain nonce), got %d", nonce)
}
}

func TestNonceDriftWithinThreshold(t *testing.T) {
t.Parallel()

backend := &testBackend{
nonce: 100,
}
watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 16)}
txnSender := transactor.NewTransactor(backend, watcher, transactor.WithMaxNonceDrift(5))

// Get initial nonce from chain (100)
nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}

// Send txs 100-104 to advance local nonce to 105
for i := uint64(100); i <= 104; i++ {
backend.nonce = i // chain nonce stays behind (simulating pending txs)
if i > 100 {
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}
}
err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil))
if err != nil {
t.Fatal(err)
}
<-watcher.txnChan
nonce = i + 1
}

// Local nonce = 105. Chain nonce = 104 (drift = 1, within threshold).
// Should use local nonce, NOT reset.
backend.nonce = 100
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}
if nonce != 105 {
t.Fatalf("expected local nonce 105 (within drift threshold), got %d", nonce)
}
}

func TestSendTransactionRetriesExhausted(t *testing.T) {
t.Parallel()

backend := &testBackend{
nonce: 10,
sendTxErr: context.DeadlineExceeded, // all sends timeout
}
watcher := &autoAllowWatcher{txnChan: make(chan *types.Transaction, 1)}
txnSender := transactor.NewTransactor(backend, watcher)

// Get initial nonce
nonce, err := txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}
if nonce != 10 {
t.Fatalf("expected nonce 10, got %d", nonce)
}

// SendTransaction should fail after exhausting all retries
err = txnSender.SendTransaction(context.Background(), types.NewTransaction(nonce, common.Address{}, nil, 0, nil, nil))
if err == nil {
t.Fatal("expected error when all retries exhausted, got nil")
}

// The nonce should NOT have been incremented — it should be reusable.
// Since the defer puts the nonce back, the next PendingNonceAt should return 10.
backend.sendTxErr = nil // clear the error so future sends work
nonce, err = txnSender.PendingNonceAt(context.Background(), common.Address{})
if err != nil {
t.Fatal(err)
}
if nonce != 10 {
t.Fatalf("expected nonce to remain 10 after failed retries, got %d", nonce)
}

// Verify no transaction was reported as sent
select {
case <-watcher.txnChan:
t.Fatal("watcher.Sent should not have been called for failed transaction")
default:
}
}

type testWatcher struct {
allowChan chan uint64
txnChan chan *types.Transaction
Expand All @@ -157,6 +310,7 @@ type testBackend struct {
nonce uint64
errNonce uint64
pendingNonceErr error
sendTxErr error // if set, SendTransaction always returns this error
}

func (b *testBackend) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
Expand All @@ -167,6 +321,9 @@ func (b *testBackend) PendingNonceAt(ctx context.Context, account common.Address
}

func (b *testBackend) SendTransaction(ctx context.Context, tx *types.Transaction) error {
if b.sendTxErr != nil {
return b.sendTxErr
}
if b.errNonce == tx.Nonce() {
return errors.New("nonce error")
}
Expand Down
Loading