Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
5e1ee05
use path mapper registries and customizations module
ungarj Oct 27, 2025
fa1540d
dev commit
ungarj Oct 28, 2025
46dc14e
rename Source to Sentinel2Source
ungarj Oct 28, 2025
a49a97d
replace Archive with Source instances
ungarj Oct 28, 2025
fead4c6
dev commit
ungarj Oct 29, 2025
892ef82
fix eo_bands and static catalog
ungarj Oct 29, 2025
27f86e1
deactivate obsolete test; fix eostac cat_baseurl functionality
ungarj Oct 29, 2025
a54df3d
trim the fat
ungarj Oct 29, 2025
2a0f0c6
added first test for known sources
ungarj Oct 29, 2025
908b774
extend source tests
ungarj Oct 30, 2025
f0a818c
restructured modules
ungarj Oct 30, 2025
71efdac
add guesser functions
ungarj Oct 30, 2025
4ecb087
dev commit
ungarj Nov 3, 2025
23f286e
only provide URL to a single collection to make sure a source is unique
ungarj Nov 3, 2025
3ff03e2
fix some tests
ungarj Nov 3, 2025
e858f65
fix some more tests
ungarj Nov 3, 2025
68b92de
fix typo
ungarj Nov 3, 2025
a18fb1c
use cql2 to filter items on static catalog
ungarj Nov 3, 2025
f0d8b5a
streamline code
ungarj Nov 3, 2025
138dd47
make asset_mpath() and get_item_property() handle multiple keys; add …
ungarj Nov 4, 2025
1033c13
fix query param
ungarj Nov 4, 2025
55a0191
add metadata xml mapper for CDSE
ungarj Nov 4, 2025
64ecf9c
fix test
ungarj Nov 4, 2025
02f8cc1
remove duplicate cdse test
ungarj Nov 4, 2025
f9a52f3
removed deprecated mapchete_eo.geometry module
ungarj Nov 4, 2025
0d40569
set bounds CRS
ungarj Nov 4, 2025
9726756
update test mapchete files
ungarj Nov 4, 2025
58a8ee5
move from catalogs to collections
ungarj Nov 4, 2025
0f732f2
fix writing static catalog
ungarj Nov 5, 2025
87b208f
fix utm_search
ungarj Nov 5, 2025
67fd285
fix utm_search
ungarj Nov 5, 2025
d5e805e
increase test coverage
ungarj Nov 5, 2025
21f7275
enable multiple sources and add test
ungarj Nov 6, 2025
b6e7c9b
fix query format
ungarj Nov 6, 2025
293971a
allow for bounds parameter per source
ungarj Nov 6, 2025
0f1cc4c
clean up
ungarj Nov 7, 2025
301fdd6
make EOSTAC driver work, some typing for searching and adding basic test
Scartography Nov 10, 2025
15ffec7
area=box(*area.bounds), as search area instaed of passing the whole p…
Scartography Nov 11, 2025
20a8d23
fix optional time
ungarj Nov 12, 2025
80a786f
Merge pull request #19 from mapchete/eostac-earthsearch-copdem
ungarj Nov 12, 2025
b392d77
no point in having private methods for this
ungarj Nov 12, 2025
c9a740d
make metadata parsing lazy
ungarj Nov 12, 2025
e7a1784
don't bother searching if area is empty anyways
ungarj Nov 12, 2025
886af71
enable lazy loading stac item
ungarj Nov 12, 2025
098701b
extend lazy item loads capabilities by adding an item properties cache
ungarj Nov 13, 2025
b25fde6
remove dev KeyError
ungarj Nov 13, 2025
457b903
fix recursion
ungarj Nov 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file removed mapchete_eo/archives/__init__.py
Empty file.
65 changes: 0 additions & 65 deletions mapchete_eo/archives/base.py

This file was deleted.

148 changes: 94 additions & 54 deletions mapchete_eo/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

import warnings
import logging
from functools import cached_property
from typing import Any, Callable, List, Optional, Type, Union
from typing import Any, Callable, List, Optional, Sequence, Type, Union, Dict, Generator

import croniter
from mapchete import Bounds
Expand All @@ -17,23 +18,23 @@
from mapchete.path import MPath
from mapchete.tile import BufferedTile
from mapchete.types import MPathLike, NodataVal, NodataVals
from pydantic import BaseModel
from pydantic import BaseModel, model_validator
from pystac import Item
from rasterio.enums import Resampling
from rasterio.features import geometry_mask
from shapely.geometry import mapping
from shapely.geometry.base import BaseGeometry

from mapchete_eo.archives.base import Archive
from mapchete_eo.exceptions import CorruptedProductMetadata, PreprocessingNotFinished
from mapchete_eo.io import (
products_to_np_array,
products_to_xarray,
read_levelled_cube_to_np_array,
read_levelled_cube_to_xarray,
)
from mapchete_eo.source import Source
from mapchete_eo.product import EOProduct
from mapchete_eo.protocols import EOProductProtocol
from mapchete_eo.search.stac_static import STACStaticCatalog
from mapchete_eo.settings import mapchete_eo_settings
from mapchete_eo.sort import SortMethodConfig, TargetDateSort
from mapchete_eo.time import to_datetime
Expand All @@ -44,13 +45,39 @@

class BaseDriverConfig(BaseModel):
format: str
time: Union[TimeRange, List[TimeRange]]
source: Sequence[Source]
time: Optional[Union[TimeRange, List[TimeRange]]] = None
cat_baseurl: Optional[str] = None
cache: Optional[Any] = None
footprint_buffer: float = 0
area: Optional[Union[MPathLike, dict, type[BaseGeometry]]] = None
preprocessing_tasks: bool = False
archive: Optional[Type[Archive]] = None
search_kwargs: Optional[Dict[str, Any]] = None

@model_validator(mode="before")
def to_list(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Expands source to list."""
for field in ["source"]:
value = values.get(field)
if value is not None and not isinstance(value, list):
values[field] = [value]
return values

@model_validator(mode="before")
def deprecate_cat_baseurl(cls, values: Dict[str, Any]) -> Dict[str, Any]:
cat_baseurl = values.get("cat_baseurl")
if cat_baseurl: # pragma: no cover
warnings.warn(
"'cat_baseurl' will be deprecated soon. Please use 'catalog_type=static' in the source.",
category=DeprecationWarning,
stacklevel=2,
)
if values.get("source", []):
raise ValueError(
"deprecated cat_baseurl field found alongside sources."
)
values["source"] = [dict(collection=cat_baseurl, catalog_type="static")]
return values


class EODataCube(base.InputTile):
Expand All @@ -63,7 +90,7 @@ class EODataCube(base.InputTile):

tile: BufferedTile
eo_bands: dict
time: List[TimeRange]
time: Optional[List[TimeRange]]
area: BaseGeometry
area_pixelbuffer: int = 0

Expand All @@ -72,7 +99,7 @@ def __init__(
tile: BufferedTile,
products: Optional[List[EOProductProtocol]],
eo_bands: dict,
time: List[TimeRange],
time: Optional[List[TimeRange]] = None,
input_key: Optional[str] = None,
area: Optional[BaseGeometry] = None,
**kwargs,
Expand Down Expand Up @@ -314,27 +341,25 @@ def filter_products(
"""
Return a filtered list of input products.
"""
if any([start_time, end_time, timestamps]):
if any([start_time, end_time, timestamps]): # pragma: no cover
raise NotImplementedError("time subsets are not yet implemented")

if time_pattern:
# filter products by time pattern
tz = tzutc()
coord_time = [
t.replace(tzinfo=tz)
for t in croniter.croniter_range(
to_datetime(self.start_time),
to_datetime(self.end_time),
time_pattern,
)
]
return [
product
for product in self.products
if product.item.datetime in coord_time
if product.item.datetime
in [
t.replace(tzinfo=tzutc())
for t in croniter.croniter_range(
to_datetime(self.start_time),
to_datetime(self.end_time),
time_pattern,
)
]
]
else:
return self.products
return self.products

def is_empty(self) -> bool: # pragma: no cover
"""
Expand All @@ -358,16 +383,16 @@ def default_read_values(
nodatavals = self.default_read_nodataval
merge_products_by = merge_products_by or self.default_read_merge_products_by
merge_method = merge_method or self.default_read_merge_method
if resampling is None:
resampling = self.default_read_resampling
else:
resampling = (
resampling
if isinstance(resampling, Resampling)
else Resampling[resampling]
)
return dict(
resampling=resampling,
resampling=(
self.default_read_resampling
if resampling is None
else (
resampling
if isinstance(resampling, Resampling)
else Resampling[resampling]
)
),
nodatavals=nodatavals,
merge_products_by=merge_products_by,
merge_method=merge_method,
Expand Down Expand Up @@ -401,8 +426,7 @@ class InputData(base.InputData):
default_preprocessing_task: Callable = staticmethod(EOProduct.from_stac_item)
driver_config_model: Type[BaseDriverConfig] = BaseDriverConfig
params: BaseDriverConfig
archive: Archive
time: Union[TimeRange, List[TimeRange]]
time: Optional[Union[TimeRange, List[TimeRange]]]
area: BaseGeometry
_products: Optional[IndexedFeatures] = None

Expand All @@ -421,6 +445,8 @@ def __init__(
self.standalone = standalone

self.params = self.driver_config_model(**input_params["abstract"])
self.conf_dir = input_params.get("conf_dir")

# we have to make sure, the cache path is absolute
# not quite fond of this solution
if self.params.cache:
Expand All @@ -429,14 +455,18 @@ def __init__(
).absolute_path(base_dir=input_params.get("conf_dir"))
self.area = self._init_area(input_params)
self.time = self.params.time
if self.readonly: # pragma: no cover
return

self.set_archive(base_dir=input_params["conf_dir"])
self.eo_bands = [
eo_band
for source in self.params.source
for eo_band in source.eo_bands(base_dir=self.conf_dir)
]

if self.readonly: # pragma: no cover
return
# don't use preprocessing tasks for Sentinel-2 products:
if self.params.preprocessing_tasks or self.params.cache is not None:
for item in self.archive.items():
for item in self.source_items():
self.add_preprocessing_task(
self.default_preprocessing_task,
fargs=(item,),
Expand All @@ -455,7 +485,7 @@ def __init__(
self.default_preprocessing_task(
item, cache_config=self.params.cache, cache_all=True
)
for item in self.archive.items()
for item in self.source_items()
]
)

Expand All @@ -467,7 +497,7 @@ def _init_area(self, input_params: dict) -> BaseGeometry:
configured_area, configured_area_crs = guess_geometry(
self.params.area,
bounds=Bounds.from_inp(
input_params.get("delimiters", {}).get("bounds"),
input_params.get("delimiters", {}).get("effective_bounds"),
crs=getattr(input_params.get("pyramid"), "crs"),
),
raise_if_empty=False,
Expand All @@ -481,20 +511,30 @@ def _init_area(self, input_params: dict) -> BaseGeometry:
)
return process_area

def set_archive(self, base_dir: MPath):
# this only works with some static archive:
if self.params.cat_baseurl:
self.archive = Archive(
catalog=STACStaticCatalog(
baseurl=MPath(self.params.cat_baseurl).absolute_path(
base_dir=base_dir
),
),
area=self.bbox(mapchete_eo_settings.default_catalog_crs),
time=self.time,
def source_items(self) -> Generator[Item, None, None]:
already_returned = set()
for source in self.params.source:
area = reproject_geometry(
self.area,
src_crs=self.crs,
dst_crs=source.catalog_crs,
)
else:
raise NotImplementedError()
if area.is_empty:
continue
for item in source.search(
time=self.time,
area=area,
base_dir=self.conf_dir,
):
# if item was already found in previous source, skip
if item.id in already_returned:
continue

# if item is new, add to list and yield
already_returned.add(item.id)
item.properties["mapchete_eo:source"] = source
yield item
logger.debug("returned set of %s items", len(already_returned))

def bbox(self, out_crs: Optional[str] = None) -> BaseGeometry:
"""Return data bounding box."""
Expand All @@ -517,15 +557,15 @@ def products(self) -> IndexedFeatures:
return self._products

# TODO: copied it from mapchete_satellite, not yet sure which use case this is
elif self.standalone:
elif self.standalone: # pragma: no cover
raise NotImplementedError()

# if preprocessing tasks are ready, index them for further use
elif self.preprocessing_tasks_results:
return IndexedFeatures(
[
self.get_preprocessing_task_result(item.id)
for item in self.archive.items()
for item in self.source_items()
if not isinstance(item, CorruptedProductMetadata)
],
crs=self.crs,
Expand Down Expand Up @@ -557,7 +597,7 @@ def open(self, tile, **kwargs) -> EODataCube:
return self.input_tile_cls(
tile,
products=tile_products,
eo_bands=self.archive.catalog.eo_bands,
eo_bands=self.eo_bands,
time=self.time,
# passing on the input key is essential so dependent preprocessing tasks can be found!
input_key=self.input_key,
Expand Down
Loading