diff --git a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py index 7b51fcc..1816806 100644 --- a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py +++ b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py @@ -1,8 +1,10 @@ -from typing import Literal, Dict, Union +from typing import Literal, Dict, Union, Any, Mapping, Sequence, Union import json import base64 import requests -from zarr.storage import Store as ZarrStore +from zarr.storage import Store as ZarrStore, FSStore +from zarr.context import Context + from ..LocalCache.LocalCache import ChunkTooLargeError, LocalCache @@ -104,6 +106,9 @@ def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: U else: raise Exception(f"Problem with {k}: value must be a string or a list") + import fsspec + self.fs_store = FSStore("", fs=fsspec.filesystem("reference", fo=rfs)) + # validate templates if "templates" in rfs: for k, v in rfs["templates"].items(): @@ -121,40 +126,41 @@ def __contains__(self, key: object): return key in self.rfs["refs"] def __getitem__(self, key: str): - if key not in self.rfs["refs"]: - raise KeyError(key) - x = self.rfs["refs"][key] - if isinstance(x, str): - if x.startswith("base64:"): - return base64.b64decode(x[len("base64:"):]) - else: - return x.encode("utf-8") - elif isinstance(x, dict): - return json.dumps(x).encode("utf-8") - elif isinstance(x, list): - if len(x) != 3: - raise Exception("list must have 3 elements") # pragma: no cover - url = x[0] - offset = x[1] - length = x[2] - if '{{' in url and '}}' in url and 'templates' in self.rfs: - for k, v in self.rfs["templates"].items(): - url = url.replace("{{" + k + "}}", v) - if self.local_cache is not None: - x = self.local_cache.get_remote_chunk(url=url, offset=offset, size=length) - if x is not None: - return x - val = _read_bytes_from_url_or_path(url, offset, length) - if self.local_cache is not None: - try: - self.local_cache.put_remote_chunk(url=url, offset=offset, size=length, data=val) - except ChunkTooLargeError: - print(f'Warning: unable to cache chunk of size {length} on LocalCache (key: {key})') - return val - else: - # should not happen given checks in __init__, but self.rfs is mutable - # and contains mutable lists - raise Exception(f"Problem with {key}: value {x} must be a string or a list") + return self.fs_store[key] + # if key not in self.rfs["refs"]: + # raise KeyError(key) + # x = self.rfs["refs"][key] + # if isinstance(x, str): + # if x.startswith("base64:"): + # return base64.b64decode(x[len("base64:"):]) + # else: + # return x.encode("utf-8") + # elif isinstance(x, dict): + # return json.dumps(x).encode("utf-8") + # elif isinstance(x, list): + # if len(x) != 3: + # raise Exception("list must have 3 elements") # pragma: no cover + # url = x[0] + # offset = x[1] + # length = x[2] + # if '{{' in url and '}}' in url and 'templates' in self.rfs: + # for k, v in self.rfs["templates"].items(): + # url = url.replace("{{" + k + "}}", v) + # if self.local_cache is not None: + # x = self.local_cache.get_remote_chunk(url=url, offset=offset, size=length) + # if x is not None: + # return x + # val = _read_bytes_from_url_or_path(url, offset, length) + # if self.local_cache is not None: + # try: + # self.local_cache.put_remote_chunk(url=url, offset=offset, size=length, data=val) + # except ChunkTooLargeError: + # print(f'Warning: unable to cache chunk of size {length} on LocalCache (key: {key})') + # return val + # else: + # # should not happen given checks in __init__, but self.rfs is mutable + # # and contains mutable lists + # raise Exception(f"Problem with {key}: value {x} must be a string or a list") def __setitem__(self, key: str, value: bytes): # We intentionally do not allow value to be a dict here! When the rfs is @@ -193,6 +199,15 @@ def is_listable(self): def is_erasable(self): return False + def getitems( + self, keys: Sequence[str], *, contexts: Mapping[str, Context] + ) -> Mapping[str, Any]: + """Retrieve data from multiple keys. + + See zarr.storage.FSStore.getitems for more information. + """ + return self.fs_store.getitems(keys, contexts=contexts) + @staticmethod def replace_meta_file_contents_with_dicts_in_rfs(rfs: dict) -> None: """ @@ -247,6 +262,7 @@ def _read_bytes_from_url_or_path(url_or_path: str, offset: int, length: int): Read a range of bytes from a URL. """ from ..LindiRemfile.LindiRemfile import _resolve_url + if url_or_path.startswith('http://') or url_or_path.startswith('https://'): url_resolved = _resolve_url(url_or_path) # handle DANDI auth range_start = offset