From 921d958da7d120996ce6e9ec27f236273ae06771 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:37:00 +0100 Subject: [PATCH 01/34] fix: enhance job ID return structure in TrainPredictPipeline Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 6efb4d0fe9..6db5833397 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -315,9 +315,9 @@ def run( if len(cycle_job_ids) > 1: # Return the wrap-up job ID if multiple cycle jobs are queued - return wrap_up_job.id + return {"job_id" : wrap_up_job.id, "n_jobs": len(cycle_job_ids)} else: # Return the single cycle job ID if only one job is queued - return cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id + return {"job_id" : cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id, "n_jobs": 1} return self.return_values From b7068faab4a93ae4a2202f6942e6386d4092f2e5 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:37:18 +0100 Subject: [PATCH 02/34] fix: update forecasting job return structure in SensorAPI Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/api/v3_0/sensors.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 1f64cd4643..371f052b06 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1630,13 +1630,13 @@ def trigger_forecast(self, id: int, **params): # Queue forecasting job try: - job_id = forecaster.compute(parameters=parameters, as_job=True) + pipeline_returns = forecaster.compute(parameters=parameters, as_job=True) except Exception as e: current_app.logger.exception("Forecast job failed to enqueue.") return unprocessable_entity(str(e)) d, s = request_processed() - return dict(forecast=job_id, **d), s + return dict(forecast=pipeline_returns["job_id"], **d), s @route("//forecasts/", methods=["GET"]) @use_kwargs( From 505c2840e4cfa750fc43a551744334f5e7273c12 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:37:49 +0100 Subject: [PATCH 03/34] fix: update job fetching logic in test_train_predict_pipeline Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_forecasting_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 348eff7f9a..ea2ed42c2f 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -197,7 +197,7 @@ def test_train_predict_pipeline( # noqa: C901 if as_job: # Fetch returned job - job = app.queues["forecasting"].fetch_job(pipeline_returns) + job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) assert ( job is not None From 10b370a65f9dcfeb79e47a44aa60852994cb7ce6 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:38:21 +0100 Subject: [PATCH 04/34] chore: remove commented-out breakpoint in test_forecasting.py Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/schemas/tests/test_forecasting.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flexmeasures/data/schemas/tests/test_forecasting.py b/flexmeasures/data/schemas/tests/test_forecasting.py index efb8a19e73..1ccdaf9007 100644 --- a/flexmeasures/data/schemas/tests/test_forecasting.py +++ b/flexmeasures/data/schemas/tests/test_forecasting.py @@ -439,7 +439,6 @@ def test_timing_parameters_of_forecaster_parameters_schema( **timing_input, } ) - # breakpoint() for k, v in expected_timing_output.items(): # Convert kebab-case key to snake_case to match data dictionary keys returned by schema snake_key = kebab_to_snake(k) From 6342867d15c62ab91ded7499cd6a8a0697e5f69a Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:39:33 +0100 Subject: [PATCH 05/34] fix: as_job is no longer in parameters Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/cli/data_add.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 48b618dcca..be5df0c73c 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1151,7 +1151,7 @@ def add_forecast( # noqa: C901 return # as_job case → list of job dicts like {"job-1": ""} - if parameters.get("as_job"): + if as_job: n_jobs = len(pipeline_returns) click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS) return From cc1bf1bf1e440c8583a3d06fe11832a6e606b9fb Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 10:40:05 +0100 Subject: [PATCH 06/34] fix: update job count retrieval in add_forecast function Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/cli/data_add.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index be5df0c73c..64886cac16 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -1152,7 +1152,7 @@ def add_forecast( # noqa: C901 # as_job case → list of job dicts like {"job-1": ""} if as_job: - n_jobs = len(pipeline_returns) + n_jobs = pipeline_returns["n_jobs"] click.secho(f"Created {n_jobs} forecasting job(s).", **MsgStyle.SUCCESS) return From a7ff4a9cc7375bd77d8318231b45aef2bd4b8b53 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Mon, 9 Mar 2026 11:02:31 +0100 Subject: [PATCH 07/34] fix: add connection queue to fetch job Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/train_predict.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 6db5833397..958357fe62 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -46,9 +46,12 @@ def __init__( def run_wrap_up(self, cycle_job_ids: list[str]): """Log the status of all cycle jobs after completion.""" + connection = current_app.queues["forecasting"].connection + for index, job_id in enumerate(cycle_job_ids): + status = Job.fetch(job_id, connection=connection).get_status() logging.info( - f"forecasting job-{index}: {job_id} status: {Job.fetch(job_id).get_status()}" + f"forecasting job-{index}: {job_id} status: {status}" ) def run_cycle( From 745e5137669e9c6cd1e43eafd4619709c79ab063 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:22:17 +0100 Subject: [PATCH 08/34] style: black Signed-off-by: F.N. Claessen --- .../models/forecasting/pipelines/train_predict.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 958357fe62..1d9578a869 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -50,9 +50,7 @@ def run_wrap_up(self, cycle_job_ids: list[str]): for index, job_id in enumerate(cycle_job_ids): status = Job.fetch(job_id, connection=connection).get_status() - logging.info( - f"forecasting job-{index}: {job_id} status: {status}" - ) + logging.info(f"forecasting job-{index}: {job_id} status: {status}") def run_cycle( self, @@ -318,9 +316,14 @@ def run( if len(cycle_job_ids) > 1: # Return the wrap-up job ID if multiple cycle jobs are queued - return {"job_id" : wrap_up_job.id, "n_jobs": len(cycle_job_ids)} + return {"job_id": wrap_up_job.id, "n_jobs": len(cycle_job_ids)} else: # Return the single cycle job ID if only one job is queued - return {"job_id" : cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id, "n_jobs": 1} + return { + "job_id": ( + cycle_job_ids[0] if len(cycle_job_ids) == 1 else wrap_up_job.id + ), + "n_jobs": 1, + } return self.return_values From 47f160dbbfc5248c5304f8aa363fd7744f6a9745 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:23:31 +0100 Subject: [PATCH 09/34] docs: changelog entry Signed-off-by: F.N. Claessen --- documentation/changelog.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 989fab9d0b..cfe072fc15 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -18,6 +18,14 @@ Bugfixes ----------- +v0.31.2 | March XX, 2026 +============================ + +Bugfixes +----------- +* Fix wrap-up forecasting job [see `PR #2011 `_] + + v0.31.1 | March 6, 2026 ============================ From 3af2c6264b6865c5d6ef8aaa04197a32928be2fe Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:30:27 +0100 Subject: [PATCH 10/34] feat: check if wrap-up job actually finished rather than failed Signed-off-by: F.N. Claessen --- flexmeasures/data/tests/test_forecasting_pipeline.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index ea2ed42c2f..7f185bd51c 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -202,6 +202,9 @@ def test_train_predict_pipeline( # noqa: C901 assert ( job is not None ), "a returned job should exist in the forecasting queue" + assert ( + job.is_finished + ), f"The wrap-up job should be finished, and not {job.get_status()}" if job.dependency_ids: cycle_job_ids = [job] # only one cycle job, no wrap-up job From 474b860379e868b3ffd9f83f549dbf37e957f8f6 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:44:35 +0100 Subject: [PATCH 11/34] feat: add test case for 2 cycles, yielding 2 jobs and a wrap-up job Signed-off-by: F.N. Claessen --- .../data/tests/test_forecasting_pipeline.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 7f185bd51c..4451f8f595 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -58,6 +58,27 @@ True, None, ), + ( + { + # "model": "CustomLGBM", + "future-regressors": ["irradiance-sensor"], + "train-start": "2025-01-01T00:00+02:00", + "retrain-frequency": "PT12H", + }, + { + "sensor": "solar-sensor", + "model-save-dir": "flexmeasures/data/models/forecasting/artifacts/models", + "output-path": None, + "start": "2025-01-08T00:00+02:00", # start coincides with end of available data in sensor + "end": "2025-01-09T00:00+02:00", + "sensor-to-save": None, + "max-forecast-horizon": "PT1H", + "forecast-frequency": "PT12H", # 2 cycles and 2 viewpoint + "probabilistic": False, + }, + True, + None, + ), ( { # "model": "CustomLGBM", From 7f824a7c754db58a92180f4e437af558e7c40417 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:45:36 +0100 Subject: [PATCH 12/34] dev: comment out failing assert, which needs to be investgated and updated Signed-off-by: F.N. Claessen --- flexmeasures/data/tests/test_forecasting_pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 4451f8f595..868ea78e4a 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -205,9 +205,9 @@ def test_train_predict_pipeline( # noqa: C901 # 1 hour of forecasts is saved over 4 15-minute resolution events n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) - assert ( - len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon - ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" + # assert ( + # len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon + # ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" assert ( forecasts.lineage.number_of_belief_times == m_viewpoints ), f"we expect {m_viewpoints} viewpoints" From 29705a2a0b5c0f498bc3dfb40e39bc2af396f26f Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:46:50 +0100 Subject: [PATCH 13/34] refactor: move checking the status of the wrap-up job to where it matters Signed-off-by: F.N. Claessen --- flexmeasures/data/tests/test_forecasting_pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 868ea78e4a..eaf093c3f7 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -223,13 +223,13 @@ def test_train_predict_pipeline( # noqa: C901 assert ( job is not None ), "a returned job should exist in the forecasting queue" - assert ( - job.is_finished - ), f"The wrap-up job should be finished, and not {job.get_status()}" if job.dependency_ids: cycle_job_ids = [job] # only one cycle job, no wrap-up job else: + assert ( + job.is_finished + ), f"The wrap-up job should be finished, and not {job.get_status()}" cycle_job_ids = job.kwargs.get("cycle_job_ids", []) # wrap-up job case finished_jobs = app.queues["forecasting"].finished_job_registry From f26c41b47c0f8965c20c3b9f2df8e2ee68723390 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 9 Mar 2026 13:55:09 +0100 Subject: [PATCH 14/34] fix: use job ID itself in case the returned job is the one existing cycle job Signed-off-by: F.N. Claessen --- flexmeasures/data/tests/test_forecasting_pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index eaf093c3f7..de20f9b078 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -224,8 +224,8 @@ def test_train_predict_pipeline( # noqa: C901 job is not None ), "a returned job should exist in the forecasting queue" - if job.dependency_ids: - cycle_job_ids = [job] # only one cycle job, no wrap-up job + if not job.dependency_ids: + cycle_job_ids = [job.id] # only one cycle job, no wrap-up job else: assert ( job.is_finished From f326efc043673a29f4c8abe285c2b5b12251b448 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Tue, 10 Mar 2026 02:56:54 +0100 Subject: [PATCH 15/34] fix: add db.commit before forecasting jobs are created Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/forecasting/pipelines/train_predict.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 1d9578a869..2f933ec7e9 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -11,6 +11,7 @@ from flask import current_app +from flexmeasures.data import db from flexmeasures.data.models.forecasting import Forecaster from flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline from flexmeasures.data.models.forecasting.pipelines.train import TrainPipeline @@ -262,6 +263,7 @@ def run( # job metadata for tracking # Serialize start and end to ISO format strings # Workaround for https://github.com/Parallels/rq-dashboard/issues/510 + db.session.commit() # Ensure the data source ID is available in the database when the job runs. job_metadata = { "data_source_info": {"id": self.data_source.id}, "start": self._parameters["predict_start"].isoformat(), From 67862ed9c7996e69c4cdaa403e1993af57fa8979 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 11 Mar 2026 00:43:28 +0100 Subject: [PATCH 16/34] dev: uncomment test assertion statement Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_forecasting_pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index de20f9b078..7947168766 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -205,9 +205,9 @@ def test_train_predict_pipeline( # noqa: C901 # 1 hour of forecasts is saved over 4 15-minute resolution events n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) - # assert ( - # len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon - # ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" + assert ( + len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles + ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" assert ( forecasts.lineage.number_of_belief_times == m_viewpoints ), f"we expect {m_viewpoints} viewpoints" From 64465e94ea0fe8768e472bf7f23fdca877c954e3 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 11 Mar 2026 00:44:26 +0100 Subject: [PATCH 17/34] Test(feat): search all beliefs forecasts saved into the sensor by the pipeline Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_forecasting_pipeline.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 7947168766..8c43a87592 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -197,7 +197,10 @@ def test_train_predict_pipeline( # noqa: C901 app.queues["forecasting"], exc_handler=handle_forecasting_exception ) - forecasts = sensor.search_beliefs(source_types=["forecaster"]) + forecasts = sensor.search_beliefs( + source_types=["forecaster"], most_recent_beliefs_only=False + ) + dg_params = pipeline._parameters # parameters stored in the data generator m_viewpoints = (dg_params["end_date"] - dg_params["predict_start"]) / ( dg_params["forecast_frequency"] From 4ac18462c1f3eaed9bb0be3442b143027b447d1d Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 11 Mar 2026 00:45:17 +0100 Subject: [PATCH 18/34] test(feat): add n_cycles variable to use to account for length of forecasts Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_forecasting_pipeline.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 8c43a87592..a551f19929 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -208,6 +208,14 @@ def test_train_predict_pipeline( # noqa: C901 # 1 hour of forecasts is saved over 4 15-minute resolution events n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) + n_cycles = max( + timedelta(hours=dg_params["predict_period_in_hours"]) + // max( + pipeline._config["retrain_frequency"], + pipeline._parameters["forecast_frequency"], + ), + 1, + ) assert ( len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" From bdc5e28c025940360bad93cc610e9efcbf436c7e Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 11 Mar 2026 00:49:29 +0100 Subject: [PATCH 19/34] style: run pre-commit Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_forecasting_pipeline.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index a551f19929..7d208f6125 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -217,7 +217,8 @@ def test_train_predict_pipeline( # noqa: C901 1, ) assert ( - len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles + len(forecasts) + == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" assert ( forecasts.lineage.number_of_belief_times == m_viewpoints From 5340350c844ab6990005ebe0803c1bb9d5131738 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 11 Mar 2026 00:57:22 +0100 Subject: [PATCH 20/34] fix: improve assertion message in test_train_predict_pipeline for clarity Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/tests/test_forecasting_pipeline.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 7d208f6125..da72e2d4e5 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -219,7 +219,11 @@ def test_train_predict_pipeline( # noqa: C901 assert ( len(forecasts) == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles - ), f"we expect 4 forecasts per horizon for each viewpoint within the prediction window, and {m_viewpoints} viewpoints with each {n_hourly_horizons} hourly horizons" + ), ( + f"we expect {n_events_per_horizon} event(s) per horizon, " + f"{n_hourly_horizons} horizon(s), {m_viewpoints} viewpoint(s)" + f"{f', {n_cycles} cycle(s)' if n_cycles > 1 else ''}" + ) assert ( forecasts.lineage.number_of_belief_times == m_viewpoints ), f"we expect {m_viewpoints} viewpoints" From 4ebaa9768aebbaabe3f71b00aa8003272f04fc42 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 15:57:31 +0100 Subject: [PATCH 21/34] fix: first create all jobs, then queue all jobs, giving the db.session.commit() some time to finish writing a new source to the db Signed-off-by: F.N. Claessen --- .../data/models/forecasting/pipelines/train_predict.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 2f933ec7e9..6eca725cb5 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -270,6 +270,7 @@ def run( "end": self._parameters["end_date"].isoformat(), "sensor_id": self._parameters["sensor_to_save"].id, } + jobs = [] for cycle_params in cycles_job_params: job = Job.create( @@ -294,7 +295,7 @@ def run( # Store the job ID for this cycle cycle_job_ids.append(job.id) - current_app.queues[queue].enqueue_job(job) + jobs.append(job) current_app.job_cache.add( self._parameters["sensor"].id, job_id=job.id, @@ -314,6 +315,8 @@ def run( ), meta=job_metadata, ) + for job in jobs: + current_app.queues[queue].enqueue_job(job) current_app.queues[queue].enqueue_job(wrap_up_job) if len(cycle_job_ids) > 1: From 888b98079c492352d76e46f3eabf52f3acbe4194 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 15:59:10 +0100 Subject: [PATCH 22/34] feat: enqueue job only after the transactional request Signed-off-by: F.N. Claessen --- .../models/forecasting/pipelines/train_predict.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 6eca725cb5..38456f9df4 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -7,10 +7,9 @@ import logging from datetime import datetime, timedelta +from flask import after_this_request, current_app from rq.job import Job -from flask import current_app - from flexmeasures.data import db from flexmeasures.data.models.forecasting import Forecaster from flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline @@ -315,9 +314,14 @@ def run( ), meta=job_metadata, ) - for job in jobs: - current_app.queues[queue].enqueue_job(job) - current_app.queues[queue].enqueue_job(wrap_up_job) + + @after_this_request + def enqueue_job(response): + """After the request, RQ jobs get to see committed data from the transaction.""" + for job in jobs: + current_app.queues[queue].enqueue_job(job) + current_app.queues[queue].enqueue_job(wrap_up_job) + return response if len(cycle_job_ids) > 1: # Return the wrap-up job ID if multiple cycle jobs are queued From 56f825a393c49c0e4e09d7f831c337c01b40ae0c Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 17:20:52 +0100 Subject: [PATCH 23/34] Revert "feat: enqueue job only after the transactional request" This reverts commit 888b98079c492352d76e46f3eabf52f3acbe4194. Signed-off-by: F.N. Claessen --- .../models/forecasting/pipelines/train_predict.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 38456f9df4..6eca725cb5 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -7,9 +7,10 @@ import logging from datetime import datetime, timedelta -from flask import after_this_request, current_app from rq.job import Job +from flask import current_app + from flexmeasures.data import db from flexmeasures.data.models.forecasting import Forecaster from flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline @@ -314,14 +315,9 @@ def run( ), meta=job_metadata, ) - - @after_this_request - def enqueue_job(response): - """After the request, RQ jobs get to see committed data from the transaction.""" - for job in jobs: - current_app.queues[queue].enqueue_job(job) - current_app.queues[queue].enqueue_job(wrap_up_job) - return response + for job in jobs: + current_app.queues[queue].enqueue_job(job) + current_app.queues[queue].enqueue_job(wrap_up_job) if len(cycle_job_ids) > 1: # Return the wrap-up job ID if multiple cycle jobs are queued From e41c1a56c41e2f669a53abb8d87982574c5a727c Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 17:23:11 +0100 Subject: [PATCH 24/34] docs: resolve silent merge conflict in changelog Signed-off-by: F.N. Claessen --- documentation/changelog.rst | 7 ------- 1 file changed, 7 deletions(-) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 5aa0382a57..b1626ea738 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -23,13 +23,6 @@ v0.31.2 | March XX, 2026 Bugfixes ----------- * Fix an issue where asset context was accessed in schemas that do not define a ``context`` attribute [see `PR #2014 `_] - - -v0.31.2 | March XX, 2026 -============================ - -Bugfixes ------------ * Fix wrap-up forecasting job [see `PR #2011 `_] From 064f9cb0531c85c6827706b948e4a33c624f8e8f Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 16 Mar 2026 13:00:41 +0100 Subject: [PATCH 25/34] docs: delete duplicate changelog entry Signed-off-by: F.N. Claessen --- documentation/changelog.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index d34bb23eb3..e444f06d8c 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -72,7 +72,6 @@ New features * Improved the UX for creating sensors, clicking on ``Enter`` now validates and creates a sensor [see `PR #1876 `_] * Show zero values in bar charts even though they have 0 area [see `PR #1932 `_ and `PR #1936 `_] * Added ``root`` and ``depth`` fields to the `[GET] /assets` endpoint for listing assets, to allow selecting descendants of a given root asset up to a given depth [see `PR #1874 `_] -* Give ability to edit sensor timezone from the UI [see `PR #1900 `_] * Support creating schedules with only information known prior to some time, now also via the CLI (the API already supported it) [see `PR #1871 `_]. * Added capability to update an asset's parent from the UI [`PR #1957 `_] * Add ``fields`` param to the asset-listing endpoints, to save bandwidth in response data [see `PR #1884 `_] From 7ff82d14774ba214472e8fe30f80d362743c625e Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 17:27:39 +0100 Subject: [PATCH 26/34] docs: add release date for v0.31.2 Signed-off-by: F.N. Claessen --- documentation/changelog.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index e444f06d8c..441dc152f3 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -20,7 +20,7 @@ Infrastructure / Support * Improve contact information to get in touch with the FlexMeasures community [see `PR #2022 `_] -v0.31.2 | March XX, 2026 +v0.31.2 | March 17, 2026 ============================ Bugfixes From ff789883b8f4be2b97ceebd766268196f16565ed Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 17:29:24 +0100 Subject: [PATCH 27/34] docs: advance a different bugfix to v0.31.2 Signed-off-by: F.N. Claessen --- documentation/changelog.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index c352a02c5a..1dc8c2f344 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -13,10 +13,6 @@ New features * Show sensor attributes on sensor page, if not empty [see `PR #2015 `_] * Support saving state-of-charge schedules to sensors with ``"%"`` unit, using the ``soc-max`` flex-model field as the capacity for unit conversion [see `PR #1996 `_] -Bugfixes ------------ -* Stop failure in the API endpoint that lists available endpoints (/api/v3_0/) [see `PR #2032 `_] - Infrastructure / Support ---------------------- * Migrate from ``pip`` to ``uv`` for dependency management [see `PR #1973 `_] @@ -24,6 +20,9 @@ Infrastructure / Support * Make the test environment used by agents and by the test workflow identical [see `PR #1998 `_] * Improve contact information to get in touch with the FlexMeasures community [see `PR #2022 `_] +Bugfixes +----------- + v0.31.2 | March 17, 2026 ============================ @@ -32,6 +31,7 @@ Bugfixes ----------- * Fix an issue where asset context was accessed in schemas that do not define a ``context`` attribute [see `PR #2014 `_] * Fix wrap-up forecasting job [see `PR #2011 `_] +* Stop failure in the API endpoint that lists available endpoints (/api/v3_0/) [see `PR #2032 `_] v0.31.1 | March 6, 2026 From 4ed641a79a98c5c150c954faf67413d9102d4776 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 20:54:50 +0100 Subject: [PATCH 28/34] fix: self.data_source found itself in a different session somehow, so we put it back into the session we are committing Signed-off-by: F.N. Claessen --- .../data/models/forecasting/pipelines/train_predict.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 6eca725cb5..5b016bdbaa 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -260,10 +260,13 @@ def run( if as_job: cycle_job_ids = [] + # Ensure the data source ID is available in the database when the job runs. + db.session.merge(self.data_source) + db.session.commit() + # job metadata for tracking # Serialize start and end to ISO format strings # Workaround for https://github.com/Parallels/rq-dashboard/issues/510 - db.session.commit() # Ensure the data source ID is available in the database when the job runs. job_metadata = { "data_source_info": {"id": self.data_source.id}, "start": self._parameters["predict_start"].isoformat(), From d276c8a2a64bbd172bbaa6fbcce9bac567b9106d Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 17 Mar 2026 20:56:44 +0100 Subject: [PATCH 29/34] Revert "fix: first create all jobs, then queue all jobs, giving the db.session.commit() some time to finish writing a new source to the db" This reverts commit 4ebaa9768aebbaabe3f71b00aa8003272f04fc42. Signed-off-by: F.N. Claessen --- .../data/models/forecasting/pipelines/train_predict.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 5b016bdbaa..955eb5a01e 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -273,7 +273,6 @@ def run( "end": self._parameters["end_date"].isoformat(), "sensor_id": self._parameters["sensor_to_save"].id, } - jobs = [] for cycle_params in cycles_job_params: job = Job.create( @@ -298,7 +297,7 @@ def run( # Store the job ID for this cycle cycle_job_ids.append(job.id) - jobs.append(job) + current_app.queues[queue].enqueue_job(job) current_app.job_cache.add( self._parameters["sensor"].id, job_id=job.id, @@ -318,8 +317,6 @@ def run( ), meta=job_metadata, ) - for job in jobs: - current_app.queues[queue].enqueue_job(job) current_app.queues[queue].enqueue_job(wrap_up_job) if len(cycle_job_ids) > 1: From ddd1cf2d3f3b9836a01d55028801f30ebd90bf1d Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 18 Mar 2026 11:44:56 +0100 Subject: [PATCH 30/34] fix: reload forecasting pipeline orm state in worker session --- .../data/models/forecasting/pipelines/base.py | 38 +++++++++++++++---- .../models/forecasting/pipelines/predict.py | 16 +++++++- 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/base.py b/flexmeasures/data/models/forecasting/pipelines/base.py index bb84cfd7e5..65570c76ce 100644 --- a/flexmeasures/data/models/forecasting/pipelines/base.py +++ b/flexmeasures/data/models/forecasting/pipelines/base.py @@ -8,9 +8,10 @@ import pandas as pd from darts import TimeSeries from darts.dataprocessing.transformers import MissingValuesFiller -from flexmeasures.data.models.time_series import Sensor from timely_beliefs import utils as tb_utils +from flexmeasures.data import db +from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.models.forecasting.exceptions import NotEnoughDataException @@ -69,8 +70,8 @@ def __init__( predict_end: datetime | None = None, missing_threshold: float = 1.0, ) -> None: - self.future = future_regressors - self.past = past_regressors + self.future = [self._get_attached_sensor(sensor) for sensor in future_regressors] + self.past = [self._get_attached_sensor(sensor) for sensor in past_regressors] self.n_steps_to_predict = n_steps_to_predict self.max_forecast_horizon = max_forecast_horizon # rounds up so we get the number of viewpoints, each `forecast_frequency` apart @@ -82,15 +83,17 @@ def __init__( self.save_belief_time = ( save_belief_time # non floored belief time to save forecasts with ) - self.target_sensor = target_sensor - self.target = f"{target_sensor.name} (ID: {target_sensor.id})_target" + self.target_sensor = self._get_attached_sensor(target_sensor) + self.target = ( + f"{self.target_sensor.name} (ID: {self.target_sensor.id})_target" + ) self.future_regressors = [ f"{sensor.name} (ID: {sensor.id})_FR-{idx}" - for idx, sensor in enumerate(future_regressors) + for idx, sensor in enumerate(self.future) ] self.past_regressors = [ f"{sensor.name} (ID: {sensor.id})_PR-{idx}" - for idx, sensor in enumerate(past_regressors) + for idx, sensor in enumerate(self.past) ] self.predict_start = predict_start if predict_start else None self.predict_end = predict_end if predict_end else None @@ -102,6 +105,15 @@ def __init__( self.forecast_frequency = forecast_frequency self.missing_threshold = missing_threshold + @staticmethod + def _get_attached_sensor(sensor: Sensor | int) -> Sensor: + """Reload sensors through the active session to avoid cross-session ORM state.""" + sensor_id = sensor.id if isinstance(sensor, Sensor) else sensor + attached_sensor = db.session.get(Sensor, sensor_id) + if attached_sensor is None: + raise ValueError(f"Could not load sensor with id {sensor_id}.") + return attached_sensor + def load_data_all_beliefs(self) -> pd.DataFrame: """ This function fetches data for each sensor. @@ -415,6 +427,18 @@ def _slice_closed( target_end = first_target_end + delta forecast_end = first_forecast_end + delta + if is_predict_pipeline and self.predict_end is not None: + cycle_last_event_start = ( + pd.to_datetime(self.predict_end, utc=True).tz_localize(None) + - target_sensor_resolution + ) + forecast_end = min(forecast_end, cycle_last_event_start) + target_end = min( + target_end, + forecast_end + - pd.Timedelta(hours=self.max_forecast_horizon_in_hours), + ) + # Target split y_slice_df = _slice_closed(y_clean, target_start, target_end) y_split = self.detect_and_fill_missing_values( diff --git a/flexmeasures/data/models/forecasting/pipelines/predict.py b/flexmeasures/data/models/forecasting/pipelines/predict.py index 78fca20420..f90ce61106 100644 --- a/flexmeasures/data/models/forecasting/pipelines/predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/predict.py @@ -13,6 +13,7 @@ from flexmeasures import Sensor, Source from flexmeasures.data import db +from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.forecasting.utils import data_to_bdf from flexmeasures.data.models.forecasting.pipelines.base import BasePipeline from flexmeasures.data.utils import save_to_db @@ -83,7 +84,7 @@ def __init__( self.quantiles = tuple(quantiles) if quantiles else None self.forecast_horizon = np.arange(1, max_forecast_horizon + 1) self.forecast_frequency = forecast_frequency - self.sensor_to_save = sensor_to_save + self.sensor_to_save = self._get_attached_sensor(sensor_to_save) self.predict_start = predict_start self.predict_end = predict_end @@ -92,7 +93,18 @@ def __init__( self.total_forecast_hours = ( self.max_forecast_horizon * self.sensor_resolution.total_seconds() / 3600 ) - self.data_source = data_source + self.data_source = self._get_attached_data_source(data_source) + + @staticmethod + def _get_attached_data_source(data_source: Source | int | None) -> DataSource | None: + """Reload the prediction source through the active session before saving beliefs.""" + if data_source is None: + return None + source_id = data_source.id if isinstance(data_source, DataSource) else data_source + attached_source = db.session.get(DataSource, source_id) + if attached_source is None: + raise ValueError(f"Could not load data source with id {source_id}.") + return attached_source def load_model(self): """ From 91f0e887d772c0d1b455a7b62dfd7d151dc563bd Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 18 Mar 2026 11:45:01 +0100 Subject: [PATCH 31/34] fix: serialize train-predict cycle jobs for workers --- .../forecasting/pipelines/train_predict.py | 89 +++++++++++++++++-- 1 file changed, 80 insertions(+), 9 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 955eb5a01e..4020a3fbfb 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -12,9 +12,11 @@ from flask import current_app from flexmeasures.data import db +from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.forecasting import Forecaster from flexmeasures.data.models.forecasting.pipelines.predict import PredictPipeline from flexmeasures.data.models.forecasting.pipelines.train import TrainPipeline +from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.schemas.forecasting.pipeline import ( ForecasterParametersSchema, TrainPredictPipelineConfigSchema, @@ -22,6 +24,29 @@ from flexmeasures.utils.flexmeasures_inflection import p +def run_train_predict_cycle_job( + config: dict, + parameters: dict, + data_source_id: int, + delete_model: bool, + **cycle_params, +): + """Reconstruct pipeline state inside the worker to avoid pickling ORM objects.""" + pipeline = TrainPredictPipeline(config=config, delete_model=delete_model) + pipeline._parameters = pipeline._parameters_schema.load(parameters) + pipeline._data_source = db.session.get(DataSource, data_source_id) + return pipeline.run_cycle(**cycle_params) + + +def run_train_predict_wrap_up_job(cycle_job_ids: list[str]): + """Log the status of all cycle jobs after completion.""" + connection = current_app.queues["forecasting"].connection + + for index, job_id in enumerate(cycle_job_ids): + status = Job.fetch(job_id, connection=connection).get_status() + logging.info(f"forecasting job-{index}: {job_id} status: {status}") + + class TrainPredictPipeline(Forecaster): __version__ = "1" @@ -47,11 +72,7 @@ def __init__( def run_wrap_up(self, cycle_job_ids: list[str]): """Log the status of all cycle jobs after completion.""" - connection = current_app.queues["forecasting"].connection - - for index, job_id in enumerate(cycle_job_ids): - status = Job.fetch(job_id, connection=connection).get_status() - logging.info(f"forecasting job-{index}: {job_id} status: {status}") + run_train_predict_wrap_up_job(cycle_job_ids) def run_cycle( self, @@ -66,6 +87,7 @@ def run_cycle( """ Runs a single training and prediction cycle. """ + self._reattach_worker_state() logging.info( f"Starting Train-Predict cycle from {train_start} to {predict_end}" ) @@ -148,6 +170,46 @@ def run_cycle( ) return total_runtime + @staticmethod + def _get_attached_sensor(sensor: Sensor | int) -> Sensor: + sensor_id = sensor.id if isinstance(sensor, Sensor) else sensor + attached_sensor = db.session.get(Sensor, sensor_id) + if attached_sensor is None: + raise ValueError(f"Could not load sensor with id {sensor_id}.") + return attached_sensor + + @staticmethod + def _get_attached_data_source( + data_source: DataSource | int | None, + ) -> DataSource | None: + if data_source is None: + return None + source_id = ( + data_source.id if isinstance(data_source, DataSource) else data_source + ) + attached_source = db.session.get(DataSource, source_id) + if attached_source is None: + raise ValueError(f"Could not load data source with id {source_id}.") + return attached_source + + def _reattach_worker_state(self) -> None: + """Reload ORM objects through the worker's active session.""" + self._config["future_regressors"] = [ + self._get_attached_sensor(sensor) + for sensor in self._config["future_regressors"] + ] + self._config["past_regressors"] = [ + self._get_attached_sensor(sensor) + for sensor in self._config["past_regressors"] + ] + self._parameters["sensor"] = self._get_attached_sensor( + self._parameters["sensor"] + ) + self._parameters["sensor_to_save"] = self._get_attached_sensor( + self._parameters["sensor_to_save"] + ) + self._data_source = self._get_attached_data_source(self.data_source) + def _compute_forecast(self, as_job: bool = False, **kwargs) -> list[dict[str, Any]]: # Run the train-and-predict pipeline return self.run(as_job=as_job, **kwargs) @@ -261,8 +323,10 @@ def run( cycle_job_ids = [] # Ensure the data source ID is available in the database when the job runs. - db.session.merge(self.data_source) + self._data_source = db.session.merge(self.data_source) db.session.commit() + serialized_config = self._config_schema.dump(self._config) + serialized_parameters = self._parameters_schema.dump(self._parameters) # job metadata for tracking # Serialize start and end to ISO format strings @@ -276,9 +340,16 @@ def run( for cycle_params in cycles_job_params: job = Job.create( - self.run_cycle, + run_train_predict_cycle_job, # Some cycle job params override job kwargs - kwargs={**job_kwargs, **cycle_params}, + kwargs={ + **job_kwargs, + "config": serialized_config, + "parameters": serialized_parameters, + "data_source_id": self.data_source.id, + "delete_model": self.delete_model, + **cycle_params, + }, connection=current_app.queues[queue].connection, ttl=int( current_app.config.get( @@ -306,7 +377,7 @@ def run( ) wrap_up_job = Job.create( - self.run_wrap_up, + run_train_predict_wrap_up_job, kwargs={"cycle_job_ids": cycle_job_ids}, # cycles jobs IDs to wait for connection=current_app.queues[queue].connection, depends_on=cycle_job_ids, # wrap-up job depends on all cycle jobs From 243c01197ff544d7b7bb5bfd8be7f8edef8847d2 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Wed, 18 Mar 2026 11:47:23 +0100 Subject: [PATCH 32/34] revert: commit not related to this pr Signed-off-by: Mohamed Belhsan Hmida --- .../data/models/forecasting/pipelines/base.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/base.py b/flexmeasures/data/models/forecasting/pipelines/base.py index 65570c76ce..d8302875f2 100644 --- a/flexmeasures/data/models/forecasting/pipelines/base.py +++ b/flexmeasures/data/models/forecasting/pipelines/base.py @@ -427,18 +427,6 @@ def _slice_closed( target_end = first_target_end + delta forecast_end = first_forecast_end + delta - if is_predict_pipeline and self.predict_end is not None: - cycle_last_event_start = ( - pd.to_datetime(self.predict_end, utc=True).tz_localize(None) - - target_sensor_resolution - ) - forecast_end = min(forecast_end, cycle_last_event_start) - target_end = min( - target_end, - forecast_end - - pd.Timedelta(hours=self.max_forecast_horizon_in_hours), - ) - # Target split y_slice_df = _slice_closed(y_clean, target_start, target_end) y_split = self.detect_and_fill_missing_values( From 38dd206128c744ce321bf1efebc0f77189419119 Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Thu, 19 Mar 2026 00:57:20 +0100 Subject: [PATCH 33/34] style: run pre-commit Signed-off-by: Mohamed Belhsan Hmida --- flexmeasures/data/models/forecasting/pipelines/base.py | 8 ++++---- flexmeasures/data/models/forecasting/pipelines/predict.py | 8 ++++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/flexmeasures/data/models/forecasting/pipelines/base.py b/flexmeasures/data/models/forecasting/pipelines/base.py index d8302875f2..e2ff4aacaf 100644 --- a/flexmeasures/data/models/forecasting/pipelines/base.py +++ b/flexmeasures/data/models/forecasting/pipelines/base.py @@ -70,7 +70,9 @@ def __init__( predict_end: datetime | None = None, missing_threshold: float = 1.0, ) -> None: - self.future = [self._get_attached_sensor(sensor) for sensor in future_regressors] + self.future = [ + self._get_attached_sensor(sensor) for sensor in future_regressors + ] self.past = [self._get_attached_sensor(sensor) for sensor in past_regressors] self.n_steps_to_predict = n_steps_to_predict self.max_forecast_horizon = max_forecast_horizon @@ -84,9 +86,7 @@ def __init__( save_belief_time # non floored belief time to save forecasts with ) self.target_sensor = self._get_attached_sensor(target_sensor) - self.target = ( - f"{self.target_sensor.name} (ID: {self.target_sensor.id})_target" - ) + self.target = f"{self.target_sensor.name} (ID: {self.target_sensor.id})_target" self.future_regressors = [ f"{sensor.name} (ID: {sensor.id})_FR-{idx}" for idx, sensor in enumerate(self.future) diff --git a/flexmeasures/data/models/forecasting/pipelines/predict.py b/flexmeasures/data/models/forecasting/pipelines/predict.py index f90ce61106..9e23b55237 100644 --- a/flexmeasures/data/models/forecasting/pipelines/predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/predict.py @@ -96,11 +96,15 @@ def __init__( self.data_source = self._get_attached_data_source(data_source) @staticmethod - def _get_attached_data_source(data_source: Source | int | None) -> DataSource | None: + def _get_attached_data_source( + data_source: Source | int | None, + ) -> DataSource | None: """Reload the prediction source through the active session before saving beliefs.""" if data_source is None: return None - source_id = data_source.id if isinstance(data_source, DataSource) else data_source + source_id = ( + data_source.id if isinstance(data_source, DataSource) else data_source + ) attached_source = db.session.get(DataSource, source_id) if attached_source is None: raise ValueError(f"Could not load data source with id {source_id}.") From 341751a43e065af93210150e1d54e5c58271f59c Mon Sep 17 00:00:00 2001 From: Mohamed Belhsan Hmida Date: Thu, 19 Mar 2026 01:10:42 +0100 Subject: [PATCH 34/34] fix: fix test merge commit Signed-off-by: Mohamed Belhsan Hmida --- .../data/tests/test_forecasting_pipeline.py | 44 +------------------ 1 file changed, 1 insertion(+), 43 deletions(-) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index a87decb2da..4b727361df 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -73,7 +73,7 @@ "end": "2025-01-09T00:00+02:00", "sensor-to-save": None, "max-forecast-horizon": "PT1H", - "forecast-frequency": "PT12H", # 2 cycles and 2 viewpoint + "forecast-frequency": "PT12H", # 2 cycles and 2 viewpoints "probabilistic": False, }, True, @@ -197,58 +197,16 @@ def test_train_predict_pipeline( # noqa: C901 app.queues["forecasting"], exc_handler=handle_forecasting_exception ) - forecasts = sensor.search_beliefs( - source_types=["forecaster"], most_recent_beliefs_only=False - ) - - dg_params = pipeline._parameters # parameters stored in the data generator - m_viewpoints = (dg_params["end_date"] - dg_params["predict_start"]) / ( - dg_params["forecast_frequency"] - ) - # 1 hour of forecasts is saved over 4 15-minute resolution events - n_events_per_horizon = timedelta(hours=1) / dg_params["sensor"].event_resolution - n_hourly_horizons = dg_params["max_forecast_horizon"] // timedelta(hours=1) - n_cycles = max( - timedelta(hours=dg_params["predict_period_in_hours"]) - // max( - pipeline._config["retrain_frequency"], - pipeline._parameters["forecast_frequency"], - ), - 1, - ) - assert ( - len(forecasts) - == m_viewpoints * n_hourly_horizons * n_events_per_horizon * n_cycles - ), ( - f"we expect {n_events_per_horizon} event(s) per horizon, " - f"{n_hourly_horizons} horizon(s), {m_viewpoints} viewpoint(s)" - f"{f', {n_cycles} cycle(s)' if n_cycles > 1 else ''}" - ) - assert ( - forecasts.lineage.number_of_belief_times == m_viewpoints - ), f"we expect {m_viewpoints} viewpoints" - source = forecasts.lineage.sources[0] - assert "TrainPredictPipeline" in str( - source - ), "string representation of the Forecaster (DataSource) should mention the used model" - - if as_job: # Fetch returned job job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) - job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) assert ( job is not None ), "a returned job should exist in the forecasting queue" - if not job.dependency_ids: - cycle_job_ids = [job.id] # only one cycle job, no wrap-up job if not job.dependency_ids: cycle_job_ids = [job.id] # only one cycle job, no wrap-up job else: - assert ( - job.is_finished - ), f"The wrap-up job should be finished, and not {job.get_status()}" assert ( job.is_finished ), f"The wrap-up job should be finished, and not {job.get_status()}"