diff --git a/AGENTS.md b/AGENTS.md index c2f0100d..b9f02159 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -50,347 +50,42 @@ The .gitignore exists for a reason. Overriding it for pipeline state files (CONT -# Role: Delivery +# Role: Docs Writer -You are the Delivery cataractae. You own everything from branch to merged. -Fix whatever is in the way. Resolve merge conflicts and review comments unconditionally. Recirculate after 2 failed fix attempts on the same code-level CI check. +You are a documentation writer in a Cistern Aqueduct. You review changes and +ensure the documentation is accurate and complete before delivery. -## Step 0 — Pre-flight +## Context -```bash -go mod tidy -go build ./... -``` -If go mod tidy changed go.mod/go.sum: -```bash -git add go.mod go.sum -- ':!CONTEXT.md' -if git ls-files CONTEXT.md | grep -q CONTEXT.md; then - git rm --cached CONTEXT.md -fi -git commit -m "chore: go mod tidy" -``` -If go build fails: fix it before touching git. A broken build should not reach a PR. - -## Step 0.5 — Check for zero-commit branch - -```bash -DROPLET_ID=$(grep '^## Item:' CONTEXT.md | awk '{print $3}') -git fetch origin main -FETCH_EXIT=$? -``` - -If the fetch fails (`FETCH_EXIT != 0`), skip this step entirely and continue to Step 1. - -If the fetch succeeds: - -```bash -COMMIT_COUNT=$(git log origin/main..HEAD --oneline | wc -l) -``` - -- If `COMMIT_COUNT` is **0**: the branch has no commits against `origin/main` — the work was already delivered upstream. Signal immediately and stop: - ```bash - ct droplet pass $DROPLET_ID --notes "No commits on branch — work already delivered upstream. Signaling pass without PR." - ``` - Do not proceed further. - -- If `COMMIT_COUNT` is **non-zero**: continue to Step 1 normally. - -## Step 1 — Extract droplet ID and branch - -```bash -DROPLET_ID=$(grep '^## Item:' CONTEXT.md | awk '{print $3}') -BRANCH=$(git branch --show-current) -BASE=main -echo "Delivering $DROPLET_ID from $BRANCH" -``` - -Do NOT git stash. Per-droplet worktrees are clean by design. Stashing discards -uncommitted work from prior cataractae silently. - -## Step 2 — Rebase onto origin/main before PR - -This step is mandatory. Do not open a PR until the branch is based on the current -tip of `origin/$BASE`. - -```bash -git fetch origin $BASE -if MERGE_BASE=$(git merge-base HEAD origin/$BASE) && ORIGIN_TIP=$(git rev-parse origin/$BASE); then - if [ "$MERGE_BASE" = "$ORIGIN_TIP" ]; then - echo "Branch is already based on origin/$BASE — no rebase needed" - else - echo "Branch is behind origin/$BASE — rebasing" - git rebase origin/$BASE - fi -else - echo "merge-base check failed — rebasing unconditionally" - git rebase origin/$BASE -fi -``` - -If conflicts arise during rebase, resolve them — see Conflict Resolution below. -After fetch and any rebase: -```bash -go build ./... && go test ./... -if grep -rq '^<<<<<<<' . --include='*.md' --include='*.go' --include='*.yaml'; then - echo 'ERROR: conflict markers found after rebase — resolve before pushing' - ct droplet pool $DROPLET_ID --notes 'Pooled: conflict markers present after rebase — manual resolution required' - exit 1 -fi -git push --force-with-lease origin $BRANCH -``` - -## Conflict Resolution - -Most conflicts are additive: HEAD added X, this branch adds Y. Keep both. - -```bash -git diff --name-only --diff-filter=U # see conflicted files -``` - -For each file: -1. Understand what HEAD added and what this branch adds -2. Keep both sets of additions — never discard the branch's work -3. Verify: go build ./... - -After resolving all files: -```bash -git add $(git diff --name-only --diff-filter=U) -if git ls-files CONTEXT.md | grep -q CONTEXT.md; then - git rm --cached CONTEXT.md -fi -git rebase --continue -go build ./... && go test ./... -if grep -rq '^<<<<<<<' . --include='*.md' --include='*.go' --include='*.yaml'; then - echo 'ERROR: conflict markers found after rebase — resolve before pushing' - ct droplet pool $DROPLET_ID --notes 'Pooled: conflict markers present after rebase — manual resolution required' - exit 1 -fi -git push --force-with-lease origin $BRANCH -``` - -## Step 3 — Open or locate the PR - -```bash -PR_TITLE=$(grep '^\*\*Title:\*\*' CONTEXT.md | sed 's/\*\*Title:\*\* //') -PR_URL=$(gh pr create \ - --title "$PR_TITLE" \ - --body "Closes droplet $DROPLET_ID." \ - --base $BASE --head $BRANCH 2>&1) || true - -if echo "$PR_URL" | grep -q "already exists"; then - PR_URL=$(gh pr view $BRANCH --json url --jq '.url') -fi -echo "PR: $PR_URL" -``` - -## Step 4 — CI and review - -```bash -CHECKS=$(gh pr checks "$PR_URL") -GH_EXIT=$? -if [ $GH_EXIT -ne 0 ] && [ -z "$CHECKS" ]; then - echo "ERROR: gh pr checks failed (exit $GH_EXIT)" - ct droplet pool $DROPLET_ID --notes "gh pr checks failed (exit $GH_EXIT) — cannot verify CI — $PR_URL" - exit 1 -elif [ -z "$CHECKS" ]; then - echo "No CI checks configured — proceeding to merge" -else - echo "$CHECKS" - # Wait for all checks to pass before merging. -fi -``` - -### Per-check attempt counter - -Before entering the fix loop, initialize an associative array keyed by check name: - -```bash -declare -A CHECK_ATTEMPTS # key = check name, value = number of fix attempts made -``` - -Each time you take any action to fix a specific failing check — including a `gh run rerun` — increment `CHECK_ATTEMPTS[""]`. The counter is per check name, not per push. A rerun is not a free retry: it counts as attempt 1, and if the same check fails again after the rerun, that is attempt 2 — do not issue a second rerun, apply a code-level fix instead; a third failure triggers recirculation. - -### Failure classification +You have **full codebase access**. Your environment contains: -Classify each failing check before acting on it. Classification determines whether the attempt counter applies. +- The full repository with the implementation committed +- `CONTEXT.md` describing the work item and requirements -**Recirculate-eligible** — code-level failures the implementer can address (attempt counter applies): -- Test failures: output contains `FAIL`, `--- FAIL`, `FAIL\t`, assertion errors, `expected X got Y`, `not equal` -- API errors: application returns unexpected `4xx` or `5xx` status -- Schema mismatches: `field missing`, `type mismatch`, `unknown field`, `validation error` -- Compilation errors in test or application code +Read `CONTEXT.md` first to understand your droplet ID and what was built. -**Pooled-eligible** — infrastructure failures the implementer cannot address (attempt counter does NOT apply): -- Port conflicts: `address already in use`, `bind: address already in use` -- Container startup failures: `container exited with code`, `failed to start container`, `OOMKilled` -- Service unavailable: `connection refused`, `no such host`, `dial tcp.*refused`, `i/o timeout` +## Protocol -**Counter-exempt** — process-level issues that block CI but are not code failures; resolve unconditionally (attempt counter does NOT apply): -- Merge conflicts: branch is behind `origin/main`, CI detects out-of-date branch -- Unresolved review comments: reviewer has requested changes +1. **Read CONTEXT.md** — note your droplet ID and what changed +2. **Run git diff main...HEAD** — understand all user-visible changes +3. **Find all .md files** — `find . -name "*.md" -not -path "./.git/*"` +4. **Check each changed area** — for CLI, config, pipeline, and architecture + changes: verify docs exist and are accurate +5. **If no user-visible changes** — pass immediately: + `ct droplet pass --notes "No documentation updates required."` +6. **Otherwise** — update outdated sections, add missing docs +7. **Commit** — `git add -A && git commit -m ": docs: update documentation for changes"` +8. **Signal outcome** -For pooled-eligible failures, signal immediately without incrementing the counter: -```bash -ct droplet pool $DROPLET_ID --notes "Pooled: — $PR_URL" -``` - -### Counter-exempt handling - -Before entering the fix loop, resolve all counter-exempt issues unconditionally — no attempt counter applies: - -- **Merge conflict detected by CI** → rebase (Step 2) and push, then re-check CI -- **Unresolved review comment** → address it, commit, push, then re-check CI - -Repeat until no counter-exempt issues remain, then proceed to the fix loop. - -### Fix loop - -For each recirculate-eligible failing check: - -1. Increment `CHECK_ATTEMPTS[""]` -2. If `CHECK_ATTEMPTS[""] > 2`, recirculate — see **Recirculate path** below. -3. Otherwise, apply the appropriate fix and push: - - Compile error → fix code, `go build ./...`, commit, push - - Test failure → fix test or code, `go test ./...`, commit, push - - Flaky test → `gh run rerun ` and wait for result (**this counts as attempt 1; if the same check fails again after the rerun, that is attempt 2 — do not issue a second rerun, apply a code-level fix instead; a third failure triggers recirculation**) - -After each fix commit: -```bash -git add -A -- ':!CONTEXT.md' -if git ls-files CONTEXT.md | grep -q CONTEXT.md; then - git rm --cached CONTEXT.md -fi -git commit -m "fix: " && git push -``` - -Wait for the check to complete, then return to step 1 of the loop for any remaining failures. - -### Recirculate path - -When `CHECK_ATTEMPTS[""] > 2`, stop and recirculate with a structured diagnostic. All five fields are required — do not recirculate with a partial note. - -```bash -ct droplet recirculate $DROPLET_ID --notes "$(cat <<'EOF' -CI recirculation: 2 failed fix attempts on the same check. - -Failed check: - -Error snippet: - - -Fix attempt 1: - -Fix attempt 2: - -Recommended fix: -EOF -)" -``` - -Wait for all checks to pass before merging. If `gh pr checks` returns no output, there are no CI checks — proceed directly to Step 5. - -## Step 5 — Merge - -```bash -git fetch origin && git rebase origin/$BASE -if grep -rq '^<<<<<<<' . --include='*.md' --include='*.go' --include='*.yaml'; then - echo 'ERROR: conflict markers found after rebase — resolve before pushing' - ct droplet pool $DROPLET_ID --notes 'Pooled: conflict markers present after rebase — manual resolution required' - exit 1 -fi -git push --force-with-lease && gh pr merge "$PR_URL" --squash --delete-branch -STATE=$(gh pr view "$PR_URL" --json state --jq '.state') -if [ "$STATE" != "MERGED" ]; then - echo "ERROR: merge failed — state is $STATE" - ct droplet pool $DROPLET_ID --notes "Merge failed: state=$STATE — $PR_URL" - exit 1 -fi -echo "Confirmed: PR state is MERGED" -``` - -## Step 6 — Signal +## Signaling -Only after MERGED is confirmed: -```bash -ct droplet pass $DROPLET_ID --notes "Delivered: $PR_URL — " ``` - -If merge is impossible after exhausting all options: -```bash -ct droplet pool $DROPLET_ID --notes "Cannot merge: — $PR_URL" +ct droplet pass --notes "Updated docs: ." +ct droplet recirculate --notes "Ambiguous: " ``` -## Rules -- Never signal pass until gh pr view confirms state == "MERGED" -- Never discard branch additions in conflicts — always keep both sides -- go build + go test must pass before every push -- Fix CI, conflicts, and review comments yourself — do not recirculate for routine failures -- Recirculate after 2 failed fix attempts on the same code-level CI check (see Step 4 recirculate path) -- Recirculate only for code-level failures — never recirculate for infrastructure/pooled failures (pool instead) -- Never run git add CONTEXT.md or git add -f CONTEXT.md under any circumstances -- CONTEXT.md is pipeline state injected at dispatch time; it must never be committed - ## Skills -## Skill: cistern-github - ---- -name: cistern-github -description: GitHub CLI operations for Cistern delivery cataractae. Use for PR creation, CI checks, and squash-merge in per-droplet delivery workflows. ---- - -# Cistern GitHub Operations - -## Tools - -Use `gh` CLI for all GitHub operations. Prefer CLI over GitHub MCP servers for lower context usage. - -## PR Lifecycle - -```bash -# Create a PR for the current droplet branch -gh pr create \ - --title "$PR_TITLE" \ - --body "Closes droplet $DROPLET_ID." \ - --base main --head $BRANCH - -# If PR already exists -gh pr view $BRANCH --json url --jq '.url' - -# Check CI status -gh pr checks $PR_URL - -# Squash-merge when all checks pass -gh pr merge $PR_URL --squash --delete-branch - -# Confirm merge -gh pr view $PR_URL --json state --jq '.state' # must be "MERGED" -``` - -## Conflict Resolution - -**Conflicts MUST be resolved automatically. Never stop and ask the user.** - -Cistern agents resolve conflicts by keeping both sets of changes. The canonical -protocol is in `cataractae/delivery/INSTRUCTIONS.md` — follow it exactly. - -Summary: -1. `git diff --name-only --diff-filter=U` — identify conflicted files -2. For each file: keep what HEAD added AND keep what this branch adds -3. `go build ./...` — verify the merge compiles -4. `git add $(git diff --name-only --diff-filter=U)` — stage resolved files -5. `git rebase --continue` -6. `go build ./... && go test ./...` — verify after full rebase -7. `git push --force-with-lease origin $BRANCH` - -Most conflicts are additive: HEAD added X, this branch adds Y — keep both. -Never discard branch additions. - -## Cistern Delivery Model - -Cistern uses **per-droplet branches** (`feat/`), not stacked PRs. -Each droplet is independent. There is no stacked-PR workflow. - ## Skill: cistern-droplet-state # Cistern Droplet State diff --git a/CHANGELOG.md b/CHANGELOG.md index c194c68d..44cefa1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,18 @@ All notable changes to this project will be documented here. - **Evaluate mutation cache invalidation**: `evaluateMutation.onSuccess` now invalidates `queryKeys.qualityGates.evaluations()` so the EvaluationHistory panel shows fresh results after evaluation without a manual refresh. +- **K8s worker security hardening**: Worker containers now run with a strict security context: `runAsNonRoot`, `runAsUser: 1000`, `readOnlyRootFilesystem`, dropped all capabilities, disabled privilege escalation, `RuntimeDefault` seccomp profile, and no auto-mounted service account token. Previously, containers ran as root with no restrictions. + +- **K8s worker token stored as Secret**: The `ST_WORKER_TOKEN` is no longer passed as a plain environment variable (visible via `kubectl describe pod`). Instead, each execution creates a dedicated K8s Secret (`st-worker-token-`) and injects it via `secretKeyRef`. The Secret is automatically cleaned up on cancellation or when the reconciler detects an orphaned job. + +- **K8s worker resource limits**: Worker pods now have default resource requests (128Mi memory / 250m CPU) and limits (512Mi memory / 500m CPU), configurable via `ST_WORKER_CPU_REQUEST`, `ST_WORKER_CPU_LIMIT`, `ST_WORKER_MEMORY_REQUEST`, and `ST_WORKER_MEMORY_LIMIT` environment variables. Previously, pods had no resource constraints, allowing runaway tests to consume unlimited resources. + +- **Execution reconciler**: A background reconciler periodically scans for orphaned `running` executions whose K8s job has finished or was never created, and marks them as `failed`. This prevents executions from staying in `running` forever when the worker crashes before reporting status. Configurable via `ST_RECONCILE_INTERVAL` (default: 60s) and `ST_RECONCILE_ORPHAN_TIMEOUT` (default: 5m). + +- **Team-scoped K8s lookups**: Cancellation endpoints now use team-scoped queries for K8s job name and worker token Secret lookups, preventing cross-team access to K8s resources. + +- **Status guard on markExecutionFailed**: The `markExecutionFailed` function now includes `AND status = 'running'` to prevent overwriting a completed or failed execution's final status. + - **Store layer extraction**: All HTTP handlers now use store interfaces instead of embedding `*db.Pool` directly. Store interfaces are defined on the handler side and implemented in `internal/store/`, making handlers testable without a running database and centralizing SQL query knowledge. Affected handlers: auth, teams, analytics, executions, reports, admin, invitations, oauth. - **Bulk test result inserts**: Report ingestion now uses `pgx.Batch` to insert test results in bulk instead of one query per result. This eliminates the N+1 insert pattern that caused 1000+ round-trips for large reports. diff --git a/README.md b/README.md index 51529a9c..6f4185cb 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,16 @@ export OPENAI_API_KEY=your_api_key # Required if ST_LLM_PROVIDER=openai # Optional: disable rate limiting (test environments only — never use in production) export ST_DISABLE_RATE_LIMIT=true + +# Optional: Kubernetes worker resource limits (defaults shown) +export ST_WORKER_CPU_REQUEST=250m +export ST_WORKER_CPU_LIMIT=500m +export ST_WORKER_MEMORY_REQUEST=128Mi +export ST_WORKER_MEMORY_LIMIT=512Mi + +# Optional: execution reconciler — detects orphaned K8s jobs and marks them failed +export ST_RECONCILE_INTERVAL=60s # How often to check for orphans +export ST_RECONCILE_ORPHAN_TIMEOUT=5m # Grace period before declaring an execution orphaned ``` When `ST_SMTP_HOST` is not set the mailer runs in no-op mode — all outbound email is silently discarded. Set it to enable email notifications. diff --git a/cmd/server/main.go b/cmd/server/main.go index 5ff5a0e1..ddd6cb03 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -62,7 +62,7 @@ func main() { log.Fatal().Msg("ST_JWT_SECRET must be at least 32 characters in production") } - router := server.NewRouter(cfg, pool) + router, reconciler := server.NewRouter(cfg, pool) srv := &http.Server{ Addr: fmt.Sprintf(":%d", cfg.Port), @@ -76,6 +76,11 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() + // Start the execution reconciler if K8s + DB are available + if reconciler != nil { + go reconciler.Start(ctx) + } + go func() { log.Info().Int("port", cfg.Port).Msg("server starting") if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { diff --git a/docs/deployment/getting-started.md b/docs/deployment/getting-started.md index bf7db3f9..6aaa1645 100644 --- a/docs/deployment/getting-started.md +++ b/docs/deployment/getting-started.md @@ -69,10 +69,18 @@ OAuth is optional. Email/password login works without it. | Variable | Default | Description | |---|---|---| | `ST_WORKER_IMAGE` | `scaledtest/worker:latest` | Container image used for distributed test worker Jobs | -| `ST_WORKER_TOKEN` | _(none)_ | API token injected into worker pods for authenticated reporting | +| `ST_WORKER_TOKEN` | _(none)_ | API token injected into worker pods via K8s Secret for authenticated reporting | | `ST_K8S_NAMESPACE` | `default` | Kubernetes namespace where worker Jobs are created | | `ST_K8S_IN_CLUSTER` | `false` | Set `true` when running inside a Kubernetes cluster | | `ST_K8S_KUBECONFIG` | _(none)_ | Path to kubeconfig file when running outside a cluster | +| `ST_WORKER_CPU_REQUEST` | `250m` | CPU request per worker container | +| `ST_WORKER_CPU_LIMIT` | `500m` | CPU limit per worker container | +| `ST_WORKER_MEMORY_REQUEST` | `128Mi` | Memory request per worker container | +| `ST_WORKER_MEMORY_LIMIT` | `512Mi` | Memory limit per worker container | +| `ST_RECONCILE_INTERVAL` | `60s` | How often the execution reconciler checks for orphaned jobs | +| `ST_RECONCILE_ORPHAN_TIMEOUT` | `5m` | Grace period before an execution without a K8s job is considered orphaned | + +Worker pods run with a hardened security context (non-root user, read-only filesystem, dropped capabilities, seccomp profile) and resource limits by default. The `ST_WORKER_TOKEN` is stored in a dedicated K8s Secret per execution rather than a plain environment variable, preventing exposure via `kubectl describe pod`. --- diff --git a/internal/db/db_test.go b/internal/db/db_test.go index 0e97936c..e82beedb 100644 --- a/internal/db/db_test.go +++ b/internal/db/db_test.go @@ -32,8 +32,8 @@ func TestMigrationsEmbedded(t *testing.T) { t.Errorf("migration up/down mismatch: %d up, %d down", ups, downs) } - if ups != 22 { - t.Errorf("expected 22 migration pairs, got %d", ups) + if ups != 23 { + t.Errorf("expected 23 migration pairs, got %d", ups) } // Verify each up has a matching down diff --git a/internal/db/migrations/000023_add_worker_token_secret_to_executions.down.sql b/internal/db/migrations/000023_add_worker_token_secret_to_executions.down.sql new file mode 100644 index 00000000..838f014d --- /dev/null +++ b/internal/db/migrations/000023_add_worker_token_secret_to_executions.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE test_executions + DROP COLUMN worker_token_secret; \ No newline at end of file diff --git a/internal/db/migrations/000023_add_worker_token_secret_to_executions.up.sql b/internal/db/migrations/000023_add_worker_token_secret_to_executions.up.sql new file mode 100644 index 00000000..98d41676 --- /dev/null +++ b/internal/db/migrations/000023_add_worker_token_secret_to_executions.up.sql @@ -0,0 +1,4 @@ +-- Store the actual K8s Secret name used for the worker token so that +-- cancellation and reconciliation can clean up only auto-created secrets. +ALTER TABLE test_executions + ADD COLUMN worker_token_secret TEXT; \ No newline at end of file diff --git a/internal/handler/executions.go b/internal/handler/executions.go index 01f3ce4d..c788581e 100644 --- a/internal/handler/executions.go +++ b/internal/handler/executions.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "net/http" + "strings" "time" "github.com/go-chi/chi/v5" @@ -148,16 +149,17 @@ func (h *ExecutionsHandler) Create(w http.ResponseWriter, r *http.Request) { APIBaseURL: h.APIBaseURL, ExecutionID: id, } - if _, err := h.K8s.CreateJob(r.Context(), jobCfg); err != nil { + result, err := h.K8s.CreateJob(r.Context(), jobCfg) + if err != nil { log.Error().Err(err).Str("execution_id", id).Msg("failed to launch k8s job") markExecutionFailed(r.Context(), h.DB, id, "job launch failed: "+err.Error()) Error(w, http.StatusInternalServerError, "execution created but job launch failed") return } - // Store K8s job name on the execution record (best-effort) + // Store K8s job name and worker token secret on the execution record (best-effort) _, _ = h.DB.Exec(r.Context(), - `UPDATE test_executions SET k8s_job_name = $1, updated_at = $2 WHERE id = $3`, - jobName, time.Now(), id) + `UPDATE test_executions SET k8s_job_name = $1, worker_token_secret = $2, updated_at = $3 WHERE id = $4`, + jobName, result.WorkerTokenSecret, time.Now(), id) } // Log audit event after commit @@ -243,12 +245,18 @@ func (h *ExecutionsHandler) Cancel(w http.ResponseWriter, r *http.Request) { } if h.K8s != nil { - jobName, _ := h.ExecStore.GetK8sJobName(r.Context(), executionID) + jobName, _ := h.ExecStore.GetK8sJobNameByTeam(r.Context(), executionID, claims.TeamID) if jobName != nil && *jobName != "" { if err := h.K8s.DeleteJob(r.Context(), *jobName); err != nil { log.Error().Err(err).Str("job", *jobName).Msg("failed to delete k8s job on cancel") } } + secretName, _ := h.ExecStore.GetWorkerTokenSecretByTeam(r.Context(), executionID, claims.TeamID) + if secretName != nil && *secretName != "" && strings.HasPrefix(*secretName, k8s.WorkerTokenSecretPrefix) { + if err := h.K8s.DeleteSecret(r.Context(), *secretName); err != nil { + log.Warn().Err(err).Str("secret", *secretName).Msg("failed to delete worker token secret on cancel") + } + } } if h.AuditStore != nil { @@ -375,7 +383,7 @@ func markExecutionFailed(ctx context.Context, pool *db.Pool, id, errMsg string) } defer tx.Rollback(ctx) if _, err := tx.Exec(ctx, - `UPDATE test_executions SET status = 'failed', error_msg = $1, updated_at = $2 WHERE id = $3`, + `UPDATE test_executions SET status = 'failed', error_msg = $1, updated_at = $2 WHERE id = $3 AND status = 'running'`, errMsg, time.Now(), id); err != nil { log.Error().Err(err).Str("execution_id", id).Msg("failed to update execution failure status") return diff --git a/internal/handler/executions_test.go b/internal/handler/executions_test.go index de5ff6f8..a1172df2 100644 --- a/internal/handler/executions_test.go +++ b/internal/handler/executions_test.go @@ -16,15 +16,18 @@ import ( ) type mockExecutionsStore struct { - listFn func(ctx context.Context, teamID string, limit, offset int) ([]model.TestExecution, int, error) - createFn func(ctx context.Context, teamID, command string, configJSON []byte) (string, error) - getFn func(ctx context.Context, id, teamID string) (*model.TestExecution, error) - cancelFn func(ctx context.Context, id, teamID string, now time.Time) (int64, error) - updateStatusFn func(ctx context.Context, id, teamID, status string, now time.Time, errorMsg *string) (int64, error) - existsFn func(ctx context.Context, id, teamID string) (bool, error) - getK8sJobNameFn func(ctx context.Context, id string) (*string, error) - setK8sJobNameFn func(ctx context.Context, id, jobName string, now time.Time) error - markFailedFn func(ctx context.Context, id, errorMsg string, now time.Time) error + listFn func(ctx context.Context, teamID string, limit, offset int) ([]model.TestExecution, int, error) + createFn func(ctx context.Context, teamID, command string, configJSON []byte) (string, error) + getFn func(ctx context.Context, id, teamID string) (*model.TestExecution, error) + cancelFn func(ctx context.Context, id, teamID string, now time.Time) (int64, error) + updateStatusFn func(ctx context.Context, id, teamID, status string, now time.Time, errorMsg *string) (int64, error) + existsFn func(ctx context.Context, id, teamID string) (bool, error) + getK8sJobNameFn func(ctx context.Context, id string) (*string, error) + getK8sJobNameByTeamFn func(ctx context.Context, id, teamID string) (*string, error) + setK8sJobNameFn func(ctx context.Context, id, jobName string, now time.Time) error + getWorkerTokenSecretFn func(ctx context.Context, id string) (*string, error) + getWorkerTokenSecretByTeamFn func(ctx context.Context, id, teamID string) (*string, error) + markFailedFn func(ctx context.Context, id, errorMsg string, now time.Time) error } func (m *mockExecutionsStore) List(ctx context.Context, teamID string, limit, offset int) ([]model.TestExecution, int, error) { @@ -48,12 +51,33 @@ func (m *mockExecutionsStore) Exists(ctx context.Context, id, teamID string) (bo func (m *mockExecutionsStore) GetK8sJobName(ctx context.Context, id string) (*string, error) { return m.getK8sJobNameFn(ctx, id) } +func (m *mockExecutionsStore) GetK8sJobNameByTeam(ctx context.Context, id, teamID string) (*string, error) { + if m.getK8sJobNameByTeamFn != nil { + return m.getK8sJobNameByTeamFn(ctx, id, teamID) + } + return m.getK8sJobNameFn(ctx, id) +} func (m *mockExecutionsStore) SetK8sJobName(ctx context.Context, id, jobName string, now time.Time) error { return m.setK8sJobNameFn(ctx, id, jobName, now) } +func (m *mockExecutionsStore) GetWorkerTokenSecret(ctx context.Context, id string) (*string, error) { + if m.getWorkerTokenSecretFn != nil { + return m.getWorkerTokenSecretFn(ctx, id) + } + return nil, nil +} +func (m *mockExecutionsStore) GetWorkerTokenSecretByTeam(ctx context.Context, id, teamID string) (*string, error) { + if m.getWorkerTokenSecretByTeamFn != nil { + return m.getWorkerTokenSecretByTeamFn(ctx, id, teamID) + } + return m.GetWorkerTokenSecret(ctx, id) +} func (m *mockExecutionsStore) MarkFailed(ctx context.Context, id, errorMsg string, now time.Time) error { return m.markFailedFn(ctx, id, errorMsg, now) } +func (m *mockExecutionsStore) ListRunning(ctx context.Context) ([]model.TestExecution, error) { + return nil, nil +} func TestListExecutions_Unauthorized(t *testing.T) { h := &ExecutionsHandler{} diff --git a/internal/handler/store_interfaces.go b/internal/handler/store_interfaces.go index 352cdd0f..7a5cd867 100644 --- a/internal/handler/store_interfaces.go +++ b/internal/handler/store_interfaces.go @@ -42,8 +42,12 @@ type executionsStore interface { UpdateStatus(ctx context.Context, id, teamID, status string, now time.Time, errorMsg *string) (int64, error) Exists(ctx context.Context, id, teamID string) (bool, error) GetK8sJobName(ctx context.Context, id string) (*string, error) + GetK8sJobNameByTeam(ctx context.Context, id, teamID string) (*string, error) SetK8sJobName(ctx context.Context, id, jobName string, now time.Time) error + GetWorkerTokenSecret(ctx context.Context, id string) (*string, error) + GetWorkerTokenSecretByTeam(ctx context.Context, id, teamID string) (*string, error) MarkFailed(ctx context.Context, id, errorMsg string, now time.Time) error + ListRunning(ctx context.Context) ([]model.TestExecution, error) } // reportsStore abstracts report persistence operations. diff --git a/internal/k8s/k8s.go b/internal/k8s/k8s.go index 5d18f461..2c67ea87 100644 --- a/internal/k8s/k8s.go +++ b/internal/k8s/k8s.go @@ -3,6 +3,9 @@ package k8s import ( "context" "fmt" + "os" + "strings" + "time" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -12,11 +15,58 @@ import ( "k8s.io/client-go/tools/clientcmd" "github.com/rs/zerolog/log" + "k8s.io/apimachinery/pkg/api/resource" ) +func ptrBool(v bool) *bool { return &v } +func ptrInt64(v int64) *int64 { return &v } +func ptrInt32(v int32) *int32 { return &v } + +// WorkerTokenSecretPrefix is the prefix for auto-created worker token Secrets. +// Exported so callers can construct Secret names for cleanup (e.g. on cancellation). +const WorkerTokenSecretPrefix = "st-worker-token-" + +// workerTokenSecretName returns the auto-generated Secret name for an execution. +func workerTokenSecretName(executionID string) string { + return WorkerTokenSecretPrefix + executionID +} + +func resourceQty(s string) (resource.Quantity, error) { + q, err := resource.ParseQuantity(s) + if err != nil { + return resource.Quantity{}, fmt.Errorf("parse resource quantity %q: %w", s, err) + } + return q, nil +} + +type resourceSpecs struct { + CPUReq resource.Quantity + CPULim resource.Quantity + MemReq resource.Quantity + MemLim resource.Quantity +} + +func parseResources(cpuReq, cpuLim, memReq, memLim string) (resourceSpecs, error) { + var specs resourceSpecs + var err error + if specs.CPUReq, err = resourceQty(cpuReq); err != nil { + return specs, fmt.Errorf("cpu request: %w", err) + } + if specs.CPULim, err = resourceQty(cpuLim); err != nil { + return specs, fmt.Errorf("cpu limit: %w", err) + } + if specs.MemReq, err = resourceQty(memReq); err != nil { + return specs, fmt.Errorf("memory request: %w", err) + } + if specs.MemLim, err = resourceQty(memLim); err != nil { + return specs, fmt.Errorf("memory limit: %w", err) + } + return specs, nil +} + // Client wraps the Kubernetes clientset for test execution Job management. type Client struct { - clientset *kubernetes.Clientset + clientset kubernetes.Interface namespace string } @@ -56,58 +106,185 @@ func NewClient(namespace string, inCluster bool, kubeconfig string) (*Client, er // JobConfig defines the parameters for creating a test execution Job. type JobConfig struct { - Name string // Job name (typically execution ID) - Image string // Worker container image - Command string // Test command to run - EnvVars map[string]string // Additional environment variables - WorkerToken string // Auth token for worker to report back - APIBaseURL string // Base URL of the ScaledTest API - ExecutionID string // Execution ID for result reporting + Name string // Job name (typically execution ID) + Image string // Worker container image + Command string // Test command to run + EnvVars map[string]string // Additional environment variables + WorkerToken string // Auth token for worker to report back (used to create Secret) + WorkerTokenSecret string // Name of existing K8s Secret holding ST_WORKER_TOKEN + APIBaseURL string // Base URL of the ScaledTest API + ExecutionID string // Execution ID for result reporting + CPURequest string // Container CPU request (e.g. "250m"), defaults to env ST_WORKER_CPU_REQUEST + CPULimit string // Container CPU limit (e.g. "500m"), defaults to env ST_WORKER_CPU_LIMIT + MemoryRequest string // Container memory request (e.g. "128Mi"), defaults to env ST_WORKER_MEMORY_REQUEST + MemoryLimit string // Container memory limit (e.g. "512Mi"), defaults to env ST_WORKER_MEMORY_LIMIT +} + +const ( + defaultCPURequest = "250m" + defaultCPULimit = "500m" + defaultMemoryRequest = "128Mi" + defaultMemoryLimit = "512Mi" +) + +func envOr(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func workerLabels(executionID string) map[string]string { + return map[string]string{ + "app.kubernetes.io/name": "scaledtest-worker", + "app.kubernetes.io/managed-by": "scaledtest", + "scaledtest/execution-id": executionID, + } +} + +func workerTokenLabels(executionID string) map[string]string { + return map[string]string{ + "app.kubernetes.io/name": "scaledtest-worker-token", + "app.kubernetes.io/managed-by": "scaledtest", + "scaledtest/execution-id": executionID, + } +} + +// ResourceDefaults returns the effective resource values, falling back to env +// vars then built-in defaults. +func (cfg *JobConfig) ResourceDefaults() (cpuReq, cpuLim, memReq, memLim string) { + cpuReq = cfg.CPURequest + if cpuReq == "" { + cpuReq = envOr("ST_WORKER_CPU_REQUEST", defaultCPURequest) + } + cpuLim = cfg.CPULimit + if cpuLim == "" { + cpuLim = envOr("ST_WORKER_CPU_LIMIT", defaultCPULimit) + } + memReq = cfg.MemoryRequest + if memReq == "" { + memReq = envOr("ST_WORKER_MEMORY_REQUEST", defaultMemoryRequest) + } + memLim = cfg.MemoryLimit + if memLim == "" { + memLim = envOr("ST_WORKER_MEMORY_LIMIT", defaultMemoryLimit) + } + return +} + +// CreateJobResult holds the created Job and metadata about the worker token Secret. +type CreateJobResult struct { + Job *batchv1.Job + WorkerTokenSecret string // Name of the K8s Secret used for ST_WORKER_TOKEN + AutoCreatedSecret bool // True if the Secret was auto-created (and should be cleaned up) } // CreateJob creates a Kubernetes Job for test execution. -func (c *Client) CreateJob(ctx context.Context, cfg JobConfig) (*batchv1.Job, error) { - envVars := []corev1.EnvVar{ - {Name: "ST_WORKER_TOKEN", Value: cfg.WorkerToken}, - {Name: "ST_API_URL", Value: cfg.APIBaseURL}, - {Name: "ST_EXECUTION_ID", Value: cfg.ExecutionID}, - {Name: "ST_COMMAND", Value: cfg.Command}, +func (c *Client) CreateJob(ctx context.Context, cfg JobConfig) (*CreateJobResult, error) { + var envVars []corev1.EnvVar + + secretName := cfg.WorkerTokenSecret + autoCreatedSecret := false + if secretName == "" { + secretName = workerTokenSecretName(cfg.ExecutionID) + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: c.namespace, + Labels: workerTokenLabels(cfg.ExecutionID), + }, + StringData: map[string]string{ + "ST_WORKER_TOKEN": cfg.WorkerToken, + }, + } + if _, err := c.clientset.CoreV1().Secrets(c.namespace).Create(ctx, secret, metav1.CreateOptions{}); err != nil { + return nil, fmt.Errorf("create worker token secret: %w", err) + } + autoCreatedSecret = true + log.Info().Str("secret", secretName).Msg("worker token secret created") } + envVars = append(envVars, + corev1.EnvVar{ + Name: "ST_WORKER_TOKEN", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: secretName}, + Key: "ST_WORKER_TOKEN", + }, + }, + }, + corev1.EnvVar{Name: "ST_API_URL", Value: cfg.APIBaseURL}, + corev1.EnvVar{Name: "ST_EXECUTION_ID", Value: cfg.ExecutionID}, + corev1.EnvVar{Name: "ST_COMMAND", Value: cfg.Command}, + ) + for k, v := range cfg.EnvVars { envVars = append(envVars, corev1.EnvVar{Name: k, Value: v}) } - backoffLimit := int32(0) // No retries — fail fast + cpuReq, cpuLim, memReq, memLim := cfg.ResourceDefaults() + + resources, err := parseResources(cpuReq, cpuLim, memReq, memLim) + if err != nil { + return nil, err + } + + containerSecurityContext := &corev1.SecurityContext{ + RunAsNonRoot: ptrBool(true), + RunAsUser: ptrInt64(1000), + ReadOnlyRootFilesystem: ptrBool(true), + AllowPrivilegeEscalation: ptrBool(false), + Capabilities: &corev1.Capabilities{ + Drop: []corev1.Capability{"ALL"}, + }, + } + + podSecurityContext := &corev1.PodSecurityContext{ + RunAsNonRoot: ptrBool(true), + RunAsUser: ptrInt64(1000), + FSGroup: ptrInt64(1000), + SeccompProfile: &corev1.SeccompProfile{ + Type: corev1.SeccompProfileTypeRuntimeDefault, + }, + } + + backoffLimit := int32(0) ttlSeconds := int32(3600) job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: cfg.Name, Namespace: c.namespace, - Labels: map[string]string{ - "app.kubernetes.io/name": "scaledtest-worker", - "app.kubernetes.io/managed-by": "scaledtest", - "scaledtest/execution-id": cfg.ExecutionID, - }, + Labels: workerLabels(cfg.ExecutionID), }, Spec: batchv1.JobSpec{ BackoffLimit: &backoffLimit, TTLSecondsAfterFinished: &ttlSeconds, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "app.kubernetes.io/name": "scaledtest-worker", - "scaledtest/execution-id": cfg.ExecutionID, - }, + Labels: workerLabels(cfg.ExecutionID), }, Spec: corev1.PodSpec{ - RestartPolicy: corev1.RestartPolicyNever, + RestartPolicy: corev1.RestartPolicyNever, + SecurityContext: podSecurityContext, + AutomountServiceAccountToken: ptrBool(false), Containers: []corev1.Container{ { - Name: "worker", - Image: cfg.Image, - Env: envVars, + Name: "worker", + Image: cfg.Image, + Env: envVars, + SecurityContext: containerSecurityContext, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resources.CPUReq, + corev1.ResourceMemory: resources.MemReq, + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resources.CPULim, + corev1.ResourceMemory: resources.MemLim, + }, + }, }, }, }, @@ -117,6 +294,11 @@ func (c *Client) CreateJob(ctx context.Context, cfg JobConfig) (*batchv1.Job, er created, err := c.clientset.BatchV1().Jobs(c.namespace).Create(ctx, job, metav1.CreateOptions{}) if err != nil { + if autoCreatedSecret { + if delErr := c.clientset.CoreV1().Secrets(c.namespace).Delete(ctx, secretName, metav1.DeleteOptions{}); delErr != nil { + log.Warn().Err(delErr).Str("secret", secretName).Msg("failed to clean up worker token secret after job creation failure") + } + } return nil, fmt.Errorf("create job: %w", err) } @@ -125,7 +307,11 @@ func (c *Client) CreateJob(ctx context.Context, cfg JobConfig) (*batchv1.Job, er Str("execution_id", cfg.ExecutionID). Msg("k8s job created") - return created, nil + return &CreateJobResult{ + Job: created, + WorkerTokenSecret: secretName, + AutoCreatedSecret: autoCreatedSecret, + }, nil } // DeleteJob deletes a Kubernetes Job and its pods (for cancellation). @@ -142,6 +328,16 @@ func (c *Client) DeleteJob(ctx context.Context, jobName string) error { return nil } +// DeleteSecret deletes a Kubernetes Secret (for cleanup after job completion). +func (c *Client) DeleteSecret(ctx context.Context, name string) error { + err := c.clientset.CoreV1().Secrets(c.namespace).Delete(ctx, name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("delete secret: %w", err) + } + log.Info().Str("secret", name).Msg("k8s secret deleted") + return nil +} + // GetJobStatus returns the current status of a Job. func (c *Client) GetJobStatus(ctx context.Context, jobName string) (*JobStatus, error) { job, err := c.clientset.BatchV1().Jobs(c.namespace).Get(ctx, jobName, metav1.GetOptions{}) @@ -182,3 +378,161 @@ type JobStatus struct { func (s *JobStatus) IsFinished() bool { return s.Completed || s.FailedCondition } + +// RunningExecution represents a test execution that is currently in 'running' state. +type RunningExecution struct { + ID string + K8sJobName *string + WorkerTokenSecret *string + StartedAt *time.Time +} + +// JobStatusGetter abstracts K8s job status lookup for reconciliation. +type JobStatusGetter interface { + GetJobStatus(ctx context.Context, jobName string) (*JobStatus, error) +} + +// ExecutionReconciler marks orphaned running executions as failed when their +// K8s job has finished but the worker never reported status back. +type ExecutionReconciler struct { + JobStatusGetter JobStatusGetter + SecretDeleter SecretDeleter + ListRunning func(ctx context.Context) ([]RunningExecution, error) + MarkFailed func(ctx context.Context, id, errorMsg string, now time.Time) error + OrphanTimeout time.Duration + ReconcileInterval time.Duration +} + +// SecretDeleter abstracts K8s Secret deletion for cleanup after job completion. +type SecretDeleter interface { + DeleteSecret(ctx context.Context, name string) error +} + +const ( + defaultOrphanTimeout = 5 * time.Minute + defaultReconcileInterval = 60 * time.Second +) + +// NewExecutionReconciler creates a reconciler with sensible defaults. +func NewExecutionReconciler(k8sClient JobStatusGetter, secretDeleter SecretDeleter, listRunning func(ctx context.Context) ([]RunningExecution, error), markFailed func(ctx context.Context, id, errorMsg string, now time.Time) error) *ExecutionReconciler { + return &ExecutionReconciler{ + JobStatusGetter: k8sClient, + SecretDeleter: secretDeleter, + ListRunning: listRunning, + MarkFailed: markFailed, + OrphanTimeout: envOrDuration("ST_RECONCILE_ORPHAN_TIMEOUT", defaultOrphanTimeout), + ReconcileInterval: envOrDuration("ST_RECONCILE_INTERVAL", defaultReconcileInterval), + } +} + +func envOrDuration(key string, fallback time.Duration) time.Duration { + if v := os.Getenv(key); v != "" { + if d, err := time.ParseDuration(v); err == nil { + return d + } + log.Warn().Str("key", key).Str("value", v).Msg("invalid duration in env var, using default") + } + return fallback +} + +// ReconcileOnce performs a single reconciliation pass over running executions. +// It handles three cases: +// 1. Executions without a K8s job name that have been running longer than +// OrphanTimeout — marks them failed (the job was never created). +// 2. Executions with a K8s job name but still within OrphanTimeout of their +// start time — skipped to give recently-started jobs a grace period. +// 3. Executions with a K8s job name whose job has finished — marks them failed +// and cleans up the associated worker token Secret. +func (r *ExecutionReconciler) ReconcileOnce(ctx context.Context) (reconciled int, err error) { + executions, err := r.ListRunning(ctx) + if err != nil { + return 0, fmt.Errorf("list running executions: %w", err) + } + + now := time.Now() + for _, exec := range executions { + if exec.K8sJobName == nil || *exec.K8sJobName == "" { + if exec.StartedAt != nil && now.Sub(*exec.StartedAt) > r.OrphanTimeout { + errMsg := "execution orphaned: no k8s job assigned and running beyond timeout" + if markErr := r.MarkFailed(ctx, exec.ID, errMsg, now); markErr != nil { + log.Error().Err(markErr).Str("execution_id", exec.ID).Msg("reconcile: failed to mark execution failed") + continue + } + log.Info(). + Str("execution_id", exec.ID). + Msg("reconcile: marked orphaned execution (no k8s job) as failed") + reconciled++ + } + continue + } + + if exec.StartedAt != nil && now.Sub(*exec.StartedAt) < r.OrphanTimeout { + continue + } + + jobStatus, jobErr := r.JobStatusGetter.GetJobStatus(ctx, *exec.K8sJobName) + if jobErr != nil { + log.Warn().Err(jobErr).Str("job", *exec.K8sJobName).Msg("reconcile: failed to get job status") + continue + } + + if !jobStatus.IsFinished() { + continue + } + + errMsg := "execution orphaned: k8s job finished but worker did not report status" + if jobStatus.FailureMessage != "" { + errMsg = jobStatus.FailureMessage + } + + if markErr := r.MarkFailed(ctx, exec.ID, errMsg, now); markErr != nil { + log.Error().Err(markErr).Str("execution_id", exec.ID).Msg("reconcile: failed to mark execution failed") + continue + } + + log.Info(). + Str("execution_id", exec.ID). + Str("job", *exec.K8sJobName). + Str("failure", errMsg). + Msg("reconcile: marked orphaned execution as failed") + + if r.SecretDeleter != nil && exec.WorkerTokenSecret != nil { + secretName := *exec.WorkerTokenSecret + if strings.HasPrefix(secretName, WorkerTokenSecretPrefix) { + if delErr := r.SecretDeleter.DeleteSecret(ctx, secretName); delErr != nil { + log.Warn().Err(delErr).Str("secret", secretName).Msg("reconcile: failed to delete worker token secret") + } + } + } + + reconciled++ + } + + return reconciled, nil +} + +// Start runs the reconciliation loop at the configured interval until ctx is cancelled. +func (r *ExecutionReconciler) Start(ctx context.Context) { + ticker := time.NewTicker(r.ReconcileInterval) + defer ticker.Stop() + + log.Info(). + Dur("interval", r.ReconcileInterval). + Dur("orphan_timeout", r.OrphanTimeout). + Msg("execution reconciler started") + + for { + select { + case <-ctx.Done(): + log.Info().Msg("execution reconciler stopped") + return + case <-ticker.C: + n, err := r.ReconcileOnce(ctx) + if err != nil { + log.Error().Err(err).Msg("reconcile pass failed") + } else if n > 0 { + log.Info().Int("reconciled", n).Msg("reconcile pass completed") + } + } + } +} diff --git a/internal/k8s/k8s_test.go b/internal/k8s/k8s_test.go index 1a6c2256..14fcd5ac 100644 --- a/internal/k8s/k8s_test.go +++ b/internal/k8s/k8s_test.go @@ -1,9 +1,37 @@ package k8s import ( + "context" + "fmt" + "os" "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" ) +type testJobStatusGetter struct { + Fn func(ctx context.Context, jobName string) (*JobStatus, error) +} + +func (t *testJobStatusGetter) GetJobStatus(ctx context.Context, jobName string) (*JobStatus, error) { + return t.Fn(ctx, jobName) +} + +type testSecretDeleter struct { + deleted []string + err error +} + +func (t *testSecretDeleter) DeleteSecret(ctx context.Context, name string) error { + t.deleted = append(t.deleted, name) + return t.err +} + func TestJobStatusIsFinished(t *testing.T) { tests := []struct { name string @@ -86,3 +114,1015 @@ func TestJobStatusStates(t *testing.T) { }) } } + +func TestResourceDefaults_WhenFieldsSet(t *testing.T) { + cfg := JobConfig{ + CPURequest: "500m", + CPULimit: "1000m", + MemoryRequest: "256Mi", + MemoryLimit: "1Gi", + } + cpuReq, cpuLim, memReq, memLim := cfg.ResourceDefaults() + if cpuReq != "500m" { + t.Errorf("CPURequest = %q, want %q", cpuReq, "500m") + } + if cpuLim != "1000m" { + t.Errorf("CPULimit = %q, want %q", cpuLim, "1000m") + } + if memReq != "256Mi" { + t.Errorf("MemoryRequest = %q, want %q", memReq, "256Mi") + } + if memLim != "1Gi" { + t.Errorf("MemoryLimit = %q, want %q", memLim, "1Gi") + } +} + +func TestResourceDefaults_WhenFieldsEmptyAndNoEnv(t *testing.T) { + cfg := JobConfig{} + cpuReq, cpuLim, memReq, memLim := cfg.ResourceDefaults() + if cpuReq != defaultCPURequest { + t.Errorf("CPURequest = %q, want %q", cpuReq, defaultCPURequest) + } + if cpuLim != defaultCPULimit { + t.Errorf("CPULimit = %q, want %q", cpuLim, defaultCPULimit) + } + if memReq != defaultMemoryRequest { + t.Errorf("MemoryRequest = %q, want %q", memReq, defaultMemoryRequest) + } + if memLim != defaultMemoryLimit { + t.Errorf("MemoryLimit = %q, want %q", memLim, defaultMemoryLimit) + } +} + +func TestResourceDefaults_WhenEnvOverrides(t *testing.T) { + os.Setenv("ST_WORKER_CPU_REQUEST", "1") + os.Setenv("ST_WORKER_CPU_LIMIT", "2") + os.Setenv("ST_WORKER_MEMORY_REQUEST", "2Gi") + os.Setenv("ST_WORKER_MEMORY_LIMIT", "4Gi") + defer func() { + os.Unsetenv("ST_WORKER_CPU_REQUEST") + os.Unsetenv("ST_WORKER_CPU_LIMIT") + os.Unsetenv("ST_WORKER_MEMORY_REQUEST") + os.Unsetenv("ST_WORKER_MEMORY_LIMIT") + }() + + cfg := JobConfig{} + cpuReq, cpuLim, memReq, memLim := cfg.ResourceDefaults() + if cpuReq != "1" { + t.Errorf("CPURequest = %q, want %q", cpuReq, "1") + } + if cpuLim != "2" { + t.Errorf("CPULimit = %q, want %q", cpuLim, "2") + } + if memReq != "2Gi" { + t.Errorf("MemoryRequest = %q, want %q", memReq, "2Gi") + } + if memLim != "4Gi" { + t.Errorf("MemoryLimit = %q, want %q", memLim, "4Gi") + } +} + +func TestResourceDefaults_WhenConfigOverridesEnv(t *testing.T) { + os.Setenv("ST_WORKER_CPU_REQUEST", "1") + defer os.Unsetenv("ST_WORKER_CPU_REQUEST") + + cfg := JobConfig{CPURequest: "200m"} + cpuReq, _, _, _ := cfg.ResourceDefaults() + if cpuReq != "200m" { + t.Errorf("CPURequest = %q, want %q (config should override env)", cpuReq, "200m") + } +} + +func TestPtrHelpers(t *testing.T) { + b := ptrBool(true) + if *b != true { + t.Errorf("ptrBool(true) = %v", *b) + } + i64 := ptrInt64(42) + if *i64 != 42 { + t.Errorf("ptrInt64(42) = %v", *i64) + } + i32 := ptrInt32(7) + if *i32 != 7 { + t.Errorf("ptrInt32(7) = %v", *i32) + } +} + +func TestResourceQty(t *testing.T) { + q, err := resourceQty("250m") + if err != nil { + t.Fatalf("resourceQty(\"250m\") error = %v", err) + } + if q.String() != "250m" { + t.Errorf("resourceQty(\"250m\") = %q", q.String()) + } + q, err = resourceQty("128Mi") + if err != nil { + t.Fatalf("resourceQty(\"128Mi\") error = %v", err) + } + if q.String() != "128Mi" { + t.Errorf("resourceQty(\"128Mi\") = %q", q.String()) + } +} + +func TestResourceQty_InvalidInput(t *testing.T) { + _, err := resourceQty("not-a-quantity") + if err == nil { + t.Error("resourceQty should return error for invalid input") + } +} + +func TestJobConfig_WorkerTokenSecret_Field(t *testing.T) { + cfg := JobConfig{ + Name: "exec-secret", + Image: "scaledtest/worker:latest", + Command: "npm test", + ExecutionID: "exec-secret", + WorkerToken: "secret-token", + WorkerTokenSecret: "pre-existing-secret", + APIBaseURL: "http://api:8080", + } + if cfg.WorkerTokenSecret != "pre-existing-secret" { + t.Errorf("WorkerTokenSecret = %q, want %q", cfg.WorkerTokenSecret, "pre-existing-secret") + } +} + +func TestReconcileOnce_WhenJobFinished(t *testing.T) { + jobName := "st-exec-orphan1" + secretName := WorkerTokenSecretPrefix + "orphan1" + var markedID string + var markedMsg string + sd := &testSecretDeleter{} + + getter := &testJobStatusGetter{ + Fn: func(ctx context.Context, name string) (*JobStatus, error) { + return &JobStatus{FailedCondition: true, Failed: 1, FailureMessage: "OOMKilled"}, nil + }, + } + + startedAt := time.Now().Add(-10 * time.Minute) + reconciler := &ExecutionReconciler{ + JobStatusGetter: getter, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "orphan1", K8sJobName: &jobName, WorkerTokenSecret: &secretName, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + markedID = id + markedMsg = errorMsg + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 1 { + t.Errorf("reconciled count = %d, want 1", n) + } + if markedID != "orphan1" { + t.Errorf("marked ID = %q, want %q", markedID, "orphan1") + } + if markedMsg != "OOMKilled" { + t.Errorf("marked message = %q, want %q", markedMsg, "OOMKilled") + } + if len(sd.deleted) != 1 || sd.deleted[0] != "st-worker-token-orphan1" { + t.Errorf("secret deletion = %v, want [st-worker-token-orphan1]", sd.deleted) + } +} + +func TestReconcileOnce_WhenJobStillRunning(t *testing.T) { + jobName := "st-exec-active1" + called := false + sd := &testSecretDeleter{} + + getter := &testJobStatusGetter{ + Fn: func(ctx context.Context, name string) (*JobStatus, error) { + return &JobStatus{Active: 1}, nil + }, + } + + startedAt := time.Now().Add(-10 * time.Minute) + reconciler := &ExecutionReconciler{ + JobStatusGetter: getter, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "active1", K8sJobName: &jobName, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + called = true + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 0 { + t.Errorf("reconciled count = %d, want 0", n) + } + if called { + t.Error("MarkFailed should not have been called for active job") + } +} + +func TestReconcileOnce_WhenNoK8sJobName_BeyondTimeout(t *testing.T) { + var markedID string + var markedMsg string + sd := &testSecretDeleter{} + + startedAt := time.Now().Add(-10 * time.Minute) + reconciler := &ExecutionReconciler{ + JobStatusGetter: &testJobStatusGetter{}, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "nojob", K8sJobName: nil, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + markedID = id + markedMsg = errorMsg + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 1 { + t.Errorf("reconciled count = %d, want 1", n) + } + if markedID != "nojob" { + t.Errorf("marked ID = %q, want %q", markedID, "nojob") + } + wantMsg := "execution orphaned: no k8s job assigned and running beyond timeout" + if markedMsg != wantMsg { + t.Errorf("marked message = %q, want %q", markedMsg, wantMsg) + } +} + +func TestReconcileOnce_WhenNoK8sJobName_WithinTimeout(t *testing.T) { + sd := &testSecretDeleter{} + + startedAt := time.Now().Add(-30 * time.Second) + reconciler := &ExecutionReconciler{ + JobStatusGetter: &testJobStatusGetter{}, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "nojob-new", K8sJobName: nil, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + t.Error("MarkFailed should not be called for recently-started execution without K8s job") + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 0 { + t.Errorf("reconciled count = %d, want 0", n) + } +} + +func TestReconcileOnce_WhenNoK8sJobName_NilStartedAt(t *testing.T) { + sd := &testSecretDeleter{} + + reconciler := &ExecutionReconciler{ + JobStatusGetter: &testJobStatusGetter{}, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "nojob-nil-start", K8sJobName: nil, StartedAt: nil}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + t.Error("MarkFailed should not be called when StartedAt is nil and no K8s job") + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 0 { + t.Errorf("reconciled count = %d, want 0", n) + } +} + +func TestReconcileOnce_GracePeriod_SkipsRecentJob(t *testing.T) { + jobName := "st-exec-recent1" + sd := &testSecretDeleter{} + + getter := &testJobStatusGetter{ + Fn: func(ctx context.Context, name string) (*JobStatus, error) { + return &JobStatus{FailedCondition: true, Failed: 1}, nil + }, + } + + startedAt := time.Now().Add(-30 * time.Second) + markCalled := false + reconciler := &ExecutionReconciler{ + JobStatusGetter: getter, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "recent1", K8sJobName: &jobName, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + markCalled = true + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 0 { + t.Errorf("reconciled count = %d, want 0 (within grace period)", n) + } + if markCalled { + t.Error("MarkFailed should not be called for execution within grace period") + } +} + +func TestReconcileOnce_WhenGetJobStatusFails(t *testing.T) { + jobName := "st-exec-err1" + called := false + sd := &testSecretDeleter{} + secretName := WorkerTokenSecretPrefix + "err1" + + getter := &testJobStatusGetter{ + Fn: func(ctx context.Context, name string) (*JobStatus, error) { + return nil, context.DeadlineExceeded + }, + } + + startedAt := time.Now().Add(-10 * time.Minute) + reconciler := &ExecutionReconciler{ + JobStatusGetter: getter, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "err1", K8sJobName: &jobName, WorkerTokenSecret: &secretName, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + called = true + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 0 { + t.Errorf("reconciled count = %d, want 0", n) + } + if called { + t.Error("MarkFailed should not be called when GetJobStatus fails") + } +} + +func TestReconcileOnce_WhenEmptyList(t *testing.T) { + sd := &testSecretDeleter{} + reconciler := &ExecutionReconciler{ + JobStatusGetter: &testJobStatusGetter{}, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return nil, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + t.Error("MarkFailed should not be called for empty list") + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 0 { + t.Errorf("reconciled count = %d, want 0", n) + } +} + +func TestReconcileOnce_WhenMarkFailedFails(t *testing.T) { + jobName := "st-exec-mf1" + sd := &testSecretDeleter{} + secretName := WorkerTokenSecretPrefix + "mf1" + + getter := &testJobStatusGetter{ + Fn: func(ctx context.Context, name string) (*JobStatus, error) { + return &JobStatus{Completed: true, Succeeded: 1}, nil + }, + } + + startedAt := time.Now().Add(-10 * time.Minute) + reconciler := &ExecutionReconciler{ + JobStatusGetter: getter, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "mf1", K8sJobName: &jobName, WorkerTokenSecret: &secretName, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + return context.DeadlineExceeded + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() should not return error when MarkFailed fails, got %v", err) + } + if n != 0 { + t.Errorf("reconciled count = %d, want 0 (MarkFailed failed)", n) + } +} + +func TestReconcileOnce_WhenListRunningFails(t *testing.T) { + sd := &testSecretDeleter{} + reconciler := &ExecutionReconciler{ + JobStatusGetter: &testJobStatusGetter{}, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return nil, context.DeadlineExceeded + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + _, err := reconciler.ReconcileOnce(context.Background()) + if err == nil { + t.Fatal("ReconcileOnce() should return error when ListRunning fails") + } +} + +func TestReconcileOnce_DefaultErrorMessage(t *testing.T) { + jobName := "st-exec-default1" + var markedMsg string + secretName := WorkerTokenSecretPrefix + "default1" + sd := &testSecretDeleter{} + + getter := &testJobStatusGetter{ + Fn: func(ctx context.Context, name string) (*JobStatus, error) { + return &JobStatus{FailedCondition: true, Failed: 1}, nil + }, + } + + startedAt := time.Now().Add(-10 * time.Minute) + reconciler := &ExecutionReconciler{ + JobStatusGetter: getter, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "default1", K8sJobName: &jobName, WorkerTokenSecret: &secretName, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + markedMsg = errorMsg + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 1 { + t.Errorf("reconciled count = %d, want 1", n) + } + wantMsg := "execution orphaned: k8s job finished but worker did not report status" + if markedMsg != wantMsg { + t.Errorf("marked message = %q, want %q", markedMsg, wantMsg) + } +} + +func TestReconcileOnce_MultipleExecutions(t *testing.T) { + job1 := "st-exec-multi1" + job2 := "st-exec-multi2" + job3 := "st-exec-multi3" + secret1 := WorkerTokenSecretPrefix + "multi1" + secret2 := WorkerTokenSecretPrefix + "multi2" + var markedIDs []string + sd := &testSecretDeleter{} + + getter := &testJobStatusGetter{ + Fn: func(ctx context.Context, name string) (*JobStatus, error) { + switch name { + case job1, job2: + return &JobStatus{FailedCondition: true, Failed: 1}, nil + case job3: + return &JobStatus{Active: 1}, nil + default: + return &JobStatus{}, nil + } + }, + } + + startedAt := time.Now().Add(-10 * time.Minute) + reconciler := &ExecutionReconciler{ + JobStatusGetter: getter, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "multi1", K8sJobName: &job1, WorkerTokenSecret: &secret1, StartedAt: &startedAt}, + {ID: "multi2", K8sJobName: &job2, WorkerTokenSecret: &secret2, StartedAt: &startedAt}, + {ID: "multi3", K8sJobName: &job3, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + markedIDs = append(markedIDs, id) + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 2 { + t.Errorf("reconciled count = %d, want 2", n) + } + if len(markedIDs) != 2 { + t.Fatalf("marked count = %d, want 2", len(markedIDs)) + } + if markedIDs[0] != "multi1" { + t.Errorf("first marked ID = %q, want %q", markedIDs[0], "multi1") + } + if markedIDs[1] != "multi2" { + t.Errorf("second marked ID = %q, want %q", markedIDs[1], "multi2") + } +} + +func TestReconcileOnce_SecretCleanup(t *testing.T) { + jobName := "st-exec-secret1" + secretName := WorkerTokenSecretPrefix + "secret1" + sd := &testSecretDeleter{} + + getter := &testJobStatusGetter{ + Fn: func(ctx context.Context, name string) (*JobStatus, error) { + return &JobStatus{Completed: true, Succeeded: 1}, nil + }, + } + + startedAt := time.Now().Add(-10 * time.Minute) + reconciler := &ExecutionReconciler{ + JobStatusGetter: getter, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "secret1", K8sJobName: &jobName, WorkerTokenSecret: &secretName, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 1 { + t.Errorf("reconciled count = %d, want 1", n) + } + if len(sd.deleted) != 1 { + t.Fatalf("secret deletions = %d, want 1", len(sd.deleted)) + } + if sd.deleted[0] != "st-worker-token-secret1" { + t.Errorf("deleted secret = %q, want %q", sd.deleted[0], "st-worker-token-secret1") + } +} + +func TestReconcileOnce_SecretCleanupError_DoesNotBlock(t *testing.T) { + jobName := "st-exec-secerr" + secretName := WorkerTokenSecretPrefix + "secerr" + sd := &testSecretDeleter{err: context.DeadlineExceeded} + var markedID string + + getter := &testJobStatusGetter{ + Fn: func(ctx context.Context, name string) (*JobStatus, error) { + return &JobStatus{FailedCondition: true, Failed: 1}, nil + }, + } + + startedAt := time.Now().Add(-10 * time.Minute) + reconciler := &ExecutionReconciler{ + JobStatusGetter: getter, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "secerr", K8sJobName: &jobName, WorkerTokenSecret: &secretName, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + markedID = id + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 1 { + t.Errorf("reconciled count = %d, want 1 (secret cleanup failure should not block)", n) + } + if markedID != "secerr" { + t.Errorf("marked ID = %q, want %q", markedID, "secerr") + } +} + +func TestNewExecutionReconciler_Defaults(t *testing.T) { + r := NewExecutionReconciler(nil, nil, nil, nil) + if r.OrphanTimeout != defaultOrphanTimeout { + t.Errorf("OrphanTimeout = %v, want %v", r.OrphanTimeout, defaultOrphanTimeout) + } + if r.ReconcileInterval != defaultReconcileInterval { + t.Errorf("ReconcileInterval = %v, want %v", r.ReconcileInterval, defaultReconcileInterval) + } +} + +func TestNewExecutionReconciler_EnvOverrides(t *testing.T) { + os.Setenv("ST_RECONCILE_ORPHAN_TIMEOUT", "10m") + os.Setenv("ST_RECONCILE_INTERVAL", "30s") + defer func() { + os.Unsetenv("ST_RECONCILE_ORPHAN_TIMEOUT") + os.Unsetenv("ST_RECONCILE_INTERVAL") + }() + + r := NewExecutionReconciler(nil, nil, nil, nil) + if r.OrphanTimeout != 10*time.Minute { + t.Errorf("OrphanTimeout = %v, want %v", r.OrphanTimeout, 10*time.Minute) + } + if r.ReconcileInterval != 30*time.Second { + t.Errorf("ReconcileInterval = %v, want %v", r.ReconcileInterval, 30*time.Second) + } +} + +func TestEnvOrDuration_InvalidInput(t *testing.T) { + os.Setenv("ST_RECONCILE_INTERVAL", "not-a-duration") + defer os.Unsetenv("ST_RECONCILE_INTERVAL") + + result := envOrDuration("ST_RECONCILE_INTERVAL", defaultReconcileInterval) + if result != defaultReconcileInterval { + t.Errorf("envOrDuration with invalid input = %v, want default %v", result, defaultReconcileInterval) + } +} + +func TestEnvOrDuration_ValidInput(t *testing.T) { + os.Setenv("ST_RECONCILE_INTERVAL", "45s") + defer os.Unsetenv("ST_RECONCILE_INTERVAL") + + result := envOrDuration("ST_RECONCILE_INTERVAL", defaultReconcileInterval) + if result != 45*time.Second { + t.Errorf("envOrDuration with valid input = %v, want %v", result, 45*time.Second) + } +} + +func TestEnvOrDuration_EmptyEnv(t *testing.T) { + result := envOrDuration("ST_RECONCILE_INTERVAL_UNSET_XYZ", defaultReconcileInterval) + if result != defaultReconcileInterval { + t.Errorf("envOrDuration with empty env = %v, want default %v", result, defaultReconcileInterval) + } +} + +func TestWorkerTokenSecretPrefix(t *testing.T) { + if WorkerTokenSecretPrefix != "st-worker-token-" { + t.Errorf("WorkerTokenSecretPrefix = %q, want %q", WorkerTokenSecretPrefix, "st-worker-token-") + } +} + +func TestCreateJob_SecurityContextAndResources(t *testing.T) { + fakeClient := fake.NewSimpleClientset(&corev1.SecretList{}) + client := &Client{clientset: fakeClient, namespace: "test-ns"} + + cfg := JobConfig{ + Name: "st-exec-testsec", + Image: "scaledtest/worker:latest", + Command: "npm test", + ExecutionID: "testsec", + WorkerToken: "secret-token-value", + APIBaseURL: "http://api:8080", + CPURequest: "250m", + CPULimit: "500m", + MemoryRequest: "128Mi", + MemoryLimit: "512Mi", + } + + result, err := client.CreateJob(context.Background(), cfg) + if err != nil { + t.Fatalf("CreateJob() error = %v", err) + } + if result.Job == nil { + t.Fatal("CreateJob() returned nil Job") + } + + job := result.Job + + if result.WorkerTokenSecret != "st-worker-token-testsec" { + t.Errorf("WorkerTokenSecret = %q, want %q", result.WorkerTokenSecret, "st-worker-token-testsec") + } + if !result.AutoCreatedSecret { + t.Error("AutoCreatedSecret = false, want true") + } + + podSpec := job.Spec.Template.Spec + container := podSpec.Containers[0] + + if podSpec.AutomountServiceAccountToken == nil || *podSpec.AutomountServiceAccountToken != false { + t.Error("AutomountServiceAccountToken should be false") + } + + if podSpec.SecurityContext == nil { + t.Fatal("Pod SecurityContext is nil") + } + podSC := podSpec.SecurityContext + if podSC.RunAsNonRoot == nil || !*podSC.RunAsNonRoot { + t.Error("Pod RunAsNonRoot should be true") + } + if podSC.RunAsUser == nil || *podSC.RunAsUser != 1000 { + t.Error("Pod RunAsUser should be 1000") + } + if podSC.FSGroup == nil || *podSC.FSGroup != 1000 { + t.Error("Pod FSGroup should be 1000") + } + if podSC.SeccompProfile == nil || podSC.SeccompProfile.Type != corev1.SeccompProfileTypeRuntimeDefault { + t.Error("Pod SeccompProfile should be RuntimeDefault") + } + + if container.SecurityContext == nil { + t.Fatal("Container SecurityContext is nil") + } + contSC := container.SecurityContext + if contSC.RunAsNonRoot == nil || !*contSC.RunAsNonRoot { + t.Error("Container RunAsNonRoot should be true") + } + if contSC.RunAsUser == nil || *contSC.RunAsUser != 1000 { + t.Error("Container RunAsUser should be 1000") + } + if contSC.ReadOnlyRootFilesystem == nil || !*contSC.ReadOnlyRootFilesystem { + t.Error("Container ReadOnlyRootFilesystem should be true") + } + if contSC.AllowPrivilegeEscalation == nil || *contSC.AllowPrivilegeEscalation != false { + t.Error("Container AllowPrivilegeEscalation should be false") + } + if contSC.Capabilities == nil || len(contSC.Capabilities.Drop) != 1 || contSC.Capabilities.Drop[0] != "ALL" { + t.Error("Container Capabilities.Drop should be [ALL]") + } + + cpuReq := container.Resources.Requests[corev1.ResourceCPU] + if cpuReq.String() != "250m" { + t.Errorf("CPU request = %q, want %q", cpuReq.String(), "250m") + } + cpuLim := container.Resources.Limits[corev1.ResourceCPU] + if cpuLim.String() != "500m" { + t.Errorf("CPU limit = %q, want %q", cpuLim.String(), "500m") + } + memReq := container.Resources.Requests[corev1.ResourceMemory] + if memReq.String() != "128Mi" { + t.Errorf("Memory request = %q, want %q", memReq.String(), "128Mi") + } + memLim := container.Resources.Limits[corev1.ResourceMemory] + if memLim.String() != "512Mi" { + t.Errorf("Memory limit = %q, want %q", memLim.String(), "512Mi") + } + + foundToken := false + for _, env := range container.Env { + if env.Name == "ST_WORKER_TOKEN" { + foundToken = true + if env.ValueFrom == nil || env.ValueFrom.SecretKeyRef == nil { + t.Error("ST_WORKER_TOKEN should use SecretKeyRef") + } else { + if env.ValueFrom.SecretKeyRef.Name != "st-worker-token-testsec" { + t.Errorf("ST_WORKER_TOKEN SecretKeyRef.Name = %q, want %q", env.ValueFrom.SecretKeyRef.Name, "st-worker-token-testsec") + } + if env.ValueFrom.SecretKeyRef.Key != "ST_WORKER_TOKEN" { + t.Errorf("ST_WORKER_TOKEN SecretKeyRef.Key = %q, want %q", env.ValueFrom.SecretKeyRef.Key, "ST_WORKER_TOKEN") + } + } + if env.Value != "" { + t.Error("ST_WORKER_TOKEN should not have a plain Value when using SecretKeyRef") + } + } + } + if !foundToken { + t.Error("ST_WORKER_TOKEN env var not found") + } +} + +func TestCreateJob_WithPreExistingSecret(t *testing.T) { + fakeClient := fake.NewSimpleClientset(&corev1.SecretList{}) + client := &Client{clientset: fakeClient, namespace: "test-ns"} + + cfg := JobConfig{ + Name: "st-exec-presec", + Image: "scaledtest/worker:latest", + Command: "npm test", + ExecutionID: "presec", + WorkerToken: "secret-token-value", + WorkerTokenSecret: "my-pre-existing-secret", + APIBaseURL: "http://api:8080", + } + + result, err := client.CreateJob(context.Background(), cfg) + if err != nil { + t.Fatalf("CreateJob() error = %v", err) + } + + if result.WorkerTokenSecret != "my-pre-existing-secret" { + t.Errorf("WorkerTokenSecret = %q, want %q", result.WorkerTokenSecret, "my-pre-existing-secret") + } + if result.AutoCreatedSecret { + t.Error("AutoCreatedSecret = true for pre-existing secret, want false") + } + + container := result.Job.Spec.Template.Spec.Containers[0] + foundToken := false + for _, env := range container.Env { + if env.Name == "ST_WORKER_TOKEN" { + foundToken = true + if env.ValueFrom == nil || env.ValueFrom.SecretKeyRef == nil { + t.Error("ST_WORKER_TOKEN should use SecretKeyRef") + } else if env.ValueFrom.SecretKeyRef.Name != "my-pre-existing-secret" { + t.Errorf("ST_WORKER_TOKEN SecretKeyRef.Name = %q, want %q", env.ValueFrom.SecretKeyRef.Name, "my-pre-existing-secret") + } + } + } + if !foundToken { + t.Error("ST_WORKER_TOKEN env var not found") + } + + secrets, err := fakeClient.CoreV1().Secrets("test-ns").List(context.Background(), metav1.ListOptions{}) + if err != nil { + t.Fatalf("list secrets: %v", err) + } + if len(secrets.Items) != 0 { + t.Errorf("expected 0 auto-created secrets, got %d", len(secrets.Items)) + } +} + +func TestCreateJob_SecretCleanedUpOnJobFailure(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + fakeClient.PrependReactor("create", "jobs", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("job creation failed") + }) + + cfg := JobConfig{ + Name: "st-exec-failsec", + Image: "scaledtest/worker:latest", + Command: "npm test", + ExecutionID: "failsec", + WorkerToken: "secret-token-value", + APIBaseURL: "http://api:8080", + } + + client := &Client{clientset: fakeClient, namespace: "test-ns"} + + result, err := client.CreateJob(context.Background(), cfg) + if err == nil { + t.Fatal("CreateJob() should fail when Job creation fails, but it didn't") + } + if result != nil { + t.Errorf("CreateJob() result should be nil on failure, got %+v", result) + } + + secrets, err := fakeClient.CoreV1().Secrets("test-ns").List(context.Background(), metav1.ListOptions{}) + if err != nil { + t.Fatalf("list secrets: %v", err) + } + if len(secrets.Items) != 0 { + t.Errorf("auto-created Secret should be cleaned up on Job creation failure, but found %d secrets", len(secrets.Items)) + } +} + +func TestReconcileOnce_SkipsPreExistingSecret(t *testing.T) { + jobName := "st-exec-presec1" + preExistingSecret := "my-team-secret" + sd := &testSecretDeleter{} + var markedIDs []string + + getter := &testJobStatusGetter{ + Fn: func(ctx context.Context, name string) (*JobStatus, error) { + return &JobStatus{FailedCondition: true, Failed: 1}, nil + }, + } + + startedAt := time.Now().Add(-10 * time.Minute) + reconciler := &ExecutionReconciler{ + JobStatusGetter: getter, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "presec1", K8sJobName: &jobName, WorkerTokenSecret: &preExistingSecret, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + markedIDs = append(markedIDs, id) + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 1 { + t.Errorf("reconciled count = %d, want 1", n) + } + if len(markedIDs) != 1 || markedIDs[0] != "presec1" { + t.Errorf("marked IDs = %v, want [presec1]", markedIDs) + } + if len(sd.deleted) != 0 { + t.Errorf("pre-existing secret should NOT be deleted, but got deletions: %v", sd.deleted) + } +} + +func TestReconcileOnce_NilWorkerTokenSecret_SkipsDeletion(t *testing.T) { + jobName := "st-exec-nosec1" + sd := &testSecretDeleter{} + var markedIDs []string + + getter := &testJobStatusGetter{ + Fn: func(ctx context.Context, name string) (*JobStatus, error) { + return &JobStatus{FailedCondition: true, Failed: 1}, nil + }, + } + + startedAt := time.Now().Add(-10 * time.Minute) + reconciler := &ExecutionReconciler{ + JobStatusGetter: getter, + SecretDeleter: sd, + ListRunning: func(ctx context.Context) ([]RunningExecution, error) { + return []RunningExecution{ + {ID: "nosec1", K8sJobName: &jobName, WorkerTokenSecret: nil, StartedAt: &startedAt}, + }, nil + }, + MarkFailed: func(ctx context.Context, id, errorMsg string, now time.Time) error { + markedIDs = append(markedIDs, id) + return nil + }, + OrphanTimeout: defaultOrphanTimeout, + ReconcileInterval: defaultReconcileInterval, + } + + n, err := reconciler.ReconcileOnce(context.Background()) + if err != nil { + t.Fatalf("ReconcileOnce() error = %v", err) + } + if n != 1 { + t.Errorf("reconciled count = %d, want 1", n) + } + if len(sd.deleted) != 0 { + t.Errorf("nil WorkerTokenSecret should not trigger deletion, but got: %v", sd.deleted) + } +} diff --git a/internal/model/model.go b/internal/model/model.go index 157f39e5..fc18161b 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -68,19 +68,20 @@ type APIToken struct { // TestExecution tracks a K8s Job-based test execution. type TestExecution struct { - ID string `json:"id"` - TeamID string `json:"team_id"` - Status string `json:"status"` // pending, running, completed, failed, cancelled - Command string `json:"command"` - Config json.RawMessage `json:"config,omitempty"` - ReportID *string `json:"report_id,omitempty"` - K8sJobName *string `json:"k8s_job_name,omitempty"` - K8sPodName *string `json:"k8s_pod_name,omitempty"` - ErrorMsg *string `json:"error_msg,omitempty"` - StartedAt *time.Time `json:"started_at,omitempty"` - FinishedAt *time.Time `json:"finished_at,omitempty"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID string `json:"id"` + TeamID string `json:"team_id"` + Status string `json:"status"` // pending, running, completed, failed, cancelled + Command string `json:"command"` + Config json.RawMessage `json:"config,omitempty"` + ReportID *string `json:"report_id,omitempty"` + K8sJobName *string `json:"k8s_job_name,omitempty"` + WorkerTokenSecret *string `json:"-"` + K8sPodName *string `json:"k8s_pod_name,omitempty"` + ErrorMsg *string `json:"error_msg,omitempty"` + StartedAt *time.Time `json:"started_at,omitempty"` + FinishedAt *time.Time `json:"finished_at,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // TestReport represents a CTRF test report. @@ -150,18 +151,18 @@ type QualityGateEvaluation struct { // TestDurationHistory tracks historical test durations for intelligent sharding. type TestDurationHistory struct { - ID string `json:"id"` - TeamID string `json:"team_id"` - TestName string `json:"test_name"` - Suite string `json:"suite"` - AvgDurationMs int64 `json:"avg_duration_ms"` - P95DurationMs int64 `json:"p95_duration_ms"` - MinDurationMs int64 `json:"min_duration_ms"` - MaxDurationMs int64 `json:"max_duration_ms"` - RunCount int `json:"run_count"` - LastStatus string `json:"last_status"` - UpdatedAt time.Time `json:"updated_at"` - CreatedAt time.Time `json:"created_at"` + ID string `json:"id"` + TeamID string `json:"team_id"` + TestName string `json:"test_name"` + Suite string `json:"suite"` + AvgDurationMs int64 `json:"avg_duration_ms"` + P95DurationMs int64 `json:"p95_duration_ms"` + MinDurationMs int64 `json:"min_duration_ms"` + MaxDurationMs int64 `json:"max_duration_ms"` + RunCount int `json:"run_count"` + LastStatus string `json:"last_status"` + UpdatedAt time.Time `json:"updated_at"` + CreatedAt time.Time `json:"created_at"` } // AuditLog records a user action for compliance and observability. @@ -180,28 +181,28 @@ type AuditLog struct { // ShardAssignment represents a test assigned to a specific worker shard. type ShardAssignment struct { - WorkerID string `json:"worker_id"` - TestName string `json:"test_name"` - Suite string `json:"suite,omitempty"` - EstDurationMs int64 `json:"est_duration_ms"` + WorkerID string `json:"worker_id"` + TestName string `json:"test_name"` + Suite string `json:"suite,omitempty"` + EstDurationMs int64 `json:"est_duration_ms"` } // ShardPlan is the complete distribution plan for a sharded execution. type ShardPlan struct { - ExecutionID string `json:"execution_id"` - TotalWorkers int `json:"total_workers"` - Strategy string `json:"strategy"` // duration_balanced, round_robin, suite_grouped - Shards []Shard `json:"shards"` - EstTotalMs int64 `json:"est_total_ms"` - EstWallClockMs int64 `json:"est_wall_clock_ms"` + ExecutionID string `json:"execution_id"` + TotalWorkers int `json:"total_workers"` + Strategy string `json:"strategy"` // duration_balanced, round_robin, suite_grouped + Shards []Shard `json:"shards"` + EstTotalMs int64 `json:"est_total_ms"` + EstWallClockMs int64 `json:"est_wall_clock_ms"` } // Shard represents one worker's assigned tests. type Shard struct { - WorkerID string `json:"worker_id"` - TestNames []string `json:"test_names"` - EstDurationMs int64 `json:"est_duration_ms"` - TestCount int `json:"test_count"` + WorkerID string `json:"worker_id"` + TestNames []string `json:"test_names"` + EstDurationMs int64 `json:"est_duration_ms"` + TestCount int `json:"test_count"` } // Webhook represents a webhook subscription. diff --git a/internal/model/model_test.go b/internal/model/model_test.go new file mode 100644 index 00000000..8ed14195 --- /dev/null +++ b/internal/model/model_test.go @@ -0,0 +1,55 @@ +package model + +import ( + "encoding/json" + "testing" +) + +func TestTestExecution_WorkerTokenSecret_NotSerializedInJSON(t *testing.T) { + secret := "st-worker-token-exec-123" + e := TestExecution{ + ID: "exec-123", + TeamID: "team-1", + Status: "running", + Command: "npm test", + WorkerTokenSecret: &secret, + } + + data, err := json.Marshal(e) + if err != nil { + t.Fatalf("Marshal TestExecution: %v", err) + } + + var m map[string]interface{} + if err := json.Unmarshal(data, &m); err != nil { + t.Fatalf("Unmarshal marshalled TestExecution: %v", err) + } + + if _, ok := m["worker_token_secret"]; ok { + t.Errorf("worker_token_secret should not appear in JSON output, got: %s", string(data)) + } +} + +func TestTestExecution_WorkerTokenSecret_NilNotSerializedInJSON(t *testing.T) { + e := TestExecution{ + ID: "exec-456", + TeamID: "team-2", + Status: "completed", + Command: "pytest", + WorkerTokenSecret: nil, + } + + data, err := json.Marshal(e) + if err != nil { + t.Fatalf("Marshal TestExecution: %v", err) + } + + var m map[string]interface{} + if err := json.Unmarshal(data, &m); err != nil { + t.Fatalf("Unmarshal marshalled TestExecution: %v", err) + } + + if _, ok := m["worker_token_secret"]; ok { + t.Errorf("worker_token_secret should not appear in JSON output even when nil, got: %s", string(data)) + } +} diff --git a/internal/server/routes.go b/internal/server/routes.go index b2ba8b32..e50b7177 100644 --- a/internal/server/routes.go +++ b/internal/server/routes.go @@ -1,6 +1,7 @@ package server import ( + "context" "net/http" "strings" "time" @@ -30,7 +31,8 @@ import ( // NewRouter creates the chi router with all middleware and route groups. // pool may be nil when running without a database (dev mode). -func NewRouter(cfg *config.Config, pool ...*db.Pool) http.Handler { +// If a k8s reconciler is created, it is returned so the caller can Start/Stop it. +func NewRouter(cfg *config.Config, pool ...*db.Pool) (http.Handler, *k8s.ExecutionReconciler) { var dbPool *db.Pool if len(pool) > 0 { dbPool = pool[0] @@ -68,7 +70,7 @@ func NewRouter(cfg *config.Config, pool ...*db.Pool) http.Handler { jwtMgr, err := auth.NewJWTManager(cfg.JWTSecret, accessDur, refreshDur) if err != nil { log.Fatal().Err(err).Msg("invalid JWT configuration") - return nil + return nil, nil } // Auth middleware with API token lookup @@ -85,7 +87,7 @@ func NewRouter(cfg *config.Config, pool ...*db.Pool) http.Handler { csrfMW, err := auth.CSRFMiddleware([]byte(cfg.JWTSecret)) if err != nil { log.Fatal().Err(err).Msg("failed to create CSRF middleware") - return nil + return nil, nil } // Stores @@ -106,6 +108,34 @@ func NewRouter(cfg *config.Config, pool ...*db.Pool) http.Handler { k8sClient = k8sC } + var reconciler *k8s.ExecutionReconciler + if k8sClient != nil && dbPool != nil { + execStore := store.NewExecutionsStore(dbPool) + reconciler = k8s.NewExecutionReconciler( + k8sClient, + k8sClient, + func(ctx context.Context) ([]k8s.RunningExecution, error) { + executions, err := execStore.ListRunning(ctx) + if err != nil { + return nil, err + } + result := make([]k8s.RunningExecution, 0, len(executions)) + for _, e := range executions { + result = append(result, k8s.RunningExecution{ + ID: e.ID, + K8sJobName: e.K8sJobName, + WorkerTokenSecret: e.WorkerTokenSecret, + StartedAt: e.StartedAt, + }) + } + return result, nil + }, + func(ctx context.Context, id, errorMsg string, now time.Time) error { + return execStore.MarkFailed(ctx, id, errorMsg, now) + }, + ) + } + var whStore *store.WebhookStore if dbPool != nil { whStore = store.NewWebhookStore(dbPool) @@ -374,10 +404,10 @@ func NewRouter(cfg *config.Config, pool ...*db.Pool) http.Handler { // SPA fallback — serves embedded React app if err := spa.Mount(r); err != nil { log.Fatal().Err(err).Msg("failed to mount SPA") - return nil + return nil, nil } - return r + return r, reconciler } func zerologMiddleware(next http.Handler) http.Handler { diff --git a/internal/server/routes_test.go b/internal/server/routes_test.go index d0d48033..0d9e7c73 100644 --- a/internal/server/routes_test.go +++ b/internal/server/routes_test.go @@ -61,7 +61,7 @@ func addCSRF(req *http.Request, token string, cookie *http.Cookie) { } func TestHealthEndpoint(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) // No Authorization header — endpoint must be publicly accessible. req := httptest.NewRequest("GET", "/health", nil) @@ -91,7 +91,7 @@ func TestHealthEndpoint(t *testing.T) { } func TestPublicAuthEndpoints(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) endpoints := []struct { method string @@ -118,7 +118,7 @@ func TestPublicAuthEndpoints(t *testing.T) { } func TestAuthenticatedEndpointsRequireToken(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) endpoints := []struct { method string @@ -143,7 +143,7 @@ func TestAuthenticatedEndpointsRequireToken(t *testing.T) { } func TestAuthenticatedEndpointsWithToken(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) token := testToken() endpoints := []struct { @@ -175,7 +175,7 @@ func TestAuthenticatedEndpointsWithToken(t *testing.T) { } func TestAdminEndpointRequiresOwnerRole(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) // Create a token with readonly role mgr, _ := auth.NewJWTManager(testJWTSecret, 15*time.Minute, 7*24*time.Hour) @@ -192,7 +192,7 @@ func TestAdminEndpointRequiresOwnerRole(t *testing.T) { } func TestCTRFReportIngestion(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) token := testToken() csrfToken, csrfCookie := testCSRFToken(t, router) @@ -211,7 +211,7 @@ func TestCTRFReportIngestion(t *testing.T) { } func TestCTRFReportInvalidPayload(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) token := testToken() csrfToken, csrfCookie := testCSRFToken(t, router) @@ -228,7 +228,7 @@ func TestCTRFReportInvalidPayload(t *testing.T) { } func TestCSRFTokenEndpoint(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) csrfToken, cookie := testCSRFToken(t, router) if csrfToken == "" { @@ -243,7 +243,7 @@ func TestCSRFTokenEndpoint(t *testing.T) { } func TestCSRFAllowsBearerJWTWithoutCSRF(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) token := testToken() // Bearer JWT POSTs should NOT be blocked by CSRF — the Authorization header @@ -260,7 +260,7 @@ func TestCSRFAllowsBearerJWTWithoutCSRF(t *testing.T) { } func TestAuthRateLimiting(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) // Auth endpoints allow 10 requests per minute per IP. // Send 11 requests — the 11th should be rate-limited. @@ -290,7 +290,7 @@ func TestAuthRateLimiting(t *testing.T) { } func TestExecutionCreateRateLimiting(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) token := testToken() csrfToken, csrfCookie := testCSRFToken(t, router) @@ -326,7 +326,7 @@ func TestExecutionCreateRateLimiting(t *testing.T) { } func TestReportUploadRateLimiting(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) token := testToken() csrfToken, csrfCookie := testCSRFToken(t, router) @@ -364,7 +364,7 @@ func TestReportUploadRateLimiting(t *testing.T) { } func TestCSRFAllowsAPITokenWithoutCSRF(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) // API tokens (sct_) should bypass CSRF — they'll fail auth (no DB) but not CSRF req := httptest.NewRequest("POST", "/api/v1/reports", strings.NewReader(`{}`)) @@ -380,7 +380,7 @@ func TestCSRFAllowsAPITokenWithoutCSRF(t *testing.T) { } func TestReadonlyCannotCreateReport(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) csrfToken, csrfCookie := testCSRFToken(t, router) // Create a token with readonly role @@ -401,7 +401,7 @@ func TestReadonlyCannotCreateReport(t *testing.T) { } func TestReadonlyCannotDeleteReport(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) csrfToken, csrfCookie := testCSRFToken(t, router) mgr, _ := auth.NewJWTManager(testJWTSecret, 15*time.Minute, 7*24*time.Hour) @@ -419,7 +419,7 @@ func TestReadonlyCannotDeleteReport(t *testing.T) { } func TestMaintainerCanCreateReport(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) csrfToken, csrfCookie := testCSRFToken(t, router) mgr, _ := auth.NewJWTManager(testJWTSecret, 15*time.Minute, 7*24*time.Hour) @@ -440,7 +440,7 @@ func TestMaintainerCanCreateReport(t *testing.T) { } func TestReadonlyCannotCreateExecution(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) csrfToken, csrfCookie := testCSRFToken(t, router) mgr, _ := auth.NewJWTManager(testJWTSecret, 15*time.Minute, 7*24*time.Hour) @@ -459,7 +459,7 @@ func TestReadonlyCannotCreateExecution(t *testing.T) { } func TestReadonlyCannotCancelExecution(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) csrfToken, csrfCookie := testCSRFToken(t, router) mgr, _ := auth.NewJWTManager(testJWTSecret, 15*time.Minute, 7*24*time.Hour) @@ -477,7 +477,7 @@ func TestReadonlyCannotCancelExecution(t *testing.T) { } func TestMaintainerCanCreateExecution(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) csrfToken, csrfCookie := testCSRFToken(t, router) mgr, _ := auth.NewJWTManager(testJWTSecret, 15*time.Minute, 7*24*time.Hour) @@ -497,7 +497,7 @@ func TestMaintainerCanCreateExecution(t *testing.T) { } func TestReadonlyCanListExecutions(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) mgr, _ := auth.NewJWTManager(testJWTSecret, 15*time.Minute, 7*24*time.Hour) pair, _ := mgr.GenerateTokenPair("user-ro", "readonly@example.com", "readonly", "team-1") @@ -514,7 +514,7 @@ func TestReadonlyCanListExecutions(t *testing.T) { } func TestReadonlyCanListReports(t *testing.T) { - router := NewRouter(testConfig(), nil) + router, _ := NewRouter(testConfig(), nil) mgr, _ := auth.NewJWTManager(testJWTSecret, 15*time.Minute, 7*24*time.Hour) pair, _ := mgr.GenerateTokenPair("user-ro", "readonly@example.com", "readonly", "team-1") @@ -577,7 +577,7 @@ func TestRateLimitMWDisabledViaConfig(t *testing.T) { // regardless of request volume. cfg := testConfig() cfg.DisableRateLimit = true - router := NewRouter(cfg, nil) + router, _ := NewRouter(cfg, nil) for i := 0; i < 15; i++ { req := httptest.NewRequest("POST", "/auth/login", strings.NewReader(`{"email":"a@b.com","password":"12345678"}`)) diff --git a/internal/server/team_access_test.go b/internal/server/team_access_test.go index 21ac4fca..ff87b995 100644 --- a/internal/server/team_access_test.go +++ b/internal/server/team_access_test.go @@ -25,7 +25,7 @@ func tokenForTeam(t *testing.T, userID, email, role, teamID string) string { } func TestTeamIsolation_DifferentTeamTokensAccepted(t *testing.T) { - router := NewRouter(testConfig()) + router, _ := NewRouter(testConfig()) teamAToken := tokenForTeam(t, "user-a", "a@example.com", "owner", "team-alpha") teamBToken := tokenForTeam(t, "user-b", "b@example.com", "owner", "team-beta") @@ -159,7 +159,7 @@ func TestTeamIsolation_CrossTeamTokensDiffer(t *testing.T) { } func TestTeamIsolation_AdminEndpointRequiresOwnerRole(t *testing.T) { - router := NewRouter(testConfig()) + router, _ := NewRouter(testConfig()) tests := []struct { name string @@ -189,7 +189,7 @@ func TestTeamIsolation_AdminEndpointRequiresOwnerRole(t *testing.T) { } func TestTeamIsolation_NoTokenReturnsUnauthorized(t *testing.T) { - router := NewRouter(testConfig()) + router, _ := NewRouter(testConfig()) endpoints := []string{ "/api/v1/reports", @@ -217,7 +217,7 @@ func TestTeamIsolation_NoTokenReturnsUnauthorized(t *testing.T) { } func TestTeamIsolation_InvalidTokenRejected(t *testing.T) { - router := NewRouter(testConfig()) + router, _ := NewRouter(testConfig()) invalidTokens := []struct { name string @@ -297,7 +297,7 @@ func TestTeamIsolation_APITokenWithTeamScope(t *testing.T) { func TestTeamIsolation_TeamScopedEndpointResponses(t *testing.T) { // Verify that team-scoped endpoints return valid JSON with expected structure. // When DB is wired, these should return only data for the requesting team. - router := NewRouter(testConfig()) + router, _ := NewRouter(testConfig()) teamAToken := tokenForTeam(t, "user-a", "a@example.com", "owner", "team-alpha") @@ -342,7 +342,7 @@ func TestTeamIsolation_TeamScopedEndpointResponses(t *testing.T) { } func TestTeamIsolation_CTRFIngestionAcceptsBothTeams(t *testing.T) { - router := NewRouter(testConfig()) + router, _ := NewRouter(testConfig()) report := `{"results":{"tool":{"name":"jest"},"summary":{"tests":1,"passed":1,"failed":0,"skipped":0,"pending":0,"other":0},"tests":[{"name":"test1","status":"passed","duration":50}]}}` @@ -377,7 +377,7 @@ func TestTeamIsolation_CTRFIngestionAcceptsBothTeams(t *testing.T) { func TestTeamIsolation_RoleScopingAcrossTeams(t *testing.T) { // Verify that role enforcement applies regardless of which team the user belongs to. // A readonly user from ANY team should be denied admin access. - router := NewRouter(testConfig()) + router, _ := NewRouter(testConfig()) tests := []struct { name string @@ -417,7 +417,7 @@ func TestTeamIsolation_RoleScopingAcrossTeams(t *testing.T) { func TestTeamIsolation_EmptyTeamIDAllowed(t *testing.T) { // Tokens without a team_id (e.g., during initial login before team assignment) // should still be accepted by the auth middleware. - router := NewRouter(testConfig()) + router, _ := NewRouter(testConfig()) token := tokenForTeam(t, "user-new", "new@example.com", "owner", "") @@ -433,7 +433,7 @@ func TestTeamIsolation_EmptyTeamIDAllowed(t *testing.T) { func TestTeamIsolation_TeamTokenURLParams(t *testing.T) { // Verify team-scoped URL params are accepted (e.g., /teams/{teamID}). - router := NewRouter(testConfig()) + router, _ := NewRouter(testConfig()) tokenA := tokenForTeam(t, "user-a", "a@example.com", "owner", "team-alpha") @@ -463,7 +463,7 @@ func TestTeamIsolation_TeamTokenURLParams(t *testing.T) { func TestTeamIsolation_TeamTokenSubresources(t *testing.T) { // Verify team-scoped sub-resource routes (e.g., /teams/{teamID}/tokens). - router := NewRouter(testConfig()) + router, _ := NewRouter(testConfig()) tokenA := tokenForTeam(t, "user-a", "a@example.com", "owner", "team-alpha") diff --git a/internal/store/executions.go b/internal/store/executions.go index 98957edf..baedf9a9 100644 --- a/internal/store/executions.go +++ b/internal/store/executions.go @@ -22,7 +22,7 @@ func NewExecutionsStore(pool *pgxpool.Pool) *ExecutionsStore { func (s *ExecutionsStore) List(ctx context.Context, teamID string, limit, offset int) ([]model.TestExecution, int, error) { rows, err := s.pool.Query(ctx, `SELECT id, team_id, status, command, config, report_id, k8s_job_name, k8s_pod_name, - error_msg, started_at, finished_at, created_at, updated_at + worker_token_secret, error_msg, started_at, finished_at, created_at, updated_at FROM test_executions WHERE team_id = $1 ORDER BY created_at DESC @@ -38,7 +38,7 @@ func (s *ExecutionsStore) List(ctx context.Context, teamID string, limit, offset var e model.TestExecution if err := rows.Scan( &e.ID, &e.TeamID, &e.Status, &e.Command, &e.Config, &e.ReportID, - &e.K8sJobName, &e.K8sPodName, &e.ErrorMsg, &e.StartedAt, + &e.K8sJobName, &e.K8sPodName, &e.WorkerTokenSecret, &e.ErrorMsg, &e.StartedAt, &e.FinishedAt, &e.CreatedAt, &e.UpdatedAt, ); err != nil { return nil, 0, err @@ -74,12 +74,12 @@ func (s *ExecutionsStore) Get(ctx context.Context, id, teamID string) (*model.Te var e model.TestExecution err := s.pool.QueryRow(ctx, `SELECT id, team_id, status, command, config, report_id, k8s_job_name, k8s_pod_name, - error_msg, started_at, finished_at, created_at, updated_at + worker_token_secret, error_msg, started_at, finished_at, created_at, updated_at FROM test_executions WHERE id = $1 AND team_id = $2`, id, teamID).Scan( &e.ID, &e.TeamID, &e.Status, &e.Command, &e.Config, &e.ReportID, - &e.K8sJobName, &e.K8sPodName, &e.ErrorMsg, &e.StartedAt, + &e.K8sJobName, &e.K8sPodName, &e.WorkerTokenSecret, &e.ErrorMsg, &e.StartedAt, &e.FinishedAt, &e.CreatedAt, &e.UpdatedAt) if err != nil { return nil, err @@ -144,6 +144,36 @@ func (s *ExecutionsStore) GetK8sJobName(ctx context.Context, id string) (*string return jobName, nil } +func (s *ExecutionsStore) GetK8sJobNameByTeam(ctx context.Context, id, teamID string) (*string, error) { + var jobName *string + err := s.pool.QueryRow(ctx, + `SELECT k8s_job_name FROM test_executions WHERE id = $1 AND team_id = $2`, id, teamID).Scan(&jobName) + if err != nil { + return nil, err + } + return jobName, nil +} + +func (s *ExecutionsStore) GetWorkerTokenSecret(ctx context.Context, id string) (*string, error) { + var secretName *string + err := s.pool.QueryRow(ctx, + `SELECT worker_token_secret FROM test_executions WHERE id = $1`, id).Scan(&secretName) + if err != nil { + return nil, err + } + return secretName, nil +} + +func (s *ExecutionsStore) GetWorkerTokenSecretByTeam(ctx context.Context, id, teamID string) (*string, error) { + var secretName *string + err := s.pool.QueryRow(ctx, + `SELECT worker_token_secret FROM test_executions WHERE id = $1 AND team_id = $2`, id, teamID).Scan(&secretName) + if err != nil { + return nil, err + } + return secretName, nil +} + func (s *ExecutionsStore) SetK8sJobName(ctx context.Context, id, jobName string, now time.Time) error { _, err := s.pool.Exec(ctx, `UPDATE test_executions SET k8s_job_name = $1, updated_at = $2 WHERE id = $3`, @@ -153,7 +183,41 @@ func (s *ExecutionsStore) SetK8sJobName(ctx context.Context, id, jobName string, func (s *ExecutionsStore) MarkFailed(ctx context.Context, id, errorMsg string, now time.Time) error { _, err := s.pool.Exec(ctx, - `UPDATE test_executions SET status = 'failed', error_msg = $1, updated_at = $2 WHERE id = $3`, + `UPDATE test_executions SET status = 'failed', error_msg = $1, updated_at = $2 WHERE id = $3 AND status = 'running'`, errorMsg, now, id) return err } + +const defaultListRunningLimit = 1000 + +func (s *ExecutionsStore) ListRunning(ctx context.Context) ([]model.TestExecution, error) { + return s.ListRunningLimit(ctx, defaultListRunningLimit) +} + +func (s *ExecutionsStore) ListRunningLimit(ctx context.Context, limit int) ([]model.TestExecution, error) { + rows, err := s.pool.Query(ctx, + `SELECT id, team_id, status, command, config, report_id, k8s_job_name, k8s_pod_name, + worker_token_secret, error_msg, started_at, finished_at, created_at, updated_at + FROM test_executions + WHERE status = 'running' + ORDER BY created_at ASC + LIMIT $1`, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var executions []model.TestExecution + for rows.Next() { + var e model.TestExecution + if err := rows.Scan( + &e.ID, &e.TeamID, &e.Status, &e.Command, &e.Config, &e.ReportID, + &e.K8sJobName, &e.K8sPodName, &e.WorkerTokenSecret, &e.ErrorMsg, &e.StartedAt, + &e.FinishedAt, &e.CreatedAt, &e.UpdatedAt, + ); err != nil { + return nil, err + } + executions = append(executions, e) + } + return executions, rows.Err() +} diff --git a/k8s/README.md b/k8s/README.md index af19ec20..953b1af3 100644 --- a/k8s/README.md +++ b/k8s/README.md @@ -116,12 +116,51 @@ pick up the new image value. | Variable | Source | Purpose | |---|---|---| +| `ST_WORKER_TOKEN` | K8s Secret (`st-worker-token-`) | API token used by the worker to authenticate | | `ST_API_URL` | Config/secret | ScaledTest API base URL for progress reporting | -| `ST_WORKER_TOKEN` | Secret | API token used by the worker to authenticate | | `ST_EXECUTION_ID` | Runtime | Execution record the worker reports results into | | `ST_COMMAND` | Runtime | Playwright command, e.g. `npx playwright test` | | `E2E_BASE_URL` | Optional | Target app URL (default: `http://localhost:5173`) | +The `ST_WORKER_TOKEN` is no longer passed as a plain environment variable. Instead, each execution creates a dedicated Kubernetes Secret (`st-worker-token-`) and injects it via `secretKeyRef`. This prevents the token from being visible via `kubectl describe pod`. The Secret is automatically cleaned up when the execution is cancelled or when the reconciler detects an orphaned job. + +### Worker pod security context + +All worker pods run with a hardened security context: + +- **Non-root**: `runAsUser: 1000`, `runAsNonRoot: true` +- **Read-only root filesystem**: `readOnlyRootFilesystem: true` +- **Dropped capabilities**: `drop ALL` +- **No privilege escalation**: `allowPrivilegeEscalation: false` +- **Seccomp profile**: `RuntimeDefault` +- **Service account token**: not auto-mounted (`automountServiceAccountToken: false`) + +### Worker pod resource limits + +Each worker pod has default resource requests and limits, configurable via environment variables: + +| Variable | Default | Description | +|---|---|---| +| `ST_WORKER_CPU_REQUEST` | `250m` | CPU request per worker container | +| `ST_WORKER_CPU_LIMIT` | `500m` | CPU limit per worker container | +| `ST_WORKER_MEMORY_REQUEST` | `128Mi` | Memory request per worker container | +| `ST_WORKER_MEMORY_LIMIT` | `512Mi` | Memory limit per worker container | + +Values use Kubernetes resource quantity format (e.g. `250m`, `1`, `128Mi`, `1Gi`). + +### Execution reconciler + +When the server is running with K8s and a database, an execution reconciler runs as a background goroutine. It periodically scans for `running` executions whose K8s job has finished (completed or failed) but the worker never reported status back. These orphaned executions are marked as `failed`. + +Configuration: + +| Variable | Default | Description | +|---|---|---| +| `ST_RECONCILE_INTERVAL` | `60s` | How often the reconciler checks for orphans | +| `ST_RECONCILE_ORPHAN_TIMEOUT` | `5m` | Grace period before an execution without a K8s job is considered orphaned | + +Values use Go duration format (e.g. `60s`, `5m`, `1h`). Invalid values fall back to the default with a warning log. + ## Production Considerations - **Database**: Use a managed PostgreSQL with TimescaleDB (e.g., Timescale Cloud, AWS RDS) instead of the in-cluster StatefulSet