Skip to content

Commit f855a82

Browse files
committed
Make queue maintainer start cancellable on leadership change
1 parent a4e03c6 commit f855a82

File tree

1 file changed

+60
-21
lines changed

1 file changed

+60
-21
lines changed

client.go

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,26 +1284,6 @@ func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, starte
12841284
}
12851285

12861286
func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStart bool, started, stopped func()) error {
1287-
handleLeadershipChange := func(ctx context.Context, notification *leadership.Notification) {
1288-
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received",
1289-
slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader))
1290-
1291-
switch {
1292-
case notification.IsLeader:
1293-
// Starting the queue maintainer can take a little time so send to
1294-
// this test signal _first_ so tests waiting on it can finish,
1295-
// cancel the queue maintainer start, and overall run much faster.
1296-
c.testSignals.electedLeader.Signal(struct{}{})
1297-
1298-
// Start the queue maintainer with a few retries and exponential
1299-
// backoff. In case of total failure, request resignation.
1300-
c.tryStartQueueMaintainer(ctx)
1301-
1302-
default:
1303-
c.queueMaintainer.Stop()
1304-
}
1305-
}
1306-
13071287
if !shouldStart {
13081288
return nil
13091289
}
@@ -1315,13 +1295,48 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar
13151295
sub := c.elector.Listen()
13161296
defer sub.Unlisten()
13171297

1298+
// Cancel function for an in-progress tryStartQueueMaintainer. If
1299+
// leadership is lost while the start process is still retrying, this
1300+
// is used to abort it promptly instead of waiting for retries to
1301+
// finish.
1302+
var cancelQueueMaintainerStart context.CancelCauseFunc = func(_ error) {}
1303+
13181304
for {
13191305
select {
13201306
case <-ctx.Done():
1307+
cancelQueueMaintainerStart(context.Cause(ctx))
13211308
return
13221309

13231310
case notification := <-sub.C():
1324-
handleLeadershipChange(ctx, notification)
1311+
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received",
1312+
slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader))
1313+
1314+
switch {
1315+
case notification.IsLeader:
1316+
// Starting the queue maintainer can take a little time, so we
1317+
// send to this test signal first so tests waiting on it
1318+
// can finish, cancel the queue maintainer start, and
1319+
// overall run much faster.
1320+
c.testSignals.electedLeader.Signal(struct{}{})
1321+
1322+
// Start the queue maintainer with a few retries and
1323+
// exponential backoff in a separate goroutine so the
1324+
// leadership change loop remains responsive to new
1325+
// notifications. startCtx is used for cancellation in case
1326+
// leadership is lost while retries are in progress.
1327+
var startCtx context.Context
1328+
startCtx, cancelQueueMaintainerStart = context.WithCancelCause(ctx)
1329+
go c.tryStartQueueMaintainer(startCtx)
1330+
1331+
default:
1332+
// Cancel any in-progress start attempts before stopping. We
1333+
// send startstop.ErrStop to make sure services like
1334+
// Reindexer run any specific cleanup code for stops.
1335+
cancelQueueMaintainerStart(startstop.ErrStop)
1336+
cancelQueueMaintainerStart = func(_ error) {}
1337+
1338+
c.queueMaintainer.Stop()
1339+
}
13251340
}
13261341
}
13271342
}()
@@ -1336,8 +1351,20 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar
13361351
func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) {
13371352
const maxStartAttempts = 3
13381353

1354+
ctxCancelled := func() bool {
1355+
if ctx.Err() != nil {
1356+
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Queue maintainer start cancelled")
1357+
return true
1358+
}
1359+
return false
1360+
}
1361+
13391362
var lastErr error
13401363
for attempt := 1; attempt <= maxStartAttempts; attempt++ {
1364+
if ctxCancelled() {
1365+
return
1366+
}
1367+
13411368
if lastErr = c.queueMaintainer.Start(ctx); lastErr == nil {
13421369
return
13431370
}
@@ -1347,6 +1374,14 @@ func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) {
13471374

13481375
c.testSignals.queueMaintainerStartError.Signal(lastErr)
13491376

1377+
// If Start blocked long enough for this context to be cancelled
1378+
// (e.g. leadership was lost), bail out immediately. A newer
1379+
// leadership term may already have started the maintainer, and
1380+
// calling Stop here would tear it down.
1381+
if ctxCancelled() {
1382+
return
1383+
}
1384+
13501385
// Stop the queue maintainer to fully reset its state (and any
13511386
// sub-services) before retrying.
13521387
c.queueMaintainer.Stop()
@@ -1356,6 +1391,10 @@ func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) {
13561391
}
13571392
}
13581393

1394+
if ctxCancelled() {
1395+
return
1396+
}
1397+
13591398
c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Queue maintainer failed to start after all attempts, requesting leader resignation",
13601399
slog.String("err", lastErr.Error()))
13611400

0 commit comments

Comments
 (0)