diff --git a/xbout/__init__.py b/xbout/__init__.py index 9511b93d..b94242c6 100644 --- a/xbout/__init__.py +++ b/xbout/__init__.py @@ -1,4 +1,5 @@ from .load import open_boutdataset, collect +from .lazyload import lazy_open_boutdataset from . import geometries from .geometries import register_geometry, REGISTERED_GEOMETRIES @@ -22,6 +23,7 @@ __all__ = [ "open_boutdataset", + "lazy_open_boutdataset", "collect", "geometries", "register_geometry", diff --git a/xbout/lazyload.py b/xbout/lazyload.py new file mode 100644 index 00000000..aeb59940 --- /dev/null +++ b/xbout/lazyload.py @@ -0,0 +1,368 @@ +""" +Fast lazy-loading of BOUT++ multi-file datasets. + +Overview +-------- +BOUT++ writes output distributed across many NetCDF files, one per processor. +The standard xarray.open_mfdataset opens every file to read metadata before +constructing the dataset, which is slow when there are hundreds of files on a +parallel filesystem where each file open incurs a metadata server round-trip. + +This module avoids that overhead by reading metadata from a single file and +constructing a lazy dask-backed xarray Dataset without opening any other files. +Data is only read from disk when explicitly requested via .compute() or .load(). + +Method +------ +All BOUT++ output files share the same array shapes and metadata. The processor +layout (NXPE x NYPE grid) and array dimensions are read from the first file +(BOUT.dmp.0.nc). From this, the slice of the global array stored in each +processor's file is determined, accounting for MXG/MYG guard cells. + +A dask task graph is constructed as a Python dict, with one entry per processor +file per variable. Each task uses dask's internal getter() function, which +calls __getitem__ on a LazyFileArray object. LazyFileArray defers all file I/O +to h5py, reading only the requested hyperslab when the task is executed. + +Slice fusion +------------ +Dask's _optimize_slices pass (dask/array/optimization.py) recognizes chained +getter() tasks and fuses them into a single getter() call with composed slices. +Each variable's task graph uses two chained tasks per file: + + lazy task: LazyFileArray object + slice task: (getter, lazy_task_key, boundary_slices, False, False) + +The boundary slice removes MXG/MYG guard cells from the raw file data as needed, +depending on keep_xboundaries and keep_yboundaries. +When the user slices the resulting DataArray, dask fuses the user slice with +the boundary slice into a single composed slice, which is passed through +getter() to LazyFileArray.__getitem__() and then directly to h5py as an HDF5 +hyperslab selection. This means only the requested bytes are read from disk, +with a single file open per chunk per compute() call. + +Usage +----- + ds = lazy_open_boutdataset('/path/to/BOUT/dmp/files/') + ds = lazy_open_boutdataset('/path/to/files/', keep_xboundaries=True) + + # Data is not read until here: + data = ds['Ne'].isel(t=slice(10, 20)).compute() +""" + +import xarray as xr +import dask +from dask.array.core import getter +import os +import h5py +import warnings + + +class LazyFileArray: + """Presents a numpy-like interface that defers reads to HDF5 hyperslabs.""" + + def __init__(self, filepath, varname: str, shape, dtype, info: bool = False): + """ + filepath: str or Path + Full path to the file + varname: str + Name of the array to read + shape: tuple + The shape of the array in the file + dtype + Type of the elements in the array (e.g. numpy.float64) + info: bool + Print debugging information on read? + """ + self.filepath = filepath + self.varname = varname + self.shape = shape + self.dtype = dtype + self.ndim = len(shape) + self.info = info + + def __getitem__(self, slices): + """ + Read data hyperslice from file. + No cacheing is performed, so repeated access + will open and read the file again. + + Uses h5py to access data because it has lower overhead + when opening and closing files than NetCDF. + """ + if self.info: + print(f"Reading {self.filepath}:{self.varname}:{slices}") + with h5py.File(self.filepath, "r") as f: + return f[self.varname][slices] + + +def make_chunkinfo( + metadata: dict, keep_xboundaries: bool = True, keep_yboundaries: bool = True +): + """ + Identify processor layout and the array slices to be extracted + from each file. Handles single and double-null configurations. + + metadata: dict + Dictionary of scalars read from the first processor's file + keep_xboundaries: bool + Keep the MXG cells on inner and outer X boundaries + keep_yboundaries: bool + Keep MYG cells on all Y boundaries (upper and lower targets if DN) + + Returns a dict that is used in make_lazy_array. + """ + NXPE = metadata["NXPE"] + NYPE = metadata["NYPE"] + + MXG = metadata["MXG"] + MYG = metadata["MYG"] + + MXSUB = metadata["MXSUB"] + MYSUB = metadata["MYSUB"] + + # Double null if it has upper legs + is_double_null = metadata["jyseps2_1"] != metadata["jyseps1_2"] + + # Number of processors before the upper targets + nyproc_inner = metadata["ny_inner"] // MYSUB + + # Size of the (x,y) array in each file + nxsub = MXSUB + 2 * MXG + nysub = MYSUB + 2 * MYG + + # Indices and size of each file's chunk + xchunks = [] + xslices = [] + for i in range(NXPE): + xslice = slice( + 0 if (i == 0 and keep_xboundaries) else MXG, # Skip guard cells + nxsub if (i == (NXPE - 1) and keep_xboundaries) else nxsub - MXG, + ) + xchunks.append(xslice.stop - xslice.start) + xslices.append(xslice) + xchunks = tuple(xchunks) + + ychunks = [] + yslices = [] + for j in range(NYPE): + yslice = slice( + ( + 0 + if ( + keep_yboundaries + and ( + j == 0 # Lower inner target + or (is_double_null and j == nyproc_inner) + ) # Upper outer target + ) + else MYG + ), # Skip guard cells + ( + nysub + if ( + keep_yboundaries + and (j == (NYPE - 1) or (is_double_null and j == nyproc_inner - 1)) + ) + else (nysub - MYG) + ), + ) + ychunks.append(yslice.stop - yslice.start) + yslices.append(yslice) + ychunks = tuple(ychunks) + + return { + "NXPE": NXPE, # Number of processors in X + "NYPE": NYPE, # Number of processors in Y + "nxsub": nxsub, # X size of the array in each file + "nysub": nysub, # Y size of the array in each file + "xslices": xslices, # List of slices in X + "xchunks": xchunks, # Tuple of slice sizes in X + "yslices": yslices, # List of slices in Y + "ychunks": ychunks, # Tuple of slice sizes in Y + } + + +def make_lazy_array( + datafilepath, + ds, + chunkinfo: dict, + varname, + prefix: str = "BOUT.dmp", + info: bool = False, +): + """ + Creates a lazy-loaded array, gathering data + from a collection of NetCDF files. + + The array must have 'x' and 'y' dimensions but can + have an arbitrary number of other dimensions. + + datafilepath : str or Path + Directory containing BOUT.dmp.*.nc files + ds : xarray.DataSet + DataSet from one file + chunkinfo : dict + Describes processor layouts + varname : str + Name of the variable to read + """ + NXPE = chunkinfo["NXPE"] + NYPE = chunkinfo["NYPE"] + xslices = chunkinfo["xslices"] + yslices = chunkinfo["yslices"] + xchunks = chunkinfo["xchunks"] + ychunks = chunkinfo["ychunks"] + + # Get shape and type of the array in one file. + # These are assumed to be the same for all files + dtype = ds[varname].dtype + file_shape = ds[varname].shape + + # Find x and y dimension indices + xdim = ds[varname].dims.index("x") + ydim = ds[varname].dims.index("y") + ndims = len(file_shape) + + # Check x and y dimension sizes + assert file_shape[xdim] == chunkinfo["nxsub"] + assert file_shape[ydim] == chunkinfo["nysub"] + + # The name serves two purposes: + # 1. Graph key prefix — it's the first element of every task key tuple + # (name, i, j, ...). Dask uses these keys to identify tasks in the graph, + # so the name must be unique across all arrays in a computation to avoid + # accidental key collisions between arrays that would cause one array's + # tasks to silently substitute for another's. + # 2. Cache/fusion identity — when dask optimizes or fuses graphs, arrays with + # the same name are assumed to be identical. If two Array objects share + # a name, dask will treat them as the same array and only compute it once + # in a joint dask.compute() call. This is the mechanism behind deduplication. + name = f"load-{varname}-{dask.base.tokenize(datafilepath, varname, chunkinfo)}" + + # Create a dict of tasks. + dsk = {} + for i in range(NXPE): + xslice = xslices[i] + for j in range(NYPE): + yslice = yslices[j] + + filepath = os.path.join(datafilepath, f"{prefix}.{j * NXPE + i}.nc") + # Create a lazy-loaded array + lazy = LazyFileArray(filepath, varname, file_shape, dtype, info=info) + + # Store the lazy object as a separate key. + # lazy_name resolves to an object, not an array of data. + # Dask only reads data when a task returns a numpy array. + lazy_name = (f"lazy-{name}", i, j) + dsk[lazy_name] = lazy + + # The integer indices in the task key tuple + # (name, i0, i1, i2, i3) directly map to chunk positions + chunkpos = tuple( + i if d == xdim else j if d == ydim else 0 for d in range(ndims) + ) + + # Keep all dimensions but slice in x and y + slices = [ + slice(None), + ] * ndims + slices[xdim] = xslice + slices[ydim] = yslice + slices = tuple(slices) + + # Slice the LazyFileArray to remove boundary cells. + # Dask can fuse slices only if they use dask.array.core.getter + # The optimization is in _optimize_slices implemented here: + # https://github.com/dask/dask/blob/main/dask/array/optimization.py#L94 + # getter then passes slices through to LazyFileArray so that only the + # required data is read from disk. + dsk[(name, *chunkpos)] = ( + getter, + lazy_name, + slices, # Remove boundary cells + False, # asarray + False, # lock + ) + # Chunk sizes. Use file_shape except in 'x' and 'y' dimensions + chunks = [(size,) for size in file_shape] + chunks[xdim] = xchunks + chunks[ydim] = ychunks + chunks = tuple(chunks) + + return dask.array.Array(dsk, name, chunks, dtype=dtype) + + +def lazy_open_boutdataset( + datapath, + keep_xboundaries: bool = False, + keep_yboundaries: bool = False, + is_restart: bool = False, + info: bool = False, + **kwargs, +): + """ + Open a multi-file dataset by only opening one file. + Dask chunks are created for all processors using the + metadata read from the first file. + + datapath : str or Path + Directory containing the BOUT++ data files + + keep_xboundaries : bool, optional + If true, keep x-direction boundary cells (the cells past the + physical edges of the grid, where boundary conditions are + set); increases the size of the x dimension in the returned + data-set. If false, trim these cells. + + keep_yboundaries : bool, optional + If true, keep y-direction boundary cells (the cells past the + physical edges of the grid, where boundary conditions are + set); increases the size of the y dimension in the returned + data-set. If false, trim these cells. + + """ + prefix = "BOUT.restart" if is_restart else "BOUT.dmp" + + # Open first file to read metadata + ds = xr.open_dataset(os.path.join(datapath, f"{prefix}.0.nc")) + + # Extract all scalars as metadata + metadata = { + name: var.item() for name, var in ds.data_vars.items() if len(var.dims) == 0 + } + + # Identify processor layout and the array slices from each file + chunkinfo = make_chunkinfo( + metadata, keep_xboundaries=keep_xboundaries, keep_yboundaries=keep_yboundaries + ) + + # Process all data variables + data_vars = {} + for name, var in ds.data_vars.items(): + if "x" in var.dims and "y" in var.dims: + # Array distributed over processors in x and y + data_vars[name] = xr.DataArray( + make_lazy_array( + datapath, ds, chunkinfo, name, prefix=prefix, info=info + ), + dims=var.dims, + attrs=var.attrs, + ) + elif len(var.dims) == 0: + continue # scalars already in metadata + elif ("x" not in var.dims) and ("y" not in var.dims): + # Take DataArray from first processor + data_vars[name] = var + else: + # If only 'x' or only 'y' dimension then skip + warnings.warn( + f"Variable '{name}' has only one of x/y dimensions and will be skipped" + ) + + coords = {} + if "t_array" in ds: + coords["t"] = ds["t_array"].values + + # Create a global dataset + return xr.Dataset(data_vars, coords=coords, attrs={"metadata": metadata}) diff --git a/xbout/load.py b/xbout/load.py index c3ddedd1..8213a653 100644 --- a/xbout/load.py +++ b/xbout/load.py @@ -10,6 +10,7 @@ from natsort import natsorted from . import geometries +from . import lazyload from .utils import ( _set_attrs_on_all_vars, _separate_metadata, @@ -64,12 +65,13 @@ def open_boutdataset( gridfilepath=None, grid_mismatch="raise", chunks=None, - keep_xboundaries=True, - keep_yboundaries=False, + keep_xboundaries: bool = True, + keep_yboundaries: bool = False, run_name=None, - info=True, - is_restart=None, - is_mms_dump=False, + info: bool = True, + is_restart: bool = None, + is_mms_dump: bool = False, + lazy_load: bool = True, **kwargs, ): """Load a dataset from a set of BOUT output files, including the @@ -156,6 +158,10 @@ def open_boutdataset( set); increases the size of the y dimension in the returned data-set. If false, trim these cells. + lazy_load : bool, optional + If true, lazy load the dataset when possible. In a multi-file + dataset this avoids opening more files than necessary. + run_name : str, optional Name to give to the whole dataset, e.g. 'JET_ELM_high_resolution'. Useful if you are going to @@ -283,15 +289,52 @@ def attrs_remove_section(obj, section): # Determine if file is a grid file or data dump files remove_yboundaries = False if "dump" in input_type or "restart" in input_type: - # Gather pointers to all numerical data from BOUT++ output files - ds, remove_yboundaries = _auto_open_mfboutdataset( - datapath=datapath, - chunks=chunks, - keep_xboundaries=keep_xboundaries, - keep_yboundaries=keep_yboundaries, - is_restart=is_restart, - **kwargs, - ) + + def is_netcdf_collection(datapath): + if not isinstance(datapath, str): + return None + # Expand globs into a list of files + p = Path(datapath) + filepaths = list(p.parent.glob(p.name)) + if len(filepaths) == 0: + raise ValueError(f"File not found: {datapath}") + if all( + [ + filepath.parent == filepaths[0].parent and filepath.suffix == ".nc" + for filepath in filepaths + ] + ): + return filepaths[0].parent + return None + + # The directory containing the files or None if not a collection + dataset_dir = is_netcdf_collection(datapath) + + if lazy_load and dataset_dir: + # All files are NetCDF and all in the same directory + # Lazyload only opens one file and infers file layout from that + + ds = lazyload.lazy_open_boutdataset( + dataset_dir, + keep_xboundaries=keep_xboundaries, + keep_yboundaries=keep_yboundaries, + is_restart=is_restart, + info=info, + **kwargs, + ) + remove_yboundaries = False + else: + # Gather pointers to all numerical data from BOUT++ output files + # This opens all files then concatenates + ds, remove_yboundaries = _auto_open_mfboutdataset( + datapath=datapath, + chunks=chunks, + keep_xboundaries=keep_xboundaries, + keep_yboundaries=keep_yboundaries, + is_restart=is_restart, + **kwargs, + ) + elif "grid" in input_type: # Its a grid file ds = _open_grid( @@ -305,6 +348,7 @@ def attrs_remove_section(obj, section): raise ValueError(f"internal error: unexpected input_type={input_type}") ds, metadata = _separate_metadata(ds) + # Store as ints because netCDF doesn't support bools, so we can't save # bool attributes metadata["keep_xboundaries"] = int(keep_xboundaries) diff --git a/xbout/utils.py b/xbout/utils.py index b4a00f31..0c109f96 100644 --- a/xbout/utils.py +++ b/xbout/utils.py @@ -79,6 +79,9 @@ def _separate_metadata(ds): metadata_vals = [ds[var].values.item() for var in scalar_vars] metadata_vals = [x.decode() if isinstance(x, bytes) else x for x in metadata_vals] metadata = dict(zip(scalar_vars, metadata_vals)) + if hasattr(ds, "metadata"): + # Combine with existing metadata + metadata = ds.metadata | metadata # Add default values for dimensions to metadata. These may be modified later by # apply_geometry()