From 9875fec017a571c143b980bf32198bdbc6844c78 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 19 Apr 2026 23:15:43 +0000 Subject: [PATCH 1/2] Add Enphase IQ Gateway (Envoy) powermeter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-implementation of the stalled #245. Polls the Envoy's local HTTPS /production.json?details=1 and reports net-consumption as grid power, auto-detecting single- vs three-phase from the response. Optional Enlighten-cloud credentials seed and refresh the JWT on 401; static tokens are accepted as well. Uses aiohttp with per-instance SSL context attached via TCPConnector. VERIFY_SSL defaults to False (Envoy ships a self-signed cert with no public CA bundle) and only affects the local Envoy session — Enlighten cloud requests use a separate session with default system TLS so the user's password is never sent over an unverified connection. https://claude.ai/code/session_01NiEwZMNZm9batkSCQCaRaz --- CHANGELOG.md | 1 + README.md | 25 +++ config.ini.example | 16 ++ src/astrameter/config/config_loader.py | 17 ++ src/astrameter/powermeter/__init__.py | 2 + src/astrameter/powermeter/envoy.py | 193 ++++++++++++++++++++ src/astrameter/powermeter/envoy_test.py | 224 ++++++++++++++++++++++++ 7 files changed, 478 insertions(+) create mode 100644 src/astrameter/powermeter/envoy.py create mode 100644 src/astrameter/powermeter/envoy_test.py diff --git a/CHANGELOG.md b/CHANGELOG.md index f8ea1f26..59216c28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - **Added** MQTT Insights: optional `[MQTT_INSIGHTS]` section publishes internal state (grid power, targets, saturation, consumer topology, EMA poll interval) to MQTT with Home Assistant Device Discovery; per-consumer active/pause + manual target control; Shelly battery offline availability; auto-configured in the HA app when Mosquitto is installed ([#292](https://github.com/tomquist/astrameter/pull/292), [#294](https://github.com/tomquist/astrameter/pull/294), [#297](https://github.com/tomquist/astrameter/pull/297), [#300](https://github.com/tomquist/astrameter/pull/300), [#306](https://github.com/tomquist/astrameter/pull/306)). - **Added** opt-in web-based configuration editor (`WEB_CONFIG_ENABLED = True` in `[GENERAL]`) accessible at `http://:52500/config`; supports editing all config sections and keys with type-appropriate inputs, comment preservation, and a Save & Restart button ([#319](https://github.com/tomquist/astrameter/pull/319)). - **Added** HomeWizard P1 powermeter via the device WebSocket API, with optional `VERIFY_SSL` ([#231](https://github.com/tomquist/astrameter/pull/231), [#254](https://github.com/tomquist/astrameter/pull/254)). +- **Added** Enphase IQ Gateway (Envoy) powermeter via the local HTTPS `production.json` API, with optional Enlighten-cloud token acquisition and automatic refresh on 401, and auto-detection of single- vs three-phase readings ([#245](https://github.com/tomquist/astrameter/pull/245)). - **Added** SMA Energy Meter / Sunny Home Manager support via Speedwire multicast with device auto-detection and per-phase readings ([#252](https://github.com/tomquist/astrameter/pull/252)). - **Added** SML powermeter for smart meters over a local serial port (IR head), with optional per-phase OBIS overrides ([#229](https://github.com/tomquist/astrameter/pull/229)). - **Added** multi-phase support to the MQTT powermeter via `TOPICS` / `JSON_PATHS` ([#280](https://github.com/tomquist/astrameter/pull/280), [issue #136](https://github.com/tomquist/b2500-meter/issues/136)). diff --git a/README.md b/README.md index 568a459b..c6840e64 100644 --- a/README.md +++ b/README.md @@ -740,6 +740,31 @@ SERIAL = your_device_serial # THROTTLE_INTERVAL = 0 ``` +### Enphase Envoy (IQ Gateway) + +Reads grid power from an [Enphase IQ Gateway / Envoy](https://enphase.com/installers/microinverters/iq-gateway) over the local HTTPS API (`/production.json?details=1`). The reading comes from the `net-consumption` measurement (positive = grid import, negative = export). Per-phase readings are reported automatically when the gateway exposes them; otherwise the aggregate single-phase value is used. Requires consumption CTs installed on the Envoy. + +```ini +[ENVOY] +HOST = 192.168.1.120 +# Option A: pre-obtained long-lived JWT (recommended) +TOKEN = eyJ... +# Option B: let AstraMeter fetch and refresh tokens via the Enphase Enlighten cloud +# USERNAME = you@example.com +# PASSWORD = your-enphase-password +# SERIAL = 123456789012 +# Envoy ships a self-signed certificate; verification is disabled by default. +# VERIFY_SSL = False +``` + +**Token acquisition.** Generate a long-lived (~1 year) static token at . Alternatively, configure `USERNAME`/`PASSWORD`/`SERIAL` and AstraMeter will fetch a token on first use and refresh it automatically when the Envoy returns 401. + +**TLS.** `VERIFY_SSL` defaults to `False` because Enphase does not publish a CA bundle for the IQ Gateway's self-signed certificate. This option **only affects the local Envoy connection** — Enphase Enlighten cloud requests (login and token endpoints) always verify TLS using the system trust store, regardless of this setting. + +**MFA.** The auto-fetch flow does not support Enlighten accounts with multi-factor authentication enabled. Those users must supply a static `TOKEN`. + +**CT direction.** If your readings have the wrong sign (export shows as import or vice versa), one or more CTs are mounted backwards. Flip them in software with the global `POWER_MULTIPLIER = -1` (or per-phase, e.g. `POWER_MULTIPLIER = 1, -1, 1`). + ### SMA Energy Meter Reads an [SMA Energy Meter](https://www.sma.de/) (EM 1.0/2.0) or Sunny Home Manager via the **Speedwire** multicast protocol (UDP). The listener joins the default multicast group and reports per-phase active power (L1, L2, L3). Use `SERIAL_NUMBER = 0` to auto-detect the first meter seen on the network, or set the device serial to pin a specific unit. Like other UDP-based features, this requires the host to receive multicast traffic (use Docker host networking or equivalent). diff --git a/config.ini.example b/config.ini.example index 41595480..fd613ac9 100644 --- a/config.ini.example +++ b/config.ini.example @@ -303,6 +303,22 @@ THROTTLE_INTERVAL = 0 ## Per-powermeter throttling override (optional) #THROTTLE_INTERVAL = 0 +#[ENVOY] +## Enphase IQ Gateway (Envoy) via local HTTPS API +## Reads grid power from /production.json?details=1 (net-consumption). +## Auto-detects single- vs three-phase from the response. +#HOST = 192.168.1.120 +## Option A: long-lived JWT token from https://entrez.enphaseenergy.com/ +#TOKEN = eyJ... +## Option B: let AstraMeter obtain a token via the Enphase Enlighten cloud +## (auto-refreshes on 401; not compatible with MFA-enabled Enlighten accounts) +#USERNAME = you@example.com +#PASSWORD = your-enphase-password +#SERIAL = 123456789012 +## Envoy uses a self-signed certificate; verification is disabled by default. +## Affects only the local Envoy connection — cloud requests always verify TLS. +#VERIFY_SSL = False + #[SMA_ENERGY_METER] ## SMA Energy Meter / Sunny Home Manager via Speedwire multicast ## Listens for UDP multicast broadcasts and reports per-phase power (L1, L2, L3) diff --git a/src/astrameter/config/config_loader.py b/src/astrameter/config/config_loader.py index f36bdfb9..895d8302 100644 --- a/src/astrameter/config/config_loader.py +++ b/src/astrameter/config/config_loader.py @@ -13,6 +13,7 @@ from astrameter.powermeter import ( AmisReader, Emlog, + Envoy, ESPHome, HomeAssistant, HomeWizardPowermeter, @@ -58,6 +59,7 @@ JSON_HTTP_SECTION = "JSON_HTTP" TQ_EM_SECTION = "TQ_EM" HOMEWIZARD_SECTION = "HOMEWIZARD" +ENVOY_SECTION = "ENVOY" SMA_ENERGY_METER_SECTION = "SMA_ENERGY_METER" MQTT_INSIGHTS_SECTION = "MQTT_INSIGHTS" @@ -379,6 +381,8 @@ def create_powermeter( return create_json_http_powermeter(section, config) elif section.startswith(HOMEWIZARD_SECTION): return create_homewizard_powermeter(section, config) + elif section.startswith(ENVOY_SECTION): + return create_envoy_powermeter(section, config) elif section.startswith(SMA_ENERGY_METER_SECTION): return create_sma_energy_meter_powermeter(section, config) elif section.startswith("MQTT") and not section.startswith(MQTT_INSIGHTS_SECTION): @@ -661,6 +665,19 @@ def create_homewizard_powermeter( ) +def create_envoy_powermeter( + section: str, config: configparser.ConfigParser +) -> Powermeter: + return Envoy( + host=config.get(section, "HOST", fallback=""), + token=config.get(section, "TOKEN", fallback=""), + username=config.get(section, "USERNAME", fallback=""), + password=config.get(section, "PASSWORD", fallback=""), + serial=config.get(section, "SERIAL", fallback=""), + verify_ssl=config.getboolean(section, "VERIFY_SSL", fallback=False), + ) + + def create_sma_energy_meter_powermeter( section: str, config: configparser.ConfigParser ) -> Powermeter: diff --git a/src/astrameter/powermeter/__init__.py b/src/astrameter/powermeter/__init__.py index d4ebf13a..cd531bbf 100644 --- a/src/astrameter/powermeter/__init__.py +++ b/src/astrameter/powermeter/__init__.py @@ -1,6 +1,7 @@ from .amisreader import AmisReader from .base import Powermeter from .emlog import Emlog +from .envoy import Envoy from .esphome import ESPHome from .homeassistant import HomeAssistant from .homewizard import HomeWizardPowermeter @@ -31,6 +32,7 @@ "DeadbandPowermeter", "ESPHome", "Emlog", + "Envoy", "HampelPowermeter", "HomeAssistant", "HomeWizardPowermeter", diff --git a/src/astrameter/powermeter/envoy.py b/src/astrameter/powermeter/envoy.py new file mode 100644 index 00000000..60daf78b --- /dev/null +++ b/src/astrameter/powermeter/envoy.py @@ -0,0 +1,193 @@ +from __future__ import annotations + +import asyncio +import logging +import ssl +from typing import Any + +import aiohttp +from aiohttp import ClientResponseError, ClientTimeout, TCPConnector + +from .base import Powermeter + +logger = logging.getLogger("astrameter") + +ENLIGHTEN_LOGIN_URL = "https://enlighten.enphaseenergy.com/login/login.json" +ENTREZ_TOKEN_URL = "https://entrez.enphaseenergy.com/tokens" +DEFAULT_TIMEOUT_SECONDS = 10.0 + + +def _build_ssl_context(verify_ssl: bool) -> ssl.SSLContext: + ctx = ssl.create_default_context() + if not verify_ssl: + # Order matters: verify_mode=CERT_NONE requires check_hostname=False first. + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return ctx + + +async def _obtain_token( + cloud_session: aiohttp.ClientSession, + username: str, + password: str, + serial: str, +) -> str: + async with cloud_session.post( + ENLIGHTEN_LOGIN_URL, + data={"user[email]": username, "user[password]": password}, + ) as resp: + resp.raise_for_status() + login_payload = await resp.json(content_type=None) + session_id = ( + login_payload.get("session_id") if isinstance(login_payload, dict) else None + ) + if not session_id: + message = ( + login_payload.get("message", "unknown") + if isinstance(login_payload, dict) + else "unknown" + ) + raise ValueError( + f"Envoy: Enlighten login response missing session_id (message: {message})" + ) + + async with cloud_session.post( + ENTREZ_TOKEN_URL, + json={ + "session_id": session_id, + "serial_num": serial, + "username": username, + }, + ) as resp: + resp.raise_for_status() + token = (await resp.text()).strip() + + if not token.startswith("eyJ") or token.count(".") != 2: + raise ValueError( + f"Envoy: entrez token endpoint did not return a JWT (body: {token[:200]!r})" + ) + + logger.info("Envoy: obtained new JWT token from Enlighten cloud") + return token + + +class Envoy(Powermeter): + def __init__( + self, + host: str, + token: str = "", + username: str = "", + password: str = "", + serial: str = "", + verify_ssl: bool = False, + ) -> None: + if not host: + raise ValueError("Envoy: HOST is required") + has_credentials = bool(username and password and serial) + if not token and not has_credentials: + raise ValueError("Envoy: provide either TOKEN or USERNAME/PASSWORD/SERIAL") + + self.host = host + self._username = username + self._password = password + self._serial = serial + self._has_credentials = has_credentials + self._verify_ssl = verify_ssl + self._ssl_context = _build_ssl_context(verify_ssl) + self._token = token + self._token_lock = asyncio.Lock() + self._session: aiohttp.ClientSession | None = None + self._cloud_session: aiohttp.ClientSession | None = None + + if not verify_ssl: + logger.warning( + "Envoy: TLS certificate verification is disabled for the local " + "Envoy (VERIFY_SSL=False); use only on a trusted LAN. Enphase " + "Enlighten cloud requests are unaffected and always use system TLS." + ) + + async def start(self) -> None: + if self._session is not None: + return + timeout = ClientTimeout(total=DEFAULT_TIMEOUT_SECONDS) + self._session = aiohttp.ClientSession( + connector=TCPConnector(ssl=self._ssl_context), + timeout=timeout, + ) + # Separate session for the Enphase cloud: always uses default system TLS, + # never weakened by VERIFY_SSL=False on the local Envoy. + self._cloud_session = aiohttp.ClientSession(timeout=timeout) + + async def stop(self) -> None: + if self._session is not None: + await self._session.close() + self._session = None + if self._cloud_session is not None: + await self._cloud_session.close() + self._cloud_session = None + + async def _ensure_token(self) -> None: + if self._token: + return + async with self._token_lock: + if self._token: + return + assert self._cloud_session is not None + self._token = await _obtain_token( + self._cloud_session, self._username, self._password, self._serial + ) + + async def _refresh_token(self) -> None: + async with self._token_lock: + assert self._cloud_session is not None + self._token = await _obtain_token( + self._cloud_session, self._username, self._password, self._serial + ) + + async def _get_production(self) -> dict[str, Any]: + assert self._session is not None + url = f"https://{self.host}/production.json?details=1" + headers = {"Authorization": f"Bearer {self._token}"} + async with self._session.get(url, headers=headers) as resp: + resp.raise_for_status() + data = await resp.json(content_type=None) + return data if isinstance(data, dict) else {} + + async def _fetch_production(self) -> dict[str, Any]: + await self._ensure_token() + try: + return await self._get_production() + except ClientResponseError as e: + if e.status != 401 or not self._has_credentials: + raise + logger.info("Envoy: token rejected (401), refreshing") + await self._refresh_token() + return await self._get_production() + + async def get_powermeter_watts(self) -> list[float]: + data = await self._fetch_production() + consumption = data.get("consumption") + if not isinstance(consumption, list): + raise ValueError( + "Envoy: production.json missing 'consumption' array; " + "consumption CTs are required" + ) + + entry = next( + ( + c + for c in consumption + if isinstance(c, dict) and c.get("measurementType") == "net-consumption" + ), + None, + ) + if entry is None: + raise ValueError( + "Envoy: response does not expose 'net-consumption'; " + "consumption CTs are required" + ) + + lines = entry.get("lines") + if isinstance(lines, list) and lines: + return [float(line["wNow"]) for line in lines[:3]] + return [float(entry["wNow"])] diff --git a/src/astrameter/powermeter/envoy_test.py b/src/astrameter/powermeter/envoy_test.py new file mode 100644 index 00000000..2a24c714 --- /dev/null +++ b/src/astrameter/powermeter/envoy_test.py @@ -0,0 +1,224 @@ +import ssl +from unittest.mock import AsyncMock, MagicMock + +import aiohttp +import pytest + +from astrameter.powermeter import envoy as envoy_module +from astrameter.powermeter.envoy import Envoy + +SAMPLE_LINES_RESPONSE = { + "consumption": [ + { + "measurementType": "total-consumption", + "wNow": 1200.5, + "lines": [{"wNow": 400.0}, {"wNow": 350.0}, {"wNow": 450.5}], + }, + { + "measurementType": "net-consumption", + "wNow": -300.0, + "lines": [{"wNow": -100.0}, {"wNow": -80.0}, {"wNow": -120.0}], + }, + ], +} + + +def _mock_response(json_data: dict | None = None, *, raise_status: int | None = None): + response = AsyncMock() + response.json = AsyncMock(return_value=json_data or {}) + if raise_status is not None: + response.raise_for_status = MagicMock( + side_effect=aiohttp.ClientResponseError( + request_info=MagicMock(), + history=(), + status=raise_status, + message="error", + ) + ) + else: + response.raise_for_status = MagicMock() + return response + + +def _ctx(response) -> MagicMock: + ctx = MagicMock() + ctx.__aenter__ = AsyncMock(return_value=response) + ctx.__aexit__ = AsyncMock(return_value=False) + return ctx + + +def _mock_session(json_data: dict) -> MagicMock: + session = MagicMock() + session.get.return_value = _ctx(_mock_response(json_data)) + session.close = AsyncMock() + return session + + +def _mock_session_sequence(responses: list[MagicMock]) -> MagicMock: + session = MagicMock() + session.get.side_effect = [_ctx(r) for r in responses] + session.close = AsyncMock() + return session + + +def _make_envoy(**overrides: object) -> Envoy: + defaults: dict[str, object] = {"host": "192.168.1.200", "token": "test-token"} + defaults.update(overrides) + return Envoy(**defaults) # type: ignore[arg-type] + + +# 1. Single-phase response (no `lines`) -> [wNow_as_float]; assert element type is float. +async def test_single_phase_no_lines() -> None: + envoy = _make_envoy() + envoy._session = _mock_session( + {"consumption": [{"measurementType": "net-consumption", "wNow": 1234.5}]} + ) + result = await envoy.get_powermeter_watts() + assert result == [1234.5] + assert all(isinstance(v, float) for v in result) + + +# 2. Single-phase fallback when lines: [] -> uses aggregate wNow. +async def test_single_phase_empty_lines() -> None: + envoy = _make_envoy() + envoy._session = _mock_session( + { + "consumption": [ + {"measurementType": "net-consumption", "wNow": 999.0, "lines": []} + ] + } + ) + assert await envoy.get_powermeter_watts() == [999.0] + + +# 3. Three-phase with three lines -> [float, float, float]. +async def test_three_phase() -> None: + envoy = _make_envoy() + envoy._session = _mock_session(SAMPLE_LINES_RESPONSE) + result = await envoy.get_powermeter_watts() + assert result == [-100.0, -80.0, -120.0] + assert all(isinstance(v, float) for v in result) + + +# 4. Missing net-consumption entry -> ValueError mentioning net-consumption / CTs. +async def test_missing_net_consumption_raises() -> None: + envoy = _make_envoy() + envoy._session = _mock_session( + {"consumption": [{"measurementType": "total-consumption", "wNow": 800.0}]} + ) + with pytest.raises(ValueError, match="net-consumption"): + await envoy.get_powermeter_watts() + + +# 5. Missing consumption key entirely -> ValueError. +async def test_missing_consumption_key_raises() -> None: + envoy = _make_envoy() + envoy._session = _mock_session({"production": []}) + with pytest.raises(ValueError, match="consumption"): + await envoy.get_powermeter_watts() + + +# 6. Static-token path: Authorization: Bearer header passed through. +async def test_static_token_header() -> None: + envoy = _make_envoy(token="my-static-token") + envoy._session = _mock_session(SAMPLE_LINES_RESPONSE) + await envoy.get_powermeter_watts() + headers = envoy._session.get.call_args.kwargs["headers"] + assert headers["Authorization"] == "Bearer my-static-token" + + +# 7. Auto-obtain when no static token: monkeypatch _obtain_token. +async def test_auto_obtain_when_no_token(monkeypatch) -> None: + obtain = AsyncMock(return_value="fresh-jwt") + monkeypatch.setattr(envoy_module, "_obtain_token", obtain) + envoy = _make_envoy(token="", username="u@example.com", password="pw", serial="123") + envoy._session = _mock_session(SAMPLE_LINES_RESPONSE) + envoy._cloud_session = MagicMock() + await envoy.get_powermeter_watts() + obtain.assert_awaited_once_with(envoy._cloud_session, "u@example.com", "pw", "123") + assert envoy._token == "fresh-jwt" + headers = envoy._session.get.call_args.kwargs["headers"] + assert headers["Authorization"] == "Bearer fresh-jwt" + + +# 8. 401 refresh: first .get() yields 401, second yields data; obtain awaited once. +async def test_refreshes_on_401(monkeypatch) -> None: + obtain = AsyncMock(return_value="new-jwt") + monkeypatch.setattr(envoy_module, "_obtain_token", obtain) + envoy = _make_envoy( + token="expired", username="u@example.com", password="pw", serial="123" + ) + envoy._session = _mock_session_sequence( + [ + _mock_response(raise_status=401), + _mock_response(SAMPLE_LINES_RESPONSE), + ] + ) + envoy._cloud_session = MagicMock() + result = await envoy.get_powermeter_watts() + assert result == [-100.0, -80.0, -120.0] + obtain.assert_awaited_once() + assert envoy._token == "new-jwt" + # Second call must use the refreshed token. + second_call_headers = envoy._session.get.call_args_list[1].kwargs["headers"] + assert second_call_headers["Authorization"] == "Bearer new-jwt" + + +# 9. 401 with no credentials configured (static token only) -> propagates. +async def test_401_without_credentials_propagates(monkeypatch) -> None: + obtain = AsyncMock() + monkeypatch.setattr(envoy_module, "_obtain_token", obtain) + envoy = _make_envoy(token="static-only") + envoy._session = _mock_session_sequence([_mock_response(raise_status=401)]) + with pytest.raises(aiohttp.ClientResponseError): + await envoy.get_powermeter_watts() + obtain.assert_not_awaited() + + +# 10. __init__ without any auth config raises ValueError. +def test_init_without_auth_raises() -> None: + with pytest.raises(ValueError, match="TOKEN or USERNAME"): + Envoy(host="192.168.1.200") + + +def test_init_without_host_raises() -> None: + with pytest.raises(ValueError, match="HOST"): + Envoy(host="", token="t") + + +# 11. Cloud session ignores VERIFY_SSL: spy on TCPConnector to confirm only the +# local session gets the no-verify SSLContext; the cloud session uses defaults. +async def test_cloud_session_ignores_verify_ssl(monkeypatch) -> None: + captured: list[dict] = [] + real_connector = aiohttp.TCPConnector + + def spy(**kwargs): + captured.append(kwargs) + return real_connector(**kwargs) + + monkeypatch.setattr(envoy_module, "TCPConnector", spy) + + envoy = _make_envoy(verify_ssl=False) + await envoy.start() + try: + assert len(captured) == 1, ( + "Only the local Envoy session should construct a custom TCPConnector; " + "the cloud session must use aiohttp's default secure connector." + ) + local_ssl = captured[0]["ssl"] + assert isinstance(local_ssl, ssl.SSLContext) + assert local_ssl.verify_mode == ssl.CERT_NONE + finally: + await envoy.stop() + + +def test_build_ssl_context_verify_true() -> None: + ctx = envoy_module._build_ssl_context(verify_ssl=True) + assert ctx.verify_mode == ssl.CERT_REQUIRED + assert ctx.check_hostname is True + + +def test_build_ssl_context_verify_false() -> None: + ctx = envoy_module._build_ssl_context(verify_ssl=False) + assert ctx.verify_mode == ssl.CERT_NONE + assert ctx.check_hostname is False From 992ca6c3bbbaf1cde8db3e6ad62fd9249706fba8 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 20 Apr 2026 05:25:32 +0000 Subject: [PATCH 2/2] Harden Envoy error handling - Validate each per-phase line entry before casting; raise descriptive ValueError instead of leaking KeyError/TypeError for malformed payloads - Replace lifecycle asserts with explicit RuntimeError so the checks remain under python -O, matching json_http.py - Skip redundant token refresh on 401 when another coroutine has already rotated self._token while we were awaiting https://claude.ai/code/session_01NiEwZMNZm9batkSCQCaRaz --- src/astrameter/powermeter/envoy.py | 39 +++++++++--- src/astrameter/powermeter/envoy_test.py | 81 +++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 7 deletions(-) diff --git a/src/astrameter/powermeter/envoy.py b/src/astrameter/powermeter/envoy.py index 60daf78b..d06df516 100644 --- a/src/astrameter/powermeter/envoy.py +++ b/src/astrameter/powermeter/envoy.py @@ -132,20 +132,23 @@ async def _ensure_token(self) -> None: async with self._token_lock: if self._token: return - assert self._cloud_session is not None + if self._cloud_session is None: + raise RuntimeError("Cloud session not started; call start() first") self._token = await _obtain_token( self._cloud_session, self._username, self._password, self._serial ) async def _refresh_token(self) -> None: async with self._token_lock: - assert self._cloud_session is not None + if self._cloud_session is None: + raise RuntimeError("Cloud session not started; call start() first") self._token = await _obtain_token( self._cloud_session, self._username, self._password, self._serial ) async def _get_production(self) -> dict[str, Any]: - assert self._session is not None + if self._session is None: + raise RuntimeError("Session not started; call start() first") url = f"https://{self.host}/production.json?details=1" headers = {"Authorization": f"Bearer {self._token}"} async with self._session.get(url, headers=headers) as resp: @@ -155,13 +158,17 @@ async def _get_production(self) -> dict[str, Any]: async def _fetch_production(self) -> dict[str, Any]: await self._ensure_token() + old_token = self._token try: return await self._get_production() except ClientResponseError as e: if e.status != 401 or not self._has_credentials: raise - logger.info("Envoy: token rejected (401), refreshing") - await self._refresh_token() + # If another coroutine already refreshed while we were awaiting, + # skip our own refresh and retry with the fresh token. + if self._token == old_token: + logger.info("Envoy: token rejected (401), refreshing") + await self._refresh_token() return await self._get_production() async def get_powermeter_watts(self) -> list[float]: @@ -189,5 +196,23 @@ async def get_powermeter_watts(self) -> list[float]: lines = entry.get("lines") if isinstance(lines, list) and lines: - return [float(line["wNow"]) for line in lines[:3]] - return [float(entry["wNow"])] + values: list[float] = [] + for i, line in enumerate(lines[:3]): + if not isinstance(line, dict) or "wNow" not in line: + raise ValueError( + f"Envoy: malformed net-consumption line entry at index {i}" + ) + try: + values.append(float(line["wNow"])) + except (TypeError, ValueError) as err: + raise ValueError( + f"Envoy: non-numeric 'wNow' in net-consumption line at index {i}" + ) from err + return values + + if "wNow" not in entry: + raise ValueError("Envoy: net-consumption entry missing 'wNow'") + try: + return [float(entry["wNow"])] + except (TypeError, ValueError) as err: + raise ValueError("Envoy: non-numeric 'wNow' in net-consumption") from err diff --git a/src/astrameter/powermeter/envoy_test.py b/src/astrameter/powermeter/envoy_test.py index 2a24c714..03a3be36 100644 --- a/src/astrameter/powermeter/envoy_test.py +++ b/src/astrameter/powermeter/envoy_test.py @@ -222,3 +222,84 @@ def test_build_ssl_context_verify_false() -> None: ctx = envoy_module._build_ssl_context(verify_ssl=False) assert ctx.verify_mode == ssl.CERT_NONE assert ctx.check_hostname is False + + +# Malformed line entries produce descriptive ValueErrors instead of raw +# KeyError/TypeError. +async def test_malformed_line_entry_raises() -> None: + envoy = _make_envoy() + envoy._session = _mock_session( + { + "consumption": [ + { + "measurementType": "net-consumption", + "wNow": 0.0, + "lines": [{"wNow": 100.0}, "oops", {"wNow": 300.0}], + } + ] + } + ) + with pytest.raises(ValueError, match="malformed net-consumption line"): + await envoy.get_powermeter_watts() + + +async def test_non_numeric_line_wnow_raises() -> None: + envoy = _make_envoy() + envoy._session = _mock_session( + { + "consumption": [ + { + "measurementType": "net-consumption", + "wNow": 0.0, + "lines": [{"wNow": "not-a-number"}], + } + ] + } + ) + with pytest.raises(ValueError, match="non-numeric 'wNow'"): + await envoy.get_powermeter_watts() + + +# Lifecycle: operations before start() raise RuntimeError (not AssertionError, +# so the check survives python -O). +async def test_get_without_start_raises_runtime_error() -> None: + envoy = _make_envoy() + with pytest.raises(RuntimeError, match="not started"): + await envoy.get_powermeter_watts() + + +# Thundering-herd guard: if another coroutine already refreshed the token while +# we were awaiting the 401 response, skip our own refresh and just retry. +async def test_401_skips_refresh_if_token_changed(monkeypatch) -> None: + obtain = AsyncMock(return_value="should-not-be-used") + monkeypatch.setattr(envoy_module, "_obtain_token", obtain) + envoy = _make_envoy( + token="expired", username="u@example.com", password="pw", serial="123" + ) + + calls = iter( + [ + _mock_response(raise_status=401), + _mock_response(SAMPLE_LINES_RESPONSE), + ] + ) + + def fake_get(*_args, **_kwargs): + resp = next(calls) + if resp.raise_for_status.side_effect is not None: + # Simulate another coroutine having refreshed between the 401 + # being returned and our except handler running. + envoy._token = "already-refreshed" + return _ctx(resp) + + session = MagicMock() + session.get.side_effect = fake_get + envoy._session = session + envoy._cloud_session = MagicMock() + + result = await envoy.get_powermeter_watts() + assert result == [-100.0, -80.0, -120.0] + obtain.assert_not_awaited() + # Second call uses the token another coroutine already put in place. + second_headers = session.get.call_args_list[1].kwargs["headers"] + assert second_headers["Authorization"] == "Bearer already-refreshed"