Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 52 additions & 36 deletions lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Literal, Dict, Union
from typing import Literal, Dict, Union, Any, Mapping, Sequence, Union
import json
import base64
import requests
from zarr.storage import Store as ZarrStore
from zarr.storage import Store as ZarrStore, FSStore
from zarr.context import Context


from ..LocalCache.LocalCache import ChunkTooLargeError, LocalCache

Expand Down Expand Up @@ -104,6 +106,9 @@ def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: U
else:
raise Exception(f"Problem with {k}: value must be a string or a list")

import fsspec
self.fs_store = FSStore("", fs=fsspec.filesystem("reference", fo=rfs))

# validate templates
if "templates" in rfs:
for k, v in rfs["templates"].items():
Expand All @@ -121,40 +126,41 @@ def __contains__(self, key: object):
return key in self.rfs["refs"]

def __getitem__(self, key: str):
    """Return the bytes stored under ``key`` in the reference file system.

    The entry found in ``self.rfs["refs"]`` decides how the value is built:

    - ``str``: base64-decoded when it starts with ``"base64:"``, otherwise
      encoded as UTF-8.
    - ``dict``: serialized with ``json.dumps`` and encoded as UTF-8.
    - ``list`` of ``[url, offset, length]``: the byte range is read from the
      URL or path, consulting ``self.local_cache`` first when one is set and
      storing the freshly read chunk in it afterwards.

    Raises:
        KeyError: if ``key`` is not present in ``self.rfs["refs"]``.
        Exception: if the entry is a list that does not have exactly three
            elements, or is not a str, dict, or list.
    """
    refs = self.rfs["refs"]
    if key not in refs:
        raise KeyError(key)
    entry = refs[key]

    if isinstance(entry, str):
        prefix = "base64:"
        if entry.startswith(prefix):
            return base64.b64decode(entry[len(prefix):])
        return entry.encode("utf-8")

    if isinstance(entry, dict):
        return json.dumps(entry).encode("utf-8")

    if not isinstance(entry, list):
        # should not happen given checks in __init__, but self.rfs is mutable
        # and contains mutable lists
        raise Exception(f"Problem with {key}: value {entry} must be a string or a list")

    if len(entry) != 3:
        raise Exception("list must have 3 elements")  # pragma: no cover
    url, offset, length = entry

    # Expand {{name}} placeholders using the templates table, when present.
    if '{{' in url and '}}' in url and 'templates' in self.rfs:
        for name, replacement in self.rfs["templates"].items():
            url = url.replace("{{" + name + "}}", replacement)

    cache = self.local_cache
    if cache is not None:
        cached = cache.get_remote_chunk(url=url, offset=offset, size=length)
        if cached is not None:
            return cached

    val = _read_bytes_from_url_or_path(url, offset, length)

    if cache is not None:
        try:
            cache.put_remote_chunk(url=url, offset=offset, size=length, data=val)
        except ChunkTooLargeError:
            # Caching is best effort: an oversized chunk is still returned.
            print(f'Warning: unable to cache chunk of size {length} on LocalCache (key: {key})')
    return val

def __setitem__(self, key: str, value: bytes):
# We intentionally do not allow value to be a dict here! When the rfs is
Expand Down Expand Up @@ -193,6 +199,15 @@ def is_listable(self):
def is_erasable(self):
    """Report whether this store is erasable; it always answers ``False``."""
    return False

def getitems(
    self, keys: Sequence[str], *, contexts: Mapping[str, Context]
) -> Mapping[str, Any]:
    """Fetch the values for several keys in one call.

    The request is forwarded unchanged to ``self.fs_store.getitems``; see
    zarr.storage.FSStore.getitems for the details of what is returned.

    Args:
        keys: the keys to retrieve.
        contexts: per-key contexts, passed straight through to the
            underlying store.

    Returns:
        Whatever mapping ``self.fs_store.getitems`` produces.
    """
    # NOTE(review): this path does not reference self.local_cache or
    # self.rfs directly -- confirm the result stays consistent with
    # single-key reads.
    backing_store = self.fs_store
    fetched = backing_store.getitems(keys, contexts=contexts)
    return fetched

@staticmethod
def replace_meta_file_contents_with_dicts_in_rfs(rfs: dict) -> None:
"""
Expand Down Expand Up @@ -247,6 +262,7 @@ def _read_bytes_from_url_or_path(url_or_path: str, offset: int, length: int):
Read a range of bytes from a URL.
"""
from ..LindiRemfile.LindiRemfile import _resolve_url

if url_or_path.startswith('http://') or url_or_path.startswith('https://'):
url_resolved = _resolve_url(url_or_path) # handle DANDI auth
range_start = offset
Expand Down