Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions documentation/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ New features

Infrastructure / Support
----------------------
* Remove legacy rolling viewpoint forecasting code and utilities after migrating to fixed-viewpoint forecasting [see `PR #2082 <https://www.github.com/FlexMeasures/flexmeasures/pull/2082>`_]
* Support coupling data sources to accounts, and preserve user ID and account ID references in audit logs and data sources for traceability and compliance [see `PR #2058 <https://www.github.com/FlexMeasures/flexmeasures/pull/2058>`_]
* Stop creating new toy assets when restarting the docker-compose stack [see `PR #2018 <https://www.github.com/FlexMeasures/flexmeasures/pull/2018>`_]
* Migrate from ``pip`` to ``uv`` for dependency management, and from ``make`` to ``poe`` [see `PR #1973 <https://github.com/FlexMeasures/flexmeasures/pull/1973>`_]
* Improve contact information to get in touch with the FlexMeasures community [see `PR #2022 <https://www.github.com/FlexMeasures/flexmeasures/pull/2022>`_]
Expand Down
47 changes: 27 additions & 20 deletions flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
PostSensorDataSchema,
GetSensorDataSchema,
)
from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.services.forecasting import create_forecasting_jobs
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.services.sensors import (
get_stalenesses,
Expand Down Expand Up @@ -464,15 +464,6 @@ def test_asset_sensors_metadata(
]


def custom_model_params():
    """Return model params for tests: a short training window (we have very
    little data) and transformations switched off until they let this test run (TODO).
    """
    params = {
        "training_and_testing_period": timedelta(hours=2),
        "outcome_var_transformation": None,
        "regressor_transformation": {},
    }
    return params


def test_build_asset_jobs_data(db, app, add_battery_assets):
"""Check that we get both types of jobs for a battery asset."""
battery_asset = add_battery_assets["Test battery"]
Expand All @@ -487,24 +478,40 @@ def test_build_asset_jobs_data(db, app, add_battery_assets):
belief_time=start,
resolution=timedelta(minutes=15),
)
forecasting_jobs = create_forecasting_jobs(
start_of_roll=as_server_time(datetime(2015, 1, 1, 6)),
end_of_roll=as_server_time(datetime(2015, 1, 1, 7)),
horizons=[timedelta(hours=1)],
sensor_id=battery.id,
custom_model_params=custom_model_params(),
pipeline = TrainPredictPipeline(
config={
"train-start": "2015-01-01T00:00:00+00:00",
"retrain-frequency": "PT1H",
}
)
pipeline_returns = pipeline.compute(
as_job=True,
parameters={
"sensor": battery.id,
"start": as_server_time(datetime(2015, 1, 1, 6)).isoformat(),
"end": as_server_time(datetime(2015, 1, 1, 7)).isoformat(),
"max-forecast-horizon": "PT1H",
"forecast-frequency": "PT1H",
},
)
forecasting_job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"])

jobs_data = build_asset_jobs_data(battery_asset)
assert sorted([j["queue"] for j in jobs_data]) == ["forecasting", "scheduling"]
forecasting_jobs_data = [j for j in jobs_data if j["queue"] == "forecasting"]
scheduling_jobs_data = [j for j in jobs_data if j["queue"] == "scheduling"]
assert len(forecasting_jobs_data) == 1
assert scheduling_jobs_data
scheduling_job_ids = set()
for job_data in jobs_data:
metadata = json.loads(job_data["metadata"])
if job_data["queue"] == "forecasting":
assert metadata["job_id"] == forecasting_jobs[0].id
assert metadata["job_id"] == forecasting_job.id
assert job_data["entity"] == f"sensor: {battery.name} (Id: {battery.id})"
else:
assert metadata["job_id"] == scheduling_job.id
scheduling_job_ids.add(metadata["job_id"])
assert job_data["status"] == "queued"
assert job_data["entity"] == f"sensor: {battery.name} (Id: {battery.id})"

assert scheduling_job.id in scheduling_job_ids

# Clean up queues
app.queues["scheduling"].empty()
Expand Down
109 changes: 19 additions & 90 deletions flexmeasures/cli/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,16 @@

from flask import current_app as app
import click
from timetomodel import ModelState, create_fitted_model, evaluate_models

if os.name == "nt":
from rq_win import WindowsWorker as Worker
else:
from rq import Worker

from flexmeasures.data.models.forecasting import lookup_model_specs_configurator
from flexmeasures.data.models.time_series import TimedBelief
from flexmeasures.data.queries.sensors import (
query_sensor_by_name_and_generic_asset_type_name,
)
from flexmeasures.utils.time_utils import as_server_time
from flexmeasures.data.services.forecasting import (
create_forecasting_jobs,
handle_forecasting_exception,
)
from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline
from flexmeasures.data.services.forecasting import handle_forecasting_exception

"""
These functions are meant for FlexMeasures developers to manually test some internal
Expand All @@ -35,7 +28,7 @@
# @app.cli.command()
def test_making_forecasts():
"""
Manual test to enqueue and process a forecasting job via redis queue
Manual test to enqueue and process a fixed-viewpoint forecasting job via redis queue.
"""

click.echo("Manual forecasting job queuing started ...")
Expand All @@ -54,11 +47,21 @@ def test_making_forecasts():
forecast_filter.delete()
click.echo("Forecasts found before : %d" % forecast_filter.count())

create_forecasting_jobs(
sensor_id=sensor_id,
horizons=[timedelta(hours=6)],
start_of_roll=as_server_time(datetime(2015, 4, 1)),
end_of_roll=as_server_time(datetime(2015, 4, 3)),
pipeline = TrainPredictPipeline(
config={
"train-start": "2015-03-01T00:00:00+00:00",
"retrain-frequency": "PT24H",
}
)
pipeline.compute(
as_job=True,
parameters={
"sensor": sensor_id,
"start": as_server_time(datetime(2015, 4, 1)).isoformat(),
"end": as_server_time(datetime(2015, 4, 3)).isoformat(),
"max-forecast-horizon": "PT6H",
"forecast-frequency": "PT24H",
},
)

click.echo("Queue before working: %s" % app.queues["forecasting"].jobs)
Expand All @@ -73,79 +76,5 @@ def test_making_forecasts():
click.echo("Queue after working: %s" % app.queues["forecasting"].jobs)

click.echo(
"Forecasts found after (should be 24 * 2 * 4 = 192): %d"
% forecast_filter.count()
"Forecasts found after processing the queue: %d" % forecast_filter.count()
)


# un-comment to use as CLI function
# @app.cli.command()
@click.option(
"--asset-type",
"generic_asset_type_names",
multiple=True,
required=True,
help="Name of generic asset type.",
)
@click.option("--sensor", "sensor_name", help="Name of sensor.")
@click.option(
"--from_date",
default="2015-03-10",
help="Forecast from date. Follow up with a date in the form yyyy-mm-dd.",
)
@click.option("--period", default=3, help="Forecasting period in days.")
@click.option(
"--horizon", "horizon_hours", default=1, help="Forecasting horizon in hours."
)
@click.option(
"--training", default=30, help="Number of days in the training and testing period."
)
def test_generic_model(
generic_asset_type_names: list[str],
sensor_name: str | None = None,
from_date: str = "2015-03-10",
period: int = 3,
horizon_hours: int = 1,
training: int = 30,
):
"""Manually test integration of timetomodel for our generic model."""

start = as_server_time(datetime.strptime(from_date, "%Y-%m-%d"))
end = start + timedelta(days=period)
training_and_testing_period = timedelta(days=training)
horizon = timedelta(hours=horizon_hours)

with app.app_context():
sensors = query_sensor_by_name_and_generic_asset_type_name(
sensor_name=sensor_name,
generic_asset_type_names=generic_asset_type_names,
).all()
if len(sensors) == 0:
click.echo("No such sensor in db, so I will not add any forecasts.")
raise click.Abort()
elif len(sensors) > 1:
click.echo("No unique sensor found in db, so I will not add any forecasts.")
raise click.Abort()

linear_model_configurator = lookup_model_specs_configurator("linear")
(
model_specs,
model_identifier,
fallback_model_identifier,
) = linear_model_configurator(
sensor=sensors[0],
forecast_start=start,
forecast_end=end,
forecast_horizon=horizon,
custom_model_params=dict(
training_and_testing_period=training_and_testing_period
),
)

# Create and train the model
model = create_fitted_model(model_specs, model_identifier)
print("\n\nparams:\n%s\n\n" % model.params)

evaluate_models(m1=ModelState(model, model_specs), plot_path=None)

return ModelState(model, model_specs)
47 changes: 1 addition & 46 deletions flexmeasures/data/models/forecasting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,12 @@
import logging

from copy import deepcopy
from typing import Any, Callable

from timetomodel import ModelSpecs
from typing import Any

from flexmeasures.data.models.data_sources import DataGenerator
from flexmeasures.data.models.forecasting.custom_models.base_model import ( # noqa: F401
BaseModel,
)
from flexmeasures.data.models.forecasting.model_specs.naive import (
naive_specs_configurator as naive_specs,
)
from flexmeasures.data.models.forecasting.model_specs.linear_regression import (
ols_specs_configurator as linear_ols_specs,
)
from flexmeasures.data.schemas.forecasting import ForecasterConfigSchema


Expand All @@ -30,43 +22,6 @@ def filter(self, record):
# Apply the filter to Darts.models loggers
logging.getLogger("darts.models").addFilter(SuppressTorchWarning())

model_map = {
"naive": naive_specs,
"linear": linear_ols_specs,
"linear-ols": linear_ols_specs,
} # use lower case only


def lookup_model_specs_configurator(
    model_search_term: str = "linear-OLS",
) -> Callable[
    ...,  # See model_spec_factory.create_initial_model_specs for an up-to-date type annotation
    # Annotating here would require Python>=3.10 (specifically, ParamSpec from PEP 612)
    tuple[ModelSpecs, str, str],
]:
    """Map a (case-insensitive) model search term to a model configurator function,
    which can make model meta data.

    A plain string is used as the key because it can be stored on RQ jobs, and it
    leaves more freedom: multiple terms can map to the same model or vice versa
    (e.g. when different versions exist).

    Model meta data in this context means a tuple of:
    * timetomodel.ModelSpecs. To fill in those specs, a configurator should accept:
      - old_sensor: Asset | Market | WeatherSensor,
      - start: datetime, # Start of forecast period
      - end: datetime, # End of forecast period
      - horizon: timedelta, # Duration between time of forecasting and time which is forecast
      - ex_post_horizon: timedelta = None,
      - custom_model_params: dict = None, # overwrite forecasting params, useful for testing or experimentation
    * a model_identifier (useful in case the model_search_term was generic, e.g. "latest")
    * a fallback_model_search_term: a string which the forecasting machinery can use to choose
      a different model (using this mapping again) in case of failure.

    So to implement a model, write such a configurator and map your search term(s)
    to it in ``model_map``.
    """
    normalized_term = model_search_term.lower()
    if normalized_term in model_map:
        return model_map[normalized_term]
    raise Exception("No model found for search term '%s'" % model_search_term)


class Forecaster(DataGenerator):
__version__ = None
Expand Down
Loading
Loading