diff --git a/docs/getting_started.md b/docs/getting_started.md index 853ab7c..e804b10 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -112,6 +112,15 @@ optimal performance, this method should be called with the layouts enabled, with files organized with folders (see the [advanced section](#disable-layouts)), and on filters whose information is encoded in the folders. +In case you are working on a small set of data, it is safe to ignore this +warning: + +```{code-cell} +from fcollections.core import PerformanceWarning +import warnings +warnings.simplefilter("ignore", PerformanceWarning) +``` + ## Access metadata The database can display information about the variables and attributes diff --git a/src/fcollections/core/__init__.py b/src/fcollections/core/__init__.py index f34ed6e..f02ffff 100644 --- a/src/fcollections/core/__init__.py +++ b/src/fcollections/core/__init__.py @@ -29,6 +29,7 @@ FilesDatabase, IFilterBuilder, NotExistingPathError, + PerformanceWarning, SubsetsUnmixer, ) from ._listing import ( @@ -114,4 +115,5 @@ "walk", "VisitResult", "FileSystemMetadataCollector", + "PerformanceWarning", ] diff --git a/src/fcollections/core/_filesdb.py b/src/fcollections/core/_filesdb.py index b51cb49..eacf56e 100644 --- a/src/fcollections/core/_filesdb.py +++ b/src/fcollections/core/_filesdb.py @@ -310,6 +310,10 @@ def _patch_method( getattr(cls, name).__signature__ = signature +class PerformanceWarning(UserWarning): + """Warn user about slow computations.""" + + class FilesDatabase(metaclass=FilesDatabaseMeta): """Abstract database mapping. @@ -489,6 +493,22 @@ def _files( msg = f"list_files() got unexpected keyword argument(s): {bad_kwargs}" raise ValueError(msg) + # Try to ensure that the subset is unique prior to launching the scan.
+ if unmix and self.unmixer is not None: + with warnings.catch_warnings(): + warnings.simplefilter("error", category=PerformanceWarning) + try: + subset_filters = self.unmixer.pick_subset(self.subsets, **kwargs) + kwargs |= subset_filters + unmix = False + except IndexError: + logger.debug("No subset, nothing to unmix") + unmix = False + except PerformanceWarning: + logger.debug( + "Subset unmixing could not be done before the files scan: it will be done after." + ) + # Auto-build declared predicates and additionnal filters. predicates = list(predicates) if self.filter_builders is not None: @@ -735,13 +755,15 @@ def _filter_values(self, filter_name: str, **kwargs: tp.Any) -> set[tp.Any]: Warns ----- - UserWarning + PerformanceWarning If the requested filter information cannot be extracted from the layouts' intermediate node. In this case, a full scan is triggered, which will slow down the computation. - UserWarning + PerformanceWarning If layouts are not enabled for this instance. In this case, a full scan is triggered, which will slow down the computation. + PerformanceWarning + If listing the subsets is slow. Raises ------ @@ -759,37 +781,16 @@ def _filter_values(self, filter_name: str, **kwargs: tp.Any) -> set[tp.Any]: subsets To list the subsets keys. """ - if self.enable_layouts: - conventions = map(lambda l: l.conventions, self.layouts) - conventions = functools.reduce(lambda a, b: a + b, conventions) - fields = map( - lambda convention: {f.name for f in convention.fields}, conventions - ) - field_names = functools.reduce(lambda a, b: a | b, fields) - else: - field_names = { - field.name for field in self.layouts[0].conventions[-1].fields - } - - if filter_name not in field_names: - msg = ( - f"Unknown filter name {filter_name}. 
Possible values are " - f"{field_names}" - ) - raise ValueError(msg) - - missing_partition_keys = set(self.unmixer.partition_keys) - set(kwargs.keys()) - if ( - filter_name not in self.unmixer.partition_keys - and len(self.subsets) > 1 - and len(missing_partition_keys) > 0 - ): - msg = ( - "Cannot work on heterogenous datasets, subset should be chosen " - f"by providing the following filters: {self.unmixer.partition_keys} " - "(subsets can be displayed using the 'subsets' property)." - ) - raise ValueError(msg) + # Implementation note: all branches leading to calling list_files must + # raise a PerformanceWarning prior to calling the method. list_files + # itself calls filter_values to try unmixing the subsets prior to the + # listing. To avoid a recursion error, list_files relies on the emitted + # warnings to know that it cannot list the subsets quickly, and must do + # the unmixing after the listing instead. + self._validate_field(filter_name) + unmix = ( + self.unmixer is not None and filter_name not in self.unmixer.partition_keys + ) if not self.enable_layouts: msg = ( @@ -797,8 +798,8 @@ def _filter_values(self, filter_name: str, **kwargs: tp.Any) -> set[tp.Any]: f"possible values of filter '{filter_name}'. Enable layouts to " "improve performance." ) - warnings.warn(msg) - return set(self.list_files(**kwargs)[filter_name]) + warnings.warn(msg, PerformanceWarning) + return set(self.list_files(**kwargs, unmix=unmix)[filter_name]) selected_layouts = [] for layout in self.layouts: @@ -821,24 +822,71 @@ def _filter_values(self, filter_name: str, **kwargs: tp.Any) -> set[tp.Any]: "to full scan to deduce the filters' possible values (the " "performance is degraded)."
) - warnings.warn(msg) - return set(self.list_files(**kwargs)[filter_name]) + warnings.warn(msg, PerformanceWarning) + return set(self.list_files(**kwargs, unmix=unmix)[filter_name]) metadata_collector = FileSystemMetadataCollector( selected_layouts, self.discoverer.root_node ) try: + # Check if there is one selected subset. It will raise a ValueError if + # there is an ambiguity. We need the subsets list whether the listing + # is quick or slow. In case of a slow computation of subsets, a + # warning will be emitted + if unmix: + try: + kwargs |= self.unmixer.pick_subset(self.subsets, **kwargs) + except IndexError: + logger.debug("No subset, nothing to unmix") + return {x[0] for x in metadata_collector.discover(**kwargs)} except LayoutMismatchError: - logger.info( + msg = ( "Layouts are enabled and should contain information about " - "filter '%s' in their intermediate nodes. However," - " the actual file organization does not match the layouts. " - "Falling back to full scan (the performance is degraded).", - filter_name, + f"filter '{filter_name}' in their intermediate folders. However," + " the actual file organization either is flat or does not match" + " the folders. Falling back to full scan (the performance is " + "degraded)." ) - return set(self.list_files(**kwargs)[filter_name]) + warnings.warn(msg, PerformanceWarning) + return set(self.list_files(**kwargs, unmix=unmix)[filter_name]) + + def _validate_field(self, filter_name: str): + """Check a field is declared in one of the layouts. + + If the layouts are not enabled, only the file name convention (last + convention of one of the layouts) is considered for the check. + + Parameters + ---------- + filter_name + Name of the filter to check against the declared fields in the + conventions. + + Raises + ------ + ValueError + If the input filter name is unknown. 
+ """ + if self.enable_layouts: + conventions = map(lambda l: l.conventions, self.layouts) + conventions = functools.reduce(lambda a, b: a + b, conventions) + fields = map( + lambda convention: {f.name for f in convention.fields}, conventions + ) + field_names = functools.reduce(lambda a, b: a | b, fields) + else: + field_names = { + field.name for field in self.layouts[0].conventions[-1].fields + } + + if filter_name not in field_names: + msg = ( + f"Unknown filter name {filter_name}. Possible values are " + f"{field_names}" + ) + raise ValueError(msg) @property def subsets(self) -> list[dict[str, tp.Any]]: @@ -863,47 +911,77 @@ def _subsets( @dc.dataclass class SubsetsUnmixer: - partition_keys: tuple[str, ...] | dict[str, tp.Callable | None] + """Subset unmixing. + + In case multiple subsets are present, this class can be used to + enforce an auto pick for some pre-configured keys. The second role + of this class is to enforce that only one subset is present at a + time. An error will be raised if the auto picks cannot reduce the + number of subsets to 1. + + Raises + ------ + ValueError + If the auto_pick keys are not partitioning keys. + """ + + partition_keys: tuple[str, ...] + """Partitioning keys for subsets.""" auto_pick_last: tuple[str, ...] = dc.field(default_factory=tuple) + """Auto picked keys for subset selection. - def __call__(self, df: pda.DataFrame) -> pda.DataFrame: - if len(df) == 0: - return df + The auto picked keys are used to select one subset amongst multiple + choices. The keys must be sortable so that the last element is chosen. + """ - try: - subsets = df.groupby( - [ - ( - df[partition_key].apply(transform) - if transform is not None - else df[partition_key] - ) - for partition_key, transform in self.partition_keys.items() - ] - ) - except AttributeError: - # We have a tuple - grouping_keys = list(self.partition_keys) - subsets = df.groupby( - # In pandas 2, a list with one single element gave a scalar keys - # for the groups.
In pandas 3, a future warning is raised, - # asking to give either a single key or a list with more than - # one key - grouping_keys if len(grouping_keys) > 1 else grouping_keys[0], - # Pandas 3 tries to sort the groups, which can raise an error if - # a column cannot be ordered. We don't need this sort so we - # disable it - sort=False, + def __post_init__(self): + not_partition_keys = set(self.auto_pick_last) - set(self.partition_keys) + if len(not_partition_keys) > 0: + msg = ( + "Auto pick keys should be partitioning keys: " + f"{not_partition_keys} are not partitioning keys" ) + raise ValueError(msg) - # Pick one subset using panda duplicate handling - subset_names = [ - (group,) if len(self.partition_keys) == 1 else group - for group in subsets.groups.keys() - ] - df_subsets = pda.DataFrame.from_records( - subset_names, columns=self.partition_keys + def pick_subset( + self, subsets: list[dict[str, tp.Any]], **subset_filters: tp.Any + ) -> dict[str, tp.Any]: + """Manual and auto pick of a subset amongst multiple choices. + + Parameters + ---------- + subsets + List of subsets given as mapping between the partitioning keys and + their values. + subset_filters + Manual values for filtering the subsets. + + Raises + ------ + ValueError + In case the auto pick does not reduce the number of subsets to 1. + + Returns + ------- + dict[str, tp.Any] + The automatically selected subset. 
+ """ + logger.debug("%s subsets before manual pick", len(subsets)) + subsets_filtered = list( + filter( + lambda subset: all( + [ + (item[0] not in subset_filters) + or (item[1] == subset_filters[item[0]]) + for item in subset.items() + ] + ), + subsets, + ) ) + logger.debug("%s subsets after manual pick", len(subsets_filtered)) + + df_subsets = pda.DataFrame.from_records(subsets_filtered) # Sort the dataframe containing the subset using the auto_pick_last keys # Unique records of manual_pick keys will be chosen relying on this sort @@ -929,14 +1007,68 @@ def __call__(self, df: pda.DataFrame) -> pda.DataFrame: ) raise ValueError(msg) - group_name = tuple(df_subsets.to_records(index=False)[-1]) - group_name = group_name if len(group_name) > 1 else group_name[0] - logger.debug("Subset selected %s", group_name) - return subsets.get_group(group_name) + subset = df_subsets.iloc[-1].to_dict() + logger.info("Picked subset %s", subset) + return subset + + def __call__(self, df: pda.DataFrame) -> pda.DataFrame: + """Auto pick a subset in the input dataframe. + + Parameters + ---------- + df + The input dataframe with possibly multiple subsets. + + Raises + ------ + ValueError + In case the auto pick does not reduce the number of subsets to 1. + + Returns + ------- + pandas.DataFrame + The reduced dataframe with only one subset. + """ + if len(df) == 0: + return df + + # We have a tuple + grouping_keys = list(self.partition_keys) + df_grouped = df.groupby( + # In pandas 2, a list with one single element gave a scalar keys + # for the groups. In pandas 3, a future warning is raised, + # asking to give either a single key or a list with more than + # one key + grouping_keys if len(grouping_keys) > 1 else grouping_keys[0], + # Pandas 3 tries to sort the groups, which can raise an error if + # a column cannot be ordered. 
We don't need this sort so we + # disable it + sort=False, + ) + + # Pick one subset using panda duplicate handling + subsets = [ + ( + dict(zip(grouping_keys, group)) + if len(grouping_keys) > 1 + else {grouping_keys[0]: group} + ) + for group in df_grouped.groups + ] + subset = self.pick_subset(subsets) + + group_name = tuple(subset[k] for k in grouping_keys) + group_name = group_name if len(subset) > 1 else group_name[0] + return df_grouped.get_group(group_name) @property def keys(self) -> set[str]: - return set(self.partition_keys) | set(self.auto_pick_last) + """Alias for the partition keys. + + Used by ``FilesDatabase`` to compare different class to the declared + fields in the layouts. + """ + return set(self.partition_keys) @dc.dataclass diff --git a/tests/core/test_filesdb.py b/tests/core/test_filesdb.py index 7346ccb..5b339d8 100644 --- a/tests/core/test_filesdb.py +++ b/tests/core/test_filesdb.py @@ -3,6 +3,7 @@ import re import sys import typing as tp +from contextlib import nullcontext from pathlib import Path import dask @@ -25,6 +26,7 @@ Layout, LayoutMismatchError, NotExistingPathError, + PerformanceWarning, SubsetsUnmixer, ) @@ -49,11 +51,6 @@ class FilesDatabaseTestInconsistentDeduplicator(FilesDatabase): deduplicator = Deduplicator(("a1",), ("a2",)) -class FilesDatabaseTestInconsistentUnmixer(FilesDatabase): - layouts = [Layout([FileNameConventionTest()])] - unmixer = SubsetsUnmixer(("a1",), ("a2",)) - - class ReaderStub(IFilesReader): def read( @@ -205,8 +202,8 @@ def test_deduplicator_empty(): def test_unmixer_inconsistent(tmpdir: Path): - with pytest.raises(ValueError, match="Subsets Unmixer"): - FilesDatabaseTestInconsistentUnmixer(tmpdir) + with pytest.raises(ValueError, match="are not partitioning"): + SubsetsUnmixer(("a1",), ("a2",)) @pytest.mark.parametrize( @@ -237,6 +234,55 @@ def test_unmixing_empty(): assert len(unmixer(pda.DataFrame(columns=("version", "product")))) == 0 +@pytest.fixture(scope="session") +def subsets() -> 
list[dict[str, str]]: + return [ + {"version": "v1", "product": "B"}, + {"version": "v1", "product": "C"}, + {"version": "v2", "product": "A"}, + {"version": "v2", "product": "B"}, + ] + + +@pytest.mark.parametrize( + "context, auto_pick, subset_filters, expected", + [ + (nullcontext(), ("version", "product"), {}, {"version": "v2", "product": "B"}), + (nullcontext(), ("product", "version"), {}, {"version": "v1", "product": "C"}), + (pytest.raises(ValueError, match="not be unmixed"), ("version",), {}, None), + ( + nullcontext(), + ("version",), + {"product": "B"}, + {"version": "v2", "product": "B"}, + ), + ], +) +def test_unmixing_auto_pick_subset( + subsets: list[dict[str, str]], + context, + auto_pick: tuple[str, ...], + subset_filters: dict[str, str], + expected: dict[str, str], +): + + unmixer = SubsetsUnmixer( + partition_keys=("version", "product"), auto_pick_last=auto_pick + ) + + with context: + subset = unmixer.pick_subset(subsets, **subset_filters) + assert subset == expected + + +def test_unmixing_auto_pick_subset_no_unmix(subsets: list[dict[str, str]]): + + unmixer = SubsetsUnmixer( + partition_keys=("version", "product"), auto_pick_last=tuple() + ) + assert unmixer.pick_subset(subsets[:1]) == subsets[0] + + @pytest.mark.parametrize( "auto_pick, group_names", [ @@ -244,7 +290,7 @@ def test_unmixing_empty(): (("version", "product"), ("v2", "Expert")), ], ) -def test_unmixing_auto_pick( +def test_unmixing_auto_pick_dataframe( df_with_duplicates: pda.DataFrame, auto_pick: tuple[str, str], group_names: tuple[str, str], @@ -270,33 +316,6 @@ def test_unmixing_manual_pick(df_with_duplicates: pda.DataFrame): assert df.equals(df_with_duplicates[subset]) -def test_unmixing_callable(df_with_duplicates: pda.DataFrame): - """Use a callable to transform the columns prior to auto pick.""" - unmixer = SubsetsUnmixer( - partition_keys=("version", "product"), auto_pick_last=("product", "version") - ) - df = unmixer(df_with_duplicates) - subset = 
(df_with_duplicates["version"] == "v1") & ( - df_with_duplicates["product"] == "Unsmoothed" - ) - assert df.equals(df_with_duplicates[subset]) - - # We reverse the product column sort internally -> selecting Expert instead - # of Unsmoothed - unmixer = SubsetsUnmixer( - partition_keys={ - "version": None, - "product": lambda x: 1 if x == "Expert" else 0, - }, - auto_pick_last=("product", "version"), - ) - df = unmixer(df_with_duplicates) - subset = (df_with_duplicates["version"] == "v2") & ( - df_with_duplicates["product"] == "Expert" - ) - assert df.equals(df_with_duplicates[subset]) - - @pytest.fixture(scope="session") def db_with_files() -> FilesDatabaseTest: fs = fs_mem.MemoryFileSystem() @@ -600,6 +619,43 @@ def test_subsets(db_with_files_good_layout: FilesDatabaseTest): ) +class FilesDatabaseTestMultipleKeys(FilesDatabaseTestNoUnmixer): + unmixer = SubsetsUnmixer(("a_number", "time")) + + +@pytest.fixture(scope="session") +def db_with_files_multiple_subsets_keys( + db_with_files_bad_layout: FilesDatabaseTest, +) -> FilesDatabaseTestMultipleKeys: + fs = db_with_files_bad_layout.fs + fixed_path = Path(db_with_files_bad_layout.path) / "baz" + db = FilesDatabaseTestMultipleKeys(fixed_path, fs, enable_layouts=True) + return db + + +def test_subsets_multiple_keys( + db_with_files_multiple_subsets_keys: FilesDatabaseTestMultipleKeys, +): + with pytest.warns(PerformanceWarning): + assert len(db_with_files_multiple_subsets_keys.subsets) == 2 + + assert all( + [ + subset + in [ + {"a_number": 1, "time": np.datetime64("2025-01-01")}, + {"a_number": 2, "time": np.datetime64("2025-01-01")}, + ] + for subset in db_with_files_multiple_subsets_keys.subsets + ] + ) + + +def test_filters_value_empty_dir(tmp_path: Path): + db = FilesDatabaseTest(path=tmp_path) + assert len(db.filter_values("b_string")) == 0 + + def test_filters_value_full_scan_filter_not_in_layout( db_with_files_good_layout: FilesDatabaseTest, ): @@ -624,7 +680,7 @@ def 
test_filters_value_layouts_disabled_full_scan( db = FilesDatabaseTest( db_with_files_bad_layout.path, db_with_files_bad_layout.fs, enable_layouts=False ) - with pytest.warns(UserWarning, match="enabled"): + with pytest.warns(PerformanceWarning, match="enabled"): values = db.filter_values( "a_number", ) @@ -632,14 +688,18 @@ def test_filters_value_layouts_disabled_full_scan( def test_filters_value_full_scan_flat(db_with_files: FilesDatabaseTest): - values = db_with_files.filter_values("a_number") + with pytest.warns(PerformanceWarning, match="flat"): + values = db_with_files.filter_values("a_number") assert values == {1, 2} def test_filters_value_full_scan_layouts_mismatch( db_with_files_bad_layout: FilesDatabaseTest, ): - with pytest.raises(LayoutMismatchError): + with ( + pytest.warns(PerformanceWarning, match="flat"), + pytest.raises(LayoutMismatchError), + ): db_with_files_bad_layout.filter_values("b_string", a_number=1) @@ -656,5 +716,5 @@ def test_filters_value_unknown(db_with_files_good_layout: FilesDatabaseTest): def test_filters_value_missing_subset_selection( db_with_files_good_layout: FilesDatabaseTest, ): - with pytest.raises(ValueError, match="heterogenous datasets"): + with pytest.raises(ValueError, match="could not be unmixed"): db_with_files_good_layout.filter_values("b_string") diff --git a/tests/implementations/collections/test_l2_lr_ssh.py b/tests/implementations/collections/test_l2_lr_ssh.py index 11f5c87..480f460 100644 --- a/tests/implementations/collections/test_l2_lr_ssh.py +++ b/tests/implementations/collections/test_l2_lr_ssh.py @@ -8,6 +8,7 @@ import xarray as xr from utils import brute_force_geographical_selection, extract_box_from_polygon +from fcollections.core import PerformanceWarning from fcollections.implementations import ( AVISO_L2_LR_SSH_LAYOUT, L2Version, @@ -732,5 +733,6 @@ def test_subsets_flat(l2_lr_ssh_dir_empty_files: Path): ] db = NetcdfFilesDatabaseSwotLRL2(l2_lr_ssh_dir_empty_files) - assert len(db.subsets) == 
len(expected) - assert all([x in expected for x in db.subsets]) + with pytest.warns(PerformanceWarning): + assert len(db.subsets) == len(expected) + assert all([x in expected for x in db.subsets]) diff --git a/tests/implementations/collections/test_l3_lr_ssh.py b/tests/implementations/collections/test_l3_lr_ssh.py index 0857fe1..69f1ced 100644 --- a/tests/implementations/collections/test_l3_lr_ssh.py +++ b/tests/implementations/collections/test_l3_lr_ssh.py @@ -11,7 +11,7 @@ import xarray as xr from utils import brute_force_geographical_selection, extract_box_from_polygon -from fcollections.core import FileNameConvention, Layout +from fcollections.core import FileNameConvention, Layout, PerformanceWarning from fcollections.implementations import ( AVISO_L3_LR_SSH_LAYOUT_V2, AVISO_L3_LR_SSH_LAYOUT_V3, @@ -781,5 +781,7 @@ def test_subsets(l3_lr_ssh_dir: Path): "version": "1.0.2", }, ] - assert len(db.subsets) == len(expected) - assert all([subset in expected for subset in db.subsets]) + + with pytest.warns(PerformanceWarning): + assert len(db.subsets) == len(expected) + assert all([subset in expected for subset in db.subsets])