Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 255 additions & 0 deletions control_plane/contracts/product_environment_read_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from control_plane.contracts.product_profile_record import (
LaunchplaneProductProfileRecord,
ProductLaneProfile,
ProductRuntimeConfigRequirement,
ProductSecretConfigRequirement,
)
from control_plane.contracts.runtime_environment_record import RuntimeEnvironmentRecord
from control_plane.contracts.secret_record import SecretBinding
Expand All @@ -25,6 +27,14 @@

ActionAllowed = Callable[[str, str, str], bool]
ProductSecretBindingTrustState = FreshnessStatus | Literal["disabled"]
ProductConfigItemStatus = Literal[
"configured",
"missing",
"disabled",
"unvalidated",
"stale",
"unsupported",
]


class ProductReadModelStore(Protocol):
Expand Down Expand Up @@ -190,6 +200,48 @@ class ProductEnvironmentDetail(BaseModel):
provenance: DataProvenance


class ProductRuntimeConfigStatusItem(BaseModel):
model_config = ConfigDict(extra="forbid")

key: str
status: ProductConfigItemStatus
context: str = ""
instance: str = ""
source_label: str = ""
updated_at: str = ""
trust_state: FreshnessStatus = "missing"


class ProductManagedSecretConfigStatusItem(BaseModel):
model_config = ConfigDict(extra="forbid")

binding_key: str
status: ProductConfigItemStatus
integration: str
context: str = ""
instance: str = ""
updated_at: str = ""
trust_state: ProductSecretBindingTrustState | Literal["unsupported"] = "missing"


class ProductEnvironmentConfigStatus(BaseModel):
model_config = ConfigDict(extra="forbid")

schema_version: int = Field(default=1, ge=1)
product: str
display_name: str
repository: str
driver_id: str
base_driver_id: str = ""
environment: str
context: str
runtime_settings: tuple[ProductRuntimeConfigStatusItem, ...] = ()
managed_secrets: tuple[ProductManagedSecretConfigStatusItem, ...] = ()
warnings: tuple[str, ...] = ()
trust_state: FreshnessStatus
provenance: DataProvenance


class ProductActivityRecordLink(BaseModel):
model_config = ConfigDict(extra="forbid")

Expand Down Expand Up @@ -331,6 +383,57 @@ def build_product_environment_detail(
)


def build_product_environment_config_status(
*,
record_store: ProductReadModelStore,
product: str,
environment: str,
) -> ProductEnvironmentConfigStatus:
profile = record_store.read_product_profile_record(product)
lane = _find_lane(profile=profile, environment=environment)
descriptor, descriptor_warning = _read_profile_descriptor(profile)
lane_summary = _read_product_lane_summary(
record_store=record_store,
profile=profile,
lane=lane,
)
provenance = (
lane_summary.provenance if lane_summary is not None else _missing_lane_provenance(lane)
)
runtime_settings = _runtime_config_status_items(
requirements=profile.expected_config.runtime_environment_keys,
lane=lane,
lane_summary=lane_summary,
)
managed_secrets = _managed_secret_config_status_items(
requirements=profile.expected_config.managed_secret_bindings,
lane=lane,
lane_summary=lane_summary,
)
warnings = tuple(warning for warning in (descriptor_warning,) if warning)
trust_state = _combine_trust_states(
(
*(_config_item_freshness(item.status) for item in runtime_settings),
*(_config_item_freshness(item.status) for item in managed_secrets),
),
fallback=provenance.freshness_status,
)
return ProductEnvironmentConfigStatus(
product=profile.product,
display_name=profile.display_name,
repository=profile.repository,
driver_id=profile.driver_id,
base_driver_id=descriptor.base_driver_id if descriptor is not None else "",
environment=lane.instance,
context=lane.context,
runtime_settings=runtime_settings,
managed_secrets=managed_secrets,
warnings=warnings,
trust_state=trust_state,
provenance=provenance,
)


def build_product_activity_read_model(
*, record_store: ProductReadModelStore, product: str, limit: int = 50
) -> ProductActivityReadModel:
Expand Down Expand Up @@ -1173,6 +1276,158 @@ def _secret_binding_summary(binding: SecretBinding) -> ProductSecretBindingSumma
)


def _runtime_config_status_items(
*,
requirements: tuple[ProductRuntimeConfigRequirement, ...],
lane: ProductLaneProfile,
lane_summary: LaunchplaneLaneSummary | None,
) -> tuple[ProductRuntimeConfigStatusItem, ...]:
applicable_requirements = tuple(
requirement
for requirement in requirements
if _config_requirement_applies(
requirement_context=requirement.context,
requirement_instance=requirement.instance,
lane=lane,
)
)
if not applicable_requirements:
return ()
records = lane_summary.runtime_environment_records if lane_summary is not None else ()
return tuple(
_runtime_config_status_item(requirement=requirement, records=records, lane=lane)
for requirement in applicable_requirements
)


def _runtime_config_status_item(
*,
requirement: ProductRuntimeConfigRequirement,
records: tuple[RuntimeEnvironmentRecord, ...],
lane: ProductLaneProfile,
) -> ProductRuntimeConfigStatusItem:
matching_record = next(
(record for record in records if _runtime_record_provides_key(record, requirement.key)),
None,
)
context = requirement.context or lane.context
instance = requirement.instance or (lane.instance if requirement.context else "")
if matching_record is None:
return ProductRuntimeConfigStatusItem(
key=requirement.key,
status="missing",
context=context,
instance=instance,
trust_state="missing",
)
return ProductRuntimeConfigStatusItem(
key=requirement.key,
status="configured",
context=matching_record.context,
instance=matching_record.instance,
source_label=matching_record.source_label,
updated_at=matching_record.updated_at,
trust_state="recorded",
)


def _runtime_record_provides_key(record: RuntimeEnvironmentRecord, key: str) -> bool:
return key in record.env


def _managed_secret_config_status_items(
*,
requirements: tuple[ProductSecretConfigRequirement, ...],
lane: ProductLaneProfile,
lane_summary: LaunchplaneLaneSummary | None,
) -> tuple[ProductManagedSecretConfigStatusItem, ...]:
applicable_requirements = tuple(
requirement
for requirement in requirements
if _config_requirement_applies(
requirement_context=requirement.context,
requirement_instance=requirement.instance,
lane=lane,
)
)
if not applicable_requirements:
return ()
bindings = lane_summary.secret_bindings if lane_summary is not None else ()
return tuple(
_managed_secret_config_status_item(
requirement=requirement,
bindings=bindings,
lane=lane,
)
for requirement in applicable_requirements
)


def _managed_secret_config_status_item(
*,
requirement: ProductSecretConfigRequirement,
bindings: tuple[SecretBinding, ...],
lane: ProductLaneProfile,
) -> ProductManagedSecretConfigStatusItem:
matching_binding = next(
(
binding
for binding in bindings
if binding.integration == requirement.integration
and binding.binding_key == requirement.binding_key
),
None,
)
context = requirement.context or lane.context
instance = requirement.instance or (lane.instance if requirement.context else "")
if matching_binding is None:
return ProductManagedSecretConfigStatusItem(
binding_key=requirement.binding_key,
status="missing",
integration=requirement.integration,
context=context,
instance=instance,
trust_state="missing",
)
if matching_binding.status == "disabled":
status: ProductConfigItemStatus = "disabled"
trust_state: ProductSecretBindingTrustState = "disabled"
else:
status = "configured"
trust_state = "recorded"
return ProductManagedSecretConfigStatusItem(
binding_key=requirement.binding_key,
status=status,
integration=matching_binding.integration,
context=matching_binding.context,
instance=matching_binding.instance,
updated_at=matching_binding.updated_at,
trust_state=trust_state,
)


def _config_item_freshness(status: ProductConfigItemStatus) -> FreshnessStatus:
if status == "configured":
return "recorded"
if status in {"disabled", "unvalidated"}:
return "missing"
if status == "stale":
return "stale"
if status == "unsupported":
return "unsupported"
return "missing"


def _config_requirement_applies(
*, requirement_context: str, requirement_instance: str, lane: ProductLaneProfile
) -> bool:
if not requirement_context:
return True
if requirement_context != lane.context:
return False
return not requirement_instance or requirement_instance == lane.instance


def _target_summary(lane_summary: LaunchplaneLaneSummary | None) -> ProductTargetSummary:
if lane_summary is None or lane_summary.dokploy_target is None:
return ProductTargetSummary()
Expand Down
47 changes: 47 additions & 0 deletions control_plane/contracts/product_onboarding_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from control_plane.contracts.dokploy_target_record import DokployTargetType
from control_plane.contracts.product_profile_record import (
ProductExpectedConfigProfile,
ProductPreviewProfile,
ProductPromotionWorkflowProfile,
)
Expand Down Expand Up @@ -117,6 +118,10 @@ def _validate_binding(self) -> "ProductOnboardingSecretBindingManifest":
return self


class ProductOnboardingExpectedConfigManifest(ProductExpectedConfigProfile):
pass


class ProductOnboardingManifest(BaseModel):
model_config = ConfigDict(extra="forbid")

Expand All @@ -138,6 +143,9 @@ class ProductOnboardingManifest(BaseModel):
dokploy_targets: tuple[ProductOnboardingTargetManifest, ...] = ()
runtime_environments: tuple[ProductOnboardingRuntimeEnvironmentManifest, ...] = ()
secret_bindings: tuple[ProductOnboardingSecretBindingManifest, ...] = ()
expected_config: ProductOnboardingExpectedConfigManifest = Field(
default_factory=ProductOnboardingExpectedConfigManifest
)
updated_at: str = ""
source_label: str = "product-onboarding"

Expand Down Expand Up @@ -213,4 +221,43 @@ def _validate_manifest(self) -> "ProductOnboardingManifest":
"instance secret binding must match a stable lane: "
f"{binding.context}/{binding.instance}"
)
for runtime_requirement in self.expected_config.runtime_environment_keys:
self._validate_expected_config_route(
allowed_contexts=allowed_contexts,
lane_routes=lane_routes,
context=runtime_requirement.context,
instance=runtime_requirement.instance,
label="runtime config requirement",
)
for secret_requirement in self.expected_config.managed_secret_bindings:
self._validate_expected_config_route(
allowed_contexts=allowed_contexts,
lane_routes=lane_routes,
context=secret_requirement.context,
instance=secret_requirement.instance,
label="secret config requirement",
)
return self

@staticmethod
def _validate_expected_config_route(
*,
allowed_contexts: set[str],
lane_routes: set[tuple[str, str]],
context: str,
instance: str,
label: str,
) -> None:
if not context.strip():
return
if context.strip() not in allowed_contexts:
raise ValueError(f"{label} context is not owned by the product profile: {context}")
if instance.strip() and (context.strip(), instance.strip()) not in lane_routes:
raise ValueError(
f"instance {label} must match a stable lane: {context}/{instance}"
)

def product_expected_config_profile(self) -> ProductExpectedConfigProfile:
return ProductExpectedConfigProfile.model_validate(
self.expected_config.model_dump(mode="json")
)
Loading