From 180db7041031a5489769f1300508afb009f9f160 Mon Sep 17 00:00:00 2001 From: Aldon Smith Date: Sat, 23 May 2026 15:24:43 -0400 Subject: [PATCH] feat: add read-only connection test API --- docs/DATA_CONTRACTS.md | 8 + docs/LEARNING_LOG.md | 54 +++++ services/api/README.md | 19 +- .../api/factory_api/connection_profiles.py | 19 +- services/api/factory_api/connection_tests.py | 210 ++++++++++++++++++ services/api/factory_api/main.py | 11 + .../api/tests/test_connection_profiles_api.py | 119 ++++++++++ 7 files changed, 434 insertions(+), 6 deletions(-) create mode 100644 services/api/factory_api/connection_tests.py diff --git a/docs/DATA_CONTRACTS.md b/docs/DATA_CONTRACTS.md index 2985861..fc15b18 100644 --- a/docs/DATA_CONTRACTS.md +++ b/docs/DATA_CONTRACTS.md @@ -528,6 +528,7 @@ contract: - `GET /connection-profiles/{profile_id}` - `PUT /connection-profiles/{profile_id}` - `POST /connection-profiles/{profile_id}/disable` +- `POST /connection-profiles/{profile_id}/test` - `DELETE /connection-profiles/{profile_id}` API responses are browser-facing and redact secret/certificate reference names. @@ -535,6 +536,13 @@ The local JSON store keeps the configured references so backend-only follow-up work can test connections without asking the browser to hold secrets or certificate locations. +The test endpoint returns a structured diagnostic result with `connection_id`, +`protocol`, `status`, `checked_at`, `duration_ms`, `message`, and +protocol-specific `diagnostics`. Current tests use fake protocol clients for +deterministic read-only behavior. Results must not include raw credentials, +tokens, private keys, certificates, or configured secret/certificate reference +names. + ## Versioning Use semantic-style schema versions: diff --git a/docs/LEARNING_LOG.md b/docs/LEARNING_LOG.md index 4d23834..9db430f 100644 --- a/docs/LEARNING_LOG.md +++ b/docs/LEARNING_LOG.md @@ -183,6 +183,60 @@ Use the stored profile contract to build the read-only test-connection API. Keep actual protocol polling, subscription workers, and tag browsing separate until the test-connection boundary is implemented and tested. +## 2026-05-23 - Read-only connection test API + +### What changed + +Added `POST /connection-profiles/{profile_id}/test` for read-only connection +diagnostics over stored OPC-UA, MQTT, and BACnet profiles. The endpoint returns +status, timing, readable messages, and protocol-specific diagnostic details. + +### Why it matters + +Operators need to verify a configured source before any adapter or ingestion +worker is introduced. This step proves the backend contract and response shape +without opening live protocol sessions, publishing messages, writing tags, or +starting ingestion. + +### How it works + +The endpoint reads the stored profile data, validates it with the shared schema, +skips disabled profiles, and uses deterministic fake protocol clients for +success and unreachable-target behavior. Results are redacted so credentials, +tokens, private keys, certificate bodies, and configured reference names are not +returned to the browser. + +### How to run it + +```bash +make api +``` + +Create a profile with `POST /connection-profiles`, then call: + +```text +POST /connection-profiles/{profile_id}/test +``` + +### How to test it + +```bash +.venv/bin/python -m pytest services/api/tests/test_connection_profiles_api.py +make test-integration +``` + +### Key files + +- `services/api/factory_api/connection_tests.py` +- `services/api/factory_api/connection_profiles.py` +- `services/api/factory_api/main.py` +- `services/api/tests/test_connection_profiles_api.py` + +### What to learn next + +Use this diagnostic response shape when building protocol diagnostics UI. Keep +live adapter implementations and tag/source browsing in their own scoped issues. + ## 2026-05-23 - OPC UA demo ingestion worker ### What changed diff --git a/services/api/README.md b/services/api/README.md index 0616aa7..2389e48 100644 --- a/services/api/README.md +++ b/services/api/README.md @@ -36,6 +36,8 @@ Endpoints: - `GET /connection-profiles/{profile_id}` - read one browser-facing profile. - `PUT /connection-profiles/{profile_id}` - replace an existing profile. - `POST /connection-profiles/{profile_id}/disable` - mark a profile disabled. +- `POST /connection-profiles/{profile_id}/test` - run a deterministic + read-only connection diagnostic. - `DELETE /connection-profiles/{profile_id}` - remove a local profile. Requests are validated with the shared `ProtocolConnectionProfile` schema from @@ -43,9 +45,20 @@ Requests are validated with the shared `ProtocolConnectionProfile` schema from certificate reference names. They preserve whether a reference is configured and its purpose, but they do not return the configured reference string. -Creating, updating, disabling, or deleting a profile does not start protocol -ingestion. Read-only test connection behavior and adapter polling are tracked as -separate follow-up work. +Creating, updating, disabling, deleting, or testing a profile does not start +protocol ingestion. + +The test endpoint currently uses fake protocol clients so API behavior is +deterministic before live adapter work lands. It validates the stored profile, +skips disabled profiles, simulates unreachable endpoints for readable failure +results, and returns protocol-specific diagnostics without raw credentials, +tokens, private keys, certificates, or configured secret/certificate reference +names. The diagnostic is limited to read-only operations such as connecting, +reading configured OPC-UA nodes, subscribing to configured MQTT topic filters, +or reading configured BACnet objects. + +Adapter polling, live protocol sessions, tag browsing, and ingestion workers are +tracked as separate follow-up work. ## Governed Recommendation Audit Reads diff --git a/services/api/factory_api/connection_profiles.py b/services/api/factory_api/connection_profiles.py index 3b99de1..735642e 100644 --- a/services/api/factory_api/connection_profiles.py +++ b/services/api/factory_api/connection_profiles.py @@ -23,16 +23,29 @@ def __init__(self, path: Path) -> None: self.path.parent.mkdir(parents=True, exist_ok=True) def list_profiles(self) -> list[ProtocolConnectionProfile]: - if not self.path.exists(): - return [] return [ validate_connection_profile(item) - for item in json.loads(self.path.read_text(encoding="utf-8")) + for item in self.list_profile_data() ] + def list_profile_data(self) -> list[dict[str, Any]]: + if not self.path.exists(): + return [] + return json.loads(self.path.read_text(encoding="utf-8")) + def get_profile(self, profile_id: str) -> ProtocolConnectionProfile | None: return next((profile for profile in self.list_profiles() if profile.id == profile_id), None) + def get_profile_data(self, profile_id: str) -> dict[str, Any] | None: + return next( + ( + profile + for profile in self.list_profile_data() + if isinstance(profile, dict) and profile.get("id") == profile_id + ), + None, + ) + def create_profile(self, profile: ProtocolConnectionProfile) -> ProtocolConnectionProfile: stored_profiles = self.list_profiles() if any(item.id == profile.id for item in stored_profiles): diff --git a/services/api/factory_api/connection_tests.py b/services/api/factory_api/connection_tests.py new file mode 100644 index 0000000..b13942f --- /dev/null +++ b/services/api/factory_api/connection_tests.py @@ -0,0 +1,210 @@ +from __future__ import annotations + +import re +from datetime import UTC, datetime +from time import perf_counter +from typing import Any, Literal, Protocol + +from factory_events import ProtocolConnectionProfile, validate_connection_profile +from pydantic import BaseModel, ConfigDict, Field, ValidationError + +ConnectionTestStatus = Literal["healthy", "failed", "disabled", "invalid"] + +SECRET_VALUE_PATTERN = re.compile( + r"(?i)(password|token|private[_-]?key|certificate|cert)([=:])([^@\s,;]+)" +) +URL_CREDENTIAL_PATTERN = re.compile(r"(://)([^/@:]+):([^/@]+)@") + + +class StrictModel(BaseModel): + model_config = ConfigDict(extra="forbid") + + +class ConnectionTestResult(StrictModel): + connection_id: str = Field(min_length=1) + protocol: str = Field(min_length=1) + status: ConnectionTestStatus + checked_at: datetime + duration_ms: float = Field(ge=0) + message: str = Field(min_length=1) + diagnostics: dict[str, Any] = Field(default_factory=dict) + + +class ProtocolTestClient(Protocol): + def test(self, profile: ProtocolConnectionProfile) -> dict[str, Any]: + ... + + +class FakeProtocolTestClient: + def test(self, profile: ProtocolConnectionProfile) -> dict[str, Any]: + if _looks_unreachable(profile.endpoint): + msg = f"{profile.protocol} endpoint is unreachable: {profile.endpoint}" + raise ConnectionError(redact_sensitive_text(msg)) + + if profile.protocol == "opcua": + if profile.opcua is None: + msg = "OPC-UA profile is missing its config block" + raise ValueError(msg) + return { + "read_only": True, + "allowed_operations": ["connect", "read_configured_nodes"], + "node_count": len(profile.opcua.node_ids), + "security_mode": profile.opcua.security_mode, + "security_policy_configured": profile.opcua.security_policy is not None, + } + if profile.protocol == "mqtt": + if profile.mqtt is None: + msg = "MQTT profile is missing its config block" + raise ValueError(msg) + return { + "read_only": True, + "allowed_operations": ["connect", "subscribe_configured_topics"], + "topic_filter_count": len(profile.mqtt.topic_filters), + "qos": profile.mqtt.qos, + "tls_enabled": profile.mqtt.use_tls, + } + if profile.protocol == "bacnet": + if profile.bacnet is None: + msg = "BACnet profile is missing its config block" + raise ValueError(msg) + return { + "read_only": True, + "allowed_operations": ["connect", "read_configured_objects"], + "device_instance": profile.bacnet.device_instance, + "object_count": len(profile.bacnet.object_references), + "network_number_configured": profile.bacnet.network_number is not None, + } + + msg = f"Unsupported protocol: {profile.protocol}" + raise ValueError(msg) + + +def test_connection_profile_data( + profile_data: dict[str, Any], + *, + client: ProtocolTestClient | None = None, +) -> ConnectionTestResult: + started = perf_counter() + checked_at = datetime.now(UTC) + connection_id = str(profile_data.get("id") or "unknown") + protocol = str(profile_data.get("protocol") or "unknown") + + try: + profile = validate_connection_profile(profile_data) + except ValidationError as exc: + return _result( + connection_id=connection_id, + protocol=protocol, + status="invalid", + checked_at=checked_at, + started=started, + message="Connection profile failed shared schema validation.", + diagnostics={ + "validation_errors": [ + { + "loc": list(error["loc"]), + "msg": str(error["msg"]), + "type": str(error["type"]), + } + for error in exc.errors() + ] + }, + ) + + if not profile.enabled: + return _result( + connection_id=profile.id, + protocol=profile.protocol, + status="disabled", + checked_at=checked_at, + started=started, + message="Connection profile is disabled; no protocol test was attempted.", + diagnostics={ + "read_only": True, + "attempted": False, + "reason": "disabled_profile", + }, + ) + + test_client = client or FakeProtocolTestClient() + try: + diagnostics = test_client.test(profile) + except (ConnectionError, TimeoutError, ValueError) as exc: + return _result( + connection_id=profile.id, + protocol=profile.protocol, + status="failed", + checked_at=checked_at, + started=started, + message=redact_sensitive_text(str(exc)), + diagnostics={ + "read_only": True, + "attempted": True, + "error_type": type(exc).__name__, + }, + ) + + return _result( + connection_id=profile.id, + protocol=profile.protocol, + status="healthy", + checked_at=checked_at, + started=started, + message="Read-only connection test completed.", + diagnostics={ + "attempted": True, + "endpoint_checked": True, + **redact_diagnostics(diagnostics), + }, + ) + + +def _result( + *, + connection_id: str, + protocol: str, + status: ConnectionTestStatus, + checked_at: datetime, + started: float, + message: str, + diagnostics: dict[str, Any], +) -> ConnectionTestResult: + return ConnectionTestResult( + connection_id=connection_id, + protocol=protocol, + status=status, + checked_at=checked_at, + duration_ms=round((perf_counter() - started) * 1000, 3), + message=redact_sensitive_text(message), + diagnostics=redact_diagnostics(diagnostics), + ) + + +def redact_diagnostics(value: Any) -> Any: + if isinstance(value, str): + return redact_sensitive_text(value) + if isinstance(value, list): + return [redact_diagnostics(item) for item in value] + if isinstance(value, dict): + redacted: dict[str, Any] = {} + for key, item in value.items(): + lowered = key.lower() + if any(term in lowered for term in ("password", "token", "private_key", "certificate")): + redacted[key] = "" + else: + redacted[key] = redact_diagnostics(item) + return redacted + return value + + +def redact_sensitive_text(value: str) -> str: + value = URL_CREDENTIAL_PATTERN.sub(r"\1@", value) + return SECRET_VALUE_PATTERN.sub( + lambda match: f"{match.group(1)}{match.group(2)}", + value, + ) + + +def _looks_unreachable(endpoint: str) -> bool: + lowered = endpoint.lower() + return any(token in lowered for token in ("unreachable", "offline", "timeout", "invalid-host")) diff --git a/services/api/factory_api/main.py b/services/api/factory_api/main.py index f66e82c..ae9da76 100644 --- a/services/api/factory_api/main.py +++ b/services/api/factory_api/main.py @@ -19,6 +19,7 @@ DuplicateConnectionProfileError, serialize_connection_profile_for_browser, ) +from factory_api.connection_tests import test_connection_profile_data from factory_api.domain import DomainData, build_demo_domain_data DEFAULT_CORS_ORIGINS = ( @@ -207,6 +208,16 @@ def delete_connection_profile(profile_id: str) -> None: f"Connection profile not found: {profile_id}", ) + @app.post("/connection-profiles/{profile_id}/test") + def test_connection_profile(profile_id: str) -> dict: + profile_data = connection_profile_store().get_profile_data(profile_id) + if profile_data is None: + raise_not_found( + "connection_profile_not_found", + f"Connection profile not found: {profile_id}", + ) + return test_connection_profile_data(profile_data).model_dump(mode="json") + @app.get("/events/{event_id}") def get_event(event_id: str) -> dict: event = event_store().get_event(event_id) diff --git a/services/api/tests/test_connection_profiles_api.py b/services/api/tests/test_connection_profiles_api.py index 05c58a2..3430ca4 100644 --- a/services/api/tests/test_connection_profiles_api.py +++ b/services/api/tests/test_connection_profiles_api.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import Any +import pytest from factory_api.main import create_app from fastapi.testclient import TestClient @@ -176,3 +177,121 @@ def test_connection_profile_create_does_not_start_ingestion(tmp_path: Path) -> N assert response.status_code == 201 assert not events_store_path.exists() + + +@pytest.mark.parametrize( + ("fixture_name", "expected_protocol", "expected_diagnostic_key"), + [ + ("opcua_connection_profile.json", "opcua", "node_count"), + ("mqtt_connection_profile.json", "mqtt", "topic_filter_count"), + ("bacnet_connection_profile.json", "bacnet", "object_count"), + ], +) +def test_connection_profile_test_endpoint_returns_read_only_success_for_each_protocol( + tmp_path: Path, + fixture_name: str, + expected_protocol: str, + expected_diagnostic_key: str, +) -> None: + client = client_for(tmp_path) + profile = load_profile(fixture_name) + profile["enabled"] = True + client.post("/connection-profiles", json=profile) + + response = client.post(f"/connection-profiles/{profile['id']}/test") + + assert response.status_code == 200 + result = response.json() + assert result["connection_id"] == profile["id"] + assert result["protocol"] == expected_protocol + assert result["status"] == "healthy" + assert result["duration_ms"] >= 0 + assert result["checked_at"] + assert result["message"] == "Read-only connection test completed." + assert result["diagnostics"]["attempted"] is True + assert result["diagnostics"]["read_only"] is True + assert expected_diagnostic_key in result["diagnostics"] + assert "write" not in json.dumps(result["diagnostics"]).lower() + assert "local/" not in json.dumps(result) + assert not (tmp_path / "events.jsonl").exists() + + +def test_connection_profile_test_endpoint_reports_unreachable_target_with_redacted_error( + tmp_path: Path, +) -> None: + client = client_for(tmp_path) + profile = load_profile("opcua_connection_profile.json") + profile["endpoint"] = "opc.tcp://operator:secret-password@unreachable.local:4840/ofi?token=abc" + client.post("/connection-profiles", json=profile) + + response = client.post(f"/connection-profiles/{profile['id']}/test") + + assert response.status_code == 200 + result = response.json() + assert result["status"] == "failed" + assert result["protocol"] == "opcua" + assert result["diagnostics"]["attempted"] is True + assert result["diagnostics"]["read_only"] is True + assert result["diagnostics"]["error_type"] == "ConnectionError" + assert "unreachable" in result["message"] + serialized = json.dumps(result) + assert "secret-password" not in serialized + assert "token=abc" not in serialized + assert "" in serialized + + +def test_connection_profile_test_endpoint_reports_invalid_stored_profile( + tmp_path: Path, +) -> None: + client = client_for(tmp_path) + store_path = tmp_path / "connection_profiles.json" + invalid_profile = load_profile("opcua_connection_profile.json") + invalid_profile["protocol"] = "mqtt" + store_path.write_text(json.dumps([invalid_profile]), encoding="utf-8") + + response = client.post(f"/connection-profiles/{invalid_profile['id']}/test") + + assert response.status_code == 200 + result = response.json() + assert result["connection_id"] == invalid_profile["id"] + assert result["protocol"] == "mqtt" + assert result["status"] == "invalid" + assert result["message"] == "Connection profile failed shared schema validation." + assert result["diagnostics"]["validation_errors"] + + +def test_connection_profile_test_endpoint_skips_disabled_profile(tmp_path: Path) -> None: + client = client_for(tmp_path) + profile = load_profile("bacnet_connection_profile.json") + profile["enabled"] = False + profile["health_state"] = "disabled" + client.post("/connection-profiles", json=profile) + + response = client.post(f"/connection-profiles/{profile['id']}/test") + + assert response.status_code == 200 + result = response.json() + assert result["status"] == "disabled" + assert result["protocol"] == "bacnet" + assert result["message"] == "Connection profile is disabled; no protocol test was attempted." + assert result["diagnostics"] == { + "read_only": True, + "attempted": False, + "reason": "disabled_profile", + } + + +def test_connection_profile_test_endpoint_returns_not_found_for_missing_profile( + tmp_path: Path, +) -> None: + client = client_for(tmp_path) + + response = client.post("/connection-profiles/missing-profile/test") + + assert response.status_code == 404 + assert response.json() == { + "error": { + "code": "connection_profile_not_found", + "message": "Connection profile not found: missing-profile", + } + }