From 97788de9a626ec4435bf3df8f9e7e12c63fd6f59 Mon Sep 17 00:00:00 2001 From: valera Date: Sun, 12 Apr 2026 22:06:49 +0400 Subject: [PATCH 01/14] feat(telemetry): propagate aggregation to historical telemetry selector Allow callers to pick the aggregation function (MIN, MAX, AVG, ...) when fetching historical telemetry, instead of always relying on the SDK's default AUTO behaviour. Needed so that higher-level features can request true min/max over a period rather than extremes of pre-averaged buckets. --- src/enapter_mcp_server/core/enapter_api.py | 1 + src/enapter_mcp_server/http/enapter_api.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/enapter_mcp_server/core/enapter_api.py b/src/enapter_mcp_server/core/enapter_api.py index 5fe179b..3014152 100644 --- a/src/enapter_mcp_server/core/enapter_api.py +++ b/src/enapter_mcp_server/core/enapter_api.py @@ -53,4 +53,5 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation | None = None, ) -> domain.HistoricalTelemetry: ... diff --git a/src/enapter_mcp_server/http/enapter_api.py b/src/enapter_mcp_server/http/enapter_api.py index 5c51134..c876449 100644 --- a/src/enapter_mcp_server/http/enapter_api.py +++ b/src/enapter_mcp_server/http/enapter_api.py @@ -100,6 +100,7 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation | None = None, ) -> domain.HistoricalTelemetry: async with self._new_client(auth) as client: telemetry = await client.telemetry.wide_timeseries( @@ -108,7 +109,9 @@ async def get_historical_telemetry( granularity=granularity, selectors=[ enapter.http.api.telemetry.Selector( - device=device_id, attributes=attributes + device=device_id, + attributes=attributes, + aggregation=aggregation, ) ], ) From 616039d8f6eee4007db1d1f4ec8d0095a239891a Mon Sep 17 00:00:00 2001 From: valera Date: Sun, 12 Apr 2026 22:32:31 +0400 Subject: [PATCH 02/14] feat(telemetry): expose aggregation on get_historical_telemetry MCP tool Let clients pick per-bucket aggregation (avg/min/max/last/bool_or) instead of always getting the server default. Enables use cases like "daily minimum voltage series" or "hourly peak demand" from the MCP surface. Omitting the parameter preserves prior behaviour. --- src/enapter_mcp_server/core/application_server.py | 11 ++++++++++- src/enapter_mcp_server/mcp/models/__init__.py | 2 ++ .../mcp/models/telemetry_aggregation.py | 3 +++ src/enapter_mcp_server/mcp/server.py | 5 +++++ 4 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 src/enapter_mcp_server/mcp/models/telemetry_aggregation.py diff --git a/src/enapter_mcp_server/core/application_server.py b/src/enapter_mcp_server/core/application_server.py index 84bfc29..fa6587b 100644 --- a/src/enapter_mcp_server/core/application_server.py +++ b/src/enapter_mcp_server/core/application_server.py @@ -1,6 +1,8 @@ import datetime import re +import enapter + from enapter_mcp_server import domain from .auth_config import AuthConfig @@ -229,9 +231,16 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation | None = None, ) -> domain.HistoricalTelemetry: return await self._enapter_api.get_historical_telemetry( - auth, device_id, attributes, time_from, time_to, granularity + auth, + device_id, + attributes, + time_from, + time_to, + granularity, + aggregation=aggregation, ) async def search_command_executions( diff --git a/src/enapter_mcp_server/mcp/models/__init__.py b/src/enapter_mcp_server/mcp/models/__init__.py index fa14b10..81f4858 100644 --- a/src/enapter_mcp_server/mcp/models/__init__.py +++ b/src/enapter_mcp_server/mcp/models/__init__.py @@ -15,6 +15,7 @@ from .historical_telemetry import HistoricalTelemetry from .property_declaration import PropertyDeclaration from .site import Site +from .telemetry_aggregation import TelemetryAggregation from .telemetry_attribute_declaration import TelemetryAttributeDeclaration __all__ = [ @@ -35,5 +36,6 @@ "HistoricalTelemetry", "PropertyDeclaration", "Site", + "TelemetryAggregation", "TelemetryAttributeDeclaration", ] diff --git a/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py b/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py new file mode 100644 index 0000000..7f7aa36 --- /dev/null +++ b/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py @@ -0,0 +1,3 @@ +from typing import Literal + +TelemetryAggregation = Literal["auto", "avg", "min", "max", "last", "bool_or"] diff --git a/src/enapter_mcp_server/mcp/server.py b/src/enapter_mcp_server/mcp/server.py index 204d8d0..18922c3 100644 --- a/src/enapter_mcp_server/mcp/server.py +++ b/src/enapter_mcp_server/mcp/server.py @@ -303,6 +303,7 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int = 60 * 60, + aggregation: models.TelemetryAggregation = "auto", ) -> models.HistoricalTelemetry: """Retrieve aggregated telemetry data. @@ -311,6 +312,9 @@ async def get_historical_telemetry( aggregate data over a specified interval (in seconds). For example, a granularity of 3600 seconds (1 hour) will return hourly averages of the telemetry data. + + The `aggregation` parameter controls how datapoints within a bucket + are combined. `auto` picks a sensible per-attribute default. """ auth = await self._get_auth_config() telemetry = await self._app.get_historical_telemetry( @@ -320,6 +324,7 @@ async def get_historical_telemetry( time_from=time_from, time_to=time_to, granularity=granularity, + aggregation=enapter.http.api.telemetry.Aggregation(aggregation.upper()), ) return models.HistoricalTelemetry.from_domain(telemetry) From 460430a44c97d30cd5a00ab5d9f2b08fe1295c19 Mon Sep 17 00:00:00 2001 From: valera Date: Sun, 12 Apr 2026 22:35:50 +0400 Subject: [PATCH 03/14] feat(telemetry): add get_telemetry_extremes MCP tool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Expose a dedicated tool for retrieving real per-attribute min/max over a time period. Under the hood, it issues two parallel historical telemetry queries with Aggregation.MIN/MAX and granularity equal to the full period, so Platform API returns one scalar per attribute computed over raw datapoints (rollup tables store _min/_max columns separately, so extremes are preserved even on long periods). Returning extremes as a dedicated tool — rather than asking callers to craft get_historical_telemetry(aggregation=min, granularity=period) themselves — encodes the trick into the contract and produces a compact scalar response instead of a series to parse. --- .../core/application_server.py | 52 +++++++++++++++++++ src/enapter_mcp_server/domain/__init__.py | 3 ++ .../domain/telemetry_extremes.py | 13 +++++ src/enapter_mcp_server/mcp/models/__init__.py | 3 ++ .../mcp/models/telemetry_extremes.py | 36 +++++++++++++ src/enapter_mcp_server/mcp/server.py | 27 ++++++++++ 6 files changed, 134 insertions(+) create mode 100644 src/enapter_mcp_server/domain/telemetry_extremes.py create mode 100644 src/enapter_mcp_server/mcp/models/telemetry_extremes.py diff --git a/src/enapter_mcp_server/core/application_server.py b/src/enapter_mcp_server/core/application_server.py index fa6587b..a3d21e2 100644 --- a/src/enapter_mcp_server/core/application_server.py +++ b/src/enapter_mcp_server/core/application_server.py @@ -1,5 +1,7 @@ +import asyncio import datetime import re +from typing import Any import enapter @@ -243,6 +245,56 @@ async def get_historical_telemetry( aggregation=aggregation, ) + async def get_telemetry_extremes( + self, + auth: AuthConfig, + device_id: str, + attributes: list[str], + time_from: datetime.datetime, + time_to: datetime.datetime, + ) -> domain.TelemetryExtremes: + granularity = int((time_to - time_from).total_seconds()) + + async def query( + aggregation: enapter.http.api.telemetry.Aggregation, + ) -> domain.HistoricalTelemetry: + return await self._enapter_api.get_historical_telemetry( + auth, + device_id, + attributes, + time_from, + time_to, + granularity, + aggregation=aggregation, + ) + + min_ts, max_ts = await asyncio.gather( + query(enapter.http.api.telemetry.Aggregation.MIN), + query(enapter.http.api.telemetry.Aggregation.MAX), + ) + + # Platform API may return 1-2 points per attribute when PG time_bucket + # boundaries don't align with time_from; reduce locally to one scalar. + def reduce(ts: domain.HistoricalTelemetry, reducer: Any) -> dict[str, Any]: + out: dict[str, Any] = {} + for attr, values in ts.values.items(): + non_null = [v for v in values if v is not None] + out[attr] = reducer(non_null) if non_null else None + return out + + min_by_attr = reduce(min_ts, min) + max_by_attr = reduce(max_ts, max) + + return domain.TelemetryExtremes( + values={ + attr: domain.AttributeExtremes( + min=min_by_attr.get(attr), + max=max_by_attr.get(attr), + ) + for attr in attributes + } + ) + async def search_command_executions( self, auth: AuthConfig, diff --git a/src/enapter_mcp_server/domain/__init__.py b/src/enapter_mcp_server/domain/__init__.py index 9a6cf66..2b5b047 100644 --- a/src/enapter_mcp_server/domain/__init__.py +++ b/src/enapter_mcp_server/domain/__init__.py @@ -17,10 +17,12 @@ from .property_declaration import PropertyDeclaration from .site import Site from .telemetry_attribute_declaration import TelemetryAttributeDeclaration +from .telemetry_extremes import AttributeExtremes, TelemetryExtremes __all__ = [ "AlertDeclaration", "AlertSeverity", + "AttributeExtremes", "BlueprintSection", "BlueprintSummary", "CommandArgumentDeclaration", @@ -38,4 +40,5 @@ "PropertyDeclaration", "Site", "TelemetryAttributeDeclaration", + "TelemetryExtremes", ] diff --git a/src/enapter_mcp_server/domain/telemetry_extremes.py b/src/enapter_mcp_server/domain/telemetry_extremes.py new file mode 100644 index 0000000..53b7976 --- /dev/null +++ b/src/enapter_mcp_server/domain/telemetry_extremes.py @@ -0,0 +1,13 @@ +import dataclasses +from typing import Any + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class AttributeExtremes: + min: Any + max: Any + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class TelemetryExtremes: + values: dict[str, AttributeExtremes] diff --git a/src/enapter_mcp_server/mcp/models/__init__.py b/src/enapter_mcp_server/mcp/models/__init__.py index 81f4858..5052764 100644 --- a/src/enapter_mcp_server/mcp/models/__init__.py +++ b/src/enapter_mcp_server/mcp/models/__init__.py @@ -17,10 +17,12 @@ from .site import Site from .telemetry_aggregation import TelemetryAggregation from .telemetry_attribute_declaration import TelemetryAttributeDeclaration +from .telemetry_extremes import AttributeExtremes, TelemetryExtremes __all__ = [ "AlertDeclaration", "AlertSeverity", + "AttributeExtremes", "BlueprintSection", "BlueprintSummary", "CommandArgumentDeclaration", @@ -38,4 +40,5 @@ "Site", "TelemetryAggregation", "TelemetryAttributeDeclaration", + "TelemetryExtremes", ] diff --git a/src/enapter_mcp_server/mcp/models/telemetry_extremes.py b/src/enapter_mcp_server/mcp/models/telemetry_extremes.py new file mode 100644 index 0000000..e33e561 --- /dev/null +++ b/src/enapter_mcp_server/mcp/models/telemetry_extremes.py @@ -0,0 +1,36 @@ +from typing import Any, Self + +import pydantic + +from enapter_mcp_server import domain + + +class AttributeExtremes(pydantic.BaseModel): + """Real min/max of a telemetry attribute over a time period.""" + + min: Any + max: Any + + @classmethod + def from_domain(cls, extremes: domain.AttributeExtremes) -> Self: + return cls(min=extremes.min, max=extremes.max) + + +class TelemetryExtremes(pydantic.BaseModel): + """Per-attribute real min/max over a time period. + + Unlike `get_historical_telemetry` (which returns a time series of + bucket-level averages), these extremes are computed over the raw + datapoints across the entire period — so short dropouts or spikes + are preserved and not smoothed away. + """ + + values: dict[str, AttributeExtremes] + + @classmethod + def from_domain(cls, extremes: domain.TelemetryExtremes) -> Self: + return cls( + values={ + k: AttributeExtremes.from_domain(v) for k, v in extremes.values.items() + } + ) diff --git a/src/enapter_mcp_server/mcp/server.py b/src/enapter_mcp_server/mcp/server.py index 18922c3..161f307 100644 --- a/src/enapter_mcp_server/mcp/server.py +++ b/src/enapter_mcp_server/mcp/server.py @@ -126,6 +126,7 @@ def _register_tools(self, fastmcp_server: fastmcp.FastMCP) -> None: (self.search_command_executions, "Search Command Executions"), (self.read_blueprint, "Read Blueprint"), (self.get_historical_telemetry, "Get Historical Telemetry"), + (self.get_telemetry_extremes, "Get Telemetry Extremes"), ] for tool, title in read_only_tools: fastmcp_server.tool( @@ -328,6 +329,32 @@ async def get_historical_telemetry( ) return models.HistoricalTelemetry.from_domain(telemetry) + async def get_telemetry_extremes( + self, + device_id: str, + attributes: list[str], + time_from: datetime.datetime, + time_to: datetime.datetime, + ) -> models.TelemetryExtremes: + """Retrieve true per-attribute min/max over the given time period. + + Computed over the raw datapoints across the entire period, so short + dropouts or spikes are preserved — unlike `get_historical_telemetry`, + whose bucket-level averages smooth them away. + + Use this tool when you need real extremes (e.g. detecting a momentary + power dropout to zero that would be hidden by 5-minute averages). + """ + auth = await self._get_auth_config() + extremes = await self._app.get_telemetry_extremes( + auth=auth, + device_id=device_id, + attributes=attributes, + time_from=time_from, + time_to=time_to, + ) + return models.TelemetryExtremes.from_domain(extremes) + async def _get_auth_config(self) -> core.AuthConfig: if self._config.oauth_proxy is None: headers = fastmcp.server.dependencies.get_http_headers() From a4314cea9ef57ce57a783d2ed3595ad0c8354e55 Mon Sep 17 00:00:00 2001 From: valera Date: Sun, 12 Apr 2026 22:35:50 +0400 Subject: [PATCH 04/14] test(telemetry): update MockEnapterAPI for aggregation parameter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the aggregation plumbing — the mock's get_historical_telemetry signature needed the new optional aggregation parameter to satisfy the EnapterAPI protocol under mypy. --- tests/unit/core/test_application_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/core/test_application_server.py b/tests/unit/core/test_application_server.py index 4992a04..bcb1941 100644 --- a/tests/unit/core/test_application_server.py +++ b/tests/unit/core/test_application_server.py @@ -102,6 +102,7 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation | None = None, ) -> domain.HistoricalTelemetry: if self._historical_telemetry is None: raise NotImplementedError() From 671d38d43f0c612dab5866cc6dd56bcfd4eca573 Mon Sep 17 00:00:00 2001 From: valera Date: Mon, 13 Apr 2026 01:19:38 +0400 Subject: [PATCH 05/14] feat(telemetry): extend extremes to stats with avg/last and rename tool Rename the MCP tool `get_telemetry_extremes` to `get_historical_telemetry_stats` to align with the `get_historical_telemetry_*` family naming, so all period-based telemetry tools sort together in the tools catalog. Extend the returned per-attribute values from just min/max to min/max/avg/last via two additional parallel Platform API queries (AVG, LAST). All four remain exact over raw datapoints for periods under 8 hours; for longer periods min, max and last stay exact while avg becomes approximate (driven by Platform API pre-aggregation tiers). Rename the corresponding domain and model types for consistency: - TelemetryExtremes -> HistoricalTelemetryStats - AttributeExtremes -> HistoricalTelemetryAttributeStats --- .../core/application_server.py | 16 ++++--- src/enapter_mcp_server/domain/__init__.py | 9 ++-- ...remes.py => historical_telemetry_stats.py} | 8 ++-- src/enapter_mcp_server/mcp/models/__init__.py | 9 ++-- .../mcp/models/historical_telemetry_stats.py | 44 +++++++++++++++++++ .../mcp/models/telemetry_extremes.py | 36 --------------- src/enapter_mcp_server/mcp/server.py | 34 +++++++------- 7 files changed, 87 insertions(+), 69 deletions(-) rename src/enapter_mcp_server/domain/{telemetry_extremes.py => historical_telemetry_stats.py} (52%) create mode 100644 src/enapter_mcp_server/mcp/models/historical_telemetry_stats.py delete mode 100644 src/enapter_mcp_server/mcp/models/telemetry_extremes.py diff --git a/src/enapter_mcp_server/core/application_server.py b/src/enapter_mcp_server/core/application_server.py index a3d21e2..cbd0d16 100644 --- a/src/enapter_mcp_server/core/application_server.py +++ b/src/enapter_mcp_server/core/application_server.py @@ -245,14 +245,14 @@ async def get_historical_telemetry( aggregation=aggregation, ) - async def get_telemetry_extremes( + async def get_historical_telemetry_stats( self, auth: AuthConfig, device_id: str, attributes: list[str], time_from: datetime.datetime, time_to: datetime.datetime, - ) -> domain.TelemetryExtremes: + ) -> domain.HistoricalTelemetryStats: granularity = int((time_to - time_from).total_seconds()) async def query( @@ -268,9 +268,11 @@ async def query( aggregation=aggregation, ) - min_ts, max_ts = await asyncio.gather( + min_ts, max_ts, avg_ts, last_ts = await asyncio.gather( query(enapter.http.api.telemetry.Aggregation.MIN), query(enapter.http.api.telemetry.Aggregation.MAX), + query(enapter.http.api.telemetry.Aggregation.AVG), + query(enapter.http.api.telemetry.Aggregation.LAST), ) # Platform API may return 1-2 points per attribute when PG time_bucket @@ -284,12 +286,16 @@ def reduce(ts: domain.HistoricalTelemetry, reducer: Any) -> dict[str, Any]: min_by_attr = reduce(min_ts, min) max_by_attr = reduce(max_ts, max) + avg_by_attr = reduce(avg_ts, lambda xs: sum(xs) / len(xs)) + last_by_attr = reduce(last_ts, lambda xs: xs[-1]) - return domain.TelemetryExtremes( + return domain.HistoricalTelemetryStats( values={ - attr: domain.AttributeExtremes( + attr: domain.HistoricalTelemetryAttributeStats( min=min_by_attr.get(attr), max=max_by_attr.get(attr), + avg=avg_by_attr.get(attr), + last=last_by_attr.get(attr), ) for attr in attributes } diff --git a/src/enapter_mcp_server/domain/__init__.py b/src/enapter_mcp_server/domain/__init__.py index 2b5b047..c3cdc8e 100644 --- a/src/enapter_mcp_server/domain/__init__.py +++ b/src/enapter_mcp_server/domain/__init__.py @@ -14,15 +14,17 @@ from .device_type import DeviceType from .device_view import DeviceView from .historical_telemetry import HistoricalTelemetry +from .historical_telemetry_stats import ( + HistoricalTelemetryAttributeStats, + HistoricalTelemetryStats, +) from .property_declaration import PropertyDeclaration from .site import Site from .telemetry_attribute_declaration import TelemetryAttributeDeclaration -from .telemetry_extremes import AttributeExtremes, TelemetryExtremes __all__ = [ "AlertDeclaration", "AlertSeverity", - "AttributeExtremes", "BlueprintSection", "BlueprintSummary", "CommandArgumentDeclaration", @@ -37,8 +39,9 @@ "DeviceType", "DeviceView", "HistoricalTelemetry", + "HistoricalTelemetryAttributeStats", + "HistoricalTelemetryStats", "PropertyDeclaration", "Site", "TelemetryAttributeDeclaration", - "TelemetryExtremes", ] diff --git a/src/enapter_mcp_server/domain/telemetry_extremes.py b/src/enapter_mcp_server/domain/historical_telemetry_stats.py similarity index 52% rename from src/enapter_mcp_server/domain/telemetry_extremes.py rename to src/enapter_mcp_server/domain/historical_telemetry_stats.py index 53b7976..85b85d5 100644 --- a/src/enapter_mcp_server/domain/telemetry_extremes.py +++ b/src/enapter_mcp_server/domain/historical_telemetry_stats.py @@ -3,11 +3,13 @@ @dataclasses.dataclass(frozen=True, kw_only=True) -class AttributeExtremes: +class HistoricalTelemetryAttributeStats: min: Any max: Any + avg: Any + last: Any @dataclasses.dataclass(frozen=True, kw_only=True) -class TelemetryExtremes: - values: dict[str, AttributeExtremes] +class HistoricalTelemetryStats: + values: dict[str, HistoricalTelemetryAttributeStats] diff --git a/src/enapter_mcp_server/mcp/models/__init__.py b/src/enapter_mcp_server/mcp/models/__init__.py index 5052764..39c6693 100644 --- a/src/enapter_mcp_server/mcp/models/__init__.py +++ b/src/enapter_mcp_server/mcp/models/__init__.py @@ -13,16 +13,18 @@ from .device_type import DeviceType from .device_view import DeviceView from .historical_telemetry import HistoricalTelemetry +from .historical_telemetry_stats import ( + HistoricalTelemetryAttributeStats, + HistoricalTelemetryStats, +) from .property_declaration import PropertyDeclaration from .site import Site from .telemetry_aggregation import TelemetryAggregation from .telemetry_attribute_declaration import TelemetryAttributeDeclaration -from .telemetry_extremes import AttributeExtremes, TelemetryExtremes __all__ = [ "AlertDeclaration", "AlertSeverity", - "AttributeExtremes", "BlueprintSection", "BlueprintSummary", "CommandArgumentDeclaration", @@ -36,9 +38,10 @@ "DeviceType", "DeviceView", "HistoricalTelemetry", + "HistoricalTelemetryAttributeStats", + "HistoricalTelemetryStats", "PropertyDeclaration", "Site", "TelemetryAggregation", "TelemetryAttributeDeclaration", - "TelemetryExtremes", ] diff --git a/src/enapter_mcp_server/mcp/models/historical_telemetry_stats.py b/src/enapter_mcp_server/mcp/models/historical_telemetry_stats.py new file mode 100644 index 0000000..7618a54 --- /dev/null +++ b/src/enapter_mcp_server/mcp/models/historical_telemetry_stats.py @@ -0,0 +1,44 @@ +from typing import Any, Self + +import pydantic + +from enapter_mcp_server import domain + + +class HistoricalTelemetryAttributeStats(pydantic.BaseModel): + """Per-attribute min/max/avg/last over a time period.""" + + min: Any + max: Any + avg: Any + last: Any + + @classmethod + def from_domain(cls, stats: domain.HistoricalTelemetryAttributeStats) -> Self: + return cls( + min=stats.min, + max=stats.max, + avg=stats.avg, + last=stats.last, + ) + + +class HistoricalTelemetryStats(pydantic.BaseModel): + """Per-attribute min/max/avg/last over a time period. + + Unlike `get_historical_telemetry` (which returns a time series of + bucket-level aggregates), these values are computed over the raw + datapoints across the entire period — so short dropouts or spikes + are preserved and not smoothed away. + """ + + values: dict[str, HistoricalTelemetryAttributeStats] + + @classmethod + def from_domain(cls, stats: domain.HistoricalTelemetryStats) -> Self: + return cls( + values={ + k: HistoricalTelemetryAttributeStats.from_domain(v) + for k, v in stats.values.items() + } + ) diff --git a/src/enapter_mcp_server/mcp/models/telemetry_extremes.py b/src/enapter_mcp_server/mcp/models/telemetry_extremes.py deleted file mode 100644 index e33e561..0000000 --- a/src/enapter_mcp_server/mcp/models/telemetry_extremes.py +++ /dev/null @@ -1,36 +0,0 @@ -from typing import Any, Self - -import pydantic - -from enapter_mcp_server import domain - - -class AttributeExtremes(pydantic.BaseModel): - """Real min/max of a telemetry attribute over a time period.""" - - min: Any - max: Any - - @classmethod - def from_domain(cls, extremes: domain.AttributeExtremes) -> Self: - return cls(min=extremes.min, max=extremes.max) - - -class TelemetryExtremes(pydantic.BaseModel): - """Per-attribute real min/max over a time period. - - Unlike `get_historical_telemetry` (which returns a time series of - bucket-level averages), these extremes are computed over the raw - datapoints across the entire period — so short dropouts or spikes - are preserved and not smoothed away. - """ - - values: dict[str, AttributeExtremes] - - @classmethod - def from_domain(cls, extremes: domain.TelemetryExtremes) -> Self: - return cls( - values={ - k: AttributeExtremes.from_domain(v) for k, v in extremes.values.items() - } - ) diff --git a/src/enapter_mcp_server/mcp/server.py b/src/enapter_mcp_server/mcp/server.py index 161f307..789aaa8 100644 --- a/src/enapter_mcp_server/mcp/server.py +++ b/src/enapter_mcp_server/mcp/server.py @@ -126,7 +126,7 @@ def _register_tools(self, fastmcp_server: fastmcp.FastMCP) -> None: (self.search_command_executions, "Search Command Executions"), (self.read_blueprint, "Read Blueprint"), (self.get_historical_telemetry, "Get Historical Telemetry"), - (self.get_telemetry_extremes, "Get Telemetry Extremes"), + (self.get_historical_telemetry_stats, "Get Historical Telemetry Stats"), ] for tool, title in read_only_tools: fastmcp_server.tool( @@ -306,16 +306,14 @@ async def get_historical_telemetry( granularity: int = 60 * 60, aggregation: models.TelemetryAggregation = "auto", ) -> models.HistoricalTelemetry: - """Retrieve aggregated telemetry data. + """Retrieve telemetry aggregated into time buckets over a time period. - Most devices send telemetry data once per second. To reduce the amount - of data transferred, the `granularity` parameter can be used to - aggregate data over a specified interval (in seconds). For example, a - granularity of 3600 seconds (1 hour) will return hourly averages of the - telemetry data. + `granularity` (seconds) sets bucket size — e.g. 3600 = hourly. + `aggregation` picks the reducer per bucket: `auto` adapts per attribute + type; override with `avg`/`min`/`max` (numeric), `bool_or` (boolean), + or `last` (any). - The `aggregation` parameter controls how datapoints within a bucket - are combined. `auto` picks a sensible per-attribute default. + For a single set of raw min/max/avg/last, use `get_historical_telemetry_stats`. """ auth = await self._get_auth_config() telemetry = await self._app.get_historical_telemetry( @@ -329,31 +327,29 @@ async def get_historical_telemetry( ) return models.HistoricalTelemetry.from_domain(telemetry) - async def get_telemetry_extremes( + async def get_historical_telemetry_stats( self, device_id: str, attributes: list[str], time_from: datetime.datetime, time_to: datetime.datetime, - ) -> models.TelemetryExtremes: - """Retrieve true per-attribute min/max over the given time period. + ) -> models.HistoricalTelemetryStats: + """Retrieve per-attribute min/max/avg/last over a time period. - Computed over the raw datapoints across the entire period, so short - dropouts or spikes are preserved — unlike `get_historical_telemetry`, - whose bucket-level averages smooth them away. + Computed over raw datapoints — brief dropouts and spikes are preserved, + unlike the bucket averages in `get_historical_telemetry`. - Use this tool when you need real extremes (e.g. detecting a momentary - power dropout to zero that would be hidden by 5-minute averages). + Numeric attributes only. """ auth = await self._get_auth_config() - extremes = await self._app.get_telemetry_extremes( + stats = await self._app.get_historical_telemetry_stats( auth=auth, device_id=device_id, attributes=attributes, time_from=time_from, time_to=time_to, ) - return models.TelemetryExtremes.from_domain(extremes) + return models.HistoricalTelemetryStats.from_domain(stats) async def _get_auth_config(self) -> core.AuthConfig: if self._config.oauth_proxy is None: From d4300b851039d545e2fb34ead307dce7c0580c95 Mon Sep 17 00:00:00 2001 From: valera Date: Mon, 13 Apr 2026 01:31:47 +0400 Subject: [PATCH 06/14] docs(telemetry): document stats precision across period tiers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tell LLMs that min/max/last are always exact while avg becomes approximate for periods beyond 8 hours, with concrete drift estimates tied to Platform API pre-aggregation tiers (8h raw / 30d minute-10min / beyond hourly). This prevents agents from trying to work around the drift via windowing or pagination — the underlying data simply isn't stored at higher resolution. --- src/enapter_mcp_server/mcp/server.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/enapter_mcp_server/mcp/server.py b/src/enapter_mcp_server/mcp/server.py index 789aaa8..0db2535 100644 --- a/src/enapter_mcp_server/mcp/server.py +++ b/src/enapter_mcp_server/mcp/server.py @@ -340,6 +340,10 @@ async def get_historical_telemetry_stats( unlike the bucket averages in `get_historical_telemetry`. Numeric attributes only. + + `min`/`max`/`last` are exact for any period. `avg` is exact under 8h; + typical drift <1% for 8h–30 days, up to several % for longer periods + (read from pre-aggregated data). """ auth = await self._get_auth_config() stats = await self._app.get_historical_telemetry_stats( From 48aeae74e2269393c1646d73498b688b296d3ccb Mon Sep 17 00:00:00 2001 From: valera Date: Sun, 12 Apr 2026 22:06:49 +0400 Subject: [PATCH 07/14] feat(telemetry): propagate aggregation to historical telemetry selector Allow callers to pick the aggregation function (MIN, MAX, AVG, ...) when fetching historical telemetry, instead of always relying on the SDK's default AUTO behaviour. Needed so that higher-level features can request true min/max over a period rather than extremes of pre-averaged buckets. --- src/enapter_mcp_server/core/enapter_api.py | 1 + src/enapter_mcp_server/http/enapter_api.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/enapter_mcp_server/core/enapter_api.py b/src/enapter_mcp_server/core/enapter_api.py index 5fe179b..3014152 100644 --- a/src/enapter_mcp_server/core/enapter_api.py +++ b/src/enapter_mcp_server/core/enapter_api.py @@ -53,4 +53,5 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation | None = None, ) -> domain.HistoricalTelemetry: ... diff --git a/src/enapter_mcp_server/http/enapter_api.py b/src/enapter_mcp_server/http/enapter_api.py index 5c51134..c876449 100644 --- a/src/enapter_mcp_server/http/enapter_api.py +++ b/src/enapter_mcp_server/http/enapter_api.py @@ -100,6 +100,7 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation | None = None, ) -> domain.HistoricalTelemetry: async with self._new_client(auth) as client: telemetry = await client.telemetry.wide_timeseries( @@ -108,7 +109,9 @@ async def get_historical_telemetry( granularity=granularity, selectors=[ enapter.http.api.telemetry.Selector( - device=device_id, attributes=attributes + device=device_id, + attributes=attributes, + aggregation=aggregation, ) ], ) From 6c6fd2d68019e7e61433d547d2125ee703cfdb67 Mon Sep 17 00:00:00 2001 From: valera Date: Sun, 12 Apr 2026 22:32:31 +0400 Subject: [PATCH 08/14] feat(telemetry): expose aggregation on get_historical_telemetry MCP tool Let clients pick per-bucket aggregation (avg/min/max/last/bool_or) instead of always getting the server default. Enables use cases like "daily minimum voltage series" or "hourly peak demand" from the MCP surface. Omitting the parameter preserves prior behaviour. --- src/enapter_mcp_server/core/application_server.py | 11 ++++++++++- src/enapter_mcp_server/mcp/models/__init__.py | 2 ++ .../mcp/models/telemetry_aggregation.py | 3 +++ src/enapter_mcp_server/mcp/server.py | 5 +++++ 4 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 src/enapter_mcp_server/mcp/models/telemetry_aggregation.py diff --git a/src/enapter_mcp_server/core/application_server.py b/src/enapter_mcp_server/core/application_server.py index 84bfc29..fa6587b 100644 --- a/src/enapter_mcp_server/core/application_server.py +++ b/src/enapter_mcp_server/core/application_server.py @@ -1,6 +1,8 @@ import datetime import re +import enapter + from enapter_mcp_server import domain from .auth_config import AuthConfig @@ -229,9 +231,16 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation | None = None, ) -> domain.HistoricalTelemetry: return await self._enapter_api.get_historical_telemetry( - auth, device_id, attributes, time_from, time_to, granularity + auth, + device_id, + attributes, + time_from, + time_to, + granularity, + aggregation=aggregation, ) async def search_command_executions( diff --git a/src/enapter_mcp_server/mcp/models/__init__.py b/src/enapter_mcp_server/mcp/models/__init__.py index fa14b10..81f4858 100644 --- a/src/enapter_mcp_server/mcp/models/__init__.py +++ b/src/enapter_mcp_server/mcp/models/__init__.py @@ -15,6 +15,7 @@ from .historical_telemetry import HistoricalTelemetry from .property_declaration import PropertyDeclaration from .site import Site +from .telemetry_aggregation import TelemetryAggregation from .telemetry_attribute_declaration import TelemetryAttributeDeclaration __all__ = [ @@ -35,5 +36,6 @@ "HistoricalTelemetry", "PropertyDeclaration", "Site", + "TelemetryAggregation", "TelemetryAttributeDeclaration", ] diff --git a/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py b/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py new file mode 100644 index 0000000..7f7aa36 --- /dev/null +++ b/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py @@ -0,0 +1,3 @@ +from typing import Literal + +TelemetryAggregation = Literal["auto", "avg", "min", "max", "last", "bool_or"] diff --git a/src/enapter_mcp_server/mcp/server.py b/src/enapter_mcp_server/mcp/server.py index 204d8d0..18922c3 100644 --- a/src/enapter_mcp_server/mcp/server.py +++ b/src/enapter_mcp_server/mcp/server.py @@ -303,6 +303,7 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int = 60 * 60, + aggregation: models.TelemetryAggregation = "auto", ) -> models.HistoricalTelemetry: """Retrieve aggregated telemetry data. @@ -311,6 +312,9 @@ async def get_historical_telemetry( aggregate data over a specified interval (in seconds). For example, a granularity of 3600 seconds (1 hour) will return hourly averages of the telemetry data. + + The `aggregation` parameter controls how datapoints within a bucket + are combined. `auto` picks a sensible per-attribute default. """ auth = await self._get_auth_config() telemetry = await self._app.get_historical_telemetry( @@ -320,6 +324,7 @@ async def get_historical_telemetry( time_from=time_from, time_to=time_to, granularity=granularity, + aggregation=enapter.http.api.telemetry.Aggregation(aggregation.upper()), ) return models.HistoricalTelemetry.from_domain(telemetry) From 0fa8ad45ff786773dca34027a47a33a06970a62d Mon Sep 17 00:00:00 2001 From: valera Date: Sun, 12 Apr 2026 22:35:50 +0400 Subject: [PATCH 09/14] test(telemetry): update MockEnapterAPI for aggregation parameter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the aggregation plumbing — the mock's get_historical_telemetry signature needed the new optional aggregation parameter to satisfy the EnapterAPI protocol under mypy. --- tests/unit/core/test_application_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/core/test_application_server.py b/tests/unit/core/test_application_server.py index 4992a04..bcb1941 100644 --- a/tests/unit/core/test_application_server.py +++ b/tests/unit/core/test_application_server.py @@ -102,6 +102,7 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, + aggregation: enapter.http.api.telemetry.Aggregation | None = None, ) -> domain.HistoricalTelemetry: if self._historical_telemetry is None: raise NotImplementedError() From 1c94f17d8552a6b58d2621db8295c0d736c4fae4 Mon Sep 17 00:00:00 2001 From: valera Date: Mon, 13 Apr 2026 14:57:18 +0400 Subject: [PATCH 10/14] refactor(telemetry): drop dead None default on aggregation parameter --- src/enapter_mcp_server/core/application_server.py | 4 ++-- src/enapter_mcp_server/core/enapter_api.py | 2 +- src/enapter_mcp_server/http/enapter_api.py | 2 +- tests/unit/core/test_application_server.py | 3 ++- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/enapter_mcp_server/core/application_server.py b/src/enapter_mcp_server/core/application_server.py index fa6587b..b46a6a8 100644 --- a/src/enapter_mcp_server/core/application_server.py +++ b/src/enapter_mcp_server/core/application_server.py @@ -231,7 +231,7 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, - aggregation: enapter.http.api.telemetry.Aggregation | None = None, + aggregation: enapter.http.api.telemetry.Aggregation, ) -> domain.HistoricalTelemetry: return await self._enapter_api.get_historical_telemetry( auth, @@ -240,7 +240,7 @@ async def get_historical_telemetry( time_from, time_to, granularity, - aggregation=aggregation, + aggregation, ) async def search_command_executions( diff --git a/src/enapter_mcp_server/core/enapter_api.py b/src/enapter_mcp_server/core/enapter_api.py index 3014152..c65dd52 100644 --- a/src/enapter_mcp_server/core/enapter_api.py +++ b/src/enapter_mcp_server/core/enapter_api.py @@ -53,5 +53,5 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, - aggregation: enapter.http.api.telemetry.Aggregation | None = None, + aggregation: enapter.http.api.telemetry.Aggregation, ) -> domain.HistoricalTelemetry: ... diff --git a/src/enapter_mcp_server/http/enapter_api.py b/src/enapter_mcp_server/http/enapter_api.py index c876449..2b75852 100644 --- a/src/enapter_mcp_server/http/enapter_api.py +++ b/src/enapter_mcp_server/http/enapter_api.py @@ -100,7 +100,7 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, - aggregation: enapter.http.api.telemetry.Aggregation | None = None, + aggregation: enapter.http.api.telemetry.Aggregation, ) -> domain.HistoricalTelemetry: async with self._new_client(auth) as client: telemetry = await client.telemetry.wide_timeseries( diff --git a/tests/unit/core/test_application_server.py b/tests/unit/core/test_application_server.py index bcb1941..27c0a82 100644 --- a/tests/unit/core/test_application_server.py +++ b/tests/unit/core/test_application_server.py @@ -102,7 +102,7 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int, - aggregation: enapter.http.api.telemetry.Aggregation | None = None, + aggregation: enapter.http.api.telemetry.Aggregation, ) -> domain.HistoricalTelemetry: if self._historical_telemetry is None: raise NotImplementedError() @@ -746,6 +746,7 @@ async def test_get_historical_telemetry(self) -> None: datetime.datetime.now(), datetime.datetime.now(), 60, + enapter.http.api.telemetry.Aggregation.AUTO, ) assert result == historical From ea498026fb077461a55e80b29520fb01f6562fa8 Mon Sep 17 00:00:00 2001 From: valera Date: Mon, 13 Apr 2026 15:02:05 +0400 Subject: [PATCH 11/14] feat(telemetry): replace TelemetryAggregation with HistoricalTelemetryAggregation and update imports --- src/enapter_mcp_server/mcp/models/__init__.py | 4 ++-- .../mcp/models/historical_telemetry_aggregation.py | 3 +++ src/enapter_mcp_server/mcp/models/telemetry_aggregation.py | 3 --- src/enapter_mcp_server/mcp/server.py | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) create mode 100644 src/enapter_mcp_server/mcp/models/historical_telemetry_aggregation.py delete mode 100644 src/enapter_mcp_server/mcp/models/telemetry_aggregation.py diff --git a/src/enapter_mcp_server/mcp/models/__init__.py b/src/enapter_mcp_server/mcp/models/__init__.py index 81f4858..e95979b 100644 --- a/src/enapter_mcp_server/mcp/models/__init__.py +++ b/src/enapter_mcp_server/mcp/models/__init__.py @@ -13,9 +13,9 @@ from .device_type import DeviceType from .device_view import DeviceView from .historical_telemetry import HistoricalTelemetry +from .historical_telemetry_aggregation import HistoricalTelemetryAggregation from .property_declaration import PropertyDeclaration from .site import Site -from .telemetry_aggregation import TelemetryAggregation from .telemetry_attribute_declaration import TelemetryAttributeDeclaration __all__ = [ @@ -34,8 +34,8 @@ "DeviceType", "DeviceView", "HistoricalTelemetry", + "HistoricalTelemetryAggregation", "PropertyDeclaration", "Site", - "TelemetryAggregation", "TelemetryAttributeDeclaration", ] diff --git a/src/enapter_mcp_server/mcp/models/historical_telemetry_aggregation.py b/src/enapter_mcp_server/mcp/models/historical_telemetry_aggregation.py new file mode 100644 index 0000000..0e4804f --- /dev/null +++ b/src/enapter_mcp_server/mcp/models/historical_telemetry_aggregation.py @@ -0,0 +1,3 @@ +from typing import Literal + +HistoricalTelemetryAggregation = Literal["auto", "avg", "min", "max", "last", "bool_or"] diff --git a/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py b/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py deleted file mode 100644 index 7f7aa36..0000000 --- a/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py +++ /dev/null @@ -1,3 +0,0 @@ -from typing import Literal - -TelemetryAggregation = Literal["auto", "avg", "min", "max", "last", "bool_or"] diff --git a/src/enapter_mcp_server/mcp/server.py b/src/enapter_mcp_server/mcp/server.py index 18922c3..06be4d9 100644 --- a/src/enapter_mcp_server/mcp/server.py +++ b/src/enapter_mcp_server/mcp/server.py @@ -303,7 +303,7 @@ async def get_historical_telemetry( time_from: datetime.datetime, time_to: datetime.datetime, granularity: int = 60 * 60, - aggregation: models.TelemetryAggregation = "auto", + aggregation: models.HistoricalTelemetryAggregation = "auto", ) -> models.HistoricalTelemetry: """Retrieve aggregated telemetry data. From 91c37feb79f17e682a33fe67742c73f24da7bd47 Mon Sep 17 00:00:00 2001 From: valera Date: Mon, 13 Apr 2026 18:02:10 +0400 Subject: [PATCH 12/14] feat(telemetry): implement get_historical_telemetry_stats method and update ApplicationServer to use it --- .../core/application_server.py | 56 ++-------------- src/enapter_mcp_server/http/enapter_api.py | 64 +++++++++++++++++++ tests/unit/core/test_application_server.py | 14 ++++ 3 files changed, 84 insertions(+), 50 deletions(-) diff --git a/src/enapter_mcp_server/core/application_server.py b/src/enapter_mcp_server/core/application_server.py index 3272c37..a836f60 100644 --- a/src/enapter_mcp_server/core/application_server.py +++ b/src/enapter_mcp_server/core/application_server.py @@ -1,9 +1,5 @@ -import asyncio import datetime import re -from typing import Any - -import enapter import enapter @@ -255,52 +251,12 @@ async def get_historical_telemetry_stats( time_from: datetime.datetime, time_to: datetime.datetime, ) -> domain.HistoricalTelemetryStats: - granularity = int((time_to - time_from).total_seconds()) - - async def query( - aggregation: enapter.http.api.telemetry.Aggregation, - ) -> domain.HistoricalTelemetry: - return await self._enapter_api.get_historical_telemetry( - auth, - device_id, - attributes, - time_from, - time_to, - granularity, - aggregation, - ) - - min_ts, max_ts, avg_ts, last_ts = await asyncio.gather( - query(enapter.http.api.telemetry.Aggregation.MIN), - query(enapter.http.api.telemetry.Aggregation.MAX), - query(enapter.http.api.telemetry.Aggregation.AVG), - query(enapter.http.api.telemetry.Aggregation.LAST), - ) - - # Platform API may return 1-2 points per attribute when PG time_bucket - # boundaries don't align with time_from; reduce locally to one scalar. - def reduce(ts: domain.HistoricalTelemetry, reducer: Any) -> dict[str, Any]: - out: dict[str, Any] = {} - for attr, values in ts.values.items(): - non_null = [v for v in values if v is not None] - out[attr] = reducer(non_null) if non_null else None - return out - - min_by_attr = reduce(min_ts, min) - max_by_attr = reduce(max_ts, max) - avg_by_attr = reduce(avg_ts, lambda xs: sum(xs) / len(xs)) - last_by_attr = reduce(last_ts, lambda xs: xs[-1]) - - return domain.HistoricalTelemetryStats( - values={ - attr: domain.HistoricalTelemetryAttributeStats( - min=min_by_attr.get(attr), - max=max_by_attr.get(attr), - avg=avg_by_attr.get(attr), - last=last_by_attr.get(attr), - ) - for attr in attributes - } + return await self._enapter_api.get_historical_telemetry_stats( + auth, + device_id, + attributes, + time_from, + time_to, ) async def search_command_executions( diff --git a/src/enapter_mcp_server/http/enapter_api.py b/src/enapter_mcp_server/http/enapter_api.py index 2b75852..f9b8dc1 100644 --- a/src/enapter_mcp_server/http/enapter_api.py +++ b/src/enapter_mcp_server/http/enapter_api.py @@ -117,6 +117,70 @@ async def get_historical_telemetry( ) return self._data_mapper.to_historical_telemetry(telemetry) + async def get_historical_telemetry_stats( + self, + auth: core.AuthConfig, + device_id: str, + attributes: list[str], + time_from: datetime.datetime, + time_to: datetime.datetime, + ) -> domain.HistoricalTelemetryStats: + granularity = int((time_to - time_from).total_seconds()) + aggregations = [ + enapter.http.api.telemetry.Aggregation.MIN, + enapter.http.api.telemetry.Aggregation.MAX, + enapter.http.api.telemetry.Aggregation.AVG, + enapter.http.api.telemetry.Aggregation.LAST, + ] + async with self._new_client(auth) as client: + telemetry = await client.telemetry.wide_timeseries( + from_=time_from, + to=time_to, + granularity=granularity, + selectors=[ + enapter.http.api.telemetry.Selector( + device=device_id, attributes=attributes, aggregation=a + ) + for a in aggregations + ], + ) + + buckets: dict[str, dict[str, list[Any]]] = { + a.value.lower(): {} for a in aggregations + } + for column in telemetry.columns: + buckets[column.labels["aggregation"]][column.labels.telemetry] = ( + column.values + ) + + # Platform API may return 1-2 points per attribute when PG time_bucket + # boundaries don't align with time_from; reduce to one scalar. + def reduce( + by_attr: dict[str, list[Any]], reducer: Any + ) -> dict[str, Any]: + out: dict[str, Any] = {} + for attr, values in by_attr.items(): + non_null = [v for v in values if v is not None] + out[attr] = reducer(non_null) if non_null else None + return out + + min_by_attr = reduce(buckets["min"], min) + max_by_attr = reduce(buckets["max"], max) + avg_by_attr = reduce(buckets["avg"], lambda xs: sum(xs) / len(xs)) + last_by_attr = reduce(buckets["last"], lambda xs: xs[-1]) + + return domain.HistoricalTelemetryStats( + values={ + attr: domain.HistoricalTelemetryAttributeStats( + min=min_by_attr.get(attr), + max=max_by_attr.get(attr), + avg=avg_by_attr.get(attr), + last=last_by_attr.get(attr), + ) + for attr in attributes + } + ) + @contextlib.asynccontextmanager async def _new_client( self, auth: core.AuthConfig diff --git a/tests/unit/core/test_application_server.py b/tests/unit/core/test_application_server.py index 27c0a82..1268454 100644 --- a/tests/unit/core/test_application_server.py +++ b/tests/unit/core/test_application_server.py @@ -32,6 +32,7 @@ def __init__( devices: list[core.DeviceDTO] | None = None, telemetry: dict[str, dict[str, Any]] | None = None, historical_telemetry: domain.HistoricalTelemetry | None = None, + historical_telemetry_stats: domain.HistoricalTelemetryStats | None = None, latest_telemetry_unavailable: bool = False, command_executions: dict[str, list[domain.CommandExecution]] | None = None, ): @@ -39,6 +40,7 @@ def __init__( self._devices = devices or [] self._telemetry = telemetry or {} self._historical_telemetry = historical_telemetry + self._historical_telemetry_stats = historical_telemetry_stats self._latest_telemetry_unavailable = latest_telemetry_unavailable self._command_executions = command_executions or {} self.latest_telemetry_batch_calls = 0 @@ -108,6 +110,18 @@ async def get_historical_telemetry( raise NotImplementedError() return self._historical_telemetry + async def get_historical_telemetry_stats( + self, + auth: core.AuthConfig, + device_id: str, + attributes: list[str], + time_from: datetime.datetime, + time_to: datetime.datetime, + ) -> domain.HistoricalTelemetryStats: + if self._historical_telemetry_stats is None: + raise NotImplementedError() + return self._historical_telemetry_stats + class TestApplicationServer: From 9846479f54946bf090b8ee736e7f02f59580e6ba Mon Sep 17 00:00:00 2001 From: valera Date: Mon, 13 Apr 2026 18:17:21 +0400 Subject: [PATCH 13/14] feat(telemetry): implement get_historical_telemetry_stats method and update ApplicationServer to use it --- src/enapter_mcp_server/core/enapter_api.py | 9 ++++ src/enapter_mcp_server/http/enapter_api.py | 57 +++++++++------------- 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/src/enapter_mcp_server/core/enapter_api.py b/src/enapter_mcp_server/core/enapter_api.py index c65dd52..494192e 100644 --- a/src/enapter_mcp_server/core/enapter_api.py +++ b/src/enapter_mcp_server/core/enapter_api.py @@ -55,3 +55,12 @@ async def get_historical_telemetry( granularity: int, aggregation: enapter.http.api.telemetry.Aggregation, ) -> domain.HistoricalTelemetry: ... + + async def get_historical_telemetry_stats( + self, + auth: AuthConfig, + device_id: str, + attributes: list[str], + time_from: datetime.datetime, + time_to: datetime.datetime, + ) -> domain.HistoricalTelemetryStats: ... diff --git a/src/enapter_mcp_server/http/enapter_api.py b/src/enapter_mcp_server/http/enapter_api.py index f9b8dc1..fe5e0d6 100644 --- a/src/enapter_mcp_server/http/enapter_api.py +++ b/src/enapter_mcp_server/http/enapter_api.py @@ -1,6 +1,6 @@ import contextlib import datetime -from typing import Any, AsyncGenerator, Self +from typing import Any, AsyncGenerator, Callable, Self import enapter @@ -125,13 +125,16 @@ async def get_historical_telemetry_stats( time_from: datetime.datetime, time_to: datetime.datetime, ) -> domain.HistoricalTelemetryStats: + # Platform API may return 1-2 points per attribute when PG time_bucket + # boundaries don't align with time_from; the reducer collapses them + # into a single scalar consistent with the requested aggregation. + reducers: dict[str, Callable[[list[Any]], Any]] = { + "min": min, + "max": max, + "avg": lambda xs: sum(xs) / len(xs), + "last": lambda xs: xs[-1], + } granularity = int((time_to - time_from).total_seconds()) - aggregations = [ - enapter.http.api.telemetry.Aggregation.MIN, - enapter.http.api.telemetry.Aggregation.MAX, - enapter.http.api.telemetry.Aggregation.AVG, - enapter.http.api.telemetry.Aggregation.LAST, - ] async with self._new_client(auth) as client: telemetry = await client.telemetry.wide_timeseries( from_=time_from, @@ -139,43 +142,29 @@ async def get_historical_telemetry_stats( granularity=granularity, selectors=[ enapter.http.api.telemetry.Selector( - device=device_id, attributes=attributes, aggregation=a + device=device_id, + attributes=attributes, + aggregation=enapter.http.api.telemetry.Aggregation(agg.upper()), ) - for a in aggregations + for agg in reducers ], ) - buckets: dict[str, dict[str, list[Any]]] = { - a.value.lower(): {} for a in aggregations - } + scalars: dict[str, dict[str, Any]] = {agg: {} for agg in reducers} for column in telemetry.columns: - buckets[column.labels["aggregation"]][column.labels.telemetry] = ( - column.values + agg = column.labels["aggregation"] + non_null = [v for v in column.values if v is not None] + scalars[agg][column.labels.telemetry] = ( + reducers[agg](non_null) if non_null else None ) - # Platform API may return 1-2 points per attribute when PG time_bucket - # boundaries don't align with time_from; reduce to one scalar. - def reduce( - by_attr: dict[str, list[Any]], reducer: Any - ) -> dict[str, Any]: - out: dict[str, Any] = {} - for attr, values in by_attr.items(): - non_null = [v for v in values if v is not None] - out[attr] = reducer(non_null) if non_null else None - return out - - min_by_attr = reduce(buckets["min"], min) - max_by_attr = reduce(buckets["max"], max) - avg_by_attr = reduce(buckets["avg"], lambda xs: sum(xs) / len(xs)) - last_by_attr = reduce(buckets["last"], lambda xs: xs[-1]) - return domain.HistoricalTelemetryStats( values={ attr: domain.HistoricalTelemetryAttributeStats( - min=min_by_attr.get(attr), - max=max_by_attr.get(attr), - avg=avg_by_attr.get(attr), - last=last_by_attr.get(attr), + min=scalars["min"].get(attr), + max=scalars["max"].get(attr), + avg=scalars["avg"].get(attr), + last=scalars["last"].get(attr), ) for attr in attributes } From 7fa2b559986d778e04917d51b6ad79cf49571bb3 Mon Sep 17 00:00:00 2001 From: valera Date: Mon, 13 Apr 2026 18:17:34 +0400 Subject: [PATCH 14/14] refactor(telemetry): remove unused TelemetryAggregation --- src/enapter_mcp_server/mcp/models/__init__.py | 3 +-- src/enapter_mcp_server/mcp/models/telemetry_aggregation.py | 3 --- 2 files changed, 1 insertion(+), 5 deletions(-) delete mode 100644 src/enapter_mcp_server/mcp/models/telemetry_aggregation.py diff --git a/src/enapter_mcp_server/mcp/models/__init__.py b/src/enapter_mcp_server/mcp/models/__init__.py index 2f98016..369a0c3 100644 --- a/src/enapter_mcp_server/mcp/models/__init__.py +++ b/src/enapter_mcp_server/mcp/models/__init__.py @@ -13,14 +13,13 @@ from .device_type import DeviceType from .device_view import DeviceView from .historical_telemetry import HistoricalTelemetry +from .historical_telemetry_aggregation import HistoricalTelemetryAggregation from .historical_telemetry_stats import ( HistoricalTelemetryAttributeStats, HistoricalTelemetryStats, ) -from .historical_telemetry_aggregation import HistoricalTelemetryAggregation from .property_declaration import PropertyDeclaration from .site import Site -from .telemetry_aggregation import TelemetryAggregation from .telemetry_attribute_declaration import TelemetryAttributeDeclaration __all__ = [ diff --git a/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py b/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py deleted file mode 100644 index 7f7aa36..0000000 --- a/src/enapter_mcp_server/mcp/models/telemetry_aggregation.py +++ /dev/null @@ -1,3 +0,0 @@ -from typing import Literal - -TelemetryAggregation = Literal["auto", "avg", "min", "max", "last", "bool_or"]