Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion oasis_data_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.1.6'
__version__ = '0.2.0'
33 changes: 15 additions & 18 deletions oasis_data_manager/filestore/backends/azure_abfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,24 +114,21 @@ def get_fsspec_storage_options(self):
def connection_string(self):
    """Build the Azure Storage connection string for this backend.

    Returns the explicitly configured connection string when one was
    provided; otherwise assembles one from the account name/key and the
    optional custom blob endpoint (e.g. a local Azurite emulator).

    :return: Azure Storage connection string
    :rtype: str
    """
    if self._connection_string:
        return self._connection_string

    cs_parts = [
        f"DefaultEndpointsProtocol={'https' if self.azure_ssl else 'http'}",
        f"AccountName={self.account_name}",
    ]
    # Only emit AccountKey when one is configured — an unconditional
    # f-string would otherwise inject the literal text "AccountKey=None".
    if self.account_key:
        cs_parts.append(f"AccountKey={self.account_key}")

    if self.endpoint_url:
        # Azurite requires the account name in the endpoint path
        endpoint = self.endpoint_url.rstrip('/')
        if self.account_name not in endpoint:
            endpoint = f"{endpoint}/{self.account_name}"
        # No trailing ';' inside the part — ";".join supplies separators.
        cs_parts.append(f"BlobEndpoint={endpoint}")

    return ";".join(cs_parts)

def get_storage_url(self, filename=None, suffix="tar.gz", encode_params=True):
filename = (
Expand Down
113 changes: 83 additions & 30 deletions oasis_data_manager/filestore/backends/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import base64
import contextlib
import io
import logging
import os
import shutil
Expand All @@ -16,6 +14,7 @@
from fsspec.implementations.dirfs import DirFileSystem

from oasis_data_manager.errors import OasisException
import xxhash

LOG_FILE_SUFFIX = "txt"
ARCHIVE_FILE_SUFFIX = "tar.gz"
Expand Down Expand Up @@ -191,39 +190,93 @@ def get_from_cache(self, reference, required=False, no_cache_target=None):
:return: Absolute filepath to stored Object
:rtype str
"""
# null ref given
if not reference:
if required:
raise MissingInputsException(reference)
else:
return None
return None

# check if the file is in the cache, if so return that path
cache_filename = base64.b64encode(reference.encode()).decode()
if self.cache_root:
cached_file = os.path.join(self.cache_root, cache_filename)
else:
os.makedirs(os.path.dirname(no_cache_target), exist_ok=True)
cached_file = no_cache_target

if self.cache_root:
if os.path.exists(cached_file):
logging.info("Get from Cache: {}".format(reference))
return cached_file

if self._is_valid_url(reference):
# if the file is not in the path, and is a url download it to the cache
response = urlopen(reference)
fdata = response.read()

with io.open(cached_file, "w+b") as f:
f.write(fdata)
logging.info("Get from URL: {}".format(reference))
fs_protocol = getattr(self.fs.fs, "protocol", None)

# Normalize protocol into a tuple of strings
if isinstance(fs_protocol, str):
protocols = (fs_protocol,)
elif isinstance(fs_protocol, (list, tuple)):
protocols = tuple(fs_protocol)
else:
# otherwise get it from the storage and add it to the cache
self.fs.get(reference, cached_file, recursive=True)
protocols = ()

enable_etag_cache = (
self.cache_root
and not self._is_valid_url(reference)
and any(p in ("s3", "s3a", "az", "abfs", "abfss") for p in protocols)
)

return cached_file
# No cache root configured, just return data
if not enable_etag_cache:
if not no_cache_target:
raise OasisException("Error: caching disabled for this filesystem and no_cache_target not provided")
Path(no_cache_target).parent.mkdir(parents=True, exist_ok=True)
if self._is_valid_url(reference):
with urlopen(reference, timeout=30) as r:
data = r.read()
with open(no_cache_target, "wb") as f:
f.write(data)
logging.info("Get from URL: {}".format(reference))
else:
self.fs.get(reference, no_cache_target, recursive=True)
logging.info("Get from Filestore: {}".format(reference))
return no_cache_target

# Caching enabled
# Get metadata
try:
info = self.fs.info(reference)
except FileNotFoundError:
if required:
raise MissingInputsException(reference)
return None

# Raise error if type is not file
if info.get("type") == "directory":
raise OasisException(f"Directories are not supported in get_from_cache: {reference}")

remote_etag = info.get("ETag") or info.get("etag")
if remote_etag is None:
self.logger.warning(f"ETag missing for {reference} — skipping cache and returning fresh download")
if no_cache_target is not None:
dest_path = no_cache_target
Path(dest_path).parent.mkdir(parents=True, exist_ok=True)
else:
tmp = tempfile.NamedTemporaryFile(delete=False)
dest_path = tmp.name
tmp.close()
with self.fs.open(reference, "rb") as src, open(dest_path, "wb") as out:
shutil.copyfileobj(src, out)
return dest_path

# Create Cache dir
content_dir = Path(self.cache_root)
content_dir.mkdir(parents=True, exist_ok=True)

# Create reference hash for fast lookup
ref_hash = xxhash.xxh64(reference.encode()).hexdigest()
file_dir = content_dir / ref_hash
file_dir.mkdir(parents=True, exist_ok=True)
file_path = file_dir / "data"
etag_path = file_dir / "etag"

# Return if etag matches
if file_path.exists() and etag_path.exists():
cached_etag = etag_path.read_text()
if cached_etag == remote_etag:
return str(file_path)

# Redownload data and write etag
with self.fs.open(reference, "rb") as f, open(file_path, "wb") as out:
shutil.copyfileobj(f, out)
etag_path.write_text(remote_etag)

return str(file_path)

def get(self, reference, output_path="", subdir="", required=False):
"""Retrieve stored object and stores it in the output path
Expand Down Expand Up @@ -386,7 +439,7 @@ def fs(self) -> fsspec.AbstractFileSystem:
self._fs = StrictRootDirFs(
path=self.root_dir,
fs=(
self.fsspec_filesystem_class(**self.get_fsspec_storage_options())
self.fsspec_filesystem_class(**self.get_fsspec_storage_options(), asynchronous=False)
if self.fsspec_filesystem_class
else None
),
Expand Down
1 change: 1 addition & 0 deletions requirements-package.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ fastparquet
fsspec>=2023.12.2
pandas
typing_extensions
xxhash
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ respx
geodatasets
ipdb
h5py
xxhash
-r requirements-package.in
-r optional-package.in
36 changes: 16 additions & 20 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile
#
adlfs==2024.12.0
# via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in
# via -r optional-package.in
aiobotocore==2.22.0
# via s3fs
aiohappyeyeballs==2.6.1
Expand Down Expand Up @@ -44,7 +44,7 @@ azure-identity==1.23.0
azure-storage-blob==12.25.1
# via adlfs
boto3==1.37.3
# via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in
# via -r optional-package.in
botocore==1.37.3
# via
# aiobotocore
Expand Down Expand Up @@ -98,28 +98,28 @@ cryptography==44.0.3
# pyjwt
dask[array,dataframe]==2025.5.0
# via
# -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in
# -r optional-package.in
# dask-geopandas
# dask-sql
# distributed
dask-geopandas==0.4.3
# via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in
# via -r optional-package.in
dask-sql==2024.5.0
# via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in
# via -r optional-package.in
decorator==5.2.1
# via
# ipdb
# ipython
distributed==2025.5.0
# via
# -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in
# -r optional-package.in
# dask-sql
executing==2.2.0
# via stack-data
fastapi==0.115.12
# via dask-sql
fastparquet==2024.11.0
# via -r /home/sam/work/oasis-repos/OasisDataManager/requirements-package.in
# via -r requirements-package.in
filelock==3.18.0
# via pytest-mypy
fiona==1.10.1
Expand All @@ -132,7 +132,7 @@ frozenlist==1.6.0
# aiosignal
fsspec==2025.3.2
# via
# -r /home/sam/work/oasis-repos/OasisDataManager/requirements-package.in
# -r requirements-package.in
# adlfs
# dask
# fastparquet
Expand All @@ -141,7 +141,7 @@ geodatasets==2024.8.0
# via -r requirements.in
geopandas==0.14.4
# via
# -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in
# -r optional-package.in
# dask-geopandas
h11==0.16.0
# via
Expand All @@ -164,8 +164,6 @@ idna==3.10
# httpx
# requests
# yarl
importlib-metadata==8.7.0
# via dask
iniconfig==2.1.0
# via pytest
ipdb==0.13.13
Expand Down Expand Up @@ -237,7 +235,7 @@ packaging==25.0
# pytest
pandas==2.2.3
# via
# -r /home/sam/work/oasis-repos/OasisDataManager/requirements-package.in
# -r requirements-package.in
# dask
# dask-sql
# fastparquet
Expand Down Expand Up @@ -292,7 +290,7 @@ pyjwt[crypto]==2.10.1
# msal
# pyjwt
pyogrio==0.11.0
# via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in
# via -r optional-package.in
pyproj==3.7.1
# via geopandas
pytest==8.3.5
Expand Down Expand Up @@ -327,7 +325,7 @@ requests==2.32.3
respx==0.22.0
# via -r requirements.in
s3fs==2025.3.2
# via -r /home/sam/work/oasis-repos/OasisDataManager/optional-package.in
# via -r optional-package.in
s3transfer==0.11.3
# via boto3
shapely==2.1.0
Expand Down Expand Up @@ -377,17 +375,15 @@ types-setuptools==80.4.0.20250511
# via -r requirements.in
typing-extensions==4.13.2
# via
# -r /home/sam/work/oasis-repos/OasisDataManager/requirements-package.in
# -r requirements-package.in
# anyio
# azure-core
# azure-identity
# azure-storage-blob
# fastapi
# ipython
# mypy
# pydantic
# pydantic-core
# types-boto3
# typing-inspection
typing-inspection==0.4.0
# via pydantic
Expand All @@ -406,12 +402,12 @@ wcwidth==0.2.13
# via prompt-toolkit
wrapt==1.17.2
# via aiobotocore
xxhash==3.6.0
# via -r requirements.in
yarl==1.20.0
# via aiohttp
zict==3.0.0
# via distributed
zipp==3.21.0
# via importlib-metadata

# The following packages are considered to be unsafe in a requirements file:
# setuptools
Loading
Loading