diff --git a/.github/workflows/devsecops.yml b/.github/workflows/devsecops.yml index efd5e38..c96db0e 100644 --- a/.github/workflows/devsecops.yml +++ b/.github/workflows/devsecops.yml @@ -106,7 +106,7 @@ jobs: run: docker build -t cast-scan:${{ github.sha }} . - name: Trivy scan if: steps.check_dockerfile.outputs.found == 'true' - uses: aquasecurity/trivy-action@master + uses: aquasecurity/trivy-action@v0.36.0 with: image-ref: cast-scan:${{ github.sha }} format: sarif diff --git a/README.md b/README.md index c12dfa0..b06ed55 100644 --- a/README.md +++ b/README.md @@ -177,6 +177,145 @@ CAST detects your project type and CI platform by looking for marker files: | GitLab | `.gitlab-ci.yml` exists | | GitHub | `.github/` directory exists (default) | +#### `cast audit` + +Audit the current repo's pipeline against CAST baseline compliance. + +``` +Usage: cast audit [OPTIONS] [PATH] + + Audit the current repo's pipeline against CAST baseline. + +Options: + --json Output results as JSON. + --help Show this message and exit. +``` + +**Examples:** + +```bash +# Check current directory +cast audit + +# Check a specific repository +cast audit /path/to/repo + +# Machine-readable output +cast audit --json +``` + +**Checks performed:** + +| Check | What it verifies | +|-------|------------------| +| Workflow file exists | `.github/workflows/*.yml` or `.gitlab-ci.yml` present | +| Secrets scanning | Gitleaks or equivalent configured | +| SAST | Semgrep, CodeQL, or equivalent configured | +| Dependency scanning (SCA) | pip-audit, npm audit, govulncheck, etc. | +| Container scanning | Trivy or equivalent configured | +| Code quality | Ruff, ESLint, staticcheck, etc. | +| SARIF upload | Findings uploaded to GitHub Security tab | +| Security gate | conftest/OPA policy gate configured | +| Action pinning (SHA) | GitHub Actions pinned by commit SHA | +| Workflow permissions | Top-level permissions block declared | +| OpenSSF Scorecard | Scorecard workflow integrated | +| Branch protection | Informational only (requires GitHub API) | + +Exit codes: `0` = all checks passed, `1` = one or more checks failed. + +#### `cast upgrade` + +Check if your CAST workflow is up-to-date with the latest baseline template. + +``` +Usage: cast upgrade [OPTIONS] [PATH] + + Check if your CAST workflow is up-to-date with the latest baseline. + +Options: + --diff Show unified diff between current and baseline. + --help Show this message and exit. +``` + +**Examples:** + +```bash +# Check for drift +cast upgrade + +# Show detailed diff +cast upgrade --diff +``` + +Exit codes: `0` = up-to-date, `1` = drift detected or no workflow found. + +#### `cast org audit` + +Audit multiple repositories at once — the organization-level governance view. + +``` +Usage: cast org audit [OPTIONS] ORG_PATH + + Organization-level governance: audit multiple repos at once. + +Options: + --json Output results as JSON. + --help Show this message and exit. +``` + +**Examples:** + +```bash +# Scan a directory of repos +cast org audit ~/my-org-repos/ + +# JSON output for dashboards/CI +cast org audit --json ~/my-org-repos/ +``` + +Output table: + +| Repo | CAST installed | Security gate | Score | +|------|---------------|---------------|-------| +| repo-a | yes | yes | 92 | +| repo-b | no | no | 34 | + +#### `cast profiles` + +List available baseline profiles for `cast init --profile`. + +```bash +cast profiles +``` + +**Available profiles:** + +| Profile | Policy | Scorecard | SLSA | Use case | +|---------|--------|-----------|------|----------| +| `opensource` | permissive | — | — | Public open-source projects | +| `enterprise` | strict | ✓ | ✓ | Enterprise with compliance requirements | +| `strict` | strict | ✓ | — | Maximum security posture | +| `ai-generated-code` | strict | ✓ | — | AI-generated codebases with extra checks | + +```bash +cast init --profile enterprise +``` + +#### `cast init --profile` + +Initialize a pipeline with a pre-configured baseline profile. + +```bash +# Enterprise: strict policy + Scorecard + SLSA +cast init --profile enterprise + +# Open-source: permissive policy, minimal overhead +cast init --profile opensource + +# AI-generated code: extra lint rules for generated code +cast init --profile ai-generated-code +``` + #### `cast version` Display the installed version of `castops`. @@ -357,6 +496,20 @@ templates and policy gates — so the standard is enforced by the pipeline itsel AI can generate a pipeline that runs. CAST enforces a pipeline that complies. +### CAST and OpenSSF Scorecard + +CAST is not another security scanner. It is the **enforcement layer** that translates industry +standards into executable CI baselines: + +> **OpenSSF Scorecard tells you what is wrong. CAST helps you install and enforce the fix across CI/CD.** + +Scorecard evaluates open-source project security practices. CAST takes those findings — along with +org-specific policies — and turns them into automated checks that run on every commit. Together, +they provide full-cycle governance: Scorecard for assessment, CAST for enforcement. + +Use `cast init --profile enterprise` to install a baseline that includes OpenSSF Scorecard +integration out of the box. + --- ## License diff --git a/src/cast_cli/audit.py b/src/cast_cli/audit.py new file mode 100644 index 0000000..732091d --- /dev/null +++ b/src/cast_cli/audit.py @@ -0,0 +1,382 @@ +"""cast audit — check if a repo's pipeline complies with CAST baseline.""" + +import re +from dataclasses import dataclass, field +from pathlib import Path + + +@dataclass +class AuditCheck: + """A single compliance check result.""" + + name: str + passed: bool + detail: str = "" + severity: str = "error" # error | warning | info + + +@dataclass +class AuditResult: + """Overall audit result for a repository.""" + + path: Path + platform: str # github / gitlab + checks: list[AuditCheck] = field(default_factory=list) + score: int = 0 + max_score: int = 0 + + @property + def failed_checks(self) -> list[AuditCheck]: + return [c for c in self.checks if not c.passed and c.severity != "info"] + + +# ── GitHub Actions baseline checks ───────────────────────────────────────────── + + +def _check_workflow_exists(path: Path) -> AuditCheck: + """Check if a CAST-like workflow file exists.""" + workflow = path / ".github" / "workflows" / "devsecops.yml" + if workflow.exists(): + return AuditCheck("Workflow file exists", True, str(workflow)) + # check if any workflow exists + wf_dir = path / ".github" / "workflows" + if wf_dir.exists(): + ymls = list(wf_dir.glob("*.yml")) + list(wf_dir.glob("*.yaml")) + if ymls: + return AuditCheck( + "Workflow file exists", + True, + f"Found {len(ymls)} workflow(s): {', '.join(f.name for f in ymls)}", + ) + return AuditCheck("Workflow file exists", False, "No .github/workflows/*.yml found") + + +def _check_secrets_scanning(workflow_content: str) -> AuditCheck: + """Check for Gitleaks / secrets scanning.""" + indicators = ["gitleaks", "trufflehog", "detect-secrets", "secret scanning"] + if any(indicator in workflow_content.lower() for indicator in indicators): + return AuditCheck("Secrets scanning", True, "Gitleaks detected in workflow") + return AuditCheck("Secrets scanning", False, "No secrets scanning tool found") + + +def _check_sast(workflow_content: str) -> AuditCheck: + """Check for SAST tooling.""" + indicators = ["semgrep", "codeql", "sonarqube", "sast"] + if any(indicator in workflow_content.lower() for indicator in indicators): + return AuditCheck("SAST", True, "Semgrep/CodeQL detected in workflow") + return AuditCheck("SAST", False, "No SAST tool found") + + +def _check_sca(workflow_content: str) -> AuditCheck: + """Check for dependency scanning.""" + content_lower = workflow_content.lower() + indicators = ["pip-audit", "npm audit", "govulncheck", "dependency-check", "snyk"] + if any(indicator in content_lower for indicator in indicators): + return AuditCheck("Dependency scanning (SCA)", True, "SCA tool detected in workflow") + # Check for "sca" as a whole-word or section label (not in random words like "scanner") + if re.search(r'\bsca\b', content_lower): + return AuditCheck("Dependency scanning (SCA)", True, "SCA section detected in workflow") + return AuditCheck("Dependency scanning (SCA)", False, "No dependency scanner found") + + +def _check_container_scanning(workflow_content: str) -> AuditCheck: + """Check for container/image scanning.""" + indicators = ["trivy", "dockerscout", "clair", "container scanning", "container scan"] + if any(indicator in workflow_content.lower() for indicator in indicators): + return AuditCheck("Container scanning", True, "Trivy or equivalent detected") + return AuditCheck("Container scanning", False, "No container scanner found") + + +def _check_code_quality(workflow_content: str) -> AuditCheck: + """Check for code quality tooling.""" + indicators = ["ruff", "eslint", "staticcheck", "golangci-lint", "pylint", "lint", "code quality"] + if any(indicator in workflow_content.lower() for indicator in indicators): + return AuditCheck("Code quality", True, "Linter detected in workflow") + return AuditCheck("Code quality", False, "No code quality tool found") + + +def _check_sarif_upload(workflow_content: str) -> AuditCheck: + """Check for SARIF upload steps.""" + content_lower = workflow_content.lower() + # Check for specific SARIF upload patterns, not just substring "sarif" + indicators = ["upload-sarif", "sarif_file", "sarif upload"] + if any(indicator in content_lower for indicator in indicators): + return AuditCheck("SARIF upload", True, "SARIF upload to Security tab enabled") + # Check for sarif being used meaningfully (--sarif flag, .sarif extension) + if re.search(r'--sarif|\.sarif', content_lower): + return AuditCheck("SARIF upload", True, "SARIF output detected in workflow") + return AuditCheck("SARIF upload", False, "No SARIF upload step found") + + +def _check_security_gate(workflow_content: str) -> AuditCheck: + """Check for a security gate / policy evaluation step.""" + content_lower = workflow_content.lower() + indicators = ["conftest", "policy gate", "security gate", "opa"] + if any(indicator in content_lower for indicator in indicators): + return AuditCheck("Security gate", True, "Policy gate (conftest/OPA) detected") + # Check for "gate" only in a meaningful context (not "navigate", "delegate") + if re.search(r'\bgate\b', content_lower): + return AuditCheck("Security gate", True, "Gate job detected in workflow") + return AuditCheck("Security gate", False, "No policy gate found — merges not gated") + + +def _check_action_pinning(workflow_content: str) -> AuditCheck: + """Check if GitHub Actions are pinned by commit SHA instead of tags/branches. + + Best practice: pin to full commit SHA for supply-chain security. + Tags (@v4) are mutable; branches (@master) are extremely risky. + """ + # Find all `uses:` lines (YAML step syntax: `- uses:` or just `uses:`) + uses_pattern = re.compile(r'^\s*(?:-\s+)?uses:\s*["\']?([^"\'\s#]+)', re.MULTILINE) + actions = uses_pattern.findall(workflow_content) + + if not actions: + return AuditCheck( + "Action pinning (SHA)", + True, + "No external actions used", + severity="info", + ) + + # SHA pinning: @ followed by exactly 40 hex chars + sha_pattern = re.compile(r'@[0-9a-f]{40}$') + tag_pattern = re.compile(r'@v?\d') + branch_pattern = re.compile(r'@(master|main|latest)$') + + unpinned: list[str] = [] + branch_pinned: list[str] = [] + + for action in actions: + if sha_pattern.search(action): + continue # pinned by SHA — good + if branch_pattern.search(action): + branch_pinned.append(action) + elif tag_pattern.search(action): + unpinned.append(action) + else: + # e.g. no version at all + unpinned.append(action) + + if branch_pinned: + names = ", ".join(branch_pinned[:3]) + if len(branch_pinned) > 3: + names += f" (+{len(branch_pinned) - 3} more)" + return AuditCheck( + "Action pinning (SHA)", + False, + f"Actions pinned to mutable branches: {names}", + ) + + if unpinned: + names = ", ".join(unpinned[:3]) + if len(unpinned) > 3: + names += f" (+{len(unpinned) - 3} more)" + return AuditCheck( + "Action pinning (SHA)", + False, + f"Actions not pinned by commit SHA: {names}", + severity="warning", + ) + + return AuditCheck("Action pinning (SHA)", True, f"All {len(actions)} actions pinned by SHA") + + +def _check_scorecard(workflow_content: str) -> AuditCheck: + """Check for OpenSSF Scorecard integration.""" + indicators = ["scorecard", "openssf", "ossf/scorecard"] + if any(indicator in workflow_content.lower() for indicator in indicators): + return AuditCheck("OpenSSF Scorecard", True, "Scorecard workflow detected") + return AuditCheck( + "OpenSSF Scorecard", + False, + "No Scorecard workflow found — run: cast init --profile enterprise", + severity="warning", + ) + + +def _check_branch_protection(repo_path: Path) -> AuditCheck: + """Check for local evidence of branch protection (we can't query GitHub API here).""" + # Can't truly check branch protection from local files alone. + # We note this as informational — true check needs API. + return AuditCheck( + "Branch protection", + False, + "Cannot verify locally — check Settings → Branches in your repo", + severity="info", + ) + + +def _check_permissions(workflow_content: str) -> AuditCheck: + """Check workflow-level permissions are declared.""" + if re.search(r'^permissions:', workflow_content, re.MULTILINE): + return AuditCheck("Workflow permissions declared", True, "Top-level permissions block found") + return AuditCheck( + "Workflow permissions declared", + False, + "No top-level permissions block — least-privilege not enforced", + severity="warning", + ) + + +# ── GitLab CI baseline checks ────────────────────────────────────────────────── + + +def _check_gitlab_ci_exists(path: Path) -> AuditCheck: + """Check if .gitlab-ci.yml exists.""" + ci_file = path / ".gitlab-ci.yml" + if ci_file.exists(): + return AuditCheck("GitLab CI file exists", True, str(ci_file)) + return AuditCheck("GitLab CI file exists", False, "No .gitlab-ci.yml found") + + +def _check_gitlab_secrets(ci_content: str) -> AuditCheck: + return _check_secrets_scanning(ci_content) + + +def _check_gitlab_sast(ci_content: str) -> AuditCheck: + return _check_sast(ci_content) + + +def _check_gitlab_sca(ci_content: str) -> AuditCheck: + return _check_sca(ci_content) + + +def _check_gitlab_container(ci_content: str) -> AuditCheck: + return _check_container_scanning(ci_content) + + +def _check_gitlab_quality(ci_content: str) -> AuditCheck: + return _check_code_quality(ci_content) + + +def _check_gitlab_gate(ci_content: str) -> AuditCheck: + return _check_security_gate(ci_content) + + +def _check_gitlab_artifact_reports(ci_content: str) -> AuditCheck: + """Check for GitLab artifact:reports for security dashboard.""" + if re.search(r'reports:', ci_content): + return AuditCheck("Security report artifacts", True, "SARIF reports published to GitLab dashboard") + return AuditCheck("Security report artifacts", False, "No artifact:reports for security dashboard") + + +# ── Main audit logic ─────────────────────────────────────────────────────────── + + +def _read_workflow_files(repo_path: Path, platform: str) -> str: + """Read all relevant workflow files and concatenate for analysis.""" + if platform == "gitlab": + ci_file = repo_path / ".gitlab-ci.yml" + return ci_file.read_text(encoding="utf-8") if ci_file.exists() else "" + + # GitHub: read all workflow files + wf_dir = repo_path / ".github" / "workflows" + if not wf_dir.exists(): + return "" + content_parts: list[str] = [] + for yml_file in sorted(wf_dir.glob("*.yml")): + try: + content_parts.append(yml_file.read_text(encoding="utf-8")) + except OSError: + pass + for yaml_file in sorted(wf_dir.glob("*.yaml")): + try: + content_parts.append(yaml_file.read_text(encoding="utf-8")) + except OSError: + pass + return "\n---\n".join(content_parts) + + +def audit_repo(repo_path: Path) -> AuditResult: + """Run all compliance checks against a single repository. + + Returns an AuditResult with scored checks. + """ + path = repo_path.resolve() + result = AuditResult(path=path, platform="unknown") + + # Determine platform + if (path / ".gitlab-ci.yml").exists(): + result.platform = "gitlab" + elif (path / ".github").exists(): + result.platform = "github" + else: + result.platform = "github" # default assumption + + workflow_content = _read_workflow_files(path, result.platform) + + if result.platform == "gitlab": + checks = [ + _check_gitlab_ci_exists(path), + ] + if workflow_content: + checks.extend([ + _check_gitlab_secrets(workflow_content), + _check_gitlab_sast(workflow_content), + _check_gitlab_sca(workflow_content), + _check_gitlab_container(workflow_content), + _check_gitlab_quality(workflow_content), + _check_gitlab_gate(workflow_content), + _check_gitlab_artifact_reports(workflow_content), + ]) + else: + checks.append(AuditCheck("GitLab CI content", False, "No .gitlab-ci.yml to analyze")) + else: + checks = [ + _check_workflow_exists(path), + ] + if workflow_content: + checks.extend([ + _check_secrets_scanning(workflow_content), + _check_sast(workflow_content), + _check_sca(workflow_content), + _check_container_scanning(workflow_content), + _check_code_quality(workflow_content), + _check_sarif_upload(workflow_content), + _check_security_gate(workflow_content), + _check_action_pinning(workflow_content), + _check_permissions(workflow_content), + _check_scorecard(workflow_content), + ]) + else: + checks.append(AuditCheck("Workflow content", False, "No workflow files to analyze")) + checks.append(_check_branch_protection(path)) + + result.checks = checks + # Score: count passed, exclude info-severity from max_score + scorable = [c for c in checks if c.severity != "info"] + passed = sum(1 for c in scorable if c.passed) + result.max_score = len(scorable) + result.score = int(passed / result.max_score * 100) if result.max_score > 0 else 0 + + return result + + +def audit_org(org_path: Path, *, max_depth: int = 2) -> list[AuditResult]: + """Scan a directory of repositories and return audit results for each repo. + + A repo is identified by the presence of a .git directory, a .github directory, + or a .gitlab-ci.yml file. + """ + results: list[AuditResult] = [] + + def _is_repo(p: Path) -> bool: + return (p / ".git").is_dir() or (p / ".github").is_dir() or (p / ".gitlab-ci.yml").exists() + + if _is_repo(org_path): + results.append(audit_repo(org_path)) + return results + + # Walk subdirectories + for entry in sorted(org_path.iterdir()): + if not entry.is_dir() or entry.name.startswith("."): + continue + if _is_repo(entry): + results.append(audit_repo(entry)) + elif max_depth > 1: + # Look one level deeper + for sub in sorted(entry.iterdir()): + if sub.is_dir() and _is_repo(sub): + results.append(audit_repo(sub)) + + return results diff --git a/src/cast_cli/main.py b/src/cast_cli/main.py index 96f10f5..cc411bf 100644 --- a/src/cast_cli/main.py +++ b/src/cast_cli/main.py @@ -9,6 +9,7 @@ import typer from rich.console import Console from rich.panel import Panel +from rich.table import Table from cast_cli.detect import detect_platform, detect_project from cast_cli.install import ( @@ -18,6 +19,9 @@ is_supported, write_template, ) +from cast_cli.audit import audit_repo, audit_org +from cast_cli.upgrade import check_upgrade +from cast_cli.profiles import get_profile, list_profiles app = typer.Typer( name="cast", @@ -76,6 +80,10 @@ def init( None, "--platform", "-p", help="CI platform (github/gitlab). Auto-detected if omitted.", ), + profile: Optional[str] = typer.Option( + None, "--profile", + help="Baseline profile: opensource / enterprise / strict / ai-generated-code", + ), ) -> None: """Initialize a DevSecOps pipeline for your project.""" @@ -136,6 +144,23 @@ def init( ) raise typer.Exit(1) + # ── resolve profile ────────────────────────────────────────────────────── + active_profile = None + if profile: + active_profile = get_profile(profile) + if active_profile is None: + valid = ", ".join(p.name for p in list_profiles()) + console.print(f"[red]Unknown profile:[/red] {profile!r} (valid: {valid})") + raise typer.Exit(1) + console.print(f"Profile: [bold magenta]{active_profile.name}[/bold magenta] — {active_profile.description}") + console.print(f" Policy: {active_profile.policy}") + if active_profile.include_scorecard: + console.print(" OpenSSF Scorecard: [green]enabled[/green]") + if active_profile.include_slsa: + console.print(" SLSA provenance: [green]enabled[/green]") + if active_profile.include_ai_checks: + console.print(" AI code checks: [green]enabled[/green]") + # ── fetch + write ───────────────────────────────────────────────────────── console.print("Installing template...", end=" ") @@ -150,6 +175,18 @@ def init( console.print("[green]done[/green]") console.print(f"\n[bold green]✓[/bold green] Created [cyan]{workflow_path}[/cyan]") + # ── profile-specific post-install notes ───────────────────────────────── + if active_profile and active_profile.include_scorecard: + console.print( + "\n[bold cyan]ℹ[/bold cyan] OpenSSF Scorecard: add the scorecard workflow separately:\n" + " https://github.com/ossf/scorecard#installation" + ) + if active_profile and active_profile.include_slsa: + console.print( + "\n[bold cyan]ℹ[/bold cyan] SLSA provenance: add the SLSA generator workflow:\n" + " https://github.com/slsa-framework/slsa-github-generator" + ) + if resolved_platform == "gitlab": console.print( "\nCommit and push to activate your DevSecOps pipeline:\n" @@ -305,3 +342,306 @@ def validate( console.print(f" Gate: [green]✓ would allow (policy: {effective_policy})[/green]") + +# ── audit command ───────────────────────────────────────────────────────────── + +_ICONS = {"error": "✗", "warning": "⚠", "info": "ℹ"} +_ICON_COLORS = {"error": "red", "warning": "yellow", "info": "dim"} + + +@app.command() +def audit( + path: Optional[str] = typer.Argument( + None, help="Path to repository. Defaults to current directory." + ), + json_output: bool = typer.Option( + False, "--json", help="Output results as JSON." + ), +) -> None: + """Audit the current repo's pipeline against CAST baseline. + + Checks for: + • Secrets scanning (Gitleaks) + • SAST (Semgrep/CodeQL) + • Dependency scanning (SCA) + • Container scanning (Trivy) + • Code quality linting + • SARIF upload to Security tab + • Security gate (conftest/OPA) + • GitHub Action SHA pinning + • Workflow permissions declaration + • OpenSSF Scorecard integration + • Branch protection (informational only) + + Exit codes: + 0 — all checks passed + 1 — one or more checks failed + """ + repo_path = Path(path).resolve() if path else Path.cwd() + result = audit_repo(repo_path) + + if json_output: + import json as _json + output = { + "path": str(result.path), + "platform": result.platform, + "score": result.score, + "max_score": result.max_score, + "checks": [ + { + "name": c.name, + "passed": c.passed, + "detail": c.detail, + "severity": c.severity, + } + for c in result.checks + ], + } + # Use plain print for reliable CliRunner capture + print(_json.dumps(output, indent=2)) + if result.score < 100: + raise typer.Exit(1) + return + + # ── rich table output ──────────────────────────────────────────────────── + console.print( + f"\n[bold]CAST Audit[/bold] — {result.path.name}\n" + f" Platform: [cyan]{result.platform}[/cyan]\n" + ) + + table = Table(show_header=False, padding=(0, 1), box=None) + table.add_column("status", width=3) + table.add_column("check", style="bold") + table.add_column("detail", style="dim") + + for check in result.checks: + icon = "✓" if check.passed else _ICONS.get(check.severity, "?") + color = "green" if check.passed else _ICON_COLORS.get(check.severity, "red") + table.add_row(f"[{color}]{icon}[/{color}]", check.name, check.detail) + + console.print(table) + + # ── score ───────────────────────────────────────────────────────────────── + scorable = [c for c in result.checks if c.severity != "info"] + passed = sum(1 for c in scorable if c.passed) + total = len(scorable) + score = int(passed / total * 100) if total > 0 else 100 + + score_color = "green" if score >= 80 else "yellow" if score >= 60 else "red" + console.print( + f"\n[bold]Score: [{score_color}]{score}[/{score_color}]/{total}[/bold] " + f"({passed}/{total} checks passed)" + ) + + if score < 100: + console.print( + "\n[dim]Run [bold]cast init[/bold] to install the baseline, " + "or [bold]cast upgrade[/bold] to see what changed.[/dim]" + ) + raise typer.Exit(1) + + console.print("\n[bold green]All checks passed ✓[/bold green]") + + +# ── org command ──────────────────────────────────────────────────────────────── + +@app.command() +def org( + subcommand: str = typer.Argument(..., help="Subcommand: audit"), + org_path: str = typer.Argument( + ".", help="Path to directory containing repositories." + ), + json_output: bool = typer.Option( + False, "--json", help="Output results as JSON." + ), +) -> None: + """Organization-level governance: audit multiple repos at once. + + Usage: + cast org audit /path/to/repos/ + cast org audit --json /path/to/repos/ # JSON output + + A repo is detected by the presence of .git/ directory, .github/ directory, + or .gitlab-ci.yml file. + """ + if subcommand != "audit": + console.print(f"[red]Unknown subcommand:[/red] {subcommand}. Use: cast org audit ") + raise typer.Exit(1) + + root = Path(org_path).resolve() + if not root.exists(): + console.print(f"[red]Path not found:[/red] {root}") + raise typer.Exit(1) + + results = audit_org(root) + + if not results: + console.print("[yellow]No repositories found.[/yellow]") + return + + if json_output: + import json as _json + output = { + "repositories": [ + { + "path": str(r.path), + "platform": r.platform, + "score": r.score, + "cast_installed": any( + c.name == "Workflow file exists" and c.passed + for c in r.checks + ) or any( + c.name == "GitLab CI file exists" and c.passed + for c in r.checks + ), + "policy": "—", # requires config file; placeholder + "security_gate": any( + c.name == "Security gate" and c.passed for c in r.checks + ), + "checks": [ + {"name": c.name, "passed": c.passed, "severity": c.severity} + for c in r.checks + ], + } + for r in results + ] + } + print(_json.dumps(output, indent=2)) + return + + # ── rich table output ──────────────────────────────────────────────────── + console.print(f"\n[bold]CAST Org Audit[/bold] — {root}\n") + + table = Table(show_header=True, padding=(0, 1)) + table.add_column("Repo", style="bold") + table.add_column("CAST installed", justify="center") + table.add_column("Security gate", justify="center") + table.add_column("Score", justify="right") + + for r in results: + cast_installed = any( + (c.name == "Workflow file exists" or c.name == "GitLab CI file exists") + and c.passed + for c in r.checks + ) + security_gate = any( + c.name == "Security gate" and c.passed for c in r.checks + ) + + cast_str = "[green]yes[/green]" if cast_installed else "[red]no[/red]" + gate_str = "[green]yes[/green]" if security_gate else "[red]no[/red]" + score_color = "green" if r.score >= 80 else "yellow" if r.score >= 60 else "red" + score_str = f"[{score_color}]{r.score}[/{score_color}]" + + table.add_row(r.path.name, cast_str, gate_str, score_str) + + console.print(table) + + # Summary + compliant = sum(1 for r in results if r.score >= 80) + console.print( + f"\n[dim]{compliant}/{len(results)} repos compliant (≥80), " + f"{len(results) - compliant} need attention[/dim]" + ) + + +# ── upgrade command ──────────────────────────────────────────────────────────── + +@app.command() +def upgrade( + path: Optional[str] = typer.Argument( + None, help="Path to repository. Defaults to current directory." + ), + diff_only: bool = typer.Option( + False, "--diff", help="Show unified diff between current and baseline." + ), +) -> None: + """Check if your CAST workflow is up-to-date with the latest baseline. + + Compares the current workflow file against the latest CAST template + for your project type and platform. + + Exit codes: + 0 — workflow is up-to-date + 1 — workflow differs from baseline (or no workflow found) + """ + repo_path = Path(path).resolve() if path else Path.cwd() + result = check_upgrade(repo_path) + + if result.get("error"): + console.print(f"[red]Error:[/red] {result['error']}") + raise typer.Exit(1) + + console.print( + f"\n[bold]CAST Upgrade Check[/bold]\n" + f" Project type: [cyan]{result['project_type']}[/cyan]\n" + f" Platform: [cyan]{result['platform']}[/cyan]\n" + f" Workflow: [dim]{result['current_path']}[/dim]\n" + ) + + if not result["current_exists"]: + console.print( + "\n[yellow]No CAST workflow found.[/yellow]\n" + "Run [bold]cast init[/bold] to create one." + ) + raise typer.Exit(1) + + if result["up_to_date"]: + console.print("\n[bold green]✓ Workflow is up-to-date with the latest CAST baseline.[/bold green]") + return + + console.print("\n[yellow]⚠ Workflow differs from the latest CAST baseline.[/yellow]") + + if diff_only or result["diff"]: + console.print("\n[bold]Changes needed:[/bold]\n") + # Colorize the diff output + for line in result["diff"].splitlines(): + if line.startswith("---") or line.startswith("+++"): + console.print(f"[bold]{line}[/bold]") + elif line.startswith("@@"): + console.print(f"[cyan]{line}[/cyan]") + elif line.startswith("+"): + console.print(f"[green]{line}[/green]") + elif line.startswith("-"): + console.print(f"[red]{line}[/red]") + else: + console.print(line) + + console.print( + "\n[dim]To apply the latest baseline:[/dim]\n" + " [bold]cast init --force[/bold] (overwrites current workflow)\n" + " Review the diff above before overwriting." + ) + + raise typer.Exit(1) + + +# ── profiles command ─────────────────────────────────────────────────────────── + +@app.command(name="profiles") +def list_profiles_command() -> None: + """List available baseline profiles for cast init.""" + console.print("\n[bold]Available CAST Profiles[/bold]\n") + + table = Table(show_header=True, padding=(0, 1)) + table.add_column("Profile", style="bold magenta") + table.add_column("Policy") + table.add_column("Scorecard") + table.add_column("SLSA") + table.add_column("Description") + + for p in list_profiles(): + table.add_row( + p.name, + p.policy, + "✓" if p.include_scorecard else "—", + "✓" if p.include_slsa else "—", + p.description, + ) + + console.print(table) + console.print( + "\n[dim]Usage:[/dim] [bold]cast init --profile [/bold]\n" + "Example: [bold]cast init --profile enterprise[/bold]" + ) + diff --git a/src/cast_cli/profiles.py b/src/cast_cli/profiles.py new file mode 100644 index 0000000..33c95e6 --- /dev/null +++ b/src/cast_cli/profiles.py @@ -0,0 +1,72 @@ +"""CAST baseline profiles for `cast init --profile `. + +Profiles control which template variant is selected and override policy defaults. +""" + +from dataclasses import dataclass + +PROFILES: dict[str, "Profile"] = {} + + +@dataclass +class Profile: + """A named baseline profile that tunes template + policy defaults.""" + + name: str + description: str + policy: str = "default" # default / strict / permissive + template_variant: str = "" # sub-directory of the language template (e.g., "opensource") + include_scorecard: bool = False + include_slsa: bool = False + include_ai_checks: bool = False + + def __post_init__(self) -> None: + PROFILES[self.name] = self + + +# ── Profile definitions ──────────────────────────────────────────────────────── + +Profile( + name="opensource", + description="Open-source project: permissive policy, no SLSA/Scorecard required", + policy="permissive", + template_variant="opensource", + include_scorecard=False, + include_slsa=False, +) + +Profile( + name="enterprise", + description="Enterprise: strict policy + OpenSSF Scorecard + SLSA provenance", + policy="strict", + template_variant="enterprise", + include_scorecard=True, + include_slsa=True, +) + +Profile( + name="strict", + description="Maximum security: blocks on HIGH+CRITICAL, action SHA pinning enforced", + policy="strict", + template_variant="strict", + include_scorecard=True, +) + +Profile( + name="ai-generated-code", + description="Extra scanning for AI-generated code: stricter lint rules, prompt-injection checks", + policy="strict", + template_variant="ai-generated", + include_ai_checks=True, + include_scorecard=True, +) + + +def get_profile(name: str) -> Profile | None: + """Look up a profile by name. Returns None if not found.""" + return PROFILES.get(name) + + +def list_profiles() -> list[Profile]: + """Return all registered profiles.""" + return list(PROFILES.values()) diff --git a/src/cast_cli/templates/go/devsecops.yml b/src/cast_cli/templates/go/devsecops.yml index 523a6da..fa4f1fb 100644 --- a/src/cast_cli/templates/go/devsecops.yml +++ b/src/cast_cli/templates/go/devsecops.yml @@ -97,7 +97,7 @@ jobs: run: docker build -t cast-scan:${{ github.sha }} . - name: Trivy scan if: steps.check_dockerfile.outputs.found == 'true' - uses: aquasecurity/trivy-action@master + uses: aquasecurity/trivy-action@v0.36.0 with: image-ref: cast-scan:${{ github.sha }} format: sarif diff --git a/src/cast_cli/templates/nodejs/devsecops.yml b/src/cast_cli/templates/nodejs/devsecops.yml index 0f3f5af..bdbd4e2 100644 --- a/src/cast_cli/templates/nodejs/devsecops.yml +++ b/src/cast_cli/templates/nodejs/devsecops.yml @@ -97,7 +97,7 @@ jobs: run: docker build -t cast-scan:${{ github.sha }} . - name: Trivy scan if: steps.check_dockerfile.outputs.found == 'true' - uses: aquasecurity/trivy-action@master + uses: aquasecurity/trivy-action@v0.36.0 with: image-ref: cast-scan:${{ github.sha }} format: sarif diff --git a/src/cast_cli/templates/python/devsecops.yml b/src/cast_cli/templates/python/devsecops.yml index 28d8c27..f38912f 100644 --- a/src/cast_cli/templates/python/devsecops.yml +++ b/src/cast_cli/templates/python/devsecops.yml @@ -98,7 +98,7 @@ jobs: run: docker build -t cast-scan:${{ github.sha }} . - name: Trivy scan if: steps.check_dockerfile.outputs.found == 'true' - uses: aquasecurity/trivy-action@master + uses: aquasecurity/trivy-action@v0.36.0 with: image-ref: cast-scan:${{ github.sha }} format: sarif diff --git a/src/cast_cli/upgrade.py b/src/cast_cli/upgrade.py new file mode 100644 index 0000000..e4c93a2 --- /dev/null +++ b/src/cast_cli/upgrade.py @@ -0,0 +1,93 @@ +"""cast upgrade — detect drift between current workflow and latest CAST baseline.""" + +import difflib +from pathlib import Path +from typing import Optional + +from cast_cli.detect import detect_project, detect_platform +from cast_cli.install import load_template, get_workflow_path, is_supported + + +def _read_current_workflow(repo_path: Path, platform: str) -> Optional[str]: + """Read the current workflow file content, if it exists.""" + wf_path = get_workflow_path(platform) + full_path = repo_path / wf_path + if not full_path.exists(): + return None + return full_path.read_text(encoding="utf-8") + + +def check_upgrade(repo_path: Path) -> dict: + """Compare the current workflow against the latest CAST template. + + Returns a dict with: + - project_type: detected project type + - platform: detected CI platform + - current_exists: bool + - current_path: path to the current workflow + - up_to_date: bool (True if identical to template) + - diff: unified diff (empty string if up-to-date) + - current_content: current file content + - template_content: latest template content + """ + repo_path = repo_path.resolve() + project_type = detect_project(repo_path) + platform = detect_platform(repo_path) + + result: dict = { + "project_type": project_type, + "platform": platform, + "current_exists": False, + "current_path": str(repo_path / get_workflow_path(platform)), + "up_to_date": True, + "diff": "", + "current_content": "", + "template_content": "", + "error": None, + } + + if project_type is None: + result["error"] = ( + "Cannot detect project type. Use --type to specify one of: python, nodejs, go" + ) + return result + + if not is_supported(project_type): + result["error"] = f"Unsupported project type: {project_type}" + return result + + # Load the latest template + try: + template = load_template(project_type, platform) + except Exception as e: + result["error"] = f"Failed to load template for {project_type}/{platform}: {e}" + return result + + result["template_content"] = template + + # Read current workflow + current = _read_current_workflow(repo_path, platform) + if current is None: + result["current_exists"] = False + result["up_to_date"] = False + result["diff"] = f"No workflow found at {result['current_path']}.\nRun `cast init` to create one." + return result + + result["current_exists"] = True + result["current_content"] = current + + if current.rstrip() == template.rstrip(): + result["up_to_date"] = True + else: + result["up_to_date"] = False + diff_lines = list( + difflib.unified_diff( + current.splitlines(keepends=True), + template.splitlines(keepends=True), + fromfile=str(get_workflow_path(platform)), + tofile=f"template:{project_type}/{platform}/devsecops.yml", + ) + ) + result["diff"] = "".join(diff_lines) + + return result diff --git a/tests/test_audit.py b/tests/test_audit.py new file mode 100644 index 0000000..a5d3524 --- /dev/null +++ b/tests/test_audit.py @@ -0,0 +1,378 @@ +"""Tests for `cast audit` command.""" + +import json + +import pytest +from typer.testing import CliRunner + +from cast_cli.main import app +from cast_cli.audit import ( + audit_repo, + _check_secrets_scanning, + _check_sast, + _check_sarif_upload, + _check_security_gate, + _check_action_pinning, + _check_scorecard, + _check_permissions, + _check_workflow_exists, + _check_branch_protection, + _check_sca, + _check_container_scanning, + _check_code_quality, +) + +runner = CliRunner() + + +# ── Unit tests for individual checks ───────────────────────────────────────── + + +class TestCheckWorkflowExists: + def test_exists_when_devsecops_present(self, tmp_path): + wf = tmp_path / ".github" / "workflows" / "devsecops.yml" + wf.parent.mkdir(parents=True) + wf.write_text("name: test") + check = _check_workflow_exists(tmp_path) + assert check.passed + + def test_exists_when_any_workflow_present(self, tmp_path): + wf = tmp_path / ".github" / "workflows" / "ci.yml" + wf.parent.mkdir(parents=True) + wf.write_text("name: ci") + check = _check_workflow_exists(tmp_path) + assert check.passed + + def test_fails_when_no_workflows(self, tmp_path): + check = _check_workflow_exists(tmp_path) + assert not check.passed + + +class TestCheckSecretsScanning: + def test_gitleaks_detected(self): + wf = " - uses: gitleaks/gitleaks-action@v2" + assert _check_secrets_scanning(wf).passed + + def test_trufflehog_detected(self): + wf = " - name: trufflehog scan" + assert _check_secrets_scanning(wf).passed + + def test_not_detected(self): + wf = " - name: build\n run: make" + assert not _check_secrets_scanning(wf).passed + + +class TestCheckSast: + def test_semgrep_detected(self): + wf = "semgrep ci --sarif" + assert _check_sast(wf).passed + + def test_codeql_detected(self): + wf = "uses: github/codeql-action/init@v3" + assert _check_sast(wf).passed + + def test_not_detected(self): + wf = "echo hello" + assert not _check_sast(wf).passed + + +class TestCheckSca: + def test_pip_audit_detected(self): + assert _check_sca("pip-audit -r requirements.txt").passed + + def test_npm_audit_detected(self): + assert _check_sca("npm audit --audit-level=high").passed + + def test_govulncheck_detected(self): + assert _check_sca("govulncheck ./...").passed + + def test_not_detected(self): + assert not _check_sca("echo hello world").passed + + +class TestCheckContainerScanning: + def test_trivy_detected(self): + assert _check_container_scanning("trivy scan").passed + + def test_not_detected(self): + assert not _check_container_scanning("echo hello").passed + + +class TestCheckCodeQuality: + def test_ruff_detected(self): + assert _check_code_quality("ruff check .").passed + + def test_eslint_detected(self): + assert _check_code_quality("npx eslint").passed + + def test_not_detected(self): + assert not _check_code_quality("echo hello").passed + + +class TestCheckSarifUpload: + def test_upload_sarif_detected(self): + wf = "uses: github/codeql-action/upload-sarif@v3" + assert _check_sarif_upload(wf).passed + + def test_sarif_file_detected(self): + wf = "sarif_file: semgrep.sarif" + assert _check_sarif_upload(wf).passed + + def test_not_detected(self): + wf = "echo no sarif" + assert not _check_sarif_upload(wf).passed + + +class TestCheckSecurityGate: + def test_conftest_detected(self): + wf = "conftest test sarif-results/*.sarif" + assert _check_security_gate(wf).passed + + def test_gate_detected(self): + wf = "Security gate blocked merge" + assert _check_security_gate(wf).passed + + def test_not_detected(self): + wf = "echo hello world" + assert not _check_security_gate(wf).passed + + +class TestCheckActionPinning: + def test_sha_pinning_passes(self): + wf = "uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683" + assert _check_action_pinning(wf).passed + + def test_tag_pinning_warns(self): + wf = "uses: actions/checkout@v4" + check = _check_action_pinning(wf) + assert not check.passed + assert check.severity == "warning" + + def test_branch_pinning_fails(self): + wf = "uses: aquasecurity/trivy-action@master" + check = _check_action_pinning(wf) + assert not check.passed + assert check.severity == "error" + + def test_no_actions_info(self): + wf = "name: simple workflow\nrun: echo hello" + check = _check_action_pinning(wf) + assert check.passed + assert check.severity == "info" + + def test_mixed_pinning_reports_unpinned(self): + wf = """\ +uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 +uses: actions/setup-python@v5 +uses: some-action@master""" + check = _check_action_pinning(wf) + assert not check.passed + + +class TestCheckScorecard: + def test_scorecard_detected(self): + assert _check_scorecard("uses: ossf/scorecard-action@v2").passed + + def test_openssf_detected(self): + assert _check_scorecard("OpenSSF scorecard scan").passed + + def test_not_detected(self): + assert not _check_scorecard("echo hello").passed + assert _check_scorecard("").passed is False # returns warning + + +class TestCheckPermissions: + def test_permissions_present(self): + wf = "permissions:\n contents: read" + assert _check_permissions(wf).passed + + def test_no_permissions_warns(self): + wf = "name: test\non: push" + check = _check_permissions(wf) + assert not check.passed + assert check.severity == "warning" + + +class TestCheckBranchProtection: + def test_always_info(self, tmp_path): + check = _check_branch_protection(tmp_path) + assert check.severity == "info" + + +# ── audit_repo integration tests ────────────────────────────────────────────── + + +class TestAuditRepo: + def test_empty_repo_scores_zero(self, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + result = audit_repo(tmp_path) + assert result.platform == "github" + # Without workflow files, only workflow-exists and branch-protection checks + scorable = [c for c in result.checks if c.severity != "info"] + assert all(not c.passed for c in scorable) + + def test_full_workflow_scores_high(self, tmp_path, monkeypatch): + # Create a well-configured workflow + wf = tmp_path / ".github" / "workflows" / "devsecops.yml" + wf.parent.mkdir(parents=True) + wf.write_text("""\ +permissions: + contents: read + security-events: write + +jobs: + secrets: + steps: + - uses: gitleaks/gitleaks-action@v2 + sast: + steps: + - run: semgrep ci --sarif + - uses: github/codeql-action/upload-sarif@v3 + sca: + steps: + - run: pip-audit + container: + steps: + - run: trivy scan + quality: + steps: + - run: ruff check . + gate: + steps: + - run: conftest test +""") + monkeypatch.chdir(tmp_path) + result = audit_repo(tmp_path) + scorable = [c for c in result.checks if c.severity != "info"] + passed = sum(1 for c in scorable if c.passed) + # At least most checks should pass + assert passed >= len(scorable) - 2 # Allow branch protection + scorecard + + +# ── CLI integration tests ───────────────────────────────────────────────────── + + +class TestAuditCommand: + @pytest.fixture(autouse=True) + def _chdir(self, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + def test_audit_empty_dir_exits_one(self, tmp_path): + result = runner.invoke(app, ["audit"]) + assert result.exit_code == 1 + + def test_audit_json_output(self, tmp_path, monkeypatch): + # Create a minimal workflow so we get some data + wf = tmp_path / ".github" / "workflows" / "ci.yml" + wf.parent.mkdir(parents=True) + wf.write_text("name: ci\npermissions:\n contents: read\njobs:\n test:\n steps: []") + result = runner.invoke(app, ["audit", "--json"]) + assert result.exit_code == 1 # not all pass + data = json.loads(result.output) + assert "checks" in data + assert "score" in data + assert isinstance(data["checks"], list) + + def test_audit_with_path_argument(self, tmp_path): + # Create a repo in a subdirectory + sub = tmp_path / "my-repo" + sub.mkdir() + wf = sub / ".github" / "workflows" / "devsecops.yml" + wf.parent.mkdir(parents=True) + wf.write_text("""\ +permissions: + contents: read +jobs: + secrets: + steps: + - uses: gitleaks/gitleaks-action@v2 + sast: + steps: + - run: semgrep ci --sarif + - uses: github/codeql-action/upload-sarif@v3 + sca: + steps: + - run: pip-audit + container: + steps: + - run: trivy + quality: + steps: + - run: ruff check . + gate: + steps: + - run: conftest test +""") + result = runner.invoke(app, ["audit", str(sub)]) + assert result.exit_code == 0 or result.exit_code == 1 + + +class TestOrgAuditCommand: + @pytest.fixture(autouse=True) + def _chdir(self, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + def test_org_audit_no_repos(self, tmp_path): + result = runner.invoke(app, ["org", "audit", str(tmp_path)]) + assert result.exit_code == 0 + assert "No repositories found" in result.output + + def test_org_audit_with_repos(self, tmp_path): + # Create two repos + for name in ["repo-a", "repo-b"]: + repo = tmp_path / name + repo.mkdir() + wf = repo / ".github" / "workflows" / "devsecops.yml" + wf.parent.mkdir(parents=True) + wf.write_text("""\ +permissions: + contents: read +jobs: + secrets: + steps: + - uses: gitleaks/gitleaks-action@v2 + sast: + steps: + - run: semgrep ci --sarif + - uses: github/codeql-action/upload-sarif@v3 + sca: + steps: + - run: pip-audit + container: + steps: + - run: trivy + quality: + steps: + - run: ruff + gate: + steps: + - run: conftest test +""") + + result = runner.invoke(app, ["org", "audit", str(tmp_path)]) + assert result.exit_code == 0 + assert "repo-a" in result.output + assert "repo-b" in result.output + + def test_org_audit_json_output(self, tmp_path): + repo = tmp_path / "my-repo" + repo.mkdir() + wf = repo / ".github" / "workflows" / "devsecops.yml" + wf.parent.mkdir(parents=True) + wf.write_text("name: ci\npermissions:\n contents: read\njobs:\n test:\n steps: []") + + result = runner.invoke(app, ["org", "audit", "--json", str(tmp_path)]) + assert result.exit_code == 0 + data = json.loads(result.output) + assert "repositories" in data + assert len(data["repositories"]) == 1 + + +class TestProfilesCommand: + def test_list_profiles(self): + result = runner.invoke(app, ["profiles"]) + assert result.exit_code == 0 + assert "opensource" in result.output + assert "enterprise" in result.output + assert "strict" in result.output + assert "ai-generated-code" in result.output diff --git a/tests/test_upgrade.py b/tests/test_upgrade.py new file mode 100644 index 0000000..f36c769 --- /dev/null +++ b/tests/test_upgrade.py @@ -0,0 +1,125 @@ +"""Tests for `cast upgrade` command.""" + +import pytest +from typer.testing import CliRunner + +from cast_cli.main import app +from cast_cli.upgrade import check_upgrade + +runner = CliRunner() + + +class TestCheckUpgrade: + def test_no_workflow_returns_not_up_to_date(self, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + # No project markers → error + result = check_upgrade(tmp_path) + assert result["error"] is not None + + def test_missing_workflow_returns_not_found(self, tmp_path, monkeypatch): + # Create a Python project with no workflow + (tmp_path / "pyproject.toml").touch() + monkeypatch.chdir(tmp_path) + result = check_upgrade(tmp_path) + assert not result["up_to_date"] + assert not result["current_exists"] + assert "No workflow found" in result["diff"] + + def test_workflow_matches_template_is_up_to_date(self, tmp_path, monkeypatch): + from cast_cli.install import load_template + + (tmp_path / "pyproject.toml").touch() + wf_path = tmp_path / ".github" / "workflows" / "devsecops.yml" + wf_path.parent.mkdir(parents=True) + template = load_template("python", "github") + wf_path.write_text(template) + + monkeypatch.chdir(tmp_path) + result = check_upgrade(tmp_path) + assert result["up_to_date"] + assert result["current_exists"] + + def test_modified_workflow_is_not_up_to_date(self, tmp_path, monkeypatch): + from cast_cli.install import load_template + + (tmp_path / "pyproject.toml").touch() + wf_path = tmp_path / ".github" / "workflows" / "devsecops.yml" + wf_path.parent.mkdir(parents=True) + template = load_template("python", "github") + # Modify the template + modified = template.replace("CAST DevSecOps", "My Custom Pipeline") + wf_path.write_text(modified) + + monkeypatch.chdir(tmp_path) + result = check_upgrade(tmp_path) + assert not result["up_to_date"] + assert len(result["diff"]) > 0 + + def test_gitlab_project_detected(self, tmp_path, monkeypatch): + from cast_cli.install import load_template + + (tmp_path / "pyproject.toml").touch() + (tmp_path / ".gitlab-ci.yml").touch() + ci_file = tmp_path / ".gitlab-ci.yml" + template = load_template("python", "gitlab") + ci_file.write_text(template) + + monkeypatch.chdir(tmp_path) + result = check_upgrade(tmp_path) + assert result["up_to_date"] + assert result["platform"] == "gitlab" + + +class TestUpgradeCommand: + @pytest.fixture(autouse=True) + def _chdir(self, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + def test_upgrade_no_project_exits_one(self, tmp_path): + result = runner.invoke(app, ["upgrade"]) + assert result.exit_code == 1 + + def test_upgrade_no_workflow_exits_one(self, tmp_path): + (tmp_path / "pyproject.toml").touch() + result = runner.invoke(app, ["upgrade"]) + assert result.exit_code == 1 + assert "No CAST workflow found" in result.output + + def test_upgrade_workflow_up_to_date_exits_zero(self, tmp_path): + from cast_cli.install import load_template + + (tmp_path / "pyproject.toml").touch() + wf = tmp_path / ".github" / "workflows" / "devsecops.yml" + wf.parent.mkdir(parents=True) + wf.write_text(load_template("python", "github")) + + result = runner.invoke(app, ["upgrade"]) + assert result.exit_code == 0 + assert "up-to-date" in result.output + + def test_upgrade_detects_drift(self, tmp_path): + from cast_cli.install import load_template + + (tmp_path / "pyproject.toml").touch() + wf = tmp_path / ".github" / "workflows" / "devsecops.yml" + wf.parent.mkdir(parents=True) + modified = load_template("python", "github").replace("CAST DevSecOps", "My Pipeline") + wf.write_text(modified) + + result = runner.invoke(app, ["upgrade"]) + assert result.exit_code == 1 + assert "differs" in result.output.lower() + + def test_upgrade_diff_flag_shows_diff(self, tmp_path): + from cast_cli.install import load_template + + (tmp_path / "pyproject.toml").touch() + wf = tmp_path / ".github" / "workflows" / "devsecops.yml" + wf.parent.mkdir(parents=True) + modified = load_template("python", "github").replace("CAST DevSecOps", "My Pipeline") + wf.write_text(modified) + + result = runner.invoke(app, ["upgrade", "--diff"]) + assert result.exit_code == 1 + # Should contain diff markers + assert "---" in result.output or "+++" in result.output or "@@" in result.output