diff --git a/docs/DATA_CONTRACTS.md b/docs/DATA_CONTRACTS.md index 8ee2fc9..c1b7821 100644 --- a/docs/DATA_CONTRACTS.md +++ b/docs/DATA_CONTRACTS.md @@ -452,6 +452,74 @@ human-reviewed; high-impact action still requires the governed approval flow. - Unknown event types must be rejected or sent to a dead-letter path. - Schema changes require tests and documentation updates. +## Protocol Connection Profiles + +Production-oriented protocol connectors use `ProtocolConnectionProfile` in: + +```text +packages/factory-events/factory_events/connection_profiles.py +``` + +This contract defines connection profiles only. It does not store profiles, +open network connections, read tags, browse sources, or write to industrial +systems. + +Common profile fields: + +- `id`: stable profile identity. +- `name`: operator-facing profile name. +- `protocol`: one of `opcua`, `mqtt`, or `bacnet`. +- `enabled`: whether the profile may be used by later read-only connector work. +- `description`: optional human-readable context. +- `endpoint`: protocol endpoint or network address. +- `acquisition`: poll or subscription settings. +- `mapping_reference`: reference to the source-to-FactoryEvent mapping. +- `health_state`: one of `unknown`, `healthy`, `degraded`, `failed`, or + `disabled`. +- `created_at` and `updated_at`: timezone-aware UTC timestamps. + +Protocol-specific config blocks: + +- `opcua`: node IDs, namespace URIs, security mode/policy, auth secret + reference, client certificate reference, and trusted server certificate + reference. +- `mqtt`: client ID, topic filters, QoS, TLS flag, auth secret reference, CA + certificate reference, and client certificate reference. +- `bacnet`: device instance, object references, and optional network number. + +Validation rules: + +- Exactly one protocol-specific config block must be present. +- The config block must match `protocol`. +- MQTT profiles must use subscription acquisition. +- BACnet profiles must use polling acquisition. +- Poll acquisition requires `poll_interval_seconds`. +- Subscription acquisition requires `subscription_interval_seconds`. +- `updated_at` must be greater than or equal to `created_at`. +- Raw secret, token, private key, or certificate body fields are not part of the + schema and are rejected by strict validation. + +Secret and certificate handling: + +- Profiles contain only `SecretReference` and `CertificateReference` objects. +- References use stable names such as `local/mqtt/packaging/client-auth`. +- Raw passwords, tokens, private keys, and certificate PEM bodies must not be + committed to fixtures, docs, or browser-facing responses. +- Browser/API serialization should use `to_redacted_dict()`, which returns the + profile with references preserved and raw secret values absent. + +Fixtures for all three supported protocols live in: + +```text +packages/test-fixtures/valid-connection-profiles/ +``` + +Run the contract tests with: + +```bash +.venv/bin/python -m pytest packages/factory-events/tests/test_connection_profile_contracts.py +``` + ## Versioning Use semantic-style schema versions: diff --git a/docs/LEARNING_LOG.md b/docs/LEARNING_LOG.md index 4159010..b27554b 100644 --- a/docs/LEARNING_LOG.md +++ b/docs/LEARNING_LOG.md @@ -72,6 +72,64 @@ Use the ADR to implement the shared connection profile schema without adding protocol adapter behavior, UI forms, or writeback controls before the safety boundary is covered by tests. +## 2026-05-23 - Protocol connection profile schema + +### What changed + +Added the shared `ProtocolConnectionProfile` contract for OPC-UA, MQTT, and +BACnet connection definitions. The change includes protocol-specific config +blocks, acquisition settings, secret/certificate references, protocol fixtures, +and contract tests for valid profiles, invalid protocol combinations, and +redacted serialization. + +### Why it matters + +Connection management needs a stable contract before API storage, test +connection routes, adapters, or Workbench forms are built. This keeps later work +small and makes the safety boundary testable without opening real industrial +connections. + +### How it works + +A profile carries common metadata such as `id`, `name`, `protocol`, `enabled`, +`endpoint`, `mapping_reference`, `health_state`, and UTC timestamps. Exactly one +protocol block must match the selected protocol. Secret and certificate fields +store references only, and strict validation rejects raw credential-shaped +payloads. + +### How to run it + +Use the schema from the shared package: + +```python +from factory_events import validate_connection_profile +``` + +Fixtures live under: + +```text +packages/test-fixtures/valid-connection-profiles/ +``` + +### How to test it + +```bash +.venv/bin/python -m pytest packages/factory-events/tests/test_connection_profile_contracts.py +make test-contract +``` + +### Key files + +- `packages/factory-events/factory_events/connection_profiles.py` +- `packages/factory-events/tests/test_connection_profile_contracts.py` +- `packages/test-fixtures/valid-connection-profiles/` +- `docs/DATA_CONTRACTS.md` + +### What to learn next + +Use this contract to implement the connection profile API and storage layer +without adding adapter polling, tag browsing, or industrial writeback behavior. + ## 2026-05-23 - OPC UA demo ingestion worker ### What changed diff --git a/packages/factory-events/README.md b/packages/factory-events/README.md index 2bbe4df..fc886ab 100644 --- a/packages/factory-events/README.md +++ b/packages/factory-events/README.md @@ -14,3 +14,9 @@ backward-compatible name for existing service code. The current MVP contract includes typed envelopes for process measurements, quality measurements, asset status updates, batch lifecycle events, work order lifecycle events, and governed recommendation proposals. + +`ProtocolConnectionProfile` defines the shared read-only connection profile +contract for future OPC-UA, MQTT, and BACnet connector work. It captures common +profile metadata, acquisition settings, health state, mapping reference, +protocol-specific config blocks, and secret/certificate references without raw +credential values. diff --git a/packages/factory-events/factory_events/__init__.py b/packages/factory-events/factory_events/__init__.py index d2a1107..960a4a1 100644 --- a/packages/factory-events/factory_events/__init__.py +++ b/packages/factory-events/factory_events/__init__.py @@ -1,3 +1,13 @@ +from factory_events.connection_profiles import ( + AcquisitionSettings, + BacnetConnectionConfig, + CertificateReference, + MqttConnectionConfig, + OpcUaConnectionConfig, + ProtocolConnectionProfile, + SecretReference, + validate_connection_profile, +) from factory_events.models import ( ApprovalDecisionPayload, AssetEvent, @@ -25,27 +35,35 @@ ) __all__ = [ + "AcquisitionSettings", "ApprovalDecisionPayload", "AssetEvent", "AssetEventPayload", "AuditEventPayload", + "BacnetConnectionConfig", "BatchEvent", "BatchEventPayload", + "CertificateReference", "EventContext", "EventEnvelope", "EventMetadata", "EventSource", "FactoryEvent", + "MqttConnectionConfig", + "OpcUaConnectionConfig", "ProcessMeasurementPayload", "ProcessSignalEvent", + "ProtocolConnectionProfile", "QualityEvent", "QualityMeasurementPayload", "RecommendationEvent", "RecommendationPayload", + "SecretReference", "SentinelDetectionPayload", "UnsupportedEventTypeError", "WorkOrderEvent", "WorkOrderEventPayload", "payload_model_for_event_type", + "validate_connection_profile", "validate_event", ] diff --git a/packages/factory-events/factory_events/connection_profiles.py b/packages/factory-events/factory_events/connection_profiles.py new file mode 100644 index 0000000..5e40c33 --- /dev/null +++ b/packages/factory-events/factory_events/connection_profiles.py @@ -0,0 +1,134 @@ +from __future__ import annotations + +from datetime import UTC, datetime +from typing import Any, Literal + +from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator + +Protocol = Literal["opcua", "mqtt", "bacnet"] +ConnectionHealthState = Literal["unknown", "healthy", "degraded", "failed", "disabled"] +AcquisitionMode = Literal["poll", "subscribe"] + + +class StrictModel(BaseModel): + model_config = ConfigDict(extra="forbid") + + +class SecretReference(StrictModel): + ref: str = Field(min_length=1) + purpose: Literal["username_password", "token", "client_key", "other"] + + +class CertificateReference(StrictModel): + ref: str = Field(min_length=1) + purpose: Literal["client_certificate", "trusted_server_certificate", "ca_certificate"] + + +class AcquisitionSettings(StrictModel): + mode: AcquisitionMode + poll_interval_seconds: float | None = Field(default=None, gt=0) + subscription_interval_seconds: float | None = Field(default=None, gt=0) + + @model_validator(mode="after") + def validate_mode_settings(self) -> AcquisitionSettings: + if self.mode == "poll": + if self.poll_interval_seconds is None: + msg = "poll acquisition requires poll_interval_seconds" + raise ValueError(msg) + if self.subscription_interval_seconds is not None: + msg = "poll acquisition cannot define subscription_interval_seconds" + raise ValueError(msg) + if self.mode == "subscribe": + if self.subscription_interval_seconds is None: + msg = "subscribe acquisition requires subscription_interval_seconds" + raise ValueError(msg) + if self.poll_interval_seconds is not None: + msg = "subscribe acquisition cannot define poll_interval_seconds" + raise ValueError(msg) + return self + + +class OpcUaConnectionConfig(StrictModel): + node_ids: list[str] = Field(min_length=1) + namespace_uris: list[str] = Field(default_factory=list) + security_mode: Literal["none", "sign", "sign_and_encrypt"] = "none" + security_policy: str | None = None + auth_secret_ref: SecretReference | None = None + client_certificate_ref: CertificateReference | None = None + trusted_server_certificate_ref: CertificateReference | None = None + + +class MqttConnectionConfig(StrictModel): + client_id: str = Field(min_length=1) + topic_filters: list[str] = Field(min_length=1) + qos: Literal[0, 1, 2] = 0 + use_tls: bool = True + auth_secret_ref: SecretReference | None = None + ca_certificate_ref: CertificateReference | None = None + client_certificate_ref: CertificateReference | None = None + + +class BacnetConnectionConfig(StrictModel): + device_instance: int = Field(ge=0, le=4_194_303) + object_references: list[str] = Field(min_length=1) + network_number: int | None = Field(default=None, ge=0, le=65_535) + + +class ProtocolConnectionProfile(StrictModel): + id: str = Field(min_length=1) + name: str = Field(min_length=1) + protocol: Protocol + enabled: bool + description: str | None = None + endpoint: str = Field(min_length=1) + acquisition: AcquisitionSettings + mapping_reference: str = Field(min_length=1) + health_state: ConnectionHealthState = "unknown" + created_at: datetime + updated_at: datetime + opcua: OpcUaConnectionConfig | None = None + mqtt: MqttConnectionConfig | None = None + bacnet: BacnetConnectionConfig | None = None + + @field_validator("created_at", "updated_at") + @classmethod + def require_utc_timestamp(cls, value: datetime) -> datetime: + if value.tzinfo is None or value.utcoffset() != UTC.utcoffset(value): + msg = "connection profile timestamps must be timezone-aware UTC datetimes" + raise ValueError(msg) + return value + + @model_validator(mode="after") + def validate_profile_rules(self) -> ProtocolConnectionProfile: + if self.updated_at < self.created_at: + msg = "updated_at must be greater than or equal to created_at" + raise ValueError(msg) + + configured_protocols = [ + protocol + for protocol, config in { + "opcua": self.opcua, + "mqtt": self.mqtt, + "bacnet": self.bacnet, + }.items() + if config is not None + ] + if configured_protocols != [self.protocol]: + msg = "profile.protocol must match exactly one protocol config block" + raise ValueError(msg) + + if self.protocol == "mqtt" and self.acquisition.mode != "subscribe": + msg = "mqtt profiles must use subscribe acquisition" + raise ValueError(msg) + if self.protocol == "bacnet" and self.acquisition.mode != "poll": + msg = "bacnet profiles must use poll acquisition" + raise ValueError(msg) + return self + + def to_redacted_dict(self) -> dict[str, Any]: + """Serialize the profile for APIs without raw credential or certificate values.""" + return self.model_dump(mode="json", exclude_none=True) + + +def validate_connection_profile(data: dict[str, Any]) -> ProtocolConnectionProfile: + return ProtocolConnectionProfile.model_validate(data) diff --git a/packages/factory-events/tests/test_connection_profile_contracts.py b/packages/factory-events/tests/test_connection_profile_contracts.py new file mode 100644 index 0000000..14a330a --- /dev/null +++ b/packages/factory-events/tests/test_connection_profile_contracts.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import pytest +from factory_events import validate_connection_profile +from pydantic import ValidationError + +REPO_ROOT = Path(__file__).resolve().parents[3] +FIXTURES = REPO_ROOT / "packages" / "test-fixtures" / "valid-connection-profiles" + +VALID_PROFILE_FIXTURES = [ + ("opcua_connection_profile.json", "opcua"), + ("mqtt_connection_profile.json", "mqtt"), + ("bacnet_connection_profile.json", "bacnet"), +] + + +def load_fixture(name: str) -> dict[str, Any]: + return json.loads((FIXTURES / name).read_text()) + + +@pytest.mark.parametrize(("fixture_name", "expected_protocol"), VALID_PROFILE_FIXTURES) +def test_valid_connection_profile_fixtures_validate( + fixture_name: str, + expected_protocol: str, +) -> None: + profile = validate_connection_profile(load_fixture(fixture_name)) + + assert profile.protocol == expected_protocol + assert profile.id + assert profile.name + assert profile.endpoint + assert profile.mapping_reference + assert profile.created_at <= profile.updated_at + + +def test_opcua_profile_defines_read_settings_and_secret_references() -> None: + profile = validate_connection_profile(load_fixture("opcua_connection_profile.json")) + + assert profile.protocol == "opcua" + assert profile.acquisition.mode == "poll" + assert profile.acquisition.poll_interval_seconds == 5 + assert profile.opcua is not None + assert profile.opcua.node_ids == [ + "ns=2;s=Packaging.Line1.Filler1.FillWeight", + "ns=2;s=Packaging.Line1.Filler1.Temperature", + ] + assert profile.opcua.auth_secret_ref is not None + assert profile.opcua.auth_secret_ref.ref == "local/opcua/filler-1/user" + assert profile.opcua.client_certificate_ref is not None + + +def test_mqtt_profile_requires_subscription_settings() -> None: + profile = validate_connection_profile(load_fixture("mqtt_connection_profile.json")) + + assert profile.protocol == "mqtt" + assert profile.acquisition.mode == "subscribe" + assert profile.acquisition.subscription_interval_seconds == 1 + assert profile.mqtt is not None + assert profile.mqtt.topic_filters == [ + "ofi/site_demo/packaging/line_1/+/telemetry", + "ofi/site_demo/packaging/line_1/+/quality", + ] + + +def test_bacnet_profile_defines_poll_settings_and_object_references() -> None: + profile = validate_connection_profile(load_fixture("bacnet_connection_profile.json")) + + assert profile.protocol == "bacnet" + assert profile.enabled is False + assert profile.health_state == "disabled" + assert profile.acquisition.mode == "poll" + assert profile.bacnet is not None + assert profile.bacnet.device_instance == 12001 + assert profile.bacnet.object_references == [ + "analogInput:1.presentValue", + "analogInput:2.presentValue", + ] + + +def test_profile_rejects_protocol_config_mismatch() -> None: + profile = load_fixture("opcua_connection_profile.json") + profile["protocol"] = "mqtt" + + with pytest.raises(ValidationError, match="profile.protocol must match exactly one"): + validate_connection_profile(profile) + + +def test_profile_rejects_mqtt_polling_combination() -> None: + profile = load_fixture("mqtt_connection_profile.json") + profile["acquisition"] = {"mode": "poll", "poll_interval_seconds": 15} + + with pytest.raises(ValidationError, match="mqtt profiles must use subscribe acquisition"): + validate_connection_profile(profile) + + +def test_profile_rejects_raw_secret_values() -> None: + profile = load_fixture("opcua_connection_profile.json") + profile["opcua"]["auth_secret_ref"]["password"] = "not-allowed" + + with pytest.raises(ValidationError) as error: + validate_connection_profile(profile) + + errors = error.value.errors() + assert any(item["loc"] == ("opcua", "auth_secret_ref", "password") for item in errors) + + +def test_redacted_serialization_returns_references_without_raw_secret_fields() -> None: + profile = validate_connection_profile(load_fixture("mqtt_connection_profile.json")) + + redacted = profile.to_redacted_dict() + + assert redacted["mqtt"]["auth_secret_ref"] == { + "ref": "local/mqtt/packaging/client-auth", + "purpose": "username_password", + } + serialized = json.dumps(redacted) + assert "password" not in serialized.lower().replace("username_password", "") + assert "private_key" not in serialized.lower() + assert "certificate_pem" not in serialized.lower() diff --git a/packages/test-fixtures/valid-connection-profiles/bacnet_connection_profile.json b/packages/test-fixtures/valid-connection-profiles/bacnet_connection_profile.json new file mode 100644 index 0000000..36bc9c4 --- /dev/null +++ b/packages/test-fixtures/valid-connection-profiles/bacnet_connection_profile.json @@ -0,0 +1,24 @@ +{ + "id": "conn_bacnet_hvac_line_1", + "name": "Line 1 HVAC BACnet", + "protocol": "bacnet", + "enabled": false, + "description": "Read-only BACnet profile for environmental process context.", + "endpoint": "192.0.2.20:47808", + "acquisition": { + "mode": "poll", + "poll_interval_seconds": 30 + }, + "mapping_reference": "mappings/packaging/bacnet/environment-v1", + "health_state": "disabled", + "created_at": "2026-05-23T12:00:00Z", + "updated_at": "2026-05-23T12:00:00Z", + "bacnet": { + "device_instance": 12001, + "network_number": 10, + "object_references": [ + "analogInput:1.presentValue", + "analogInput:2.presentValue" + ] + } +} diff --git a/packages/test-fixtures/valid-connection-profiles/mqtt_connection_profile.json b/packages/test-fixtures/valid-connection-profiles/mqtt_connection_profile.json new file mode 100644 index 0000000..6f7061d --- /dev/null +++ b/packages/test-fixtures/valid-connection-profiles/mqtt_connection_profile.json @@ -0,0 +1,37 @@ +{ + "id": "conn_mqtt_packaging_uns", + "name": "Packaging UNS MQTT", + "protocol": "mqtt", + "enabled": true, + "description": "Read-only MQTT subscription for packaging telemetry topics.", + "endpoint": "mqtts://broker.local:8883", + "acquisition": { + "mode": "subscribe", + "subscription_interval_seconds": 1 + }, + "mapping_reference": "mappings/packaging/mqtt/uns-v1", + "health_state": "unknown", + "created_at": "2026-05-23T12:00:00Z", + "updated_at": "2026-05-23T12:00:00Z", + "mqtt": { + "client_id": "ofi-fip-packaging-readonly", + "topic_filters": [ + "ofi/site_demo/packaging/line_1/+/telemetry", + "ofi/site_demo/packaging/line_1/+/quality" + ], + "qos": 1, + "use_tls": true, + "auth_secret_ref": { + "ref": "local/mqtt/packaging/client-auth", + "purpose": "username_password" + }, + "ca_certificate_ref": { + "ref": "local/mqtt/packaging/ca-cert", + "purpose": "ca_certificate" + }, + "client_certificate_ref": { + "ref": "local/mqtt/packaging/client-cert", + "purpose": "client_certificate" + } + } +} diff --git a/packages/test-fixtures/valid-connection-profiles/opcua_connection_profile.json b/packages/test-fixtures/valid-connection-profiles/opcua_connection_profile.json new file mode 100644 index 0000000..fd5bf4e --- /dev/null +++ b/packages/test-fixtures/valid-connection-profiles/opcua_connection_profile.json @@ -0,0 +1,39 @@ +{ + "id": "conn_opcua_filler_1", + "name": "Filler 1 OPC-UA", + "protocol": "opcua", + "enabled": true, + "description": "Read-only OPC-UA profile for filler process measurements.", + "endpoint": "opc.tcp://127.0.0.1:4840/ofi/demo", + "acquisition": { + "mode": "poll", + "poll_interval_seconds": 5 + }, + "mapping_reference": "mappings/filler_1/opcua/process-signals-v1", + "health_state": "unknown", + "created_at": "2026-05-23T12:00:00Z", + "updated_at": "2026-05-23T12:00:00Z", + "opcua": { + "node_ids": [ + "ns=2;s=Packaging.Line1.Filler1.FillWeight", + "ns=2;s=Packaging.Line1.Filler1.Temperature" + ], + "namespace_uris": [ + "urn:ofi:demo:packaging" + ], + "security_mode": "sign_and_encrypt", + "security_policy": "Basic256Sha256", + "auth_secret_ref": { + "ref": "local/opcua/filler-1/user", + "purpose": "username_password" + }, + "client_certificate_ref": { + "ref": "local/opcua/filler-1/client-cert", + "purpose": "client_certificate" + }, + "trusted_server_certificate_ref": { + "ref": "local/opcua/filler-1/server-cert", + "purpose": "trusted_server_certificate" + } + } +}