Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/control-plane.yml
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,6 @@ jobs:
)
if [[ "${TARGET_PUBLISH}" == "true" ]]; then
args+=(--publish)
else
args+=(--no-integration)
fi
python -m aio_fleet "${args[@]}" 2>&1 | tee central-control-check.log

Expand Down
27 changes: 19 additions & 8 deletions src/aio_fleet/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
catalog_repo_failures,
derived_repo_failures,
pinned_action_failures,
repo_local_workflow_failures,
repo_policy_failures,
template_metadata_failures,
tracked_artifact_failures,
Expand Down Expand Up @@ -536,6 +537,18 @@ def cmd_validate_actions(args: argparse.Namespace) -> int:
return 0


def cmd_verify_caller(args: argparse.Namespace) -> int:
    """Validate the repo's local (caller) workflows against fleet policy.

    Resolves the repo from the manifest, rebinds it to the checkout path
    given on the command line, and runs the local-workflow policy checks.
    Returns 0 when every check passes; otherwise prints the failures to
    stderr and returns 1.
    """
    manifest = load_manifest(Path(args.manifest))
    target = _repo_for_identifier(manifest, args.repo)
    localized = _repo_with_path(target, Path(args.repo_path).resolve())
    problems = repo_local_workflow_failures(localized)
    if not problems:
        print(f"{target.name} caller policy checks passed")
        return 0
    print("\n".join(problems), file=sys.stderr)
    return 1


def cmd_validate_derived(args: argparse.Namespace) -> int:
failures = derived_repo_failures(
Path(args.repo_path).resolve(),
Expand Down Expand Up @@ -1090,14 +1103,7 @@ def cmd_registry_publish(args: argparse.Namespace) -> int:
if args.dry_run:
print(" ".join(shlex.quote(part) for part in command))
return 0
tags = compute_registry_tags(repo, sha=sha, component=args.component)
preflight_failures = verify_registry_tags(tags.all_tags)
if not preflight_failures:
print(f"{repo.name}:{args.component}: registry=already-current", flush=True)
return 0
print(f"{repo.name}:{args.component}: registry=preflight-missing", flush=True)
for failure in preflight_failures:
print(f"{repo.name}:{args.component}: preflight: {failure}", file=sys.stderr)
print(f"{repo.name}:{args.component}: registry=publishing", flush=True)
try:
with _registry_publish_environment(repo) as publish_env:
result = _run(command, cwd=repo.path, env=publish_env)
Expand Down Expand Up @@ -2660,6 +2666,11 @@ def build_parser() -> argparse.ArgumentParser:
actions.add_argument("--repo-path", default=".")
actions.set_defaults(func=cmd_validate_actions)

caller = sub.add_parser("verify-caller")
caller.add_argument("--repo", required=True)
caller.add_argument("--repo-path", required=True)
caller.set_defaults(func=cmd_verify_caller)

derived = sub.add_parser("validate-derived")
derived.add_argument("--repo-path", default=".")
derived.add_argument("--strict-placeholders", action="store_true")
Expand Down
37 changes: 29 additions & 8 deletions src/aio_fleet/control_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,40 @@ def central_check_steps(
include_integration: bool = True,
) -> list[Step]:
manifest_args = ["--manifest", str(manifest_path)] if manifest_path else []
trusted_cwd = _trusted_aio_root()
steps = [
Step(
"validate-template-common",
"validate-repo",
[
sys.executable,
"-m",
"aio_fleet.cli",
*manifest_args,
"validate-template-common",
"validate-repo",
"--repo",
repo.name,
"--repo-path",
str(repo.path),
],
repo.path,
trusted_cwd,
inherit_secrets=False,
)
),
Step(
"verify-caller",
[
sys.executable,
"-m",
"aio_fleet.cli",
*manifest_args,
"verify-caller",
"--repo",
repo.name,
"--repo-path",
str(repo.path),
],
trusted_cwd,
inherit_secrets=False,
),
]
install = _install_test_dependencies_step(repo.path)
if install is not None:
Expand Down Expand Up @@ -113,7 +130,7 @@ def central_check_steps(
prebuilt_integration_image = False
if (
include_integration
and event in {"push", "release", "workflow_dispatch"}
and event in {"pull_request", "push", "release", "workflow_dispatch"}
and integration_args
):
if publish:
Expand Down Expand Up @@ -157,7 +174,7 @@ def central_check_steps(
str(repo.path),
"--no-fix",
],
repo.path,
trusted_cwd,
inherit_secrets=False,
)
)
Expand Down Expand Up @@ -186,7 +203,7 @@ def central_check_steps(
"--component",
component,
],
repo.path,
trusted_cwd,
stream_output=True,
timeout_seconds=_repo_timeout_seconds(
repo, "registry_publish_timeout_seconds", default=3600
Expand All @@ -196,6 +213,10 @@ def central_check_steps(
return steps


def _trusted_aio_root() -> Path:
return Path(__file__).resolve().parents[2]


def run_steps(steps: list[Step], *, dry_run: bool = False) -> list[str]:
failures: list[str] = []
for step in steps:
Expand Down Expand Up @@ -443,7 +464,7 @@ def _repo_python(repo_path: Path) -> str:

def _install_test_dependencies_step(repo_path: Path) -> Step | None:
if (repo_path / "tests").exists():
aio_root = Path(__file__).resolve().parents[2]
aio_root = _trusted_aio_root()
return Step(
"install-test-deps",
[
Expand Down
14 changes: 13 additions & 1 deletion src/aio_fleet/github_policy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import json
import os
import shutil
import subprocess # nosec B404
from pathlib import Path
Expand Down Expand Up @@ -203,15 +204,26 @@ def _gh_json(args: list[str]) -> Any:
gh = shutil.which("gh")
if gh is None:
raise RuntimeError("gh CLI is required for GitHub policy validation")
env = _gh_env()
result = subprocess.run( # nosec B603
[gh, *args], check=False, text=True, capture_output=True
[gh, *args], check=False, text=True, capture_output=True, env=env
)
if result.returncode != 0:
raise RuntimeError(result.stderr.strip() or f"gh {' '.join(args)} failed")
text = result.stdout.strip()
return json.loads(text) if text else None


def _gh_env() -> dict[str, str]:
env = dict(os.environ)
if not env.get("GH_TOKEN"):
for key in ("AIO_FLEET_WORKFLOW_TOKEN", "AIO_FLEET_CHECK_TOKEN", "APP_TOKEN"):
if env.get(key):
env["GH_TOKEN"] = env[key]
break
return env


def _deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
merged = dict(base)
for key, value in override.items():
Expand Down
13 changes: 13 additions & 0 deletions src/aio_fleet/safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import base64
import json
import os
import re
import subprocess # nosec B404
import urllib.error
Expand Down Expand Up @@ -627,15 +628,27 @@ def _confidence(level: str, signals: list[str], warnings: list[str]) -> float:


def _gh_json(args: list[str], *, check: bool = True) -> Any:
    """Run ``gh`` with *args* and parse its stdout as JSON.

    Empty output yields ``None``. On a non-zero exit status, raises
    ``RuntimeError`` (using stderr when available) if *check* is true,
    otherwise returns ``None``.
    """
    completed = subprocess.run(  # nosec
        ["gh", *args],
        text=True,
        capture_output=True,
        check=False,
        env=_gh_env(),
    )
    if completed.returncode == 0:
        payload = completed.stdout.strip()
        return json.loads(payload) if payload else None
    if not check:
        return None
    raise RuntimeError(completed.stderr.strip() or "gh command failed")


def _gh_env() -> dict[str, str]:
env = dict(os.environ)
if not env.get("GH_TOKEN"):
for key in ("AIO_FLEET_WORKFLOW_TOKEN", "AIO_FLEET_CHECK_TOKEN", "APP_TOKEN"):
if env.get(key):
env["GH_TOKEN"] = env[key]
break
return env
136 changes: 129 additions & 7 deletions src/aio_fleet/upstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from dataclasses import dataclass, replace
from functools import lru_cache
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -78,17 +78,139 @@ def monitor_repo(
*,
write: bool = False,
) -> list[UpstreamMonitorResult]:
results: list[UpstreamMonitorResult] = []
for config in monitor_configs(repo):
result = evaluate_monitor(repo, config)
results.append(result)
if write and result.updates_available and result.strategy == "pr":
configs = monitor_configs(repo)
results = [evaluate_monitor(repo, config) for config in configs]
results = _align_shared_release_digest_groups(configs, results)
if write:
_write_monitor_results(repo, configs, results)
return results


def _write_monitor_results(
    repo: RepoConfig,
    configs: list[dict[str, Any]],
    results: list[UpstreamMonitorResult],
) -> None:
    """Persist pr-strategy monitor outcomes to the working tree.

    For each result that has updates and uses the "pr" strategy, writes the
    version ARG (and the digest ARG when both key and digest are present) to
    the Dockerfile, then bumps the submodule when the version changed.
    """
    for config, outcome in zip(configs, results, strict=True):
        if not (outcome.updates_available and outcome.strategy == "pr"):
            continue
        write_arg(outcome.dockerfile, outcome.version_key, outcome.latest_version)
        if outcome.digest_key and outcome.latest_digest:
            write_arg(outcome.dockerfile, outcome.digest_key, outcome.latest_digest)
        if outcome.version_update:
            update_submodule(repo, config, outcome)


def _align_shared_release_digest_groups(
    configs: list[dict[str, Any]],
    results: list[UpstreamMonitorResult],
) -> list[UpstreamMonitorResult]:
    """Keep monitors that track the same upstream release on one version.

    Monitors are grouped when they share a Dockerfile, version key, upstream
    repo, stability filter, and version prefix (github-releases source with an
    image + digest_source). For each group of two or more, the newest release
    for which EVERY member has a registry digest is selected, and all members
    are rewritten (via ``dataclasses.replace``) to that shared version/digest.
    Returns a new list; *results* is not mutated in place.
    """
    # Bucket eligible monitor indexes by their shared-release identity key.
    grouped: dict[tuple[Path, str, str, bool, str], list[int]] = {}
    for index, (config, result) in enumerate(zip(configs, results, strict=True)):
        if (
            config.get("source") != "github-releases"
            or not result.digest_key
            or not str(config.get("image", "")).strip()
            or not str(config.get("digest_source", "")).strip()
            or not result.version_key
        ):
            continue
        key = (
            result.dockerfile.resolve(),
            result.version_key,
            str(config.get("repo", "")),
            bool(config.get("stable_only", True)),
            str(config.get("version_strip_prefix", "")),
        )
        grouped.setdefault(key, []).append(index)

    aligned = list(results)
    for (
        _dockerfile,
        _version_key,
        upstream_repo,
        stable_only,
        strip_prefix,
    ), indexes in grouped.items():
        # Only groups of 2+ need alignment; singletons keep their own result.
        if len(indexes) < 2:
            continue
        # Candidates come newest-first; `skipped` holds filtered releases.
        candidates, skipped = github_release_candidates_result(
            upstream_repo,
            stable_only=stable_only,
            strip_prefix=strip_prefix,
        )
        current_version = aligned[indexes[0]].current_version
        # Non-semver current versions get no ordering key, so every candidate
        # is considered "newer" (the <= check below is skipped).
        current_key = (
            version_sort_key(current_version)
            if SEMVER_RE.match(current_version)
            else None
        )
        missing: list[dict[str, str]] = []
        # Defaults: stay on the current version/digests if nothing qualifies.
        selected_version = current_version
        selected_digests = {index: aligned[index].current_digest for index in indexes}
        # NOTE(review): assumes `candidates` is non-empty — indexing [0] would
        # raise IndexError otherwise; confirm github_release_candidates_result
        # guarantees at least one candidate.
        selected_tag = candidates[0].tag

        for candidate in candidates:
            selected_tag = candidate.tag
            # Stop once we reach releases at or below the current version.
            if (
                current_key is not None
                and version_sort_key(candidate.version) <= current_key
            ):
                if candidate.version != current_version:
                    missing.append(
                        {
                            "version": candidate.version,
                            "reason": "not-newer-than-current",
                        }
                    )
                break
            # Require a registry digest for EVERY group member; otherwise the
            # candidate is recorded as missing and the next (older) one tried.
            candidate_digests: dict[int, str] = {}
            candidate_missing = False
            for index in indexes:
                config = configs[index]
                image = str(config.get("image", "")).strip()
                registry = str(config.get("digest_source", "")).strip()
                try:
                    candidate_digests[index] = registry_digest_for_version(
                        image,
                        candidate.version,
                        registry=registry,
                        prefix=str(config.get("digest_tag_prefix", "")),
                    )
                except RegistryDigestNotFoundError:
                    missing.append(
                        {
                            "version": candidate.version,
                            "reason": f"missing-{registry}-digest",
                        }
                    )
                    candidate_missing = True
            if candidate_missing:
                continue
            # First fully-resolvable candidate wins (newest-first order).
            selected_version = candidate.version
            selected_digests = candidate_digests
            break

        # Combine the digests/versions we skipped here with the upstream
        # skip report, relative to the last tag we examined.
        skipped_versions = tuple(missing) + skipped_github_release_report(
            skipped, latest_tag=selected_tag
        )
        # Rewrite every group member to the shared selection.
        for index in indexes:
            result = aligned[index]
            latest_digest = selected_digests[index]
            aligned[index] = replace(
                result,
                latest_version=selected_version,
                latest_digest=latest_digest,
                version_update=selected_version != result.current_version,
                digest_update=bool(result.digest_key)
                and latest_digest != result.current_digest,
                submodule_ref=submodule_ref_for_version(
                    configs[index],
                    latest_version=selected_version,
                    current_version=result.current_version,
                ),
                skipped_versions=skipped_versions,
            )
    return aligned


def monitor_configs(repo: RepoConfig) -> list[dict[str, Any]]:
Expand Down
Loading