diff --git a/README.md b/README.md index b4e54d0..68029e4 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ Migrate workloads from other platforms to [StackGuardian Platform](https://app.s ## Supported platforms for migration - Terraform Cloud +- Git VCS (GitHub, GitLab) — create workflows directly from Terraform repositories ## Overview @@ -12,11 +13,28 @@ Migrate workloads from other platforms to [StackGuardian Platform](https://app.s - Review the bulk workflow creation payload. - Run sg-cli with the bulk workflow creation payload. +## Transformers + +### Git VCS (GitHub / GitLab) + +Create workflows from Terraform repositories without needing Terraform Cloud. See [transformer/git-vcs/README.md](transformer/git-vcs/README.md) for full docs. + +```shell +cd transformer/git-vcs +pip install . +sg-git-scan --provider github --token ghp_xxx --org my-org +``` + +### Terraform Cloud + +Migrate workspaces from Terraform Cloud/Enterprise. + ## Prerequisites - An organization on [StackGuardian Platform](https://app.stackguardian.io) - Optionally, pre-configure VCS, cloud integrations or private runners to use when importing into StackGuardian Platform. -- Terraform +- Terraform (for the Terraform Cloud transformer) +- Python 3.10+ (for the Git VCS transformer) - [sg-cli](https://github.com/StackGuardian/sg-cli/tree/main/shell) ### Perform terraform login diff --git a/transformer/git-vcs/.gitignore b/transformer/git-vcs/.gitignore new file mode 100644 index 0000000..c20ea8c --- /dev/null +++ b/transformer/git-vcs/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +*.pyc +sg-payload.json diff --git a/transformer/git-vcs/README.md b/transformer/git-vcs/README.md new file mode 100644 index 0000000..15e60c0 --- /dev/null +++ b/transformer/git-vcs/README.md @@ -0,0 +1,104 @@ +# Git VCS Transformer + +Create StackGuardian workflows from Terraform repositories hosted on GitHub or GitLab. 
+ +This transformer connects to your VCS provider, discovers all Terraform repositories, and generates an `sg-payload.json` that can be used to bulk-create workflows on the [StackGuardian Platform](https://app.stackguardian.io). + +## How it works + +1. **Discover** — Lists repositories in your GitHub org or GitLab group via API +2. **Scan** — Fetches the file tree of each repo and detects directories containing `.tf` files +3. **Transform** — Maps each Terraform project to a StackGuardian workflow payload, inferring: + - Terraform version (from `required_version`) + - Cloud provider (from `provider` blocks → `DeploymentPlatformConfig`) + - VCS source config (repo URL, branch, working directory) + - Extra CLI args (when `.tfvars` files are detected) +4. **Output** — Writes `sg-payload.json` for review and bulk import + +## Prerequisites + +- Python 3.10+ +- A GitHub PAT or GitLab PAT with repo read access +- [sg-cli](https://github.com/StackGuardian/sg-cli) for importing workflows + +## Install + +```bash +cd transformer/git-vcs +pip install . 
+``` + +## Usage + +```bash +# Scan a GitHub organization +sg-git-scan --provider github --token ghp_xxx --org my-org + +# Scan a GitLab group +sg-git-scan --provider gitlab --token glpat-xxx --org my-group + +# Limit to 50 repos, custom output path +sg-git-scan --provider github --token ghp_xxx --org my-org --max-repos 50 --output export/sg-payload.json +``` + +## CLI Options + +``` +Required: + --provider, -p VCS provider (github or gitlab) + --token, -t VCS access token + +Target: + --org, -o Organization (GitHub) or group (GitLab) + --user, -u User whose repos to scan + +Filtering: + --max-repos, -m Maximum repositories to scan + --include-archived Include archived repositories + --include-forks Include forked repositories + +StackGuardian defaults: + --wfgrp Workflow group name (default: imported-workflows) + --vcs-auth SG VCS integration path (e.g., /integrations/github_com) + --managed-state Enable SG-managed Terraform state + +Output: + --output, -O Output file (default: sg-payload.json) + --quiet, -q Minimal output + --verbose, -v Debug output +``` + +## After generating sg-payload.json + +Use the `example_payload.jsonc` file as a reference and edit the `sg-payload.json` to configure: + +- `DeploymentPlatformConfig` — Cloud connector (AWS/Azure/GCP integration ID) +- `VCSConfig.customSource.config.auth` — VCS integration for private repos +- `RunnerConstraints` — Shared or private runner +- `Approvers` — Approval emails +- `MiniSteps` — Notifications and workflow chaining +- `EnvironmentVariables` — Env vars for the workflows + +### Bulk import workflows to StackGuardian Platform + +```bash +export SG_API_TOKEN= +sg-cli workflow create --bulk --org "" -- sg-payload.json +``` + +## Output Format + +The `sg-payload.json` is a JSON array of workflow definitions. See `example_payload.jsonc` for the full annotated schema. 
+ +Each workflow maps: + +| SG Field | Source | +|---|---| +| `ResourceName` | Repo name (+ subdir for monorepos) | +| `WfType` | `TERRAFORM` | +| `TerraformConfig.terraformVersion` | Parsed from `required_version` in `.tf` files | +| `VCSConfig.customSource.config.repo` | Repository URL | +| `VCSConfig.customSource.config.ref` | Default branch | +| `VCSConfig.customSource.config.workingDir` | Subdirectory (for monorepos) | +| `DeploymentPlatformConfig` | Inferred from providers (placeholder if unknown) | +| `Tags` | Repo topics + `terraform` | diff --git a/transformer/git-vcs/example_payload.jsonc b/transformer/git-vcs/example_payload.jsonc new file mode 100644 index 0000000..992fd07 --- /dev/null +++ b/transformer/git-vcs/example_payload.jsonc @@ -0,0 +1,79 @@ +{ + // This is an example of a single workflow entry in the sg-payload.json output. + // The transformer generates a JSON array of these objects — one per Terraform project found. + + "ResourceName": "my-terraform-vpc", // workflow name — derived from repo name + subdir for monorepos + "Description": "Workflow for acme-org/my-terraform-vpc", // repo description or auto-generated + "Tags": ["migrated", "terraform", "infrastructure"], // config tags + repo topics + + "WfType": "TERRAFORM", // always TERRAFORM for this transformer + + "TerraformConfig": { + "managedTerraformState": false, // true if --managed-state or config.managed_terraform_state + "terraformVersion": "1.5.0", // inferred from required_version in .tf files, or default 1.5.0 + "approvalPreApply": false // true when approvers list is non-empty + // "extraCLIArgs": "-var-file=terraform.tfvars" // added when .tfvars files are detected + }, + + "DeploymentPlatformConfig": [ + { + "kind": "AWS_RBAC", // inferred from providers (aws→AWS_RBAC, azurerm→AZURE_STATIC, google→GCP_STATIC), or from config + "config": { + "integrationId": "/integrations/aws-dev-account", // from config.yaml or PLEASE_CONFIGURE placeholder + "profileName": "default" + } + 
} + ], + + "VCSConfig": { + "iacVCSConfig": { + "useMarketplaceTemplate": false, + "customSource": { + "sourceConfigDestKind": "GITHUB_COM", // auto-detected from provider, or config override + "config": { + "repo": "https://github.com/acme-org/my-terraform-vpc", // repository URL + "ref": "main", // default branch + "isPrivate": true, // from repo metadata + "auth": "/integrations/github_com", // from --vcs-auth or config.vcs_auth_integration + "workingDir": "", // subdirectory path for monorepos (e.g., "infra/vpc") + "includeSubModule": false + } + } + }, + "iacInputData": { + "schemaType": "RAW_JSON", + "data": {} // Terraform input variables — empty by default, user can fill in + } + }, + + "RunnerConstraints": { + "type": "shared" // or "private" with "names": ["runner-group"] from config + }, + + "Approvers": [], // email addresses from config.approvers + + "EnvironmentVariables": [], // from config.environment_variables + + "MiniSteps": { + "wfChaining": { + "ERRORED": [], + "COMPLETED": [] + }, + "notifications": { + "email": { + "ERRORED": [], + "COMPLETED": [], + "APPROVAL_REQUIRED": [], + "CANCELLED": [] + } + } + }, + + "UserSchedules": [], + + "CLIConfiguration": { + "WorkflowGroup": { + "name": "imported-workflows" // from --wfgrp or config.wfgrp_name + } + } +} diff --git a/transformer/git-vcs/pyproject.toml b/transformer/git-vcs/pyproject.toml new file mode 100644 index 0000000..8d9200a --- /dev/null +++ b/transformer/git-vcs/pyproject.toml @@ -0,0 +1,18 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "sg-git-scan" +version = "0.1.0" +description = "Scan Git repositories for Terraform and generate StackGuardian bulk workflow payloads" +readme = "README.md" +license = {text = "MIT"} +requires-python = ">=3.10" +dependencies = [ + "httpx>=0.25.0", + "python-hcl2>=4.3.0", +] + +[project.scripts] +sg-git-scan = "sg_git_scan.cli:main" diff --git a/transformer/git-vcs/sg_git_scan/__init__.py 
b/transformer/git-vcs/sg_git_scan/__init__.py new file mode 100644 index 0000000..a3f1d6f --- /dev/null +++ b/transformer/git-vcs/sg_git_scan/__init__.py @@ -0,0 +1,3 @@ +"""StackGuardian Git VCS Transformer — scan Git repos, generate SG workflow payloads.""" + +__version__ = "0.1.0" diff --git a/transformer/git-vcs/sg_git_scan/__main__.py b/transformer/git-vcs/sg_git_scan/__main__.py new file mode 100644 index 0000000..8df7195 --- /dev/null +++ b/transformer/git-vcs/sg_git_scan/__main__.py @@ -0,0 +1,5 @@ +"""Allow running as: python -m sg_git_scan""" + +from sg_git_scan.cli import main + +main() diff --git a/transformer/git-vcs/sg_git_scan/cli.py b/transformer/git-vcs/sg_git_scan/cli.py new file mode 100644 index 0000000..59e6c82 --- /dev/null +++ b/transformer/git-vcs/sg_git_scan/cli.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +""" +StackGuardian Migrator — git-vcs transformer CLI. + +Connects to GitHub or GitLab, discovers Terraform repositories, +and generates an sg-payload.json for bulk workflow creation. 
+ +Usage: + sg-git-scan --provider github --token ghp_xxx --org my-org + sg-git-scan --provider gitlab --token glpat-xxx --org my-group +""" + +import argparse +import json +import logging +import sys +from pathlib import Path +from typing import Any + +from sg_git_scan.vcs import GitHubClient, GitLabClient, VCSError +from sg_git_scan.scanner import detect_terraform_dirs +from sg_git_scan.transform import build_payload + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", +) +logger = logging.getLogger(__name__) + + +def discover_repos(args: argparse.Namespace) -> list[dict[str, Any]]: + """Fetch repositories from VCS provider.""" + provider = args.provider.lower() + token = args.token + + if provider == "github": + client = GitHubClient(token=token) + repos = client.list_repos(org=args.org, user=args.user, max_repos=args.max_repos) + elif provider == "gitlab": + client = GitLabClient(token=token) + repos = client.list_repos(group=args.org, user=args.user, max_repos=args.max_repos) + else: + logger.error(f"Unsupported provider: {provider}. Use 'github' or 'gitlab'.") + sys.exit(1) + + # Filter out archived and forks by default + if not args.include_archived: + repos = [r for r in repos if not r.get("is_archived")] + if not args.include_forks: + repos = [r for r in repos if not r.get("is_fork")] + + logger.info(f"Discovered {len(repos)} repositories from {provider}") + return repos + + +def scan_repos( + repos: list[dict[str, Any]], + provider: str, + token: str, +) -> list[tuple[dict[str, Any], list[dict[str, Any]]]]: + """ + For each repo, fetch the file tree and detect Terraform projects. + Returns list of (repo, [project, ...]) tuples — only repos with TF detected. 
+ """ + results: list[tuple[dict[str, Any], list[dict[str, Any]]]] = [] + + if provider == "github": + client = GitHubClient(token=token) + elif provider == "gitlab": + client = GitLabClient(token=token) + else: + return results + + total = len(repos) + for idx, repo in enumerate(repos, 1): + name = repo["full_name"] + logger.info(f"[{idx}/{total}] Scanning {name}...") + + try: + if provider == "github": + owner = repo["owner"] + repo_name = repo["name"] + ref = repo.get("default_branch", "HEAD") + file_tree = client.get_file_tree(owner, repo_name, ref=ref) + else: + file_tree = client.get_file_tree(repo["id"], ref=repo.get("default_branch", "HEAD")) + except VCSError as exc: + logger.warning(f" Could not fetch file tree for {name}: {exc}") + continue + + if not file_tree: + logger.debug(f" Empty file tree for {name} — skipping") + continue + + projects = detect_terraform_dirs(file_tree) + if projects: + logger.info(f" Found {len(projects)} Terraform project(s) in {name}") + results.append((repo, projects)) + else: + logger.debug(f" No Terraform detected in {name}") + + return results + + +def main() -> None: + parser = argparse.ArgumentParser( + description="StackGuardian Migrator — generate bulk workflow payload from Git repositories.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""\ +examples: + sg-git-scan --provider github --token ghp_xxx --org my-org + sg-git-scan --provider gitlab --token glpat-xxx --org my-group + sg-git-scan --provider github --token ghp_xxx --org my-org --max-repos 50 --output export/sg-payload.json + """, + ) + + # Required + parser.add_argument("--provider", "-p", required=True, choices=["github", "gitlab"], + help="VCS provider (github or gitlab)") + parser.add_argument("--token", "-t", required=True, + help="VCS access token (GitHub PAT or GitLab PAT)") + + # Target + parser.add_argument("--org", "-o", default=None, + help="Organization (GitHub) or group (GitLab) to scan") + parser.add_argument("--user", "-u", 
default=None, + help="User whose repos to scan (if not using --org)") + + # Filtering + parser.add_argument("--max-repos", "-m", type=int, default=None, + help="Maximum repositories to scan") + parser.add_argument("--include-archived", action="store_true", default=False, + help="Include archived repositories") + parser.add_argument("--include-forks", action="store_true", default=False, + help="Include forked repositories") + + # SG defaults + parser.add_argument("--wfgrp", default="imported-workflows", + help="Workflow group name (default: imported-workflows)") + parser.add_argument("--vcs-auth", default="", + help="SG VCS integration path (e.g., /integrations/github_com)") + parser.add_argument("--managed-state", action="store_true", default=False, + help="Enable SG-managed Terraform state") + + # Output + parser.add_argument("--output", "-O", default="sg-payload.json", + help="Output file path (default: sg-payload.json)") + parser.add_argument("--quiet", "-q", action="store_true", default=False, + help="Minimal output") + parser.add_argument("--verbose", "-v", action="store_true", default=False, + help="Verbose/debug output") + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + if args.quiet: + logging.getLogger().setLevel(logging.WARNING) + + # --- Step 1: Discover repos --- + repos = discover_repos(args) + if not repos: + logger.warning("No repositories found. 
Check your token, org, and permissions.") + sys.exit(0) + + # --- Step 2: Scan for Terraform --- + repos_with_projects = scan_repos(repos, args.provider.lower(), args.token) + if not repos_with_projects: + logger.warning("No Terraform projects found in any repository.") + sys.exit(0) + + total_projects = sum(len(projects) for _, projects in repos_with_projects) + logger.info(f"Found {total_projects} Terraform project(s) across {len(repos_with_projects)} repo(s)") + + # --- Step 3: Transform to SG payload --- + payload = build_payload( + repos_with_projects, + wfgrp_name=args.wfgrp, + vcs_auth_integration=args.vcs_auth, + managed_terraform_state=args.managed_state, + ) + + # --- Step 4: Write output --- + output_path = Path(args.output) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(json.dumps(payload, indent=2)) + + logger.info(f"Generated {len(payload)} workflow(s) → {output_path}") + logger.info(f"Use the example_payload.jsonc as a reference to edit sg-payload.json before importing") + logger.info(f"Next step: sg-cli workflow create --bulk --org \"\" -- {output_path}") + + +if __name__ == "__main__": + main() diff --git a/transformer/git-vcs/sg_git_scan/scanner.py b/transformer/git-vcs/sg_git_scan/scanner.py new file mode 100644 index 0000000..46f1d0b --- /dev/null +++ b/transformer/git-vcs/sg_git_scan/scanner.py @@ -0,0 +1,262 @@ +""" +IaC detection and Terraform parsing. + +Ported from sg-onboard — detects Terraform projects in a file tree +and parses HCL to extract providers, modules, variables, backend, and version info. + +Works in two modes: + 1. Remote: uses VCS API file trees (no clone needed) + 2. 
Local: scans cloned/local directories and parses .tf files with python-hcl2 +""" + +import logging +import re +from fnmatch import fnmatch +from pathlib import Path +from typing import Any, Optional + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# File-tree based detection (remote — no clone required) +# --------------------------------------------------------------------------- + +TF_FILE_PATTERNS = ["*.tf", "*.tf.json"] +TFVARS_PATTERNS = ["*.tfvars", "*.tfvars.json"] +LOCK_PATTERNS = [".terraform.lock.hcl"] +EXCLUDE_DIRS = { + ".git", ".terraform", ".terragrunt-cache", "node_modules", + "vendor", "__pycache__", ".venv", "venv", +} + + +def _matches(name: str, patterns: list[str]) -> bool: + return any(fnmatch(name, p) for p in patterns) + + +def detect_terraform_dirs(file_tree: list[str]) -> list[dict[str, Any]]: + """ + From a flat file-tree list, identify directories that contain Terraform files. + + Returns a list of dicts: + { + "path": "infra/vpc", # directory relative to repo root ("" for root) + "tf_files": ["main.tf", ...], + "has_tfvars": True, + "has_lockfile": False, + "tfvars_files": ["terraform.tfvars"], + } + """ + dirs: dict[str, dict[str, Any]] = {} + + for filepath in file_tree: + parts = filepath.split("/") + + # skip excluded dirs + if any(p in EXCLUDE_DIRS for p in parts): + continue + + name = parts[-1] + dir_path = "/".join(parts[:-1]) if len(parts) > 1 else "" + + if dir_path not in dirs: + dirs[dir_path] = { + "path": dir_path, + "tf_files": [], + "tfvars_files": [], + "has_lockfile": False, + } + + if _matches(name, TF_FILE_PATTERNS): + dirs[dir_path]["tf_files"].append(name) + elif _matches(name, TFVARS_PATTERNS): + dirs[dir_path]["tfvars_files"].append(name) + elif _matches(name, LOCK_PATTERNS): + dirs[dir_path]["has_lockfile"] = True + + # only keep directories that actually have .tf files + return [d for d in dirs.values() if d["tf_files"]] + + +# 
--------------------------------------------------------------------------- +# Local HCL parsing (requires cloned repo + python-hcl2) +# --------------------------------------------------------------------------- + +def parse_terraform_dir(dir_path: Path) -> Optional[dict[str, Any]]: + """ + Parse all .tf files in *dir_path* and return aggregated metadata. + + Returns None if the directory has no .tf files or parsing fails entirely. + """ + try: + import hcl2 + except ImportError: + logger.warning("python-hcl2 not installed — skipping deep parse") + return None + + tf_files = list(dir_path.glob("*.tf")) + if not tf_files: + return None + + providers: list[dict[str, Any]] = [] + modules: list[dict[str, Any]] = [] + variables: list[dict[str, Any]] = [] + outputs: list[dict[str, Any]] = [] + backend_type: Optional[str] = None + terraform_version: Optional[str] = None + has_backend = False + + seen_providers: set[str] = set() + + for tf_file in tf_files: + try: + content = tf_file.read_text(encoding="utf-8", errors="ignore") + parsed = hcl2.loads(content) + except Exception as exc: + logger.debug(f"Failed to parse {tf_file}: {exc}") + continue + + # --- providers --- + for blk in parsed.get("provider", []): + if isinstance(blk, dict): + for name, cfg in blk.items(): + if isinstance(cfg, list) and cfg: + cfg = cfg[0] + alias = cfg.get("alias") if isinstance(cfg, dict) else None + key = f"{name}:{alias or ''}" + if key not in seen_providers: + seen_providers.add(key) + providers.append({"name": name, "source": None, "version": None, "alias": alias}) + + for blk in parsed.get("terraform", []): + if not isinstance(blk, dict): + continue + + # terraform version + if blk.get("required_version"): + terraform_version = blk["required_version"] + + # required_providers + for rp in blk.get("required_providers", []): + if isinstance(rp, dict): + for name, cfg in rp.items(): + source = cfg.get("source") if isinstance(cfg, dict) else None + version = cfg.get("version") if 
isinstance(cfg, dict) else (cfg if isinstance(cfg, str) else None) + key = f"{name}:" + existing = next((p for p in providers if p["name"] == name and not p["alias"]), None) + if existing: + existing["source"] = existing["source"] or source + existing["version"] = existing["version"] or version + elif key not in seen_providers: + seen_providers.add(key) + providers.append({"name": name, "source": source, "version": version, "alias": None}) + + # backend + for be in blk.get("backend", []): + if isinstance(be, dict): + for bt in be: + backend_type = bt + has_backend = True + + # --- modules --- + for blk in parsed.get("module", []): + if isinstance(blk, dict): + for name, cfg in blk.items(): + if isinstance(cfg, list) and cfg: + cfg = cfg[0] + if not isinstance(cfg, dict): + continue + modules.append({ + "name": name, + "source": cfg.get("source", ""), + "version": cfg.get("version"), + }) + + # --- variables --- + for blk in parsed.get("variable", []): + if isinstance(blk, dict): + for name, cfg in blk.items(): + if isinstance(cfg, list) and cfg: + cfg = cfg[0] + variables.append({ + "name": name, + "type": str(cfg.get("type")) if isinstance(cfg, dict) and cfg.get("type") else None, + "description": cfg.get("description") if isinstance(cfg, dict) else None, + "default": cfg.get("default") if isinstance(cfg, dict) else None, + }) + + # --- outputs --- + for blk in parsed.get("output", []): + if isinstance(blk, dict): + for name, cfg in blk.items(): + if isinstance(cfg, list) and cfg: + cfg = cfg[0] + outputs.append({ + "name": name, + "description": cfg.get("description") if isinstance(cfg, dict) else None, + }) + + if not providers and not modules and not variables and not outputs and not has_backend: + # parsed OK but nothing useful extracted — still return structure + pass + + return { + "providers": providers, + "modules": modules, + "variables": variables, + "outputs": outputs, + "backend_type": backend_type, + "has_backend": has_backend, + "terraform_version": 
terraform_version, + "tf_files": [f.name for f in tf_files], + } + + +def infer_terraform_version(version_constraint: Optional[str]) -> str: + """ + Convert a Terraform version constraint into a concrete version for SG workflow. + + Examples: + ">= 1.5.0" -> "1.5.0" + "~> 1.3" -> "1.3.0" + "1.6.2" -> "1.6.2" + None -> "1.5.0" (sensible default) + """ + if not version_constraint: + return "1.5.0" + + # strip constraint operators + cleaned = re.sub(r"[><=~!\s]", "", version_constraint).strip() + if not cleaned: + return "1.5.0" + + # pick the first version-like token + match = re.search(r"(\d+\.\d+(?:\.\d+)?)", cleaned) + if match: + v = match.group(1) + # ensure three-part version + if v.count(".") == 1: + v += ".0" + return v + + return "1.5.0" + + +def infer_cloud_provider(providers: list[dict[str, Any]]) -> Optional[str]: + """ + Guess the primary cloud from the Terraform providers list. + + Returns one of: "AWS_RBAC", "AZURE_STATIC", "GCP_STATIC", or None. + """ + provider_names = {p["name"].lower() for p in providers} + source_names = {(p.get("source") or "").lower() for p in providers} + all_names = provider_names | source_names + + if any("aws" in n for n in all_names): + return "AWS_RBAC" + if any("azurerm" in n or "azure" in n for n in all_names): + return "AZURE_STATIC" + if any("google" in n or "gcp" in n for n in all_names): + return "GCP_STATIC" + return None diff --git a/transformer/git-vcs/sg_git_scan/transform.py b/transformer/git-vcs/sg_git_scan/transform.py new file mode 100644 index 0000000..57d39fc --- /dev/null +++ b/transformer/git-vcs/sg_git_scan/transform.py @@ -0,0 +1,194 @@ +""" +Transform scanned repository data into StackGuardian workflow payloads. 
+ +Takes the per-repo metadata (from VCS API + optional HCL parsing) and +produces a list of workflow JSON objects compatible with: + - sg-cli workflow create --bulk + - StackGuardian Import Workflows UI +""" + +from typing import Any, Optional + +from sg_git_scan.scanner import infer_terraform_version, infer_cloud_provider + + +def build_workflow( + repo: dict[str, Any], + project: dict[str, Any], + *, + # SG defaults (from config / CLI flags) + wfgrp_name: str = "imported-workflows", + deployment_platform_config: Optional[list[dict[str, Any]]] = None, + vcs_auth_integration: str = "", + source_config_dest_kind: str = "", + runner_constraints: Optional[dict[str, Any]] = None, + approvers: Optional[list[str]] = None, + managed_terraform_state: bool = False, + environment_variables: Optional[list[dict[str, Any]]] = None, + tags: Optional[list[str]] = None, +) -> dict[str, Any]: + """ + Build a single StackGuardian workflow payload from repo + project metadata. + + Parameters + ---------- + repo : dict + Repository metadata from VCS client (id, name, url, clone_url, ...). + project : dict + Terraform project metadata: + path, tf_files, tfvars_files, providers, modules, variables, + terraform_version, has_backend, backend_type, ... + wfgrp_name : str + Workflow group name in StackGuardian. + deployment_platform_config : list + Cloud connector config (kind + integrationId). + vcs_auth_integration : str + SG integration/secret path for VCS auth (e.g., /integrations/github_com). + source_config_dest_kind : str + VCS type: GITHUB_COM, GITLAB_COM, BITBUCKET_ORG, AZURE_DEVOPS, GIT_OTHER. + runner_constraints : dict + Runner config (shared vs private). + approvers : list + Email list for plan approvals. + managed_terraform_state : bool + Whether SG should manage the Terraform state. 
+ """ + providers = project.get("providers", []) + tf_version = infer_terraform_version(project.get("terraform_version")) + project_path = project.get("path", "") + + # --- Resource name --- + # For monorepos (multiple projects), include the subdir in the name + if project_path and project_path != ".": + resource_name = f"{repo['name']}-{project_path.replace('/', '-')}" + else: + resource_name = repo["name"] + + # --- VCS config --- + repo_url = repo.get("url", "") + default_branch = repo.get("default_branch", "main") + is_private = repo.get("is_private", True) + + # Auto-detect sourceConfigDestKind from provider field if not given + if not source_config_dest_kind: + provider_kind = repo.get("provider", "") + kind_map = { + "GITHUB_COM": "GITHUB_COM", + "GITLAB_COM": "GITLAB_COM", + "BITBUCKET_ORG": "BITBUCKET_ORG", + "AZURE_DEVOPS": "AZURE_DEVOPS", + } + source_config_dest_kind = kind_map.get(provider_kind, "GIT_OTHER") + + # --- Deployment platform --- + if not deployment_platform_config: + inferred_cloud = infer_cloud_provider(providers) + if inferred_cloud: + deployment_platform_config = [{ + "kind": inferred_cloud, + "config": { + # Plain literal (was a placeholder-free f-string); user must + # replace this with a real SG integration path before import. + "integrationId": "/integrations/PLEASE_CONFIGURE", + "profileName": "default", + }, + }] + else: + deployment_platform_config = [] + + # --- Extra CLI args for tfvars --- + tfvars_files = project.get("tfvars_files", []) + extra_cli_args = "" + if tfvars_files: + # Use the first tfvars file found. The path is relative to the + # workflow's workingDir, so no project_path prefix is needed + # (the previous if/else had identical branches). + extra_cli_args = f"-var-file={tfvars_files[0]}" + + # --- Tags --- + wf_tags = list(tags or []) + repo_topics = repo.get("topics", []) + if repo_topics: + wf_tags.extend(repo_topics) + # add the iac type + wf_tags.append("terraform") + # deduplicate + wf_tags = list(dict.fromkeys(wf_tags)) + + # --- Build payload --- + workflow: dict[str, Any] = { + "ResourceName": resource_name, + "Description": repo.get("description") or f"Workflow for
{repo['full_name']}", + "Tags": wf_tags, + "EnvironmentVariables": environment_variables or [], + "DeploymentPlatformConfig": deployment_platform_config, + "WfType": "TERRAFORM", + "TerraformConfig": { + "managedTerraformState": managed_terraform_state, + "terraformVersion": tf_version, + "approvalPreApply": bool(approvers), + }, + "VCSConfig": { + "iacVCSConfig": { + "useMarketplaceTemplate": False, + "customSource": { + "sourceConfigDestKind": source_config_dest_kind, + "config": { + "repo": repo_url, + "ref": default_branch, + "isPrivate": is_private, + "auth": vcs_auth_integration if is_private else "", + "workingDir": project_path if project_path and project_path != "." else "", + "includeSubModule": False, + }, + }, + }, + "iacInputData": { + "schemaType": "RAW_JSON", + "data": {}, + }, + }, + "RunnerConstraints": runner_constraints or {"type": "shared"}, + "Approvers": approvers or [], + "MiniSteps": { + "wfChaining": {"ERRORED": [], "COMPLETED": []}, + "notifications": { + "email": { + "ERRORED": [], + "COMPLETED": [], + "APPROVAL_REQUIRED": [], + "CANCELLED": [], + }, + }, + }, + "UserSchedules": [], + "CLIConfiguration": { + "WorkflowGroup": {"name": wfgrp_name}, + }, + } + + # Add extra CLI args if tfvars detected + if extra_cli_args: + workflow["TerraformConfig"]["extraCLIArgs"] = extra_cli_args + + return workflow + + +def build_payload( + repos_with_projects: list[tuple[dict[str, Any], list[dict[str, Any]]]], + **kwargs, +) -> list[dict[str, Any]]: + """ + Build the full sg-payload.json content from a list of (repo, projects) pairs. + + Each repo may have multiple Terraform projects (monorepo support). + Returns a list of workflow dicts ready for JSON serialization. 
+ """ + workflows: list[dict[str, Any]] = [] + + for repo, projects in repos_with_projects: + for project in projects: + wf = build_workflow(repo, project, **kwargs) + workflows.append(wf) + + return workflows diff --git a/transformer/git-vcs/sg_git_scan/vcs.py b/transformer/git-vcs/sg_git_scan/vcs.py new file mode 100644 index 0000000..16d7e06 --- /dev/null +++ b/transformer/git-vcs/sg_git_scan/vcs.py @@ -0,0 +1,275 @@ +""" +VCS clients for GitHub and GitLab. + +Lightweight repo-listing clients ported from sg-onboard. +Only fetches repository metadata — no cloning logic here. +""" + +import logging +import time +import urllib.parse +from typing import Any, Optional + +import httpx + +logger = logging.getLogger(__name__) + +GITHUB_API_URL = "https://api.github.com" +GITLAB_API_URL = "https://gitlab.com/api/v4" + + +class VCSError(Exception): + def __init__(self, message: str, status_code: Optional[int] = None): + super().__init__(message) + self.status_code = status_code + + +class RateLimitError(VCSError): + def __init__(self, message: str, retry_after: int = 0, **kwargs): + super().__init__(message, **kwargs) + self.retry_after = retry_after + + +# --------------------------------------------------------------------------- +# GitHub +# --------------------------------------------------------------------------- + +class GitHubClient: + """Minimal GitHub API client for listing org/user repositories.""" + + def __init__(self, token: str, api_url: str = GITHUB_API_URL): + self.token = token + self.api_url = api_url.rstrip("/") + + def _headers(self) -> dict[str, str]: + return { + "Accept": "application/vnd.github.v3+json", + "Authorization": f"token {self.token}", + "X-GitHub-Api-Version": "2022-11-28", + } + + def _handle(self, resp: httpx.Response) -> Any: + if resp.status_code == 401: + raise VCSError("GitHub authentication failed", 401) + if resp.status_code == 403: + remaining = int(resp.headers.get("X-RateLimit-Remaining", "1")) + if remaining == 0: + reset_ts 
# NOTE(review): this chunk begins mid-statement — the opening of
# GitHubClient._handle (including the "reset_ts" LHS and the surrounding
# 403 / rate-limit checks) lies before this view and is reproduced as-is.
= int(resp.headers.get("X-RateLimit-Reset", "0"))
                # Seconds until the GitHub rate-limit window resets; clamp at 0
                # in case the reset timestamp is already in the past.
                wait = max(0, reset_ts - int(time.time()))
                # Surface the wait so callers can back off and retry.
                raise RateLimitError(f"Rate limit exceeded, resets in {wait}s", retry_after=wait, status_code=403)
            # 403 without rate-limit exhaustion: a plain permissions problem.
            raise VCSError("GitHub access forbidden", 403)
        if resp.status_code == 404:
            raise VCSError("Not found", 404)
        if resp.status_code >= 400:
            # Any other 4xx/5xx: bubble up the raw response body for debugging.
            raise VCSError(f"GitHub API error: {resp.text}", resp.status_code)
        return resp.json()

    def _next_page(self, link_header: str) -> Optional[int]:
        """Parse a GitHub ``Link`` response header and return the next page number.

        Returns None when there is no ``rel="next"`` entry (i.e. the last page)
        or the header is absent/malformed.
        """
        if not link_header:
            return None
        for part in link_header.split(","):
            if 'rel="next"' in part:
                try:
                    # Each entry looks like: <https://...?page=2>; rel="next"
                    url_part = part.split(";")[0].strip().strip("<>")
                    params = urllib.parse.parse_qs(urllib.parse.urlparse(url_part).query)
                    if "page" in params:
                        return int(params["page"][0])
                except (ValueError, IndexError):
                    # Malformed entry — fall through and keep scanning.
                    pass
        return None

    def list_repos(
        self,
        org: Optional[str] = None,
        user: Optional[str] = None,
        max_repos: Optional[int] = None,
    ) -> list[dict[str, Any]]:
        """List repositories for an org, user, or the authenticated user.

        Exactly one of ``org``/``user`` may be given; with neither, the
        token owner's accessible repos are listed. ``max_repos`` caps the
        result and stops pagination early. Returns normalized repo dicts
        (see ``_format``).
        """
        repos: list[dict[str, Any]] = []
        page = 1
        per_page = 100  # GitHub's maximum page size — fewest round-trips.

        with httpx.Client(base_url=self.api_url, headers=self._headers(), timeout=30.0) as client:
            while True:
                params: dict[str, Any] = {"per_page": per_page, "page": page}
                if org:
                    params["type"] = "all"
                    resp = client.get(f"/orgs/{org}/repos", params=params)
                elif user:
                    params["type"] = "all"
                    resp = client.get(f"/users/{user}/repos", params=params)
                else:
                    # Authenticated-user listing: include private repos and
                    # repos reachable via collaboration or org membership.
                    params["visibility"] = "all"
                    params["affiliation"] = "owner,collaborator,organization_member"
                    resp = client.get("/user/repos", params=params)

                data = self._handle(resp)
                if not data:
                    # Empty page — ran past the end of the listing.
                    break

                for r in data:
                    repos.append(self._format(r))

                if max_repos and len(repos) >= max_repos:
                    repos = repos[:max_repos]
                    break

                # Follow the Link header rather than blindly incrementing,
                # so we stop exactly when GitHub says there is no next page.
                next_page = self._next_page(resp.headers.get("Link", ""))
                if next_page is None:
                    break
                page = next_page

        return repos

    def get_file_tree(self, owner: str, repo: str, ref: str = "HEAD") -> list[str]:
        """Fetch the full file tree of a repo via the Git Trees API (recursive)."""
        with httpx.Client(base_url=self.api_url, headers=self._headers(), timeout=30.0) as client:
            resp = client.get(f"/repos/{owner}/{repo}/git/trees/{ref}", params={"recursive": "1"})
            # 404: ref/repo missing; 409: GitHub's "repository is empty".
            # Both mean "nothing to scan", not an error worth raising.
            if resp.status_code == 404 or resp.status_code == 409:
                return []
            data = self._handle(resp)
            # NOTE(review): the Trees API sets "truncated": true for very large
            # repos; that flag is ignored here, so huge repos may be scanned
            # partially — confirm whether that is acceptable.
            return [item["path"] for item in data.get("tree", []) if item.get("type") == "blob"]

    @staticmethod
    def _format(r: dict[str, Any]) -> dict[str, Any]:
        """Normalize a GitHub repo payload into the transformer's common repo dict."""
        owner = r.get("owner", {})
        return {
            "id": str(r.get("id")),
            "name": r.get("name", ""),
            "full_name": r.get("full_name", ""),
            "url": r.get("html_url", ""),
            "clone_url": r.get("clone_url", ""),
            "owner": owner.get("login", ""),
            # NOTE(review): .get's default only applies when the key is absent —
            # an explicit "default_branch": null still yields None here; consider
            # an "or"-style fallback if empty repos must map to "main".
            "default_branch": r.get("default_branch", "main"),
            # Default to private when the field is missing — fail closed.
            "is_private": r.get("private", True),
            "is_archived": r.get("archived", False),
            "is_fork": r.get("fork", False),
            "description": r.get("description"),
            "topics": r.get("topics", []),
            "language": r.get("language"),
            "provider": "GITHUB_COM",
        }


# ---------------------------------------------------------------------------
# GitLab
# ---------------------------------------------------------------------------

class GitLabClient:
    """Minimal GitLab API client for listing group/user projects."""

    def __init__(self, token: str, api_url: str = GITLAB_API_URL):
        # Strip any trailing slash so path concatenation stays predictable.
        self.token = token
        self.api_url = api_url.rstrip("/")

    def _headers(self) -> dict[str, str]:
        """Headers for every GitLab request (PAT auth via PRIVATE-TOKEN)."""
        return {
            "Accept": "application/json",
            "PRIVATE-TOKEN": self.token,
        }

    def _handle(self, resp: httpx.Response) -> Any:
        """Raise a VCSError for error responses; return the parsed JSON body otherwise."""
        if resp.status_code == 401:
            raise VCSError("GitLab authentication failed", 401)
        if resp.status_code == 403:
            raise VCSError("GitLab access forbidden", 403)
        if resp.status_code == 404:
            raise VCSError("Not found", 404)
        if resp.status_code >= 400:
            raise VCSError(f"GitLab API error: {resp.text}", resp.status_code)
        return resp.json()

    def _next_page(self, headers: httpx.Headers) -> Optional[int]:
        """Return the next page number from GitLab's X-Next-Page header, or None.

        GitLab sends an empty X-Next-Page on the last page, hence the
        blank-string check before parsing.
        """
        np = headers.get("X-Next-Page")
        if np and np.strip():
            try:
                return int(np)
            except ValueError:
                pass
        return None

    def list_repos(
        self,
        group: Optional[str] = None,
        user: Optional[str] = None,
        max_repos: Optional[int] = None,
    ) -> list[dict[str, Any]]:
        """List projects for a group, user, or the token's memberships.

        Group listings include subgroup projects. ``max_repos`` caps the
        result and stops pagination early. Returns normalized repo dicts
        (see ``_format``).
        """
        repos: list[dict[str, Any]] = []
        page = 1
        per_page = 100  # GitLab's maximum page size.

        with httpx.Client(base_url=self.api_url, headers=self._headers(), timeout=30.0) as client:
            while True:
                # Most-recently-active projects first.
                params: dict[str, Any] = {"per_page": per_page, "page": page, "order_by": "last_activity_at", "sort": "desc"}
                if group:
                    # Group may be a nested path ("a/b") — must be URL-encoded
                    # as a single path segment for the /groups/:id endpoint.
                    encoded = urllib.parse.quote(group, safe="")
                    params["include_subgroups"] = "true"
                    resp = client.get(f"/groups/{encoded}/projects", params=params)
                elif user:
                    resp = client.get(f"/users/{user}/projects", params=params)
                else:
                    # Only projects the token owner is a member of.
                    params["membership"] = "true"
                    resp = client.get("/projects", params=params)

                data = self._handle(resp)
                if not data:
                    break

                for p in data:
                    repos.append(self._format(p))

                if max_repos and len(repos) >= max_repos:
                    repos = repos[:max_repos]
                    break

                next_page = self._next_page(resp.headers)
                if next_page is None:
                    break
                page = next_page

        return repos

    def get_file_tree(self, project_id: str, ref: str = "HEAD") -> list[str]:
        """Fetch the file tree of a GitLab project via the Repository Tree API."""
        files: list[str] = []
        page = 1
        with httpx.Client(base_url=self.api_url, headers=self._headers(), timeout=30.0) as client:
            while True:
                resp = client.get(
                    f"/projects/{project_id}/repository/tree",
                    params={"ref": ref, "recursive": "true", "per_page": 100, "page": page},
                )
                # 404: project/ref missing; 409: empty repository. Neither is
                # an error for scanning purposes — just return nothing.
                if resp.status_code in (404, 409):
                    return []
                data = self._handle(resp)
                if not data:
                    break
                files.extend(item["path"] for item in data if item.get("type") == "blob")
                next_page = self._next_page(resp.headers)
                if next_page is None:
                    break
                page = next_page
        return files
@staticmethod + def _format(p: dict[str, Any]) -> dict[str, Any]: + ns = p.get("namespace", {}) + path_with_ns = p.get("path_with_namespace", "") + parts = path_with_ns.rsplit("/", 1) + owner = parts[0] if len(parts) > 1 else ns.get("path", "") + return { + "id": str(p.get("id")), + "name": p.get("name", ""), + "full_name": path_with_ns, + "url": p.get("web_url", ""), + "clone_url": p.get("http_url_to_repo", ""), + "owner": owner, + "default_branch": p.get("default_branch", "main"), + "is_private": p.get("visibility") == "private", + "is_archived": p.get("archived", False), + "is_fork": bool(p.get("forked_from_project")), + "description": p.get("description"), + "topics": p.get("topics", []) or p.get("tag_list", []), + "language": None, + "provider": "GITLAB_COM", + }