Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions docs/DATA_CONTRACTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,74 @@ human-reviewed; high-impact action still requires the governed approval flow.
- Unknown event types must be rejected or sent to a dead-letter path.
- Schema changes require tests and documentation updates.

## Protocol Connection Profiles

Production-oriented protocol connectors use `ProtocolConnectionProfile` in:

```text
packages/factory-events/factory_events/connection_profiles.py
```

This contract defines connection profiles only. It does not store profiles,
open network connections, read tags, browse sources, or write to industrial
systems.

Common profile fields:

- `id`: stable profile identity.
- `name`: operator-facing profile name.
- `protocol`: one of `opcua`, `mqtt`, or `bacnet`.
- `enabled`: whether the profile may be used by later read-only connector work.
- `description`: optional human-readable context.
- `endpoint`: protocol endpoint or network address.
- `acquisition`: poll or subscription settings.
- `mapping_reference`: reference to the source-to-FactoryEvent mapping.
- `health_state`: one of `unknown`, `healthy`, `degraded`, `failed`, or
`disabled`.
- `created_at` and `updated_at`: timezone-aware UTC timestamps.

Protocol-specific config blocks:

- `opcua`: node IDs, namespace URIs, security mode/policy, auth secret
reference, client certificate reference, and trusted server certificate
reference.
- `mqtt`: client ID, topic filters, QoS, TLS flag, auth secret reference, CA
certificate reference, and client certificate reference.
- `bacnet`: device instance, object references, and optional network number.

Validation rules:

- Exactly one protocol-specific config block must be present.
- The config block must match `protocol`.
- MQTT profiles must use subscription acquisition.
- BACnet profiles must use polling acquisition.
- Poll acquisition requires `poll_interval_seconds`.
- Subscription acquisition requires `subscription_interval_seconds`.
- `updated_at` must be greater than or equal to `created_at`.
- Raw secret, token, private key, or certificate body fields are not part of the
schema and are rejected by strict validation.

Secret and certificate handling:

- Profiles contain only `SecretReference` and `CertificateReference` objects.
- References use stable names such as `local/mqtt/packaging/client-auth`.
- Raw passwords, tokens, private keys, and certificate PEM bodies must not be
committed to fixtures, docs, or browser-facing responses.
- Browser/API serialization should use `to_redacted_dict()`, which returns the
profile with references preserved and raw secret values absent.

Fixtures for all three supported protocols live in:

```text
packages/test-fixtures/valid-connection-profiles/
```

Run the contract tests with:

```bash
.venv/bin/python -m pytest packages/factory-events/tests/test_connection_profile_contracts.py
```

## Versioning

Use semantic-style schema versions:
Expand Down
58 changes: 58 additions & 0 deletions docs/LEARNING_LOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,64 @@ Use the ADR to implement the shared connection profile schema without adding
protocol adapter behavior, UI forms, or writeback controls before the safety
boundary is covered by tests.

## 2026-05-23 - Protocol connection profile schema

### What changed

Added the shared `ProtocolConnectionProfile` contract for OPC-UA, MQTT, and
BACnet connection definitions. The change includes protocol-specific config
blocks, acquisition settings, secret/certificate references, protocol fixtures,
and contract tests for valid profiles, invalid protocol combinations, and
redacted serialization.

### Why it matters

Connection management needs a stable contract before API storage, test
connection routes, adapters, or Workbench forms are built. This keeps later work
small and makes the safety boundary testable without opening real industrial
connections.

### How it works

A profile carries common metadata such as `id`, `name`, `protocol`, `enabled`,
`endpoint`, `mapping_reference`, `health_state`, and UTC timestamps. Exactly one
protocol block must match the selected protocol. Secret and certificate fields
store references only, and strict validation rejects raw credential-shaped
payloads.

### How to run it

Use the schema from the shared package:

```python
from factory_events import validate_connection_profile
```

Fixtures live under:

```text
packages/test-fixtures/valid-connection-profiles/
```

### How to test it

```bash
.venv/bin/python -m pytest packages/factory-events/tests/test_connection_profile_contracts.py
make test-contract
```

### Key files

- `packages/factory-events/factory_events/connection_profiles.py`
- `packages/factory-events/tests/test_connection_profile_contracts.py`
- `packages/test-fixtures/valid-connection-profiles/`
- `docs/DATA_CONTRACTS.md`

### What to learn next

Use this contract to implement the connection profile API and storage layer
without adding adapter polling, tag browsing, or industrial writeback behavior.

## 2026-05-23 - OPC UA demo ingestion worker

### What changed
Expand Down
6 changes: 6 additions & 0 deletions packages/factory-events/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,9 @@ backward-compatible name for existing service code.
The current MVP contract includes typed envelopes for process measurements,
quality measurements, asset status updates, batch lifecycle events, work order
lifecycle events, and governed recommendation proposals.

`ProtocolConnectionProfile` defines the shared read-only connection profile
contract for future OPC-UA, MQTT, and BACnet connector work. It captures common
profile metadata, acquisition settings, health state, mapping reference,
protocol-specific config blocks, and secret/certificate references without raw
credential values.
18 changes: 18 additions & 0 deletions packages/factory-events/factory_events/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
from factory_events.connection_profiles import (
AcquisitionSettings,
BacnetConnectionConfig,
CertificateReference,
MqttConnectionConfig,
OpcUaConnectionConfig,
ProtocolConnectionProfile,
SecretReference,
validate_connection_profile,
)
from factory_events.models import (
ApprovalDecisionPayload,
AssetEvent,
Expand Down Expand Up @@ -25,27 +35,35 @@
)

__all__ = [
"AcquisitionSettings",
"ApprovalDecisionPayload",
"AssetEvent",
"AssetEventPayload",
"AuditEventPayload",
"BacnetConnectionConfig",
"BatchEvent",
"BatchEventPayload",
"CertificateReference",
"EventContext",
"EventEnvelope",
"EventMetadata",
"EventSource",
"FactoryEvent",
"MqttConnectionConfig",
"OpcUaConnectionConfig",
"ProcessMeasurementPayload",
"ProcessSignalEvent",
"ProtocolConnectionProfile",
"QualityEvent",
"QualityMeasurementPayload",
"RecommendationEvent",
"RecommendationPayload",
"SecretReference",
"SentinelDetectionPayload",
"UnsupportedEventTypeError",
"WorkOrderEvent",
"WorkOrderEventPayload",
"payload_model_for_event_type",
"validate_connection_profile",
"validate_event",
]
134 changes: 134 additions & 0 deletions packages/factory-events/factory_events/connection_profiles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from __future__ import annotations

from datetime import UTC, datetime
from typing import Any, Literal

from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

Protocol = Literal["opcua", "mqtt", "bacnet"]
ConnectionHealthState = Literal["unknown", "healthy", "degraded", "failed", "disabled"]
AcquisitionMode = Literal["poll", "subscribe"]


class StrictModel(BaseModel):
model_config = ConfigDict(extra="forbid")


class SecretReference(StrictModel):
ref: str = Field(min_length=1)
purpose: Literal["username_password", "token", "client_key", "other"]


class CertificateReference(StrictModel):
ref: str = Field(min_length=1)
purpose: Literal["client_certificate", "trusted_server_certificate", "ca_certificate"]


class AcquisitionSettings(StrictModel):
mode: AcquisitionMode
poll_interval_seconds: float | None = Field(default=None, gt=0)
subscription_interval_seconds: float | None = Field(default=None, gt=0)

@model_validator(mode="after")
def validate_mode_settings(self) -> AcquisitionSettings:
if self.mode == "poll":
if self.poll_interval_seconds is None:
msg = "poll acquisition requires poll_interval_seconds"
raise ValueError(msg)
if self.subscription_interval_seconds is not None:
msg = "poll acquisition cannot define subscription_interval_seconds"
raise ValueError(msg)
if self.mode == "subscribe":
if self.subscription_interval_seconds is None:
msg = "subscribe acquisition requires subscription_interval_seconds"
raise ValueError(msg)
if self.poll_interval_seconds is not None:
msg = "subscribe acquisition cannot define poll_interval_seconds"
raise ValueError(msg)
return self


class OpcUaConnectionConfig(StrictModel):
node_ids: list[str] = Field(min_length=1)
namespace_uris: list[str] = Field(default_factory=list)
security_mode: Literal["none", "sign", "sign_and_encrypt"] = "none"
security_policy: str | None = None
auth_secret_ref: SecretReference | None = None
client_certificate_ref: CertificateReference | None = None
trusted_server_certificate_ref: CertificateReference | None = None


class MqttConnectionConfig(StrictModel):
client_id: str = Field(min_length=1)
topic_filters: list[str] = Field(min_length=1)
qos: Literal[0, 1, 2] = 0
use_tls: bool = True
auth_secret_ref: SecretReference | None = None
ca_certificate_ref: CertificateReference | None = None
client_certificate_ref: CertificateReference | None = None


class BacnetConnectionConfig(StrictModel):
device_instance: int = Field(ge=0, le=4_194_303)
object_references: list[str] = Field(min_length=1)
network_number: int | None = Field(default=None, ge=0, le=65_535)


class ProtocolConnectionProfile(StrictModel):
id: str = Field(min_length=1)
name: str = Field(min_length=1)
protocol: Protocol
enabled: bool
description: str | None = None
endpoint: str = Field(min_length=1)
acquisition: AcquisitionSettings
mapping_reference: str = Field(min_length=1)
health_state: ConnectionHealthState = "unknown"
created_at: datetime
updated_at: datetime
opcua: OpcUaConnectionConfig | None = None
mqtt: MqttConnectionConfig | None = None
bacnet: BacnetConnectionConfig | None = None

@field_validator("created_at", "updated_at")
@classmethod
def require_utc_timestamp(cls, value: datetime) -> datetime:
if value.tzinfo is None or value.utcoffset() != UTC.utcoffset(value):
msg = "connection profile timestamps must be timezone-aware UTC datetimes"
raise ValueError(msg)
return value

@model_validator(mode="after")
def validate_profile_rules(self) -> ProtocolConnectionProfile:
if self.updated_at < self.created_at:
msg = "updated_at must be greater than or equal to created_at"
raise ValueError(msg)

configured_protocols = [
protocol
for protocol, config in {
"opcua": self.opcua,
"mqtt": self.mqtt,
"bacnet": self.bacnet,
}.items()
if config is not None
]
if configured_protocols != [self.protocol]:
msg = "profile.protocol must match exactly one protocol config block"
raise ValueError(msg)

if self.protocol == "mqtt" and self.acquisition.mode != "subscribe":
msg = "mqtt profiles must use subscribe acquisition"
raise ValueError(msg)
if self.protocol == "bacnet" and self.acquisition.mode != "poll":
msg = "bacnet profiles must use poll acquisition"
raise ValueError(msg)
return self

def to_redacted_dict(self) -> dict[str, Any]:
"""Serialize the profile for APIs without raw credential or certificate values."""
return self.model_dump(mode="json", exclude_none=True)


def validate_connection_profile(data: dict[str, Any]) -> ProtocolConnectionProfile:
return ProtocolConnectionProfile.model_validate(data)
Loading
Loading