Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ electricity:
# docs in https://pypsa-eur.readthedocs.io/en/latest/configuration.html#atlite
atlite:
default_cutout: "europe-2013-sarah3-era5"
nprocesses: 16
nprocesses: 1
show_progress: false
plot_availability_matrix: false
cutouts:
Expand Down
4 changes: 2 additions & 2 deletions config/schema.default.json
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@
"description": "Defines a default cutout. Can refer to a single cutout or a list of cutouts."
},
"nprocesses": {
"default": 16,
"default": 1,
"description": "Number of parallel processes in cutout preparation.",
"type": "integer"
},
Expand Down Expand Up @@ -8787,7 +8787,7 @@
"description": "Defines a default cutout. Can refer to a single cutout or a list of cutouts."
},
"nprocesses": {
"default": 16,
"default": 1,
"description": "Number of parallel processes in cutout preparation.",
"type": "integer"
},
Expand Down
2 changes: 2 additions & 0 deletions doc/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Release Notes

* feat: Add options for carrier specific load shedding and load sinks configurable via `load_shedding` and `load_sinks` respectively (https://github.com/PyPSA/pypsa-eur/pull/2105).

* perf: Optimize dask settings for computing weather-dependent profiles (https://github.com/PyPSA/pypsa-eur/pull/2137).

PyPSA-Eur v2026.02.0 (18th February 2026)
=========================================

Expand Down
22 changes: 19 additions & 3 deletions scripts/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: MIT

import atexit
import contextlib
import copy
import logging
Expand All @@ -12,6 +13,7 @@
from functools import partial, wraps
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Literal

import atlite
import fiona
Expand All @@ -21,6 +23,7 @@
import requests
import xarray as xr
import yaml
from dask.distributed import Client, LocalCluster
from snakemake.utils import update_config
from tqdm import tqdm

Expand Down Expand Up @@ -1025,7 +1028,9 @@ def rename_techs(label: str) -> str:


def load_cutout(
cutout_files: str | list[str], time: None | pd.DatetimeIndex = None
cutout_files: str | list[str],
time: None | pd.DatetimeIndex = None,
chunks: Literal["auto"] | dict | None = "auto",
) -> atlite.Cutout:
"""
Load and optionally combine multiple cutout files.
Expand All @@ -1044,9 +1049,9 @@ def load_cutout(
Merged cutout with optional time selection applied.
"""
if isinstance(cutout_files, str):
cutout = atlite.Cutout(cutout_files)
cutout = atlite.Cutout(cutout_files, chunks=chunks)
elif isinstance(cutout_files, list):
cutout_da = [atlite.Cutout(c).data for c in cutout_files]
cutout_da = [atlite.Cutout(c, chunks=chunks).data for c in cutout_files]
combined_data = xr.concat(cutout_da, dim="time", data_vars="minimal")
cutout = atlite.Cutout(NamedTemporaryFile().name, data=combined_data)

Expand All @@ -1056,6 +1061,17 @@ def load_cutout(
return cutout


def setup_dask(nprocesses: int) -> dict:
    """
    Build the ``dask_kwargs`` dictionary for parallel atlite/xarray computations.

    Parameters
    ----------
    nprocesses : int
        Requested number of parallel processes. If greater than 1, a local
        dask cluster with one single-threaded worker per process is started;
        otherwise no cluster is created.

    Returns
    -------
    dict
        A dictionary with a single ``"scheduler"`` key, holding either the
        distributed ``Client`` or ``None``. Intended to be passed as
        ``dask_kwargs=...`` to atlite conversion functions or unpacked into
        ``Dataset.load(**dask_kwargs)`` (``scheduler=None`` lets dask fall
        back to its default scheduler).
    """
    if nprocesses > 1:
        # One single-threaded worker per process, as requested by the caller.
        # NOTE(review): the client is created with asynchronous=True but the
        # callers pass it as a plain scheduler via dask_kwargs — confirm this
        # flag is intended.
        cluster = LocalCluster(n_workers=nprocesses, threads_per_worker=1)
        client = Client(cluster, asynchronous=True)
        # Register shutdown so the cluster is torn down at interpreter exit.
        atexit.register(client.shutdown)
    else:
        client = None

    return dict(scheduler=client)


def load_costs(cost_file: str) -> pd.DataFrame:
"""
Load prepared cost data from CSV.
Expand Down
7 changes: 3 additions & 4 deletions scripts/build_daily_heat_demand.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import geopandas as gpd
import numpy as np
import xarray as xr
from dask.distributed import Client, LocalCluster

from scripts._helpers import (
configure_logging,
get_snapshots,
load_cutout,
set_scenario_config,
setup_dask,
)

logger = logging.getLogger(__name__)
Expand All @@ -43,8 +43,7 @@
set_scenario_config(snakemake)

nprocesses = int(snakemake.threads)
cluster = LocalCluster(n_workers=nprocesses, threads_per_worker=1)
client = Client(cluster, asynchronous=True)
dask_kwargs = setup_dask(nprocesses)

cutout_name = snakemake.input.cutout

Expand All @@ -71,7 +70,7 @@
heat_demand = cutout.heat_demand(
matrix=M.T,
index=clustered_regions.index,
dask_kwargs=dict(scheduler=client),
dask_kwargs=dask_kwargs,
show_progress=False,
).sel(time=daily)

Expand Down
9 changes: 3 additions & 6 deletions scripts/build_hac_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@

import geopandas as gpd
from atlite.aggregate import aggregate_matrix
from dask.distributed import Client

from scripts._helpers import (
configure_logging,
get_snapshots,
load_cutout,
set_scenario_config,
setup_dask,
)

logger = logging.getLogger(__name__)
Expand All @@ -31,10 +31,7 @@
params = snakemake.params
nprocesses = int(snakemake.threads)

if nprocesses > 1:
client = Client(n_workers=nprocesses, threads_per_worker=1)
else:
client = None
dask_kwargs = setup_dask(nprocesses)

time = get_snapshots(params.snapshots, params.drop_leap_day)

Expand All @@ -47,6 +44,6 @@
aggregate_matrix, matrix=I, index=regions.index
)

ds = ds.load(scheduler=client)
ds = ds.load(**dask_kwargs)

ds.to_netcdf(snakemake.output[0])
8 changes: 2 additions & 6 deletions scripts/build_line_rating.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import numpy as np
import pypsa
import xarray as xr
from dask.distributed import Client
from shapely.geometry import LineString as Line
from shapely.geometry import Point

Expand All @@ -42,6 +41,7 @@
get_snapshots,
load_cutout,
set_scenario_config,
setup_dask,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -144,11 +144,7 @@ def calculate_line_rating(
nprocesses = int(snakemake.threads)
show_progress = not snakemake.config["run"].get("disable_progressbar", True)
show_progress = show_progress and snakemake.config["atlite"]["show_progress"]
if nprocesses > 1:
client = Client(n_workers=nprocesses, threads_per_worker=1)
else:
client = None
dask_kwargs = {"scheduler": client}
dask_kwargs = setup_dask(nprocesses)

n = pypsa.Network(snakemake.input.base_network)
time = get_snapshots(snakemake.params.snapshots, snakemake.params.drop_leap_day)
Expand Down
16 changes: 8 additions & 8 deletions scripts/build_renewable_profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@
import pandas as pd
import xarray as xr
from atlite.gis import ExclusionContainer
from dask.distributed import Client

from scripts._helpers import (
configure_logging,
get_snapshots,
load_cutout,
set_scenario_config,
setup_dask,
)
from scripts.build_shapes import _simplify_polys

Expand Down Expand Up @@ -140,10 +140,7 @@
if correction_factor != 1.0:
logger.info(f"correction_factor is set as {correction_factor}")

if nprocesses > 1:
client = Client(n_workers=nprocesses, threads_per_worker=1)
else:
client = None
dask_kwargs = setup_dask(nprocesses)

sns = get_snapshots(snakemake.params.snapshots, snakemake.params.drop_leap_day)

Expand Down Expand Up @@ -173,15 +170,17 @@
)

func = getattr(cutout, resource.pop("method"))
if client is not None:
resource["dask_kwargs"] = {"scheduler": client}

logger.info(
f"Calculate average capacity factor per grid cell for technology {technology}..."
)
start = time.time()

capacity_factor = correction_factor * func(capacity_factor=True, **resource)
capacity_factor = correction_factor * func(
capacity_factor=True,
dask_kwargs=dask_kwargs,
**resource,
)

duration = time.time() - start
logger.info(
Expand Down Expand Up @@ -265,6 +264,7 @@
index=matrix.indexes["bus_bin"],
per_unit=True,
return_capacity=False,
dask_kwargs=dask_kwargs,
**resource,
)
profile = profile.unstack("bus_bin")
Expand Down
7 changes: 3 additions & 4 deletions scripts/build_solar_thermal_profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import geopandas as gpd
import numpy as np
import xarray as xr
from dask.distributed import Client, LocalCluster

from scripts._helpers import (
configure_logging,
get_snapshots,
load_cutout,
set_scenario_config,
setup_dask,
)

logger = logging.getLogger(__name__)
Expand All @@ -36,8 +36,7 @@
set_scenario_config(snakemake)

nprocesses = int(snakemake.threads)
cluster = LocalCluster(n_workers=nprocesses, threads_per_worker=1)
client = Client(cluster, asynchronous=True)
dask_kwargs = setup_dask(nprocesses)

config = snakemake.params.solar_thermal
config.pop("cutout", None)
Expand Down Expand Up @@ -65,7 +64,7 @@
**config,
matrix=M_tilde.T,
index=clustered_regions.index,
dask_kwargs=dict(scheduler=client),
dask_kwargs=dask_kwargs,
show_progress=False,
)

Expand Down
9 changes: 4 additions & 5 deletions scripts/build_temperature_profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import geopandas as gpd
import numpy as np
import xarray as xr
from dask.distributed import Client, LocalCluster

from scripts._helpers import (
configure_logging,
get_snapshots,
load_cutout,
set_scenario_config,
setup_dask,
)

logger = logging.getLogger(__name__)
Expand All @@ -41,8 +41,7 @@
set_scenario_config(snakemake)

nprocesses = int(snakemake.threads)
cluster = LocalCluster(n_workers=nprocesses, threads_per_worker=1)
client = Client(cluster, asynchronous=True)
dask_kwargs = setup_dask(nprocesses)

time = get_snapshots(snakemake.params.snapshots, snakemake.params.drop_leap_day)

Expand All @@ -66,7 +65,7 @@
temp_air = cutout.temperature(
matrix=M_tilde.T,
index=clustered_regions.index,
dask_kwargs=dict(scheduler=client),
dask_kwargs=dask_kwargs,
show_progress=False,
)

Expand All @@ -75,7 +74,7 @@
temp_soil = cutout.soil_temperature(
matrix=M_tilde.T,
index=clustered_regions.index,
dask_kwargs=dict(scheduler=client),
dask_kwargs=dask_kwargs,
show_progress=False,
)

Expand Down
2 changes: 1 addition & 1 deletion scripts/lib/validation/config/atlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class AtliteConfig(BaseModel):
description="Defines a default cutout. Can refer to a single cutout or a list of cutouts.",
)
nprocesses: int = Field(
16,
1,
description="Number of parallel processes in cutout preparation.",
)
show_progress: bool = Field(
Expand Down
Loading