diff --git a/clientlib/python/samples/pointset/cov_throttling.py b/clientlib/python/samples/pointset/cov_throttling.py index bce335771f..e7d8171ee1 100644 --- a/clientlib/python/samples/pointset/cov_throttling.py +++ b/clientlib/python/samples/pointset/cov_throttling.py @@ -48,10 +48,6 @@ def simulate_fast_sensor(manager: PointsetManager): """ Simulates a sensor reading that changes rapidly (every 1.0s). - - Represents an independently running hardware thread or fast I/O loop. It blindly - pushes raw data to the PointsetManager, illustrating that the manager itself takes - on the responsibility of throttling and interpreting Change of Value (cov_increment). """ val = 0.0 LOGGER.info("Starting sensor simulation (1.0s interval)...") diff --git a/clientlib/python/samples/pointset/custom_point_factory.py b/clientlib/python/samples/pointset/custom_point_factory.py deleted file mode 100644 index 94b1526884..0000000000 --- a/clientlib/python/samples/pointset/custom_point_factory.py +++ /dev/null @@ -1,123 +0,0 @@ -""" -Sample: Custom Point Implementation (Dependency Injection) - -This script demonstrates how to create a custom implementation of `BasicPoint` -and inject it into the `PointsetManager` using the `point_factory`. - -SCENARIO: -1. **CustomPoint**: A custom subclass of `BasicPoint` that simulates a "Sine Wave" - sensor reading directly within its `get_value()` method. -2. **Factory Injection**: We provide a custom factory callable to the `PointsetManager` - which instantiates our `CustomPoint` instead of the default. -3. **Result**: The application doesn't need an external update loop; - the point itself executes its reading logic pulled by the manager. -""" - -import logging -import math -import sys -import threading -import time -from typing import Any, Optional - -from udmi.core.factory import create_device, get_default_managers -from udmi.core.managers import PointsetManager -from udmi.core.managers.point.basic_point import BasicPoint -from udmi.schema import AuthProvider, Basic, EndpointConfiguration, PointPointsetModel - -# --- Configuration --- -DEVICE_ID = "AHU-2" -MQTT_HOSTNAME = "localhost" -MQTT_PORT = 1883 -BROKER_USERNAME = "pyudmi-device" -BROKER_PASSWORD = "somesecureword" -TOPIC_PREFIX = "/r/ZZ-TRI-FECTA/d/" - -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') -LOGGER = logging.getLogger("CustomPointSample") - - -class SineWavePoint(BasicPoint): - """ - A custom Point that generates a sine wave reading natively. - Demonstrates encapsulation of data acquisition at the individual point level. - By overriding `get_value()`, the point itself becomes responsible for its own - state calculation without relying on external loops or global callbacks. - """ - def __init__(self, name: str, model: Optional[PointPointsetModel] = None): - super().__init__(name, model) - self._start_time = time.time() - LOGGER.info(f"Custom SineWavePoint '{name}' instantiated.") - - def get_value(self) -> Any: - # Simulate a sine wave over time - elapsed = time.time() - self._start_time - val = 20.0 + 5.0 * math.sin(elapsed / 10.0) - return round(val, 2) - - def set_value(self, value: Any) -> Any: - LOGGER.info(f"[{self._name}] Actuation order received applied: {value}") - return value - - def validate_value(self, value: Any) -> bool: - return True - - -def sine_wave_factory(name: str, model: Optional[PointPointsetModel] = None): - """ - Factory creating SineWavePoint for all points. - Acts as the Dependency Injection provider for the PointsetManager. When the - manager receives a configuration to manage a new point, it uses this factory - to instantiate the custom user-defined Point class instead of the default. - """ - return SineWavePoint(name, model) - - -if __name__ == "__main__": - try: - # 1. Configure Connection - client_id = f"{TOPIC_PREFIX}{DEVICE_ID}" - endpoint = EndpointConfiguration( - client_id=client_id, - hostname=MQTT_HOSTNAME, - port=MQTT_PORT, - topic_prefix=TOPIC_PREFIX, - auth_provider=AuthProvider( - basic=Basic( - username=BROKER_USERNAME, - password=BROKER_PASSWORD - ) - ) - ) - - # 2. Get Default Managers with Custom Point Factory - managers = get_default_managers(point_factory=sine_wave_factory) - - # 4. Create Device - device = create_device(endpoint, managers) - - # 5. Add Points - manager = device.get_manager(PointsetManager) - # These points will now use SineWavePoint implicitly via the factory! - manager.add_point("supply_air_temp") - manager.add_point("return_air_temp") - - # 6. Start Device in background - device_thread = threading.Thread(target=device.run, daemon=True) - device_thread.start() - - LOGGER.info(f"Device {DEVICE_ID} running using Custom Point Layer.") - print("\n" + "=" * 60) - print("DEMO INSTRUCTIONS:") - print("1. Observe individual 'SineWavePoint instantiated' logs.") - print("2. The point values are pulled directly inside get_value().") - print("3. There is no manual update_sensors loop needed!") - print("=" * 60 + "\n") - - # Keep alive - while True: - time.sleep(1) - - except KeyboardInterrupt: - LOGGER.info("Stopping...") - sys.exit(0) diff --git a/clientlib/python/samples/pointset/writeback_legacy_with_handler.py b/clientlib/python/samples/pointset/point_writeback.py similarity index 89% rename from clientlib/python/samples/pointset/writeback_legacy_with_handler.py rename to clientlib/python/samples/pointset/point_writeback.py index be7f678087..5c27537d5b 100644 --- a/clientlib/python/samples/pointset/writeback_legacy_with_handler.py +++ b/clientlib/python/samples/pointset/point_writeback.py @@ -49,9 +49,6 @@ def on_writeback(point_name: str, value: Any): """ Callback triggered when the Cloud sends a 'set_value' in the configuration. - Acts as the single global actuator function (Legacy approach). It is responsible - for parsing the writeback command, commanding the physical hardware, simulating - feedback, and returning the resulting ValueState or an explicit WritebackResult. This callback can return: - None / ValueState.applied -> Mark update as success (applied) @@ -108,11 +105,9 @@ def on_writeback(point_name: str, value: Any): pointset_manager = device.get_manager(PointsetManager) # --------------------------------------------------------------------- - # DEPRECATED: Register the global Writeback Handler - # --------------------------------------------------------------------- - # This is the legacy approach for point actuation. It is recommended to - # use the point_factory pattern and subclass AbstractPoint instead. - # See custom_point_factory.py for the modern approach. + # CRITICAL STEP: Register the Writeback Handler + # Without this, the library updates its internal state to match the config, + # but your physical device remains unchanged. # --------------------------------------------------------------------- pointset_manager.set_writeback_handler(on_writeback) LOGGER.info("Writeback handler registered.") diff --git a/clientlib/python/samples/pointset/pointset_dynamic_provisioning.py b/clientlib/python/samples/pointset/pointset_dynamic_provisioning.py index 25876d507e..89a6500d5a 100644 --- a/clientlib/python/samples/pointset/pointset_dynamic_provisioning.py +++ b/clientlib/python/samples/pointset/pointset_dynamic_provisioning.py @@ -50,9 +50,6 @@ def report_managed_points(manager: PointsetManager): """ Periodically logs the inventory of points the device is tracking. - A diagnostic loop serving to prove that the PointsetManager is successfully - allocating and managing new point abstractions dynamically based purely on the - arrival of cloud configuration payloads. """ last_set = set() diff --git a/clientlib/python/samples/pointset/telemetry_basic.py b/clientlib/python/samples/pointset/telemetry_basic.py index ac253de9d7..3570b51698 100644 --- a/clientlib/python/samples/pointset/telemetry_basic.py +++ b/clientlib/python/samples/pointset/telemetry_basic.py @@ -46,9 +46,6 @@ def update_sensors_loop(pointset_manager): """ Simulates a loop that reads hardware sensors and updates the PointsetManager. - Represents an independent Data Acquisition (DAQ) thread. It is completely decoupled - from the network telemetry reporting cycle. Its sole job is to keep the PointsetManager's - internal cache fresh with the latest hardware readings. """ LOGGER.info("Starting sensor simulation loop (Interval: 2s)...") diff --git a/clientlib/python/samples/pointset/telemetry_poll_bulk_provider.py b/clientlib/python/samples/pointset/telemetry_poll_bulk_provider.py deleted file mode 100644 index 9714ec5ba5..0000000000 --- a/clientlib/python/samples/pointset/telemetry_poll_bulk_provider.py +++ /dev/null @@ -1,101 +0,0 @@ -""" -Sample: Bulk Telemetry Provider - -This script demonstrates the modern approach for supplying hardware-read values -to the PointsetManager immediately before telemetry is published. By registering -a `BulkPointProvider`, the internal values of managed points are batched-updated. -""" - -import logging -import sys -import threading -import time -from typing import Any, Dict - -from udmi.core.factory import create_device, get_default_managers -from udmi.core.managers import PointsetManager -from udmi.core.managers.point.bulk_provider import BulkPointProvider -from udmi.schema import AuthProvider, Basic, EndpointConfiguration - -DEVICE_ID = "AHU-1" -REGISTRY_ID = "ZZ-TRI-FECTA" -MQTT_HOSTNAME = "localhost" -MQTT_PORT = 1883 -BROKER_USERNAME = "pyudmi-device" -BROKER_PASSWORD = "somesecureword" -TOPIC_PREFIX = "/r/ZZ-TRI-FECTA/d/" - -logging.basicConfig(level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s') -LOGGER = logging.getLogger("BulkProviderSample") - - -class MockModbusProvider(BulkPointProvider): - """ - A mock class simulating a connection to Modbus hardware that provides - all point data in a single batch read. - - Acts as the bridge between the physical hardware layer and the UDMI software - layer. It abstracts the I/O complexity (like serial bus polling) so the PointsetManager - receives data efficiently in bulk. - """ - def __init__(self): - self._mock_data = { - "temperature": 22.5, - "humidity": 45.0, - "pressure": 1013.2 - } - - def read_points(self) -> Dict[str, Any]: - """ - Provides the batch telemetry for UDMI. - - Executes the physical read against sensors during the start of every telemetry - transmission window. Returns a dictionary mapping point names to their values. - """ - LOGGER.info(f"Modbus batch read yielding: {self._mock_data}") - self._mock_data["temperature"] += 0.1 - return self._mock_data - - -if __name__ == "__main__": - try: - client_id = f"{TOPIC_PREFIX}{DEVICE_ID}" - endpoint = EndpointConfiguration( - client_id=client_id, - hostname=MQTT_HOSTNAME, - port=MQTT_PORT, - topic_prefix=TOPIC_PREFIX, - auth_provider=AuthProvider( - basic=Basic( - username=BROKER_USERNAME, - password=BROKER_PASSWORD - ) - ) - ) - - # Initialize the provider and pass it to the default managers factory - modbus_hw = MockModbusProvider() - managers = get_default_managers(points_bulk_provider=modbus_hw) - - device = create_device(endpoint, managers) - pointset_manager = device.get_manager(PointsetManager) - - # For the sake of the sample, we add the points explicitly. - # Normally these would be provisioned via cloud `config.pointset`. - pointset_manager.add_point("temperature") - pointset_manager.add_point("humidity") - pointset_manager.add_point("pressure") - - t = threading.Thread(target=device.run, daemon=True) - t.start() - - LOGGER.info(f"Device {DEVICE_ID} is running.") - LOGGER.info("The application will read from the provider automatically based on sample_rate_sec.") - - while True: - time.sleep(1) - - except KeyboardInterrupt: - LOGGER.info("Stopping...") - sys.exit(0) diff --git a/clientlib/python/samples/pointset/telemetry_poll_callback.py b/clientlib/python/samples/pointset/telemetry_poll_callback.py index ecd60acac5..96e1e73ef2 100644 --- a/clientlib/python/samples/pointset/telemetry_poll_callback.py +++ b/clientlib/python/samples/pointset/telemetry_poll_callback.py @@ -47,10 +47,9 @@ def my_sensor_poll() -> Dict[str, Any]: """ Called by the PointsetManager background thread. - Provides a Just-in-Time (Pull) mechanism for data acquisition. Instead of - running a separate DAQ loop, the user delegates the polling rhythm to the - PointsetManager. This function must perform synchronous hardware reads and - return a dictionary of {point_name: value}. + + This function should perform the actual hardware reads (e.g. I2C, Modbus). + It must return a dictionary of {point_name: value}. """ # Simulate reading hardware registers temp = round(random.uniform(20.0, 25.0), 2) diff --git a/clientlib/python/samples/pointset/writeback_expiration.py b/clientlib/python/samples/pointset/writeback_expiration.py deleted file mode 100644 index 458c812e56..0000000000 --- a/clientlib/python/samples/pointset/writeback_expiration.py +++ /dev/null @@ -1,116 +0,0 @@ -""" -Sample: Pointset Writeback Expiration (set_value_expiry) - -This script demonstrates the UDMI writeback expiration functionality. -When a point's value is overriden via a `set_value` config command, -the cloud can optionally provide a `set_value_expiry` timestamp. If the -cloud does not refresh the configuration before this timestamp, the point -will automatically revert to its base state. - -SCENARIO: -1. **CustomPoint**: A simple subclass of `BasicPoint` that prints when its value changes. -2. **Actuation**: You publish a config setting 'override_point' to 100 with an expiry in 5 seconds. -3. **Expiration**: The UDMI `PointsetManager` detects the expiration and automatically - clears the state, reverting the point back to its default reading. -""" - -import logging -import sys -import threading -import time -from datetime import datetime, timedelta, timezone -from typing import Any, Optional - -from udmi.core.factory import create_device, get_default_managers -from udmi.core.managers import PointsetManager -from udmi.core.managers.point.basic_point import BasicPoint -from udmi.schema import AuthProvider, Basic, EndpointConfiguration, PointPointsetModel - -# --- Configuration --- -DEVICE_ID = "AHU-3" -MQTT_HOSTNAME = "localhost" -MQTT_PORT = 1883 -BROKER_USERNAME = "pyudmi-device" -BROKER_PASSWORD = "somesecureword" -TOPIC_PREFIX = "/r/ZZ-TRI-FECTA/d/" - -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') -LOGGER = logging.getLogger("WritebackExpirationSample") - - -class ExpiringPoint(BasicPoint): - """ - A custom Point that defaults to 0 but can be overriden by writebacks. - """ - def __init__(self, name: str, model: Optional[PointPointsetModel] = None): - super().__init__(name, model) - self._writable = True # Override basic_point to make it always writable for the sample - - def get_value(self) -> Any: - return 0.0 - - def set_value(self, value: Any) -> Any: - LOGGER.info(f"[{self._name}] Actuation order received: Setting to {value}") - return value - - def validate_value(self, value: Any) -> bool: - return True - -def expiring_point_factory(name: str, model: Optional[PointPointsetModel] = None): - return ExpiringPoint(name, model) - -if __name__ == "__main__": - try: - # 1. Configure Connection - client_id = f"{TOPIC_PREFIX}{DEVICE_ID}" - endpoint = EndpointConfiguration( - client_id=client_id, - hostname=MQTT_HOSTNAME, - port=MQTT_PORT, - topic_prefix=TOPIC_PREFIX, - auth_provider=AuthProvider( - basic=Basic( - username=BROKER_USERNAME, - password=BROKER_PASSWORD - ) - ) - ) - - # 2. Get Default Managers with Custom Point Factory - managers = get_default_managers(point_factory=expiring_point_factory) - - # 3. Create Device - device = create_device(endpoint, managers) - - # 4. Add Points - manager = device.get_manager(PointsetManager) - manager.add_point("override_point") - - # 5. Start Device - device_thread = threading.Thread(target=device.run, daemon=True) - device_thread.start() - - LOGGER.info(f"Device {DEVICE_ID} running.") - - # Calculate timestamps for the demo - now = datetime.now(timezone.utc) - config_ts = now.isoformat(timespec='seconds') - expiry_ts = (now + timedelta(seconds=10)).isoformat(timespec='seconds') - - print("\n" + "=" * 60) - print("DEMO INSTRUCTIONS:") - print("Publish the following JSON config using mosquitto_pub to trigger a 10s expiration override:") - print("-" * 20) - print(f"mosquitto_pub -h {MQTT_HOSTNAME} -p {MQTT_PORT} -u {BROKER_USERNAME} -P {BROKER_PASSWORD} -t '{TOPIC_PREFIX}{DEVICE_ID}/config' -m \\") - print(f" '{{ \"timestamp\": \"{config_ts}\", \"pointset\": {{ \"set_value_expiry\": \"{expiry_ts}\", \"points\": {{ \"override_point\": {{ \"set_value\": 100.0 }} }} }} }}'") - print("-" * 20) - print("Once sent, watch the logs. In 10 seconds, the point will automatically revert.") - print("=" * 60 + "\n") - - # Keep alive - while True: - time.sleep(1) - - except KeyboardInterrupt: - LOGGER.info("Stopping...") - sys.exit(0) diff --git a/clientlib/python/samples/pointset/writeback_modern_with_point_factory.py b/clientlib/python/samples/pointset/writeback_modern_with_point_factory.py deleted file mode 100644 index c988504e3d..0000000000 --- a/clientlib/python/samples/pointset/writeback_modern_with_point_factory.py +++ /dev/null @@ -1,140 +0,0 @@ -""" -Sample: Writeback (Modern Approach via Custom Point) - -This script demonstrates how to handle "Set Value" commands from the cloud -using the modern approach: encapsulating the actuation logic directly within -a custom Point implementation. - -SCENARIO: -1. **CustomPoint**: A custom subclass of `BasicPoint` that overrides `set_value`. -2. **Actuation**: You send a config with `"set_value": 24.0` for 'thermostat_target'. -3. **Encapsulation**: The manager routes the writeback to the specific point's `set_value()`. -4. **Result**: The point handles validation, simulates hardware feedback, and returns the state. -""" - -import logging -import sys -import threading -import time -from typing import Any, Optional - -from udmi.core.factory import create_device, get_default_managers -from udmi.core.managers import PointsetManager, WritebackResult -from udmi.core.managers.point.basic_point import BasicPoint -from udmi.schema import AuthProvider, Basic, EndpointConfiguration, PointPointsetModel -from udmi.schema import Entry, ValueState - -# --- CONFIGURATION --- -DEVICE_ID = "AHU-1" -MQTT_HOSTNAME = "localhost" -MQTT_PORT = 1883 -BROKER_USERNAME = "pyudmi-device" -BROKER_PASSWORD = "somesecureword" -TOPIC_PREFIX = "/r/ZZ-TRI-FECTA/d/" - -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -LOGGER = logging.getLogger("WritebackModernSample") - - -class ActuatingPoint(BasicPoint): - """ - A custom Point that handles its own underlying hardware writebacks. - - Replaces the legacy global callback by encapsulating the actuation - (hardware interaction) logic securely inside the point's set_value implementation. - """ - def __init__(self, name: str, model: Optional[PointPointsetModel] = None): - super().__init__(name, model) - # We manually mark this True for the demo, normally set by cloud metadata - self._writable = True - - def get_value(self) -> Any: - return 0.0 - - def set_value(self, value: Any) -> Any: - """ - Invoked automatically by the manager when a writeback occurs. - Can return ValueState, WritebackResult, or silently return None/ValueState.applied. - """ - LOGGER.info("!" * 60) - LOGGER.info(f"[{self._name}] HARDWARE ACTUATION: Set To {value}") - LOGGER.info("!" * 60) - - # Example 1: Validation failure - if isinstance(value, (int, float)) and value < 0: - LOGGER.warning(f"[{self._name}] Rejecting negative value: {value}") - return ValueState.invalid - - # Example 2: Simulating mechanical failure - if value == 999.0: - LOGGER.error(f"[{self._name}] Simulating mechanical jam") - return WritebackResult( - value_state=ValueState.failure, - status=Entry(message="Actuator mechanical jam identified", level=500) - ) - - # Example 3: Crash - if value == 666.0: - LOGGER.error(f"[{self._name}] Simulating crash") - raise RuntimeError("Unexpected write failure crash") - - time.sleep(0.1) - LOGGER.info(f"[{self._name}] Hardware confirmed actuation.") - return ValueState.applied - - def validate_value(self, value: Any) -> bool: - return True - - -def actuating_point_factory(name: str, model: Optional[PointPointsetModel] = None): - return ActuatingPoint(name, model) - - -if __name__ == "__main__": - try: - client_id = f"{TOPIC_PREFIX}{DEVICE_ID}" - endpoint = EndpointConfiguration( - client_id=client_id, - hostname=MQTT_HOSTNAME, - port=MQTT_PORT, - topic_prefix=TOPIC_PREFIX, - auth_provider=AuthProvider( - basic=Basic( - username=BROKER_USERNAME, - password=BROKER_PASSWORD - ) - ) - ) - - managers = get_default_managers(point_factory=actuating_point_factory) - device = create_device(endpoint, managers) - pointset_manager = device.get_manager(PointsetManager) - - pointset_manager.add_point("thermostat_target") - - t = threading.Thread(target=device.run, daemon=True) - t.start() - - LOGGER.info(f"Device {DEVICE_ID} is running.") - - print("\n" + "=" * 60) - print("DEMO INSTRUCTIONS:") - print("Publish JSON config updates using mosquitto_pub:") - print("-" * 20) - print("1. Success case (Applied):") - print(f" mosquitto_pub ... -m '{{ \"pointset\": {{ \"points\": {{ \"thermostat_target\": {{ \"set_value\": 24.0 }} }} }} }}'") - print("\n2. Invalid case (-5.0):") - print(f" mosquitto_pub ... -m '{{ \"pointset\": {{ \"points\": {{ \"thermostat_target\": {{ \"set_value\": -5.0 }} }} }} }}'") - print("\n3. Failure case (999.0):") - print(f" mosquitto_pub ... -m '{{ \"pointset\": {{ \"points\": {{ \"thermostat_target\": {{ \"set_value\": 999.0 }} }} }} }}'") - print("\n4. Crash case (666.0):") - print(f" mosquitto_pub ... -m '{{ \"pointset\": {{ \"points\": {{ \"thermostat_target\": {{ \"set_value\": 666.0 }} }} }} }}'") - print("-" * 20) - print("=" * 60 + "\n") - - while True: - time.sleep(1) - - except KeyboardInterrupt: - LOGGER.info("Stopping...") - sys.exit(0) diff --git a/clientlib/python/src/udmi/core/factory.py b/clientlib/python/src/udmi/core/factory.py index 5e35783b44..dc224a259f 100644 --- a/clientlib/python/src/udmi/core/factory.py +++ b/clientlib/python/src/udmi/core/factory.py @@ -60,19 +60,13 @@ def get_default_managers(**kwargs) -> List[BaseManager]: **kwargs: system_state (SystemState): Initial system state for SystemManager. sample_rate_sec (int): Sample rate for PointsetManager. - point_factory (Callable): Custom point factory for PointsetManager. - points_bulk_provider (BulkPointProvider): Optional provider for bulk telemetry reads. """ system_state = kwargs.get('system_state') sample_rate_sec = kwargs.get('sample_rate_sec', DEFAULT_SAMPLE_RATE_SEC) - point_factory = kwargs.get('point_factory') - points_bulk_provider = kwargs.get('points_bulk_provider') return [ SystemManager(system_state=system_state), - PointsetManager(sample_rate_sec=sample_rate_sec, - point_factory=point_factory, - bulk_provider=points_bulk_provider), + PointsetManager(sample_rate_sec=sample_rate_sec), LocalnetManager(), GatewayManager(), DiscoveryManager() diff --git a/clientlib/python/src/udmi/core/managers/point/__init__.py b/clientlib/python/src/udmi/core/managers/point/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/clientlib/python/src/udmi/core/managers/point/abstract_point.py b/clientlib/python/src/udmi/core/managers/point/abstract_point.py deleted file mode 100644 index 0c6fd44f95..0000000000 --- a/clientlib/python/src/udmi/core/managers/point/abstract_point.py +++ /dev/null @@ -1,91 +0,0 @@ -"""Abstract point definition.""" -import abc -from typing import Any - -from udmi.schema import PointPointsetConfig -from udmi.schema import PointPointsetEvents -from udmi.schema import PointPointsetState -from udmi.schema import PointPointsetModel -from udmi.schema import RefDiscovery - - -class AbstractPoint(abc.ABC): - """ - Interface representing a point reading. - Defines the strict contract that any point implementation must adhere to, - ensuring the PointsetManager remains agnostic to data acquisition details. - - Defines lifecycle methods (update_data, set_config, set_model) and state - reporting methods (get_data, get_state, is_dirty) that concrete classes - must implement to align with the UDMI specification. - """ - - @abc.abstractmethod - def get_name(self) -> str: - """ - Provides the string identifier that maps to the pointset block in UDMI payloads. - """ - - @abc.abstractmethod - def get_data(self) -> PointPointsetEvents: - """ - Provides the compiled telemetry data payload for this point to be included - in PointsetEvents. - """ - - @abc.abstractmethod - def update_data(self) -> None: - """ - Instructs the point to perform any necessary IO or state checks to refresh - its telemetry data. - """ - - @abc.abstractmethod - def is_dirty(self) -> bool: - """ - Returns if the state of the point has changed. - Indicates to the PointsetManager whether this point's state needs to be re-aggregated - and a new state_etag generated because of configuration or status changes. - """ - - @abc.abstractmethod - def get_state(self) -> PointPointsetState: - """ - Provides the compiled state payload (including value_state and status) for PointsetState. - """ - - @abc.abstractmethod - def set_config(self, config: PointPointsetConfig, **kwargs: 'Any') -> None: - """ - Sets the config of the point. - Applies dynamic changes (like writebacks via set_value) from a cloud config update. - """ - - @abc.abstractmethod - def should_report(self, sample_rate_sec: int) -> bool: - """ - Determines if this point needs to be reported. - Evaluates the Change of Value (COV) tolerance and Heartbeat interval to inform - the PointsetManager whether this point should be included in the next telemetry payload. - """ - - @abc.abstractmethod - def mark_reported(self) -> None: - """ - Updates the reporting state after a successful publish. - Caches the last reported value and timestamp to reset COV and Heartbeat state machines. - """ - - @abc.abstractmethod - def set_model(self, model: PointPointsetModel) -> None: - """ - Applies static definition from Metadata. - Initializes point properties (like units, type, or writable flags) during device startup. - """ - - @abc.abstractmethod - def enumerate(self) -> RefDiscovery: - """ - Returns discovery information for this point. - Formats the point's static details for inclusion in the Discovery block. - """ diff --git a/clientlib/python/src/udmi/core/managers/point/basic_point.py b/clientlib/python/src/udmi/core/managers/point/basic_point.py deleted file mode 100644 index 10b6e4d9b9..0000000000 --- a/clientlib/python/src/udmi/core/managers/point/basic_point.py +++ /dev/null @@ -1,346 +0,0 @@ -"""Basic point definition.""" -import abc -import copy -import threading -import time -from datetime import datetime -from datetime import timezone -from typing import Any -from typing import Callable -from typing import Optional - -from udmi.core.managers.point.abstract_point import AbstractPoint -from udmi.schema import Category -from udmi.schema import Entry -from udmi.schema import PointPointsetConfig -from udmi.schema import PointPointsetEvents -from udmi.schema import PointPointsetModel -from udmi.schema import PointPointsetState -from udmi.schema import ValueState -from udmi.schema import RefDiscovery - -DEFAULT_HEARTBEAT_SEC = 600 - -class BasicPoint(AbstractPoint): # pylint: disable=too-many-instance-attributes - """ - Abstract representation of a basic data point. - - Provides the foundational boilerplate logic for a point, including state management, - dirty flag handling, validation, and Change of Value (COV) calculations. Developers - subclass this to implement hardware-specific get_value and set_value mechanics. - - State Machine Details: - - Writable values move through states: None -> applied, invalid, or failure based on config. - - Dirty State: The _dirty flag is set when config, value_state, or status changes, - triggering the PointsetManager to regenerate the global device state and state_etag. - - Reporting State: Tracks _last_reported_value and _last_reported_time to accurately - execute COV and periodic heartbeat reporting logic. - """ - - def __init__(self, name: str, model: Optional[PointPointsetModel] = None): - self._name = name - self._data = PointPointsetEvents() - self._state = PointPointsetState() - - self._writable = model.writable if (model and model.writable) else False - self._ref = model.ref if model else None - - if model and model.units: - self._state.units = model.units - - self._written = False - self._dirty = True - - self._expiry_time: Optional[float] = None - - # Store full model for subclass extensibility - self._model = model - - # COV and Heartbeat - self._cov_increment: Optional[float] = None - self._last_reported_value: Any = None - self._last_reported_time: float = 0.0 - - def get_name(self) -> str: - return self._name - - def get_data(self) -> PointPointsetEvents: - return self._data - - def is_dirty(self) -> bool: - return self._dirty - - def get_state(self) -> PointPointsetState: - self._dirty = False - return self._state - - @property - def value_state(self): - """Gets value state.""" - return self._state.value_state - - @value_state.setter - def value_state(self, value): - """Sets value state.""" - self._state.value_state = value - self._dirty = True - - @property - def status(self): - """Gets status.""" - return self._state.status - - @status.setter - def status(self, value): - """Sets status.""" - self._state.status = value - self._dirty = True - - def update_data(self) -> None: - """ - Updates the telemetry data from the implementation source. - Calls the abstract get_value method to refresh the point's data payload, - unless a writeback is currently active (in which case it retains the written value). - """ - if not self._written: - self._data.present_value = self.get_value() - - def set_model(self, model: PointPointsetModel) -> None: - """ - Applies static definition from Metadata. - Initializes point properties (like units or ref overrides) during device startup - and flags the point as dirty so the State is regenerated. - """ - if not model: - return - if model.units: - self._state.units = model.units - if model.ref: - self._ref = model.ref - self._dirty = True - - def clear_writeback(self) -> None: - """ - Clears the writeback state and reverts to base state. - Handles the expiration of a set_value_expiry timer by wiping the overriden value - state and forcing an immediate re-read from the underlying hardware. - """ - self._written = False - self._state.value_state = None - self._state.status = None - self._expiry_time = None - self.update_data() - self._dirty = True - - def set_config(self, config: PointPointsetConfig, **kwargs: 'Any') -> None: - """ - Set the configuration for this point, nominally to indicate writing a value. - Entrypoint for the PointsetManager to push config updates or writebacks down to the point. - Checks if the payload changed and manages the _dirty flag. - """ - previous_value_state = self._state.value_state - previous_status = copy.deepcopy(self._state.status) - - invalid_expiry = kwargs.get('invalid_expiry', False) - is_expired = kwargs.get('is_expired', False) - on_state_change = kwargs.get('on_state_change') - - self._update_state_config(config, invalid_expiry, is_expired, on_state_change) - - state_changed = self._state.value_state != previous_value_state - status_changed = self._state.status != previous_status - - if state_changed or status_changed: - self._dirty = True - - def _update_state_config(self, - config: Optional[PointPointsetConfig], - invalid_expiry: bool = False, - is_expired: bool = False, - on_state_change: Optional[Callable] = None) -> None: - # pylint: disable=too-many-return-statements - """ - Update the state of this point based off of a new config. - The core writeback state machine logic. Validates ref matching, expiry timestamps, - writeability, and delegates to the hardware validation logic. Executes hardware - actuation asynchronously, immediately transitioning into `updating` state, and - bubbles up Entries once `applied` or `failure` state is reached. - """ - self._state.status = None - - if config: - if config.cov_increment is not None: - self._cov_increment = config.cov_increment - if config.units is not None: - self._state.units = config.units - - # 1. Validate Ref - if config is not None and config.ref != self._ref: - self._state.status = self._create_entry( - Category.POINTSET_POINT_FAILURE, "Invalid point ref" - ) - return - - # 2. Check if set_value is present (Release/Null check) - if config is None or config.set_value is None: - self.clear_writeback() - return - - if invalid_expiry: - self.clear_writeback() - self._state.status = self._create_entry( - Category.POINTSET_POINT_INVALID, "Invalid or missing set_value_expiry" - ) - self._state.value_state = ValueState.invalid - return - - if is_expired: - self.clear_writeback() - return - - # 3. Validate Value - try: - if not self.validate_value(config.set_value): - self._state.status = self._create_entry( - Category.POINTSET_POINT_INVALID, - "Written value is not valid" - ) - self._state.value_state = ValueState.invalid - return - except Exception as ex: # pylint: disable=broad-exception-caught - self._state.status = self._create_entry( - Category.POINTSET_POINT_FAILURE, str(ex) - ) - self._state.value_state = ValueState.failure - return - - # 4. Check Writable - if not self._writable: - self._state.status = self._create_entry( - Category.POINTSET_POINT_FAILURE, "Point is not writable" - ) - self._state.value_state = ValueState.failure - return - - # 5. Apply Value - def execute_writeback(val: Any) -> None: - try: - result = self.set_value(val) - self._data.present_value = result - self.value_state = ValueState.applied - self._written = True - except Exception as ex: # pylint: disable=broad-exception-caught - self.status = self._create_entry( - Category.POINTSET_POINT_FAILURE, str(ex) - ) - self.value_state = ValueState.failure - finally: - if on_state_change: - on_state_change() - - self._state.value_state = ValueState.updating - thread = threading.Thread(target=execute_writeback, args=(config.set_value,)) - thread.daemon = True - thread.start() - - def _create_entry(self, category: Category, message: str) -> Entry: - """Helper to create a status Entry object.""" - entry = Entry() - entry.detail = f"Point {self._name} (writable {self._writable})" - entry.timestamp = datetime.now(timezone.utc).isoformat() - entry.message = message - entry.category = category - entry.level = category.level - return entry - - @abc.abstractmethod - def get_value(self) -> Any: - """ - Return the current reading from the source. - Abstract method where subclasses implement their read operations (IO, GPIO, Modbus, etc). - """ - - @abc.abstractmethod - def set_value(self, value: Any) -> Any: - """ - Write the value to the source and return the applied value. - Abstract method where subclasses implement their hardware actuation logic. - Should return the value that was actually confirmed by the hardware. - """ - - @abc.abstractmethod - def validate_value(self, value: Any) -> bool: - """ - Check if the value is valid for this point. - Abstract hook for subclasses to reject writebacks (e.g., bounds checking) before - an attempt to write to hardware is made. - """ - - @abc.abstractmethod - def _populate_enumeration(self, point: RefDiscovery) -> None: - """Hook for subclasses to populate extra enumeration fields.""" - - def enumerate(self) -> RefDiscovery: - """ - Returns discovery information for this point. - Aggregates subclass-specific data with base properties to compile the Discovery payload. - """ - point = RefDiscovery() - point.description = f"{self.__class__.__name__} {self.get_name()}" - point.writable = True if self._writable else None - if self._state.units: - point.units = self._state.units - if self._ref: - point.ref = self._ref - self._populate_enumeration(point) - return point - - def should_report(self, sample_rate_sec: int) -> bool: - """ - Determines if this point needs to be reported based on Change of Value (COV) - and Heartbeat logic. - Compares the current present_value against the _last_reported_value to invoke COV - reporting, otherwise falls back to evaluating the _last_reported_time against the - heartbeat interval. - """ - if self._data.present_value is None: - return False - - now = time.time() - - if self._last_reported_value is None: - return True - - should_report_cov = False - - if self._data.present_value != self._last_reported_value: - is_numeric = (isinstance(self._data.present_value, (int, float)) and - not isinstance(self._data.present_value, bool) and - isinstance(self._last_reported_value, (int, float)) and - not isinstance(self._last_reported_value, bool)) - - if is_numeric and self._cov_increment is not None: - delta = abs(self._data.present_value - self._last_reported_value) - # Ensure we handle casting just in case - if delta >= self._cov_increment: - should_report_cov = True - else: - should_report_cov = True - - if should_report_cov: - return True - - # Default heartbeat or sample rate - heartbeat_interval = sample_rate_sec if sample_rate_sec > 0 else DEFAULT_HEARTBEAT_SEC - if (now - self._last_reported_time) >= heartbeat_interval: - return True - - return False - - def mark_reported(self) -> None: - """ - Updates the reporting state after a successful publish. - Caches the last reported value and timestamp to reset COV and Heartbeat state machines. - """ - self._last_reported_value = self._data.present_value - self._last_reported_time = time.time() diff --git a/clientlib/python/src/udmi/core/managers/point/bulk_provider.py b/clientlib/python/src/udmi/core/managers/point/bulk_provider.py deleted file mode 100644 index 1837155d44..0000000000 --- a/clientlib/python/src/udmi/core/managers/point/bulk_provider.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -Provides the BulkPointProvider interface. -""" -from abc import ABC, abstractmethod -from typing import Any, Dict - -class BulkPointProvider(ABC): # pylint: disable=too-few-public-methods - """ - Interface for providing bulk telemetry reads to the PointsetManager. - This interface allows for batch reading of hardware points (e.g., Modbus, BACnet) at - the start of a telemetry cycle, significantly reducing IO overhead compared to - polling points individually. - """ - - @abstractmethod - def read_points(self) -> Dict[str, Any]: - """ - Reads the current values of all supported points from hardware. - Executes a batch IO operation to retrieve the latest sensor readings and returns them - as a mapping to be injected into the PointsetManager's pipeline. - - Returns: - Dict[str, Any]: A mapping of point names to their present values. - """ diff --git a/clientlib/python/src/udmi/core/managers/point/virtual_point.py b/clientlib/python/src/udmi/core/managers/point/virtual_point.py deleted file mode 100644 index e8c9773517..0000000000 --- a/clientlib/python/src/udmi/core/managers/point/virtual_point.py +++ /dev/null @@ -1,138 +0,0 @@ -"""Virtual point definition.""" -import warnings -from typing import Any -from typing import Optional - -from udmi.core.managers.point.basic_point import BasicPoint -from udmi.schema import PointPointsetModel -from udmi.schema import RefDiscovery - - -class Point(BasicPoint): - """ - Default concrete implementation of a Point. - Acts as an in-memory "virtual" point that simply stores and retrieves a present value. - Useful for testing, simulations, and software-only variables where no hardware integration - is needed. - - - Maintains a local _present_value. - - Inherits COV and heartbeat state machine from BasicPoint. - - Writebacks instantly apply to the in-memory value without hardware IO operations. - """ - - def __init__(self, name: str, model: Optional[PointPointsetModel] = None): - super().__init__(name, model) - # Seed with baseline_value from model if present - self._present_value: Any = ( - model.baseline_value if (model and model.baseline_value is not None) else None - ) - - def get_value(self) -> Any: - """ - Overrides the abstract get_value to return the in-memory _present_value. - """ - return self._present_value - - def set_value(self, value: Any) -> Any: - """ - Concrete implementation of set_value, returning the value as applied. - Accepts the set_value command from the cloud and applies it directly to the - in-memory state. - """ - return value - - def validate_value(self, value: Any) -> bool: - """ - Default validation that takes all values as valid. - Since this is a virtual point, it places no restrictions on the type or format - of the data written to it. - """ - return True - - def set_present_value(self, value: Any) -> None: - """ - API for manual injection of values (e.g., from sample scripts). - Provides an external interface to mutate the point's value and optionally - resets its error status to simulate hardware recovery. - """ - self._present_value = value - if self.status and self.status.level >= 500: - self.status = None - - def _populate_enumeration(self, point: RefDiscovery) -> None: - """Concrete implementation doing nothing by default.""" - - # ========================================================================= - # BACKWARD COMPATIBILITY LAYER - # ========================================================================= - - @property - def present_value(self): - """ - Get the point's present value. - """ - warnings.warn( - "Direct access to 'present_value' is deprecated in v2.0.0. " - "Use 'get_data().present_value'.", - DeprecationWarning, stacklevel=2 - ) - return self.get_data().present_value - - @present_value.setter - def present_value(self, value): - """ - Set the point's present value. - """ - warnings.warn( - "Direct setting of 'present_value' is deprecated. " - "Use 'set_present_value(value)'.", - DeprecationWarning, stacklevel=2 - ) - self.set_present_value(value) - - @property - def units(self): - """ - Get the point's units. - """ - warnings.warn( - "Direct access to 'units' is deprecated. " - "Use 'get_state().units'.", - DeprecationWarning, stacklevel=2 - ) - return self.get_state().units - - @units.setter - def units(self, value): - """ - Set the point's units. - """ - warnings.warn( - "Direct setting of 'units' is deprecated. " - "Update metadata models instead.", - DeprecationWarning, stacklevel=2 - ) - self._state.units = value - self._dirty = True - - def get_event(self): - """ - Provides the compiled telemetry data payload for this point to be included - in PointsetEvents. - """ - warnings.warn( - "'get_event()' is deprecated. Use 'get_data()'.", - DeprecationWarning, stacklevel=2 - ) - return self.get_data() - - def update_config(self, config) -> bool: - """ - Update the configuration for this point and return true if state changed. - """ - warnings.warn( - "'update_config()' is deprecated. Use 'set_config()'.", - DeprecationWarning, stacklevel=2 - ) - self.set_config(config) - return self.is_dirty() diff --git a/clientlib/python/src/udmi/core/managers/pointset_manager.py b/clientlib/python/src/udmi/core/managers/pointset_manager.py index 2945c8eeec..96d01fd80e 100644 --- a/clientlib/python/src/udmi/core/managers/pointset_manager.py +++ b/clientlib/python/src/udmi/core/managers/pointset_manager.py @@ -4,13 +4,10 @@ This manager is responsible for handling the 'pointset' block of the config and state, and for reporting pointset telemetry events. """ -import hashlib -import json + import logging import threading import time -import warnings -from dataclasses import dataclass from datetime import datetime from datetime import timezone from typing import Any @@ -19,16 +16,17 @@ from typing import Mapping from typing import Optional from typing import Union +from dataclasses import dataclass from udmi.constants import UDMI_VERSION from udmi.core.managers.base_manager import BaseManager -from udmi.core.managers.point.abstract_point import AbstractPoint -from udmi.core.managers.point.bulk_provider import BulkPointProvider -from udmi.core.managers.point.virtual_point import Point from udmi.schema import Config from udmi.schema import Entry from udmi.schema import Metadata from udmi.schema import PointPointsetConfig +from udmi.schema import PointPointsetEvents +from udmi.schema import PointPointsetModel +from udmi.schema import PointPointsetState from udmi.schema import PointsetEvents from udmi.schema import PointsetState from udmi.schema import State @@ -39,7 +37,6 @@ DEFAULT_SAMPLE_RATE_SEC = 10 DEFAULT_HEARTBEAT_SEC = 600 - @dataclass class WritebackResult: """ @@ -50,117 +47,237 @@ class WritebackResult: # Callback signature: function(point_name, value) -> Optional[ValueState | WritebackResult] -WritebackHandler = Callable[ - [str, Any], Union[None, ValueState, WritebackResult]] +WritebackHandler = Callable[[str, Any], Union[None, ValueState, WritebackResult]] # Poll Callback now returns a dictionary of {point_name: value} PollCallback = Callable[[], Dict[str, Any]] -class PointsetManager(BaseManager): # pylint: disable=too-many-instance-attributes +class Point: + """ + Represents a single data point within the Pointset. + Acts as a container for value, configuration, and status. + Tracks its own reporting state (last reported value/time) for COV logic. + """ + + # pylint: disable=too-many-instance-attributes + + def __init__(self, name: str): + """ + Initializes a new Point. + + Args: + name: The unique identifier for this point (e.g., 'temp_sensor_1'). + """ + self.name = name + self.ref = None + self.present_value: Any = None + self.units: Optional[str] = None + self.status: Optional[Entry] = None + self.value_state: Optional[str] = None + + # Writeback + self.set_value: Any = None + + # Reporting Configuration + self.cov_increment: Optional[float] = None + + # Reporting State + self.last_reported_value: Any = None + self.last_reported_time: float = 0.0 + + def set_model(self, model: PointPointsetModel) -> None: + """ + Applies static definition from Metadata. + + Args: + model: The metadata model defining static properties like units. + """ + if model.units: + self.units = model.units + if model.ref: + self.ref = model.ref + + def set_present_value(self, value: Any) -> None: + """ + Updates the current reading of the point. + + This updates the internal state but does not trigger immediate reporting. + Reporting is handled by the `should_report` check during the telemetry loop. + + Args: + value: The new value reading. + """ + self.present_value = value + if self.status and self.status.level >= 500: + self.status = None + + def update_config(self, config: PointPointsetConfig) -> bool: + """ + Updates point metadata based on config. + + Args: + config: The configuration object for this specific point. + + Returns: + True if a writeback (set_value change) occurred, requiring external handling. + False otherwise. + """ + if config.units is not None: + self.units = config.units + if config.cov_increment is not None: + self.cov_increment = config.cov_increment + + dirty = False + if config.set_value is not None: + self.value_state = None + self.status = None + + if config.set_value != self.set_value: + self.set_value = config.set_value + dirty = True + + self.value_state = ValueState.applied + + return dirty + + def get_state(self) -> PointPointsetState: + """ + Returns the state representation of this point. + + Returns: + A PointPointsetState object suitable for serialization in the state block. + """ + return PointPointsetState( + status=self.status, + value_state=self.value_state, + units=self.units + ) + + def get_event(self) -> PointPointsetEvents: + """ + Returns the telemetry event representation of this point. + + Returns: + A PointPointsetEvents object containing the present value. + """ + return PointPointsetEvents( + present_value=self.present_value + ) + + def should_report(self, sample_rate_sec: int) -> bool: + """ + Determines if this point needs to be reported based on Change of Value (COV) + and Heartbeat logic. + + Logic: + 1. If never reported, report immediately. + 2. If value changed > cov_increment, report immediately. + 3. If time since last report > heartbeat_interval, report immediately. + + Args: + sample_rate_sec: The global sample rate (used to calculate heartbeat). + + Returns: + True if the point should be included in the next telemetry message. + """ + if self.present_value is None: + return False + + # Always report if never reported before + if self.last_reported_value is None: + return True + + now = time.time() + should_report_cov = False + + # Check Change of Value (COV) + if self.present_value != self.last_reported_value: + is_numeric = (isinstance(self.present_value, (int, float)) and + isinstance(self.last_reported_value, (int, float))) + + if is_numeric and self.cov_increment is not None: + delta = abs(self.present_value - self.last_reported_value) + if delta >= self.cov_increment: + should_report_cov = True + else: + # Non-numeric or no COV threshold set -> Report any change + should_report_cov = True + + if should_report_cov: + return True + + heartbeat_interval = sample_rate_sec if sample_rate_sec > 0 else DEFAULT_HEARTBEAT_SEC + if (now - self.last_reported_time) >= heartbeat_interval: + return True + + return False + + def mark_reported(self) -> None: + """ + Updates the reporting state after a successful publish. + + Syncs `last_reported_value` with `present_value` and updates + `last_reported_time` to the current time. + """ + self.last_reported_value = self.present_value + self.last_reported_time = time.time() + + +class PointsetManager(BaseManager): """ Manages the 'pointset' block. - Acts as the central orchestrator for device telemetry. It dynamically provisions points - from configuration, parses sample rates, oversees background telemetry generation loops, - and handles point writebacks (including validation, staleness, and expiry). - - - Point Lifecycle: Dynamically instantiates new points based on config and manages - their lifecycles (active vs inactive points). - - Writeback Timers: Spawns threading.Timer instances per-point for the 'set_value_expiry' - directive. Transitions points back to base state and triggers State - regeneration upon expiration. - - State Etag Tracking: Calculates and tracks a deterministic state_etag (SHA-256) of all - managed points to prevent stale writebacks and ensure UI synchronicity. - - Polling Pipeline: Interleaves Pointset callbacks, BulkPointProvider reads, and individual - point updates inside its publish_telemetry method. + Handles configuration of points, reporting of point states, and + periodic publishing of telemetry events based on global rate OR per-point COV. """ PERSISTENCE_KEY = "pointset_state" - PERSISTENCE_BUFFER_KEY = "pointset_telemetry_buffer" - MAX_OFFLINE_BUFFER_SIZE = 1000 @property def model_field_name(self) -> str: return "pointset" - def __init__(self, sample_rate_sec: int = DEFAULT_SAMPLE_RATE_SEC, - point_factory: Optional[Callable] = None, - bulk_provider: Optional[BulkPointProvider] = None): + def __init__(self, sample_rate_sec: int = DEFAULT_SAMPLE_RATE_SEC): """ Initializes the PointsetManager. Args: sample_rate_sec: The default interval between telemetry checks. - point_factory: Optional factory to create points. Defaults to Point. - bulk_provider: Optional provider for bulk telemetry reads. """ super().__init__() - self._point_factory = point_factory or Point - self._all_points: Dict[str, AbstractPoint] = {} - self._last_set_values: Dict[str, Any] = {} + self._points: Dict[str, Point] = {} self._sample_rate_sec = sample_rate_sec self._state_etag: Optional[str] = None - self._active_points: set = set() - self._last_active_points: set = set() - - self._sample_limit_sec: Optional[int] = None - self._last_publish_time: float = 0.0 - - self._last_full_publish_time: float = 0.0 self._writeback_handler: Optional[WritebackHandler] = None self._telemetry_wake_event: Optional[threading.Event] = None self._poll_callback: Optional[PollCallback] = None - self._writeback_timers: Dict[str, threading.Timer] = {} - self._bulk_provider: Optional[BulkPointProvider] = bulk_provider - - self._offline_buffer: list[dict] = [] LOGGER.info("PointsetManager initialized.") @property - def points(self) -> Mapping[str, AbstractPoint]: - """Returns a read-only mapping of the active points.""" - return { - name: self._all_points[name] - for name in self._active_points - if name in self._all_points - } - - @property - def all_points(self) -> Mapping[str, AbstractPoint]: - """Returns a read-only mapping of all points.""" - return self._all_points - - @property - def _points(self): - warnings.warn( - "Accessing the private '_points' attribute is deprecated. " - "Use the public '.points' property to get active points, " - "or '.all_points' to get all provisioned points.", - DeprecationWarning, stacklevel=2 - ) - return self.points + def points(self) -> Mapping[str, Point]: + """Returns a read-only mapping of the managed points.""" + return self._points def set_model(self, model: Metadata) -> None: """ Applies the Pointset Metadata (Model) to the manager. - Iterates through the static metadata model during startup to initialize - points and apply static properties (like units). + + This iterates through the metadata model and initializes points + defined therein. Args: model: The device metadata object. """ super().set_model(model) - if not self.model or not hasattr(self.model, - 'points') or not self.model.points: + if not self.model or not hasattr(self.model, 'points') or not self.model.points: return LOGGER.info("Applying Pointset Metadata Model...") for name, point_model in self.model.points.items(): - if name not in self._all_points: + if name not in self._points: self.add_point(name) - else: - self._all_points[name].set_model(point_model) + self._points[name].set_model(point_model) def set_writeback_handler(self, handler: WritebackHandler) -> None: """ @@ -169,14 +286,7 @@ def set_writeback_handler(self, handler: WritebackHandler) -> None: Args: handler: A callable taking (point_name, value). """ - message = ( - "set_writeback_handler is deprecated and will be removed in v2.0.0. " - "Please subclass BasicPoint or AbstractPoint and use point_factory instead." - ) - LOGGER.warning(message) - warnings.warn(message, DeprecationWarning, stacklevel=2) self._writeback_handler = handler - LOGGER.info("Registered writeback handler.") def set_poll_callback(self, callback: PollCallback) -> None: """ @@ -188,25 +298,9 @@ def set_poll_callback(self, callback: PollCallback) -> None: Args: callback: A callable returning a dict of {point_name: value}. """ - message = ( - "set_poll_callback is deprecated and will be removed in v2.0.0. " - "Please migrate to the BulkPointProvider interface." - ) - LOGGER.warning(message) - warnings.warn(message, DeprecationWarning, stacklevel=2) self._poll_callback = callback LOGGER.info("Registered telemetry poll callback.") - def register_bulk_provider(self, provider: BulkPointProvider) -> None: - """ - Registers a bulk provider that supplies hardware reads for points. - - Args: - provider: An instance of BulkPointProvider. - """ - self._bulk_provider = provider - LOGGER.info("Registered bulk telemetry provider.") - def add_point(self, name: str) -> None: """ Registers a point to be managed. @@ -214,14 +308,8 @@ def add_point(self, name: str) -> None: Args: name: The name of the point to add. """ - if name not in self._all_points: - point_model = None - if self.model and hasattr(self.model, - "points") and self.model.points: - point_model = self.model.points.get(name) - - self._all_points[name] = self._point_factory(name, - model=point_model) + if name not in self._points: + self._points[name] = Point(name) LOGGER.debug("Added point '%s' to manager.", name) def set_point_value(self, name: str, value: Any) -> None: @@ -232,14 +320,9 @@ def set_point_value(self, name: str, value: Any) -> None: name: The name of the point. value: The new value. """ - if name not in self._all_points: + if name not in self._points: self.add_point(name) - - point = self._all_points[name] - if hasattr(point, "set_present_value"): - point.set_present_value(value) - else: - LOGGER.debug("Point '%s' does not support set_present_value", name) + self._points[name].set_present_value(value) def start(self) -> None: """ @@ -249,7 +332,6 @@ def start(self) -> None: """ LOGGER.info("Starting PointsetManager telemetry loop...") self._load_persisted_state() - self._load_persisted_buffer() self._telemetry_wake_event = self.start_periodic_task( interval_getter=lambda: self._sample_rate_sec, task=self.publish_telemetry, @@ -261,20 +343,13 @@ def stop(self) -> None: Stops the telemetry loop and persists current state. """ self._persist_state() - for timer in self._writeback_timers.values(): - timer.cancel() - self._writeback_timers.clear() super().stop() LOGGER.info("PointsetManager stopped.") def handle_config(self, config: Config) -> None: - # pylint: disable=too-many-locals,too-many-branches,too-many-statements,too-many-nested-blocks """ Handles 'pointset' block of config. - The central pointset state-machine driver for config updates. It updates - global sample rates, provisions new points dynamically, handles writeback - validations (etag checks, expiry evaluation), and synchronizes individual - point states with cloud commands. + Updates sample rates and dynamic point configurations. Args: config: The full device configuration. @@ -291,180 +366,47 @@ def handle_config(self, config: Config) -> None: self._sample_rate_sec = config.pointset.sample_rate_sec if self._telemetry_wake_event: self._telemetry_wake_event.set() - self._sample_limit_sec = config.pointset.sample_limit_sec + + # Update Etag + self._state_etag = config.pointset.state_etag # Update Points (Dynamic Provisioning) new_point_configs = config.pointset.points or {} - self._active_points = set(new_point_configs.keys()) + current_point_names = set(self._points.keys()) + new_point_names = set(new_point_configs.keys()) - config_timestamp_str = config.timestamp - config_timestamp = None - if config_timestamp_str: - try: - config_timestamp = datetime.fromisoformat( - config_timestamp_str.replace("Z", "+00:00")).timestamp() - except ValueError: - pass - - set_value_expiry_str = config.pointset.set_value_expiry - set_value_expiry = None - if set_value_expiry_str: - try: - set_value_expiry = datetime.fromisoformat( - set_value_expiry_str.replace("Z", "+00:00")).timestamp() - except ValueError: - pass - - incoming_etag = config.pointset.state_etag - etag_mismatch = False - if incoming_etag and self._state_etag and incoming_etag != self._state_etag: - LOGGER.warning( - "state_etag mismatch! Cloud: %s | Device: %s. Rejecting writebacks.", - incoming_etag, self._state_etag - ) - etag_mismatch = True + for name in current_point_names - new_point_names: + LOGGER.info("Removing point '%s' not present in received config", name) + del self._points[name] - for point_name in new_point_configs: - if point_name not in self._all_points: - LOGGER.info("Provisioning new point from config: %s", - point_name) + for point_name, point_config in new_point_configs.items(): + if point_name not in self._points: + LOGGER.info("Provisioning new point from config: %s", point_name) self.add_point(point_name) - for point_name, point in self._all_points.items(): - point_config = new_point_configs.get(point_name) - - invalid_expiry = False - is_expired = False - if point_config and point_config.set_value is not None: - if set_value_expiry is None or ( - config_timestamp is not None and set_value_expiry <= config_timestamp): - invalid_expiry = True - elif set_value_expiry < time.time(): - is_expired = True - - try: - point.set_config(point_config, invalid_expiry=invalid_expiry, - is_expired=is_expired, - on_state_change=self.trigger_state_update) - except TypeError: - point.set_config(point_config) - - if point_config and point_config.set_value is not None: - if invalid_expiry or is_expired: - if invalid_expiry: - self.trigger_state_update() - if point_name in self._writeback_timers: - self._writeback_timers[point_name].cancel() - del self._writeback_timers[point_name] - continue - - if etag_mismatch: - point.value_state = ValueState.invalid - point.status = Entry( - message="state_etag mismatch. Stale writeback prevented.", - level=500 - ) - self.trigger_state_update() - continue - - previous_set = self._last_set_values.get(point_name) - if point_config.set_value != previous_set: - self._last_set_values[point_name] = point_config.set_value - if self._writeback_handler is not None: - try: - result = self._writeback_handler(point_name, - point_config.set_value) - if isinstance(result, ValueState): - point.value_state = result - elif isinstance(result, WritebackResult): - point.value_state = result.value_state - if result.status: - point.status = result.status - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error( - "Error in writeback handler for %s: %s", - point_name, e) - point.value_state = ValueState.failure - point.status = Entry(message=str(e), level=500) - else: - # Rely entirely on the point's native set_config/set_value - # to manage hardware actuation and determine the resulting value_state. - pass - - if point.value_state in (ValueState.applied, - ValueState.updating) and set_value_expiry: - delay = set_value_expiry - time.time() - if delay > 0: - if point_name in self._writeback_timers: - self._writeback_timers[point_name].cancel() - - timer = threading.Timer(delay, - self._handle_writeback_expiration, - args=[point_name]) - timer.daemon = True - self._writeback_timers[point_name] = timer - timer.start() - - def _handle_writeback_expiration(self, point_name: str) -> None: - """ - Callback when a point's set_value_expiry is reached. - """ - if point_name not in self._all_points: - return - - point = self._all_points[point_name] - try: - if hasattr(point, "clear_writeback"): - point.clear_writeback() - elif hasattr(point, "set_config"): - point.set_config(PointPointsetConfig()) - - if point_name in self._writeback_timers: - del self._writeback_timers[point_name] - - self._last_set_values.pop(point_name, None) - self.trigger_state_update() - LOGGER.info("Writeback timer expired for point: %s", point_name) - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Error expiring writeback for point %s: %s", - point_name, e) + point = self._points[point_name] + is_writeback = point.update_config(point_config) + + if is_writeback and self._writeback_handler is not None: + try: + result = self._writeback_handler(point_name, point.set_value) + if isinstance(result, ValueState): + point.value_state = result + elif isinstance(result, WritebackResult): + point.value_state = result.value_state + if result.status: + point.status = result.status + except Exception as e: # pylint: disable=broad-exception-caught + LOGGER.error("Error in writeback handler for %s: %s", + point_name, e) + point.value_state = ValueState.failure + point.status = Entry(message=str(e), level=500) def handle_command(self, command_name: str, payload: dict) -> None: """ Handles commands directed at the pointset (currently none). """ - def _generate_state_etag(self, points_state_map: dict) -> str: - """ - Generates a unique SHA-256 hash representing the current pointset state. - Includes 'units', 'value_state', and 'set_value'. - """ - etag_dict_map = {} - for point_name, point_state in points_state_map.items(): - point_state_dict = ( - point_state.to_dict() if hasattr(point_state, 'to_dict') - else point_state - ) - etag_point_dict = {} - if ("units" in point_state_dict and - point_state_dict["units"] is not None): - etag_point_dict["units"] = point_state_dict["units"] - if ("value_state" in point_state_dict and - point_state_dict["value_state"] is not None): - etag_point_dict["value_state"] = point_state_dict["value_state"] - - set_val = self._last_set_values.get(point_name) - if set_val is not None: - etag_point_dict["set_value"] = set_val - - etag_dict_map[point_name] = etag_point_dict - - # Ensure deterministic JSON serialization by sorting keys - state_str = json.dumps(etag_dict_map, sort_keys=True, - separators=(',', ':')) - full_hash = hashlib.sha256(state_str.encode('utf-8')).hexdigest() - return full_hash[:32] - def _load_persisted_state(self) -> None: """Loads the last known point values from persistence.""" if not self._device or not self._device.persistence: @@ -477,48 +419,30 @@ def _load_persisted_state(self) -> None: restored_count = 0 for point_name, point_data in saved_state.items(): - if point_name not in self._all_points: + if point_name not in self._points: continue + point = self._points[point_name] if "present_value" in point_data: - self.set_point_value(point_name, - point_data["present_value"]) + point.present_value = point_data["present_value"] restored_count += 1 LOGGER.info("Restored state for %s points.", restored_count) - except Exception as e: # pylint: disable=broad-exception-caught + except Exception as e: # pylint: disable=broad-exception-caught LOGGER.error("Failed to load persisted pointset state: %s", e) def update_state(self, state: State) -> None: """ Populates state.pointset with status of all managed points. - Aggregates point-level states and compiles them into the device's main State. - Checks for dirty flags and recalculates the state_etag if changes occurred. Args: state: The state object to update. """ - is_state_dirty = False - - if self._active_points != self._last_active_points: - is_state_dirty = True - self._last_active_points = self._active_points.copy() - - points_state_map = {} - for name in self._active_points: - if name in self._all_points: - point = self._all_points[name] - - if point.is_dirty(): - is_state_dirty = True - - points_state_map[name] = point.get_state() - - if is_state_dirty or self._state_etag is None: - self._state_etag = self._generate_state_etag(points_state_map) - LOGGER.debug("Pointset state changed. Recalculated state_etag: %s", - self._state_etag) + points_state_map = { + name: point.get_state() + for name, point in self._points.items() + } state.pointset = PointsetState( state_etag=self._state_etag, @@ -533,88 +457,27 @@ def _persist_state(self) -> None: try: data_to_save = {} - for name, point in self._all_points.items(): - current_data = point.get_data() - if current_data.present_value is not None: + for name, point in self._points.items(): + if point.present_value is not None: data_to_save[name] = { - "present_value": current_data.present_value + "present_value": point.present_value } self._device.persistence.set(self.PERSISTENCE_KEY, data_to_save) - except Exception as e: # pylint: disable=broad-exception-caught + except Exception as e: # pylint: disable=broad-exception-caught LOGGER.error("Failed to persist pointset state: %s", e) - def _buffer_event(self, event: PointsetEvents) -> None: - """Appends the event to the offline buffer and persists it.""" - self._offline_buffer.append(event.to_dict()) - - if len(self._offline_buffer) > self.MAX_OFFLINE_BUFFER_SIZE: - self._offline_buffer = self._offline_buffer[-self.MAX_OFFLINE_BUFFER_SIZE:] - - self._persist_buffer() - LOGGER.info("Device offline. Buffered telemetry event. Buffer size: %s", - len(self._offline_buffer)) - - def _flush_buffer(self) -> None: - """Publishes all buffered events and clears the buffer.""" - if not self._offline_buffer: - return - - try: - LOGGER.info("Connection resumed. Flushing %s buffered telemetry events...", - len(self._offline_buffer)) - for event_dict in self._offline_buffer: - event = PointsetEvents.from_dict(event_dict) - self.publish_event(event, "pointset") - time.sleep(0.01) - - self._offline_buffer.clear() - self._persist_buffer() - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Failed to flush telemetry buffer: %s", e) - - def _persist_buffer(self) -> None: - """Saves the offline buffer to persistence.""" - if not self._device or not self._device.persistence: - return - try: - self._device.persistence.set(self.PERSISTENCE_BUFFER_KEY, self._offline_buffer) - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Failed to persist telemetry buffer: %s", e) - - def _load_persisted_buffer(self) -> None: - """Loads the offline telemetry buffer from persistence.""" - if not self._device or not self._device.persistence: - return - try: - saved_buffer = self._device.persistence.get(self.PERSISTENCE_BUFFER_KEY, []) - if isinstance(saved_buffer, list): - self._offline_buffer = saved_buffer - if self._offline_buffer: - LOGGER.info("Restored offline telemetry buffer with %s events.", - len(self._offline_buffer)) - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Failed to load telemetry buffer: %s", e) - def publish_telemetry(self) -> None: - # pylint: disable=too-many-branches,too-many-statements """ - Generates and publishes a PointsetEvents message. - Runs the periodic telemetry loop. It first updates points via Bulk Providers or Poll - Callbacks. Then, it evaluates every active point to see if it should_report() - based on COV or heartbeats, and crafts the final payload (including partial updates). + Generates and publishes a PointsetEvents message containing + only the points that are 'due' for reporting. + + This method: + 1. Invokes the Poll Callback (if registered) to refresh values. + 2. Iterates over all points to check `should_report()` (COV/Heartbeat). + 3. Constructs a payload of only the reporting points. + 4. Publishes to MQTT via the dispatcher. """ - now = time.time() - if self._sample_limit_sec is not None: - if (now - self._last_publish_time) < self._sample_limit_sec: - return - - if self._poll_callback and self._bulk_provider: - LOGGER.warning( - "Both _bulk_provider and _poll_callback are set, " - "prioritizing _bulk_provider, but also executing _poll_callback" - ) - if self._poll_callback: try: new_values = self._poll_callback() @@ -624,79 +487,29 @@ def publish_telemetry(self) -> None: else: LOGGER.warning("Poll callback returned non-dict type: %s", type(new_values)) - except Exception as e: # pylint: disable=broad-exception-caught + except Exception as e: # pylint: disable=broad-exception-caught LOGGER.error("Error in telemetry poll callback: %s", e, exc_info=True) - if self._bulk_provider: - try: - new_values = self._bulk_provider.read_points() - if isinstance(new_values, dict): - for point_name, value in new_values.items(): - self.set_point_value(point_name, value) - else: - LOGGER.warning("BulkProvider returned non-dict type: %s", - type(new_values)) - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Error in BulkPointProvider.read_points(): %s", e) - - if not self._all_points: + if not self._points: return points_map = {} - - valid_active_points = [ - name for name in self._active_points if name in self._all_points - ] - - force_full_update = ( - (now - self._last_full_publish_time) >= self._sample_rate_sec) - - for name in valid_active_points: - point = self._all_points[name] - try: - point.update_data() - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Unable to update point data for '%s': %s", name, - e) - continue - - try: - if force_full_update or point.should_report( - self._sample_rate_sec): - points_map[name] = point.get_data() - point.mark_reported() - except Exception as e: # pylint: disable=broad-exception-caught - LOGGER.error("Unable to process reporting for '%s': %s", name, - e) + for name, point in self._points.items(): + if point.should_report(self._sample_rate_sec): + points_map[name] = point.get_event() + point.mark_reported() if not points_map: return - is_partial = len(points_map) < len(valid_active_points) - try: event = PointsetEvents( timestamp=datetime.now(timezone.utc).isoformat(), version=UDMI_VERSION, - points=points_map, - partial_update=True if is_partial else None + points=points_map ) - - if not self.is_connected: - self._buffer_event(event) - else: - self._flush_buffer() - self.publish_event(event, "pointset") - LOGGER.debug( - "Published telemetry for %s points (Partial: %s, Forced Full: %s).", - len(points_map), - is_partial, - force_full_update - ) - - self._last_publish_time = now - if force_full_update: - self._last_full_publish_time = now - except Exception as e: # pylint: disable=broad-exception-caught + self.publish_event(event, "pointset") + LOGGER.debug("Published telemetry for %s points.", len(points_map)) + except Exception as e: # pylint:disable=broad-exception-caught LOGGER.error("Failed to publish telemetry: %s", e) diff --git a/clientlib/python/tests/core/managers/test_pointset_manager.py b/clientlib/python/tests/core/managers/test_pointset_manager.py index da86c58459..0cfa535853 100644 --- a/clientlib/python/tests/core/managers/test_pointset_manager.py +++ b/clientlib/python/tests/core/managers/test_pointset_manager.py @@ -7,7 +7,6 @@ from unittest.mock import MagicMock from unittest.mock import patch import time -import warnings import pytest from udmi.schema import Config @@ -19,11 +18,10 @@ from udmi.schema import State from udmi.schema import ValueState -from udmi.core.managers.point.virtual_point import Point +from src.udmi.core.managers.pointset_manager import Point from src.udmi.core.managers.pointset_manager import PointsetManager from src.udmi.core.managers.pointset_manager import WritebackResult from src.udmi.core.messaging import AbstractMessageDispatcher -from udmi.core.managers.point.bulk_provider import BulkPointProvider # pylint: disable=redefined-outer-name,protected-access @@ -45,19 +43,24 @@ def manager(): def test_point_value_update(): """Test that setting a value updates the internal state for events.""" point = Point("temp_sensor") + assert point.present_value is None + point.set_present_value(25.5) - point.update_data() # Sync value to event - - event = point.get_data() + assert point.present_value == 25.5 + + event = point.get_event() assert event.present_value == 25.5 def test_point_config_update(): """Test that updating config updates the internal state for reporting.""" point = Point("temp_sensor") + assert point.units is None config = PointPointsetConfig(units="Celsius") - point.set_config(config) + point.update_config(config) + + assert point.units == "Celsius" state = point.get_state() assert state.units == "Celsius" @@ -68,49 +71,39 @@ def test_point_should_report_cov_logic(): Test the Change of Value (COV) logic. """ point = Point("test_point") - config = PointPointsetConfig(cov_increment=1.0) - point.set_config(config) + point.cov_increment = 1.0 point.set_present_value(10.0) - point.update_data() assert point.should_report(sample_rate_sec=10) is True point.mark_reported() point.set_present_value(10.5) - point.update_data() assert point.should_report(sample_rate_sec=10) is False point.set_present_value(11.1) - point.update_data() assert point.should_report(sample_rate_sec=10) is True point.mark_reported() # Test periodic reporting without COV (Steady State) # 5 seconds - Should not report - with patch("time.time", return_value=point._last_reported_time + 5): + with patch("time.time", return_value=point.last_reported_time + 5): assert point.should_report(sample_rate_sec=10) is False # 11 seconds (exceeds sample_rate_sec=10) - Should report - with patch("time.time", return_value=point._last_reported_time + 11): + with patch("time.time", return_value=point.last_reported_time + 11): assert point.should_report(sample_rate_sec=10) is True def test_point_set_model_syncs_ref(): - """Test that model limits and values behave correctly with metadata.""" - from udmi.schema import Category + """Test that set_model syncs the ref property.""" from udmi.schema import PointPointsetModel - model = PointPointsetModel(ref="point_ref_123", writable=True) - point = Point("temp_sensor", model=model) + point = Point("temp_sensor") + assert point.ref is None - config_good = PointPointsetConfig(ref="point_ref_123") - point.set_config(config_good) - assert point.get_state().status is None + model = PointPointsetModel(ref="point_ref_123") + point.set_model(model) - config_bad = PointPointsetConfig(ref="bad_ref") - point.set_config(config_bad) - state = point.get_state() - assert state.status is not None - assert state.status.category == Category.POINTSET_POINT_FAILURE + assert point.ref == "point_ref_123" # --- PointsetManager Tests --- @@ -118,16 +111,15 @@ def test_point_set_model_syncs_ref(): def test_initialization(manager): """Test default initialization values.""" assert manager._sample_rate_sec == 10 - assert len(manager._all_points) == 0 + assert len(manager._points) == 0 def test_add_point_and_set_value(manager): """Test the API for adding points and setting values.""" manager.set_point_value("pressure", 101.3) - assert "pressure" in manager._all_points - manager._all_points["pressure"].update_data() - assert manager._all_points["pressure"].get_data().present_value == 101.3 + assert "pressure" in manager._points + assert manager._points["pressure"].present_value == 101.3 def test_handle_config_updates_sample_rate(manager): @@ -154,36 +146,34 @@ def test_handle_config_adds_points(manager): ) manager.handle_config(config) - assert "room_temp" in manager._all_points - assert manager._all_points["room_temp"].get_state().units == "C" - assert "humidity" in manager._all_points - assert manager._all_points["humidity"].get_state().units == "%" + assert "room_temp" in manager._points + assert manager._points["room_temp"].units == "C" + assert "humidity" in manager._points + assert manager._points["humidity"].units == "%" def test_handle_config_updates_state_etag(manager): - """Test that state_etag is NOT blindly captured from config.""" - manager._state_etag = "original" + """Test that state_etag is captured from config.""" config = Config( pointset=PointsetConfig(state_etag="abcdef123") ) manager.handle_config(config) - assert manager._state_etag == "original" + assert manager._state_etag == "abcdef123" def test_update_state_populates_state_block(manager): """Test that update_state populates the state object correctly.""" manager.add_point("temp") - manager._all_points["temp"].set_config(PointPointsetConfig(units="C")) + manager._points["temp"].units = "C" manager._state_etag = "etag_value" - manager._active_points = {"temp"} state = State() manager.update_state(state) assert state.pointset is not None assert isinstance(state.pointset, PointsetState) - assert state.pointset.state_etag == manager._state_etag + assert state.pointset.state_etag == "etag_value" assert "temp" in state.pointset.points assert state.pointset.points["temp"].units == "C" @@ -197,7 +187,6 @@ def test_publish_telemetry_sends_events(manager, mock_dispatcher): manager.set_point_value("temp", 22.0) manager.set_point_value("pressure", 1000) - manager._active_points = {"temp", "pressure"} manager.publish_telemetry() @@ -215,17 +204,13 @@ def test_publish_telemetry_sends_events(manager, mock_dispatcher): def test_publish_telemetry_ignores_points_without_values(manager, mock_dispatcher): """ - Test that points with no value (None) are excluded from the event - when not doing a full update (force_full_update=False). + Test that points with no value (None) are excluded from the event. """ manager.set_device_context(device=None, dispatcher=mock_dispatcher) manager.set_point_value("valid_point", 10) manager.add_point("empty_point") - manager._active_points = {"valid_point", "empty_point"} - import time - manager._last_full_publish_time = time.time() manager.publish_telemetry() call_args = mock_dispatcher.publish_event.call_args @@ -277,9 +262,7 @@ def test_handle_config_triggers_writeback_handler(manager): manager.set_writeback_handler(mock_handler) config = Config( - timestamp="2030-01-01T00:00:00Z", pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", points={ "setpoint": PointPointsetConfig(set_value=22.5) } @@ -289,6 +272,7 @@ def test_handle_config_triggers_writeback_handler(manager): manager.handle_config(config) mock_handler.assert_called_once_with("setpoint", 22.5) + assert manager._points["setpoint"].set_value == 22.5 def test_publish_telemetry_invokes_poll_callback(manager, mock_dispatcher): @@ -302,7 +286,6 @@ def mock_poll(): return {"polled_point": 123} manager.set_poll_callback(mock_poll) - manager._active_points = {"polled_point"} manager.publish_telemetry() @@ -323,21 +306,16 @@ def mock_handler(point_name, value): manager.set_writeback_handler(mock_handler) - manager.add_point("setpoint") - manager._all_points["setpoint"]._writable = True - config = Config( - timestamp="2030-01-01T00:00:00Z", pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", points={"setpoint": PointPointsetConfig(set_value=22.5)} ) ) manager.handle_config(config) - assert manager._all_points["setpoint"].get_data().present_value == 22.5 - assert manager._all_points["setpoint"].value_state == ValueState.overridden + assert manager._points["setpoint"].set_value == 22.5 + assert manager._points["setpoint"].value_state == ValueState.overridden def test_handle_config_writeback_returns_result(manager): @@ -352,21 +330,16 @@ def mock_handler(point_name, value): manager.set_writeback_handler(mock_handler) - manager.add_point("setpoint") - manager._all_points["setpoint"]._writable = True - config = Config( - timestamp="2030-01-01T00:00:00Z", pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", points={"setpoint": PointPointsetConfig(set_value=22.5)} ) ) manager.handle_config(config) - point = manager._all_points["setpoint"] - assert point.get_data().present_value == 22.5 + point = manager._points["setpoint"] + assert point.set_value == 22.5 assert point.value_state == ValueState.invalid assert point.status == custom_entry @@ -381,307 +354,17 @@ def mock_handler(point_name, value): manager.set_writeback_handler(mock_handler) - manager.add_point("setpoint") - manager._all_points["setpoint"]._writable = True - config = Config( - timestamp="2030-01-01T00:00:00Z", pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", points={"setpoint": PointPointsetConfig(set_value=22.5)} ) ) manager.handle_config(config) - point = manager._all_points["setpoint"] - assert point.get_data().present_value == 22.5 + point = manager._points["setpoint"] + assert point.set_value == 22.5 assert point.value_state == ValueState.failure assert point.status is not None assert point.status.level == 500 - assert "Hardware communication failed" in point.status.message - - -def test_set_writeback_handler_emits_deprecation_warning(manager): - """Verifies that calling set_writeback_handler triggers a DeprecationWarning.""" - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - manager.set_writeback_handler(lambda n, v: None) - assert len(w) == 1 - assert issubclass(w[-1].category, DeprecationWarning) - assert "set_writeback_handler is deprecated" in str(w[-1].message) - - -def test_set_poll_callback_emits_deprecation_warning(manager): - """Verifies that calling set_poll_callback triggers a DeprecationWarning.""" - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - manager.set_poll_callback(lambda: {}) - assert len(w) == 1 - assert issubclass(w[-1].category, DeprecationWarning) - assert "set_poll_callback is deprecated" in str(w[-1].message) - - -class MockBulkProvider(BulkPointProvider): - def read_points(self): - return {"temp": 22.0} - - -def test_bulk_provider_updates_points(manager, mock_dispatcher): - """Verifies that an initialized bulk provider populates values before publishing.""" - manager.set_device_context(device=None, dispatcher=mock_dispatcher) - - provider = MockBulkProvider() - manager.register_bulk_provider(provider) - - manager.add_point("temp") - manager._active_points = {"temp"} - - # No point value set originally - manager.publish_telemetry() - - # Assert dispatcher published with the value from the provider - call_args = mock_dispatcher.publish_event.call_args - payload = call_args[0][1] - - assert "temp" in payload.points - assert payload.points["temp"].present_value == 22.0 - - -def test_initialization_with_bulk_provider(mock_dispatcher): - """Verifies that PointsetManager can be initialized with a bulk provider.""" - provider = MockBulkProvider() - manager = PointsetManager(bulk_provider=provider) - manager.set_device_context(device=None, dispatcher=mock_dispatcher) - - manager.add_point("temp") - manager._active_points = {"temp"} - - # No point value set originally - manager.publish_telemetry() - - # Assert dispatcher published with the value from the provider - call_args = mock_dispatcher.publish_event.call_args - payload = call_args[0][1] - - assert "temp" in payload.points - assert payload.points["temp"].present_value == 22.0 - - -def test_native_writeback_path_used_when_no_global_handler(manager): - """Verifies point internally processes set_value when no global handler exists.""" - assert manager._writeback_handler is None - - manager.add_point("setpoint") - manager._all_points["setpoint"]._writable = True - - config = Config( - timestamp="2030-01-01T00:00:00Z", - pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", - points={"setpoint": PointPointsetConfig(set_value=22.5)} - ) - ) - - manager.handle_config(config) - - point = manager._all_points["setpoint"] - - # Wait for the native writeback thread to complete - timeout = 1.0 - start_time = time.time() - while point.value_state != ValueState.applied and time.time() - start_time < timeout: - time.sleep(0.01) - - assert point.get_data().present_value == 22.5 - assert point.value_state == ValueState.applied - -def test_native_writeback_asynchronous_execution(manager): - """Verifies that point writebacks utilize the updating state for long hardware actuations.""" - manager.add_point("slow_setpoint") - point = manager._all_points["slow_setpoint"] - point._writable = True - - # Mock a long-running hardware actuation - original_set_value = point.set_value - def slow_set_value(val): - time.sleep(0.1) - return original_set_value(val) - - point.set_value = slow_set_value - - config = Config( - timestamp="2030-01-01T00:00:00Z", - pointset=PointsetConfig( - set_value_expiry="2030-01-01T01:00:00Z", - points={"slow_setpoint": PointPointsetConfig(set_value=42.0)} - ) - ) - - manager.handle_config(config) - - # State should immediately be 'updating' upon synchronous return - assert point.value_state == ValueState.updating - assert point.get_data().present_value != 42.0 - - # Wait for completion - timeout = 1.0 - start_time = time.time() - while point.value_state != ValueState.applied and time.time() - start_time < timeout: - time.sleep(0.01) - - assert point.value_state == ValueState.applied - assert point.get_data().present_value == 42.0 - -def test_persistence_logic(manager, mock_dispatcher): - """Verifies that _persist_state and _load_persisted_state interact with device persistence correctly.""" - mock_device = MagicMock() - mock_persistence = MagicMock() - mock_device.persistence = mock_persistence - manager.set_device_context(device=mock_device, dispatcher=mock_dispatcher) - - manager.add_point("persisted_point") - manager.set_point_value("persisted_point", 55.5) - manager._all_points["persisted_point"].update_data() - - manager._persist_state() - mock_persistence.set.assert_called_once() - - saved_data = mock_persistence.set.call_args[0][1] - assert "persisted_point" in saved_data - assert saved_data["persisted_point"]["present_value"] == 55.5 - - mock_persistence.get.return_value = {"persisted_point": {"present_value": 77.7}} - manager._load_persisted_state() - - manager._all_points["persisted_point"].update_data() - assert manager._all_points["persisted_point"].get_data().present_value == 77.7 - -def test_handle_writeback_expiration(manager): - """Verifies that writeback expiration clears the state.""" - manager.add_point("expiring_point") - point = manager._all_points["expiring_point"] - - point._written = True - point.value_state = ValueState.applied - manager._last_set_values["expiring_point"] = 100 - manager._writeback_timers["expiring_point"] = MagicMock() - - manager._handle_writeback_expiration("expiring_point") - - assert point.value_state is None - assert "expiring_point" not in manager._last_set_values - assert "expiring_point" not in manager._writeback_timers - -def test_generate_state_etag(manager): - """Verifies state_etag generation is deterministic.""" - dict1 = {"pointA": {"value_state": "applied"}, "pointB": {"value_state": "invalid"}} - dict2 = {"pointB": {"value_state": "invalid"}, "pointA": {"value_state": "applied"}} - - etag1 = manager._generate_state_etag(dict1) - etag2 = manager._generate_state_etag(dict2) - - assert etag1 == etag2 - assert len(etag1) == 32 - -def test_publish_telemetry_catches_point_exceptions(manager, mock_dispatcher): - """Verifies telemetry loop handles individual point failures gracefully.""" - manager.set_device_context(device=None, dispatcher=mock_dispatcher) - manager.add_point("robust_point") - manager._active_points = {"robust_point"} - - point = manager._all_points["robust_point"] - point.update_data = MagicMock(side_effect=Exception("Test Error")) - - manager.publish_telemetry() - -def test_handle_config_invalid_expiry_handling(manager): - """Verifies that an invalid set_value_expiry transitions point to invalid.""" - manager.add_point("setpoint") - manager._all_points["setpoint"]._writable = True - config = Config( - timestamp="2030-01-01T00:00:00Z", - pointset=PointsetConfig( - set_value_expiry="2020-01-01T00:00:00Z", - points={"setpoint": PointPointsetConfig(set_value=123)} - ) - ) - manager.handle_config(config) - assert manager._all_points["setpoint"].value_state == ValueState.invalid - -def test_handle_command_passthrough(manager): - """Verifies that handle_command safely processes without failing.""" - manager.handle_command("some_command", {"payload": "data"}) - - -def test_offline_buffering(manager, mock_dispatcher): - """Verifies that telemetry is buffered when disconnected and flushed when connected.""" - mock_device = MagicMock() - mock_persistence = MagicMock() - mock_device.persistence = mock_persistence - manager.set_device_context(device=mock_device, dispatcher=mock_dispatcher) - - manager.add_point("temp") - manager.set_point_value("temp", 22.0) - manager._active_points = {"temp"} - - # Simulate disconnected - mock_dispatcher.is_connected.return_value = False - manager.publish_telemetry() - - mock_dispatcher.publish_event.assert_not_called() - assert len(manager._offline_buffer) == 1 - mock_persistence.set.assert_called_with(PointsetManager.PERSISTENCE_BUFFER_KEY, manager._offline_buffer) - - # Change value and publish again - manager.set_point_value("temp", 25.0) - manager.publish_telemetry() - - mock_dispatcher.publish_event.assert_not_called() - assert len(manager._offline_buffer) == 2 - - # Simulate connected - mock_dispatcher.is_connected.return_value = True - manager.set_point_value("temp", 27.0) - manager.publish_telemetry() - - # Flush 2 items + publish the new 1 - assert mock_dispatcher.publish_event.call_count == 3 - assert len(manager._offline_buffer) == 0 - mock_persistence.set.assert_called_with(PointsetManager.PERSISTENCE_BUFFER_KEY, []) - -def test_offline_buffer_capacity(manager, mock_dispatcher): - """Verifies that the offline buffer limits the number of queued events.""" - mock_device = MagicMock() - mock_device.persistence = MagicMock() - manager.set_device_context(device=mock_device, dispatcher=mock_dispatcher) - - manager.add_point("temp") - manager._active_points = {"temp"} - - # Simulate disconnected - mock_dispatcher.is_connected.return_value = False - - # Lower max size for test - original_max = manager.MAX_OFFLINE_BUFFER_SIZE - manager.MAX_OFFLINE_BUFFER_SIZE = 5 - - for i in range(10): - manager.set_point_value("temp", float(i)) - manager.publish_telemetry() - - assert len(manager._offline_buffer) == 5 - - manager.MAX_OFFLINE_BUFFER_SIZE = original_max - -def test_load_persisted_buffer(manager, mock_dispatcher): - """Verifies that the offline buffer is loaded from persistence correctly.""" - mock_device = MagicMock() - mock_device.persistence = MagicMock() - mock_device.persistence.get.return_value = [{"points": {"temp": {"present_value": 11.0}}}] - manager.set_device_context(device=mock_device, dispatcher=mock_dispatcher) - - manager._load_persisted_buffer() - - assert len(manager._offline_buffer) == 1 - assert manager._offline_buffer[0]["points"]["temp"]["present_value"] == 11.0 \ No newline at end of file + assert "Hardware communication failed" in point.status.message \ No newline at end of file diff --git a/common/src/main/java/com/google/udmi/util/SiteModel.java b/common/src/main/java/com/google/udmi/util/SiteModel.java index dbae97619d..cf77a8f3cd 100644 --- a/common/src/main/java/com/google/udmi/util/SiteModel.java +++ b/common/src/main/java/com/google/udmi/util/SiteModel.java @@ -103,7 +103,7 @@ public class SiteModel { private static final Pattern MQTT_PATTERN = Pattern.compile("/r/(.*)/d/(.*)"); private static final String CLOUD_IOT_CONFIG_JSON = "cloud_iot_config.json"; private static final Pattern SPEC_PATTERN = Pattern.compile( - "(//([a-z]+)/)?(([a-z-]+))(/([a-z0-9]+))?(\\+([a-z0-9-]+))?"); + "(//([a-z]+)/)?(([a-z-\\.]+))(/([a-z0-9]+))?(\\+([a-z0-9-]+))?"); private static final int SPEC_PROVIDER_GROUP = 2; private static final int SPEC_PROJECT_GROUP = SPEC_PROVIDER_GROUP + 2; private static final int SPEC_NAMESPACE_GROUP = SPEC_PROJECT_GROUP + 2; diff --git a/udmis/etc/local_pod.json b/udmis/etc/local_pod.json index 8c4497aff2..45908c955c 100644 --- a/udmis/etc/local_pod.json +++ b/udmis/etc/local_pod.json @@ -13,7 +13,7 @@ }, "flows": { "reflect": { - "recv_id": "reflect", + "recv_id": "reflect,/r/UDMI-REFLECT/d/+/state,/r/UDMI-REFLECT/d/+/events/#", "send_id": "c/control" }, "target": { diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java index 5aa7c59b9e..b2a9b76f1c 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java @@ -278,18 +278,20 @@ private void subscribeToMessages() { info("No recv_id defined, not subscribing for component " + endpoint.name); return; } - String subscribeTopic = format(SUB_BASE_FORMAT, recvId); try { synchronized (mqttClient) { boolean connected = mqttClient.isConnected(); trace("Subscribing %s, active=%s connected=%s", clientId, isActive(), connected); if (isActive() && connected) { - mqttClient.subscribe(subscribeTopic); - info("Subscribed %s to topic %s", clientId, subscribeTopic); + for (String part : recvId.split(",")) { + String subscribeTopic = part.startsWith("/") ? part : format(SUB_BASE_FORMAT, part); + mqttClient.subscribe(subscribeTopic); + info("Subscribed %s to topic %s", clientId, subscribeTopic); + } } } } catch (Exception e) { - throw new RuntimeException("While subscribing to mqtt topic: " + subscribeTopic, e); + throw new RuntimeException("While subscribing to mqtt topics: " + recvId, e); } } diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java index 288b20baee..6259220455 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java @@ -94,7 +94,7 @@ public class IotReflectorClient implements MessagePublisher { private static final String EVENTS_TYPE = "events"; private static final String MOCK_DEVICE_NUM_ID = "123456789101112"; private static final String UDMI_TOPIC = "events/" + SubFolder.UDMI; - private static final long CONFIG_TIMEOUT_SEC = 5; + private static final long CONFIG_TIMEOUT_SEC = 60; private static final int UPDATE_RETRIES = 6; private static final Collection COPY_IDS = ImmutableSet.of(DEVICE_ID_KEY, GATEWAY_ID_KEY, SUBTYPE_PROPERTY_KEY, SUBFOLDER_PROPERTY_KEY, TRANSACTION_KEY, PUBLISH_TIME_KEY); @@ -270,17 +270,11 @@ private synchronized void setReflectorState() { } private String getReflectorTopic() { - return switch (iotProvider) { - case MQTT -> SubType.REFLECT.toString(); - default -> STATE_TOPIC; - }; + return STATE_TOPIC; } private String getPublishTopic() { - return switch (iotProvider) { - case MQTT -> SubType.REFLECT.toString(); - default -> UDMI_TOPIC; - }; + return UDMI_TOPIC; } @Override @@ -290,6 +284,7 @@ public Credential getCredential() { private void messageHandler(String topic, String payload) { receiveStats.update(); + payload = ofNullable(payload).map(String::trim).orElse(""); if (payload.length() == 0) { return; } @@ -392,9 +387,13 @@ private void updateLastProgressEvent() { private void processUdmiEvent(Map message) { UdmiEvents events = convertTo(UdmiEvents.class, message); + if (events == null || events.logentries == null) { + return; + } ifNotTrueThen(events.logentries.isEmpty(), this::updateLastProgressEvent); events.logentries.forEach( - entry -> System.err.printf("%s %s%n", isoConvert(entry.timestamp), entry.message)); + entry -> System.err.printf("%s %s%n", isoConvert(ofNullable(entry.timestamp).orElse(null)), + entry.message)); } private synchronized void ensureCloudSync(Map message) { @@ -407,8 +406,16 @@ private synchronized void ensureCloudSync(Map message) { UdmiConfig reflectorConfig = convertTo(UdmiConfig.class, ofNullable(message.get(SubFolder.UDMI.value())).orElse(message)); + if (reflectorConfig == null || reflectorConfig.reply == null + || reflectorConfig.reply.msg_source == null) { + return; + } + boolean shouldConsiderReply = reflectorConfig.reply.msg_source.equals(userName); String transactionId = reflectorConfig.reply.transaction_id; + if (transactionId == null) { + return; + } boolean matchingTxnId = transactionId.equals(expectedTxnId); boolean matchingSession = transactionId.startsWith(sessionPrefix); @@ -501,7 +508,8 @@ private void error(String message) { } private Envelope parseMessageTopic(String topic) { - List parts = new ArrayList<>(Arrays.asList(topic.substring(1).split("/"))); + String stripped = topic.startsWith("/") ? topic.substring(1) : topic; + List parts = new ArrayList<>(Arrays.asList(stripped.split("/"))); String leader = parts.remove(0); if ("devices".equals(leader)) { // Next field is registry, not device, since the reflector device id is the site registry. @@ -527,15 +535,24 @@ private Envelope parseMessageTopic(String topic) { Envelope envelope = new Envelope(); envelope.deviceRegistryId = registryId; + if (parts.get(0).equals("c")) { + parts.remove(0); // consume 'c' + envelope.source = parts.remove(0); // consume source + } + String[] bits1 = parts.remove(0).split(SOURCE_SEPARATOR_REGEX); checkState(parts.isEmpty() || bits1.length == 1, "Malformed topic: " + topic); envelope.subType = SubType.fromValue(bits1[0]); if (parts.isEmpty()) { - envelope.source = bits1.length > 1 ? bits1[1] : null; + if (envelope.source == null) { + envelope.source = bits1.length > 1 ? bits1[1] : null; + } } else { String[] bits2 = parts.remove(0).split(SOURCE_SEPARATOR_REGEX); envelope.subFolder = SubFolder.fromValue(bits2[0]); - envelope.source = bits2.length > 1 ? bits2[1] : null; + if (envelope.source == null) { + envelope.source = bits2.length > 1 ? bits2[1] : null; + } } checkState(parts.isEmpty()); diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java index f0feb89e7e..3d72404a29 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java @@ -100,7 +100,7 @@ public class MqttPublisher implements MessagePublisher { private static final String MESSAGE_TOPIC_FMT = "/%s"; private static final int QOS_AT_MOST_ONCE = 0; private static final int QOS_AT_LEAST_ONCE = 1; - private static final int INITIALIZE_TIME_MS = 20000; + private static final int INITIALIZE_TIME_MS = 30000; private static final String BROKER_URL_FORMAT = "%s://%s:%s"; private static final int PUBLISH_THREAD_COUNT = 10; private static final Duration TOKEN_EXPIRATION = Duration.ofHours(1); @@ -191,7 +191,7 @@ private static String getProviderHostname(ExecutionConfiguration executionConfig () -> switch (iotProvider) { case JWT -> requireNonNull(executionConfiguration.bridge_host, "missing bridge_host"); case GBOS -> DEFAULT_GBOS_HOSTNAME; - case MQTT -> requireNonNull(executionConfiguration.project_id); + case MQTT, IMPLICIT -> requireNonNull(executionConfiguration.project_id); case CLEARBLADE -> DEFAULT_CLEARBLADE_HOSTNAME; default -> throw new RuntimeException("Unsupported iot provider " + iotProvider); } @@ -245,8 +245,8 @@ private CertManager getCertManager() { private String getTopicBase() { return switch (iotProvider) { - case IMPLICIT, GBOS, CLEARBLADE -> format(DEVICE_TOPIC_FMT, deviceId); - case MQTT -> format(FULL_TOPIC_FMT, registryId, deviceId); + case GBOS, CLEARBLADE -> format(DEVICE_TOPIC_FMT, deviceId); + case MQTT, IMPLICIT -> format(FULL_TOPIC_FMT, registryId, deviceId); default -> throw new RuntimeException("Unknown iotProvider " + iotProvider); }; } @@ -525,7 +525,7 @@ private SocketFactory getSocketFactory() { private String getUserName() { return switch (iotProvider) { case GBOS, CLEARBLADE -> UNUSED_ACCOUNT_NAME; - case MQTT -> format(MQTT_USER_NAME_FMT, registryId, deviceId); + case MQTT, IMPLICIT -> format(MQTT_USER_NAME_FMT, registryId, deviceId); default -> throw new RuntimeException("Unsupported iot provider " + iotProvider); }; } @@ -550,7 +550,7 @@ private synchronized void connectAndSetupMqtt() { private char[] getAuthToken(String audience) { return switch (iotProvider) { - case MQTT -> getHashPassword(audience); + case MQTT, IMPLICIT -> getHashPassword(audience); case GBOS, CLEARBLADE -> createJwt(audience); default -> throw new RuntimeException("Unsupported iotProvider " + iotProvider); }; @@ -611,7 +611,7 @@ String getClientId(String deviceId) { } return switch (iotProvider) { case GBOS, CLEARBLADE -> format(LONG_ID_FMT, projectId, cloudRegion, registryId, deviceId); - case MQTT -> format(SHORT_ID_FMT, registryId, deviceId); + case MQTT, IMPLICIT -> format(SHORT_ID_FMT, registryId, deviceId); default -> throw new RuntimeException("Provider not supported " + iotProvider); }; } @@ -622,7 +622,7 @@ private String getBrokerUrl() { private String getBrokerProtocol() { return switch (iotProvider) { - case MQTT, GBOS, CLEARBLADE -> "ssl"; + case MQTT, GBOS, CLEARBLADE, IMPLICIT -> "ssl"; default -> throw new RuntimeException("Provider not supported " + iotProvider); }; } diff --git a/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java b/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java index dc7662b781..cd954a5f73 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java @@ -119,6 +119,7 @@ public void updateDevice(String deviceId, CloudModel device) { @Override public void updateRegistry(CloudModel registry) { registry.operation = ofNullable(registry.operation).orElse(ModelOperation.UPDATE); + registry.resource_type = CloudModel.Resource_type.REGISTRY; cloudModelTransaction(null, CLOUD_MODEL_TOPIC, registry); }