104 changes: 96 additions & 8 deletions pkg/workloads/manager.go
@@ -45,7 +45,7 @@ type CompletionFunc func() error
// NOTE: This interface may be split up in future PRs; in particular, operations
// which are only relevant to the CLI/API use case will be split out.
//
//go:generate mockgen -destination=mocks/mock_manager.go -package=mocks -source=manager.go Manager
//go:generate mockgen -destination=mocks/mock_manager.go -package=mocks github.com/stacklok/toolhive/pkg/workloads Manager
type Manager interface {
// GetWorkload retrieves details of the named workload including its status.
GetWorkload(ctx context.Context, workloadName string) (core.Workload, error)
@@ -97,6 +97,62 @@ type DefaultManager struct {
runtime rt.Runtime
statuses statuses.StatusManager
configProvider config.Provider

// retryConfig holds tunable parameters for the RunWorkload retry loop.
// Production callers leave this nil so RunWorkload uses defaultRetryConfig();
// tests inject smaller values for fast execution.
retryConfig *retryConfig

// newRunner is the factory used by RunWorkload to construct the per-attempt
// runner. Production callers leave this nil so RunWorkload falls back to
// runner.NewRunner; tests inject a mock factory.
newRunner mcpRunnerFactory
}

// mcpRunner is the subset of *runner.Runner that RunWorkload's retry loop
// depends on. It exists only to make the retry loop unit-testable; it is
// not part of the workloads package's public API.
type mcpRunner interface {
Run(ctx context.Context) error
}

// mcpRunnerFactory constructs an mcpRunner for one attempt of the retry loop.
type mcpRunnerFactory func(*runner.RunConfig, statuses.StatusManager) mcpRunner

// retryConfig bundles the RunWorkload retry loop's tunable parameters.
type retryConfig struct {
maxRetries int
initialDelay time.Duration
maxBackoff time.Duration
stableRunThreshold time.Duration
}

// defaultRetryConfig returns the production retry parameters.
func defaultRetryConfig() retryConfig {
return retryConfig{
maxRetries: 10,
initialDelay: 5 * time.Second,
maxBackoff: 60 * time.Second,
stableRunThreshold: 30 * time.Second,
}
}

// retryConfigOrDefault returns the manager's retryConfig if set, otherwise defaults.
func (d *DefaultManager) retryConfigOrDefault() retryConfig {
if d.retryConfig == nil {
return defaultRetryConfig()
}
return *d.retryConfig
}

// newRunnerOrDefault returns the manager's runner factory if set, otherwise wraps runner.NewRunner.
func (d *DefaultManager) newRunnerOrDefault() mcpRunnerFactory {
if d.newRunner != nil {
return d.newRunner
}
return func(rc *runner.RunConfig, sm statuses.StatusManager) mcpRunner {
return runner.NewRunner(rc, sm)
}
}

// ErrWorkloadNotRunning is returned when a container cannot be found by name.
@@ -416,8 +472,10 @@ func (d *DefaultManager) RunWorkload(ctx context.Context, runConfig *runner.RunC
}

// Retry loop with exponential backoff for container restarts
maxRetries := 10 // Allow many retries for transient issues
retryDelay := 5 * time.Second
cfg := d.retryConfigOrDefault()
maxRetries := cfg.maxRetries
newRunner := d.newRunnerOrDefault()
retryDelay := cfg.initialDelay

for attempt := 1; attempt <= maxRetries; attempt++ {
if attempt > 1 {
@@ -426,19 +484,22 @@ func (d *DefaultManager) RunWorkload(ctx context.Context, runConfig *runner.RunC

// Exponential backoff, doubling each retry and capped at cfg.maxBackoff
// (5s, 10s, 20s, 40s, 60s with the default configuration)
retryDelay *= 2
if retryDelay > 60*time.Second {
retryDelay = 60 * time.Second
if retryDelay > cfg.maxBackoff {
retryDelay = cfg.maxBackoff
}
}

mcpRunner := runner.NewRunner(runConfig, d.statuses)
mcpRunner := newRunner(runConfig, d.statuses)
runStartTime := time.Now()
err := mcpRunner.Run(ctx)
runDuration := time.Since(runStartTime)

if err != nil {
// Check if this is a "container exited, restart needed" error
if errors.Is(err, runner.ErrContainerExitedRestartNeeded) {
slog.Warn("workload exited unexpectedly, restarting",
"workload", runConfig.BaseName, "attempt", attempt, "maxRetries", maxRetries)
"workload", runConfig.BaseName, "attempt", attempt, "maxRetries", maxRetries,
"runDuration", runDuration)

// Remove from client config so clients notice the restart
clientManager, clientErr := client.NewManager(ctx)
@@ -449,17 +510,44 @@ func (d *DefaultManager) RunWorkload(ctx context.Context, runConfig *runner.RunC
}
}

// A "stable" run lasted at least stableRunThreshold before failing:
// long enough that we treat the next attempt as a fresh failure rather
// than another tick of the current retry sequence.
stable := runDuration >= cfg.stableRunThreshold

// Set status to starting (since we're restarting)
statusReason := "Container exited, restarting"
if stable {
statusReason = "Container exited after stable run, restarting"
}
statusErr := d.statuses.SetWorkloadStatus(
ctx,
runConfig.BaseName,
rt.WorkloadStatusStarting,
"Container exited, restarting",
statusReason,
)
if statusErr != nil {
slog.Warn("failed to set workload status to starting", "workload", runConfig.BaseName, "error", statusErr)
}

// If the workload ran past the stable threshold, reset the retry
// counter. Without this reset, sustained external disturbances (a
// flapping container runtime, host sleep/wake) that repeatedly kill a
// healthy container exhaust the retry budget even though every run was
// stable, and the manager eventually gives up on a workload that would
// have kept recovering once the disturbance ended.
if stable {
slog.Debug("resetting retry counter after stable run",
"workload", runConfig.BaseName, "duration", runDuration)
// Setting attempt to 0 means the for-clause's attempt++ on `continue`
// lands at 1, so the `if attempt > 1` sleep block is skipped and the
// next run starts immediately. retryDelay is reset so that any later
// failures within this fresh sequence use the initial backoff.
attempt = 0
retryDelay = cfg.initialDelay
continue
}

// If we haven't exhausted retries, continue the loop
if attempt < maxRetries {
continue
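To make the backoff-and-reset arithmetic above concrete, here is a minimal, self-contained simulation of the retry policy with the default parameters. It is a sketch only: it prints the sleeps it would perform instead of actually sleeping, and the simulated run durations are invented for illustration, not taken from ToolHive.

package main

import (
    "fmt"
    "time"
)

// Simulates RunWorkload's retry loop: the delay doubles after each
// consecutive failure up to maxBackoff, and any run lasting at least
// stableRunThreshold resets both the attempt counter and the delay.
func main() {
    const (
        maxRetries         = 10
        initialDelay       = 5 * time.Second
        maxBackoff         = 60 * time.Second
        stableRunThreshold = 30 * time.Second
    )

    // How long the workload "runs" before each simulated crash:
    // three quick crashes, one stable run, then two more quick crashes.
    runs := []time.Duration{
        2 * time.Second, 1 * time.Second, 3 * time.Second,
        45 * time.Second, 2 * time.Second, 1 * time.Second,
    }

    delay := initialDelay
    next := 0
    for attempt := 1; attempt <= maxRetries; attempt++ {
        if attempt > 1 {
            fmt.Printf("would sleep %v before attempt %d\n", delay, attempt)
            delay *= 2
            if delay > maxBackoff {
                delay = maxBackoff
            }
        }
        if next >= len(runs) {
            fmt.Println("no more simulated crashes; workload stays up")
            return
        }
        ran := runs[next]
        next++
        fmt.Printf("attempt %d ran %v, then exited\n", attempt, ran)
        if ran >= stableRunThreshold {
            attempt = 0 // the for-clause increment brings this back to 1
            delay = initialDelay
        }
    }
    fmt.Println("retry budget exhausted; giving up")
}

Running it shows sleeps of 5s, 10s, and 20s before attempts 2 through 4, a reset after the 45s stable run, and then a fresh 5s/10s sequence: the same shape the unit tests below assert with millisecond-scale parameters.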
153 changes: 153 additions & 0 deletions pkg/workloads/manager_test.go
@@ -20,6 +20,7 @@ import (
runtimeMocks "github.com/stacklok/toolhive/pkg/container/runtime/mocks"
"github.com/stacklok/toolhive/pkg/core"
"github.com/stacklok/toolhive/pkg/runner"
"github.com/stacklok/toolhive/pkg/workloads/statuses"
statusMocks "github.com/stacklok/toolhive/pkg/workloads/statuses/mocks"
)

@@ -1158,6 +1159,158 @@ func TestDefaultManager_RunWorkload(t *testing.T) {
}
}

// fakeRunOutcome describes a single attempt's behavior for fakeRunnerFactory.
type fakeRunOutcome struct {
duration time.Duration
err error
}

// fakeRunner implements mcpRunner for retry-loop tests by sleeping a
// configured duration before returning a configured error.
type fakeRunner struct {
duration time.Duration
err error
}

func (f *fakeRunner) Run(ctx context.Context) error {
if f.duration > 0 {
select {
case <-time.After(f.duration):
case <-ctx.Done():
return ctx.Err()
}
}
return f.err
}

// fakeRunnerFactory replays a sequence of outcomes across successive calls.
// Calls beyond the configured sequence return ErrContainerExitedRestartNeeded
// with no delay so the test fails fast if the loop runs longer than expected.
type fakeRunnerFactory struct {
outcomes []fakeRunOutcome
callCount int
}

func (f *fakeRunnerFactory) factory() mcpRunnerFactory {
return func(_ *runner.RunConfig, _ statuses.StatusManager) mcpRunner {
idx := f.callCount
f.callCount++
if idx >= len(f.outcomes) {
return &fakeRunner{err: runner.ErrContainerExitedRestartNeeded}
}
return &fakeRunner{duration: f.outcomes[idx].duration, err: f.outcomes[idx].err}
}
}

func TestDefaultManager_RunWorkload_RetryCounterReset(t *testing.T) {
t.Parallel()

// Test thresholds: short enough to keep the test fast, but with a wide
// margin between "short" and "stable" run durations so scheduler jitter
// on a busy CI host does not flip a "short" run to "stable". Production
// thresholds are an order of magnitude larger.
testCfg := &retryConfig{
maxRetries: 3,
initialDelay: 1 * time.Millisecond,
maxBackoff: 5 * time.Millisecond,
stableRunThreshold: 50 * time.Millisecond,
}

short := fakeRunOutcome{duration: 5 * time.Millisecond, err: runner.ErrContainerExitedRestartNeeded}
stable := fakeRunOutcome{duration: 200 * time.Millisecond, err: runner.ErrContainerExitedRestartNeeded}
atThreshold := fakeRunOutcome{duration: 50 * time.Millisecond, err: runner.ErrContainerExitedRestartNeeded}

tests := []struct {
name string
outcomes []fakeRunOutcome
expectError bool
errorContains string
expectedAttempts int
}{
{
name: "all short runs exhaust the retry budget",
outcomes: []fakeRunOutcome{short, short, short},
expectError: true,
errorContains: "container restart failed after 3 attempts",
expectedAttempts: 3,
},
{
name: "single stable run at start resets counter, extends budget by one cycle",
outcomes: []fakeRunOutcome{stable, short, short, short},
expectError: true,
errorContains: "container restart failed after 3 attempts",
expectedAttempts: 4,
},
{
name: "stable run mid-sequence resets counter, granting a full fresh budget",
outcomes: []fakeRunOutcome{short, stable, short, short, short},
expectError: true,
errorContains: "container restart failed after 3 attempts",
expectedAttempts: 5,
},
{
name: "run exactly at threshold counts as stable and resets counter",
outcomes: []fakeRunOutcome{atThreshold, short, short, short},
expectError: true,
errorContains: "container restart failed after 3 attempts",
expectedAttempts: 4,
},
{
name: "stable run followed by clean shutdown returns success",
outcomes: []fakeRunOutcome{stable, {duration: 1 * time.Millisecond, err: nil}},
expectError: false,
expectedAttempts: 2,
},
{
name: "non-restart error bails immediately without using the retry budget",
outcomes: []fakeRunOutcome{{duration: 1 * time.Millisecond, err: errors.New("fatal config error")}},
expectError: true,
errorContains: "fatal config error",
expectedAttempts: 1,
},
{
name: "successful run returns without retrying",
outcomes: []fakeRunOutcome{{duration: 1 * time.Millisecond, err: nil}},
expectError: false,
expectedAttempts: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockStatusMgr := statusMocks.NewMockStatusManager(ctrl)
mockStatusMgr.EXPECT().
SetWorkloadStatus(gomock.Any(), "retry-test", gomock.Any(), gomock.Any()).
Return(nil).
AnyTimes()

factory := &fakeRunnerFactory{outcomes: tt.outcomes}
manager := &DefaultManager{
statuses: mockStatusMgr,
retryConfig: testCfg,
newRunner: factory.factory(),
}

err := manager.RunWorkload(context.Background(), &runner.RunConfig{BaseName: "retry-test"})

if tt.expectError {
require.Error(t, err)
if tt.errorContains != "" {
assert.Contains(t, err.Error(), tt.errorContains)
}
} else {
require.NoError(t, err)
}
assert.Equal(t, tt.expectedAttempts, factory.callCount, "unexpected number of run attempts")
})
}
}

func TestDefaultManager_validateSecretParameters(t *testing.T) {
t.Parallel()

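The expectedAttempts values in the table follow mechanically from the reset rule: each failing short run consumes one unit of the retry budget, and each stable run refills it. A hypothetical helper, not part of the PR, that reproduces the counting for the always-failing cases:

package main

import "fmt"

// expectedAttempts mirrors the retry loop's counting when every run fails
// with a restart-needed error: short runs consume budget, stable runs reset
// it, and the loop gives up after maxRetries consecutive short runs.
func expectedAttempts(stable []bool, maxRetries int) int {
    attempts, used := 0, 0
    for _, s := range stable {
        attempts++
        if s {
            used = 0 // stable run grants a fresh budget
            continue
        }
        used++
        if used == maxRetries {
            break // budget exhausted
        }
    }
    return attempts
}

func main() {
    fmt.Println(expectedAttempts([]bool{false, false, false}, 3))              // 3
    fmt.Println(expectedAttempts([]bool{true, false, false, false}, 3))        // 4
    fmt.Println(expectedAttempts([]bool{false, true, false, false, false}, 3)) // 5
}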
4 changes: 2 additions & 2 deletions pkg/workloads/mocks/mock_manager.go

Some generated files are not rendered by default.
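The regenerated mock is also the likely reason the go:generate directive in manager.go switched modes; this is an inference from the diff, not stated in the PR. In mockgen's source mode, every interface declared in the source file gets a mock, which after this change would include the unexported mcpRunner test seam; reflect (package) mode generates mocks only for the interfaces named on the command line:

// Source mode: mocks every interface declared in manager.go, with no way
// to restrict generation to Manager alone.
//go:generate mockgen -destination=mocks/mock_manager.go -package=mocks -source=manager.go Manager

// Reflect (package) mode: mocks only the named Manager interface, leaving
// the unexported mcpRunner seam without a generated mock.
//go:generate mockgen -destination=mocks/mock_manager.go -package=mocks github.com/stacklok/toolhive/pkg/workloads Manager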
