Skip to content

Commit ebe01ac

Browse files
committed
ethfinalizer: stuck transaction detection
1 parent ccf84b9 commit ebe01ac

3 files changed

Lines changed: 167 additions & 55 deletions

File tree

ethfinalizer/ethfinalizer.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ type FinalizerOptions[T any] struct {
4949
// Recommended to be at least 15, 10 is the default chosen by go-ethereum.
5050
PriceBump int
5151

52+
// NonceStuckTimeout is the period after which the finalizer is considered stuck on a nonce.
53+
NonceStuckTimeout time.Duration
54+
// TransactionStuckTimeout is the period after which a transaction is considered stuck.
55+
TransactionStuckTimeout time.Duration
56+
// OnStuck is called when the finalizer is stuck.
57+
OnStuck func(first, latest *Status[T])
58+
// OnUnstuck is called when the finalizer is unstuck.
59+
OnUnstuck func()
60+
5261
// SubscriptionBuffer is the size of the buffer for transaction events.
5362
SubscriptionBuffer int
5463
}
@@ -86,6 +95,14 @@ func (o FinalizerOptions[T]) IsValid() error {
8695
return fmt.Errorf("negative price bump %v", o.PriceBump)
8796
}
8897

98+
if o.NonceStuckTimeout < 0 {
99+
return fmt.Errorf("negative nonce stuck timeout %v", o.NonceStuckTimeout)
100+
}
101+
102+
if o.TransactionStuckTimeout < 0 {
103+
return fmt.Errorf("negative transaction stuck timeout %v", o.TransactionStuckTimeout)
104+
}
105+
89106
if o.SubscriptionBuffer < 0 {
90107
return fmt.Errorf("negative subscription buffer %v", o.SubscriptionBuffer)
91108
}
@@ -100,7 +117,7 @@ func (o FinalizerOptions[T]) IsValid() error {
100117
type Finalizer[T any] struct {
101118
FinalizerOptions[T]
102119

103-
isRunning atomic.Bool
120+
isRunning, isStuck atomic.Bool
104121

105122
subscriptions map[chan Event[T]]struct{}
106123
subscriptionsMu sync.RWMutex
@@ -121,6 +138,18 @@ type Event[T any] struct {
121138
Added *Transaction[T]
122139
}
123140

141+
// Status is the result of sending a Transaction on chain.
142+
//
143+
// Type parameters:
144+
// - T: transaction metadata type
145+
type Status[T any] struct {
146+
Transaction *Transaction[T]
147+
// Time is when the transaction was first committed.
148+
Time time.Time
149+
// Error is the most recent error from sending the transaction on chain.
150+
Error error
151+
}
152+
124153
// Transaction is a transaction with metadata of type T.
125154
//
126155
// Type parameters:
@@ -256,6 +285,19 @@ func (f *Finalizer[T]) Run(ctx context.Context) error {
256285
return fmt.Errorf("unable to read chain nonce: %w", err)
257286
}
258287

288+
first, latest, err := f.Mempool.Status(ctx, chainNonce)
289+
if err == nil {
290+
if first != nil && latest != nil && (f.NonceStuckTimeout != 0 && time.Since(first.Time) >= f.NonceStuckTimeout || f.TransactionStuckTimeout != 0 && time.Since(latest.Time) >= f.TransactionStuckTimeout) {
291+
if f.isStuck.CompareAndSwap(false, true) && f.OnStuck != nil {
292+
f.OnStuck(first, latest)
293+
}
294+
} else if f.isStuck.CompareAndSwap(true, false) && f.OnUnstuck != nil {
295+
f.OnUnstuck()
296+
}
297+
} else {
298+
f.Logger.ErrorContext(ctx, "unable to read status", slog.Any("error", err), slog.Uint64("nonce", chainNonce))
299+
}
300+
259301
transactions, err := f.Mempool.PriciestTransactions(ctx, chainNonce, time.Now().Add(-f.RetryDelay))
260302
if err != nil {
261303
return fmt.Errorf("unable to read mempool transactions: %w", err)
@@ -445,6 +487,10 @@ func (f *Finalizer[T]) Run(ctx context.Context) error {
445487
}
446488

447489
if err := f.Chain.Send(ctx, transaction.Transaction); err != nil {
490+
if err := f.Mempool.SetError(ctx, transaction.Hash(), err); err != nil {
491+
f.Logger.ErrorContext(ctx, "unable to set transaction error", slog.Any("error", err), slog.String("transaction", transaction.Hash().String()))
492+
}
493+
448494
f.Logger.ErrorContext(ctx, "unable to resend transaction to chain", slog.Any("error", err), slog.String("transaction", transaction.Hash().String()))
449495
}
450496
} else {
@@ -455,6 +501,10 @@ func (f *Finalizer[T]) Run(ctx context.Context) error {
455501
}
456502

457503
if err := f.Chain.Send(ctx, replacement); err != nil {
504+
if err := f.Mempool.SetError(ctx, replacement.Hash(), err); err != nil {
505+
f.Logger.ErrorContext(ctx, "unable to set transaction error", slog.Any("error", err), slog.String("transaction", replacement.Hash().String()))
506+
}
507+
458508
f.Logger.ErrorContext(ctx, "unable to send replacement transaction to chain", slog.Any("error", err), slog.String("transaction", replacement.Hash().String()))
459509
}
460510
} else {
@@ -646,6 +696,10 @@ func (f *Finalizer[T]) Send(ctx context.Context, transaction *types.Transaction,
646696
}
647697

648698
if err := f.Chain.Send(ctx, transaction); err != nil {
699+
if err := f.Mempool.SetError(ctx, transaction.Hash(), err); err != nil {
700+
f.Logger.ErrorContext(ctx, "unable to set transaction error", slog.Any("error", err), slog.String("transaction", transaction.Hash().String()))
701+
}
702+
649703
f.Logger.ErrorContext(ctx, "unable to send transaction to chain", slog.Any("error", err), slog.String("transaction", transaction.Hash().String()))
650704
}
651705

ethfinalizer/ethfinalizer_test.go

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,21 @@ import (
1919
)
2020

2121
const (
22-
TestDuration = 10 * time.Second
23-
MonitorPollInterval = 100 * time.Millisecond
24-
FinalizerPollInterval = 1 * time.Second
25-
FinalizerPollTimeout = 1 * time.Second
26-
FinalizerRetryDelay = 5 * time.Second
27-
TransactionsPerSecond = 2
28-
BlockPeriod = 2 * time.Second
29-
StallPeriod = 20 * time.Second
30-
MineProbability = 0.8
31-
ReorgProbability = 0.1
32-
ReorgLimit = 10
22+
TestDuration = 10 * time.Second
23+
MonitorPollInterval = 100 * time.Millisecond
24+
FinalizerPollInterval = 1 * time.Second
25+
FinalizerPollTimeout = 1 * time.Second
26+
FinalizerRetryDelay = 5 * time.Second
27+
FinalizerFeeMargin = 20
28+
FinalizerPriceBump = 10
29+
FinalizerNonceStuckTimeout = 10 * time.Second
30+
FinalizerTransactionStuckTimeout = 5 * time.Second
31+
TransactionsPerSecond = 2
32+
BlockPeriod = 2 * time.Second
33+
StallPeriod = 20 * time.Second
34+
MineProbability = 0.8
35+
ReorgProbability = 0.1
36+
ReorgLimit = 10
3337
)
3438

3539
func TestFinalizerNoEIP1559(t *testing.T) {
@@ -62,22 +66,39 @@ func test(t *testing.T, isEIP1559 bool) {
6266

6367
mempool := ethfinalizer.NewMemoryMempool[struct{}]()
6468

69+
ctx, cancel := context.WithTimeout(context.Background(), TestDuration)
70+
defer cancel()
71+
6572
finalizer, err := ethfinalizer.NewFinalizer(ethfinalizer.FinalizerOptions[struct{}]{
66-
Wallet: wallet,
67-
Chain: chain,
68-
Mempool: mempool,
69-
Logger: logger,
70-
PollInterval: FinalizerPollInterval,
71-
PollTimeout: FinalizerPollTimeout,
72-
RetryDelay: FinalizerRetryDelay,
73-
FeeMargin: 20,
74-
PriceBump: 10,
73+
Wallet: wallet,
74+
Chain: chain,
75+
Mempool: mempool,
76+
Logger: logger,
77+
PollInterval: FinalizerPollInterval,
78+
PollTimeout: FinalizerPollTimeout,
79+
RetryDelay: FinalizerRetryDelay,
80+
FeeMargin: FinalizerFeeMargin,
81+
PriceBump: FinalizerPriceBump,
82+
NonceStuckTimeout: FinalizerNonceStuckTimeout,
83+
TransactionStuckTimeout: FinalizerTransactionStuckTimeout,
84+
OnStuck: func(first, latest *ethfinalizer.Status[struct{}]) {
85+
logger.DebugContext(
86+
ctx,
87+
"stuck",
88+
slog.String("first", first.Transaction.Hash().String()),
89+
slog.Uint64("firstNonce", first.Transaction.Nonce()),
90+
slog.Duration("firstAge", time.Since(first.Time)),
91+
slog.String("latest", latest.Transaction.Hash().String()),
92+
slog.Uint64("latestNonce", latest.Transaction.Nonce()),
93+
slog.Duration("latestAge", time.Since(latest.Time)),
94+
)
95+
},
96+
OnUnstuck: func() {
97+
logger.DebugContext(ctx, "unstuck")
98+
},
7599
})
76100
assert.NoError(t, err)
77101

78-
ctx, cancel := context.WithTimeout(context.Background(), TestDuration)
79-
defer cancel()
80-
81102
var wg sync.WaitGroup
82103

83104
wg.Go(func() {

ethfinalizer/mempool.go

Lines changed: 68 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ethfinalizer
22

33
import (
44
"context"
5+
"fmt"
56
"sync"
67
"time"
78

@@ -15,35 +16,40 @@ type Mempool[T any] interface {
1516
Nonce(ctx context.Context) (uint64, error)
1617

1718
// Commit persists the signed transaction with its metadata in the store.
18-
// The transaction must be persisted with a timestamp of the current time.
19-
// If the transaction already exists in the mempool, the timestamp must be updated.
19+
// The transaction must be persisted with timestamps of the first and latest submissions.
20+
// If the transaction already exists in the mempool, the latest timestamp must be updated.
2021
Commit(ctx context.Context, transaction *types.Transaction, metadata T) error
2122

23+
// SetError sets the error for a previously committed transaction.
24+
SetError(ctx context.Context, transaction common.Hash, err error) error
25+
2226
// Transactions returns the transactions for the specified hashes which are signed by this specific wallet for this specific chain.
2327
Transactions(ctx context.Context, hashes map[common.Hash]struct{}) (map[common.Hash]*Transaction[T], error)
2428

2529
// PriciestTransactions returns, by nonce, the most expensive transactions signed by this specific wallet for this specific chain, with a minimum nonce and a latest timestamp.
2630
PriciestTransactions(ctx context.Context, fromNonce uint64, before time.Time) (map[uint64]*Transaction[T], error)
31+
32+
// Status returns the statuses for the first and latest transactions for a given nonce.
33+
Status(ctx context.Context, nonce uint64) (*Status[T], *Status[T], error)
2734
}
2835

2936
type memoryMempool[T any] struct {
30-
transactions map[common.Hash]*Transaction[T]
31-
priciestTransactions map[uint64]*timestampedTransaction[T]
32-
highestNonce *uint64
33-
mu sync.RWMutex
37+
transactions map[common.Hash]*Status[T]
38+
nonces map[uint64]*nonceStatus[T]
39+
highestNonce *uint64
40+
mu sync.RWMutex
3441
}
3542

36-
type timestampedTransaction[T any] struct {
37-
*Transaction[T]
38-
39-
timestamp time.Time
43+
type nonceStatus[T any] struct {
44+
first, latest *Status[T]
45+
time time.Time
4046
}
4147

4248
// NewMemoryMempool creates a minimal in-memory Mempool.
4349
func NewMemoryMempool[T any]() Mempool[T] {
4450
return &memoryMempool[T]{
45-
transactions: map[common.Hash]*Transaction[T]{},
46-
priciestTransactions: map[uint64]*timestampedTransaction[T]{},
51+
transactions: map[common.Hash]*Status[T]{},
52+
nonces: map[uint64]*nonceStatus[T]{},
4753
}
4854
}
4955

@@ -62,28 +68,49 @@ func (m *memoryMempool[T]) Commit(ctx context.Context, transaction *types.Transa
6268
m.mu.Lock()
6369
defer m.mu.Unlock()
6470

65-
transaction_ := Transaction[T]{
66-
Transaction: transaction,
67-
Metadata: metadata,
68-
}
69-
70-
m.transactions[transaction.Hash()] = &transaction_
71+
now := time.Now()
7172

72-
previous := m.priciestTransactions[transaction.Nonce()]
73-
if previous == nil || transaction.GasFeeCapCmp(previous.Transaction.Transaction) > 0 && transaction.GasTipCapCmp(previous.Transaction.Transaction) > 0 {
74-
m.priciestTransactions[transaction.Nonce()] = &timestampedTransaction[T]{
75-
Transaction: &transaction_,
76-
timestamp: time.Now(),
73+
status_ := Status[T]{
74+
Transaction: &Transaction[T]{
75+
Transaction: transaction,
76+
Metadata: metadata,
77+
},
78+
Time: now,
79+
}
80+
m.transactions[transaction.Hash()] = &status_
81+
82+
status := m.nonces[transaction.Nonce()]
83+
if status == nil {
84+
status = &nonceStatus[T]{
85+
first: &status_,
86+
latest: &status_,
87+
time: now,
7788
}
89+
m.nonces[transaction.Nonce()] = status
7890

7991
if m.highestNonce == nil || transaction.Nonce() > *m.highestNonce {
8092
m.highestNonce = new(uint64)
8193
*m.highestNonce = transaction.Nonce()
8294
}
83-
} else if previous.Hash() == transaction.Hash() {
84-
previous.timestamp = time.Now()
8595
}
8696

97+
if transaction.Hash() == status.latest.Transaction.Hash() {
98+
status.time = now
99+
} else if transaction.GasFeeCapCmp(status.latest.Transaction.Transaction) > 0 && transaction.GasTipCapCmp(status.latest.Transaction.Transaction) > 0 {
100+
status.latest = &status_
101+
status.time = now
102+
}
103+
104+
return nil
105+
}
106+
107+
func (m *memoryMempool[T]) SetError(ctx context.Context, transaction common.Hash, err error) error {
108+
status := m.transactions[transaction]
109+
if status == nil {
110+
return fmt.Errorf("unknown transaction %v", transaction)
111+
}
112+
113+
status.Error = err
87114
return nil
88115
}
89116

@@ -93,9 +120,9 @@ func (m *memoryMempool[T]) Transactions(ctx context.Context, hashes map[common.H
93120

94121
transactions := make(map[common.Hash]*Transaction[T], len(hashes))
95122
for hash := range hashes {
96-
transaction := m.transactions[hash]
97-
if transaction != nil {
98-
transactions[hash] = transaction
123+
status := m.transactions[hash]
124+
if status != nil {
125+
transactions[hash] = status.Transaction
99126
}
100127
}
101128

@@ -113,12 +140,22 @@ func (m *memoryMempool[T]) PriciestTransactions(ctx context.Context, fromNonce u
113140

114141
transactions := make(map[uint64]*Transaction[T], capacity)
115142
for nonce := fromNonce; ; nonce++ {
116-
transaction := m.priciestTransactions[nonce]
117-
if transaction == nil || !transaction.timestamp.Before(before) {
143+
status := m.nonces[nonce]
144+
if status == nil || !status.time.Before(before) {
118145
break
119146
}
120-
transactions[nonce] = transaction.Transaction
147+
148+
transactions[nonce] = status.latest.Transaction
121149
}
122150

123151
return transactions, nil
124152
}
153+
154+
func (m *memoryMempool[T]) Status(ctx context.Context, nonce uint64) (*Status[T], *Status[T], error) {
155+
status := m.nonces[nonce]
156+
if status == nil {
157+
return nil, nil, nil
158+
}
159+
160+
return status.first, status.latest, nil
161+
}

0 commit comments

Comments
 (0)