Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions control_plane/runtime_key_safety_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass

from pydantic import BaseModel, ConfigDict, Field, model_validator

from control_plane import runtime_key_safety_service as control_plane_runtime_key_safety_service
from control_plane.contracts.runtime_key_safety_policy import RuntimeSecretSafetyRule
from control_plane.runtime_key_safety_service import RuntimeKeySafetyPolicyStore
from control_plane.service_auth import LaunchplaneAuthzPolicy, LaunchplaneIdentity


JsonResponse = Callable[..., list[bytes]]
RecordSlugProvider = Callable[[str], str]
StartResponse = Callable[[str, list[tuple[str, str]]], None]
TimestampProvider = Callable[[], str]

LAUNCHPLANE_SERVICE_CONTEXT = "launchplane"


class RuntimeKeySafetyPolicyApplyEnvelope(BaseModel):
model_config = ConfigDict(extra="forbid")

schema_version: int = Field(default=1, ge=1)
product: str
source_label: str = "service:runtime-key-safety-policy"
rules: tuple[RuntimeSecretSafetyRule, ...]

@model_validator(mode="after")
def _validate_alignment(self) -> "RuntimeKeySafetyPolicyApplyEnvelope":
if self.product.strip() != "launchplane":
raise ValueError("Runtime key-safety policy writes require product 'launchplane'.")
self.product = "launchplane"
self.source_label = self.source_label.strip() or "service:runtime-key-safety-policy"
if not self.rules:
raise ValueError("Runtime key-safety policy writes require at least one rule.")
return self


@dataclass(frozen=True)
class RuntimeKeySafetyPolicyRouteResult:
result: dict[str, object]
driver_result: dict[str, object]


def validate_runtime_key_safety_policy_request(
*,
authz_policy: LaunchplaneAuthzPolicy,
identity: LaunchplaneIdentity,
payload: dict[str, object],
trace_id: str,
json_response: JsonResponse,
start_response: StartResponse,
) -> tuple[RuntimeKeySafetyPolicyApplyEnvelope | None, list[bytes] | None]:
request = RuntimeKeySafetyPolicyApplyEnvelope.model_validate(payload)
if _runtime_key_safety_policy_authorized(
authz_policy=authz_policy,
identity=identity,
product=request.product,
):
return request, None
return None, json_response(
start_response=start_response,
status_code=403,
payload={
"status": "rejected",
"trace_id": trace_id,
"error": {
"code": "authorization_denied",
"message": "Workflow cannot write Launchplane runtime key-safety policy records.",
},
},
)


def apply_runtime_key_safety_policy_route(
*,
record_store: RuntimeKeySafetyPolicyStore,
request: RuntimeKeySafetyPolicyApplyEnvelope,
now_timestamp: TimestampProvider,
record_slug: RecordSlugProvider,
) -> RuntimeKeySafetyPolicyRouteResult:
runtime_policy_record, changed = (
control_plane_runtime_key_safety_service.write_runtime_key_safety_policy(
record_store=record_store,
rules=request.rules,
source_label=request.source_label,
now_timestamp=now_timestamp,
record_slug=record_slug,
)
)
return RuntimeKeySafetyPolicyRouteResult(
result={
"runtime_key_safety_policy_record_id": runtime_policy_record.record_id,
"runtime_key_safety_policy_changed": str(changed).lower(),
},
driver_result={
"runtime_key_safety_policy": control_plane_runtime_key_safety_service.summarize_runtime_key_safety_policy_record(
runtime_policy_record
),
"changed": changed,
},
)


def runtime_key_safety_database_required_response(
*,
trace_id: str,
json_response: JsonResponse,
start_response: StartResponse,
) -> list[bytes]:
return json_response(
start_response=start_response,
status_code=503,
payload={
"status": "rejected",
"trace_id": trace_id,
"error": {
"code": "database_required",
"message": "Runtime key-safety policy writes require Launchplane database storage.",
},
},
)


def _runtime_key_safety_policy_authorized(
*,
authz_policy: LaunchplaneAuthzPolicy,
identity: LaunchplaneIdentity,
product: str,
) -> bool:
return authz_policy.allows(
identity=identity,
action="runtime_key_safety.write",
product=product,
context=LAUNCHPLANE_SERVICE_CONTEXT,
) or authz_policy.allows(
identity=identity,
action="launchplane_service_deploy.execute",
product=product,
context=LAUNCHPLANE_SERVICE_CONTEXT,
)
99 changes: 26 additions & 73 deletions control_plane/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from control_plane import product_onboarding_service as control_plane_product_onboarding_service
from control_plane import product_read_service as control_plane_product_read_service
from control_plane import live_target_runtime as control_plane_live_target_runtime
from control_plane import runtime_key_safety_service as control_plane_runtime_key_safety_service
from control_plane import secrets as control_plane_secrets
from control_plane.contracts.authz_policy_record import (
LaunchplaneAuthzPolicyRecord,
Expand Down Expand Up @@ -89,7 +88,12 @@
PromotionRecord,
ReleaseStatus,
)
from control_plane.contracts.runtime_key_safety_policy import RuntimeSecretSafetyRule
from control_plane.runtime_key_safety_http import (
RuntimeKeySafetyPolicyRouteResult,
apply_runtime_key_safety_policy_route,
runtime_key_safety_database_required_response,
validate_runtime_key_safety_policy_request,
)
from control_plane.drivers.registry import (
build_driver_context_view,
list_driver_descriptors,
Expand Down Expand Up @@ -1320,25 +1324,6 @@ class EveryCodePreviewGateEnvelope(EveryCodePreviewGateRecord):
pass


class RuntimeKeySafetyPolicyApplyEnvelope(BaseModel):
model_config = ConfigDict(extra="forbid")

schema_version: int = Field(default=1, ge=1)
product: str
source_label: str = "service:runtime-key-safety-policy"
rules: tuple[RuntimeSecretSafetyRule, ...]

@model_validator(mode="after")
def _validate_alignment(self) -> "RuntimeKeySafetyPolicyApplyEnvelope":
if self.product.strip() != "launchplane":
raise ValueError("Runtime key-safety policy writes require product 'launchplane'.")
self.product = "launchplane"
self.source_label = self.source_label.strip() or "service:runtime-key-safety-policy"
if not self.rules:
raise ValueError("Runtime key-safety policy writes require at least one rule.")
return self


class ProductOnboardingApplyEnvelope(BaseModel):
model_config = ConfigDict(extra="forbid")

Expand Down Expand Up @@ -5611,45 +5596,24 @@ def product_action_allowed(
)
)
elif path == "/v1/runtime-key-safety/policies/apply":
runtime_policy_request = RuntimeKeySafetyPolicyApplyEnvelope.model_validate(payload)
if not isinstance(record_store, PostgresRecordStore):
return _json_response(
start_response=start_response,
status_code=503,
payload={
"status": "rejected",
"trace_id": request_trace_id,
"error": {
"code": "database_required",
"message": "Runtime key-safety policy writes require Launchplane database storage.",
},
},
)
if not (
authz_policy.allows(
identity=identity,
action="runtime_key_safety.write",
product=runtime_policy_request.product,
context=_LAUNCHPLANE_SERVICE_CONTEXT,
)
or authz_policy.allows(
runtime_policy_request, runtime_policy_response = (
validate_runtime_key_safety_policy_request(
authz_policy=authz_policy,
identity=identity,
action="launchplane_service_deploy.execute",
product=runtime_policy_request.product,
context=_LAUNCHPLANE_SERVICE_CONTEXT,
payload=payload,
trace_id=request_trace_id,
json_response=_json_response,
start_response=start_response,
)
):
return _json_response(
)
if runtime_policy_response is not None:
return runtime_policy_response
assert runtime_policy_request is not None
if not isinstance(record_store, PostgresRecordStore):
return runtime_key_safety_database_required_response(
trace_id=request_trace_id,
json_response=_json_response,
start_response=start_response,
status_code=403,
payload={
"status": "rejected",
"trace_id": request_trace_id,
"error": {
"code": "authorization_denied",
"message": "Workflow cannot write Launchplane runtime key-safety policy records.",
},
},
)
idempotent_response = _check_idempotent_request(
record_store=record_store,
Expand All @@ -5662,26 +5626,15 @@ def product_action_allowed(
)
if idempotent_response is not None:
return idempotent_response
(
runtime_policy_record,
changed,
) = control_plane_runtime_key_safety_service.write_runtime_key_safety_policy(
runtime_policy_result = apply_runtime_key_safety_policy_route(
record_store=record_store,
rules=runtime_policy_request.rules,
source_label=runtime_policy_request.source_label,
request=runtime_policy_request,
now_timestamp=_now_timestamp,
record_slug=_record_slug,
)
result = {
"runtime_key_safety_policy_record_id": runtime_policy_record.record_id,
"runtime_key_safety_policy_changed": str(changed).lower(),
}
driver_result = {
"runtime_key_safety_policy": control_plane_runtime_key_safety_service.summarize_runtime_key_safety_policy_record(
runtime_policy_record
),
"changed": changed,
}
assert isinstance(runtime_policy_result, RuntimeKeySafetyPolicyRouteResult)
result = runtime_policy_result.result
driver_result = runtime_policy_result.driver_result
elif path == "/v1/live-target-runtime/apply":
live_target_runtime_request = LiveTargetRuntimeApplyEnvelope.model_validate(payload)
action = (
Expand Down