Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 95 additions & 7 deletions custom_components/rce_pse/sensors/peak_hours.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,42 @@
from __future__ import annotations

import json
from pathlib import Path
from typing import TYPE_CHECKING, Any

from homeassistant.util import dt as dt_util

from ..const import PDGSZ_USAGE_FCST_TO_ATTR
from .base import RCEBaseSensor

if TYPE_CHECKING:
from ..coordinator import RCEPSEDataUpdateCoordinator

_TRANSLATIONS_DIR = Path(__file__).resolve().parent.parent / "translations"
_STATE_DISPLAY_CACHE: dict[tuple[str, str], dict[str, str]] = {}


def _load_state_display_names(lang: str, translation_key: str) -> dict[str, str]:
cache_key = (lang, translation_key)
if cache_key in _STATE_DISPLAY_CACHE:
return _STATE_DISPLAY_CACHE[cache_key]
result: dict[str, str] = {}
try:
path = _TRANSLATIONS_DIR / f"{lang}.json"
if not path.exists():
path = _TRANSLATIONS_DIR / "en.json"
if path.exists():
with open(path, encoding="utf-8") as f:
data = json.load(f)
attrs = data.get("entity", {}).get("sensor", {}).get(translation_key, {}).get("state_attributes", {})
for key, obj in attrs.items():
if isinstance(obj, dict) and "name" in obj:
result[key] = obj["name"]
except (OSError, ValueError, KeyError):
pass
_STATE_DISPLAY_CACHE[cache_key] = result
return result


def _pdgsz_records_to_ranges(records: list[dict]) -> dict[str, list[str]]:
attr_keys = set(PDGSZ_USAGE_FCST_TO_ATTR.values())
Expand Down Expand Up @@ -42,6 +71,36 @@ def _pdgsz_records_to_ranges(records: list[dict]) -> dict[str, list[str]]:
return result


def _pdgsz_records_to_hourly_state(records: list[dict]) -> dict[int, str]:
result: dict[int, str] = {}
for rec in records:
dtime_str = rec.get("dtime", "")
if " " not in dtime_str:
continue
try:
hour = int(dtime_str.split(" ", 1)[1].split(":")[0])
except (ValueError, IndexError):
continue
if 0 <= hour <= 23:
fcst = rec.get("usage_fcst", 1)
result[hour] = PDGSZ_USAGE_FCST_TO_ATTR.get(fcst, "normal_usage")
return result


def _hourly_states_attributes(
hourly: dict[int, str],
display_names: dict[str, str],
) -> list[dict[str, Any]]:
return [
{
"hour": f"{h:02d}:00",
"state": hourly.get(h),
"state_display": display_names.get(hourly.get(h, ""), ""),
}
for h in range(24)
]


class RCEPeakHoursSensorBase(RCEBaseSensor):
def __init__(self, coordinator: RCEPSEDataUpdateCoordinator, unique_id: str) -> None:
super().__init__(coordinator, unique_id)
Expand All @@ -50,25 +109,51 @@ def __init__(self, coordinator: RCEPSEDataUpdateCoordinator, unique_id: str) ->
def _get_pdgsz_records(self) -> list[dict]:
raise NotImplementedError

def _get_current_hour_for_state(self) -> int:
return dt_util.now().hour

def _get_state_display(self, state_key: str | None) -> str:
if not state_key:
return ""
try:
lang = self.hass.config.language
if lang not in ("pl", "en"):
lang = "en"
names = _load_state_display_names(lang, self._attr_translation_key)
return names.get(state_key, state_key)
except (AttributeError, KeyError):
return state_key

@property
def native_value(self) -> int | None:
def native_value(self) -> str | None:
records = self._get_pdgsz_records()
if not records:
hourly = _pdgsz_records_to_hourly_state(records)
current_hour = self._get_current_hour_for_state()
state_key = hourly.get(current_hour)
if state_key is None:
return None
ranges = _pdgsz_records_to_ranges(records)
return sum(len(v) for v in ranges.values())
return self._get_state_display(state_key)

@property
def extra_state_attributes(self) -> dict[str, Any]:
records = self._get_pdgsz_records()
return _pdgsz_records_to_ranges(records)
hourly = _pdgsz_records_to_hourly_state(records)
try:
lang = self.hass.config.language
if lang not in ("pl", "en"):
lang = "en"
display_names = _load_state_display_names(lang, self._attr_translation_key)
except (AttributeError, KeyError):
display_names = {}
return {
"records": records,
"hourly_states": _hourly_states_attributes(hourly, display_names),
}

@property
def available(self) -> bool:
if not self.coordinator.last_update_success or not self.coordinator.data:
return False
if "pdgsz_data" not in self.coordinator.data:
return False
return True


Expand All @@ -87,6 +172,9 @@ def __init__(self, coordinator: RCEPSEDataUpdateCoordinator) -> None:
def _get_pdgsz_records(self) -> list[dict]:
return self.get_tomorrow_pdgsz_data()

def _get_current_hour_for_state(self) -> int:
return 0

@property
def available(self) -> bool:
if not self.is_tomorrow_data_available():
Expand Down
6 changes: 3 additions & 3 deletions docs/SENSORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ Wszystkie zwracają **timestamp** (datetime). Aby pokazać tylko godzinę (np. H

Dane z raportu PSE „Godziny Szczytu” (API PDGSZ) – kiedy zalecane jest użytkowanie energii, a kiedy oszczędzanie.

- **Godziny Szczytu Dzisiaj** – wartość: liczba przedziałów godzinowych; atrybuty z listami przedziałów w formacie „HH:00–HH:00”. Nazwy tych czterech atrybutów są tłumaczone w interfejsie (PL/EN).
- **Godziny Szczytu Jutro** – to samo dla następnego dnia (dostępne po 14:00 CET).
- **Godziny Szczytu Dzisiaj** – stan: wartość tekstowa dla bieżącej godziny (np. „Zalecane użytkowanie”) lub brak danych jak przy sensorach „Jutro” (np. okna najniższej ceny) – wtedy stan „unknown”. Atrybuty: **records** – surowa odpowiedź API z dnia (tylko wpisy aktywne, is_active); **hourly_states** – lista 24 wpisów `{ "hour": "HH:00", "state": "klucz", "state_display": "tekst np. Zalecane użytkowanie" }` dla każdej godziny dnia.
- **Godziny Szczytu Jutro** – to samo dla następnego dnia (dostępne po 14:00 CET); stan odnosi się do godziny 00:00 jutro.

Cztery atrybuty (listy przedziałów):
Możliwe stany (wyświetlane jako tekst w języku interfejsu):

- **Zalecane użytkowanie** – korzystny czas na używanie energii.
- **Normalne użytkowanie** – użytkowanie bez szczególnych zaleceń.
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def mock_hass():
hass = Mock(spec=HomeAssistant)
hass.config = Mock()
hass.config.time_zone = "Europe/Warsaw"
hass.config.language = "en"
hass.data = {}
return hass

Expand Down
70 changes: 52 additions & 18 deletions tests/test_peak_hours_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from unittest.mock import patch

from custom_components.rce_pse.sensors.peak_hours import (
_pdgsz_records_to_hourly_state,
_pdgsz_records_to_ranges,
RCETodayPeakHoursSensor,
RCETomorrowPeakHoursSensor,
Expand Down Expand Up @@ -58,55 +59,88 @@ def test_usage_fcst_3_maps_to_required_restriction(self):
assert result["recommended_usage"] == []


class TestPdgszRecordsToHourlyState:

def test_empty_returns_empty_dict(self):
assert _pdgsz_records_to_hourly_state([]) == {}

def test_maps_hour_to_attr_key(self):
records = [
{"dtime": "2025-05-29 07:00", "business_date": "2025-05-29", "usage_fcst": 0},
{"dtime": "2025-05-29 08:00", "business_date": "2025-05-29", "usage_fcst": 2},
]
result = _pdgsz_records_to_hourly_state(records)
assert result[7] == "recommended_usage"
assert result[8] == "recommended_saving"

def test_skips_invalid_dtime(self):
records = [
{"dtime": "no-space", "usage_fcst": 0},
]
assert _pdgsz_records_to_hourly_state(records) == {}


class TestTodayPeakHoursSensor:

def test_initialization(self, mock_coordinator):
sensor = RCETodayPeakHoursSensor(mock_coordinator)
assert sensor._attr_unique_id == "rce_pse_today_peak_hours"
assert sensor._attr_icon == "mdi:flash"

def test_native_value_with_data(self, mock_coordinator):
def test_native_value_with_data_returns_translated_text(self, mock_coordinator):
mock_coordinator.data["pdgsz_data"] = [
{"dtime": "2025-05-29 07:00", "business_date": "2025-05-29", "usage_fcst": 0},
{"dtime": "2025-05-29 08:00", "business_date": "2025-05-29", "usage_fcst": 1},
]
with patch.object(mock_coordinator, 'data', mock_coordinator.data):
sensor = RCETodayPeakHoursSensor(mock_coordinator)
with patch.object(sensor, 'get_today_pdgsz_data') as mock_get:
mock_get.return_value = mock_coordinator.data["pdgsz_data"]
assert sensor.native_value == 2
sensor = RCETodayPeakHoursSensor(mock_coordinator)
sensor.hass = mock_coordinator.hass
with patch.object(sensor, 'get_today_pdgsz_data') as mock_get:
mock_get.return_value = mock_coordinator.data["pdgsz_data"]
with patch.object(sensor, '_get_current_hour_for_state', return_value=7):
assert sensor.native_value == "Recommended usage"
with patch.object(sensor, '_get_current_hour_for_state', return_value=8):
assert sensor.native_value == "Normal usage"

def test_native_value_empty_data(self, mock_coordinator):
def test_native_value_empty_data_returns_none(self, mock_coordinator):
mock_coordinator.data["pdgsz_data"] = []
sensor = RCETodayPeakHoursSensor(mock_coordinator)
with patch.object(sensor, 'get_today_pdgsz_data', return_value=[]):
assert sensor.native_value is None

def test_extra_state_attributes_with_data(self, mock_coordinator):
def test_extra_state_attributes_records_and_hourly_states(self, mock_coordinator):
today = "2025-05-29"
mock_coordinator.data["pdgsz_data"] = [
records = [
{"dtime": f"{today} 07:00", "business_date": today, "usage_fcst": 0},
{"dtime": f"{today} 08:00", "business_date": today, "usage_fcst": 2},
]
mock_coordinator.data["pdgsz_data"] = records
sensor = RCETodayPeakHoursSensor(mock_coordinator)
sensor.hass = mock_coordinator.hass
with patch.object(sensor, 'get_today_pdgsz_data') as mock_get:
mock_get.return_value = mock_coordinator.data["pdgsz_data"]
mock_get.return_value = records
attrs = sensor.extra_state_attributes
assert "recommended_usage" in attrs
assert "recommended_saving" in attrs
assert attrs["recommended_usage"] == ["07:00–08:00"]
assert attrs["recommended_saving"] == ["08:00–09:00"]

def test_available_when_pdgsz_data_present(self, mock_coordinator):
assert "records" in attrs
assert attrs["records"] == records
assert "hourly_states" in attrs
hourly = attrs["hourly_states"]
assert len(hourly) == 24
assert hourly[7] == {"hour": "07:00", "state": "recommended_usage", "state_display": "Recommended usage"}
assert hourly[8] == {"hour": "08:00", "state": "recommended_saving", "state_display": "Recommended saving"}
assert hourly[0]["state"] is None
assert hourly[0]["state_display"] == ""

def test_available_when_coordinator_has_data(self, mock_coordinator):
mock_coordinator.data["pdgsz_data"] = []
sensor = RCETodayPeakHoursSensor(mock_coordinator)
assert sensor.available is True

def test_not_available_when_no_pdgsz_data_key(self, mock_coordinator):
def test_available_when_no_pdgsz_data_key_native_value_none(self, mock_coordinator):
if "pdgsz_data" in mock_coordinator.data:
del mock_coordinator.data["pdgsz_data"]
sensor = RCETodayPeakHoursSensor(mock_coordinator)
assert sensor.available is False
assert sensor.available is True
with patch.object(sensor, 'get_today_pdgsz_data', return_value=[]):
assert sensor.native_value is None


class TestTomorrowPeakHoursSensor:
Expand Down
Loading