diff --git a/.github/workflows/doc.yaml b/.github/workflows/doc.yaml index 3b3f99d..97fb160 100644 --- a/.github/workflows/doc.yaml +++ b/.github/workflows/doc.yaml @@ -17,6 +17,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 + # https://github.com/actions/checkout/issues/1471 + # Clobber local unannotated tag + - name: Fetch tags + run: git fetch --prune --unshallow --tags --force - uses: mamba-org/setup-micromamba@v2 with: environment-name: docs diff --git a/.gitignore b/.gitignore index bd62804..860ba5d 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ dist coverage.xml jupyter_execute docs/implementations/data +docs/implementations/data_l2_lr_ssh +docs/implementations/data_l3_lr_ssh diff --git a/docs/advanced.md b/docs/advanced.md index dc3d8e0..f1088c9 100644 --- a/docs/advanced.md +++ b/docs/advanced.md @@ -48,6 +48,7 @@ Remote file system listing can be quite long. Implementations are usually shipped with layouts for an improved listing speed. See the {ref}`Layout ` introduction if listing performance becomes an issue. +(disable-layouts)= ## Disable layouts diff --git a/docs/getting_started.md b/docs/getting_started.md index 54ee7fa..853ab7c 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -48,6 +48,7 @@ given criterias ```{code-cell} from fcollections.implementations import NetcdfFilesDatabaseSwotLRL2 + fc = NetcdfFilesDatabaseSwotLRL2(path) fc.list_files(cycle_number=1) ``` @@ -83,6 +84,8 @@ ds = fc.query(selected_variables=['ssha']) list(ds.variables) ``` +### Filter types + Each implementation has its own filters. By order of availability, the user should consult: @@ -95,23 +98,42 @@ should consult: fc.query? ``` +### Filter values + +Possible values for a given filter can be displayed: + +```{code-cell} +fc.filter_values('version') +``` + +Only filters whose information is contained in the intermediate folders can be +scanned quickly; others will trigger a full scan. 
As such, to ensure +optimal performance, this method should be called with the layouts enabled, with +files organized in folders (see the [advanced section](#disable-layouts)), and +on filters whose information is encoded in the folders. + ## Access metadata The database can display information about the variables and attributes contained in the files' collection using the ``variables_info`` method ```{code-cell} -fc.variables_info(subset='Expert') +# Use the enumeration name for filtering a specific subset +fc.variables_info() ``` It will offer a simple collapsible tree view with multiple levels of nesting depending on the data you manipulate -In order to return consistent metadata, the method ensures that only one -homogeneous subset is selected. In case you handle unmixable data (for example -Expert and Unsmoothed datasets), you must give proper filters on the subset -partitioning keys ``fc.unmixer.partition_keys``. If these filters are missing, -an error with the possible choices will be raised. +## Subsets + +### Errors on mixed subsets + +In order to return consistent results, most methods must work on a homogeneous +subset of data. In case multiple subsets are mixed (for example Expert and +Unsmoothed datasets), proper filters matching the partitioning keys must be +given. If these filters are missing, an error with the possible choices will be +raised. ```{code-cell} :tags: [raises-exception] @@ -123,7 +145,32 @@ ds.to_netcdf(f'{path}/SWOT_L2_LR_SSH_Unsmoothed_001_012_20240101T030000_20240101 fc.variables_info() ``` +### Compatibility matrix + +The following table summarizes which methods can work on mixed data. Most +methods need homogeneous data and will require filtering the subset. + +| Method | Works on mixed data ? 
| +|--------------------|-----------------------| +| ``list_files`` | Yes | +| ``variables_info`` | No | +| ``filter_values`` | No | +| ``query`` | No | +| ``map`` | No | + +### Listing subsets + +Subsets that are on the file system can be listed using the +{meth}`subsets <fcollections.core.FilesDatabase.subsets>` property. + +```{code-cell} +fc.subsets +``` + +One of the returned choices must be selected and used as a filter to work on a +homogeneous dataset. + ```{code-cell} -# Use the enumeration name for filtering +# Use the enumeration name for filtering a specific subset fc.variables_info(subset='Expert') ``` diff --git a/docs/implementations/l2_lr_ssh.md b/docs/implementations/l2_lr_ssh.md index ab69416..f2f2849 100644 --- a/docs/implementations/l2_lr_ssh.md +++ b/docs/implementations/l2_lr_ssh.md @@ -140,7 +140,7 @@ There are currently three modes for stacking the half orbits are cropped and we need an additional dimension to reflect the spatial jump ```{code-cell} -fc = NetcdfFilesDatabaseSwotLRL2("data") +fc = NetcdfFilesDatabaseSwotLRL2("data_l2_lr_ssh") ds = fc.query(stack='CYCLES', cycle_number=[9, 10, 11], pass_number=10, subset='Basic') ds.ssha_karin_2.data ``` diff --git a/docs/implementations/l3_lr_ssh.md b/docs/implementations/l3_lr_ssh.md index a81ab61..02d201c 100644 --- a/docs/implementations/l3_lr_ssh.md +++ b/docs/implementations/l3_lr_ssh.md @@ -127,7 +127,7 @@ There are currently three modes for stacking the half orbits are cropped and we need an additional dimension to reflect the spatial jump ```{code-cell} -fc = NetcdfFilesDatabaseSwotLRL3("data") +fc = NetcdfFilesDatabaseSwotLRL3("data_l3_lr_ssh") ds = fc.query(stack='CYCLES', version='2.0.1', cycle_number=[1, 2, 3], pass_number=10, subset='Basic') ds.ssha_filtered.data ``` diff --git a/docs/implementations/scripts/pull_data_l2_lr_ssh.py b/docs/implementations/scripts/pull_data_l2_lr_ssh.py index 32358a1..b0a8a53 100644 --- a/docs/implementations/scripts/pull_data_l2_lr_ssh.py +++ 
b/docs/implementations/scripts/pull_data_l2_lr_ssh.py @@ -6,7 +6,7 @@ logging.basicConfig() logging.getLogger("altimetry_downloader_aviso").setLevel("INFO") -DATA_DIR = Path(__file__).resolve().parent.parent / "data" +DATA_DIR = Path(__file__).resolve().parent.parent / "data_l2_lr_ssh" DATA_DIR.mkdir(exist_ok=True) if __name__ == "__main__": diff --git a/docs/implementations/scripts/pull_data_l3_lr_ssh.py b/docs/implementations/scripts/pull_data_l3_lr_ssh.py index fe02351..58d17c8 100644 --- a/docs/implementations/scripts/pull_data_l3_lr_ssh.py +++ b/docs/implementations/scripts/pull_data_l3_lr_ssh.py @@ -6,7 +6,7 @@ logging.basicConfig() logging.getLogger("altimetry_downloader_aviso").setLevel("INFO") -DATA_DIR = Path(__file__).resolve().parent.parent / "data" +DATA_DIR = Path(__file__).resolve().parent.parent / "data_l3_lr_ssh" DATA_DIR.mkdir(exist_ok=True) if __name__ == "__main__": diff --git a/src/fcollections/core/_filesdb.py b/src/fcollections/core/_filesdb.py index c72996d..40cafcb 100644 --- a/src/fcollections/core/_filesdb.py +++ b/src/fcollections/core/_filesdb.py @@ -4,12 +4,14 @@ import abc import dataclasses as dc +import functools import inspect import logging import textwrap import typing as tp import warnings from abc import ABCMeta +from copy import copy import docstring_parser as dcs import pandas as pda @@ -17,7 +19,7 @@ from fsspec.implementations.local import LocalFileSystem from ._filenames import FileNameConvention -from ._listing import DirNode, FileSystemMetadataCollector, Layout +from ._listing import DirNode, FileSystemMetadataCollector, Layout, LayoutMismatchError from ._metadata import GroupMetadata from ._readers import IFilesReader @@ -45,6 +47,7 @@ def __new__(cls, clsname, bases, attrs): _create_method(attrs, "query", "_query") _create_method(attrs, "list_files", "_files") _create_method(attrs, "variables_info", "_variables_info") + _create_method(attrs, "filter_values", "_filter_values") _create_method(attrs, "map", "_map") 
new_class = super().__new__(cls, clsname, bases, attrs) @@ -88,7 +91,13 @@ def __new__(cls, clsname, bases, attrs): new_class._variables_info.__doc__, *method_parameters["variables_info"], ) - + _patch_method( + new_class, + "filter_values", + "_filter_values", + new_class._filter_values.__doc__, + *method_parameters["filter_values"], + ) return new_class @@ -159,6 +168,7 @@ def _is_subset_key( map(lambda x: x[1], filter(_is_subset_key, parameters["convention"][1].items())) ) out["variables_info"] = (info_docstring, info_signature) + out["filter_values"] = (info_docstring, info_signature) return out @@ -581,29 +591,32 @@ def _query(self, **kwargs) -> xr_t.Dataset | None: return ds def _variables_info(self, **kwargs) -> GroupMetadata | None: - """Returns the variables metadata. + """Return metadata describing the variables. Because the files collection may mix multiple subsets, we want to ensure that we return the variables of one subset only. The parameters of this method are the subset partitioning keys and can be given by the user to ensure a consistent set of variables. If the input parameters are not sufficient to unmix the subsets, the user will be notified with a - ValueError + ``ValueError``. Returns ------- + GroupMetadata | None A GroupMetadata containing the variables, dimensions, attributes and subgroups. None is returned in case no files is found for the given subset + See Also + -------- + FilesDatabase.subsets: To list the subsets keys. + Raises ------ LayoutMismatchError - In case ``enable_layouts`` is True and a mismatch between the - layouts and the actual files is detected + Raised if ``enable_layouts`` is True and a mismatch is detected. ValueError - In case if one unique and homogeneous subset could not be extracted - from the files metadata table + Raised if a unique and homogeneous subset cannot be extracted. 
""" # This docstring will be superseded by the metaclass unknown = kwargs.keys() - ( @@ -667,6 +680,152 @@ def wrapped(record: dict[str, tp.Any]): bag = dask.bag.core.from_sequence(df.to_records()) return bag.map(wrapped) + def _filter_values(self, filter_name: str, **kwargs: tp.Any) -> set[tp.Any]: + """Returns the possible values for a given filter. + + Because the files collection may mix multiple subsets, we want to ensure + that we return the filter values for one subset only. The parameters of + this method are the subset partitioning keys and can be given by the + user. + + Parameters + ---------- + filter_name + Name of the filter for which the possible values should be + requested. + + Returns + ------- + : + A set containing the possible values for a given filter. + + Warns + ----- + UserWarning + If the requested filter information cannot be extracted from the + layouts' intermediate node. In this case, a full scan is triggered, + which will slow down the computation. + UserWarning + If layouts are not enabled for this instance. In this case, a full + scan is triggered, which will slow down the computation. + + Raises + ------ + LayoutMismatchError + In case ``enable_layouts`` is True and a mismatch between the + layouts and the actual files is detected + ValueError + In case if one unique and homogeneous subset could not be extracted + from the files metadata table + ValueError + In case the input filter name is in none of the configured layouts. + + See Also + -------- + subsets + To list the subsets keys. 
+ """ + if self.enable_layouts: + conventions = map(lambda l: l.conventions, self.layouts) + conventions = functools.reduce(lambda a, b: a + b, conventions) + fields = map( + lambda convention: {f.name for f in convention.fields}, conventions + ) + field_names = functools.reduce(lambda a, b: a | b, fields) + else: + field_names = { + field.name for field in self.layouts[0].conventions[-1].fields + } + + if filter_name not in field_names: + msg = ( + f"Unknown filter name {filter_name}. Possible values are " + f"{field_names}" + ) + raise ValueError(msg) + + missing_partition_keys = set(self.unmixer.partition_keys) - set(kwargs.keys()) + if ( + filter_name not in self.unmixer.partition_keys + and len(self.subsets) > 1 + and len(missing_partition_keys) > 0 + ): + msg = ( + "Cannot work on heterogenous datasets, subset should be chosen " + f"by providing the following filters: {self.unmixer.partition_keys} " + "(subsets can be displayed using the 'subsets' property)." + ) + raise ValueError(msg) + + if not self.enable_layouts: + msg = ( + "Layouts are not enabled, full scan is necessary to deduce " + f"possible values of filter '{filter_name}'. Enable layouts to " + "improve performance." + ) + warnings.warn(msg) + return set(self.list_files(**kwargs)[filter_name]) + + selected_layouts = [] + for layout in self.layouts: + for ii, convention in enumerate(layout.conventions[:-1]): + # We could also handle a more complex case with a + # record containing multiple pieces of information. However, the + # current payload returned by the Visitors must be reviewed + # beforehand + if ( + len(convention.fields) == 1 + and convention.fields[0].name == filter_name + ): + selected_layouts.append(Layout(layout.conventions[: ii + 1])) + break + + if len(selected_layouts) == 0: + msg = ( + "Layouts do not contain intermediate nodes (directories) " + f"with information about filter '{filter_name}'. 
Falling back " + "to full scan to deduce the filters' possible values (the " + "performance is degraded)." + ) + warnings.warn(msg) + return set(self.list_files(**kwargs)[filter_name]) + + metadata_collector = FileSystemMetadataCollector( + selected_layouts, self.discoverer.root_node + ) + + try: + return {x[0] for x in metadata_collector.discover(**kwargs)} + except LayoutMismatchError: + logger.info( + "Layouts are enabled and should contain information about " + "filter '%s' in their intermediate nodes. However," + " the actual file organization does not match the layouts. " + "Falling back to full scan (the performance is degraded).", + filter_name, + ) + return set(self.list_files(**kwargs)[filter_name]) + + @property + def subsets(self) -> list[dict[str, tp.Any]]: + """List of the subsets combinations.""" + if self.unmixer is None: + return [] + return list(self._subsets(0)) + + def _subsets( + self, current_index: int, **higher_partition_values: tp.Any + ) -> tp.Generator[dict[str, tp.Any], None, None]: + current_key = self.unmixer.partition_keys[current_index] + current_values = self._filter_values(current_key, **higher_partition_values) + for current_value in current_values: + result = copy(higher_partition_values) + result[current_key] = current_value + if current_index < len(self.unmixer.partition_keys) - 1: + yield from self._subsets(current_index + 1, **result) + else: + yield result + @dc.dataclass class SubsetsUnmixer: diff --git a/src/fcollections/core/_listing.py b/src/fcollections/core/_listing.py index 2d2dfd1..dfe076a 100644 --- a/src/fcollections/core/_listing.py +++ b/src/fcollections/core/_listing.py @@ -264,6 +264,9 @@ def children(self) -> tp.Iterator[INode]: The child nodes, either files or folders """ + def clear(self): + """Clear child nodes.""" + class FileNode(INode): """File node of a file system tree.""" @@ -326,6 +329,9 @@ def children(self) -> tp.Iterable[INode]: self._children = list(self._compute_children()) return self._children 
+ def clear(self): + self._children = None + def _compute_children(self) -> tp.Iterator[INode]: # return list of FileNode or DirNode instances # Block of code extracted from fsspec and simplified (no topbottom @@ -523,7 +529,7 @@ def __init__( layouts: list[Layout], stat_fields: tp.Iterable[str] = tuple(), on_mismatch_directory: LayoutMismatchHandling = LayoutMismatchHandling.RAISE, - on_mismatch_file: LayoutMismatchHandling = LayoutMismatchHandling.IGNORE, + on_mismatch_file: LayoutMismatchHandling = LayoutMismatchHandling.RAISE, ): self.layouts = layouts self.stat_fields = list(stat_fields) @@ -576,6 +582,7 @@ def visit_dir(self, dir_node: DirNode) -> VisitResult: layouts_for_children: list[Layout] = [] record = None + return_payload = False for layout in self.layouts: # Prune non matching layouts for this directory. We need to test all # layouts to eliminate non matching layouts as early as possible in @@ -583,6 +590,15 @@ def visit_dir(self, dir_node: DirNode) -> VisitResult: result = layout.parse_node(dir_node.level - 1, dir_node.name) if result is not None: layouts_for_children.append(layout) + + # In case the layout leaf is a folder, the exploration must be + # stopped even though the actual DirNode has children. This is + # because the layout won't be able to go deeper. This case arises + # when we want to extract information at a given level in the file + # system hierarchy. + if result is not None and dir_node.level == len(layout.conventions): + return_payload = True + if record is None: # Do not override a valid record with a None record = result @@ -603,8 +619,12 @@ def visit_dir(self, dir_node: DirNode) -> VisitResult: ) return VisitResult(False) - # Don't return a payload for dir nodes (will be subject to change later) - return VisitResult(True, None, layouts_for_children) + # Return payload for dir nodes only if dir nodes are the last convention + # defined in the layouts. 
+ if return_payload: + return VisitResult(False, record, []) + else: + return VisitResult(True, None, layouts_for_children) def visit_file(self, file_node: FileNode) -> VisitResult: """Visits a file node. @@ -784,6 +804,7 @@ def walk(node: INode, visitor: IVisitor) -> tp.Iterator[tp.Any]: return for child in node.children(): + logger.debug("child %s", child.name) yield from walk(child, visitor.advance(result)) @@ -915,6 +936,7 @@ def discover( # to modify the Layout interface layout.set_filters(**filters) + self.root_node.clear() if enable_layouts: logger.debug("Using layouts to speed up listing") visitor = LayoutVisitor(self.layouts, stat_fields) diff --git a/src/fcollections/implementations/_l2_lr_ssh.py b/src/fcollections/implementations/_l2_lr_ssh.py index dde774a..c43c06c 100644 --- a/src/fcollections/implementations/_l2_lr_ssh.py +++ b/src/fcollections/implementations/_l2_lr_ssh.py @@ -31,7 +31,7 @@ SWOT_L2_PATTERN = re.compile( - r"SWOT_(?P.*)_LR_SSH_(?P.*)_(?P\d{3})_(?P\d{3})_" + r"SWOT_L2_LR_SSH_(?P.*)_(?P\d{3})_(?P\d{3})_" r"(?P