From 87879c144c2654123d04865b484cddf57ebb830c Mon Sep 17 00:00:00 2001 From: Lewa Reka Date: Sun, 15 Mar 2026 08:33:28 +0100 Subject: [PATCH] fix: ps peak hours --- .../rce_pse/sensors/peak_hours.py | 102 ++++++++++++++++-- docs/SENSORY.md | 6 +- tests/conftest.py | 1 + tests/test_peak_hours_sensors.py | 70 ++++++++---- 4 files changed, 151 insertions(+), 28 deletions(-) diff --git a/custom_components/rce_pse/sensors/peak_hours.py b/custom_components/rce_pse/sensors/peak_hours.py index 39d1c2b..27b4e6b 100644 --- a/custom_components/rce_pse/sensors/peak_hours.py +++ b/custom_components/rce_pse/sensors/peak_hours.py @@ -1,13 +1,42 @@ from __future__ import annotations +import json +from pathlib import Path from typing import TYPE_CHECKING, Any +from homeassistant.util import dt as dt_util + from ..const import PDGSZ_USAGE_FCST_TO_ATTR from .base import RCEBaseSensor if TYPE_CHECKING: from ..coordinator import RCEPSEDataUpdateCoordinator +_TRANSLATIONS_DIR = Path(__file__).resolve().parent.parent / "translations" +_STATE_DISPLAY_CACHE: dict[tuple[str, str], dict[str, str]] = {} + + +def _load_state_display_names(lang: str, translation_key: str) -> dict[str, str]: + cache_key = (lang, translation_key) + if cache_key in _STATE_DISPLAY_CACHE: + return _STATE_DISPLAY_CACHE[cache_key] + result: dict[str, str] = {} + try: + path = _TRANSLATIONS_DIR / f"{lang}.json" + if not path.exists(): + path = _TRANSLATIONS_DIR / "en.json" + if path.exists(): + with open(path, encoding="utf-8") as f: + data = json.load(f) + attrs = data.get("entity", {}).get("sensor", {}).get(translation_key, {}).get("state_attributes", {}) + for key, obj in attrs.items(): + if isinstance(obj, dict) and "name" in obj: + result[key] = obj["name"] + except (OSError, ValueError, KeyError): + pass + _STATE_DISPLAY_CACHE[cache_key] = result + return result + def _pdgsz_records_to_ranges(records: list[dict]) -> dict[str, list[str]]: attr_keys = set(PDGSZ_USAGE_FCST_TO_ATTR.values()) @@ -42,6 +71,36 @@ def _pdgsz_records_to_ranges(records: list[dict]) -> dict[str, list[str]]: return result +def _pdgsz_records_to_hourly_state(records: list[dict]) -> dict[int, str]: + result: dict[int, str] = {} + for rec in records: + dtime_str = rec.get("dtime", "") + if " " not in dtime_str: + continue + try: + hour = int(dtime_str.split(" ", 1)[1].split(":")[0]) + except (ValueError, IndexError): + continue + if 0 <= hour <= 23: + fcst = rec.get("usage_fcst", 1) + result[hour] = PDGSZ_USAGE_FCST_TO_ATTR.get(fcst, "normal_usage") + return result + + +def _hourly_states_attributes( + hourly: dict[int, str], + display_names: dict[str, str], +) -> list[dict[str, Any]]: + return [ + { + "hour": f"{h:02d}:00", + "state": hourly.get(h), + "state_display": display_names.get(hourly.get(h, ""), ""), + } + for h in range(24) + ] + + class RCEPeakHoursSensorBase(RCEBaseSensor): def __init__(self, coordinator: RCEPSEDataUpdateCoordinator, unique_id: str) -> None: super().__init__(coordinator, unique_id) @@ -50,25 +109,51 @@ def __init__(self, coordinator: RCEPSEDataUpdateCoordinator, unique_id: str) -> def _get_pdgsz_records(self) -> list[dict]: raise NotImplementedError + def _get_current_hour_for_state(self) -> int: + return dt_util.now().hour + + def _get_state_display(self, state_key: str | None) -> str: + if not state_key: + return "" + try: + lang = self.hass.config.language + if lang not in ("pl", "en"): + lang = "en" + names = _load_state_display_names(lang, self._attr_translation_key) + return names.get(state_key, state_key) + except (AttributeError, KeyError): + return state_key + @property - def native_value(self) -> int | None: + def native_value(self) -> str | None: records = self._get_pdgsz_records() - if not records: + hourly = _pdgsz_records_to_hourly_state(records) + current_hour = self._get_current_hour_for_state() + state_key = hourly.get(current_hour) + if state_key is None: return None - ranges = _pdgsz_records_to_ranges(records) - return sum(len(v) for v in ranges.values()) + return self._get_state_display(state_key) @property def extra_state_attributes(self) -> dict[str, Any]: records = self._get_pdgsz_records() - return _pdgsz_records_to_ranges(records) + hourly = _pdgsz_records_to_hourly_state(records) + try: + lang = self.hass.config.language + if lang not in ("pl", "en"): + lang = "en" + display_names = _load_state_display_names(lang, self._attr_translation_key) + except (AttributeError, KeyError): + display_names = {} + return { + "records": records, + "hourly_states": _hourly_states_attributes(hourly, display_names), + } @property def available(self) -> bool: if not self.coordinator.last_update_success or not self.coordinator.data: return False - if "pdgsz_data" not in self.coordinator.data: - return False return True @@ -87,6 +172,9 @@ def __init__(self, coordinator: RCEPSEDataUpdateCoordinator) -> None: def _get_pdgsz_records(self) -> list[dict]: return self.get_tomorrow_pdgsz_data() + def _get_current_hour_for_state(self) -> int: + return 0 + @property def available(self) -> bool: if not self.is_tomorrow_data_available(): diff --git a/docs/SENSORY.md b/docs/SENSORY.md index ddd7db3..cd06a4d 100644 --- a/docs/SENSORY.md +++ b/docs/SENSORY.md @@ -36,10 +36,10 @@ Wszystkie zwracają **timestamp** (datetime). Aby pokazać tylko godzinę (np. H Dane z raportu PSE „Godziny Szczytu” (API PDGSZ) – kiedy zalecane jest użytkowanie energii, a kiedy oszczędzanie. -- **Godziny Szczytu Dzisiaj** – wartość: liczba przedziałów godzinowych; atrybuty z listami przedziałów w formacie „HH:00–HH:00”. Nazwy tych czterech atrybutów są tłumaczone w interfejsie (PL/EN). -- **Godziny Szczytu Jutro** – to samo dla następnego dnia (dostępne po 14:00 CET). +- **Godziny Szczytu Dzisiaj** – stan: wartość tekstowa dla bieżącej godziny (np. „Zalecane użytkowanie”) lub brak danych jak przy sensorach „Jutro” (np. okna najniższej ceny) – wtedy stan „unknown”. Atrybuty: **records** – surowa odpowiedź API z dnia (tylko wpisy aktywne, is_active); **hourly_states** – lista 24 wpisów `{ "hour": "HH:00", "state": "klucz", "state_display": "tekst np. Zalecane użytkowanie" }` dla każdej godziny dnia. +- **Godziny Szczytu Jutro** – to samo dla następnego dnia (dostępne po 14:00 CET); stan odnosi się do godziny 00:00 jutro. -Cztery atrybuty (listy przedziałów): +Możliwe stany (wyświetlane jako tekst w języku interfejsu): - **Zalecane użytkowanie** – korzystny czas na używanie energii. - **Normalne użytkowanie** – użytkowanie bez szczególnych zaleceń. diff --git a/tests/conftest.py b/tests/conftest.py index b22d3f1..06b3ae6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -19,6 +19,7 @@ def mock_hass(): hass = Mock(spec=HomeAssistant) hass.config = Mock() hass.config.time_zone = "Europe/Warsaw" + hass.config.language = "en" hass.data = {} return hass diff --git a/tests/test_peak_hours_sensors.py b/tests/test_peak_hours_sensors.py index 9883972..a48b0bc 100644 --- a/tests/test_peak_hours_sensors.py +++ b/tests/test_peak_hours_sensors.py @@ -3,6 +3,7 @@ from unittest.mock import patch from custom_components.rce_pse.sensors.peak_hours import ( + _pdgsz_records_to_hourly_state, _pdgsz_records_to_ranges, RCETodayPeakHoursSensor, RCETomorrowPeakHoursSensor, @@ -58,6 +59,27 @@ def test_usage_fcst_3_maps_to_required_restriction(self): assert result["recommended_usage"] == [] +class TestPdgszRecordsToHourlyState: + + def test_empty_returns_empty_dict(self): + assert _pdgsz_records_to_hourly_state([]) == {} + + def test_maps_hour_to_attr_key(self): + records = [ + {"dtime": "2025-05-29 07:00", "business_date": "2025-05-29", "usage_fcst": 0}, + {"dtime": "2025-05-29 08:00", "business_date": "2025-05-29", "usage_fcst": 2}, + ] + result = _pdgsz_records_to_hourly_state(records) + assert result[7] == "recommended_usage" + assert result[8] == "recommended_saving" + + def test_skips_invalid_dtime(self): + records = [ + {"dtime": "no-space", "usage_fcst": 0}, + ] + assert _pdgsz_records_to_hourly_state(records) == {} + + class TestTodayPeakHoursSensor: def test_initialization(self, mock_coordinator): @@ -65,48 +87,60 @@ def test_initialization(self, mock_coordinator): assert sensor._attr_unique_id == "rce_pse_today_peak_hours" assert sensor._attr_icon == "mdi:flash" - def test_native_value_with_data(self, mock_coordinator): + def test_native_value_with_data_returns_translated_text(self, mock_coordinator): mock_coordinator.data["pdgsz_data"] = [ {"dtime": "2025-05-29 07:00", "business_date": "2025-05-29", "usage_fcst": 0}, {"dtime": "2025-05-29 08:00", "business_date": "2025-05-29", "usage_fcst": 1}, ] - with patch.object(mock_coordinator, 'data', mock_coordinator.data): - sensor = RCETodayPeakHoursSensor(mock_coordinator) - with patch.object(sensor, 'get_today_pdgsz_data') as mock_get: - mock_get.return_value = mock_coordinator.data["pdgsz_data"] - assert sensor.native_value == 2 + sensor = RCETodayPeakHoursSensor(mock_coordinator) + sensor.hass = mock_coordinator.hass + with patch.object(sensor, 'get_today_pdgsz_data') as mock_get: + mock_get.return_value = mock_coordinator.data["pdgsz_data"] + with patch.object(sensor, '_get_current_hour_for_state', return_value=7): + assert sensor.native_value == "Recommended usage" + with patch.object(sensor, '_get_current_hour_for_state', return_value=8): + assert sensor.native_value == "Normal usage" - def test_native_value_empty_data(self, mock_coordinator): + def test_native_value_empty_data_returns_none(self, mock_coordinator): mock_coordinator.data["pdgsz_data"] = [] sensor = RCETodayPeakHoursSensor(mock_coordinator) with patch.object(sensor, 'get_today_pdgsz_data', return_value=[]): assert sensor.native_value is None - def test_extra_state_attributes_with_data(self, mock_coordinator): + def test_extra_state_attributes_records_and_hourly_states(self, mock_coordinator): today = "2025-05-29" - mock_coordinator.data["pdgsz_data"] = [ + records = [ {"dtime": f"{today} 07:00", "business_date": today, "usage_fcst": 0}, {"dtime": f"{today} 08:00", "business_date": today, "usage_fcst": 2}, ] + mock_coordinator.data["pdgsz_data"] = records sensor = RCETodayPeakHoursSensor(mock_coordinator) + sensor.hass = mock_coordinator.hass with patch.object(sensor, 'get_today_pdgsz_data') as mock_get: - mock_get.return_value = mock_coordinator.data["pdgsz_data"] + mock_get.return_value = records attrs = sensor.extra_state_attributes - assert "recommended_usage" in attrs - assert "recommended_saving" in attrs - assert attrs["recommended_usage"] == ["07:00–08:00"] - assert attrs["recommended_saving"] == ["08:00–09:00"] - - def test_available_when_pdgsz_data_present(self, mock_coordinator): + assert "records" in attrs + assert attrs["records"] == records + assert "hourly_states" in attrs + hourly = attrs["hourly_states"] + assert len(hourly) == 24 + assert hourly[7] == {"hour": "07:00", "state": "recommended_usage", "state_display": "Recommended usage"} + assert hourly[8] == {"hour": "08:00", "state": "recommended_saving", "state_display": "Recommended saving"} + assert hourly[0]["state"] is None + assert hourly[0]["state_display"] == "" + + def test_available_when_coordinator_has_data(self, mock_coordinator): mock_coordinator.data["pdgsz_data"] = [] sensor = RCETodayPeakHoursSensor(mock_coordinator) assert sensor.available is True - def test_not_available_when_no_pdgsz_data_key(self, mock_coordinator): + def test_available_when_no_pdgsz_data_key_native_value_none(self, mock_coordinator): if "pdgsz_data" in mock_coordinator.data: del mock_coordinator.data["pdgsz_data"] sensor = RCETodayPeakHoursSensor(mock_coordinator) - assert sensor.available is False + assert sensor.available is True + with patch.object(sensor, 'get_today_pdgsz_data', return_value=[]): + assert sensor.native_value is None class TestTomorrowPeakHoursSensor: