Skip to content

Commit 2361ba6

Browse files
authored
feat(storage): full object checksum: implement rolling checksum and verification in reads resumption strategy (#17262)
### 1. Overview of the Solution This solution implements end-to-end full-object checksum validation in `AsyncMultiRangeDownloader` for the asynchronous Google Cloud Storage Python client library. As asynchronous multiplexed downloads of non-contiguous ranges are performed concurrently over a single bidirectional gRPC connection, this feature automatically and incrementally calculates a rolling checksum as bytes arrive and validates it against the server's authoritative object checksum once the download completes. The technical approach consists of three coordinated layers: * **`_AsyncReadObjectStream` (Stream Ingestion)**: Safely extracts the authoritative server checksum (`full_obj_server_crc32c`) and finalization status (`is_finalized`) from the object metadata received in the first data payload response of the stream. * **`_ReadResumptionStrategy` & `_DownloadState` (Verification Logic)**: Computes an isolated, persistent rolling checksum in the individual `_DownloadState` object to ensure calculations do not bleed across concurrent multiplexed ranges. Crucially, the rolling hash updates only *after* buffer writes succeed to prevent state corruption during retry re-connects, raising a `DataCorruption` exception on completion if a mismatch occurs. * **`AsyncMultiRangeDownloader` (Orchestration & Cleanup)**: Detects candidate full-object ranges (e.g., `(0, 0)` or `(0, persisted_size)`), propagates checksum settings to the resumption strategy, and guarantees robust cleanup (closing the stream immediately and unregistering IDs) if data corruption or write errors occur. ### 2. What This PR Specifically Does This PR implements **Step 2: Full-Object Rolling Checksum & Resumption Verification Logic** of the solution: * Upgrades `_DownloadState` to track `is_full_object_read` and initialize an isolated `google_crc32c.Checksum()` rolling instance. * Updates `_ReadResumptionStrategy.update_state_from_response()` to run buffer writes *before* updating the rolling checksum, ensuring transactional safety during connection failures and retry reconnects. * Optimizes performance by bypassing rolling checksum calculations entirely if `enable_checksum` is `False`. * Performs the final validation match at `range_end` against the server's authoritative checksum, raising a `DataCorruption` exception if a mismatch is found. * Adds comprehensive unit tests in `test_reads_resumption_strategy.py` to verify successful validation, failure exceptions, and bypassed checks when validation is disabled.
1 parent e4a207d commit 2361ba6

2 files changed

Lines changed: 153 additions & 5 deletions

File tree

packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,25 @@ class _DownloadState:
3636
"""A helper class to track the state of a single range download."""
3737

3838
def __init__(
39-
self, initial_offset: int, initial_length: int, user_buffer: IO[bytes]
39+
self,
40+
initial_offset: int,
41+
initial_length: int,
42+
user_buffer: IO[bytes],
43+
is_full_object_read: bool = False,
44+
enable_checksum: bool = True,
4045
):
4146
self.initial_offset = initial_offset
4247
self.initial_length = initial_length
4348
self.user_buffer = user_buffer
4449
self.bytes_written = 0
4550
self.next_expected_offset = initial_offset
4651
self.is_complete = False
52+
self.is_full_object_read = is_full_object_read
53+
self.rolling_checksum = (
54+
google_crc32c.Checksum()
55+
if (is_full_object_read and enable_checksum)
56+
else None
57+
)
4758

4859

4960
class _ReadResumptionStrategy(_BaseResumptionStrategy):
@@ -90,6 +101,7 @@ def update_state_from_response(
90101
)
91102

92103
download_states = state["download_states"]
104+
checksum_enabled = state.get("enable_checksum", True)
93105

94106
for object_data_range in proto.object_data_ranges:
95107
# Ignore empty ranges or ranges for IDs not in our state
@@ -125,7 +137,7 @@ def update_state_from_response(
125137
checksummed_data = object_data_range.checksummed_data
126138
data = checksummed_data.content
127139

128-
if checksummed_data.HasField("crc32c"):
140+
if checksum_enabled and checksummed_data.HasField("crc32c"):
129141
server_checksum = checksummed_data.crc32c
130142
client_checksum = google_crc32c.value(data)
131143
if server_checksum != client_checksum:
@@ -138,10 +150,14 @@ def update_state_from_response(
138150
# Update State & Write Data
139151
chunk_size = len(data)
140152
read_state.user_buffer.write(data)
153+
154+
# Commit updates only after the write succeeds
155+
if checksum_enabled and read_state.rolling_checksum is not None:
156+
read_state.rolling_checksum.update(data)
141157
read_state.bytes_written += chunk_size
142158
read_state.next_expected_offset += chunk_size
143159

144-
# Final Byte Count Verification
160+
# Final Byte Count & Full Object Checksum Verification
145161
if object_data_range.range_end:
146162
read_state.is_complete = True
147163
if (
@@ -154,6 +170,26 @@ def update_state_from_response(
154170
f"Expected {read_state.initial_length}, got {read_state.bytes_written}",
155171
)
156172

173+
# Perform full-object checksum verification once the stream finishes.
174+
if (
175+
read_state.is_full_object_read
176+
and checksum_enabled
177+
and read_state.rolling_checksum is not None
178+
):
179+
full_obj_server_crc32c = state.get("full_obj_server_crc32c")
180+
if full_obj_server_crc32c is not None:
181+
# Use standard big-endian byte conversion to retrieve the rolling checksum value.
182+
client_checksum = int.from_bytes(
183+
read_state.rolling_checksum.digest(),
184+
byteorder="big",
185+
)
186+
if client_checksum != full_obj_server_crc32c:
187+
raise DataCorruption(
188+
response,
189+
f"Full object checksum mismatch for read_id {read_id}. "
190+
f"Server authoritative crc32c: {full_obj_server_crc32c}, client calculated rolling: {client_checksum}.",
191+
)
192+
157193
async def recover_state_on_failure(self, error: Exception, state: Any) -> None:
158194
"""Handles BidiReadObjectRedirectedError for reads."""
159195
routing_token, read_handle = _handle_redirect(error)

packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,48 @@ def test_initialization(self):
4545
self.assertEqual(state.bytes_written, 0)
4646
self.assertEqual(state.next_expected_offset, initial_offset)
4747
self.assertFalse(state.is_complete)
48+
self.assertFalse(state.is_full_object_read)
49+
self.assertIsNone(state.rolling_checksum)
50+
51+
def test_initialization_with_full_object_read(self):
52+
"""Test that _DownloadState initializes correctly when is_full_object_read is True."""
53+
initial_offset = 10
54+
initial_length = 100
55+
user_buffer = io.BytesIO()
56+
state_full = _DownloadState(
57+
initial_offset, initial_length, user_buffer, is_full_object_read=True
58+
)
59+
60+
self.assertEqual(state_full.initial_offset, initial_offset)
61+
self.assertEqual(state_full.initial_length, initial_length)
62+
self.assertEqual(state_full.user_buffer, user_buffer)
63+
self.assertEqual(state_full.bytes_written, 0)
64+
self.assertEqual(state_full.next_expected_offset, initial_offset)
65+
self.assertFalse(state_full.is_complete)
66+
self.assertTrue(state_full.is_full_object_read)
67+
self.assertIsNotNone(state_full.rolling_checksum)
68+
69+
def test_initialization_with_full_object_read_and_checksum_disabled(self):
70+
"""Test that _DownloadState does not initialize rolling_checksum when enable_checksum is False."""
71+
initial_offset = 10
72+
initial_length = 100
73+
user_buffer = io.BytesIO()
74+
state_full = _DownloadState(
75+
initial_offset,
76+
initial_length,
77+
user_buffer,
78+
is_full_object_read=True,
79+
enable_checksum=False,
80+
)
81+
82+
self.assertEqual(state_full.initial_offset, initial_offset)
83+
self.assertEqual(state_full.initial_length, initial_length)
84+
self.assertEqual(state_full.user_buffer, user_buffer)
85+
self.assertEqual(state_full.bytes_written, 0)
86+
self.assertEqual(state_full.next_expected_offset, initial_offset)
87+
self.assertFalse(state_full.is_complete)
88+
self.assertTrue(state_full.is_full_object_read)
89+
self.assertIsNone(state_full.rolling_checksum)
4890

4991

5092
class TestReadResumptionStrategy(unittest.TestCase):
@@ -53,12 +95,24 @@ def setUp(self):
5395

5496
self.state = {"download_states": {}, "read_handle": None, "routing_token": None}
5597

56-
def _add_download(self, read_id, offset=0, length=100, buffer=None):
98+
def _add_download(
99+
self,
100+
read_id,
101+
offset=0,
102+
length=100,
103+
buffer=None,
104+
is_full_object_read=False,
105+
enable_checksum=True,
106+
):
57107
"""Helper to inject a download state into the correct nested location."""
58108
if buffer is None:
59109
buffer = io.BytesIO()
60110
state = _DownloadState(
61-
initial_offset=offset, initial_length=length, user_buffer=buffer
111+
initial_offset=offset,
112+
initial_length=length,
113+
user_buffer=buffer,
114+
is_full_object_read=is_full_object_read,
115+
enable_checksum=enable_checksum,
62116
)
63117
self.state["download_states"][read_id] = state
64118
return state
@@ -358,3 +412,61 @@ async def run():
358412

359413
# Token should remain unchanged
360414
self.assertEqual(self.state["routing_token"], "existing-token")
415+
416+
def test_update_state_full_object_checksum_success(self):
417+
"""Test that full object checksum verification succeeds on range_end."""
418+
read_state = self._add_download(
419+
_READ_ID, offset=0, length=9, is_full_object_read=True
420+
)
421+
self.state["enable_checksum"] = True
422+
self.state["full_obj_server_crc32c"] = google_crc32c.value(b"testdata1")
423+
424+
resp1 = self._create_response(b"test", _READ_ID, offset=0)
425+
self.strategy.update_state_from_response(resp1, self.state)
426+
427+
resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True)
428+
self.strategy.update_state_from_response(resp2, self.state)
429+
430+
self.assertTrue(read_state.is_complete)
431+
self.assertEqual(read_state.bytes_written, 9)
432+
433+
def test_update_state_full_object_checksum_failure(self):
434+
"""Test that full object checksum verification raises DataCorruption on mismatch at range_end."""
435+
self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True)
436+
self.state["enable_checksum"] = True
437+
self.state["full_obj_server_crc32c"] = 111111 # Wrong server checksum!
438+
439+
resp1 = self._create_response(b"test", _READ_ID, offset=0)
440+
self.strategy.update_state_from_response(resp1, self.state)
441+
442+
resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True)
443+
with self.assertRaisesRegex(DataCorruption, "Full object checksum mismatch"):
444+
self.strategy.update_state_from_response(resp2, self.state)
445+
446+
def test_update_state_checksum_mismatch_ignored_when_disabled(self):
447+
"""Test that a CRC32C mismatch is ignored when enable_checksum is False."""
448+
self._add_download(_READ_ID)
449+
self.state["enable_checksum"] = False
450+
response = self._create_response(b"data", _READ_ID, offset=0, crc=999999)
451+
452+
# Should NOT raise DataCorruption!
453+
self.strategy.update_state_from_response(response, self.state)
454+
455+
def test_update_state_full_object_checksum_mismatch_ignored_when_disabled(self):
456+
"""Test that a full-object CRC32C mismatch is ignored when enable_checksum is False."""
457+
self._add_download(
458+
_READ_ID,
459+
offset=0,
460+
length=9,
461+
is_full_object_read=True,
462+
enable_checksum=False,
463+
)
464+
self.state["enable_checksum"] = False
465+
self.state["full_obj_server_crc32c"] = 111111 # Wrong server checksum!
466+
467+
resp1 = self._create_response(b"test", _READ_ID, offset=0)
468+
self.strategy.update_state_from_response(resp1, self.state)
469+
470+
resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True)
471+
# Should NOT raise DataCorruption!
472+
self.strategy.update_state_from_response(resp2, self.state)

0 commit comments

Comments
 (0)