Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 20 additions & 59 deletions control_plane/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,11 @@
)
from control_plane.work_graph_service import (
WorkGraphPlanningFactsProvider,
WorkGraphRankEnvelope,
build_work_graph_rank_result,
build_work_graph_snapshot_service_payload,
)
from control_plane.work_graph_http import (
handle_work_graph_snapshot_read,
rank_work_graph_snapshot,
work_graph_rank_denied_response,
)
from control_plane.workflows.evidence_ingestion import (
apply_deployment_evidence,
Expand Down Expand Up @@ -4963,50 +4965,16 @@ def app(
query=query,
)
if action == "work_graph.rank":
if not authz_policy.allows(
return handle_work_graph_snapshot_read(
authz_policy=authz_policy,
identity=identity,
action=action,
product="launchplane",
context=_LAUNCHPLANE_SERVICE_CONTEXT,
):
return _json_response(
start_response=start_response,
status_code=403,
payload={
"status": "rejected",
"trace_id": request_trace_id,
"error": {
"code": "authorization_denied",
"message": "Workflow cannot read the Launchplane work graph snapshot.",
},
},
)

def work_graph_product_action_allowed(
requested_action: str, requested_product: str, requested_context: str
) -> bool:
return authz_policy.allows(
identity=identity,
action=requested_action,
product=requested_product,
context=requested_context,
)

work_graph_payload = build_work_graph_snapshot_service_payload(
generated_at=_utc_now_timestamp(),
trace_id=request_trace_id,
product_store=record_store,
work_request_store=_every_code_work_request_store(record_store),
action_allowed=work_graph_product_action_allowed,
planning_facts_provider=work_graph_planning_facts_provider,
)
return _json_response(
utc_now=_utc_now_timestamp,
json_response=_json_response,
start_response=start_response,
status_code=200,
payload={
"status": "ok",
"trace_id": request_trace_id,
**work_graph_payload,
},
)
if action == "product_profile.read":
if "product" in params:
Expand Down Expand Up @@ -5435,26 +5403,19 @@ def product_action_allowed(
payload=payload,
)
elif path == "/v1/work-graph/rank":
rank_request = WorkGraphRankEnvelope.model_validate(payload)
if not authz_policy.allows(
work_graph_rank_result = rank_work_graph_snapshot(
authz_policy=authz_policy,
identity=identity,
action="work_graph.rank",
product="launchplane",
context=_LAUNCHPLANE_SERVICE_CONTEXT,
):
return _json_response(
payload=payload,
)
if work_graph_rank_result is None:
return work_graph_rank_denied_response(
trace_id=request_trace_id,
json_response=_json_response,
start_response=start_response,
status_code=403,
payload={
"status": "rejected",
"trace_id": request_trace_id,
"error": {
"code": "authorization_denied",
"message": "Workflow cannot rank Launchplane work graph snapshots.",
},
},
)
result, driver_result = build_work_graph_rank_result(rank_request)
result = work_graph_rank_result.result
driver_result = work_graph_rank_result.driver_result
elif path == "/v1/evidence/deployments":
deployment_request = DeploymentEvidenceEnvelope.model_validate(payload)
if not authz_policy.allows(
Expand Down
135 changes: 135 additions & 0 deletions control_plane/work_graph_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from control_plane.contracts.product_environment_read_model import (
ActionAllowed,
ProductReadModelStore,
)
from control_plane.service_auth import LaunchplaneAuthzPolicy, LaunchplaneIdentity
from control_plane.work_graph_service import (
WorkGraphPlanningFactsProvider,
WorkGraphRankEnvelope,
WorkGraphWorkRequestStore,
build_work_graph_rank_result,
build_work_graph_snapshot_service_payload,
)


JsonResponse = Callable[..., list[bytes]]
StartResponse = Callable[[str, list[tuple[str, str]]], None]
UtcNow = Callable[[], str]

LAUNCHPLANE_SERVICE_CONTEXT = "launchplane"


@dataclass(frozen=True)
class WorkGraphRankResult:
result: dict[str, object]
driver_result: dict[str, object]


def handle_work_graph_snapshot_read(
*,
authz_policy: LaunchplaneAuthzPolicy,
identity: LaunchplaneIdentity,
trace_id: str,
product_store: ProductReadModelStore,
work_request_store: WorkGraphWorkRequestStore,
planning_facts_provider: WorkGraphPlanningFactsProvider | None,
utc_now: UtcNow,
json_response: JsonResponse,
start_response: StartResponse,
) -> list[bytes]:
if not authz_policy.allows(
identity=identity,
action="work_graph.rank",
product="launchplane",
context=LAUNCHPLANE_SERVICE_CONTEXT,
):
return json_response(
start_response=start_response,
status_code=403,
payload={
"status": "rejected",
"trace_id": trace_id,
"error": {
"code": "authorization_denied",
"message": "Workflow cannot read the Launchplane work graph snapshot.",
},
},
)

work_graph_payload = build_work_graph_snapshot_service_payload(
generated_at=utc_now(),
product_store=product_store,
work_request_store=work_request_store,
action_allowed=_work_graph_product_action_allowed(
authz_policy=authz_policy,
identity=identity,
),
planning_facts_provider=planning_facts_provider,
)
return json_response(
start_response=start_response,
status_code=200,
payload={
"status": "ok",
"trace_id": trace_id,
**work_graph_payload,
},
)


def rank_work_graph_snapshot(
*,
authz_policy: LaunchplaneAuthzPolicy,
identity: LaunchplaneIdentity,
payload: dict[str, object],
) -> WorkGraphRankResult | None:
rank_request = WorkGraphRankEnvelope.model_validate(payload)
if not authz_policy.allows(
identity=identity,
action="work_graph.rank",
product="launchplane",
context=LAUNCHPLANE_SERVICE_CONTEXT,
):
return None
result, driver_result = build_work_graph_rank_result(rank_request)
return WorkGraphRankResult(result=result, driver_result=driver_result)


def work_graph_rank_denied_response(
*,
trace_id: str,
json_response: JsonResponse,
start_response: StartResponse,
) -> list[bytes]:
return json_response(
start_response=start_response,
status_code=403,
payload={
"status": "rejected",
"trace_id": trace_id,
"error": {
"code": "authorization_denied",
"message": "Workflow cannot rank Launchplane work graph snapshots.",
},
},
)


def _work_graph_product_action_allowed(
*,
authz_policy: LaunchplaneAuthzPolicy,
identity: LaunchplaneIdentity,
) -> ActionAllowed:
def action_allowed(requested_action: str, requested_product: str, requested_context: str) -> bool:
return authz_policy.allows(
identity=identity,
action=requested_action,
product=requested_product,
context=requested_context,
)

return action_allowed