Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/bucketbase/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
beware of the order of imports, as some of the imports are circular, like fs_bucket due to (named_lock_manager)
"""
from bucketbase.ibucket import S3_NAME_CHARS_NO_SEP, IBucket, ShallowListing
from bucketbase.ibucket import S3_NAME_CHARS_NO_SEP, IBucket, ObjectVersion, ShallowListing
from bucketbase.errors import DeleteError

from bucketbase.cached_immutable_bucket import CachedImmutableBucket
Expand Down
37 changes: 37 additions & 0 deletions python/bucketbase/ibucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ class ShallowListing:
prefixes: slist[str]


@dataclass(frozen=True)
class ObjectVersion:
"""
Object version metadata returned by version-aware bucket implementations.
"""

name: PurePosixPath
version_id: str
is_latest: bool
is_delete_marker: bool = False


class ObjectStream(AbstractContextManager[BinaryIO]):
def __init__(self, stream: BinaryIO, name: PurePosixPath) -> None:
self._stream = stream
Expand Down Expand Up @@ -351,6 +363,31 @@ def remove_objects(self, names: Iterable[PurePosixPath | str]) -> slist[DeleteEr
"""
raise NotImplementedError()

def list_object_versions(self, name: PurePosixPath | str) -> slist[ObjectVersion]:
"""
Lists versions for a single object name.
"""
raise NotImplementedError()

def get_object_version(self, name: PurePosixPath | str, version_id: str) -> bytes:
"""
Retrieves a specific object version. Use get_object() to retrieve the current object version.
"""
with self.get_object_version_stream(name, version_id) as response:
return response.read()

def get_object_version_stream(self, name: PurePosixPath | str, version_id: str) -> ObjectStream:
"""
Retrieves a stream for a specific object version. Use get_object_stream() to retrieve the current object version.
"""
raise NotImplementedError()

def remove_object_with_versions(self, name: PurePosixPath | str) -> slist[DeleteError]:
"""
Deletes every listed version of a single object name, including delete markers when supported.
"""
raise NotImplementedError()

def open_write(self, name: PurePosixPath | str, timeout_sec: Optional[float] = None) -> AbstractContextManager[BinaryIO]:
"""
Returns a writable stream that, for MinIO, supports multipart upload functionality.
Expand Down
51 changes: 50 additions & 1 deletion python/bucketbase/memory_bucket.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import multiprocessing
import uuid
import weakref
from contextlib import contextmanager
from multiprocessing.managers import DictProxy, SyncManager
Expand All @@ -11,7 +12,9 @@
from streamerate import stream as sstream

from bucketbase import DeleteError
from bucketbase.ibucket import IBucket, ObjectStream, ShallowListing
from bucketbase.ibucket import IBucket, ObjectStream, ObjectVersion, ShallowListing

MemoryObjectVersion = tuple[str, bytes | None, bool]


class _NonClosingBytesIO(io.BytesIO):
Expand All @@ -30,14 +33,21 @@ class MemoryBucket(IBucket):

def __init__(self) -> None:
self._objects: dict[str, bytes] | DictProxy[str, bytes] = {}
self._object_versions: dict[str, list[MemoryObjectVersion]] | DictProxy[str, list[MemoryObjectVersion]] = {}
self._lock: Any = RLock()

def _store_object_version(self, name: str, content: bytes | None, is_delete_marker: bool) -> None:
versions = self._object_versions.get(name, [])
versions.append((uuid.uuid4().hex, content, is_delete_marker))
self._object_versions[name] = versions

def put_object(self, name: PurePosixPath | str, content: Union[str, bytes, bytearray]) -> None:
_name = self._validate_name(name)

_content = self._encode_content(content)
with self._lock:
self._objects[_name] = _content
self._store_object_version(_name, _content, is_delete_marker=False)

def put_object_stream(self, name: PurePosixPath | str, stream: BinaryIO) -> None:
_content = stream.read()
Expand All @@ -55,6 +65,20 @@ def get_object_stream(self, name: PurePosixPath | str) -> ObjectStream:
content = self.get_object(name)
return ObjectStream(io.BytesIO(content), PurePosixPath(name))

def get_object_version(self, name: PurePosixPath | str, version_id: str) -> bytes:
_name = self._validate_name(name)
with self._lock:
for stored_version_id, content, is_delete_marker in self._object_versions.get(_name, []):
if stored_version_id == version_id:
if is_delete_marker or content is None:

@amaximciuc amaximciuc May 20, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why isn't the is_delete_marker enough? why must we check for content is None?

raise FileNotFoundError(f"Object {_name} version {version_id} is a delete marker in MemoryObjectStore")
return content
raise FileNotFoundError(f"Object {_name} version {version_id} not found in MemoryObjectStore")

def get_object_version_stream(self, name: PurePosixPath | str, version_id: str) -> ObjectStream:
content = self.get_object_version(name, version_id)
return ObjectStream(io.BytesIO(content), PurePosixPath(name))

def list_objects(self, prefix: PurePosixPath | str = "") -> slist[PurePosixPath]:
self._split_prefix(prefix) # validate prefix
str_prefix = str(prefix)
Expand All @@ -79,6 +103,21 @@ def shallow_list_objects(self, prefix: PurePosixPath | str = "") -> ShallowListi
prefixes.add(common_prefix)
return ShallowListing(objects=objects, prefixes=prefixes.to_list())

def list_object_versions(self, name: PurePosixPath | str) -> slist[ObjectVersion]:
_name = self._validate_name(name)
with self._lock:
versions = self._object_versions.get(_name, [])
latest_index = len(versions) - 1
return slist(
ObjectVersion(
name=PurePosixPath(_name),
version_id=version_id,
is_latest=index == latest_index,
is_delete_marker=is_delete_marker,
)
for index, (version_id, _content, is_delete_marker) in reversed(list(enumerate(versions)))
)

def exists(self, name: PurePosixPath | str) -> bool:
_name = self._validate_name(name)
with self._lock:
Expand All @@ -93,8 +132,16 @@ def remove_objects(self, names: Iterable[PurePosixPath | str]) -> slist[DeleteEr
obj = self._validate_name(obj)
if obj in self._objects:
self._objects.pop(obj)
self._store_object_version(obj, content=None, is_delete_marker=True)
return delete_errors

def remove_object_with_versions(self, name: PurePosixPath | str) -> slist[DeleteError]:
_name = self._validate_name(name)
with self._lock:
self._objects.pop(_name, None)
self._object_versions.pop(_name, None)
return slist()

def get_size(self, name: PurePosixPath | str) -> int:
_name = self._validate_name(name)

Expand Down Expand Up @@ -141,6 +188,7 @@ def open_write_sync(self, name: PurePosixPath | str) -> Generator[BinaryIO, None
if not exception_occurred:
with self._lock:
self._objects[_name] = content
self._store_object_version(_name, content, is_delete_marker=False)


class SharedMemoryBucket(MemoryBucket):
Expand Down Expand Up @@ -170,6 +218,7 @@ def __init__(self, manager: Optional[SyncManager] = None) -> None:

# override parent's structures with managed ones. This is a small hack, but it's simple & clear enough
self._objects: dict[str, bytes] | DictProxy[str, bytes] = manager.dict()
self._object_versions: dict[str, list[MemoryObjectVersion]] | DictProxy[str, list[MemoryObjectVersion]] = manager.dict()
self._lock: Any = manager.RLock()

def __getstate__(self) -> dict[str, Any]:
Expand Down
70 changes: 61 additions & 9 deletions python/bucketbase/minio_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from streamerate import stream as sstream
from urllib3 import BaseHTTPResponse

from bucketbase.ibucket import IBucket, ObjectStream, ShallowListing
from bucketbase.ibucket import IBucket, ObjectStream, ObjectVersion, ShallowListing


class MinioObjectStream(ObjectStream):
Expand Down Expand Up @@ -125,18 +125,42 @@ def __init__(self, bucket_name: str, minio_client: Minio, part_size: int | None

@classmethod
def _get_object_name(cls, obj: Object) -> str:
return obj.name if cls._USES_NAME_ATTRIBUTE else obj.object_name
object_name = obj.name if cls._USES_NAME_ATTRIBUTE else obj.object_name
if object_name is None:
raise ValueError("Minio object listing item has no object name")
return object_name

@staticmethod
def _read_response(response: BaseHTTPResponse) -> bytes:
try:
data = bytes()
for buffer in response.stream(amt=1024 * 1024):
data += buffer
return data
Comment on lines +136 to +139
finally:
response.release_conn()

@staticmethod
def _to_bool(value: object) -> bool:
return value if isinstance(value, bool) else str(value).lower() == "true"

@classmethod
def _to_object_version(cls, obj: Object) -> ObjectVersion:
object_name = cls._get_object_name(obj)
version_id = obj.version_id
if version_id is None:
raise ValueError(f"Minio object listing item {object_name} has no version id")
return ObjectVersion(
name=PurePosixPath(object_name),
version_id=version_id,
is_latest=cls._to_bool(obj.is_latest),
is_delete_marker=obj.is_delete_marker,
)

def get_object(self, name: PurePosixPath | str) -> bytes:
with self.get_object_stream(name) as response:
assert isinstance(response, BaseHTTPResponse), f"Expected IOBase, got {type(response)}"
try:
data = bytes()
for buffer in response.stream(amt=1024 * 1024):
data += buffer
return data
finally:
response.release_conn()
return self._read_response(response)

def get_object_stream(self, name: PurePosixPath | str) -> ObjectStream:
_name = self._validate_name(name)
Expand All @@ -150,6 +174,21 @@ def get_object_stream(self, name: PurePosixPath | str) -> ObjectStream:
_name_path = PurePosixPath(_name) if isinstance(_name, str) else _name
return MinioObjectStream(response, _name_path)

def get_object_version(self, name: PurePosixPath | str, version_id: str) -> bytes:
with self.get_object_version_stream(name, version_id) as response:
assert isinstance(response, BaseHTTPResponse), f"Expected IOBase, got {type(response)}"
return self._read_response(response)

def get_object_version_stream(self, name: PurePosixPath | str, version_id: str) -> ObjectStream:
_name = self._validate_name(name)
try:
response: BaseHTTPResponse = self._minio_client.get_object(self._bucket_name, _name, version_id=version_id)
except minio.error.S3Error as e:
if e.code in ("MethodNotAllowed", "NoSuchKey", "NoSuchVersion"):
raise FileNotFoundError(f"Object {_name} version {version_id} not found in bucket {self._bucket_name} on Minio") from e
raise
return MinioObjectStream(response, PurePosixPath(_name))

def fget_object(self, name: PurePosixPath | str, file_path: Path) -> None:
"""
Raises:
Expand Down Expand Up @@ -198,6 +237,11 @@ def shallow_list_objects(self, prefix: PurePosixPath | str = "") -> ShallowListi
objects = object_names.filter(lambda x: not x.endswith("/")).map(PurePosixPath).to_list()
return ShallowListing(objects=objects, prefixes=prefixes)

def list_object_versions(self, name: PurePosixPath | str) -> slist[ObjectVersion]:
_name = self._validate_name(name)
listing_itr = self._minio_client.list_objects(bucket_name=self._bucket_name, prefix=_name, recursive=True, include_version=True)
return sstream(listing_itr).filter(lambda obj: self._get_object_name(obj) == _name).map(self._to_object_version).to_list()

def exists(self, name: PurePosixPath | str) -> bool:
_name = self._validate_name(name)
try:
Expand All @@ -216,6 +260,14 @@ def remove_objects(self, names: Iterable[PurePosixPath | str]) -> slist[DeleteEr
errors = slist(self._minio_client.remove_objects(self._bucket_name, delete_objects_stream))
return errors

def remove_object_with_versions(self, name: PurePosixPath | str) -> slist[DeleteError]:
versions = self.list_object_versions(name)
if versions.size() == 0:
return slist()

delete_objects_stream = versions.map(lambda version: DeleteObject(str(version.name), version.version_id))
return slist(self._minio_client.remove_objects(self._bucket_name, delete_objects_stream))

def get_size(self, name: PurePosixPath | str) -> int:
try:
st = self._minio_client.stat_object(self._bucket_name, str(name))
Expand Down
Loading
Loading