From 5d4dd2efe6257c8696cfc019c2f0da17b4c00083 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 13 Apr 2026 13:21:09 +0000 Subject: [PATCH 1/3] Initial plan From ec03f05f9614eeaddde7db9e738bdab2a6ef92ad Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 13 Apr 2026 13:32:38 +0000 Subject: [PATCH 2/3] Move data ingestion logic to job queue (ingestion queue) Agent-Logs-Url: https://github.com/FlexMeasures/flexmeasures/sessions/d8f1f856-844d-4274-a34b-36e33e6d8f69 Co-authored-by: joshuaunity <45713692+joshuaunity@users.noreply.github.com> --- documentation/host/queues.rst | 7 +++- flexmeasures/api/common/utils/api_utils.py | 22 +++++++++- flexmeasures/app.py | 1 + flexmeasures/cli/jobs.py | 4 +- flexmeasures/data/services/data_ingestion.py | 44 ++++++++++++++++++++ 5 files changed, 74 insertions(+), 4 deletions(-) create mode 100644 flexmeasures/data/services/data_ingestion.py diff --git a/documentation/host/queues.rst b/documentation/host/queues.rst index ac489a2b69..58219cd80c 100644 --- a/documentation/host/queues.rst +++ b/documentation/host/queues.rst @@ -24,7 +24,7 @@ Here is how to run one worker for each kind of job (in separate terminals): .. code-block:: bash - $ flexmeasures jobs run-worker --name our-only-worker --queue forecasting|scheduling + $ flexmeasures jobs run-worker --name our-only-worker --queue forecasting|scheduling|ingestion Running multiple workers in parallel might be a great idea. @@ -32,6 +32,7 @@ Running multiple workers in parallel might be a great idea. $ flexmeasures jobs run-worker --name forecaster --queue forecasting $ flexmeasures jobs run-worker --name scheduler --queue scheduling + $ flexmeasures jobs run-worker --name ingester --queue ingestion You can also clear the job queues: @@ -39,10 +40,14 @@ You can also clear the job queues: $ flexmeasures jobs clear-queue --queue forecasting $ flexmeasures jobs clear-queue --queue scheduling + $ flexmeasures jobs clear-queue --queue ingestion When the main FlexMeasures process runs (e.g. by ``flexmeasures run``\ ), the queues of forecasting and scheduling jobs can be visited at ``http://localhost:5000/tasks/forecasting`` and ``http://localhost:5000/tasks/schedules``\ , respectively (by admins). +.. note:: + The ``ingestion`` queue is used for sensor data posted via the API. If no worker is connected to this queue, data is processed synchronously (in the web process) with a warning logged. Running a dedicated ingestion worker is recommended in production to keep API responses fast when large amounts of data are posted. + Inspect the queue and jobs diff --git a/flexmeasures/api/common/utils/api_utils.py b/flexmeasures/api/common/utils/api_utils.py index 5b70551ef3..c984579a4b 100644 --- a/flexmeasures/api/common/utils/api_utils.py +++ b/flexmeasures/api/common/utils/api_utils.py @@ -138,7 +138,27 @@ def save_and_enqueue( forecasting_jobs: list[Job] | None = None, save_changed_beliefs_only: bool = True, ) -> ResponseTuple: - # Attempt to save + from rq import Worker + + from flexmeasures.data.services.data_ingestion import add_beliefs_to_database + + ingestion_queue = current_app.queues.get("ingestion") + if ingestion_queue is not None: + workers = Worker.all(queue=ingestion_queue) + if workers: + ingestion_queue.enqueue( + add_beliefs_to_database, + data, + forecasting_jobs=forecasting_jobs, + save_changed_beliefs_only=save_changed_beliefs_only, + ) + return request_processed() + else: + current_app.logger.warning( + "No workers connected to the ingestion queue. Processing sensor data directly." + ) + + # Attempt to save directly (fallback when no ingestion workers are available) status = save_to_db(data, save_changed_beliefs_only=save_changed_beliefs_only) db.session.commit() diff --git a/flexmeasures/app.py b/flexmeasures/app.py index 538d503eed..c574f92129 100644 --- a/flexmeasures/app.py +++ b/flexmeasures/app.py @@ -110,6 +110,7 @@ def create( # noqa C901 app.queues = dict( forecasting=Queue(connection=redis_conn, name="forecasting"), scheduling=Queue(connection=redis_conn, name="scheduling"), + ingestion=Queue(connection=redis_conn, name="ingestion"), # reporting=Queue(connection=redis_conn, name="reporting"), # labelling=Queue(connection=redis_conn, name="labelling"), # alerting=Queue(connection=redis_conn, name="alerting"), diff --git a/flexmeasures/cli/jobs.py b/flexmeasures/cli/jobs.py index bea4ce8bdd..e9132f605e 100644 --- a/flexmeasures/cli/jobs.py +++ b/flexmeasures/cli/jobs.py @@ -292,7 +292,7 @@ def run_job(job_id: str): "--queue", default=None, required=True, - help="State which queue(s) to work on (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'.", + help="State which queue(s) to work on (using '|' as separator), e.g. 'forecasting', 'scheduling', 'ingestion' or 'forecasting|scheduling'.", ) @click.option( "--name", @@ -302,7 +302,7 @@ def run_job(job_id: str): ) def run_worker(queue: str, name: str | None): """ - Start a worker process for forecasting and/or scheduling jobs. + Start a worker process for forecasting, scheduling and/or ingestion jobs. We use the app context to find out which redis queues to use. """ diff --git a/flexmeasures/data/services/data_ingestion.py b/flexmeasures/data/services/data_ingestion.py new file mode 100644 index 0000000000..3c0a1735dc --- /dev/null +++ b/flexmeasures/data/services/data_ingestion.py @@ -0,0 +1,44 @@ +""" +Logic around data ingestion (jobs) +""" + +from __future__ import annotations + +from flask import current_app +from rq.job import Job +import timely_beliefs as tb + +from flexmeasures.data import db +from flexmeasures.data.utils import save_to_db + + +def add_beliefs_to_database( + data: tb.BeliefsDataFrame | list[tb.BeliefsDataFrame], + forecasting_jobs: list[Job] | None = None, + save_changed_beliefs_only: bool = True, +) -> str: + """Save sensor data to the database and optionally enqueue forecasting jobs. + + This function is intended to be called as an RQ job by an ingestion queue worker, + but can also be called directly (e.g. as a fallback when no workers are available). + + :param data: BeliefsDataFrame (or list thereof) to be saved. + :param forecasting_jobs: Optional list of forecasting Jobs to enqueue after saving. + :param save_changed_beliefs_only: If True, skip saving beliefs whose value hasn't changed. + :returns: Status string, one of: + - 'success' + - 'success_with_unchanged_beliefs_skipped' + - 'success_but_nothing_new' + """ + status = save_to_db(data, save_changed_beliefs_only=save_changed_beliefs_only) + db.session.commit() + + # Only enqueue forecasting jobs upon successfully saving new data + if status[:7] == "success" and status != "success_but_nothing_new": + if forecasting_jobs is not None: + [ + current_app.queues["forecasting"].enqueue_job(job) + for job in forecasting_jobs + ] + + return status From ea634dbebb2a5ac07af0a6ec5435c7b76d1a7d86 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 13 Apr 2026 13:35:26 +0000 Subject: [PATCH 3/3] Fix code review comments: use for loop and move imports to module level Agent-Logs-Url: https://github.com/FlexMeasures/flexmeasures/sessions/d8f1f856-844d-4274-a34b-36e33e6d8f69 Co-authored-by: joshuaunity <45713692+joshuaunity@users.noreply.github.com> --- flexmeasures/api/common/utils/api_utils.py | 6 ++---- flexmeasures/data/services/data_ingestion.py | 4 +--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/flexmeasures/api/common/utils/api_utils.py b/flexmeasures/api/common/utils/api_utils.py index c984579a4b..1f1facae5c 100644 --- a/flexmeasures/api/common/utils/api_utils.py +++ b/flexmeasures/api/common/utils/api_utils.py @@ -8,12 +8,14 @@ from werkzeug.exceptions import Forbidden, Unauthorized from numpy import array from psycopg2.errors import UniqueViolation +from rq import Worker from rq.job import Job, JobStatus, NoSuchJobError from sqlalchemy import select from sqlalchemy.exc import IntegrityError from flexmeasures.data import db from flexmeasures.data.models.user import Account +from flexmeasures.data.services.data_ingestion import add_beliefs_to_database from flexmeasures.data.utils import save_to_db from flexmeasures.auth.policy import check_access from flexmeasures.api.common.responses import ( @@ -138,10 +140,6 @@ def save_and_enqueue( forecasting_jobs: list[Job] | None = None, save_changed_beliefs_only: bool = True, ) -> ResponseTuple: - from rq import Worker - - from flexmeasures.data.services.data_ingestion import add_beliefs_to_database - ingestion_queue = current_app.queues.get("ingestion") if ingestion_queue is not None: workers = Worker.all(queue=ingestion_queue) diff --git a/flexmeasures/data/services/data_ingestion.py b/flexmeasures/data/services/data_ingestion.py index 3c0a1735dc..74a9c7cec3 100644 --- a/flexmeasures/data/services/data_ingestion.py +++ b/flexmeasures/data/services/data_ingestion.py @@ -36,9 +36,7 @@ def add_beliefs_to_database( # Only enqueue forecasting jobs upon successfully saving new data if status[:7] == "success" and status != "success_but_nothing_new": if forecasting_jobs is not None: - [ + for job in forecasting_jobs: current_app.queues["forecasting"].enqueue_job(job) - for job in forecasting_jobs - ] return status