Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/DATA_CONTRACTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -528,13 +528,21 @@ contract:
- `GET /connection-profiles/{profile_id}`
- `PUT /connection-profiles/{profile_id}`
- `POST /connection-profiles/{profile_id}/disable`
- `POST /connection-profiles/{profile_id}/test`
- `DELETE /connection-profiles/{profile_id}`

API responses are browser-facing and redact secret/certificate reference names.
The local JSON store keeps the configured references so backend-only follow-up
work can test connections without asking the browser to hold secrets or
certificate locations.

The test endpoint returns a structured diagnostic result with `connection_id`,
`protocol`, `status`, `checked_at`, `duration_ms`, `message`, and
protocol-specific `diagnostics`. Current tests use fake protocol clients for
deterministic read-only behavior. Results must not include raw credentials,
tokens, private keys, certificates, or configured secret/certificate reference
names.

## Versioning

Use semantic-style schema versions:
Expand Down
54 changes: 54 additions & 0 deletions docs/LEARNING_LOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,60 @@ Use the stored profile contract to build the read-only test-connection API.
Keep actual protocol polling, subscription workers, and tag browsing separate
until the test-connection boundary is implemented and tested.

## 2026-05-23 - Read-only connection test API

### What changed

Added `POST /connection-profiles/{profile_id}/test` for read-only connection
diagnostics over stored OPC-UA, MQTT, and BACnet profiles. The endpoint returns
status, timing, readable messages, and protocol-specific diagnostic details.

### Why it matters

Operators need to verify a configured source before any adapter or ingestion
worker is introduced. This step proves the backend contract and response shape
without opening live protocol sessions, publishing messages, writing tags, or
starting ingestion.

### How it works

The endpoint reads the stored profile data, validates it with the shared schema,
skips disabled profiles, and uses deterministic fake protocol clients for
success and unreachable-target behavior. Results are redacted so credentials,
tokens, private keys, certificate bodies, and configured reference names are not
returned to the browser.

### How to run it

```bash
make api
```

Create a profile with `POST /connection-profiles`, then call:

```text
POST /connection-profiles/{profile_id}/test
```

### How to test it

```bash
.venv/bin/python -m pytest services/api/tests/test_connection_profiles_api.py
make test-integration
```

### Key files

- `services/api/factory_api/connection_tests.py`
- `services/api/factory_api/connection_profiles.py`
- `services/api/factory_api/main.py`
- `services/api/tests/test_connection_profiles_api.py`

### What to learn next

Use this diagnostic response shape when building protocol diagnostics UI. Keep
live adapter implementations and tag/source browsing in their own scoped issues.

## 2026-05-23 - OPC UA demo ingestion worker

### What changed
Expand Down
19 changes: 16 additions & 3 deletions services/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,29 @@ Endpoints:
- `GET /connection-profiles/{profile_id}` - read one browser-facing profile.
- `PUT /connection-profiles/{profile_id}` - replace an existing profile.
- `POST /connection-profiles/{profile_id}/disable` - mark a profile disabled.
- `POST /connection-profiles/{profile_id}/test` - run a deterministic
read-only connection diagnostic.
- `DELETE /connection-profiles/{profile_id}` - remove a local profile.

Requests are validated with the shared `ProtocolConnectionProfile` schema from
`packages/factory-events`. Browser-facing responses redact secret and
certificate reference names. They preserve whether a reference is configured and
its purpose, but they do not return the configured reference string.

Creating, updating, disabling, or deleting a profile does not start protocol
ingestion. Read-only test connection behavior and adapter polling are tracked as
separate follow-up work.
Creating, updating, disabling, deleting, or testing a profile does not start
protocol ingestion.

The test endpoint currently uses fake protocol clients so API behavior is
deterministic before live adapter work lands. It validates the stored profile,
skips disabled profiles, simulates unreachable endpoints for readable failure
results, and returns protocol-specific diagnostics without raw credentials,
tokens, private keys, certificates, or configured secret/certificate reference
names. The diagnostic is limited to read-only operations such as connecting,
reading configured OPC-UA nodes, subscribing to configured MQTT topic filters,
or reading configured BACnet objects.

Adapter polling, live protocol sessions, tag browsing, and ingestion workers are
tracked as separate follow-up work.

## Governed Recommendation Audit Reads

Expand Down
19 changes: 16 additions & 3 deletions services/api/factory_api/connection_profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,29 @@ def __init__(self, path: Path) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)

def list_profiles(self) -> list[ProtocolConnectionProfile]:
if not self.path.exists():
return []
return [
validate_connection_profile(item)
for item in json.loads(self.path.read_text(encoding="utf-8"))
for item in self.list_profile_data()
]

def list_profile_data(self) -> list[dict[str, Any]]:
if not self.path.exists():
return []
return json.loads(self.path.read_text(encoding="utf-8"))

def get_profile(self, profile_id: str) -> ProtocolConnectionProfile | None:
return next((profile for profile in self.list_profiles() if profile.id == profile_id), None)

def get_profile_data(self, profile_id: str) -> dict[str, Any] | None:
return next(
(
profile
for profile in self.list_profile_data()
if isinstance(profile, dict) and profile.get("id") == profile_id
),
None,
)

def create_profile(self, profile: ProtocolConnectionProfile) -> ProtocolConnectionProfile:
stored_profiles = self.list_profiles()
if any(item.id == profile.id for item in stored_profiles):
Expand Down
210 changes: 210 additions & 0 deletions services/api/factory_api/connection_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
from __future__ import annotations

import re
from datetime import UTC, datetime
from time import perf_counter
from typing import Any, Literal, Protocol

from factory_events import ProtocolConnectionProfile, validate_connection_profile
from pydantic import BaseModel, ConfigDict, Field, ValidationError

ConnectionTestStatus = Literal["healthy", "failed", "disabled", "invalid"]

SECRET_VALUE_PATTERN = re.compile(
r"(?i)(password|token|private[_-]?key|certificate|cert)([=:])([^@\s,;]+)"
)
URL_CREDENTIAL_PATTERN = re.compile(r"(://)([^/@:]+):([^/@]+)@")


class StrictModel(BaseModel):
model_config = ConfigDict(extra="forbid")


class ConnectionTestResult(StrictModel):
connection_id: str = Field(min_length=1)
protocol: str = Field(min_length=1)
status: ConnectionTestStatus
checked_at: datetime
duration_ms: float = Field(ge=0)
message: str = Field(min_length=1)
diagnostics: dict[str, Any] = Field(default_factory=dict)


class ProtocolTestClient(Protocol):
def test(self, profile: ProtocolConnectionProfile) -> dict[str, Any]:
...


class FakeProtocolTestClient:
def test(self, profile: ProtocolConnectionProfile) -> dict[str, Any]:
if _looks_unreachable(profile.endpoint):
msg = f"{profile.protocol} endpoint is unreachable: {profile.endpoint}"
raise ConnectionError(redact_sensitive_text(msg))

if profile.protocol == "opcua":
if profile.opcua is None:
msg = "OPC-UA profile is missing its config block"
raise ValueError(msg)
return {
"read_only": True,
"allowed_operations": ["connect", "read_configured_nodes"],
"node_count": len(profile.opcua.node_ids),
"security_mode": profile.opcua.security_mode,
"security_policy_configured": profile.opcua.security_policy is not None,
}
if profile.protocol == "mqtt":
if profile.mqtt is None:
msg = "MQTT profile is missing its config block"
raise ValueError(msg)
return {
"read_only": True,
"allowed_operations": ["connect", "subscribe_configured_topics"],
"topic_filter_count": len(profile.mqtt.topic_filters),
"qos": profile.mqtt.qos,
"tls_enabled": profile.mqtt.use_tls,
}
if profile.protocol == "bacnet":
if profile.bacnet is None:
msg = "BACnet profile is missing its config block"
raise ValueError(msg)
return {
"read_only": True,
"allowed_operations": ["connect", "read_configured_objects"],
"device_instance": profile.bacnet.device_instance,
"object_count": len(profile.bacnet.object_references),
"network_number_configured": profile.bacnet.network_number is not None,
}

msg = f"Unsupported protocol: {profile.protocol}"
raise ValueError(msg)


def test_connection_profile_data(
profile_data: dict[str, Any],
*,
client: ProtocolTestClient | None = None,
) -> ConnectionTestResult:
started = perf_counter()
checked_at = datetime.now(UTC)
connection_id = str(profile_data.get("id") or "unknown")
protocol = str(profile_data.get("protocol") or "unknown")

try:
profile = validate_connection_profile(profile_data)
except ValidationError as exc:
return _result(
connection_id=connection_id,
protocol=protocol,
status="invalid",
checked_at=checked_at,
started=started,
message="Connection profile failed shared schema validation.",
diagnostics={
"validation_errors": [
{
"loc": list(error["loc"]),
"msg": str(error["msg"]),
"type": str(error["type"]),
}
for error in exc.errors()
]
},
)

if not profile.enabled:
return _result(
connection_id=profile.id,
protocol=profile.protocol,
status="disabled",
checked_at=checked_at,
started=started,
message="Connection profile is disabled; no protocol test was attempted.",
diagnostics={
"read_only": True,
"attempted": False,
"reason": "disabled_profile",
},
)

test_client = client or FakeProtocolTestClient()
try:
diagnostics = test_client.test(profile)
except (ConnectionError, TimeoutError, ValueError) as exc:
return _result(
connection_id=profile.id,
protocol=profile.protocol,
status="failed",
checked_at=checked_at,
started=started,
message=redact_sensitive_text(str(exc)),
diagnostics={
"read_only": True,
"attempted": True,
"error_type": type(exc).__name__,
},
)

return _result(
connection_id=profile.id,
protocol=profile.protocol,
status="healthy",
checked_at=checked_at,
started=started,
message="Read-only connection test completed.",
diagnostics={
"attempted": True,
"endpoint_checked": True,
**redact_diagnostics(diagnostics),
},
)


def _result(
*,
connection_id: str,
protocol: str,
status: ConnectionTestStatus,
checked_at: datetime,
started: float,
message: str,
diagnostics: dict[str, Any],
) -> ConnectionTestResult:
return ConnectionTestResult(
connection_id=connection_id,
protocol=protocol,
status=status,
checked_at=checked_at,
duration_ms=round((perf_counter() - started) * 1000, 3),
message=redact_sensitive_text(message),
diagnostics=redact_diagnostics(diagnostics),
)


def redact_diagnostics(value: Any) -> Any:
if isinstance(value, str):
return redact_sensitive_text(value)
if isinstance(value, list):
return [redact_diagnostics(item) for item in value]
if isinstance(value, dict):
redacted: dict[str, Any] = {}
for key, item in value.items():
lowered = key.lower()
if any(term in lowered for term in ("password", "token", "private_key", "certificate")):
redacted[key] = "<redacted>"
else:
redacted[key] = redact_diagnostics(item)
return redacted
return value


def redact_sensitive_text(value: str) -> str:
value = URL_CREDENTIAL_PATTERN.sub(r"\1<redacted>@", value)
return SECRET_VALUE_PATTERN.sub(
lambda match: f"{match.group(1)}{match.group(2)}<redacted>",
value,
)


def _looks_unreachable(endpoint: str) -> bool:
lowered = endpoint.lower()
return any(token in lowered for token in ("unreachable", "offline", "timeout", "invalid-host"))
11 changes: 11 additions & 0 deletions services/api/factory_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
DuplicateConnectionProfileError,
serialize_connection_profile_for_browser,
)
from factory_api.connection_tests import test_connection_profile_data
from factory_api.domain import DomainData, build_demo_domain_data

DEFAULT_CORS_ORIGINS = (
Expand Down Expand Up @@ -207,6 +208,16 @@ def delete_connection_profile(profile_id: str) -> None:
f"Connection profile not found: {profile_id}",
)

@app.post("/connection-profiles/{profile_id}/test")
def test_connection_profile(profile_id: str) -> dict:
profile_data = connection_profile_store().get_profile_data(profile_id)
if profile_data is None:
raise_not_found(
"connection_profile_not_found",
f"Connection profile not found: {profile_id}",
)
return test_connection_profile_data(profile_data).model_dump(mode="json")

@app.get("/events/{event_id}")
def get_event(event_id: str) -> dict:
event = event_store().get_event(event_id)
Expand Down
Loading
Loading