Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/enapter_mcp_server/core/application_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import datetime
import re

import enapter

from enapter_mcp_server import domain

from .auth_config import AuthConfig
Expand Down Expand Up @@ -229,9 +231,32 @@ async def get_historical_telemetry(
time_from: datetime.datetime,
time_to: datetime.datetime,
granularity: int,
aggregation: enapter.http.api.telemetry.Aggregation,
) -> domain.HistoricalTelemetry:
return await self._enapter_api.get_historical_telemetry(
auth, device_id, attributes, time_from, time_to, granularity
auth,
device_id,
attributes,
time_from,
time_to,
granularity,
aggregation,
)

async def get_historical_telemetry_stats(
self,
auth: AuthConfig,
device_id: str,
attributes: list[str],
time_from: datetime.datetime,
time_to: datetime.datetime,
) -> domain.HistoricalTelemetryStats:
return await self._enapter_api.get_historical_telemetry_stats(
auth,
device_id,
attributes,
time_from,
time_to,
)

async def search_command_executions(
Expand Down
10 changes: 10 additions & 0 deletions src/enapter_mcp_server/core/enapter_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,14 @@ async def get_historical_telemetry(
time_from: datetime.datetime,
time_to: datetime.datetime,
granularity: int,
aggregation: enapter.http.api.telemetry.Aggregation,
) -> domain.HistoricalTelemetry: ...

async def get_historical_telemetry_stats(
self,
auth: AuthConfig,
device_id: str,
attributes: list[str],
time_from: datetime.datetime,
time_to: datetime.datetime,
) -> domain.HistoricalTelemetryStats: ...
6 changes: 6 additions & 0 deletions src/enapter_mcp_server/domain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
from .device_type import DeviceType
from .device_view import DeviceView
from .historical_telemetry import HistoricalTelemetry
from .historical_telemetry_stats import (
HistoricalTelemetryAttributeStats,
HistoricalTelemetryStats,
)
from .property_declaration import PropertyDeclaration
from .site import Site
from .telemetry_attribute_declaration import TelemetryAttributeDeclaration
Expand All @@ -35,6 +39,8 @@
"DeviceType",
"DeviceView",
"HistoricalTelemetry",
"HistoricalTelemetryAttributeStats",
"HistoricalTelemetryStats",
"PropertyDeclaration",
"Site",
"TelemetryAttributeDeclaration",
Expand Down
15 changes: 15 additions & 0 deletions src/enapter_mcp_server/domain/historical_telemetry_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import dataclasses
from typing import Any


@dataclasses.dataclass(frozen=True, kw_only=True)
class HistoricalTelemetryAttributeStats:
min: Any
max: Any
avg: Any
last: Any


@dataclasses.dataclass(frozen=True, kw_only=True)
class HistoricalTelemetryStats:
values: dict[str, HistoricalTelemetryAttributeStats]
60 changes: 58 additions & 2 deletions src/enapter_mcp_server/http/enapter_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import contextlib
import datetime
from typing import Any, AsyncGenerator, Self
from typing import Any, AsyncGenerator, Callable, Self

import enapter

Expand Down Expand Up @@ -100,6 +100,7 @@ async def get_historical_telemetry(
time_from: datetime.datetime,
time_to: datetime.datetime,
granularity: int,
aggregation: enapter.http.api.telemetry.Aggregation,
) -> domain.HistoricalTelemetry:
async with self._new_client(auth) as client:
telemetry = await client.telemetry.wide_timeseries(
Expand All @@ -108,12 +109,67 @@ async def get_historical_telemetry(
granularity=granularity,
selectors=[
enapter.http.api.telemetry.Selector(
device=device_id, attributes=attributes
device=device_id,
attributes=attributes,
aggregation=aggregation,
)
],
)
return self._data_mapper.to_historical_telemetry(telemetry)

async def get_historical_telemetry_stats(
self,
auth: core.AuthConfig,
device_id: str,
attributes: list[str],
time_from: datetime.datetime,
time_to: datetime.datetime,
) -> domain.HistoricalTelemetryStats:
# Platform API may return 1-2 points per attribute when PG time_bucket
# boundaries don't align with time_from; the reducer collapses them
# into a single scalar consistent with the requested aggregation.
reducers: dict[str, Callable[[list[Any]], Any]] = {
"min": min,
"max": max,
"avg": lambda xs: sum(xs) / len(xs),
"last": lambda xs: xs[-1],
}
granularity = int((time_to - time_from).total_seconds())
async with self._new_client(auth) as client:
telemetry = await client.telemetry.wide_timeseries(
from_=time_from,
to=time_to,
granularity=granularity,
selectors=[
enapter.http.api.telemetry.Selector(
device=device_id,
attributes=attributes,
aggregation=enapter.http.api.telemetry.Aggregation(agg.upper()),
)
for agg in reducers
],
)

scalars: dict[str, dict[str, Any]] = {agg: {} for agg in reducers}
for column in telemetry.columns:
agg = column.labels["aggregation"]
non_null = [v for v in column.values if v is not None]
scalars[agg][column.labels.telemetry] = (
reducers[agg](non_null) if non_null else None
)

return domain.HistoricalTelemetryStats(
values={
attr: domain.HistoricalTelemetryAttributeStats(
min=scalars["min"].get(attr),
max=scalars["max"].get(attr),
avg=scalars["avg"].get(attr),
last=scalars["last"].get(attr),
)
for attr in attributes
}
)

@contextlib.asynccontextmanager
async def _new_client(
self, auth: core.AuthConfig
Expand Down
8 changes: 8 additions & 0 deletions src/enapter_mcp_server/mcp/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
from .device_type import DeviceType
from .device_view import DeviceView
from .historical_telemetry import HistoricalTelemetry
from .historical_telemetry_aggregation import HistoricalTelemetryAggregation
from .historical_telemetry_stats import (
HistoricalTelemetryAttributeStats,
HistoricalTelemetryStats,
)
from .property_declaration import PropertyDeclaration
from .site import Site
from .telemetry_attribute_declaration import TelemetryAttributeDeclaration
Expand All @@ -33,6 +38,9 @@
"DeviceType",
"DeviceView",
"HistoricalTelemetry",
"HistoricalTelemetryAttributeStats",
"HistoricalTelemetryStats",
"HistoricalTelemetryAggregation",
"PropertyDeclaration",
"Site",
"TelemetryAttributeDeclaration",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from typing import Literal

HistoricalTelemetryAggregation = Literal["auto", "avg", "min", "max", "last", "bool_or"]
44 changes: 44 additions & 0 deletions src/enapter_mcp_server/mcp/models/historical_telemetry_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Any, Self

import pydantic

from enapter_mcp_server import domain


class HistoricalTelemetryAttributeStats(pydantic.BaseModel):
"""Per-attribute min/max/avg/last over a time period."""

min: Any
max: Any
avg: Any
last: Any

@classmethod
def from_domain(cls, stats: domain.HistoricalTelemetryAttributeStats) -> Self:
return cls(
min=stats.min,
max=stats.max,
avg=stats.avg,
last=stats.last,
)


class HistoricalTelemetryStats(pydantic.BaseModel):
"""Per-attribute min/max/avg/last over a time period.

Unlike `get_historical_telemetry` (which returns a time series of
bucket-level aggregates), these values are computed over the raw
datapoints across the entire period — so short dropouts or spikes
are preserved and not smoothed away.
"""

values: dict[str, HistoricalTelemetryAttributeStats]

@classmethod
def from_domain(cls, stats: domain.HistoricalTelemetryStats) -> Self:
return cls(
values={
k: HistoricalTelemetryAttributeStats.from_domain(v)
for k, v in stats.values.items()
}
)
39 changes: 38 additions & 1 deletion src/enapter_mcp_server/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def _register_tools(self, fastmcp_server: fastmcp.FastMCP) -> None:
(self.search_command_executions, "Search Command Executions"),
(self.read_blueprint, "Read Blueprint"),
(self.get_historical_telemetry, "Get Historical Telemetry"),
(self.get_historical_telemetry_stats, "Get Historical Telemetry Stats"),
]
for tool, title in read_only_tools:
fastmcp_server.tool(
Expand Down Expand Up @@ -303,14 +304,21 @@ async def get_historical_telemetry(
time_from: datetime.datetime,
time_to: datetime.datetime,
granularity: int = 60 * 60,
aggregation: models.HistoricalTelemetryAggregation = "auto",
) -> models.HistoricalTelemetry:
"""Retrieve aggregated telemetry data.
"""Retrieve telemetry aggregated into time buckets over a time period.

Most devices send telemetry data once per second. To reduce the amount
of data transferred, the `granularity` parameter can be used to
aggregate data over a specified interval (in seconds). For example, a
granularity of 3600 seconds (1 hour) will return hourly averages of the
telemetry data.

`aggregation` picks the reducer per bucket: `auto` adapts per attribute
type; override with `avg`/`min`/`max` (numeric), `bool_or` (boolean),
or `last` (any).

For a single set of raw min/max/avg/last, use `get_historical_telemetry_stats`.
"""
auth = await self._get_auth_config()
telemetry = await self._app.get_historical_telemetry(
Expand All @@ -320,9 +328,38 @@ async def get_historical_telemetry(
time_from=time_from,
time_to=time_to,
granularity=granularity,
aggregation=enapter.http.api.telemetry.Aggregation(aggregation.upper()),
)
return models.HistoricalTelemetry.from_domain(telemetry)

async def get_historical_telemetry_stats(
self,
device_id: str,
attributes: list[str],
time_from: datetime.datetime,
time_to: datetime.datetime,
) -> models.HistoricalTelemetryStats:
"""Retrieve per-attribute min/max/avg/last over a time period.

Computed over raw datapoints — brief dropouts and spikes are preserved,
unlike the bucket averages in `get_historical_telemetry`.

Numeric attributes only.

`min`/`max`/`last` are exact for any period. `avg` is exact under 8h;
typical drift <1% for 8h–30 days, up to several % for longer periods
(read from pre-aggregated data).
"""
auth = await self._get_auth_config()
stats = await self._app.get_historical_telemetry_stats(
auth=auth,
device_id=device_id,
attributes=attributes,
time_from=time_from,
time_to=time_to,
)
return models.HistoricalTelemetryStats.from_domain(stats)

async def _get_auth_config(self) -> core.AuthConfig:
if self._config.oauth_proxy is None:
headers = fastmcp.server.dependencies.get_http_headers()
Expand Down
16 changes: 16 additions & 0 deletions tests/unit/core/test_application_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ def __init__(
devices: list[core.DeviceDTO] | None = None,
telemetry: dict[str, dict[str, Any]] | None = None,
historical_telemetry: domain.HistoricalTelemetry | None = None,
historical_telemetry_stats: domain.HistoricalTelemetryStats | None = None,
latest_telemetry_unavailable: bool = False,
command_executions: dict[str, list[domain.CommandExecution]] | None = None,
):
self._sites = sites or []
self._devices = devices or []
self._telemetry = telemetry or {}
self._historical_telemetry = historical_telemetry
self._historical_telemetry_stats = historical_telemetry_stats
self._latest_telemetry_unavailable = latest_telemetry_unavailable
self._command_executions = command_executions or {}
self.latest_telemetry_batch_calls = 0
Expand Down Expand Up @@ -102,11 +104,24 @@ async def get_historical_telemetry(
time_from: datetime.datetime,
time_to: datetime.datetime,
granularity: int,
aggregation: enapter.http.api.telemetry.Aggregation,
) -> domain.HistoricalTelemetry:
if self._historical_telemetry is None:
raise NotImplementedError()
return self._historical_telemetry

async def get_historical_telemetry_stats(
self,
auth: core.AuthConfig,
device_id: str,
attributes: list[str],
time_from: datetime.datetime,
time_to: datetime.datetime,
) -> domain.HistoricalTelemetryStats:
if self._historical_telemetry_stats is None:
raise NotImplementedError()
return self._historical_telemetry_stats


class TestApplicationServer:

Expand Down Expand Up @@ -745,6 +760,7 @@ async def test_get_historical_telemetry(self) -> None:
datetime.datetime.now(),
datetime.datetime.now(),
60,
enapter.http.api.telemetry.Aggregation.AUTO,
)

assert result == historical
Expand Down
Loading