Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions custom_components/pytap/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
rather than a comma-separated text blob.

Flow: user (host/port) → modules_menu → add_module (repeat) → finish
Options: init (menu) → add_module / remove_module → done
Options: init (menu) → add_module / remove_module / change_connection /
change_reporting → done
"""

from __future__ import annotations
Expand All @@ -32,8 +33,10 @@
CONF_MODULE_PEAK_POWER,
CONF_MODULE_STRING,
CONF_MODULES,
CONF_WRITE_INTERVAL,
DEFAULT_PEAK_POWER,
DEFAULT_PORT,
DEFAULT_WRITE_INTERVAL,
DOMAIN,
)

Expand Down Expand Up @@ -242,7 +245,13 @@ async def async_step_init(
"""Show the options menu: change connection / add / remove / done."""
return self.async_show_menu(
step_id="init",
menu_options=["change_connection", "add_module", "remove_module", "done"],
menu_options=[
"change_connection",
"change_reporting",
"add_module",
"remove_module",
"done",
],
description_placeholders={
"modules_list": _modules_description(self._modules),
},
Expand Down Expand Up @@ -308,6 +317,36 @@ async def async_step_change_connection(
errors=errors,
)

async def async_step_change_reporting(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Allow the user to change reporting settings (write interval)."""
if user_input is not None:
new_data = {**self._config_entry.data}
new_data[CONF_WRITE_INTERVAL] = user_input.get(
CONF_WRITE_INTERVAL, DEFAULT_WRITE_INTERVAL
)
self.hass.config_entries.async_update_entry(
self._config_entry, data=new_data
)
return await self.async_step_init()

current_write_interval = self._config_entry.data.get(
CONF_WRITE_INTERVAL, DEFAULT_WRITE_INTERVAL
)
schema = vol.Schema(
{
vol.Optional(
CONF_WRITE_INTERVAL, default=current_write_interval
): vol.All(vol.Coerce(int), vol.Range(min=1, max=300)),
}
)

return self.async_show_form(
step_id="change_reporting",
data_schema=schema,
)

async def async_step_add_module(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
Expand Down
2 changes: 2 additions & 0 deletions custom_components/pytap/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
CONF_HOST = "host"
CONF_PORT = "port"
CONF_MODULES = "modules"
CONF_WRITE_INTERVAL = "write_interval"

# Module dict keys
CONF_MODULE_STRING = "string"
Expand All @@ -19,6 +20,7 @@
DEFAULT_STRING_NAME = "Default"
DEFAULT_PEAK_POWER = 455
DEFAULT_SCAN_INTERVAL = 30
DEFAULT_WRITE_INTERVAL = 5
RECONNECT_TIMEOUT = 60
RECONNECT_DELAY = 5
RECONNECT_RETRIES = 0
Expand Down
113 changes: 105 additions & 8 deletions custom_components/pytap/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
CONF_MODULE_PEAK_POWER,
CONF_MODULE_STRING,
CONF_MODULES,
CONF_WRITE_INTERVAL,
DEFAULT_PEAK_POWER,
DEFAULT_PORT,
DEFAULT_WRITE_INTERVAL,
DOMAIN,
ENERGY_GAP_THRESHOLD_SECONDS,
ENERGY_LOW_POWER_THRESHOLD_W,
Expand All @@ -52,6 +54,20 @@
STORE_VERSION = 2
SAVE_DELAY_SECONDS = 10

# Numeric node fields that are averaged over the write interval before being
# pushed to Home Assistant. Averaging prevents the HA database from receiving
# every raw reading while still giving a representative value per interval.
_AVERAGED_FIELDS = (
"voltage_in",
"voltage_out",
"current_in",
"current_out",
"power",
"temperature",
"dc_dc_duty_cycle",
"rssi",
)


class _MigratingStore(Store):
"""Store subclass with explicit migration support.
Expand Down Expand Up @@ -94,6 +110,9 @@ def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
self._host: str = entry.data[CONF_HOST]
self._port: int = entry.data.get(CONF_PORT, DEFAULT_PORT)
self._modules: list[dict[str, Any]] = entry.data.get(CONF_MODULES, [])
self._write_interval: float = float(
entry.data.get(CONF_WRITE_INTERVAL, DEFAULT_WRITE_INTERVAL)
)

# Build barcode allowlist from configured modules
self._configured_barcodes: set[str] = {
Expand Down Expand Up @@ -129,6 +148,13 @@ def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
self._listener_task: asyncio.Task | None = None
self._stop_event = threading.Event()

# Write-interval throttle state (accessed only from the listener thread)
self._last_ha_update: float = 0.0
self._ha_update_pending: bool = False
# Per-barcode buffer of raw numeric readings accumulated between HA
# writes. Flushed (averaged) each time the write interval fires.
self._reading_buffers: dict[str, list[dict[str, Any]]] = {}

# Midnight reset timer handle
self._midnight_reset_unsub: asyncio.TimerHandle | None = None

Expand Down Expand Up @@ -318,14 +344,39 @@ def _listen(self) -> None:
last_data_time = time.monotonic()
events = parser.feed(data)
for event in events:
data_changed = self._process_event(event)
if data_changed:
# Push each event individually to HA
self.data["counters"] = parser.counters
self.hass.loop.call_soon_threadsafe(
self.async_set_updated_data,
dict(self.data),
)
if self._process_event(event):
self._ha_update_pending = True

# Push to HA at most once per write interval, sending
# per-barcode averages over the buffered readings rather
# than the most-recent raw snapshot.
now = time.monotonic()
if self._ha_update_pending and (
now - self._last_ha_update >= self._write_interval
):
self.data["counters"] = parser.counters
snapshot = self._build_averaged_snapshot()
per_node_counts = {
barcode: len(readings)
for barcode, readings in self._reading_buffers.items()
if readings and barcode in snapshot["nodes"]
}
_LOGGER.debug(
"HA update: %d node(s) — %s",
len(per_node_counts),
", ".join(
f"{snapshot['nodes'][b].get('name', b)}: "
f"{n} reading(s)"
for b, n in per_node_counts.items()
),
)
self._reading_buffers.clear()
self.hass.loop.call_soon_threadsafe(
self.async_set_updated_data,
snapshot,
)
self._last_ha_update = now
self._ha_update_pending = False
elif (
RECONNECT_TIMEOUT > 0
and (time.monotonic() - last_data_time) > RECONNECT_TIMEOUT
Expand All @@ -341,6 +392,17 @@ def _listen(self) -> None:
return
_LOGGER.error("Connection error: %s", err)
finally:
# Flush any data that was held back by the write interval.
# Skip updating counters here - parser may not have processed
# any data yet, and node/gateway data is what matters for sensors.
# On disconnect we discard the buffer and send the latest values.
if self._ha_update_pending:
self._reading_buffers.clear()
self.hass.loop.call_soon_threadsafe(
self.async_set_updated_data,
dict(self.data),
)
self._ha_update_pending = False
with self._source_lock:
if self._source is not None:
try:
Expand Down Expand Up @@ -552,6 +614,9 @@ def _handle_power_report(self, event: PowerReportEvent) -> bool:
"daily_reset_date": acc.daily_reset_date,
"last_update": now.isoformat(),
}
self._reading_buffers.setdefault(barcode, []).append(
{field: self.data["nodes"][barcode][field] for field in _AVERAGED_FIELDS}
)
self._schedule_save()
return True

Expand Down Expand Up @@ -706,6 +771,38 @@ def _handle_topology(self, event: TopologyEvent) -> bool:
# Persistence helpers
# -------------------------------------------------------------------

def _build_averaged_snapshot(self) -> dict[str, Any]:
"""Return the data snapshot to push to Home Assistant.

For each barcode in ``_reading_buffers``, the buffered numeric
readings are averaged and written into a copy of the node dict.
``self.data["nodes"]`` is **not** mutated — the raw latest values
are preserved there for persistence; only the returned snapshot
carries averaged values.

Must be called before ``_reading_buffers`` is cleared.
"""
snapshot_nodes = dict(self.data["nodes"])
for barcode, readings in self._reading_buffers.items():
node = snapshot_nodes.get(barcode)
if node is None or not readings:
continue
avg_node = dict(node)
for field in _AVERAGED_FIELDS:
values = [r[field] for r in readings if r[field] is not None]
avg_node[field] = (
round(sum(values) / len(values), 3) if values else None
)
if avg_node["power"] is not None:
avg_node["performance"] = round(
(max(avg_node["power"], 0.0) / avg_node["peak_power"]) * 100.0,
2,
)
else:
avg_node["performance"] = None
snapshot_nodes[barcode] = avg_node
return {**self.data, "nodes": snapshot_nodes}

def _build_node_payload(
self,
barcode: str,
Expand Down
13 changes: 12 additions & 1 deletion custom_components/pytap/strings.json
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@
"change_connection": "Change connection settings",
"add_module": "Add a module",
"remove_module": "Remove a module",
"done": "Save and close"
"done": "Save and close",
"change_reporting": "Change reporting settings"
}
},
"change_connection": {
Expand Down Expand Up @@ -165,6 +166,16 @@
"data": {
"remove_barcode": "Select module to remove"
}
},
"change_reporting": {
"title": "Reporting Settings",
"description": "Configure how often this integration pushes data to Home Assistant.",
"data": {
"write_interval": "Write interval (seconds)"
},
"data_description": {
"write_interval": "How often (in seconds) to push updates to Home Assistant. Higher values reduce CPU load. Default: 5 seconds."
}
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion custom_components/pytap/translations/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@
"change_connection": "Change connection settings",
"add_module": "Add a module",
"remove_module": "Remove a module",
"done": "Save and close"
"done": "Save and close",
"change_reporting": "Change reporting settings"
}
},
"change_connection": {
Expand Down Expand Up @@ -165,6 +166,16 @@
"data": {
"remove_barcode": "Select module to remove"
}
},
"change_reporting": {
"title": "Reporting Settings",
"description": "Configure how often this integration pushes data to Home Assistant.",
"data": {
"write_interval": "Write interval (seconds)"
},
"data_description": {
"write_interval": "How often (in seconds) to push updates to Home Assistant. Higher values reduce CPU load. Default: 5 seconds."
}
}
}
}
Expand Down
Loading