Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions rules/collect.smk
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ rule process_costs:
),


# Collector rule: requests one profile file per non-hydro renewable carrier,
# expanded over all scenario wildcards and the configured run name.
rule create_renewable_profiles:
    input:
        expand(
            resources("profile_{clusters}_{tech}.nc"),
            # hydro profiles are produced by a different rule, so exclude it here
            tech=[
                tech
                for tech in config["electricity"]["renewable_carriers"]
                if tech != "hydro"
            ],
            **config["scenario"],
            run=config["run"]["name"],
        ),
    message:
        "Collecting renewable profiles."


rule cluster_networks:
message:
"Collecting clustered network files"
Expand Down
42 changes: 38 additions & 4 deletions scripts/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,35 +1024,69 @@ def rename_techs(label: str) -> str:
return label


def _get_netcdf_chunk_sizes(path: str) -> dict[str, int]:
    """
    Read on-disk chunk sizes from a netCDF file.

    Used to align dask chunks with the storage layout so reads hit whole
    HDF5 chunks instead of straddling them.

    Parameters
    ----------
    path : str
        Path to the netCDF file.

    Returns
    -------
    dict[str, int]
        Mapping of dimension name to on-disk chunk size. Empty if no
        variable with at least 3 dimensions is chunked.
    """
    # Imported lazily so modules that never call this helper do not need
    # netCDF4 installed.
    import netCDF4

    chunks: dict[str, int] = {}
    # Context manager guarantees the file handle is released even if
    # reading a variable's metadata raises.
    with netCDF4.Dataset(path) as nc:
        for v in nc.variables.values():
            chunking = v.chunking()
            # chunking() returns "contiguous" (a str) for unchunked
            # variables; skip those and low-dimensional variables such as
            # coordinate arrays.
            if not isinstance(chunking, list) or len(v.dimensions) < 3:
                continue
            # Record the chunk size of EVERY dimension of this variable,
            # keeping the first value seen per dimension. (Previously a
            # `break` recorded only one new dimension per variable, so a
            # file with a single 3-D variable captured just its first
            # dimension's chunk size.)
            for dim, size in zip(v.dimensions, chunking):
                chunks.setdefault(dim, size)
    return chunks


def load_cutout(
    cutout_files: str | list[str],
    time: None | pd.DatetimeIndex = None,
    chunks: dict | None = None,
) -> atlite.Cutout:
    """
    Load and optionally combine multiple cutout files.

    Reads chunk sizes from the netCDF file on disk to align dask chunks with
    the storage layout, loads data eagerly into memory, then re-chunks as dask
    arrays so downstream computation can use the threaded scheduler without
    HDF5 thread-safety issues.

    Parameters
    ----------
    cutout_files : str or list of str
        Path to a single cutout file or a list of paths to multiple cutout files.
        If a list is provided, the cutouts will be concatenated along the time dimension.
    time : pd.DatetimeIndex, optional
        If provided, select only the specified times from the cutout.
    chunks : dict, optional
        Dask chunk sizes for the returned cutout. If None, reads chunk sizes
        from the netCDF file.

    Returns
    -------
    atlite.Cutout
        Cutout with in-memory data re-chunked as dask arrays.

    Raises
    ------
    ValueError
        If an empty list of cutout files is given.
    TypeError
        If ``cutout_files`` is neither a str nor a list.
    """
    if isinstance(cutout_files, list) and not cutout_files:
        raise ValueError("cutout_files must not be an empty list.")

    first_file = cutout_files if isinstance(cutout_files, str) else cutout_files[0]
    if chunks is None:
        # Fall back to a conservative time-only chunking if the file stores
        # no chunk metadata (e.g. contiguous layout).
        chunks = _get_netcdf_chunk_sizes(first_file) or {"time": 100}

    if isinstance(cutout_files, str):
        cutout = atlite.Cutout(cutout_files, chunks=chunks)
    elif isinstance(cutout_files, list):
        cutout_da = [atlite.Cutout(c, chunks=chunks).data for c in cutout_files]
        # data_vars="minimal" avoids broadcasting static (time-invariant)
        # variables along the concatenation dimension.
        combined_data = xr.concat(cutout_da, dim="time", data_vars="minimal")
        cutout = atlite.Cutout(NamedTemporaryFile().name, data=combined_data)
    else:
        # Previously any other type (e.g. tuple, Path) fell through and
        # raised a confusing NameError below.
        raise TypeError(
            f"cutout_files must be str or list of str, got {type(cutout_files).__name__}"
        )

    if time is not None:
        cutout.data = cutout.data.sel(time=time)

    # Eagerly load into memory, then re-chunk: all subsequent dask tasks
    # operate on in-memory arrays, sidestepping HDF5 thread-safety issues
    # with the threaded scheduler.
    cutout.data = cutout.data.load().chunk(chunks)

    return cutout


Expand Down
6 changes: 2 additions & 4 deletions scripts/build_daily_heat_demand.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

import logging

import dask
import geopandas as gpd
import numpy as np
import xarray as xr
from dask.distributed import Client, LocalCluster

from scripts._helpers import (
configure_logging,
Expand All @@ -43,8 +43,7 @@
set_scenario_config(snakemake)

nprocesses = int(snakemake.threads)
cluster = LocalCluster(n_workers=nprocesses, threads_per_worker=1)
client = Client(cluster, asynchronous=True)
dask.config.set(scheduler="threads", num_workers=nprocesses)

cutout_name = snakemake.input.cutout

Expand All @@ -71,7 +70,6 @@
heat_demand = cutout.heat_demand(
matrix=M.T,
index=clustered_regions.index,
dask_kwargs=dict(scheduler=client),
show_progress=False,
).sel(time=daily)

Expand Down
9 changes: 3 additions & 6 deletions scripts/build_hac_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

import logging

import dask
import geopandas as gpd
from atlite.aggregate import aggregate_matrix
from dask.distributed import Client

from scripts._helpers import (
configure_logging,
Expand All @@ -31,10 +31,7 @@
params = snakemake.params
nprocesses = int(snakemake.threads)

if nprocesses > 1:
client = Client(n_workers=nprocesses, threads_per_worker=1)
else:
client = None
dask.config.set(scheduler="threads", num_workers=nprocesses)

time = get_snapshots(params.snapshots, params.drop_leap_day)

Expand All @@ -47,6 +44,6 @@
aggregate_matrix, matrix=I, index=regions.index
)

ds = ds.load(scheduler=client)
ds = ds.load()

ds.to_netcdf(snakemake.output[0])
12 changes: 4 additions & 8 deletions scripts/build_line_rating.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
import re

import atlite
import dask
import geopandas as gpd
import numpy as np
import pypsa
import xarray as xr
from dask.distributed import Client
from shapely.geometry import LineString as Line
from shapely.geometry import Point

Expand Down Expand Up @@ -73,7 +73,7 @@ def calculate_line_rating(
n: pypsa.Network,
cutout: atlite.Cutout,
show_progress: bool = True,
dask_kwargs: dict = None,
dask_kwargs: dict | None = None,
) -> xr.DataArray:
"""
Calculates the maximal allowed power flow in each line for each time step
Expand Down Expand Up @@ -144,16 +144,12 @@ def calculate_line_rating(
nprocesses = int(snakemake.threads)
show_progress = not snakemake.config["run"].get("disable_progressbar", True)
show_progress = show_progress and snakemake.config["atlite"]["show_progress"]
if nprocesses > 1:
client = Client(n_workers=nprocesses, threads_per_worker=1)
else:
client = None
dask_kwargs = {"scheduler": client}
dask.config.set(scheduler="threads", num_workers=nprocesses)

n = pypsa.Network(snakemake.input.base_network)
time = get_snapshots(snakemake.params.snapshots, snakemake.params.drop_leap_day)

cutout = load_cutout(snakemake.input.cutout, time=time)

da = calculate_line_rating(n, cutout, show_progress, dask_kwargs)
da = calculate_line_rating(n, cutout, show_progress)
da.to_netcdf(snakemake.output[0])
9 changes: 2 additions & 7 deletions scripts/build_renewable_profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@
import time
from itertools import product

import dask
import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
from atlite.gis import ExclusionContainer
from dask.distributed import Client

from scripts._helpers import (
configure_logging,
Expand Down Expand Up @@ -140,10 +140,7 @@
if correction_factor != 1.0:
logger.info(f"correction_factor is set as {correction_factor}")

if nprocesses > 1:
client = Client(n_workers=nprocesses, threads_per_worker=1)
else:
client = None
dask.config.set(scheduler="threads", num_workers=nprocesses)

sns = get_snapshots(snakemake.params.snapshots, snakemake.params.drop_leap_day)

Expand Down Expand Up @@ -173,8 +170,6 @@
)

func = getattr(cutout, resource.pop("method"))
if client is not None:
resource["dask_kwargs"] = {"scheduler": client}

logger.info(
f"Calculate average capacity factor per grid cell for technology {technology}..."
Expand Down
6 changes: 2 additions & 4 deletions scripts/build_solar_thermal_profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

import logging

import dask
import geopandas as gpd
import numpy as np
import xarray as xr
from dask.distributed import Client, LocalCluster

from scripts._helpers import (
configure_logging,
Expand All @@ -36,8 +36,7 @@
set_scenario_config(snakemake)

nprocesses = int(snakemake.threads)
cluster = LocalCluster(n_workers=nprocesses, threads_per_worker=1)
client = Client(cluster, asynchronous=True)
dask.config.set(scheduler="threads", num_workers=nprocesses)

config = snakemake.params.solar_thermal
config.pop("cutout", None)
Expand Down Expand Up @@ -65,7 +64,6 @@
**config,
matrix=M_tilde.T,
index=clustered_regions.index,
dask_kwargs=dict(scheduler=client),
show_progress=False,
)

Expand Down
7 changes: 2 additions & 5 deletions scripts/build_temperature_profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

import logging

import dask
import geopandas as gpd
import numpy as np
import xarray as xr
from dask.distributed import Client, LocalCluster

from scripts._helpers import (
configure_logging,
Expand All @@ -41,8 +41,7 @@
set_scenario_config(snakemake)

nprocesses = int(snakemake.threads)
cluster = LocalCluster(n_workers=nprocesses, threads_per_worker=1)
client = Client(cluster, asynchronous=True)
dask.config.set(scheduler="threads", num_workers=nprocesses)

time = get_snapshots(snakemake.params.snapshots, snakemake.params.drop_leap_day)

Expand All @@ -66,7 +65,6 @@
temp_air = cutout.temperature(
matrix=M_tilde.T,
index=clustered_regions.index,
dask_kwargs=dict(scheduler=client),
show_progress=False,
)

Expand All @@ -75,7 +73,6 @@
temp_soil = cutout.soil_temperature(
matrix=M_tilde.T,
index=clustered_regions.index,
dask_kwargs=dict(scheduler=client),
show_progress=False,
)

Expand Down
Loading