Skip to content

Commit 48c077d

Browse files
reinkrul and claude committed
Fix #4162: limit concurrent outgoing TransactionListQuery conversations
When many peers connect at startup and we're out of sync, sendTransactionListQuery can be called for many peers simultaneously. Each peer responds with TransactionList messages that cause BBolt writes, starving read operations (e.g. notifier startup). Add a counting semaphore (default 3, configurable via network.v2.maxtransactionlistqueries) that limits concurrent outgoing TransactionListQuery conversations. Queries that exceed the limit are dropped; the gossip/state-comparison mechanism will retry them. The slot is released when the last TransactionList message of the conversation arrives. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent a5c362a commit 48c077d

File tree

4 files changed

+67
-11
lines changed

4 files changed

+67
-11
lines changed

network/transport/v2/protocol.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,24 @@ type Config struct {
5454
GossipInterval int `koanf:"gossipinterval"`
5555
// DiagnosticsInterval specifies how often (in milliseconds) the node should broadcast its diagnostics message.
5656
DiagnosticsInterval int `koanf:"diagnosticsinterval"`
57+
// MaxConcurrentTransactionListQueries limits the number of outgoing TransactionListQuery conversations active at
58+
// the same time. Limiting this reduces BBolt write lock contention during peer synchronization at startup.
59+
// See https://github.com/nuts-foundation/nuts-node/issues/4162
60+
MaxConcurrentTransactionListQueries int `koanf:"maxtransactionlistqueries"`
5761
}
5862

5963
const defaultPayloadRetryDelay = 5 * time.Second
6064
const defaultGossipInterval = 5000
6165
const defaultDiagnosticsInterval = 5000
66+
const defaultMaxConcurrentTransactionListQueries = 3
6267

6368
// DefaultConfig returns the default config for protocol v2
6469
func DefaultConfig() Config {
6570
return Config{
66-
PayloadRetryDelay: defaultPayloadRetryDelay,
67-
GossipInterval: defaultGossipInterval,
68-
DiagnosticsInterval: defaultDiagnosticsInterval,
71+
PayloadRetryDelay: defaultPayloadRetryDelay,
72+
GossipInterval: defaultGossipInterval,
73+
DiagnosticsInterval: defaultDiagnosticsInterval,
74+
MaxConcurrentTransactionListQueries: defaultMaxConcurrentTransactionListQueries,
6975
}
7076
}
7177

@@ -79,16 +85,20 @@ func New(
7985
diagnosticsProvider func() transport.Diagnostics,
8086
dagStore stoabs.KVStore,
8187
) transport.Protocol {
88+
if config.MaxConcurrentTransactionListQueries <= 0 {
89+
config.MaxConcurrentTransactionListQueries = defaultMaxConcurrentTransactionListQueries
90+
}
8291
ctx, cancel := context.WithCancel(context.Background())
8392
p := &protocol{
84-
cancel: cancel,
85-
config: config,
86-
ctx: ctx,
87-
state: state,
88-
nodeDID: nodeDID,
89-
decrypter: decrypter,
90-
docResolver: docResolver,
91-
dagStore: dagStore,
93+
cancel: cancel,
94+
config: config,
95+
ctx: ctx,
96+
state: state,
97+
nodeDID: nodeDID,
98+
decrypter: decrypter,
99+
docResolver: docResolver,
100+
dagStore: dagStore,
101+
transactionListQuerySem: make(chan struct{}, config.MaxConcurrentTransactionListQueries),
92102
}
93103
p.sender = p
94104
p.diagnosticsMan = newPeerDiagnosticsManager(diagnosticsProvider, p.sender.broadcastDiagnostics)
@@ -113,6 +123,10 @@ type protocol struct {
113123
sender messageSender
114124
listHandler *transactionListHandler
115125
dagStore stoabs.KVStore
126+
// transactionListQuerySem is a counting semaphore that limits the number of concurrent outgoing
127+
// TransactionListQuery conversations. See https://github.com/nuts-foundation/nuts-node/issues/4162
128+
transactionListQuerySem chan struct{}
129+
transactionListQueryConvs sync.Map // tracks conversationID strings that hold a semaphore slot
116130
}
117131

118132
func (p *protocol) CreateClientStream(outgoingContext context.Context, grpcConn grpcLib.ClientConnInterface) (grpcLib.ClientStream, error) {

network/transport/v2/senders.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,20 @@ func (p *protocol) sendTransactionListQuery(connection grpc.Connection, refs []h
8282
return nil
8383
}
8484

85+
// Limit the number of concurrent outgoing TransactionListQuery conversations to avoid flooding BBolt with
86+
// write operations when many peers respond simultaneously. See https://github.com/nuts-foundation/nuts-node/issues/4162
87+
select {
88+
case p.transactionListQuerySem <- struct{}{}:
89+
p.transactionListQueryConvs.Store(conversation.conversationID.String(), struct{}{})
90+
default:
91+
log.Logger().
92+
WithFields(connection.Peer().ToFields()).
93+
WithField(core.LogFieldConversationID, conversation.conversationID.String()).
94+
Debug("TransactionListQuery limit reached, not requesting TransactionList from peer")
95+
p.cMan.done(conversation.conversationID)
96+
return nil
97+
}
98+
8599
log.Logger().
86100
WithFields(connection.Peer().ToFields()).
87101
WithField(core.LogFieldConversationID, conversation.conversationID.String()).

network/transport/v2/senders_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package v2
2121

2222
import (
2323
"errors"
24+
"fmt"
2425
"github.com/stretchr/testify/require"
2526
"math"
2627
"testing"
@@ -168,6 +169,29 @@ func TestProtocol_sendTransactionListQuery(t *testing.T) {
168169
assert.NotNil(t, actualEnvelope.GetTransactionListQuery().GetConversationID())
169170
})
170171

172+
t.Run("limit reached - query is dropped", func(t *testing.T) {
173+
proto, mocks := newTestProtocol(t, nil)
174+
// Fill the semaphore to the limit (default 3), using different peers so the per-peer
175+
// conversation guard doesn't block us.
176+
for i := range proto.config.MaxConcurrentTransactionListQueries {
177+
mockConn := grpc.NewMockConnection(mocks.Controller)
178+
mockConn.EXPECT().Peer().AnyTimes().Return(transport.Peer{ID: transport.PeerID(fmt.Sprintf("peer-%d", i))})
179+
mockConn.EXPECT().Send(proto, gomock.Any(), false).Return(nil)
180+
_ = proto.sendTransactionListQuery(mockConn, []hash.SHA256Hash{hash.FromSlice([]byte("list query"))})
181+
}
182+
require.Equal(t, proto.config.MaxConcurrentTransactionListQueries, len(proto.transactionListQuerySem))
183+
184+
// Next query on a new peer should be dropped (semaphore full).
185+
overLimitConn := grpc.NewMockConnection(mocks.Controller)
186+
overLimitConn.EXPECT().Peer().AnyTimes().Return(transport.Peer{ID: "over-limit"})
187+
// No Send expected: query must be dropped.
188+
189+
err := proto.sendTransactionListQuery(overLimitConn, []hash.SHA256Hash{hash.FromSlice([]byte("list query"))})
190+
191+
assert.NoError(t, err)
192+
assert.Equal(t, proto.config.MaxConcurrentTransactionListQueries, len(proto.transactionListQuerySem))
193+
})
194+
171195
performMultipleConversationsTest(t, peer, func(c grpc.Connection, p *protocol, mocks protocolMocks) error {
172196
return p.sendTransactionListQuery(c, []hash.SHA256Hash{hash.FromSlice([]byte("list query"))})
173197
})

network/transport/v2/transactionlist_handler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ func (p *protocol) handleTransactionList(ctx context.Context, connection grpc.Co
120120

121121
if msg.MessageNumber >= msg.TotalMessages {
122122
p.cMan.done(cid)
123+
// Release the semaphore slot if this conversation held one (TransactionListQuery, not TransactionRangeQuery).
124+
if _, held := p.transactionListQueryConvs.LoadAndDelete(cid.String()); held {
125+
<-p.transactionListQuerySem
126+
}
123127
} else {
124128
p.cMan.resetTimeout(cid)
125129
}

0 commit comments

Comments (0)