Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/oshconnect/eventbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,25 @@
# Contact Email: ian@botts-inc.com
# =============================================================================
import collections
from typing import Any
from uuid import UUID
from abc import ABC


class Event(ABC):
"""
A base class for events in the event bus system.
"""
id: UUID
topic: str
payload: Any

def __init__(self, id: UUID, topic: str, payload: Any):
self.id = id
self.topic = topic
self.payload = payload


class EventBus(ABC):
"""
A base class for an event bus system.
Expand Down
12 changes: 10 additions & 2 deletions src/oshconnect/oshconnectapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .csapi4py.default_api_helpers import APIHelper
from .datastore import DataStore
from .resource_datamodels import DatastreamResource
from .streamableresource import Node, System, SessionManager, Datastream
from .streamableresource import Node, System, SessionManager, Datastream, ControlStream
from .styling import Styling
from .timemanagement import TemporalModes, TimeManagement, TimePeriod

Expand Down Expand Up @@ -120,6 +120,7 @@ def discover_datastreams(self):
datastreams = list(
map(lambda ds: Datastream(parent_node=system.get_parent_node(), id=ds.ds_id, datastream_resource=ds),
res_datastreams))

for ds in datastreams:
ds.set_parent_resource_id(system.get_underlying_resource().system_id)
# datastreams = [ds.set_parent_resource_id(system.get_underlying_resource().system_id) for ds in datastreams]
Expand All @@ -142,7 +143,14 @@ def discover_systems(self, nodes: list[str] = None):
self._systems.extend(res_systems)

def discover_controlstreams(self, streams: list):
pass
for system in self._systems:
res_controlstreams = system.discover_controlstreams()
controlstreams = list(
map(lambda cs: ControlStream(parent_node=system.get_parent_node(), id=cs.cs_id,
controlstream_resource=cs), res_controlstreams))
for cs in controlstreams:
cs.set_parent_resource_id(system.get_underlying_resource().system_id)
self._datataskers.extend(controlstreams)

def authenticate_user(self, user: dict):
pass
Expand Down
19 changes: 10 additions & 9 deletions src/oshconnect/resource_datamodels.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@

from typing import List

from timemanagement import TimeInstant
from .geometry import Geometry
from .api_utils import Link
from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, model_validator
from shapely import Point

from .schema_datamodels import DatastreamRecordSchema, CommandSchema
from .timemanagement import DateTimeSchema, TimePeriod
from .timemanagement import TimePeriod


class BoundingBox(BaseModel):
Expand Down Expand Up @@ -114,7 +115,7 @@ class SystemResource(BaseModel):
keywords: List[str] = Field(None)
identifiers: List[str] = Field(None)
classifiers: List[str] = Field(None)
valid_time: DateTimeSchema = Field(None, alias="validTime")
valid_time: TimePeriod = Field(None, alias="validTime")
security_constraints: List[SecurityConstraints] = Field(None, alias="securityConstraints")
legal_constraints: List[LegalConstraints] = Field(None, alias="legalConstraints")
characteristics: List[Characteristics] = Field(None)
Expand Down Expand Up @@ -177,31 +178,31 @@ def handle_aliases(cls, values):


class ObservationResource(BaseModel):
model_config = ConfigDict(populate_by_name=True)
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)

sampling_feature_id: str = Field(None, alias="samplingFeature@Id")
procedure_link: Link = Field(None, alias="procedure@link")
phenomenon_time: DateTimeSchema = Field(None, alias="phenomenonTime")
result_time: DateTimeSchema = Field(..., alias="resultTime")
phenomenon_time: TimeInstant = Field(None, alias="phenomenonTime")
result_time: TimeInstant = Field(..., alias="resultTime")
parameters: dict = Field(None)
result: dict = Field(...)
result_link: Link = Field(None, alias="result@link")


class ControlStreamResource(BaseModel):
model_config = ConfigDict(populate_by_name=True)
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)

cs_id: str = Field(None, alias="id")
name: str = Field(...)
description: str = Field(None)
valid_time: TimePeriod = Field(..., alias="validTime")
valid_time: TimePeriod = Field(None, alias="validTime")
input_name: str = Field(None, alias="inputName")
procedure_link: Link = Field(None, alias="procedureLink@link")
deployment_link: Link = Field(None, alias="deploymentLink@link")
feature_of_interest_link: Link = Field(None, alias="featureOfInterest@link")
sampling_feature_link: Link = Field(None, alias="samplingFeature@link")
issue_time: DateTimeSchema = Field(None, alias="issueTime")
execution_time: DateTimeSchema = Field(None, alias="executionTime")
issue_time: TimePeriod = Field(None, alias="issueTime")
execution_time: TimePeriod = Field(None, alias="executionTime")
live: bool = Field(None)
asynchronous: bool = Field(True, alias="async")
command_schema: SerializeAsAny[CommandSchema] = Field(None, alias="schema")
Expand Down
21 changes: 12 additions & 9 deletions src/oshconnect/streamableresource.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,18 @@ def discover_datastreams(self) -> list[DatastreamResource]:

return ds_resources

def discover_controlstreams(self) -> list[ControlStreamResource]:
res = self._parent_node.get_api_helper().get_resource(APIResourceTypes.SYSTEM, self._resource_id,
APIResourceTypes.CONTROL_CHANNEL)
controlstream_json = res.json()['items']
cs_resources = []

for cs in controlstream_json:
controlstream_objs = ControlStreamResource.model_validate(cs)
cs_resources.append(controlstream_objs)

return cs_resources

@staticmethod
def from_system_resource(system_resource: SystemResource, parent_node: Node) -> System:
other_props = system_resource.model_dump()
Expand Down Expand Up @@ -802,7 +814,6 @@ class ControlStream(StreamableResource[ControlStreamResource]):
_inbound_status_deque: deque
_outbound_status_deque: deque


def __init__(self, node: Node = None, controlstream_resource: ControlStreamResource = None):
super().__init__(node=node)
self._underlying_resource = controlstream_resource
Expand All @@ -819,14 +830,6 @@ def init_mqtt(self):
super().init_mqtt()
self._topic = self.get_mqtt_topic(subresource=APIResourceTypes.COMMAND)

# def subscribe_to_status(self, topic: str):
# # TODO: This should probably be a flag to subscribe to status updates as the commands come in, trying to manage this manually would
# # prove tedious
# pass
#
# def publish_status(self, payload):
# pass

def get_mqtt_status_topic(self):
return self.get_mqtt_topic(subresource=APIResourceTypes.STATUS)

Expand Down
46 changes: 24 additions & 22 deletions src/oshconnect/timemanagement.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import time
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Self
from typing import Any

from pydantic import BaseModel, ConfigDict, Field, field_validator, model_serializer, model_validator
from pydantic import BaseModel, ConfigDict, Field, model_serializer, model_validator


class TemporalModes(Enum):
Expand Down Expand Up @@ -200,24 +200,25 @@ def __repr__(self):
return f'{self.get_iso_time()}'


class DateTimeSchema(BaseModel):
is_instant: bool = Field(True, description="Whether the date time is an instant or a period.")
iso_date: str = Field(None, description="The ISO formatted date time.")
time_period: tuple = Field(None, description="The time period of the date time.")

@model_validator(mode='before')
def valid_datetime_type(self) -> Self:
if self.is_instant:
if self.iso_date is None:
raise ValueError("Instant date time must have a valid ISO8601 date.")
return self

@field_validator('iso_date')
@classmethod
def check_iso_date(cls, v) -> str:
if not v:
raise ValueError("Instant date time must have a valid ISO8601 date.")
return v
# class DateTimeSchema(BaseModel):
# is_instant: bool = Field(True, description="Whether the date time is an instant or a period.")
# iso_date: str = Field(None, description="The ISO formatted date time.")
# time_period: tuple = Field(None, description="The time period of the date time.")
#
# @model_validator(mode='before')
# def valid_datetime_type(self) -> Self:
# print("DEBUGGING DateTimeSchema valid_datetime_type")
# if self.is_instant:
# if self.iso_date is None:
# raise ValueError("Instant date time must have a valid ISO8601 date.")
# return self
#
# @field_validator('iso_date')
# @classmethod
# def check_iso_date(cls, v) -> str:
# if not v:
# raise ValueError("Instant date time must have a valid ISO8601 date.")
# return v


class IndeterminateTime(Enum):
Expand Down Expand Up @@ -245,7 +246,7 @@ def valid_time_period(cls, data) -> Any:
data_dict['end'] = cls.check_mbr_type(data['end'])

if not cls.compare_start_lt_end(data_dict['start'], data_dict['end']):
raise ValueError("Start time must be less than end time")
raise ValueError("Start time must be less than or equal to end time")

return data_dict

Expand All @@ -263,11 +264,12 @@ def check_mbr_type(value):
return tp
elif isinstance(value, TimeInstant):
return value
return None

@classmethod
def compare_start_lt_end(cls, start: TimeInstant | str, end: TimeInstant | str) -> bool:
if isinstance(start, TimeInstant) and isinstance(end, TimeInstant):
return start < end
return start <= end
elif isinstance(start, str) and isinstance(end, str):
if start == "now" and end == "now":
raise ValueError("Start and end cannot both be 'now'")
Expand Down
15 changes: 15 additions & 0 deletions tests/test_resource_datamodels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# =============================================================================
# Copyright (c) 2025 Botts Innovative Research Inc.
# Date: 2025/10/22
# Author: Ian Patterson
# Contact Email: ian@botts-inc.com
# =============================================================================
from src.oshconnect.resource_datamodels import ControlStreamResource


def test_control_stream_resource():
res_str = {'id': '0228vl6tn15g', 'name': 'Puppy Pi Control', 'description': 'Puppy pi control', 'system@id': '029tjlvogsng', 'system@link': {'href': 'http://192.168.8.136:8080/sensorhub/api/systems/029tjlvogsng?f=json', 'uid': 'urn:puppypi:001', 'type': 'application/geo+json'}, 'inputName': 'puppypicontrol', 'validTime': ['2025-10-21T19:04:56.505817Z', 'now'], 'issueTime': ['2025-10-22T17:12:58.51182Z', '2025-10-22T17:12:58.51182Z'], 'controlledProperties': [{'definition': 'http://sensorml.com/ont/swe/property/triggercontrol', 'label': 'Forward', 'description': 'Moves the puppy pi forward when true'}], 'formats': ['application/json', 'application/swe+json', 'application/swe+csv', 'application/swe+xml', 'application/swe+binary']}
# res_dict = json.loads(res_str)
csr = ControlStreamResource.model_validate(res_str)

assert isinstance(csr, ControlStreamResource)