Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions docs/custom.md
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ set this field in the subset unmixer instead, enforcing one version per
subset
```

(mixins)=

## Mixins

A custom {class}`FilesDatabase <fcollections.core.FilesDatabase>` may need additional functionalities apart from
Expand All @@ -404,10 +406,12 @@ classes adding the functionalities are abstract, they should be mixed with other
classes to get a complete implementation: these abstract classes are then called
``mixins``.

Two mixins are currently available:
Three mixins are currently available:

- {class}`PeriodMixin <fcollections.core.PeriodMixin>`: works with time series
and can analyze the data to get the time coverage or detect holes
and can analyze the data to get the time coverage or detect holes.
- {class}`HalfOrbitMixin <fcollections.core.HalfOrbitMixin>`: works with half
orbit granules to extract the half orbit range.
- {class}`DownloadMixin <fcollections.core.DownloadMixin>`: appends a download
endpoint to a remote database.

Expand Down
29 changes: 29 additions & 0 deletions docs/implementations/l2_lr_ssh.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,35 @@ The following examples can be used to build complex queries
:::
::::

## Generic information

Generic information about the set of files can be extracted. The available
information depends on the [mixins](#mixins) used to build
{class}`fcollections.implementations.NetcdfFilesDatabaseSwotLRL2`.

::::{tab-set}
:::{tab-item} PeriodMixin
- Time coverage
```python
fc.time_coverage(subset='Expert', version='P?D?', phase='SCIENCE')
```
- Time holes
```python
fc.time_holes(subset='Expert', version='P?D?', phase='SCIENCE')
```
:::
:::{tab-item} HalfOrbitMixin
- Half orbit range
```python
fc.half_orbit_range(subset='Expert', version='P?D?', phase='SCIENCE')
```
- Cycle range
```python
fc.cycle_range(subset='Expert', version='P?D?', phase='SCIENCE')
```
:::
::::


## Stack for temporal analysis

Expand Down
29 changes: 29 additions & 0 deletions docs/implementations/l3_lr_ssh.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,35 @@ The following examples can be used to build complex queries
:::
::::

## Generic information

Generic information about the set of files can be extracted. The available
information depends on the [mixins](#mixins) used to build
{class}`fcollections.implementations.NetcdfFilesDatabaseSwotLRL3`.

::::{tab-set}
:::{tab-item} PeriodMixin
- Time coverage
```python
fc.time_coverage(subset='Expert')
```
- Time holes
```python
fc.time_holes(subset='Expert')
```
:::
:::{tab-item} HalfOrbitMixin
- Half orbit range
```python
fc.half_orbit_range(subset='Expert', phase='SCIENCE')
```
- Cycle range
```python
fc.cycle_range(subset='Expert', phase='SCIENCE')
```
:::
::::

## Stack for temporal analysis

The most prominent functionality is the ability to stack the half orbits when
Expand Down
2 changes: 2 additions & 0 deletions src/fcollections/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from ._mixins import (
DiscreteTimesMixin,
DownloadMixin,
HalfOrbitMixin,
ITemporalMixin,
PeriodMixin,
)
Expand Down Expand Up @@ -91,6 +92,7 @@
"DownloadMixin",
"CaseType",
"PeriodMixin",
"HalfOrbitMixin",
"GroupMetadata",
"group_metadata_from_netcdf",
"VariableMetadata",
Expand Down
133 changes: 96 additions & 37 deletions src/fcollections/core/_filesdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,17 +498,47 @@ def _files(
with warnings.catch_warnings():
warnings.simplefilter("error", category=PerformanceWarning)
try:
subset_filters = self.unmixer.pick_subset(self.subsets, **kwargs)
kwargs |= subset_filters
unmix = False
except IndexError:
logger.debug("No subset, nothing to unmix")
self._pick_subset_before_files_scan(kwargs)
unmix = False
except PerformanceWarning:
logger.debug(
"Subset unmixing could not be done before the files scan: it will be done after."
)

predicates, kwargs = self._auto_build_predicates_and_filters(predicates, kwargs)

df = self.discoverer.to_dataframe(
predicates=predicates,
stat_fields=stat_fields,
enable_layouts=self.enable_layouts,
**{k: kwargs[k] for k in kwargs if k in self.listing_parameters},
)

postprocesses = map(
lambda item: item[1],
filter(
lambda item: item[0],
[
(unmix and self.unmixer is not None, self.unmixer),
(deduplicate and self.deduplicator is not None, self.deduplicator),
(
sort and self.sort_keys is not None,
lambda df: df.sort_values(self.sort_keys, ignore_index=True),
),
],
),
)

for postprocess in postprocesses:
df = postprocess(df)

return df

def _auto_build_predicates_and_filters(
self,
predicates: tp.Iterable[tp.Callable[[tuple[tp.Any, ...]], bool]],
kwargs,
):
# Auto-build declared predicates and additional filters.
predicates = list(predicates)
if self.filter_builders is not None:
Expand Down Expand Up @@ -559,32 +589,7 @@ def _files(
filters.keys(),
)

df = self.discoverer.to_dataframe(
predicates=predicates,
stat_fields=stat_fields,
enable_layouts=self.enable_layouts,
**{k: kwargs[k] for k in kwargs if k in self.listing_parameters},
)

postprocesses = map(
lambda item: item[1],
filter(
lambda item: item[0],
[
(unmix and self.unmixer is not None, self.unmixer),
(deduplicate and self.deduplicator is not None, self.deduplicator),
(
sort and self.sort_keys is not None,
lambda df: df.sort_values(self.sort_keys, ignore_index=True),
),
],
),
)

for postprocess in postprocesses:
df = postprocess(df)

return df
return predicates, kwargs

def _query(self, **kwargs) -> xr_t.Dataset | None:
"""Query a dataset by reading selected files in file system.
Expand Down Expand Up @@ -834,13 +839,14 @@ def _filter_values(self, filter_name: str, **kwargs: tp.Any) -> set[tp.Any]:
# there is an ambiguity. We need the subsets list whether the listing
# is quick or slow. In case of a slow computation of subsets, a
# warning will be emitted
if unmix:
try:
kwargs |= self.unmixer.pick_subset(self.subsets, **kwargs)
except IndexError:
logger.debug("No subset, nothing to unmix")
if unmix and self.unmixer is not None:
self._pick_subset_before_files_scan(kwargs)

return {x[0] for x in metadata_collector.discover(**kwargs)}
edited_filters = kwargs.copy()
_, edited_filters = self._auto_build_predicates_and_filters(
[], edited_filters
)
return {x[0] for x in metadata_collector.discover(**edited_filters)}
except LayoutMismatchError:
msg = (
"Layouts are enabled and should contain information about "
Expand All @@ -852,6 +858,43 @@ def _filter_values(self, filter_name: str, **kwargs: tp.Any) -> set[tp.Any]:
warnings.warn(msg, PerformanceWarning)
return set(self.list_files(**kwargs, unmix=unmix)[filter_name])

def _pick_subset_before_files_scan(self, filters: dict[str, tp.Any]):
"""Pick a subset without listing the files metadata.

Listing the files metadata can be costly. If possible, we wish to
determine the subset by parsing the information in the folders.

Only the filters whose name is one of the unmixer ``partition_keys`` are
used for the subset selection. Their values are sanitized with the
matching field of the file name convention before being handed to
:meth:`SubsetUnmixer.pick_subset`.

If an ``IndexError`` is raised during the selection, it is logged at
debug level and ``filters`` is left as it was given.

Parameters
----------
filters
Filters that need to be applied on the files. These should also
contain the mandatory filters for subset selection. This parameter
is modified in place to add the automatically set filters for the
subset (refer to :attr:`SubsetUnmixer.auto_pick_last`)

Warns
-----
PerformanceWarning
In case the subset information cannot be found in the folders.
"""
try:
# Sanitize the subset-related filter values first. The file name
# convention is taken as the last convention of the first layout.
file_name_convention = self.layouts[0].conventions[-1]

sanitized_subset_parameters = {
field_name: file_name_convention.get_field(field_name).sanitize(
reference_value
)
for field_name, reference_value in filters.items()
if field_name in self.unmixer.partition_keys
}

# In-place merge: the caller's dictionary receives the filters
# returned by the subset selection.
filters |= self.unmixer.pick_subset(
self.subsets, **sanitized_subset_parameters
)
except IndexError:
# NOTE(review): presumably raised when there is no subset (or no
# layout/convention) to pick from -- confirm which call raises it.
logger.debug("No subset, nothing to unmix")

def _validate_field(self, filter_name: str):
"""Check a field is declared in one of the layouts.

Expand Down Expand Up @@ -1134,3 +1177,19 @@ def build_filter(cls, *args: tp.Any) -> dict[str, tp.Any]:
@abc.abstractmethod
def parameter(cls) -> FileNameField:
"""Initialization parameter for the class."""

@classmethod
@abc.abstractmethod
def target_fields(cls) -> tuple[str, ...]:
"""Target fields of the predicate.

The target fields determine which part of a metadata record (the
information related to one file) is used by the predicate. This can be
useful to detect incompatibilities between the predicate filtering and
the more classic filtering.

Returns
-------
tuple[str, ...]
Field names used by the predicate to filter a record.
"""
Loading
Loading