From 1cb33b10cb1da76142fc923c3049b6b928b663b4 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 17 Apr 2026 08:34:55 +0000 Subject: [PATCH 1/7] Add Marstek MQTT responder for CT002/CT003 in MQTT Insights When [MARSTEK] credentials are configured, the managed fake CT MAC returned by ensure_managed_fake_device() is now used to answer the Marstek CT002/CT003 MQTT poll protocol (cd=1 on hame_energy/marstek_energy topics) on the same broker as MQTT Insights. Combined with hame-relay, this makes the emulator's readings visible as a CT in the Marstek app. Enabled by default via MARSTEK_MQTT_ENABLED in [MQTT_INSIGHTS]; opt out by setting it to false. Without Marstek credentials the responder stays silent (one info log per CT device). https://claude.ai/code/session_01K5ypPxYASWXJewf7Lk9a1e --- README.md | 3 + config.ini.example | 6 + src/astrameter/config/config_loader.py | 3 + src/astrameter/main.py | 76 +++++- src/astrameter/mqtt_insights/__init__.py | 8 +- src/astrameter/mqtt_insights/marstek_mqtt.py | 104 ++++++++ .../mqtt_insights/marstek_mqtt_test.py | 191 ++++++++++++++ .../mqtt_insights/mqtt_insights_test.py | 242 ++++++++++++++++++ src/astrameter/mqtt_insights/service.py | 135 +++++++++- 9 files changed, 757 insertions(+), 11 deletions(-) create mode 100644 src/astrameter/mqtt_insights/marstek_mqtt.py create mode 100644 src/astrameter/mqtt_insights/marstek_mqtt_test.py diff --git a/README.md b/README.md index af0d8f86..b9f939bc 100644 --- a/README.md +++ b/README.md @@ -889,6 +889,9 @@ HA_DISCOVERY_PREFIX = homeassistant | `BASE_TOPIC` | `astrameter` | Root topic for all published messages | | `HA_DISCOVERY` | `true` | Enable Home Assistant MQTT Device Discovery | | `HA_DISCOVERY_PREFIX` | `homeassistant` | HA discovery topic prefix | +| `MARSTEK_MQTT_ENABLED` | `true` | Respond to Marstek CT002/CT003 MQTT polls on this broker (requires `[MARSTEK]`) | + +**Marstek app visibility**: when `[MARSTEK]` credentials are configured, AstraMeter registers a managed fake CT device in the Marstek cloud. With `MARSTEK_MQTT_ENABLED=true` (default) it also answers the CT's MQTT polls on this broker. Combined with [hame-relay](https://github.com/tomquist/hame-relay) bridging the local broker to the Marstek cloud, the emulator's readings then show up as a CT002/CT003 in the Marstek mobile app. Set `MARSTEK_MQTT_ENABLED=false` to keep MQTT Insights but opt out of this responder. **Published entities** (per CT002 consumer): - Grid power (L1/L2/L3/total), charge target (L1/L2/L3), reported power, saturation diff --git a/config.ini.example b/config.ini.example index 1fa8aec3..249b2743 100644 --- a/config.ini.example +++ b/config.ini.example @@ -418,3 +418,9 @@ THROTTLE_INTERVAL = 0 #HA_DISCOVERY = true ## HA discovery prefix (default: homeassistant) #HA_DISCOVERY_PREFIX = homeassistant +## Respond to Marstek CT002/CT003 MQTT polls on this broker (default: true). +## Combined with hame-relay (https://github.com/tomquist/hame-relay) the +## emulator's CT readings surface in the Marstek mobile app. Requires a +## working [MARSTEK] section: the MAC used in the MQTT topics is the +## managed fake device AstraMeter registers in the Marstek cloud. +#MARSTEK_MQTT_ENABLED = true diff --git a/src/astrameter/config/config_loader.py b/src/astrameter/config/config_loader.py index 895d8302..a742a755 100644 --- a/src/astrameter/config/config_loader.py +++ b/src/astrameter/config/config_loader.py @@ -732,5 +732,8 @@ def read_mqtt_insights_config( addon_slug=( config.get(section, "ADDON_SLUG", fallback="").strip() or None ), + marstek_mqtt_enabled=config.getboolean( + section, "MARSTEK_MQTT_ENABLED", fallback=True + ), ) return None diff --git a/src/astrameter/main.py b/src/astrameter/main.py index 667e1d9c..2dcd890e 100644 --- a/src/astrameter/main.py +++ b/src/astrameter/main.py @@ -1,6 +1,7 @@ import argparse import asyncio import configparser +import contextlib import os import signal from collections import OrderedDict @@ -18,7 +19,11 @@ MarstekConfig, ensure_managed_fake_device, ) -from astrameter.mqtt_insights import MqttInsightsService +from astrameter.mqtt_insights import ( + MarstekMqttBinding, + MqttInsightsService, + normalize_mac, +) from astrameter.powermeter import Powermeter from astrameter.shelly import Shelly from astrameter.version_info import get_git_commit_sha @@ -118,6 +123,7 @@ async def run_device( powermeters: list[tuple[Powermeter, ClientFilter, bool]], device_id: str | None = None, insights: MqttInsightsService | None = None, + marstek_mac: str = "", ): logger.debug(f"Starting device: {device_type}") @@ -341,11 +347,51 @@ def _shelly_event_listener(dev_id, battery_ip, data): device_id or "", device.force_efficiency_rotation ) + # Marstek MQTT responder — only wired up when Marstek credentials + # yielded a managed MAC (so hame-relay can route the replies back to + # the Marstek app) and the feature is enabled. + if isinstance(device, CT002) and insights and insights.marstek_mqtt_enabled: + if marstek_mac: + + async def _marstek_get_values( + _pms: list[tuple[Powermeter, ClientFilter]] = powermeters, + ) -> list[float]: + chosen: Powermeter | None = next( + (pm for pm, cf in _pms if cf.matches("0.0.0.0")), None + ) + if chosen is None and _pms: + chosen = _pms[0][0] + if chosen is None: + return [0.0, 0.0, 0.0] + await chosen.wait_for_next_message() + vs = await chosen.get_powermeter_watts() + return [float(vs[i]) if i < len(vs) else 0.0 for i in range(3)] + + await insights.register_marstek( + MarstekMqttBinding( + device_id=device_id or "", + ct_type=device.ct_type, + mac=marstek_mac, + get_values=_marstek_get_values, + wifi_rssi=device.wifi_rssi, + ) + ) + else: + logger.info( + "Marstek MQTT responder not wired for %s: no managed MAC " + "available. Enable [MARSTEK] with MAILBOX/PASSWORD to use " + "this feature, or set MARSTEK_MQTT_ENABLED=false to silence " + "this notice.", + device_id, + ) + try: await device.wait() finally: if insights and isinstance(device, CT002): insights.unregister_handlers(device_id or "") + with contextlib.suppress(Exception): + await insights.unregister_marstek(device_id or "") try: await device.stop() except Exception: @@ -358,7 +404,9 @@ async def async_main( device_types: list[str], device_ids: list[str], skip_test: bool, + managed_macs: dict[str, str] | None = None, ): + managed_macs = managed_macs or {} web_server = None if cfg.getboolean("GENERAL", "ENABLE_WEB_SERVER", fallback=True): logger.info("Starting web server...") @@ -411,7 +459,15 @@ async def async_main( await asyncio.gather( *( - run_device(device_type, cfg, args, powermeters, device_id, insights) + run_device( + device_type, + cfg, + args, + powermeters, + device_id, + insights, + managed_macs.get(device_type, ""), + ) for device_type, device_id in zip( device_types, device_ids, strict=False ) @@ -568,7 +624,11 @@ def main(): _apply_cli_overrides(cfg, args) - # Optional Marstek cloud registration for managed fake CT devices (sync, before event loop) + # Optional Marstek cloud registration for managed fake CT devices (sync, before event loop). + # When registration succeeds, the returned MAC is captured per device + # type so the Marstek MQTT responder in MQTT Insights uses the same + # MAC that hame-relay will route back to the Marstek app. + managed_macs: dict[str, str] = {} marstek_enabled = cfg.getboolean("MARSTEK", "ENABLE", fallback=False) if marstek_enabled: mailbox = cfg.get("MARSTEK", "MAILBOX", fallback="") @@ -592,7 +652,11 @@ def main(): for dt in ("ct002", "ct003"): if dt in device_types: any_ct = True - ensure_managed_fake_device(marstek_cfg, dt) + created = ensure_managed_fake_device(marstek_cfg, dt) + if created is not None: + normalized = normalize_mac(str(created.get("mac", ""))) + if normalized: + managed_macs[dt] = normalized if any_ct: logger.info( "Managed fake CT registration completed. Fake CT devices appear as offline in the Marstek app CT list (this is expected)." @@ -640,7 +704,9 @@ def _restart_handler(signum, frame): while True: restart_requested = False try: - asyncio.run(async_main(cfg, args, device_types, device_ids, skip_test)) + asyncio.run( + async_main(cfg, args, device_types, device_ids, skip_test, managed_macs) + ) break # clean exit except KeyboardInterrupt: if not restart_requested: diff --git a/src/astrameter/mqtt_insights/__init__.py b/src/astrameter/mqtt_insights/__init__.py index ff739e51..3d97ec30 100644 --- a/src/astrameter/mqtt_insights/__init__.py +++ b/src/astrameter/mqtt_insights/__init__.py @@ -1,5 +1,11 @@ """MQTT Insights — publish internal state to MQTT with HA Discovery.""" +from .marstek_mqtt import MarstekMqttBinding, normalize_mac from .service import MqttInsightsConfig, MqttInsightsService -__all__ = ["MqttInsightsConfig", "MqttInsightsService"] +__all__ = [ + "MarstekMqttBinding", + "MqttInsightsConfig", + "MqttInsightsService", + "normalize_mac", +] diff --git a/src/astrameter/mqtt_insights/marstek_mqtt.py b/src/astrameter/mqtt_insights/marstek_mqtt.py new file mode 100644 index 00000000..cf22193e --- /dev/null +++ b/src/astrameter/mqtt_insights/marstek_mqtt.py @@ -0,0 +1,104 @@ +"""Marstek MQTT responder — answer CT002/CT003 poll requests on the local broker. + +Pure helpers (topic formatting, payload building, poll detection) plus a +binding dataclass that the :class:`MqttInsightsService` stores per device. + +The wire protocol (topics + CSV key=value payload) matches what the real +Marstek CT002 uses against the Marstek cloud broker. When hame-relay is +bridging the local broker to the cloud, responses produced here reach the +Marstek mobile app as if they came from a real CT002. +""" + +from __future__ import annotations + +import re +from collections.abc import Awaitable, Callable +from dataclasses import dataclass + +APP_TOPIC_TEMPLATES = ( + "hame_energy/{ct_type}/App/{mac}/ctrl", + "marstek_energy/{ct_type}/App/{mac}/ctrl", +) +DEVICE_TOPIC_TEMPLATES = ( + "hame_energy/{ct_type}/device/{mac}/ctrl", + "marstek_energy/{ct_type}/device/{mac}/ctrl", +) + +# Matches observed real-device values; included so hm2mqtt-style parsers +# recognise the message as a well-formed runtime-info frame. +DEFAULT_VER_V = 148 + +_APP_TOPIC_RE = re.compile( + r"^(?:hame|marstek)_energy/(?P[^/]+)/App/(?P[^/]+)/ctrl$" +) +_MAC_HEX_RE = re.compile(r"^[0-9a-f]{12}$") + + +@dataclass(frozen=True) +class MarstekMqttBinding: + """Per-device registration used by the MQTT Insights service.""" + + device_id: str + ct_type: str + mac: str + get_values: Callable[[], Awaitable[list[float]]] + wifi_rssi: int + ver_v: int = DEFAULT_VER_V + + +def normalize_mac(raw: str) -> str: + """Lowercase, strip ``:``/``-``; return ``""`` if not 12 hex chars.""" + if not raw: + return "" + cleaned = raw.replace(":", "").replace("-", "").strip().lower() + return cleaned if _MAC_HEX_RE.fullmatch(cleaned) else "" + + +def is_poll_payload(body: bytes) -> bool: + """Return True iff *body* is a CSV containing ``cd=1``.""" + if not body: + return False + try: + text = body.decode("utf-8") + except UnicodeDecodeError: + return False + for chunk in text.split(","): + key, sep, value = chunk.partition("=") + if not sep: + continue + if key.strip().lower() == "cd" and value.strip() == "1": + return True + return False + + +def parse_app_topic(topic: str) -> tuple[str, str] | None: + """Return ``(ct_type, mac)`` for a Marstek App topic, else ``None``.""" + match = _APP_TOPIC_RE.match(topic) + if not match: + return None + return match.group("ct_type"), match.group("mac").lower() + + +def app_topics_for(binding: MarstekMqttBinding) -> tuple[str, str]: + return tuple( # type: ignore[return-value] + t.format(ct_type=binding.ct_type, mac=binding.mac) for t in APP_TOPIC_TEMPLATES + ) + + +def device_topics_for(binding: MarstekMqttBinding) -> tuple[str, str]: + return tuple( # type: ignore[return-value] + t.format(ct_type=binding.ct_type, mac=binding.mac) + for t in DEVICE_TOPIC_TEMPLATES + ) + + +def build_response(binding: MarstekMqttBinding, watts: list[float]) -> bytes: + """Build the CSV ``k=v`` response body for a runtime-info frame.""" + vs = list(watts) + [0.0] * (3 - len(watts)) + a, b, c = (round(v) for v in vs[:3]) + total = a + b + c + payload = ( + f"pwr_a={a},pwr_b={b},pwr_c={c},pwr_t={total}," + f"wif_r={binding.wifi_rssi},ver_v={binding.ver_v},wif_s=2" + ) + return payload.encode("utf-8") diff --git a/src/astrameter/mqtt_insights/marstek_mqtt_test.py b/src/astrameter/mqtt_insights/marstek_mqtt_test.py new file mode 100644 index 00000000..bafc39fe --- /dev/null +++ b/src/astrameter/mqtt_insights/marstek_mqtt_test.py @@ -0,0 +1,191 @@ +"""Unit tests for Marstek MQTT helpers (pure, no broker).""" + +from __future__ import annotations + +import pytest + +from .marstek_mqtt import ( + DEFAULT_VER_V, + MarstekMqttBinding, + app_topics_for, + build_response, + device_topics_for, + is_poll_payload, + normalize_mac, + parse_app_topic, +) + + +def _binding( + *, ct_type: str = "HME-4", mac: str = "02b250aabbcc", wifi_rssi: int = -50 +) -> MarstekMqttBinding: + async def _noop() -> list[float]: + return [0.0, 0.0, 0.0] + + return MarstekMqttBinding( + device_id="device-1", + ct_type=ct_type, + mac=mac, + get_values=_noop, + wifi_rssi=wifi_rssi, + ) + + +# ── normalize_mac ───────────────────────────────────────────────────────── + + +@pytest.mark.parametrize( + "raw,expected", + [ + ("AA:BB:CC:DD:EE:FF", "aabbccddeeff"), + ("aa-bb-cc-dd-ee-ff", "aabbccddeeff"), + ("AABBCCDDEEFF", "aabbccddeeff"), + ("aabbccddeeff", "aabbccddeeff"), + (" 02b250AABBCC ", "02b250aabbcc"), + ], +) +def test_normalize_mac_accepts_common_formats(raw: str, expected: str) -> None: + assert normalize_mac(raw) == expected + + +@pytest.mark.parametrize( + "raw", + ["", "zz", "aabbccddeef", "aabbccddeeffff", "ghijklmnopqr", "aa:bb:cc"], +) +def test_normalize_mac_rejects_invalid(raw: str) -> None: + assert normalize_mac(raw) == "" + + +# ── is_poll_payload ─────────────────────────────────────────────────────── + + +@pytest.mark.parametrize( + "body", + [ + b"cd=1", + b"cd=1,foo=bar", + b"foo=bar,cd=1", + b" cd = 1 ", + b"cd=1\n", + b"CD=1", + ], +) +def test_is_poll_payload_accepts_cd1(body: bytes) -> None: + assert is_poll_payload(body) is True + + +@pytest.mark.parametrize( + "body", + [b"", b"cd=0", b"cd=", b"garbage", b"pwr_a=1", b"\xff\xfe"], +) +def test_is_poll_payload_rejects_non_cd1(body: bytes) -> None: + assert is_poll_payload(body) is False + + +# ── parse_app_topic ─────────────────────────────────────────────────────── + + +@pytest.mark.parametrize( + "topic,expected", + [ + ( + "hame_energy/HME-4/App/02b250aabbcc/ctrl", + ("HME-4", "02b250aabbcc"), + ), + ( + "marstek_energy/HME-3/App/02b250ccddee/ctrl", + ("HME-3", "02b250ccddee"), + ), + ( + "hame_energy/HME-4/App/02B250AABBCC/ctrl", + ("HME-4", "02b250aabbcc"), + ), + ], +) +def test_parse_app_topic_accepts_valid(topic: str, expected: tuple[str, str]) -> None: + assert parse_app_topic(topic) == expected + + +@pytest.mark.parametrize( + "topic", + [ + "hame_energy/HME-4/device/02b250aabbcc/ctrl", + "hame_energy/HME-4/App/02b250aabbcc", + "marstek_energy/HME-4/App//ctrl", + "other/HME-4/App/02b250aabbcc/ctrl", + "hame_energy/HME-4/App/02b250aabbcc/extra/ctrl", + "", + ], +) +def test_parse_app_topic_rejects_invalid(topic: str) -> None: + assert parse_app_topic(topic) is None + + +# ── topic helpers ───────────────────────────────────────────────────────── + + +def test_app_topics_for() -> None: + b = _binding(ct_type="HME-4", mac="02b250aabbcc") + assert app_topics_for(b) == ( + "hame_energy/HME-4/App/02b250aabbcc/ctrl", + "marstek_energy/HME-4/App/02b250aabbcc/ctrl", + ) + + +def test_device_topics_for() -> None: + b = _binding(ct_type="HME-3", mac="02b250ccddee") + assert device_topics_for(b) == ( + "hame_energy/HME-3/device/02b250ccddee/ctrl", + "marstek_energy/HME-3/device/02b250ccddee/ctrl", + ) + + +# ── build_response ──────────────────────────────────────────────────────── + + +def test_build_response_includes_required_and_optional_keys() -> None: + b = _binding(wifi_rssi=-50) + body = build_response(b, [100.0, 200.0, 300.0]) + assert body == ( + b"pwr_a=100,pwr_b=200,pwr_c=300,pwr_t=600,wif_r=-50,ver_v=148,wif_s=2" + ) + + +def test_build_response_rounds_and_sums() -> None: + b = _binding(wifi_rssi=-42) + # 123.6 → 124, 45.4 → 45, -67.9 → -68; total = 124 + 45 - 68 = 101 + body = build_response(b, [123.6, 45.4, -67.9]) + text = body.decode() + assert "pwr_a=124" in text + assert "pwr_b=45" in text + assert "pwr_c=-68" in text + assert "pwr_t=101" in text + assert "wif_r=-42" in text + assert f"ver_v={DEFAULT_VER_V}" in text + assert "wif_s=2" in text + + +def test_build_response_pads_short_list() -> None: + b = _binding() + body = build_response(b, [123.0]) + text = body.decode() + assert "pwr_a=123" in text + assert "pwr_b=0" in text + assert "pwr_c=0" in text + assert "pwr_t=123" in text + + +def test_build_response_custom_ver_v() -> None: + async def _g() -> list[float]: + return [0.0, 0.0, 0.0] + + b = MarstekMqttBinding( + device_id="d", + ct_type="HME-4", + mac="02b250aabbcc", + get_values=_g, + wifi_rssi=-60, + ver_v=200, + ) + body = build_response(b, [0.0, 0.0, 0.0]) + assert b"ver_v=200" in body diff --git a/src/astrameter/mqtt_insights/mqtt_insights_test.py b/src/astrameter/mqtt_insights/mqtt_insights_test.py index c985caaa..2edc74a2 100644 --- a/src/astrameter/mqtt_insights/mqtt_insights_test.py +++ b/src/astrameter/mqtt_insights/mqtt_insights_test.py @@ -23,6 +23,7 @@ build_shelly_battery_discovery, build_shelly_device_discovery, ) +from .marstek_mqtt import MarstekMqttBinding from .service import MqttInsightsConfig, MqttInsightsService, _arp_lookup # ── Discovery payload unit tests ────────────────────────────────────────── @@ -386,6 +387,24 @@ def test_read_mqtt_insights_config_absent(): assert read_mqtt_insights_config(cfg) is None +def test_read_mqtt_insights_config_marstek_mqtt_default_true(): + cfg = configparser.ConfigParser() + cfg.read_string("[MQTT_INSIGHTS]\nBROKER = localhost\n") + result = read_mqtt_insights_config(cfg) + assert result is not None + assert result.marstek_mqtt_enabled is True + + +def test_read_mqtt_insights_config_marstek_mqtt_opt_out(): + cfg = configparser.ConfigParser() + cfg.read_string( + "[MQTT_INSIGHTS]\nBROKER = localhost\nMARSTEK_MQTT_ENABLED = false\n" + ) + result = read_mqtt_insights_config(cfg) + assert result is not None + assert result.marstek_mqtt_enabled is False + + # ── Service unit tests (no broker) ─────────────────────────────────────── @@ -820,3 +839,226 @@ def test_consumer_state_includes_manual_target_fields(): } assert consumer_state["manual_target"] is None assert consumer_state["auto_target"] is True + + +# ── Marstek MQTT responder tests ───────────────────────────────────────── + + +def _make_binding( + *, + device_id: str = "ct002-dev1", + ct_type: str = "HME-4", + mac: str = "02b250aabbcc", + wifi_rssi: int = -50, + values: list[float] | None = None, + raises: BaseException | None = None, +) -> tuple[MarstekMqttBinding, list[tuple[str, ...]]]: + calls: list[tuple[str, ...]] = [] + vs = [100.0, 200.0, 300.0] if values is None else values + + async def _get() -> list[float]: + calls.append(("called",)) + if raises is not None: + raise raises + return list(vs) + + return ( + MarstekMqttBinding( + device_id=device_id, + ct_type=ct_type, + mac=mac, + get_values=_get, + wifi_rssi=wifi_rssi, + ), + calls, + ) + + +def test_register_marstek_while_disconnected_stores_binding(): + """register_marstek before start() only populates the dict.""" + service = MqttInsightsService(MqttInsightsConfig(broker="localhost")) + binding, _ = _make_binding() + + async def _run() -> None: + await service.register_marstek(binding) + + asyncio.run(_run()) + assert service._marstek_bindings["ct002-dev1"] is binding + + +def test_register_marstek_no_op_when_disabled(): + service = MqttInsightsService( + MqttInsightsConfig(broker="localhost", marstek_mqtt_enabled=False) + ) + binding, _ = _make_binding() + + async def _run() -> None: + await service.register_marstek(binding) + + asyncio.run(_run()) + assert service._marstek_bindings == {} + + +@needs_mosquitto +async def test_marstek_poll_responds_on_both_topics(mqtt_broker): + port = mqtt_broker + service = _make_service(port) + binding, calls = _make_binding(values=[100.0, 200.0, 300.0]) + await service.register_marstek(binding) + await service.start() + + try: + await service.wait_connected() + await _poll(lambda: service._client is not None) + + received = [] + async with aiomqtt.Client(hostname="127.0.0.1", port=port) as client: + await client.subscribe(f"hame_energy/HME-4/device/{binding.mac}/ctrl") + await client.subscribe(f"marstek_energy/HME-4/device/{binding.mac}/ctrl") + await client.publish( + f"hame_energy/HME-4/App/{binding.mac}/ctrl", + payload=b"cd=1", + ) + await _collect_messages( + client, received, timeout=5, stop=lambda _: len(received) >= 2 + ) + + assert len(received) == 2 + topics = sorted(str(m.topic) for m in received) + assert topics == [ + f"hame_energy/HME-4/device/{binding.mac}/ctrl", + f"marstek_energy/HME-4/device/{binding.mac}/ctrl", + ] + expected = ( + b"pwr_a=100,pwr_b=200,pwr_c=300,pwr_t=600,wif_r=-50,ver_v=148,wif_s=2" + ) + for msg in received: + assert msg.payload == expected + assert len(calls) == 1 + finally: + await service.stop() + + +@needs_mosquitto +async def test_marstek_ignores_non_poll_payload(mqtt_broker): + port = mqtt_broker + service = _make_service(port) + binding, calls = _make_binding() + await service.register_marstek(binding) + await service.start() + + try: + await service.wait_connected() + await _poll(lambda: service._client is not None) + + received = [] + async with aiomqtt.Client(hostname="127.0.0.1", port=port) as client: + await client.subscribe(f"hame_energy/HME-4/device/{binding.mac}/ctrl") + await client.publish( + f"hame_energy/HME-4/App/{binding.mac}/ctrl", payload=b"cd=0" + ) + await _collect_messages(client, received, timeout=1) + + assert received == [] + assert calls == [] + finally: + await service.stop() + + +@needs_mosquitto +async def test_marstek_unregister_stops_replies(mqtt_broker): + port = mqtt_broker + service = _make_service(port) + binding, _ = _make_binding() + await service.register_marstek(binding) + await service.start() + + try: + await service.wait_connected() + await _poll(lambda: service._client is not None) + + async with aiomqtt.Client(hostname="127.0.0.1", port=port) as client: + await client.subscribe(f"hame_energy/HME-4/device/{binding.mac}/ctrl") + + # Initial poll — expect one reply + first = [] + await client.publish( + f"hame_energy/HME-4/App/{binding.mac}/ctrl", payload=b"cd=1" + ) + await _collect_messages( + client, first, timeout=5, stop=lambda _: len(first) >= 1 + ) + assert len(first) == 1 + + # Unregister and poll again — expect no reply + await service.unregister_marstek(binding.device_id) + second = [] + await client.publish( + f"hame_energy/HME-4/App/{binding.mac}/ctrl", payload=b"cd=1" + ) + await _collect_messages(client, second, timeout=1) + assert second == [] + finally: + await service.stop() + + +@needs_mosquitto +async def test_marstek_opt_out_disables_subscription(mqtt_broker): + port = mqtt_broker + global _test_counter + _test_counter += 1 + service = MqttInsightsService( + MqttInsightsConfig( + broker="127.0.0.1", + port=port, + base_topic=f"test_insights_{_test_counter}", + ha_discovery=True, + ha_discovery_prefix=f"ha_disc_{_test_counter}", + marstek_mqtt_enabled=False, + ) + ) + binding, calls = _make_binding() + await service.register_marstek(binding) # no-op when disabled + await service.start() + + try: + await service.wait_connected() + + async with aiomqtt.Client(hostname="127.0.0.1", port=port) as client: + await client.subscribe(f"hame_energy/HME-4/device/{binding.mac}/ctrl") + await client.publish( + f"hame_energy/HME-4/App/{binding.mac}/ctrl", payload=b"cd=1" + ) + received = [] + await _collect_messages(client, received, timeout=1) + assert received == [] + assert calls == [] + finally: + await service.stop() + + +@needs_mosquitto +async def test_marstek_get_values_failure_suppressed(mqtt_broker): + port = mqtt_broker + service = _make_service(port) + binding, calls = _make_binding(raises=RuntimeError("powermeter offline")) + await service.register_marstek(binding) + await service.start() + + try: + await service.wait_connected() + await _poll(lambda: service._client is not None) + + async with aiomqtt.Client(hostname="127.0.0.1", port=port) as client: + await client.subscribe(f"hame_energy/HME-4/device/{binding.mac}/ctrl") + await client.publish( + f"hame_energy/HME-4/App/{binding.mac}/ctrl", payload=b"cd=1" + ) + received = [] + await _collect_messages(client, received, timeout=1) + assert received == [] + # get_values was called but no reply was published + assert calls == [("called",)] + assert binding.device_id in service._marstek_get_values_failed + finally: + await service.stop() diff --git a/src/astrameter/mqtt_insights/service.py b/src/astrameter/mqtt_insights/service.py index 18b3a918..7ca7cd73 100644 --- a/src/astrameter/mqtt_insights/service.py +++ b/src/astrameter/mqtt_insights/service.py @@ -22,6 +22,14 @@ build_shelly_battery_discovery, build_shelly_device_discovery, ) +from .marstek_mqtt import ( + MarstekMqttBinding, + app_topics_for, + build_response, + device_topics_for, + is_poll_payload, + parse_app_topic, +) RECONNECT_DELAY = 5 QUEUE_MAX_SIZE = 100 @@ -59,6 +67,11 @@ class MqttInsightsConfig: ha_discovery: bool = True ha_discovery_prefix: str = "homeassistant" addon_slug: str | None = None + # Respond to Marstek app MQTT polls for CT002/CT003 on the same + # broker connection. Combined with hame-relay this surfaces the + # emulator in the Marstek app. Default on; requires [MARSTEK] + # credentials so that the managed MAC matches the cloud device. + marstek_mqtt_enabled: bool = True @dataclass @@ -84,6 +97,13 @@ def __init__(self, config: MqttInsightsConfig) -> None: self._auto_target_handlers: dict[str, Callable[[str, bool], None]] = {} self._rotation_handlers: dict[str, Callable[[], None]] = {} self._connected = asyncio.Event() + # Marstek MQTT responder state — populated via register_marstek(). + self._marstek_bindings: dict[str, MarstekMqttBinding] = {} + self._marstek_lock = asyncio.Lock() + self._client: aiomqtt.Client | None = None + # Rate-limit per-device get_values failure logging so a broken + # powermeter doesn't flood the log at hm2mqtt's poll cadence. + self._marstek_get_values_failed: set[str] = set() # ── Public API (called from device event listeners) ─────────────── @@ -142,6 +162,46 @@ def unregister_handlers(self, device_id: str) -> None: self._auto_target_handlers.pop(device_id, None) self._rotation_handlers.pop(device_id, None) + # ── Marstek MQTT responder ──────────────────────────────────────── + + @property + def marstek_mqtt_enabled(self) -> bool: + return self._config.marstek_mqtt_enabled + + async def register_marstek(self, binding: MarstekMqttBinding) -> None: + """Register a CT002/CT003 Marstek MQTT responder for *binding*. + + If already connected, live-subscribes to the App topics; otherwise + the ``_run`` loop picks up the new entry on the next (re)connect. + """ + if not self._config.marstek_mqtt_enabled: + return + async with self._marstek_lock: + existing = self._marstek_bindings.get(binding.device_id) + if existing is not None and existing.mac != binding.mac: + logger.warning( + "Marstek MQTT: re-registering %s with a different MAC (%s → %s)", + binding.device_id, + existing.mac, + binding.mac, + ) + self._marstek_bindings[binding.device_id] = binding + client = self._client + if client is not None: + for topic in app_topics_for(binding): + with contextlib.suppress(aiomqtt.MqttError): + await client.subscribe(topic) + + async def unregister_marstek(self, device_id: str) -> None: + async with self._marstek_lock: + binding = self._marstek_bindings.pop(device_id, None) + self._marstek_get_values_failed.discard(device_id) + client = self._client + if binding is not None and client is not None: + for topic in app_topics_for(binding): + with contextlib.suppress(aiomqtt.MqttError): + await client.unsubscribe(topic) + # ── Lifecycle ───────────────────────────────────────────────────── async def start(self) -> None: @@ -212,13 +272,27 @@ async def _run(self) -> None: await client.subscribe(f"{cfg.base_topic}/ct002/+/consumer/+/set") await client.subscribe(f"{cfg.base_topic}/ct002/+/set") + # Subscribe to Marstek App topics for every registered + # binding. Store the client so register_marstek() called + # while already connected can live-subscribe too. + if cfg.marstek_mqtt_enabled: + async with self._marstek_lock: + self._client = client + for binding in self._marstek_bindings.values(): + for topic in app_topics_for(binding): + await client.subscribe(topic) + self._connected.set() - # Run publish loop and message listener concurrently - await asyncio.gather( - self._publish_loop(client), - self._listen_commands(client), - ) + try: + # Run publish loop and message listener concurrently + await asyncio.gather( + self._publish_loop(client), + self._listen_commands(client), + ) + finally: + async with self._marstek_lock: + self._client = None except asyncio.CancelledError: self._connected.clear() @@ -473,6 +547,11 @@ async def _listen_commands(self, client: aiomqtt.Client) -> None: async for message in client.messages: topic_str = str(message.topic) + if topic_str.startswith("hame_energy/") or topic_str.startswith( + "marstek_energy/" + ): + await self._handle_marstek_message(client, message) + continue if not topic_str.startswith(prefix) or not topic_str.endswith(suffix): continue @@ -604,3 +683,49 @@ def _handle_device_command(self, device_id: str, cmd: dict) -> None: logger.exception("Rotation handler error for device %s", device_id) else: logger.debug("No rotation handler for device %s", device_id) + + async def _handle_marstek_message( + self, client: aiomqtt.Client, message: aiomqtt.Message + ) -> None: + topic = str(message.topic) + parsed = parse_app_topic(topic) + if parsed is None: + return + ct_type, mac = parsed + binding = self._find_marstek_binding(ct_type, mac) + if binding is None: + logger.debug("Marstek MQTT: no binding for %s/%s", ct_type, mac) + return + + body = message.payload if isinstance(message.payload, bytes) else b"" + if not is_poll_payload(body): + logger.debug("Marstek MQTT: non-poll payload on %s", topic) + return + + try: + watts = await binding.get_values() + except Exception: + if binding.device_id not in self._marstek_get_values_failed: + logger.exception( + "Marstek MQTT: get_values failed for %s; suppressing " + "further failures until values recover", + binding.device_id, + ) + self._marstek_get_values_failed.add(binding.device_id) + return + if binding.device_id in self._marstek_get_values_failed: + logger.info("Marstek MQTT: get_values recovered for %s", binding.device_id) + self._marstek_get_values_failed.discard(binding.device_id) + + payload = build_response(binding, list(watts)) + for reply_topic in device_topics_for(binding): + with contextlib.suppress(aiomqtt.MqttError): + await client.publish(reply_topic, payload=payload, qos=0, retain=False) + + def _find_marstek_binding( + self, ct_type: str, mac: str + ) -> MarstekMqttBinding | None: + for binding in self._marstek_bindings.values(): + if binding.ct_type == ct_type and binding.mac == mac.lower(): + return binding + return None From 74c52b1df3fe2f6341b33d889e51e715073450c1 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 19 Apr 2026 09:40:53 +0000 Subject: [PATCH 2/7] Offload Marstek poll handler so a slow powermeter can't stall listener The MQTT Insights listener is a single async-for loop. When a Marstek poll handler awaited binding.get_values() inline it could block every subsequent message (other CT polls, Insights commands) for as long as the powermeter took to yield a reading. Spawn each response in its own task instead, and track/cancel those tasks on disconnect/shutdown. Also snapshot _marstek_bindings under the lock before scanning in _find_marstek_binding, and drop the type: ignore on the topic helpers by returning an explicit 2-tuple. Adds two integration tests: register-before-start populates subscriptions on first connect, and a slow handler on one binding doesn't block a concurrent fast poll on another binding. https://claude.ai/code/session_01K5ypPxYASWXJewf7Lk9a1e --- src/astrameter/mqtt_insights/marstek_mqtt.py | 13 ++- .../mqtt_insights/mqtt_insights_test.py | 94 ++++++++++++++++++- src/astrameter/mqtt_insights/service.py | 35 ++++++- 3 files changed, 132 insertions(+), 10 deletions(-) diff --git a/src/astrameter/mqtt_insights/marstek_mqtt.py b/src/astrameter/mqtt_insights/marstek_mqtt.py index cf22193e..39a44c30 100644 --- a/src/astrameter/mqtt_insights/marstek_mqtt.py +++ b/src/astrameter/mqtt_insights/marstek_mqtt.py @@ -80,15 +80,18 @@ def parse_app_topic(topic: str) -> tuple[str, str] | None: def app_topics_for(binding: MarstekMqttBinding) -> tuple[str, str]: - return tuple( # type: ignore[return-value] - t.format(ct_type=binding.ct_type, mac=binding.mac) for t in APP_TOPIC_TEMPLATES + old, new = APP_TOPIC_TEMPLATES + return ( + old.format(ct_type=binding.ct_type, mac=binding.mac), + new.format(ct_type=binding.ct_type, mac=binding.mac), ) def device_topics_for(binding: MarstekMqttBinding) -> tuple[str, str]: - return tuple( # type: ignore[return-value] - t.format(ct_type=binding.ct_type, mac=binding.mac) - for t in DEVICE_TOPIC_TEMPLATES + old, new = DEVICE_TOPIC_TEMPLATES + return ( + old.format(ct_type=binding.ct_type, mac=binding.mac), + new.format(ct_type=binding.ct_type, mac=binding.mac), ) diff --git a/src/astrameter/mqtt_insights/mqtt_insights_test.py b/src/astrameter/mqtt_insights/mqtt_insights_test.py index 2edc74a2..c4611f5b 100644 --- a/src/astrameter/mqtt_insights/mqtt_insights_test.py +++ b/src/astrameter/mqtt_insights/mqtt_insights_test.py @@ -1058,7 +1058,99 @@ async def test_marstek_get_values_failure_suppressed(mqtt_broker): await _collect_messages(client, received, timeout=1) assert received == [] # get_values was called but no reply was published + await _poll(lambda: binding.device_id in service._marstek_get_values_failed) assert calls == [("called",)] - assert binding.device_id in service._marstek_get_values_failed finally: await service.stop() + + +@needs_mosquitto +async def test_marstek_register_before_start_subscribes_on_connect(mqtt_broker): + """A binding registered before start() must get its App topics + subscribed on the first connect.""" + port = mqtt_broker + service = _make_service(port) + binding, _ = _make_binding() + # Register *before* start — the service must pick this up on connect. + await service.register_marstek(binding) + await service.start() + + try: + await service.wait_connected() + await _poll(lambda: service._client is not None) + + async with aiomqtt.Client(hostname="127.0.0.1", port=port) as client: + await client.subscribe(f"hame_energy/HME-4/device/{binding.mac}/ctrl") + await client.publish( + f"hame_energy/HME-4/App/{binding.mac}/ctrl", payload=b"cd=1" + ) + received = [] + await _collect_messages( + client, received, timeout=5, stop=lambda _: len(received) >= 1 + ) + assert len(received) == 1 + assert received[0].payload.startswith( + b"pwr_a=100,pwr_b=200,pwr_c=300,pwr_t=600" + ) + finally: + await service.stop() + + +@needs_mosquitto +async def test_marstek_slow_handler_does_not_stall_listener(mqtt_broker): + """A slow get_values for one binding must not block polls for another. + + With the offload-to-task design, the listener stays responsive even + while a prior poll handler is still awaiting its powermeter. + """ + port = mqtt_broker + service = _make_service(port) + + slow_gate = asyncio.Event() + + async def _slow_values() -> list[float]: + # Block until the test explicitly releases this handler. + await slow_gate.wait() + return [1.0, 2.0, 3.0] + + slow = MarstekMqttBinding( + device_id="slow-ct", + ct_type="HME-4", + mac="02b250111111", + get_values=_slow_values, + wifi_rssi=-50, + ) + fast, _ = _make_binding( + device_id="fast-ct", mac="02b250222222", values=[10.0, 20.0, 30.0] + ) + + await service.register_marstek(slow) + await service.register_marstek(fast) + await service.start() + + try: + await service.wait_connected() + await _poll(lambda: service._client is not None) + + async with aiomqtt.Client(hostname="127.0.0.1", port=port) as client: + await client.subscribe(f"hame_energy/HME-4/device/{fast.mac}/ctrl") + + # Trigger the slow poll first — its handler will block in get_values. + await client.publish( + f"hame_energy/HME-4/App/{slow.mac}/ctrl", payload=b"cd=1" + ) + # Immediately trigger the fast poll — if the listener were + # stalled, we'd never see its reply. + await client.publish( + f"hame_energy/HME-4/App/{fast.mac}/ctrl", payload=b"cd=1" + ) + received = [] + await _collect_messages( + client, received, timeout=5, stop=lambda _: len(received) >= 1 + ) + + assert len(received) == 1 + assert received[0].payload.startswith(b"pwr_a=10,pwr_b=20,pwr_c=30,pwr_t=60") + finally: + slow_gate.set() + await service.stop() diff --git a/src/astrameter/mqtt_insights/service.py b/src/astrameter/mqtt_insights/service.py index 7ca7cd73..8bef3053 100644 --- a/src/astrameter/mqtt_insights/service.py +++ b/src/astrameter/mqtt_insights/service.py @@ -104,6 +104,10 @@ def __init__(self, config: MqttInsightsConfig) -> None: # Rate-limit per-device get_values failure logging so a broken # powermeter doesn't flood the log at hm2mqtt's poll cadence. self._marstek_get_values_failed: set[str] = set() + # In-flight poll handlers — tracked so one slow powermeter doesn't + # block the listener loop, and so we can cancel pending tasks on + # reconnect / shutdown. + self._marstek_tasks: set[asyncio.Task[None]] = set() # ── Public API (called from device event listeners) ─────────────── @@ -293,6 +297,7 @@ async def _run(self) -> None: finally: async with self._marstek_lock: self._client = None + await self._cancel_marstek_tasks() except asyncio.CancelledError: self._connected.clear() @@ -687,12 +692,14 @@ def _handle_device_command(self, device_id: str, cmd: dict) -> None: async def _handle_marstek_message( self, client: aiomqtt.Client, message: aiomqtt.Message ) -> None: + """Dispatch a poll quickly; offload the response to a task so a + slow powermeter can't stall the listener loop.""" topic = str(message.topic) parsed = parse_app_topic(topic) if parsed is None: return ct_type, mac = parsed - binding = self._find_marstek_binding(ct_type, mac) + binding = await self._find_marstek_binding(ct_type, mac) if binding is None: logger.debug("Marstek MQTT: no binding for %s/%s", ct_type, mac) return @@ -702,6 +709,13 @@ async def _handle_marstek_message( logger.debug("Marstek MQTT: non-poll payload on %s", topic) return + task = asyncio.create_task(self._serve_marstek_poll(client, binding)) + self._marstek_tasks.add(task) + task.add_done_callback(self._marstek_tasks.discard) + + async def _serve_marstek_poll( + self, client: aiomqtt.Client, binding: MarstekMqttBinding + ) -> None: try: watts = await binding.get_values() except Exception: @@ -722,10 +736,23 @@ async def _handle_marstek_message( with contextlib.suppress(aiomqtt.MqttError): await client.publish(reply_topic, payload=payload, qos=0, retain=False) - def _find_marstek_binding( + async def _cancel_marstek_tasks(self) -> None: + pending = tuple(self._marstek_tasks) + for task in pending: + task.cancel() + if pending: + await asyncio.gather(*pending, return_exceptions=True) + self._marstek_tasks.clear() + + async def _find_marstek_binding( self, ct_type: str, mac: str ) -> MarstekMqttBinding | None: - for binding in self._marstek_bindings.values(): - if binding.ct_type == ct_type and binding.mac == mac.lower(): + # Snapshot under the lock so a concurrent (un)register can't mutate + # the dict mid-scan. + async with self._marstek_lock: + candidates = tuple(self._marstek_bindings.values()) + mac_lower = mac.lower() + for binding in candidates: + if binding.ct_type == ct_type and binding.mac == mac_lower: return binding return None From 245e16d05c0a4c318629a41b7d461901489f52d2 Mon Sep 17 00:00:00 2001 From: Tom Quist Date: Sat, 16 May 2026 01:12:29 +0200 Subject: [PATCH 3/7] Improve Marstek MQTT responder and decouple CT002 from wire format. Align cd=1/cd=4 payloads with HME-style parsing, propagate cloud ver_v into bindings, add optional MARSTEK_MQTT_INTERVAL for periodic broadcasts, and expose typed reporting_consumer_* data from CT002 while building slave CSV in the MQTT layer. --- config.ini.example | 5 + src/astrameter/config/config_loader.py | 7 + src/astrameter/ct002/__init__.py | 4 +- src/astrameter/ct002/ct002.py | 62 +++++- src/astrameter/main.py | 33 +++- src/astrameter/marstek_api.py | 1 + src/astrameter/mqtt_insights/__init__.py | 9 +- src/astrameter/mqtt_insights/marstek_mqtt.py | 169 +++++++++++++++-- .../mqtt_insights/marstek_mqtt_test.py | 106 ++++++++++- .../mqtt_insights/mqtt_insights_test.py | 178 +++++++++++++++++- src/astrameter/mqtt_insights/service.py | 100 ++++++++-- tests/test_ct002_protocol.py | 23 ++- 12 files changed, 636 insertions(+), 61 deletions(-) diff --git a/config.ini.example b/config.ini.example index 249b2743..4973aa6e 100644 --- a/config.ini.example +++ b/config.ini.example @@ -424,3 +424,8 @@ THROTTLE_INTERVAL = 0 ## working [MARSTEK] section: the MAC used in the MQTT topics is the ## managed fake device AstraMeter registers in the Marstek cloud. #MARSTEK_MQTT_ENABLED = true +## Periodic broadcast interval (seconds) for Marstek MQTT responses. +## Publishes power values for all registered bindings at this cadence +## so the Marstek app stays up-to-date without relying solely on its own polls. +## Set to 0 to disable periodic broadcasts (respond only to polls). +#MARSTEK_MQTT_INTERVAL = 300 diff --git a/src/astrameter/config/config_loader.py b/src/astrameter/config/config_loader.py index a742a755..9cfc3c17 100644 --- a/src/astrameter/config/config_loader.py +++ b/src/astrameter/config/config_loader.py @@ -735,5 +735,12 @@ def read_mqtt_insights_config( marstek_mqtt_enabled=config.getboolean( section, "MARSTEK_MQTT_ENABLED", fallback=True ), + marstek_mqtt_interval=float(raw_interval) + if ( + raw_interval := config.get( + section, "MARSTEK_MQTT_INTERVAL", fallback="" + ).strip() + ) + else 300, ) return None diff --git a/src/astrameter/ct002/__init__.py b/src/astrameter/ct002/__init__.py index b835ecfc..6903c484 100644 --- a/src/astrameter/ct002/__init__.py +++ b/src/astrameter/ct002/__init__.py @@ -1,3 +1,3 @@ -from .ct002 import CT002, UDP_PORT +from .ct002 import CT002, UDP_PORT, ReportingConsumerRow, ReportingPhase -__all__ = ["CT002", "UDP_PORT"] +__all__ = ["CT002", "UDP_PORT", "ReportingConsumerRow", "ReportingPhase"] diff --git a/src/astrameter/ct002/ct002.py b/src/astrameter/ct002/ct002.py index 9bb8cd0b..4ee1f0b4 100644 --- a/src/astrameter/ct002/ct002.py +++ b/src/astrameter/ct002/ct002.py @@ -7,7 +7,7 @@ import time from collections.abc import Awaitable, Callable from datetime import datetime, timezone -from typing import Any +from typing import Any, Literal, cast from astrameter.config.logger import logger from astrameter.request_dedupe import RequestDeduplicator @@ -41,6 +41,8 @@ "SOH", "STX", "UDP_PORT", + "ReportingConsumerRow", + "ReportingPhase", "build_payload", "calculate_checksum", "compute_length", @@ -75,6 +77,21 @@ class Consumer: manual_target: float = 0.0 manual_enabled: bool = False active: bool = True + # Last UDP source address seen for this consumer, if the protocol provides it. + last_ip: str = "" + + +ReportingPhase = Literal["a", "b", "c"] + + +@dataclasses.dataclass(frozen=True, slots=True) +class ReportingConsumerRow: + """One UDP-reporting consumer, for integrations that need a stable device list.""" + + device_type: str + consumer_id: str + last_ip: str + phase: ReportingPhase class _CT002Protocol(asyncio.DatagramProtocol): @@ -259,7 +276,15 @@ def _call_event_listener(self, consumer_id: str, data: dict[str, Any]) -> None: "event_listener failed for %s: %s", consumer_id, exc, exc_info=True ) - def _update_consumer_report(self, consumer_id, phase, power, device_type=""): + def _update_consumer_report( + self, + consumer_id, + phase, + power, + device_type="", + *, + source_ip: str | None = None, + ): normalized_phase = str(phase).upper() if phase else "A" consumer = self._get_consumer(consumer_id) previous_phase = consumer.phase if consumer.timestamp > 0 else None @@ -278,6 +303,8 @@ def _update_consumer_report(self, consumer_id, phase, power, device_type=""): consumer.power = parse_int(power, 0) consumer.timestamp = now consumer.device_type = device_type + if source_ip: + consumer.last_ip = source_ip if normalized_phase in ("A", "B", "C") and previous_phase != normalized_phase: if previous_phase in ("A", "B", "C"): @@ -373,6 +400,36 @@ def _collect_reports_by_phase(self): by_phase[phase]["dchrg_power"] += power return by_phase + def reporting_consumer_count(self) -> int: + """Number of consumers that have reported at least once over UDP.""" + return sum(1 for c in self._consumers.values() if c.timestamp > 0) + + def reporting_consumer_rows(self) -> tuple[ReportingConsumerRow, ...]: + """Stable-ordered view of reporting consumers for integrations. + + *phase* is normalized to ``a``/``b``/``c``; *last_ip* may be empty when unknown. + Rows follow sorted ``consumer_id`` so list position stays predictable. + """ + reporters = sorted( + (c for c in self._consumers.values() if c.timestamp > 0), + key=lambda c: c.consumer_id, + ) + out: list[ReportingConsumerRow] = [] + for c in reporters: + pu = (c.phase or "A").strip().lower() + if pu not in ("a", "b", "c"): + pu = "a" + host = c.last_ip.strip() if c.last_ip else "" + out.append( + ReportingConsumerRow( + device_type=(c.device_type or "").strip(), + consumer_id=c.consumer_id.strip(), + last_ip=host, + phase=cast(ReportingPhase, pu), + ) + ) + return tuple(out) + def _format_status(self, values, phase_values, consumer_id=None, meter_value=None): """Concise one-line status: phase consumption and consumer charge/discharge reports.""" if not values or len(values) != 3: @@ -579,6 +636,7 @@ async def _handle_request(self, data, addr, transport): phase=reported_phase if not in_inspection_mode else "A", power=reported_power, device_type=meter_dev_type, + source_ip=str(addr[0]), ) updated = await self._call_before_send(addr, fields, consumer_id) diff --git a/src/astrameter/main.py b/src/astrameter/main.py index 2dcd890e..480924ff 100644 --- a/src/astrameter/main.py +++ b/src/astrameter/main.py @@ -22,7 +22,9 @@ from astrameter.mqtt_insights import ( MarstekMqttBinding, MqttInsightsService, + format_cd4_slave_csv, normalize_mac, + ver_v_from_marstek_api_version, ) from astrameter.powermeter import Powermeter from astrameter.shelly import Shelly @@ -124,6 +126,7 @@ async def run_device( device_id: str | None = None, insights: MqttInsightsService | None = None, marstek_mac: str = "", + marstek_ver_v: int | None = None, ): logger.debug(f"Starting device: {device_type}") @@ -367,13 +370,24 @@ async def _marstek_get_values( vs = await chosen.get_powermeter_watts() return [float(vs[i]) if i < len(vs) else 0.0 for i in range(3)] + def _marstek_connected_slave_count() -> int: + return device.reporting_consumer_count() + + def _marstek_cd4_slave_csv() -> str: + return format_cd4_slave_csv(device.reporting_consumer_rows()) + await insights.register_marstek( MarstekMqttBinding( device_id=device_id or "", ct_type=device.ct_type, mac=marstek_mac, get_values=_marstek_get_values, + get_connected_slave_count=_marstek_connected_slave_count, + get_cd4_slave_csv=_marstek_cd4_slave_csv, wifi_rssi=device.wifi_rssi, + ver_v=marstek_ver_v + if marstek_ver_v is not None + else ver_v_from_marstek_api_version(None), ) ) else: @@ -404,9 +418,9 @@ async def async_main( device_types: list[str], device_ids: list[str], skip_test: bool, - managed_macs: dict[str, str] | None = None, + managed_marstek: dict[str, tuple[str, int]] | None = None, ): - managed_macs = managed_macs or {} + managed_marstek = managed_marstek or {} web_server = None if cfg.getboolean("GENERAL", "ENABLE_WEB_SERVER", fallback=True): logger.info("Starting web server...") @@ -466,7 +480,7 @@ async def async_main( powermeters, device_id, insights, - managed_macs.get(device_type, ""), + *managed_marstek.get(device_type, ("", None)), ) for device_type, device_id in zip( device_types, device_ids, strict=False @@ -628,7 +642,7 @@ def main(): # When registration succeeds, the returned MAC is captured per device # type so the Marstek MQTT responder in MQTT Insights uses the same # MAC that hame-relay will route back to the Marstek app. - managed_macs: dict[str, str] = {} + managed_marstek: dict[str, tuple[str, int]] = {} marstek_enabled = cfg.getboolean("MARSTEK", "ENABLE", fallback=False) if marstek_enabled: mailbox = cfg.get("MARSTEK", "MAILBOX", fallback="") @@ -656,7 +670,12 @@ def main(): if created is not None: normalized = normalize_mac(str(created.get("mac", ""))) if normalized: - managed_macs[dt] = normalized + managed_marstek[dt] = ( + normalized, + ver_v_from_marstek_api_version( + created.get("version") + ), + ) if any_ct: logger.info( "Managed fake CT registration completed. Fake CT devices appear as offline in the Marstek app CT list (this is expected)." @@ -705,7 +724,9 @@ def _restart_handler(signum, frame): restart_requested = False try: asyncio.run( - async_main(cfg, args, device_types, device_ids, skip_test, managed_macs) + async_main( + cfg, args, device_types, device_ids, skip_test, managed_marstek + ) ) break # clean exit except KeyboardInterrupt: diff --git a/src/astrameter/marstek_api.py b/src/astrameter/marstek_api.py index b7a96300..a77edf6c 100644 --- a/src/astrameter/marstek_api.py +++ b/src/astrameter/marstek_api.py @@ -204,6 +204,7 @@ def _add_device( "bluetooth_name": f"MST-SMR_{suffix}", "position": "{}", "timeZone": cfg.timezone, + "version": "121", } headers = { diff --git a/src/astrameter/mqtt_insights/__init__.py b/src/astrameter/mqtt_insights/__init__.py index 3d97ec30..4df09db5 100644 --- a/src/astrameter/mqtt_insights/__init__.py +++ b/src/astrameter/mqtt_insights/__init__.py @@ -1,11 +1,18 @@ """MQTT Insights — publish internal state to MQTT with HA Discovery.""" -from .marstek_mqtt import MarstekMqttBinding, normalize_mac +from .marstek_mqtt import ( + MarstekMqttBinding, + format_cd4_slave_csv, + normalize_mac, + ver_v_from_marstek_api_version, +) from .service import MqttInsightsConfig, MqttInsightsService __all__ = [ "MarstekMqttBinding", "MqttInsightsConfig", "MqttInsightsService", + "format_cd4_slave_csv", "normalize_mac", + "ver_v_from_marstek_api_version", ] diff --git a/src/astrameter/mqtt_insights/marstek_mqtt.py b/src/astrameter/mqtt_insights/marstek_mqtt.py index 39a44c30..08b8aa04 100644 --- a/src/astrameter/mqtt_insights/marstek_mqtt.py +++ b/src/astrameter/mqtt_insights/marstek_mqtt.py @@ -3,17 +3,21 @@ Pure helpers (topic formatting, payload building, poll detection) plus a binding dataclass that the :class:`MqttInsightsService` stores per device. -The wire protocol (topics + CSV key=value payload) matches what the real -Marstek CT002 uses against the Marstek cloud broker. When hame-relay is -bridging the local broker to the cloud, responses produced here reach the -Marstek mobile app as if they came from a real CT002. +Wire format (UTF-8): the Marstek app typically parses payloads with +``replaceAll(' ', '')``, then ``split(',')``, then each token ``split('=')`` +expecting **exactly one** ``=`` per token. Replies to ``cd=1`` / ``cd=4`` polls +omit a ``cd=`` echo; ``cd=4`` slave lists use flat ``slv_t/…/slv_p`` tokens only. +Aggregate replies include power, ``slv_n``, optional extras, and kWh placeholders. """ from __future__ import annotations import re -from collections.abc import Awaitable, Callable +from collections.abc import Awaitable, Callable, Sequence from dataclasses import dataclass +from typing import Any + +from astrameter.ct002 import ReportingConsumerRow APP_TOPIC_TEMPLATES = ( "hame_energy/{ct_type}/App/{mac}/ctrl", @@ -27,6 +31,10 @@ # Matches observed real-device values; included so hm2mqtt-style parsers # recognise the message as a well-formed runtime-info frame. DEFAULT_VER_V = 148 +# Observed-style firmware/build stamp for ``fc4_v`` (string-ish numeric). +DEFAULT_FC4_V = "202409090159" +# Placeholder kWh fields (two decimal places; app may parse as float or centi-units). +DEFAULT_CD1_KWH = (0.0, 0.0, 0.0, 0.0) _APP_TOPIC_RE = re.compile( r"^(?:hame|marstek)_energy/(?P[^/]+)/App/(?P[^/]+)/ctrl$" @@ -34,6 +42,29 @@ _MAC_HEX_RE = re.compile(r"^[0-9a-f]{12}$") +def ver_v_from_marstek_api_version(value: Any) -> int: + """Turn EMS/device-list ``version`` into MQTT ``ver_v`` (integer firmware-style field). + + The cloud list sometimes omits or changes shape; keep the wire value aligned + when present so app-side parsers see a consistent ``ver_v`` with the API record. + """ + if isinstance(value, bool): + return DEFAULT_VER_V + if isinstance(value, int): + return value + if isinstance(value, float) and value == int(value): + return int(value) + if isinstance(value, str): + s = value.strip() + if not s: + return DEFAULT_VER_V + try: + return int(s) + except ValueError: + return DEFAULT_VER_V + return DEFAULT_VER_V + + @dataclass(frozen=True) class MarstekMqttBinding: """Per-device registration used by the MQTT Insights service.""" @@ -44,6 +75,10 @@ class MarstekMqttBinding: get_values: Callable[[], Awaitable[list[float]]] wifi_rssi: int ver_v: int = DEFAULT_VER_V + get_connected_slave_count: Callable[[], int] | None = None + get_cd4_slave_csv: Callable[[], str] | None = None + ble_s: int = 0 + fc4_v: str = DEFAULT_FC4_V def normalize_mac(raw: str) -> str: @@ -54,21 +89,67 @@ def normalize_mac(raw: str) -> str: return cleaned if _MAC_HEX_RE.fullmatch(cleaned) else "" -def is_poll_payload(body: bytes) -> bool: - """Return True iff *body* is a CSV containing ``cd=1``.""" +def _parse_ctrl_kv(body: bytes) -> dict[str, str] | None: + """Decode *body* as UTF-8 CSV ``k=v`` pairs; keys lowercased. ``None`` if invalid. + + Matches a naive split like the app's outer pass (after stripping spaces). Any + value that must contain ``,`` or ``=`` cannot round-trip here; ``cd=4`` slave + lists are emitted as **flat** repeated ``slv_*`` tokens instead. + """ if not body: - return False + return None try: text = body.decode("utf-8") except UnicodeDecodeError: - return False + return None + out: dict[str, str] = {} for chunk in text.split(","): key, sep, value = chunk.partition("=") if not sep: continue - if key.strip().lower() == "cd" and value.strip() == "1": - return True - return False + k = key.strip().lower() + if k: + out[k] = value.strip() + return out + + +@dataclass(frozen=True) +class MarstekPollContext: + """How to answer an App/ctrl runtime-info request (aggregate vs slave list).""" + + echo_cd: int + slave_id: int | None = None + + +def parse_marstek_poll_payload(body: bytes) -> MarstekPollContext | None: + """Recognise Marstek poll requests: ``cd=1`` (aggregate) or ``cd=4`` + ``p1`` (slave list). + + ``cd=4`` without ``p1`` is ignored so we do not invent a selector the app + did not send. + """ + kv = _parse_ctrl_kv(body) + if kv is None or "cd" not in kv: + return None + try: + cd = int(kv["cd"], 10) + except ValueError: + return None + if cd == 1: + return MarstekPollContext(echo_cd=1, slave_id=None) + if cd == 4: + if "p1" not in kv: + return None + try: + slave_id = int(kv["p1"], 10) + except ValueError: + return None + return MarstekPollContext(echo_cd=4, slave_id=slave_id) + return None + + +def is_poll_payload(body: bytes) -> bool: + """Return True iff *body* requests runtime info (``cd=1`` or ``cd=4`` with ``p1``).""" + return parse_marstek_poll_payload(body) is not None def parse_app_topic(topic: str) -> tuple[str, str] | None: @@ -95,13 +176,69 @@ def device_topics_for(binding: MarstekMqttBinding) -> tuple[str, str]: ) -def build_response(binding: MarstekMqttBinding, watts: list[float]) -> bytes: - """Build the CSV ``k=v`` response body for a runtime-info frame.""" +def _fmt_kwh(x: float) -> str: + return f"{x:.2f}" + + +def _cd4_escape_field(value: str) -> str: + # Marstek-style outer parse splits on commas; each token must have a single "=". + return value.replace(",", "_").replace(";", "_").replace("=", "_") + + +def format_cd4_slave_csv(rows: Sequence[ReportingConsumerRow]) -> str: + """CSV body for a ``cd=4`` reply: repeated ``slv_t/slv_id/slv_ip/slv_p`` tokens. + + *rows* come from :meth:`astrameter.ct002.ct002.CT002.reporting_consumer_rows`. + """ + if not rows: + return "" + parts: list[str] = [] + for row in rows: + host = row.last_ip.strip() or "0.0.0.0" + parts.append( + f"slv_t={_cd4_escape_field(row.device_type)},slv_id={_cd4_escape_field(row.consumer_id)}," + f"slv_ip={_cd4_escape_field(host)},slv_p={row.phase}" + ) + return ",".join(parts) + + +def build_cd4_response(slave_kv_tail: str) -> bytes: + """Slave list reply (no ``cd=`` echo): flat ``slv_t/…/slv_p`` tokens only.""" + return slave_kv_tail.encode() + + +def build_response( + binding: MarstekMqttBinding, + watts: list[float], + *, + poll: MarstekPollContext | None = None, + connected_slave_count: int = 0, + kwh_fields: tuple[float, float, float, float] | None = None, +) -> bytes: + """Build the CSV ``k=v`` body for aggregate power/status (or legacy core only). + + When *poll* has ``echo_cd == 1``, emit the extended runtime frame **without** a + ``cd=`` key (the app already knows the poll kind). Order: phase powers, + ``wif_s`` before RSSI/version, ``slv_n``, ``cur_d=0``, ``ble_s``, ``fc4_v``, kWh placeholders. + """ vs = list(watts) + [0.0] * (3 - len(watts)) a, b, c = (round(v) for v in vs[:3]) total = a + b + c - payload = ( + core = ( f"pwr_a={a},pwr_b={b},pwr_c={c},pwr_t={total}," f"wif_r={binding.wifi_rssi},ver_v={binding.ver_v},wif_s=2" ) - return payload.encode("utf-8") + k0, k1, k2, k3 = kwh_fields if kwh_fields is not None else DEFAULT_CD1_KWH + cd1_tail = ( + f"ble_s={binding.ble_s},fc4_v={binding.fc4_v}," + f"kwh={_fmt_kwh(k0)},n_kwh={_fmt_kwh(k1)},used_kwh={_fmt_kwh(k2)},fed_kwh={_fmt_kwh(k3)}" + ) + if poll is not None and poll.echo_cd == 1: + payload = ( + f"pwr_a={a},pwr_b={b},pwr_c={c},pwr_t={total},wif_s=2," + f"wif_r={binding.wifi_rssi},ver_v={binding.ver_v},slv_n={connected_slave_count},cur_d=0," + f"{cd1_tail}" + ) + else: + payload = core + return payload.encode() diff --git a/src/astrameter/mqtt_insights/marstek_mqtt_test.py b/src/astrameter/mqtt_insights/marstek_mqtt_test.py index bafc39fe..a4c2a078 100644 --- a/src/astrameter/mqtt_insights/marstek_mqtt_test.py +++ b/src/astrameter/mqtt_insights/marstek_mqtt_test.py @@ -4,15 +4,22 @@ import pytest +from astrameter.ct002 import ReportingConsumerRow + from .marstek_mqtt import ( DEFAULT_VER_V, MarstekMqttBinding, + MarstekPollContext, app_topics_for, + build_cd4_response, build_response, device_topics_for, + format_cd4_slave_csv, is_poll_payload, normalize_mac, parse_app_topic, + parse_marstek_poll_payload, + ver_v_from_marstek_api_version, ) @@ -56,32 +63,60 @@ def test_normalize_mac_rejects_invalid(raw: str) -> None: assert normalize_mac(raw) == "" -# ── is_poll_payload ─────────────────────────────────────────────────────── +# ── parse_marstek_poll_payload / is_poll_payload ────────────────────────── @pytest.mark.parametrize( "body", [ b"cd=1", + b"cd=01", b"cd=1,foo=bar", + b"cd=01,foo=bar", b"foo=bar,cd=1", + b"foo=bar,cd=01", b" cd = 1 ", + b" cd = 01 ", b"cd=1\n", + b"cd=01\n", b"CD=1", + b"CD=01", + b"cd=4,p1=0", + b"p1=2,cd=4", + b"cd=04,p1=00", ], ) -def test_is_poll_payload_accepts_cd1(body: bytes) -> None: +def test_is_poll_payload_accepts_poll_requests(body: bytes) -> None: assert is_poll_payload(body) is True @pytest.mark.parametrize( "body", - [b"", b"cd=0", b"cd=", b"garbage", b"pwr_a=1", b"\xff\xfe"], + [ + b"", + b"cd=0", + b"cd=", + b"cd=4", + b"cd=4,foo=1", + b"cd=4,p1=", + b"cd=4,p1=abc", + b"garbage", + b"pwr_a=1", + b"\xff\xfe", + ], ) -def test_is_poll_payload_rejects_non_cd1(body: bytes) -> None: +def test_is_poll_payload_rejects_non_poll(body: bytes) -> None: assert is_poll_payload(body) is False +def test_parse_marstek_poll_payload_cd1_and_cd4() -> None: + assert parse_marstek_poll_payload(b"cd=1") == MarstekPollContext(1, None) + assert parse_marstek_poll_payload(b"cd=4,p1=0") == MarstekPollContext(4, 0) + assert parse_marstek_poll_payload(b"p1=3,cd=4") == MarstekPollContext(4, 3) + assert parse_marstek_poll_payload(b"cd=4") is None + assert parse_marstek_poll_payload(b"cd=2,p1=0") is None + + # ── parse_app_topic ─────────────────────────────────────────────────────── @@ -189,3 +224,66 @@ async def _g() -> list[float]: ) body = build_response(b, [0.0, 0.0, 0.0]) assert b"ver_v=200" in body + + +def test_build_response_aggregate_cd1_matches_hme_shape() -> None: + b = _binding(wifi_rssi=-50) + poll = MarstekPollContext(echo_cd=1, slave_id=None) + body = build_response(b, [100.0, 200.0, 300.0], poll=poll, connected_slave_count=2) + assert body == ( + b"pwr_a=100,pwr_b=200,pwr_c=300,pwr_t=600,wif_s=2,wif_r=-50,ver_v=148,slv_n=2,cur_d=0," + b"ble_s=0,fc4_v=202409090159,kwh=0.00,n_kwh=0.00,used_kwh=0.00,fed_kwh=0.00" + ) + + +def test_format_cd4_slave_csv_empty() -> None: + assert format_cd4_slave_csv(()) == "" + + +def test_format_cd4_slave_csv_matches_joined_tokens() -> None: + rows = ( + ReportingConsumerRow("HME-4", "a-mac", "192.168.1.50", "a"), + ReportingConsumerRow("HMA-2", "z-mac", "192.168.1.51", "c"), + ) + assert format_cd4_slave_csv(rows) == ( + "slv_t=HME-4,slv_id=a-mac,slv_ip=192.168.1.50,slv_p=a," + "slv_t=HMA-2,slv_id=z-mac,slv_ip=192.168.1.51,slv_p=c" + ) + + +def test_format_cd4_slave_csv_empty_ip_becomes_zero() -> None: + assert ( + format_cd4_slave_csv((ReportingConsumerRow("T", "id", "", "a"),)) + == "slv_t=T,slv_id=id,slv_ip=0.0.0.0,slv_p=a" + ) + + +def test_format_cd4_slave_csv_escapes_specials() -> None: + s = format_cd4_slave_csv((ReportingConsumerRow("a,b", "x=y", "1.2.3.4", "b"),)) + assert s == "slv_t=a_b,slv_id=x_y,slv_ip=1.2.3.4,slv_p=b" + + +def test_build_cd4_response_flat_tokens() -> None: + tail = "slv_t=HMA-2,slv_id=aabbccddeeff,slv_ip=192.168.1.50,slv_p=a" + assert build_cd4_response(tail) == tail.encode() + + +def test_build_cd4_response_empty_slave_list() -> None: + assert build_cd4_response("") == b"" + + +@pytest.mark.parametrize( + "raw,expected", + [ + (None, DEFAULT_VER_V), + ("", DEFAULT_VER_V), + (" ", DEFAULT_VER_V), + ("148", 148), + (" 121 ", 121), + (148, 148), + (True, DEFAULT_VER_V), + ("not-a-number", DEFAULT_VER_V), + ], +) +def test_ver_v_from_marstek_api_version(raw: object, expected: int) -> None: + assert ver_v_from_marstek_api_version(raw) == expected diff --git a/src/astrameter/mqtt_insights/mqtt_insights_test.py b/src/astrameter/mqtt_insights/mqtt_insights_test.py index c4611f5b..54bc9228 100644 --- a/src/astrameter/mqtt_insights/mqtt_insights_test.py +++ b/src/astrameter/mqtt_insights/mqtt_insights_test.py @@ -405,6 +405,30 @@ def test_read_mqtt_insights_config_marstek_mqtt_opt_out(): assert result.marstek_mqtt_enabled is False +def test_read_mqtt_insights_config_marstek_mqtt_interval_default(): + cfg = configparser.ConfigParser() + cfg.read_string("[MQTT_INSIGHTS]\nBROKER = localhost\n") + result = read_mqtt_insights_config(cfg) + assert result is not None + assert result.marstek_mqtt_interval == 300 + + +def test_read_mqtt_insights_config_marstek_mqtt_interval_custom(): + cfg = configparser.ConfigParser() + cfg.read_string("[MQTT_INSIGHTS]\nBROKER = localhost\nMARSTEK_MQTT_INTERVAL = 60\n") + result = read_mqtt_insights_config(cfg) + assert result is not None + assert result.marstek_mqtt_interval == 60 + + +def test_read_mqtt_insights_config_marstek_mqtt_interval_zero(): + cfg = configparser.ConfigParser() + cfg.read_string("[MQTT_INSIGHTS]\nBROKER = localhost\nMARSTEK_MQTT_INTERVAL = 0\n") + result = read_mqtt_insights_config(cfg) + assert result is not None + assert result.marstek_mqtt_interval == 0 + + # ── Service unit tests (no broker) ─────────────────────────────────────── @@ -843,6 +867,11 @@ def test_consumer_state_includes_manual_target_fields(): # ── Marstek MQTT responder tests ───────────────────────────────────────── +_MARSTEK_CD1_FULL = ( + b"pwr_a=100,pwr_b=200,pwr_c=300,pwr_t=600,wif_s=2,wif_r=-50,ver_v=148,slv_n=0,cur_d=0," + b"ble_s=0,fc4_v=202409090159,kwh=0.00,n_kwh=0.00,used_kwh=0.00,fed_kwh=0.00" +) + def _make_binding( *, @@ -852,16 +881,27 @@ def _make_binding( wifi_rssi: int = -50, values: list[float] | None = None, raises: BaseException | None = None, -) -> tuple[MarstekMqttBinding, list[tuple[str, ...]]]: - calls: list[tuple[str, ...]] = [] + cd4_csv: str | None = None, +) -> tuple[MarstekMqttBinding, list[tuple[object, ...]]]: + calls: list[tuple[object, ...]] = [] vs = [100.0, 200.0, 300.0] if values is None else values async def _get() -> list[float]: - calls.append(("called",)) + calls.append(("meter",)) if raises is not None: raise raises return list(vs) + if cd4_csv is None: + get_cd4_fn = None + else: + + def _cd4() -> str: + calls.append(("cd4",)) + return cd4_csv + + get_cd4_fn = _cd4 + return ( MarstekMqttBinding( device_id=device_id, @@ -869,6 +909,7 @@ async def _get() -> list[float]: mac=mac, get_values=_get, wifi_rssi=wifi_rssi, + get_cd4_slave_csv=get_cd4_fn, ), calls, ) @@ -929,9 +970,7 @@ async def test_marstek_poll_responds_on_both_topics(mqtt_broker): f"hame_energy/HME-4/device/{binding.mac}/ctrl", f"marstek_energy/HME-4/device/{binding.mac}/ctrl", ] - expected = ( - b"pwr_a=100,pwr_b=200,pwr_c=300,pwr_t=600,wif_r=-50,ver_v=148,wif_s=2" - ) + expected = _MARSTEK_CD1_FULL for msg in received: assert msg.payload == expected assert len(calls) == 1 @@ -939,6 +978,46 @@ async def test_marstek_poll_responds_on_both_topics(mqtt_broker): await service.stop() +@needs_mosquitto +async def test_marstek_poll_cd4_responds_with_slave_list(mqtt_broker): + port = mqtt_broker + service = _make_service(port) + inner = ( + "slv_t=HME-4,slv_id=bat-a,slv_ip=192.168.1.50,slv_p=a," + "slv_t=HMA-2,slv_id=bat-b,slv_ip=192.168.1.51,slv_p=b" + ) + binding, calls = _make_binding( + values=[100.0, 200.0, 300.0], + cd4_csv=inner, + ) + await service.register_marstek(binding) + await service.start() + + try: + await service.wait_connected() + await _poll(lambda: service._client is not None) + + received = [] + async with aiomqtt.Client(hostname="127.0.0.1", port=port) as client: + await client.subscribe(f"hame_energy/HME-4/device/{binding.mac}/ctrl") + await client.subscribe(f"marstek_energy/HME-4/device/{binding.mac}/ctrl") + await client.publish( + f"hame_energy/HME-4/App/{binding.mac}/ctrl", + payload=b"cd=4,p1=0", + ) + await _collect_messages( + client, received, timeout=5, stop=lambda _: len(received) >= 2 + ) + + assert len(received) == 2 + expected = inner.encode() + for msg in received: + assert msg.payload == expected + assert calls == [("cd4",)] + finally: + await service.stop() + + @needs_mosquitto async def test_marstek_ignores_non_poll_payload(mqtt_broker): port = mqtt_broker @@ -1059,7 +1138,7 @@ async def test_marstek_get_values_failure_suppressed(mqtt_broker): assert received == [] # get_values was called but no reply was published await _poll(lambda: binding.device_id in service._marstek_get_values_failed) - assert calls == [("called",)] + assert calls == [("meter",)] finally: await service.stop() @@ -1090,8 +1169,87 @@ async def test_marstek_register_before_start_subscribes_on_connect(mqtt_broker): ) assert len(received) == 1 assert received[0].payload.startswith( - b"pwr_a=100,pwr_b=200,pwr_c=300,pwr_t=600" + b"pwr_a=100,pwr_b=200,pwr_c=300,pwr_t=600,wif_s=2,wif_r=-50,ver_v=148,slv_n=0,cur_d=0," + ) + finally: + await service.stop() + + +@needs_mosquitto +async def test_marstek_periodic_broadcast(mqtt_broker): + """When marstek_mqtt_interval > 0, responses are published periodically + without requiring a poll request from the app.""" + port = mqtt_broker + global _test_counter + _test_counter += 1 + service = MqttInsightsService( + MqttInsightsConfig( + broker="127.0.0.1", + port=port, + base_topic=f"test_insights_{_test_counter}", + ha_discovery=True, + ha_discovery_prefix=f"ha_disc_{_test_counter}", + marstek_mqtt_interval=0.2, ) + ) + binding, calls = _make_binding(values=[100.0, 200.0, 300.0]) + await service.register_marstek(binding) + + async with aiomqtt.Client(hostname="127.0.0.1", port=port) as sub: + await sub.subscribe(f"hame_energy/HME-4/device/{binding.mac}/ctrl") + await sub.subscribe(f"marstek_energy/HME-4/device/{binding.mac}/ctrl") + + await service.start() + try: + await service.wait_connected() + + received: list[aiomqtt.Message] = [] + # At least 2 broadcast rounds -> 4 messages (2 topics x 2 rounds) + await _collect_messages( + sub, received, timeout=5, stop=lambda _: len(received) >= 4 + ) + + assert len(received) >= 4 + expected = _MARSTEK_CD1_FULL + for msg in received: + assert msg.payload == expected + assert len(calls) >= 2 + finally: + await service.stop() + + +@needs_mosquitto +async def test_marstek_broadcast_disabled_when_interval_zero(mqtt_broker): + """marstek_mqtt_interval=0 disables the periodic broadcast loop; only + explicit poll requests trigger a response.""" + port = mqtt_broker + global _test_counter + _test_counter += 1 + service = MqttInsightsService( + MqttInsightsConfig( + broker="127.0.0.1", + port=port, + base_topic=f"test_insights_{_test_counter}", + ha_discovery=True, + ha_discovery_prefix=f"ha_disc_{_test_counter}", + marstek_mqtt_interval=0, + ) + ) + binding, calls = _make_binding(values=[100.0, 200.0, 300.0]) + await service.register_marstek(binding) + await service.start() + + try: + await service.wait_connected() + await _poll(lambda: service._client is not None) + + received: list[aiomqtt.Message] = [] + async with aiomqtt.Client(hostname="127.0.0.1", port=port) as sub: + await sub.subscribe(f"hame_energy/HME-4/device/{binding.mac}/ctrl") + await _collect_messages(sub, received, timeout=1) + + assert received == [] + assert calls == [] finally: await service.stop() @@ -1150,7 +1308,9 @@ async def _slow_values() -> list[float]: ) assert len(received) == 1 - assert received[0].payload.startswith(b"pwr_a=10,pwr_b=20,pwr_c=30,pwr_t=60") + assert received[0].payload.startswith( + b"pwr_a=10,pwr_b=20,pwr_c=30,pwr_t=60,wif_s=2,wif_r=-50,ver_v=148,slv_n=0,cur_d=0," + ) finally: slow_gate.set() await service.stop() diff --git a/src/astrameter/mqtt_insights/service.py b/src/astrameter/mqtt_insights/service.py index 8bef3053..4a4f834c 100644 --- a/src/astrameter/mqtt_insights/service.py +++ b/src/astrameter/mqtt_insights/service.py @@ -24,11 +24,13 @@ ) from .marstek_mqtt import ( MarstekMqttBinding, + MarstekPollContext, app_topics_for, + build_cd4_response, build_response, device_topics_for, - is_poll_payload, parse_app_topic, + parse_marstek_poll_payload, ) RECONNECT_DELAY = 5 @@ -72,6 +74,10 @@ class MqttInsightsConfig: # emulator in the Marstek app. Default on; requires [MARSTEK] # credentials so that the managed MAC matches the cloud device. marstek_mqtt_enabled: bool = True + # Periodic broadcast interval (seconds). When > 0 and marstek_mqtt_enabled, + # publish power values for every registered binding at this cadence so the + # Marstek app stays up-to-date without relying solely on its own polls. + marstek_mqtt_interval: float = 1 @dataclass @@ -289,11 +295,13 @@ async def _run(self) -> None: self._connected.set() try: - # Run publish loop and message listener concurrently - await asyncio.gather( + coros: list[Any] = [ self._publish_loop(client), self._listen_commands(client), - ) + ] + if cfg.marstek_mqtt_enabled and cfg.marstek_mqtt_interval > 0: + coros.append(self._marstek_broadcast_loop(client)) + await asyncio.gather(*coros) finally: async with self._marstek_lock: self._client = None @@ -689,6 +697,22 @@ def _handle_device_command(self, device_id: str, cmd: dict) -> None: else: logger.debug("No rotation handler for device %s", device_id) + async def _marstek_broadcast_loop(self, client: aiomqtt.Client) -> None: + """Periodically publish power values for all registered bindings.""" + interval = self._config.marstek_mqtt_interval + while True: + async with self._marstek_lock: + bindings = tuple(self._marstek_bindings.values()) + for binding in bindings: + task = asyncio.create_task( + self._serve_marstek_poll( + client, binding, MarstekPollContext(echo_cd=1, slave_id=None) + ) + ) + self._marstek_tasks.add(task) + task.add_done_callback(self._marstek_tasks.discard) + await asyncio.sleep(interval) + async def _handle_marstek_message( self, client: aiomqtt.Client, message: aiomqtt.Message ) -> None: @@ -705,33 +729,69 @@ async def _handle_marstek_message( return body = message.payload if isinstance(message.payload, bytes) else b"" - if not is_poll_payload(body): + poll = parse_marstek_poll_payload(body) + if poll is None: logger.debug("Marstek MQTT: non-poll payload on %s", topic) return - task = asyncio.create_task(self._serve_marstek_poll(client, binding)) + task = asyncio.create_task(self._serve_marstek_poll(client, binding, poll)) self._marstek_tasks.add(task) task.add_done_callback(self._marstek_tasks.discard) async def _serve_marstek_poll( - self, client: aiomqtt.Client, binding: MarstekMqttBinding + self, + client: aiomqtt.Client, + binding: MarstekMqttBinding, + poll: MarstekPollContext, ) -> None: - try: - watts = await binding.get_values() - except Exception: - if binding.device_id not in self._marstek_get_values_failed: - logger.exception( - "Marstek MQTT: get_values failed for %s; suppressing " - "further failures until values recover", + if poll.echo_cd == 4: + try: + if binding.get_cd4_slave_csv is None: + slv = "" + else: + slv = binding.get_cd4_slave_csv() + payload = build_cd4_response(slv) + except Exception: + if binding.device_id not in self._marstek_get_values_failed: + logger.exception( + "Marstek MQTT: cd=4 slave list failed for %s; suppressing " + "further failures until recovery", + binding.device_id, + ) + self._marstek_get_values_failed.add(binding.device_id) + return + if binding.device_id in self._marstek_get_values_failed: + logger.info( + "Marstek MQTT: poll value fetch recovered for %s", binding.device_id, ) - self._marstek_get_values_failed.add(binding.device_id) - return - if binding.device_id in self._marstek_get_values_failed: - logger.info("Marstek MQTT: get_values recovered for %s", binding.device_id) - self._marstek_get_values_failed.discard(binding.device_id) + self._marstek_get_values_failed.discard(binding.device_id) + else: + try: + watts = await binding.get_values() + except Exception: + if binding.device_id not in self._marstek_get_values_failed: + logger.exception( + "Marstek MQTT: poll value fetch failed for %s; suppressing " + "further failures until values recover", + binding.device_id, + ) + self._marstek_get_values_failed.add(binding.device_id) + return + if binding.device_id in self._marstek_get_values_failed: + logger.info( + "Marstek MQTT: poll value fetch recovered for %s", + binding.device_id, + ) + self._marstek_get_values_failed.discard(binding.device_id) + + n_slaves = 0 + if binding.get_connected_slave_count is not None: + n_slaves = binding.get_connected_slave_count() + payload = build_response( + binding, list(watts), poll=poll, connected_slave_count=n_slaves + ) - payload = build_response(binding, list(watts)) for reply_topic in device_topics_for(binding): with contextlib.suppress(aiomqtt.MqttError): await client.publish(reply_topic, payload=payload, qos=0, retain=False) diff --git a/tests/test_ct002_protocol.py b/tests/test_ct002_protocol.py index df846813..b2b4c833 100644 --- a/tests/test_ct002_protocol.py +++ b/tests/test_ct002_protocol.py @@ -1,6 +1,6 @@ import logging -from astrameter.ct002.ct002 import CT002 +from astrameter.ct002 import CT002, ReportingConsumerRow from astrameter.ct002.protocol import ( ETX, RESPONSE_LABELS, @@ -74,6 +74,27 @@ def test_ct002_response_field_count_stable(): assert len(response_fields) == len(RESPONSE_LABELS) +def test_reporting_consumer_count() -> None: + device = CT002() + assert device.reporting_consumer_count() == 0 + device._update_consumer_report("a", "A", 1) + device._update_consumer_report("b", "B", -2) + assert device.reporting_consumer_count() == 2 + + +def test_reporting_consumer_rows_order_and_shape() -> None: + device = CT002() + assert device.reporting_consumer_rows() == () + + device._update_consumer_report("z-mac", "C", 1, "HMA-2", source_ip="192.168.1.51") + device._update_consumer_report("a-mac", "A", 2, "HME-4", source_ip="192.168.1.50") + rows = device.reporting_consumer_rows() + assert rows == ( + ReportingConsumerRow("HME-4", "a-mac", "192.168.1.50", "a"), + ReportingConsumerRow("HMA-2", "z-mac", "192.168.1.51", "c"), + ) + + def test_ct002_relays_sum_of_all_storage_reports_by_phase(): device = CT002() request_fields = ["HMG-50", "AABBCCDDEEFF", "HME-4", "112233445566", "B", "-100"] From 536d8babb4260ba34c70465ac65d892a85226015 Mon Sep 17 00:00:00 2001 From: Tom Quist Date: Sat, 16 May 2026 01:18:11 +0200 Subject: [PATCH 4/7] Changelog --- CHANGELOG.md | 1 + src/astrameter/main.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 82479dad..6b2c9c7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ ### Added - **Added** CT002/CT003 emulation for steering multiple Marstek storage devices over the Marstek CT UDP protocol. Active control is on by default (`ACTIVE_CONTROL = True`): the emulator smooths the grid reading, splits the target across batteries with a 15 W `BALANCE_DEADBAND`, and runs time-weighted saturation detection with handoff — set `ACTIVE_CONTROL = False` for relay mode (raw meter values forwarded, batteries decide). Includes fair-share balancing (`FAIR_DISTRIBUTION`, `BALANCE_GAIN`), manual target override and forced rotation via MQTT, ARP-based consumer discovery, and an opt-in efficiency mode that concentrates power on fewer batteries at low demand (`MIN_EFFICIENT_POWER`, `EFFICIENCY_ROTATION_INTERVAL`, probe-based fades, `SATURATION_GRACE_SECONDS`) ([#283](https://github.com/tomquist/astrameter/pull/283), [#284](https://github.com/tomquist/astrameter/pull/284), [#287](https://github.com/tomquist/astrameter/pull/287), [#289](https://github.com/tomquist/astrameter/pull/289), [#291](https://github.com/tomquist/astrameter/pull/291), [#293](https://github.com/tomquist/astrameter/pull/293), [#294](https://github.com/tomquist/astrameter/pull/294), [#296](https://github.com/tomquist/astrameter/pull/296), [#298](https://github.com/tomquist/astrameter/pull/298), [#301](https://github.com/tomquist/astrameter/pull/301), [#303](https://github.com/tomquist/astrameter/pull/303), [#310](https://github.com/tomquist/astrameter/pull/310), [#311](https://github.com/tomquist/astrameter/pull/311), [#320](https://github.com/tomquist/astrameter/pull/320), [#321](https://github.com/tomquist/astrameter/pull/321)). - **Added** MQTT Insights: optional `[MQTT_INSIGHTS]` section publishes internal state (grid power, targets, saturation, consumer topology, EMA poll interval) to MQTT with Home Assistant Device Discovery; per-consumer active/pause + manual target control; Shelly battery offline availability; auto-configured in the HA app when Mosquitto is installed ([#292](https://github.com/tomquist/astrameter/pull/292), [#294](https://github.com/tomquist/astrameter/pull/294), [#297](https://github.com/tomquist/astrameter/pull/297), [#300](https://github.com/tomquist/astrameter/pull/300), [#306](https://github.com/tomquist/astrameter/pull/306)). +- **Added** Marstek MQTT responder inside MQTT Insights: when `[MARSTEK]` credentials are configured, AstraMeter answers the Marstek CT002/CT003 poll protocol on the MQTT broker using the managed cloud MAC, so combined with [hame-relay](https://github.com/tomquist/hame-relay) the emulator's readings appear as a CT002/CT003 in the Marstek mobile app. Enabled by default; set `MARSTEK_MQTT_ENABLED = false` in `[MQTT_INSIGHTS]` to opt out. - **Added** opt-in web-based configuration editor (`WEB_CONFIG_ENABLED = True` in `[GENERAL]`) accessible at `http://:52500/config`; supports editing all config sections and keys with type-appropriate inputs, comment preservation, and a Save & Restart button ([#319](https://github.com/tomquist/astrameter/pull/319)). - **Added** HomeWizard P1 powermeter via the device WebSocket API, with optional `VERIFY_SSL` ([#231](https://github.com/tomquist/astrameter/pull/231), [#254](https://github.com/tomquist/astrameter/pull/254)). - **Added** Enphase IQ Gateway (Envoy) powermeter via the local HTTPS `production.json` API, with optional Enlighten-cloud token acquisition and automatic refresh on 401, and auto-detection of single- vs three-phase readings ([#245](https://github.com/tomquist/astrameter/pull/245)). diff --git a/src/astrameter/main.py b/src/astrameter/main.py index 480924ff..3b4084f3 100644 --- a/src/astrameter/main.py +++ b/src/astrameter/main.py @@ -357,10 +357,10 @@ def _shelly_event_listener(dev_id, battery_ip, data): if marstek_mac: async def _marstek_get_values( - _pms: list[tuple[Powermeter, ClientFilter]] = powermeters, + _pms: list[tuple[Powermeter, ClientFilter, bool]] = powermeters, ) -> list[float]: chosen: Powermeter | None = next( - (pm for pm, cf in _pms if cf.matches("0.0.0.0")), None + (pm for pm, cf, _ in _pms if cf.matches("0.0.0.0")), None ) if chosen is None and _pms: chosen = _pms[0][0] From 7acd55be6f69fbd6dc3cc6ba4ccbf0cd9602c41b Mon Sep 17 00:00:00 2001 From: Tom Quist Date: Sat, 16 May 2026 01:30:20 +0200 Subject: [PATCH 5/7] Send raw readings --- src/astrameter/main.py | 2 +- src/astrameter/powermeter/base.py | 10 ++++++++++ src/astrameter/powermeter/wrappers/base.py | 3 +++ src/astrameter/powermeter/wrappers/pid_test.py | 1 + .../powermeter/wrappers/throttling.py | 5 +++++ .../powermeter/wrappers/throttling_test.py | 18 ++++++++++++++++++ .../powermeter/wrappers/transform_test.py | 11 ++++++++++- 7 files changed, 48 insertions(+), 2 deletions(-) diff --git a/src/astrameter/main.py b/src/astrameter/main.py index 3b4084f3..e65d45ac 100644 --- a/src/astrameter/main.py +++ b/src/astrameter/main.py @@ -367,7 +367,7 @@ async def _marstek_get_values( if chosen is None: return [0.0, 0.0, 0.0] await chosen.wait_for_next_message() - vs = await chosen.get_powermeter_watts() + vs = await chosen.get_powermeter_watts_raw() return [float(vs[i]) if i < len(vs) else 0.0 for i in range(3)] def _marstek_connected_slave_count() -> int: diff --git a/src/astrameter/powermeter/base.py b/src/astrameter/powermeter/base.py index dc7de1bd..3050aaa7 100644 --- a/src/astrameter/powermeter/base.py +++ b/src/astrameter/powermeter/base.py @@ -3,6 +3,16 @@ class Powermeter: async def get_powermeter_watts(self) -> list[float]: raise NotImplementedError() + async def get_powermeter_watts_raw(self) -> list[float]: + """Per-phase watts before section/global processing wrappers. + + Used when a consumer (e.g. Marstek MQTT display) should match the physical + meter while control still uses :meth:`get_powermeter_watts`. Defaults to + the same values as :meth:`get_powermeter_watts` for sources with no inner + pipeline. + """ + return await self.get_powermeter_watts() + async def wait_for_message(self, timeout=5): pass diff --git a/src/astrameter/powermeter/wrappers/base.py b/src/astrameter/powermeter/wrappers/base.py index 291a9b28..ac28ac87 100644 --- a/src/astrameter/powermeter/wrappers/base.py +++ b/src/astrameter/powermeter/wrappers/base.py @@ -10,6 +10,9 @@ def __init__(self, wrapped_powermeter: Powermeter) -> None: async def get_powermeter_watts(self) -> list[float]: raise NotImplementedError() + async def get_powermeter_watts_raw(self) -> list[float]: + return await self.wrapped_powermeter.get_powermeter_watts_raw() + async def wait_for_message(self, timeout=5): await self.wrapped_powermeter.wait_for_message(timeout) diff --git a/src/astrameter/powermeter/wrappers/pid_test.py b/src/astrameter/powermeter/wrappers/pid_test.py index 05d1ac91..93bf81d8 100644 --- a/src/astrameter/powermeter/wrappers/pid_test.py +++ b/src/astrameter/powermeter/wrappers/pid_test.py @@ -10,6 +10,7 @@ def mock_powermeter(): """Return a mock powermeter with async stubs for all interface methods.""" pm = Mock() pm.get_powermeter_watts = AsyncMock() + pm.get_powermeter_watts_raw = pm.get_powermeter_watts pm.wait_for_message = AsyncMock() pm.wait_for_next_message = AsyncMock() pm.start = AsyncMock() diff --git a/src/astrameter/powermeter/wrappers/throttling.py b/src/astrameter/powermeter/wrappers/throttling.py index 29f8b6a7..afb0af55 100644 --- a/src/astrameter/powermeter/wrappers/throttling.py +++ b/src/astrameter/powermeter/wrappers/throttling.py @@ -29,6 +29,11 @@ def __init__(self, wrapped_powermeter: Powermeter, throttle_interval: float = 0. self._last_values: list[float] | None = None self._pending_fetch: asyncio.Future[list[float]] | None = None + async def get_powermeter_watts_raw(self) -> list[float]: + # Raw reads skip throttle coalescing so the Marstek app can show sensor-level + # watts without being tied to the CT002 control cadence. + return await self.wrapped_powermeter.get_powermeter_watts_raw() + async def get_powermeter_watts(self) -> list[float]: if self.throttle_interval <= 0: return await self.wrapped_powermeter.get_powermeter_watts() diff --git a/src/astrameter/powermeter/wrappers/throttling_test.py b/src/astrameter/powermeter/wrappers/throttling_test.py index e4f94fbc..09501df2 100644 --- a/src/astrameter/powermeter/wrappers/throttling_test.py +++ b/src/astrameter/powermeter/wrappers/throttling_test.py @@ -10,6 +10,7 @@ async def test_no_throttling_always_fetches_fresh_values(): """Test that when throttling is disabled, fresh values are always fetched.""" mock_pm = Mock() mock_pm.get_powermeter_watts = AsyncMock(return_value=[100.0, 200.0, 300.0]) + mock_pm.get_powermeter_watts_raw = mock_pm.get_powermeter_watts throttled = ThrottledPowermeter(mock_pm, throttle_interval=0) result1 = await throttled.get_powermeter_watts() @@ -24,6 +25,7 @@ async def test_throttling_waits_for_interval(): """Test that throttling waits for remaining time before fetching new values.""" mock_pm = Mock() mock_pm.get_powermeter_watts = AsyncMock(return_value=[100.0, 200.0, 300.0]) + mock_pm.get_powermeter_watts_raw = mock_pm.get_powermeter_watts throttled = ThrottledPowermeter(mock_pm, throttle_interval=0.2) result1 = await throttled.get_powermeter_watts() @@ -46,6 +48,7 @@ async def test_throttling_fetches_fresh_after_interval(): """Test that fresh values are fetched after throttling interval passes.""" mock_pm = Mock() mock_pm.get_powermeter_watts = AsyncMock(return_value=[100.0, 200.0, 300.0]) + mock_pm.get_powermeter_watts_raw = mock_pm.get_powermeter_watts throttled = ThrottledPowermeter(mock_pm, throttle_interval=0.1) result1 = await throttled.get_powermeter_watts() @@ -86,6 +89,7 @@ async def test_exception_handling_with_cache(): mock_pm.start = AsyncMock() mock_pm.stop = AsyncMock() mock_pm.get_powermeter_watts = AsyncMock(return_value=[100.0, 200.0]) + mock_pm.get_powermeter_watts_raw = mock_pm.get_powermeter_watts throttled = ThrottledPowermeter(mock_pm, throttle_interval=0.1) @@ -104,8 +108,22 @@ async def test_exception_raises_without_cache(): mock_pm.start = AsyncMock() mock_pm.stop = AsyncMock() mock_pm.get_powermeter_watts = AsyncMock(side_effect=Exception("Network error")) + mock_pm.get_powermeter_watts_raw = mock_pm.get_powermeter_watts throttled = ThrottledPowermeter(mock_pm, throttle_interval=0.1) with pytest.raises(Exception, match="Network error"): await throttled.get_powermeter_watts() + + +async def test_throttled_raw_bypasses_get_and_throttle_coalescing(): + mock_pm = Mock() + get_m = AsyncMock(return_value=[1.0]) + raw_m = AsyncMock(return_value=[2.0, 3.0, 4.0]) + mock_pm.get_powermeter_watts = get_m + mock_pm.get_powermeter_watts_raw = raw_m + throttled = ThrottledPowermeter(mock_pm, throttle_interval=3600.0) + + assert await throttled.get_powermeter_watts_raw() == [2.0, 3.0, 4.0] + raw_m.assert_awaited_once() + get_m.assert_not_called() diff --git a/src/astrameter/powermeter/wrappers/transform_test.py b/src/astrameter/powermeter/wrappers/transform_test.py index aec212bd..7e2cdc0f 100644 --- a/src/astrameter/powermeter/wrappers/transform_test.py +++ b/src/astrameter/powermeter/wrappers/transform_test.py @@ -8,12 +8,21 @@ @pytest.fixture def mock_powermeter(): pm = Mock() - pm.get_powermeter_watts = AsyncMock() + m = AsyncMock() + pm.get_powermeter_watts = m + pm.get_powermeter_watts_raw = m pm.wait_for_message = AsyncMock() pm.wait_for_next_message = AsyncMock() return pm +async def test_transformed_raw_matches_wrapped_without_offset(mock_powermeter): + mock_powermeter.get_powermeter_watts.return_value = [100.0, 200.0, 300.0] + t = TransformedPowermeter(mock_powermeter, [10.0], [1.0]) + assert await t.get_powermeter_watts() == [110.0, 210.0, 310.0] + assert await t.get_powermeter_watts_raw() == [100.0, 200.0, 300.0] + + async def test_identity_single_phase(mock_powermeter): mock_powermeter.get_powermeter_watts.return_value = [500.0] t = TransformedPowermeter(mock_powermeter, [0.0], [1.0]) From 79bf9365195e97c0f643865b026deb7b9f2d34c8 Mon Sep 17 00:00:00 2001 From: Tom Quist Date: Sat, 16 May 2026 01:36:15 +0200 Subject: [PATCH 6/7] Fix tests --- src/astrameter/main_test.py | 16 ++++++++++++++++ .../powermeter/wrappers/hampel_test.py | 3 +++ .../powermeter/wrappers/smoothing_test.py | 3 +++ src/astrameter/shelly/shelly_udp_test.py | 5 +++++ 4 files changed, 27 insertions(+) diff --git a/src/astrameter/main_test.py b/src/astrameter/main_test.py index 11b1d41c..40107db2 100644 --- a/src/astrameter/main_test.py +++ b/src/astrameter/main_test.py @@ -21,6 +21,9 @@ def __init__( async def get_powermeter_watts(self) -> list[float]: return list(self._values) + async def get_powermeter_watts_raw(self) -> list[float]: + return list(self._values) + async def wait_for_next_message(self, timeout=5): self._wait_calls.append(timeout) if self._wait_raises is not None: @@ -57,6 +60,19 @@ async def test_read_ct_powermeter_calls_wait_with_2s_when_enabled(): assert pm._wait_calls == [2] +async def test_stub_powermeter_raw_matches_watts(): + pm = _StubPowermeter([3.0, 4.0, 5.0]) + assert ( + await pm.get_powermeter_watts_raw() + == await pm.get_powermeter_watts() + == [ + 3.0, + 4.0, + 5.0, + ] + ) + + async def test_read_ct_powermeter_swallows_timeout_and_serves_cached(): """Issue #327: a slow push meter must not break CT002 responses.""" pm = _StubPowermeter( diff --git a/src/astrameter/powermeter/wrappers/hampel_test.py b/src/astrameter/powermeter/wrappers/hampel_test.py index aec96ce2..f19f4c4e 100644 --- a/src/astrameter/powermeter/wrappers/hampel_test.py +++ b/src/astrameter/powermeter/wrappers/hampel_test.py @@ -20,6 +20,9 @@ def set(self, values: list[float]) -> None: async def get_powermeter_watts(self) -> list[float]: return list(self._values) + async def get_powermeter_watts_raw(self) -> list[float]: + return list(self._values) + async def wait_for_message(self, timeout=5): pass diff --git a/src/astrameter/powermeter/wrappers/smoothing_test.py b/src/astrameter/powermeter/wrappers/smoothing_test.py index a311ca3a..979fbce5 100644 --- a/src/astrameter/powermeter/wrappers/smoothing_test.py +++ b/src/astrameter/powermeter/wrappers/smoothing_test.py @@ -20,6 +20,9 @@ def set(self, values: list[float]) -> None: async def get_powermeter_watts(self) -> list[float]: return list(self._values) + async def get_powermeter_watts_raw(self) -> list[float]: + return list(self._values) + async def wait_for_message(self, timeout=5): pass diff --git a/src/astrameter/shelly/shelly_udp_test.py b/src/astrameter/shelly/shelly_udp_test.py index a49a6c7f..45b7f215 100644 --- a/src/astrameter/shelly/shelly_udp_test.py +++ b/src/astrameter/shelly/shelly_udp_test.py @@ -17,6 +17,11 @@ async def get_powermeter_watts(self): self.call_count += 1 return [1.0] + async def get_powermeter_watts_raw(self): + # Same physical reading as get_powermeter_watts; do not bump call_count so + # throttling/dedupe tests that only observe get_powermeter_watts stay stable. + return [1.0] + class _FakeClock: def __init__(self) -> None: From 9e7a299b93c202c3287ee976d022bf3bfa30945e Mon Sep 17 00:00:00 2001 From: Tom Quist Date: Sat, 16 May 2026 01:44:58 +0200 Subject: [PATCH 7/7] Readme and tests --- CHANGELOG.md | 2 +- README.md | 28 +++++++++++++++---- config.ini.example | 22 +++++++-------- .../mqtt_insights/mqtt_insights_test.py | 3 ++ src/astrameter/mqtt_insights/service.py | 2 +- 5 files changed, 39 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b2c9c7f..27c4b2e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ ### Added - **Added** CT002/CT003 emulation for steering multiple Marstek storage devices over the Marstek CT UDP protocol. Active control is on by default (`ACTIVE_CONTROL = True`): the emulator smooths the grid reading, splits the target across batteries with a 15 W `BALANCE_DEADBAND`, and runs time-weighted saturation detection with handoff — set `ACTIVE_CONTROL = False` for relay mode (raw meter values forwarded, batteries decide). Includes fair-share balancing (`FAIR_DISTRIBUTION`, `BALANCE_GAIN`), manual target override and forced rotation via MQTT, ARP-based consumer discovery, and an opt-in efficiency mode that concentrates power on fewer batteries at low demand (`MIN_EFFICIENT_POWER`, `EFFICIENCY_ROTATION_INTERVAL`, probe-based fades, `SATURATION_GRACE_SECONDS`) ([#283](https://github.com/tomquist/astrameter/pull/283), [#284](https://github.com/tomquist/astrameter/pull/284), [#287](https://github.com/tomquist/astrameter/pull/287), [#289](https://github.com/tomquist/astrameter/pull/289), [#291](https://github.com/tomquist/astrameter/pull/291), [#293](https://github.com/tomquist/astrameter/pull/293), [#294](https://github.com/tomquist/astrameter/pull/294), [#296](https://github.com/tomquist/astrameter/pull/296), [#298](https://github.com/tomquist/astrameter/pull/298), [#301](https://github.com/tomquist/astrameter/pull/301), [#303](https://github.com/tomquist/astrameter/pull/303), [#310](https://github.com/tomquist/astrameter/pull/310), [#311](https://github.com/tomquist/astrameter/pull/311), [#320](https://github.com/tomquist/astrameter/pull/320), [#321](https://github.com/tomquist/astrameter/pull/321)). - **Added** MQTT Insights: optional `[MQTT_INSIGHTS]` section publishes internal state (grid power, targets, saturation, consumer topology, EMA poll interval) to MQTT with Home Assistant Device Discovery; per-consumer active/pause + manual target control; Shelly battery offline availability; auto-configured in the HA app when Mosquitto is installed ([#292](https://github.com/tomquist/astrameter/pull/292), [#294](https://github.com/tomquist/astrameter/pull/294), [#297](https://github.com/tomquist/astrameter/pull/297), [#300](https://github.com/tomquist/astrameter/pull/300), [#306](https://github.com/tomquist/astrameter/pull/306)). -- **Added** Marstek MQTT responder inside MQTT Insights: when `[MARSTEK]` credentials are configured, AstraMeter answers the Marstek CT002/CT003 poll protocol on the MQTT broker using the managed cloud MAC, so combined with [hame-relay](https://github.com/tomquist/hame-relay) the emulator's readings appear as a CT002/CT003 in the Marstek mobile app. Enabled by default; set `MARSTEK_MQTT_ENABLED = false` in `[MQTT_INSIGHTS]` to opt out. +- **Added** optional Marstek MQTT responder alongside MQTT Insights (HA is the main use case): when `[MARSTEK]` is configured, AstraMeter can answer CT002/CT003 poll traffic on the same broker using the managed cloud MAC; with [hame-relay](https://github.com/tomquist/hame-relay) **≥ 1.3.5** on that broker the Marstek mobile app shows live readings (see README, MQTT Insights). On by default; set `MARSTEK_MQTT_ENABLED = false` in `[MQTT_INSIGHTS]` to disable only this add-on. - **Added** opt-in web-based configuration editor (`WEB_CONFIG_ENABLED = True` in `[GENERAL]`) accessible at `http://:52500/config`; supports editing all config sections and keys with type-appropriate inputs, comment preservation, and a Save & Restart button ([#319](https://github.com/tomquist/astrameter/pull/319)). - **Added** HomeWizard P1 powermeter via the device WebSocket API, with optional `VERIFY_SSL` ([#231](https://github.com/tomquist/astrameter/pull/231), [#254](https://github.com/tomquist/astrameter/pull/254)). - **Added** Enphase IQ Gateway (Envoy) powermeter via the local HTTPS `production.json` API, with optional Enlighten-cloud token acquisition and automatic refresh on 401, and auto-detection of single- vs three-phase readings ([#245](https://github.com/tomquist/astrameter/pull/245)). diff --git a/README.md b/README.md index b9f939bc..a37fa2fc 100644 --- a/README.md +++ b/README.md @@ -336,6 +336,7 @@ Optional Marstek cloud auto-registration: - Refresh the CT device list after registration (or log out/in if needed). Then select `AstraMeter CT002` / `AstraMeter CT003`, switch battery mode to automatic, and choose that CT. It should be selectable as soon as it appears in the device list. - Marstek credentials are only needed for one-time registration. You can remove `MARSTEK.MAILBOX` / `MARSTEK.PASSWORD` immediately after registration succeeds (or if the managed device already exists). - If you use Home Assistant app `custom_config`, values from that file take precedence over app UI fields. + - **Marstek app (optional):** live CT grid power over MQTT uses the same `[MQTT_INSIGHTS]` broker as [hame-relay](https://github.com/tomquist/hame-relay) **≥ 1.3.5**; see [MQTT Insights](#mqtt-insights) (optional Marstek subsection). HA entities do not depend on this. ### Value Transformation @@ -861,11 +862,13 @@ CURRENT_POWER_ENTITY = sensor.current_power ### MQTT Insights -Publishes internal state (grid power per phase, charge targets, saturation, consumer topology) to MQTT with optional Home Assistant auto-discovery. +**Primary use:** publish CT002/Shelly internal state (grid power, targets, saturation, topology, switches) to MQTT with **optional Home Assistant MQTT Device Discovery** so entities show up in HA. -**Home Assistant App**: When running as an HA app with the Mosquitto broker app installed, MQTT Insights is auto-configured — no manual setup needed. Entities appear automatically in HA. +**Home Assistant app:** With the Mosquitto add-on installed, MQTT Insights is auto-configured; entities appear without manual `[MQTT_INSIGHTS]` wiring. -**Manual configuration**: +**Small add-on:** the same broker connection can optionally answer **Marstek CT002/CT003 MQTT polls** so the Marstek mobile app shows live grid power when you use [hame-relay](https://github.com/tomquist/hame-relay) on that broker (see below). You can turn that off with `MARSTEK_MQTT_ENABLED=false` and keep HA publishing unchanged. + +**Manual configuration** (when not using the HA app defaults): ```ini [MQTT_INSIGHTS] @@ -889,9 +892,24 @@ HA_DISCOVERY_PREFIX = homeassistant | `BASE_TOPIC` | `astrameter` | Root topic for all published messages | | `HA_DISCOVERY` | `true` | Enable Home Assistant MQTT Device Discovery | | `HA_DISCOVERY_PREFIX` | `homeassistant` | HA discovery topic prefix | -| `MARSTEK_MQTT_ENABLED` | `true` | Respond to Marstek CT002/CT003 MQTT polls on this broker (requires `[MARSTEK]`) | +| `MARSTEK_MQTT_ENABLED` | `true` | Optional: answer Marstek app CT002/CT003 polls on this broker (needs `[MARSTEK]`); set `false` for HA-only | +| `MARSTEK_MQTT_INTERVAL` | `300` | Optional: seconds between background aggregate publishes for the app; `0` = polls only | + +#### Optional: Marstek mobile app (live MQTT) + +This is **not** required for Home Assistant. It only helps the **Marstek app** show live CT002/CT003 grid power over the same cloud MQTT path when **[hame-relay](https://github.com/tomquist/hame-relay)** bridges your broker—use **hame-relay ≥ 1.3.5** so poll/replies work reliably. UDP between batteries and AstraMeter is unchanged for control. + +**If you want it** + +- **`[MARSTEK]`** — Managed fake CT so the **MQTT MAC** matches the cloud device. +- **Same broker as hame-relay** — `[MQTT_INSIGHTS]` must point at the broker relay uses toward Marstek's cloud. + +**Toggles** (defaults in table) + +- **`MARSTEK_MQTT_ENABLED`** — `false` = HA MQTT Insights only, no Marstek poll replies. +- **`MARSTEK_MQTT_INTERVAL`** — Optional periodic aggregate pushes; **`0`** = answer polls only. -**Marstek app visibility**: when `[MARSTEK]` credentials are configured, AstraMeter registers a managed fake CT device in the Marstek cloud. With `MARSTEK_MQTT_ENABLED=true` (default) it also answers the CT's MQTT polls on this broker. Combined with [hame-relay](https://github.com/tomquist/hame-relay) bridging the local broker to the Marstek cloud, the emulator's readings then show up as a CT002/CT003 in the Marstek mobile app. Set `MARSTEK_MQTT_ENABLED=false` to keep MQTT Insights but opt out of this responder. +Replies follow the usual `hame_energy/…` / `marstek_energy/…` App/device topics for a real CT; AstraMeter matches your CT002/CT003 **type** and **MAC**. **Published entities** (per CT002 consumer): - Grid power (L1/L2/L3/total), charge target (L1/L2/L3), reported power, saturation diff --git a/config.ini.example b/config.ini.example index 4973aa6e..508784c2 100644 --- a/config.ini.example +++ b/config.ini.example @@ -130,6 +130,9 @@ THROTTLE_INTERVAL = 0 ## Prefix is fixed to 02b250 (locally administered MAC space) for managed fake devices. ## Used for add-device request #TIMEZONE = Europe/Berlin +## Optional: live grid in the Marstek mobile app over MQTT uses [MQTT_INSIGHTS] +## on the same broker as hame-relay (https://github.com/tomquist/hame-relay) >= 1.3.5 +## (HA discovery does not depend on this). #[SHELLY] #TYPE = 1PM #PLUS1PM #EM #3EM #3EMPRO @@ -398,9 +401,10 @@ THROTTLE_INTERVAL = 0 #HAMPEL_MIN_THRESHOLD = 50 ## --- MQTT Insights (optional) --- -## Publishes internal state (grid power, targets, saturation, consumer topology) -## to MQTT with Home Assistant Device Discovery support. +## Mainly for Home Assistant: publishes internal state (grid power, targets, +## saturation, consumer topology) to MQTT with HA Device Discovery. ## When running as a Home Assistant app with Mosquitto, this is auto-configured. +## Optional add-on: Marstek app poll replies use the same broker (see MARSTEK_* below). #[MQTT_INSIGHTS] #BROKER = 192.168.1.100 #PORT = 1883 @@ -418,14 +422,10 @@ THROTTLE_INTERVAL = 0 #HA_DISCOVERY = true ## HA discovery prefix (default: homeassistant) #HA_DISCOVERY_PREFIX = homeassistant -## Respond to Marstek CT002/CT003 MQTT polls on this broker (default: true). -## Combined with hame-relay (https://github.com/tomquist/hame-relay) the -## emulator's CT readings surface in the Marstek mobile app. Requires a -## working [MARSTEK] section: the MAC used in the MQTT topics is the -## managed fake device AstraMeter registers in the Marstek cloud. +## Optional Marstek mobile app: answer CT002/CT003 MQTT polls on this broker (default: true). +## Live readings in the app need hame-relay (https://github.com/tomquist/hame-relay) >= 1.3.5 +## on the same broker; older relays may not forward poll/replies. Requires [MARSTEK] so the +## topic MAC matches the managed fake device. Set false for HA MQTT Insights only. #MARSTEK_MQTT_ENABLED = true -## Periodic broadcast interval (seconds) for Marstek MQTT responses. -## Publishes power values for all registered bindings at this cadence -## so the Marstek app stays up-to-date without relying solely on its own polls. -## Set to 0 to disable periodic broadcasts (respond only to polls). +## Seconds between optional Marstek aggregate (cd=1) broadcasts when the app is quiet; 0 = polls only. #MARSTEK_MQTT_INTERVAL = 300 diff --git a/src/astrameter/mqtt_insights/mqtt_insights_test.py b/src/astrameter/mqtt_insights/mqtt_insights_test.py index 54bc9228..043747a6 100644 --- a/src/astrameter/mqtt_insights/mqtt_insights_test.py +++ b/src/astrameter/mqtt_insights/mqtt_insights_test.py @@ -492,6 +492,9 @@ def _make_service(port: int, base_topic: str | None = None) -> MqttInsightsServi base_topic=base_topic, ha_discovery=True, ha_discovery_prefix=f"ha_disc_{_test_counter}", + # Broader E2E tests assert poll-only Marstek behaviour; periodic traffic + # is covered by test_marstek_periodic_broadcast. + marstek_mqtt_interval=0.0, ) ) diff --git a/src/astrameter/mqtt_insights/service.py b/src/astrameter/mqtt_insights/service.py index 4a4f834c..1be880d9 100644 --- a/src/astrameter/mqtt_insights/service.py +++ b/src/astrameter/mqtt_insights/service.py @@ -77,7 +77,7 @@ class MqttInsightsConfig: # Periodic broadcast interval (seconds). When > 0 and marstek_mqtt_enabled, # publish power values for every registered binding at this cadence so the # Marstek app stays up-to-date without relying solely on its own polls. - marstek_mqtt_interval: float = 1 + marstek_mqtt_interval: float = 300.0 @dataclass