diff --git a/config/config.default.yaml b/config/config.default.yaml index 146429bbc1..0286a2813e 100644 --- a/config/config.default.yaml +++ b/config/config.default.yaml @@ -404,20 +404,28 @@ renewable: natura: true excluder_resolution: 100 clip_p_max_pu: 0.01 - hydro: - cutout: default - carriers: - - ror - - PHS - - hydro - PHS_max_hours: 6 - hydro_max_hours: energy_capacity_totals_by_country - flatten_dispatch: false - flatten_dispatch_buffer: 0.2 - clip_min_inflow: 1.0 - eia_norm_year: false - eia_correct_by_capacity: false - eia_approximate_missing: false + hydro: + cutout: default + GloFAS_ERA5: + methods: GloFAS #'GloFAS' for GloFAS-ERA5 methodology instead of ERA5 Runoff + q_min: 0.2 # qmin fraction with respect to qmax for RoR conversion + eff: 0.85 # mean efficiency for inflow conversion to MW + reservoir_subcluster_distance_threshold: 1.0 + reservoir_subcluster_seasonal_weight: 0.5 + reservoir_subcluster_inflow_level_weight: 1.75 + reservoir_subcluster_max_hours_weight: 0.75 + carriers: + - ror + - PHS + - hydro + PHS_max_hours: 6 + hydro_max_hours: energy_capacity_totals_by_country + flatten_dispatch: false + flatten_dispatch_buffer: 0.2 + clip_min_inflow: 1.0 + eia_norm_year: false + eia_correct_by_capacity: false + eia_approximate_missing: false # docs in https://pypsa-eur.readthedocs.io/en/latest/configuration.html#conventional conventional: @@ -1108,7 +1116,7 @@ clustering: hac_features: - wnd100m - influx_direct - exclude_carriers: [] + exclude_carriers: ["hydro"] #necessary for GloFAS-ERA5 methodology consider_efficiency_classes: false aggregation_strategies: generators: diff --git a/scripts/add_electricity.py b/scripts/add_electricity.py index 84fd226dfd..48f7dfe86f 100755 --- a/scripts/add_electricity.py +++ b/scripts/add_electricity.py @@ -59,6 +59,8 @@ import pypsa import xarray as xr from pypsa.clustering.spatial import DEFAULT_ONE_PORT_STRATEGIES, normed_or_uniform +from sklearn.cluster import AgglomerativeClustering +from 
sklearn.preprocessing import StandardScaler from scripts._helpers import ( PYPSA_V1, @@ -731,6 +733,484 @@ def attach_existing_batteries( logger.info(f"Added {len(batt)} existing battery storage units\n({stats} MW)") +def _fill_reservoir_missing_values(df: pd.DataFrame) -> pd.DataFrame: + # safety check deletable + + df = df.copy() + + df["damheight_m"] = pd.to_numeric(df["damheight_m"], errors="coerce") + df["volume_mm3"] = pd.to_numeric(df["volume_mm3"], errors="coerce") + df["p_nom"] = pd.to_numeric(df["p_nom"], errors="coerce") + + median_head_country = df.groupby("country")["damheight_m"].median() + median_vol_country = df.groupby("country")["volume_mm3"].median() + + median_head_global = df["damheight_m"].median(skipna=True) + median_vol_global = df["volume_mm3"].median(skipna=True) + + for country in df["country"].dropna().unique(): + mask_country = df["country"] == country + + mask_head_nan = mask_country & df["damheight_m"].isna() + head_value = median_head_country.get(country, median_head_global) + df.loc[mask_head_nan, "damheight_m"] = head_value + + mask_vol_nan = mask_country & df["volume_mm3"].isna() + vol_value = median_vol_country.get(country, median_vol_global) + df.loc[mask_vol_nan, "volume_mm3"] = vol_value + + df.loc[df["damheight_m"] <= 0, "damheight_m"] = 1.0 + df = df.loc[df["p_nom"].notna() & (df["p_nom"] > 0)].copy() + + return df + + +def _safe_corr(x: pd.Series, y: pd.Series) -> float: + x = pd.to_numeric(x, errors="coerce") + y = pd.to_numeric(y, errors="coerce") + + valid = x.notna() & y.notna() + x_valid = x.loc[valid] + y_valid = y.loc[valid] + + if len(x_valid) < 2: + return 0.0 + + x_std = x_valid.std() + y_std = y_valid.std() + + if pd.isna(x_std) or pd.isna(y_std) or x_std == 0 or y_std == 0: + return 0.0 + + return float(x_valid.corr(y_valid)) + + +def _build_hydro_clustering_features( + inflow_plants: pd.DataFrame, + plants: pd.DataFrame, + seasonal_weight: float = 1.0, + inflow_level_weight: float = 1.0, + max_hours_weight: 
float = 1.0, +) -> pd.DataFrame: + plants = plants.copy() + inflow_plants = inflow_plants.copy() + + inflow_plants = inflow_plants.loc[:, plants.index] + + # Metric A: seasonal shape similarity to mean bus shape + # Each plant inflow is divided by its own mean inflow, then correlation is computed against the average normalized inflow shape of the bus + plant_mean_inflow = inflow_plants.mean(axis=0).replace(0.0, np.nan) + inflow_norm = inflow_plants.divide(plant_mean_inflow, axis=1) + + bus_mean_shape = inflow_norm.mean(axis=1) + + A = pd.DataFrame(index=plants.index) + A["seasonal_corr"] = [ + _safe_corr(inflow_norm[col], bus_mean_shape) for col in plants.index + ] + + # Metric B: inflow level relative to nominal capacity + # mean(inflow) / p_nom [equivalent to sum(inflow) / (len(inflow) * p_nom)] + # --------------------------------------------------------- + n_steps = len(inflow_plants.index) + + inflow_level = (inflow_plants.sum(axis=0) / (n_steps * plants["p_nom"])).replace( + [np.inf, -np.inf], np.nan + ) + + B = pd.DataFrame(index=plants.index) + B["inflow_level"] = inflow_level + + # Metric C: max_hours aka the volume + C = pd.DataFrame(index=plants.index) + C["max_hours"] = pd.to_numeric(plants["max_hours_plant"], errors="coerce").replace( + [np.inf, -np.inf], np.nan + ) + + # Fill NaNs metric-wise + for df_block in [A, B, C]: + for col in df_block.columns: + if df_block[col].isna().all(): + df_block[col] = 0.0 + elif df_block[col].isna().any(): + df_block[col] = df_block[col].fillna(df_block[col].median(skipna=True)) + + # Standardize each metric separately + scaler_A = StandardScaler() + A_std = pd.DataFrame( + scaler_A.fit_transform(A), + index=A.index, + columns=A.columns, + ) + + scaler_B = StandardScaler() + B_std = pd.DataFrame( + scaler_B.fit_transform(B), + index=B.index, + columns=B.columns, + ) + + scaler_C = StandardScaler() + C_std = pd.DataFrame( + scaler_C.fit_transform(C), + index=C.index, + columns=C.columns, + ) + + # Apply weights + A_std 
*= seasonal_weight + B_std *= inflow_level_weight + C_std *= max_hours_weight + + X = pd.concat([A_std, B_std, C_std], axis=1) + + return X + + +def _cluster_reservoirs_within_bus( + bus_plants: pd.DataFrame, + inflow_bus: pd.DataFrame, + distance_threshold: float = 2.0, + seasonal_weight: float = 1.0, + inflow_level_weight: float = 1.0, + max_hours_weight: float = 1.0, +) -> pd.Series: + # function that creates reservoir clusters to manage their aggregation within the bus. + # when `distance_threshold = 0`, each reservoir remains separate; + # when `distance_threshold = inf`, all reservoirs within the same bus are aggregated together (no clusters). + bus_plants = bus_plants.copy() + + if len(bus_plants) == 1: + return pd.Series(0, index=bus_plants.index, name="subcluster") + + X = _build_hydro_clustering_features( + inflow_plants=inflow_bus, + plants=bus_plants, + seasonal_weight=seasonal_weight, + inflow_level_weight=inflow_level_weight, + max_hours_weight=max_hours_weight, + ) + + if len(bus_plants) == 2: + dist = np.linalg.norm(X.iloc[0].values - X.iloc[1].values) + if dist <= distance_threshold: + labels = np.array([0, 0], dtype=int) + else: + labels = np.array([0, 1], dtype=int) + else: + model = AgglomerativeClustering( + n_clusters=None, + distance_threshold=distance_threshold, + linkage="ward", + ) + labels = model.fit_predict(X.values) + + return pd.Series(labels, index=bus_plants.index, name="subcluster") + + +def _compute_reservoir_max_hours( + df: pd.DataFrame, + reservoir_energy_efficiency: float = 0.70, +) -> pd.DataFrame: + # function that converts volume into the max_hours parameter + df = df.copy() + + df["reservoir_mwh"] = ( + df["volume_mm3"] + * 1e6 + * df["damheight_m"] + * 9.81 + * reservoir_energy_efficiency + * 1e-3 + / 3600 + ) + + df["max_hours_plant"] = (df["reservoir_mwh"] / df["p_nom"]).replace( + [np.inf, -np.inf], np.nan + ) + + return df + + +def attach_hydro_GloFAS( + n: pypsa.Network, + costs: pd.DataFrame, + ppl: pd.DataFrame, + 
def attach_hydro_GloFAS(
    n: pypsa.Network,
    costs: pd.DataFrame,
    ppl: pd.DataFrame,
    profile_hydro: str,
    carriers: list,
    **params,
):
    """
    Attach hydro generators and storage units to the network using
    plant-level GloFAS/ERA5-based hydro profiles.

    Parameters
    ----------
    n : pypsa.Network
        The PyPSA network to attach the hydro units to.
    costs : pd.DataFrame
        DataFrame containing the cost data.
    ppl : pd.DataFrame
        DataFrame containing the power plant data.
    profile_hydro : str
        Path to the plant-level hydro profile dataset.
    carriers : list
        Hydro energy carriers to attach ("ror", "PHS", "hydro").
    **params :
        Additional parameters; the ``GloFAS_ERA5`` sub-dict must provide
        ``powerplants_csv`` (path to the raw plant table, formerly a
        hard-coded developer path) and may tune the reservoir
        sub-cluster distance threshold and metric weights.

    Raises
    ------
    ValueError
        If ``GloFAS_ERA5.powerplants_csv`` is not configured, if
        ``source_id`` is missing for the disaggregated workflow, or if
        no reservoir plant matches the inflow data.
    """
    add_missing_carriers(n, carriers)
    add_co2_emissions(n, costs, carriers)

    glofas_cfg = params.get("GloFAS_ERA5", {})

    ror_agg = ppl.query('carrier == "ror"').copy()
    phs_agg = ppl.query('carrier == "PHS"').copy()
    res_plants_all = ppl.query('carrier == "hydro"').copy()

    for df in (ror_agg, phs_agg, res_plants_all):
        df.columns = df.columns.str.lower()

    # Raw (pre-clustering) plant table — previously a hard-coded
    # /home/... path, now wired through the configuration.
    raw_ppl_csv = glofas_cfg.get("powerplants_csv")
    if raw_ppl_csv is None:
        raise ValueError(
            "renewable.hydro.GloFAS_ERA5.powerplants_csv must point to the "
            "raw powerplants table for the GloFAS hydro workflow."
        )

    ppls_raw = pd.read_csv(raw_ppl_csv, index_col=0)
    ppls_raw.index = ppls_raw.index.astype(str)
    ppls_raw.columns = ppls_raw.columns.str.strip()
    ppls_raw = ppls_raw.rename(columns={"Capacity": "p_nom"})

    ppls_raw["carrier"] = ppls_raw["Technology"].replace(
        {
            "Run-Of-River": "ror",
            "Reservoir": "hydro",
            "Pumped Storage": "PHS",
        }
    )
    ppls_raw = ppls_raw[ppls_raw["carrier"].isin(["ror", "hydro", "PHS"])].copy()

    # Aggregated key: "<bus> <carrier>"; the disaggregated key
    # additionally carries the plant id so plants the spatial clustering
    # kept separate can be matched one-to-one.
    ppls_raw["group_key_agg"] = (
        ppls_raw["bus"].astype(str) + " " + ppls_raw["carrier"].astype(str)
    )
    ppls_raw["group_key_disagg"] = (
        ppls_raw["group_key_agg"] + " " + ppls_raw.index.astype(str)
    )

    # Read the profile arrays eagerly so the file handle is closed again
    # (the previous open_dataset call leaked the handle).
    with xr.open_dataset(profile_hydro) as ds:
        p_max_pu_ror_plants = (
            ds["p_max_pu_ror"].to_pandas() if "p_max_pu_ror" in ds else None
        )
        inflow_res_plants = (
            ds["inflow_reservoir"].to_pandas() if "inflow_reservoir" in ds else None
        )

    # --- ROR: aggregate plant-level availability onto the network units
    p_max_pu_ror_agg = None

    if not ror_agg.empty and p_max_pu_ror_plants is not None:
        p_max_pu_ror_plants.columns = p_max_pu_ror_plants.columns.astype(str)

        common = ppls_raw.index.intersection(p_max_pu_ror_plants.columns)
        p_max_pu_ror_plants = p_max_pu_ror_plants[common]

        ror_plants = ppls_raw.loc[common]
        ror_plants = ror_plants[ror_plants["carrier"] == "ror"].copy()

        # Prefer the plant-resolved key when the network kept the plant
        # un-clustered; fall back to the bus-level key otherwise.
        ror_target_index = pd.Index(ror_agg.index.astype(str))
        ror_plants["group_key"] = np.where(
            ror_plants["group_key_disagg"].isin(ror_target_index),
            ror_plants["group_key_disagg"],
            ror_plants["group_key_agg"],
        )

        p_max_pu_ror_plants = p_max_pu_ror_plants[ror_plants.index]

        # Capacity-weighted average of the per-plant availability.
        inflow_equiv = p_max_pu_ror_plants.multiply(ror_plants["p_nom"], axis=1)
        inflow_equiv_agg = inflow_equiv.T.groupby(ror_plants["group_key"]).sum().T
        p_nom_ror_agg = ror_plants.groupby("group_key")["p_nom"].sum()

        p_max_pu_ror_agg = (
            inflow_equiv_agg.divide(p_nom_ror_agg, axis=1)
            .clip(upper=1.0)
            .reindex(columns=ror_agg.index)
        )

    # --- Reservoirs: custom sub-clustering inside each final bus
    res_agg = None
    inflow_res_agg = None

    if not res_plants_all.empty and inflow_res_plants is not None:
        if "source_id" not in res_plants_all.columns:
            raise ValueError(
                "source_id column missing in ppl for disaggregated hydro workflow."
            )

        inflow_res_plants.columns = inflow_res_plants.columns.astype(str)
        res_plants_all["source_id"] = res_plants_all["source_id"].astype(str)

        # Fill missing data and compute plant-level max_hours.
        res_plants_all = _fill_reservoir_missing_values(res_plants_all)
        res_plants_all = _compute_reservoir_max_hours(res_plants_all)

        # Keep only plants for which inflow exists.
        common_mask = res_plants_all["source_id"].isin(inflow_res_plants.columns)
        res_plants_use = res_plants_all.loc[common_mask].copy()

        if res_plants_use.empty:
            raise ValueError(
                "No hydro reservoir plants matched inflow_reservoir columns."
            )

        inflow_res_use = inflow_res_plants[res_plants_use["source_id"]].copy()
        inflow_res_use.columns = res_plants_use.index

        distance_threshold = glofas_cfg.get(
            "reservoir_subcluster_distance_threshold", 1.0
        )
        seasonal_weight = glofas_cfg.get("reservoir_subcluster_seasonal_weight", 0.5)
        inflow_level_weight = glofas_cfg.get(
            "reservoir_subcluster_inflow_level_weight", 1.75
        )
        max_hours_weight = glofas_cfg.get(
            "reservoir_subcluster_max_hours_weight", 0.75
        )

        # Cluster separately inside each final bus.
        subgroup_records = []
        for bus, bus_plants in res_plants_use.groupby("bus"):
            labels = _cluster_reservoirs_within_bus(
                bus_plants=bus_plants,
                inflow_bus=inflow_res_use[bus_plants.index],
                distance_threshold=distance_threshold,
                seasonal_weight=seasonal_weight,
                inflow_level_weight=inflow_level_weight,
                max_hours_weight=max_hours_weight,
            )

            bus_plants = bus_plants.copy()
            bus_plants["subcluster"] = labels
            bus_plants["group_key_final"] = [
                f"{bus} hydro sg{int(lbl)}" for lbl in labels
            ]
            subgroup_records.append(bus_plants)

        res_plants_grouped = pd.concat(subgroup_records, axis=0)
        by_group = res_plants_grouped.groupby("group_key_final")

        # Aggregate inflow and static parameters by subgroup.
        inflow_res_agg = (
            inflow_res_use.T.groupby(res_plants_grouped["group_key_final"]).sum().T
        )

        p_nom_agg = by_group["p_nom"].sum()
        reservoir_energy_agg = by_group["reservoir_mwh"].sum()

        hydro_max_hours = (
            (reservoir_energy_agg / p_nom_agg)
            .replace([np.inf, -np.inf], np.nan)
            .fillna(6.0)  # conservative default when energy/p_nom degenerates
        )

        # Capacity-weighted mean dam height of each subgroup.
        damheight_agg = (
            (res_plants_grouped["damheight_m"] * res_plants_grouped["p_nom"])
            .groupby(res_plants_grouped["group_key_final"])
            .sum()
            .divide(p_nom_agg)
        )

        res_agg = pd.DataFrame(
            {
                "bus": by_group["bus"].first(),
                "country": by_group["country"].first(),
                "p_nom": p_nom_agg,
                "damheight_m": damheight_agg,
                "volume_mm3": by_group["volume_mm3"].sum(),
                "max_hours": hydro_max_hours,
                "n_plants_subgroup": by_group.size(),
                "annual_inflow_mw_sum": inflow_res_agg.sum(axis=0),
            }
        )

        inflow_res_agg = inflow_res_agg.reindex(columns=res_agg.index)

    # --- Attach ROR
    if "ror" in carriers and not ror_agg.empty and p_max_pu_ror_agg is not None:
        n.add(
            "Generator",
            ror_agg.index,
            carrier="ror",
            bus=ror_agg["bus"],
            p_nom=ror_agg["p_nom"],
            efficiency=costs.at["ror", "efficiency"],
            capital_cost=costs.at["ror", "capital_cost"],
            weight=ror_agg["p_nom"],
            p_max_pu=p_max_pu_ror_agg,
        )

    # --- Attach reservoirs
    if "hydro" in carriers and res_agg is not None and not res_agg.empty:
        n.add(
            "StorageUnit",
            res_agg.index,
            carrier="hydro",
            bus=res_agg["bus"],
            p_nom=res_agg["p_nom"],
            max_hours=res_agg["max_hours"],
            capital_cost=costs.at["hydro", "capital_cost"],
            marginal_cost=costs.at["hydro", "marginal_cost"],
            p_max_pu=1.0,
            p_min_pu=0.0,
            efficiency_dispatch=costs.at["hydro", "efficiency"],
            efficiency_store=0.0,  # natural inflow only, no pumping
            cyclic_state_of_charge=True,
            inflow=inflow_res_agg.loc[:, res_agg.index],
        )

    # --- Attach PHS
    if "PHS" in carriers and not phs_agg.empty:
        max_hours_default = params.get("PHS_max_hours", 6)

        if "max_hours" in phs_agg:
            phs_agg["max_hours"] = phs_agg["max_hours"].replace(
                [0, np.nan], max_hours_default
            )
        else:
            phs_agg["max_hours"] = max_hours_default

        n.add(
            "StorageUnit",
            phs_agg.index,
            carrier="PHS",
            bus=phs_agg["bus"],
            p_nom=phs_agg["p_nom"],
            max_hours=phs_agg["max_hours"],
            capital_cost=costs.at["PHS", "capital_cost"],
            efficiency_store=np.sqrt(costs.at["PHS", "efficiency"]),
            efficiency_dispatch=np.sqrt(costs.at["PHS", "efficiency"]),
            cyclic_state_of_charge=True,
        )


# ---------------------------------------------------------------------
# Call site (lives in the script's main section; shown with its guard —
# the surrounding main-section statements are unchanged): dispatch
# between the GloFAS plant-level workflow and the default attach_hydro.
# ---------------------------------------------------------------------
if __name__ == "__main__":
    if "hydro" in renewable_carriers:
        p = params.renewable["hydro"].copy()

        # ``pop`` (not ``get``): ``carriers`` must not remain inside
        # ``p``; otherwise ``**p`` passes it a second time and both
        # attach functions raise ``TypeError: got multiple values for
        # argument 'carriers'``.
        carriers = p.pop("carriers", ["ror", "PHS", "hydro"])

        glofas_cfg = p.get("GloFAS_ERA5", {})
        # Explicit comparison, consistent with build_hydro_profile.py:
        # an arbitrary truthy value must not silently enable GloFAS.
        if glofas_cfg.get("methods", False) == "GloFAS":
            attach_hydro_GloFAS(
                n,
                costs,
                ppl,
                snakemake.input.profile_hydro,
                carriers,
                **p,
            )
        else:
            attach_hydro(
                n,
                costs,
                ppl,
                snakemake.input.profile_hydro,
                snakemake.input.hydro_capacities,
                carriers,
                **p,
            )
def add_qmax_turb(hydro_ppls, efficiency, q_min_factor=0.2):
    """Add maximum/minimum turbinable flow rates to each hydro plant.

    Missing technologies default to "Run-Of-River".  Missing dam heights
    are filled with the per-country median of the matching technology
    group (ROR vs. Reservoir/Pumped Storage), falling back to the
    group's global median.  (The previous ``median.get(country, global)``
    returned the country's *NaN* median when the key existed, so the
    global fallback never applied.)  Non-positive heads are clamped to
    1 m, then

        qmax_turb = p_nom / (9.81 * damheight_m * 1e-3 * efficiency)
        qmin_turb = q_min_factor * qmax_turb

    Parameters
    ----------
    hydro_ppls : pd.DataFrame
        Plant table with ``technology``, ``country``, ``damheight_m``
        and ``p_nom`` columns (modified in place and returned).
    efficiency : float
        Mean conversion efficiency used in the head equation.
    q_min_factor : float, default 0.2
        qmin fraction of qmax below which a ROR plant produces nothing.
    """
    hydro_ppls["technology"] = hydro_ppls["technology"].fillna("Run-Of-River")

    ror_mask = hydro_ppls["technology"] == "Run-Of-River"
    res_mask = hydro_ppls["technology"].isin(["Reservoir", "Pumped Storage"])

    # Snapshot of the NaN heads before any filling.
    mask_nan = hydro_ppls["damheight_m"].isna()

    for group_mask in (ror_mask, res_mask):
        grp = hydro_ppls.loc[group_mask]
        country_median = grp.groupby("country")["damheight_m"].median()
        global_median = grp["damheight_m"].median(skipna=True)

        fill = (
            hydro_ppls.loc[group_mask & mask_nan, "country"]
            .map(country_median)
            .fillna(global_median)
        )
        hydro_ppls.loc[group_mask & mask_nan, "damheight_m"] = fill

    # Replace zero or negative heads with a small non-zero value.
    hydro_ppls.loc[hydro_ppls["damheight_m"] <= 0, "damheight_m"] = 1.0

    # P[MW] = rho*g*Q*h*eta with rho = 1000 kg/m3 folded into the 1e-3.
    hydro_ppls["qmax_turb"] = hydro_ppls["p_nom"] / (
        9.81 * hydro_ppls["damheight_m"] * 1e-3 * efficiency
    )
    hydro_ppls["qmin_turb"] = q_min_factor * hydro_ppls["qmax_turb"]

    return hydro_ppls


def ror_conversion(ror_plants, inflow_m3s):
    """Convert ROR inflows (m3/s) into per-unit availability ``p_max_pu``.

    Piecewise linear: 0 at or below ``qmin_turb``, 1 at or above
    ``qmax_turb``, linear in between; a degenerate ``qmax == qmin`` band
    falls back to a unit denominator (as before).  Vectorised over the
    ``plant`` dimension instead of the previous per-plant Python loop.

    Returns
    -------
    pd.DataFrame
        time x plant availability in [0, 1].
    """
    plant_ids = ror_plants.index.astype(str).values
    inflow_sel = inflow_m3s.sel(plant=plant_ids).load()

    qmax = xr.DataArray(
        ror_plants["qmax_turb"].astype(float).values,
        dims=("plant",),
        coords={"plant": plant_ids},
    )
    qmin = xr.DataArray(
        ror_plants["qmin_turb"].astype(float).values,
        dims=("plant",),
        coords={"plant": plant_ids},
    )

    denom = (qmax - qmin).where(qmax != qmin, 1.0)

    p_max_pu = xr.where(
        inflow_sel <= qmin,
        0.0,
        xr.where(inflow_sel >= qmax, 1.0, (inflow_sel - qmin) / denom),
    )

    df = p_max_pu.to_dataframe(name="p_max_pu").reset_index()
    return df.pivot(index="time", columns="plant", values="p_max_pu")


def reservoir_conversion(res_plants, inflow_m3s, eff):
    """Convert reservoir inflows from m3/s to MW.

    P[MW] = Q * 9.81 * h * eta * 1e-3 (rho = 1000 kg/m3 folded into the
    kW->MW factor).  Vectorised over the ``plant`` dimension.

    Returns
    -------
    pd.DataFrame
        time x plant inflow in MW.
    """
    plant_ids = res_plants.index.astype(str).values
    inflow_sel = inflow_m3s.sel(plant=plant_ids).load()

    head = xr.DataArray(
        res_plants["damheight_m"].astype(float).values,
        dims=("plant",),
        coords={"plant": plant_ids},
    )

    inflow_mw = inflow_sel * 9.81 * head * eff * 1e-3

    return pd.DataFrame(
        inflow_mw.transpose("time", "plant").values,
        index=inflow_sel.coords["time"].values,
        columns=plant_ids,
    )


logger = logging.getLogger(__name__)

if __name__ == "__main__":
    # ... snakemake setup, cutout creation and EIA statistics
    # preparation are unchanged and precede this fragment ...

    if norm_year:
        eia_stats.loc[years_in_time] = eia_stats.loc[norm_year]
    elif missing_years.any():
        # Vectorised: assign the median row to all missing years at once.
        eia_stats.loc[missing_years] = eia_stats.median()

    # Hydro inflow calculation method
    config_hydro = snakemake.config["renewable"]["hydro"]
    GloFAS_ERA5 = config_hydro.get("GloFAS_ERA5", {})
    method = GloFAS_ERA5.get("methods", False)

    if method == "GloFAS":
        eff = GloFAS_ERA5.get("eff", 0.85)
        q_min = GloFAS_ERA5.get("q_min", 0.2)

        # Plant table and plant-level inflow: previously hard-coded
        # /home/... developer paths — now taken from the configuration.
        ppl_csv = GloFAS_ERA5.get("powerplants_csv")
        inflow_nc = GloFAS_ERA5.get("inflow_nc")
        if ppl_csv is None or inflow_nc is None:
            raise ValueError(
                "renewable.hydro.GloFAS_ERA5 must define 'powerplants_csv' "
                "and 'inflow_nc' for the GloFAS methodology."
            )

        # Load powerplants
        ppls = pd.read_csv(ppl_csv, index_col=0)
        ppls.columns = ppls.columns.str.strip().str.lower()
        ppls.index = ppls.index.astype(str)

        hydro_ppls = ppls[ppls["fueltype"].str.lower() == "hydro"].copy()
        hydro_ppls["technology"] = hydro_ppls["technology"].fillna("Run-Of-River")
        hydro_ppls = hydro_ppls.rename(columns={"capacity": "p_nom"})
        hydro_ppls = add_qmax_turb(hydro_ppls, eff, q_min)

        # Load inflow
        inflow_saber = xr.open_dataarray(inflow_nc)
        inflow_saber = inflow_saber.assign_coords(
            plant=inflow_saber.coords["plant"].astype(str)
        )

        if "clip_min_inflow" in params_hydro:
            inflow_saber = inflow_saber.where(
                inflow_saber > params_hydro["clip_min_inflow"], 0
            )

        inflow_index = pd.Index(inflow_saber.coords["plant"].values.astype(str))
        hydro_index = hydro_ppls.index.astype(str)

        common_index = inflow_index.intersection(hydro_index)
        logger.info("Common plants: %d", len(common_index))

        # Every hydro plant must have matching inflow data.
        missing_inflow = hydro_index.difference(inflow_index)
        if len(missing_inflow) > 0:
            raise ValueError(
                f"{len(missing_inflow)} hydro plants have no inflow data."
            )

        inflow_matched = inflow_saber.sel(plant=common_index)
        hydro_matched = hydro_ppls.loc[common_index]

        # Split technologies
        ror_plants = hydro_matched[hydro_matched["technology"] == "Run-Of-River"]
        reservoir_plants = hydro_matched[
            hydro_matched["technology"].isin(["Reservoir", "Pumped Storage"])
        ]

        outputs = {}

        if not ror_plants.empty:
            p_max_pu_ror = ror_conversion(
                ror_plants, inflow_matched.sel(plant=ror_plants.index)
            )
            outputs["p_max_pu_ror"] = xr.DataArray(
                p_max_pu_ror.values,
                dims=("time", "plant"),
                coords={
                    "time": p_max_pu_ror.index,
                    "plant": p_max_pu_ror.columns.values,
                },
            )

        if not reservoir_plants.empty:
            inflow_res_mw = reservoir_conversion(
                reservoir_plants,
                inflow_matched.sel(plant=reservoir_plants.index),
                eff=eff,
            )
            outputs["inflow_reservoir"] = xr.DataArray(
                inflow_res_mw.values,
                dims=("time", "plant"),
                coords={
                    "time": inflow_res_mw.index,
                    "plant": inflow_res_mw.columns.values,
                },
            )

        inflow = xr.Dataset(outputs)
        logger.info("Final dataset dims: %s", dict(inflow.dims))
    else:
        # Default PyPSA-Eur behaviour
        inflow = cutout.runoff(
            shapes=country_shapes,
            smooth=True,
            lower_threshold_quantile=True,
            normalize_using_yearly=eia_stats,
        )

        if full_years_available:
            inflow = inflow.sel(time=time)

        if "clip_min_inflow" in params_hydro:
            inflow = inflow.where(inflow > params_hydro["clip_min_inflow"], 0)

    inflow.to_netcdf(snakemake.output.profile)