From 7bf3afbe7bfe3248f6ba5b3b59c16cfba25cb30b Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Thu, 7 May 2026 16:18:47 -0400 Subject: [PATCH] Extract work graph service routes --- control_plane/service.py | 79 +++++------------- control_plane/work_graph_http.py | 135 +++++++++++++++++++++++++++++++ 2 files changed, 155 insertions(+), 59 deletions(-) create mode 100644 control_plane/work_graph_http.py diff --git a/control_plane/service.py b/control_plane/service.py index adf01c9..52a5300 100644 --- a/control_plane/service.py +++ b/control_plane/service.py @@ -131,9 +131,11 @@ ) from control_plane.work_graph_service import ( WorkGraphPlanningFactsProvider, - WorkGraphRankEnvelope, - build_work_graph_rank_result, - build_work_graph_snapshot_service_payload, +) +from control_plane.work_graph_http import ( + handle_work_graph_snapshot_read, + rank_work_graph_snapshot, + work_graph_rank_denied_response, ) from control_plane.workflows.evidence_ingestion import ( apply_deployment_evidence, @@ -4963,50 +4965,16 @@ def app( query=query, ) if action == "work_graph.rank": - if not authz_policy.allows( + return handle_work_graph_snapshot_read( + authz_policy=authz_policy, identity=identity, - action=action, - product="launchplane", - context=_LAUNCHPLANE_SERVICE_CONTEXT, - ): - return _json_response( - start_response=start_response, - status_code=403, - payload={ - "status": "rejected", - "trace_id": request_trace_id, - "error": { - "code": "authorization_denied", - "message": "Workflow cannot read the Launchplane work graph snapshot.", - }, - }, - ) - - def work_graph_product_action_allowed( - requested_action: str, requested_product: str, requested_context: str - ) -> bool: - return authz_policy.allows( - identity=identity, - action=requested_action, - product=requested_product, - context=requested_context, - ) - - work_graph_payload = build_work_graph_snapshot_service_payload( - generated_at=_utc_now_timestamp(), + trace_id=request_trace_id, product_store=record_store, work_request_store=_every_code_work_request_store(record_store), - action_allowed=work_graph_product_action_allowed, planning_facts_provider=work_graph_planning_facts_provider, - ) - return _json_response( + utc_now=_utc_now_timestamp, + json_response=_json_response, start_response=start_response, - status_code=200, - payload={ - "status": "ok", - "trace_id": request_trace_id, - **work_graph_payload, - }, ) if action == "product_profile.read": if "product" in params: @@ -5435,26 +5403,19 @@ def product_action_allowed( payload=payload, ) elif path == "/v1/work-graph/rank": - rank_request = WorkGraphRankEnvelope.model_validate(payload) - if not authz_policy.allows( + work_graph_rank_result = rank_work_graph_snapshot( + authz_policy=authz_policy, identity=identity, - action="work_graph.rank", - product="launchplane", - context=_LAUNCHPLANE_SERVICE_CONTEXT, - ): - return _json_response( + payload=payload, + ) + if work_graph_rank_result is None: + return work_graph_rank_denied_response( + trace_id=request_trace_id, + json_response=_json_response, start_response=start_response, - status_code=403, - payload={ - "status": "rejected", - "trace_id": request_trace_id, - "error": { - "code": "authorization_denied", - "message": "Workflow cannot rank Launchplane work graph snapshots.", - }, - }, ) - result, driver_result = build_work_graph_rank_result(rank_request) + result = work_graph_rank_result.result + driver_result = work_graph_rank_result.driver_result elif path == "/v1/evidence/deployments": deployment_request = DeploymentEvidenceEnvelope.model_validate(payload) if not authz_policy.allows( diff --git a/control_plane/work_graph_http.py b/control_plane/work_graph_http.py new file mode 100644 index 0000000..77def64 --- /dev/null +++ b/control_plane/work_graph_http.py @@ -0,0 +1,135 @@ +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from control_plane.contracts.product_environment_read_model import ( + ActionAllowed, + ProductReadModelStore, +) +from control_plane.service_auth import LaunchplaneAuthzPolicy, LaunchplaneIdentity +from control_plane.work_graph_service import ( + WorkGraphPlanningFactsProvider, + WorkGraphRankEnvelope, + WorkGraphWorkRequestStore, + build_work_graph_rank_result, + build_work_graph_snapshot_service_payload, +) + + +JsonResponse = Callable[..., list[bytes]] +StartResponse = Callable[[str, list[tuple[str, str]]], None] +UtcNow = Callable[[], str] + +LAUNCHPLANE_SERVICE_CONTEXT = "launchplane" + + +@dataclass(frozen=True) +class WorkGraphRankResult: + result: dict[str, object] + driver_result: dict[str, object] + + +def handle_work_graph_snapshot_read( + *, + authz_policy: LaunchplaneAuthzPolicy, + identity: LaunchplaneIdentity, + trace_id: str, + product_store: ProductReadModelStore, + work_request_store: WorkGraphWorkRequestStore, + planning_facts_provider: WorkGraphPlanningFactsProvider | None, + utc_now: UtcNow, + json_response: JsonResponse, + start_response: StartResponse, +) -> list[bytes]: + if not authz_policy.allows( + identity=identity, + action="work_graph.rank", + product="launchplane", + context=LAUNCHPLANE_SERVICE_CONTEXT, + ): + return json_response( + start_response=start_response, + status_code=403, + payload={ + "status": "rejected", + "trace_id": trace_id, + "error": { + "code": "authorization_denied", + "message": "Workflow cannot read the Launchplane work graph snapshot.", + }, + }, + ) + + work_graph_payload = build_work_graph_snapshot_service_payload( + generated_at=utc_now(), + product_store=product_store, + work_request_store=work_request_store, + action_allowed=_work_graph_product_action_allowed( + authz_policy=authz_policy, + identity=identity, + ), + planning_facts_provider=planning_facts_provider, + ) + return json_response( + start_response=start_response, + status_code=200, + payload={ + "status": "ok", + "trace_id": trace_id, + **work_graph_payload, + }, + ) + + +def rank_work_graph_snapshot( + *, + authz_policy: LaunchplaneAuthzPolicy, + identity: LaunchplaneIdentity, + payload: dict[str, object], +) -> WorkGraphRankResult | None: + rank_request = WorkGraphRankEnvelope.model_validate(payload) + if not authz_policy.allows( + identity=identity, + action="work_graph.rank", + product="launchplane", + context=LAUNCHPLANE_SERVICE_CONTEXT, + ): + return None + result, driver_result = build_work_graph_rank_result(rank_request) + return WorkGraphRankResult(result=result, driver_result=driver_result) + + +def work_graph_rank_denied_response( + *, + trace_id: str, + json_response: JsonResponse, + start_response: StartResponse, +) -> list[bytes]: + return json_response( + start_response=start_response, + status_code=403, + payload={ + "status": "rejected", + "trace_id": trace_id, + "error": { + "code": "authorization_denied", + "message": "Workflow cannot rank Launchplane work graph snapshots.", + }, + }, + ) + + +def _work_graph_product_action_allowed( + *, + authz_policy: LaunchplaneAuthzPolicy, + identity: LaunchplaneIdentity, +) -> ActionAllowed: + def action_allowed(requested_action: str, requested_product: str, requested_context: str) -> bool: + return authz_policy.allows( + identity=identity, + action=requested_action, + product=requested_product, + context=requested_context, + ) + + return action_allowed