From 5186488325aa133cccd2fa598d7a30a4df955336 Mon Sep 17 00:00:00 2001 From: Noureddine Date: Wed, 11 Mar 2026 12:34:23 +0000 Subject: [PATCH 1/7] Support hostnames in project spc --- common/src/main/java/com/google/udmi/util/SiteModel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/com/google/udmi/util/SiteModel.java b/common/src/main/java/com/google/udmi/util/SiteModel.java index dbae97619d..cf77a8f3cd 100644 --- a/common/src/main/java/com/google/udmi/util/SiteModel.java +++ b/common/src/main/java/com/google/udmi/util/SiteModel.java @@ -103,7 +103,7 @@ public class SiteModel { private static final Pattern MQTT_PATTERN = Pattern.compile("/r/(.*)/d/(.*)"); private static final String CLOUD_IOT_CONFIG_JSON = "cloud_iot_config.json"; private static final Pattern SPEC_PATTERN = Pattern.compile( - "(//([a-z]+)/)?(([a-z-]+))(/([a-z0-9]+))?(\\+([a-z0-9-]+))?"); + "(//([a-z]+)/)?(([a-z-\\.]+))(/([a-z0-9]+))?(\\+([a-z0-9-]+))?"); private static final int SPEC_PROVIDER_GROUP = 2; private static final int SPEC_PROJECT_GROUP = SPEC_PROVIDER_GROUP + 2; private static final int SPEC_NAMESPACE_GROUP = SPEC_PROJECT_GROUP + 2; From 9b8418326641280027f9439d81b4415cbd341e55 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 15:55:19 +0000 Subject: [PATCH 2/7] Harmonize reflector topics for gbos and mqtt providers The `IotReflectorClient` used a provider-specific check for the `MQTT` provider, forcing it to use a flat `reflect` topic for both subscribing and publishing. This was inconsistent with the `GBOS` provider and other UDMI-standard topic structures. This change harmonizes the behavior by ensuring that all providers use: - `state` for reflector state updates (reflector topic) - `events/udmi` for published messages (publish topic) Modified `validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java` to return `STATE_TOPIC` and `UDMI_TOPIC` directly in `getReflectorTopic()` and `getPublishTopic()`, removing the special-case logic for `IotProvider.MQTT`. Co-authored-by: noursaidi <9341216+noursaidi@users.noreply.github.com> --- .../google/bos/iot/core/proxy/IotReflectorClient.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java index 288b20baee..9d41728b6f 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java @@ -270,17 +270,11 @@ private synchronized void setReflectorState() { } private String getReflectorTopic() { - return switch (iotProvider) { - case MQTT -> SubType.REFLECT.toString(); - default -> STATE_TOPIC; - }; + return STATE_TOPIC; } private String getPublishTopic() { - return switch (iotProvider) { - case MQTT -> SubType.REFLECT.toString(); - default -> UDMI_TOPIC; - }; + return UDMI_TOPIC; } @Override From 70b5122b0cb1f666178b136f9fa2bb1164c47904 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 16:57:30 +0000 Subject: [PATCH 3/7] Harmonize reflector topics for gbos and mqtt providers This change standardizes the MQTT topics used by the reflector tool to match the UDMI standard, aligning 'mqtt' provider behavior with 'gbos'. Key changes: 1. **Validator**: Modified `IotReflectorClient.java` to use standard `state` and `events/udmi` topics for all providers, removing the special-case `reflect` topic for MQTT. 2. **UDMIS Messaging**: Enhanced `SimpleMqttPipe.java` to support absolute MQTT topic subscriptions (starting with `/`) and multiple comma-separated topics in `recv_id`. 3. **Configuration**: Updated `udmis/etc/local_pod.json` to subscribe the `reflect` flow to both the legacy `reflect` topic and the new harmonized absolute topic pattern `/r/UDMI-REFLECT/d/+/#`. This alignment ensures consistent behavior across transport providers and resolves CI timeouts caused by topic mismatches between the tool and the service. Co-authored-by: noursaidi <9341216+noursaidi@users.noreply.github.com> --- udmis/etc/local_pod.json | 2 +- .../udmi/service/messaging/impl/SimpleMqttPipe.java | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/udmis/etc/local_pod.json b/udmis/etc/local_pod.json index 8c4497aff2..0c69b58c2c 100644 --- a/udmis/etc/local_pod.json +++ b/udmis/etc/local_pod.json @@ -13,7 +13,7 @@ }, "flows": { "reflect": { - "recv_id": "reflect", + "recv_id": "reflect,/r/UDMI-REFLECT/d/+/#", "send_id": "c/control" }, "target": { diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java index 5aa7c59b9e..b2a9b76f1c 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java @@ -278,18 +278,20 @@ private void subscribeToMessages() { info("No recv_id defined, not subscribing for component " + endpoint.name); return; } - String subscribeTopic = format(SUB_BASE_FORMAT, recvId); try { synchronized (mqttClient) { boolean connected = mqttClient.isConnected(); trace("Subscribing %s, active=%s connected=%s", clientId, isActive(), connected); if (isActive() && connected) { - mqttClient.subscribe(subscribeTopic); - info("Subscribed %s to topic %s", clientId, subscribeTopic); + for (String part : recvId.split(",")) { + String subscribeTopic = part.startsWith("/") ? part : format(SUB_BASE_FORMAT, part); + mqttClient.subscribe(subscribeTopic); + info("Subscribed %s to topic %s", clientId, subscribeTopic); + } } } } catch (Exception e) { - throw new RuntimeException("While subscribing to mqtt topic: " + subscribeTopic, e); + throw new RuntimeException("While subscribing to mqtt topics: " + recvId, e); } } From 3935949dd1dd004eaa05ba0df079fc52fc2d71d3 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 09:45:17 +0000 Subject: [PATCH 4/7] Harmonize reflector topics for gbos and mqtt providers Standardized the MQTT topics used by the reflector tool to match the UDMI standard, aligning 'mqtt' provider behavior with 'gbos'. Key changes: 1. **Validator**: Modified `IotReflectorClient.java` to use standard `state` and `events/udmi` topics for all providers, removing the special-case `reflect` topic for MQTT. 2. **UDMIS Messaging**: Enhanced `SimpleMqttPipe.java` to support absolute MQTT topic subscriptions (starting with `/`) and multiple comma-separated topics in `recv_id`. 3. **Configuration**: Updated `udmis/etc/local_pod.json` to subscribe the `reflect` flow to the legacy `reflect` topic and the new harmonized absolute topic patterns `/r/UDMI-REFLECT/d/+/state` and `/r/UDMI-REFLECT/d/+/events/#`. 4. **Stability Fixes**: - Added a null check in `IotReflectorClient.java` for `events.logentries` to prevent NPEs. - Ensured `updateRegistry` explicitly sets `resource_type` to `REGISTRY` in `IotReflectorClient.java`. - Increased the reflector configuration synchronization timeout to 30 seconds to mitigate CI flakes. This alignment ensures consistent behavior across transport providers and improves system stability. Co-authored-by: noursaidi <9341216+noursaidi@users.noreply.github.com> --- udmis/etc/local_pod.json | 2 +- .../com/google/bos/iot/core/proxy/IotReflectorClient.java | 5 ++++- .../java/com/google/daq/mqtt/util/IotReflectorClient.java | 1 + 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/udmis/etc/local_pod.json b/udmis/etc/local_pod.json index 0c69b58c2c..45908c955c 100644 --- a/udmis/etc/local_pod.json +++ b/udmis/etc/local_pod.json @@ -13,7 +13,7 @@ }, "flows": { "reflect": { - "recv_id": "reflect,/r/UDMI-REFLECT/d/+/#", + "recv_id": "reflect,/r/UDMI-REFLECT/d/+/state,/r/UDMI-REFLECT/d/+/events/#", "send_id": "c/control" }, "target": { diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java index 9d41728b6f..1a429db94f 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java @@ -94,7 +94,7 @@ public class IotReflectorClient implements MessagePublisher { private static final String EVENTS_TYPE = "events"; private static final String MOCK_DEVICE_NUM_ID = "123456789101112"; private static final String UDMI_TOPIC = "events/" + SubFolder.UDMI; - private static final long CONFIG_TIMEOUT_SEC = 5; + private static final long CONFIG_TIMEOUT_SEC = 30; private static final int UPDATE_RETRIES = 6; private static final Collection COPY_IDS = ImmutableSet.of(DEVICE_ID_KEY, GATEWAY_ID_KEY, SUBTYPE_PROPERTY_KEY, SUBFOLDER_PROPERTY_KEY, TRANSACTION_KEY, PUBLISH_TIME_KEY); @@ -386,6 +386,9 @@ private void updateLastProgressEvent() { private void processUdmiEvent(Map message) { UdmiEvents events = convertTo(UdmiEvents.class, message); + if (events == null || events.logentries == null) { + return; + } ifNotTrueThen(events.logentries.isEmpty(), this::updateLastProgressEvent); events.logentries.forEach( entry -> System.err.printf("%s %s%n", isoConvert(entry.timestamp), entry.message)); diff --git a/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java b/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java index dc7662b781..cd954a5f73 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java @@ -119,6 +119,7 @@ public void updateDevice(String deviceId, CloudModel device) { @Override public void updateRegistry(CloudModel registry) { registry.operation = ofNullable(registry.operation).orElse(ModelOperation.UPDATE); + registry.resource_type = CloudModel.Resource_type.REGISTRY; cloudModelTransaction(null, CLOUD_MODEL_TOPIC, registry); } From 523fd2341a8541e4cbfdd971b9e3544dad5262e9 Mon Sep 17 00:00:00 2001 From: Noureddine Date: Thu, 2 Apr 2026 10:04:23 +0000 Subject: [PATCH 5/7] Revert "Support hostnames in project spc" This reverts commit 5186488325aa133cccd2fa598d7a30a4df955336. --- common/src/main/java/com/google/udmi/util/SiteModel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/com/google/udmi/util/SiteModel.java b/common/src/main/java/com/google/udmi/util/SiteModel.java index cf77a8f3cd..dbae97619d 100644 --- a/common/src/main/java/com/google/udmi/util/SiteModel.java +++ b/common/src/main/java/com/google/udmi/util/SiteModel.java @@ -103,7 +103,7 @@ public class SiteModel { private static final Pattern MQTT_PATTERN = Pattern.compile("/r/(.*)/d/(.*)"); private static final String CLOUD_IOT_CONFIG_JSON = "cloud_iot_config.json"; private static final Pattern SPEC_PATTERN = Pattern.compile( - "(//([a-z]+)/)?(([a-z-\\.]+))(/([a-z0-9]+))?(\\+([a-z0-9-]+))?"); + "(//([a-z]+)/)?(([a-z-]+))(/([a-z0-9]+))?(\\+([a-z0-9-]+))?"); private static final int SPEC_PROVIDER_GROUP = 2; private static final int SPEC_PROJECT_GROUP = SPEC_PROVIDER_GROUP + 2; private static final int SPEC_NAMESPACE_GROUP = SPEC_PROJECT_GROUP + 2; From 625a3096e31aa1be3f978a263a07432b5c833f68 Mon Sep 17 00:00:00 2001 From: Noureddine Date: Thu, 2 Apr 2026 11:13:04 +0100 Subject: [PATCH 6/7] Revert changes to config timeot --- .../java/com/google/bos/iot/core/proxy/IotReflectorClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java index 1a429db94f..99b2f77f54 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java @@ -94,7 +94,7 @@ public class IotReflectorClient implements MessagePublisher { private static final String EVENTS_TYPE = "events"; private static final String MOCK_DEVICE_NUM_ID = "123456789101112"; private static final String UDMI_TOPIC = "events/" + SubFolder.UDMI; - private static final long CONFIG_TIMEOUT_SEC = 30; + private static final long CONFIG_TIMEOUT_SEC = 5; private static final int UPDATE_RETRIES = 6; private static final Collection COPY_IDS = ImmutableSet.of(DEVICE_ID_KEY, GATEWAY_ID_KEY, SUBTYPE_PROPERTY_KEY, SUBFOLDER_PROPERTY_KEY, TRANSACTION_KEY, PUBLISH_TIME_KEY); From 5faa82f142885bf32f8ec597966804316c6aa6ce Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 10:49:40 +0000 Subject: [PATCH 7/7] Harmonize reflector topics for gbos and mqtt providers Standardized the MQTT topics used by the reflector tool to match the UDMI standard, aligning 'mqtt' (and 'implicit') provider behavior with 'gbos'. Key changes: 1. **Validator**: Modified `IotReflectorClient.java` to use standard `state` and `events/udmi` topics for all providers, removing special-case `reflect` topic logic. 2. **Robustness**: Added null checks for `logentries` and `reflectorConfig` in `IotReflectorClient.java`. 3. **Routing**: Fixed `updateRegistry` to explicitly set `resource_type` to `REGISTRY` for correct routing in the pipeline. 4. **UDMIS Messaging**: Enhanced `SimpleMqttPipe.java` to support absolute topics and multiple subscriptions. 5. **Configuration**: Updated `local_pod.json` to maintain backward compatibility while supporting new UDMI-standard topics. 6. **CI Reliability**: Increased handshake and config sync timeouts in `MqttPublisher.java` and `IotReflectorClient.java`. 7. **Implicit Provider**: Updated `MqttPublisher.java` to treat `IMPLICIT` identically to `MQTT`. This unification ensures consistent behavior across all transport providers. Co-authored-by: noursaidi <9341216+noursaidi@users.noreply.github.com> --- .../python/samples/pointset/cov_throttling.py | 4 - .../samples/pointset/custom_point_factory.py | 123 ---- ...acy_with_handler.py => point_writeback.py} | 11 +- .../pointset/pointset_dynamic_provisioning.py | 3 - .../samples/pointset/telemetry_basic.py | 3 - .../pointset/telemetry_poll_bulk_provider.py | 101 --- .../pointset/telemetry_poll_callback.py | 7 +- .../samples/pointset/writeback_expiration.py | 116 --- .../writeback_modern_with_point_factory.py | 140 ---- clientlib/python/src/udmi/core/factory.py | 8 +- .../src/udmi/core/managers/point/__init__.py | 0 .../core/managers/point/abstract_point.py | 91 --- .../udmi/core/managers/point/basic_point.py | 346 --------- .../udmi/core/managers/point/bulk_provider.py | 24 - .../udmi/core/managers/point/virtual_point.py | 138 ---- .../udmi/core/managers/pointset_manager.py | 695 +++++++----------- .../core/managers/test_pointset_manager.py | 395 +--------- .../java/com/google/udmi/util/SiteModel.java | 2 +- .../iot/core/proxy/IotReflectorClient.java | 30 +- .../bos/iot/core/proxy/MqttPublisher.java | 16 +- 20 files changed, 334 insertions(+), 1919 deletions(-) delete mode 100644 clientlib/python/samples/pointset/custom_point_factory.py rename clientlib/python/samples/pointset/{writeback_legacy_with_handler.py => point_writeback.py} (89%) delete mode 100644 clientlib/python/samples/pointset/telemetry_poll_bulk_provider.py delete mode 100644 clientlib/python/samples/pointset/writeback_expiration.py delete mode 100644 clientlib/python/samples/pointset/writeback_modern_with_point_factory.py delete mode 100644 clientlib/python/src/udmi/core/managers/point/__init__.py delete mode 100644 clientlib/python/src/udmi/core/managers/point/abstract_point.py delete mode 100644 clientlib/python/src/udmi/core/managers/point/basic_point.py delete mode 100644 clientlib/python/src/udmi/core/managers/point/bulk_provider.py delete mode 100644 clientlib/python/src/udmi/core/managers/point/virtual_point.py diff --git a/clientlib/python/samples/pointset/cov_throttling.py b/clientlib/python/samples/pointset/cov_throttling.py index bce335771f..e7d8171ee1 100644 --- a/clientlib/python/samples/pointset/cov_throttling.py +++ b/clientlib/python/samples/pointset/cov_throttling.py @@ -48,10 +48,6 @@ def simulate_fast_sensor(manager: PointsetManager): """ Simulates a sensor reading that changes rapidly (every 1.0s). - - Represents an independently running hardware thread or fast I/O loop. It blindly - pushes raw data to the PointsetManager, illustrating that the manager itself takes - on the responsibility of throttling and interpreting Change of Value (cov_increment). """ val = 0.0 LOGGER.info("Starting sensor simulation (1.0s interval)...") diff --git a/clientlib/python/samples/pointset/custom_point_factory.py b/clientlib/python/samples/pointset/custom_point_factory.py deleted file mode 100644 index 94b1526884..0000000000 --- a/clientlib/python/samples/pointset/custom_point_factory.py +++ /dev/null @@ -1,123 +0,0 @@ -""" -Sample: Custom Point Implementation (Dependency Injection) - -This script demonstrates how to create a custom implementation of `BasicPoint` -and inject it into the `PointsetManager` using the `point_factory`. - -SCENARIO: -1. **CustomPoint**: A custom subclass of `BasicPoint` that simulates a "Sine Wave" - sensor reading directly within its `get_value()` method. -2. **Factory Injection**: We provide a custom factory callable to the `PointsetManager` - which instantiates our `CustomPoint` instead of the default. -3. **Result**: The application doesn't need an external update loop; - the point itself executes its reading logic pulled by the manager. -""" - -import logging -import math -import sys -import threading -import time -from typing import Any, Optional - -from udmi.core.factory import create_device, get_default_managers -from udmi.core.managers import PointsetManager -from udmi.core.managers.point.basic_point import BasicPoint -from udmi.schema import AuthProvider, Basic, EndpointConfiguration, PointPointsetModel - -# --- Configuration --- -DEVICE_ID = "AHU-2" -MQTT_HOSTNAME = "localhost" -MQTT_PORT = 1883 -BROKER_USERNAME = "pyudmi-device" -BROKER_PASSWORD = "somesecureword" -TOPIC_PREFIX = "/r/ZZ-TRI-FECTA/d/" - -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') -LOGGER = logging.getLogger("CustomPointSample") - - -class SineWavePoint(BasicPoint): - """ - A custom Point that generates a sine wave reading natively. - Demonstrates encapsulation of data acquisition at the individual point level. - By overriding `get_value()`, the point itself becomes responsible for its own - state calculation without relying on external loops or global callbacks. - """ - def __init__(self, name: str, model: Optional[PointPointsetModel] = None): - super().__init__(name, model) - self._start_time = time.time() - LOGGER.info(f"Custom SineWavePoint '{name}' instantiated.") - - def get_value(self) -> Any: - # Simulate a sine wave over time - elapsed = time.time() - self._start_time - val = 20.0 + 5.0 * math.sin(elapsed / 10.0) - return round(val, 2) - - def set_value(self, value: Any) -> Any: - LOGGER.info(f"[{self._name}] Actuation order received applied: {value}") - return value - - def validate_value(self, value: Any) -> bool: - return True - - -def sine_wave_factory(name: str, model: Optional[PointPointsetModel] = None): - """ - Factory creating SineWavePoint for all points. - Acts as the Dependency Injection provider for the PointsetManager. When the - manager receives a configuration to manage a new point, it uses this factory - to instantiate the custom user-defined Point class instead of the default. - """ - return SineWavePoint(name, model) - - -if __name__ == "__main__": - try: - # 1. Configure Connection - client_id = f"{TOPIC_PREFIX}{DEVICE_ID}" - endpoint = EndpointConfiguration( - client_id=client_id, - hostname=MQTT_HOSTNAME, - port=MQTT_PORT, - topic_prefix=TOPIC_PREFIX, - auth_provider=AuthProvider( - basic=Basic( - username=BROKER_USERNAME, - password=BROKER_PASSWORD - ) - ) - ) - - # 2. Get Default Managers with Custom Point Factory - managers = get_default_managers(point_factory=sine_wave_factory) - - # 4. Create Device - device = create_device(endpoint, managers) - - # 5. Add Points - manager = device.get_manager(PointsetManager) - # These points will now use SineWavePoint implicitly via the factory! - manager.add_point("supply_air_temp") - manager.add_point("return_air_temp") - - # 6. Start Device in background - device_thread = threading.Thread(target=device.run, daemon=True) - device_thread.start() - - LOGGER.info(f"Device {DEVICE_ID} running using Custom Point Layer.") - print("\n" + "=" * 60) - print("DEMO INSTRUCTIONS:") - print("1. Observe individual 'SineWavePoint instantiated' logs.") - print("2. The point values are pulled directly inside get_value().") - print("3. There is no manual update_sensors loop needed!") - print("=" * 60 + "\n") - - # Keep alive - while True: - time.sleep(1) - - except KeyboardInterrupt: - LOGGER.info("Stopping...") - sys.exit(0) diff --git a/clientlib/python/samples/pointset/writeback_legacy_with_handler.py b/clientlib/python/samples/pointset/point_writeback.py similarity index 89% rename from clientlib/python/samples/pointset/writeback_legacy_with_handler.py rename to clientlib/python/samples/pointset/point_writeback.py index be7f678087..5c27537d5b 100644 --- a/clientlib/python/samples/pointset/writeback_legacy_with_handler.py +++ b/clientlib/python/samples/pointset/point_writeback.py @@ -49,9 +49,6 @@ def on_writeback(point_name: str, value: Any): """ Callback triggered when the Cloud sends a 'set_value' in the configuration. - Acts as the single global actuator function (Legacy approach). It is responsible - for parsing the writeback command, commanding the physical hardware, simulating - feedback, and returning the resulting ValueState or an explicit WritebackResult. This callback can return: - None / ValueState.applied -> Mark update as success (applied) @@ -108,11 +105,9 @@ def on_writeback(point_name: str, value: Any): pointset_manager = device.get_manager(PointsetManager) # --------------------------------------------------------------------- - # DEPRECATED: Register the global Writeback Handler - # --------------------------------------------------------------------- - # This is the legacy approach for point actuation. It is recommended to - # use the point_factory pattern and subclass AbstractPoint instead. - # See custom_point_factory.py for the modern approach. + # CRITICAL STEP: Register the Writeback Handler + # Without this, the library updates its internal state to match the config, + # but your physical device remains unchanged. # --------------------------------------------------------------------- pointset_manager.set_writeback_handler(on_writeback) LOGGER.info("Writeback handler registered.") diff --git a/clientlib/python/samples/pointset/pointset_dynamic_provisioning.py b/clientlib/python/samples/pointset/pointset_dynamic_provisioning.py index 25876d507e..89a6500d5a 100644 --- a/clientlib/python/samples/pointset/pointset_dynamic_provisioning.py +++ b/clientlib/python/samples/pointset/pointset_dynamic_provisioning.py @@ -50,9 +50,6 @@ def report_managed_points(manager: PointsetManager): """ Periodically logs the inventory of points the device is tracking. - A diagnostic loop serving to prove that the PointsetManager is successfully - allocating and managing new point abstractions dynamically based purely on the - arrival of cloud configuration payloads. """ last_set = set() diff --git a/clientlib/python/samples/pointset/telemetry_basic.py b/clientlib/python/samples/pointset/telemetry_basic.py index ac253de9d7..3570b51698 100644 --- a/clientlib/python/samples/pointset/telemetry_basic.py +++ b/clientlib/python/samples/pointset/telemetry_basic.py @@ -46,9 +46,6 @@ def update_sensors_loop(pointset_manager): """ Simulates a loop that reads hardware sensors and updates the PointsetManager. - Represents an independent Data Acquisition (DAQ) thread. It is completely decoupled - from the network telemetry reporting cycle. Its sole job is to keep the PointsetManager's - internal cache fresh with the latest hardware readings. """ LOGGER.info("Starting sensor simulation loop (Interval: 2s)...") diff --git a/clientlib/python/samples/pointset/telemetry_poll_bulk_provider.py b/clientlib/python/samples/pointset/telemetry_poll_bulk_provider.py deleted file mode 100644 index 9714ec5ba5..0000000000 --- a/clientlib/python/samples/pointset/telemetry_poll_bulk_provider.py +++ /dev/null @@ -1,101 +0,0 @@ -""" -Sample: Bulk Telemetry Provider - -This script demonstrates the modern approach for supplying hardware-read values -to the PointsetManager immediately before telemetry is published. By registering -a `BulkPointProvider`, the internal values of managed points are batched-updated. -""" - -import logging -import sys -import threading -import time -from typing import Any, Dict - -from udmi.core.factory import create_device, get_default_managers -from udmi.core.managers import PointsetManager -from udmi.core.managers.point.bulk_provider import BulkPointProvider -from udmi.schema import AuthProvider, Basic, EndpointConfiguration - -DEVICE_ID = "AHU-1" -REGISTRY_ID = "ZZ-TRI-FECTA" -MQTT_HOSTNAME = "localhost" -MQTT_PORT = 1883 -BROKER_USERNAME = "pyudmi-device" -BROKER_PASSWORD = "somesecureword" -TOPIC_PREFIX = "/r/ZZ-TRI-FECTA/d/" - -logging.basicConfig(level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s') -LOGGER = logging.getLogger("BulkProviderSample") - - -class MockModbusProvider(BulkPointProvider): - """ - A mock class simulating a connection to Modbus hardware that provides - all point data in a single batch read. - - Acts as the bridge between the physical hardware layer and the UDMI software - layer. It abstracts the I/O complexity (like serial bus polling) so the PointsetManager - receives data efficiently in bulk. - """ - def __init__(self): - self._mock_data = { - "temperature": 22.5, - "humidity": 45.0, - "pressure": 1013.2 - } - - def read_points(self) -> Dict[str, Any]: - """ - Provides the batch telemetry for UDMI. - - Executes the physical read against sensors during the start of every telemetry - transmission window. Returns a dictionary mapping point names to their values. - """ - LOGGER.info(f"Modbus batch read yielding: {self._mock_data}") - self._mock_data["temperature"] += 0.1 - return self._mock_data - - -if __name__ == "__main__": - try: - client_id = f"{TOPIC_PREFIX}{DEVICE_ID}" - endpoint = EndpointConfiguration( - client_id=client_id, - hostname=MQTT_HOSTNAME, - port=MQTT_PORT, - topic_prefix=TOPIC_PREFIX, - auth_provider=AuthProvider( - basic=Basic( - username=BROKER_USERNAME, - password=BROKER_PASSWORD - ) - ) - ) - - # Initialize the provider and pass it to the default managers factory - modbus_hw = MockModbusProvider() - managers = get_default_managers(points_bulk_provider=modbus_hw) - - device = create_device(endpoint, managers) - pointset_manager = device.get_manager(PointsetManager) - - # For the sake of the sample, we add the points explicitly. - # Normally these would be provisioned via cloud `config.pointset`. - pointset_manager.add_point("temperature") - pointset_manager.add_point("humidity") - pointset_manager.add_point("pressure") - - t = threading.Thread(target=device.run, daemon=True) - t.start() - - LOGGER.info(f"Device {DEVICE_ID} is running.") - LOGGER.info("The application will read from the provider automatically based on sample_rate_sec.") - - while True: - time.sleep(1) - - except KeyboardInterrupt: - LOGGER.info("Stopping...") - sys.exit(0) diff --git a/clientlib/python/samples/pointset/telemetry_poll_callback.py b/clientlib/python/samples/pointset/telemetry_poll_callback.py index ecd60acac5..96e1e73ef2 100644 --- a/clientlib/python/samples/pointset/telemetry_poll_callback.py +++ b/clientlib/python/samples/pointset/telemetry_poll_callback.py @@ -47,10 +47,9 @@ def my_sensor_poll() -> Dict[str, Any]: """ Called by the PointsetManager background thread. - Provides a Just-in-Time (Pull) mechanism for data acquisition. Instead of - running a separate DAQ loop, the user delegates the polling rhythm to the - PointsetManager. This function must perform synchronous hardware reads and - return a dictionary of {point_name: value}. + + This function should perform the actual hardware reads (e.g. I2C, Modbus). + It must return a dictionary of {point_name: value}. """ # Simulate reading hardware registers temp = round(random.uniform(20.0, 25.0), 2) diff --git a/clientlib/python/samples/pointset/writeback_expiration.py b/clientlib/python/samples/pointset/writeback_expiration.py deleted file mode 100644 index 458c812e56..0000000000 --- a/clientlib/python/samples/pointset/writeback_expiration.py +++ /dev/null @@ -1,116 +0,0 @@ -""" -Sample: Pointset Writeback Expiration (set_value_expiry) - -This script demonstrates the UDMI writeback expiration functionality. -When a point's value is overriden via a `set_value` config command, -the cloud can optionally provide a `set_value_expiry` timestamp. If the -cloud does not refresh the configuration before this timestamp, the point -will automatically revert to its base state. - -SCENARIO: -1. **CustomPoint**: A simple subclass of `BasicPoint` that prints when its value changes. -2. **Actuation**: You publish a config setting 'override_point' to 100 with an expiry in 5 seconds. -3. **Expiration**: The UDMI `PointsetManager` detects the expiration and automatically - clears the state, reverting the point back to its default reading. -""" - -import logging -import sys -import threading -import time -from datetime import datetime, timedelta, timezone -from typing import Any, Optional - -from udmi.core.factory import create_device, get_default_managers -from udmi.core.managers import PointsetManager -from udmi.core.managers.point.basic_point import BasicPoint -from udmi.schema import AuthProvider, Basic, EndpointConfiguration, PointPointsetModel - -# --- Configuration --- -DEVICE_ID = "AHU-3" -MQTT_HOSTNAME = "localhost" -MQTT_PORT = 1883 -BROKER_USERNAME = "pyudmi-device" -BROKER_PASSWORD = "somesecureword" -TOPIC_PREFIX = "/r/ZZ-TRI-FECTA/d/" - -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') -LOGGER = logging.getLogger("WritebackExpirationSample") - - -class ExpiringPoint(BasicPoint): - """ - A custom Point that defaults to 0 but can be overriden by writebacks. - """ - def __init__(self, name: str, model: Optional[PointPointsetModel] = None): - super().__init__(name, model) - self._writable = True # Override basic_point to make it always writable for the sample - - def get_value(self) -> Any: - return 0.0 - - def set_value(self, value: Any) -> Any: - LOGGER.info(f"[{self._name}] Actuation order received: Setting to {value}") - return value - - def validate_value(self, value: Any) -> bool: - return True - -def expiring_point_factory(name: str, model: Optional[PointPointsetModel] = None): - return ExpiringPoint(name, model) - -if __name__ == "__main__": - try: - # 1. Configure Connection - client_id = f"{TOPIC_PREFIX}{DEVICE_ID}" - endpoint = EndpointConfiguration( - client_id=client_id, - hostname=MQTT_HOSTNAME, - port=MQTT_PORT, - topic_prefix=TOPIC_PREFIX, - auth_provider=AuthProvider( - basic=Basic( - username=BROKER_USERNAME, - password=BROKER_PASSWORD - ) - ) - ) - - # 2. Get Default Managers with Custom Point Factory - managers = get_default_managers(point_factory=expiring_point_factory) - - # 3. Create Device - device = create_device(endpoint, managers) - - # 4. Add Points - manager = device.get_manager(PointsetManager) - manager.add_point("override_point") - - # 5. Start Device - device_thread = threading.Thread(target=device.run, daemon=True) - device_thread.start() - - LOGGER.info(f"Device {DEVICE_ID} running.") - - # Calculate timestamps for the demo - now = datetime.now(timezone.utc) - config_ts = now.isoformat(timespec='seconds') - expiry_ts = (now + timedelta(seconds=10)).isoformat(timespec='seconds') - - print("\n" + "=" * 60) - print("DEMO INSTRUCTIONS:") - print("Publish the following JSON config using mosquitto_pub to trigger a 10s expiration override:") - print("-" * 20) - print(f"mosquitto_pub -h {MQTT_HOSTNAME} -p {MQTT_PORT} -u {BROKER_USERNAME} -P {BROKER_PASSWORD} -t '{TOPIC_PREFIX}{DEVICE_ID}/config' -m \\") - print(f" '{{ \"timestamp\": \"{config_ts}\", \"pointset\": {{ \"set_value_expiry\": \"{expiry_ts}\", \"points\": {{ \"override_point\": {{ \"set_value\": 100.0 }} }} }} }}'") - print("-" * 20) - print("Once sent, watch the logs. In 10 seconds, the point will automatically revert.") - print("=" * 60 + "\n") - - # Keep alive - while True: - time.sleep(1) - - except KeyboardInterrupt: - LOGGER.info("Stopping...") - sys.exit(0) diff --git a/clientlib/python/samples/pointset/writeback_modern_with_point_factory.py b/clientlib/python/samples/pointset/writeback_modern_with_point_factory.py deleted file mode 100644 index c988504e3d..0000000000 --- a/clientlib/python/samples/pointset/writeback_modern_with_point_factory.py +++ /dev/null @@ -1,140 +0,0 @@ -""" -Sample: Writeback (Modern Approach via Custom Point) - -This script demonstrates how to handle "Set Value" commands from the cloud -using the modern approach: encapsulating the actuation logic directly within -a custom Point implementation. - -SCENARIO: -1. **CustomPoint**: A custom subclass of `BasicPoint` that overrides `set_value`. -2. **Actuation**: You send a config with `"set_value": 24.0` for 'thermostat_target'. -3. **Encapsulation**: The manager routes the writeback to the specific point's `set_value()`. -4. **Result**: The point handles validation, simulates hardware feedback, and returns the state. -""" - -import logging -import sys -import threading -import time -from typing import Any, Optional - -from udmi.core.factory import create_device, get_default_managers -from udmi.core.managers import PointsetManager, WritebackResult -from udmi.core.managers.point.basic_point import BasicPoint -from udmi.schema import AuthProvider, Basic, EndpointConfiguration, PointPointsetModel -from udmi.schema import Entry, ValueState - -# --- CONFIGURATION --- -DEVICE_ID = "AHU-1" -MQTT_HOSTNAME = "localhost" -MQTT_PORT = 1883 -BROKER_USERNAME = "pyudmi-device" -BROKER_PASSWORD = "somesecureword" -TOPIC_PREFIX = "/r/ZZ-TRI-FECTA/d/" - -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -LOGGER = logging.getLogger("WritebackModernSample") - - -class ActuatingPoint(BasicPoint): - """ - A custom Point that handles its own underlying hardware writebacks. - - Replaces the legacy global callback by encapsulating the actuation - (hardware interaction) logic securely inside the point's set_value implementation. - """ - def __init__(self, name: str, model: Optional[PointPointsetModel] = None): - super().__init__(name, model) - # We manually mark this True for the demo, normally set by cloud metadata - self._writable = True - - def get_value(self) -> Any: - return 0.0 - - def set_value(self, value: Any) -> Any: - """ - Invoked automatically by the manager when a writeback occurs. - Can return ValueState, WritebackResult, or silently return None/ValueState.applied. - """ - LOGGER.info("!" * 60) - LOGGER.info(f"[{self._name}] HARDWARE ACTUATION: Set To {value}") - LOGGER.info("!" * 60) - - # Example 1: Validation failure - if isinstance(value, (int, float)) and value < 0: - LOGGER.warning(f"[{self._name}] Rejecting negative value: {value}") - return ValueState.invalid - - # Example 2: Simulating mechanical failure - if value == 999.0: - LOGGER.error(f"[{self._name}] Simulating mechanical jam") - return WritebackResult( - value_state=ValueState.failure, - status=Entry(message="Actuator mechanical jam identified", level=500) - ) - - # Example 3: Crash - if value == 666.0: - LOGGER.error(f"[{self._name}] Simulating crash") - raise RuntimeError("Unexpected write failure crash") - - time.sleep(0.1) - LOGGER.info(f"[{self._name}] Hardware confirmed actuation.") - return ValueState.applied - - def validate_value(self, value: Any) -> bool: - return True - - -def actuating_point_factory(name: str, model: Optional[PointPointsetModel] = None): - return ActuatingPoint(name, model) - - -if __name__ == "__main__": - try: - client_id = f"{TOPIC_PREFIX}{DEVICE_ID}" - endpoint = EndpointConfiguration( - client_id=client_id, - hostname=MQTT_HOSTNAME, - port=MQTT_PORT, - topic_prefix=TOPIC_PREFIX, - auth_provider=AuthProvider( - basic=Basic( - username=BROKER_USERNAME, - password=BROKER_PASSWORD - ) - ) - ) - - managers = get_default_managers(point_factory=actuating_point_factory) - device = create_device(endpoint, managers) - pointset_manager = device.get_manager(PointsetManager) - - pointset_manager.add_point("thermostat_target") - - t = threading.Thread(target=device.run, daemon=True) - t.start() - - LOGGER.info(f"Device {DEVICE_ID} is running.") - - print("\n" + "=" * 60) - print("DEMO INSTRUCTIONS:") - print("Publish JSON config updates using mosquitto_pub:") - print("-" * 20) - print("1. Success case (Applied):") - print(f" mosquitto_pub ... -m '{{ \"pointset\": {{ \"points\": {{ \"thermostat_target\": {{ \"set_value\": 24.0 }} }} }} }}'") - print("\n2. Invalid case (-5.0):") - print(f" mosquitto_pub ... -m '{{ \"pointset\": {{ \"points\": {{ \"thermostat_target\": {{ \"set_value\": -5.0 }} }} }} }}'") - print("\n3. Failure case (999.0):") - print(f" mosquitto_pub ... -m '{{ \"pointset\": {{ \"points\": {{ \"thermostat_target\": {{ \"set_value\": 999.0 }} }} }} }}'") - print("\n4. Crash case (666.0):") - print(f" mosquitto_pub ... -m '{{ \"pointset\": {{ \"points\": {{ \"thermostat_target\": {{ \"set_value\": 666.0 }} }} }} }}'") - print("-" * 20) - print("=" * 60 + "\n") - - while True: - time.sleep(1) - - except KeyboardInterrupt: - LOGGER.info("Stopping...") - sys.exit(0) diff --git a/clientlib/python/src/udmi/core/factory.py b/clientlib/python/src/udmi/core/factory.py index 5e35783b44..dc224a259f 100644 --- a/clientlib/python/src/udmi/core/factory.py +++ b/clientlib/python/src/udmi/core/factory.py @@ -60,19 +60,13 @@ def get_default_managers(**kwargs) -> List[BaseManager]: **kwargs: system_state (SystemState): Initial system state for SystemManager. sample_rate_sec (int): Sample rate for PointsetManager. - point_factory (Callable): Custom point factory for PointsetManager. - points_bulk_provider (BulkPointProvider): Optional provider for bulk telemetry reads. """ system_state = kwargs.get('system_state') sample_rate_sec = kwargs.get('sample_rate_sec', DEFAULT_SAMPLE_RATE_SEC) - point_factory = kwargs.get('point_factory') - points_bulk_provider = kwargs.get('points_bulk_provider') return [ SystemManager(system_state=system_state), - PointsetManager(sample_rate_sec=sample_rate_sec, - point_factory=point_factory, - bulk_provider=points_bulk_provider), + PointsetManager(sample_rate_sec=sample_rate_sec), LocalnetManager(), GatewayManager(), DiscoveryManager() diff --git a/clientlib/python/src/udmi/core/managers/point/__init__.py b/clientlib/python/src/udmi/core/managers/point/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/clientlib/python/src/udmi/core/managers/point/abstract_point.py b/clientlib/python/src/udmi/core/managers/point/abstract_point.py deleted file mode 100644 index 0c6fd44f95..0000000000 --- a/clientlib/python/src/udmi/core/managers/point/abstract_point.py +++ /dev/null @@ -1,91 +0,0 @@ -"""Abstract point definition.""" -import abc -from typing import Any - -from udmi.schema import PointPointsetConfig -from udmi.schema import PointPointsetEvents -from udmi.schema import PointPointsetState -from udmi.schema import PointPointsetModel -from udmi.schema import RefDiscovery - - -class AbstractPoint(abc.ABC): - """ - Interface representing a point reading. - Defines the strict contract that any point implementation must adhere to, - ensuring the PointsetManager remains agnostic to data acquisition details. - - Defines lifecycle methods (update_data, set_config, set_model) and state - reporting methods (get_data, get_state, is_dirty) that concrete classes - must implement to align with the UDMI specification. - """ - - @abc.abstractmethod - def get_name(self) -> str: - """ - Provides the string identifier that maps to the pointset block in UDMI payloads. - """ - - @abc.abstractmethod - def get_data(self) -> PointPointsetEvents: - """ - Provides the compiled telemetry data payload for this point to be included - in PointsetEvents. - """ - - @abc.abstractmethod - def update_data(self) -> None: - """ - Instructs the point to perform any necessary IO or state checks to refresh - its telemetry data. - """ - - @abc.abstractmethod - def is_dirty(self) -> bool: - """ - Returns if the state of the point has changed. - Indicates to the PointsetManager whether this point's state needs to be re-aggregated - and a new state_etag generated because of configuration or status changes. - """ - - @abc.abstractmethod - def get_state(self) -> PointPointsetState: - """ - Provides the compiled state payload (including value_state and status) for PointsetState. - """ - - @abc.abstractmethod - def set_config(self, config: PointPointsetConfig, **kwargs: 'Any') -> None: - """ - Sets the config of the point. - Applies dynamic changes (like writebacks via set_value) from a cloud config update. - """ - - @abc.abstractmethod - def should_report(self, sample_rate_sec: int) -> bool: - """ - Determines if this point needs to be reported. - Evaluates the Change of Value (COV) tolerance and Heartbeat interval to inform - the PointsetManager whether this point should be included in the next telemetry payload. - """ - - @abc.abstractmethod - def mark_reported(self) -> None: - """ - Updates the reporting state after a successful publish. - Caches the last reported value and timestamp to reset COV and Heartbeat state machines. - """ - - @abc.abstractmethod - def set_model(self, model: PointPointsetModel) -> None: - """ - Applies static definition from Metadata. - Initializes point properties (like units, type, or writable flags) during device startup. - """ - - @abc.abstractmethod - def enumerate(self) -> RefDiscovery: - """ - Returns discovery information for this point. - Formats the point's static details for inclusion in the Discovery block. - """ diff --git a/clientlib/python/src/udmi/core/managers/point/basic_point.py b/clientlib/python/src/udmi/core/managers/point/basic_point.py deleted file mode 100644 index 10b6e4d9b9..0000000000 --- a/clientlib/python/src/udmi/core/managers/point/basic_point.py +++ /dev/null @@ -1,346 +0,0 @@ -"""Basic point definition.""" -import abc -import copy -import threading -import time -from datetime import datetime -from datetime import timezone -from typing import Any -from typing import Callable -from typing import Optional - -from udmi.core.managers.point.abstract_point import AbstractPoint -from udmi.schema import Category -from udmi.schema import Entry -from udmi.schema import PointPointsetConfig -from udmi.schema import PointPointsetEvents -from udmi.schema import PointPointsetModel -from udmi.schema import PointPointsetState -from udmi.schema import ValueState -from udmi.schema import RefDiscovery - -DEFAULT_HEARTBEAT_SEC = 600 - -class BasicPoint(AbstractPoint): # pylint: disable=too-many-instance-attributes - """ - Abstract representation of a basic data point. - - Provides the foundational boilerplate logic for a point, including state management, - dirty flag handling, validation, and Change of Value (COV) calculations. Developers - subclass this to implement hardware-specific get_value and set_value mechanics. - - State Machine Details: - - Writable values move through states: None -> applied, invalid, or failure based on config. - - Dirty State: The _dirty flag is set when config, value_state, or status changes, - triggering the PointsetManager to regenerate the global device state and state_etag. - - Reporting State: Tracks _last_reported_value and _last_reported_time to accurately - execute COV and periodic heartbeat reporting logic. - """ - - def __init__(self, name: str, model: Optional[PointPointsetModel] = None): - self._name = name - self._data = PointPointsetEvents() - self._state = PointPointsetState() - - self._writable = model.writable if (model and model.writable) else False - self._ref = model.ref if model else None - - if model and model.units: - self._state.units = model.units - - self._written = False - self._dirty = True - - self._expiry_time: Optional[float] = None - - # Store full model for subclass extensibility - self._model = model - - # COV and Heartbeat - self._cov_increment: Optional[float] = None - self._last_reported_value: Any = None - self._last_reported_time: float = 0.0 - - def get_name(self) -> str: - return self._name - - def get_data(self) -> PointPointsetEvents: - return self._data - - def is_dirty(self) -> bool: - return self._dirty - - def get_state(self) -> PointPointsetState: - self._dirty = False - return self._state - - @property - def value_state(self): - """Gets value state.""" - return self._state.value_state - - @value_state.setter - def value_state(self, value): - """Sets value state.""" - self._state.value_state = value - self._dirty = True - - @property - def status(self): - """Gets status.""" - return self._state.status - - @status.setter - def status(self, value): - """Sets status.""" - self._state.status = value - self._dirty = True - - def update_data(self) -> None: - """ - Updates the telemetry data from the implementation source. - Calls the abstract get_value method to refresh the point's data payload, - unless a writeback is currently active (in which case it retains the written value). - """ - if not self._written: - self._data.present_value = self.get_value() - - def set_model(self, model: PointPointsetModel) -> None: - """ - Applies static definition from Metadata. - Initializes point properties (like units or ref overrides) during device startup - and flags the point as dirty so the State is regenerated. - """ - if not model: - return - if model.units: - self._state.units = model.units - if model.ref: - self._ref = model.ref - self._dirty = True - - def clear_writeback(self) -> None: - """ - Clears the writeback state and reverts to base state. - Handles the expiration of a set_value_expiry timer by wiping the overriden value - state and forcing an immediate re-read from the underlying hardware. - """ - self._written = False - self._state.value_state = None - self._state.status = None - self._expiry_time = None - self.update_data() - self._dirty = True - - def set_config(self, config: PointPointsetConfig, **kwargs: 'Any') -> None: - """ - Set the configuration for this point, nominally to indicate writing a value. - Entrypoint for the PointsetManager to push config updates or writebacks down to the point. - Checks if the payload changed and manages the _dirty flag. - """ - previous_value_state = self._state.value_state - previous_status = copy.deepcopy(self._state.status) - - invalid_expiry = kwargs.get('invalid_expiry', False) - is_expired = kwargs.get('is_expired', False) - on_state_change = kwargs.get('on_state_change') - - self._update_state_config(config, invalid_expiry, is_expired, on_state_change) - - state_changed = self._state.value_state != previous_value_state - status_changed = self._state.status != previous_status - - if state_changed or status_changed: - self._dirty = True - - def _update_state_config(self, - config: Optional[PointPointsetConfig], - invalid_expiry: bool = False, - is_expired: bool = False, - on_state_change: Optional[Callable] = None) -> None: - # pylint: disable=too-many-return-statements - """ - Update the state of this point based off of a new config. - The core writeback state machine logic. Validates ref matching, expiry timestamps, - writeability, and delegates to the hardware validation logic. Executes hardware - actuation asynchronously, immediately transitioning into `updating` state, and - bubbles up Entries once `applied` or `failure` state is reached. - """ - self._state.status = None - - if config: - if config.cov_increment is not None: - self._cov_increment = config.cov_increment - if config.units is not None: - self._state.units = config.units - - # 1. Validate Ref - if config is not None and config.ref != self._ref: - self._state.status = self._create_entry( - Category.POINTSET_POINT_FAILURE, "Invalid point ref" - ) - return - - # 2. Check if set_value is present (Release/Null check) - if config is None or config.set_value is None: - self.clear_writeback() - return - - if invalid_expiry: - self.clear_writeback() - self._state.status = self._create_entry( - Category.POINTSET_POINT_INVALID, "Invalid or missing set_value_expiry" - ) - self._state.value_state = ValueState.invalid - return - - if is_expired: - self.clear_writeback() - return - - # 3. Validate Value - try: - if not self.validate_value(config.set_value): - self._state.status = self._create_entry( - Category.POINTSET_POINT_INVALID, - "Written value is not valid" - ) - self._state.value_state = ValueState.invalid - return - except Exception as ex: # pylint: disable=broad-exception-caught - self._state.status = self._create_entry( - Category.POINTSET_POINT_FAILURE, str(ex) - ) - self._state.value_state = ValueState.failure - return - - # 4. Check Writable - if not self._writable: - self._state.status = self._create_entry( - Category.POINTSET_POINT_FAILURE, "Point is not writable" - ) - self._state.value_state = ValueState.failure - return - - # 5. Apply Value - def execute_writeback(val: Any) -> None: - try: - result = self.set_value(val) - self._data.present_value = result - self.value_state = ValueState.applied - self._written = True - except Exception as ex: # pylint: disable=broad-exception-caught - self.status = self._create_entry( - Category.POINTSET_POINT_FAILURE, str(ex) - ) - self.value_state = ValueState.failure - finally: - if on_state_change: - on_state_change() - - self._state.value_state = ValueState.updating - thread = threading.Thread(target=execute_writeback, args=(config.set_value,)) - thread.daemon = True - thread.start() - - def _create_entry(self, category: Category, message: str) -> Entry: - """Helper to create a status Entry object.""" - entry = Entry() - entry.detail = f"Point {self._name} (writable {self._writable})" - entry.timestamp = datetime.now(timezone.utc).isoformat() - entry.message = message - entry.category = category - entry.level = category.level - return entry - - @abc.abstractmethod - def get_value(self) -> Any: - """ - Return the current reading from the source. - Abstract method where subclasses implement their read operations (IO, GPIO, Modbus, etc). - """ - - @abc.abstractmethod - def set_value(self, value: Any) -> Any: - """ - Write the value to the source and return the applied value. - Abstract method where subclasses implement their hardware actuation logic. - Should return the value that was actually confirmed by the hardware. - """ - - @abc.abstractmethod - def validate_value(self, value: Any) -> bool: - """ - Check if the value is valid for this point. - Abstract hook for subclasses to reject writebacks (e.g., bounds checking) before - an attempt to write to hardware is made. - """ - - @abc.abstractmethod - def _populate_enumeration(self, point: RefDiscovery) -> None: - """Hook for subclasses to populate extra enumeration fields.""" - - def enumerate(self) -> RefDiscovery: - """ - Returns discovery information for this point. - Aggregates subclass-specific data with base properties to compile the Discovery payload. - """ - point = RefDiscovery() - point.description = f"{self.__class__.__name__} {self.get_name()}" - point.writable = True if self._writable else None - if self._state.units: - point.units = self._state.units - if self._ref: - point.ref = self._ref - self._populate_enumeration(point) - return point - - def should_report(self, sample_rate_sec: int) -> bool: - """ - Determines if this point needs to be reported based on Change of Value (COV) - and Heartbeat logic. - Compares the current present_value against the _last_reported_value to invoke COV - reporting, otherwise falls back to evaluating the _last_reported_time against the - heartbeat interval. - """ - if self._data.present_value is None: - return False - - now = time.time() - - if self._last_reported_value is None: - return True - - should_report_cov = False - - if self._data.present_value != self._last_reported_value: - is_numeric = (isinstance(self._data.present_value, (int, float)) and - not isinstance(self._data.present_value, bool) and - isinstance(self._last_reported_value, (int, float)) and - not isinstance(self._last_reported_value, bool)) - - if is_numeric and self._cov_increment is not None: - delta = abs(self._data.present_value - self._last_reported_value) - # Ensure we handle casting just in case - if delta >= self._cov_increment: - should_report_cov = True - else: - should_report_cov = True - - if should_report_cov: - return True - - # Default heartbeat or sample rate - heartbeat_interval = sample_rate_sec if sample_rate_sec > 0 else DEFAULT_HEARTBEAT_SEC - if (now - self._last_reported_time) >= heartbeat_interval: - return True - - return False - - def mark_reported(self) -> None: - """ - Updates the reporting state after a successful publish. - Caches the last reported value and timestamp to reset COV and Heartbeat state machines. - """ - self._last_reported_value = self._data.present_value - self._last_reported_time = time.time() diff --git a/clientlib/python/src/udmi/core/managers/point/bulk_provider.py b/clientlib/python/src/udmi/core/managers/point/bulk_provider.py deleted file mode 100644 index 1837155d44..0000000000 --- a/clientlib/python/src/udmi/core/managers/point/bulk_provider.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -Provides the BulkPointProvider interface. -""" -from abc import ABC, abstractmethod -from typing import Any, Dict - -class BulkPointProvider(ABC): # pylint: disable=too-few-public-methods - """ - Interface for providing bulk telemetry reads to the PointsetManager. - This interface allows for batch reading of hardware points (e.g., Modbus, BACnet) at - the start of a telemetry cycle, significantly reducing IO overhead compared to - polling points individually. - """ - - @abstractmethod - def read_points(self) -> Dict[str, Any]: - """ - Reads the current values of all supported points from hardware. - Executes a batch IO operation to retrieve the latest sensor readings and returns them - as a mapping to be injected into the PointsetManager's pipeline. - - Returns: - Dict[str, Any]: A mapping of point names to their present values. - """ diff --git a/clientlib/python/src/udmi/core/managers/point/virtual_point.py b/clientlib/python/src/udmi/core/managers/point/virtual_point.py deleted file mode 100644 index e8c9773517..0000000000 --- a/clientlib/python/src/udmi/core/managers/point/virtual_point.py +++ /dev/null @@ -1,138 +0,0 @@ -"""Virtual point definition.""" -import warnings -from typing import Any -from typing import Optional - -from udmi.core.managers.point.basic_point import BasicPoint -from udmi.schema import PointPointsetModel -from udmi.schema import RefDiscovery - - -class Point(BasicPoint): - """ - Default concrete implementation of a Point. - Acts as an in-memory "virtual" point that simply stores and retrieves a present value. - Useful for testing, simulations, and software-only variables where no hardware integration - is needed. - - - Maintains a local _present_value. - - Inherits COV and heartbeat state machine from BasicPoint. - - Writebacks instantly apply to the in-memory value without hardware IO operations. - """ - - def __init__(self, name: str, model: Optional[PointPointsetModel] = None): - super().__init__(name, model) - # Seed with baseline_value from model if present - self._present_value: Any = ( - model.baseline_value if (model and model.baseline_value is not None) else None - ) - - def get_value(self) -> Any: - """ - Overrides the abstract get_value to return the in-memory _present_value. - """ - return self._present_value - - def set_value(self, value: Any) -> Any: - """ - Concrete implementation of set_value, returning the value as applied. - Accepts the set_value command from the cloud and applies it directly to the - in-memory state. - """ - return value - - def validate_value(self, value: Any) -> bool: - """ - Default validation that takes all values as valid. - Since this is a virtual point, it places no restrictions on the type or format - of the data written to it. - """ - return True - - def set_present_value(self, value: Any) -> None: - """ - API for manual injection of values (e.g., from sample scripts). - Provides an external interface to mutate the point's value and optionally - resets its error status to simulate hardware recovery. - """ - self._present_value = value - if self.status and self.status.level >= 500: - self.status = None - - def _populate_enumeration(self, point: RefDiscovery) -> None: - """Concrete implementation doing nothing by default.""" - - # ========================================================================= - # BACKWARD COMPATIBILITY LAYER - # ========================================================================= - - @property - def present_value(self): - """ - Get the point's present value. - """ - warnings.warn( - "Direct access to 'present_value' is deprecated in v2.0.0. " - "Use 'get_data().present_value'.", - DeprecationWarning, stacklevel=2 - ) - return self.get_data().present_value - - @present_value.setter - def present_value(self, value): - """ - Set the point's present value. - """ - warnings.warn( - "Direct setting of 'present_value' is deprecated. " - "Use 'set_present_value(value)'.", - DeprecationWarning, stacklevel=2 - ) - self.set_present_value(value) - - @property - def units(self): - """ - Get the point's units. - """ - warnings.warn( - "Direct access to 'units' is deprecated. " - "Use 'get_state().units'.", - DeprecationWarning, stacklevel=2 - ) - return self.get_state().units - - @units.setter - def units(self, value): - """ - Set the point's units. - """ - warnings.warn( - "Direct setting of 'units' is deprecated. " - "Update metadata models instead.", - DeprecationWarning, stacklevel=2 - ) - self._state.units = value - self._dirty = True - - def get_event(self): - """ - Provides the compiled telemetry data payload for this point to be included - in PointsetEvents. - """ - warnings.warn( - "'get_event()' is deprecated. Use 'get_data()'.", - DeprecationWarning, stacklevel=2 - ) - return self.get_data() - - def update_config(self, config) -> bool: - """ - Update the configuration for this point and return true if state changed. - """ - warnings.warn( - "'update_config()' is deprecated. Use 'set_config()'.", - DeprecationWarning, stacklevel=2 - ) - self.set_config(config) - return self.is_dirty() diff --git a/clientlib/python/src/udmi/core/managers/pointset_manager.py b/clientlib/python/src/udmi/core/managers/pointset_manager.py index 2945c8eeec..96d01fd80e 100644 --- a/clientlib/python/src/udmi/core/managers/pointset_manager.py +++ b/clientlib/python/src/udmi/core/managers/pointset_manager.py @@ -4,13 +4,10 @@ This manager is responsible for handling the 'pointset' block of the config and state, and for reporting pointset telemetry events. """ -import hashlib -import json + import logging import threading import time -import warnings -from dataclasses import dataclass from datetime import datetime from datetime import timezone from typing import Any @@ -19,16 +16,17 @@ from typing import Mapping from typing import Optional from typing import Union +from dataclasses import dataclass from udmi.constants import UDMI_VERSION from udmi.core.managers.base_manager import BaseManager -from udmi.core.managers.point.abstract_point import AbstractPoint -from udmi.core.managers.point.bulk_provider import BulkPointProvider -from udmi.core.managers.point.virtual_point import Point from udmi.schema import Config from udmi.schema import Entry from udmi.schema import Metadata from udmi.schema import PointPointsetConfig +from udmi.schema import PointPointsetEvents +from udmi.schema import PointPointsetModel +from udmi.schema import PointPointsetState from udmi.schema import PointsetEvents from udmi.schema import PointsetState from udmi.schema import State @@ -39,7 +37,6 @@ DEFAULT_SAMPLE_RATE_SEC = 10 DEFAULT_HEARTBEAT_SEC = 600 - @dataclass class WritebackResult: """ @@ -50,117 +47,237 @@ class WritebackResult: # Callback signature: function(point_name, value) -> Optional[ValueState | WritebackResult] -WritebackHandler = Callable[ - [str, Any], Union[None, ValueState, WritebackResult]] +WritebackHandler = Callable[[str, Any], Union[None, ValueState, WritebackResult]] # Poll Callback now returns a dictionary of {point_name: value} PollCallback = Callable[[], Dict[str, Any]] -class PointsetManager(BaseManager): # pylint: disable=too-many-instance-attributes +class Point: + """ + Represents a single data point within the Pointset. + Acts as a container for value, configuration, and status. + Tracks its own reporting state (last reported value/time) for COV logic. + """ + + # pylint: disable=too-many-instance-attributes + + def __init__(self, name: str): + """ + Initializes a new Point. + + Args: + name: The unique identifier for this point (e.g., 'temp_sensor_1'). + """ + self.name = name + self.ref = None + self.present_value: Any = None + self.units: Optional[str] = None + self.status: Optional[Entry] = None + self.value_state: Optional[str] = None + + # Writeback + self.set_value: Any = None + + # Reporting Configuration + self.cov_increment: Optional[float] = None + + # Reporting State + self.last_reported_value: Any = None + self.last_reported_time: float = 0.0 + + def set_model(self, model: PointPointsetModel) -> None: + """ + Applies static definition from Metadata. + + Args: + model: The metadata model defining static properties like units. + """ + if model.units: + self.units = model.units + if model.ref: + self.ref = model.ref + + def set_present_value(self, value: Any) -> None: + """ + Updates the current reading of the point. + + This updates the internal state but does not trigger immediate reporting. + Reporting is handled by the `should_report` check during the telemetry loop. + + Args: + value: The new value reading. + """ + self.present_value = value + if self.status and self.status.level >= 500: + self.status = None + + def update_config(self, config: PointPointsetConfig) -> bool: + """ + Updates point metadata based on config. + + Args: + config: The configuration object for this specific point. + + Returns: + True if a writeback (set_value change) occurred, requiring external handling. + False otherwise. + """ + if config.units is not None: + self.units = config.units + if config.cov_increment is not None: + self.cov_increment = config.cov_increment + + dirty = False + if config.set_value is not None: + self.value_state = None + self.status = None + + if config.set_value != self.set_value: + self.set_value = config.set_value + dirty = True + + self.value_state = ValueState.applied + + return dirty + + def get_state(self) -> PointPointsetState: + """ + Returns the state representation of this point. + + Returns: + A PointPointsetState object suitable for serialization in the state block. + """ + return PointPointsetState( + status=self.status, + value_state=self.value_state, + units=self.units + ) + + def get_event(self) -> PointPointsetEvents: + """ + Returns the telemetry event representation of this point. + + Returns: + A PointPointsetEvents object containing the present value. + """ + return PointPointsetEvents( + present_value=self.present_value + ) + + def should_report(self, sample_rate_sec: int) -> bool: + """ + Determines if this point needs to be reported based on Change of Value (COV) + and Heartbeat logic. + + Logic: + 1. If never reported, report immediately. + 2. If value changed > cov_increment, report immediately. + 3. If time since last report > heartbeat_interval, report immediately. + + Args: + sample_rate_sec: The global sample rate (used to calculate heartbeat). + + Returns: + True if the point should be included in the next telemetry message. + """ + if self.present_value is None: + return False + + # Always report if never reported before + if self.last_reported_value is None: + return True + + now = time.time() + should_report_cov = False + + # Check Change of Value (COV) + if self.present_value != self.last_reported_value: + is_numeric = (isinstance(self.present_value, (int, float)) and + isinstance(self.last_reported_value, (int, float))) + + if is_numeric and self.cov_increment is not None: + delta = abs(self.present_value - self.last_reported_value) + if delta >= self.cov_increment: + should_report_cov = True + else: + # Non-numeric or no COV threshold set -> Report any change + should_report_cov = True + + if should_report_cov: + return True + + heartbeat_interval = sample_rate_sec if sample_rate_sec > 0 else DEFAULT_HEARTBEAT_SEC + if (now - self.last_reported_time) >= heartbeat_interval: + return True + + return False + + def mark_reported(self) -> None: + """ + Updates the reporting state after a successful publish. + + Syncs `last_reported_value` with `present_value` and updates + `last_reported_time` to the current time. + """ + self.last_reported_value = self.present_value + self.last_reported_time = time.time() + + +class PointsetManager(BaseManager): """ Manages the 'pointset' block. - Acts as the central orchestrator for device telemetry. It dynamically provisions points - from configuration, parses sample rates, oversees background telemetry generation loops, - and handles point writebacks (including validation, staleness, and expiry). - - - Point Lifecycle: Dynamically instantiates new points based on config and manages - their lifecycles (active vs inactive points). - - Writeback Timers: Spawns threading.Timer instances per-point for the 'set_value_expiry' - directive. Transitions points back to base state and triggers State - regeneration upon expiration. - - State Etag Tracking: Calculates and tracks a deterministic state_etag (SHA-256) of all - managed points to prevent stale writebacks and ensure UI synchronicity. - - Polling Pipeline: Interleaves Pointset callbacks, BulkPointProvider reads, and individual - point updates inside its publish_telemetry method. + Handles configuration of points, reporting of point states, and + periodic publishing of telemetry events based on global rate OR per-point COV. """ PERSISTENCE_KEY = "pointset_state" - PERSISTENCE_BUFFER_KEY = "pointset_telemetry_buffer" - MAX_OFFLINE_BUFFER_SIZE = 1000 @property def model_field_name(self) -> str: return "pointset" - def __init__(self, sample_rate_sec: int = DEFAULT_SAMPLE_RATE_SEC, - point_factory: Optional[Callable] = None, - bulk_provider: Optional[BulkPointProvider] = None): + def __init__(self, sample_rate_sec: int = DEFAULT_SAMPLE_RATE_SEC): """ Initializes the PointsetManager. Args: sample_rate_sec: The default interval between telemetry checks. - point_factory: Optional factory to create points. Defaults to Point. - bulk_provider: Optional provider for bulk telemetry reads. """ super().__init__() - self._point_factory = point_factory or Point - self._all_points: Dict[str, AbstractPoint] = {} - self._last_set_values: Dict[str, Any] = {} + self._points: Dict[str, Point] = {} self._sample_rate_sec = sample_rate_sec self._state_etag: Optional[str] = None - self._active_points: set = set() - self._last_active_points: set = set() - - self._sample_limit_sec: Optional[int] = None - self._last_publish_time: float = 0.0 - - self._last_full_publish_time: float = 0.0 self._writeback_handler: Optional[WritebackHandler] = None self._telemetry_wake_event: Optional[threading.Event] = None self._poll_callback: Optional[PollCallback] = None - self._writeback_timers: Dict[str, threading.Timer] = {} - self._bulk_provider: Optional[BulkPointProvider] = bulk_provider - - self._offline_buffer: list[dict] = [] LOGGER.info("PointsetManager initialized.") @property - def points(self) -> Mapping[str, AbstractPoint]: - """Returns a read-only mapping of the active points.""" - return { - name: self._all_points[name] - for name in self._active_points - if name in self._all_points - } - - @property - def all_points(self) -> Mapping[str, AbstractPoint]: - """Returns a read-only mapping of all points.""" - return self._all_points - - @property - def _points(self): - warnings.warn( - "Accessing the private '_points' attribute is deprecated. " - "Use the public '.points' property to get active points, " - "or '.all_points' to get all provisioned points.", - DeprecationWarning, stacklevel=2 - ) - return self.points + def points(self) -> Mapping[str, Point]: + """Returns a read-only mapping of the managed points.""" + return self._points def set_model(self, model: Metadata) -> None: """ Applies the Pointset Metadata (Model) to the manager. - Iterates through the static metadata model during startup to initialize - points and apply static properties (like units). + + This iterates through the metadata model and initializes points + defined therein. Args: model: The device metadata object. """ super().set_model(model) - if not self.model or not hasattr(self.model, - 'points') or not self.model.points: + if not self.model or not hasattr(self.model, 'points') or not self.model.points: return LOGGER.info("Applying Pointset Metadata Model...") for name, point_model in self.model.points.items(): - if name not in self._all_points: + if name not in self._points: self.add_point(name) - else: - self._all_points[name].set_model(point_model) + self._points[name].set_model(point_model) def set_writeback_handler(self, handler: WritebackHandler) -> None: """ @@ -169,14 +286,7 @@ def set_writeback_handler(self, handler: WritebackHandler) -> None: Args: handler: A callable taking (point_name, value). """ - message = ( - "set_writeback_handler is deprecated and will be removed in v2.0.0. " - "Please subclass BasicPoint or AbstractPoint and use point_factory instead." - ) - LOGGER.warning(message) - warnings.warn(message, DeprecationWarning, stacklevel=2) self._writeback_handler = handler - LOGGER.info("Registered writeback handler.") def set_poll_callback(self, callback: PollCallback) -> None: """ @@ -188,25 +298,9 @@ def set_poll_callback(self, callback: PollCallback) -> None: Args: callback: A callable returning a dict of {point_name: value}. """ - message = ( - "set_poll_callback is deprecated and will be removed in v2.0.0. " - "Please migrate to the BulkPointProvider interface." - ) - LOGGER.warning(message) - warnings.warn(message, DeprecationWarning, stacklevel=2) self._poll_callback = callback LOGGER.info("Registered telemetry poll callback.") - def register_bulk_provider(self, provider: BulkPointProvider) -> None: - """ - Registers a bulk provider that supplies hardware reads for points. - - Args: - provider: An instance of BulkPointProvider. - """ - self._bulk_provider = provider - LOGGER.info("Registered bulk telemetry provider.") - def add_point(self, name: str) -> None: """ Registers a point to be managed. @@ -214,14 +308,8 @@ def add_point(self, name: str) -> None: Args: name: The name of the point to add. """ - if name not in self._all_points: - point_model = None - if self.model and hasattr(self.model, - "points") and self.model.points: - point_model = self.model.points.get(name) - - self._all_points[name] = self._point_factory(name, - model=point_model) + if name not in self._points: + self._points[name] = Point(name) LOGGER.debug("Added point '%s' to manager.", name) def set_point_value(self, name: str, value: Any) -> None: @@ -232,14 +320,9 @@ def set_point_value(self, name: str, value: Any) -> None: name: The name of the point. value: The new value. """ - if name not in self._all_points: + if name not in self._points: self.add_point(name) - - point = self._all_points[name] - if hasattr(point, "set_present_value"): - point.set_present_value(value) - else: - LOGGER.debug("Point '%s' does not support set_present_value", name) + self._points[name].set_present_value(value) def start(self) -> None: """ @@ -249,7 +332,6 @@ def start(self) -> None: """ LOGGER.info("Starting PointsetManager telemetry loop...") self._load_persisted_state() - self._load_persisted_buffer() self._telemetry_wake_event = self.start_periodic_task( interval_getter=lambda: self._sample_rate_sec, task=self.publish_telemetry, @@ -261,20 +343,13 @@ def stop(self) -> None: Stops the telemetry loop and persists current state. """ self._persist_state() - for timer in self._writeback_timers.values(): - timer.cancel() - self._writeback_timers.clear() super().stop() LOGGER.info("PointsetManager stopped.") def handle_config(self, config: Config) -> None: - # pylint: disable=too-many-locals,too-many-branches,too-many-statements,too-many-nested-blocks """ Handles 'pointset' block of config. - The central pointset state-machine driver for config updates. It updates - global sample rates, provisions new points dynamically, handles writeback - validations (etag checks, expiry evaluation), and synchronizes individual - point states with cloud commands. + Updates sample rates and dynamic point configurations. Args: config: The full device configuration. @@ -291,180 +366,47 @@ def handle_config(self, config: Config) -> None: self._sample_rate_sec = config.pointset.sample_rate_sec if self._telemetry_wake_event: self._telemetry_wake_event.set() - self._sample_limit_sec = config.pointset.sample_limit_sec + + # Update Etag + self._state_etag = config.pointset.state_etag # Update Points (Dynamic Provisioning) new_point_configs = config.pointset.points or {} - self._active_points = set(new_point_configs.keys()) + current_point_names = set(self._points.keys()) + new_point_names = set(new_point_configs.keys()) - config_timestamp_str = config.timestamp - config_timestamp = None - if config_timestamp_str: - try: - config_timestamp = datetime.fromisoformat( - config_timestamp_str.replace("Z", "+00:00")).timestamp() - except ValueError: - pass - - set_value_expiry_str = config.pointset.set_value_expiry - set_value_expiry = None - if set_value_expiry_str: - try: - set_value_expiry = datetime.fromisoformat( - set_value_expiry_str.replace("Z", "+00:00")).timestamp() - except ValueError: - pass - - incoming_etag = config.pointset.state_etag - etag_mismatch = False - if incoming_etag and self._state_etag and incoming_etag != self._state_etag: - LOGGER.warning( - "state_etag mismatch! Cloud: %s | Device: %s. Rejecting writebacks.", - incoming_etag, self._state_etag - ) - etag_mismatch = True + for name in current_point_names - new_point_names: + LOGGER.info("Removing point '%s' not present in received config", name) + del self._points[name] - for point_name in new_point_configs: - if point_name not in self._all_points: - LOGGER.info("Provisioning new point from config: %s", - point_name) + for point_name, point_config in new_point_configs.items(): + if point_name not in self._points: + LOGGER.info("Provisioning new point from config: %s", point_name) self.add_point(point_name) - for point_name, point in self._all_points.items(): - point_config = new_point_configs.get(point_name) - - invalid_expiry = False - is_expired = False - if point_config and point_config.set_value is not None: - if set_value_expiry is None or ( - config_timestamp is not None and set_value_expiry <= config_timestamp): - invalid_expiry = True - elif set_value_expiry < time.time(): - is_expired = True - - try: - point.set_config(point_config, invalid_expiry=invalid_expiry, - is_expired=is_expired, - on_state_change=self.trigger_state_update) - except TypeError: - point.set_config(point_config) - - if point_config and point_config.set_value is not None: - if invalid_expiry or is_expired: - if invalid_expiry: - self.trigger_state_update() - if point_name in self._writeback_timers: - self._writeback_timers[point_name].cancel() - del self._writeback_timers[point_name] - continue - - if etag_mismatch: - point.value_state = ValueState.invalid - point.status = Entry( - message="state_etag mismatch. Stale writeback prevented.", - level=500 - ) - self.trigger_state_update() - continue - - previous_set = self._last_set_values.get(point_name) - if point_config.set_value != previous_set: - self._last_set_values[point_name] = point_config.set_value - if self._writeback_handler is not None: - try: - result = self._writeback_handler(point_name, - point_config.set_value) - if isinstance(result, ValueState): - point.value_state = result - elif isinstance(result, WritebackResult): - point.value_state = result.value_state - if result.status: - point.status = result.status - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error( - "Error in writeback handler for %s: %s", - point_name, e) - point.value_state = ValueState.failure - point.status = Entry(message=str(e), level=500) - else: - # Rely entirely on the point's native set_config/set_value - # to manage hardware actuation and determine the resulting value_state. - pass - - if point.value_state in (ValueState.applied, - ValueState.updating) and set_value_expiry: - delay = set_value_expiry - time.time() - if delay > 0: - if point_name in self._writeback_timers: - self._writeback_timers[point_name].cancel() - - timer = threading.Timer(delay, - self._handle_writeback_expiration, - args=[point_name]) - timer.daemon = True - self._writeback_timers[point_name] = timer - timer.start() - - def _handle_writeback_expiration(self, point_name: str) -> None: - """ - Callback when a point's set_value_expiry is reached. - """ - if point_name not in self._all_points: - return - - point = self._all_points[point_name] - try: - if hasattr(point, "clear_writeback"): - point.clear_writeback() - elif hasattr(point, "set_config"): - point.set_config(PointPointsetConfig()) - - if point_name in self._writeback_timers: - del self._writeback_timers[point_name] - - self._last_set_values.pop(point_name, None) - self.trigger_state_update() - LOGGER.info("Writeback timer expired for point: %s", point_name) - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Error expiring writeback for point %s: %s", - point_name, e) + point = self._points[point_name] + is_writeback = point.update_config(point_config) + + if is_writeback and self._writeback_handler is not None: + try: + result = self._writeback_handler(point_name, point.set_value) + if isinstance(result, ValueState): + point.value_state = result + elif isinstance(result, WritebackResult): + point.value_state = result.value_state + if result.status: + point.status = result.status + except Exception as e: # pylint: disable=broad-exception-caught + LOGGER.error("Error in writeback handler for %s: %s", + point_name, e) + point.value_state = ValueState.failure + point.status = Entry(message=str(e), level=500) def handle_command(self, command_name: str, payload: dict) -> None: """ Handles commands directed at the pointset (currently none). """ - def _generate_state_etag(self, points_state_map: dict) -> str: - """ - Generates a unique SHA-256 hash representing the current pointset state. - Includes 'units', 'value_state', and 'set_value'. - """ - etag_dict_map = {} - for point_name, point_state in points_state_map.items(): - point_state_dict = ( - point_state.to_dict() if hasattr(point_state, 'to_dict') - else point_state - ) - etag_point_dict = {} - if ("units" in point_state_dict and - point_state_dict["units"] is not None): - etag_point_dict["units"] = point_state_dict["units"] - if ("value_state" in point_state_dict and - point_state_dict["value_state"] is not None): - etag_point_dict["value_state"] = point_state_dict["value_state"] - - set_val = self._last_set_values.get(point_name) - if set_val is not None: - etag_point_dict["set_value"] = set_val - - etag_dict_map[point_name] = etag_point_dict - - # Ensure deterministic JSON serialization by sorting keys - state_str = json.dumps(etag_dict_map, sort_keys=True, - separators=(',', ':')) - full_hash = hashlib.sha256(state_str.encode('utf-8')).hexdigest() - return full_hash[:32] - def _load_persisted_state(self) -> None: """Loads the last known point values from persistence.""" if not self._device or not self._device.persistence: @@ -477,48 +419,30 @@ def _load_persisted_state(self) -> None: restored_count = 0 for point_name, point_data in saved_state.items(): - if point_name not in self._all_points: + if point_name not in self._points: continue + point = self._points[point_name] if "present_value" in point_data: - self.set_point_value(point_name, - point_data["present_value"]) + point.present_value = point_data["present_value"] restored_count += 1 LOGGER.info("Restored state for %s points.", restored_count) - except Exception as e: # pylint: disable=broad-exception-caught + except Exception as e: # pylint: disable=broad-exception-caught LOGGER.error("Failed to load persisted pointset state: %s", e) def update_state(self, state: State) -> None: """ Populates state.pointset with status of all managed points. - Aggregates point-level states and compiles them into the device's main State. - Checks for dirty flags and recalculates the state_etag if changes occurred. Args: state: The state object to update. """ - is_state_dirty = False - - if self._active_points != self._last_active_points: - is_state_dirty = True - self._last_active_points = self._active_points.copy() - - points_state_map = {} - for name in self._active_points: - if name in self._all_points: - point = self._all_points[name] - - if point.is_dirty(): - is_state_dirty = True - - points_state_map[name] = point.get_state() - - if is_state_dirty or self._state_etag is None: - self._state_etag = self._generate_state_etag(points_state_map) - LOGGER.debug("Pointset state changed. Recalculated state_etag: %s", - self._state_etag) + points_state_map = { + name: point.get_state() + for name, point in self._points.items() + } state.pointset = PointsetState( state_etag=self._state_etag, @@ -533,88 +457,27 @@ def _persist_state(self) -> None: try: data_to_save = {} - for name, point in self._all_points.items(): - current_data = point.get_data() - if current_data.present_value is not None: + for name, point in self._points.items(): + if point.present_value is not None: data_to_save[name] = { - "present_value": current_data.present_value + "present_value": point.present_value } self._device.persistence.set(self.PERSISTENCE_KEY, data_to_save) - except Exception as e: # pylint: disable=broad-exception-caught + except Exception as e: # pylint: disable=broad-exception-caught LOGGER.error("Failed to persist pointset state: %s", e) - def _buffer_event(self, event: PointsetEvents) -> None: - """Appends the event to the offline buffer and persists it.""" - self._offline_buffer.append(event.to_dict()) - - if len(self._offline_buffer) > self.MAX_OFFLINE_BUFFER_SIZE: - self._offline_buffer = self._offline_buffer[-self.MAX_OFFLINE_BUFFER_SIZE:] - - self._persist_buffer() - LOGGER.info("Device offline. Buffered telemetry event. Buffer size: %s", - len(self._offline_buffer)) - - def _flush_buffer(self) -> None: - """Publishes all buffered events and clears the buffer.""" - if not self._offline_buffer: - return - - try: - LOGGER.info("Connection resumed. Flushing %s buffered telemetry events...", - len(self._offline_buffer)) - for event_dict in self._offline_buffer: - event = PointsetEvents.from_dict(event_dict) - self.publish_event(event, "pointset") - time.sleep(0.01) - - self._offline_buffer.clear() - self._persist_buffer() - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Failed to flush telemetry buffer: %s", e) - - def _persist_buffer(self) -> None: - """Saves the offline buffer to persistence.""" - if not self._device or not self._device.persistence: - return - try: - self._device.persistence.set(self.PERSISTENCE_BUFFER_KEY, self._offline_buffer) - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Failed to persist telemetry buffer: %s", e) - - def _load_persisted_buffer(self) -> None: - """Loads the offline telemetry buffer from persistence.""" - if not self._device or not self._device.persistence: - return - try: - saved_buffer = self._device.persistence.get(self.PERSISTENCE_BUFFER_KEY, []) - if isinstance(saved_buffer, list): - self._offline_buffer = saved_buffer - if self._offline_buffer: - LOGGER.info("Restored offline telemetry buffer with %s events.", - len(self._offline_buffer)) - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Failed to load telemetry buffer: %s", e) - def publish_telemetry(self) -> None: - # pylint: disable=too-many-branches,too-many-statements """ - Generates and publishes a PointsetEvents message. - Runs the periodic telemetry loop. It first updates points via Bulk Providers or Poll - Callbacks. Then, it evaluates every active point to see if it should_report() - based on COV or heartbeats, and crafts the final payload (including partial updates). + Generates and publishes a PointsetEvents message containing + only the points that are 'due' for reporting. + + This method: + 1. Invokes the Poll Callback (if registered) to refresh values. + 2. Iterates over all points to check `should_report()` (COV/Heartbeat). + 3. Constructs a payload of only the reporting points. + 4. Publishes to MQTT via the dispatcher. """ - now = time.time() - if self._sample_limit_sec is not None: - if (now - self._last_publish_time) < self._sample_limit_sec: - return - - if self._poll_callback and self._bulk_provider: - LOGGER.warning( - "Both _bulk_provider and _poll_callback are set, " - "prioritizing _bulk_provider, but also executing _poll_callback" - ) - if self._poll_callback: try: new_values = self._poll_callback() @@ -624,79 +487,29 @@ def publish_telemetry(self) -> None: else: LOGGER.warning("Poll callback returned non-dict type: %s", type(new_values)) - except Exception as e: # pylint: disable=broad-exception-caught + except Exception as e: # pylint: disable=broad-exception-caught LOGGER.error("Error in telemetry poll callback: %s", e, exc_info=True) - if self._bulk_provider: - try: - new_values = self._bulk_provider.read_points() - if isinstance(new_values, dict): - for point_name, value in new_values.items(): - self.set_point_value(point_name, value) - else: - LOGGER.warning("BulkProvider returned non-dict type: %s", - type(new_values)) - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Error in BulkPointProvider.read_points(): %s", e) - - if not self._all_points: + if not self._points: return points_map = {} - - valid_active_points = [ - name for name in self._active_points if name in self._all_points - ] - - force_full_update = ( - (now - self._last_full_publish_time) >= self._sample_rate_sec) - - for name in valid_active_points: - point = self._all_points[name] - try: - point.update_data() - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Unable to update point data for '%s': %s", name, - e) - continue - - try: - if force_full_update or point.should_report( - self._sample_rate_sec): - points_map[name] = point.get_data() - point.mark_reported() - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Unable to process reporting for '%s': %s", name, - e) + for name, point in self._points.items(): + if point.should_report(self._sample_rate_sec): + points_map[name] = point.get_event() + point.mark_reported() if not points_map: return - is_partial = len(points_map) < len(valid_active_points) - try: event = PointsetEvents( timestamp=datetime.now(timezone.utc).isoformat(), version=UDMI_VERSION, - points=points_map, - partial_update=True if is_partial else None + points=points_map ) - - if not self.is_connected: - self._buffer_event(event) - else: - self._flush_buffer() - self.publish_event(event, "pointset") - LOGGER.debug( - "Published telemetry for %s points (Partial: %s, Forced Full: %s).", - len(points_map), - is_partial, - force_full_update - ) - - self._last_publish_time = now - if force_full_update: - self._last_full_publish_time = now - except Exception as e: # pylint: disable=broad-exception-caught + self.publish_event(event, "pointset") + LOGGER.debug("Published telemetry for %s points.", len(points_map)) + except Exception as e: # pylint:disable=broad-exception-caught LOGGER.error("Failed to publish telemetry: %s", e) diff --git a/clientlib/python/tests/core/managers/test_pointset_manager.py b/clientlib/python/tests/core/managers/test_pointset_manager.py index da86c58459..0cfa535853 100644 --- a/clientlib/python/tests/core/managers/test_pointset_manager.py +++ b/clientlib/python/tests/core/managers/test_pointset_manager.py @@ -7,7 +7,6 @@ from unittest.mock import MagicMock from unittest.mock import patch import time -import warnings import pytest from udmi.schema import Config @@ -19,11 +18,10 @@ from udmi.schema import State from udmi.schema import ValueState -from udmi.core.managers.point.virtual_point import Point +from src.udmi.core.managers.pointset_manager import Point from src.udmi.core.managers.pointset_manager import PointsetManager from src.udmi.core.managers.pointset_manager import WritebackResult from src.udmi.core.messaging import AbstractMessageDispatcher -from udmi.core.managers.point.bulk_provider import BulkPointProvider # pylint: disable=redefined-outer-name,protected-access @@ -45,19 +43,24 @@ def manager(): def test_point_value_update(): """Test that setting a value updates the internal state for events.""" point = Point("temp_sensor") + assert point.present_value is None + point.set_present_value(25.5) - point.update_data() # Sync value to event - - event = point.get_data() + assert point.present_value == 25.5 + + event = point.get_event() assert event.present_value == 25.5 def test_point_config_update(): """Test that updating config updates the internal state for reporting.""" point = Point("temp_sensor") + assert point.units is None config = PointPointsetConfig(units="Celsius") - point.set_config(config) + point.update_config(config) + + assert point.units == "Celsius" state = point.get_state() assert state.units == "Celsius" @@ -68,49 +71,39 @@ def test_point_should_report_cov_logic(): Test the Change of Value (COV) logic. """ point = Point("test_point") - config = PointPointsetConfig(cov_increment=1.0) - point.set_config(config) + point.cov_increment = 1.0 point.set_present_value(10.0) - point.update_data() assert point.should_report(sample_rate_sec=10) is True point.mark_reported() point.set_present_value(10.5) - point.update_data() assert point.should_report(sample_rate_sec=10) is False point.set_present_value(11.1) - point.update_data() assert point.should_report(sample_rate_sec=10) is True point.mark_reported() # Test periodic reporting without COV (Steady State) # 5 seconds - Should not report - with patch("time.time", return_value=point._last_reported_time + 5): + with patch("time.time", return_value=point.last_reported_time + 5): assert point.should_report(sample_rate_sec=10) is False # 11 seconds (exceeds sample_rate_sec=10) - Should report - with patch("time.time", return_value=point._last_reported_time + 11): + with patch("time.time", return_value=point.last_reported_time + 11): assert point.should_report(sample_rate_sec=10) is True def test_point_set_model_syncs_ref(): - """Test that model limits and values behave correctly with metadata.""" - from udmi.schema import Category + """Test that set_model syncs the ref property.""" from udmi.schema import PointPointsetModel - model = PointPointsetModel(ref="point_ref_123", writable=True) - point = Point("temp_sensor", model=model) + point = Point("temp_sensor") + assert point.ref is None - config_good = PointPointsetConfig(ref="point_ref_123") - point.set_config(config_good) - assert point.get_state().status is None + model = PointPointsetModel(ref="point_ref_123") + point.set_model(model) - config_bad = PointPointsetConfig(ref="bad_ref") - point.set_config(config_bad) - state = point.get_state() - assert state.status is not None - assert state.status.category == Category.POINTSET_POINT_FAILURE + assert point.ref == "point_ref_123" # --- PointsetManager Tests --- @@ -118,16 +111,15 @@ def test_point_set_model_syncs_ref(): def test_initialization(manager): """Test default initialization values.""" assert manager._sample_rate_sec == 10 - assert len(manager._all_points) == 0 + assert len(manager._points) == 0 def test_add_point_and_set_value(manager): """Test the API for adding points and setting values.""" manager.set_point_value("pressure", 101.3) - assert "pressure" in manager._all_points - manager._all_points["pressure"].update_data() - assert manager._all_points["pressure"].get_data().present_value == 101.3 + assert "pressure" in manager._points + assert manager._points["pressure"].present_value == 101.3 def test_handle_config_updates_sample_rate(manager): @@ -154,36 +146,34 @@ def test_handle_config_adds_points(manager): ) manager.handle_config(config) - assert "room_temp" in manager._all_points - assert manager._all_points["room_temp"].get_state().units == "C" - assert "humidity" in manager._all_points - assert manager._all_points["humidity"].get_state().units == "%" + assert "room_temp" in manager._points + assert manager._points["room_temp"].units == "C" + assert "humidity" in manager._points + assert manager._points["humidity"].units == "%" def test_handle_config_updates_state_etag(manager): - """Test that state_etag is NOT blindly captured from config.""" - manager._state_etag = "original" + """Test that state_etag is captured from config.""" config = Config( pointset=PointsetConfig(state_etag="abcdef123") ) manager.handle_config(config) - assert manager._state_etag == "original" + assert manager._state_etag == "abcdef123" def test_update_state_populates_state_block(manager): """Test that update_state populates the state object correctly.""" manager.add_point("temp") - manager._all_points["temp"].set_config(PointPointsetConfig(units="C")) + manager._points["temp"].units = "C" manager._state_etag = "etag_value" - manager._active_points = {"temp"} state = State() manager.update_state(state) assert state.pointset is not None assert isinstance(state.pointset, PointsetState) - assert state.pointset.state_etag == manager._state_etag + assert state.pointset.state_etag == "etag_value" assert "temp" in state.pointset.points assert state.pointset.points["temp"].units == "C" @@ -197,7 +187,6 @@ def test_publish_telemetry_sends_events(manager, mock_dispatcher): manager.set_point_value("temp", 22.0) manager.set_point_value("pressure", 1000) - manager._active_points = {"temp", "pressure"} manager.publish_telemetry() @@ -215,17 +204,13 @@ def test_publish_telemetry_sends_events(manager, mock_dispatcher): def test_publish_telemetry_ignores_points_without_values(manager, mock_dispatcher): """ - Test that points with no value (None) are excluded from the event - when not doing a full update (force_full_update=False). + Test that points with no value (None) are excluded from the event. """ manager.set_device_context(device=None, dispatcher=mock_dispatcher) manager.set_point_value("valid_point", 10) manager.add_point("empty_point") - manager._active_points = {"valid_point", "empty_point"} - import time - manager._last_full_publish_time = time.time() manager.publish_telemetry() call_args = mock_dispatcher.publish_event.call_args @@ -277,9 +262,7 @@ def test_handle_config_triggers_writeback_handler(manager): manager.set_writeback_handler(mock_handler) config = Config( - timestamp="2030-01-01T00:00:00Z", pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", points={ "setpoint": PointPointsetConfig(set_value=22.5) } @@ -289,6 +272,7 @@ def test_handle_config_triggers_writeback_handler(manager): manager.handle_config(config) mock_handler.assert_called_once_with("setpoint", 22.5) + assert manager._points["setpoint"].set_value == 22.5 def test_publish_telemetry_invokes_poll_callback(manager, mock_dispatcher): @@ -302,7 +286,6 @@ def mock_poll(): return {"polled_point": 123} manager.set_poll_callback(mock_poll) - manager._active_points = {"polled_point"} manager.publish_telemetry() @@ -323,21 +306,16 @@ def mock_handler(point_name, value): manager.set_writeback_handler(mock_handler) - manager.add_point("setpoint") - manager._all_points["setpoint"]._writable = True - config = Config( - timestamp="2030-01-01T00:00:00Z", pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", points={"setpoint": PointPointsetConfig(set_value=22.5)} ) ) manager.handle_config(config) - assert manager._all_points["setpoint"].get_data().present_value == 22.5 - assert manager._all_points["setpoint"].value_state == ValueState.overridden + assert manager._points["setpoint"].set_value == 22.5 + assert manager._points["setpoint"].value_state == ValueState.overridden def test_handle_config_writeback_returns_result(manager): @@ -352,21 +330,16 @@ def mock_handler(point_name, value): manager.set_writeback_handler(mock_handler) - manager.add_point("setpoint") - manager._all_points["setpoint"]._writable = True - config = Config( - timestamp="2030-01-01T00:00:00Z", pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", points={"setpoint": PointPointsetConfig(set_value=22.5)} ) ) manager.handle_config(config) - point = manager._all_points["setpoint"] - assert point.get_data().present_value == 22.5 + point = manager._points["setpoint"] + assert point.set_value == 22.5 assert point.value_state == ValueState.invalid assert point.status == custom_entry @@ -381,307 +354,17 @@ def mock_handler(point_name, value): manager.set_writeback_handler(mock_handler) - manager.add_point("setpoint") - manager._all_points["setpoint"]._writable = True - config = Config( - timestamp="2030-01-01T00:00:00Z", pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", points={"setpoint": PointPointsetConfig(set_value=22.5)} ) ) manager.handle_config(config) - point = manager._all_points["setpoint"] - assert point.get_data().present_value == 22.5 + point = manager._points["setpoint"] + assert point.set_value == 22.5 assert point.value_state == ValueState.failure assert point.status is not None assert point.status.level == 500 - assert "Hardware communication failed" in point.status.message - - -def test_set_writeback_handler_emits_deprecation_warning(manager): - """Verifies that calling set_writeback_handler triggers a DeprecationWarning.""" - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - manager.set_writeback_handler(lambda n, v: None) - assert len(w) == 1 - assert issubclass(w[-1].category, DeprecationWarning) - assert "set_writeback_handler is deprecated" in str(w[-1].message) - - -def test_set_poll_callback_emits_deprecation_warning(manager): - """Verifies that calling set_poll_callback triggers a DeprecationWarning.""" - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - manager.set_poll_callback(lambda: {}) - assert len(w) == 1 - assert issubclass(w[-1].category, DeprecationWarning) - assert "set_poll_callback is deprecated" in str(w[-1].message) - - -class MockBulkProvider(BulkPointProvider): - def read_points(self): - return {"temp": 22.0} - - -def test_bulk_provider_updates_points(manager, mock_dispatcher): - """Verifies that an initialized bulk provider populates values before publishing.""" - manager.set_device_context(device=None, dispatcher=mock_dispatcher) - - provider = MockBulkProvider() - manager.register_bulk_provider(provider) - - manager.add_point("temp") - manager._active_points = {"temp"} - - # No point value set originally - manager.publish_telemetry() - - # Assert dispatcher published with the value from the provider - call_args = mock_dispatcher.publish_event.call_args - payload = call_args[0][1] - - assert "temp" in payload.points - assert payload.points["temp"].present_value == 22.0 - - -def test_initialization_with_bulk_provider(mock_dispatcher): - """Verifies that PointsetManager can be initialized with a bulk provider.""" - provider = MockBulkProvider() - manager = PointsetManager(bulk_provider=provider) - manager.set_device_context(device=None, dispatcher=mock_dispatcher) - - manager.add_point("temp") - manager._active_points = {"temp"} - - # No point value set originally - manager.publish_telemetry() - - # Assert dispatcher published with the value from the provider - call_args = mock_dispatcher.publish_event.call_args - payload = call_args[0][1] - - assert "temp" in payload.points - assert payload.points["temp"].present_value == 22.0 - - -def test_native_writeback_path_used_when_no_global_handler(manager): - """Verifies point internally processes set_value when no global handler exists.""" - assert manager._writeback_handler is None - - manager.add_point("setpoint") - manager._all_points["setpoint"]._writable = True - - config = Config( - timestamp="2030-01-01T00:00:00Z", - pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", - points={"setpoint": PointPointsetConfig(set_value=22.5)} - ) - ) - - manager.handle_config(config) - - point = manager._all_points["setpoint"] - - # Wait for the native writeback thread to complete - timeout = 1.0 - start_time = time.time() - while point.value_state != ValueState.applied and time.time() - start_time < timeout: - time.sleep(0.01) - - assert point.get_data().present_value == 22.5 - assert point.value_state == ValueState.applied - -def test_native_writeback_asynchronous_execution(manager): - """Verifies that point writebacks utilize the updating state for long hardware actuations.""" - manager.add_point("slow_setpoint") - point = manager._all_points["slow_setpoint"] - point._writable = True - - # Mock a long-running hardware actuation - original_set_value = point.set_value - def slow_set_value(val): - time.sleep(0.1) - return original_set_value(val) - - point.set_value = slow_set_value - - config = Config( - timestamp="2030-01-01T00:00:00Z", - pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", - points={"slow_setpoint": PointPointsetConfig(set_value=42.0)} - ) - ) - - manager.handle_config(config) - - # State should immediately be 'updating' upon synchronous return - assert point.value_state == ValueState.updating - assert point.get_data().present_value != 42.0 - - # Wait for completion - timeout = 1.0 - start_time = time.time() - while point.value_state != ValueState.applied and time.time() - start_time < timeout: - time.sleep(0.01) - - assert point.value_state == ValueState.applied - assert point.get_data().present_value == 42.0 - -def test_persistence_logic(manager, mock_dispatcher): - """Verifies that _persist_state and _load_persisted_state interact with device persistence correctly.""" - mock_device = MagicMock() - mock_persistence = MagicMock() - mock_device.persistence = mock_persistence - manager.set_device_context(device=mock_device, dispatcher=mock_dispatcher) - - manager.add_point("persisted_point") - manager.set_point_value("persisted_point", 55.5) - manager._all_points["persisted_point"].update_data() - - manager._persist_state() - mock_persistence.set.assert_called_once() - - saved_data = mock_persistence.set.call_args[0][1] - assert "persisted_point" in saved_data - assert saved_data["persisted_point"]["present_value"] == 55.5 - - mock_persistence.get.return_value = {"persisted_point": {"present_value": 77.7}} - manager._load_persisted_state() - - manager._all_points["persisted_point"].update_data() - assert manager._all_points["persisted_point"].get_data().present_value == 77.7 - -def test_handle_writeback_expiration(manager): - """Verifies that writeback expiration clears the state.""" - manager.add_point("expiring_point") - point = manager._all_points["expiring_point"] - - point._written = True - point.value_state = ValueState.applied - manager._last_set_values["expiring_point"] = 100 - manager._writeback_timers["expiring_point"] = MagicMock() - - manager._handle_writeback_expiration("expiring_point") - - assert point.value_state is None - assert "expiring_point" not in manager._last_set_values - assert "expiring_point" not in manager._writeback_timers - -def test_generate_state_etag(manager): - """Verifies state_etag generation is deterministic.""" - dict1 = {"pointA": {"value_state": "applied"}, "pointB": {"value_state": "invalid"}} - dict2 = {"pointB": {"value_state": "invalid"}, "pointA": {"value_state": "applied"}} - - etag1 = manager._generate_state_etag(dict1) - etag2 = manager._generate_state_etag(dict2) - - assert etag1 == etag2 - assert len(etag1) == 32 - -def test_publish_telemetry_catches_point_exceptions(manager, mock_dispatcher): - """Verifies telemetry loop handles individual point failures gracefully.""" - manager.set_device_context(device=None, dispatcher=mock_dispatcher) - manager.add_point("robust_point") - manager._active_points = {"robust_point"} - - point = manager._all_points["robust_point"] - point.update_data = MagicMock(side_effect=Exception("Test Error")) - - manager.publish_telemetry() - -def test_handle_config_invalid_expiry_handling(manager): - """Verifies that an invalid set_value_expiry transitions point to invalid.""" - manager.add_point("setpoint") - manager._all_points["setpoint"]._writable = True - config = Config( - timestamp="2030-01-01T00:00:00Z", - pointset=PointsetConfig( - set_value_expiry="2020-01-01T00:00:00Z", - points={"setpoint": PointPointsetConfig(set_value=123)} - ) - ) - manager.handle_config(config) - assert manager._all_points["setpoint"].value_state == ValueState.invalid - -def test_handle_command_passthrough(manager): - """Verifies that handle_command safely processes without failing.""" - manager.handle_command("some_command", {"payload": "data"}) - - -def test_offline_buffering(manager, mock_dispatcher): - """Verifies that telemetry is buffered when disconnected and flushed when connected.""" - mock_device = MagicMock() - mock_persistence = MagicMock() - mock_device.persistence = mock_persistence - manager.set_device_context(device=mock_device, dispatcher=mock_dispatcher) - - manager.add_point("temp") - manager.set_point_value("temp", 22.0) - manager._active_points = {"temp"} - - # Simulate disconnected - mock_dispatcher.is_connected.return_value = False - manager.publish_telemetry() - - mock_dispatcher.publish_event.assert_not_called() - assert len(manager._offline_buffer) == 1 - mock_persistence.set.assert_called_with(PointsetManager.PERSISTENCE_BUFFER_KEY, manager._offline_buffer) - - # Change value and publish again - manager.set_point_value("temp", 25.0) - manager.publish_telemetry() - - mock_dispatcher.publish_event.assert_not_called() - assert len(manager._offline_buffer) == 2 - - # Simulate connected - mock_dispatcher.is_connected.return_value = True - manager.set_point_value("temp", 27.0) - manager.publish_telemetry() - - # Flush 2 items + publish the new 1 - assert mock_dispatcher.publish_event.call_count == 3 - assert len(manager._offline_buffer) == 0 - mock_persistence.set.assert_called_with(PointsetManager.PERSISTENCE_BUFFER_KEY, []) - -def test_offline_buffer_capacity(manager, mock_dispatcher): - """Verifies that the offline buffer limits the number of queued events.""" - mock_device = MagicMock() - mock_device.persistence = MagicMock() - manager.set_device_context(device=mock_device, dispatcher=mock_dispatcher) - - manager.add_point("temp") - manager._active_points = {"temp"} - - # Simulate disconnected - mock_dispatcher.is_connected.return_value = False - - # Lower max size for test - original_max = manager.MAX_OFFLINE_BUFFER_SIZE - manager.MAX_OFFLINE_BUFFER_SIZE = 5 - - for i in range(10): - manager.set_point_value("temp", float(i)) - manager.publish_telemetry() - - assert len(manager._offline_buffer) == 5 - - manager.MAX_OFFLINE_BUFFER_SIZE = original_max - -def test_load_persisted_buffer(manager, mock_dispatcher): - """Verifies that the offline buffer is loaded from persistence correctly.""" - mock_device = MagicMock() - mock_device.persistence = MagicMock() - mock_device.persistence.get.return_value = [{"points": {"temp": {"present_value": 11.0}}}] - manager.set_device_context(device=mock_device, dispatcher=mock_dispatcher) - - manager._load_persisted_buffer() - - assert len(manager._offline_buffer) == 1 - assert manager._offline_buffer[0]["points"]["temp"]["present_value"] == 11.0 \ No newline at end of file + assert "Hardware communication failed" in point.status.message \ No newline at end of file diff --git a/common/src/main/java/com/google/udmi/util/SiteModel.java b/common/src/main/java/com/google/udmi/util/SiteModel.java index dbae97619d..cf77a8f3cd 100644 --- a/common/src/main/java/com/google/udmi/util/SiteModel.java +++ b/common/src/main/java/com/google/udmi/util/SiteModel.java @@ -103,7 +103,7 @@ public class SiteModel { private static final Pattern MQTT_PATTERN = Pattern.compile("/r/(.*)/d/(.*)"); private static final String CLOUD_IOT_CONFIG_JSON = "cloud_iot_config.json"; private static final Pattern SPEC_PATTERN = Pattern.compile( - "(//([a-z]+)/)?(([a-z-]+))(/([a-z0-9]+))?(\\+([a-z0-9-]+))?"); + "(//([a-z]+)/)?(([a-z-\\.]+))(/([a-z0-9]+))?(\\+([a-z0-9-]+))?"); private static final int SPEC_PROVIDER_GROUP = 2; private static final int SPEC_PROJECT_GROUP = SPEC_PROVIDER_GROUP + 2; private static final int SPEC_NAMESPACE_GROUP = SPEC_PROJECT_GROUP + 2; diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java index 99b2f77f54..6259220455 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java @@ -94,7 +94,7 @@ public class IotReflectorClient implements MessagePublisher { private static final String EVENTS_TYPE = "events"; private static final String MOCK_DEVICE_NUM_ID = "123456789101112"; private static final String UDMI_TOPIC = "events/" + SubFolder.UDMI; - private static final long CONFIG_TIMEOUT_SEC = 5; + private static final long CONFIG_TIMEOUT_SEC = 60; private static final int UPDATE_RETRIES = 6; private static final Collection COPY_IDS = ImmutableSet.of(DEVICE_ID_KEY, GATEWAY_ID_KEY, SUBTYPE_PROPERTY_KEY, SUBFOLDER_PROPERTY_KEY, TRANSACTION_KEY, PUBLISH_TIME_KEY); @@ -284,6 +284,7 @@ public Credential getCredential() { private void messageHandler(String topic, String payload) { receiveStats.update(); + payload = ofNullable(payload).map(String::trim).orElse(""); if (payload.length() == 0) { return; } @@ -391,7 +392,8 @@ private void processUdmiEvent(Map message) { } ifNotTrueThen(events.logentries.isEmpty(), this::updateLastProgressEvent); events.logentries.forEach( - entry -> System.err.printf("%s %s%n", isoConvert(entry.timestamp), entry.message)); + entry -> System.err.printf("%s %s%n", isoConvert(ofNullable(entry.timestamp).orElse(null)), + entry.message)); } private synchronized void ensureCloudSync(Map message) { @@ -404,8 +406,16 @@ private synchronized void ensureCloudSync(Map message) { UdmiConfig reflectorConfig = convertTo(UdmiConfig.class, ofNullable(message.get(SubFolder.UDMI.value())).orElse(message)); + if (reflectorConfig == null || reflectorConfig.reply == null + || reflectorConfig.reply.msg_source == null) { + return; + } + boolean shouldConsiderReply = reflectorConfig.reply.msg_source.equals(userName); String transactionId = reflectorConfig.reply.transaction_id; + if (transactionId == null) { + return; + } boolean matchingTxnId = transactionId.equals(expectedTxnId); boolean matchingSession = transactionId.startsWith(sessionPrefix); @@ -498,7 +508,8 @@ private void error(String message) { } private Envelope parseMessageTopic(String topic) { - List parts = new ArrayList<>(Arrays.asList(topic.substring(1).split("/"))); + String stripped = topic.startsWith("/") ? topic.substring(1) : topic; + List parts = new ArrayList<>(Arrays.asList(stripped.split("/"))); String leader = parts.remove(0); if ("devices".equals(leader)) { // Next field is registry, not device, since the reflector device id is the site registry. @@ -524,15 +535,24 @@ private Envelope parseMessageTopic(String topic) { Envelope envelope = new Envelope(); envelope.deviceRegistryId = registryId; + if (parts.get(0).equals("c")) { + parts.remove(0); // consume 'c' + envelope.source = parts.remove(0); // consume source + } + String[] bits1 = parts.remove(0).split(SOURCE_SEPARATOR_REGEX); checkState(parts.isEmpty() || bits1.length == 1, "Malformed topic: " + topic); envelope.subType = SubType.fromValue(bits1[0]); if (parts.isEmpty()) { - envelope.source = bits1.length > 1 ? bits1[1] : null; + if (envelope.source == null) { + envelope.source = bits1.length > 1 ? bits1[1] : null; + } } else { String[] bits2 = parts.remove(0).split(SOURCE_SEPARATOR_REGEX); envelope.subFolder = SubFolder.fromValue(bits2[0]); - envelope.source = bits2.length > 1 ? bits2[1] : null; + if (envelope.source == null) { + envelope.source = bits2.length > 1 ? bits2[1] : null; + } } checkState(parts.isEmpty()); diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java index f0feb89e7e..3d72404a29 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java @@ -100,7 +100,7 @@ public class MqttPublisher implements MessagePublisher { private static final String MESSAGE_TOPIC_FMT = "/%s"; private static final int QOS_AT_MOST_ONCE = 0; private static final int QOS_AT_LEAST_ONCE = 1; - private static final int INITIALIZE_TIME_MS = 20000; + private static final int INITIALIZE_TIME_MS = 30000; private static final String BROKER_URL_FORMAT = "%s://%s:%s"; private static final int PUBLISH_THREAD_COUNT = 10; private static final Duration TOKEN_EXPIRATION = Duration.ofHours(1); @@ -191,7 +191,7 @@ private static String getProviderHostname(ExecutionConfiguration executionConfig () -> switch (iotProvider) { case JWT -> requireNonNull(executionConfiguration.bridge_host, "missing bridge_host"); case GBOS -> DEFAULT_GBOS_HOSTNAME; - case MQTT -> requireNonNull(executionConfiguration.project_id); + case MQTT, IMPLICIT -> requireNonNull(executionConfiguration.project_id); case CLEARBLADE -> DEFAULT_CLEARBLADE_HOSTNAME; default -> throw new RuntimeException("Unsupported iot provider " + iotProvider); } @@ -245,8 +245,8 @@ private CertManager getCertManager() { private String getTopicBase() { return switch (iotProvider) { - case IMPLICIT, GBOS, CLEARBLADE -> format(DEVICE_TOPIC_FMT, deviceId); - case MQTT -> format(FULL_TOPIC_FMT, registryId, deviceId); + case GBOS, CLEARBLADE -> format(DEVICE_TOPIC_FMT, deviceId); + case MQTT, IMPLICIT -> format(FULL_TOPIC_FMT, registryId, deviceId); default -> throw new RuntimeException("Unknown iotProvider " + iotProvider); }; } @@ -525,7 +525,7 @@ private SocketFactory getSocketFactory() { private String getUserName() { return switch (iotProvider) { case GBOS, CLEARBLADE -> UNUSED_ACCOUNT_NAME; - case MQTT -> format(MQTT_USER_NAME_FMT, registryId, deviceId); + case MQTT, IMPLICIT -> format(MQTT_USER_NAME_FMT, registryId, deviceId); default -> throw new RuntimeException("Unsupported iot provider " + iotProvider); }; } @@ -550,7 +550,7 @@ private synchronized void connectAndSetupMqtt() { private char[] getAuthToken(String audience) { return switch (iotProvider) { - case MQTT -> getHashPassword(audience); + case MQTT, IMPLICIT -> getHashPassword(audience); case GBOS, CLEARBLADE -> createJwt(audience); default -> throw new RuntimeException("Unsupported iotProvider " + iotProvider); }; @@ -611,7 +611,7 @@ String getClientId(String deviceId) { } return switch (iotProvider) { case GBOS, CLEARBLADE -> format(LONG_ID_FMT, projectId, cloudRegion, registryId, deviceId); - case MQTT -> format(SHORT_ID_FMT, registryId, deviceId); + case MQTT, IMPLICIT -> format(SHORT_ID_FMT, registryId, deviceId); default -> throw new RuntimeException("Provider not supported " + iotProvider); }; } @@ -622,7 +622,7 @@ private String getBrokerUrl() { private String getBrokerProtocol() { return switch (iotProvider) { - case MQTT, GBOS, CLEARBLADE -> "ssl"; + case MQTT, GBOS, CLEARBLADE, IMPLICIT -> "ssl"; default -> throw new RuntimeException("Provider not supported " + iotProvider); }; }