diff --git a/docs/LEARNING_LOG.md b/docs/LEARNING_LOG.md index a2830f2..fbea380 100644 --- a/docs/LEARNING_LOG.md +++ b/docs/LEARNING_LOG.md @@ -22,6 +22,54 @@ This file should be updated by Codex after each meaningful change. ### What to learn next ``` +## 2026-05-22 - OPC UA demo drift controls + +### What changed + +Added deterministic drift mode to the OPC UA demo simulator. The server now +starts in normal mode, exposes a demo-only `StartDrift` method, handles repeated +start calls safely, and exposes a `ResetDemo` method that returns values and +state tags to normal mode. + +### Why it matters + +The manufacturer demo can show the moment drift begins instead of starting with +an already-drifting signal. This keeps the OPC UA source simulator interactive +without introducing production writeback or arbitrary tag commands. + +### How it works + +The `StartDrift` method sets `drift_active=true`, changes `scenario` to +`fill_weight_drift_demo_gradual_drift`, and advances deterministic fill-weight, +nozzle-pressure, and final-fill-weight values once per second. `ResetDemo` +restores normal-mode values and state. + +### How to run it + +```bash +docker compose -f infra/docker/docker-compose.yml up --build opcua-simulator +``` + +### How to test it + +```bash +.venv/bin/python -m pytest services/simulator/tests/test_opcua_demo_server.py +``` + +### Key files + +- `services/simulator/factory_simulator/opcua_demo.py` +- `services/simulator/factory_simulator/opcua_server.py` +- `services/simulator/tests/test_opcua_demo_server.py` +- `services/simulator/README.md` +- `docs/demo/OPC_UA_DEMO_NAMESPACE.md` + +### What to learn next + +Use the explicit demo methods from a future UI or smoke-test control path, while +keeping production command modeling, security, and writeback out of this demo +slice. + ## 2026-05-22 - Dockerized OPC UA demo simulator ### What changed diff --git a/docs/demo/OPC_UA_DEMO_NAMESPACE.md b/docs/demo/OPC_UA_DEMO_NAMESPACE.md index 812565b..45eb3b2 100644 --- a/docs/demo/OPC_UA_DEMO_NAMESPACE.md +++ b/docs/demo/OPC_UA_DEMO_NAMESPACE.md @@ -65,6 +65,22 @@ these state tags: | `scenario` | `ns=2;s=OFI.Demo.Greenville.Packaging.Line2.State.Scenario` | `fill_weight_drift_demo` | Identifies the deterministic manufacturer demo scenario. | | `drift_active` | `ns=2;s=OFI.Demo.Greenville.Packaging.Line2.State.DriftActive` | `false` | Confirms the OPC UA server starts in normal mode before any future drift behavior is enabled. | +## Demo Control Methods + +The demo server exposes explicit control methods on the State object. These are +demo-only controls, not a production OPC UA command model and not arbitrary tag +writes. + +| Method | Node ID | Result | +| --- | --- | --- | +| `StartDrift` | `ns=2;s=OFI.Demo.Greenville.Packaging.Line2.State.StartDrift` | Starts deterministic gradual fill-weight drift, sets `scenario=fill_weight_drift_demo_gradual_drift`, and sets `drift_active=true`. Repeated calls return `already_active` without resetting the drift clock. | +| `ResetDemo` | `ns=2;s=OFI.Demo.Greenville.Packaging.Line2.State.ResetDemo` | Restores normal-mode process and quality values, sets `scenario=fill_weight_drift_demo`, and sets `drift_active=false`. | + +Drift is compressed for the 8-10 minute manufacturer demo. After `StartDrift`, +the server updates once per second; fill weight increases by `0.45 g` per +update and filler nozzle pressure increases by `0.015 bar` per update. No drift +is applied before the start method is called. + ## Mapping Rules 1. OPC UA source values are normalized into existing `FactoryEvent` envelopes. diff --git a/infra/docker/docker-compose.yml b/infra/docker/docker-compose.yml index 2497aba..da37a7a 100644 --- a/infra/docker/docker-compose.yml +++ b/infra/docker/docker-compose.yml @@ -25,7 +25,6 @@ services: OPCUA_DEMO_PORT: "4840" OPCUA_DEMO_ENDPOINT_PATH: /ofi/demo OPCUA_DEMO_SCENARIO: fill_weight_drift_demo - OPCUA_DEMO_DRIFT_ACTIVE: "false" ports: - "4840:4840" diff --git a/infra/docker/opcua-simulator.Dockerfile b/infra/docker/opcua-simulator.Dockerfile index 0fe1019..22ac296 100644 --- a/infra/docker/opcua-simulator.Dockerfile +++ b/infra/docker/opcua-simulator.Dockerfile @@ -14,7 +14,6 @@ ENV OPCUA_DEMO_HOST=0.0.0.0 ENV OPCUA_DEMO_PORT=4840 ENV OPCUA_DEMO_ENDPOINT_PATH=/ofi/demo ENV OPCUA_DEMO_SCENARIO=fill_weight_drift_demo -ENV OPCUA_DEMO_DRIFT_ACTIVE=false EXPOSE 4840 diff --git a/services/simulator/README.md b/services/simulator/README.md index ee0667b..e4b9125 100644 --- a/services/simulator/README.md +++ b/services/simulator/README.md @@ -217,6 +217,19 @@ Required demo state tags: | `scenario` | `ns=2;s=OFI.Demo.Greenville.Packaging.Line2.State.Scenario` | `fill_weight_drift_demo` | | `drift_active` | `ns=2;s=OFI.Demo.Greenville.Packaging.Line2.State.DriftActive` | `false` | +Demo control methods: + +| Method | Node ID | Behavior | +| --- | --- | --- | +| `StartDrift` | `ns=2;s=OFI.Demo.Greenville.Packaging.Line2.State.StartDrift` | Switches to `fill_weight_drift_demo_gradual_drift`, sets `drift_active=true`, and begins deterministic fill-weight drift. Repeated calls are safe and return `already_active`. | +| `ResetDemo` | `ns=2;s=OFI.Demo.Greenville.Packaging.Line2.State.ResetDemo` | Returns the simulator to normal-mode values, sets `scenario=fill_weight_drift_demo`, and sets `drift_active=false`. | + +Drift timing is compressed for an 8-10 minute demo. After `StartDrift`, the +server updates once per second. Fill weight increases by `0.45 g` per update, +filler nozzle pressure increases by `0.015 bar` per update, and final fill +weight follows the drifting fill-weight value. Normal-mode values do not drift +until `StartDrift` is called. + Startup logs include the endpoint, namespace URI, scenario name, drift flag, and a warning that the service is simulator-backed demo infrastructure. diff --git a/services/simulator/factory_simulator/opcua_demo.py b/services/simulator/factory_simulator/opcua_demo.py index b43d631..c2a5869 100644 --- a/services/simulator/factory_simulator/opcua_demo.py +++ b/services/simulator/factory_simulator/opcua_demo.py @@ -6,7 +6,12 @@ NAMESPACE_URI = "urn:open-factory-initiative:factory-intelligence-platform:demo" DEFAULT_ENDPOINT_PATH = "/ofi/demo" DEFAULT_SCENARIO = "fill_weight_drift_demo" +DRIFT_SCENARIO = "fill_weight_drift_demo_gradual_drift" DEFAULT_DRIFT_ACTIVE = False +DRIFT_UPDATE_SECONDS = 1.0 +DRIFT_FILL_WEIGHT_STEP = 0.45 +DRIFT_NOZZLE_PRESSURE_STEP = 0.015 +DRIFT_FINAL_FILL_WEIGHT_OFFSET = 0.08 DEMO_SITE_ID = "greenville_demo_site" DEMO_AREA_ID = "packaging_area" diff --git a/services/simulator/factory_simulator/opcua_server.py b/services/simulator/factory_simulator/opcua_server.py index 5039c8d..3dddfab 100644 --- a/services/simulator/factory_simulator/opcua_server.py +++ b/services/simulator/factory_simulator/opcua_server.py @@ -5,13 +5,17 @@ import logging import os from collections.abc import Sequence -from dataclasses import replace +from dataclasses import dataclass, replace from factory_simulator.opcua_demo import ( - DEFAULT_DRIFT_ACTIVE, DEFAULT_ENDPOINT_PATH, DEFAULT_SCENARIO, DEMO_OPC_UA_NODES, + DRIFT_FILL_WEIGHT_STEP, + DRIFT_FINAL_FILL_WEIGHT_OFFSET, + DRIFT_NOZZLE_PRESSURE_STEP, + DRIFT_SCENARIO, + DRIFT_UPDATE_SECONDS, NAMESPACE_URI, OpcUaDemoNode, ) @@ -19,10 +23,78 @@ LOGGER = logging.getLogger("factory_simulator.opcua_server") -def _bool_from_env(value: str | None, *, default: bool) -> bool: - if value is None: - return default - return value.strip().lower() in {"1", "true", "yes", "on"} +@dataclass(frozen=True) +class DemoOpcUaServer: + server: object + controller: DemoDriftController + + +class DemoDriftController: + def __init__(self, variables: dict[str, object]) -> None: + self._variables = variables + self._lock = asyncio.Lock() + self._drift_active = False + self._drift_step = 0 + self._normal_values = {node.tag_name: node.value for node in DEMO_OPC_UA_NODES} + + async def start_drift(self) -> str: + async with self._lock: + if self._drift_active: + LOGGER.info("StartDrift ignored because drift is already active") + return "already_active" + self._drift_active = True + self._drift_step = 0 + await self._write("drift_active", True) + await self._write("scenario", DRIFT_SCENARIO) + LOGGER.info("StartDrift accepted; drift_active=True scenario=%s", DRIFT_SCENARIO) + return "drift_started" + + async def reset(self) -> str: + async with self._lock: + self._drift_active = False + self._drift_step = 0 + for tag_name, value in self._normal_values.items(): + await self._write(tag_name, value) + LOGGER.info("ResetDemo completed; drift_active=False scenario=%s", DEFAULT_SCENARIO) + return "reset_to_normal" + + async def run(self, stop_event: asyncio.Event) -> None: + while not stop_event.is_set(): + try: + await asyncio.wait_for(stop_event.wait(), timeout=DRIFT_UPDATE_SECONDS) + except TimeoutError: + await self._tick() + + async def _tick(self) -> None: + async with self._lock: + if not self._drift_active: + return + self._drift_step += 1 + fill_weight = round( + float(self._normal_values["filler_f_201.fill_weight"]) + + (self._drift_step * DRIFT_FILL_WEIGHT_STEP), + 3, + ) + nozzle_pressure = round( + float(self._normal_values["filler_f_201.filler_nozzle_pressure"]) + + (self._drift_step * DRIFT_NOZZLE_PRESSURE_STEP), + 3, + ) + final_fill_weight = round(fill_weight + DRIFT_FINAL_FILL_WEIGHT_OFFSET, 3) + + await self._write("filler_f_201.fill_weight", fill_weight) + await self._write("filler_f_201.filler_nozzle_pressure", nozzle_pressure) + await self._write("checkweigher_cw_201.final_fill_weight", final_fill_weight) + LOGGER.info( + "drift tick step=%s fill_weight=%s nozzle_pressure=%s final_fill_weight=%s", + self._drift_step, + fill_weight, + nozzle_pressure, + final_fill_weight, + ) + + async def _write(self, tag_name: str, value: float | bool | str) -> None: + await self._variables[tag_name].write_value(value) def _endpoint(host: str, port: int, path: str) -> str: @@ -63,9 +135,9 @@ async def build_server( port: int, endpoint_path: str, scenario: str, - drift_active: bool, ): from asyncua import Server, ua + from asyncua.common.methods import uamethod server = Server() await server.init() @@ -81,7 +153,10 @@ async def build_server( line = await area.add_folder(namespace_index, "Line2") filler = await line.add_folder(namespace_index, "FillerF201") checkweigher = await line.add_folder(namespace_index, "CheckweigherCW201") - state = await line.add_folder(namespace_index, "State") + state = await line.add_folder( + ua.NodeId("OFI.Demo.Greenville.Packaging.Line2.State", namespace_index), + "State", + ) context = await line.add_folder(namespace_index, "Context") parents = { @@ -95,13 +170,15 @@ async def build_server( "line_2": line, } - for node in _configured_nodes(scenario=scenario, drift_active=drift_active): + variables: dict[str, object] = {} + for node in _configured_nodes(scenario=scenario, drift_active=False): parent = process_asset_parents.get(node.asset_id, parents[node.category]) variable = await parent.add_variable( ua.NodeId(node.node_id, namespace_index), node.browse_name, node.value, ) + variables[node.tag_name] = variable await _add_properties(variable, namespace_index, node) LOGGER.info( "exposed demo OPC UA node node_id=ns=%s;s=%s tag_name=%s value=%r", @@ -111,7 +188,39 @@ async def build_server( node.value, ) - return server + controller = DemoDriftController(variables) + + @uamethod + async def start_drift(parent) -> str: + return await controller.start_drift() + + @uamethod + async def reset_demo(parent) -> str: + return await controller.reset() + + await state.add_method( + ua.NodeId("OFI.Demo.Greenville.Packaging.Line2.State.StartDrift", namespace_index), + "StartDrift", + start_drift, + [], + [ua.VariantType.String], + ) + await state.add_method( + ua.NodeId("OFI.Demo.Greenville.Packaging.Line2.State.ResetDemo", namespace_index), + "ResetDemo", + reset_demo, + [], + [ua.VariantType.String], + ) + LOGGER.info( + "exposed demo OPC UA controls start_drift=ns=%s;s=%s reset=ns=%s;s=%s", + namespace_index, + "OFI.Demo.Greenville.Packaging.Line2.State.StartDrift", + namespace_index, + "OFI.Demo.Greenville.Packaging.Line2.State.ResetDemo", + ) + + return DemoOpcUaServer(server=server, controller=controller) async def run_server( @@ -120,7 +229,6 @@ async def run_server( port: int = 4840, endpoint_path: str = DEFAULT_ENDPOINT_PATH, scenario: str = DEFAULT_SCENARIO, - drift_active: bool = DEFAULT_DRIFT_ACTIVE, ready_event: asyncio.Event | None = None, stop_event: asyncio.Event | None = None, ) -> None: @@ -130,20 +238,18 @@ async def run_server( "not a production OPC UA connector." ) LOGGER.info( - "starting OPC UA demo simulator endpoint=%s namespace_uri=%s scenario=%s " - "drift_active=%s", + "starting OPC UA demo simulator endpoint=%s namespace_uri=%s scenario=%s", endpoint, NAMESPACE_URI, scenario, - drift_active, ) - server = await build_server( + demo_server = await build_server( host=host, port=port, endpoint_path=endpoint_path, scenario=scenario, - drift_active=drift_active, ) + server = demo_server.server async with server: LOGGER.info("OPC UA demo simulator listening endpoint=%s", endpoint) @@ -151,7 +257,11 @@ async def run_server( ready_event.set() if stop_event is None: stop_event = asyncio.Event() - await stop_event.wait() + controller_task = asyncio.create_task(demo_server.controller.run(stop_event)) + try: + await stop_event.wait() + finally: + await asyncio.wait_for(controller_task, timeout=5) def main(argv: Sequence[str] | None = None) -> None: @@ -163,11 +273,6 @@ def main(argv: Sequence[str] | None = None) -> None: default=os.getenv("OPCUA_DEMO_ENDPOINT_PATH", DEFAULT_ENDPOINT_PATH), ) parser.add_argument("--scenario", default=os.getenv("OPCUA_DEMO_SCENARIO", DEFAULT_SCENARIO)) - parser.add_argument( - "--drift-active", - action="store_true", - default=_bool_from_env(os.getenv("OPCUA_DEMO_DRIFT_ACTIVE"), default=False), - ) args = parser.parse_args(argv) logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s") @@ -178,7 +283,6 @@ def main(argv: Sequence[str] | None = None) -> None: port=args.port, endpoint_path=args.endpoint_path, scenario=args.scenario, - drift_active=args.drift_active, ) ) except KeyboardInterrupt: diff --git a/services/simulator/tests/test_opcua_demo_server.py b/services/simulator/tests/test_opcua_demo_server.py index 1e365ec..e5560eb 100644 --- a/services/simulator/tests/test_opcua_demo_server.py +++ b/services/simulator/tests/test_opcua_demo_server.py @@ -10,6 +10,7 @@ from factory_simulator.opcua_demo import ( DEFAULT_SCENARIO, DEMO_OPC_UA_NODES, + DRIFT_SCENARIO, NAMESPACE_URI, ) from factory_simulator.opcua_server import run_server @@ -70,7 +71,6 @@ def test_opcua_demo_compose_service_is_documented_and_demo_scoped() -> None: "infra/docker/opcua-simulator.Dockerfile", "4840:4840", "OPCUA_DEMO_SCENARIO: fill_weight_drift_demo", - "OPCUA_DEMO_DRIFT_ACTIVE: \"false\"", ] for term in required_terms: assert term in compose @@ -145,4 +145,95 @@ async def exercise_server() -> None: asyncio.run(exercise_server()) assert "simulator-backed demo infrastructure" in caplog.text assert "OPC UA demo simulator listening endpoint=opc.tcp://127.0.0.1:" in caplog.text - assert "scenario=fill_weight_drift_demo drift_active=False" in caplog.text + assert "scenario=fill_weight_drift_demo" in caplog.text + + +def test_opcua_demo_server_start_drift_is_idempotent_and_reset_returns_to_normal() -> None: + pytest.importorskip("asyncua") + + async def exercise_server() -> None: + from asyncua import Client, ua + + port = next(_free_port()) + ready = asyncio.Event() + stop = asyncio.Event() + task = asyncio.create_task( + run_server( + host="127.0.0.1", + port=port, + endpoint_path="/ofi/demo-drift-test", + ready_event=ready, + stop_event=stop, + ) + ) + + fill_weight_node = ( + "ns=2;s=OFI.Demo.Greenville.Packaging.Line2.FillerF201.FillWeight" + ) + pressure_node = ( + "ns=2;s=OFI.Demo.Greenville.Packaging.Line2.FillerF201.NozzlePressure" + ) + quality_node = ( + "ns=2;s=OFI.Demo.Greenville.Packaging.Line2.CheckweigherCW201.FinalFillWeight" + ) + scenario_node = "ns=2;s=OFI.Demo.Greenville.Packaging.Line2.State.Scenario" + drift_active_node = "ns=2;s=OFI.Demo.Greenville.Packaging.Line2.State.DriftActive" + state_node_id = "ns=2;s=OFI.Demo.Greenville.Packaging.Line2.State" + + try: + await asyncio.wait_for(ready.wait(), timeout=5) + async with Client(f"opc.tcp://127.0.0.1:{port}/ofi/demo-drift-test") as client: + assert await client.get_node(fill_weight_node).read_value() == 500.12 + assert await client.get_node(drift_active_node).read_value() is False + assert await client.get_node(scenario_node).read_value() == DEFAULT_SCENARIO + + state_node = client.get_node(state_node_id) + assert ( + await state_node.call_method( + ua.NodeId( + "OFI.Demo.Greenville.Packaging.Line2.State.StartDrift", + 2, + ) + ) + == "drift_started" + ) + assert ( + await state_node.call_method( + ua.NodeId( + "OFI.Demo.Greenville.Packaging.Line2.State.StartDrift", + 2, + ) + ) + == "already_active" + ) + assert await client.get_node(drift_active_node).read_value() is True + assert await client.get_node(scenario_node).read_value() == DRIFT_SCENARIO + + await asyncio.sleep(2.2) + drift_fill_weight = await client.get_node(fill_weight_node).read_value() + drift_pressure = await client.get_node(pressure_node).read_value() + drift_quality = await client.get_node(quality_node).read_value() + + assert drift_fill_weight > 500.12 + assert drift_pressure > 2.1 + assert drift_quality > 500.2 + + assert ( + await state_node.call_method( + ua.NodeId( + "OFI.Demo.Greenville.Packaging.Line2.State.ResetDemo", + 2, + ) + ) + == "reset_to_normal" + ) + assert await client.get_node(fill_weight_node).read_value() == 500.12 + assert await client.get_node(pressure_node).read_value() == 2.1 + assert await client.get_node(quality_node).read_value() == 500.2 + assert await client.get_node(drift_active_node).read_value() is False + assert await client.get_node(scenario_node).read_value() == DEFAULT_SCENARIO + finally: + stop.set() + await asyncio.wait_for(task, timeout=5) + + asyncio.run(exercise_server())