From d513fd0bfe765aec718c70620258f1a0e13667c9 Mon Sep 17 00:00:00 2001 From: Greg Katz Date: Sun, 3 May 2026 15:10:49 -0700 Subject: [PATCH] Reset RunWorkload retry counter after stable run When the Docker daemon is repeatedly unresponsive, the RunWorkload retry loop's 10-attempt budget can be exhausted by short-lived but successful starts: the container exits, the manager retries, the new container runs healthily for tens of seconds before the next disruption kills it, and the cycle repeats. After 10 cycles the manager logs "failed to restart after max attempts, giving up" and the workload stays offline until manually restarted, even though the original cause was transient. Track the duration of each runner.Run call. If the workload ran past a stable threshold before failing, treat the next attempt as a fresh failure rather than a continuation of the current retry sequence: reset the attempt counter and backoff delay. This preserves the budget's role as a bound on tight crash loops while allowing recovery from sustained external disturbances. Add a small testability seam (runner factory on DefaultManager, defaulting to runner.NewRunner) so the retry loop has unit-test coverage of the boundary cases. Fixes #5171 Co-authored-by: Claude Opus 4.7 Signed-off-by: Greg Katz --- pkg/workloads/manager.go | 104 +++++++++++++++++-- pkg/workloads/manager_test.go | 153 ++++++++++++++++++++++++++++ pkg/workloads/mocks/mock_manager.go | 4 +- 3 files changed, 251 insertions(+), 10 deletions(-) diff --git a/pkg/workloads/manager.go b/pkg/workloads/manager.go index 7b0187bac7..c509b849ff 100644 --- a/pkg/workloads/manager.go +++ b/pkg/workloads/manager.go @@ -45,7 +45,7 @@ type CompletionFunc func() error // NOTE: This interface may be split up in future PRs, in particular, operations // which are only relevant to the CLI/API use case will be split out. // -//go:generate mockgen -destination=mocks/mock_manager.go -package=mocks -source=manager.go Manager +//go:generate mockgen -destination=mocks/mock_manager.go -package=mocks github.com/stacklok/toolhive/pkg/workloads Manager type Manager interface { // GetWorkload retrieves details of the named workload including its status. GetWorkload(ctx context.Context, workloadName string) (core.Workload, error) @@ -97,6 +97,62 @@ type DefaultManager struct { runtime rt.Runtime statuses statuses.StatusManager configProvider config.Provider + + // retryConfig holds tunable parameters for the RunWorkload retry loop. + // Production callers leave this nil so RunWorkload uses defaultRetryConfig(); + // tests inject smaller values for fast execution. + retryConfig *retryConfig + + // newRunner is the factory used by RunWorkload to construct the per-attempt + // runner. Production callers leave this nil so RunWorkload falls back to + // runner.NewRunner; tests inject a mock factory. + newRunner mcpRunnerFactory +} + +// mcpRunner is the subset of *runner.Runner that RunWorkload's retry loop +// depends on. It exists only to make the retry loop unit-testable; it is +// not part of the workloads package's public API. +type mcpRunner interface { + Run(ctx context.Context) error +} + +// mcpRunnerFactory constructs an mcpRunner for one attempt of the retry loop. +type mcpRunnerFactory func(*runner.RunConfig, statuses.StatusManager) mcpRunner + +// retryConfig bundles the RunWorkload retry loop's tunable parameters. +type retryConfig struct { + maxRetries int + initialDelay time.Duration + maxBackoff time.Duration + stableRunThreshold time.Duration +} + +// defaultRetryConfig returns the production retry parameters. +func defaultRetryConfig() retryConfig { + return retryConfig{ + maxRetries: 10, + initialDelay: 5 * time.Second, + maxBackoff: 60 * time.Second, + stableRunThreshold: 30 * time.Second, + } +} + +// retryConfigOrDefault returns the manager's retryConfig if set, otherwise defaults. +func (d *DefaultManager) retryConfigOrDefault() retryConfig { + if d.retryConfig == nil { + return defaultRetryConfig() + } + return *d.retryConfig +} + +// newRunnerOrDefault returns the manager's runner factory if set, otherwise wraps runner.NewRunner. +func (d *DefaultManager) newRunnerOrDefault() mcpRunnerFactory { + if d.newRunner != nil { + return d.newRunner + } + return func(rc *runner.RunConfig, sm statuses.StatusManager) mcpRunner { + return runner.NewRunner(rc, sm) + } } // ErrWorkloadNotRunning is returned when a container cannot be found by name. @@ -416,8 +472,10 @@ func (d *DefaultManager) RunWorkload(ctx context.Context, runConfig *runner.RunC } // Retry loop with exponential backoff for container restarts - maxRetries := 10 // Allow many retries for transient issues - retryDelay := 5 * time.Second + cfg := d.retryConfigOrDefault() + maxRetries := cfg.maxRetries + newRunner := d.newRunnerOrDefault() + retryDelay := cfg.initialDelay for attempt := 1; attempt <= maxRetries; attempt++ { if attempt > 1 { @@ -426,19 +484,22 @@ func (d *DefaultManager) RunWorkload(ctx context.Context, runConfig *runner.RunC // Exponential backoff: 5s, 10s, 20s, 40s, 60s (capped) retryDelay *= 2 - if retryDelay > 60*time.Second { - retryDelay = 60 * time.Second + if retryDelay > cfg.maxBackoff { + retryDelay = cfg.maxBackoff } } - mcpRunner := runner.NewRunner(runConfig, d.statuses) + mcpRunner := newRunner(runConfig, d.statuses) + runStartTime := time.Now() err := mcpRunner.Run(ctx) + runDuration := time.Since(runStartTime) if err != nil { // Check if this is a "container exited, restart needed" error if errors.Is(err, runner.ErrContainerExitedRestartNeeded) { slog.Warn("workload exited unexpectedly, restarting", - "workload", runConfig.BaseName, "attempt", attempt, "maxRetries", maxRetries) + "workload", runConfig.BaseName, "attempt", attempt, "maxRetries", maxRetries, + "runDuration", runDuration) // Remove from client config so clients notice the restart clientManager, clientErr := client.NewManager(ctx) @@ -449,17 +510,44 @@ func (d *DefaultManager) RunWorkload(ctx context.Context, runConfig *runner.RunC } } + // A "stable" run reached at least stableRunThreshold of runtime before + // failing — long enough that we treat the next attempt as a fresh + // failure rather than another tick of the current retry sequence. + stable := runDuration >= cfg.stableRunThreshold + // Set status to starting (since we're restarting) + statusReason := "Container exited, restarting" + if stable { + statusReason = "Container exited after stable run, restarting" + } statusErr := d.statuses.SetWorkloadStatus( ctx, runConfig.BaseName, rt.WorkloadStatusStarting, - "Container exited, restarting", + statusReason, ) if statusErr != nil { slog.Warn("failed to set workload status to starting", "workload", runConfig.BaseName, "error", statusErr) } + // If the workload ran past the stable threshold, reset the retry + // counter. Without this reset, sustained external disturbances (a + // flapping container runtime, host sleep/wake) that repeatedly kill a + // healthy container exhaust the retry budget across many *successful* + // short runs, and the manager eventually gives up on a workload that + // would have kept recovering once the disturbance ended. + if stable { + slog.Debug("resetting retry counter after stable run", + "workload", runConfig.BaseName, "duration", runDuration) + // Setting attempt to 0 means the for-clause's attempt++ on `continue` + // lands at 1, so the `if attempt > 1` sleep block is skipped and the + // next run starts immediately. retryDelay is reset so that any later + // failures within this fresh sequence use the initial backoff. + attempt = 0 + retryDelay = cfg.initialDelay + continue + } + // If we haven't exhausted retries, continue the loop if attempt < maxRetries { continue diff --git a/pkg/workloads/manager_test.go b/pkg/workloads/manager_test.go index 5fead5aad4..f252a90f24 100644 --- a/pkg/workloads/manager_test.go +++ b/pkg/workloads/manager_test.go @@ -20,6 +20,7 @@ import ( runtimeMocks "github.com/stacklok/toolhive/pkg/container/runtime/mocks" "github.com/stacklok/toolhive/pkg/core" "github.com/stacklok/toolhive/pkg/runner" + "github.com/stacklok/toolhive/pkg/workloads/statuses" statusMocks "github.com/stacklok/toolhive/pkg/workloads/statuses/mocks" ) @@ -1158,6 +1159,158 @@ func TestDefaultManager_RunWorkload(t *testing.T) { } } +// fakeRunOutcome describes a single attempt's behavior for fakeRunnerFactory. +type fakeRunOutcome struct { + duration time.Duration + err error +} + +// fakeRunner implements mcpRunner for retry-loop tests by sleeping a +// configured duration before returning a configured error. +type fakeRunner struct { + duration time.Duration + err error +} + +func (f *fakeRunner) Run(ctx context.Context) error { + if f.duration > 0 { + select { + case <-time.After(f.duration): + case <-ctx.Done(): + return ctx.Err() + } + } + return f.err +} + +// fakeRunnerFactory replays a sequence of outcomes across successive calls. +// Calls beyond the configured sequence return ErrContainerExitedRestartNeeded +// with no delay so the test fails fast if the loop runs longer than expected. +type fakeRunnerFactory struct { + outcomes []fakeRunOutcome + callCount int +} + +func (f *fakeRunnerFactory) factory() mcpRunnerFactory { + return func(_ *runner.RunConfig, _ statuses.StatusManager) mcpRunner { + idx := f.callCount + f.callCount++ + if idx >= len(f.outcomes) { + return &fakeRunner{err: runner.ErrContainerExitedRestartNeeded} + } + return &fakeRunner{duration: f.outcomes[idx].duration, err: f.outcomes[idx].err} + } +} + +func TestDefaultManager_RunWorkload_RetryCounterReset(t *testing.T) { + t.Parallel() + + // Test thresholds: short enough to keep the test fast, but with a wide + // margin between "short" and "stable" run durations so scheduler jitter + // on a busy CI host does not flip a "short" run to "stable". Production + // thresholds are an order of magnitude larger. + testCfg := &retryConfig{ + maxRetries: 3, + initialDelay: 1 * time.Millisecond, + maxBackoff: 5 * time.Millisecond, + stableRunThreshold: 50 * time.Millisecond, + } + + short := fakeRunOutcome{duration: 5 * time.Millisecond, err: runner.ErrContainerExitedRestartNeeded} + stable := fakeRunOutcome{duration: 200 * time.Millisecond, err: runner.ErrContainerExitedRestartNeeded} + atThreshold := fakeRunOutcome{duration: 50 * time.Millisecond, err: runner.ErrContainerExitedRestartNeeded} + + tests := []struct { + name string + outcomes []fakeRunOutcome + expectError bool + errorContains string + expectedAttempts int + }{ + { + name: "all short runs exhaust the retry budget", + outcomes: []fakeRunOutcome{short, short, short}, + expectError: true, + errorContains: "container restart failed after 3 attempts", + expectedAttempts: 3, + }, + { + name: "single stable run at start resets counter, extends budget by one cycle", + outcomes: []fakeRunOutcome{stable, short, short, short}, + expectError: true, + errorContains: "container restart failed after 3 attempts", + expectedAttempts: 4, + }, + { + name: "stable run mid-sequence resets counter, granting a full fresh budget", + outcomes: []fakeRunOutcome{short, stable, short, short, short}, + expectError: true, + errorContains: "container restart failed after 3 attempts", + expectedAttempts: 5, + }, + { + name: "run exactly at threshold counts as stable and resets counter", + outcomes: []fakeRunOutcome{atThreshold, short, short, short}, + expectError: true, + errorContains: "container restart failed after 3 attempts", + expectedAttempts: 4, + }, + { + name: "stable run followed by clean shutdown returns success", + outcomes: []fakeRunOutcome{stable, {duration: 1 * time.Millisecond, err: nil}}, + expectError: false, + expectedAttempts: 2, + }, + { + name: "non-restart error bails immediately without using the retry budget", + outcomes: []fakeRunOutcome{{duration: 1 * time.Millisecond, err: errors.New("fatal config error")}}, + expectError: true, + errorContains: "fatal config error", + expectedAttempts: 1, + }, + { + name: "successful run returns without retrying", + outcomes: []fakeRunOutcome{{duration: 1 * time.Millisecond, err: nil}}, + expectError: false, + expectedAttempts: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockStatusMgr := statusMocks.NewMockStatusManager(ctrl) + mockStatusMgr.EXPECT(). + SetWorkloadStatus(gomock.Any(), "retry-test", gomock.Any(), gomock.Any()). + Return(nil). + AnyTimes() + + factory := &fakeRunnerFactory{outcomes: tt.outcomes} + manager := &DefaultManager{ + statuses: mockStatusMgr, + retryConfig: testCfg, + newRunner: factory.factory(), + } + + err := manager.RunWorkload(context.Background(), &runner.RunConfig{BaseName: "retry-test"}) + + if tt.expectError { + require.Error(t, err) + if tt.errorContains != "" { + assert.Contains(t, err.Error(), tt.errorContains) + } + } else { + require.NoError(t, err) + } + assert.Equal(t, tt.expectedAttempts, factory.callCount, "unexpected number of run attempts") + }) + } +} + func TestDefaultManager_validateSecretParameters(t *testing.T) { t.Parallel() diff --git a/pkg/workloads/mocks/mock_manager.go b/pkg/workloads/mocks/mock_manager.go index 7480792951..f0e8c005b7 100644 --- a/pkg/workloads/mocks/mock_manager.go +++ b/pkg/workloads/mocks/mock_manager.go @@ -1,9 +1,9 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: manager.go +// Source: github.com/stacklok/toolhive/pkg/workloads (interfaces: Manager) // // Generated by this command: // -// mockgen -destination=mocks/mock_manager.go -package=mocks -source=manager.go Manager +// mockgen -destination=mocks/mock_manager.go -package=mocks github.com/stacklok/toolhive/pkg/workloads Manager // // Package mocks is a generated GoMock package.