From 42ab95398d8825b531b138bbec7d1975382312a9 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 26 May 2026 20:53:39 +0000 Subject: [PATCH 1/9] feat(storage): parse finalize_time and server crc32c in async object stream --- .../asyncio/async_read_object_stream.py | 19 ++++++++++ .../asyncio/test_async_read_object_stream.py | 37 ++++++++++++++++++- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py index cd7ae067c631..8655ed005d61 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime from typing import List, Optional, Tuple from google.api_core.bidi_async import AsyncBidiRpc @@ -79,6 +80,9 @@ def __init__( self.socket_like_rpc: Optional[AsyncBidiRpc] = None self._is_stream_open: bool = False self.persisted_size: Optional[int] = None + self.is_finalized: bool = False + self.full_obj_server_crc32c: Optional[int] = None + self.object_metadata: Optional[_storage_v2.Object] = None async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: """Opens the bidi-gRPC connection to read from the object. @@ -132,6 +136,21 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: self.generation_number = response.metadata.generation # update persisted size self.persisted_size = response.metadata.size + self.object_metadata = response.metadata + # Since full object checksum validation is only required for finalized objects, + # check finalize_time (which is DatetimeWithNanoseconds/datetime in production, or mocked in tests). + finalize_time = getattr(response.metadata, "finalize_time", None) + if finalize_time: + is_finalized_val = False + if isinstance(finalize_time, datetime.datetime): + is_finalized_val = True + elif hasattr(finalize_time, "seconds") and finalize_time.seconds > 0: + is_finalized_val = True + + if is_finalized_val: + self.is_finalized = True + if hasattr(response.metadata, "checksums") and response.metadata.checksums: + self.full_obj_server_crc32c = response.metadata.checksums.crc32c if response and response.read_handle: self.read_handle = response.read_handle diff --git a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py index f5783be6bf94..fc16ae41a0ac 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py @@ -38,9 +38,11 @@ async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open socket_like_rpc.open = AsyncMock() recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse) - recv_response.metadata = mock.MagicMock(spec=_storage_v2.Object) + recv_response.metadata = mock.MagicMock() recv_response.metadata.generation = _TEST_GENERATION_NUMBER recv_response.metadata.size = _TEST_OBJECT_SIZE + recv_response.metadata.finalize_time.seconds = 12345 + recv_response.metadata.checksums.crc32c = 98765 recv_response.read_handle = _TEST_READ_HANDLE socket_like_rpc.recv = AsyncMock(return_value=recv_response) @@ -130,6 +132,8 @@ async def test_open(mock_client, mock_cls_async_bidi_rpc): assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER assert read_obj_stream.read_handle == _TEST_READ_HANDLE assert read_obj_stream.persisted_size == _TEST_OBJECT_SIZE + assert read_obj_stream.is_finalized is True + assert read_obj_stream.full_obj_server_crc32c == 98765 assert read_obj_stream.is_stream_open @@ -381,3 +385,34 @@ async def test_recv_updates_read_handle_on_refresh( await stream.recv() assert stream.read_handle == refreshed_handle + + +@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc") +@mock.patch( + "google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" +) +@pytest.mark.asyncio +async def test_open_unfinalized_object_skips_checksum(mock_client, mock_cls_async_bidi_rpc): + socket_like_rpc = AsyncMock() + mock_cls_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = AsyncMock() + + recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse) + recv_response.metadata = mock.MagicMock() + recv_response.metadata.generation = _TEST_GENERATION_NUMBER + recv_response.metadata.size = _TEST_OBJECT_SIZE + recv_response.metadata.finalize_time.seconds = 0 # NOT finalized! + recv_response.metadata.checksums.crc32c = 98765 + recv_response.read_handle = _TEST_READ_HANDLE + socket_like_rpc.recv = AsyncMock(return_value=recv_response) + + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + ) + + await read_obj_stream.open() + + assert read_obj_stream.is_finalized is False + assert read_obj_stream.full_obj_server_crc32c is None From d972c2cc3aea01e14633054b961eff577bff4910 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 26 May 2026 20:54:04 +0000 Subject: [PATCH 2/9] feat(storage): implement rolling checksum and verification in reads resumption strategy --- .../retry/reads_resumption_strategy.py | 29 ++++++++- .../retry/test_reads_resumption_strategy.py | 59 ++++++++++++++++++- 2 files changed, 83 insertions(+), 5 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py index 845770c3a215..35d974ae2911 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py @@ -36,7 +36,7 @@ class _DownloadState: """A helper class to track the state of a single range download.""" def __init__( - self, initial_offset: int, initial_length: int, user_buffer: IO[bytes] + self, initial_offset: int, initial_length: int, user_buffer: IO[bytes], is_full_object_read: bool = False ): self.initial_offset = initial_offset self.initial_length = initial_length @@ -44,6 +44,8 @@ def __init__( self.bytes_written = 0 self.next_expected_offset = initial_offset self.is_complete = False + self.is_full_object_read = is_full_object_read + self.rolling_checksum = google_crc32c.Checksum() if is_full_object_read else None class _ReadResumptionStrategy(_BaseResumptionStrategy): @@ -90,6 +92,7 @@ def update_state_from_response( ) download_states = state["download_states"] + checksum_enabled = state.get("enable_checksum", True) for object_data_range in proto.object_data_ranges: # Ignore empty ranges or ranges for IDs not in our state @@ -125,7 +128,7 @@ def update_state_from_response( checksummed_data = object_data_range.checksummed_data data = checksummed_data.content - if checksummed_data.HasField("crc32c"): + if checksum_enabled and checksummed_data.HasField("crc32c"): server_checksum = checksummed_data.crc32c client_checksum = google_crc32c.value(data) if server_checksum != client_checksum: @@ -138,10 +141,14 @@ def update_state_from_response( # Update State & Write Data chunk_size = len(data) read_state.user_buffer.write(data) + + # Commit updates only after the write succeeds + if read_state.rolling_checksum is not None: + read_state.rolling_checksum.update(data) read_state.bytes_written += chunk_size read_state.next_expected_offset += chunk_size - # Final Byte Count Verification + # Final Byte Count & Full Object Checksum Verification if object_data_range.range_end: read_state.is_complete = True if ( @@ -154,6 +161,22 @@ def update_state_from_response( f"Expected {read_state.initial_length}, got {read_state.bytes_written}", ) + # Perform full-object checksum verification once the stream finishes. + if read_state.is_full_object_read and checksum_enabled: + full_obj_server_crc32c = state.get("full_obj_server_crc32c") + if full_obj_server_crc32c is not None: + # Use standard big-endian byte conversion to retrieve the rolling checksum value. + client_checksum = int.from_bytes( + read_state.rolling_checksum.digest(), + byteorder="big", + ) + if client_checksum != full_obj_server_crc32c: + raise DataCorruption( + response, + f"Full object checksum mismatch for read_id {read_id}. " + f"Server authoritative crc32c: {full_obj_server_crc32c}, client calculated rolling: {client_checksum}.", + ) + async def recover_state_on_failure(self, error: Exception, state: Any) -> None: """Handles BidiReadObjectRedirectedError for reads.""" routing_token, read_handle = _handle_redirect(error) diff --git a/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py b/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py index dc27cb701974..88bae515729b 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py @@ -45,6 +45,24 @@ def test_initialization(self): self.assertEqual(state.bytes_written, 0) self.assertEqual(state.next_expected_offset, initial_offset) self.assertFalse(state.is_complete) + self.assertFalse(state.is_full_object_read) + self.assertIsNone(state.rolling_checksum) + + def test_initialization_with_full_object_read(self): + """Test that _DownloadState initializes correctly when is_full_object_read is True.""" + initial_offset = 10 + initial_length = 100 + user_buffer = io.BytesIO() + state_full = _DownloadState(initial_offset, initial_length, user_buffer, is_full_object_read=True) + + self.assertEqual(state_full.initial_offset, initial_offset) + self.assertEqual(state_full.initial_length, initial_length) + self.assertEqual(state_full.user_buffer, user_buffer) + self.assertEqual(state_full.bytes_written, 0) + self.assertEqual(state_full.next_expected_offset, initial_offset) + self.assertFalse(state_full.is_complete) + self.assertTrue(state_full.is_full_object_read) + self.assertIsNotNone(state_full.rolling_checksum) class TestReadResumptionStrategy(unittest.TestCase): @@ -53,12 +71,12 @@ def setUp(self): self.state = {"download_states": {}, "read_handle": None, "routing_token": None} - def _add_download(self, read_id, offset=0, length=100, buffer=None): + def _add_download(self, read_id, offset=0, length=100, buffer=None, is_full_object_read=False): """Helper to inject a download state into the correct nested location.""" if buffer is None: buffer = io.BytesIO() state = _DownloadState( - initial_offset=offset, initial_length=length, user_buffer=buffer + initial_offset=offset, initial_length=length, user_buffer=buffer, is_full_object_read=is_full_object_read ) self.state["download_states"][read_id] = state return state @@ -358,3 +376,40 @@ async def run(): # Token should remain unchanged self.assertEqual(self.state["routing_token"], "existing-token") + + def test_update_state_full_object_checksum_success(self): + """Test that full object checksum verification succeeds on range_end.""" + read_state = self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True) + self.state["enable_checksum"] = True + self.state["full_obj_server_crc32c"] = google_crc32c.value(b"testdata1") + + resp1 = self._create_response(b"test", _READ_ID, offset=0) + self.strategy.update_state_from_response(resp1, self.state) + + resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True) + self.strategy.update_state_from_response(resp2, self.state) + + self.assertTrue(read_state.is_complete) + self.assertEqual(read_state.bytes_written, 9) + + def test_update_state_full_object_checksum_failure(self): + """Test that full object checksum verification raises DataCorruption on mismatch at range_end.""" + self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True) + self.state["enable_checksum"] = True + self.state["full_obj_server_crc32c"] = 111111 # Wrong server checksum! + + resp1 = self._create_response(b"test", _READ_ID, offset=0) + self.strategy.update_state_from_response(resp1, self.state) + + resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True) + with self.assertRaisesRegex(DataCorruption, "Full object checksum mismatch"): + self.strategy.update_state_from_response(resp2, self.state) + + def test_update_state_checksum_mismatch_ignored_when_disabled(self): + """Test that a CRC32C mismatch is ignored when enable_checksum is False.""" + self._add_download(_READ_ID) + self.state["enable_checksum"] = False + response = self._create_response(b"data", _READ_ID, offset=0, crc=999999) + + # Should NOT raise DataCorruption! + self.strategy.update_state_from_response(response, self.state) From b4a7a849aa0814f27d75399288aa3aa84c0d7875 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 26 May 2026 20:54:24 +0000 Subject: [PATCH 3/9] feat(storage): integrate full-object checksum in AsyncMultiRangeDownloader --- .../asyncio/async_multi_range_downloader.py | 42 +++++- .../tests/system/test_zonal.py | 78 ++++++++++++ .../test_async_multi_range_downloader.py | 120 +++++++++++++++++- 3 files changed, 231 insertions(+), 9 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py index ac0844519e2d..fa680e95dba2 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py @@ -45,6 +45,7 @@ _ReadResumptionStrategy, ) +from google.cloud.storage.exceptions import DataCorruption from ._utils import raise_if_no_fast_crc32c _MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100 @@ -219,8 +220,6 @@ def __init__( ) generation = kwargs.pop("generation_number") - raise_if_no_fast_crc32c() - self.client = client self.bucket_name = bucket_name self.object_name = object_name @@ -232,6 +231,8 @@ def __init__( self._multiplexer: Optional[_StreamMultiplexer] = None self.persisted_size: Optional[int] = None # updated after opening the stream self._open_retries: int = 0 + self.is_finalized: bool = False + self.full_obj_server_crc32c: Optional[int] = None async def __aenter__(self): """Opens the underlying bidi-gRPC connection to read from the object.""" @@ -327,6 +328,8 @@ async def _do_open(): self.read_handle = self.read_obj_str.read_handle if self.read_obj_str.persisted_size is not None: self.persisted_size = self.read_obj_str.persisted_size + self.is_finalized = self.read_obj_str.is_finalized + self.full_obj_server_crc32c = self.read_obj_str.full_obj_server_crc32c self._is_stream_open = True @@ -363,6 +366,8 @@ async def factory(): self.generation = stream.generation_number if stream.read_handle: self.read_handle = stream.read_handle + self.is_finalized = stream.is_finalized + self.full_obj_server_crc32c = stream.full_obj_server_crc32c self.read_obj_str = stream self._is_stream_open = True @@ -377,6 +382,7 @@ async def download_ranges( lock: asyncio.Lock = None, retry_policy: Optional[AsyncRetry] = None, metadata: Optional[List[Tuple[str, str]]] = None, + enable_checksum: bool = True, ) -> None: """Downloads multiple byte ranges from the object into the buffers provided by user with automatic retries. @@ -412,6 +418,9 @@ async def download_ranges( "Invalid input - length of read_ranges cannot be more than 1000" ) + if enable_checksum: + raise_if_no_fast_crc32c() + if not self._is_stream_open: raise ValueError("Underlying bidi-gRPC stream is not open") @@ -422,16 +431,29 @@ async def download_ranges( download_states = {} for read_range in read_ranges: read_id = generate_random_56_bit_integer() + # Unpack tuple into self-documenting variable names to improve readability. + offset, length, user_buffer = read_range + + # Heuristic to detect full object reads: + # - Implicit full object read: start offset is 0 and length is 0 (read all). + # - Explicit full object read: start offset is 0 and length matches the exact persisted size. + is_full_object_read = ( + (offset == 0 and length == 0) or + (self.persisted_size is not None and offset == 0 and length == self.persisted_size) + ) download_states[read_id] = _DownloadState( - initial_offset=read_range[0], - initial_length=read_range[1], - user_buffer=read_range[2], + initial_offset=offset, + initial_length=length, + user_buffer=user_buffer, + is_full_object_read=is_full_object_read, ) initial_state = { "download_states": download_states, "read_handle": self.read_handle, "routing_token": None, + "enable_checksum": enable_checksum, + "full_obj_server_crc32c": self.full_obj_server_crc32c, } read_ids = set(download_states.keys()) @@ -519,12 +541,18 @@ async def generator(): strategy, send_and_recv_via_multiplexer ) - await retry_manager.execute(initial_state, retry_policy) + try: + await retry_manager.execute(initial_state, retry_policy) + except DataCorruption: + if self.is_stream_open: + await self.close() + raise if initial_state.get("read_handle"): self.read_handle = initial_state["read_handle"] finally: - self._multiplexer.unregister(read_ids) + if self._multiplexer is not None: + self._multiplexer.unregister(read_ids) async def close(self): """ diff --git a/packages/google-cloud-storage/tests/system/test_zonal.py b/packages/google-cloud-storage/tests/system/test_zonal.py index 20e172a1adee..b11f584192fe 100644 --- a/packages/google-cloud-storage/tests/system/test_zonal.py +++ b/packages/google-cloud-storage/tests/system/test_zonal.py @@ -26,6 +26,7 @@ ObjectContexts, ObjectCustomContextPayload, ) +from google.cloud.storage.exceptions import DataCorruption pytestmark = pytest.mark.skipif( os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True", @@ -961,3 +962,80 @@ async def _run(): blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) event_loop.run_until_complete(_run()) + + +@pytest.mark.parametrize( + "read_start, read_length, enable_checksum", + [ + (0, 0, True), + (0, 1024 * 1024, True), + (0, 0, False), + ], +) +def test_mrd_checksum_validation( + storage_client, blobs_to_delete, event_loop, grpc_client_direct, read_start, read_length, enable_checksum +): + """ + Tests full downloads with specified offset, length, and enable_checksum toggle on finalized objects. + """ + object_size = 1024 * 1024 # 1MB + object_name = f"test_mrd_chksum-{uuid.uuid4()}" + + async def _run(): + object_data = os.urandom(object_size) + + writer = AsyncAppendableObjectWriter( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) + await writer.open() + await writer.append(object_data) + await writer.close(finalize_on_close=True) + + async with AsyncMultiRangeDownloader( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) as mrd: + buffer = BytesIO() + await mrd.download_ranges([(read_start, read_length, buffer)], enable_checksum=enable_checksum) + assert buffer.getvalue() == object_data + + # cleanup + del writer + gc.collect() + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + + event_loop.run_until_complete(_run()) + + +def test_mrd_checksum_unfinalized_appendable_skipped( + storage_client, blobs_to_delete, event_loop, grpc_client_direct +): + """ + Verifies that live, unfinalized appendable objects skip the full-object checksum check + naturally without raising any exceptions. + """ + object_name = f"test_mrd_chksum_unfin-{uuid.uuid4()}" + + async def _run(): + writer = AsyncAppendableObjectWriter( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) + await writer.open() + await writer.append(_BYTES_TO_UPLOAD) + await writer.flush() # Flushed but not finalized! + + # Download the unfinalized appendable object with enable_checksum=True + async with AsyncMultiRangeDownloader( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) as mrd: + buffer = BytesIO() + # Since it's unfinalized, it should skip the checksum check without raising + await mrd.download_ranges([(0, 0, buffer)], enable_checksum=True) + assert buffer.getvalue() == _BYTES_TO_UPLOAD + + # cleanup + await writer.close() + del writer + gc.collect() + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + + event_loop.run_until_complete(_run()) diff --git a/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py b/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py index 24a632b68131..047c749b94c4 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py @@ -308,12 +308,14 @@ async def test_downloading_without_opening_should_throw_error(self): assert not mrd.is_stream_open @mock.patch("google.cloud.storage.asyncio._utils.google_crc32c") - def test_init_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c): + @pytest.mark.asyncio + async def test_download_ranges_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c): mock_google_crc32c.implementation = "python" mock_client = mock.MagicMock() + mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object") with pytest.raises(exceptions.FailedPrecondition) as exc_info: - AsyncMultiRangeDownloader(mock_client, "bucket", "object") + await mrd.download_ranges([(0, 10, BytesIO())]) assert "The google-crc32c package is not installed with C support" in str( exc_info.value @@ -579,3 +581,117 @@ async def staged_recv(): # Assert mock_logger.info.assert_any_call("Resuming download (attempt 2) for 1 ranges.") + + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream" + ) + @pytest.mark.asyncio + async def test_open_populates_checksum_properties(self, mock_cls_async_read_object_stream): + # Arrange + mock_client = mock.MagicMock() + mock_client.grpc_client = mock.AsyncMock() + mock_stream = mock_cls_async_read_object_stream.return_value + mock_stream.open = AsyncMock() + mock_stream.generation_number = 123 + mock_stream.persisted_size = 100 + mock_stream.read_handle = b"h" + mock_stream.is_finalized = True + mock_stream.full_obj_server_crc32c = 999 + + mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object") + assert mrd.is_finalized is False + assert mrd.full_obj_server_crc32c is None + + # Act + await mrd.open() + + # Assert + assert mrd.is_finalized is True + assert mrd.full_obj_server_crc32c == 999 + + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy" + ) + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager" + ) + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream" + ) + @pytest.mark.asyncio + async def test_download_ranges_configures_full_object_read_state( + self, mock_cls_async_read_object_stream, mock_retry_manager_cls, mock_strategy_cls + ): + # Arrange + mock_client = mock.MagicMock() + mock_client.grpc_client = mock.AsyncMock() + mock_stream = mock_cls_async_read_object_stream.return_value + mock_stream.open = AsyncMock() + mock_stream.persisted_size = 100 + mock_stream.is_finalized = True + mock_stream.full_obj_server_crc32c = 999 + + mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o") + + mock_retry_manager = mock_retry_manager_cls.return_value + mock_retry_manager.execute = AsyncMock() + + # Act + # Implicit full read (0, 0) and explicit full read (0, persisted_size=100) + ranges = [(0, 0, BytesIO()), (0, 100, BytesIO()), (10, 20, BytesIO())] + await mrd.download_ranges(ranges, enable_checksum=True) + + # Assert + mock_retry_manager.execute.assert_called_once() + initial_state = mock_retry_manager.execute.call_args[0][0] + + download_states = initial_state["download_states"] + assert len(download_states) == 3 + + states_list = list(download_states.values()) + # First state: (0, 0) -> is_full_object_read is True + assert states_list[0].is_full_object_read is True + assert states_list[0].rolling_checksum is not None + + # Second state: (0, 100) -> is_full_object_read is True + assert states_list[1].is_full_object_read is True + assert states_list[1].rolling_checksum is not None + + # Third state: (10, 20) -> is_full_object_read is False + assert states_list[2].is_full_object_read is False + assert states_list[2].rolling_checksum is None + + # State values for enable_checksum and crc32c + assert initial_state["enable_checksum"] is True + assert initial_state["full_obj_server_crc32c"] == 999 + + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy" + ) + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager" + ) + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream" + ) + @pytest.mark.asyncio + async def test_download_ranges_closes_on_datacorruption( + self, mock_cls_async_read_object_stream, mock_retry_manager_cls, mock_strategy_cls + ): + # Arrange + mock_client = mock.MagicMock() + mock_client.grpc_client = mock.AsyncMock() + mock_stream = mock_cls_async_read_object_stream.return_value + mock_stream.open = AsyncMock() + + mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o") + mrd.close = AsyncMock() + + mock_retry_manager = mock_retry_manager_cls.return_value + mock_retry_manager.execute = AsyncMock(side_effect=DataCorruption(None, "corrupted")) + + # Act & Assert + with pytest.raises(DataCorruption): + await mrd.download_ranges([(0, 0, BytesIO())]) + + mrd.close.assert_called_once() From 7110744e7cfcd68e75d66fce375dd66611c99962 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 26 May 2026 21:02:53 +0000 Subject: [PATCH 4/9] style(storage): apply user feedback on finalize_time and fix unused import lint error --- .../asyncio/async_read_object_stream.py | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py index 8655ed005d61..965c1abb9835 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime from typing import List, Optional, Tuple from google.api_core.bidi_async import AsyncBidiRpc @@ -137,20 +136,14 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: # update persisted size self.persisted_size = response.metadata.size self.object_metadata = response.metadata - # Since full object checksum validation is only required for finalized objects, - # check finalize_time (which is DatetimeWithNanoseconds/datetime in production, or mocked in tests). - finalize_time = getattr(response.metadata, "finalize_time", None) - if finalize_time: - is_finalized_val = False - if isinstance(finalize_time, datetime.datetime): - is_finalized_val = True - elif hasattr(finalize_time, "seconds") and finalize_time.seconds > 0: - is_finalized_val = True - - if is_finalized_val: - self.is_finalized = True - if hasattr(response.metadata, "checksums") and response.metadata.checksums: - self.full_obj_server_crc32c = response.metadata.checksums.crc32c + if ( + hasattr(response.metadata, "finalize_time") + and response.metadata.finalize_time + and response.metadata.finalize_time.seconds > 0 + ): + self.is_finalized = True + if hasattr(response.metadata, "checksums") and response.metadata.checksums: + self.full_obj_server_crc32c = response.metadata.checksums.crc32c if response and response.read_handle: self.read_handle = response.read_handle From 31ef2e399f59becde18ab59cb909f8a5f4916e87 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 27 May 2026 08:01:53 +0000 Subject: [PATCH 5/9] style(storage): run ruff format to resolve CI lint failures --- .../google/cloud/storage/asyncio/async_read_object_stream.py | 5 ++++- .../tests/unit/asyncio/test_async_read_object_stream.py | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py index 965c1abb9835..8d5595a81ed7 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py @@ -142,7 +142,10 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: and response.metadata.finalize_time.seconds > 0 ): self.is_finalized = True - if hasattr(response.metadata, "checksums") and response.metadata.checksums: + if ( + hasattr(response.metadata, "checksums") + and response.metadata.checksums + ): self.full_obj_server_crc32c = response.metadata.checksums.crc32c if response and response.read_handle: diff --git a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py index fc16ae41a0ac..9aaec2daed1b 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py @@ -392,7 +392,9 @@ async def test_recv_updates_read_handle_on_refresh( "google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" ) @pytest.mark.asyncio -async def test_open_unfinalized_object_skips_checksum(mock_client, mock_cls_async_bidi_rpc): +async def test_open_unfinalized_object_skips_checksum( + mock_client, mock_cls_async_bidi_rpc +): socket_like_rpc = AsyncMock() mock_cls_async_bidi_rpc.return_value = socket_like_rpc socket_like_rpc.open = AsyncMock() From c371f59259cf39dea9c96587674437c6f12c4fd0 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 27 May 2026 08:06:22 +0000 Subject: [PATCH 6/9] test(storage): add conftest autouse event loop fixture to resolve pytest-asyncio issues --- .../tests/unit/conftest.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 packages/google-cloud-storage/tests/unit/conftest.py diff --git a/packages/google-cloud-storage/tests/unit/conftest.py b/packages/google-cloud-storage/tests/unit/conftest.py new file mode 100644 index 000000000000..2eeabdc990e6 --- /dev/null +++ b/packages/google-cloud-storage/tests/unit/conftest.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import pytest + + +@pytest.fixture(autouse=True) +def set_event_loop(): + try: + asyncio.get_running_loop() + yield + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + yield + finally: + loop.close() + asyncio.set_event_loop(None) From c47064135cc470a0e706df2d0cbf7c125c569218 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 27 May 2026 08:40:18 +0000 Subject: [PATCH 7/9] fix(storage): use second attribute instead of seconds for DatetimeWithNanoseconds --- .../google/cloud/storage/asyncio/async_read_object_stream.py | 2 +- .../tests/unit/asyncio/test_async_read_object_stream.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py index 8d5595a81ed7..8fd98d623571 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py @@ -139,7 +139,7 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: if ( hasattr(response.metadata, "finalize_time") and response.metadata.finalize_time - and response.metadata.finalize_time.seconds > 0 + and response.metadata.finalize_time.second > 0 ): self.is_finalized = True if ( diff --git a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py index 9aaec2daed1b..a8f64422765e 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py @@ -41,7 +41,7 @@ async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open recv_response.metadata = mock.MagicMock() recv_response.metadata.generation = _TEST_GENERATION_NUMBER recv_response.metadata.size = _TEST_OBJECT_SIZE - recv_response.metadata.finalize_time.seconds = 12345 + recv_response.metadata.finalize_time.second = 30 recv_response.metadata.checksums.crc32c = 98765 recv_response.read_handle = _TEST_READ_HANDLE socket_like_rpc.recv = AsyncMock(return_value=recv_response) @@ -403,7 +403,7 @@ async def test_open_unfinalized_object_skips_checksum( recv_response.metadata = mock.MagicMock() recv_response.metadata.generation = _TEST_GENERATION_NUMBER recv_response.metadata.size = _TEST_OBJECT_SIZE - recv_response.metadata.finalize_time.seconds = 0 # NOT finalized! + recv_response.metadata.finalize_time.second = 0 # NOT finalized! recv_response.metadata.checksums.crc32c = 98765 recv_response.read_handle = _TEST_READ_HANDLE socket_like_rpc.recv = AsyncMock(return_value=recv_response) From 2f49dcb1d83012f0d0ea29ce5714a54fcbcbd211 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 27 May 2026 09:18:23 +0000 Subject: [PATCH 8/9] perf(storage): bypass rolling checksum updates when checksum validation is disabled --- .../retry/reads_resumption_strategy.py | 12 ++++++-- .../retry/test_reads_resumption_strategy.py | 30 ++++++++++++++++--- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py index 35d974ae2911..3a782c8135eb 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py @@ -36,7 +36,11 @@ class _DownloadState: """A helper class to track the state of a single range download.""" def __init__( - self, initial_offset: int, initial_length: int, user_buffer: IO[bytes], is_full_object_read: bool = False + self, + initial_offset: int, + initial_length: int, + user_buffer: IO[bytes], + is_full_object_read: bool = False, ): self.initial_offset = initial_offset self.initial_length = initial_length @@ -45,7 +49,9 @@ def __init__( self.next_expected_offset = initial_offset self.is_complete = False self.is_full_object_read = is_full_object_read - self.rolling_checksum = google_crc32c.Checksum() if is_full_object_read else None + self.rolling_checksum = ( + google_crc32c.Checksum() if is_full_object_read else None + ) class _ReadResumptionStrategy(_BaseResumptionStrategy): @@ -143,7 +149,7 @@ def update_state_from_response( read_state.user_buffer.write(data) # Commit updates only after the write succeeds - if read_state.rolling_checksum is not None: + if checksum_enabled and read_state.rolling_checksum is not None: read_state.rolling_checksum.update(data) read_state.bytes_written += chunk_size read_state.next_expected_offset += chunk_size diff --git a/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py b/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py index 88bae515729b..4f7849801acd 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py @@ -53,7 +53,9 @@ def test_initialization_with_full_object_read(self): initial_offset = 10 initial_length = 100 user_buffer = io.BytesIO() - state_full = _DownloadState(initial_offset, initial_length, user_buffer, is_full_object_read=True) + state_full = _DownloadState( + initial_offset, initial_length, user_buffer, is_full_object_read=True + ) self.assertEqual(state_full.initial_offset, initial_offset) self.assertEqual(state_full.initial_length, initial_length) @@ -71,12 +73,17 @@ def setUp(self): self.state = {"download_states": {}, "read_handle": None, "routing_token": None} - def _add_download(self, read_id, offset=0, length=100, buffer=None, is_full_object_read=False): + def _add_download( + self, read_id, offset=0, length=100, buffer=None, is_full_object_read=False + ): """Helper to inject a download state into the correct nested location.""" if buffer is None: buffer = io.BytesIO() state = _DownloadState( - initial_offset=offset, initial_length=length, user_buffer=buffer, is_full_object_read=is_full_object_read + initial_offset=offset, + initial_length=length, + user_buffer=buffer, + is_full_object_read=is_full_object_read, ) self.state["download_states"][read_id] = state return state @@ -379,7 +386,9 @@ async def run(): def test_update_state_full_object_checksum_success(self): """Test that full object checksum verification succeeds on range_end.""" - read_state = self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True) + read_state = self._add_download( + _READ_ID, offset=0, length=9, is_full_object_read=True + ) self.state["enable_checksum"] = True self.state["full_obj_server_crc32c"] = google_crc32c.value(b"testdata1") @@ -413,3 +422,16 @@ def test_update_state_checksum_mismatch_ignored_when_disabled(self): # Should NOT raise DataCorruption! self.strategy.update_state_from_response(response, self.state) + + def test_update_state_full_object_checksum_mismatch_ignored_when_disabled(self): + """Test that a full-object CRC32C mismatch is ignored when enable_checksum is False.""" + self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True) + self.state["enable_checksum"] = False + self.state["full_obj_server_crc32c"] = 111111 # Wrong server checksum! + + resp1 = self._create_response(b"test", _READ_ID, offset=0) + self.strategy.update_state_from_response(resp1, self.state) + + resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True) + # Should NOT raise DataCorruption! + self.strategy.update_state_from_response(resp2, self.state) From 36511bcc662e62ff454740b892684a9e13344581 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 4 Jun 2026 13:15:32 +0000 Subject: [PATCH 9/9] style(storage): fix lints and reformat code --- .../google/cloud/storage/_helpers.py | 4 +++- .../google/cloud/storage/_http.py | 2 +- .../asyncio/async_multi_range_downloader.py | 9 +++---- .../tests/system/test_zonal.py | 14 ++++++++--- .../test_async_multi_range_downloader.py | 24 ++++++++++++++----- .../tests/unit/conftest.py | 1 + 6 files changed, 39 insertions(+), 15 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/_helpers.py b/packages/google-cloud-storage/google/cloud/storage/_helpers.py index 04039971ef41..c5d2de61796b 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_helpers.py +++ b/packages/google-cloud-storage/google/cloud/storage/_helpers.py @@ -33,9 +33,11 @@ from google.cloud.exceptions import NotFound from google.cloud.storage._opentelemetry_tracing import ( - create_trace_span as _base_create_trace_span, _is_bucket_metadata_disabled, ) +from google.cloud.storage._opentelemetry_tracing import ( + create_trace_span as _base_create_trace_span, +) from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import ( DEFAULT_RETRY, diff --git a/packages/google-cloud-storage/google/cloud/storage/_http.py b/packages/google-cloud-storage/google/cloud/storage/_http.py index bfe2bf3843af..72734f511b41 100644 --- a/packages/google-cloud-storage/google/cloud/storage/_http.py +++ b/packages/google-cloud-storage/google/cloud/storage/_http.py @@ -25,9 +25,9 @@ from google.cloud.storage import __version__, _helpers from google.cloud.storage._opentelemetry_tracing import ( HAS_OPENTELEMETRY, + _is_bucket_metadata_disabled, create_trace_span, enable_otel_traces, - _is_bucket_metadata_disabled, ) logger = logging.getLogger(__name__) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py index fa680e95dba2..6d3f5e2fab4b 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py @@ -44,8 +44,8 @@ _DownloadState, _ReadResumptionStrategy, ) - from google.cloud.storage.exceptions import DataCorruption + from ._utils import raise_if_no_fast_crc32c _MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100 @@ -437,9 +437,10 @@ async def download_ranges( # Heuristic to detect full object reads: # - Implicit full object read: start offset is 0 and length is 0 (read all). # - Explicit full object read: start offset is 0 and length matches the exact persisted size. - is_full_object_read = ( - (offset == 0 and length == 0) or - (self.persisted_size is not None and offset == 0 and length == self.persisted_size) + is_full_object_read = (offset == 0 and length == 0) or ( + self.persisted_size is not None + and offset == 0 + and length == self.persisted_size ) download_states[read_id] = _DownloadState( initial_offset=offset, diff --git a/packages/google-cloud-storage/tests/system/test_zonal.py b/packages/google-cloud-storage/tests/system/test_zonal.py index b11f584192fe..2d79ec8a817c 100644 --- a/packages/google-cloud-storage/tests/system/test_zonal.py +++ b/packages/google-cloud-storage/tests/system/test_zonal.py @@ -26,7 +26,7 @@ ObjectContexts, ObjectCustomContextPayload, ) -from google.cloud.storage.exceptions import DataCorruption + pytestmark = pytest.mark.skipif( os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True", @@ -973,7 +973,13 @@ async def _run(): ], ) def test_mrd_checksum_validation( - storage_client, blobs_to_delete, event_loop, grpc_client_direct, read_start, read_length, enable_checksum + storage_client, + blobs_to_delete, + event_loop, + grpc_client_direct, + read_start, + read_length, + enable_checksum, ): """ Tests full downloads with specified offset, length, and enable_checksum toggle on finalized objects. @@ -995,7 +1001,9 @@ async def _run(): grpc_client_direct, _ZONAL_BUCKET, object_name ) as mrd: buffer = BytesIO() - await mrd.download_ranges([(read_start, read_length, buffer)], enable_checksum=enable_checksum) + await mrd.download_ranges( + [(read_start, read_length, buffer)], enable_checksum=enable_checksum + ) assert buffer.getvalue() == object_data # cleanup diff --git a/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py b/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py index 047c749b94c4..6ead8d8964e9 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py @@ -309,7 +309,9 @@ async def test_downloading_without_opening_should_throw_error(self): @mock.patch("google.cloud.storage.asyncio._utils.google_crc32c") @pytest.mark.asyncio - async def test_download_ranges_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c): + async def test_download_ranges_raises_if_crc32c_c_extension_is_missing( + self, mock_google_crc32c + ): mock_google_crc32c.implementation = "python" mock_client = mock.MagicMock() mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object") @@ -586,7 +588,9 @@ async def staged_recv(): "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream" ) @pytest.mark.asyncio - async def test_open_populates_checksum_properties(self, mock_cls_async_read_object_stream): + async def test_open_populates_checksum_properties( + self, mock_cls_async_read_object_stream + ): # Arrange mock_client = mock.MagicMock() mock_client.grpc_client = mock.AsyncMock() @@ -620,7 +624,10 @@ async def test_open_populates_checksum_properties(self, mock_cls_async_read_obje ) @pytest.mark.asyncio async def test_download_ranges_configures_full_object_read_state( - self, mock_cls_async_read_object_stream, mock_retry_manager_cls, mock_strategy_cls + self, + mock_cls_async_read_object_stream, + mock_retry_manager_cls, + mock_strategy_cls, ): # Arrange mock_client = mock.MagicMock() @@ -644,7 +651,7 @@ async def test_download_ranges_configures_full_object_read_state( # Assert mock_retry_manager.execute.assert_called_once() initial_state = mock_retry_manager.execute.call_args[0][0] - + download_states = initial_state["download_states"] assert len(download_states) == 3 @@ -676,7 +683,10 @@ async def test_download_ranges_configures_full_object_read_state( ) @pytest.mark.asyncio async def test_download_ranges_closes_on_datacorruption( - self, mock_cls_async_read_object_stream, mock_retry_manager_cls, mock_strategy_cls + self, + mock_cls_async_read_object_stream, + mock_retry_manager_cls, + mock_strategy_cls, ): # Arrange mock_client = mock.MagicMock() @@ -688,7 +698,9 @@ async def test_download_ranges_closes_on_datacorruption( mrd.close = AsyncMock() mock_retry_manager = mock_retry_manager_cls.return_value - mock_retry_manager.execute = AsyncMock(side_effect=DataCorruption(None, "corrupted")) + mock_retry_manager.execute = AsyncMock( + side_effect=DataCorruption(None, "corrupted") + ) # Act & Assert with pytest.raises(DataCorruption): diff --git a/packages/google-cloud-storage/tests/unit/conftest.py b/packages/google-cloud-storage/tests/unit/conftest.py index 2eeabdc990e6..ef3d3a1afc21 100644 --- a/packages/google-cloud-storage/tests/unit/conftest.py +++ b/packages/google-cloud-storage/tests/unit/conftest.py @@ -14,6 +14,7 @@ # limitations under the License. import asyncio + import pytest