Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion documentation/host/queues.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,30 @@ Here is how to run one worker for each kind of job (in separate terminals):

.. code-block:: bash

$ flexmeasures jobs run-worker --name our-only-worker --queue forecasting|scheduling
$ flexmeasures jobs run-worker --name our-only-worker --queue "forecasting|scheduling|ingestion"

Running multiple workers in parallel, for example one dedicated worker per queue, can be a good idea:

.. code-block:: bash

$ flexmeasures jobs run-worker --name forecaster --queue forecasting
$ flexmeasures jobs run-worker --name scheduler --queue scheduling
$ flexmeasures jobs run-worker --name ingester --queue ingestion

You can also clear the job queues:

.. code-block:: bash

$ flexmeasures jobs clear-queue --queue forecasting
$ flexmeasures jobs clear-queue --queue scheduling
$ flexmeasures jobs clear-queue --queue ingestion


When the main FlexMeasures process runs (e.g. by ``flexmeasures run``\ ), the queues of forecasting and scheduling jobs can be visited at ``http://localhost:5000/tasks/forecasting`` and ``http://localhost:5000/tasks/schedules``\ , respectively (by admins).

.. note::
The ``ingestion`` queue is used for sensor data posted via the API. If no worker is connected to this queue, data is processed synchronously (in the web process) with a warning logged. Running a dedicated ingestion worker is recommended in production to keep API responses fast when large amounts of data are posted.



Inspect the queue and jobs
Expand Down
20 changes: 19 additions & 1 deletion flexmeasures/api/common/utils/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
from werkzeug.exceptions import Forbidden, Unauthorized
from numpy import array
from psycopg2.errors import UniqueViolation
from rq import Worker
from rq.job import Job, JobStatus, NoSuchJobError
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError

from flexmeasures.data import db
from flexmeasures.data.models.user import Account
from flexmeasures.data.services.data_ingestion import add_beliefs_to_database
from flexmeasures.data.utils import save_to_db
from flexmeasures.auth.policy import check_access
from flexmeasures.api.common.responses import (
Expand Down Expand Up @@ -138,7 +140,23 @@ def save_and_enqueue(
forecasting_jobs: list[Job] | None = None,
save_changed_beliefs_only: bool = True,
) -> ResponseTuple:
# Attempt to save
ingestion_queue = current_app.queues.get("ingestion")
if ingestion_queue is not None:
workers = Worker.all(queue=ingestion_queue)
if workers:
ingestion_queue.enqueue(
add_beliefs_to_database,
data,
forecasting_jobs=forecasting_jobs,
save_changed_beliefs_only=save_changed_beliefs_only,
)
return request_processed()
else:
current_app.logger.warning(
"No workers connected to the ingestion queue. Processing sensor data directly."
)

# Attempt to save directly (fallback when no ingestion workers are available)
status = save_to_db(data, save_changed_beliefs_only=save_changed_beliefs_only)
db.session.commit()

Expand Down
1 change: 1 addition & 0 deletions flexmeasures/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def create( # noqa C901
app.queues = dict(
forecasting=Queue(connection=redis_conn, name="forecasting"),
scheduling=Queue(connection=redis_conn, name="scheduling"),
ingestion=Queue(connection=redis_conn, name="ingestion"),
# reporting=Queue(connection=redis_conn, name="reporting"),
# labelling=Queue(connection=redis_conn, name="labelling"),
# alerting=Queue(connection=redis_conn, name="alerting"),
Expand Down
4 changes: 2 additions & 2 deletions flexmeasures/cli/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def run_job(job_id: str):
"--queue",
default=None,
required=True,
help="State which queue(s) to work on (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'.",
help="State which queue(s) to work on (using '|' as separator), e.g. 'forecasting', 'scheduling', 'ingestion' or 'forecasting|scheduling'.",
)
@click.option(
"--name",
Expand All @@ -302,7 +302,7 @@ def run_job(job_id: str):
)
def run_worker(queue: str, name: str | None):
"""
Start a worker process for forecasting and/or scheduling jobs.
Start a worker process for forecasting, scheduling and/or ingestion jobs.

We use the app context to find out which redis queues to use.
"""
Expand Down
42 changes: 42 additions & 0 deletions flexmeasures/data/services/data_ingestion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""
Logic around data ingestion (jobs)
"""

from __future__ import annotations

from flask import current_app
from rq.job import Job
import timely_beliefs as tb

from flexmeasures.data import db
from flexmeasures.data.utils import save_to_db


def add_beliefs_to_database(
    data: tb.BeliefsDataFrame | list[tb.BeliefsDataFrame],
    forecasting_jobs: list[Job] | None = None,
    save_changed_beliefs_only: bool = True,
) -> str:
    """Persist sensor data, then enqueue forecasting jobs if new data was stored.

    Meant to be run as an RQ job by a worker on the ingestion queue, but it can
    equally be called directly (e.g. as a fallback when no worker is available).

    :param data: BeliefsDataFrame (or list thereof) to save.
    :param forecasting_jobs: Optional forecasting Jobs to put on the "forecasting"
                             queue once new data has been saved.
    :param save_changed_beliefs_only: If True, beliefs whose value hasn't changed are not saved.
    :returns: Status string, one of:
              - 'success'
              - 'success_with_unchanged_beliefs_skipped'
              - 'success_but_nothing_new'
    """
    status = save_to_db(data, save_changed_beliefs_only=save_changed_beliefs_only)
    db.session.commit()

    # Forecasts are only worth (re)computing when something new actually got saved.
    saved_new_data = (
        status.startswith("success") and status != "success_but_nothing_new"
    )
    if not saved_new_data or not forecasting_jobs:
        return status

    forecasting_queue = current_app.queues["forecasting"]
    for forecasting_job in forecasting_jobs:
        forecasting_queue.enqueue_job(forecasting_job)

    return status
Loading