From c447ee3b94f43b65dbc366d4b13bf22efd2982ce Mon Sep 17 00:00:00 2001 From: Anish Kothikar <42143884+SkylordA@users.noreply.github.com> Date: Wed, 4 Feb 2026 13:32:20 +0000 Subject: [PATCH 1/3] fix/base_storage_caching (#30) * saving work, changes get_from_cache to hash based on file contents instead of file names, adds xxhash to requirements * flake8 * changes/correction to get_from_cache, adds test cases * updates get_from_cache to use etags for s3 and azure, adds tests for s3, updates test.yml to start localstack for s3 tests * update localstack command * test ci * test ci * test ci * test ci * test ci * test ci * adds azure tests, corrects azure connection_string property to account for azurite * updates unit tests for get_from_cache to parameterize remote storage backends which use the same test cases * refactor test remote backend setup * reorder imports * adds extra tests, corrects one test with monkeypatch * update requirements and add etag check test for sanity --- .../filestore/backends/azure_abfs.py | 33 +- oasis_data_manager/filestore/backends/base.py | 113 ++++-- requirements.in | 1 + requirements.txt | 36 +- tests/filestorage/test_caching.py | 363 ++++++++++++++++++ 5 files changed, 478 insertions(+), 68 deletions(-) create mode 100644 tests/filestorage/test_caching.py diff --git a/oasis_data_manager/filestore/backends/azure_abfs.py b/oasis_data_manager/filestore/backends/azure_abfs.py index 5eea8eb..8a16d10 100755 --- a/oasis_data_manager/filestore/backends/azure_abfs.py +++ b/oasis_data_manager/filestore/backends/azure_abfs.py @@ -114,24 +114,21 @@ def get_fsspec_storage_options(self): def connection_string(self): if self._connection_string: return self._connection_string - else: - fsspec_storage_options = { - "anon": not self.account_key, - "account_name": self.account_name, - "account_key": self.account_key, - "use_ssl": self.azure_ssl, - } - fs = self.fsspec_filesystem_class(**fsspec_storage_options) - - cs = "" - if self.endpoint_url: - cs += f"BlobEndpoint={self.endpoint_url};" - if fs.account_name: - cs += f"AccountName={fs.account_name};" - if fs.account_key: - cs += f"AccountKey={fs.account_key};" - - return cs + + cs_parts = [ + f"DefaultEndpointsProtocol={'https' if self.azure_ssl else 'http'}", + f"AccountName={self.account_name}", + f"AccountKey={self.account_key}" + ] + + if self.endpoint_url: + # Azurite requires the account name in the endpoint path + endpoint = self.endpoint_url.rstrip('/') + if self.account_name not in endpoint: + endpoint = f"{endpoint}/{self.account_name}" + cs_parts.append(f"BlobEndpoint={endpoint};") + + return ";".join(cs_parts) def get_storage_url(self, filename=None, suffix="tar.gz", encode_params=True): filename = ( diff --git a/oasis_data_manager/filestore/backends/base.py b/oasis_data_manager/filestore/backends/base.py index e5b4c23..a6c1e96 100755 --- a/oasis_data_manager/filestore/backends/base.py +++ b/oasis_data_manager/filestore/backends/base.py @@ -1,6 +1,4 @@ -import base64 import contextlib -import io import logging import os import shutil @@ -16,6 +14,7 @@ from fsspec.implementations.dirfs import DirFileSystem from oasis_data_manager.errors import OasisException +import xxhash LOG_FILE_SUFFIX = "txt" ARCHIVE_FILE_SUFFIX = "tar.gz" @@ -191,39 +190,93 @@ def get_from_cache(self, reference, required=False, no_cache_target=None): :return: Absolute filepath to stored Object :rtype str """ - # null ref given if not reference: if required: raise MissingInputsException(reference) - else: - return None + return None - # check if the file is in the cache, if so return that path - cache_filename = base64.b64encode(reference.encode()).decode() - if self.cache_root: - cached_file = os.path.join(self.cache_root, cache_filename) - else: - os.makedirs(os.path.dirname(no_cache_target), exist_ok=True) - cached_file = no_cache_target - - if self.cache_root: - if os.path.exists(cached_file): - logging.info("Get from Cache: {}".format(reference)) - return cached_file - - if self._is_valid_url(reference): - # if the file is not in the path, and is a url download it to the cache - response = urlopen(reference) - fdata = response.read() - - with io.open(cached_file, "w+b") as f: - f.write(fdata) - logging.info("Get from URL: {}".format(reference)) + fs_protocol = getattr(self.fs.fs, "protocol", None) + + # Normalize protocol into a tuple of strings + if isinstance(fs_protocol, str): + protocols = (fs_protocol,) + elif isinstance(fs_protocol, (list, tuple)): + protocols = tuple(fs_protocol) else: - # otherwise get it from the storage and add it to the cache - self.fs.get(reference, cached_file, recursive=True) + protocols = () + + enable_etag_cache = ( + self.cache_root + and not self._is_valid_url(reference) + and any(p in ("s3", "s3a", "az", "abfs", "abfss") for p in protocols) + ) - return cached_file + # No cache root configured, just return data + if not enable_etag_cache: + if not no_cache_target: + raise OasisException("Error: caching disabled for this filesystem and no_cache_target not provided") + Path(no_cache_target).parent.mkdir(parents=True, exist_ok=True) + if self._is_valid_url(reference): + with urlopen(reference, timeout=30) as r: + data = r.read() + with open(no_cache_target, "wb") as f: + f.write(data) + logging.info("Get from URL: {}".format(reference)) + else: + self.fs.get(reference, no_cache_target, recursive=True) + logging.info("Get from Filestore: {}".format(reference)) + return no_cache_target + + # Caching enabled + # Get metadata + try: + info = self.fs.info(reference) + except FileNotFoundError: + if required: + raise MissingInputsException(reference) + return None + + # Raise error if type is not file + if info.get("type") == "directory": + raise OasisException(f"Directories are not supported in get_from_cache: {reference}") + + remote_etag = info.get("ETag") or info.get("etag") + if remote_etag is None: + self.logger.warning(f"ETag missing for {reference} — skipping cache and returning fresh download") + if no_cache_target is not None: + dest_path = no_cache_target + Path(dest_path).parent.mkdir(parents=True, exist_ok=True) + else: + tmp = tempfile.NamedTemporaryFile(delete=False) + dest_path = tmp.name + tmp.close() + with self.fs.open(reference, "rb") as src, open(dest_path, "wb") as out: + shutil.copyfileobj(src, out) + return dest_path + + # Create Cache dir + content_dir = Path(self.cache_root) + content_dir.mkdir(parents=True, exist_ok=True) + + # Create reference hash for fast lookup + ref_hash = xxhash.xxh64(reference.encode()).hexdigest() + file_dir = content_dir / ref_hash + file_dir.mkdir(parents=True, exist_ok=True) + file_path = file_dir / "data" + etag_path = file_dir / "etag" + + # Return if etag matches + if file_path.exists() and etag_path.exists(): + cached_etag = etag_path.read_text() + if cached_etag == remote_etag: + return str(file_path) + + # Redownload data and write etag + with self.fs.open(reference, "rb") as f, open(file_path, "wb") as out: + shutil.copyfileobj(f, out) + etag_path.write_text(remote_etag) + + return str(file_path) def get(self, reference, output_path="", subdir="", required=False): """Retrieve stored object and stores it in the output path @@ -386,7 +439,7 @@ def fs(self) -> fsspec.AbstractFileSystem: self._fs = StrictRootDirFs( path=self.root_dir, fs=( - self.fsspec_filesystem_class(**self.get_fsspec_storage_options()) + self.fsspec_filesystem_class(**self.get_fsspec_storage_options(), asynchronous=False) if self.fsspec_filesystem_class else None ), diff --git a/requirements.in b/requirements.in index f5e275f..511ef58 100644 --- a/requirements.in +++ b/requirements.in @@ -14,5 +14,6 @@ respx geodatasets ipdb h5py +xxhash -r requirements-package.in -r optional-package.in diff --git a/requirements.txt b/requirements.txt index 0e894bf..1a08eb3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,11 @@ # -# This file is autogenerated by pip-compile with Python 3.11 +# This file is autogenerated by pip-compile with Python 3.12 # by the following command: # # pip-compile # adlfs==2024.12.0 - # via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in + # via -r optional-package.in aiobotocore==2.22.0 # via s3fs aiohappyeyeballs==2.6.1 @@ -44,7 +44,7 @@ azure-identity==1.23.0 azure-storage-blob==12.25.1 # via adlfs boto3==1.37.3 - # via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in + # via -r optional-package.in botocore==1.37.3 # via # aiobotocore @@ -98,28 +98,28 @@ cryptography==44.0.3 # pyjwt dask[array,dataframe]==2025.5.0 # via - # -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in + # -r optional-package.in # dask-geopandas # dask-sql # distributed dask-geopandas==0.4.3 - # via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in + # via -r optional-package.in dask-sql==2024.5.0 - # via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in + # via -r optional-package.in decorator==5.2.1 # via # ipdb # ipython distributed==2025.5.0 # via - # -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in + # -r optional-package.in # dask-sql executing==2.2.0 # via stack-data fastapi==0.115.12 # via dask-sql fastparquet==2024.11.0 - # via -r /home/sam/work/oasis-repos/OasisDataManager/requirements-package.in + # via -r requirements-package.in filelock==3.18.0 # via pytest-mypy fiona==1.10.1 @@ -132,7 +132,7 @@ frozenlist==1.6.0 # aiosignal fsspec==2025.3.2 # via - # -r /home/sam/work/oasis-repos/OasisDataManager/requirements-package.in + # -r requirements-package.in # adlfs # dask # fastparquet @@ -141,7 +141,7 @@ geodatasets==2024.8.0 # via -r requirements.in geopandas==0.14.4 # via - # -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in + # -r optional-package.in # dask-geopandas h11==0.16.0 # via @@ -164,8 +164,6 @@ idna==3.10 # httpx # requests # yarl -importlib-metadata==8.7.0 - # via dask iniconfig==2.1.0 # via pytest ipdb==0.13.13 @@ -237,7 +235,7 @@ packaging==25.0 # pytest pandas==2.2.3 # via - # -r /home/sam/work/oasis-repos/OasisDataManager/requirements-package.in + # -r requirements-package.in # dask # dask-sql # fastparquet @@ -292,7 +290,7 @@ pyjwt[crypto]==2.10.1 # msal # pyjwt pyogrio==0.11.0 - # via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in + # via -r optional-package.in pyproj==3.7.1 # via geopandas pytest==8.3.5 @@ -327,7 +325,7 @@ requests==2.32.3 respx==0.22.0 # via -r requirements.in s3fs==2025.3.2 - # via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in + # via -r optional-package.in s3transfer==0.11.3 # via boto3 shapely==2.1.0 @@ -377,17 +375,15 @@ types-setuptools==80.4.0.20250511 # via -r requirements.in typing-extensions==4.13.2 # via - # -r /home/sam/work/oasis-repos/OasisDataManager/requirements-package.in + # -r requirements-package.in # anyio # azure-core # azure-identity # azure-storage-blob # fastapi - # ipython # mypy # pydantic # pydantic-core - # types-boto3 # typing-inspection typing-inspection==0.4.0 # via pydantic @@ -406,12 +402,12 @@ wcwidth==0.2.13 # via prompt-toolkit wrapt==1.17.2 # via aiobotocore +xxhash==3.6.0 + # via -r requirements.in yarl==1.20.0 # via aiohttp zict==3.0.0 # via distributed -zipp==3.21.0 - # via importlib-metadata # The following packages are considered to be unsafe in a requirements file: # setuptools diff --git a/tests/filestorage/test_caching.py b/tests/filestorage/test_caching.py new file mode 100644 index 0000000..249839c --- /dev/null +++ b/tests/filestorage/test_caching.py @@ -0,0 +1,363 @@ +import boto3 +import logging +import os +import pytest +import tempfile +import uuid +from azure.core.exceptions import ResourceExistsError +from azure.storage.blob import BlobServiceClient +from dataclasses import dataclass +from pathlib import Path +from typing import Callable + +from oasis_data_manager.errors import OasisException +from oasis_data_manager.filestore.backends.aws_s3 import AwsS3Storage +from oasis_data_manager.filestore.backends.azure_abfs import AzureABFSStorage +from oasis_data_manager.filestore.backends.base import MissingInputsException, BaseStorage + + +@dataclass +class StorageContext: + storage: BaseStorage + upload_file: Callable[[str, bytes], None] # upload file function + + +def _setup_s3_storage(cache_dir): + localstack_endpoint = "http://localhost:4566" + bucket_name = f"test-bucket-{uuid.uuid4().hex[:8]}" + + # boto3 client/resource pointing to LocalStack + s3 = boto3.resource( + "s3", + endpoint_url=localstack_endpoint, + aws_access_key_id="test", + aws_secret_access_key="test", + region_name="eu-west-2", + ) + # Create bucket + s3.create_bucket( + Bucket=bucket_name, + CreateBucketConfiguration={"LocationConstraint": "eu-west-2"}, + ) + + # upload_file function + def s3_upload_file(key, content): + s3.Object(bucket_name, key).put(Body=content) + + # cleanup function + def s3_cleanup(): + bucket = s3.Bucket(bucket_name) + bucket.objects.all().delete() + bucket.delete() + + # Storage instance + storage = AwsS3Storage( + bucket_name=bucket_name, + root_dir="", + cache_dir=cache_dir, + endpoint_url=localstack_endpoint, + access_key="test", + secret_key="test", + region_name="eu-west-2", + ) + + return storage, s3_upload_file, s3_cleanup + + +def _setup_azure_storage(cache_dir): + azurite_acc_name = "devstoreaccount1" + azurite_acc_key = ( + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + ) + azurite_endpoint = "http://127.0.0.1:10000" + container_name = f"test-container-{uuid.uuid4().hex[:8]}" + + # Blob service client (Azurite) + blob_service = BlobServiceClient( + account_url=f"{azurite_endpoint}/{azurite_acc_name}", + credential=azurite_acc_key, + ) + + # Create container + try: + blob_service.create_container(container_name) + print(f"Container '{container_name}' created successfully.") + except ResourceExistsError: + print(f"Container '{container_name}' already exists.") + except Exception as e: + raise OasisException(f"An error occurred for creating the azurite container: {e}") + container_client = blob_service.get_container_client(container_name) + + # upload_file function + def azure_upload_file(key, content): + container_client.upload_blob(key, content, overwrite=True) + + # cleanup function + def azure_cleanup(): + for blob in container_client.list_blobs(): + container_client.delete_blob(blob.name) + container_client.delete_container() + + # Storage instance + storage = AzureABFSStorage( + account_name=azurite_acc_name, + account_key=azurite_acc_key, + azure_container=container_name, + endpoint_url=azurite_endpoint, + cache_dir=cache_dir, + root_dir="", + ) + + return storage, azure_upload_file, azure_cleanup + + +@pytest.fixture(scope="function") +def storage_context(request): + config = request.param + backend_type = config["backend"] + temp_cache = tempfile.TemporaryDirectory() + cache_dir = ( + temp_cache.name + if "cache_dir" not in config or config["cache_dir"] is not None + else None + ) + + if backend_type == "s3": + storage, upload_fn, cleanup_fn = _setup_s3_storage(cache_dir) + elif backend_type == "azure": + storage, upload_fn, cleanup_fn = _setup_azure_storage(cache_dir) + else: + raise OasisException(f"Unsupported backend_type ({backend_type}) for testing") + + yield StorageContext(storage=storage, upload_file=upload_fn) + + # Cleanup + cleanup_fn() + temp_cache.cleanup() + + +@pytest.mark.parametrize("storage_context", [ + {"backend": "s3"}, + {"backend": "azure"}, +], indirect=True) +def test_etag_present(storage_context): + """Test etag actually present""" + key = "etag-check/file.txt" + content = b"etag test" + + storage_context.upload_file(key, content) + + info = storage_context.storage.fs.info(key) + + etag = info.get("ETag") or info.get("etag") + assert etag is not None + + +@pytest.mark.parametrize("storage_context", [ + {"backend": "s3"}, + {"backend": "azure"}, +], indirect=True) +def test_first_fetch_downloads_and_caches(storage_context): + """Test basic download and caching""" + key = "test/file.txt" + content = b"hello world" + + storage_context.upload_file(key, content) + + cached_path = storage_context.storage.get_from_cache(key) + assert cached_path + assert Path(cached_path).exists() + assert Path(cached_path).read_bytes() == content + + +@pytest.mark.parametrize("storage_context", [ + {"backend": "s3"}, + {"backend": "azure"}, +], indirect=True) +def test_cache_hit_returns_same_file(storage_context): + """Test that fetching the same file again hits the cache (ETag match)""" + key = "test/file2.txt" + content = b"cached content" + + storage_context.upload_file(key, content) + + # First fetch + cached_path_1 = storage_context.storage.get_from_cache(key) + # Second fetch + cached_path_2 = storage_context.storage.get_from_cache(key) + + assert cached_path_1 == cached_path_2 + assert Path(cached_path_2).read_bytes() == content + + +@pytest.mark.parametrize("storage_context", [ + {"backend": "s3"}, + {"backend": "azure"}, +], indirect=True) +def test_cache_hit_does_not_redownload(storage_context, monkeypatch): + """Test cache is actually used and file isn't redownloaded""" + key = "speed/file.txt" + content = b"x" * 1024 + + storage = storage_context.storage + storage_context.upload_file(key, content) + + # First fetch populates cache + storage.get_from_cache(key) + + called = False + + def fail_if_called(*args, **kwargs): + nonlocal called + called = True + raise AssertionError("Data stream should not be opened on cache hit") + + # fs.open is only used during download + monkeypatch.setattr(storage.fs, "open", fail_if_called) + + # Cache hit + result = storage.get_from_cache(key) + + assert Path(result).read_bytes() == content + assert called is False + + +@pytest.mark.parametrize("storage_context", [ + {"backend": "s3"}, + {"backend": "azure"}, +], indirect=True) +def test_cache_miss_on_etag_change(storage_context): + """Test that changing file in S3 updates the cache""" + key = "test/file3.txt" + content1 = b"first version" + content2 = b"second version" + + storage_context.upload_file(key, content1) + + cached_path_1 = storage_context.storage.get_from_cache(key) + + # Overwrite the S3 object + storage_context.upload_file(key, content2) + + cached_path_2 = storage_context.storage.get_from_cache(key) + + assert Path(cached_path_1).parent == Path(cached_path_2).parent + assert Path(cached_path_2).read_bytes() == content2 + + +@pytest.mark.parametrize("storage_context", [ + {"backend": "s3"}, + {"backend": "azure"}, +], indirect=True) +def test_missing_file_raises_exception_when_required(storage_context): + """Test that requesting a missing file with required=True raises exception""" + key = "nonexistent/file.txt" + with pytest.raises(MissingInputsException): + storage_context.storage.get_from_cache(key, required=True) + + +@pytest.mark.parametrize("storage_context", [ + {"backend": "s3"}, + {"backend": "azure"}, +], indirect=True) +def test_missing_file_returns_none_when_not_required(storage_context): + """Test that requesting a missing file with required=False returns None""" + key = "nonexistent/file.txt" + result = storage_context.storage.get_from_cache(key, required=False) + assert result is None + + +@pytest.mark.parametrize("storage_context", [ + {"backend": "s3"}, + {"backend": "azure"}, +], indirect=True) +def test_missing_remote_etag_skips_cache(storage_context, caplog, monkeypatch): + """Test missing remote etag skips hashing and returns file""" + key = "noetag/file.txt" + content = b"something" + + storage_context.upload_file(key, content) + + # Patch fs.info to simulate no ETag + original_info = storage_context.storage.fs.fs.info + + def fake_info(path): + d = original_info(path) + d.pop("ETag", None) + d.pop("etag", None) + return d + + monkeypatch.setattr( + storage_context.storage.fs.fs, + "info", fake_info + ) + + caplog.set_level(logging.WARNING) + result = storage_context.storage.get_from_cache(key) + + # Check warning message output + assert f"ETag missing for {key} — skipping cache and returning fresh download" in caplog.text + + # Cache dir should NOT contain ref_hash folder + assert os.listdir(storage_context.storage.cache_root) == [] + assert Path(result).read_bytes() == content + + +@pytest.mark.parametrize("storage_context", [ + {"backend": "s3"}, + {"backend": "azure"}, +], indirect=True) +def test_missing_etag_file_triggers_redownload(storage_context, monkeypatch): + """Test missing etag file redownloads file""" + key = "etagmissing/file.txt" + content = b"original" + + storage = storage_context.storage + storage_context.upload_file(key, content) + + old_cached_path = storage.get_from_cache(key) + + # Remove etag file to trigger redownload + etag_path = Path(old_cached_path).parent / "etag" + etag_path.unlink() + + fs_open_called = {"called": False} + original_open = storage.fs.open + + def fake_open(path, mode="rb", *args, **kwargs): + if "r" in mode: + fs_open_called["called"] = True + return original_open(path, mode, *args, **kwargs) + + monkeypatch.setattr(storage.fs, "open", fake_open) + + # Redownload should happen + new_cached_path = storage.get_from_cache(key) + + assert Path(new_cached_path).read_bytes() == content + assert fs_open_called["called"] is True + + +@pytest.mark.parametrize("storage_context", [ + {"backend": "s3", "cache_dir": None}, + {"backend": "azure", "cache_dir": None}, +], indirect=True) +def test_no_cache_target_required_when_cache_disabled(storage_context): + """Test no_cache_target=None throws exception when cache_dir is None""" + with pytest.raises(OasisException): + storage_context.storage.get_from_cache("anything", no_cache_target=None) + + +@pytest.mark.parametrize("storage_context", [ + {"backend": "s3"}, + {"backend": "azure"}, +], indirect=True) +def test_get_from_cache_directory_raises_error(storage_context): + """Test get directory raises error""" + prefix = "somedir/" + key = f"{prefix}file.txt" + content = b"x" + storage_context.upload_file(key, content) + + with pytest.raises(OasisException): + storage_context.storage.get_from_cache(prefix) From 326380690b8b325593499f0064a6ba77aba2e810 Mon Sep 17 00:00:00 2001 From: Sam Gamble Date: Thu, 5 Feb 2026 10:21:13 +0000 Subject: [PATCH 2/3] fix missing package requ --- requirements-package.in | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements-package.in b/requirements-package.in index 65806e2..fc82d01 100644 --- a/requirements-package.in +++ b/requirements-package.in @@ -2,3 +2,4 @@ fastparquet fsspec>=2023.12.2 pandas typing_extensions +xxhash From bcee0aec9a77577fd00f76f4462f5b6d8ea24a71 Mon Sep 17 00:00:00 2001 From: Sam Gamble Date: Fri, 6 Feb 2026 11:15:16 +0000 Subject: [PATCH 3/3] set version 0.2.0 --- oasis_data_manager/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oasis_data_manager/__init__.py b/oasis_data_manager/__init__.py index 2fb2513..7fd229a 100644 --- a/oasis_data_manager/__init__.py +++ b/oasis_data_manager/__init__.py @@ -1 +1 @@ -__version__ = '0.1.6' +__version__ = '0.2.0'