diff --git a/src/oshconnect/eventbus.py b/src/oshconnect/eventbus.py index 7040d2f..308f5ac 100644 --- a/src/oshconnect/eventbus.py +++ b/src/oshconnect/eventbus.py @@ -5,9 +5,25 @@ # Contact Email: ian@botts-inc.com # ============================================================================= import collections +from typing import Any +from uuid import UUID from abc import ABC +class Event(ABC): + """ + A base class for events in the event bus system. + """ + id: UUID + topic: str + payload: Any + + def __init__(self, id: UUID, topic: str, payload: Any): + self.id = id + self.topic = topic + self.payload = payload + + class EventBus(ABC): """ A base class for an event bus system. diff --git a/src/oshconnect/oshconnectapi.py b/src/oshconnect/oshconnectapi.py index 751c54f..2c88871 100644 --- a/src/oshconnect/oshconnectapi.py +++ b/src/oshconnect/oshconnectapi.py @@ -11,7 +11,7 @@ from .csapi4py.default_api_helpers import APIHelper from .datastore import DataStore from .resource_datamodels import DatastreamResource -from .streamableresource import Node, System, SessionManager, Datastream +from .streamableresource import Node, System, SessionManager, Datastream, ControlStream from .styling import Styling from .timemanagement import TemporalModes, TimeManagement, TimePeriod @@ -120,6 +120,7 @@ def discover_datastreams(self): datastreams = list( map(lambda ds: Datastream(parent_node=system.get_parent_node(), id=ds.ds_id, datastream_resource=ds), res_datastreams)) + for ds in datastreams: ds.set_parent_resource_id(system.get_underlying_resource().system_id) # datastreams = [ds.set_parent_resource_id(system.get_underlying_resource().system_id) for ds in datastreams] @@ -142,7 +143,14 @@ def discover_systems(self, nodes: list[str] = None): self._systems.extend(res_systems) def discover_controlstreams(self, streams: list): - pass + for system in self._systems: + res_controlstreams = system.discover_controlstreams() + controlstreams = list( + map(lambda cs: ControlStream(parent_node=system.get_parent_node(), id=cs.cs_id, + controlstream_resource=cs), res_controlstreams)) + for cs in controlstreams: + cs.set_parent_resource_id(system.get_underlying_resource().system_id) + self._datataskers.extend(controlstreams) def authenticate_user(self, user: dict): pass diff --git a/src/oshconnect/resource_datamodels.py b/src/oshconnect/resource_datamodels.py index 0b3fad8..766bcc9 100644 --- a/src/oshconnect/resource_datamodels.py +++ b/src/oshconnect/resource_datamodels.py @@ -8,13 +8,14 @@ from typing import List +from timemanagement import TimeInstant from .geometry import Geometry from .api_utils import Link from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, model_validator from shapely import Point from .schema_datamodels import DatastreamRecordSchema, CommandSchema -from .timemanagement import DateTimeSchema, TimePeriod +from .timemanagement import TimePeriod class BoundingBox(BaseModel): @@ -114,7 +115,7 @@ class SystemResource(BaseModel): keywords: List[str] = Field(None) identifiers: List[str] = Field(None) classifiers: List[str] = Field(None) - valid_time: DateTimeSchema = Field(None, alias="validTime") + valid_time: TimePeriod = Field(None, alias="validTime") security_constraints: List[SecurityConstraints] = Field(None, alias="securityConstraints") legal_constraints: List[LegalConstraints] = Field(None, alias="legalConstraints") characteristics: List[Characteristics] = Field(None) @@ -177,31 +178,31 @@ def handle_aliases(cls, values): class ObservationResource(BaseModel): - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) sampling_feature_id: str = Field(None, alias="samplingFeature@Id") procedure_link: Link = Field(None, alias="procedure@link") - phenomenon_time: DateTimeSchema = Field(None, alias="phenomenonTime") - result_time: DateTimeSchema = Field(..., alias="resultTime") + phenomenon_time: TimeInstant = Field(None, alias="phenomenonTime") + result_time: TimeInstant = Field(..., alias="resultTime") parameters: dict = Field(None) result: dict = Field(...) result_link: Link = Field(None, alias="result@link") class ControlStreamResource(BaseModel): - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) cs_id: str = Field(None, alias="id") name: str = Field(...) description: str = Field(None) - valid_time: TimePeriod = Field(..., alias="validTime") + valid_time: TimePeriod = Field(None, alias="validTime") input_name: str = Field(None, alias="inputName") procedure_link: Link = Field(None, alias="procedureLink@link") deployment_link: Link = Field(None, alias="deploymentLink@link") feature_of_interest_link: Link = Field(None, alias="featureOfInterest@link") sampling_feature_link: Link = Field(None, alias="samplingFeature@link") - issue_time: DateTimeSchema = Field(None, alias="issueTime") - execution_time: DateTimeSchema = Field(None, alias="executionTime") + issue_time: TimePeriod = Field(None, alias="issueTime") + execution_time: TimePeriod = Field(None, alias="executionTime") live: bool = Field(None) asynchronous: bool = Field(True, alias="async") command_schema: SerializeAsAny[CommandSchema] = Field(None, alias="schema") diff --git a/src/oshconnect/streamableresource.py b/src/oshconnect/streamableresource.py index c636654..0977cfe 100644 --- a/src/oshconnect/streamableresource.py +++ b/src/oshconnect/streamableresource.py @@ -574,6 +574,18 @@ def discover_datastreams(self) -> list[DatastreamResource]: return ds_resources + def discover_controlstreams(self) -> list[ControlStreamResource]: + res = self._parent_node.get_api_helper().get_resource(APIResourceTypes.SYSTEM, self._resource_id, + APIResourceTypes.CONTROL_CHANNEL) + controlstream_json = res.json()['items'] + cs_resources = [] + + for cs in controlstream_json: + controlstream_objs = ControlStreamResource.model_validate(cs) + cs_resources.append(controlstream_objs) + + return cs_resources + @staticmethod def from_system_resource(system_resource: SystemResource, parent_node: Node) -> System: other_props = system_resource.model_dump() @@ -802,7 +814,6 @@ class ControlStream(StreamableResource[ControlStreamResource]): _inbound_status_deque: deque _outbound_status_deque: deque - def __init__(self, node: Node = None, controlstream_resource: ControlStreamResource = None): super().__init__(node=node) self._underlying_resource = controlstream_resource @@ -819,14 +830,6 @@ def init_mqtt(self): super().init_mqtt() self._topic = self.get_mqtt_topic(subresource=APIResourceTypes.COMMAND) - # def subscribe_to_status(self, topic: str): - # # TODO: This should probably be a flag to subscribe to status updates as the commands come in, trying to manage this manually would - # # prove tedious - # pass - # - # def publish_status(self, payload): - # pass - def get_mqtt_status_topic(self): return self.get_mqtt_topic(subresource=APIResourceTypes.STATUS) diff --git a/src/oshconnect/timemanagement.py b/src/oshconnect/timemanagement.py index b7bdd8b..84c5381 100644 --- a/src/oshconnect/timemanagement.py +++ b/src/oshconnect/timemanagement.py @@ -11,9 +11,9 @@ import time from datetime import datetime, timezone from enum import Enum -from typing import Any, Self +from typing import Any -from pydantic import BaseModel, ConfigDict, Field, field_validator, model_serializer, model_validator +from pydantic import BaseModel, ConfigDict, Field, model_serializer, model_validator class TemporalModes(Enum): @@ -200,24 +200,25 @@ def __repr__(self): return f'{self.get_iso_time()}' -class DateTimeSchema(BaseModel): - is_instant: bool = Field(True, description="Whether the date time is an instant or a period.") - iso_date: str = Field(None, description="The ISO formatted date time.") - time_period: tuple = Field(None, description="The time period of the date time.") - - @model_validator(mode='before') - def valid_datetime_type(self) -> Self: - if self.is_instant: - if self.iso_date is None: - raise ValueError("Instant date time must have a valid ISO8601 date.") - return self - - @field_validator('iso_date') - @classmethod - def check_iso_date(cls, v) -> str: - if not v: - raise ValueError("Instant date time must have a valid ISO8601 date.") - return v +# class DateTimeSchema(BaseModel): +# is_instant: bool = Field(True, description="Whether the date time is an instant or a period.") +# iso_date: str = Field(None, description="The ISO formatted date time.") +# time_period: tuple = Field(None, description="The time period of the date time.") +# +# @model_validator(mode='before') +# def valid_datetime_type(self) -> Self: +# print("DEBUGGING DateTimeSchema valid_datetime_type") +# if self.is_instant: +# if self.iso_date is None: +# raise ValueError("Instant date time must have a valid ISO8601 date.") +# return self +# +# @field_validator('iso_date') +# @classmethod +# def check_iso_date(cls, v) -> str: +# if not v: +# raise ValueError("Instant date time must have a valid ISO8601 date.") +# return v class IndeterminateTime(Enum): @@ -245,7 +246,7 @@ def valid_time_period(cls, data) -> Any: data_dict['end'] = cls.check_mbr_type(data['end']) if not cls.compare_start_lt_end(data_dict['start'], data_dict['end']): - raise ValueError("Start time must be less than end time") + raise ValueError("Start time must be less than or equal to end time") return data_dict @@ -263,11 +264,12 @@ def check_mbr_type(value): return tp elif isinstance(value, TimeInstant): return value + return None @classmethod def compare_start_lt_end(cls, start: TimeInstant | str, end: TimeInstant | str) -> bool: if isinstance(start, TimeInstant) and isinstance(end, TimeInstant): - return start < end + return start <= end elif isinstance(start, str) and isinstance(end, str): if start == "now" and end == "now": raise ValueError("Start and end cannot both be 'now'") diff --git a/tests/test_resource_datamodels.py b/tests/test_resource_datamodels.py new file mode 100644 index 0000000..f6c8d66 --- /dev/null +++ b/tests/test_resource_datamodels.py @@ -0,0 +1,15 @@ +# ============================================================================= +# Copyright (c) 2025 Botts Innovative Research Inc. +# Date: 2025/10/22 +# Author: Ian Patterson +# Contact Email: ian@botts-inc.com +# ============================================================================= +from src.oshconnect.resource_datamodels import ControlStreamResource + + +def test_control_stream_resource(): + res_str = {'id': '0228vl6tn15g', 'name': 'Puppy Pi Control', 'description': 'Puppy pi control', 'system@id': '029tjlvogsng', 'system@link': {'href': 'http://192.168.8.136:8080/sensorhub/api/systems/029tjlvogsng?f=json', 'uid': 'urn:puppypi:001', 'type': 'application/geo+json'}, 'inputName': 'puppypicontrol', 'validTime': ['2025-10-21T19:04:56.505817Z', 'now'], 'issueTime': ['2025-10-22T17:12:58.51182Z', '2025-10-22T17:12:58.51182Z'], 'controlledProperties': [{'definition': 'http://sensorml.com/ont/swe/property/triggercontrol', 'label': 'Forward', 'description': 'Moves the puppy pi forward when true'}], 'formats': ['application/json', 'application/swe+json', 'application/swe+csv', 'application/swe+xml', 'application/swe+binary']} + # res_dict = json.loads(res_str) + csr = ControlStreamResource.model_validate(res_str) + + assert isinstance(csr, ControlStreamResource)