From f51da7da6226ed04de7cb8c032b80cf3f56c3932 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 19 Apr 2026 21:06:55 +0000 Subject: [PATCH 1/3] Expose more tunables in the HA add-on and extend dedup to Shelly Add a shared RequestDeduplicator helper and wire it into both the CT002 emulator (keyed by parsed consumer id) and the Shelly emulator (keyed by battery IP, since Shelly sources use ephemeral ports). DEDUPE_TIME_WINDOW can now be set under [GENERAL] to apply regardless of which device type is emulated; the existing [CT002]/[CT003] override still wins when present. Surface dedupe_time_window, smooth_target_alpha, max_smooth_step, and deadband in the HA add-on UI so users don't need a custom config file for common tuning. https://claude.ai/code/session_01YZtwVEn4bic9TtfqLmDaZS --- CHANGELOG.md | 1 + README.md | 4 +- config.ini.example | 7 ++- ha_addon/config.yaml | 4 ++ ha_addon/run.sh | 12 +++++ ha_addon/translations/en.yaml | 14 +++++- src/astrameter/ct002/ct002.py | 27 +++++------ src/astrameter/ct002/ct002_test.py | 32 +++++++++++++ src/astrameter/main.py | 34 ++++++++++++-- src/astrameter/request_dedupe.py | 41 ++++++++++++++++ src/astrameter/request_dedupe_test.py | 60 ++++++++++++++++++++++++ src/astrameter/shelly/shelly.py | 8 ++++ src/astrameter/shelly/shelly_udp_test.py | 41 +++++++++++++++- src/astrameter/web_config.py | 1 + 14 files changed, 262 insertions(+), 24 deletions(-) create mode 100644 src/astrameter/ct002/ct002_test.py create mode 100644 src/astrameter/request_dedupe.py create mode 100644 src/astrameter/request_dedupe_test.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 216b5817..1d5a1de6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## Next +- Added `DEDUPE_TIME_WINDOW` support to the Shelly emulator to drop burst-repeat requests from the same battery IP; the value can be set under `[GENERAL]` to apply regardless of which device type is emulated - Added opt-in web-based configuration editor (`WEB_CONFIG_ENABLED = True` in `[GENERAL]`) accessible at `http://:52500/config`; supports editing all config sections and keys with type-appropriate inputs, comment preservation, and a Save & Restart button - **Breaking:** Rebrand project from "B2500 Meter" to "AstraMeter" (formerly b2500-meter). Package renamed to `astrameter`, CLI commands are now `astrameter` and `astra-sim`. Docker image moved from `ghcr.io/tomquist/b2500-meter` to `ghcr.io/tomquist/astrameter` (the legacy `ghcr.io/tomquist/b2500-meter` image is still published in parallel for backward compatibility). Home Assistant users must update their app repository URL to `https://github.com/tomquist/astrameter#main`. - Added CT002/CT003 emulation for steering multiple Marstek storage devices over the Marstek CT UDP protocol, with opt-in efficiency optimization that concentrates power on fewer batteries at low demand and rotates fairly over time (`MIN_EFFICIENT_POWER`, `EFFICIENCY_ROTATION_INTERVAL`, and related tuning options) diff --git a/README.md b/README.md index 34439977..e62d22cc 100644 --- a/README.md +++ b/README.md @@ -303,7 +303,9 @@ CT_MAC = 001122334455 UDP_PORT = 12345 # WiFi RSSI reported to the storage system WIFI_RSSI = -50 -# Ignore repeated requests from the same client within this window (seconds) +# Ignore repeated requests from the same consumer within this window (seconds). +# Also supported by the Shelly emulator (keyed by battery IP); set it under +# [GENERAL] to apply regardless of the emulated device type. DEDUPE_TIME_WINDOW = 0 # Forget consumers after this many seconds without updates (multi-consumer support) CONSUMER_TTL = 120 diff --git a/config.ini.example b/config.ini.example index 62069784..11acd0b0 100644 --- a/config.ini.example +++ b/config.ini.example @@ -26,6 +26,10 @@ THROTTLE_INTERVAL = 0 # response than a fresh one. Defaults to true. # Can be overridden per powermeter section. #WAIT_FOR_NEXT_MESSAGE = true +# Ignore repeated requests from the same emulator client within this window +# (seconds). Applies to CT002/CT003 (keyed by consumer id) and Shelly (keyed +# by battery IP). Can be overridden in the [CT002]/[CT003] section. 0 disables. +#DEDUPE_TIME_WINDOW = 0 #[CT002] ## CT type is derived from the emulated device (ct002 -> HME-4, ct003 -> HME-3). @@ -38,7 +42,8 @@ THROTTLE_INTERVAL = 0 #UDP_PORT = 12345 ## WiFi RSSI reported to the storage system #WIFI_RSSI = -50 -## Ignore repeated requests from the same client within this window (seconds) +## Ignore repeated requests from the same consumer within this window (seconds). +## Overrides the [GENERAL] DEDUPE_TIME_WINDOW for this section. #DEDUPE_TIME_WINDOW = 0 ## Forget consumers after this many seconds without updates (multi-consumer support) #CONSUMER_TTL = 120 diff --git a/ha_addon/config.yaml b/ha_addon/config.yaml index 19bc3509..7609f99c 100644 --- a/ha_addon/config.yaml +++ b/ha_addon/config.yaml @@ -56,5 +56,9 @@ schema: log_level: list(critical|error|warning|info|debug) power_offset: str? power_multiplier: str? + dedupe_time_window: float(0,)? + smooth_target_alpha: float(0,1)? + max_smooth_step: float(0,)? + deadband: float(0,)? custom_config: str? mqtt_uri: str? diff --git a/ha_addon/run.sh b/ha_addon/run.sh index 64904903..18b958fc 100644 --- a/ha_addon/run.sh +++ b/ha_addon/run.sh @@ -85,6 +85,9 @@ else echo "[GENERAL]" echo "DEVICE_TYPE=$(bashio::config 'device_types')" echo "THROTTLE_INTERVAL=$(bashio::config 'throttle_interval')" + if bashio::config.has_value 'dedupe_time_window'; then + echo "DEDUPE_TIME_WINDOW=$(bashio::config 'dedupe_time_window')" + fi echo "ENABLE_WEB_SERVER=true" echo "" if [ "$has_ct002" -eq 1 ] && [ "$has_ct003" -eq 1 ]; then @@ -143,6 +146,15 @@ else power_multiplier="$(bashio::config 'power_multiplier' | tr -d '\r\n')" echo "POWER_MULTIPLIER=$power_multiplier" fi + if bashio::config.has_value 'smooth_target_alpha'; then + echo "SMOOTH_TARGET_ALPHA=$(bashio::config 'smooth_target_alpha')" + fi + if bashio::config.has_value 'max_smooth_step'; then + echo "MAX_SMOOTH_STEP=$(bashio::config 'max_smooth_step')" + fi + if bashio::config.has_value 'deadband'; then + echo "DEADBAND=$(bashio::config 'deadband')" + fi # Fetch this add-on's slug from the supervisor so MQTT discovery can # link discovered meter devices to the add-on device via_device. diff --git a/ha_addon/translations/en.yaml b/ha_addon/translations/en.yaml index 901edb0d..a7b307fe 100644 --- a/ha_addon/translations/en.yaml +++ b/ha_addon/translations/en.yaml @@ -46,4 +46,16 @@ configuration: description: "Optional. Added to each power value after the multiplier is applied. Single value (e.g. -50) or a comma-separated list (one per phase, e.g. -50,-30,-40). Leave empty to disable." power_multiplier: name: Power Multiplier - description: "Optional. Scales each power value (formula: value * POWER_MULTIPLIER + POWER_OFFSET). Single value (e.g. 1.05) or a comma-separated list (one per phase, e.g. 1.05,1.02,1.03). Use -1 to flip polarity. Leave empty to disable." \ No newline at end of file + description: "Optional. Scales each power value (formula: value * POWER_MULTIPLIER + POWER_OFFSET). Single value (e.g. 1.05) or a comma-separated list (one per phase, e.g. 1.05,1.02,1.03). Use -1 to flip polarity. Leave empty to disable." + dedupe_time_window: + name: Deduplication Time Window + description: "Optional. Seconds during which repeat requests from the same battery are ignored. Applies to both CT002/CT003 (keyed by consumer id) and Shelly (keyed by battery IP) emulators. Set to 0 or leave empty to disable." + smooth_target_alpha: + name: Smoothing Alpha (EMA) + description: "Optional. Exponential moving-average factor in (0, 1] applied to power readings from Home Assistant. Smaller values smooth more heavily (e.g. 0.1 = heavy smoothing, 0.5 = light). Leave empty to disable." + max_smooth_step: + name: Max Smoothing Step + description: "Optional. Caps the maximum watt change per cycle when smoothing is active. Use together with Smoothing Alpha to bound per-step jumps (e.g. 200). Leave empty or 0 for unlimited." + deadband: + name: Deadband (W) + description: "Optional. Readings with absolute value below this threshold are reported as 0 W to stop battery micro-cycling near zero demand (e.g. 5). Leave empty or 0 to disable." \ No newline at end of file diff --git a/src/astrameter/ct002/ct002.py b/src/astrameter/ct002/ct002.py index 8b6ea5e4..682deefd 100644 --- a/src/astrameter/ct002/ct002.py +++ b/src/astrameter/ct002/ct002.py @@ -10,6 +10,7 @@ from typing import Any from astrameter.config.logger import logger +from astrameter.request_dedupe import RequestDeduplicator from .balancer import ( SATURATION_GRACE_SECONDS, @@ -142,7 +143,9 @@ def __init__( self._device_id = device_id self._consumers: dict[str, Consumer] = {} self._info_idx_counter = 0 - self._last_response_time: dict[tuple, float] = {} + self._dedup: RequestDeduplicator[str] = RequestDeduplicator( + dedupe_time_window, clock=clock or time.time + ) self._transport = None self._protocol: _CT002Protocol | None = None self._cleanup_task = None @@ -301,13 +304,7 @@ def _cleanup_consumers(self): self._call_event_listener(key, {"_removed": True}) del self._consumers[key] self._balancer.remove_consumer(key) - stale_addrs = [ - addr - for addr, ts in self._last_response_time.items() - if now - ts > self.dedupe_time_window - ] - for addr in stale_addrs: - self._last_response_time.pop(addr, None) + self._dedup.purge_older_than(self.consumer_ttl) def _consumer_mode(self, consumer_id: str | None) -> ConsumerMode: if not consumer_id: @@ -565,13 +562,15 @@ async def _handle_request(self, data, addr, transport): " in inspection mode" if in_inspection_mode else "", ) - # Deduplication check - current_time = time.time() - last_time = self._last_response_time.get(addr) - if last_time and (current_time - last_time) < self.dedupe_time_window: - logger.debug("Ignoring request from %s due to dedupe window", addr) + # Deduplication check (keyed by consumer id so repeats from the + # same battery are suppressed regardless of source UDP port). + if not self._dedup.should_process(consumer_id): + logger.debug( + "Ignoring request from %s (consumer=%s) due to dedupe window", + addr, + consumer_id, + ) return - self._last_response_time[addr] = current_time meter_dev_type = fields[0] if len(fields) > 0 else "" self._update_consumer_report( diff --git a/src/astrameter/ct002/ct002_test.py b/src/astrameter/ct002/ct002_test.py new file mode 100644 index 00000000..016d3614 --- /dev/null +++ b/src/astrameter/ct002/ct002_test.py @@ -0,0 +1,32 @@ +from astrameter.ct002.ct002 import CT002 + + +class FakeClock: + def __init__(self) -> None: + self.now = 0.0 + + def __call__(self) -> float: + return self.now + + +def test_dedup_uses_consumer_id_key_and_injected_clock() -> None: + clock = FakeClock() + ct = CT002(dedupe_time_window=1.0, clock=clock) + + # Same consumer within the window → dropped. + assert ct._dedup.should_process("consumer-A") is True + clock.now += 0.5 + assert ct._dedup.should_process("consumer-A") is False + + # Different consumers are independent, even within the window. + assert ct._dedup.should_process("consumer-B") is True + + # After the window elapses, the same consumer is accepted again. + clock.now += 1.0 + assert ct._dedup.should_process("consumer-A") is True + + +def test_dedup_window_zero_disables() -> None: + ct = CT002(dedupe_time_window=0.0) + for _ in range(3): + assert ct._dedup.should_process("consumer-A") is True diff --git a/src/astrameter/main.py b/src/astrameter/main.py index 2dca93db..7189506a 100644 --- a/src/astrameter/main.py +++ b/src/astrameter/main.py @@ -123,6 +123,10 @@ async def run_device( device: CT002 | Shelly + global_dedupe_time_window = cfg.getfloat( + "GENERAL", "DEDUPE_TIME_WINDOW", fallback=0.0 + ) + if device_type in ["ct002", "ct003"]: ct_section = get_ct_section(device_type, cfg) ct_type = "HME-4" if device_type == "ct002" else "HME-3" @@ -130,7 +134,7 @@ async def run_device( ct_udp_port = cfg.getint(ct_section, "UDP_PORT", fallback=UDP_PORT) wifi_rssi = cfg.getint(ct_section, "WIFI_RSSI", fallback=-50) dedupe_time_window = cfg.getfloat( - ct_section, "DEDUPE_TIME_WINDOW", fallback=0.0 + ct_section, "DEDUPE_TIME_WINDOW", fallback=global_dedupe_time_window ) consumer_ttl = cfg.getint(ct_section, "CONSUMER_TTL", fallback=120) debug_status = cfg.getboolean(ct_section, "DEBUG_STATUS", fallback=False) @@ -260,22 +264,42 @@ def _ct002_event_listener(dev_id, consumer_id, data): elif device_type == "shellypro3em_old": logger.debug("Shelly Pro 3EM Settings:") logger.debug(f"Device ID: {device_id}") - device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=1010) + device = Shelly( + powermeters=powermeters, + device_id=device_id, + udp_port=1010, + dedupe_time_window=global_dedupe_time_window, + ) elif device_type == "shellypro3em_new": logger.debug("Shelly Pro 3EM Settings:") logger.debug(f"Device ID: {device_id}") - device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=2220) + device = Shelly( + powermeters=powermeters, + device_id=device_id, + udp_port=2220, + dedupe_time_window=global_dedupe_time_window, + ) elif device_type == "shellyemg3": logger.debug("Shelly EM Gen3 Settings:") logger.debug(f"Device ID: {device_id}") - device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=2222) + device = Shelly( + powermeters=powermeters, + device_id=device_id, + udp_port=2222, + dedupe_time_window=global_dedupe_time_window, + ) elif device_type == "shellyproem50": logger.debug("Shelly Pro EM 50 Settings:") logger.debug(f"Device ID: {device_id}") - device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=2223) + device = Shelly( + powermeters=powermeters, + device_id=device_id, + udp_port=2223, + dedupe_time_window=global_dedupe_time_window, + ) else: raise ValueError(f"Unsupported device type: {device_type}") diff --git a/src/astrameter/request_dedupe.py b/src/astrameter/request_dedupe.py new file mode 100644 index 00000000..37d87559 --- /dev/null +++ b/src/astrameter/request_dedupe.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import time +from collections.abc import Callable, Hashable +from typing import Generic, TypeVar + +K = TypeVar("K", bound=Hashable) + + +class RequestDeduplicator(Generic[K]): + """Drop repeated incoming requests within a time window. + + Callers pick the key (e.g. a battery IP for the Shelly emulator, a + consumer id for CT002). A window of 0 disables dedup and every + request is allowed through. + """ + + def __init__( + self, + window_seconds: float, + clock: Callable[[], float] | None = None, + ) -> None: + self._window = max(0.0, window_seconds) + self._clock = clock or time.monotonic + self._last: dict[K, float] = {} + + def should_process(self, key: K) -> bool: + if self._window <= 0.0: + return True + now = self._clock() + last = self._last.get(key) + if last is not None and (now - last) < self._window: + return False + self._last[key] = now + return True + + def purge_older_than(self, max_age_seconds: float) -> None: + if not self._last: + return + cutoff = self._clock() - max_age_seconds + self._last = {k: t for k, t in self._last.items() if t >= cutoff} diff --git a/src/astrameter/request_dedupe_test.py b/src/astrameter/request_dedupe_test.py new file mode 100644 index 00000000..33c02b3d --- /dev/null +++ b/src/astrameter/request_dedupe_test.py @@ -0,0 +1,60 @@ +from astrameter.request_dedupe import RequestDeduplicator + + +class FakeClock: + def __init__(self, start: float = 0.0) -> None: + self.now = start + + def __call__(self) -> float: + return self.now + + +def test_window_zero_allows_everything() -> None: + clock = FakeClock() + dedup: RequestDeduplicator[str] = RequestDeduplicator(0.0, clock=clock) + for _ in range(5): + assert dedup.should_process("a") is True + + +def test_repeat_within_window_is_dropped() -> None: + clock = FakeClock() + dedup: RequestDeduplicator[str] = RequestDeduplicator(1.0, clock=clock) + assert dedup.should_process("a") is True + clock.now += 0.5 + assert dedup.should_process("a") is False + + +def test_repeat_after_window_passes() -> None: + clock = FakeClock() + dedup: RequestDeduplicator[str] = RequestDeduplicator(1.0, clock=clock) + assert dedup.should_process("a") is True + clock.now += 1.5 + assert dedup.should_process("a") is True + + +def test_different_keys_are_independent() -> None: + clock = FakeClock() + dedup: RequestDeduplicator[str] = RequestDeduplicator(10.0, clock=clock) + assert dedup.should_process("a") is True + assert dedup.should_process("b") is True + clock.now += 1.0 + assert dedup.should_process("a") is False + assert dedup.should_process("b") is False + + +def test_purge_drops_stale_entries() -> None: + clock = FakeClock() + dedup: RequestDeduplicator[str] = RequestDeduplicator(1.0, clock=clock) + dedup.should_process("old") + clock.now += 100.0 + dedup.should_process("new") + dedup.purge_older_than(max_age_seconds=10.0) + # "old" is 100s old and should be purged; "new" was just recorded. + clock.now += 0.1 + assert dedup.should_process("old") is True # record removed → fresh + assert dedup.should_process("new") is False # still within window + + +def test_purge_on_empty_is_noop() -> None: + dedup: RequestDeduplicator[str] = RequestDeduplicator(1.0) + dedup.purge_older_than(10.0) # should not raise diff --git a/src/astrameter/shelly/shelly.py b/src/astrameter/shelly/shelly.py index 173d14e3..9e4e8a79 100644 --- a/src/astrameter/shelly/shelly.py +++ b/src/astrameter/shelly/shelly.py @@ -11,6 +11,7 @@ from astrameter.config import ClientFilter from astrameter.config.logger import logger from astrameter.powermeter import Powermeter +from astrameter.request_dedupe import RequestDeduplicator BATTERY_INACTIVE_TIMEOUT_SECONDS = 120 POLL_INTERVAL_EMA_ALPHA = 0.3 @@ -38,6 +39,7 @@ def __init__( powermeters: list[tuple[Powermeter, ClientFilter, bool]], udp_port: int, device_id, + dedupe_time_window: float = 0.0, ): self._udp_port = udp_port self._device_id = device_id @@ -49,6 +51,7 @@ def __init__( self._inactive_batteries: set[str] = set() self._stopped = asyncio.Event() self._inactive_check_task = None + self._dedup: RequestDeduplicator[str] = RequestDeduplicator(dedupe_time_window) self.event_listener: Callable[[str, str, dict[str, Any]], None] | None = None def _calculate_derived_values(self, power): @@ -187,6 +190,10 @@ async def _safe_handle_request(self, transport, data, addr): async def _handle_request(self, transport, data, addr): poll_interval = self._track_battery_seen(addr) + if not self._dedup.should_process(addr[0]): + logger.debug("Ignoring request from %s due to dedupe window", addr) + return + try: request_str = data.decode() except UnicodeDecodeError: @@ -270,6 +277,7 @@ async def _inactive_check_loop(self): while True: await asyncio.sleep(1.0) self._log_inactive_batteries() + self._dedup.purge_older_than(BATTERY_INACTIVE_TIMEOUT_SECONDS) except asyncio.CancelledError: pass diff --git a/src/astrameter/shelly/shelly_udp_test.py b/src/astrameter/shelly/shelly_udp_test.py index b0ca7898..642430b5 100644 --- a/src/astrameter/shelly/shelly_udp_test.py +++ b/src/astrameter/shelly/shelly_udp_test.py @@ -17,6 +17,43 @@ async def get_powermeter_watts(self): return [1.0] +async def test_dedupe_window_drops_rapid_duplicates(): + dummy = DummyPowermeter() + cf = ClientFilter([IPv4Network("127.0.0.1/32")]) + + shelly = Shelly( + [(dummy, cf, False)], + udp_port=0, + device_id="test", + dedupe_time_window=0.3, + ) + await shelly.start() + port = shelly.udp_port + try: + # First request is answered normally. + first = await _send_req(port, 1) + assert first == 1 + calls_after_first = dummy.call_count + + # A second request from the same IP within the window is dropped: + # the emulator never responds, so the client times out. + try: + await _send_req(port, 2, timeout=0.2) + raise AssertionError("expected dedup to drop the duplicate request") + except TimeoutError: + pass + # No extra powermeter fetch for the dropped request. + assert dummy.call_count == calls_after_first + + # After the dedup window elapses, requests are answered again. + await asyncio.sleep(0.4) + third = await _send_req(port, 3) + assert third == 3 + assert dummy.call_count == calls_after_first + 1 + finally: + await shelly.stop() + + async def test_multiple_requests_with_throttling(): dummy = DummyPowermeter() pm = ThrottledPowermeter(dummy, throttle_interval=0.2) @@ -56,7 +93,7 @@ async def send_req(i): await shelly.stop() -async def _send_req(port, request_id): +async def _send_req(port, request_id, timeout=2.0): loop = asyncio.get_running_loop() transport, protocol = await loop.create_datagram_endpoint( lambda: _ClientProtocol(), @@ -71,7 +108,7 @@ async def _send_req(port, request_id): "params": {"id": 0}, } transport.sendto(json.dumps(req).encode(), ("127.0.0.1", port)) - data = await asyncio.wait_for(protocol.received, timeout=2.0) + data = await asyncio.wait_for(protocol.received, timeout=timeout) return json.loads(data.decode())["id"] finally: transport.close() diff --git a/src/astrameter/web_config.py b/src/astrameter/web_config.py index 133e3c1d..5d6bcee5 100644 --- a/src/astrameter/web_config.py +++ b/src/astrameter/web_config.py @@ -267,6 +267,7 @@ def _pm(**extras: dict[str, object]) -> dict[str, dict[str, object]]: "DISABLE_ABSOLUTE_VALUES": {"type": "boolean"}, "THROTTLE_INTERVAL": {"type": "float"}, "WAIT_FOR_NEXT_MESSAGE": {"type": "boolean"}, + "DEDUPE_TIME_WINDOW": {"type": "float", "min": 0}, "PID_KP": {"type": "float"}, "PID_KI": {"type": "float"}, "PID_KD": {"type": "float"}, From aa3d5bcdbab2fe9fe7cc3785e564b954e599366c Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 19 Apr 2026 21:23:29 +0000 Subject: [PATCH 2/3] Review fixes: deterministic test, large-window purge, edge cases - Make the Shelly dedup test deterministic by driving _handle_request directly with a fake transport and a fake clock injected into the deduplicator; no more real UDP sockets or sleep-based timing. - Honor DEDUPE_TIME_WINDOW values greater than the 120s Shelly battery-inactivity horizon by purging with max(horizon, window). - Clarify in CT002 why the deduplicator is constructed with time.time instead of defaulting to time.monotonic (shares a timebase with _cleanup_consumers). - RequestDeduplicator: treat non-finite and negative windows as disabled, and document the "window-from-last-accepted" semantic and the caller responsibility to purge. - Add tests for the new guards and for "dropped request must not refresh the timestamp." - Add DEDUPE_TIME_WINDOW to the README [GENERAL] example so Shelly users see it without hunting through the CT section. https://claude.ai/code/session_01YZtwVEn4bic9TtfqLmDaZS --- README.md | 4 ++ src/astrameter/ct002/ct002.py | 3 + src/astrameter/request_dedupe.py | 14 ++++- src/astrameter/request_dedupe_test.py | 30 ++++++++++ src/astrameter/shelly/shelly.py | 12 +++- src/astrameter/shelly/shelly_udp_test.py | 73 +++++++++++++++--------- 6 files changed, 106 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index e62d22cc..641e18aa 100644 --- a/README.md +++ b/README.md @@ -182,6 +182,10 @@ THROTTLE_INTERVAL = 0 # doesn't add latency to every CT002 response. Default: true. # Can be overridden per powermeter section. #WAIT_FOR_NEXT_MESSAGE = true +# Ignore repeated requests from the same emulator client within this window +# (seconds). Applies to CT002/CT003 (keyed by consumer id) and Shelly (keyed +# by battery IP). Can be overridden in the [CT002]/[CT003] section. 0 disables. +#DEDUPE_TIME_WINDOW = 0 ``` Per-powermeter options (apply in any powermeter section, e.g. `[TASMOTA]` or `[HOMEASSISTANT]`): diff --git a/src/astrameter/ct002/ct002.py b/src/astrameter/ct002/ct002.py index 682deefd..5150e310 100644 --- a/src/astrameter/ct002/ct002.py +++ b/src/astrameter/ct002/ct002.py @@ -143,6 +143,9 @@ def __init__( self._device_id = device_id self._consumers: dict[str, Consumer] = {} self._info_idx_counter = 0 + # Use wall-clock (time.time) so the dedup shares a timebase with + # _cleanup_consumers' purge; RequestDeduplicator would otherwise + # default to time.monotonic and mix timebases across the class. self._dedup: RequestDeduplicator[str] = RequestDeduplicator( dedupe_time_window, clock=clock or time.time ) diff --git a/src/astrameter/request_dedupe.py b/src/astrameter/request_dedupe.py index 37d87559..8dc92cba 100644 --- a/src/astrameter/request_dedupe.py +++ b/src/astrameter/request_dedupe.py @@ -1,5 +1,6 @@ from __future__ import annotations +import math import time from collections.abc import Callable, Hashable from typing import Generic, TypeVar @@ -13,6 +14,14 @@ class RequestDeduplicator(Generic[K]): Callers pick the key (e.g. a battery IP for the Shelly emulator, a consumer id for CT002). A window of 0 disables dedup and every request is allowed through. + + Semantics: + - ``should_process`` measures the window from the last **accepted** + request. Dropped requests do not refresh the timestamp, so the + next acceptance window stays aligned to the original. + - The internal dict grows unboundedly until ``purge_older_than`` is + called. Callers are expected to schedule periodic purges (e.g. + alongside their own TTL sweeps) to keep memory bounded. """ def __init__( @@ -20,7 +29,10 @@ def __init__( window_seconds: float, clock: Callable[[], float] | None = None, ) -> None: - self._window = max(0.0, window_seconds) + if not math.isfinite(window_seconds) or window_seconds <= 0.0: + self._window = 0.0 + else: + self._window = window_seconds self._clock = clock or time.monotonic self._last: dict[K, float] = {} diff --git a/src/astrameter/request_dedupe_test.py b/src/astrameter/request_dedupe_test.py index 33c02b3d..907e29ee 100644 --- a/src/astrameter/request_dedupe_test.py +++ b/src/astrameter/request_dedupe_test.py @@ -58,3 +58,33 @@ def test_purge_drops_stale_entries() -> None: def test_purge_on_empty_is_noop() -> None: dedup: RequestDeduplicator[str] = RequestDeduplicator(1.0) dedup.purge_older_than(10.0) # should not raise + + +def test_dropped_request_does_not_refresh_timestamp() -> None: + # The window is measured from the last *accepted* request. A dropped + # repeat within the window must not slide the window forward. + clock = FakeClock() + dedup: RequestDeduplicator[str] = RequestDeduplicator(1.0, clock=clock) + assert dedup.should_process("a") is True # accepted at t=0.0 + clock.now = 0.6 + assert dedup.should_process("a") is False # dropped; must not refresh + # At t=1.05 we're past the original accept (t=0.0) but would still be + # within 1.0s of the dropped attempt (t=0.6) if it had refreshed. + clock.now = 1.05 + assert dedup.should_process("a") is True + + +def test_non_finite_window_is_treated_as_disabled() -> None: + inf_dedup: RequestDeduplicator[str] = RequestDeduplicator(float("inf")) + for _ in range(3): + assert inf_dedup.should_process("a") is True + + nan_dedup: RequestDeduplicator[str] = RequestDeduplicator(float("nan")) + for _ in range(3): + assert nan_dedup.should_process("a") is True + + +def test_negative_window_is_treated_as_disabled() -> None: + dedup: RequestDeduplicator[str] = RequestDeduplicator(-5.0) + for _ in range(3): + assert dedup.should_process("a") is True diff --git a/src/astrameter/shelly/shelly.py b/src/astrameter/shelly/shelly.py index 9e4e8a79..45a7f87f 100644 --- a/src/astrameter/shelly/shelly.py +++ b/src/astrameter/shelly/shelly.py @@ -51,7 +51,10 @@ def __init__( self._inactive_batteries: set[str] = set() self._stopped = asyncio.Event() self._inactive_check_task = None - self._dedup: RequestDeduplicator[str] = RequestDeduplicator(dedupe_time_window) + self._dedupe_time_window = max(0.0, dedupe_time_window) + self._dedup: RequestDeduplicator[str] = RequestDeduplicator( + self._dedupe_time_window + ) self.event_listener: Callable[[str, str, dict[str, Any]], None] | None = None def _calculate_derived_values(self, power): @@ -277,7 +280,12 @@ async def _inactive_check_loop(self): while True: await asyncio.sleep(1.0) self._log_inactive_batteries() - self._dedup.purge_older_than(BATTERY_INACTIVE_TIMEOUT_SECONDS) + # Keep dedup entries until they've aged past both the + # inactive-battery horizon and the configured window, so + # windows greater than 120s are still honored. + self._dedup.purge_older_than( + max(BATTERY_INACTIVE_TIMEOUT_SECONDS, self._dedupe_time_window) + ) except asyncio.CancelledError: pass diff --git a/src/astrameter/shelly/shelly_udp_test.py b/src/astrameter/shelly/shelly_udp_test.py index 642430b5..a49a6c7f 100644 --- a/src/astrameter/shelly/shelly_udp_test.py +++ b/src/astrameter/shelly/shelly_udp_test.py @@ -5,6 +5,7 @@ from astrameter.config import ClientFilter from astrameter.powermeter import Powermeter, ThrottledPowermeter +from astrameter.request_dedupe import RequestDeduplicator from astrameter.shelly.shelly import Shelly @@ -17,41 +18,59 @@ async def get_powermeter_watts(self): return [1.0] +class _FakeClock: + def __init__(self) -> None: + self.now = 0.0 + + def __call__(self) -> float: + return self.now + + +class _FakeTransport: + def __init__(self) -> None: + self.sent: list[tuple[bytes, tuple]] = [] + + def sendto(self, data: bytes, addr: tuple) -> None: + self.sent.append((data, addr)) + + async def test_dedupe_window_drops_rapid_duplicates(): + # Drive the handler directly with a fake transport and a fake clock so + # the test is independent of wall-clock time and real UDP delivery. dummy = DummyPowermeter() cf = ClientFilter([IPv4Network("127.0.0.1/32")]) - shelly = Shelly( [(dummy, cf, False)], udp_port=0, device_id="test", - dedupe_time_window=0.3, + dedupe_time_window=10.0, ) - await shelly.start() - port = shelly.udp_port - try: - # First request is answered normally. - first = await _send_req(port, 1) - assert first == 1 - calls_after_first = dummy.call_count - - # A second request from the same IP within the window is dropped: - # the emulator never responds, so the client times out. - try: - await _send_req(port, 2, timeout=0.2) - raise AssertionError("expected dedup to drop the duplicate request") - except TimeoutError: - pass - # No extra powermeter fetch for the dropped request. - assert dummy.call_count == calls_after_first - - # After the dedup window elapses, requests are answered again. - await asyncio.sleep(0.4) - third = await _send_req(port, 3) - assert third == 3 - assert dummy.call_count == calls_after_first + 1 - finally: - await shelly.stop() + clock = _FakeClock() + shelly._dedup = RequestDeduplicator(10.0, clock=clock) + + transport = _FakeTransport() + req = json.dumps( + {"id": 1, "src": "cli", "method": "EM.GetStatus", "params": {"id": 0}} + ).encode() + addr = ("127.0.0.1", 54321) + + # First request: accepted and a response is sent. + await shelly._handle_request(transport, req, addr) + assert len(transport.sent) == 1 + assert dummy.call_count == 1 + + # Second request within the window: dropped. Same source IP, different + # port (mirroring real Shelly batteries which use ephemeral ports). + clock.now = 1.0 + await shelly._handle_request(transport, req, ("127.0.0.1", 54322)) + assert len(transport.sent) == 1 + assert dummy.call_count == 1 + + # After the window elapses, requests are answered again. + clock.now = 11.5 + await shelly._handle_request(transport, req, ("127.0.0.1", 54323)) + assert len(transport.sent) == 2 + assert dummy.call_count == 2 async def test_multiple_requests_with_throttling(): From 8e0c437d567571b16a647e813f8300f0f16a6ba0 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 19 Apr 2026 21:31:20 +0000 Subject: [PATCH 3/3] Remove dead CT002 DEADBAND plumbing and fix mis-placed docs BalancerConfig.deadband was wired through main.py -> CT002 -> BalancerConfig but never actually read anywhere; the only "deadband" semantics that still matter are BALANCE_DEADBAND (multi-battery imbalance floor) and the per-powermeter DeadbandPowermeter wrapper. Drop the dead field, its clamp, the constructor parameter, and the corresponding cfg.getint read. Purge deadband=... from all BalancerConfig test constructions. While here, fix two related mis-placements that were misleading users: - web_config.py listed SMOOTH_TARGET_ALPHA / MAX_SMOOTH_STEP / DEADBAND under the [CT002] schema, but the runtime only reads them from powermeter sections or [GENERAL] (config_loader.py:161-252). Move them to GENERAL. - README described the same three keys as "CT002/CT003 active-steering" options. They are powermeter-wrapper options. Move them into the per-powermeter options list alongside THROTTLE_INTERVAL. https://claude.ai/code/session_01YZtwVEn4bic9TtfqLmDaZS --- README.md | 24 +++++++++++------------- src/astrameter/ct002/balancer.py | 2 -- src/astrameter/ct002/ct002.py | 2 -- src/astrameter/main.py | 2 -- src/astrameter/web_config.py | 6 +++--- tests/smoke_efficiency_saturation.py | 1 - tests/test_balancer_probe_lockup.py | 1 - tests/test_ct002_active_control.py | 11 ----------- tests/test_e2e_probe_lockup.py | 1 - tests/test_efficiency_e2e.py | 1 - 10 files changed, 14 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index 641e18aa..3a614842 100644 --- a/README.md +++ b/README.md @@ -188,28 +188,26 @@ THROTTLE_INTERVAL = 0 #DEDUPE_TIME_WINDOW = 0 ``` -Per-powermeter options (apply in any powermeter section, e.g. `[TASMOTA]` or `[HOMEASSISTANT]`): +Per-powermeter options (apply in any powermeter section, e.g. `[TASMOTA]` or `[HOMEASSISTANT]`, or globally under `[GENERAL]`): - **THROTTLE_INTERVAL** — Override global throttling for this powermeter - **WAIT_FOR_NEXT_MESSAGE** — Override the global wait-for-fresh-push behaviour for this powermeter (set to `false` to opt out of the wait entirely) +- **SMOOTH_TARGET_ALPHA** (default 0 = disabled) — EMA factor for the powermeter + reading in (0, 1]. Higher values track load changes faster; lower values filter + noise but add lag. Values close to 1.0 work well when the powermeter updates at + ≥ 1 Hz; reduce toward 0.3 if it updates significantly slower than 1 Hz. +- **MAX_SMOOTH_STEP** (default 0 = unlimited) — Maximum watts the smoothed reading + may change per request cycle when `SMOOTH_TARGET_ALPHA` is active. Acts as a + slew-rate limit. +- **DEADBAND** (default 0 = disabled, W) — When the absolute reading is below this + value, the wrapper emits zeros instead of chasing noise. Keeps batteries from + hunting around the zero-crossing; 10–30 W is a sensible range. CT002/CT003 active-steering options (all under `[CT002]` or `[CT003]`): - **ACTIVE_CONTROL** — When true (default), the emulator smooths the grid reading, splits the target across batteries, and balances their load. When false, the emulator relays raw meter values and batteries decide on their own. -*Smoothing — how fast the emulator tracks grid changes:* -- **SMOOTH_TARGET_ALPHA** (default 0.9) — EMA factor for the grid reading. Higher values - track load changes faster; lower values filter noise but add lag. The battery's own ramp - rate already filters noise, so values close to 1.0 work well when the powermeter updates - at ≥ 1 Hz. Reduce toward 0.3 if your powermeter updates significantly slower than 1 Hz - (higher lag in readings needs heavier smoothing to avoid oscillation). -- **DEADBAND** (default 20 W) — When the grid total is within ± this value, the smoothed - target decays toward zero instead of chasing noise. Keeps batteries from hunting around - the zero-crossing. 10–30 W is a sensible range; set to 0 to disable. -- **MAX_SMOOTH_STEP** (default 0 = unlimited) — Maximum watts the smoothed target may - change per request cycle. Acts as a slew-rate limit. Rarely needed at high alpha. - *Fair distribution — balancing load across multiple batteries:* - **FAIR_DISTRIBUTION** (default true) — Adjust each battery's target so they share the load evenly. Only matters with two or more batteries. diff --git a/src/astrameter/ct002/balancer.py b/src/astrameter/ct002/balancer.py index 2fdb7288..c3bf79ff 100644 --- a/src/astrameter/ct002/balancer.py +++ b/src/astrameter/ct002/balancer.py @@ -53,7 +53,6 @@ class BalancerConfig: error_reduce_threshold: float = 20 max_correction_per_step: float = 80 max_target_step: float = 0 - deadband: float = 20 min_efficient_power: float = 0 probe_min_power: float = 80 efficiency_rotation_interval: float = 900 @@ -74,7 +73,6 @@ def _clamp(name: str, lo: float, hi: float) -> None: _clamp("error_reduce_threshold", 0, float("inf")) _clamp("max_correction_per_step", 0, float("inf")) _clamp("max_target_step", 0, float("inf")) - _clamp("deadband", 0, float("inf")) _clamp("min_efficient_power", 0, float("inf")) _clamp("probe_min_power", 0, float("inf")) _clamp("efficiency_rotation_interval", 1, float("inf")) diff --git a/src/astrameter/ct002/ct002.py b/src/astrameter/ct002/ct002.py index 5150e310..528cd5c0 100644 --- a/src/astrameter/ct002/ct002.py +++ b/src/astrameter/ct002/ct002.py @@ -110,7 +110,6 @@ def __init__( error_boost_max=0.5, error_reduce_threshold=20, balance_deadband=15, - deadband=20, max_correction_per_step=80, max_target_step=0, saturation_detection=True, @@ -174,7 +173,6 @@ def __init__( error_reduce_threshold=error_reduce_threshold, max_correction_per_step=max_correction_per_step, max_target_step=max_target_step, - deadband=deadband, min_efficient_power=min_efficient_power, probe_min_power=probe_min_power, efficiency_rotation_interval=efficiency_rotation_interval, diff --git a/src/astrameter/main.py b/src/astrameter/main.py index 7189506a..667e1d9c 100644 --- a/src/astrameter/main.py +++ b/src/astrameter/main.py @@ -153,7 +153,6 @@ async def run_device( ct_section, "ERROR_REDUCE_THRESHOLD", fallback=20 ) balance_deadband = cfg.getint(ct_section, "BALANCE_DEADBAND", fallback=15) - deadband = cfg.getint(ct_section, "DEADBAND", fallback=20) max_correction_per_step = cfg.getint( ct_section, "MAX_CORRECTION_PER_STEP", fallback=80 ) @@ -227,7 +226,6 @@ async def run_device( error_boost_max=error_boost_max, error_reduce_threshold=error_reduce_threshold, balance_deadband=balance_deadband, - deadband=deadband, max_correction_per_step=max_correction_per_step, max_target_step=max_target_step, saturation_detection=saturation_detection, diff --git a/src/astrameter/web_config.py b/src/astrameter/web_config.py index 5d6bcee5..90e17f34 100644 --- a/src/astrameter/web_config.py +++ b/src/astrameter/web_config.py @@ -268,6 +268,9 @@ def _pm(**extras: dict[str, object]) -> dict[str, dict[str, object]]: "THROTTLE_INTERVAL": {"type": "float"}, "WAIT_FOR_NEXT_MESSAGE": {"type": "boolean"}, "DEDUPE_TIME_WINDOW": {"type": "float", "min": 0}, + "SMOOTH_TARGET_ALPHA": {"type": "float", "min": 0, "max": 1}, + "MAX_SMOOTH_STEP": {"type": "float", "min": 0}, + "DEADBAND": {"type": "float", "min": 0}, "PID_KP": {"type": "float"}, "PID_KI": {"type": "float"}, "PID_KD": {"type": "float"}, @@ -281,9 +284,6 @@ def _pm(**extras: dict[str, object]) -> dict[str, dict[str, object]]: "CONSUMER_TTL": {"type": "integer"}, "DEBUG_STATUS": {"type": "boolean"}, "ACTIVE_CONTROL": {"type": "boolean"}, - "SMOOTH_TARGET_ALPHA": {"type": "float", "min": 0, "max": 1}, - "DEADBAND": {"type": "integer"}, - "MAX_SMOOTH_STEP": {"type": "integer"}, "FAIR_DISTRIBUTION": {"type": "boolean"}, "BALANCE_GAIN": {"type": "float"}, "BALANCE_DEADBAND": {"type": "integer"}, diff --git a/tests/smoke_efficiency_saturation.py b/tests/smoke_efficiency_saturation.py index 206d7e1e..a377adb4 100644 --- a/tests/smoke_efficiency_saturation.py +++ b/tests/smoke_efficiency_saturation.py @@ -109,7 +109,6 @@ def __init__( ct_mac=ct_mac, active_control=True, fair_distribution=True, - deadband=5, min_efficient_power=min_efficient_power, efficiency_rotation_interval=scaled_rotation, efficiency_saturation_threshold=efficiency_saturation_threshold, diff --git a/tests/test_balancer_probe_lockup.py b/tests/test_balancer_probe_lockup.py index 7af9d024..860cb641 100644 --- a/tests/test_balancer_probe_lockup.py +++ b/tests/test_balancer_probe_lockup.py @@ -60,7 +60,6 @@ def _make_balancer( error_reduce_threshold=20, max_correction_per_step=80, max_target_step=0, - deadband=20, min_efficient_power=50, probe_min_power=80, efficiency_rotation_interval=1800, diff --git a/tests/test_ct002_active_control.py b/tests/test_ct002_active_control.py index 59ab542d..264a1a34 100644 --- a/tests/test_ct002_active_control.py +++ b/tests/test_ct002_active_control.py @@ -52,7 +52,6 @@ def test_underperforming_consumer_gets_higher_target(self): fair_distribution=True, balance_gain=0.3, balance_deadband=0, - deadband=0, max_correction_per_step=0, max_target_step=0, ) @@ -67,7 +66,6 @@ def test_overperforming_consumer_gets_lower_target(self): fair_distribution=True, balance_gain=0.3, balance_deadband=0, - deadband=0, max_correction_per_step=0, max_target_step=0, ) @@ -93,7 +91,6 @@ def test_balance_gain_zero_no_correction(self): fair_distribution=True, balance_gain=0, balance_deadband=0, - deadband=0, max_correction_per_step=0, max_target_step=0, ) @@ -111,7 +108,6 @@ def test_large_error_gets_boosted_correction(self): error_boost_threshold=100, error_boost_max=1.0, balance_deadband=0, - deadband=0, max_correction_per_step=0, max_target_step=0, ) @@ -129,7 +125,6 @@ def test_error_boost_disabled_when_threshold_zero(self): balance_gain=0.3, error_boost_threshold=0, balance_deadband=0, - deadband=0, max_correction_per_step=0, max_target_step=0, ) @@ -162,7 +157,6 @@ def test_error_reduce_disabled_when_threshold_zero(self): error_reduce_threshold=0, error_boost_threshold=0, balance_deadband=0, - deadband=0, max_correction_per_step=0, max_target_step=0, ) @@ -177,7 +171,6 @@ def test_balance_deadband_skips_small_correction(self): fair_distribution=True, balance_gain=0.3, balance_deadband=25, - deadband=0, max_correction_per_step=0, max_target_step=0, ) @@ -192,7 +185,6 @@ def test_max_correction_per_step_caps_correction(self): fair_distribution=True, balance_gain=0.5, balance_deadband=0, - deadband=0, max_correction_per_step=50, max_target_step=0, ) @@ -207,7 +199,6 @@ def test_max_target_step_caps_target_vs_actual(self): fair_distribution=True, balance_gain=0.5, balance_deadband=0, - deadband=0, max_correction_per_step=0, max_target_step=100, ) @@ -1185,7 +1176,6 @@ def test_probe_backup_uses_delta_not_absolute_output(self): probe_min_power=80, efficiency_fade_alpha=1.0, efficiency_rotation_interval=10, - deadband=0, ) device._update_consumer_report("a", "A", 200) device._update_consumer_report("b", "A", 0) @@ -1209,7 +1199,6 @@ def test_probe_backup_ignores_probe_output_and_follows_demand(self): probe_min_power=80, efficiency_fade_alpha=1.0, efficiency_rotation_interval=10, - deadband=0, ) device._update_consumer_report("a", "A", 200) device._update_consumer_report("b", "A", 40) diff --git a/tests/test_e2e_probe_lockup.py b/tests/test_e2e_probe_lockup.py index b9a77d2a..3183bf7d 100644 --- a/tests/test_e2e_probe_lockup.py +++ b/tests/test_e2e_probe_lockup.py @@ -149,7 +149,6 @@ def __init__( ct_mac=ct_mac, active_control=True, fair_distribution=True, - deadband=5, min_efficient_power=min_efficient_power, efficiency_rotation_interval=efficiency_rotation_interval, probe_min_power=20, # lower so the test's small loads can probe diff --git a/tests/test_efficiency_e2e.py b/tests/test_efficiency_e2e.py index 612b237d..b6d26971 100644 --- a/tests/test_efficiency_e2e.py +++ b/tests/test_efficiency_e2e.py @@ -141,7 +141,6 @@ def __init__( ct_mac=ct_mac, active_control=True, fair_distribution=True, - deadband=5, min_efficient_power=min_efficient_power, efficiency_rotation_interval=efficiency_rotation_interval, clock=self.clock,