Skip to content

Commit 5bb679d

Browse files
committed
perf(replication): optimize gossip tx replication paths and stabilize async lifecycle
Batch non-boundary ops in ReplicateRequest buffers, add a sync direct-stream send/recv fast path, and make async send non-blocking, while fixing the shutdown/context races that caused stale peer writes and queue-close replication failures.

Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
1 parent 7123a7f commit 5bb679d

22 files changed

Lines changed: 881 additions & 594 deletions

internal/badgerd/replication/async/queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ func (q *Queue[Req, Res]) run() {
177177
return
178178
}
179179
msg.ch <- &Result[Res]{}
180+
closeSafe(q.close)
181+
return
180182
case <-q.close:
181183
return
182184
}

internal/badgerd/replication/gossip/gossip.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func New(ctx context.Context, db replication.DB, opts ...replication.Option) (re
231231
return nil, err
232232
}
233233

234-
r.run(ctx)
234+
r.run(r.ctx)
235235
<-r.ready
236236
log.Infof("init replication")
237237
if err := r.init(ctx); err != nil {
@@ -277,9 +277,20 @@ func (r *Gossip) run(ctx context.Context) {
277277
go r.handleEvents(ctx)
278278
go func() {
279279
log.Infof("waiting for leader or version to converge")
280-
<-r.converged
280+
select {
281+
case <-r.converged:
282+
case <-ctx.Done():
283+
return
284+
}
281285
log.Infof("leader or version converged")
282-
time.Sleep(time.Duration(rand.Intn(1000))*time.Millisecond + 100*time.Millisecond)
286+
select {
287+
case <-time.After(time.Duration(rand.Intn(1000))*time.Millisecond + 100*time.Millisecond):
288+
case <-ctx.Done():
289+
return
290+
}
291+
if ctx.Err() != nil {
292+
return
293+
}
283294
r.Elect(ctx)
284295
}()
285296
}

internal/badgerd/replication/gossip/nodes.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,17 @@ type node struct {
4747

4848
func (r *Gossip) handleEvents(ctx context.Context) {
4949
for e := range r.events {
50-
go r.handleEvent(ctx, e)
50+
if ctx.Err() != nil {
51+
return
52+
}
53+
r.handleEvent(ctx, e)
5154
}
5255
}
5356

5457
func (r *Gossip) handleEvent(ctx context.Context, e memberlist.NodeEvent) {
58+
if ctx.Err() != nil {
59+
return
60+
}
5561
log := logger.C(ctx).WithField("component", "replication")
5662
r.mu.Lock()
5763
defer r.mu.Unlock()

internal/badgerd/replication/gossip/pb/gossip.pb.defaults.go

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/badgerd/replication/gossip/pb/gossip.pb.fields.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)