diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 5da39a5a6a..398951daff 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -25,7 +25,8 @@ New features Infrastructure / Support ---------------------- +* Remove legacy rolling viewpoint forecasting code and utilities after migrating to fixed-viewpoint forecasting [see `PR #2082 `_] * Support coupling data sources to accounts, and preserve user ID and account ID references in audit logs and data sources for traceability and compliance [see `PR #2058 `_] * Stop creating new toy assets when restarting the docker-compose stack [see `PR #2018 `_] * Migrate from ``pip`` to ``uv`` for dependency management, and from ``make`` to ``poe`` [see `PR #1973 `_] * Improve contact information to get in touch with the FlexMeasures community [see `PR #2022 `_] diff --git a/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py b/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py index 52dae05cad..c8cb24fcc2 100644 --- a/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py +++ b/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py @@ -12,8 +12,8 @@ PostSensorDataSchema, GetSensorDataSchema, ) +from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline from flexmeasures.data.models.time_series import Sensor -from flexmeasures.data.services.forecasting import create_forecasting_jobs from flexmeasures.data.services.scheduling import create_scheduling_job from flexmeasures.data.services.sensors import ( get_stalenesses, @@ -464,15 +464,6 @@ def test_asset_sensors_metadata( ] -def custom_model_params(): - """little training as we have little data, turn off transformations until they let this test run (TODO)""" - return dict( - training_and_testing_period=timedelta(hours=2), - outcome_var_transformation=None, - regressor_transformation={}, - ) - - 
def test_build_asset_jobs_data(db, app, add_battery_assets): """Check that we get both types of jobs for a battery asset.""" battery_asset = add_battery_assets["Test battery"] @@ -487,24 +478,40 @@ def test_build_asset_jobs_data(db, app, add_battery_assets): belief_time=start, resolution=timedelta(minutes=15), ) - forecasting_jobs = create_forecasting_jobs( - start_of_roll=as_server_time(datetime(2015, 1, 1, 6)), - end_of_roll=as_server_time(datetime(2015, 1, 1, 7)), - horizons=[timedelta(hours=1)], - sensor_id=battery.id, - custom_model_params=custom_model_params(), + pipeline = TrainPredictPipeline( + config={ + "train-start": "2015-01-01T00:00:00+00:00", + "retrain-frequency": "PT1H", + } ) + pipeline_returns = pipeline.compute( + as_job=True, + parameters={ + "sensor": battery.id, + "start": as_server_time(datetime(2015, 1, 1, 6)).isoformat(), + "end": as_server_time(datetime(2015, 1, 1, 7)).isoformat(), + "max-forecast-horizon": "PT1H", + "forecast-frequency": "PT1H", + }, + ) + forecasting_job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) jobs_data = build_asset_jobs_data(battery_asset) - assert sorted([j["queue"] for j in jobs_data]) == ["forecasting", "scheduling"] + forecasting_jobs_data = [j for j in jobs_data if j["queue"] == "forecasting"] + scheduling_jobs_data = [j for j in jobs_data if j["queue"] == "scheduling"] + assert len(forecasting_jobs_data) == 1 + assert scheduling_jobs_data + scheduling_job_ids = set() for job_data in jobs_data: metadata = json.loads(job_data["metadata"]) if job_data["queue"] == "forecasting": - assert metadata["job_id"] == forecasting_jobs[0].id + assert metadata["job_id"] == forecasting_job.id + assert job_data["entity"] == f"sensor: {battery.name} (Id: {battery.id})" else: - assert metadata["job_id"] == scheduling_job.id + scheduling_job_ids.add(metadata["job_id"]) assert job_data["status"] == "queued" - assert job_data["entity"] == f"sensor: {battery.name} (Id: {battery.id})" + + assert 
scheduling_job.id in scheduling_job_ids # Clean up queues app.queues["scheduling"].empty() diff --git a/flexmeasures/cli/testing.py b/flexmeasures/cli/testing.py index a444bde6f6..4dd4bd3a10 100644 --- a/flexmeasures/cli/testing.py +++ b/flexmeasures/cli/testing.py @@ -6,23 +6,16 @@ from flask import current_app as app import click -from timetomodel import ModelState, create_fitted_model, evaluate_models if os.name == "nt": from rq_win import WindowsWorker as Worker else: from rq import Worker -from flexmeasures.data.models.forecasting import lookup_model_specs_configurator from flexmeasures.data.models.time_series import TimedBelief -from flexmeasures.data.queries.sensors import ( - query_sensor_by_name_and_generic_asset_type_name, -) from flexmeasures.utils.time_utils import as_server_time -from flexmeasures.data.services.forecasting import ( - create_forecasting_jobs, - handle_forecasting_exception, -) +from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline +from flexmeasures.data.services.forecasting import handle_forecasting_exception """ These functions are meant for FlexMeasures developers to manually test some internal @@ -35,7 +28,7 @@ # @app.cli.command() def test_making_forecasts(): """ - Manual test to enqueue and process a forecasting job via redis queue + Manual test to enqueue and process a fixed-viewpoint forecasting job via redis queue. 
""" click.echo("Manual forecasting job queuing started ...") @@ -54,11 +47,21 @@ def test_making_forecasts(): forecast_filter.delete() click.echo("Forecasts found before : %d" % forecast_filter.count()) - create_forecasting_jobs( - sensor_id=sensor_id, - horizons=[timedelta(hours=6)], - start_of_roll=as_server_time(datetime(2015, 4, 1)), - end_of_roll=as_server_time(datetime(2015, 4, 3)), + pipeline = TrainPredictPipeline( + config={ + "train-start": "2015-03-01T00:00:00+00:00", + "retrain-frequency": "PT24H", + } + ) + pipeline.compute( + as_job=True, + parameters={ + "sensor": sensor_id, + "start": as_server_time(datetime(2015, 4, 1)).isoformat(), + "end": as_server_time(datetime(2015, 4, 3)).isoformat(), + "max-forecast-horizon": "PT6H", + "forecast-frequency": "PT24H", + }, ) click.echo("Queue before working: %s" % app.queues["forecasting"].jobs) @@ -73,79 +76,5 @@ def test_making_forecasts(): click.echo("Queue after working: %s" % app.queues["forecasting"].jobs) click.echo( - "Forecasts found after (should be 24 * 2 * 4 = 192): %d" - % forecast_filter.count() + "Forecasts found after processing the queue: %d" % forecast_filter.count() ) - - -# un-comment to use as CLI function -# @app.cli.command() -@click.option( - "--asset-type", - "generic_asset_type_names", - multiple=True, - required=True, - help="Name of generic asset type.", -) -@click.option("--sensor", "sensor_name", help="Name of sensor.") -@click.option( - "--from_date", - default="2015-03-10", - help="Forecast from date. Follow up with a date in the form yyyy-mm-dd.", -) -@click.option("--period", default=3, help="Forecasting period in days.") -@click.option( - "--horizon", "horizon_hours", default=1, help="Forecasting horizon in hours." -) -@click.option( - "--training", default=30, help="Number of days in the training and testing period." 
-) -def test_generic_model( - generic_asset_type_names: list[str], - sensor_name: str | None = None, - from_date: str = "2015-03-10", - period: int = 3, - horizon_hours: int = 1, - training: int = 30, -): - """Manually test integration of timetomodel for our generic model.""" - - start = as_server_time(datetime.strptime(from_date, "%Y-%m-%d")) - end = start + timedelta(days=period) - training_and_testing_period = timedelta(days=training) - horizon = timedelta(hours=horizon_hours) - - with app.app_context(): - sensors = query_sensor_by_name_and_generic_asset_type_name( - sensor_name=sensor_name, - generic_asset_type_names=generic_asset_type_names, - ).all() - if len(sensors) == 0: - click.echo("No such sensor in db, so I will not add any forecasts.") - raise click.Abort() - elif len(sensors) > 1: - click.echo("No unique sensor found in db, so I will not add any forecasts.") - raise click.Abort() - - linear_model_configurator = lookup_model_specs_configurator("linear") - ( - model_specs, - model_identifier, - fallback_model_identifier, - ) = linear_model_configurator( - sensor=sensors[0], - forecast_start=start, - forecast_end=end, - forecast_horizon=horizon, - custom_model_params=dict( - training_and_testing_period=training_and_testing_period - ), - ) - - # Create and train the model - model = create_fitted_model(model_specs, model_identifier) - print("\n\nparams:\n%s\n\n" % model.params) - - evaluate_models(m1=ModelState(model, model_specs), plot_path=None) - - return ModelState(model, model_specs) diff --git a/flexmeasures/data/models/forecasting/__init__.py b/flexmeasures/data/models/forecasting/__init__.py index 5aa7683d5f..7481b820fd 100644 --- a/flexmeasures/data/models/forecasting/__init__.py +++ b/flexmeasures/data/models/forecasting/__init__.py @@ -3,20 +3,12 @@ import logging from copy import deepcopy -from typing import Any, Callable - -from timetomodel import ModelSpecs +from typing import Any from flexmeasures.data.models.data_sources import 
DataGenerator from flexmeasures.data.models.forecasting.custom_models.base_model import ( # noqa: F401 BaseModel, ) -from flexmeasures.data.models.forecasting.model_specs.naive import ( - naive_specs_configurator as naive_specs, -) -from flexmeasures.data.models.forecasting.model_specs.linear_regression import ( - ols_specs_configurator as linear_ols_specs, -) from flexmeasures.data.schemas.forecasting import ForecasterConfigSchema @@ -30,43 +22,6 @@ def filter(self, record): # Apply the filter to Darts.models loggers logging.getLogger("darts.models").addFilter(SuppressTorchWarning()) -model_map = { - "naive": naive_specs, - "linear": linear_ols_specs, - "linear-ols": linear_ols_specs, -} # use lower case only - - -def lookup_model_specs_configurator( - model_search_term: str = "linear-OLS", -) -> Callable[ - ..., # See model_spec_factory.create_initial_model_specs for an up-to-date type annotation - # Annotating here would require Python>=3.10 (specifically, ParamSpec from PEP 612) - tuple[ModelSpecs, str, str], -]: - """ - This function maps a model-identifying search term to a model configurator function, which can make model meta data. - Why use a string? It might be stored on RQ jobs. It might also leave more freedom, we can then - map multiple terms to the same model or vice versa (e.g. when different versions exist). - - Model meta data in this context means a tuple of: - * timetomodel.ModelSpecs. To fill in those specs, a configurator should accept: - - old_sensor: Asset | Market | WeatherSensor, - - start: datetime, # Start of forecast period - - end: datetime, # End of forecast period - - horizon: timedelta, # Duration between time of forecasting and time which is forecast - - ex_post_horizon: timedelta = None, - - custom_model_params: dict = None, # overwrite forecasting params, useful for testing or experimentation - * a model_identifier (useful in case the model_search_term was generic, e.g. 
"latest") - * a fallback_model_search_term: a string which the forecasting machinery can use to choose - a different model (using this mapping again) in case of failure. - - So to implement a model, write such a function and decide here which search term(s) map(s) to it. - """ - if model_search_term.lower() not in model_map.keys(): - raise Exception("No model found for search term '%s'" % model_search_term) - return model_map[model_search_term.lower()] - class Forecaster(DataGenerator): __version__ = None diff --git a/flexmeasures/data/models/forecasting/model_spec_factory.py b/flexmeasures/data/models/forecasting/model_spec_factory.py deleted file mode 100644 index bce0b259f1..0000000000 --- a/flexmeasures/data/models/forecasting/model_spec_factory.py +++ /dev/null @@ -1,314 +0,0 @@ -from __future__ import annotations - -from datetime import datetime, timedelta, tzinfo -from pprint import pformat -from typing import Any -import logging -import pytz - -from flask import current_app -from flexmeasures.data.queries.utils import ( - simplify_index, -) -from timely_beliefs import BeliefsDataFrame -from timetomodel import ModelSpecs -from timetomodel.exceptions import MissingData, NaNData -from timetomodel.speccing import SeriesSpecs -from timetomodel.transforming import ( - BoxCoxTransformation, - ReversibleTransformation, - Transformation, -) -import pandas as pd - -from flexmeasures.data.models.time_series import Sensor, TimedBelief -from flexmeasures.data.models.forecasting.utils import ( - create_lags, - set_training_and_testing_dates, - get_query_window, -) - -""" -Here we generate an initial version of timetomodel specs, given what asset and what timing -is defined. -These specs can be customized. -""" - - -logger = logging.getLogger(__name__) - - -class TBSeriesSpecs(SeriesSpecs): - """Compatibility for using timetomodel.SeriesSpecs with timely_beliefs.BeliefsDataFrames. 
- - This implements _load_series such that .search is called, - with the parameters in search_params. - The search function is expected to return a BeliefsDataFrame. - """ - - time_series_class: Any # with method (named "search" by default) - search_params: dict - - def __init__( - self, - search_params: dict, - name: str, - time_series_class: type | None = TimedBelief, - search_fnc: str = "search", - original_tz: tzinfo | None = pytz.utc, # postgres stores naive datetimes - feature_transformation: ReversibleTransformation | None = None, - post_load_processing: Transformation | None = None, - resampling_config: dict[str, Any] = None, - interpolation_config: dict[str, Any] = None, - ): - super().__init__( - name, - original_tz, - feature_transformation, - post_load_processing, - resampling_config, - interpolation_config, - ) - self.time_series_class = time_series_class - self.search_params = search_params - self.search_fnc = search_fnc - - def _load_series(self) -> pd.Series: - logger.info("Reading %s data from database" % self.time_series_class.__name__) - - bdf: BeliefsDataFrame = getattr(self.time_series_class, self.search_fnc)( - **self.search_params - ) - assert isinstance(bdf, BeliefsDataFrame) - df = simplify_index(bdf) - self.check_data(df) - - if self.post_load_processing is not None: - df = self.post_load_processing.transform_dataframe(df) - - return df["event_value"] - - def check_data(self, df: pd.DataFrame): - """Raise error if data is empty or contains nan values. - Here, other than in load_series, we can show the query, which is quite helpful. - """ - if df.empty: - raise MissingData( - "No values found in database for the requested %s data. It's no use to continue I'm afraid." - " Here's a print-out of what I tried to search for:\n\n%s\n\n" - % ( - self.time_series_class.__name__, - pformat(self.search_params, sort_dicts=False), - ) - ) - if df.isnull().values.any(): - raise NaNData( - "Nan values found in database for the requested %s data. 
It's no use to continue I'm afraid." - " Here's a print-out of what I tried to search for:\n\n%s\n\n" - % ( - self.time_series_class.__name__, - pformat(self.search_params, sort_dicts=False), - ) - ) - - -def create_initial_model_specs( # noqa: C901 - sensor: Sensor, - forecast_start: datetime, # Start of forecast period - forecast_end: datetime, # End of forecast period - forecast_horizon: timedelta, # Duration between time of forecasting and end time of the event that is forecast - ex_post_horizon: timedelta | None = None, - transform_to_normal: bool = True, - use_regressors: bool = True, # If false, do not create regressor specs - use_periodicity: bool = True, # If false, do not create lags given the asset's periodicity - custom_model_params: ( - dict | None - ) = None, # overwrite model params, most useful for tests or experiments - time_series_class: type | None = TimedBelief, -) -> ModelSpecs: - """ - Generic model specs for all asset types (also for markets and weather sensors) and horizons. - Fills in training, testing periods, lags. Specifies input and regressor data. - Does not fill in which model to actually use. - TODO: check if enough data is available both for lagged variables and regressors - TODO: refactor assets and markets to store a list of pandas offset or timedelta instead of booleans for - seasonality, because e.g. although solar and building assets both have daily seasonality, only the former is - insensitive to daylight savings. Therefore: solar periodicity is 24 hours, while building periodicity is 1 - calendar day. 
- """ - - params = _parameterise_forecasting_by_asset_and_asset_type( - sensor, transform_to_normal - ) - params.update(custom_model_params if custom_model_params is not None else {}) - - lags = create_lags( - params["n_lags"], - sensor, - forecast_horizon, - params["resolution"], - use_periodicity, - ) - - training_start, testing_end = set_training_and_testing_dates( - forecast_start, params["training_and_testing_period"] - ) - query_window = get_query_window(training_start, forecast_end, lags) - - regressor_specs = [] - regressor_transformation = {} - if use_regressors: - if custom_model_params: - if custom_model_params.get("regressor_transformation", None) is not None: - regressor_transformation = custom_model_params.get( - "regressor_transformation", {} - ) - regressor_specs = configure_regressors_for_nearest_weather_sensor( - sensor, - query_window, - forecast_horizon, - regressor_transformation, - transform_to_normal, - ) - - if ex_post_horizon is None: - ex_post_horizon = timedelta(hours=0) - - outcome_var_spec = TBSeriesSpecs( - name=sensor.generic_asset.generic_asset_type.name, - time_series_class=time_series_class, - search_params=dict( - sensors=sensor, - event_starts_after=query_window[0], - event_ends_before=query_window[1], - horizons_at_least=None, - horizons_at_most=ex_post_horizon, - ), - feature_transformation=params.get("outcome_var_transformation", None), - interpolation_config={"method": "time"}, - ) - # Set defaults if needed - if params.get("event_resolution", None) is None: - params["event_resolution"] = sensor.event_resolution - if params.get("remodel_frequency", None) is None: - params["remodel_frequency"] = timedelta(days=7) - specs = ModelSpecs( - outcome_var=outcome_var_spec, - model=None, # at least this will need to be configured still to make these specs usable! 
- frequency=params[ - "event_resolution" - ], # todo: timetomodel doesn't distinguish frequency and resolution yet - horizon=forecast_horizon, - lags=[int(lag / params["event_resolution"]) for lag in lags], - regressors=regressor_specs, - start_of_training=training_start, - end_of_testing=testing_end, - ratio_training_testing_data=params["ratio_training_testing_data"], - remodel_frequency=params["remodel_frequency"], - ) - - return specs - - -def _parameterise_forecasting_by_asset_and_asset_type( - sensor: Sensor, - transform_to_normal: bool, -) -> dict: - """Fill in the best parameters we know (generic or by asset (type))""" - params = dict() - - params["training_and_testing_period"] = timedelta(days=30) - params["ratio_training_testing_data"] = 14 / 15 - params["n_lags"] = 7 - params["resolution"] = sensor.event_resolution - - if transform_to_normal: - params["outcome_var_transformation"] = ( - get_normalization_transformation_from_sensor_attributes(sensor) - ) - - return params - - -def get_normalization_transformation_from_sensor_attributes( - sensor: Sensor, -) -> Transformation | None: - """ - Transform data to be normal, using the BoxCox transformation. Lambda parameter is chosen - according to the asset type. 
- """ - if ( - sensor.get_attribute("is_consumer") and not sensor.get_attribute("is_producer") - ) or ( - sensor.get_attribute("is_producer") and not sensor.get_attribute("is_consumer") - ): - return BoxCoxTransformation(lambda2=0.1) - elif sensor.generic_asset.generic_asset_type.name in [ - "wind speed", - "irradiance", - ]: - # Values cannot be negative and are often zero - return BoxCoxTransformation(lambda2=0.1) - elif sensor.generic_asset.generic_asset_type.name == "temperature": - # Values can be positive or negative when given in degrees Celsius, but non-negative only in Kelvin - return BoxCoxTransformation(lambda2=273.16) - else: - return None - - -def configure_regressors_for_nearest_weather_sensor( - sensor: Sensor, - query_window, - horizon, - regressor_transformation, # the regressor transformation can be passed in - transform_to_normal, # if not, it a normalization can be applied -) -> list[TBSeriesSpecs]: - """We use weather data as regressors. Here, we configure them.""" - regressor_specs = [] - correlated_sensor_names = sensor.get_attribute("weather_correlations") - if correlated_sensor_names: - current_app.logger.info( - "For %s, I need sensors: %s" % (sensor.name, correlated_sensor_names) - ) - for sensor_name in correlated_sensor_names: - - # Find the nearest weather sensor - closest_sensor = Sensor.find_closest( - generic_asset_type_name="weather station", - sensor_name=sensor_name, - object=sensor, - ) - if closest_sensor is None: - current_app.logger.warning( - "No sensor found of sensor type %s to use as regressor for %s." - % (sensor_name, sensor.name) - ) - else: - current_app.logger.info( - "Using sensor %s as regressor for %s." 
% (sensor_name, sensor.name) - ) - # Collect the weather data for the requested time window - regressor_specs_name = "%s_l0" % sensor_name - if len(regressor_transformation.keys()) == 0 and transform_to_normal: - regressor_transformation = ( - get_normalization_transformation_from_sensor_attributes( - closest_sensor, - ) - ) - regressor_specs.append( - TBSeriesSpecs( - name=regressor_specs_name, - time_series_class=TimedBelief, - search_params=dict( - sensors=closest_sensor, - event_starts_after=query_window[0], - event_ends_before=query_window[1], - horizons_at_least=horizon, - horizons_at_most=None, - ), - feature_transformation=regressor_transformation, - interpolation_config={"method": "time"}, - ) - ) - - return regressor_specs diff --git a/flexmeasures/data/models/forecasting/model_specs/__init__.py b/flexmeasures/data/models/forecasting/model_specs/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/flexmeasures/data/models/forecasting/model_specs/linear_regression.py b/flexmeasures/data/models/forecasting/model_specs/linear_regression.py deleted file mode 100644 index 695b49a746..0000000000 --- a/flexmeasures/data/models/forecasting/model_specs/linear_regression.py +++ /dev/null @@ -1,24 +0,0 @@ -from typing import Optional - -from statsmodels.api import OLS - -from flexmeasures.data.models.forecasting.model_spec_factory import ( - create_initial_model_specs, -) - -""" -Simple linear regression by ordinary least squares. -""" - -# update this version if small things like parametrisation change -version: int = 2 -# if a forecasting job using this model fails, fall back on this one -fallback_model_search_term: Optional[str] = "naive" - - -def ols_specs_configurator(**kwargs): - """Create and customize initial specs with OLS. 
See model_spec_factory for param docs.""" - model_specs = create_initial_model_specs(**kwargs) - model_specs.set_model(OLS) - model_identifier = "linear-OLS model v%d" % version - return model_specs, model_identifier, fallback_model_search_term diff --git a/flexmeasures/data/models/forecasting/model_specs/naive.py b/flexmeasures/data/models/forecasting/model_specs/naive.py deleted file mode 100644 index 9884de838e..0000000000 --- a/flexmeasures/data/models/forecasting/model_specs/naive.py +++ /dev/null @@ -1,44 +0,0 @@ -from datetime import timedelta - -from statsmodels.api import OLS - -from flexmeasures.data.models.forecasting.model_spec_factory import ( - create_initial_model_specs, -) - -""" -Naive model, which simply copies the measurement from which the forecasts is made. Useful as a fallback. - -Technically, the model has no regressors and just one lag - made using the horizon. -The value to be copied is this one lag. -This is because we assume the forecast is made for the very reason that the data point at this lag exists - why else would one make -a prediction from there with this horizon? -""" - -# update this version if small things like parametrisation change -version: int = 1 -# if a forecasting job using this model fails, fall back on this one -fallback_model_search_term: str | None = None - - -class Naive(OLS): - """Naive prediction model for a single input feature that simply throws back the given feature. - Under the hood, it uses linear regression by ordinary least squares, trained with points (0,0) and (1,1). - """ - - def __init__(self, *args, **kwargs): - super().__init__([0, 1], [0, 1]) - - -def naive_specs_configurator(**kwargs): - """Create and customize initial specs with OLS. 
See model_spec_factory for param docs.""" - kwargs["transform_to_normal"] = False - kwargs["use_regressors"] = False - kwargs["use_periodicity"] = False - kwargs["custom_model_params"] = dict( - training_and_testing_period=timedelta(hours=0), n_lags=1 - ) - model_specs = create_initial_model_specs(**kwargs) - model_specs.set_model(Naive, library_name="statsmodels") - model_identifier = "naive model v%d" % version - return model_specs, model_identifier, fallback_model_search_term diff --git a/flexmeasures/data/models/forecasting/utils.py b/flexmeasures/data/models/forecasting/utils.py index 047862b42c..0b474e5694 100644 --- a/flexmeasures/data/models/forecasting/utils.py +++ b/flexmeasures/data/models/forecasting/utils.py @@ -7,148 +7,8 @@ from flexmeasures.data.models.time_series import Sensor from datetime import datetime, timedelta -from sqlalchemy import select from flexmeasures.data import db -from flexmeasures.data.models.forecasting.exceptions import NotEnoughDataException -from flexmeasures.utils.time_utils import as_server_time - - -def check_data_availability( - old_sensor_model, - old_time_series_data_model, - forecast_start: datetime, - forecast_end: datetime, - query_window: tuple[datetime, datetime], - horizon: timedelta, -): - """Check if enough data is available in the database in the first place, - for training window and lagged variables. Otherwise, suggest new forecast period. - TODO: we could also check regressor data, if we get regressor specs passed in here. - TODO: The join is probably not needed, should be removed. 
The speed impactof join is negligible - """ - from sqlalchemy import func - - # Use aggregate MIN and MAX queries in the database, matching O(n) approach - first_q = ( - select(func.min(old_time_series_data_model.event_start)) - .join(old_sensor_model.__class__) - .filter(old_sensor_model.__class__.id == old_sensor_model.id) - ) - last_q = ( - select(func.max(old_time_series_data_model.event_start)) - .join(old_sensor_model.__class__) - .filter(old_sensor_model.__class__.id == old_sensor_model.id) - ) - first_event_start = db.session.execute(first_q).scalar() - last_event_start = db.session.execute(last_q).scalar() - if first_event_start is None or last_event_start is None: - raise NotEnoughDataException( - "No data available at all. Forecasting impossible." - ) - first = as_server_time(first_event_start) - last = as_server_time(last_event_start) - if query_window[0] < first: - suggested_start = forecast_start + (first - query_window[0]) - raise NotEnoughDataException( - f"Not enough data to forecast {old_sensor_model.name} " - f"for the forecast window {as_server_time(forecast_start)} to {as_server_time(forecast_end)}. " - f"I needed to query from {as_server_time(query_window[0])}, " - f"but the first value available is from {first} to {first + old_sensor_model.event_resolution}. " - f"Consider setting the start date to {as_server_time(suggested_start)}." - ) - if query_window[1] - horizon > last + old_sensor_model.event_resolution: - suggested_end = forecast_end + (last - (query_window[1] - horizon)) - raise NotEnoughDataException( - f"Not enough data to forecast {old_sensor_model.name} " - f"for the forecast window {as_server_time(forecast_start)} to {as_server_time(forecast_end)}. " - f"I needed to query until {as_server_time(query_window[1] - horizon)}, " - f"but the last value available is from {last} to {last + old_sensor_model.event_resolution}. " - f"Consider setting the end date to {as_server_time(suggested_end)}." 
- ) - - -def create_lags( - n_lags: int, - sensor: Sensor, - horizon: timedelta, - resolution: timedelta, - use_periodicity: bool, -) -> list[timedelta]: - """List the lags for this asset type, using horizon and resolution information.""" - lags = [] - - # Include a zero lag in case of backwards forecasting - # Todo: we should always take into account the latest forecast, so always append the zero lag if that belief exists - if horizon < timedelta(hours=0): - lags.append(timedelta(hours=0)) - - # Include latest measurements - lag_period = resolution - number_of_nan_lags = 1 + (horizon - resolution) // lag_period - for L in range(n_lags): - lags.append((L + number_of_nan_lags) * lag_period) - - # Include relevant measurements given the asset's periodicity - if use_periodicity and sensor.get_attribute("daily_seasonality"): - lag_period = timedelta(days=1) - number_of_nan_lags = 1 + (horizon - resolution) // lag_period - for L in range(n_lags): - lags.append((L + number_of_nan_lags) * lag_period) - - # Remove possible double entries - return list(set(lags)) - - -def get_query_window( - training_start: datetime, forecast_end: datetime, lags: list[timedelta] -) -> tuple[datetime, datetime]: - """Derive query window from start and end date, as well as lags (if any). - This makes sure we have enough data for lagging and forecasting.""" - if not lags: - query_start = training_start - else: - query_start = training_start - max(lags) - query_end = forecast_end - return query_start, query_end - - -def set_training_and_testing_dates( - forecast_start: datetime, - training_and_testing_period: timedelta | tuple[datetime, datetime], -) -> tuple[datetime, datetime]: - """If needed (if training_and_testing_period is a timedelta), - derive training_start and testing_end from forecasting_start, - otherwise simply return training_and_testing_period. 
- - - |------forecast_horizon/belief_horizon------| - | |-------resolution-------| - belief_time event_start event_end - - - |--resolution--|--resolution--|--resolution--|--resolution--|--resolution--|--resolution--| - |---------forecast_horizon--------| | | | | | - belief_time event_start | | | | | | - |---------forecast_horizon--------| | | | | - belief_time event_start | | | | | - | |---------forecast_horizon--------| | | | - | belief_time event_start | | | | - |--------max_lag-------|--------training_and_testing_period---------|---------------forecast_period--------------| - query_start training_start | | testing_end/forecast_start | forecast_end - |------min_lag-----| | |---------forecast_horizon--------| | | - | | belief_time event_start | | | - | | | |---------forecast_horizon--------| | - | | | belief_time event_start | | - | | | | |---------forecast_horizon--------| - | | | | belief_time event_start | - |--------------------------------------------------query_window--------------------------------------------------| - - """ - if isinstance(training_and_testing_period, timedelta): - return forecast_start - training_and_testing_period, forecast_start - else: - return training_and_testing_period def negative_to_zero(x: np.ndarray) -> np.ndarray: diff --git a/flexmeasures/data/scripts/data_gen.py b/flexmeasures/data/scripts/data_gen.py index cf310b7659..f2c1a7bde1 100644 --- a/flexmeasures/data/scripts/data_gen.py +++ b/flexmeasures/data/scripts/data_gen.py @@ -4,25 +4,17 @@ from __future__ import annotations -from datetime import datetime, timedelta +from datetime import timedelta -import pandas as pd from flask import current_app as app from flask_sqlalchemy import SQLAlchemy import click from sqlalchemy import func, and_, select, delete -from timetomodel.forecasting import make_rolling_forecasts -from timetomodel.exceptions import MissingData, NaNData -from humanize import naturaldelta -import inflect from flexmeasures.data.models.time_series import Sensor, 
TimedBelief from flexmeasures.data.models.generic_assets import GenericAssetType, GenericAsset from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.user import User, Role, AccountRole -from flexmeasures.data.models.forecasting import lookup_model_specs_configurator -from flexmeasures.data.models.forecasting.exceptions import NotEnoughDataException -from flexmeasures.utils.time_utils import ensure_local_timezone from flexmeasures.data.transactional import as_transaction from flexmeasures.cli.utils import MsgStyle @@ -30,8 +22,6 @@ BACKUP_PATH = app.config.get("FLEXMEASURES_DB_BACKUP_PATH") LOCAL_TIME_ZONE = app.config.get("FLEXMEASURES_TIMEZONE") -infl_eng = inflect.engine() - def add_default_data_sources(db: SQLAlchemy): for source_name, source_type in ( @@ -189,107 +179,6 @@ def populate_initial_structure(db: SQLAlchemy): ) -@as_transaction # noqa: C901 -def populate_time_series_forecasts( # noqa: C901 - db: SQLAlchemy, - sensor_ids: list[int], - horizons: list[timedelta], - forecast_start: datetime, - forecast_end: datetime, - event_resolution: timedelta | None = None, -): - training_and_testing_period = timedelta(days=30) - - click.echo( - "Populating the database %s with time series forecasts of %s ahead ..." - % (db.engine, infl_eng.join([naturaldelta(horizon) for horizon in horizons])) - ) - - # Set a data source for the forecasts - data_source = db.session.execute( - select(DataSource).filter_by(name="Seita", type="demo script") - ).scalar_one_or_none() - # List all sensors for which to forecast. - sensors = [ - db.session.execute( - select(Sensor).filter(Sensor.id.in_(sensor_ids)) - ).scalar_one_or_none() - ] - if not sensors: - click.echo("No such sensors in db, so I will not add any forecasts.") - return - - # Make a model for each sensor and horizon, make rolling forecasts and save to database. - # We cannot use (faster) bulk save, as forecasts might become regressors in other forecasts. 
- for sensor in sensors: - for horizon in horizons: - try: - default_model = lookup_model_specs_configurator() - model_specs, model_identifier, model_fallback = default_model( - sensor=sensor, - forecast_start=forecast_start, - forecast_end=forecast_end, - forecast_horizon=horizon, - custom_model_params=dict( - training_and_testing_period=training_and_testing_period, - event_resolution=event_resolution, - ), - ) - click.echo( - "Computing forecasts of %s ahead for sensor %s, " - "from %s to %s with a training and testing period of %s, using %s ..." - % ( - naturaldelta(horizon), - sensor.id, - forecast_start, - forecast_end, - naturaldelta(training_and_testing_period), - model_identifier, - ) - ) - model_specs.creation_time = forecast_start - forecasts, model_state = make_rolling_forecasts( - start=forecast_start, end=forecast_end, model_specs=model_specs - ) - # Upsample to sensor resolution if needed - if forecasts.index.freq > pd.Timedelta(sensor.event_resolution): - forecasts = model_specs.outcome_var.resample_data( - forecasts, - time_window=(forecasts.index.min(), forecasts.index.max()), - expected_frequency=sensor.event_resolution, - ) - except (NotEnoughDataException, MissingData, NaNData) as e: - click.echo("Skipping forecasts for sensor %s: %s" % (sensor, str(e))) - continue - - beliefs = [ - TimedBelief( - event_start=ensure_local_timezone(dt, tz_name=LOCAL_TIME_ZONE), - belief_horizon=horizon, - event_value=value, - sensor=sensor, - source=data_source, - ) - for dt, value in forecasts.items() - ] - - click.echo( - "Saving %s %s-forecasts for %s..." 
- % (len(beliefs), naturaldelta(horizon), sensor.id) - ) - for belief in beliefs: - db.session.add(belief) - - click.echo( - "DB now has %d forecasts" - % db.session.scalar( - select(func.count()) - .select_from(TimedBelief) - .filter(TimedBelief.belief_horizon > timedelta(hours=0)) - ) - ) - - @as_transaction def depopulate_structure(db: SQLAlchemy): click.echo("Depopulating structural data from the database %s ..." % db.engine) diff --git a/flexmeasures/data/services/forecasting.py b/flexmeasures/data/services/forecasting.py index 9b8089f288..9d1a98c105 100644 --- a/flexmeasures/data/services/forecasting.py +++ b/flexmeasures/data/services/forecasting.py @@ -1,275 +1,18 @@ -""" -Logic around scheduling (jobs) -""" +"""Forecasting job utilities.""" from __future__ import annotations -from datetime import datetime, timedelta +from datetime import datetime -from flask import current_app import click -from isodate import duration_isoformat -from rq import get_current_job -from rq.job import Job -from timetomodel.forecasting import make_rolling_forecasts -import timely_beliefs as tb - -from flexmeasures.data import db -from flexmeasures.data.models.forecasting import lookup_model_specs_configurator -from flexmeasures.data.models.forecasting.exceptions import InvalidHorizonException -from flexmeasures.data.models.time_series import Sensor, TimedBelief -from flexmeasures.data.models.forecasting.utils import ( - get_query_window, - check_data_availability, -) -from flexmeasures.data.utils import get_data_source, save_to_db -from flexmeasures.utils.time_utils import ( - as_server_time, - server_now, - forecast_horizons_for, - supported_horizons, -) - -""" -The life cycle of a forecasting job: -1. A forecasting job is born in create_forecasting_jobs. -2. It is run in make_rolling_viewpoint_forecasts or make_fixed_viewpoint_forecasts, which write results to the db. - This is also where model specs are configured and a possible fallback model is stored for step 3. -3. 
If an error occurs (and the worker is configured accordingly), handle_forecasting_exception comes in. - This might re-enqueue the job or try a different model (which creates a new job). -""" # TODO: we could also monitor the failed queue and re-enqueue jobs who had missing data # (and maybe failed less than three times so far) -class MisconfiguredForecastingJobException(Exception): - pass - - -def create_forecasting_jobs( - sensor_id: int, - start_of_roll: datetime, - end_of_roll: datetime, - resolution: timedelta = None, - horizons: list[timedelta] = None, - model_search_term="linear-OLS", - custom_model_params: dict = None, - enqueue: bool = True, -) -> list[Job]: - """Create forecasting jobs by rolling through a time window, for a number of given forecast horizons. - Start and end of the forecasting jobs are equal to the time window (start_of_roll, end_of_roll) plus the horizon. - - For example (with shorthand notation): - - start_of_roll = 3pm - end_of_roll = 5pm - resolution = 15min - horizons = [1h, 6h, 1d] - - This creates the following 3 jobs: - - 1) forecast each quarter-hour from 4pm to 6pm, i.e. the 1h forecast - 2) forecast each quarter-hour from 9pm to 11pm, i.e. the 6h forecast - 3) forecast each quarter-hour from 3pm to 5pm the next day, i.e. the 1d forecast - - If not given, relevant horizons are derived from the resolution of the posted data. - - The job needs a model configurator, for which you can supply a model search term. If omitted, the - current default model configuration will be used. - - It's possible to customize model parameters, but this feature is (currently) meant to only - be used by tests, so that model behaviour can be adapted to test conditions. If used outside - of testing, an exception is raised. - - if enqueue is True (default), the jobs are put on the redis queue. - - Returns the redis-queue forecasting jobs which were created. 
- """ - if horizons is None: - if resolution is None: - raise MisconfiguredForecastingJobException( - "Cannot create forecasting jobs - set either horizons or resolution." - ) - horizons = forecast_horizons_for(resolution) - jobs: list[Job] = [] - for horizon in horizons: - job = Job.create( - make_rolling_viewpoint_forecasts, - kwargs=dict( - sensor_id=sensor_id, - horizon=horizon, - start=start_of_roll + horizon, - end=end_of_roll + horizon, - custom_model_params=custom_model_params, - ), - connection=current_app.queues["forecasting"].connection, - ttl=int( - current_app.config.get( - "FLEXMEASURES_JOB_TTL", timedelta(-1) - ).total_seconds() - ), - ) - job.meta["model_search_term"] = model_search_term - # Serialize forecast kwargs for display in the job page - # Workaround for https://github.com/Parallels/rq-dashboard/issues/510 - job.meta["forecast_kwargs"] = { - "sensor_id": sensor_id, - "horizon": duration_isoformat(horizon), - "start": (start_of_roll + horizon).isoformat(), - "end": (end_of_roll + horizon).isoformat(), - } - job.save_meta() - jobs.append(job) - if enqueue: - current_app.queues["forecasting"].enqueue_job(job) - current_app.job_cache.add( - sensor_id, job.id, queue="forecasting", asset_or_sensor_type="sensor" - ) - return jobs - - -def make_fixed_viewpoint_forecasts( - sensor_id: int, - horizon: timedelta, - start: datetime, - end: datetime, - custom_model_params: dict = None, -) -> int: - """Build forecasting model specs, make fixed-viewpoint forecasts, and save the forecasts made. - - Each individual forecast is a belief about a time interval. - Fixed-viewpoint forecasts share the same belief time. - See the timely-beliefs lib for relevant terminology. 
- """ - # todo: implement fixed-viewpoint forecasts - raise NotImplementedError - - -def make_rolling_viewpoint_forecasts( - sensor_id: int, - horizon: timedelta, - start: datetime, - end: datetime, - custom_model_params: dict = None, -) -> int: - """Build forecasting model specs, make rolling-viewpoint forecasts, and save the forecasts made. - - Each individual forecast is a belief about a time interval. - Rolling-viewpoint forecasts share the same belief horizon (the duration between belief time and knowledge time). - Model specs are also retrained in a rolling fashion, but with its own frequency set in custom_model_params. - See the timely-beliefs lib for relevant terminology. - - Parameters - ---------- - :param sensor_id: int - To identify which sensor to forecast - :param horizon: timedelta - duration between the end of each interval and the time at which the belief about that interval is formed - :param start: datetime - start of forecast period, i.e. start time of the first interval to be forecast - :param end: datetime - end of forecast period, i.e end time of the last interval to be forecast - :param custom_model_params: dict - pass in params which will be passed to the model specs configurator, - e.g. outcome_var_transformation, only advisable to be used for testing. 
- :returns: int - the number of forecasts made - """ - # https://docs.sqlalchemy.org/en/13/faq/connections.html#how-do-i-use-engines-connections-sessions-with-python-multiprocessing-or-os-fork - db.engine.dispose() - - rq_job = get_current_job() - - # find out which model to run, fall back to latest recommended - model_search_term = rq_job.meta.get("model_search_term", "linear-OLS") - - # find sensor - sensor = db.session.get(Sensor, sensor_id) - - click.echo( - "Running Forecasting Job %s: %s for %s on model '%s', from %s to %s" - % (rq_job.id, sensor, horizon, model_search_term, start, end) - ) - - if hasattr(sensor, "market_type"): - ex_post_horizon = None # Todo: until we sorted out the ex_post_horizon, use all available price data - else: - ex_post_horizon = timedelta(hours=0) - - # Make model specs - model_configurator = lookup_model_specs_configurator(model_search_term) - model_specs, model_identifier, fallback_model_search_term = model_configurator( - sensor=sensor, - forecast_start=as_server_time(start), - forecast_end=as_server_time(end), - forecast_horizon=horizon, - ex_post_horizon=ex_post_horizon, - custom_model_params=custom_model_params, - ) - model_specs.creation_time = server_now() - - rq_job.meta["model_identifier"] = model_identifier - rq_job.meta["fallback_model_search_term"] = fallback_model_search_term - rq_job.save() - - # before we run the model, check if horizon is okay and enough data is available - if horizon not in supported_horizons(): - raise InvalidHorizonException( - "Invalid horizon on job %s: %s" % (rq_job.id, horizon) - ) - - query_window = get_query_window( - model_specs.start_of_training, - end, - [lag * model_specs.frequency for lag in model_specs.lags], - ) - check_data_availability( - sensor, - TimedBelief, - start, - end, - query_window, - horizon, - ) - - data_source = get_data_source( - data_source_name="Seita (%s)" - % rq_job.meta.get("model_identifier", "unknown model"), - data_source_type="forecasting script", - ) - - 
forecasts, model_state = make_rolling_forecasts( - start=as_server_time(start), - end=as_server_time(end), - model_specs=model_specs, - ) - click.echo("Job %s made %d forecasts." % (rq_job.id, len(forecasts))) - - ts_value_forecasts = [ - TimedBelief( - event_start=dt, - belief_horizon=horizon, - event_value=value, - sensor=sensor, - source=data_source, - ) - for dt, value in forecasts.items() - ] - bdf = tb.BeliefsDataFrame(ts_value_forecasts) - save_to_db(bdf) - db.session.commit() - - return len(forecasts) - - def handle_forecasting_exception(job, exc_type, exc_value, traceback): - """ - Decide if we can do something about this failure: - * Try a different model - * Re-queue at a later time (using rq_scheduler) - """ + """Persist forecasting job failure metadata without queueing a legacy fallback.""" click.echo( "HANDLING RQ FORECASTING WORKER EXCEPTION: %s:%s\n" % (exc_type, exc_value) ) @@ -280,26 +23,16 @@ def handle_forecasting_exception(job, exc_type, exc_value, traceback): job.meta["failures"] = job.meta["failures"] + 1 job.save_meta() - # We might use this to decide if we want to re-queue a failed job - # if job.meta['failures'] < 3: - # job.queue.failures.requeue(job) - - # TODO: use this to add more meta information? 
- # if exc_type == NotEnoughDataException: - - if "fallback_model_search_term" in job.meta: - if job.meta["fallback_model_search_term"] is not None: - new_job = Job.create( - make_rolling_viewpoint_forecasts, - args=job.args, - kwargs=job.kwargs, - connection=current_app.queues["forecasting"].connection, - ) - new_job.meta["model_search_term"] = job.meta["fallback_model_search_term"] - new_job.save_meta() - current_app.queues["forecasting"].enqueue_job(new_job) - + job.meta["exception"] = { + "type": exc_type.__name__ if exc_type is not None else None, + "message": str(exc_value), + } + if isinstance(job.meta.get("start"), datetime): + job.meta["start"] = job.meta["start"].isoformat() + if isinstance(job.meta.get("end"), datetime): + job.meta["end"] = job.meta["end"].isoformat() + job.save_meta() -def num_forecasts(start: datetime, end: datetime, resolution: timedelta) -> int: - """Compute how many forecasts a job needs to make, given a resolution""" - return (end - start) // resolution + # The fixed-viewpoint pipeline is the only supported forecasting path, so + # failed jobs stay failed until a user retries with an updated request. 
+ return False diff --git a/flexmeasures/data/tests/conftest.py b/flexmeasures/data/tests/conftest.py index c0484442ae..c901aa9683 100644 --- a/flexmeasures/data/tests/conftest.py +++ b/flexmeasures/data/tests/conftest.py @@ -7,7 +7,6 @@ import pandas as pd import numpy as np from flask_sqlalchemy import SQLAlchemy -from statsmodels.api import OLS from flexmeasures import AssetType, Asset, Sensor import timely_beliefs as tb from sqlalchemy import select @@ -18,10 +17,6 @@ from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import TimedBelief from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType -from flexmeasures.data.models.forecasting import model_map -from flexmeasures.data.models.forecasting.model_spec_factory import ( - create_initial_model_specs, -) from flexmeasures.utils.time_utils import as_server_time from marshmallow import fields @@ -96,24 +91,6 @@ def add_test_weather_sensor_and_forecasts(db: SQLAlchemy, setup_generic_asset_ty ) -@pytest.fixture(scope="module", autouse=True) -def add_failing_test_model(db): - """Add a test model specs to the lookup which should fail due to missing data. 
- It falls back to linear OLS (which falls back to naive).""" - - def test_specs(**args): - """Customize initial specs with OLS and too early training start.""" - model_specs = create_initial_model_specs(**args) - model_specs.set_model(OLS) - model_specs.start_of_training = model_specs.start_of_training - timedelta( - days=365 - ) - model_identifier = "failing-test model v1" - return model_specs, model_identifier, "linear-OLS" - - model_map["failing-test"] = test_specs - - @pytest.fixture(scope="module") def add_nearby_weather_sensors(db, add_weather_sensors) -> dict[str, Sensor]: temp_sensor_location = add_weather_sensors["temperature"].generic_asset.location diff --git a/flexmeasures/data/tests/test_forecasting_jobs.py b/flexmeasures/data/tests/test_forecasting_jobs.py index 3de6f85b79..e370122e96 100644 --- a/flexmeasures/data/tests/test_forecasting_jobs.py +++ b/flexmeasures/data/tests/test_forecasting_jobs.py @@ -1,277 +1,124 @@ -# flake8: noqa: E402 from __future__ import annotations -from datetime import datetime, timedelta +from datetime import datetime +import json import os -import numpy as np from rq.job import Job -from flexmeasures.data.models.data_sources import DataSource -from flexmeasures.data.models.time_series import Sensor, TimedBelief +from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline +from flexmeasures.data.services.forecasting import handle_forecasting_exception from flexmeasures.utils.job_utils import work_on_rq -from flexmeasures.data.services.forecasting import ( - create_forecasting_jobs, - handle_forecasting_exception, -) from flexmeasures.utils.time_utils import as_server_time -def custom_model_params(): - """little training as we have little data, turn off transformations until they let this test run (TODO)""" - return dict( - training_and_testing_period=timedelta(hours=2), - outcome_var_transformation=None, - regressor_transformation={}, - ) - - -def get_data_source(model_identifier: str = "linear-OLS 
model v2"): - """This helper is a good way to check which model has been successfully used. - Only when the forecasting job is successful, will the created data source entry not be rolled back. - """ - data_source_name = "Seita (%s)" % model_identifier - return DataSource.query.filter_by( - name=data_source_name, type="forecasting script" - ).one_or_none() - - -def check_aggregate(overall_expected: int, horizon: timedelta, sensor_id: int): - """Check that the expected number of forecasts were made for the given horizon, - and check that each forecast is a number.""" - all_forecasts = ( - TimedBelief.query.filter(TimedBelief.sensor_id == sensor_id) - .filter(TimedBelief.belief_horizon == horizon) - .all() - ) - assert len(all_forecasts) == overall_expected - assert all([not np.isnan(f.event_value) for f in all_forecasts]) - - -def test_forecasting_an_hour_of_wind(db, run_as_cli, app, setup_test_data): - """Test one clean run of one job: - - data source was made, - - forecasts have been made - """ - # asset has only 1 power sensor - wind_device_1: Sensor = setup_test_data["wind-asset-1"].sensors[0] - - # Remove each seasonality, so we don't query test data that isn't there - wind_device_1.set_attribute("daily_seasonality", False) - wind_device_1.set_attribute("weekly_seasonality", False) - wind_device_1.set_attribute("yearly_seasonality", False) - - assert get_data_source() is None - - # makes 4 forecasts - horizon = timedelta(hours=1) - job = create_forecasting_jobs( - start_of_roll=as_server_time(datetime(2015, 1, 1, 6)), - end_of_roll=as_server_time(datetime(2015, 1, 1, 7)), - horizons=[horizon], - sensor_id=wind_device_1.id, - custom_model_params=custom_model_params(), - ) - - print("Job: %s" % job[0].id) - - work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception) - - assert get_data_source() is not None - - forecasts = ( - TimedBelief.query.filter(TimedBelief.sensor_id == wind_device_1.id) - .filter(TimedBelief.belief_horizon == horizon) 
- .filter( - (TimedBelief.event_start >= as_server_time(datetime(2015, 1, 1, 7))) - & (TimedBelief.event_start < as_server_time(datetime(2015, 1, 1, 8))) - ) - .all() - ) - assert len(forecasts) == 4 - check_aggregate(4, horizon, wind_device_1.id) - - -def test_forecasting_two_hours_of_solar_at_edge_of_data_set( - db, run_as_cli, app, setup_test_data +def queue_forecasting_job( + sensor_id: int, + start: datetime, + end: datetime, + *, + config: dict | None = None, ): - # asset has only 1 power sensor - solar_device_1: Sensor = setup_test_data["solar-asset-1"].sensors[0] - - last_power_datetime = ( - ( - TimedBelief.query.filter(TimedBelief.sensor_id == solar_device_1.id) - .filter(TimedBelief.belief_horizon == timedelta(hours=0)) - .order_by(TimedBelief.event_start.desc()) - ) - .first() - .event_start - ) # datetime index of the last power value 11.45pm (Jan 1st) - - # makes 4 forecasts, 1 of which is for a new datetime index - horizon = timedelta(hours=6) - job = create_forecasting_jobs( - start_of_roll=last_power_datetime - - horizon - - timedelta(minutes=30), # start of data on which forecast is based (5.15pm) - end_of_roll=last_power_datetime - - horizon - + timedelta(minutes=30), # end of data on which forecast is based (6.15pm) - horizons=[ - timedelta(hours=6) - ], # so we want forecasts for 11.15pm (Jan 1st) to 0.15am (Jan 2nd) - sensor_id=solar_device_1.id, - custom_model_params=custom_model_params(), + pipeline = TrainPredictPipeline( + config=config + or { + "train-start": "2025-01-01T00:00:00+00:00", + "retrain-frequency": "PT1H", + } ) - print("Job: %s" % job[0].id) - - work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception) - - forecasts = ( - TimedBelief.query.filter(TimedBelief.sensor_id == solar_device_1.id) - .filter(TimedBelief.belief_horizon == horizon) - .filter(TimedBelief.event_start > last_power_datetime) - .all() + return pipeline.compute( + as_job=True, + parameters={ + "sensor": sensor_id, + "start": 
as_server_time(start).isoformat(), + "end": as_server_time(end).isoformat(), + "max-forecast-horizon": "PT1H", + "forecast-frequency": "PT1H", + }, ) - assert len(forecasts) == 1 - check_aggregate(4, horizon, solar_device_1.id) def check_failures( redis_queue, failure_search_words: list[str] | None = None, - model_identifiers: list[str] | None = None, ): - """Check that there was at least one failure. - For each failure, the exception message can be checked for a search word - and the model identifier can also be compared to a string. - """ + """Check that there was at least one failed forecasting job.""" if os.name == "nt": print("Failed job registry not working on Windows. Skipping check...") return - failed = redis_queue.failed_job_registry - if failure_search_words is None: - failure_search_words = [] - if model_identifiers is None: - model_identifiers = [] - - failure_count = max(len(failure_search_words), len(model_identifiers), 1) + failed = redis_queue.failed_job_registry + failure_search_words = failure_search_words or [] + failure_count = max(len(failure_search_words), 1) - print( - "FAILURE QUEUE: %s" - % [ - Job.fetch(jid, connection=redis_queue.connection).meta - for jid in failed.get_job_ids() - ] - ) assert failed.count == failure_count for job_idx in range(failure_count): job = Job.fetch( failed.get_job_ids()[job_idx], connection=redis_queue.connection ) - - if len(failure_search_words) >= job_idx: + if failure_search_words: assert failure_search_words[job_idx] in job.latest_result().exc_string - if model_identifiers: - assert job.meta["model_identifier"] == model_identifiers[job_idx] - -def test_failed_forecasting_insufficient_data( - app, run_as_cli, clean_redis, setup_test_data +def test_failed_forecasting_job_does_not_enqueue_fallback( + app, + clean_redis, + setup_fresh_test_forecast_data_with_missing_data, ): - """This one (as well as the fallback) should fail as there is no underlying data. 
- (Power data is in 2015)""" - - # asset has only 1 power sensor - solar_device_1: Sensor = setup_test_data["solar-asset-1"].sensors[0] - - create_forecasting_jobs( - start_of_roll=as_server_time(datetime(2016, 1, 1, 20)), - end_of_roll=as_server_time(datetime(2016, 1, 1, 22)), - horizons=[timedelta(hours=1)], - sensor_id=solar_device_1.id, - custom_model_params=custom_model_params(), + sensor = setup_fresh_test_forecast_data_with_missing_data["solar-sensor"] + irradiance_sensor = setup_fresh_test_forecast_data_with_missing_data[ + "irradiance-sensor" + ] + + queue_forecasting_job( + sensor.id, + start=datetime(2025, 1, 25, 0), + end=datetime(2025, 1, 25, 2), + config={ + "train-start": "2025-01-01T00:00:00+00:00", + "retrain-frequency": "PT1H", + "missing-threshold": 0.0, + "future-regressors": [irradiance_sensor.id], + }, ) - work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception) - check_failures(app.queues["forecasting"], 2 * ["NotEnoughDataException"]) - - -def test_failed_forecasting_invalid_horizon( - app, run_as_cli, clean_redis, setup_test_data -): - """This one (as well as the fallback) should fail as the horizon is invalid.""" - # asset has only 1 power sensor - solar_device_1: Sensor = setup_test_data["solar-asset-1"].sensors[0] - - create_forecasting_jobs( - start_of_roll=as_server_time(datetime(2015, 1, 1, 21)), - end_of_roll=as_server_time(datetime(2015, 1, 1, 23)), - horizons=[timedelta(hours=18)], - sensor_id=solar_device_1.id, - custom_model_params=custom_model_params(), - ) work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception) - check_failures(app.queues["forecasting"], 2 * ["InvalidHorizonException"]) - - -def test_failed_unknown_model(app, clean_redis, setup_test_data): - """This one should fail because we use a model search term which yields no model configurator.""" - - # asset has only 1 power sensor - solar_device_1: Sensor = setup_test_data["solar-asset-1"].sensors[0] - - horizon = 
timedelta(hours=1) - cmp = custom_model_params() - cmp["training_and_testing_period"] = timedelta(days=365) - - create_forecasting_jobs( - start_of_roll=as_server_time(datetime(2015, 1, 1, 12)), - end_of_roll=as_server_time(datetime(2015, 1, 1, 14)), - horizons=[horizon], - sensor_id=solar_device_1.id, - model_search_term="no-one-knows-this", - custom_model_params=cmp, + check_failures( + app.queues["forecasting"], + ["NotEnoughDataException", "NotEnoughDataException"], ) - work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception) - - check_failures(app.queues["forecasting"], ["No model found for search term"]) - + failed_job_ids = app.queues["forecasting"].failed_job_registry.get_job_ids() + for failed_job_id in failed_job_ids: + failed_job = Job.fetch( + failed_job_id, connection=app.queues["forecasting"].connection + ) + assert failed_job.meta["failures"] == 1 + assert failed_job.meta["exception"]["type"] == "NotEnoughDataException" + assert isinstance(failed_job.meta["exception"]["message"], str) + assert failed_job.meta["exception"]["message"] != "" + assert failed_job.meta.get("fallback_job_id") is None + assert app.queues["forecasting"].count == 0 -def test_forecasting_job_meta_is_json_serializable(app): - """Test that forecasting job meta is JSON-serializable. - This ensures the rq-dashboard job page can display forecasting jobs without errors, - as rq-dashboard calls json.dumps(job.get_meta()) when rendering job details. 
- See: https://github.com/Parallels/rq-dashboard/issues/510 - """ - import json +def test_forecasting_job_meta_is_json_serializable( + app, + setup_fresh_test_forecast_data, +): + sensor = setup_fresh_test_forecast_data["solar-sensor"] - horizon = timedelta(hours=1) - sensor_id = 1 # any int will do since enqueue=False (job won't run) - jobs = create_forecasting_jobs( - start_of_roll=as_server_time(datetime(2015, 1, 1, 6)), - end_of_roll=as_server_time(datetime(2015, 1, 1, 7)), - horizons=[horizon], - sensor_id=sensor_id, - enqueue=False, + pipeline_returns = queue_forecasting_job( + sensor.id, + start=datetime(2025, 1, 5, 0), + end=datetime(2025, 1, 5, 2), ) - assert len(jobs) == 1 - job = jobs[0] - # This should not raise a TypeError: Object of type datetime is not JSON serializable + job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) meta_json = json.dumps(job.get_meta()) assert meta_json is not None meta = json.loads(meta_json) - assert "model_search_term" in meta - assert "forecast_kwargs" in meta - assert meta["forecast_kwargs"]["sensor_id"] == sensor_id - # Verify horizon, start, end are stored as strings (not datetime/timedelta objects) - assert isinstance(meta["forecast_kwargs"]["horizon"], str) - assert isinstance(meta["forecast_kwargs"]["start"], str) - assert isinstance(meta["forecast_kwargs"]["end"], str) + assert meta["sensor_id"] == sensor.id + assert isinstance(meta["start"], str) + assert isinstance(meta["end"], str) diff --git a/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py b/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py index 25e862fd1f..cd062ec8cd 100644 --- a/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py @@ -1,195 +1,37 @@ -from datetime import timedelta, datetime +from datetime import datetime -import pytest -from sqlalchemy import select, Select +from rq.job import Job +from sqlalchemy import select -from 
flexmeasures.data.models.time_series import Sensor, TimedBelief -from flexmeasures.data.services.forecasting import ( - create_forecasting_jobs, - handle_forecasting_exception, -) -from flexmeasures.data.tests.test_forecasting_jobs import ( - custom_model_params, - check_aggregate, - check_failures, - get_data_source, -) +from flexmeasures.data.models.time_series import TimedBelief +from flexmeasures.data.services.forecasting import handle_forecasting_exception +from flexmeasures.data.tests.test_forecasting_jobs import queue_forecasting_job from flexmeasures.utils.job_utils import work_on_rq -from flexmeasures.utils.time_utils import as_server_time -def test_forecasting_three_hours_of_wind( - app, run_as_cli, setup_fresh_test_data, clean_redis, fresh_db -): - # asset has only 1 power sensor - wind_device_2: Sensor = setup_fresh_test_data["wind-asset-2"].sensors[0] - - # makes 12 forecasts - horizon = timedelta(hours=1) - job = create_forecasting_jobs( - start_of_roll=as_server_time(datetime(2015, 1, 1, 10)), - end_of_roll=as_server_time(datetime(2015, 1, 1, 13)), - horizons=[horizon], - sensor_id=wind_device_2.id, - custom_model_params=custom_model_params(), - ) - print("Job: %s" % job[0].id) - - work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception) - - forecasts = fresh_db.session.scalars( - select(TimedBelief) - .filter(TimedBelief.sensor_id == wind_device_2.id) - .filter(TimedBelief.belief_horizon == horizon) - .filter( - (TimedBelief.event_start >= as_server_time(datetime(2015, 1, 1, 11))) - & (TimedBelief.event_start < as_server_time(datetime(2015, 1, 1, 14))) - ) - ).all() - assert len(forecasts) == 12 - check_aggregate(12, horizon, wind_device_2.id) - - -def test_forecasting_two_hours_of_solar( - app, run_as_cli, setup_fresh_test_data, clean_redis, fresh_db -): - # asset has only 1 power sensor - solar_device_1: Sensor = setup_fresh_test_data["solar-asset-1"].sensors[0] - - # makes 8 forecasts - horizon = timedelta(hours=1) - job = 
create_forecasting_jobs( - start_of_roll=as_server_time(datetime(2015, 1, 1, 12)), - end_of_roll=as_server_time(datetime(2015, 1, 1, 14)), - horizons=[horizon], - sensor_id=solar_device_1.id, - custom_model_params=custom_model_params(), - ) - print("Job: %s" % job[0].id) - - work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception) - forecasts = fresh_db.session.scalars( - select(TimedBelief) - .filter(TimedBelief.sensor_id == solar_device_1.id) - .filter(TimedBelief.belief_horizon == horizon) - .filter( - (TimedBelief.event_start >= as_server_time(datetime(2015, 1, 1, 13))) - & (TimedBelief.event_start < as_server_time(datetime(2015, 1, 1, 15))) - ) - ).all() - assert len(forecasts) == 8 - check_aggregate(8, horizon, solar_device_1.id) - - -@pytest.mark.parametrize( - "model_to_start_with, model_version", - [ - ("failing-test", 1), - ("linear-OLS", 2), - ], -) -def test_failed_model_with_too_much_training_then_succeed_with_fallback( +def test_forecasting_job_runs_on_fresh_db( app, - run_as_cli, clean_redis, - setup_fresh_test_data, - model_to_start_with, - model_version, fresh_db, + setup_fresh_test_forecast_data, ): - """ - Here we fail once - because we start with a model that needs too much training. - So we check for this failure happening as expected. - But then, we do succeed with the fallback model one level down. - (fail-test falls back to linear & linear falls back to naive). - As a result, there should be forecasts in the DB. 
- """ - # asset has only 1 power sensor - solar_device_1: Sensor = setup_fresh_test_data["solar-asset-1"].sensors[0] - - # Remove each seasonality, so we don't query test data that isn't there - solar_device_1.set_attribute("daily_seasonality", False) - solar_device_1.set_attribute("weekly_seasonality", False) - solar_device_1.set_attribute("yearly_seasonality", False) + sensor = setup_fresh_test_forecast_data["solar-sensor"] - horizon_hours = 1 - horizon = timedelta(hours=horizon_hours) + pipeline_returns = queue_forecasting_job( + sensor.id, + start=datetime(2025, 1, 5, 0), + end=datetime(2025, 1, 5, 2), + ) - cmp = custom_model_params() - hour_start = 5 - if model_to_start_with == "linear-OLS": - # making the linear model fail and fall back to naive - hour_start = 3 # Todo: explain this parameter; why would it fail to forecast if data is there for the full day? + job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) + assert job is not None - # The failed test model (this failure enqueues a new job) - create_forecasting_jobs( - start_of_roll=as_server_time(datetime(2015, 1, 1, hour_start)), - end_of_roll=as_server_time(datetime(2015, 1, 1, hour_start + 2)), - horizons=[horizon], - sensor_id=solar_device_1.id, - model_search_term=model_to_start_with, - custom_model_params=cmp, - ) work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception) - # Check if the correct model failed in the expected way - check_failures( - app.queues["forecasting"], - ["NotEnoughDataException"], - ["%s model v%d" % (model_to_start_with, model_version)], - ) - - # this query is useful to check data: - def make_query(the_horizon_hours: int) -> Select: - the_horizon = timedelta(hours=the_horizon_hours) - return ( - select(TimedBelief) - .filter(TimedBelief.sensor_id == solar_device_1.id) - .filter(TimedBelief.belief_horizon == the_horizon) - .filter( - ( - TimedBelief.event_start - >= as_server_time( - datetime(2015, 1, 1, hour_start + the_horizon_hours) 
- ) - ) - & ( - TimedBelief.event_start - < as_server_time( - datetime(2015, 1, 1, hour_start + the_horizon_hours + 2) - ) - ) - ) - ) + refreshed_job = Job.fetch(job.id, connection=app.queues["forecasting"].connection) + assert refreshed_job.is_finished - # The successful (linear or naive) OLS leads to these. forecasts = fresh_db.session.scalars( - make_query(the_horizon_hours=horizon_hours) + select(TimedBelief).filter(TimedBelief.sensor_id == sensor.id) ).all() - - assert len(forecasts) == 8 - check_aggregate(8, horizon, solar_device_1.id) - - if model_to_start_with == "linear-OLS": - existing_data = fresh_db.session.scalars(make_query(the_horizon_hours=0)).all() - - for ed, fd in zip(existing_data, forecasts): - assert ed.event_value == fd.event_value - - # Now to check which models actually got to work. - # We check which data sources do and do not exist by now: - assert ( - get_data_source("failing-test model v1") is None - ) # the test failure model failed -> no data source - if model_to_start_with == "linear-OLS": - assert ( - get_data_source() is None - ) # the default (linear regression) (was made to) fail, as well - assert ( - get_data_source("naive model v1") is not None - ) # the naive one had to be used - else: - assert get_data_source() is not None # the default (linear regression) - assert ( - get_data_source("naive model v1") is None - ) # the naive one did not have to be used + assert forecasts diff --git a/flexmeasures/data/tests/test_job_cache.py b/flexmeasures/data/tests/test_job_cache.py index 14c464c399..d5688347f7 100644 --- a/flexmeasures/data/tests/test_job_cache.py +++ b/flexmeasures/data/tests/test_job_cache.py @@ -11,34 +11,35 @@ from rq.job import NoSuchJobError from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline from flexmeasures.data.services.job_cache import JobCache, NoRedisConfigured -from flexmeasures.data.services.forecasting import 
create_forecasting_jobs from flexmeasures.data.services.scheduling import create_scheduling_job from flexmeasures.utils.time_utils import as_server_time -def custom_model_params(): - """little training as we have little data, turn off transformations until they let this test run (TODO)""" - return dict( - training_and_testing_period=timedelta(hours=2), - outcome_var_transformation=None, - regressor_transformation={}, - ) - - def test_cache_on_create_forecasting_jobs(db, run_as_cli, app, setup_test_data): """Test we add job to cache on creating forecasting job + get job from cache""" wind_device_1: Sensor = setup_test_data["wind-asset-1"].sensors[0] - job = create_forecasting_jobs( - start_of_roll=as_server_time(datetime(2015, 1, 1, 6)), - end_of_roll=as_server_time(datetime(2015, 1, 1, 7)), - horizons=[timedelta(hours=1)], - sensor_id=wind_device_1.id, - custom_model_params=custom_model_params(), + pipeline = TrainPredictPipeline( + config={ + "train-start": "2015-01-01T00:00:00+00:00", + "retrain-frequency": "PT1H", + } + ) + pipeline_returns = pipeline.compute( + as_job=True, + parameters={ + "sensor": wind_device_1.id, + "start": as_server_time(datetime(2015, 1, 1, 6)).isoformat(), + "end": as_server_time(datetime(2015, 1, 1, 7)).isoformat(), + "max-forecast-horizon": "PT1H", + "forecast-frequency": "PT1H", + }, ) + job = app.queues["forecasting"].fetch_job(pipeline_returns["job_id"]) - assert app.job_cache.get(wind_device_1.id, "forecasting", "sensor") == [job[0]] + assert app.job_cache.get(wind_device_1.id, "forecasting", "sensor") == [job] def test_cache_on_create_scheduling_jobs(db, app, add_battery_assets, setup_test_data): diff --git a/flexmeasures/utils/time_utils.py b/flexmeasures/utils/time_utils.py index 0c9ba62cf0..382192c462 100644 --- a/flexmeasures/utils/time_utils.py +++ b/flexmeasures/utils/time_utils.py @@ -257,37 +257,6 @@ def get_first_day_of_next_month() -> datetime: return (datetime.now().replace(day=1) + timedelta(days=32)).replace(day=1) 
-def forecast_horizons_for(resolution: str | timedelta) -> list[str] | list[timedelta]: - """Return a list of horizons that are supported per resolution. - Return values or of the same type as the input.""" - if isinstance(resolution, timedelta): - resolution_str = timedelta_to_pandas_freq_str(resolution) - else: - resolution_str = resolution - horizons = [] - if resolution_str in ("5T", "5min", "10T", "10min"): - horizons = ["1h", "6h", "24h"] - elif resolution_str in ("15T", "15min", "1h", "H"): - horizons = ["1h", "6h", "24h", "48h"] - elif resolution_str in ("24h", "D"): - horizons = ["24h", "48h"] - elif resolution_str in ("168h", "7D"): - horizons = ["168h"] - if isinstance(resolution, timedelta): - return [pd.to_timedelta(to_offset(h)) for h in horizons] - else: - return horizons - - -def supported_horizons() -> list[timedelta]: - return [ - timedelta(hours=1), - timedelta(hours=6), - timedelta(hours=24), - timedelta(hours=48), - ] - - def timedelta_to_pandas_freq_str(resolution: timedelta) -> str: return to_offset(resolution).freqstr diff --git a/pyproject.toml b/pyproject.toml index 2b8e1e63d3..ac2b8b5aa0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,6 @@ dependencies = [ # pinned to <6.9 due to a HiGHS deadlock, see https://github.com/FlexMeasures/flexmeasures/issues/1443 "pyomo>=5.6,<6.9", "tabulate>=0.9.0", - "timetomodel>=0.7.3", # 3.5.2: fixed issue with resampling to instantaneous "timely-beliefs[forecast]>=3.5.4", "python-dotenv>=1.2.1", diff --git a/uv.lock b/uv.lock index ecc01545fd..bc9df5e649 100644 --- a/uv.lock +++ b/uv.lock @@ -1094,7 +1094,6 @@ dependencies = [ { name = "sqlalchemy" }, { name = "tabulate" }, { name = "timely-beliefs", extra = ["forecast"] }, - { name = "timetomodel" }, { name = "tldextract" }, { name = "u8darts" }, { name = "uniplot" }, @@ -1215,7 +1214,6 @@ requires-dist = [ { name = "sqlalchemy", specifier = ">=2.0" }, { name = "tabulate", specifier = ">=0.9.0" }, { name = "timely-beliefs", extras = 
["forecast"], specifier = ">=3.5.4" }, - { name = "timetomodel", specifier = ">=0.7.3" }, { name = "tldextract", specifier = ">=5.3.1" }, { name = "u8darts", specifier = ">=0.29.0" }, { name = "uniplot", specifier = ">=0.12.1" }, @@ -3729,27 +3727,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5d/2f/811d8183abae998effbbb1a191af2469a6c18bfcb4d3edfd4fe9c096c5f8/times-0.7-py2.py3-none-any.whl", hash = "sha256:71a47e488fb727fecbd2bc6b4e3559fd4425eb6e20c68b1418069325a4c58a37", size = 3826, upload-time = "2014-08-24T06:13:21.84Z" }, ] -[[package]] -name = "timetomodel" -version = "0.7.3" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "matplotlib" }, - { name = "numpy" }, - { name = "pandas" }, - { name = "python-dateutil" }, - { name = "pytz" }, - { name = "scikit-learn" }, - { name = "scipy", version = "1.15.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, - { name = "scipy", version = "1.17.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, - { name = "sqlalchemy" }, - { name = "statsmodels" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/9e/51/948aa9b4e8498924181eb3d17533eff0b43483d46ca9201b6c96d24b410c/timetomodel-0.7.3.tar.gz", hash = "sha256:dd57e20f1fea58240d8a93f38d0d9f598fc0600e88eeac5dceecbf34a750e08c", size = 141996, upload-time = "2023-06-07T13:37:05.398Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/65/2d/379105ee6650157b650c5a0da42ccab698fae50687d3ee481a2532bb1ba6/timetomodel-0.7.3-py2.py3-none-any.whl", hash = "sha256:031f0d2cd8d6320c9a8dfd35cf3d976bdc4f0d74f9322690d6ad74d05445ff05", size = 29820, upload-time = "2023-06-07T13:37:03.356Z" }, -] - [[package]] name = "tldextract" version = "5.3.1"