Closed
Labels
bug: Potential issues with the zarr-python library
Zarr version
2.16.1
Numcodecs version
0.12.1
Python Version
3.12.1
Operating System
Linux
Installation
using poetry in a virtual environment
Description
I am trying to write a Zarr dataset from NetCDF files. To limit memory usage, I first create daily Zarr directories from hourly NetCDF files using xarray, and then combine the daily stores into one monthly Zarr directory. Finally, I want to write the monthly store to Azure Blob Storage. However, when combining the daily Zarr stores into the monthly one, the process is killed with no stack trace, presumably because it runs out of memory. If I create a smaller final store, for example by combining only two daily stores, it works fine. I am using xarray version 2024.2.0.
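Distilled from the full script below, this is the single call that fails (the path here is a placeholder; the daily stores are created by the reproduction code):

import xarray as xr
from pathlib import Path

daily_zarr_dir = Path("data/tmp_zarr_per_day")  # ~31 daily Zarr stores for one month
# This call is killed with no stack trace, presumably by the OOM killer:
ds = xr.open_mfdataset(daily_zarr_dir.glob("*"), engine="zarr", combine="by_coords")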
Steps to reproduce
from datetime import datetime
from pathlib import Path
import pandas as pd
import xarray as xr
import zarr
from loguru import logger
#from met_data_to_horizon.horizon import (
# get_blob_service_client,
# get_container_client,
#)
weather_parameters_to_keep = [
    "air_temperature_2m",
    "air_pressure_at_sea_level",
    "altitude",
    "cloud_area_fraction",
    "precipitation_amount",
    "relative_humidity_2m",
    "wind_direction_10m",
    "wind_speed_10m",
]
# https://esipfed.github.io/cloud-computing-cluster/optimization-practices.html#chunk-size
# zarr tutorial recommends at least 1 MB
# dask recommends ~100 MB
# cloud usage - on the order of 10-100 MB
CHUNK_SIZES = {"time": 31 * 24, "x": 10, "y": 10}
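# Rough arithmetic for the chunk size above (an added note; assumes float32 data):
# (31 * 24) * 10 * 10 values * 4 bytes ≈ 0.3 MB uncompressed per chunk per variable,
# i.e. just below the ~1 MB lower bound recommended by the zarr tutorial.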

def create_thredds_filename(date: pd.Timestamp) -> str:
    return f"https://thredds.met.no/thredds/dodsC/metpparchive/{date.year}/{date.month:02d}/{date.day:02d}/met_analysis_1_0km_nordic_{date.strftime('%Y%m%dT%H')}Z.nc"
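# For illustration, create_thredds_filename(pd.Timestamp("2023-12-01T00:00")) returns
# "https://thredds.met.no/thredds/dodsC/metpparchive/2023/12/01/met_analysis_1_0km_nordic_20231201T00Z.nc"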

def read_monthly_netcdf_and_write_zarr_to_processed(year_and_month_folder: str) -> None:
    start_date_month = pd.to_datetime(f"{year_and_month_folder}", format="%Y%m")
    days_in_month = pd.date_range(
        start=start_date_month,
        end=start_date_month + pd.tseries.offsets.MonthEnd(0) + pd.to_timedelta("23h"),
        freq="D",
    )
    zarr_filename_in_horizon = (
        f"met_analysis_1_0km_nordic_{start_date_month.year}_{start_date_month.month:02d}"
    )
    data_dir = Path(__file__).parent / "data"
    local_nc_file_dir = data_dir / "tmp_nc"
    daily_zarr_dir = data_dir / "tmp_zarr_per_day"
    local_nc_file_dir.mkdir(parents=True, exist_ok=True)
    daily_zarr_dir.mkdir(parents=True, exist_ok=True)
    for day in days_in_month:
        day_str = f"{day.day:02d}"
        day_nc_path = local_nc_file_dir / day_str
        download_nc_files_per_day(day, day_nc_path)
        write_daily_zarr_files_to_local_folder(
            daily_zarr_dir, day_nc_path, day_str, zarr_filename_in_horizon
        )
        # shutil.rmtree(day_nc_path)
    write_monthly_zarr_to_horizon_processed(daily_zarr_dir, zarr_filename_in_horizon)
    # shutil.rmtree(daily_zarr_dir)

def download_nc_files_per_day(day: datetime, nc_path_per_day: Path) -> None:
    hours_in_day = pd.date_range(start=day, end=day + pd.Timedelta("23h"), freq="h")
    nc_path_per_day.mkdir(parents=True, exist_ok=True)
    for timestamp in hours_in_day:
        filename = create_thredds_filename(timestamp)
        local_file_path = nc_path_per_day / Path(filename).name
        if not local_file_path.exists():
            # In my use case, a separate Python service downloads the netcdf files to
            # our Azure data platform, so this step would normally read from Azure
            # storage; for a reproducible example, we download the netcdf files directly.
            xr.open_dataset(filename, engine="netcdf4").to_netcdf(local_file_path)
            logger.info(f"Downloaded {local_file_path.name}")
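# Note (added): the thredds.met.no "dodsC" URLs are OPeNDAP endpoints, so the
# xr.open_dataset(filename, engine="netcdf4") call above streams the data over
# the network before to_netcdf() writes a local copy.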

def write_daily_zarr_files_to_local_folder(
    daily_zarr_dir: Path,
    day_nc_path: Path,
    day_str: str,
    zarr_filename_in_horizon: str,
    chunk_sizes: dict | None = None,
) -> None:
    if chunk_sizes is None:
        chunk_sizes = CHUNK_SIZES
    day_zarr_folder = daily_zarr_dir / f"{zarr_filename_in_horizon}_{day_str}"
    if not day_zarr_folder.exists():
        ds = xr.open_mfdataset(day_nc_path.glob("*.nc"), engine="netcdf4", combine="by_coords")[
            weather_parameters_to_keep
        ]
        logger.info("read dataset")
        ds = ds.chunk(chunk_sizes)
        logger.info("finished chunking")
        ds.to_zarr(day_zarr_folder)
        logger.info(f"created {day_str} zarr")

def write_monthly_zarr_to_horizon_processed(
    daily_zarr_dir: Path,
    zarr_filename_in_horizon: str,
    chunk_sizes: dict | None = None,
) -> None:
    if chunk_sizes is None:
        chunk_sizes = CHUNK_SIZES
    logger.info("Combining daily files to monthly..")
    # The line below is where the process gets killed
    ds = xr.open_mfdataset(daily_zarr_dir.glob("*"), engine="zarr", combine="by_coords")
    logger.info("Done combining")
    logger.info("Chunking data..")
    ds = ds.chunk(chunk_sizes)
    logger.info("Done chunking")
    logger.info("Writing monthly data to horizon...")
    # This part writes to our private cloud storage
    # store = zarr.ABSStore(
    #     client=get_container_client(get_blob_service_client(layer="processed")),
    #     prefix=f"zarr/chunk_size_10/{zarr_filename_in_horizon}.zarr",
    # )
    # try:
    #     ds.to_zarr(store)
    # except Exception as e:
    #     logger.info(f"Could not write monthly data to Horizon, error was {e}")
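
# Hedged workaround sketch (not part of the original report; the function name is
# illustrative): instead of opening all daily stores at once with open_mfdataset,
# append each daily store to the monthly store via to_zarr(append_dim="time"),
# so that only one day's data is open at a time.
def append_daily_zarrs_to_monthly(daily_zarr_dir: Path, monthly_zarr_path: Path) -> None:
    for i, day_store in enumerate(sorted(daily_zarr_dir.glob("*"))):
        ds = xr.open_zarr(day_store)
        if i == 0:
            ds.to_zarr(monthly_zarr_path, mode="w")  # create the store from the first day
        else:
            ds.to_zarr(monthly_zarr_path, append_dim="time")  # extend along the time dimension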

if __name__ == "__main__":
    year_and_month_folder = "202312"
    read_monthly_netcdf_and_write_zarr_to_processed(year_and_month_folder)

Additional output
No response