From 8e7744f79b64fee8ce8b122af27afd6472d2e24f Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Wed, 3 Jun 2026 00:15:04 -0400 Subject: [PATCH 1/2] feat(sandbox,module): ArgoEphemeralRunner + exec_env: ephemeral (infra-admin P3c PR9/12) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds ArgoEphemeralRunner, a sandbox.SandboxRunner that executes a one-off command as an Argo Workflow on Kubernetes, wired as exec_env: ephemeral on step.sandbox_exec. Task 17 — ArgoEphemeralRunner (module/argo_ephemeral_runner.go): - argoSubmitter interface (SubmitWorkflow/WorkflowStatus/WorkflowLogs, all ctx-aware) satisfied by *ArgoWorkflowsModule; tests inject a fake. - buildSpec emits a single-container Workflow (entrypoint main); workflow names derive from a module-global atomic counter (no time/rand). - Status->exit-code mapping: Succeeded->0, Failed/Error->1 (documented Argo limitation: no per-container exit code from the status API). - secret:// env refs passed through UNRESOLVED (k8s secretKeyRef resolves at pod launch, ADR 0017). - Poll loop selects on ctx.Done() AND threads ctx into every submitter call so an in-flight HTTP request aborts on cancel (not just between ticks). - TTL: spec.TTLSecondsAfterFinished (default 300s) -> ttlStrategy. secondsAfterCompletion so the Argo controller GCs completed runs (no extra API call, prevents namespace accumulation). - Poll interval is injectable (default 2s; tests use 1ms). Task 18 — factory wiring (module/execenv_factory.go): - exec_env: ephemeral -> resolveEphemeralRunner: explicit argo_module name, or auto-detect the sole *ArgoWorkflowsModule (clear error on 0 or >1). - argo_module config threaded from step.sandbox_exec; resolveSandboxRunner takes a plain argoModuleName string (not variadic). - exec_env schema options -> [local-docker, ephemeral] + argo_module field in step_schema_builtins.go and module_schema.go; golden regenerated. ctx propagation (review fix): threaded ctx through (*ArgoWorkflowsModule). SubmitWorkflow/WorkflowStatus/WorkflowLogs/DeleteWorkflow/ListWorkflows -> the argoBackend methods -> doRequest, so the real backend honors cancellation/ deadline mid-HTTP. All step.argo_* Execute callers now pass their ctx. Tests: status->exit-code, spec shape, TTL (spec + rendered CRD), secret passthrough, ctx-cancel between ticks, ctx-cancel during in-flight status, WorkflowStatus error propagation, submit error, monotonic names, default poll-interval fallback; factory: ephemeral with/without/ambiguous argo module. Gate: golangci-lint cache clean + GOWORK=off go build ./... + go test ./... all exit 0; golangci-lint run 0 issues. Co-Authored-By: Claude Opus 4.8 (1M context) --- module/argo_ephemeral_runner.go | 212 ++++++++++ module/argo_ephemeral_runner_test.go | 456 +++++++++++++++++++++ module/argo_workflows.go | 94 +++-- module/argo_workflows_test.go | 28 +- module/execenv_factory.go | 65 ++- module/execenv_factory_test.go | 97 ++++- module/pipeline_step_argo.go | 20 +- module/pipeline_step_sandbox_exec.go | 14 +- schema/module_schema.go | 3 +- schema/step_schema_builtins.go | 3 +- schema/testdata/editor-schemas.golden.json | 11 +- 11 files changed, 920 insertions(+), 83 deletions(-) create mode 100644 module/argo_ephemeral_runner.go create mode 100644 module/argo_ephemeral_runner_test.go diff --git a/module/argo_ephemeral_runner.go b/module/argo_ephemeral_runner.go new file mode 100644 index 000000000..e2c5289ef --- /dev/null +++ b/module/argo_ephemeral_runner.go @@ -0,0 +1,212 @@ +package module + +import ( + "context" + "fmt" + "strings" + "sync/atomic" + "time" + + "github.com/GoCodeAlone/workflow/sandbox" +) + +// argoSubmitter is the minimal interface satisfied by *ArgoWorkflowsModule that +// the ephemeral runner depends on. Keeping it narrow allows tests to inject a +// fake without pulling in the full module struct. +// +// Every method takes a context so the underlying HTTP call (in the real backend) +// honors cancellation/deadline mid-flight — not just between the runner's poll +// ticks. +type argoSubmitter interface { + SubmitWorkflow(ctx context.Context, spec *ArgoWorkflowSpec) (string, error) + WorkflowStatus(ctx context.Context, workflowName string) (string, error) + WorkflowLogs(ctx context.Context, workflowName string) ([]string, error) +} + +// Compile-time assertion: *ArgoWorkflowsModule satisfies argoSubmitter. +var _ argoSubmitter = (*ArgoWorkflowsModule)(nil) + +// argoEphemeralCounter is a module-level monotonic counter used to generate +// deterministic, collision-resistant workflow names without time or random +// sources. It is safe for concurrent use. +var argoEphemeralCounter atomic.Uint64 + +// argoTerminalStatuses contains the Argo Workflows status phase strings that +// indicate a workflow has reached a terminal state. +// +// Source: Argo Workflows status.phase values from the Argo Workflows API +// (argoproj.io/v1alpha1 Workflow .status.phase): +// - "Succeeded" — all steps completed successfully. +// - "Failed" — one or more steps failed (exit code non-zero or assertion error). +// - "Error" — the workflow encountered an infrastructure / controller error +// (e.g. pod eviction, OOM, missing image). Distinct from "Failed". +var argoTerminalStatuses = map[string]bool{ + "Succeeded": true, + "Failed": true, + "Error": true, +} + +// argoExecPollInterval is the default poll interval for WorkflowStatus checks +// during Exec. It uses a constant duration to avoid time.Now / random sources. +// In production the Argo controller typically updates status within seconds. +// Tests inject a much shorter interval via newArgoEphemeralRunner. +const argoExecPollInterval = 2 * time.Second + +// argoEphemeralTTLSeconds is the default TTL applied to ephemeral workflows so +// the Argo controller garbage-collects completed runs (prevents namespace +// accumulation). Maps to spec.ttlStrategy.secondsAfterCompletion (300s = 5m). +const argoEphemeralTTLSeconds = 300 + +// ArgoEphemeralRunner implements sandbox.SandboxRunner by submitting a +// one-off Argo Workflow on Kubernetes and polling until it reaches a terminal +// status. It is wired as exec_env: ephemeral in step.sandbox_exec. +// +// Exit-code limitation: Argo exposes a workflow-level status phase, not the +// individual container exit code. ArgoEphemeralRunner maps: +// - "Succeeded" → ExitCode 0 +// - "Failed" / "Error" → ExitCode 1 +// +// Fine-grained exit codes (e.g. 2, 127) are not available from the Argo status +// API and cannot be recovered without instrumenting the workflow template to +// capture them. This is documented as a known limitation (ADR 0020). +// +// Secret refs: env values may carry "secret://" references. ArgoEphemeralRunner +// does NOT resolve them engine-side; it passes them through as-is into the Argo +// Workflow spec. Production deployments are expected to resolve secret refs via +// Kubernetes secretKeyRef / projected volumes at pod-launch time (ADR 0017). +// The engine-side secret:// string is intentionally preserved so the k8s +// admission/mutation webhook or a sidecar can substitute the real value. +type ArgoEphemeralRunner struct { + submitter argoSubmitter + namespace string + cfg sandbox.SandboxConfig + pollInterval time.Duration +} + +// Compile-time assertion: *ArgoEphemeralRunner implements sandbox.SandboxRunner. +var _ sandbox.SandboxRunner = (*ArgoEphemeralRunner)(nil) + +// newArgoEphemeralRunner constructs an ArgoEphemeralRunner. +// namespace is the Kubernetes namespace where Argo Workflows are submitted. +// cfg carries the image, env, and profile for this execution. +// pollInterval sets the WorkflowStatus poll cadence; a non-positive value +// falls back to the package default (argoExecPollInterval). Tests pass a small +// interval (e.g. 1ms) so status polling does not dominate test runtime. +func newArgoEphemeralRunner(submitter argoSubmitter, namespace string, cfg sandbox.SandboxConfig, pollInterval time.Duration) *ArgoEphemeralRunner { + if pollInterval <= 0 { + pollInterval = argoExecPollInterval + } + return &ArgoEphemeralRunner{ + submitter: submitter, + namespace: namespace, + cfg: cfg, + pollInterval: pollInterval, + } +} + +// Exec submits a single-container Argo Workflow for cmd, polls until terminal, +// then returns the combined log output as Stdout and maps Argo status to ExitCode. +// +// Workflow name: derived as "ephemeral-exec-" so that names +// are unique per process lifetime without relying on time or random sources. +// The counter is module-global and thread-safe (atomic.Uint64). +// +// ctx cancellation: ctx is threaded into every SubmitWorkflow/WorkflowStatus/ +// WorkflowLogs call so an in-flight HTTP request aborts on cancellation, and a +// select on ctx.Done() is also checked between poll ticks. On cancellation +// ctx.Err() is returned promptly. +func (r *ArgoEphemeralRunner) Exec(ctx context.Context, cmd []string) (*sandbox.ExecResult, error) { + seq := argoEphemeralCounter.Add(1) + wfName := fmt.Sprintf("ephemeral-exec-%d", seq) + + spec := r.buildSpec(wfName, cmd) + + runName, err := r.submitter.SubmitWorkflow(ctx, spec) + if err != nil { + return nil, fmt.Errorf("argo ephemeral runner: submit workflow: %w", err) + } + + // Poll until the workflow reaches a terminal status, respecting ctx cancellation. + ticker := time.NewTicker(r.pollInterval) + defer ticker.Stop() + + var finalStatus string + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + status, err := r.submitter.WorkflowStatus(ctx, runName) + if err != nil { + return nil, fmt.Errorf("argo ephemeral runner: poll workflow status: %w", err) + } + if argoTerminalStatuses[status] { + finalStatus = status + goto done + } + } + } + +done: + // Fetch logs and join into a single Stdout string. + lines, err := r.submitter.WorkflowLogs(ctx, runName) + if err != nil { + // Non-fatal: return partial result with empty stdout and log the issue. + lines = []string{fmt.Sprintf("[argo ephemeral runner] warning: could not retrieve logs: %v", err)} + } + stdout := strings.Join(lines, "\n") + + // Map Argo status phase to a process-style exit code. + // NOTE: Argo does not expose individual container exit codes via the status + // phase API. "Succeeded" maps to 0; any terminal failure maps to 1. + // Callers requiring fine-grained exit codes must instrument the workflow + // template to capture and surface them (e.g. via output parameters). + // See ArgoEphemeralRunner godoc and ADR 0020 for full discussion. + exitCode := 0 + if finalStatus != "Succeeded" { + exitCode = 1 + } + + return &sandbox.ExecResult{ + ExitCode: exitCode, + Stdout: stdout, + Stderr: "", // Argo log endpoint does not distinguish stdout/stderr streams. + }, nil +} + +// Close is a no-op: ArgoEphemeralRunner holds no persistent connections. +func (r *ArgoEphemeralRunner) Close() error { return nil } + +// buildSpec constructs the ArgoWorkflowSpec for a one-shot command execution. +// The workflow has a single container template named "main" and entrypoint "main". +// +// TTL: a TTLSecondsAfterFinished is set so the Argo controller auto-deletes the +// completed Workflow object (ttlStrategy.secondsAfterCompletion), preventing +// ephemeral-exec-N workflows from accumulating in the namespace. No extra API +// call from the engine is needed. +// +// Env passthrough: env values that carry "secret://" refs are forwarded as-is +// into the Argo Workflow spec. The engine does NOT resolve them engine-side; +// production Kubernetes admission / mutation logic is responsible for substituting +// real values (ADR 0017). See ArgoEphemeralRunner type-level godoc for details. +func (r *ArgoEphemeralRunner) buildSpec(name string, cmd []string) *ArgoWorkflowSpec { + return &ArgoWorkflowSpec{ + APIVersion: "argoproj.io/v1alpha1", + Kind: "Workflow", + Name: name, + Namespace: r.namespace, + Entrypoint: "main", + TTLSecondsAfterFinished: argoEphemeralTTLSeconds, + Templates: []ArgoTemplate{ + { + Name: "main", + Kind: "container", + Container: &ArgoContainer{ + Image: r.cfg.Image, + Command: cmd, + Env: r.cfg.Env, + }, + }, + }, + } +} diff --git a/module/argo_ephemeral_runner_test.go b/module/argo_ephemeral_runner_test.go new file mode 100644 index 000000000..8160f0bf1 --- /dev/null +++ b/module/argo_ephemeral_runner_test.go @@ -0,0 +1,456 @@ +package module + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/GoCodeAlone/workflow/sandbox" +) + +// Compile-time assertion: *ArgoEphemeralRunner implements sandbox.SandboxRunner. +var _ sandbox.SandboxRunner = (*ArgoEphemeralRunner)(nil) + +// testPollInterval is a tiny poll cadence so status tests don't pay the 2s +// production interval. The ctx is honored mid-call regardless of this value. +const testPollInterval = time.Millisecond + +// ─── fake argoSubmitter ─────────────────────────────────────────────────────── + +// fakeArgoSubmitter records the last submitted spec and simulates configurable +// status progressions and log returns. All methods take a context (matching the +// argoSubmitter interface) so cancellation can be exercised mid-call. +type fakeArgoSubmitter struct { + // submittedSpec is set when SubmitWorkflow is called. + submittedSpec *ArgoWorkflowSpec + + // statusSequence is the ordered list of status strings returned on + // successive WorkflowStatus calls. Once exhausted it always returns + // the last element (so a terminal status "sticks"). + statusSequence []string + statusIdx int + + // logs is the list of log lines returned by WorkflowLogs. + logs []string + + // submitErr, if non-nil, is returned by SubmitWorkflow. + submitErr error + + // statusErr, if non-nil, is returned by WorkflowStatus. + statusErr error + + // logsErr, if non-nil, is returned by WorkflowLogs. + logsErr error + + // runName is returned by SubmitWorkflow on success. + runName string + + // blockStatusOnCtx, when true, makes WorkflowStatus block until the passed + // ctx is cancelled, then returns ctx.Err(). This simulates an in-flight HTTP + // call that honors ctx cancellation (the real backend's doRequest behavior). + blockStatusOnCtx bool +} + +func (f *fakeArgoSubmitter) SubmitWorkflow(_ context.Context, spec *ArgoWorkflowSpec) (string, error) { + f.submittedSpec = spec + if f.submitErr != nil { + return "", f.submitErr + } + name := f.runName + if name == "" { + name = "fake-run" + } + return name, nil +} + +func (f *fakeArgoSubmitter) WorkflowStatus(ctx context.Context, _ string) (string, error) { + if f.blockStatusOnCtx { + // Simulate an in-flight HTTP request that aborts on ctx cancellation. + <-ctx.Done() + return "", ctx.Err() + } + if f.statusErr != nil { + return "", f.statusErr + } + if len(f.statusSequence) == 0 { + return "Succeeded", nil + } + idx := f.statusIdx + if idx >= len(f.statusSequence) { + idx = len(f.statusSequence) - 1 + } + f.statusIdx++ + return f.statusSequence[idx], nil +} + +func (f *fakeArgoSubmitter) WorkflowLogs(_ context.Context, _ string) ([]string, error) { + if f.logsErr != nil { + return nil, f.logsErr + } + return f.logs, nil +} + +// ─── helper ────────────────────────────────────────────────────────────────── + +// buildTestRunner builds an ArgoEphemeralRunner wired to a fakeArgoSubmitter +// with a 1ms poll interval so status tests run fast. +func buildTestRunner(f *fakeArgoSubmitter, image string, env map[string]string) *ArgoEphemeralRunner { + cfg := sandbox.SandboxConfig{ + Image: image, + Env: env, + } + return newArgoEphemeralRunner(f, "test-ns", cfg, testPollInterval) +} + +// ─── tests ─────────────────────────────────────────────────────────────────── + +// TestArgoEphemeralRunner_Succeeded verifies that a "Succeeded" Argo status maps +// to ExitCode 0 and that log lines are joined into Stdout. +func TestArgoEphemeralRunner_Succeeded(t *testing.T) { + fake := &fakeArgoSubmitter{ + statusSequence: []string{"Succeeded"}, + logs: []string{"hello", "world"}, + } + runner := buildTestRunner(fake, "alpine:3.19", nil) + + result, err := runner.Exec(context.Background(), []string{"echo", "hello"}) + if err != nil { + t.Fatalf("Exec: unexpected error: %v", err) + } + if result.ExitCode != 0 { + t.Errorf("ExitCode: got %d, want 0 for Succeeded status", result.ExitCode) + } + if !strings.Contains(result.Stdout, "hello") { + t.Errorf("Stdout: expected log content, got %q", result.Stdout) + } + if !strings.Contains(result.Stdout, "world") { + t.Errorf("Stdout: expected 'world' in logs, got %q", result.Stdout) + } +} + +// TestArgoEphemeralRunner_Failed verifies that a "Failed" Argo status maps to +// a non-zero ExitCode (1). +func TestArgoEphemeralRunner_Failed(t *testing.T) { + fake := &fakeArgoSubmitter{ + statusSequence: []string{"Failed"}, + logs: []string{"step failed"}, + } + runner := buildTestRunner(fake, "alpine:3.19", nil) + + result, err := runner.Exec(context.Background(), []string{"false"}) + if err != nil { + t.Fatalf("Exec: unexpected error: %v", err) + } + if result.ExitCode == 0 { + t.Errorf("ExitCode: got 0 for Failed status, want non-zero") + } +} + +// TestArgoEphemeralRunner_Error verifies that an "Error" Argo status (infrastructure +// failure) also maps to a non-zero ExitCode. +func TestArgoEphemeralRunner_Error(t *testing.T) { + fake := &fakeArgoSubmitter{ + statusSequence: []string{"Error"}, + logs: []string{"pod evicted"}, + } + runner := buildTestRunner(fake, "alpine:3.19", nil) + + result, err := runner.Exec(context.Background(), []string{"true"}) + if err != nil { + t.Fatalf("Exec: unexpected error: %v", err) + } + if result.ExitCode == 0 { + t.Errorf("ExitCode: got 0 for Error status, want non-zero") + } +} + +// TestArgoEphemeralRunner_SpecShape verifies that the built ArgoWorkflowSpec +// carries exactly one container template with the correct image, command, and env, +// and that a cleanup TTL is set so completed workflows are garbage-collected. +func TestArgoEphemeralRunner_SpecShape(t *testing.T) { + env := map[string]string{"FOO": "bar", "GOPATH": "/go"} + cmd := []string{"/bin/sh", "-c", "echo hello"} + + fake := &fakeArgoSubmitter{ + statusSequence: []string{"Succeeded"}, + logs: []string{"hello"}, + } + runner := buildTestRunner(fake, "alpine:3.19", env) + + if _, err := runner.Exec(context.Background(), cmd); err != nil { + t.Fatalf("Exec: %v", err) + } + + spec := fake.submittedSpec + if spec == nil { + t.Fatal("SubmitWorkflow was not called") + } + + // Exactly one template. + if len(spec.Templates) != 1 { + t.Fatalf("Templates: got %d, want 1", len(spec.Templates)) + } + tpl := spec.Templates[0] + + if tpl.Kind != "container" { + t.Errorf("Template.Kind: got %q, want %q", tpl.Kind, "container") + } + if tpl.Name != "main" { + t.Errorf("Template.Name: got %q, want %q", tpl.Name, "main") + } + if tpl.Container == nil { + t.Fatal("Template.Container is nil") + } + if tpl.Container.Image != "alpine:3.19" { + t.Errorf("Container.Image: got %q, want %q", tpl.Container.Image, "alpine:3.19") + } + if len(tpl.Container.Command) != len(cmd) { + t.Errorf("Container.Command length: got %d, want %d", len(tpl.Container.Command), len(cmd)) + } + for i, c := range cmd { + if tpl.Container.Command[i] != c { + t.Errorf("Container.Command[%d]: got %q, want %q", i, tpl.Container.Command[i], c) + } + } + for k, v := range env { + if tpl.Container.Env[k] != v { + t.Errorf("Container.Env[%q]: got %q, want %q", k, tpl.Container.Env[k], v) + } + } + + // Entrypoint and namespace. + if spec.Entrypoint != "main" { + t.Errorf("Entrypoint: got %q, want %q", spec.Entrypoint, "main") + } + if spec.Namespace != "test-ns" { + t.Errorf("Namespace: got %q, want %q", spec.Namespace, "test-ns") + } + if spec.Kind != "Workflow" { + t.Errorf("Kind: got %q, want %q", spec.Kind, "Workflow") + } + + // Cleanup TTL must be set (prevents namespace accumulation of completed runs). + if spec.TTLSecondsAfterFinished != argoEphemeralTTLSeconds { + t.Errorf("TTLSecondsAfterFinished: got %d, want %d", spec.TTLSecondsAfterFinished, argoEphemeralTTLSeconds) + } +} + +// TestArgoEphemeralRunner_TTLRendersInCRD verifies that the TTL set on the spec +// is rendered into the Argo Workflow CRD as spec.ttlStrategy.secondsAfterCompletion +// (the field the Argo controller honors for auto-deletion). +func TestArgoEphemeralRunner_TTLRendersInCRD(t *testing.T) { + cfg := sandbox.SandboxConfig{Image: "alpine:3.19"} + runner := newArgoEphemeralRunner(&fakeArgoSubmitter{}, "test-ns", cfg, testPollInterval) + + spec := runner.buildSpec("ephemeral-exec-1", []string{"true"}) + crd := argoWorkflowCRD(spec) + + specMap, ok := crd["spec"].(map[string]any) + if !ok { + t.Fatalf("crd[spec] not a map: %T", crd["spec"]) + } + ttlStrategy, ok := specMap["ttlStrategy"].(map[string]any) + if !ok { + t.Fatalf("crd spec.ttlStrategy missing or wrong type: %T", specMap["ttlStrategy"]) + } + if got := ttlStrategy["secondsAfterCompletion"]; got != argoEphemeralTTLSeconds { + t.Errorf("ttlStrategy.secondsAfterCompletion: got %v, want %d", got, argoEphemeralTTLSeconds) + } +} + +// TestArgoEphemeralRunner_SecretRefPassthrough verifies that a "secret://" value +// in the env map is NOT resolved by the runner — it is forwarded as-is to the +// Argo Workflow spec. Production Kubernetes admission/mutation logic is responsible +// for substituting the real value (ADR 0017). +func TestArgoEphemeralRunner_SecretRefPassthrough(t *testing.T) { + const secretRef = "secret://vault/my-token" + env := map[string]string{"TOKEN": secretRef} + + fake := &fakeArgoSubmitter{ + statusSequence: []string{"Succeeded"}, + logs: nil, + } + runner := buildTestRunner(fake, "alpine:3.19", env) + + if _, err := runner.Exec(context.Background(), []string{"true"}); err != nil { + t.Fatalf("Exec: %v", err) + } + + spec := fake.submittedSpec + if spec == nil { + t.Fatal("SubmitWorkflow was not called") + } + if len(spec.Templates) == 0 || spec.Templates[0].Container == nil { + t.Fatal("container template missing") + } + got := spec.Templates[0].Container.Env["TOKEN"] + if got != secretRef { + t.Errorf("TOKEN env: got %q, want %q (must NOT be resolved engine-side)", got, secretRef) + } +} + +// TestArgoEphemeralRunner_CtxCancelDuringPoll verifies that ctx cancellation +// between poll ticks causes Exec to return ctx.Err() promptly. +func TestArgoEphemeralRunner_CtxCancelDuringPoll(t *testing.T) { + // Return a non-terminal status so the poll loop never exits naturally. + fake := &fakeArgoSubmitter{ + statusSequence: []string{"Running", "Running", "Running"}, + } + cfg := sandbox.SandboxConfig{Image: "alpine:3.19"} + // Larger interval so the ctx.Done() branch (not a status return) wins. + runner := newArgoEphemeralRunner(fake, "test-ns", cfg, 10*time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) + defer cancel() + + _, err := runner.Exec(ctx, []string{"sleep", "9999"}) + if err == nil { + t.Fatal("expected error on context cancellation, got nil") + } + if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { + t.Errorf("expected context error, got: %v", err) + } +} + +// TestArgoEphemeralRunner_CtxCancelDuringInFlightStatus verifies that ctx +// cancellation while a WorkflowStatus call is in flight (blocked) is honored: +// the runner threads ctx into the submitter call, so cancellation aborts the +// in-flight request rather than waiting out a client timeout. The fake's +// WorkflowStatus blocks on ctx.Done() to model the real backend's HTTP behavior. +func TestArgoEphemeralRunner_CtxCancelDuringInFlightStatus(t *testing.T) { + fake := &fakeArgoSubmitter{ + blockStatusOnCtx: true, + } + cfg := sandbox.SandboxConfig{Image: "alpine:3.19"} + // Long poll interval: the FIRST tick fires quickly, then the status call + // blocks until ctx is cancelled. This proves ctx reaches the submitter. + runner := newArgoEphemeralRunner(fake, "test-ns", cfg, time.Millisecond) + + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan error, 1) + go func() { + _, err := runner.Exec(ctx, []string{"true"}) + done <- err + }() + + // Give the runner time to submit, tick once, and enter the blocking status call. + time.Sleep(20 * time.Millisecond) + cancel() + + select { + case err := <-done: + if err == nil { + t.Fatal("expected ctx error, got nil") + } + if !errors.Is(err, context.Canceled) { + t.Errorf("expected context.Canceled, got: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Exec did not return after ctx cancel — in-flight status call ignored ctx") + } +} + +// TestArgoEphemeralRunner_WorkflowStatusError verifies that an error returned by +// WorkflowStatus during polling is propagated as an Exec error. +func TestArgoEphemeralRunner_WorkflowStatusError(t *testing.T) { + fake := &fakeArgoSubmitter{ + statusErr: errors.New("argo status endpoint 503"), + } + runner := buildTestRunner(fake, "alpine:3.19", nil) + + _, err := runner.Exec(context.Background(), []string{"true"}) + if err == nil { + t.Fatal("expected error from WorkflowStatus, got nil") + } + if !strings.Contains(err.Error(), "argo status endpoint 503") { + t.Errorf("error should contain status error message, got: %v", err) + } +} + +// TestArgoEphemeralRunner_SubmitError verifies that a SubmitWorkflow error is +// propagated as an Exec error. +func TestArgoEphemeralRunner_SubmitError(t *testing.T) { + fake := &fakeArgoSubmitter{ + submitErr: errors.New("argo server unavailable"), + } + runner := buildTestRunner(fake, "alpine:3.19", nil) + + _, err := runner.Exec(context.Background(), []string{"true"}) + if err == nil { + t.Fatal("expected error from SubmitWorkflow, got nil") + } + if !strings.Contains(err.Error(), "argo server unavailable") { + t.Errorf("error should contain submit error message, got: %v", err) + } +} + +// TestArgoEphemeralRunner_Close_NoOp verifies that Close always returns nil. +func TestArgoEphemeralRunner_Close_NoOp(t *testing.T) { + runner := buildTestRunner(&fakeArgoSubmitter{}, "alpine:3.19", nil) + if err := runner.Close(); err != nil { + t.Errorf("Close: expected nil, got %v", err) + } +} + +// TestArgoEphemeralRunner_DefaultPollInterval verifies the constructor falls back +// to the package default when given a non-positive interval. +func TestArgoEphemeralRunner_DefaultPollInterval(t *testing.T) { + cfg := sandbox.SandboxConfig{Image: "alpine:3.19"} + runner := newArgoEphemeralRunner(&fakeArgoSubmitter{}, "test-ns", cfg, 0) + if runner.pollInterval != argoExecPollInterval { + t.Errorf("pollInterval: got %v, want default %v", runner.pollInterval, argoExecPollInterval) + } +} + +// TestArgoEphemeralRunner_MonotonicNames verifies that successive Exec calls +// produce distinct workflow names (monotonic counter guarantee). +func TestArgoEphemeralRunner_MonotonicNames(t *testing.T) { + var names []string + fake := &fakeArgoSubmitter{ + statusSequence: []string{"Succeeded"}, + logs: nil, + } + // Override runName via a small wrapper to capture emitted names. + capturing := &capturingSubmitter{inner: fake, names: &names} + cfg := sandbox.SandboxConfig{Image: "alpine:3.19"} + runner := newArgoEphemeralRunner(capturing, "test-ns", cfg, testPollInterval) + + for i := 0; i < 3; i++ { + if _, err := runner.Exec(context.Background(), []string{"true"}); err != nil { + t.Fatalf("Exec[%d]: %v", i, err) + } + } + if len(names) != 3 { + t.Fatalf("expected 3 submitted workflow names, got %d", len(names)) + } + seen := map[string]bool{} + for _, n := range names { + if seen[n] { + t.Errorf("duplicate workflow name %q in successive Exec calls", n) + } + seen[n] = true + } +} + +// capturingSubmitter wraps a fakeArgoSubmitter and records all SubmitWorkflow +// spec names (the Name field in the spec, not the returned runName). +type capturingSubmitter struct { + inner *fakeArgoSubmitter + names *[]string +} + +func (c *capturingSubmitter) SubmitWorkflow(ctx context.Context, spec *ArgoWorkflowSpec) (string, error) { + *c.names = append(*c.names, spec.Name) + return c.inner.SubmitWorkflow(ctx, spec) +} + +func (c *capturingSubmitter) WorkflowStatus(ctx context.Context, name string) (string, error) { + return c.inner.WorkflowStatus(ctx, name) +} + +func (c *capturingSubmitter) WorkflowLogs(ctx context.Context, name string) ([]string, error) { + return c.inner.WorkflowLogs(ctx, name) +} diff --git a/module/argo_workflows.go b/module/argo_workflows.go index 8c1b56eb6..09ec4fc01 100644 --- a/module/argo_workflows.go +++ b/module/argo_workflows.go @@ -31,6 +31,13 @@ type ArgoWorkflowSpec struct { Entrypoint string `json:"entrypoint"` Templates []ArgoTemplate `json:"templates"` Arguments map[string]string `json:"arguments,omitempty"` + + // TTLSecondsAfterFinished, when > 0, instructs the Argo controller to + // auto-delete the Workflow object this many seconds after it completes + // (succeeds, fails, or errors). It maps to spec.ttlStrategy.secondsAfterCompletion + // in the Argo Workflow CRD. Used by the ephemeral runner to prevent + // completed one-off workflows from accumulating in the namespace. + TTLSecondsAfterFinished int `json:"ttlSecondsAfterFinished,omitempty"` } // ArgoTemplate is a single template (DAG or step list) within an Argo Workflow. @@ -70,16 +77,20 @@ type ArgoWorkflowsModule struct { } // argoBackend is the internal interface for Argo Workflows backends. +// +// The workflow run methods (submit/status/logs/delete/list) take a context so +// in-flight HTTP calls in the real backend honor cancellation/deadline rather +// than only between caller poll ticks (see argoRealBackend.doRequest). type argoBackend interface { plan(m *ArgoWorkflowsModule) (*PlatformPlan, error) apply(m *ArgoWorkflowsModule) (*PlatformResult, error) status(m *ArgoWorkflowsModule) (*ArgoWorkflowState, error) destroy(m *ArgoWorkflowsModule) error - submitWorkflow(m *ArgoWorkflowsModule, spec *ArgoWorkflowSpec) (string, error) - workflowStatus(m *ArgoWorkflowsModule, workflowName string) (string, error) - workflowLogs(m *ArgoWorkflowsModule, workflowName string) ([]string, error) - deleteWorkflow(m *ArgoWorkflowsModule, workflowName string) error - listWorkflows(m *ArgoWorkflowsModule, labelSelector string) ([]string, error) + submitWorkflow(ctx context.Context, m *ArgoWorkflowsModule, spec *ArgoWorkflowSpec) (string, error) + workflowStatus(ctx context.Context, m *ArgoWorkflowsModule, workflowName string) (string, error) + workflowLogs(ctx context.Context, m *ArgoWorkflowsModule, workflowName string) ([]string, error) + deleteWorkflow(ctx context.Context, m *ArgoWorkflowsModule, workflowName string) error + listWorkflows(ctx context.Context, m *ArgoWorkflowsModule, labelSelector string) ([]string, error) } // NewArgoWorkflowsModule creates a new ArgoWorkflowsModule. @@ -183,29 +194,34 @@ func (m *ArgoWorkflowsModule) Destroy() error { } // SubmitWorkflow translates a pipeline config into an Argo Workflow spec and submits it. -// Returns the workflow run name. -func (m *ArgoWorkflowsModule) SubmitWorkflow(spec *ArgoWorkflowSpec) (string, error) { - return m.backend.submitWorkflow(m, spec) +// Returns the workflow run name. The context bounds the underlying HTTP call in +// the real backend so cancellation/deadline aborts an in-flight request. +func (m *ArgoWorkflowsModule) SubmitWorkflow(ctx context.Context, spec *ArgoWorkflowSpec) (string, error) { + return m.backend.submitWorkflow(ctx, m, spec) } -// WorkflowStatus returns the execution status of a workflow run. -func (m *ArgoWorkflowsModule) WorkflowStatus(workflowName string) (string, error) { - return m.backend.workflowStatus(m, workflowName) +// WorkflowStatus returns the execution status of a workflow run. The context +// bounds the underlying HTTP call in the real backend. +func (m *ArgoWorkflowsModule) WorkflowStatus(ctx context.Context, workflowName string) (string, error) { + return m.backend.workflowStatus(ctx, m, workflowName) } -// WorkflowLogs returns log lines from a workflow run. -func (m *ArgoWorkflowsModule) WorkflowLogs(workflowName string) ([]string, error) { - return m.backend.workflowLogs(m, workflowName) +// WorkflowLogs returns log lines from a workflow run. The context bounds the +// underlying HTTP call in the real backend. +func (m *ArgoWorkflowsModule) WorkflowLogs(ctx context.Context, workflowName string) ([]string, error) { + return m.backend.workflowLogs(ctx, m, workflowName) } -// DeleteWorkflow removes a completed or failed workflow. -func (m *ArgoWorkflowsModule) DeleteWorkflow(workflowName string) error { - return m.backend.deleteWorkflow(m, workflowName) +// DeleteWorkflow removes a completed or failed workflow. The context bounds the +// underlying HTTP call in the real backend. +func (m *ArgoWorkflowsModule) DeleteWorkflow(ctx context.Context, workflowName string) error { + return m.backend.deleteWorkflow(ctx, m, workflowName) } -// ListWorkflows lists workflows matching the optional label selector. -func (m *ArgoWorkflowsModule) ListWorkflows(labelSelector string) ([]string, error) { - return m.backend.listWorkflows(m, labelSelector) +// ListWorkflows lists workflows matching the optional label selector. The context +// bounds the underlying HTTP call in the real backend. +func (m *ArgoWorkflowsModule) ListWorkflows(ctx context.Context, labelSelector string) ([]string, error) { + return m.backend.listWorkflows(ctx, m, labelSelector) } // namespace returns the configured namespace, falling back to "argo". @@ -344,7 +360,7 @@ func (b *argoMockBackend) destroy(m *ArgoWorkflowsModule) error { return nil } -func (b *argoMockBackend) submitWorkflow(m *ArgoWorkflowsModule, spec *ArgoWorkflowSpec) (string, error) { +func (b *argoMockBackend) submitWorkflow(_ context.Context, m *ArgoWorkflowsModule, spec *ArgoWorkflowSpec) (string, error) { b.ensureInit() if m.state.Status != "running" { return "", fmt.Errorf("argo.workflows %q: not running (status=%s)", m.name, m.state.Status) @@ -358,7 +374,7 @@ func (b *argoMockBackend) submitWorkflow(m *ArgoWorkflowsModule, spec *ArgoWorkf return runName, nil } -func (b *argoMockBackend) workflowStatus(m *ArgoWorkflowsModule, workflowName string) (string, error) { +func (b *argoMockBackend) workflowStatus(_ context.Context, m *ArgoWorkflowsModule, workflowName string) (string, error) { b.ensureInit() status, ok := b.workflows[workflowName] if !ok { @@ -372,7 +388,7 @@ func (b *argoMockBackend) workflowStatus(m *ArgoWorkflowsModule, workflowName st return status, nil } -func (b *argoMockBackend) workflowLogs(m *ArgoWorkflowsModule, workflowName string) ([]string, error) { +func (b *argoMockBackend) workflowLogs(_ context.Context, m *ArgoWorkflowsModule, workflowName string) ([]string, error) { b.ensureInit() if _, ok := b.workflows[workflowName]; !ok { return nil, fmt.Errorf("argo.workflows %q: workflow %q not found", m.name, workflowName) @@ -384,7 +400,7 @@ func (b *argoMockBackend) workflowLogs(m *ArgoWorkflowsModule, workflowName stri return lines, nil } -func (b *argoMockBackend) deleteWorkflow(m *ArgoWorkflowsModule, workflowName string) error { +func (b *argoMockBackend) deleteWorkflow(_ context.Context, m *ArgoWorkflowsModule, workflowName string) error { b.ensureInit() if _, ok := b.workflows[workflowName]; !ok { return fmt.Errorf("argo.workflows %q: workflow %q not found", m.name, workflowName) @@ -394,7 +410,7 @@ func (b *argoMockBackend) deleteWorkflow(m *ArgoWorkflowsModule, workflowName st return nil } -func (b *argoMockBackend) listWorkflows(m *ArgoWorkflowsModule, labelSelector string) ([]string, error) { +func (b *argoMockBackend) listWorkflows(_ context.Context, _ *ArgoWorkflowsModule, _ string) ([]string, error) { b.ensureInit() names := make([]string, 0, len(b.workflows)) for name := range b.workflows { @@ -510,7 +526,7 @@ func (b *argoRealBackend) destroy(m *ArgoWorkflowsModule) error { // submitWorkflow submits an Argo Workflow via the REST API. // Returns the server-assigned workflow name. -func (b *argoRealBackend) submitWorkflow(m *ArgoWorkflowsModule, spec *ArgoWorkflowSpec) (string, error) { +func (b *argoRealBackend) submitWorkflow(ctx context.Context, m *ArgoWorkflowsModule, spec *ArgoWorkflowSpec) (string, error) { ns := m.namespace() if spec.Namespace != "" { ns = spec.Namespace @@ -523,7 +539,7 @@ func (b *argoRealBackend) submitWorkflow(m *ArgoWorkflowsModule, spec *ArgoWorkf "workflow": wf, } - data, status, err := b.doRequest(context.Background(), http.MethodPost, + data, status, err := b.doRequest(ctx, http.MethodPost, fmt.Sprintf("/api/v1/workflows/%s", ns), reqBody) if err != nil { return "", fmt.Errorf("argo submit workflow: %w", err) @@ -543,9 +559,9 @@ func (b *argoRealBackend) submitWorkflow(m *ArgoWorkflowsModule, spec *ArgoWorkf return result.Metadata.Name, nil } -func (b *argoRealBackend) workflowStatus(m *ArgoWorkflowsModule, workflowName string) (string, error) { +func (b *argoRealBackend) workflowStatus(ctx context.Context, m *ArgoWorkflowsModule, workflowName string) (string, error) { ns := m.namespace() - data, status, err := b.doRequest(context.Background(), http.MethodGet, + data, status, err := b.doRequest(ctx, http.MethodGet, fmt.Sprintf("/api/v1/workflows/%s/%s", ns, workflowName), nil) if err != nil { return "", fmt.Errorf("argo get workflow status: %w", err) @@ -568,10 +584,10 @@ func (b *argoRealBackend) workflowStatus(m *ArgoWorkflowsModule, workflowName st return result.Status.Phase, nil } -func (b *argoRealBackend) workflowLogs(m *ArgoWorkflowsModule, workflowName string) ([]string, error) { +func (b *argoRealBackend) workflowLogs(ctx context.Context, m *ArgoWorkflowsModule, workflowName string) ([]string, error) { ns := m.namespace() // Use the Argo log endpoint: GET /api/v1/workflows/{ns}/{name}/log?logOptions.container=main - data, status, err := b.doRequest(context.Background(), http.MethodGet, + data, status, err := b.doRequest(ctx, http.MethodGet, fmt.Sprintf("/api/v1/workflows/%s/%s/log?logOptions.container=main&grep=&selector=", ns, workflowName), nil) if err != nil { return nil, fmt.Errorf("argo get workflow logs: %w", err) @@ -601,9 +617,9 @@ func (b *argoRealBackend) workflowLogs(m *ArgoWorkflowsModule, workflowName stri return lines, nil } -func (b *argoRealBackend) deleteWorkflow(m *ArgoWorkflowsModule, workflowName string) error { +func (b *argoRealBackend) deleteWorkflow(ctx context.Context, m *ArgoWorkflowsModule, workflowName string) error { ns := m.namespace() - data, status, err := b.doRequest(context.Background(), http.MethodDelete, + data, status, err := b.doRequest(ctx, http.MethodDelete, fmt.Sprintf("/api/v1/workflows/%s/%s", ns, workflowName), nil) if err != nil { return fmt.Errorf("argo delete workflow: %w", err) @@ -617,14 +633,14 @@ func (b *argoRealBackend) deleteWorkflow(m *ArgoWorkflowsModule, workflowName st return nil } -func (b *argoRealBackend) listWorkflows(m *ArgoWorkflowsModule, labelSelector string) ([]string, error) { +func (b *argoRealBackend) listWorkflows(ctx context.Context, m *ArgoWorkflowsModule, labelSelector string) ([]string, error) { ns := m.namespace() path := fmt.Sprintf("/api/v1/workflows/%s", ns) if labelSelector != "" { path += "?listOptions.labelSelector=" + labelSelector } - data, status, err := b.doRequest(context.Background(), http.MethodGet, path, nil) + data, status, err := b.doRequest(ctx, http.MethodGet, path, nil) if err != nil { return nil, fmt.Errorf("argo list workflows: %w", err) } @@ -712,5 +728,13 @@ func argoWorkflowCRD(spec *ArgoWorkflowSpec) map[string]any { wf["spec"].(map[string]any)["arguments"] = map[string]any{"parameters": params} } + // Auto-cleanup: when a TTL is set, emit a ttlStrategy so the Argo controller + // garbage-collects the completed Workflow object (prevents namespace buildup). + if spec.TTLSecondsAfterFinished > 0 { + wf["spec"].(map[string]any)["ttlStrategy"] = map[string]any{ + "secondsAfterCompletion": spec.TTLSecondsAfterFinished, + } + } + return wf } diff --git a/module/argo_workflows_test.go b/module/argo_workflows_test.go index 1341c2299..78a88a0ac 100644 --- a/module/argo_workflows_test.go +++ b/module/argo_workflows_test.go @@ -265,7 +265,7 @@ func TestArgoWorkflows_SubmitStatusLogsDeleteLifecycle(t *testing.T) { {"name": "build", "image": "golang:1.22"}, }) - runName, err := m.SubmitWorkflow(spec) + runName, err := m.SubmitWorkflow(context.Background(), spec) if err != nil { t.Fatalf("SubmitWorkflow: %v", err) } @@ -273,7 +273,7 @@ func TestArgoWorkflows_SubmitStatusLogsDeleteLifecycle(t *testing.T) { t.Fatal("expected non-empty run name") } - status, err := m.WorkflowStatus(runName) + status, err := m.WorkflowStatus(context.Background(), runName) if err != nil { t.Fatalf("WorkflowStatus: %v", err) } @@ -281,7 +281,7 @@ func TestArgoWorkflows_SubmitStatusLogsDeleteLifecycle(t *testing.T) { t.Errorf("expected Succeeded, got %q", status) } - logs, err := m.WorkflowLogs(runName) + logs, err := m.WorkflowLogs(context.Background(), runName) if err != nil { t.Fatalf("WorkflowLogs: %v", err) } @@ -289,11 +289,11 @@ func TestArgoWorkflows_SubmitStatusLogsDeleteLifecycle(t *testing.T) { t.Error("expected at least one log line") } - if err := m.DeleteWorkflow(runName); err != nil { + if err := m.DeleteWorkflow(context.Background(), runName); err != nil { t.Fatalf("DeleteWorkflow: %v", err) } - if _, err := m.WorkflowStatus(runName); err == nil { + if _, err := m.WorkflowStatus(context.Background(), runName); err == nil { t.Error("expected error after deletion, got nil") } } @@ -305,7 +305,7 @@ func TestArgoWorkflows_SubmitRequiresRunning(t *testing.T) { t.Fatalf("Init: %v", err) } spec := module.TranslatePipelineToArgo("wf", "argo", nil) - if _, err := m.SubmitWorkflow(spec); err == nil { + if _, err := m.SubmitWorkflow(context.Background(), spec); err == nil { t.Error("expected error submitting to non-running Argo, got nil") } } @@ -315,12 +315,12 @@ func TestArgoWorkflows_ListWorkflows(t *testing.T) { for _, wfName := range []string{"wf1", "wf2"} { spec := module.TranslatePipelineToArgo(wfName, "argo", nil) - if _, err := m.SubmitWorkflow(spec); err != nil { + if _, err := m.SubmitWorkflow(context.Background(), spec); err != nil { t.Fatalf("SubmitWorkflow %s: %v", wfName, err) } } - workflows, err := m.ListWorkflows("") + workflows, err := m.ListWorkflows(context.Background(), "") if err != nil { t.Fatalf("ListWorkflows: %v", err) } @@ -331,7 +331,7 @@ func TestArgoWorkflows_ListWorkflows(t *testing.T) { func TestArgoWorkflows_DeleteNonexistent(t *testing.T) { _, m := newRunningArgoApp(t) - if err := m.DeleteWorkflow("ghost-workflow"); err == nil { + if err := m.DeleteWorkflow(context.Background(), "ghost-workflow"); err == nil { t.Error("expected error deleting nonexistent workflow, got nil") } } @@ -394,7 +394,7 @@ func TestArgoStatusStep(t *testing.T) { app, m := newRunningArgoApp(t) spec := module.TranslatePipelineToArgo("ci", "argo", nil) - runName, err := m.SubmitWorkflow(spec) + runName, err := m.SubmitWorkflow(context.Background(), spec) if err != nil { t.Fatalf("SubmitWorkflow: %v", err) } @@ -424,7 +424,7 @@ func TestArgoStatusStep_FromContext(t *testing.T) { app, m := newRunningArgoApp(t) spec := module.TranslatePipelineToArgo("ci", "argo", nil) - runName, err := m.SubmitWorkflow(spec) + runName, err := m.SubmitWorkflow(context.Background(), spec) if err != nil { t.Fatalf("SubmitWorkflow: %v", err) } @@ -463,7 +463,7 @@ func TestArgoLogsStep(t *testing.T) { app, m := newRunningArgoApp(t) spec := module.TranslatePipelineToArgo("ci", "argo", []map[string]any{{"name": "build"}}) - runName, err := m.SubmitWorkflow(spec) + runName, err := m.SubmitWorkflow(context.Background(), spec) if err != nil { t.Fatalf("SubmitWorkflow: %v", err) } @@ -490,7 +490,7 @@ func TestArgoDeleteStep(t *testing.T) { app, m := newRunningArgoApp(t) spec := module.TranslatePipelineToArgo("ci", "argo", nil) - runName, err := m.SubmitWorkflow(spec) + runName, err := m.SubmitWorkflow(context.Background(), spec) if err != nil { t.Fatalf("SubmitWorkflow: %v", err) } @@ -517,7 +517,7 @@ func TestArgoListStep(t *testing.T) { app, m := newRunningArgoApp(t) spec := module.TranslatePipelineToArgo("wf", "argo", nil) - if _, err := m.SubmitWorkflow(spec); err != nil { + if _, err := m.SubmitWorkflow(context.Background(), spec); err != nil { t.Fatalf("SubmitWorkflow: %v", err) } diff --git a/module/execenv_factory.go b/module/execenv_factory.go index 49c2d34de..0100d0bb3 100644 --- a/module/execenv_factory.go +++ b/module/execenv_factory.go @@ -15,32 +15,87 @@ import ( // // Supported values for execEnv: // - "" or "local-docker" — local Docker daemon (default). +// - "ephemeral" — one-off Argo Workflow on Kubernetes (PR9). Requires a +// configured argo.workflows module; see resolveEphemeralRunner. // - any registered runner name — dispatches to the named RemoteRunner from the // sandbox.remote_runners registry (PR8). The registry is looked up from the // modular service registry via app. If the name is not registered, a clear // error is returned at runtime (step Execute time). // -// Deferred values: -// - "ephemeral" — ephemeral/cloud runner (PR9) +// Note: remote runner names are not enumerable at schema-definition time because +// they are registered dynamically via sandbox.remote_runners config. // // Validation strategy: the step factory (pipeline_step_sandbox_exec.go) no longer // rejects non-local-docker exec_env values at construction time — named runners are // determined by config at runtime, not build time. Any unresolved name (not in the // registry) returns an error at Execute time, which is the appropriate gate (same as // other service-name references in the pipeline). -func resolveSandboxRunner(ctx context.Context, app modular.Application, execEnv string, cfg sandbox.SandboxConfig) (sandbox.SandboxRunner, error) { +func resolveSandboxRunner(ctx context.Context, app modular.Application, execEnv string, cfg sandbox.SandboxConfig, argoModuleName string) (sandbox.SandboxRunner, error) { switch execEnv { case "", "local-docker": return sandbox.NewLocalDockerRunner(cfg) case "ephemeral": - // TODO(PR9): wire ephemeral/cloud runner here. - return nil, fmt.Errorf("sandbox_exec: exec_env %q not yet configured (deferred to PR9)", execEnv) + // Wire ephemeral runner via Argo Workflows module (PR9). + return resolveEphemeralRunner(app, argoModuleName, cfg) default: // Treat execEnv as a named remote runner. Look it up in the service registry. return resolveNamedRemoteRunner(ctx, app, execEnv, cfg) } } +// resolveEphemeralRunner resolves an ArgoEphemeralRunner for exec_env: ephemeral. +// +// Module resolution: +// - If argoModuleName is non-empty, look it up in the service registry directly +// (it must be a *ArgoWorkflowsModule). +// - If argoModuleName is empty, scan the entire service registry for the SOLE +// *ArgoWorkflowsModule instance. This is the zero-config path: if exactly one +// argo.workflows module is configured, no explicit name is needed. If zero or +// more than one are found, a clear error is returned. +func resolveEphemeralRunner(app modular.Application, argoModuleName string, cfg sandbox.SandboxConfig) (sandbox.SandboxRunner, error) { + if app == nil { + return nil, fmt.Errorf("sandbox_exec: exec_env ephemeral requires an application context (no app registered)") + } + + var argoMod *ArgoWorkflowsModule + + if argoModuleName != "" { + // Explicit name: look it up directly. + svc, ok := app.SvcRegistry()[argoModuleName] + if !ok { + return nil, fmt.Errorf("sandbox_exec: exec_env ephemeral: argo module %q not found in service registry", argoModuleName) + } + m, ok := svc.(*ArgoWorkflowsModule) + if !ok { + return nil, fmt.Errorf("sandbox_exec: exec_env ephemeral: service %q is not an *ArgoWorkflowsModule (got %T)", argoModuleName, svc) + } + argoMod = m + } else { + // Auto-detect: scan for the sole *ArgoWorkflowsModule. + var candidates []*ArgoWorkflowsModule + for _, svc := range app.SvcRegistry() { + if m, ok := svc.(*ArgoWorkflowsModule); ok { + candidates = append(candidates, m) + } + } + switch len(candidates) { + case 0: + return nil, fmt.Errorf("sandbox_exec: exec_env ephemeral requires a configured argo.workflows module (none found in service registry); set argo_module to name one explicitly") + case 1: + argoMod = candidates[0] + default: + names := make([]string, len(candidates)) + for i, m := range candidates { + names[i] = m.Name() + } + return nil, fmt.Errorf("sandbox_exec: exec_env ephemeral: ambiguous argo.workflows module (found %d: %v); set argo_module to select one explicitly", len(candidates), names) + } + } + + // pollInterval 0 → newArgoEphemeralRunner falls back to the package default. + return newArgoEphemeralRunner(argoMod, argoMod.namespace(), cfg, 0), nil +} + // resolveNamedRemoteRunner looks up a RemoteRunnerSpec by name from the // RemoteRunnerRegistry service, resolves the spec's bearer token through the // configured secrets provider, builds a RemoteRunner from the spec, and returns diff --git a/module/execenv_factory_test.go b/module/execenv_factory_test.go index fdf387ad2..1de2ccc90 100644 --- a/module/execenv_factory_test.go +++ b/module/execenv_factory_test.go @@ -15,7 +15,7 @@ func TestExecEnvFactory_DefaultLocalDocker(t *testing.T) { cfg := sandbox.SandboxConfig{Image: "alpine:3.19"} for _, execEnv := range []string{"", "local-docker"} { - runner, err := resolveSandboxRunner(context.Background(), nil, execEnv, cfg) + runner, err := resolveSandboxRunner(context.Background(), nil, execEnv, cfg, "") if err != nil { // Runner creation uses the Docker client env (DOCKER_HOST/TLS); a // failure here is a Docker-availability issue, not an exec_env-routing @@ -30,13 +30,15 @@ func TestExecEnvFactory_DefaultLocalDocker(t *testing.T) { } } -// TestExecEnvFactory_UnknownExecEnv_Error verifies that unknown or deferred -// exec_env values return a clear error rather than silently falling through. +// TestExecEnvFactory_UnknownExecEnv_Error verifies that unknown exec_env values +// return a clear error rather than silently falling through. // // As of PR8, exec_env values other than "" / "local-docker" / "ephemeral" are // treated as named remote runner names. Without an application context (nil app), -// they all return a "no application context" error. The reserved "ephemeral" -// value is still explicitly deferred to PR9 with its own message. +// they all return a "no application context" error. +// +// As of PR9, "ephemeral" is fully wired: with a nil app it returns a clear +// "requires an application context" error (not "deferred to PR9"). func TestExecEnvFactory_UnknownExecEnv_Error(t *testing.T) { cfg := sandbox.SandboxConfig{Image: "alpine:3.19"} @@ -44,8 +46,8 @@ func TestExecEnvFactory_UnknownExecEnv_Error(t *testing.T) { execEnv string errContains string }{ - // "ephemeral" is still explicitly deferred. - {"ephemeral", "not yet configured"}, + // "ephemeral" now returns a clear "no application context" error (PR9 wired). + {"ephemeral", "application context"}, // Named runner names fail with "no application context" when app is nil. {"remote", "not configured"}, {"nope", "not configured"}, @@ -53,7 +55,7 @@ func TestExecEnvFactory_UnknownExecEnv_Error(t *testing.T) { } for _, tt := range tests { - runner, err := resolveSandboxRunner(context.Background(), nil, tt.execEnv, cfg) + runner, err := resolveSandboxRunner(context.Background(), nil, tt.execEnv, cfg, "") if err == nil { t.Errorf("execEnv=%q: expected error, got nil runner=%v", tt.execEnv, runner) if runner != nil { @@ -88,7 +90,7 @@ func TestSandboxExec_ExecEnvAbsent_Unchanged(t *testing.T) { // Confirm the factory resolves it to a local runner without error. cfg := s.buildSandboxConfig() - runner, err := resolveSandboxRunner(context.Background(), s.app, s.execEnv, cfg) + runner, err := resolveSandboxRunner(context.Background(), s.app, s.execEnv, cfg, s.argoModule) if err != nil { t.Skipf("resolveSandboxRunner with empty execEnv: docker client unavailable: %v", err) } @@ -115,7 +117,7 @@ func TestSandboxExec_ExecEnvLocalDocker_ExplicitlySet(t *testing.T) { } cfg := s.buildSandboxConfig() - runner, err := resolveSandboxRunner(context.Background(), s.app, s.execEnv, cfg) + runner, err := resolveSandboxRunner(context.Background(), s.app, s.execEnv, cfg, s.argoModule) if err != nil { t.Skipf("docker client unavailable: %v", err) } @@ -266,7 +268,7 @@ func TestResolveNamedRemoteRunner_SecretTokenBuilds(t *testing.T) { t.Fatalf("RegisterService(registry): %v", err) } - runner, err := resolveSandboxRunner(context.Background(), app, "prod", sandbox.SandboxConfig{Image: "alpine", Profile: "standard"}) + runner, err := resolveSandboxRunner(context.Background(), app, "prod", sandbox.SandboxConfig{Image: "alpine", Profile: "standard"}, "") if err != nil { t.Fatalf("resolveSandboxRunner: %v", err) } @@ -299,8 +301,79 @@ func TestResolveNamedRemoteRunner_SecretTokenNoProvider_Errors(t *testing.T) { t.Fatalf("RegisterService(registry): %v", err) } - _, err := resolveSandboxRunner(context.Background(), app, "prod", sandbox.SandboxConfig{Image: "alpine"}) + _, err := resolveSandboxRunner(context.Background(), app, "prod", sandbox.SandboxConfig{Image: "alpine"}, "") if err == nil { t.Fatal("expected error: secret:// token with no provider must not build a runner") } } + +// ─── ephemeral (PR9) tests ──────────────────────────────────────────────────── + +// TestExecEnvFactory_Ephemeral_WithArgoModule verifies that exec_env: ephemeral +// with a registered *ArgoWorkflowsModule returns a non-nil ArgoEphemeralRunner. +func TestExecEnvFactory_Ephemeral_WithArgoModule(t *testing.T) { + app := NewMockApplication() + + // Register a mock argo.workflows module. + argoMod := NewArgoWorkflowsModule("my-argo", map[string]any{ + "backend": "mock", + "namespace": "argo", + }) + if err := argoMod.Init(app); err != nil { + t.Fatalf("argo module Init: %v", err) + } + + cfg := sandbox.SandboxConfig{Image: "alpine:3.19", Profile: "standard"} + + // Explicit name. + runner, err := resolveSandboxRunner(context.Background(), app, "ephemeral", cfg, "my-argo") + if err != nil { + t.Fatalf("resolveSandboxRunner ephemeral (explicit name): %v", err) + } + if runner == nil { + t.Fatal("expected non-nil ArgoEphemeralRunner") + } + _ = runner.Close() + + // Auto-detect (empty argoModuleName) — exactly one argo module registered. + runner2, err := resolveSandboxRunner(context.Background(), app, "ephemeral", cfg, "") + if err != nil { + t.Fatalf("resolveSandboxRunner ephemeral (auto-detect): %v", err) + } + if runner2 == nil { + t.Fatal("expected non-nil ArgoEphemeralRunner (auto-detect)") + } + _ = runner2.Close() +} + +// TestExecEnvFactory_Ephemeral_NoArgoModule verifies that exec_env: ephemeral +// with no registered argo module returns a clear error. +func TestExecEnvFactory_Ephemeral_NoArgoModule(t *testing.T) { + app := NewMockApplication() + // No argo module registered. + + cfg := sandbox.SandboxConfig{Image: "alpine:3.19"} + _, err := resolveSandboxRunner(context.Background(), app, "ephemeral", cfg, "") + if err == nil { + t.Fatal("expected error when no argo module is registered for ephemeral exec_env") + } + if !strings.Contains(err.Error(), "argo.workflows") { + t.Errorf("expected error to mention argo.workflows, got: %v", err) + } +} + +// TestExecEnvFactory_Ephemeral_ExplicitNameNotFound verifies a clear error when +// the explicitly named argo_module is not in the registry. +func TestExecEnvFactory_Ephemeral_ExplicitNameNotFound(t *testing.T) { + app := NewMockApplication() + // No module named "missing-argo" registered. + + cfg := sandbox.SandboxConfig{Image: "alpine:3.19"} + _, err := resolveSandboxRunner(context.Background(), app, "ephemeral", cfg, "missing-argo") + if err == nil { + t.Fatal("expected error for unknown argo_module name") + } + if !strings.Contains(err.Error(), "missing-argo") { + t.Errorf("expected error to mention module name, got: %v", err) + } +} diff --git a/module/pipeline_step_argo.go b/module/pipeline_step_argo.go index 15b7a822d..242e5de82 100644 --- a/module/pipeline_step_argo.go +++ b/module/pipeline_step_argo.go @@ -43,13 +43,13 @@ func NewArgoSubmitStepFactory() StepFactory { func (s *ArgoSubmitStep) Name() string { return s.name } -func (s *ArgoSubmitStep) Execute(_ context.Context, _ *PipelineContext) (*StepResult, error) { +func (s *ArgoSubmitStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) { m, err := resolveArgoModule(s.app, s.service, s.name) if err != nil { return nil, err } spec := TranslatePipelineToArgo(s.wfName, m.namespace(), s.steps) - runName, err := m.SubmitWorkflow(spec) + runName, err := m.SubmitWorkflow(ctx, spec) if err != nil { return nil, fmt.Errorf("argo_submit step %q: %w", s.name, err) } @@ -86,7 +86,7 @@ func NewArgoStatusStepFactory() StepFactory { func (s *ArgoStatusStep) Name() string { return s.name } -func (s *ArgoStatusStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) { +func (s *ArgoStatusStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { m, err := resolveArgoModule(s.app, s.service, s.name) if err != nil { return nil, err @@ -101,7 +101,7 @@ func (s *ArgoStatusStep) Execute(_ context.Context, pc *PipelineContext) (*StepR if wfRun == "" { return nil, fmt.Errorf("argo_status step %q: 'workflow_run' is required (set in config or from prior argo_submit)", s.name) } - status, err := m.WorkflowStatus(wfRun) + status, err := m.WorkflowStatus(ctx, wfRun) if err != nil { return nil, fmt.Errorf("argo_status step %q: %w", s.name, err) } @@ -138,7 +138,7 @@ func NewArgoLogsStepFactory() StepFactory { func (s *ArgoLogsStep) Name() string { return s.name } -func (s *ArgoLogsStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) { +func (s *ArgoLogsStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { m, err := resolveArgoModule(s.app, s.service, s.name) if err != nil { return nil, err @@ -152,7 +152,7 @@ func (s *ArgoLogsStep) Execute(_ context.Context, pc *PipelineContext) (*StepRes if wfRun == "" { return nil, fmt.Errorf("argo_logs step %q: 'workflow_run' is required", s.name) } - lines, err := m.WorkflowLogs(wfRun) + lines, err := m.WorkflowLogs(ctx, wfRun) if err != nil { return nil, fmt.Errorf("argo_logs step %q: %w", s.name, err) } @@ -188,7 +188,7 @@ func NewArgoDeleteStepFactory() StepFactory { func (s *ArgoDeleteStep) Name() string { return s.name } -func (s *ArgoDeleteStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) { +func (s *ArgoDeleteStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { m, err := resolveArgoModule(s.app, s.service, s.name) if err != nil { return nil, err @@ -202,7 +202,7 @@ func (s *ArgoDeleteStep) Execute(_ context.Context, pc *PipelineContext) (*StepR if wfRun == "" { return nil, fmt.Errorf("argo_delete step %q: 'workflow_run' is required", s.name) } - if err := m.DeleteWorkflow(wfRun); err != nil { + if err := m.DeleteWorkflow(ctx, wfRun); err != nil { return nil, fmt.Errorf("argo_delete step %q: %w", s.name, err) } return &StepResult{Output: map[string]any{ @@ -236,12 +236,12 @@ func NewArgoListStepFactory() StepFactory { func (s *ArgoListStep) Name() string { return s.name } -func (s *ArgoListStep) Execute(_ context.Context, _ *PipelineContext) (*StepResult, error) { +func (s *ArgoListStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) { m, err := resolveArgoModule(s.app, s.service, s.name) if err != nil { return nil, err } - workflows, err := m.ListWorkflows(s.labelSelector) + workflows, err := m.ListWorkflows(ctx, s.labelSelector) if err != nil { return nil, fmt.Errorf("argo_list step %q: %w", s.name, err) } diff --git a/module/pipeline_step_sandbox_exec.go b/module/pipeline_step_sandbox_exec.go index cc53acf52..7e82fe1a9 100644 --- a/module/pipeline_step_sandbox_exec.go +++ b/module/pipeline_step_sandbox_exec.go @@ -17,7 +17,8 @@ type SandboxExecStep struct { image string command []string securityProfile string - execEnv string // "" or "local-docker" → local Docker; others deferred to later PRs + execEnv string // "" or "local-docker" → local Docker; "ephemeral" → Argo; others are remote runner names + argoModule string // optional: argo.workflows module name for exec_env: ephemeral memoryLimit int64 cpuLimit float64 timeout time.Duration @@ -96,14 +97,21 @@ func NewSandboxExecStepFactory() StepFactory { if ee, ok := cfg["exec_env"].(string); ok && ee != "" { // exec_env validation: "local-docker" is the local runner; + // "ephemeral" routes to the Argo Workflows ephemeral runner (PR9); // any other non-empty string is treated as a named remote runner and // validated at Execute time by resolveSandboxRunner (PR8). We no longer // reject unknown values at construction time since named runner // registrations are config-driven and not known until runtime. - // The reserved "ephemeral" value is still deferred to PR9. step.execEnv = ee } + if am, ok := cfg["argo_module"].(string); ok && am != "" { + // argo_module names the argo.workflows service to use when + // exec_env is "ephemeral". If unset, the factory auto-detects the + // sole registered *ArgoWorkflowsModule (error if 0 or >1 found). + step.argoModule = am + } + if envRaw, ok := cfg["env"].(map[string]any); ok { step.env = make(map[string]string, len(envRaw)) for k, v := range envRaw { @@ -139,7 +147,7 @@ func (s *SandboxExecStep) Name() string { return s.name } func (s *SandboxExecStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) { sbCfg := s.buildSandboxConfig() - sb, err := resolveSandboxRunner(ctx, s.app, s.execEnv, sbCfg) + sb, err := resolveSandboxRunner(ctx, s.app, s.execEnv, sbCfg, s.argoModule) if err != nil { return nil, fmt.Errorf("sandbox_exec step %q: failed to create sandbox: %w", s.name, err) } diff --git a/schema/module_schema.go b/schema/module_schema.go index 282a294b8..ab058a045 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -2328,7 +2328,8 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { {Key: "memory_limit", Label: "Memory Limit", Type: FieldTypeString, Description: "Memory limit (e.g. 128m)"}, {Key: "timeout", Label: "Timeout", Type: FieldTypeDuration, Description: "Execution timeout"}, {Key: "env", Label: "Environment", Type: FieldTypeMap, Description: "Environment variables"}, - {Key: "exec_env", Label: "Execution Environment", Type: FieldTypeSelect, Options: []string{"local-docker"}, DefaultValue: "local-docker", Description: "Execution backend (default local-docker; remote/ephemeral wired in later phases)"}, + {Key: "exec_env", Label: "Execution Environment", Type: FieldTypeSelect, Options: []string{"local-docker", "ephemeral"}, DefaultValue: "local-docker", Description: "Execution backend: local-docker (default) or ephemeral (Argo Workflow on k8s). Named remote runners (sandbox.remote_runners) are not listed here because they are registered dynamically."}, + {Key: "argo_module", Label: "Argo Module", Type: FieldTypeString, Description: "Name of the argo.workflows service to use when exec_env is ephemeral. If unset, the sole registered argo.workflows module is used (error if 0 or >1 exist)."}, }, }) diff --git a/schema/step_schema_builtins.go b/schema/step_schema_builtins.go index 5dc5a9da5..2cd2d83a1 100644 --- a/schema/step_schema_builtins.go +++ b/schema/step_schema_builtins.go @@ -1309,7 +1309,8 @@ func (r *StepSchemaRegistry) registerBuiltins() { {Key: "timeout", Type: FieldTypeDuration, Description: "Execution timeout"}, {Key: "env", Type: FieldTypeMap, Description: "Environment variables"}, {Key: "fail_on_error", Type: FieldTypeBool, Description: "Stop pipeline if exit_code != 0", DefaultValue: true}, - {Key: "exec_env", Type: FieldTypeSelect, Description: "Execution environment backend (default local-docker; remote/ephemeral wired in later phases)", Options: []string{"local-docker"}, DefaultValue: "local-docker"}, + {Key: "exec_env", Type: FieldTypeSelect, Description: "Execution environment backend: local-docker (default) or ephemeral (Argo Workflow on k8s, PR9). Named remote runners (sandbox.remote_runners) are not listed here because they are registered dynamically.", Options: []string{"local-docker", "ephemeral"}, DefaultValue: "local-docker"}, + {Key: "argo_module", Type: FieldTypeString, Description: "Name of the argo.workflows service to use when exec_env is ephemeral. If unset, the sole registered argo.workflows module is used (error if 0 or >1 exist)."}, }, Outputs: []StepOutputDef{ {Key: "exit_code", Type: "number", Description: "Container exit code"}, diff --git a/schema/testdata/editor-schemas.golden.json b/schema/testdata/editor-schemas.golden.json index d3d5d9be8..d02a58f60 100644 --- a/schema/testdata/editor-schemas.golden.json +++ b/schema/testdata/editor-schemas.golden.json @@ -7132,11 +7132,18 @@ "key": "exec_env", "label": "Execution Environment", "type": "select", - "description": "Execution backend (default local-docker; remote/ephemeral wired in later phases)", + "description": "Execution backend: local-docker (default) or ephemeral (Argo Workflow on k8s). Named remote runners (sandbox.remote_runners) are not listed here because they are registered dynamically.", "defaultValue": "local-docker", "options": [ - "local-docker" + "local-docker", + "ephemeral" ] + }, + { + "key": "argo_module", + "label": "Argo Module", + "type": "string", + "description": "Name of the argo.workflows service to use when exec_env is ephemeral. If unset, the sole registered argo.workflows module is used (error if 0 or \u003e1 exist)." } ] }, From 5b7b3cac7126913893d4d4a1a75eefabdb07dd54 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Wed, 3 Jun 2026 00:57:29 -0400 Subject: [PATCH 2/2] fix(sandbox,module): delete workflow on cancel + schema/comment doc fixes (Copilot) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ArgoEphemeralRunner best-effort DeleteWorkflow on ctx cancellation (fresh short ctx) so a cancelled/timed-out ephemeral step doesn't leave the Argo workflow running in-cluster until TTL GC. Added to the argoSubmitter interface + both test fakes; cancel test asserts deleteCalled. - step.sandbox_exec schema description generalized (not just Docker — also remote/ephemeral via exec_env), in both schema registries + golden. - exec_env field description drops the internal PR ref; notes named remote runners are valid dynamic values. - logs-retrieval-failure comment matches the actual warning-line behavior. Co-Authored-By: Claude Opus 4.8 (1M context) --- module/argo_ephemeral_runner.go | 15 ++++++++++++++- module/argo_ephemeral_runner_test.go | 18 ++++++++++++++++++ schema/module_schema.go | 2 +- schema/step_schema_builtins.go | 4 ++-- schema/testdata/editor-schemas.golden.json | 2 +- 5 files changed, 36 insertions(+), 5 deletions(-) diff --git a/module/argo_ephemeral_runner.go b/module/argo_ephemeral_runner.go index e2c5289ef..b95b16b6c 100644 --- a/module/argo_ephemeral_runner.go +++ b/module/argo_ephemeral_runner.go @@ -3,6 +3,7 @@ package module import ( "context" "fmt" + "log/slog" "strings" "sync/atomic" "time" @@ -21,6 +22,7 @@ type argoSubmitter interface { SubmitWorkflow(ctx context.Context, spec *ArgoWorkflowSpec) (string, error) WorkflowStatus(ctx context.Context, workflowName string) (string, error) WorkflowLogs(ctx context.Context, workflowName string) ([]string, error) + DeleteWorkflow(ctx context.Context, workflowName string) error } // Compile-time assertion: *ArgoWorkflowsModule satisfies argoSubmitter. @@ -134,6 +136,16 @@ func (r *ArgoEphemeralRunner) Exec(ctx context.Context, cmd []string) (*sandbox. for { select { case <-ctx.Done(): + // The caller cancelled/timed out. Best-effort terminate the submitted + // workflow so it doesn't keep running (and billing) in the cluster + // until TTL GC — analogous to the local runner stopping its container. + // Use a fresh short-lived ctx since the caller's is already done. + delCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if delErr := r.submitter.DeleteWorkflow(delCtx, runName); delErr != nil { + slog.Warn("argo ephemeral runner: failed to delete workflow after ctx cancellation", + "workflow", runName, "err", delErr) + } + cancel() return nil, ctx.Err() case <-ticker.C: status, err := r.submitter.WorkflowStatus(ctx, runName) @@ -151,7 +163,8 @@ done: // Fetch logs and join into a single Stdout string. lines, err := r.submitter.WorkflowLogs(ctx, runName) if err != nil { - // Non-fatal: return partial result with empty stdout and log the issue. + // Non-fatal: surface the failure as a warning line in stdout (so the + // caller still gets the exit-code verdict) rather than aborting. lines = []string{fmt.Sprintf("[argo ephemeral runner] warning: could not retrieve logs: %v", err)} } stdout := strings.Join(lines, "\n") diff --git a/module/argo_ephemeral_runner_test.go b/module/argo_ephemeral_runner_test.go index 8160f0bf1..16c2f7828 100644 --- a/module/argo_ephemeral_runner_test.go +++ b/module/argo_ephemeral_runner_test.go @@ -51,6 +51,15 @@ type fakeArgoSubmitter struct { // ctx is cancelled, then returns ctx.Err(). This simulates an in-flight HTTP // call that honors ctx cancellation (the real backend's doRequest behavior). blockStatusOnCtx bool + + // deleteCalled records whether DeleteWorkflow was invoked (best-effort + // cleanup on ctx cancellation). + deleteCalled bool +} + +func (f *fakeArgoSubmitter) DeleteWorkflow(_ context.Context, _ string) error { + f.deleteCalled = true + return nil } func (f *fakeArgoSubmitter) SubmitWorkflow(_ context.Context, spec *ArgoWorkflowSpec) (string, error) { @@ -312,6 +321,11 @@ func TestArgoEphemeralRunner_CtxCancelDuringPoll(t *testing.T) { if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { t.Errorf("expected context error, got: %v", err) } + // On cancellation the runner best-effort deletes the submitted workflow so it + // doesn't keep running in the cluster until TTL GC. + if !fake.deleteCalled { + t.Error("expected DeleteWorkflow to be called on ctx cancellation") + } } // TestArgoEphemeralRunner_CtxCancelDuringInFlightStatus verifies that ctx @@ -454,3 +468,7 @@ func (c *capturingSubmitter) WorkflowStatus(ctx context.Context, name string) (s func (c *capturingSubmitter) WorkflowLogs(ctx context.Context, name string) ([]string, error) { return c.inner.WorkflowLogs(ctx, name) } + +func (c *capturingSubmitter) DeleteWorkflow(ctx context.Context, name string) error { + return c.inner.DeleteWorkflow(ctx, name) +} diff --git a/schema/module_schema.go b/schema/module_schema.go index ab058a045..4fffdf9db 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -2320,7 +2320,7 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Type: "step.sandbox_exec", Label: "Sandbox Exec", Category: "pipeline", - Description: "Runs a command in a hardened Docker sandbox container with resource limits", + Description: "Runs a command in a hardened sandbox with resource limits — locally in a Docker container (default) or, via exec_env, on a remote runner or an ephemeral Argo Workflow", ConfigFields: []ConfigFieldDef{ {Key: "image", Label: "Image", Type: FieldTypeString, Description: "Container image URI"}, {Key: "command", Label: "Command", Type: FieldTypeArray, Description: "Command to execute"}, diff --git a/schema/step_schema_builtins.go b/schema/step_schema_builtins.go index 2cd2d83a1..2bf2bfc5c 100644 --- a/schema/step_schema_builtins.go +++ b/schema/step_schema_builtins.go @@ -1299,7 +1299,7 @@ func (r *StepSchemaRegistry) registerBuiltins() { r.Register(&StepSchema{ Type: "step.sandbox_exec", Plugin: "pipelinesteps", - Description: "Runs a command in a hardened Docker sandbox container with resource limits.", + Description: "Runs a command in a hardened sandbox with resource limits — locally in a Docker container (default) or, via exec_env, on a remote runner or an ephemeral Argo Workflow.", ConfigFields: []ConfigFieldDef{ {Key: "image", Type: FieldTypeString, Description: "Container image URI", DefaultValue: "cgr.dev/chainguard/wolfi-base:latest"}, {Key: "command", Type: FieldTypeArray, Description: "Command to execute in sandbox"}, @@ -1309,7 +1309,7 @@ func (r *StepSchemaRegistry) registerBuiltins() { {Key: "timeout", Type: FieldTypeDuration, Description: "Execution timeout"}, {Key: "env", Type: FieldTypeMap, Description: "Environment variables"}, {Key: "fail_on_error", Type: FieldTypeBool, Description: "Stop pipeline if exit_code != 0", DefaultValue: true}, - {Key: "exec_env", Type: FieldTypeSelect, Description: "Execution environment backend: local-docker (default) or ephemeral (Argo Workflow on k8s, PR9). Named remote runners (sandbox.remote_runners) are not listed here because they are registered dynamically.", Options: []string{"local-docker", "ephemeral"}, DefaultValue: "local-docker"}, + {Key: "exec_env", Type: FieldTypeSelect, Description: "Execution environment backend: local-docker (default) or ephemeral (one-off Argo Workflow on k8s). Named remote runners (configured via sandbox.remote_runners) are also valid values but are not listed here because they are registered dynamically.", Options: []string{"local-docker", "ephemeral"}, DefaultValue: "local-docker"}, {Key: "argo_module", Type: FieldTypeString, Description: "Name of the argo.workflows service to use when exec_env is ephemeral. If unset, the sole registered argo.workflows module is used (error if 0 or >1 exist)."}, }, Outputs: []StepOutputDef{ diff --git a/schema/testdata/editor-schemas.golden.json b/schema/testdata/editor-schemas.golden.json index d02a58f60..3f1fd9b85 100644 --- a/schema/testdata/editor-schemas.golden.json +++ b/schema/testdata/editor-schemas.golden.json @@ -7085,7 +7085,7 @@ "type": "step.sandbox_exec", "label": "Sandbox Exec", "category": "pipeline", - "description": "Runs a command in a hardened Docker sandbox container with resource limits", + "description": "Runs a command in a hardened sandbox with resource limits — locally in a Docker container (default) or, via exec_env, on a remote runner or an ephemeral Argo Workflow", "configFields": [ { "key": "image",