From 72cdf8a04de75d98424fc5df329e63f543448c0b Mon Sep 17 00:00:00 2001 From: Aldon Smith Date: Sat, 23 May 2026 15:14:04 -0400 Subject: [PATCH] feat: add connection profile API --- docs/DATA_CONTRACTS.md | 15 ++ docs/LEARNING_LOG.md | 53 ++++++ services/api/README.md | 35 ++++ .../api/factory_api/connection_profiles.py | 127 +++++++++++++ services/api/factory_api/main.py | 95 +++++++++- .../api/tests/test_connection_profiles_api.py | 178 ++++++++++++++++++ 6 files changed, 502 insertions(+), 1 deletion(-) create mode 100644 services/api/factory_api/connection_profiles.py create mode 100644 services/api/tests/test_connection_profiles_api.py diff --git a/docs/DATA_CONTRACTS.md b/docs/DATA_CONTRACTS.md index c1b7821..2985861 100644 --- a/docs/DATA_CONTRACTS.md +++ b/docs/DATA_CONTRACTS.md @@ -520,6 +520,21 @@ Run the contract tests with: .venv/bin/python -m pytest packages/factory-events/tests/test_connection_profile_contracts.py ``` +The API service exposes local profile management endpoints backed by the same +contract: + +- `POST /connection-profiles` +- `GET /connection-profiles` +- `GET /connection-profiles/{profile_id}` +- `PUT /connection-profiles/{profile_id}` +- `POST /connection-profiles/{profile_id}/disable` +- `DELETE /connection-profiles/{profile_id}` + +API responses are browser-facing and redact secret/certificate reference names. +The local JSON store keeps the configured references so backend-only follow-up +work can test connections without asking the browser to hold secrets or +certificate locations. + ## Versioning Use semantic-style schema versions: diff --git a/docs/LEARNING_LOG.md b/docs/LEARNING_LOG.md index b27554b..4d23834 100644 --- a/docs/LEARNING_LOG.md +++ b/docs/LEARNING_LOG.md @@ -130,6 +130,59 @@ make test-contract Use this contract to implement the connection profile API and storage layer without adding adapter polling, tag browsing, or industrial writeback behavior. +## 2026-05-23 - Connection profile API and storage + +### What changed + +Added local JSON-backed API endpoints for creating, listing, reading, updating, +disabling, and deleting protocol connection profiles. The API validates request +bodies with the shared `ProtocolConnectionProfile` schema and redacts secret and +certificate reference names before returning browser-facing responses. + +### Why it matters + +The Workbench and future connector diagnostics need a backend-owned source of +truth for OPC-UA, MQTT, and BACnet profile definitions. This step adds that +source of truth without starting protocol ingestion or adding any industrial +writeback path. + +### How it works + +`ConnectionProfileStore` persists validated profiles to +`.local/storage/connection_profiles.json` by default. API responses keep the +profile shape useful for the browser but replace reference names with +`configured` and `purpose` metadata. Create/update requests fail validation +before storage if they do not satisfy the shared schema. + +### How to run it + +```bash +make api +``` + +Use `FACTORY_CONNECTION_PROFILES_STORE` to point the API at a different local +JSON profile store when testing. + +### How to test it + +```bash +.venv/bin/python -m pytest services/api/tests/test_connection_profiles_api.py +make test-integration +``` + +### Key files + +- `services/api/factory_api/connection_profiles.py` +- `services/api/factory_api/main.py` +- `services/api/tests/test_connection_profiles_api.py` +- `services/api/README.md` + +### What to learn next + +Use the stored profile contract to build the read-only test-connection API. +Keep actual protocol polling, subscription workers, and tag browsing separate +until the test-connection boundary is implemented and tested. + ## 2026-05-23 - OPC UA demo ingestion worker ### What changed diff --git a/services/api/README.md b/services/api/README.md index 95c385e..0616aa7 100644 --- a/services/api/README.md +++ b/services/api/README.md @@ -7,11 +7,46 @@ The API currently exposes: - Health and event query endpoints. - Read-only domain context endpoints for sites, areas, equipment, process signals, batches, quality results, deviations, alerts, and investigations. +- Local connection profile CRUD endpoints for OPC-UA, MQTT, and BACnet + definitions. - Process Sentinel detections, evidence, recommendations, and RCA/CAPA draft endpoints over local demo state. - Governed recommendation review endpoints, including status-filtered recommendation lists, decision history, and local audit events. +## Connection Profile API + +Connection profiles are stored locally in JSON for the development stack. The +default store is: + +```text +.local/storage/connection_profiles.json +``` + +Override it with: + +```bash +FACTORY_CONNECTION_PROFILES_STORE=.local/storage/connection_profiles.json +``` + +Endpoints: + +- `POST /connection-profiles` - create a validated profile. +- `GET /connection-profiles` - list browser-facing profiles. +- `GET /connection-profiles/{profile_id}` - read one browser-facing profile. +- `PUT /connection-profiles/{profile_id}` - replace an existing profile. +- `POST /connection-profiles/{profile_id}/disable` - mark a profile disabled. +- `DELETE /connection-profiles/{profile_id}` - remove a local profile. + +Requests are validated with the shared `ProtocolConnectionProfile` schema from +`packages/factory-events`. Browser-facing responses redact secret and +certificate reference names. They preserve whether a reference is configured and +its purpose, but they do not return the configured reference string. + +Creating, updating, disabling, or deleting a profile does not start protocol +ingestion. Read-only test connection behavior and adapter polling are tracked as +separate follow-up work. + ## Governed Recommendation Audit Reads The simulator-backed demo API records local governed recommendation decisions diff --git a/services/api/factory_api/connection_profiles.py b/services/api/factory_api/connection_profiles.py new file mode 100644 index 0000000..3b99de1 --- /dev/null +++ b/services/api/factory_api/connection_profiles.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from factory_events import ProtocolConnectionProfile, validate_connection_profile + +REFERENCE_FIELD_SUFFIXES = ("_secret_ref", "_certificate_ref") + + +class DuplicateConnectionProfileError(ValueError): + pass + + +class ConnectionProfileIdMismatchError(ValueError): + pass + + +class ConnectionProfileStore: + def __init__(self, path: Path) -> None: + self.path = path + self.path.parent.mkdir(parents=True, exist_ok=True) + + def list_profiles(self) -> list[ProtocolConnectionProfile]: + if not self.path.exists(): + return [] + return [ + validate_connection_profile(item) + for item in json.loads(self.path.read_text(encoding="utf-8")) + ] + + def get_profile(self, profile_id: str) -> ProtocolConnectionProfile | None: + return next((profile for profile in self.list_profiles() if profile.id == profile_id), None) + + def create_profile(self, profile: ProtocolConnectionProfile) -> ProtocolConnectionProfile: + stored_profiles = self.list_profiles() + if any(item.id == profile.id for item in stored_profiles): + msg = f"connection profile already exists: {profile.id}" + raise DuplicateConnectionProfileError(msg) + stored_profile = self._validate_before_storage(profile) + stored_profiles.append(stored_profile) + self._write_profiles(stored_profiles) + return stored_profile + + def replace_profile( + self, profile_id: str, profile: ProtocolConnectionProfile + ) -> ProtocolConnectionProfile | None: + if profile.id != profile_id: + msg = "path profile_id must match request body id" + raise ConnectionProfileIdMismatchError(msg) + + replacement = self._validate_before_storage(profile) + stored_profiles = self.list_profiles() + replaced = False + updated_profiles: list[ProtocolConnectionProfile] = [] + for stored_profile in stored_profiles: + if stored_profile.id == profile_id: + updated_profiles.append(replacement) + replaced = True + else: + updated_profiles.append(stored_profile) + if not replaced: + return None + self._write_profiles(updated_profiles) + return replacement + + def disable_profile(self, profile_id: str) -> ProtocolConnectionProfile | None: + stored_profiles = self.list_profiles() + updated_profiles: list[ProtocolConnectionProfile] = [] + disabled_profile: ProtocolConnectionProfile | None = None + for profile in stored_profiles: + if profile.id == profile_id: + disabled_profile = profile.model_copy( + update={"enabled": False, "health_state": "disabled"} + ) + updated_profiles.append(disabled_profile) + else: + updated_profiles.append(profile) + if disabled_profile is None: + return None + self._write_profiles(updated_profiles) + return disabled_profile + + def delete_profile(self, profile_id: str) -> bool: + stored_profiles = self.list_profiles() + kept_profiles = [profile for profile in stored_profiles if profile.id != profile_id] + if len(kept_profiles) == len(stored_profiles): + return False + self._write_profiles(kept_profiles) + return True + + def _validate_before_storage( + self, profile: ProtocolConnectionProfile + ) -> ProtocolConnectionProfile: + return validate_connection_profile(profile.model_dump(mode="json")) + + def _write_profiles(self, profiles: list[ProtocolConnectionProfile]) -> None: + self.path.write_text( + json.dumps( + [profile.model_dump(mode="json") for profile in profiles], + indent=2, + sort_keys=True, + ) + + "\n", + encoding="utf-8", + ) + + +def serialize_connection_profile_for_browser(profile: ProtocolConnectionProfile) -> dict[str, Any]: + return redact_connection_profile_references(profile.model_dump(mode="json", exclude_none=True)) + + +def redact_connection_profile_references(value: Any, *, field_name: str | None = None) -> Any: + if isinstance(value, list): + return [redact_connection_profile_references(item) for item in value] + if isinstance(value, dict): + if field_name is not None and field_name.endswith(REFERENCE_FIELD_SUFFIXES): + return { + "configured": True, + "purpose": value["purpose"], + } + return { + key: redact_connection_profile_references(item, field_name=key) + for key, item in value.items() + } + return value diff --git a/services/api/factory_api/main.py b/services/api/factory_api/main.py index 6386433..f66e82c 100644 --- a/services/api/factory_api/main.py +++ b/services/api/factory_api/main.py @@ -4,13 +4,21 @@ from pathlib import Path from typing import Literal +from factory_events import ProtocolConnectionProfile from factory_ingestion.storage import JsonlEventStore from fastapi import FastAPI +from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from process_sentinel.storage import InvalidRecommendationTransitionError, SentinelStateStore from pydantic import BaseModel, Field +from factory_api.connection_profiles import ( + ConnectionProfileIdMismatchError, + ConnectionProfileStore, + DuplicateConnectionProfileError, + serialize_connection_profile_for_browser, +) from factory_api.domain import DomainData, build_demo_domain_data DEFAULT_CORS_ORIGINS = ( @@ -51,6 +59,7 @@ def create_app( *, events_store_path: Path | None = None, sentinel_state_dir: Path | None = None, + connection_profiles_store_path: Path | None = None, domain_data: DomainData | None = None, ) -> FastAPI: resolved_events_store = events_store_path or Path( @@ -59,6 +68,9 @@ def create_app( resolved_state_dir = sentinel_state_dir or Path( os.getenv("SENTINEL_STATE_DIR", ".local/storage/sentinel") ) + resolved_connection_profiles_store = connection_profiles_store_path or Path( + os.getenv("FACTORY_CONNECTION_PROFILES_STORE", ".local/storage/connection_profiles.json") + ) resolved_domain_data = domain_data or build_demo_domain_data() app = FastAPI( @@ -71,7 +83,7 @@ def create_app( app.add_middleware( CORSMiddleware, allow_headers=["accept", "content-type"], - allow_methods=["GET", "POST", "OPTIONS"], + allow_methods=["DELETE", "GET", "OPTIONS", "POST", "PUT"], allow_origins=cors_origins, ) @@ -89,12 +101,35 @@ def conflict_handler(_request: object, exc: ApiConflictError) -> JSONResponse: content={"error": {"code": exc.code, "message": exc.message}}, ) + @app.exception_handler(RequestValidationError) + def validation_error_handler(_request: object, exc: RequestValidationError) -> JSONResponse: + return JSONResponse( + status_code=422, + content={ + "error": { + "code": "request_validation_failed", + "message": "Request validation failed.", + "issues": [ + { + "loc": list(error["loc"]), + "msg": str(error["msg"]), + "type": str(error["type"]), + } + for error in exc.errors() + ], + } + }, + ) + def event_store() -> JsonlEventStore: return JsonlEventStore(resolved_events_store) def sentinel_store() -> SentinelStateStore: return SentinelStateStore(resolved_state_dir) + def connection_profile_store() -> ConnectionProfileStore: + return ConnectionProfileStore(resolved_connection_profiles_store) + def domain() -> DomainData: return resolved_domain_data @@ -105,6 +140,7 @@ def health() -> dict: "simulator_backed": True, "events_store": str(resolved_events_store), "sentinel_state_dir": str(resolved_state_dir), + "connection_profiles_store": str(resolved_connection_profiles_store), } @app.get("/events") @@ -114,6 +150,63 @@ def list_events(event_type: str | None = None) -> list[dict]: events = [event for event in events if event.event_type == event_type] return [event.model_dump(mode="json") for event in events] + @app.post("/connection-profiles", status_code=201) + def create_connection_profile(profile: ProtocolConnectionProfile) -> dict: + try: + stored_profile = connection_profile_store().create_profile(profile) + except DuplicateConnectionProfileError as exc: + raise_conflict("connection_profile_exists", str(exc)) + return serialize_connection_profile_for_browser(stored_profile) + + @app.get("/connection-profiles") + def list_connection_profiles() -> list[dict]: + return [ + serialize_connection_profile_for_browser(profile) + for profile in connection_profile_store().list_profiles() + ] + + @app.get("/connection-profiles/{profile_id}") + def get_connection_profile(profile_id: str) -> dict: + profile = connection_profile_store().get_profile(profile_id) + if profile is None: + raise_not_found( + "connection_profile_not_found", + f"Connection profile not found: {profile_id}", + ) + return serialize_connection_profile_for_browser(profile) + + @app.put("/connection-profiles/{profile_id}") + def update_connection_profile(profile_id: str, profile: ProtocolConnectionProfile) -> dict: + try: + updated_profile = connection_profile_store().replace_profile(profile_id, profile) + except ConnectionProfileIdMismatchError as exc: + raise_conflict("connection_profile_id_mismatch", str(exc)) + if updated_profile is None: + raise_not_found( + "connection_profile_not_found", + f"Connection profile not found: {profile_id}", + ) + return serialize_connection_profile_for_browser(updated_profile) + + @app.post("/connection-profiles/{profile_id}/disable") + def disable_connection_profile(profile_id: str) -> dict: + profile = connection_profile_store().disable_profile(profile_id) + if profile is None: + raise_not_found( + "connection_profile_not_found", + f"Connection profile not found: {profile_id}", + ) + return serialize_connection_profile_for_browser(profile) + + @app.delete("/connection-profiles/{profile_id}", status_code=204) + def delete_connection_profile(profile_id: str) -> None: + deleted = connection_profile_store().delete_profile(profile_id) + if not deleted: + raise_not_found( + "connection_profile_not_found", + f"Connection profile not found: {profile_id}", + ) + @app.get("/events/{event_id}") def get_event(event_id: str) -> dict: event = event_store().get_event(event_id) diff --git a/services/api/tests/test_connection_profiles_api.py b/services/api/tests/test_connection_profiles_api.py new file mode 100644 index 0000000..05c58a2 --- /dev/null +++ b/services/api/tests/test_connection_profiles_api.py @@ -0,0 +1,178 @@ +from __future__ import annotations + +import copy +import json +from pathlib import Path +from typing import Any + +from factory_api.main import create_app +from fastapi.testclient import TestClient + +REPO_ROOT = Path(__file__).resolve().parents[3] +PROFILE_FIXTURES = REPO_ROOT / "packages" / "test-fixtures" / "valid-connection-profiles" + + +def load_profile(name: str) -> dict[str, Any]: + return json.loads((PROFILE_FIXTURES / name).read_text(encoding="utf-8")) + + +def client_for(tmp_path: Path) -> TestClient: + return TestClient( + create_app( + events_store_path=tmp_path / "events.jsonl", + sentinel_state_dir=tmp_path / "sentinel", + connection_profiles_store_path=tmp_path / "connection_profiles.json", + ) + ) + + +def test_connection_profile_create_list_and_detail_are_redacted(tmp_path: Path) -> None: + client = client_for(tmp_path) + profile = load_profile("opcua_connection_profile.json") + + create_response = client.post("/connection-profiles", json=profile) + list_response = client.get("/connection-profiles") + detail_response = client.get(f"/connection-profiles/{profile['id']}") + + assert create_response.status_code == 201 + assert list_response.status_code == 200 + assert detail_response.status_code == 200 + assert list_response.json() == [detail_response.json()] + assert detail_response.json()["id"] == "conn_opcua_filler_1" + assert detail_response.json()["protocol"] == "opcua" + assert detail_response.json()["opcua"]["auth_secret_ref"] == { + "configured": True, + "purpose": "username_password", + } + assert detail_response.json()["opcua"]["client_certificate_ref"] == { + "configured": True, + "purpose": "client_certificate", + } + serialized_response = json.dumps(detail_response.json()) + assert "local/opcua/filler-1/user" not in serialized_response + assert "local/opcua/filler-1/client-cert" not in serialized_response + assert "local/opcua/filler-1/server-cert" not in serialized_response + + +def test_connection_profile_create_persists_unredacted_references_locally( + tmp_path: Path, +) -> None: + client = client_for(tmp_path) + profile = load_profile("mqtt_connection_profile.json") + store_path = tmp_path / "connection_profiles.json" + + response = client.post("/connection-profiles", json=profile) + + assert response.status_code == 201 + stored_profiles = json.loads(store_path.read_text(encoding="utf-8")) + assert stored_profiles[0]["id"] == "conn_mqtt_packaging_uns" + assert stored_profiles[0]["mqtt"]["auth_secret_ref"]["ref"] == ( + "local/mqtt/packaging/client-auth" + ) + assert "local/mqtt/packaging/client-auth" not in json.dumps(response.json()) + + +def test_connection_profile_update_replaces_existing_profile(tmp_path: Path) -> None: + client = client_for(tmp_path) + profile = load_profile("mqtt_connection_profile.json") + client.post("/connection-profiles", json=profile) + updated_profile = copy.deepcopy(profile) + updated_profile["name"] = "Packaging UNS MQTT Updated" + updated_profile["description"] = "Updated read-only MQTT subscription." + updated_profile["updated_at"] = "2026-05-23T12:05:00Z" + + update_response = client.put(f"/connection-profiles/{profile['id']}", json=updated_profile) + detail_response = client.get(f"/connection-profiles/{profile['id']}") + + assert update_response.status_code == 200 + assert update_response.json()["name"] == "Packaging UNS MQTT Updated" + assert detail_response.json()["description"] == "Updated read-only MQTT subscription." + assert len(client.get("/connection-profiles").json()) == 1 + + +def test_connection_profile_disable_and_delete_behavior(tmp_path: Path) -> None: + client = client_for(tmp_path) + profile = load_profile("bacnet_connection_profile.json") + profile["enabled"] = True + profile["health_state"] = "unknown" + client.post("/connection-profiles", json=profile) + + disable_response = client.post(f"/connection-profiles/{profile['id']}/disable") + detail_response = client.get(f"/connection-profiles/{profile['id']}") + delete_response = client.delete(f"/connection-profiles/{profile['id']}") + missing_response = client.get(f"/connection-profiles/{profile['id']}") + + assert disable_response.status_code == 200 + assert disable_response.json()["enabled"] is False + assert disable_response.json()["health_state"] == "disabled" + assert detail_response.json()["enabled"] is False + assert delete_response.status_code == 204 + assert missing_response.status_code == 404 + assert missing_response.json()["error"]["code"] == "connection_profile_not_found" + + +def test_connection_profile_validation_failure_does_not_store_profile(tmp_path: Path) -> None: + client = client_for(tmp_path) + profile = load_profile("opcua_connection_profile.json") + profile["protocol"] = "mqtt" + + response = client.post("/connection-profiles", json=profile) + + assert response.status_code == 422 + assert client.get("/connection-profiles").json() == [] + assert not (tmp_path / "connection_profiles.json").exists() + + +def test_connection_profile_validation_error_does_not_echo_raw_secret( + tmp_path: Path, +) -> None: + client = client_for(tmp_path) + profile = load_profile("opcua_connection_profile.json") + profile["opcua"]["auth_secret_ref"]["password"] = "super-secret-value" + + response = client.post("/connection-profiles", json=profile) + + assert response.status_code == 422 + assert response.json()["error"]["code"] == "request_validation_failed" + assert "super-secret-value" not in response.text + + +def test_connection_profile_rejects_duplicate_and_path_id_mismatch(tmp_path: Path) -> None: + client = client_for(tmp_path) + profile = load_profile("opcua_connection_profile.json") + client.post("/connection-profiles", json=profile) + + duplicate_response = client.post("/connection-profiles", json=profile) + mismatch_response = client.put("/connection-profiles/different-id", json=profile) + + assert duplicate_response.status_code == 409 + assert duplicate_response.json()["error"]["code"] == "connection_profile_exists" + assert mismatch_response.status_code == 409 + assert mismatch_response.json()["error"]["code"] == "connection_profile_id_mismatch" + + +def test_connection_profile_update_and_delete_missing_profiles_return_not_found( + tmp_path: Path, +) -> None: + client = client_for(tmp_path) + profile = load_profile("opcua_connection_profile.json") + + update_response = client.put(f"/connection-profiles/{profile['id']}", json=profile) + disable_response = client.post(f"/connection-profiles/{profile['id']}/disable") + delete_response = client.delete(f"/connection-profiles/{profile['id']}") + + assert update_response.status_code == 404 + assert disable_response.status_code == 404 + assert delete_response.status_code == 404 + assert update_response.json()["error"]["code"] == "connection_profile_not_found" + + +def test_connection_profile_create_does_not_start_ingestion(tmp_path: Path) -> None: + client = client_for(tmp_path) + profile = load_profile("opcua_connection_profile.json") + events_store_path = tmp_path / "events.jsonl" + + response = client.post("/connection-profiles", json=profile) + + assert response.status_code == 201 + assert not events_store_path.exists()