diff --git a/src/enapter_mcp_server/core/application_server.py b/src/enapter_mcp_server/core/application_server.py index 84bfc29..a836f60 100644 --- a/src/enapter_mcp_server/core/application_server.py +++ b/src/enapter_mcp_server/core/application_server.py @@ -1,6 +1,8 @@ import datetime import re +import enapter + from enapter_mcp_server import domain from .auth_config import AuthConfig @@ -229,9 +231,32 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation, ) -> domain.HistoricalTelemetry: return await self._enapter_api.get_historical_telemetry( - auth, device_id, attributes, time_from, time_to, granularity + auth, + device_id, + attributes, + time_from, + time_to, + granularity, + aggregation, + ) + + async def get_historical_telemetry_stats( + self, + auth: AuthConfig, + device_id: str, + attributes: list[str], + time_from: datetime.datetime, + time_to: datetime.datetime, + ) -> domain.HistoricalTelemetryStats: + return await self._enapter_api.get_historical_telemetry_stats( + auth, + device_id, + attributes, + time_from, + time_to, ) async def search_command_executions( diff --git a/src/enapter_mcp_server/core/enapter_api.py b/src/enapter_mcp_server/core/enapter_api.py index 5fe179b..494192e 100644 --- a/src/enapter_mcp_server/core/enapter_api.py +++ b/src/enapter_mcp_server/core/enapter_api.py @@ -53,4 +53,14 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation, ) -> domain.HistoricalTelemetry: ... + + async def get_historical_telemetry_stats( + self, + auth: AuthConfig, + device_id: str, + attributes: list[str], + time_from: datetime.datetime, + time_to: datetime.datetime, + ) -> domain.HistoricalTelemetryStats: ... diff --git a/src/enapter_mcp_server/domain/__init__.py b/src/enapter_mcp_server/domain/__init__.py index 9a6cf66..c3cdc8e 100644 --- a/src/enapter_mcp_server/domain/__init__.py +++ b/src/enapter_mcp_server/domain/__init__.py @@ -14,6 +14,10 @@ from .device_type import DeviceType from .device_view import DeviceView from .historical_telemetry import HistoricalTelemetry +from .historical_telemetry_stats import ( + HistoricalTelemetryAttributeStats, + HistoricalTelemetryStats, +) from .property_declaration import PropertyDeclaration from .site import Site from .telemetry_attribute_declaration import TelemetryAttributeDeclaration @@ -35,6 +39,8 @@ "DeviceType", "DeviceView", "HistoricalTelemetry", + "HistoricalTelemetryAttributeStats", + "HistoricalTelemetryStats", "PropertyDeclaration", "Site", "TelemetryAttributeDeclaration", diff --git a/src/enapter_mcp_server/domain/historical_telemetry_stats.py b/src/enapter_mcp_server/domain/historical_telemetry_stats.py new file mode 100644 index 0000000..85b85d5 --- /dev/null +++ b/src/enapter_mcp_server/domain/historical_telemetry_stats.py @@ -0,0 +1,15 @@ +import dataclasses +from typing import Any + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class HistoricalTelemetryAttributeStats: + min: Any + max: Any + avg: Any + last: Any + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class HistoricalTelemetryStats: + values: dict[str, HistoricalTelemetryAttributeStats] diff --git a/src/enapter_mcp_server/http/enapter_api.py b/src/enapter_mcp_server/http/enapter_api.py index 5c51134..fe5e0d6 100644 --- a/src/enapter_mcp_server/http/enapter_api.py +++ b/src/enapter_mcp_server/http/enapter_api.py @@ -1,6 +1,6 @@ import contextlib import datetime -from typing import Any, AsyncGenerator, Self +from typing import Any, AsyncGenerator, Callable, Self import enapter @@ -100,6 +100,7 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation, ) -> domain.HistoricalTelemetry: async with self._new_client(auth) as client: telemetry = await client.telemetry.wide_timeseries( @@ -108,12 +109,67 @@ async def get_historical_telemetry( granularity=granularity, selectors=[ enapter.http.api.telemetry.Selector( - device=device_id, attributes=attributes + device=device_id, + attributes=attributes, + aggregation=aggregation, ) ], ) return self._data_mapper.to_historical_telemetry(telemetry) + async def get_historical_telemetry_stats( + self, + auth: core.AuthConfig, + device_id: str, + attributes: list[str], + time_from: datetime.datetime, + time_to: datetime.datetime, + ) -> domain.HistoricalTelemetryStats: + # Platform API may return 1-2 points per attribute when PG time_bucket + # boundaries don't align with time_from; the reducer collapses them + # into a single scalar consistent with the requested aggregation. + reducers: dict[str, Callable[[list[Any]], Any]] = { + "min": min, + "max": max, + "avg": lambda xs: sum(xs) / len(xs), + "last": lambda xs: xs[-1], + } + granularity = int((time_to - time_from).total_seconds()) + async with self._new_client(auth) as client: + telemetry = await client.telemetry.wide_timeseries( + from_=time_from, + to=time_to, + granularity=granularity, + selectors=[ + enapter.http.api.telemetry.Selector( + device=device_id, + attributes=attributes, + aggregation=enapter.http.api.telemetry.Aggregation(agg.upper()), + ) + for agg in reducers + ], + ) + + scalars: dict[str, dict[str, Any]] = {agg: {} for agg in reducers} + for column in telemetry.columns: + agg = column.labels["aggregation"] + non_null = [v for v in column.values if v is not None] + scalars[agg][column.labels.telemetry] = ( + reducers[agg](non_null) if non_null else None + ) + + return domain.HistoricalTelemetryStats( + values={ + attr: domain.HistoricalTelemetryAttributeStats( + min=scalars["min"].get(attr), + max=scalars["max"].get(attr), + avg=scalars["avg"].get(attr), + last=scalars["last"].get(attr), + ) + for attr in attributes + } + ) + @contextlib.asynccontextmanager async def _new_client( self, auth: core.AuthConfig diff --git a/src/enapter_mcp_server/mcp/models/__init__.py b/src/enapter_mcp_server/mcp/models/__init__.py index fa14b10..369a0c3 100644 --- a/src/enapter_mcp_server/mcp/models/__init__.py +++ b/src/enapter_mcp_server/mcp/models/__init__.py @@ -13,6 +13,11 @@ from .device_type import DeviceType from .device_view import DeviceView from .historical_telemetry import HistoricalTelemetry +from .historical_telemetry_aggregation import HistoricalTelemetryAggregation +from .historical_telemetry_stats import ( + HistoricalTelemetryAttributeStats, + HistoricalTelemetryStats, +) from .property_declaration import PropertyDeclaration from .site import Site from .telemetry_attribute_declaration import TelemetryAttributeDeclaration @@ -33,6 +38,9 @@ "DeviceType", "DeviceView", "HistoricalTelemetry", + "HistoricalTelemetryAttributeStats", + "HistoricalTelemetryStats", + "HistoricalTelemetryAggregation", "PropertyDeclaration", "Site", "TelemetryAttributeDeclaration", diff --git a/src/enapter_mcp_server/mcp/models/historical_telemetry_aggregation.py b/src/enapter_mcp_server/mcp/models/historical_telemetry_aggregation.py new file mode 100644 index 0000000..0e4804f --- /dev/null +++ b/src/enapter_mcp_server/mcp/models/historical_telemetry_aggregation.py @@ -0,0 +1,3 @@ +from typing import Literal + +HistoricalTelemetryAggregation = Literal["auto", "avg", "min", "max", "last", "bool_or"] diff --git a/src/enapter_mcp_server/mcp/models/historical_telemetry_stats.py b/src/enapter_mcp_server/mcp/models/historical_telemetry_stats.py new file mode 100644 index 0000000..7618a54 --- /dev/null +++ b/src/enapter_mcp_server/mcp/models/historical_telemetry_stats.py @@ -0,0 +1,44 @@ +from typing import Any, Self + +import pydantic + +from enapter_mcp_server import domain + + +class HistoricalTelemetryAttributeStats(pydantic.BaseModel): + """Per-attribute min/max/avg/last over a time period.""" + + min: Any + max: Any + avg: Any + last: Any + + @classmethod + def from_domain(cls, stats: domain.HistoricalTelemetryAttributeStats) -> Self: + return cls( + min=stats.min, + max=stats.max, + avg=stats.avg, + last=stats.last, + ) + + +class HistoricalTelemetryStats(pydantic.BaseModel): + """Per-attribute min/max/avg/last over a time period. + + Unlike `get_historical_telemetry` (which returns a time series of + bucket-level aggregates), these values are computed over the raw + datapoints across the entire period — so short dropouts or spikes + are preserved and not smoothed away. + """ + + values: dict[str, HistoricalTelemetryAttributeStats] + + @classmethod + def from_domain(cls, stats: domain.HistoricalTelemetryStats) -> Self: + return cls( + values={ + k: HistoricalTelemetryAttributeStats.from_domain(v) + for k, v in stats.values.items() + } + ) diff --git a/src/enapter_mcp_server/mcp/server.py b/src/enapter_mcp_server/mcp/server.py index 204d8d0..0f6026e 100644 --- a/src/enapter_mcp_server/mcp/server.py +++ b/src/enapter_mcp_server/mcp/server.py @@ -126,6 +126,7 @@ def _register_tools(self, fastmcp_server: fastmcp.FastMCP) -> None: (self.search_command_executions, "Search Command Executions"), (self.read_blueprint, "Read Blueprint"), (self.get_historical_telemetry, "Get Historical Telemetry"), + (self.get_historical_telemetry_stats, "Get Historical Telemetry Stats"), ] for tool, title in read_only_tools: fastmcp_server.tool( @@ -303,14 +304,21 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int = 60 * 60, + aggregation: models.HistoricalTelemetryAggregation = "auto", ) -> models.HistoricalTelemetry: - """Retrieve aggregated telemetry data. + """Retrieve telemetry aggregated into time buckets over a time period. Most devices send telemetry data once per second. To reduce the amount of data transferred, the `granularity` parameter can be used to aggregate data over a specified interval (in seconds). For example, a granularity of 3600 seconds (1 hour) will return hourly averages of the telemetry data. + + `aggregation` picks the reducer per bucket: `auto` adapts per attribute + type; override with `avg`/`min`/`max` (numeric), `bool_or` (boolean), + or `last` (any). + + For a single set of raw min/max/avg/last, use `get_historical_telemetry_stats`. """ auth = await self._get_auth_config() telemetry = await self._app.get_historical_telemetry( @@ -320,9 +328,38 @@ async def get_historical_telemetry( time_from=time_from, time_to=time_to, granularity=granularity, + aggregation=enapter.http.api.telemetry.Aggregation(aggregation.upper()), ) return models.HistoricalTelemetry.from_domain(telemetry) + async def get_historical_telemetry_stats( + self, + device_id: str, + attributes: list[str], + time_from: datetime.datetime, + time_to: datetime.datetime, + ) -> models.HistoricalTelemetryStats: + """Retrieve per-attribute min/max/avg/last over a time period. + + Computed over raw datapoints — brief dropouts and spikes are preserved, + unlike the bucket averages in `get_historical_telemetry`. + + Numeric attributes only. + + `min`/`max`/`last` are exact for any period. `avg` is exact under 8h; + typical drift <1% for 8h–30 days, up to several % for longer periods + (read from pre-aggregated data). + """ + auth = await self._get_auth_config() + stats = await self._app.get_historical_telemetry_stats( + auth=auth, + device_id=device_id, + attributes=attributes, + time_from=time_from, + time_to=time_to, + ) + return models.HistoricalTelemetryStats.from_domain(stats) + async def _get_auth_config(self) -> core.AuthConfig: if self._config.oauth_proxy is None: headers = fastmcp.server.dependencies.get_http_headers() diff --git a/tests/unit/core/test_application_server.py b/tests/unit/core/test_application_server.py index 4992a04..1268454 100644 --- a/tests/unit/core/test_application_server.py +++ b/tests/unit/core/test_application_server.py @@ -32,6 +32,7 @@ def __init__( devices: list[core.DeviceDTO] | None = None, telemetry: dict[str, dict[str, Any]] | None = None, historical_telemetry: domain.HistoricalTelemetry | None = None, + historical_telemetry_stats: domain.HistoricalTelemetryStats | None = None, latest_telemetry_unavailable: bool = False, command_executions: dict[str, list[domain.CommandExecution]] | None = None, ): @@ -39,6 +40,7 @@ def __init__( self._devices = devices or [] self._telemetry = telemetry or {} self._historical_telemetry = historical_telemetry + self._historical_telemetry_stats = historical_telemetry_stats self._latest_telemetry_unavailable = latest_telemetry_unavailable self._command_executions = command_executions or {} self.latest_telemetry_batch_calls = 0 @@ -102,11 +104,24 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation, ) -> domain.HistoricalTelemetry: if self._historical_telemetry is None: raise NotImplementedError() return self._historical_telemetry + async def get_historical_telemetry_stats( + self, + auth: core.AuthConfig, + device_id: str, + attributes: list[str], + time_from: datetime.datetime, + time_to: datetime.datetime, + ) -> domain.HistoricalTelemetryStats: + if self._historical_telemetry_stats is None: + raise NotImplementedError() + return self._historical_telemetry_stats + class TestApplicationServer: @@ -745,6 +760,7 @@ async def test_get_historical_telemetry(self) -> None: datetime.datetime.now(), datetime.datetime.now(), 60, + enapter.http.api.telemetry.Aggregation.AUTO, ) assert result == historical