diff --git a/CHANGELOG.md b/CHANGELOG.md index 84d654c0..511aeef7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## Next +- Added optional Hampel outlier filter (`HAMPEL_WINDOW`, `HAMPEL_N_SIGMA`, `HAMPEL_MIN_THRESHOLD`) for rejecting wild samples from noisy powermeter sources (MQTT/HTTP/WiFi); configurable globally or per powermeter section, disabled by default - Added `DEDUPE_TIME_WINDOW` support to the Shelly emulator to drop burst-repeat requests from the same battery IP; the value can be set under `[GENERAL]` to apply regardless of which device type is emulated - Added opt-in web-based configuration editor (`WEB_CONFIG_ENABLED = True` in `[GENERAL]`) accessible at `http://:52500/config`; supports editing all config sections and keys with type-appropriate inputs, comment preservation, and a Save & Restart button - **Breaking:** Rebrand project from "B2500 Meter" to "AstraMeter" (formerly b2500-meter). Package renamed to `astrameter`, CLI commands are now `astrameter` and `astra-sim`. Docker image moved from `ghcr.io/tomquist/b2500-meter` to `ghcr.io/tomquist/astrameter` (the legacy `ghcr.io/tomquist/b2500-meter` image is still published in parallel for backward compatibility). Home Assistant users must update their app repository URL to `https://github.com/tomquist/astrameter#main`. diff --git a/README.md b/README.md index 7fddfc45..568a459b 100644 --- a/README.md +++ b/README.md @@ -202,6 +202,15 @@ Per-powermeter options (apply in any powermeter section, e.g. `[TASMOTA]` or `[H - **DEADBAND** (default 0 = disabled, W) — When the absolute reading is below this value, the wrapper emits zeros instead of chasing noise. Keeps batteries from hunting around the zero-crossing; 10–30 W is a sensible range. +- **HAMPEL_WINDOW** (default 0 = disabled) — Rolling window size for + median-based outlier rejection. Typical values 5–7. Useful for MQTT/HTTP + sources that occasionally emit wild samples; applied after throttling and + before EMA smoothing. +- **HAMPEL_N_SIGMA** (default 3.0) — Rejection threshold in MAD-derived sigmas. + Lower values reject more aggressively. +- **HAMPEL_MIN_THRESHOLD** (default 0, W) — Minimum rejection threshold in + watts. Prevents spikes from passing through during long periods of constant + readings (the MAD=0 degenerate case); 50 W is a reasonable starting value. CT002/CT003 active-steering options (all under `[CT002]` or `[CT003]`): - **ACTIVE_CONTROL** — When true (default), the emulator smooths the grid reading, splits diff --git a/config.ini.example b/config.ini.example index fd0a32c8..41595480 100644 --- a/config.ini.example +++ b/config.ini.example @@ -357,6 +357,26 @@ THROTTLE_INTERVAL = 0 #PID_OUTPUT_MAX = 800 #PID_MODE = bias +## --- Hampel outlier filter (optional) --- +## Stateful rolling-median rejection of wild samples (e.g. MQTT/WiFi glitches). +## Outliers are replaced with the window median; normal samples pass through +## unchanged. Disabled by default. +## +## When HAMPEL_WINDOW > 0, the wrapper is inserted after throttling and before +## EMA smoothing. This is orthogonal to the EMA: Hampel rejects impulse noise, +## the EMA smooths the cleaned signal. +## +## Recommended starting point for flaky HTTP/MQTT sources: +## HAMPEL_WINDOW = 5 # odd sizes recommended; 5–7 works well +## HAMPEL_N_SIGMA = 3.0 # reject beyond ~3σ (MAD-derived) +## HAMPEL_MIN_THRESHOLD = 50 # watts; floor for constant-signal (MAD=0) case +## +## Parameters can be set globally in [GENERAL] or per powermeter section. +## Per-section values override the global ones. +#HAMPEL_WINDOW = 5 +#HAMPEL_N_SIGMA = 3.0 +#HAMPEL_MIN_THRESHOLD = 50 + ## --- MQTT Insights (optional) --- ## Publishes internal state (grid power, targets, saturation, consumer topology) ## to MQTT with Home Assistant Device Discovery support. diff --git a/src/astrameter/config/config_loader.py b/src/astrameter/config/config_loader.py index b4273b7c..f36bdfb9 100644 --- a/src/astrameter/config/config_loader.py +++ b/src/astrameter/config/config_loader.py @@ -37,6 +37,7 @@ VZLogger, parse_sml_obis_config, ) +from astrameter.powermeter.wrappers.hampel import HampelPowermeter from astrameter.powermeter.wrappers.smoothing import ( DeadbandPowermeter, SmoothedPowermeter, @@ -162,6 +163,11 @@ def read_all_powermeter_configs( ) global_max_smooth_step = config.getfloat("GENERAL", "MAX_SMOOTH_STEP", fallback=0.0) global_deadband = config.getfloat("GENERAL", "DEADBAND", fallback=0.0) + global_hampel_window = config.getint("GENERAL", "HAMPEL_WINDOW", fallback=0) + global_hampel_n_sigma = config.getfloat("GENERAL", "HAMPEL_N_SIGMA", fallback=3.0) + global_hampel_min_threshold = config.getfloat( + "GENERAL", "HAMPEL_MIN_THRESHOLD", fallback=0.0 + ) global_pid_kp = config.getfloat("GENERAL", "PID_KP", fallback=0.0) global_pid_ki = config.getfloat("GENERAL", "PID_KI", fallback=0.0) global_pid_kd = config.getfloat("GENERAL", "PID_KD", fallback=0.0) @@ -208,6 +214,38 @@ def read_all_powermeter_configs( ) powermeter = ThrottledPowermeter(powermeter, section_throttle_interval) + section_hampel_window = config.getint( + section, "HAMPEL_WINDOW", fallback=global_hampel_window + ) + if section_hampel_window > 0: + section_hampel_n_sigma = config.getfloat( + section, "HAMPEL_N_SIGMA", fallback=global_hampel_n_sigma + ) + section_hampel_min_threshold = config.getfloat( + section, + "HAMPEL_MIN_THRESHOLD", + fallback=global_hampel_min_threshold, + ) + hampel_source = ( + "section-specific" + if config.has_option(section, "HAMPEL_WINDOW") + else "global" + ) + logger.info( + "Applying %s Hampel outlier filter (window=%d, n_sigma=%.2f, min_threshold=%.0fW) to %s", + hampel_source, + section_hampel_window, + section_hampel_n_sigma, + section_hampel_min_threshold, + section, + ) + powermeter = HampelPowermeter( + powermeter, + window=section_hampel_window, + n_sigma=section_hampel_n_sigma, + min_threshold=section_hampel_min_threshold, + ) + section_smooth_alpha = config.getfloat( section, "SMOOTH_TARGET_ALPHA", fallback=global_smooth_alpha ) diff --git a/src/astrameter/powermeter/__init__.py b/src/astrameter/powermeter/__init__.py index 437dee1e..d4ebf13a 100644 --- a/src/astrameter/powermeter/__init__.py +++ b/src/astrameter/powermeter/__init__.py @@ -18,6 +18,7 @@ from .vzlogger import VZLogger from .wrappers import ( DeadbandPowermeter, + HampelPowermeter, PidPowermeter, PowermeterWrapper, SmoothedPowermeter, @@ -30,6 +31,7 @@ "DeadbandPowermeter", "ESPHome", "Emlog", + "HampelPowermeter", "HomeAssistant", "HomeWizardPowermeter", "IoBroker", diff --git a/src/astrameter/powermeter/wrappers/__init__.py b/src/astrameter/powermeter/wrappers/__init__.py index d83bf582..d93af569 100644 --- a/src/astrameter/powermeter/wrappers/__init__.py +++ b/src/astrameter/powermeter/wrappers/__init__.py @@ -1,4 +1,5 @@ from .base import PowermeterWrapper +from .hampel import HampelPowermeter from .pid import PidPowermeter from .smoothing import DeadbandPowermeter, SmoothedPowermeter from .throttling import ThrottledPowermeter @@ -6,6 +7,7 @@ __all__ = [ "DeadbandPowermeter", + "HampelPowermeter", "PidPowermeter", "PowermeterWrapper", "SmoothedPowermeter", diff --git a/src/astrameter/powermeter/wrappers/hampel.py b/src/astrameter/powermeter/wrappers/hampel.py new file mode 100644 index 00000000..eea3dcf5 --- /dev/null +++ b/src/astrameter/powermeter/wrappers/hampel.py @@ -0,0 +1,88 @@ +"""Hampel outlier-rejection powermeter wrapper.""" + +from __future__ import annotations + +import statistics +from collections import deque + +from astrameter.config.logger import logger +from astrameter.powermeter.base import Powermeter + +from .base import PowermeterWrapper + + +class HampelPowermeter(PowermeterWrapper): + """Rolling-median outlier filter for sum-of-phases power readings. + + Maintains a rolling window of the most recent ``window`` totals. When the + next total lies more than ``n_sigma * 1.4826 * MAD`` away from the window + median (with a floor of ``min_threshold`` watts to handle the constant- + signal MAD=0 degenerate case), the sample is treated as an outlier: the + reported total is replaced by the median and per-phase values are + redistributed proportionally (equal split when ``|raw_total|`` is near + zero). The window entry itself is mutated to the median so a single spike + does not poison future detections — this is the canonical Hampel + identifier formulation used in control literature. + + Operates on the sum of phases, mirroring :class:`SmoothedPowermeter`. + A phase-cancelling outlier (e.g. +1000 W on L1 and -1000 W on L2) is + therefore invisible to this filter; that is acceptable because every + downstream wrapper (EMA, deadband, PID) also operates on sum-of-phases. + """ + + MAD_SCALE = 1.4826 + + def __init__( + self, + wrapped_powermeter: Powermeter, + window: int, + n_sigma: float = 3.0, + min_threshold: float = 0.0, + ) -> None: + if window < 1: + raise ValueError(f"Hampel window must be >= 1, got {window}") + if n_sigma < 0: + raise ValueError(f"Hampel n_sigma must be >= 0, got {n_sigma}") + if min_threshold < 0: + raise ValueError(f"Hampel min_threshold must be >= 0, got {min_threshold}") + super().__init__(wrapped_powermeter) + self._window: deque[float] = deque(maxlen=window) + self._window_size = window + self._n_sigma = n_sigma + self._min_threshold = min_threshold + + def reset(self) -> None: + super().reset() + logger.debug("HampelPowermeter: reset (window size=%d)", len(self._window)) + self._window.clear() + + async def get_powermeter_watts(self) -> list[float]: + raw_values = await self.wrapped_powermeter.get_powermeter_watts() + if not raw_values: + return [] + + raw_total = sum(raw_values) + self._window.append(raw_total) + + if len(self._window) < self._window_size: + return list(raw_values) + + median = statistics.median(self._window) + mad = statistics.median(abs(x - median) for x in self._window) + threshold = max(self._n_sigma * self.MAD_SCALE * mad, self._min_threshold) + + if threshold <= 0 or abs(raw_total - median) <= threshold: + return list(raw_values) + + self._window[-1] = median + logger.debug( + "HampelPowermeter: outlier rejected raw=%.2f median=%.2f threshold=%.2f", + raw_total, + median, + threshold, + ) + + if abs(raw_total) < 1e-9: + return [median / len(raw_values)] * len(raw_values) + ratio = median / raw_total + return [v * ratio for v in raw_values] diff --git a/src/astrameter/powermeter/wrappers/hampel_test.py b/src/astrameter/powermeter/wrappers/hampel_test.py new file mode 100644 index 00000000..aec96ce2 --- /dev/null +++ b/src/astrameter/powermeter/wrappers/hampel_test.py @@ -0,0 +1,268 @@ +"""Tests for HampelPowermeter.""" + +import pytest + +from .hampel import HampelPowermeter + + +class FakePowermeter: + """Minimal powermeter stub for testing wrappers.""" + + def __init__(self, values: list[float] | None = None): + self._values: list[float] = values if values is not None else [0.0] + self.started = False + self.stopped = False + self.reset_count = 0 + + def set(self, values: list[float]) -> None: + self._values = values + + async def get_powermeter_watts(self) -> list[float]: + return list(self._values) + + async def wait_for_message(self, timeout=5): + pass + + async def start(self): + self.started = True + + async def stop(self): + self.stopped = True + + def reset(self): + self.reset_count += 1 + + +async def _push(hp: HampelPowermeter, fake: FakePowermeter, values: list[float]): + fake.set(values) + return await hp.get_powermeter_watts() + + +class TestHampelPowermeter: + @pytest.mark.asyncio + async def test_warmup_passes_through(self): + fake = FakePowermeter([100.0]) + hp = HampelPowermeter(fake, window=5, n_sigma=3.0) + for _ in range(4): + result = await hp.get_powermeter_watts() + assert result == [100.0] + + @pytest.mark.asyncio + async def test_clean_signal_not_modified(self): + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=5, n_sigma=3.0) + # Fill window with clean samples + for v in [100.0, 102.0, 98.0, 101.0, 99.0]: + await _push(hp, fake, [v]) + # Next clean sample — should pass through + result = await _push(hp, fake, [100.5]) + assert result == [100.5] + + @pytest.mark.asyncio + async def test_single_spike_replaced_by_median(self): + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=5, n_sigma=3.0) + for v in [100.0, 102.0, 98.0, 101.0, 99.0]: + await _push(hp, fake, [v]) + result = await _push(hp, fake, [10000.0]) + # Median of the window after appending 10000 is 100 (or close), + # threshold is small, so the spike is replaced. + assert result[0] == pytest.approx(100.0, abs=5.0) + assert result[0] != 10000.0 + + @pytest.mark.asyncio + async def test_roll_forward_evicts_mutated_entry(self): + """After a replacement, the mutated (median) entry is what rolls out, + not the original outlier — so the outlier doesn't return to the window. + """ + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=3, n_sigma=3.0) + # Fill window + await _push(hp, fake, [100.0]) + await _push(hp, fake, [100.0]) + await _push(hp, fake, [100.0]) + # Inject spike — window is [100, 100, 10000] before detection; after + # replacement the stored window is [100, 100, 100]. + await _push(hp, fake, [10000.0]) + # Next clean sample: window becomes [100, 100, 100] after evicting the + # oldest 100, and the new sample 101 is within threshold. + result = await _push(hp, fake, [101.0]) + assert result == [101.0] + + @pytest.mark.asyncio + async def test_constant_signal_mad_zero_no_floor(self): + """MAD=0 with min_threshold=0 → threshold is 0 → documented + limitation: spikes pass through.""" + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=5, n_sigma=3.0, min_threshold=0.0) + for _ in range(5): + await _push(hp, fake, [0.0]) + result = await _push(hp, fake, [500.0]) + assert result == [500.0] + + @pytest.mark.asyncio + async def test_min_threshold_floor_catches_spike_on_constant(self): + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=5, n_sigma=3.0, min_threshold=50.0) + for _ in range(5): + await _push(hp, fake, [0.0]) + result = await _push(hp, fake, [500.0]) + # Spike > 50 W floor → replaced with median (0) + assert result == [0.0] + + @pytest.mark.asyncio + async def test_min_threshold_floor_lets_small_changes_pass(self): + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=5, n_sigma=3.0, min_threshold=50.0) + for _ in range(5): + await _push(hp, fake, [0.0]) + # Change smaller than the floor → passes through + result = await _push(hp, fake, [30.0]) + assert result == [30.0] + + @pytest.mark.asyncio + async def test_negative_outlier_symmetric(self): + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=5, n_sigma=3.0) + for v in [100.0, 102.0, 98.0, 101.0, 99.0]: + await _push(hp, fake, [v]) + result = await _push(hp, fake, [-10000.0]) + assert result[0] == pytest.approx(100.0, abs=5.0) + assert result[0] != -10000.0 + + @pytest.mark.asyncio + async def test_phase_ratio_preserved_on_replacement(self): + # Constant total → MAD=0; use min_threshold to gate detection. + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=5, n_sigma=3.0, min_threshold=50.0) + for phases in [ + [60.0, 30.0, 10.0], + [50.0, 30.0, 20.0], + [40.0, 40.0, 20.0], + [70.0, 20.0, 10.0], + [55.0, 35.0, 10.0], + ]: + await _push(hp, fake, phases) + # Spike with total=1000 and ratio 60:30:10 + result = await _push(hp, fake, [600.0, 300.0, 100.0]) + # Median of totals is 100. Replacement ratio = 100 / 1000 = 0.1 → + # per-phase [60, 30, 10]. + assert result == pytest.approx([60.0, 30.0, 10.0]) + + @pytest.mark.asyncio + async def test_raw_total_near_zero_equal_split(self): + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=5, n_sigma=3.0, min_threshold=50.0) + # Fill window with a nonzero stable total so median is nonzero + for _ in range(5): + await _push(hp, fake, [100.0, 0.0, 0.0]) + # Next sample: phases cancel to zero total, which is a large departure + # from the median total of 100 → outlier. Replacement must equal-split. + result = await _push(hp, fake, [1.0, -1.0, 0.0]) + # Total was 0 → equal-split fallback: [median/3] * 3 + assert result == pytest.approx([100.0 / 3] * 3) + + @pytest.mark.asyncio + async def test_empty_raw_values_returns_empty(self): + fake = FakePowermeter([]) + hp = HampelPowermeter(fake, window=5, n_sigma=3.0) + result = await hp.get_powermeter_watts() + assert result == [] + + @pytest.mark.asyncio + async def test_window_one_always_passthrough(self): + """window=1 is degenerate: median == sample, MAD always 0 → passthrough + (when min_threshold=0).""" + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=1, n_sigma=3.0, min_threshold=0.0) + for v in [100.0, 10000.0, -50.0, 0.0]: + result = await _push(hp, fake, [v]) + assert result == [v] + + @pytest.mark.asyncio + async def test_even_window_works(self): + """With an even window, statistics.median returns the mean of the two + middle values. Confirm no crash and sane behavior.""" + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=4, n_sigma=3.0) + for v in [100.0, 102.0, 98.0, 101.0]: + result = await _push(hp, fake, [v]) + # Warmup: first 3 pass through; 4th fills window, median ≈ 100.5, + # MAD small → 101 is close → passes through. + assert result == [v] + # Now inject a clear spike + result = await _push(hp, fake, [10000.0]) + assert result[0] != 10000.0 + + @pytest.mark.asyncio + async def test_n_sigma_zero_relies_on_min_threshold(self): + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=5, n_sigma=0.0, min_threshold=50.0) + for v in [100.0, 100.0, 100.0, 100.0, 100.0]: + await _push(hp, fake, [v]) + # n_sigma=0 → sigma term is 0; min_threshold=50 is the only gate. + # 120 vs median 100 = delta 20 < 50 → pass through + result = await _push(hp, fake, [120.0]) + assert result == [120.0] + # 200 vs median 100 = delta 100 > 50 → rejected + fake.set([200.0]) + result = await hp.get_powermeter_watts() + assert result[0] == pytest.approx(100.0, abs=1.0) + + @pytest.mark.asyncio + async def test_warmup_poisoning_recovers(self): + """A spike within the warmup window passes through, but subsequent + outliers after warmup still get detected once the window re-fills + with clean samples. Uses min_threshold so MAD=0 doesn't block.""" + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=3, n_sigma=3.0, min_threshold=50.0) + # Warmup with poisoning: first sample is a spike + await _push(hp, fake, [10000.0]) # passes through (warmup) + await _push(hp, fake, [100.0]) # passes through (warmup) + await _push(hp, fake, [100.0]) # fills window; still has 10000 + await _push(hp, fake, [100.0]) # 10000 evicted; window=[100,100,100] + # Now inject a real spike — should be rejected via min_threshold. + result = await _push(hp, fake, [10000.0]) + assert result[0] != 10000.0 + + @pytest.mark.asyncio + async def test_reset_clears_window(self): + fake = FakePowermeter() + hp = HampelPowermeter(fake, window=3, n_sigma=3.0) + for _ in range(3): + await _push(hp, fake, [100.0]) + hp.reset() + assert fake.reset_count == 1 + # After reset, the next 2 samples are warmup and pass through + result = await _push(hp, fake, [10000.0]) + assert result == [10000.0] + + @pytest.mark.asyncio + async def test_invalid_window_rejected(self): + fake = FakePowermeter() + with pytest.raises(ValueError): + HampelPowermeter(fake, window=0) + + @pytest.mark.asyncio + async def test_invalid_n_sigma_rejected(self): + fake = FakePowermeter() + with pytest.raises(ValueError): + HampelPowermeter(fake, window=5, n_sigma=-1.0) + + @pytest.mark.asyncio + async def test_invalid_min_threshold_rejected(self): + fake = FakePowermeter() + with pytest.raises(ValueError): + HampelPowermeter(fake, window=5, min_threshold=-1.0) + + @pytest.mark.asyncio + async def test_lifecycle_delegation(self): + fake = FakePowermeter([100.0]) + hp = HampelPowermeter(fake, window=5, n_sigma=3.0) + await hp.start() + assert fake.started + await hp.stop() + assert fake.stopped + await hp.wait_for_message(timeout=1) + hp.reset() + assert fake.reset_count == 1