Skip to content

Commit d88ccdd

Browse files
stevenvegtclaude
andcommitted
Batch transaction writes in handleTransactionList to reduce fsync count
Add State.AddMany() that writes multiple transactions in a single BBolt write transaction. Previously, each transaction in a TransactionList message was added individually via State.Add(), each acquiring a write lock and triggering its own fsync. For a message with N transactions, this meant N separate fsyncs. On network-attached storage (e.g. Azure premium SMB), fsync latency is 10-100ms compared to <1ms on local SSD. Combined with the go-stoabs read lock issue (see go-stoabs#146), this creates a compounding effect: slow fsyncs hold the write lock longer, blocking concurrent reads via the RWMutex writer-preference, which in turn blocks subsequent writes. A bootup that takes 3 minutes on local storage can take 30+ minutes on SMB because the lock contention multiplies the raw I/O penalty. With batching, N transactions require only 1 fsync. Verification happens inside the write transaction so that later transactions in the batch can reference earlier ones. On the first failure, processing stops and all successfully added transactions are committed. The caller receives the count of added transactions and the first error, preserving the existing error handling (ErrPreviousTransactionMissing triggers state reconciliation, other errors are logged and recovered via gossip). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d135f2a commit d88ccdd

5 files changed

Lines changed: 138 additions & 22 deletions

File tree

network/dag/interface.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ type State interface {
5555
// If the transaction already exists, nothing is added and no observers are notified.
5656
// The payload may be passed as well. Allowing for better notification of observers
5757
Add(ctx context.Context, transactions Transaction, payload []byte) error
58+
// AddMany adds multiple transactions to the DAG in a single write transaction,
59+
// requiring only a single fsync. Transactions are processed in order so that later
60+
// transactions can reference earlier ones in the same batch.
61+
// Returns the number of transactions successfully added and the first error encountered.
62+
// Successfully added transactions are committed even when a later transaction fails.
63+
AddMany(ctx context.Context, transactions []Transaction, payloads [][]byte) (int, error)
5864
// FindBetweenLC finds all transactions which lamport clock value lies between startInclusive and endExclusive.
5965
// They are returned in order: first sorted on lamport clock value, then on transaction reference (byte order).
6066
FindBetweenLC(ctx context.Context, startInclusive uint32, endExclusive uint32) ([]Transaction, error)

network/dag/mock.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

network/dag/state.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,96 @@ func (s *state) Add(ctx context.Context, transaction Transaction, payload []byte
219219
}), stoabs.WithWriteLock())
220220
}
221221

222+
func (s *state) AddMany(ctx context.Context, transactions []Transaction, payloads [][]byte) (int, error) {
223+
added := 0
224+
var txEvents []Event
225+
var payloadEvents []Event
226+
var firstErr error
227+
228+
err := s.db.Write(ctx, func(tx stoabs.WriteTx) error {
229+
for i, transaction := range transactions {
230+
if ctx.Err() != nil {
231+
firstErr = ctx.Err()
232+
break
233+
}
234+
// Skip if already present
235+
if s.graph.isPresent(tx, transaction.Ref()) {
236+
continue
237+
}
238+
239+
// Verify within the write TX so that earlier TXs in the batch are visible
240+
if err := s.verifyTX(tx, transaction); err != nil {
241+
firstErr = fmt.Errorf("transaction verification failed (tx=%s): %w", transaction.Ref(), err)
242+
break
243+
}
244+
245+
payload := payloads[i]
246+
if payload != nil {
247+
payloadHash := hash.SHA256Sum(payload)
248+
if !transaction.PayloadHash().Equals(payloadHash) {
249+
firstErr = fmt.Errorf("tx.PayloadHash does not match hash of payload (tx=%s)", transaction.Ref())
250+
break
251+
}
252+
if err := s.payloadStore.writePayload(tx, payloadHash, payload); err != nil {
253+
firstErr = err
254+
break
255+
}
256+
event := Event{
257+
Type: PayloadEventType,
258+
Hash: transaction.Ref(),
259+
Transaction: transaction,
260+
Payload: payload,
261+
}
262+
if err := s.saveEvent(tx, event); err != nil {
263+
firstErr = err
264+
break
265+
}
266+
payloadEvents = append(payloadEvents, event)
267+
}
268+
269+
if err := s.graph.add(tx, transaction); err != nil {
270+
firstErr = err
271+
break
272+
}
273+
event := Event{
274+
Type: TransactionEventType,
275+
Hash: transaction.Ref(),
276+
Transaction: transaction,
277+
Payload: payload,
278+
}
279+
if err := s.saveEvent(tx, event); err != nil {
280+
firstErr = err
281+
break
282+
}
283+
txEvents = append(txEvents, event)
284+
285+
if err := s.updateState(tx, transaction); err != nil {
286+
firstErr = err
287+
break
288+
}
289+
added++
290+
}
291+
// Always return nil to commit what we have, even if a TX failed
292+
return nil
293+
}, stoabs.OnRollback(func() {
294+
log.Logger().Warn("Reloading the XOR and IBLT trees due to a DB transaction Rollback")
295+
s.loadState(ctx)
296+
}), stoabs.AfterCommit(func() {
297+
for _, event := range txEvents {
298+
s.notify(event)
299+
}
300+
for _, event := range payloadEvents {
301+
s.notify(event)
302+
}
303+
}), stoabs.AfterCommit(func() {
304+
s.transactionCount.Add(float64(added))
305+
}), stoabs.WithWriteLock())
306+
if err != nil {
307+
return added, err
308+
}
309+
return added, firstErr
310+
}
311+
222312
func (s *state) updateState(tx stoabs.WriteTx, transaction Transaction) error {
223313
clock := transaction.Clock()
224314
for {

network/transport/v2/transactionlist_handler.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -94,28 +94,33 @@ func (p *protocol) handleTransactionList(ctx context.Context, connection grpc.Co
9494
return err
9595
}
9696

97+
// Validate that all public transactions include a payload before adding
98+
payloads := make([][]byte, len(txs))
9799
for i, tx := range txs {
98-
if ctx.Err() != nil {
99-
// For loop might be long-running, support cancellation
100-
break
101-
}
102100
// TODO does this always trigger fetching missing payloads? (through observer on DAG) Prolly not for v2
103101
if len(tx.PAL()) == 0 && len(msg.Transactions[i].Payload) == 0 {
104102
return fmt.Errorf("peer did not provide payload for transaction (tx=%s)", tx.Ref())
105103
}
106-
if err = p.state.Add(ctx, tx, msg.Transactions[i].Payload); err != nil {
107-
if errors.Is(err, dag.ErrPreviousTransactionMissing) {
108-
p.cMan.done(cid)
109-
log.Logger().
110-
WithFields(connection.Peer().ToFields()).
111-
WithField(core.LogFieldConversationID, cid).
112-
WithField(core.LogFieldTransactionRef, tx.Ref()).
113-
Warn("Ignoring remainder of TransactionList due to missing prevs")
114-
xor, clock := p.state.XOR(dag.MaxLamportClock)
115-
return p.sender.sendState(connection, xor, clock)
116-
}
117-
return fmt.Errorf("unable to add received transaction to DAG (tx=%s): %w", tx.Ref(), err)
104+
payloads[i] = msg.Transactions[i].Payload
105+
}
106+
107+
if ctx.Err() != nil {
108+
return nil
109+
}
110+
111+
added, err := p.state.AddMany(ctx, txs, payloads)
112+
if err != nil {
113+
if errors.Is(err, dag.ErrPreviousTransactionMissing) {
114+
p.cMan.done(cid)
115+
log.Logger().
116+
WithFields(connection.Peer().ToFields()).
117+
WithField(core.LogFieldConversationID, cid).
118+
WithField(core.LogFieldTransactionRef, txs[added].Ref()).
119+
Warn("Ignoring remainder of TransactionList due to missing prevs")
120+
xor, clock := p.state.XOR(dag.MaxLamportClock)
121+
return p.sender.sendState(connection, xor, clock)
118122
}
123+
return fmt.Errorf("unable to add received transaction to DAG (tx=%s): %w", txs[added].Ref(), err)
119124
}
120125

121126
if msg.MessageNumber >= msg.TotalMessages {

network/transport/v2/transactionlist_handler_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func TestProtocol_handleTransactionList(t *testing.T) {
7979
p, mocks := newTestProtocol(t, nil)
8080
conversation := p.cMan.startConversation(request, peer)
8181
envelope := envelopeWithConversation(conversation)
82-
mocks.State.EXPECT().Add(context.Background(), tx, payload).Return(nil)
82+
mocks.State.EXPECT().AddMany(context.Background(), []dag.Transaction{tx}, [][]byte{payload}).Return(1, nil)
8383

8484
err := p.handleTransactionList(context.Background(), connection, envelope)
8585

@@ -102,7 +102,7 @@ func TestProtocol_handleTransactionList(t *testing.T) {
102102
p, mocks := newTestProtocol(t, nil)
103103
conversation := p.cMan.startConversation(request, peer)
104104
envelope := envelopeWithConversation(conversation)
105-
mocks.State.EXPECT().Add(context.Background(), tx, payload).Return(nil)
105+
mocks.State.EXPECT().AddMany(context.Background(), []dag.Transaction{tx}, [][]byte{payload}).Return(1, nil)
106106

107107
err := p.handleTransactionList(context.Background(), connection, envelope)
108108

@@ -113,7 +113,7 @@ func TestProtocol_handleTransactionList(t *testing.T) {
113113
p, mocks := newTestProtocol(t, nil)
114114
conversation := p.cMan.startConversation(request, peer)
115115
envelope := envelopeWithConversation(conversation)
116-
mocks.State.EXPECT().Add(context.Background(), tx, payload).Return(dag.ErrPreviousTransactionMissing)
116+
mocks.State.EXPECT().AddMany(context.Background(), []dag.Transaction{tx}, [][]byte{payload}).Return(0, dag.ErrPreviousTransactionMissing)
117117
mocks.State.EXPECT().XOR(uint32(dag.MaxLamportClock)).Return(hash.FromSlice([]byte("stateXor")), uint32(7))
118118
mocks.Sender.EXPECT().sendState(connection, hash.FromSlice([]byte("stateXor")), uint32(7))
119119

@@ -127,7 +127,7 @@ func TestProtocol_handleTransactionList(t *testing.T) {
127127
p, mocks := newTestProtocol(t, nil)
128128
conversation := p.cMan.startConversation(request, peer)
129129
envelope := envelopeWithConversation(conversation)
130-
mocks.State.EXPECT().Add(context.Background(), tx, payload).Return(nil)
130+
mocks.State.EXPECT().AddMany(context.Background(), []dag.Transaction{tx}, [][]byte{payload}).Return(1, nil)
131131

132132
err := p.handleTransactionList(context.Background(), connection, envelope)
133133

@@ -142,7 +142,7 @@ func TestProtocol_handleTransactionList(t *testing.T) {
142142
conversation := p.cMan.startConversation(request2, peer)
143143
cStartTime := conversation.expiry.Add(-1 * time.Millisecond)
144144
conversation.expiry = cStartTime
145-
mocks.State.EXPECT().Add(context.Background(), tx, payload).Return(nil)
145+
mocks.State.EXPECT().AddMany(context.Background(), []dag.Transaction{tx}, [][]byte{payload}).Return(1, nil)
146146

147147
err := p.handleTransactionList(context.Background(), connection, &Envelope{Message: &Envelope_TransactionList{
148148
TransactionList: &TransactionList{
@@ -163,7 +163,7 @@ func TestProtocol_handleTransactionList(t *testing.T) {
163163
p, mocks := newTestProtocol(t, nil)
164164
conversation := p.cMan.startConversation(request, peer)
165165
envelope := envelopeWithConversation(conversation)
166-
mocks.State.EXPECT().Add(context.Background(), tx, payload).Return(errors.New("custom"))
166+
mocks.State.EXPECT().AddMany(context.Background(), []dag.Transaction{tx}, [][]byte{payload}).Return(0, errors.New("custom"))
167167

168168
err := p.handleTransactionList(context.Background(), connection, envelope)
169169

0 commit comments

Comments
 (0)