diff --git a/network/transport/v2/protocol.go b/network/transport/v2/protocol.go
index c1bf43d0fb..3df1c50a0e 100644
--- a/network/transport/v2/protocol.go
+++ b/network/transport/v2/protocol.go
@@ -54,18 +54,24 @@ type Config struct {
 	GossipInterval int `koanf:"gossipinterval"`
 	// DiagnosticsInterval specifies how often (in milliseconds) the node should broadcast its diagnostics message.
 	DiagnosticsInterval int `koanf:"diagnosticsinterval"`
+	// TransactionListQueryRate limits the rate (per second) at which outgoing TransactionListQuery and
+	// TransactionRangeQuery messages are sent. Limiting this reduces BBolt write lock contention during
+	// peer synchronization at startup. See https://github.com/nuts-foundation/nuts-node/issues/4162
+	TransactionListQueryRate int `koanf:"transactionlistqueryrate"`
 }
 
 const defaultPayloadRetryDelay = 5 * time.Second
 const defaultGossipInterval = 5000
 const defaultDiagnosticsInterval = 5000
+const defaultTransactionListQueryRate = 2
 
 // DefaultConfig returns the default config for protocol v2
 func DefaultConfig() Config {
 	return Config{
-		PayloadRetryDelay:   defaultPayloadRetryDelay,
-		GossipInterval:      defaultGossipInterval,
-		DiagnosticsInterval: defaultDiagnosticsInterval,
+		PayloadRetryDelay:        defaultPayloadRetryDelay,
+		GossipInterval:           defaultGossipInterval,
+		DiagnosticsInterval:      defaultDiagnosticsInterval,
+		TransactionListQueryRate: defaultTransactionListQueryRate,
 	}
 }
 
@@ -79,16 +85,25 @@ func New(
 	diagnosticsProvider func() transport.Diagnostics,
 	dagStore stoabs.KVStore,
 ) transport.Protocol {
+	if config.TransactionListQueryRate <= 0 {
+		config.TransactionListQueryRate = defaultTransactionListQueryRate
+	}
+	// Pre-fill the token bucket to capacity so queries are allowed immediately after startup.
+	tokens := make(chan struct{}, config.TransactionListQueryRate)
+	for range config.TransactionListQueryRate {
+		tokens <- struct{}{}
+	}
 	ctx, cancel := context.WithCancel(context.Background())
 	p := &protocol{
-		cancel:      cancel,
-		config:      config,
-		ctx:         ctx,
-		state:       state,
-		nodeDID:     nodeDID,
-		decrypter:   decrypter,
-		docResolver: docResolver,
-		dagStore:    dagStore,
+		cancel:                     cancel,
+		config:                     config,
+		ctx:                        ctx,
+		state:                      state,
+		nodeDID:                    nodeDID,
+		decrypter:                  decrypter,
+		docResolver:                docResolver,
+		dagStore:                   dagStore,
+		transactionListQueryTokens: tokens,
 	}
 	p.sender = p
 	p.diagnosticsMan = newPeerDiagnosticsManager(diagnosticsProvider, p.sender.broadcastDiagnostics)
@@ -113,6 +128,9 @@ type protocol struct {
 	sender      messageSender
 	listHandler *transactionListHandler
 	dagStore    stoabs.KVStore
+	// transactionListQueryTokens is a token bucket that rate-limits outgoing TransactionListQuery and
+	// TransactionRangeQuery messages. See https://github.com/nuts-foundation/nuts-node/issues/4162
+	transactionListQueryTokens chan struct{}
 }
 
 func (p *protocol) CreateClientStream(outgoingContext context.Context, grpcConn grpcLib.ClientConnInterface) (grpcLib.ClientStream, error) {
@@ -208,6 +226,26 @@ func (p *protocol) Start() (err error) {
 		p.listHandler.start()
 	}(p.routines)
 
+	// Fill the token bucket at the configured rate. Tokens that don't fit (bucket full) are discarded.
+	// See https://github.com/nuts-foundation/nuts-node/issues/4162
+	p.routines.Add(1)
+	go func(w *sync.WaitGroup) {
+		defer w.Done()
+		ticker := time.NewTicker(time.Second / time.Duration(p.config.TransactionListQueryRate))
+		defer ticker.Stop()
+		for {
+			select {
+			case <-p.ctx.Done():
+				return
+			case <-ticker.C:
+				select {
+				case p.transactionListQueryTokens <- struct{}{}:
+				default: // bucket full, discard token
+				}
+			}
+		}
+	}(p.routines)
+
 	return
 }
 
diff --git a/network/transport/v2/senders.go b/network/transport/v2/senders.go
index 4509dc48ef..307d7211f3 100644
--- a/network/transport/v2/senders.go
+++ b/network/transport/v2/senders.go
@@ -82,6 +82,13 @@ func (p *protocol) sendTransactionListQuery(connection grpc.Connection, refs []h
 		return nil
 	}
 
+	// Rate-limit outgoing queries to avoid flooding BBolt with write operations when many peers respond
+	// simultaneously. See https://github.com/nuts-foundation/nuts-node/issues/4162
+	if !p.consumeTransactionListQueryToken(connection) {
+		p.cMan.done(conversation.conversationID)
+		return nil
+	}
+
 	log.Logger().
 		WithFields(connection.Peer().ToFields()).
 		WithField(core.LogFieldConversationID, conversation.conversationID.String()).
@@ -126,6 +133,13 @@ func (p *protocol) sendTransactionRangeQuery(connection grpc.Connection, lcStart
 		return nil
 	}
 
+	// Rate-limit outgoing queries to avoid flooding BBolt with write operations when many peers respond
+	// simultaneously. See https://github.com/nuts-foundation/nuts-node/issues/4162
+	if !p.consumeTransactionListQueryToken(connection) {
+		p.cMan.done(conversation.conversationID)
+		return nil
+	}
+
 	log.Logger().
 		WithFields(connection.Peer().ToFields()).
 		WithField(core.LogFieldConversationID, conversation.conversationID.String()).
@@ -134,6 +148,20 @@ func (p *protocol) sendTransactionRangeQuery(connection grpc.Connection, lcStart
 	return connection.Send(p, &Envelope{Message: msg}, false)
 }
 
+// consumeTransactionListQueryToken tries to consume a token from the rate-limiting bucket.
+// Returns true if a token was available (the caller may proceed), false if the rate limit is exceeded.
+func (p *protocol) consumeTransactionListQueryToken(connection grpc.Connection) bool {
+	select {
+	case <-p.transactionListQueryTokens:
+		return true
+	default:
+		log.Logger().
+			WithFields(connection.Peer().ToFields()).
+			Debug("Transaction list/range query rate limit reached, dropping query")
+		return false
+	}
+}
+
 // chunkTransactionList splits a large set of transactions into smaller sets. Each set adheres to the maximum message size.
 func chunkTransactionList(transactions []*Transaction) [][]*Transaction {
 	chunked := make([][]*Transaction, 0)
diff --git a/network/transport/v2/senders_test.go b/network/transport/v2/senders_test.go
index 8739d667e6..562aa8c9be 100644
--- a/network/transport/v2/senders_test.go
+++ b/network/transport/v2/senders_test.go
@@ -168,6 +168,23 @@ func TestProtocol_sendTransactionListQuery(t *testing.T) {
 		assert.NotNil(t, actualEnvelope.GetTransactionListQuery().GetConversationID())
 	})
 
+	t.Run("rate limit reached - query is dropped", func(t *testing.T) {
+		proto, mocks := newTestProtocol(t, nil)
+		// Drain all tokens from the bucket (none available = rate limit exceeded).
+		for range proto.config.TransactionListQueryRate {
+			<-proto.transactionListQueryTokens
+		}
+
+		mockConn := grpc.NewMockConnection(mocks.Controller)
+		mockConn.EXPECT().Peer().AnyTimes().Return(transport.Peer{ID: "peer-1"})
+		// No Send expected: query must be dropped.
+
+		err := proto.sendTransactionListQuery(mockConn, []hash.SHA256Hash{hash.FromSlice([]byte("list query"))})
+
+		assert.NoError(t, err)
+		assert.Empty(t, proto.cMan.conversations) // conversation was cancelled
+	})
+
 	performMultipleConversationsTest(t, peer, func(c grpc.Connection, p *protocol, mocks protocolMocks) error {
 		return p.sendTransactionListQuery(c, []hash.SHA256Hash{hash.FromSlice([]byte("list query"))})
 	})