diff --git a/control_plane/service.py b/control_plane/service.py index 52a5300..b1b34be 100644 --- a/control_plane/service.py +++ b/control_plane/service.py @@ -6053,6 +6053,9 @@ def product_action_allowed( "source_health_status": driver_result.source_health_status, "destination_health_status": driver_result.destination_health_status, "backup_status": driver_result.backup_status, + "release_status": driver_result.release_status, + "release_tag": driver_result.release_tag, + "release_url": driver_result.release_url, "dry_run": driver_result.dry_run, } elif path == _GENERIC_WEB_PROD_PROMOTION_WORKFLOW_ROUTE.route_path: diff --git a/control_plane/workflows/generic_web_deploy.py b/control_plane/workflows/generic_web_deploy.py index 23dccec..ce68fd8 100644 --- a/control_plane/workflows/generic_web_deploy.py +++ b/control_plane/workflows/generic_web_deploy.py @@ -10,6 +10,7 @@ from control_plane import runtime_environments as control_plane_runtime_environments from control_plane.contracts.deployment_record import DeploymentRecord, ResolvedTargetEvidence from control_plane.contracts.dokploy_target_record import DokployTargetType +from control_plane.contracts.environment_inventory import EnvironmentInventory from control_plane.contracts.product_profile_record import ( LaunchplaneProductProfileRecord, ProductLaneProfile, @@ -17,6 +18,7 @@ from control_plane.contracts.promotion_record import HealthcheckEvidence from control_plane.contracts.ship_request import ShipRequest from control_plane.workflows.dokploy_deploy import execute_dokploy_artifact_deploy +from control_plane.workflows.inventory import build_environment_inventory from control_plane.workflows.ship import ( build_deployment_record, generate_deployment_record_id, @@ -29,6 +31,8 @@ def read_product_profile_record(self, product: str) -> LaunchplaneProductProfile def write_deployment_record(self, record: DeploymentRecord) -> object: ... + def write_environment_inventory(self, record: EnvironmentInventory) -> object: ... + class GenericWebDeployRequest(BaseModel): model_config = ConfigDict(extra="forbid") @@ -305,15 +309,20 @@ def execute_generic_web_deploy( ) finished_at = utc_now_timestamp() - record_store.write_deployment_record( - build_deployment_record( - request=ship_request, - record_id=record_id, - deployment_id="control-plane-dokploy", - deployment_status="pass", - started_at=started_at, - finished_at=finished_at, - resolved_target=resolved_target, + deployment_record = build_deployment_record( + request=ship_request, + record_id=record_id, + deployment_id="control-plane-dokploy", + deployment_status="pass", + started_at=started_at, + finished_at=finished_at, + resolved_target=resolved_target, + ) + record_store.write_deployment_record(deployment_record) + record_store.write_environment_inventory( + build_environment_inventory( + deployment_record=deployment_record, + updated_at=finished_at, ) ) return GenericWebDeployResult( diff --git a/control_plane/workflows/generic_web_promotion.py b/control_plane/workflows/generic_web_promotion.py index 71873db..2fc88dc 100644 --- a/control_plane/workflows/generic_web_promotion.py +++ b/control_plane/workflows/generic_web_promotion.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import Literal, Protocol from urllib.error import HTTPError, URLError +from urllib.parse import quote from urllib.request import Request, urlopen import click @@ -33,6 +34,10 @@ resolve_generic_web_profile_lane, ) from control_plane.workflows.inventory import build_environment_inventory +from control_plane.workflows.launchplane import ( + github_api_request, + resolve_launchplane_github_token, +) from control_plane.workflows.promote import generate_promotion_record_id from control_plane.workflows.ship import utc_now_timestamp @@ -40,6 +45,10 @@ class GenericWebPromotionStore(GenericWebDeployStore, Protocol): + def read_environment_inventory( + self, *, context_name: str, instance_name: str + ) -> EnvironmentInventory: ... + def read_backup_gate_record(self, record_id: str) -> BackupGateRecord: ... def read_deployment_record(self, record_id: str) -> DeploymentRecord: ... @@ -66,6 +75,7 @@ class GenericWebProdPromotionRequest(BaseModel): health_timeout_seconds: int = Field(default=DEFAULT_GENERIC_WEB_HEALTH_TIMEOUT_SECONDS, ge=1) dry_run: bool = False no_cache: bool = False + release_tag: str = "" @model_validator(mode="after") def _validate_request(self) -> "GenericWebProdPromotionRequest": @@ -75,6 +85,7 @@ def _validate_request(self) -> "GenericWebProdPromotionRequest": self.from_instance = self.from_instance.strip().lower() self.to_instance = self.to_instance.strip().lower() self.backup_record_id = self.backup_record_id.strip() + self.release_tag = self.release_tag.strip() if not self.product: raise ValueError("generic web prod promotion requires product") if not self.artifact_id: @@ -111,6 +122,9 @@ class GenericWebProdPromotionResult(BaseModel): backup_status: ReleaseStatus = "skipped" source_health_status: ReleaseStatus = "skipped" destination_health_status: ReleaseStatus = "skipped" + release_status: ReleaseStatus = "skipped" + release_tag: str = "" + release_url: str = "" target_name: str = "" target_type: DokployTargetType | Literal[""] = "" target_id: str = "" @@ -167,6 +181,11 @@ def execute_generic_web_prod_promotion( ) } ) + _validate_source_inventory_matches_request( + record_store=record_store, + request=request, + source_lane=source_lane, + ) promotion_record_id = generate_promotion_record_id( context_name=destination_lane.context, from_instance_name=source_lane.instance, @@ -203,6 +222,8 @@ def execute_generic_web_prod_promotion( backup_status=backup_gate.status, source_health_status=source_health.status, destination_health_status=destination_health.status, + release_status="pending" if request.release_tag else "skipped", + release_tag=request.release_tag, dry_run=True, ) @@ -318,7 +339,7 @@ def execute_generic_web_prod_promotion( ) record_store.write_environment_inventory(inventory) inventory_record_id = f"{inventory.context}-{inventory.instance}" - return _result_from_record( + final_result = _result_from_record( request=request, record=final_record, deployment_record=deployment_record, @@ -327,6 +348,33 @@ def execute_generic_web_prod_promotion( dry_run=False, error_message=deploy_result.error_message, ) + if final_result.promotion_status != "pass" or not request.release_tag: + return final_result + try: + release_url = _create_or_verify_github_release( + control_plane_root=control_plane_root, + profile=profile, + context=destination_lane.context, + request=request, + promotion_record_id=promotion_record_id, + deployment_record_id=deploy_result.deployment_record_id, + inventory_record_id=inventory_record_id, + ) + except click.ClickException as error: + return final_result.model_copy( + update={ + "release_status": "fail", + "release_tag": request.release_tag, + "error_message": str(error), + } + ) + return final_result.model_copy( + update={ + "release_status": "pass", + "release_tag": request.release_tag, + "release_url": release_url, + } + ) def _resolve_backup_gate( @@ -367,6 +415,34 @@ def _resolve_backup_gate( ) +def _validate_source_inventory_matches_request( + *, + record_store: GenericWebPromotionStore, + request: GenericWebProdPromotionRequest, + source_lane: ProductLaneProfile, +) -> None: + try: + source_inventory = record_store.read_environment_inventory( + context_name=source_lane.context, + instance_name=source_lane.instance, + ) + except FileNotFoundError as exc: + raise click.ClickException( + "Generic web prod promotion requires current source environment inventory. " + f"No inventory was found for {source_lane.context}/{source_lane.instance}." + ) from exc + inventory_artifact_id = "" + if source_inventory.artifact_identity is not None: + inventory_artifact_id = source_inventory.artifact_identity.artifact_id.strip() + if inventory_artifact_id != request.artifact_id or source_inventory.source_git_ref != request.source_git_ref: + raise click.ClickException( + "Generic web prod promotion request does not match current source inventory. " + f"Inventory artifact={inventory_artifact_id or ''} " + f"source_ref={source_inventory.source_git_ref}; " + f"request artifact={request.artifact_id} source_ref={request.source_git_ref}." + ) + + def _health_url_for_lane(*, lane: ProductLaneProfile, health_path: str) -> str: health_url = lane.health_url.strip() if health_url: @@ -541,6 +617,185 @@ def _fallback_target_name( return f"{request.product}-{lane.instance}" +def _create_or_verify_github_release( + *, + control_plane_root: Path, + profile: LaunchplaneProductProfileRecord, + context: str, + request: GenericWebProdPromotionRequest, + promotion_record_id: str, + deployment_record_id: str, + inventory_record_id: str, +) -> str: + owner, repo = _repository_parts(profile.repository) + token = resolve_launchplane_github_token( + control_plane_root=control_plane_root, + context_name=context, + ) + if not token: + raise click.ClickException( + f"Generic web prod promotion requires GitHub token for context '{context}' to create release '{request.release_tag}'." + ) + tag_target_sha = _github_tag_target_sha( + owner=owner, + repo=repo, + release_tag=request.release_tag, + token=token, + ) + if tag_target_sha and not _revisions_match(tag_target_sha, request.source_git_ref): + raise click.ClickException( + f"GitHub tag '{request.release_tag}' points at '{tag_target_sha}', not promoted revision '{request.source_git_ref}'." + ) + existing_release = _github_release_for_tag( + owner=owner, + repo=repo, + release_tag=request.release_tag, + token=token, + ) + if existing_release is not None: + return _release_html_url(existing_release) + + release_payload = github_api_request( + path=f"/repos/{owner}/{repo}/releases", + token=token, + method="POST", + body={ + "tag_name": request.release_tag, + "target_commitish": request.source_git_ref, + "name": request.release_tag, + "body": _github_release_body( + request=request, + promotion_record_id=promotion_record_id, + deployment_record_id=deployment_record_id, + inventory_record_id=inventory_record_id, + ), + "draft": False, + "prerelease": False, + }, + ) + if not isinstance(release_payload, dict): + raise click.ClickException( + f"GitHub release create response for {owner}/{repo} {request.release_tag} must be an object." + ) + return _release_html_url(release_payload) + + +def _repository_parts(repository: str) -> tuple[str, str]: + owner, separator, repo = repository.strip().partition("/") + if not owner or separator != "/" or not repo: + raise click.ClickException( + f"Generic web prod promotion requires product repository in OWNER/REPO form, got {repository!r}." + ) + return owner, repo + + +def _github_release_for_tag( + *, owner: str, repo: str, release_tag: str, token: str +) -> dict[str, object] | None: + try: + payload = github_api_request( + path=f"/repos/{owner}/{repo}/releases/tags/{quote(release_tag, safe='')}", + token=token, + ) + except click.ClickException as error: + if _github_not_found(error): + return None + raise + if not isinstance(payload, dict): + raise click.ClickException( + f"GitHub release lookup response for {owner}/{repo} {release_tag} must be an object." + ) + return payload + + +def _github_tag_target_sha(*, owner: str, repo: str, release_tag: str, token: str) -> str: + try: + payload = github_api_request( + path=f"/repos/{owner}/{repo}/git/ref/tags/{quote(release_tag, safe='')}", + token=token, + ) + except click.ClickException as error: + if _github_not_found(error): + return "" + raise + if not isinstance(payload, dict): + raise click.ClickException( + f"GitHub tag lookup response for {owner}/{repo} {release_tag} must be an object." + ) + target = _github_ref_object_sha(payload) + if _github_ref_object_type(payload) != "tag": + return target + annotated_payload = github_api_request( + path=f"/repos/{owner}/{repo}/git/tags/{quote(target, safe='')}", + token=token, + ) + if not isinstance(annotated_payload, dict): + raise click.ClickException( + f"GitHub annotated tag response for {owner}/{repo} {release_tag} must be an object." + ) + return _github_ref_object_sha(annotated_payload) + + +def _github_ref_object_sha(payload: dict[str, object]) -> str: + obj = payload.get("object") + if not isinstance(obj, dict): + raise click.ClickException("GitHub tag response is missing object data.") + sha = obj.get("sha") + if not isinstance(sha, str) or not sha.strip(): + raise click.ClickException("GitHub tag response is missing object sha.") + return sha.strip() + + +def _github_ref_object_type(payload: dict[str, object]) -> str: + obj = payload.get("object") + if not isinstance(obj, dict): + raise click.ClickException("GitHub tag response is missing object data.") + object_type = obj.get("type") + return object_type.strip() if isinstance(object_type, str) else "" + + +def _github_not_found(error: click.ClickException) -> bool: + message = str(error) + return "HTTP Error 404" in message or "404: Not Found" in message + + +def _revisions_match(left: str, right: str) -> bool: + normalized_left = left.strip() + normalized_right = right.strip() + if not normalized_left or not normalized_right: + return False + return ( + normalized_left == normalized_right + or (len(normalized_left) >= 7 and normalized_right.startswith(normalized_left)) + or (len(normalized_right) >= 7 and normalized_left.startswith(normalized_right)) + ) + + +def _release_html_url(payload: dict[str, object]) -> str: + html_url = payload.get("html_url") + return html_url.strip() if isinstance(html_url, str) else "" + + +def _github_release_body( + *, + request: GenericWebProdPromotionRequest, + promotion_record_id: str, + deployment_record_id: str, + inventory_record_id: str, +) -> str: + return "\n".join( + ( + f"Promoted {request.product} to {request.to_instance}.", + "", + f"- Artifact: `{request.artifact_id}`", + f"- Source git ref: `{request.source_git_ref}`", + f"- Promotion record: `{promotion_record_id}`", + f"- Deployment record: `{deployment_record_id}`", + f"- Inventory record: `{inventory_record_id}`", + ) + ) + + def _result_from_record( *, request: GenericWebProdPromotionRequest, @@ -569,6 +824,7 @@ def _result_from_record( backup_status=record.backup_gate.status, source_health_status=record.source_health.status, destination_health_status=record.destination_health.status, + release_tag=request.release_tag, target_name=record.deploy.target_name, target_type=record.deploy.target_type, target_id=target_id, diff --git a/tests/test_generic_web_deploy.py b/tests/test_generic_web_deploy.py index a3e6f0f..9d63a95 100644 --- a/tests/test_generic_web_deploy.py +++ b/tests/test_generic_web_deploy.py @@ -5,6 +5,7 @@ import click from control_plane.contracts.deployment_record import DeploymentRecord +from control_plane.contracts.environment_inventory import EnvironmentInventory from control_plane.contracts.product_profile_record import ( LaunchplaneProductProfileRecord, ProductImageProfile, @@ -24,6 +25,7 @@ class _GenericWebDeployStore: def __init__(self, profile: LaunchplaneProductProfileRecord) -> None: self.profile = profile self.deployments: list[DeploymentRecord] = [] + self.inventories: list[EnvironmentInventory] = [] def read_product_profile_record(self, product: str) -> LaunchplaneProductProfileRecord: if product != self.profile.product: @@ -33,6 +35,9 @@ def read_product_profile_record(self, product: str) -> LaunchplaneProductProfile def write_deployment_record(self, record: DeploymentRecord) -> None: self.deployments.append(record) + def write_environment_inventory(self, record: EnvironmentInventory) -> None: + self.inventories.append(record) + def _profile() -> LaunchplaneProductProfileRecord: return LaunchplaneProductProfileRecord( @@ -135,6 +140,14 @@ def test_execute_generic_web_deploy_writes_pass_record_for_profile_lane(self) -> self.assertEqual(result.target_id, "target-123") self.assertEqual(len(store.deployments), 1) self.assertEqual(store.deployments[0].deploy.status, "pass") + self.assertEqual(len(store.inventories), 1) + self.assertEqual(store.inventories[0].context, "sellyouroutboard-testing") + self.assertEqual(store.inventories[0].instance, "testing") + self.assertEqual(store.inventories[0].source_git_ref, "abc123") + self.assertEqual( + store.inventories[0].deployment_record_id, + store.deployments[0].record_id, + ) artifact_identity = store.deployments[0].artifact_identity assert artifact_identity is not None self.assertEqual( @@ -218,6 +231,7 @@ def test_execute_generic_web_deploy_records_failure_when_provider_fails(self) -> self.assertEqual(result.error_message, "provider failed") self.assertEqual(len(store.deployments), 1) self.assertEqual(store.deployments[0].deploy.status, "fail") + self.assertEqual(store.inventories, []) def test_resolve_generic_web_profile_lane_rejects_missing_lane(self) -> None: store = _GenericWebDeployStore(_profile()) diff --git a/tests/test_generic_web_promotion.py b/tests/test_generic_web_promotion.py index fe09169..b3c862f 100644 --- a/tests/test_generic_web_promotion.py +++ b/tests/test_generic_web_promotion.py @@ -60,6 +60,14 @@ def read_deployment_record(self, record_id: str) -> DeploymentRecord: except KeyError as exc: raise FileNotFoundError(record_id) from exc + def read_environment_inventory( + self, *, context_name: str, instance_name: str + ) -> EnvironmentInventory: + try: + return self.inventories[(context_name, instance_name)] + except KeyError as exc: + raise FileNotFoundError(f"{context_name}/{instance_name}") from exc + def read_backup_gate_record(self, record_id: str) -> BackupGateRecord: raise FileNotFoundError(record_id) @@ -149,6 +157,26 @@ def _deployment_record() -> DeploymentRecord: ) +def _testing_inventory(**overrides: object) -> EnvironmentInventory: + deployment_record = _deployment_record().model_copy( + update={"instance": "testing"} + ) + payload = { + "context": "sellyouroutboard-testing", + "instance": "testing", + "artifact_identity": ArtifactIdentityReference( + artifact_id="ghcr.io/cbusillo/sellyouroutboard@sha256:abc123" + ), + "source_git_ref": "abc123", + "deploy": deployment_record.deploy, + "destination_health": HealthcheckEvidence(status="pass"), + "updated_at": "2026-05-01T21:01:00Z", + "deployment_record_id": "deployment-syo-testing", + } + payload.update(overrides) + return EnvironmentInventory.model_validate(payload) + + def _deploy_result(*, deploy_status: Literal["pass", "fail"] = "pass") -> GenericWebDeployResult: return GenericWebDeployResult( deployment_record_id="deployment-syo-prod", @@ -168,6 +196,7 @@ def _deploy_result(*, deploy_status: Literal["pass", "fail"] = "pass") -> Generi class GenericWebProdPromotionTests(unittest.TestCase): def test_execute_records_source_destination_health_promotion_and_inventory(self) -> None: store = _GenericWebPromotionStore(_profile()) + store.write_environment_inventory(_testing_inventory()) def fake_deploy(**kwargs: object) -> GenericWebDeployResult: store.write_deployment_record(_deployment_record()) @@ -205,6 +234,15 @@ def fake_deploy(**kwargs: object) -> GenericWebDeployResult: def test_execute_prod_promotion_qualifies_bare_release_tag(self) -> None: store = _GenericWebPromotionStore(_profile()) + expected_artifact_id = ( + "ghcr.io/cbusillo/sellyouroutboard:sha-2da6435e10cade0870ed5cbdf40c8048594f8b1c" + ) + store.write_environment_inventory( + _testing_inventory( + artifact_identity=ArtifactIdentityReference(artifact_id=expected_artifact_id), + source_git_ref="2da6435e10cade0870ed5cbdf40c8048594f8b1c", + ) + ) seen_artifact_ids: list[str] = [] def fake_deploy(**kwargs: object) -> GenericWebDeployResult: @@ -241,16 +279,136 @@ def fake_deploy(**kwargs: object) -> GenericWebDeployResult: ), ) - expected_artifact_id = ( - "ghcr.io/cbusillo/sellyouroutboard:sha-2da6435e10cade0870ed5cbdf40c8048594f8b1c" - ) self.assertEqual(seen_artifact_ids, [expected_artifact_id]) self.assertEqual(result.artifact_id, expected_artifact_id) promotion = next(iter(store.promotions.values())) self.assertEqual(promotion.artifact_identity.artifact_id, expected_artifact_id) + def test_execute_prod_promotion_creates_github_release(self) -> None: + store = _GenericWebPromotionStore(_profile()) + store.write_environment_inventory(_testing_inventory()) + github_requests: list[tuple[str, str, dict[str, object] | None]] = [] + + def fake_deploy(**kwargs: object) -> GenericWebDeployResult: + store.write_deployment_record(_deployment_record()) + return _deploy_result() + + def fake_github_api_request( + *, path: str, token: str, method: str = "GET", body: dict[str, object] | None = None + ) -> object: + github_requests.append((method, path, body)) + self.assertEqual(token, "release-token") + if path.endswith("/git/ref/tags/v0.3.0"): + raise click.ClickException("GitHub API request failed: HTTP Error 404: Not Found") + if path.endswith("/releases/tags/v0.3.0"): + raise click.ClickException("GitHub API request failed: HTTP Error 404: Not Found") + if method == "POST" and path.endswith("/releases"): + return {"html_url": "https://github.com/cbusillo/sellyouroutboard/releases/tag/v0.3.0"} + raise AssertionError(path) + + with ( + patch( + "control_plane.workflows.generic_web_promotion.execute_generic_web_deploy", + side_effect=fake_deploy, + ), + patch( + "control_plane.workflows.generic_web_promotion._wait_for_healthcheck", + return_value=None, + ), + patch( + "control_plane.workflows.generic_web_promotion.resolve_launchplane_github_token", + return_value="release-token", + ), + patch( + "control_plane.workflows.generic_web_promotion.github_api_request", + side_effect=fake_github_api_request, + ), + ): + result = execute_generic_web_prod_promotion( + control_plane_root=Path("."), + record_store=store, + request=_request(release_tag="v0.3.0"), + ) + + self.assertEqual(result.promotion_status, "pass") + self.assertEqual(result.release_status, "pass") + self.assertEqual(result.release_tag, "v0.3.0") + self.assertEqual( + result.release_url, + "https://github.com/cbusillo/sellyouroutboard/releases/tag/v0.3.0", + ) + self.assertEqual( + github_requests[-1], + ( + "POST", + "/repos/cbusillo/sellyouroutboard/releases", + { + "tag_name": "v0.3.0", + "target_commitish": "abc123", + "name": "v0.3.0", + "body": "\n".join( + ( + "Promoted sellyouroutboard to prod.", + "", + "- Artifact: `ghcr.io/cbusillo/sellyouroutboard@sha256:abc123`", + "- Source git ref: `abc123`", + f"- Promotion record: `{result.promotion_record_id}`", + "- Deployment record: `deployment-syo-prod`", + "- Inventory record: `sellyouroutboard-testing-prod`", + ) + ), + "draft": False, + "prerelease": False, + }, + ), + ) + + def test_execute_prod_promotion_rejects_release_tag_mismatch(self) -> None: + store = _GenericWebPromotionStore(_profile()) + store.write_environment_inventory(_testing_inventory()) + + def fake_deploy(**kwargs: object) -> GenericWebDeployResult: + store.write_deployment_record(_deployment_record()) + return _deploy_result() + + def fake_github_api_request( + *, path: str, token: str, method: str = "GET", body: dict[str, object] | None = None + ) -> object: + if path.endswith("/git/ref/tags/v0.3.0"): + return {"object": {"type": "commit", "sha": "different"}} + raise AssertionError(path) + + with ( + patch( + "control_plane.workflows.generic_web_promotion.execute_generic_web_deploy", + side_effect=fake_deploy, + ), + patch( + "control_plane.workflows.generic_web_promotion._wait_for_healthcheck", + return_value=None, + ), + patch( + "control_plane.workflows.generic_web_promotion.resolve_launchplane_github_token", + return_value="release-token", + ), + patch( + "control_plane.workflows.generic_web_promotion.github_api_request", + side_effect=fake_github_api_request, + ), + ): + result = execute_generic_web_prod_promotion( + control_plane_root=Path("."), + record_store=store, + request=_request(release_tag="v0.3.0"), + ) + + self.assertEqual(result.promotion_status, "pass") + self.assertEqual(result.release_status, "fail") + self.assertIn("not promoted revision", result.error_message) + def test_dry_run_returns_pending_evidence_without_mutation(self) -> None: store = _GenericWebPromotionStore(_profile()) + store.write_environment_inventory(_testing_inventory()) result = execute_generic_web_prod_promotion( control_plane_root=Path("."), @@ -271,6 +429,7 @@ def test_request_requires_testing_to_prod(self) -> None: def test_execute_refreshes_inventory_when_health_is_skipped(self) -> None: store = _GenericWebPromotionStore(_profile()) + store.write_environment_inventory(_testing_inventory()) def fake_deploy(**kwargs: object) -> GenericWebDeployResult: store.write_deployment_record(_deployment_record()) @@ -295,6 +454,7 @@ def test_health_fallback_uses_product_health_path(self) -> None: store = _GenericWebPromotionStore( _profile(health_path="/healthz", explicit_health_urls=False) ) + store.write_environment_inventory(_testing_inventory()) def fake_deploy(**kwargs: object) -> GenericWebDeployResult: store.write_deployment_record(_deployment_record()) @@ -328,6 +488,7 @@ def fake_deploy(**kwargs: object) -> GenericWebDeployResult: def test_source_health_failure_records_failed_promotion_without_deploy(self) -> None: store = _GenericWebPromotionStore(_profile()) + store.write_environment_inventory(_testing_inventory()) with ( patch( @@ -354,6 +515,7 @@ def test_source_health_failure_records_failed_promotion_without_deploy(self) -> def test_deploy_failure_marks_destination_health_skipped(self) -> None: store = _GenericWebPromotionStore(_profile()) + store.write_environment_inventory(_testing_inventory()) def fake_deploy(**kwargs: object) -> GenericWebDeployResult: deployment_record = _deployment_record().model_copy( @@ -383,6 +545,42 @@ def fake_deploy(**kwargs: object) -> GenericWebDeployResult: self.assertEqual(result.destination_health_status, "skipped") self.assertEqual(healthcheck.call_count, 1) + def test_execute_rejects_stale_source_inventory(self) -> None: + store = _GenericWebPromotionStore(_profile()) + store.write_environment_inventory( + _testing_inventory( + artifact_identity=ArtifactIdentityReference( + artifact_id="ghcr.io/cbusillo/sellyouroutboard@sha256:newer" + ), + source_git_ref="newer", + ) + ) + + with self.assertRaises(click.ClickException) as caught: + execute_generic_web_prod_promotion( + control_plane_root=Path("."), + record_store=store, + request=_request(), + ) + + self.assertIn("does not match current source inventory", str(caught.exception)) + self.assertEqual(store.deployments, {}) + self.assertEqual(store.promotions, {}) + + def test_execute_rejects_missing_source_inventory(self) -> None: + store = _GenericWebPromotionStore(_profile()) + + with self.assertRaises(click.ClickException) as caught: + execute_generic_web_prod_promotion( + control_plane_root=Path("."), + record_store=store, + request=_request(), + ) + + self.assertIn("requires current source environment inventory", str(caught.exception)) + self.assertEqual(store.deployments, {}) + self.assertEqual(store.promotions, {}) + class GenericWebPromotionWorkflowTests(unittest.TestCase): def test_dispatches_product_workflow_and_observes_run(self) -> None: