Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ services:
image: mcr.microsoft.com/azure-storage/azurite
ports:
- "127.0.0.1:10000:10000"
fake-gcs-server:
container_name: "${FAKE_GCS_DOCKER_NAME-fake_gcs_server}"
image: fsouza/fake-gcs-server
command: ["-scheme", "http", "-port", "4443"]
ports:
- "127.0.0.1:4443:4443"
2 changes: 1 addition & 1 deletion oasis_data_manager/filestore/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def get_from_cache(self, reference, required=False, no_cache_target=None):
enable_etag_cache = (
self.cache_root
and not self._is_valid_url(reference)
and any(p in ("s3", "s3a", "az", "abfs", "abfss") for p in protocols)
and any(p in ("s3", "s3a", "az", "abfs", "abfss", "gs", "gcs") for p in protocols)
)

# No cache root configured, just return data
Expand Down
162 changes: 162 additions & 0 deletions oasis_data_manager/filestore/backends/gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import os
from pathlib import Path
from typing import Optional
from urllib import parse

import fsspec

from ..log import set_gcs_log_level
from .base import BaseStorage


class GcsStorage(BaseStorage):
    """Storage connector for Google Cloud Storage.

    Stores objects in a GCS bucket, using gcsfs via fsspec for all
    filesystem access. A custom ``endpoint_url`` may be supplied to target
    a local emulator such as fake-gcs-server.
    """

    # Resolved at class-definition time: importing this module requires
    # gcsfs to be installed.
    fsspec_filesystem_class = fsspec.get_filesystem_class("gcs")
    storage_connector = "GCS"

    def __init__(
        self,
        bucket_name: Optional[str] = None,
        project: Optional[str] = None,
        token: Optional[str] = None,
        access: Optional[str] = "full_control",
        endpoint_url: Optional[str] = None,
        default_location: Optional[str] = None,
        consistency: Optional[str] = None,
        requester_pays: bool = False,
        session_kwargs: Optional[dict] = None,
        root_dir="",
        gcs_log_level="",
        **kwargs,
    ):
        """Storage Connector for Google Cloud Storage

        Store objects in a GCS bucket. Uses gcsfs/fsspec for filesystem access.

        Parameters
        ----------
        :param bucket_name: GCS bucket name
        :type bucket_name: str

        :param project: GCP project ID
        :type project: str

        :param token: Authentication method - None (auto-detect), "google_default",
        "anon", "browser", "cache", or path to service account JSON
        :type token: str

        :param access: Access level - "read_only", "read_write", "full_control"
        :type access: str

        :param endpoint_url: Custom endpoint (e.g. for fake-gcs-server emulator)
        :type endpoint_url: str

        :param default_location: Default bucket location
        :type default_location: str

        :param consistency: Write check method - "none", "size", "md5", "crc32c"
        :type consistency: str

        :param requester_pays: Whether the requester pays for requests
        :type requester_pays: bool

        :param session_kwargs: Dict for aiohttp session (proxy settings etc.)
        :type session_kwargs: dict

        :param root_dir: Path prefix inside the bucket under which all
            objects are stored
        :type root_dir: str

        :param gcs_log_level: Log level name (e.g. "DEBUG") applied to the
            google/gcsfs loggers; unknown or empty values fall back to WARNING
        :type gcs_log_level: str
        """
        self._bucket = None
        self.bucket_name = bucket_name

        self.project = project
        self.token = token
        self.access = access
        self.endpoint_url = endpoint_url
        self.default_location = default_location
        self.consistency = consistency
        self.requester_pays = requester_pays
        self.session_kwargs = session_kwargs or {}
        self.gcs_log_level = gcs_log_level
        set_gcs_log_level(self.gcs_log_level)

        # Prefix every key with the bucket name so fsspec paths resolve as
        # "<bucket>/<root_dir>/<key>". Strip any leading/trailing separator
        # left over from the join.
        # NOTE(review): os.path.join / os.path.sep yield backslashes on
        # Windows while GCS object keys always use "/" — confirm whether
        # Windows hosts need to be supported here.
        root_dir = os.path.join(self.bucket_name or "", root_dir)
        if root_dir.startswith(os.path.sep):
            root_dir = root_dir[1:]
        if root_dir.endswith(os.path.sep):
            root_dir = root_dir[:-1]

        super().__init__(root_dir=root_dir, **kwargs)

    @property
    def config_options(self):
        """Return the keyword arguments needed to recreate this storage."""
        return {
            "bucket_name": self.bucket_name,
            "project": self.project,
            "token": self.token,
            "access": self.access,
            "endpoint_url": self.endpoint_url,
            "default_location": self.default_location,
            "consistency": self.consistency,
            "requester_pays": self.requester_pays,
            "session_kwargs": self.session_kwargs,
            # Strip the bucket prefix that __init__ prepended so the option
            # round-trips. Fall back to "" when no bucket was configured —
            # Path.relative_to(None) would raise TypeError.
            "root_dir": str(
                Path(self.root_dir).relative_to(self.bucket_name or "")
            ),
            "gcs_log_level": self.gcs_log_level,
        }

    def get_fsspec_storage_options(self):
        """Build the storage_options dict passed to gcsfs via fsspec.

        Always includes project/token/access/requester_pays; the optional
        settings are only included when set so gcsfs defaults still apply.
        """
        options = {
            "project": self.project,
            "token": self.token,
            "access": self.access,
            "requester_pays": self.requester_pays,
        }
        if self.endpoint_url:
            options["endpoint_url"] = self.endpoint_url
        if self.default_location:
            options["default_location"] = self.default_location
        if self.consistency:
            options["consistency"] = self.consistency
        if self.session_kwargs:
            options["session_kwargs"] = self.session_kwargs
        return options

    def get_storage_url(self, filename=None, suffix="tar.gz", encode_params=True):
        """Return ``(filename, gs:// URL)`` for an object under root_dir.

        When *filename* is None a unique name with *suffix* is generated.
        When *encode_params* is true, connection settings are encoded into
        the URL's query string so the URL is self-describing.

        NOTE(review): with encode_params the token (a credential or a path
        to one) ends up in the URL query string — avoid logging these URLs.
        """
        filename = (
            filename if filename is not None else self._get_unique_filename(suffix)
        )

        params = {}
        if encode_params:
            if self.project:
                params["project"] = self.project

            if self.token:
                params["token"] = self.token

            if self.access:
                params["access"] = self.access

            if self.endpoint_url:
                params["endpoint"] = self.endpoint_url

        return (
            filename,
            f"gs://{os.path.join(self.root_dir, filename)}{'?' if params else ''}{parse.urlencode(params) if params else ''}",
        )

    def url(self, object_name, parameters=None, expire=None):
        """Return URL for object

        Parameters
        ----------
        :param object_name: 'key' or name of object in bucket
        :type object_name: str

        :param parameters: Dictionary of parameters (currently unused)
        :type parameters: dict

        :param expire: Time in seconds for the URL to remain valid
            (currently unused)
        :type expire: int

        :return: URL as string
        :rtype str
        """
        # _join prefixes object_name with root_dir; fs.fs is the underlying
        # gcsfs filesystem wrapped by the fsspec mapper.
        blob_key = self.fs._join(object_name)
        return self.fs.fs.url(blob_key)
14 changes: 14 additions & 0 deletions oasis_data_manager/filestore/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,26 @@ class AbfsStorageConfig(BaseStorageConfig):
endpoint_url: NotRequired[str]


class GcsStorageConfig(BaseStorageConfig):
    """Typed options accepted by the GCS storage backend (``GcsStorage``)."""
    bucket_name: NotRequired[str]  # target GCS bucket name
    project: NotRequired[str]  # GCP project ID
    token: NotRequired[str]  # auth method or path to service-account JSON
    access: NotRequired[str]  # "read_only" / "read_write" / "full_control"
    endpoint_url: NotRequired[str]  # custom endpoint, e.g. fake-gcs-server
    default_location: NotRequired[str]  # default bucket location
    consistency: NotRequired[str]  # write check: "none"/"size"/"md5"/"crc32c"
    requester_pays: NotRequired[bool]  # requester pays for requests
    session_kwargs: NotRequired[dict]  # aiohttp session kwargs (proxy etc.)
    gcs_log_level: NotRequired[str]  # level name for google/gcsfs loggers


class StorageConfig(TypedDict):
    """Top-level storage configuration: a backend class plus its options."""
    # Presumably the dotted import path of the storage class — confirm
    # against the factory that consumes this config.
    storage_class: str
    # Backend-specific options; which member applies depends on storage_class.
    options: Union[
        LocalStorageConfig,
        S3StorageConfig,
        AbfsStorageConfig,
        GcsStorageConfig,
    ]


Expand Down
29 changes: 29 additions & 0 deletions oasis_data_manager/filestore/filestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,33 @@ def split_azure_url(parts):
)


def split_gcs_url(parts):
    """Split a parsed ``gs://`` URL into a clean URL and storage options.

    Extracts gcsfs connection settings (project, token, access, endpoint)
    from the query string and returns ``(url_without_query, options)``.
    The token defaults to "anon" when not supplied.
    """
    query = parse.parse_qs(parts.query)

    # First value of each recognised query parameter, with defaults.
    options = {
        name: query.get(name, [default])[0]
        for name, default in (
            ("project", None),
            ("token", "anon"),
            ("access", None),
        )
    }

    # The query key is "endpoint" but gcsfs expects "endpoint_url";
    # only pass it through when present.
    endpoint = query.get("endpoint", [None])[0]
    if endpoint is not None:
        options["endpoint_url"] = endpoint

    # Rebuild the URL with the query string removed, keeping every other
    # component (scheme, netloc, path, params, fragment) intact.
    stripped_url = parse.urlunparse(
        (parts[0], parts[1], parts[2], parts[3], "", parts[5])
    )
    return stripped_url, options


def parse_url_options(path):
opener = fsspec.open

Expand All @@ -84,6 +111,8 @@ def parse_url_options(path):
path, params = split_s3_url(url_parts)
elif url_parts.scheme in ["az", "abfs", "adl"]:
path, params = split_azure_url(url_parts)
elif url_parts.scheme in ["gs", "gcs"]:
path, params = split_gcs_url(url_parts)

return opener, path, params

Expand Down
11 changes: 11 additions & 0 deletions oasis_data_manager/filestore/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,14 @@ def set_azure_log_level(log_level):
except AttributeError:
LOG_LEVEL = logging.WARNING
logging.getLogger("azure").setLevel(LOG_LEVEL)


def set_gcs_log_level(log_level):
    """Apply *log_level* (a level name, e.g. "DEBUG") to the GCS loggers.

    Unknown, empty or non-string values fall back to WARNING.
    """
    try:
        level = getattr(logging, log_level.upper())
    except AttributeError:
        # Empty string, None, or an unrecognised level name.
        level = logging.WARNING

    # Cover gcsfs itself plus the google client/auth logger hierarchy.
    for logger_name in ("google", "gcsfs", "google.auth", "google.cloud"):
        logging.getLogger(logger_name).setLevel(level)
1 change: 1 addition & 0 deletions optional-package.in
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ dask-sql
distributed
geopandas==0.14.4
pyogrio
gcsfs>=2023.12.2
s3fs>=2023.12.2
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ hypothesis
httpx
respx
geodatasets
google-cloud-storage
ipdb
h5py
xxhash
Expand Down
Loading
Loading