diff --git a/pkg/workloads/manager.go b/pkg/workloads/manager.go
index 7b0187bac7..c509b849ff 100644
--- a/pkg/workloads/manager.go
+++ b/pkg/workloads/manager.go
@@ -45,7 +45,7 @@ type CompletionFunc func() error
 // NOTE: This interface may be split up in future PRs, in particular, operations
 // which are only relevant to the CLI/API use case will be split out.
 //
-//go:generate mockgen -destination=mocks/mock_manager.go -package=mocks -source=manager.go Manager
+//go:generate mockgen -destination=mocks/mock_manager.go -package=mocks github.com/stacklok/toolhive/pkg/workloads Manager
 type Manager interface {
 	// GetWorkload retrieves details of the named workload including its status.
 	GetWorkload(ctx context.Context, workloadName string) (core.Workload, error)
@@ -97,6 +97,62 @@ type DefaultManager struct {
 	runtime        rt.Runtime
 	statuses       statuses.StatusManager
 	configProvider config.Provider
+
+	// retryConfig holds tunable parameters for the RunWorkload retry loop.
+	// Production callers leave this nil so RunWorkload uses defaultRetryConfig();
+	// tests inject smaller values for fast execution.
+	retryConfig *retryConfig
+
+	// newRunner is the factory used by RunWorkload to construct the per-attempt
+	// runner. Production callers leave this nil so RunWorkload falls back to
+	// runner.NewRunner; tests inject a mock factory.
+	newRunner mcpRunnerFactory
+}
+
+// mcpRunner is the subset of *runner.Runner that RunWorkload's retry loop
+// depends on. It exists only to make the retry loop unit-testable; it is
+// not part of the workloads package's public API.
+type mcpRunner interface {
+	Run(ctx context.Context) error
+}
+
+// mcpRunnerFactory constructs an mcpRunner for one attempt of the retry loop.
+type mcpRunnerFactory func(*runner.RunConfig, statuses.StatusManager) mcpRunner
+
+// retryConfig bundles the RunWorkload retry loop's tunable parameters.
+type retryConfig struct {
+	maxRetries         int
+	initialDelay       time.Duration
+	maxBackoff         time.Duration
+	stableRunThreshold time.Duration
+}
+
+// defaultRetryConfig returns the production retry parameters.
+func defaultRetryConfig() retryConfig {
+	return retryConfig{
+		maxRetries:         10,
+		initialDelay:       5 * time.Second,
+		maxBackoff:         60 * time.Second,
+		stableRunThreshold: 30 * time.Second,
+	}
+}
+
+// retryConfigOrDefault returns the manager's retryConfig if set, otherwise defaults.
+func (d *DefaultManager) retryConfigOrDefault() retryConfig {
+	if d.retryConfig == nil {
+		return defaultRetryConfig()
+	}
+	return *d.retryConfig
+}
+
+// newRunnerOrDefault returns the manager's runner factory if set, otherwise wraps runner.NewRunner.
+func (d *DefaultManager) newRunnerOrDefault() mcpRunnerFactory {
+	if d.newRunner != nil {
+		return d.newRunner
+	}
+	return func(rc *runner.RunConfig, sm statuses.StatusManager) mcpRunner {
+		return runner.NewRunner(rc, sm)
+	}
 }
 
 // ErrWorkloadNotRunning is returned when a container cannot be found by name.
@@ -416,8 +472,10 @@ func (d *DefaultManager) RunWorkload(ctx context.Context, runConfig *runner.RunC
 	}
 
 	// Retry loop with exponential backoff for container restarts
-	maxRetries := 10 // Allow many retries for transient issues
-	retryDelay := 5 * time.Second
+	cfg := d.retryConfigOrDefault()
+	maxRetries := cfg.maxRetries
+	newRunner := d.newRunnerOrDefault()
+	retryDelay := cfg.initialDelay
 
 	for attempt := 1; attempt <= maxRetries; attempt++ {
 		if attempt > 1 {
@@ -426,19 +484,22 @@ func (d *DefaultManager) RunWorkload(ctx context.Context, runConfig *runner.RunC
 
 			// Exponential backoff: 5s, 10s, 20s, 40s, 60s (capped)
 			retryDelay *= 2
-			if retryDelay > 60*time.Second {
-				retryDelay = 60 * time.Second
+			if retryDelay > cfg.maxBackoff {
+				retryDelay = cfg.maxBackoff
 			}
 		}
 
-		mcpRunner := runner.NewRunner(runConfig, d.statuses)
+		mcpRunner := newRunner(runConfig, d.statuses)
+		runStartTime := time.Now()
 		err := mcpRunner.Run(ctx)
+		runDuration := time.Since(runStartTime)
 		if err != nil {
 			// Check if this is a "container exited, restart needed" error
 			if errors.Is(err, runner.ErrContainerExitedRestartNeeded) {
 				slog.Warn("workload exited unexpectedly, restarting",
-					"workload", runConfig.BaseName, "attempt", attempt, "maxRetries", maxRetries)
+					"workload", runConfig.BaseName, "attempt", attempt, "maxRetries", maxRetries,
+					"runDuration", runDuration)
 
 				// Remove from client config so clients notice the restart
 				clientManager, clientErr := client.NewManager(ctx)
@@ -449,17 +510,44 @@ func (d *DefaultManager) RunWorkload(ctx context.Context, runConfig *runner.RunC
 					}
 				}
 
+				// A "stable" run reached at least stableRunThreshold of runtime before
+				// failing — long enough that we treat the next attempt as a fresh
+				// failure rather than another tick of the current retry sequence.
+				stable := runDuration >= cfg.stableRunThreshold
+
 				// Set status to starting (since we're restarting)
+				statusReason := "Container exited, restarting"
+				if stable {
+					statusReason = "Container exited after stable run, restarting"
+				}
 				statusErr := d.statuses.SetWorkloadStatus(
 					ctx,
 					runConfig.BaseName,
 					rt.WorkloadStatusStarting,
-					"Container exited, restarting",
+					statusReason,
 				)
 				if statusErr != nil {
 					slog.Warn("failed to set workload status to starting",
 						"workload", runConfig.BaseName, "error", statusErr)
 				}
 
+				// If the workload ran past the stable threshold, reset the retry
+				// counter. Without this reset, sustained external disturbances (a
+				// flapping container runtime, host sleep/wake) that repeatedly kill a
+				// healthy container exhaust the retry budget across many *successful*
+				// short runs, and the manager eventually gives up on a workload that
+				// would have kept recovering once the disturbance ended.
+				if stable {
+					slog.Debug("resetting retry counter after stable run",
+						"workload", runConfig.BaseName, "duration", runDuration)
+					// Setting attempt to 0 means the for-clause's attempt++ on `continue`
+					// lands at 1, so the `if attempt > 1` sleep block is skipped and the
+					// next run starts immediately. retryDelay is reset so that any later
+					// failures within this fresh sequence use the initial backoff.
+					attempt = 0
+					retryDelay = cfg.initialDelay
+					continue
+				}
+
 				// If we haven't exhausted retries, continue the loop
 				if attempt < maxRetries {
 					continue
diff --git a/pkg/workloads/manager_test.go b/pkg/workloads/manager_test.go
index 5fead5aad4..f252a90f24 100644
--- a/pkg/workloads/manager_test.go
+++ b/pkg/workloads/manager_test.go
@@ -20,6 +20,7 @@ import (
 	runtimeMocks "github.com/stacklok/toolhive/pkg/container/runtime/mocks"
 	"github.com/stacklok/toolhive/pkg/core"
 	"github.com/stacklok/toolhive/pkg/runner"
+	"github.com/stacklok/toolhive/pkg/workloads/statuses"
 	statusMocks "github.com/stacklok/toolhive/pkg/workloads/statuses/mocks"
 )
 
@@ -1158,6 +1159,158 @@ func TestDefaultManager_RunWorkload(t *testing.T) {
 	}
 }
 
+// fakeRunOutcome describes a single attempt's behavior for fakeRunnerFactory.
+type fakeRunOutcome struct {
+	duration time.Duration
+	err      error
+}
+
+// fakeRunner implements mcpRunner for retry-loop tests by sleeping a
+// configured duration before returning a configured error.
+type fakeRunner struct {
+	duration time.Duration
+	err      error
+}
+
+func (f *fakeRunner) Run(ctx context.Context) error {
+	if f.duration > 0 {
+		select {
+		case <-time.After(f.duration):
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	return f.err
+}
+
+// fakeRunnerFactory replays a sequence of outcomes across successive calls.
+// Calls beyond the configured sequence return ErrContainerExitedRestartNeeded
+// with no delay so the test fails fast if the loop runs longer than expected.
+type fakeRunnerFactory struct {
+	outcomes  []fakeRunOutcome
+	callCount int
+}
+
+func (f *fakeRunnerFactory) factory() mcpRunnerFactory {
+	return func(_ *runner.RunConfig, _ statuses.StatusManager) mcpRunner {
+		idx := f.callCount
+		f.callCount++
+		if idx >= len(f.outcomes) {
+			return &fakeRunner{err: runner.ErrContainerExitedRestartNeeded}
+		}
+		return &fakeRunner{duration: f.outcomes[idx].duration, err: f.outcomes[idx].err}
+	}
+}
+
+func TestDefaultManager_RunWorkload_RetryCounterReset(t *testing.T) {
+	t.Parallel()
+
+	// Test thresholds: short enough to keep the test fast, but with a wide
+	// margin between "short" and "stable" run durations so scheduler jitter
+	// on a busy CI host does not flip a "short" run to "stable". Production
+	// thresholds are an order of magnitude larger.
+	testCfg := &retryConfig{
+		maxRetries:         3,
+		initialDelay:       1 * time.Millisecond,
+		maxBackoff:         5 * time.Millisecond,
+		stableRunThreshold: 50 * time.Millisecond,
+	}
+
+	short := fakeRunOutcome{duration: 5 * time.Millisecond, err: runner.ErrContainerExitedRestartNeeded}
+	stable := fakeRunOutcome{duration: 200 * time.Millisecond, err: runner.ErrContainerExitedRestartNeeded}
+	atThreshold := fakeRunOutcome{duration: 50 * time.Millisecond, err: runner.ErrContainerExitedRestartNeeded}
+
+	tests := []struct {
+		name             string
+		outcomes         []fakeRunOutcome
+		expectError      bool
+		errorContains    string
+		expectedAttempts int
+	}{
+		{
+			name:             "all short runs exhaust the retry budget",
+			outcomes:         []fakeRunOutcome{short, short, short},
+			expectError:      true,
+			errorContains:    "container restart failed after 3 attempts",
+			expectedAttempts: 3,
+		},
+		{
+			name:             "single stable run at start resets counter, extends budget by one cycle",
+			outcomes:         []fakeRunOutcome{stable, short, short, short},
+			expectError:      true,
+			errorContains:    "container restart failed after 3 attempts",
+			expectedAttempts: 4,
+		},
+		{
+			name:             "stable run mid-sequence resets counter, granting a full fresh budget",
+			outcomes:         []fakeRunOutcome{short, stable, short, short, short},
+			expectError:      true,
+			errorContains:    "container restart failed after 3 attempts",
+			expectedAttempts: 5,
+		},
+		{
+			name:             "run exactly at threshold counts as stable and resets counter",
+			outcomes:         []fakeRunOutcome{atThreshold, short, short, short},
+			expectError:      true,
+			errorContains:    "container restart failed after 3 attempts",
+			expectedAttempts: 4,
+		},
+		{
+			name:             "stable run followed by clean shutdown returns success",
+			outcomes:         []fakeRunOutcome{stable, {duration: 1 * time.Millisecond, err: nil}},
+			expectError:      false,
+			expectedAttempts: 2,
+		},
+		{
+			name:             "non-restart error bails immediately without using the retry budget",
+			outcomes:         []fakeRunOutcome{{duration: 1 * time.Millisecond, err: errors.New("fatal config error")}},
+			expectError:      true,
+			errorContains:    "fatal config error",
+			expectedAttempts: 1,
+		},
+		{
+			name:             "successful run returns without retrying",
+			outcomes:         []fakeRunOutcome{{duration: 1 * time.Millisecond, err: nil}},
+			expectError:      false,
+			expectedAttempts: 1,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+
+			ctrl := gomock.NewController(t)
+			defer ctrl.Finish()
+
+			mockStatusMgr := statusMocks.NewMockStatusManager(ctrl)
+			mockStatusMgr.EXPECT().
+				SetWorkloadStatus(gomock.Any(), "retry-test", gomock.Any(), gomock.Any()).
+				Return(nil).
+				AnyTimes()
+
+			factory := &fakeRunnerFactory{outcomes: tt.outcomes}
+			manager := &DefaultManager{
+				statuses:    mockStatusMgr,
+				retryConfig: testCfg,
+				newRunner:   factory.factory(),
+			}
+
+			err := manager.RunWorkload(context.Background(), &runner.RunConfig{BaseName: "retry-test"})
+
+			if tt.expectError {
+				require.Error(t, err)
+				if tt.errorContains != "" {
+					assert.Contains(t, err.Error(), tt.errorContains)
+				}
+			} else {
+				require.NoError(t, err)
+			}
+			assert.Equal(t, tt.expectedAttempts, factory.callCount, "unexpected number of run attempts")
+		})
+	}
+}
+
 func TestDefaultManager_validateSecretParameters(t *testing.T) {
 	t.Parallel()
 
diff --git a/pkg/workloads/mocks/mock_manager.go b/pkg/workloads/mocks/mock_manager.go
index 7480792951..f0e8c005b7 100644
--- a/pkg/workloads/mocks/mock_manager.go
+++ b/pkg/workloads/mocks/mock_manager.go
@@ -1,9 +1,9 @@
 // Code generated by MockGen. DO NOT EDIT.
-// Source: manager.go
+// Source: github.com/stacklok/toolhive/pkg/workloads (interfaces: Manager)
 //
 // Generated by this command:
 //
-//	mockgen -destination=mocks/mock_manager.go -package=mocks -source=manager.go Manager
+//	mockgen -destination=mocks/mock_manager.go -package=mocks github.com/stacklok/toolhive/pkg/workloads Manager
 //
 
 // Package mocks is a generated GoMock package.
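
The reset in the last manager.go hunk leans on a Go for-loop subtlety: `continue` still executes the for-clause's post statement (`attempt++`), so assigning `attempt = 0` lands the next iteration at 1 and skips the backoff sleep. The minimal standalone sketch below illustrates that interaction together with the capped doubling of the delay; it is illustrative only, with hypothetical names and values, not code from this PR.

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		maxRetries   = 3
		initialDelay = 5 * time.Second
		maxBackoff   = 60 * time.Second
	)
	delay := initialDelay
	resetOnce := true // pretend the run before attempt 2's retry was "stable"

	for attempt := 1; attempt <= maxRetries; attempt++ {
		if attempt > 1 {
			// The production loop sleeps for `delay` here; the sketch only
			// prints the would-be backoff, then doubles it with a cap.
			fmt.Printf("attempt %d: back off %v before retrying\n", attempt, delay)
			delay *= 2
			if delay > maxBackoff {
				delay = maxBackoff
			}
		} else {
			fmt.Printf("attempt %d: run immediately\n", attempt)
		}

		if resetOnce && attempt == 2 {
			resetOnce = false
			// continue still runs the for-clause's attempt++, so the next
			// iteration enters with attempt == 1 and skips the backoff branch.
			attempt = 0
			delay = initialDelay
			continue
		}
	}
	// Printed sequence: 1 immediate, 2 backs off 5s, reset, 1 immediate,
	// 2 backs off 5s, 3 backs off 10s.
}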