Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 49 additions & 11 deletions network/transport/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,24 @@ type Config struct {
GossipInterval int `koanf:"gossipinterval"`
// DiagnosticsInterval specifies how often (in milliseconds) the node should broadcast its diagnostics message.
DiagnosticsInterval int `koanf:"diagnosticsinterval"`
// TransactionListQueryRate limits the rate (per second) at which outgoing TransactionListQuery and
// TransactionRangeQuery messages are sent. Limiting this reduces BBolt write lock contention during
// peer synchronization at startup. See https://github.com/nuts-foundation/nuts-node/issues/4162
TransactionListQueryRate int `koanf:"transactionlistqueryrate"`
}

const defaultPayloadRetryDelay = 5 * time.Second
const defaultGossipInterval = 5000
const defaultDiagnosticsInterval = 5000
const defaultTransactionListQueryRate = 2

// DefaultConfig returns the default config for protocol v2.
// Values: payload retry every 5s, gossip and diagnostics broadcast every 5000ms,
// and at most 2 outgoing TransactionList/TransactionRange queries per second
// (see https://github.com/nuts-foundation/nuts-node/issues/4162).
func DefaultConfig() Config {
	return Config{
		PayloadRetryDelay:        defaultPayloadRetryDelay,
		GossipInterval:           defaultGossipInterval,
		DiagnosticsInterval:      defaultDiagnosticsInterval,
		TransactionListQueryRate: defaultTransactionListQueryRate,
	}
}

Expand All @@ -79,16 +85,25 @@ func New(
diagnosticsProvider func() transport.Diagnostics,
dagStore stoabs.KVStore,
) transport.Protocol {
if config.TransactionListQueryRate <= 0 {
config.TransactionListQueryRate = defaultTransactionListQueryRate
}
// Pre-fill the token bucket to capacity so queries are allowed immediately after startup.
tokens := make(chan struct{}, config.TransactionListQueryRate)
for range config.TransactionListQueryRate {
tokens <- struct{}{}
}
ctx, cancel := context.WithCancel(context.Background())
p := &protocol{
cancel: cancel,
config: config,
ctx: ctx,
state: state,
nodeDID: nodeDID,
decrypter: decrypter,
docResolver: docResolver,
dagStore: dagStore,
cancel: cancel,
config: config,
ctx: ctx,
state: state,
nodeDID: nodeDID,
decrypter: decrypter,
docResolver: docResolver,
dagStore: dagStore,
transactionListQueryTokens: tokens,
}
p.sender = p
p.diagnosticsMan = newPeerDiagnosticsManager(diagnosticsProvider, p.sender.broadcastDiagnostics)
Expand All @@ -113,6 +128,9 @@ type protocol struct {
sender messageSender
listHandler *transactionListHandler
dagStore stoabs.KVStore
// transactionListQueryTokens is a token bucket that rate-limits outgoing TransactionListQuery and
// TransactionRangeQuery messages. See https://github.com/nuts-foundation/nuts-node/issues/4162
transactionListQueryTokens chan struct{}
}

func (p *protocol) CreateClientStream(outgoingContext context.Context, grpcConn grpcLib.ClientConnInterface) (grpcLib.ClientStream, error) {
Expand Down Expand Up @@ -208,6 +226,26 @@ func (p *protocol) Start() (err error) {
p.listHandler.start()
}(p.routines)

// Fill the token bucket at the configured rate. Tokens that don't fit (bucket full) are discarded.
// See https://github.com/nuts-foundation/nuts-node/issues/4162
p.routines.Add(1)
go func(w *sync.WaitGroup) {
defer w.Done()
ticker := time.NewTicker(time.Second / time.Duration(p.config.TransactionListQueryRate))
defer ticker.Stop()
for {
select {
case <-p.ctx.Done():
return
case <-ticker.C:
select {
case p.transactionListQueryTokens <- struct{}{}:
default: // bucket full, discard token
}
}
}
}(p.routines)

return
}

Expand Down
28 changes: 28 additions & 0 deletions network/transport/v2/senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func (p *protocol) sendTransactionListQuery(connection grpc.Connection, refs []h
return nil
}

// Rate-limit outgoing queries to avoid flooding BBolt with write operations when many peers respond
// simultaneously. See https://github.com/nuts-foundation/nuts-node/issues/4162
if !p.consumeTransactionListQueryToken(connection) {
p.cMan.done(conversation.conversationID)
return nil
}

log.Logger().
WithFields(connection.Peer().ToFields()).
WithField(core.LogFieldConversationID, conversation.conversationID.String()).
Expand Down Expand Up @@ -126,6 +133,13 @@ func (p *protocol) sendTransactionRangeQuery(connection grpc.Connection, lcStart
return nil
}

// Rate-limit outgoing queries to avoid flooding BBolt with write operations when many peers respond
// simultaneously. See https://github.com/nuts-foundation/nuts-node/issues/4162
if !p.consumeTransactionListQueryToken(connection) {
p.cMan.done(conversation.conversationID)
return nil
}

log.Logger().
WithFields(connection.Peer().ToFields()).
WithField(core.LogFieldConversationID, conversation.conversationID.String()).
Expand All @@ -134,6 +148,20 @@ func (p *protocol) sendTransactionRangeQuery(connection grpc.Connection, lcStart
return connection.Send(p, &Envelope{Message: msg}, false)
}

// consumeTransactionListQueryToken tries to consume a token from the rate-limiting bucket.
// It never blocks: when a token is available it is taken and true is returned (the caller
// may proceed); when the bucket is empty the rate limit is exceeded, a debug line is logged
// with the peer's fields, and false is returned.
func (p *protocol) consumeTransactionListQueryToken(connection grpc.Connection) bool {
	// Non-blocking receive: fall through when no token is available.
	select {
	case <-p.transactionListQueryTokens:
		return true
	default:
	}
	log.Logger().
		WithFields(connection.Peer().ToFields()).
		Debug("Transaction list/range query rate limit reached, dropping query")
	return false
}

// chunkTransactionList splits a large set of transactions into smaller sets. Each set adheres to the maximum message size.
func chunkTransactionList(transactions []*Transaction) [][]*Transaction {
chunked := make([][]*Transaction, 0)
Expand Down
17 changes: 17 additions & 0 deletions network/transport/v2/senders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,23 @@ func TestProtocol_sendTransactionListQuery(t *testing.T) {
assert.NotNil(t, actualEnvelope.GetTransactionListQuery().GetConversationID())
})

t.Run("rate limit reached - query is dropped", func(t *testing.T) {
proto, mocks := newTestProtocol(t, nil)
// Drain all tokens from the bucket (none available = rate limit exceeded).
for range proto.config.TransactionListQueryRate {
<-proto.transactionListQueryTokens
}

mockConn := grpc.NewMockConnection(mocks.Controller)
mockConn.EXPECT().Peer().AnyTimes().Return(transport.Peer{ID: "peer-1"})
// No Send expected: query must be dropped.

err := proto.sendTransactionListQuery(mockConn, []hash.SHA256Hash{hash.FromSlice([]byte("list query"))})

assert.NoError(t, err)
assert.Empty(t, proto.cMan.conversations) // conversation was cancelled
})

performMultipleConversationsTest(t, peer, func(c grpc.Connection, p *protocol, mocks protocolMocks) error {
return p.sendTransactionListQuery(c, []hash.SHA256Hash{hash.FromSlice([]byte("list query"))})
})
Expand Down
Loading