Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f3a387e
Initial plan
Copilot Feb 23, 2026
2d5eb3e
Add trigger_forecast, get_forecast, and trigger_and_get_forecast methods
Copilot Feb 23, 2026
0a7f2d9
Add tests for trigger_forecast, get_forecast, and trigger_and_get_for…
Copilot Feb 23, 2026
0fea65b
Fix black linting issue and add copilot agent instructions
Copilot Feb 24, 2026
9f0766b
Add forecasting examples to README and docs/forecasting.rst; simplify…
Copilot Feb 24, 2026
d4e653c
Merge remote-tracking branch 'origin/main' into copilot/support-forec…
Flix6x Feb 24, 2026
47ede74
feat: use trigger forecasts endpoint to trigger forecasts in hems script
BelhsanHmida Mar 9, 2026
dfdedbb
Merge branch 'main' into feat/use-forecasting-endpoint-hems
BelhsanHmida Mar 9, 2026
7456803
fix: correct syntax error in forecast trigger failure message
BelhsanHmida Mar 10, 2026
9262395
feat: update forecast generation to wait for job completion and provi…
BelhsanHmida Mar 10, 2026
866f74c
Merge branch 'main' into feat/use-forecasting-endpoint-hems
BelhsanHmida Mar 11, 2026
1cb50b1
Merge branch 'main' into feat/use-forecasting-endpoint-hems
BelhsanHmida Mar 24, 2026
d9e7543
refactor: clean up formatting and spacing in generate_sensor_forecast…
BelhsanHmida Mar 24, 2026
83983db
feat: add ensure_minimum_server_version method to validate server ver…
BelhsanHmida Mar 24, 2026
7864530
test: add tests for ensure_minimum_server_version method
BelhsanHmida Mar 24, 2026
0c3e367
feat: ensure minimum server version for HEMS example
BelhsanHmida Mar 24, 2026
1843f85
docs: update forecasting cli reference in docs
BelhsanHmida Mar 24, 2026
155fc7b
docs: update terminal instructions for running the HEMS tutorial
BelhsanHmida Mar 24, 2026
8eecce6
docs: update HEMS tutorial note about future API reporting
BelhsanHmida Mar 24, 2026
08086b7
docs: update terminal instructions for running the HEMS tutorial. to …
BelhsanHmida Mar 24, 2026
ca214d0
fix: surface failed forecast job details during polling
BelhsanHmida Mar 24, 2026
409bff9
fix: report failed HEMS forecast jobs
BelhsanHmida Mar 24, 2026
2213cb0
Update docs/HEMS.rst
BelhsanHmida Mar 25, 2026
1e6950b
Update docs/HEMS.rst
BelhsanHmida Mar 25, 2026
de2d014
Update docs/HEMS.rst
BelhsanHmida Mar 25, 2026
2ec0697
Handle forecast failure messages from API
BelhsanHmida Mar 27, 2026
cb117a9
Remove failed-202 forecast polling branch
BelhsanHmida Mar 27, 2026
b13694e
Update src/flexmeasures_client/response_handling.py
BelhsanHmida Mar 30, 2026
e617a8a
Update examples/HEMS/forecasting.py
BelhsanHmida Mar 30, 2026
ae9345a
Return consistent type in HEMS forecasting helper
BelhsanHmida Mar 30, 2026
e2082de
Escape regex match in server version test
BelhsanHmida Mar 30, 2026
e07d20f
Update examples/HEMS/forecasting.py
nhoening Mar 31, 2026
feb54b7
run black
nhoening Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions docs/HEMS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ HEMS tutorial
We wrote a complete tutorial with the client*, which sets up a HEMS from scratch (from nothing but a FlexMeasures account).

- It creates the whole structure - with PV, battery and a heat pump.
- It loads two weeks of historical data and creates forecasts based on it
- It loads two weeks of historical data and creates forecasts through the forecasting API.
- It goes through one week in 4h steps, forecasting and scheduling all flexible assets.

This is the resulting dashboard:
Expand All @@ -15,7 +15,7 @@ This is the resulting dashboard:
:align: center
|

.. note:: The tutorial still uses the CLI for two things: forecasting and reporting. We are working on those...
.. note:: The tutorial still uses the CLI for reporting. In future versions, we might make reporting available via the API, as well.


Set up your environment
Expand Down Expand Up @@ -61,11 +61,13 @@ Open three terminals. In the first terminal, run the server:

flexmeasures run

In the second terminal, run a flexmeasures worker for the scheduling jobs:
In the second terminal, run a flexmeasures worker that listens to both the scheduling and forecasting queues:

.. code-block:: bash

flexmeasures jobs run-worker --queue "scheduling"
flexmeasures jobs run-worker --queue "forecasting|scheduling"

Comment thread
BelhsanHmida marked this conversation as resolved.
Note: you can run the same command in two terminals (2 workers), to speed up the computation!

In the third terminal, run the client script using the `/examples/HEMS` folder as the current working directory:

Expand Down
5 changes: 5 additions & 0 deletions examples/HEMS/HEMS_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ async def main(
client = FlexMeasuresClient(email=usr, password=pwd, host=host)

try:
await client.ensure_minimum_server_version(
"0.31.0",
"The HEMS example requires a FlexMeasures server of v0.31.0 or above.",
)

# Get user account information
account = await client.get_account()
if not account:
Expand Down
90 changes: 36 additions & 54 deletions examples/HEMS/forecasting.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import subprocess

from const import (
FORECAST_HORIZON_HOURS,
FORECASTING_START,
Expand All @@ -21,18 +19,13 @@ async def generate_sensor_forecasts(
asset_name: str,
community_name: str,
regressors: list[tuple[str, str]] | None = None,
):
"""Generate forecasts using FlexMeasures CLI for the second week."""
) -> str | None:
"""
Generate forecasts for the second week and wait until the job finishes.
Returns sensor ID as str, or None if a failure happened.
"""
print(f"Generating {sensor_name} forecasts for {asset_name}...")

# Check if flexmeasures CLI is available
check_cmd = ["which", "flexmeasures"]
check_result = subprocess.run(check_cmd, capture_output=True, text=True)

if check_result.returncode != 0:
print("FlexMeasures CLI not found. Skipping forecast generation.")
return False

# Find sensors
sensor_mappings = [
# (key, sensor name, asset name)
Expand All @@ -56,50 +49,39 @@ async def generate_sensor_forecasts(

if not target_sensor:
print("Could not find required sensors for forecasting")
return False

# Run CLI command
# NOTE: This uses the CLI because there is no public API yet.
# An API endpoint is coming soon, so this can later be done via the client.
# Requires FlexMeasures PR #1546.
cmd = [
"flexmeasures",
"add",
"forecasts",
"--sensor",
str(target_sensor["id"]),
"--train-start",
TUTORIAL_START_DATE,
"--from-date",
FORECASTING_START,
"--to-date",
SCHEDULING_END,
"--max-forecast-horizon",
f"PT{FORECAST_HORIZON_HOURS}H",
"--forecast-frequency",
f"PT{SIMULATION_STEP_HOURS}H",
"--ensure-positive",
"--model-save-dir",
"forecaster_models",
]

if regressor_sensors:
cmd.extend(
[
"--past-regressors",
",".join([str(sensor["id"]) for sensor in regressor_sensors]),
]
) # TODO: to be changed to --regressors when the sensor has irradiance forecasts
return None

forecast_id: str | None = None
try:
forecast_id = await client.trigger_forecast(
sensor_id=target_sensor["id"],
train_start=TUTORIAL_START_DATE,
start=FORECASTING_START,
end=SCHEDULING_END,
max_forecast_horizon=f"PT{FORECAST_HORIZON_HOURS}H",
forecast_frequency=f"PT{SIMULATION_STEP_HOURS}H",
past_regressors=(
[sensor["id"] for sensor in regressor_sensors]
if regressor_sensors
else None
),
)
print(f"Forecast triggered with ID: {forecast_id}")
await client.get_forecast(
sensor_id=target_sensor["id"],
forecast_id=forecast_id,
)
except Exception as exc:
job_id = forecast_id if forecast_id is not None else "unknown"
print(f"Forecast job {job_id} failed for {sensor_name} on {asset_name}: {exc}")
print(
"Look up this job in the RQ dashboard for more details about the failure."
)
return None

print(f"Running: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
print(f"Forecast job completed for {sensor_name} on {asset_name}")

if result.returncode == 0:
Comment thread
BelhsanHmida marked this conversation as resolved.
print(f"{sensor_name} forecasts for {asset_name} generated successfully")
return True
else:
print(f"{sensor_name} forecasts for {asset_name} failed: {result.stderr}")
return False
return forecast_id


async def generate_forecasts(
Expand Down
29 changes: 21 additions & 8 deletions src/flexmeasures_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,23 @@ async def ensure_server_version(self):
version_info = await self.get_versions()
self.server_version = version_info["server_version"]

async def ensure_minimum_server_version(
self,
minimum_server_version: str,
minimum_server_version_msg: str | None = None,
):
"""Ensure that the server version meets a minimum requirement."""
await self.ensure_server_version()
if Version(cast(str, self.server_version)) < Version(minimum_server_version):
msg = (
"This functionality requires FlexMeasures server of "
f"{minimum_server_version} or above. Current server has version "
f"{self.server_version}."
)
if minimum_server_version_msg:
msg += f"\n{minimum_server_version_msg}"
raise InsufficientServerVersionError(msg)

async def request(
self,
uri: str,
Expand Down Expand Up @@ -212,14 +229,10 @@ async def request(
"404" in str(exception)
and minimum_server_version is not None
):
await self.ensure_server_version()
if Version(self.server_version) < Version(
minimum_server_version
):
msg = f"This functionality requires FlexMeasures server of {minimum_server_version} or above. Current server has version {self.server_version}."
if minimum_server_version_msg:
msg += f"\n{minimum_server_version_msg}"
raise InsufficientServerVersionError(msg)
await self.ensure_minimum_server_version(
minimum_server_version,
minimum_server_version_msg,
)
raise ConnectionError(
f"Error occurred while communicating with the API: {exception}"
) from exception
Expand Down
7 changes: 7 additions & 0 deletions src/flexmeasures_client/response_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ async def check_response(
elif payload.get("errors"):
# try to raise any error messages from the response
raise ValueError(" ,".join(payload.get("errors")))
elif payload.get("message") and status != 404:
# For most non-2xx responses with a JSON message, raise ValueError.
# 404s are excluded so they fall through to response.raise_for_status(),
# preserving the aiohttp.ClientError behavior used by version checks.
raise ValueError(
f"Request failed with status code {status}: {payload.get('message')}"
)
else:
message = f"""
status: {status}
Expand Down
35 changes: 35 additions & 0 deletions tests/client/test_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,41 @@ async def test_get_forecast_polling() -> None:
await flexmeasures_client.close()


@pytest.mark.asyncio
async def test_get_forecast_failed_job() -> None:
"""Test getting a forecast surfaces API failure details for failed jobs."""
sensor_id = 1
forecast_id = "failed-uuid"
url = f"http://localhost:5000/api/v3_0/sensors/{sensor_id}/forecasts/{forecast_id}"

with aioresponses() as m:
m.get(
url=url,
status=422,
payload={
"message": "Training data is incomplete.",
},
)

flexmeasures_client = FlexMeasuresClient(
email="test@test.test",
password="test",
request_timeout=2,
polling_interval=0.2,
access_token="skip-auth",
)

with pytest.raises(
ValueError,
match="Request failed with status code 422: Training data is incomplete.",
):
await flexmeasures_client.get_forecast(
sensor_id=sensor_id, forecast_id=forecast_id
)

await flexmeasures_client.close()


@pytest.mark.asyncio
async def test_trigger_and_get_forecast() -> None:
"""Test triggering and getting a forecast in one call."""
Expand Down
29 changes: 29 additions & 0 deletions tests/client/test_init.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import re
from unittest.mock import MagicMock

import pytest
Expand Down Expand Up @@ -346,6 +347,34 @@ async def test_ensure_server_version_already_known():
await client.close()


@pytest.mark.asyncio
async def test_ensure_minimum_server_version_already_satisfied():
client = FlexMeasuresClient(email="test@test.test", password="test")
client.server_version = "0.31.0"
await client.ensure_minimum_server_version("0.31.0")
await client.close()


@pytest.mark.asyncio
async def test_ensure_minimum_server_version_raises_with_message():
client = FlexMeasuresClient(email="test@test.test", password="test")
client.server_version = "0.30.0"
expected_message = (
"This functionality requires FlexMeasures server of 0.31.0 or above. "
"Current server has version 0.30.0.\n"
"The HEMS example requires a FlexMeasures server of v0.31.0 or above."
)
with pytest.raises(
InsufficientServerVersionError,
match=re.escape(expected_message),
):
Comment thread
BelhsanHmida marked this conversation as resolved.
await client.ensure_minimum_server_version(
"0.31.0",
"The HEMS example requires a FlexMeasures server of v0.31.0 or above.",
)
await client.close()


@pytest.mark.asyncio
async def test_determine_port_conflict():
"""Port set in both host and port param raises WrongHostError."""
Expand Down
Loading