From bdf9a9cbfdc7a167f47820d4abe374d03b779faa Mon Sep 17 00:00:00 2001 From: Ben Hayes Date: Fri, 13 Feb 2026 17:35:52 +0000 Subject: [PATCH] Add Google Cloud Storage (GCS) backend Add GcsStorage backend mirroring existing AWS S3 and Azure ABFS patterns, with full support for gcsfs/fsspec, URL parsing, logging, caching, config, and tests against fake-gcs-server emulator. Co-Authored-By: Claude Opus 4.6 --- docker-compose.yml | 6 + oasis_data_manager/filestore/backends/base.py | 2 +- oasis_data_manager/filestore/backends/gcs.py | 162 ++++++++++++++++++ oasis_data_manager/filestore/config.py | 14 ++ oasis_data_manager/filestore/filestore.py | 29 ++++ oasis_data_manager/filestore/log.py | 11 ++ optional-package.in | 1 + requirements.in | 1 + tests/filestorage/test_caching.py | 64 ++++++- tests/filestorage/test_gcs.py | 36 ++++ tests/filestorage/test_general.py | 17 ++ 11 files changed, 340 insertions(+), 3 deletions(-) create mode 100644 oasis_data_manager/filestore/backends/gcs.py create mode 100644 tests/filestorage/test_gcs.py diff --git a/docker-compose.yml b/docker-compose.yml index 0bd47d4..534175d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,3 +15,9 @@ services: image: mcr.microsoft.com/azure-storage/azurite ports: - "127.0.0.1:10000:10000" + fake-gcs-server: + container_name: "${FAKE_GCS_DOCKER_NAME-fake_gcs_server}" + image: fsouza/fake-gcs-server + command: ["-scheme", "http", "-port", "4443"] + ports: + - "127.0.0.1:4443:4443" diff --git a/oasis_data_manager/filestore/backends/base.py b/oasis_data_manager/filestore/backends/base.py index a6c1e96..164d32b 100755 --- a/oasis_data_manager/filestore/backends/base.py +++ b/oasis_data_manager/filestore/backends/base.py @@ -208,7 +208,7 @@ def get_from_cache(self, reference, required=False, no_cache_target=None): enable_etag_cache = ( self.cache_root and not self._is_valid_url(reference) - and any(p in ("s3", "s3a", "az", "abfs", "abfss") for p in protocols) + and any(p in ("s3", "s3a", "az", "abfs", "abfss", "gs", "gcs") for p in protocols) ) # No cache root configured, just return data diff --git a/oasis_data_manager/filestore/backends/gcs.py b/oasis_data_manager/filestore/backends/gcs.py new file mode 100644 index 0000000..e07b939 --- /dev/null +++ b/oasis_data_manager/filestore/backends/gcs.py @@ -0,0 +1,162 @@ +import os +from pathlib import Path +from typing import Optional +from urllib import parse + +import fsspec + +from ..log import set_gcs_log_level +from .base import BaseStorage + + +class GcsStorage(BaseStorage): + fsspec_filesystem_class = fsspec.get_filesystem_class("gcs") + storage_connector = "GCS" + + def __init__( + self, + bucket_name: Optional[str] = None, + project: Optional[str] = None, + token: Optional[str] = None, + access: Optional[str] = "full_control", + endpoint_url: Optional[str] = None, + default_location: Optional[str] = None, + consistency: Optional[str] = None, + requester_pays: bool = False, + session_kwargs: Optional[dict] = None, + root_dir="", + gcs_log_level="", + **kwargs, + ): + """Storage Connector for Google Cloud Storage + + Store objects in a GCS bucket. Uses gcsfs/fsspec for filesystem access. + + Parameters + ---------- + :param bucket_name: GCS bucket name + :type bucket_name: str + + :param project: GCP project ID + :type project: str + + :param token: Authentication method - None (auto-detect), "google_default", + "anon", "browser", "cache", or path to service account JSON + :type token: str + + :param access: Access level - "read_only", "read_write", "full_control" + :type access: str + + :param endpoint_url: Custom endpoint (e.g. for fake-gcs-server emulator) + :type endpoint_url: str + + :param default_location: Default bucket location + :type default_location: str + + :param consistency: Write check method - "none", "size", "md5", "crc32c" + :type consistency: str + + :param requester_pays: Whether the requester pays for requests + :type requester_pays: bool + + :param session_kwargs: Dict for aiohttp session (proxy settings etc.) + :type session_kwargs: dict + """ + self._bucket = None + self.bucket_name = bucket_name + + self.project = project + self.token = token + self.access = access + self.endpoint_url = endpoint_url + self.default_location = default_location + self.consistency = consistency + self.requester_pays = requester_pays + self.session_kwargs = session_kwargs or {} + self.gcs_log_level = gcs_log_level + set_gcs_log_level(self.gcs_log_level) + + root_dir = os.path.join(self.bucket_name or "", root_dir) + if root_dir.startswith(os.path.sep): + root_dir = root_dir[1:] + if root_dir.endswith(os.path.sep): + root_dir = root_dir[:-1] + + super(GcsStorage, self).__init__(root_dir=root_dir, **kwargs) + + @property + def config_options(self): + return { + "bucket_name": self.bucket_name, + "project": self.project, + "token": self.token, + "access": self.access, + "endpoint_url": self.endpoint_url, + "default_location": self.default_location, + "consistency": self.consistency, + "requester_pays": self.requester_pays, + "session_kwargs": self.session_kwargs, + "root_dir": str(Path(self.root_dir).relative_to(self.bucket_name)), + "gcs_log_level": self.gcs_log_level, + } + + def get_fsspec_storage_options(self): + options = { + "project": self.project, + "token": self.token, + "access": self.access, + "requester_pays": self.requester_pays, + } + if self.endpoint_url: + options["endpoint_url"] = self.endpoint_url + if self.default_location: + options["default_location"] = self.default_location + if self.consistency: + options["consistency"] = self.consistency + if self.session_kwargs: + options["session_kwargs"] = self.session_kwargs + return options + + def get_storage_url(self, filename=None, suffix="tar.gz", encode_params=True): + filename = ( + filename if filename is not None else self._get_unique_filename(suffix) + ) + + params = {} + if encode_params: + if self.project: + params["project"] = self.project + + if self.token: + params["token"] = self.token + + if self.access: + params["access"] = self.access + + if self.endpoint_url: + params["endpoint"] = self.endpoint_url + + return ( + filename, + f"gs://{os.path.join(self.root_dir, filename)}{'?' if params else ''}{parse.urlencode(params) if params else ''}", + ) + + def url(self, object_name, parameters=None, expire=None): + """Return URL for object + + Parameters + ---------- + :param object_name: 'key' or name of object in bucket + :type object_name: str + + :param parameters: Dictionary of parameters + :type parameters: dict + + :param expire: Time in seconds for the URL to remain valid + :type expire: int + + :return: URL as string + :rtype str + """ + blob_key = self.fs._join(object_name) + return self.fs.fs.url(blob_key) diff --git a/oasis_data_manager/filestore/config.py b/oasis_data_manager/filestore/config.py index d742bf4..dd82287 100644 --- a/oasis_data_manager/filestore/config.py +++ b/oasis_data_manager/filestore/config.py @@ -75,12 +75,26 @@ class AbfsStorageConfig(BaseStorageConfig): endpoint_url: NotRequired[str] +class GcsStorageConfig(BaseStorageConfig): + bucket_name: NotRequired[str] + project: NotRequired[str] + token: NotRequired[str] + access: NotRequired[str] + endpoint_url: NotRequired[str] + default_location: NotRequired[str] + consistency: NotRequired[str] + requester_pays: NotRequired[bool] + session_kwargs: NotRequired[dict] + gcs_log_level: NotRequired[str] + + class StorageConfig(TypedDict): storage_class: str options: Union[ LocalStorageConfig, S3StorageConfig, AbfsStorageConfig, + GcsStorageConfig, ] diff --git a/oasis_data_manager/filestore/filestore.py b/oasis_data_manager/filestore/filestore.py index 382f67a..1dd4b4b 100644 --- a/oasis_data_manager/filestore/filestore.py +++ b/oasis_data_manager/filestore/filestore.py @@ -74,6 +74,33 @@ def split_azure_url(parts): ) +def split_gcs_url(parts): + query = parse.parse_qs(parts.query) + params = { + "project": query.get("project", [None])[0], + "token": query.get("token", ["anon"])[0], + "access": query.get("access", [None])[0], + } + + endpoint = query.get("endpoint", [None])[0] + if endpoint: + params["endpoint_url"] = endpoint + + return ( + urllib.parse.urlunparse( + ( + parts[0], + parts[1], + parts[2], + parts[3], + "", + parts[5], + ) + ), + params, + ) + + def parse_url_options(path): opener = fsspec.open @@ -84,6 +111,8 @@ def parse_url_options(path): path, params = split_s3_url(url_parts) elif url_parts.scheme in ["az", "abfs", "adl"]: path, params = split_azure_url(url_parts) + elif url_parts.scheme in ["gs", "gcs"]: + path, params = split_gcs_url(url_parts) return opener, path, params diff --git a/oasis_data_manager/filestore/log.py b/oasis_data_manager/filestore/log.py index 194d9c0..65e2352 100644 --- a/oasis_data_manager/filestore/log.py +++ b/oasis_data_manager/filestore/log.py @@ -21,3 +21,14 @@ def set_azure_log_level(log_level): except AttributeError: LOG_LEVEL = logging.WARNING logging.getLogger("azure").setLevel(LOG_LEVEL) + + +def set_gcs_log_level(log_level): + try: + LOG_LEVEL = getattr(logging, log_level.upper()) + except AttributeError: + LOG_LEVEL = logging.WARNING + logging.getLogger("google").setLevel(LOG_LEVEL) + logging.getLogger("gcsfs").setLevel(LOG_LEVEL) + logging.getLogger("google.auth").setLevel(LOG_LEVEL) + logging.getLogger("google.cloud").setLevel(LOG_LEVEL) diff --git a/optional-package.in b/optional-package.in index ac2b166..1e9bcb9 100644 --- a/optional-package.in +++ b/optional-package.in @@ -6,4 +6,5 @@ dask-sql distributed geopandas==0.14.4 pyogrio +gcsfs>=2023.12.2 s3fs>=2023.12.2 diff --git a/requirements.in b/requirements.in index 511ef58..e2f11ee 100644 --- a/requirements.in +++ b/requirements.in @@ -12,6 +12,7 @@ hypothesis httpx respx geodatasets +google-cloud-storage ipdb h5py xxhash diff --git a/tests/filestorage/test_caching.py b/tests/filestorage/test_caching.py index 249839c..e9c4aa8 100644 --- a/tests/filestorage/test_caching.py +++ b/tests/filestorage/test_caching.py @@ -10,9 +10,13 @@ from pathlib import Path from typing import Callable +from google.auth.credentials import AnonymousCredentials +from google.cloud import storage as gcs_storage + from oasis_data_manager.errors import OasisException from oasis_data_manager.filestore.backends.aws_s3 import AwsS3Storage from oasis_data_manager.filestore.backends.azure_abfs import AzureABFSStorage +from oasis_data_manager.filestore.backends.gcs import GcsStorage from oasis_data_manager.filestore.backends.base import MissingInputsException, BaseStorage @@ -111,6 +115,49 @@ def azure_cleanup(): return storage, azure_upload_file, azure_cleanup +def _setup_gcs_storage(cache_dir): + gcs_endpoint = "http://localhost:4443" + bucket_name = f"test-bucket-{uuid.uuid4().hex[:8]}" + + # GCS client pointing to fake-gcs-server (anonymous credentials for emulator) + client = gcs_storage.Client( + project="test-project", + credentials=AnonymousCredentials(), + ) + client._http._auth_request.session.verify = False + client._connection.API_BASE_URL = gcs_endpoint + client._connection.api_url = f"{gcs_endpoint}/storage/v1" + + # Create bucket + bucket = client.bucket(bucket_name) + bucket.create() + + # upload_file function + def gcs_upload_file(key, content): + blob = client.bucket(bucket_name).blob(key) + blob.upload_from_string(content) + + # cleanup function + def gcs_cleanup(): + bucket = client.bucket(bucket_name) + blobs = list(bucket.list_blobs()) + for blob in blobs: + blob.delete() + bucket.delete() + + # Storage instance + storage = GcsStorage( + bucket_name=bucket_name, + root_dir="", + cache_dir=cache_dir, + endpoint_url=gcs_endpoint, + token="anon", + project="test-project", + ) + + return storage, gcs_upload_file, gcs_cleanup + + @pytest.fixture(scope="function") def storage_context(request): config = request.param @@ -126,6 +173,8 @@ def storage_context(request): storage, upload_fn, cleanup_fn = _setup_s3_storage(cache_dir) elif backend_type == "azure": storage, upload_fn, cleanup_fn = _setup_azure_storage(cache_dir) + elif backend_type == "gcs": + storage, upload_fn, cleanup_fn = _setup_gcs_storage(cache_dir) else: raise OasisException(f"Unsupported backend_type ({backend_type}) for testing") @@ -139,6 +188,7 @@ def storage_context(request): @pytest.mark.parametrize("storage_context", [ {"backend": "s3"}, {"backend": "azure"}, + {"backend": "gcs"}, ], indirect=True) def test_etag_present(storage_context): """Test etag actually present""" @@ -156,6 +206,7 @@ def test_etag_present(storage_context): @pytest.mark.parametrize("storage_context", [ {"backend": "s3"}, {"backend": "azure"}, + {"backend": "gcs"}, ], indirect=True) def test_first_fetch_downloads_and_caches(storage_context): """Test basic download and caching""" @@ -173,6 +224,7 @@ def test_first_fetch_downloads_and_caches(storage_context): @pytest.mark.parametrize("storage_context", [ {"backend": "s3"}, {"backend": "azure"}, + {"backend": "gcs"}, ], indirect=True) def test_cache_hit_returns_same_file(storage_context): """Test that fetching the same file again hits the cache (ETag match)""" @@ -193,6 +245,7 @@ def test_cache_hit_returns_same_file(storage_context): @pytest.mark.parametrize("storage_context", [ {"backend": "s3"}, {"backend": "azure"}, + {"backend": "gcs"}, ], indirect=True) def test_cache_hit_does_not_redownload(storage_context, monkeypatch): """Test cache is actually used and file isn't redownloaded""" @@ -225,6 +278,7 @@ def fail_if_called(*args, **kwargs): @pytest.mark.parametrize("storage_context", [ {"backend": "s3"}, {"backend": "azure"}, + {"backend": "gcs"}, ], indirect=True) def test_cache_miss_on_etag_change(storage_context): """Test that changing file in S3 updates the cache""" @@ -248,6 +302,7 @@ def test_cache_miss_on_etag_change(storage_context): @pytest.mark.parametrize("storage_context", [ {"backend": "s3"}, {"backend": "azure"}, + {"backend": "gcs"}, ], indirect=True) def test_missing_file_raises_exception_when_required(storage_context): """Test that requesting a missing file with required=True raises exception""" @@ -259,6 +314,7 @@ def test_missing_file_raises_exception_when_required(storage_context): @pytest.mark.parametrize("storage_context", [ {"backend": "s3"}, {"backend": "azure"}, + {"backend": "gcs"}, ], indirect=True) def test_missing_file_returns_none_when_not_required(storage_context): """Test that requesting a missing file with required=False returns None""" @@ -270,6 +326,7 @@ def test_missing_file_returns_none_when_not_required(storage_context): @pytest.mark.parametrize("storage_context", [ {"backend": "s3"}, {"backend": "azure"}, + {"backend": "gcs"}, ], indirect=True) def test_missing_remote_etag_skips_cache(storage_context, caplog, monkeypatch): """Test missing remote etag skips hashing and returns file""" @@ -281,8 +338,8 @@ def test_missing_remote_etag_skips_cache(storage_context, caplog, monkeypatch): # Patch fs.info to simulate no ETag original_info = storage_context.storage.fs.fs.info - def fake_info(path): - d = original_info(path) + def fake_info(path, **kwargs): + d = original_info(path, **kwargs) d.pop("ETag", None) d.pop("etag", None) return d @@ -306,6 +363,7 @@ def fake_info(path): @pytest.mark.parametrize("storage_context", [ {"backend": "s3"}, {"backend": "azure"}, + {"backend": "gcs"}, ], indirect=True) def test_missing_etag_file_triggers_redownload(storage_context, monkeypatch): """Test missing etag file redownloads file""" @@ -341,6 +399,7 @@ def fake_open(path, mode="rb", *args, **kwargs): @pytest.mark.parametrize("storage_context", [ {"backend": "s3", "cache_dir": None}, {"backend": "azure", "cache_dir": None}, + {"backend": "gcs", "cache_dir": None}, ], indirect=True) def test_no_cache_target_required_when_cache_disabled(storage_context): """Test no_cache_target=None throws exception when cache_dir is None""" @@ -351,6 +410,7 @@ def test_no_cache_target_required_when_cache_disabled(storage_context): @pytest.mark.parametrize("storage_context", [ {"backend": "s3"}, {"backend": "azure"}, + {"backend": "gcs"}, ], indirect=True) def test_get_from_cache_directory_raises_error(storage_context): """Test get directory raises error""" diff --git a/tests/filestorage/test_gcs.py b/tests/filestorage/test_gcs.py new file mode 100644 index 0000000..3dfaf43 --- /dev/null +++ b/tests/filestorage/test_gcs.py @@ -0,0 +1,36 @@ +import os.path +import uuid + +from oasis_data_manager.filestore.backends.gcs import GcsStorage +from oasis_data_manager.filestore.config import get_storage_from_config + + +def make_storage(**kwargs): + kwargs.setdefault("bucket_name", uuid.uuid4().hex) + kwargs.setdefault("token", "anon") + kwargs.setdefault("endpoint_url", "http://localhost:4443") + kwargs.setdefault("project", "test-project") + kwargs.setdefault("cache_dir", None) + + fs = GcsStorage(**kwargs) + fs.fs.fs.mkdir(fs.bucket_name) + fs.fs.mkdirs("") + + return fs + + +def test_no_root_dir_is_set___root_dir_is_empty(): + storage = make_storage() + + assert storage.root_dir == storage.bucket_name + + +def test_storage_constructed_from_config_matches_initial(): + storage = make_storage(root_dir="test_root") + + result = get_storage_from_config(storage.to_config()) + + assert storage.root_dir == os.path.join(storage.bucket_name, "test_root") + assert isinstance(result, GcsStorage) + assert result.root_dir == storage.root_dir + assert result.bucket_name == storage.bucket_name diff --git a/tests/filestorage/test_general.py b/tests/filestorage/test_general.py index 617ee15..15a052d 100644 --- a/tests/filestorage/test_general.py +++ b/tests/filestorage/test_general.py @@ -10,6 +10,7 @@ from oasis_data_manager.filestore.backends.aws_s3 import AwsS3Storage from oasis_data_manager.filestore.backends.azure_abfs import AzureABFSStorage +from oasis_data_manager.filestore.backends.gcs import GcsStorage from oasis_data_manager.filestore.backends.local import LocalStorage test_file_name = "test_file.txt" @@ -48,6 +49,21 @@ def azure_abfs_storage(**kwargs): yield fs +@contextlib.contextmanager +def gcs_storage(**kwargs): + kwargs.setdefault("bucket_name", uuid.uuid4().hex) + kwargs.setdefault("token", "anon") + kwargs.setdefault("endpoint_url", "http://localhost:4443") + kwargs.setdefault("project", "test-project") + kwargs.setdefault("cache_dir", None) + + fs = GcsStorage(**kwargs) + fs.fs.fs.mkdir(fs.bucket_name) + fs.fs.mkdirs("") + + yield fs + + @contextlib.contextmanager def local_storage(root_dir="", **kwargs): kwargs.setdefault("cache_dir", None) @@ -59,6 +75,7 @@ def local_storage(root_dir="", **kwargs): local_storage, azure_abfs_storage, aws_s3_storage, + gcs_storage, ]