Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions control_plane/product_config_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import cast

from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

from control_plane import product_config as control_plane_product_config
from control_plane import product_config_service as control_plane_product_config_service
from control_plane.product_config import ProductConfigStore
from control_plane.service_auth import LaunchplaneAuthzPolicy, LaunchplaneIdentity


JsonResponse = Callable[..., list[bytes]]
StartResponse = Callable[[str, list[tuple[str, str]]], None]


class ProductConfigApplyEnvelope(BaseModel):
model_config = ConfigDict(extra="forbid")

schema_version: int = Field(default=1, ge=1)
mode: str
product: str
context: str = ""
instance: str = ""
source_label: str = "product-config-api"
runtime_env: dict[str, object] | None = None
runtime_environment: dict[str, object] | None = None
secrets: list[dict[str, object]] = Field(default_factory=list)

@field_validator("mode")
@classmethod
def _validate_mode(cls, value: str) -> str:
normalized_value = value.strip().lower()
if normalized_value not in {"dry-run", "apply"}:
raise ValueError("Product config mode must be 'dry-run' or 'apply'.")
return normalized_value

@model_validator(mode="after")
def _validate_product(self) -> "ProductConfigApplyEnvelope":
self.product = self.product.strip()
self.context = self.context.strip()
self.instance = self.instance.strip()
self.source_label = self.source_label.strip() or "product-config-api"
if not self.product:
raise ValueError("Product config apply requires product.")
return self

def product_config_payload(self) -> dict[str, object]:
payload: dict[str, object] = {
"schema_version": self.schema_version,
"product": self.product,
"context": self.context,
"instance": self.instance,
"secrets": self.secrets,
}
if self.runtime_env is not None:
payload["runtime_env"] = self.runtime_env
if self.runtime_environment is not None:
payload["runtime_environment"] = self.runtime_environment
return payload


@dataclass(frozen=True)
class ProductConfigRouteResult:
driver_result: dict[str, object] | None


def validate_product_config_apply_request(
*,
authz_policy: LaunchplaneAuthzPolicy,
identity: LaunchplaneIdentity,
payload: dict[str, object],
trace_id: str,
json_response: JsonResponse,
start_response: StartResponse,
) -> tuple[ProductConfigApplyEnvelope | None, list[bytes] | None]:
request = ProductConfigApplyEnvelope.model_validate(payload)
action = "product_config.apply" if request.mode == "apply" else "product_config.plan"
if authz_policy.allows(
identity=identity,
action=action,
product=request.product,
context=request.context,
):
return request, None
return None, json_response(
start_response=start_response,
status_code=403,
payload={
"status": "rejected",
"trace_id": trace_id,
"error": {
"code": "authorization_denied",
"message": (
"Workflow cannot plan or apply product config for the requested"
" product/context."
),
},
},
)


def apply_product_config_route(
*,
record_store: ProductConfigStore,
request: ProductConfigApplyEnvelope,
actor: str,
trace_id: str,
json_response: JsonResponse,
start_response: StartResponse,
) -> ProductConfigRouteResult | list[bytes]:
driver_result, product_config_error = (
control_plane_product_config_service.apply_product_config_service_request(
record_store=record_store,
payload=request.product_config_payload(),
mode=cast(control_plane_product_config.ProductConfigMode, request.mode),
actor=actor,
source_label=request.source_label,
)
)
if product_config_error is not None:
return json_response(
start_response=start_response,
status_code=product_config_error.status_code,
payload={
"status": "rejected",
"trace_id": trace_id,
"error": {
"code": product_config_error.code,
"message": product_config_error.message,
},
},
)
return ProductConfigRouteResult(driver_result=driver_result)


def product_config_database_required_response(
*,
trace_id: str,
json_response: JsonResponse,
start_response: StartResponse,
) -> list[bytes]:
return json_response(
start_response=start_response,
status_code=503,
payload={
"status": "rejected",
"trace_id": trace_id,
"error": {
"code": "database_required",
"message": "Product config apply requires DB-backed Launchplane storage.",
},
},
)
134 changes: 28 additions & 106 deletions control_plane/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

from control_plane import authz_grant_service as control_plane_authz_grant_service
from control_plane import dokploy as control_plane_dokploy
from control_plane import product_config as control_plane_product_config
from control_plane import product_config_service as control_plane_product_config_service
from control_plane import product_context_audit as control_plane_product_context_audit
from control_plane import product_context_cutover as control_plane_product_context_cutover
from control_plane import product_onboarding_service as control_plane_product_onboarding_service
Expand Down Expand Up @@ -125,6 +123,12 @@
from control_plane.storage.filesystem import FilesystemRecordStore
from control_plane.storage.postgres import PostgresRecordStore
from control_plane.tracked_target_logs import build_tracked_target_logs_payload
from control_plane.product_config_http import (
ProductConfigRouteResult,
apply_product_config_route,
product_config_database_required_response,
validate_product_config_apply_request,
)
from control_plane.work_graph_github_projects import (
build_github_project_planning_facts,
load_github_project_planning_facts_config_from_env,
Expand Down Expand Up @@ -1350,52 +1354,6 @@ def _validate_alignment(self) -> "ProductOnboardingApplyEnvelope":
return self


class ProductConfigApplyEnvelope(BaseModel):
model_config = ConfigDict(extra="forbid")

schema_version: int = Field(default=1, ge=1)
mode: str
product: str
context: str = ""
instance: str = ""
source_label: str = "product-config-api"
runtime_env: dict[str, object] | None = None
runtime_environment: dict[str, object] | None = None
secrets: list[dict[str, object]] = Field(default_factory=list)

@field_validator("mode")
@classmethod
def _validate_mode(cls, value: str) -> str:
normalized_value = value.strip().lower()
if normalized_value not in {"dry-run", "apply"}:
raise ValueError("Product config mode must be 'dry-run' or 'apply'.")
return normalized_value

@model_validator(mode="after")
def _validate_product(self) -> "ProductConfigApplyEnvelope":
self.product = self.product.strip()
self.context = self.context.strip()
self.instance = self.instance.strip()
self.source_label = self.source_label.strip() or "product-config-api"
if not self.product:
raise ValueError("Product config apply requires product.")
return self

def product_config_payload(self) -> dict[str, object]:
payload: dict[str, object] = {
"schema_version": self.schema_version,
"product": self.product,
"context": self.context,
"instance": self.instance,
"secrets": self.secrets,
}
if self.runtime_env is not None:
payload["runtime_env"] = self.runtime_env
if self.runtime_environment is not None:
payload["runtime_environment"] = self.runtime_environment
return payload


class LiveTargetRuntimeApplyEnvelope(BaseModel):
model_config = ConfigDict(extra="forbid")

Expand Down Expand Up @@ -5493,33 +5451,19 @@ def product_action_allowed(
record_store.write_backup_gate_record(backup_gate_request.backup_gate)
result = {"backup_gate_record_id": backup_gate_request.backup_gate.record_id}
elif path == "/v1/product-config/apply":
product_config_request = ProductConfigApplyEnvelope.model_validate(payload)
action = (
"product_config.apply"
if product_config_request.mode == "apply"
else "product_config.plan"
)
if not authz_policy.allows(
identity=identity,
action=action,
product=product_config_request.product,
context=product_config_request.context,
):
return _json_response(
product_config_request, product_config_response = (
validate_product_config_apply_request(
authz_policy=authz_policy,
identity=identity,
payload=payload,
trace_id=request_trace_id,
json_response=_json_response,
start_response=start_response,
status_code=403,
payload={
"status": "rejected",
"trace_id": request_trace_id,
"error": {
"code": "authorization_denied",
"message": (
"Workflow cannot plan or apply product config for the"
" requested product/context."
),
},
},
)
)
if product_config_response is not None:
return product_config_response
assert product_config_request is not None
idempotent_response = _check_idempotent_request(
record_store=record_store,
scope=request_scope,
Expand All @@ -5532,44 +5476,22 @@ def product_action_allowed(
if idempotent_response is not None:
return idempotent_response
if not isinstance(record_store, PostgresRecordStore):
return _json_response(
return product_config_database_required_response(
trace_id=request_trace_id,
json_response=_json_response,
start_response=start_response,
status_code=503,
payload={
"status": "rejected",
"trace_id": request_trace_id,
"error": {
"code": "database_required",
"message": "Product config apply requires DB-backed Launchplane storage.",
},
},
)
(
driver_result,
product_config_error,
) = control_plane_product_config_service.apply_product_config_service_request(
product_config_result = apply_product_config_route(
record_store=record_store,
payload=product_config_request.product_config_payload(),
mode=cast(
control_plane_product_config.ProductConfigMode,
product_config_request.mode,
),
request=product_config_request,
actor=_identity_actor(identity),
source_label=product_config_request.source_label,
trace_id=request_trace_id,
json_response=_json_response,
start_response=start_response,
)
if product_config_error is not None:
return _json_response(
start_response=start_response,
status_code=product_config_error.status_code,
payload={
"status": "rejected",
"trace_id": request_trace_id,
"error": {
"code": product_config_error.code,
"message": product_config_error.message,
},
},
)
if not isinstance(product_config_result, ProductConfigRouteResult):
return product_config_result
driver_result = product_config_result.driver_result
elif path == "/v1/authz-policies/github-actions/grants":
authz_grant_request = control_plane_authz_grant_service.AuthzPolicyGitHubActionsGrantEnvelope.model_validate(
payload
Expand Down