From 6f0d5c1d338c4ca368e3bc1757713c44056bfc86 Mon Sep 17 00:00:00 2001 From: Aldon Smith Date: Fri, 22 May 2026 21:51:44 -0400 Subject: [PATCH] feat: add OPC UA demo ingestion worker --- Makefile | 10 +- docs/LEARNING_LOG.md | 46 +++ docs/demo/MANUFACTURER_DEMO_RUNBOOK.md | 3 + services/ingestion/README.md | 77 ++++ .../factory_ingestion/opcua_demo_worker.py | 341 ++++++++++++++++++ .../ingestion/tests/test_opcua_demo_worker.py | 188 ++++++++++ 6 files changed, 664 insertions(+), 1 deletion(-) create mode 100644 services/ingestion/factory_ingestion/opcua_demo_worker.py create mode 100644 services/ingestion/tests/test_opcua_demo_worker.py diff --git a/Makefile b/Makefile index 205de71..3ecf3f1 100644 --- a/Makefile +++ b/Makefile @@ -18,10 +18,14 @@ DEMO_OUTPUT ?= $(DEMO_EVENTS_DIR)/$(DEMO_SCENARIO).jsonl DEMO_EVENTS_STORE ?= $(DEMO_STORAGE_DIR)/$(DEMO_SCENARIO)_events.jsonl DEMO_DEAD_LETTER ?= $(DEMO_STORAGE_DIR)/$(DEMO_SCENARIO)_dead_letter.jsonl DEMO_SENTINEL_STATE_DIR ?= $(DEMO_STORAGE_DIR)/$(DEMO_SCENARIO)_sentinel +OPCUA_ENDPOINT ?= opc.tcp://localhost:4840/ofi/demo +OPCUA_POLL_COUNT ?= 6 +OPCUA_POLL_INTERVAL ?= 1 +OPCUA_EVENTS_STORE ?= .local/storage/opcua_demo_events.jsonl PYTHONPATH := packages/factory-events:services/simulator:services/ingestion:services/process-sentinel:services/api export PYTHONPATH -.PHONY: help setup dev dev-db simulate ingest sentinel-run demo demo-reset demo-data demo-ingest demo-sentinel-run demo-api-smoke api api-reload test test-unit test-integration test-contract test-e2e lint typecheck docs +.PHONY: help setup dev dev-db simulate ingest opcua-demo-ingest sentinel-run demo demo-reset demo-data demo-ingest demo-sentinel-run demo-api-smoke api api-reload test test-unit test-integration test-contract test-e2e lint typecheck docs help: @echo "Factory Intelligence Platform" @@ -31,6 +35,7 @@ help: @echo " make dev-db Start local PostgreSQL with Docker Compose" @echo " make simulate Generate simulator JSONL events" @echo " make ingest Validate and ingest simulator events" + @echo " make opcua-demo-ingest Poll the local demo OPC UA server into FactoryEvents" @echo " make sentinel-run Run Process Sentinel over ingested events" @echo " make demo Prepare and verify deterministic demo state" @echo " make demo-reset Clear generated local demo files" @@ -71,6 +76,9 @@ simulate: ingest: $(PYTHON) -m factory_ingestion.cli --input $(INPUT) --events-store $(EVENTS_STORE) +opcua-demo-ingest: + $(PYTHON) -m factory_ingestion.opcua_demo_worker --endpoint $(OPCUA_ENDPOINT) --events-store $(OPCUA_EVENTS_STORE) --poll-count $(OPCUA_POLL_COUNT) --poll-interval $(OPCUA_POLL_INTERVAL) + sentinel-run: $(PYTHON) -m process_sentinel.cli --events-store $(EVENTS_STORE) --state-dir $(SENTINEL_STATE_DIR) diff --git a/docs/LEARNING_LOG.md b/docs/LEARNING_LOG.md index fbea380..be3cc84 100644 --- a/docs/LEARNING_LOG.md +++ b/docs/LEARNING_LOG.md @@ -22,6 +22,52 @@ This file should be updated by Codex after each meaningful change. ### What to learn next ``` +## 2026-05-23 - OPC UA demo ingestion worker + +### What changed + +Added a demo-only OPC UA ingestion worker that polls the local OPC UA simulator +and writes normalized FactoryEvents to a JSONL event store. + +### Why it matters + +The demo OPC UA server can now feed the existing ingestion and Process Sentinel +path without adding a production OPC UA connector or browse-all-tags framework. + +### How it works + +The worker reads the configured demo namespace tags, builds work-order, batch, +process measurement, and quality measurement events, validates them through the +shared FactoryEvent models, and stores them with the existing `JsonlEventStore`. +The default six one-second polls are sized for local demo use without flooding +storage. + +### How to run it + +```bash +docker compose -f infra/docker/docker-compose.yml up --build opcua-simulator +make opcua-demo-ingest +``` + +### How to test it + +```bash +.venv/bin/python -m pytest services/ingestion/tests/test_opcua_demo_worker.py +``` + +### Key files + +- `services/ingestion/factory_ingestion/opcua_demo_worker.py` +- `services/ingestion/tests/test_opcua_demo_worker.py` +- `services/ingestion/README.md` +- `Makefile` + +### What to learn next + +Use the worker output as the source for a future demo smoke test that runs the +OPC UA simulator, polls values, and then runs Process Sentinel over the accepted +event store. + ## 2026-05-22 - OPC UA demo drift controls ### What changed diff --git a/docs/demo/MANUFACTURER_DEMO_RUNBOOK.md b/docs/demo/MANUFACTURER_DEMO_RUNBOOK.md index 8dba71f..32e81dc 100644 --- a/docs/demo/MANUFACTURER_DEMO_RUNBOOK.md +++ b/docs/demo/MANUFACTURER_DEMO_RUNBOOK.md @@ -315,6 +315,9 @@ tags to FactoryEvent process and quality events, see For the Dockerized OPC UA demo simulator endpoint, startup command, and tag list, see `services/simulator/README.md`. +For the demo OPC UA ingestion worker command, output path, polling interval, and +demo-only boundary, see `services/ingestion/README.md`. + For the screen-by-screen user goal, component, data dependency, safety language, risk, and success-criteria map, see `docs/demo/MANUFACTURER_DEMO_USER_JOURNEY.md`. diff --git a/services/ingestion/README.md b/services/ingestion/README.md index 2e5f719..ca23878 100644 --- a/services/ingestion/README.md +++ b/services/ingestion/README.md @@ -110,6 +110,83 @@ The equivalent direct command is: --dead-letter .local/storage/dead_letter.jsonl ``` +## OPC UA Demo Ingestion Worker + +The demo OPC UA ingestion worker polls the local simulator-backed OPC UA server +and writes normalized FactoryEvents to the same JSONL event store used by the +existing ingestion and Process Sentinel path. This is demo infrastructure, not a +production OPC UA connector, browse-all-tags implementation, certificate model, +or high-availability worker. + +Start the demo OPC UA server in one terminal: + +```bash +docker compose -f infra/docker/docker-compose.yml up --build opcua-simulator +``` + +Poll the demo tags into FactoryEvents from another terminal: + +```bash +make opcua-demo-ingest +``` + +Default worker settings: + +| Setting | Default | +| --- | --- | +| Endpoint | `opc.tcp://localhost:4840/ofi/demo` | +| Poll count | `6` | +| Poll interval | `1` second | +| Output path | `.local/storage/opcua_demo_events.jsonl` | + +The default six one-second polls are compressed for the 8-10 minute demo. They +produce enough process and quality events to inspect locally without flooding +the JSONL store. + +Useful command variants: + +```bash +make opcua-demo-ingest \ + OPCUA_ENDPOINT=opc.tcp://localhost:4840/ofi/demo \ + OPCUA_POLL_COUNT=10 \ + OPCUA_POLL_INTERVAL=1 \ + OPCUA_EVENTS_STORE=.local/storage/opcua_demo_events.jsonl +``` + +The equivalent direct command is: + +```bash +.venv/bin/python -m factory_ingestion.opcua_demo_worker \ + --endpoint opc.tcp://localhost:4840/ofi/demo \ + --poll-count 6 \ + --poll-interval 1 \ + --events-store .local/storage/opcua_demo_events.jsonl +``` + +The worker reads the configured demo namespace tags for Greenville Demo Site, +Line 2, Filler F-201, and Checkweigher CW-201. It writes: + +- One work-order started event and one batch started event on the first poll so + product context is present in the accepted store. +- Process measurement events for `filler_f_201.fill_weight`, + `filler_f_201.filler_nozzle_pressure`, and `line_2.line_speed`. +- A quality measurement event for `Final Fill Weight`. + +Expected summary: + +```text +opc ua demo ingestion summary +endpoint: opc.tcp://localhost:4840/ofi/demo +poll_count: 6 +poll_interval_seconds: 1 +emitted_events: 26 +accepted_output: .local/storage/opcua_demo_events.jsonl +demo_boundary: simulator-backed demo OPC UA source; not a production connector +``` + +If the OPC UA server is unavailable, the worker exits with a readable error +that includes the endpoint and the Docker Compose startup command. + ## Accepted Event Storage Accepted events are written to the local JSONL event store: diff --git a/services/ingestion/factory_ingestion/opcua_demo_worker.py b/services/ingestion/factory_ingestion/opcua_demo_worker.py new file mode 100644 index 0000000..cb5b88d --- /dev/null +++ b/services/ingestion/factory_ingestion/opcua_demo_worker.py @@ -0,0 +1,341 @@ +from __future__ import annotations + +import argparse +import asyncio +import sys +from collections.abc import Awaitable, Callable, Sequence +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from pathlib import Path +from typing import Any + +from factory_events import ( + BatchEventPayload, + EventContext, + EventEnvelope, + EventMetadata, + EventSource, + ProcessMeasurementPayload, + QualityMeasurementPayload, + WorkOrderEventPayload, +) +from factory_simulator.opcua_demo import ( + DEMO_OPC_UA_NODES, + DEMO_PRODUCT_ID, + DEMO_PRODUCT_NAME, + OpcUaDemoNode, +) + +from factory_ingestion.storage import JsonlEventStore + +DEFAULT_OPCUA_ENDPOINT = "opc.tcp://localhost:4840/ofi/demo" +DEFAULT_OPCUA_POLL_COUNT = 6 +DEFAULT_OPCUA_POLL_INTERVAL_SECONDS = 1.0 + +SnapshotReader = Callable[[str], Awaitable[dict[str, Any]]] + + +class OpcUaDemoWorkerError(RuntimeError): + """Base error for the demo-only OPC UA ingestion worker.""" + + +class OpcUaDemoServerUnavailableError(OpcUaDemoWorkerError): + """Raised when the worker cannot read the configured demo OPC UA endpoint.""" + + +@dataclass(frozen=True) +class OpcUaDemoIngestionResult: + endpoint_url: str + poll_count: int + poll_interval_seconds: float + emitted_count: int + events_store_path: Path + + +def _node_id(node: OpcUaDemoNode) -> str: + return f"ns=2;s={node.node_id}" + + +def _nodes_by_tag() -> dict[str, OpcUaDemoNode]: + return {node.tag_name: node for node in DEMO_OPC_UA_NODES} + + +def _signal_name(signal_id: str) -> str: + return " ".join(part.capitalize() for part in signal_id.split("_")) + + +async def read_demo_snapshot(endpoint_url: str) -> dict[str, Any]: + from asyncua import Client + + values: dict[str, Any] = {} + try: + async with Client(endpoint_url) as client: + for node in DEMO_OPC_UA_NODES: + values[node.tag_name] = await client.get_node(_node_id(node)).read_value() + except Exception as exc: # asyncua raises a mix of transport and status exceptions. + msg = ( + "unable to read OPC UA demo server at " + f"{endpoint_url}. Start it with " + "`docker compose -f infra/docker/docker-compose.yml up --build opcua-simulator` " + "or verify the endpoint URL." + ) + raise OpcUaDemoServerUnavailableError(msg) from exc + return values + + +def build_events_from_snapshot( + snapshot: dict[str, Any], + *, + poll_index: int, + timestamp: datetime, + include_context_events: bool, +) -> list[EventEnvelope]: + nodes_by_tag = _nodes_by_tag() + source = EventSource( + system="factory-simulator", + adapter="opcua-demo-worker", + source_event_id=f"opcua-demo-poll-{poll_index:04d}", + ) + metadata = EventMetadata(simulated=True, trace_id=f"trace_opcua_demo_poll_{poll_index:04d}") + events: list[EventEnvelope] = [] + + if include_context_events: + events.extend( + _context_events( + snapshot, + timestamp=timestamp, + source=source, + metadata=metadata, + ) + ) + + for node in (node for node in DEMO_OPC_UA_NODES if node.category == "process"): + signal_id = node.tag_name.split(".", maxsplit=1)[1] + events.append( + EventEnvelope( + event_id=f"evt_opcua_demo_{signal_id}_{poll_index:04d}", + event_type="process.measurement.recorded", + schema_version="1.0.0", + timestamp=timestamp, + source=source.model_copy( + update={ + "source_event_id": f"opcua:{_node_id(node)}:poll:{poll_index:04d}" + } + ), + context=_measurement_context(snapshot, asset_id=node.asset_id), + payload=ProcessMeasurementPayload( + signal_id=signal_id, + signal_name=_signal_name(signal_id), + tag_name=node.tag_name, + value=float(snapshot[node.tag_name]), + unit=node.unit or "unknown", + quality="good", + normal_min=node.normal_min, + normal_max=node.normal_max, + target_value=node.target_value, + ), + metadata=metadata, + ) + ) + + quality_node = nodes_by_tag["checkweigher_cw_201.final_fill_weight"] + quality_value = float(snapshot[quality_node.tag_name]) + quality_result = ( + "pass" + if quality_node.normal_min <= quality_value <= quality_node.normal_max + else "fail" + ) + events.append( + EventEnvelope( + event_id=f"evt_opcua_demo_quality_final_fill_weight_{poll_index:04d}", + event_type="quality.measurement.recorded", + schema_version="1.0.0", + timestamp=timestamp + timedelta(milliseconds=100), + source=source.model_copy( + update={ + "source_event_id": f"opcua:{_node_id(quality_node)}:poll:{poll_index:04d}" + } + ), + context=_measurement_context(snapshot, asset_id=quality_node.asset_id), + payload=QualityMeasurementPayload( + quality_check_type="inline_check", + measurement_name="Final Fill Weight", + value=quality_value, + unit=quality_node.unit or "unknown", + result_status=quality_result, + result=quality_result, + severity="high" if quality_result == "fail" else "low", + spec_min=quality_node.normal_min, + spec_max=quality_node.normal_max, + ), + metadata=metadata, + ) + ) + + return events + + +async def run_opcua_demo_worker( + *, + endpoint_url: str, + events_store: JsonlEventStore, + poll_count: int = DEFAULT_OPCUA_POLL_COUNT, + poll_interval_seconds: float = DEFAULT_OPCUA_POLL_INTERVAL_SECONDS, + reader: SnapshotReader = read_demo_snapshot, + start_time: datetime | None = None, +) -> OpcUaDemoIngestionResult: + if poll_count < 1: + msg = "poll_count must be at least 1" + raise ValueError(msg) + if poll_interval_seconds < 0: + msg = "poll_interval_seconds must be greater than or equal to 0" + raise ValueError(msg) + + base_time = start_time or datetime.now(UTC) + emitted_count = 0 + + for poll_index in range(poll_count): + try: + snapshot = await reader(endpoint_url) + except OpcUaDemoServerUnavailableError: + raise + except Exception as exc: + msg = f"unable to read OPC UA demo server at {endpoint_url}: {exc}" + raise OpcUaDemoServerUnavailableError(msg) from exc + + timestamp = base_time + timedelta(seconds=poll_index * poll_interval_seconds) + events = build_events_from_snapshot( + snapshot, + poll_index=poll_index, + timestamp=timestamp, + include_context_events=poll_index == 0, + ) + for event in events: + events_store.append(event) + emitted_count += len(events) + + if poll_index < poll_count - 1 and poll_interval_seconds > 0: + await asyncio.sleep(poll_interval_seconds) + + return OpcUaDemoIngestionResult( + endpoint_url=endpoint_url, + poll_count=poll_count, + poll_interval_seconds=poll_interval_seconds, + emitted_count=emitted_count, + events_store_path=events_store.path, + ) + + +def format_opcua_demo_summary(result: OpcUaDemoIngestionResult) -> str: + return "\n".join( + [ + "opc ua demo ingestion summary", + f"endpoint: {result.endpoint_url}", + f"poll_count: {result.poll_count}", + f"poll_interval_seconds: {result.poll_interval_seconds:g}", + f"emitted_events: {result.emitted_count}", + f"accepted_output: {result.events_store_path}", + "demo_boundary: simulator-backed demo OPC UA source; not a production connector", + ] + ) + + +def main(argv: Sequence[str] | None = None) -> int: + parser = argparse.ArgumentParser(description="Read demo OPC UA tags into FactoryEvents.") + parser.add_argument("--endpoint", default=DEFAULT_OPCUA_ENDPOINT) + parser.add_argument( + "--events-store", + type=Path, + default=Path(".local/storage/opcua_demo_events.jsonl"), + ) + parser.add_argument("--poll-count", type=int, default=DEFAULT_OPCUA_POLL_COUNT) + parser.add_argument( + "--poll-interval", + type=float, + default=DEFAULT_OPCUA_POLL_INTERVAL_SECONDS, + help="Seconds between demo OPC UA polls.", + ) + args = parser.parse_args(argv) + + try: + result = asyncio.run( + run_opcua_demo_worker( + endpoint_url=args.endpoint, + events_store=JsonlEventStore(args.events_store), + poll_count=args.poll_count, + poll_interval_seconds=args.poll_interval, + ) + ) + except (OpcUaDemoWorkerError, ValueError) as exc: + print(f"opc ua demo ingestion error: {exc}", file=sys.stderr) + return 2 + + print(format_opcua_demo_summary(result)) + return 0 + + +def _measurement_context(snapshot: dict[str, Any], *, asset_id: str | None) -> EventContext: + return EventContext( + site_id=str(snapshot["site_id"]), + area_id=str(snapshot["area_id"]), + line_id=str(snapshot["line_id"]), + asset_id=asset_id, + batch_id=str(snapshot["batch_id"]), + work_order_id=str(snapshot["work_order_id"]), + ) + + +def _context_events( + snapshot: dict[str, Any], + *, + timestamp: datetime, + source: EventSource, + metadata: EventMetadata, +) -> list[EventEnvelope]: + context = EventContext( + site_id=str(snapshot["site_id"]), + area_id=str(snapshot["area_id"]), + line_id=str(snapshot["line_id"]), + batch_id=str(snapshot["batch_id"]), + work_order_id=str(snapshot["work_order_id"]), + ) + return [ + EventEnvelope( + event_id="evt_opcua_demo_work_order_started", + event_type="production.work_order.started", + schema_version="1.0.0", + timestamp=timestamp - timedelta(milliseconds=200), + source=source.model_copy(update={"source_event_id": "opcua-demo-context-work-order"}), + context=context, + payload=WorkOrderEventPayload( + work_order_id=str(snapshot["work_order_id"]), + product_id=str(snapshot["product_id"]), + product_name=str(snapshot["product_name"]), + batch_id=str(snapshot["batch_id"]), + lot_id=str(snapshot["batch_id"]), + status="started", + ), + metadata=metadata, + ), + EventEnvelope( + event_id="evt_opcua_demo_batch_started", + event_type="production.batch.started", + schema_version="1.0.0", + timestamp=timestamp - timedelta(milliseconds=100), + source=source.model_copy(update={"source_event_id": "opcua-demo-context-batch"}), + context=context, + payload=BatchEventPayload( + batch_id=str(snapshot["batch_id"]), + lot_id=str(snapshot["batch_id"]), + product_id=str(snapshot.get("product_id", DEMO_PRODUCT_ID)), + product_name=str(snapshot.get("product_name", DEMO_PRODUCT_NAME)), + work_order_id=str(snapshot["work_order_id"]), + status="started", + ), + metadata=metadata, + ), + ] + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/services/ingestion/tests/test_opcua_demo_worker.py b/services/ingestion/tests/test_opcua_demo_worker.py new file mode 100644 index 0000000..d1aa2bd --- /dev/null +++ b/services/ingestion/tests/test_opcua_demo_worker.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +import asyncio +import socket +from datetime import UTC, datetime +from pathlib import Path + +import pytest +from factory_ingestion.opcua_demo_worker import ( + DEFAULT_OPCUA_ENDPOINT, + OpcUaDemoServerUnavailableError, + build_events_from_snapshot, + read_demo_snapshot, + run_opcua_demo_worker, +) +from factory_ingestion.opcua_demo_worker import ( + main as opcua_worker_cli_main, +) +from factory_ingestion.storage import JsonlEventStore +from factory_simulator.opcua_demo import DEMO_OPC_UA_NODES +from factory_simulator.opcua_server import run_server + + +def _free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + return int(sock.getsockname()[1]) + + +def _normal_snapshot() -> dict[str, object]: + return {node.tag_name: node.value for node in DEMO_OPC_UA_NODES} + + +def test_opcua_demo_snapshot_maps_to_valid_factory_events() -> None: + events = build_events_from_snapshot( + _normal_snapshot(), + poll_index=0, + timestamp=datetime(2026, 1, 1, 12, 0, tzinfo=UTC), + include_context_events=True, + ) + + assert [event.event_type for event in events] == [ + "production.work_order.started", + "production.batch.started", + "process.measurement.recorded", + "process.measurement.recorded", + "process.measurement.recorded", + "quality.measurement.recorded", + ] + assert {event.context.site_id for event in events} == {"greenville_demo_site"} + assert {event.context.line_id for event in events} == {"line_2"} + assert {event.context.work_order_id for event in events} == {"WO-DEMO-1007"} + + work_order = events[0] + assert work_order.payload.product_id == "ofi_demo_beverage" + assert work_order.payload.product_name == "OFI Demo Beverage" + assert work_order.context.batch_id == "BATCH-DEMO-1007" + + process_events = [ + event for event in events if event.event_type == "process.measurement.recorded" + ] + assert {event.payload.tag_name for event in process_events} == { + "filler_f_201.fill_weight", + "filler_f_201.filler_nozzle_pressure", + "line_2.line_speed", + } + assert {event.context.asset_id for event in process_events} == {"filler_f_201", "line_2"} + + quality_event = events[-1] + assert quality_event.context.asset_id == "checkweigher_cw_201" + assert quality_event.payload.measurement_name == "Final Fill Weight" + assert quality_event.payload.result_status == "pass" + + +def test_opcua_demo_worker_writes_polled_events_to_jsonl_store(tmp_path: Path) -> None: + async def fake_reader(endpoint_url: str) -> dict[str, object]: + assert endpoint_url == DEFAULT_OPCUA_ENDPOINT + return _normal_snapshot() + + store = JsonlEventStore(tmp_path / "opcua_events.jsonl") + + result = asyncio.run( + run_opcua_demo_worker( + endpoint_url=DEFAULT_OPCUA_ENDPOINT, + events_store=store, + poll_count=2, + poll_interval_seconds=0, + reader=fake_reader, + start_time=datetime(2026, 1, 1, 12, 0, tzinfo=UTC), + ) + ) + + stored_events = store.list_events() + assert result.emitted_count == 10 + assert result.poll_count == 2 + assert result.poll_interval_seconds == 0 + assert len(stored_events) == 10 + assert sum(event.event_type == "process.measurement.recorded" for event in stored_events) == 6 + assert sum(event.event_type == "quality.measurement.recorded" for event in stored_events) == 2 + assert any(event.event_type == "production.work_order.started" for event in stored_events) + assert any(event.event_type == "production.batch.started" for event in stored_events) + + +def test_opcua_demo_worker_reports_unavailable_server_clearly(tmp_path: Path) -> None: + async def failing_reader(endpoint_url: str) -> dict[str, object]: + raise ConnectionError("connection refused") + + with pytest.raises(OpcUaDemoServerUnavailableError) as exc_info: + asyncio.run( + run_opcua_demo_worker( + endpoint_url="opc.tcp://127.0.0.1:9/ofi/demo", + events_store=JsonlEventStore(tmp_path / "events.jsonl"), + poll_count=1, + poll_interval_seconds=0, + reader=failing_reader, + ) + ) + + assert "unable to read OPC UA demo server" in str(exc_info.value) + assert "opc.tcp://127.0.0.1:9/ofi/demo" in str(exc_info.value) + + +def test_opcua_demo_worker_cli_reports_unavailable_server( + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + events_store = tmp_path / "events.jsonl" + + result = opcua_worker_cli_main( + [ + "--endpoint", + "opc.tcp://127.0.0.1:9/ofi/demo", + "--events-store", + str(events_store), + "--poll-count", + "1", + "--poll-interval", + "0", + ] + ) + + captured = capsys.readouterr() + assert result == 2 + assert "opc ua demo ingestion error:" in captured.err + assert "unable to read OPC UA demo server" in captured.err + + +def test_opcua_demo_worker_reads_local_demo_server(tmp_path: Path) -> None: + pytest.importorskip("asyncua") + + async def exercise_worker() -> None: + port = _free_port() + ready = asyncio.Event() + stop = asyncio.Event() + server_task = asyncio.create_task( + run_server( + host="127.0.0.1", + port=port, + endpoint_path="/ofi/worker-test", + ready_event=ready, + stop_event=stop, + ) + ) + store = JsonlEventStore(tmp_path / "opcua_events.jsonl") + + try: + await asyncio.wait_for(ready.wait(), timeout=5) + snapshot = await read_demo_snapshot(f"opc.tcp://127.0.0.1:{port}/ofi/worker-test") + assert snapshot["filler_f_201.fill_weight"] == 500.12 + + result = await run_opcua_demo_worker( + endpoint_url=f"opc.tcp://127.0.0.1:{port}/ofi/worker-test", + events_store=store, + poll_count=1, + poll_interval_seconds=0, + start_time=datetime(2026, 1, 1, 12, 0, tzinfo=UTC), + ) + finally: + stop.set() + await asyncio.wait_for(server_task, timeout=5) + + assert result.emitted_count == 6 + stored_events = store.list_events() + assert len(stored_events) == 6 + assert {event.context.site_id for event in stored_events} == {"greenville_demo_site"} + assert {event.context.work_order_id for event in stored_events} == {"WO-DEMO-1007"} + + asyncio.run(exercise_worker())