From 51f04d28f6cf60e50bd604999512e5ec268f768b Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 2 Jun 2026 13:56:59 -0700 Subject: [PATCH] fix(bigtable): ensure deadline is respected for read_rows_sharded --- .../google/cloud/bigtable/data/_async/client.py | 7 +++++-- .../google/cloud/bigtable/data/_sync_autogen/client.py | 4 +++- .../tests/unit/data/_async/test_client.py | 4 ++-- .../tests/unit/data/_sync_autogen/test_client.py | 4 ++-- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/packages/google-cloud-bigtable/google/cloud/bigtable/data/_async/client.py b/packages/google-cloud-bigtable/google/cloud/bigtable/data/_async/client.py index 61cefa6ac1da..5d0a23e54364 100644 --- a/packages/google-cloud-bigtable/google/cloud/bigtable/data/_async/client.py +++ b/packages/google-cloud-bigtable/google/cloud/bigtable/data/_async/client.py @@ -1286,12 +1286,15 @@ async def read_rows_sharded( # limit the number of concurrent requests using a semaphore concurrency_sem = CrossSync.Semaphore(_CONCURRENCY_LIMIT) + # lock to ensure rpc_timeout_generator is thread-safe in sync version + gen_lock = CrossSync.Semaphore(1) @CrossSync.convert async def read_rows_with_semaphore(query): async with concurrency_sem: - # calculate new timeout based on time left in overall operation - shard_timeout = next(rpc_timeout_generator) + async with gen_lock: + # calculate new timeout based on time left in overall operation + shard_timeout = next(rpc_timeout_generator) if shard_timeout <= 0: raise DeadlineExceeded( "Operation timeout exceeded before starting query" diff --git a/packages/google-cloud-bigtable/google/cloud/bigtable/data/_sync_autogen/client.py b/packages/google-cloud-bigtable/google/cloud/bigtable/data/_sync_autogen/client.py index 384f2cbecd1b..6d808fe9719f 100644 --- a/packages/google-cloud-bigtable/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/packages/google-cloud-bigtable/google/cloud/bigtable/data/_sync_autogen/client.py @@ -1049,10 +1049,12 @@ def read_rows_sharded( operation_timeout, operation_timeout ) concurrency_sem = CrossSync._Sync_Impl.Semaphore(_CONCURRENCY_LIMIT) + gen_lock = CrossSync._Sync_Impl.Semaphore(1) def read_rows_with_semaphore(query): with concurrency_sem: - shard_timeout = next(rpc_timeout_generator) + with gen_lock: + shard_timeout = next(rpc_timeout_generator) if shard_timeout <= 0: raise DeadlineExceeded( "Operation timeout exceeded before starting query" diff --git a/packages/google-cloud-bigtable/tests/unit/data/_async/test_client.py b/packages/google-cloud-bigtable/tests/unit/data/_async/test_client.py index b61dad59c709..391c38006df5 100644 --- a/packages/google-cloud-bigtable/tests/unit/data/_async/test_client.py +++ b/packages/google-cloud-bigtable/tests/unit/data/_async/test_client.py @@ -2304,7 +2304,7 @@ async def test_read_rows_sharded_expirary(self): from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup - operation_timeout = 0.1 + operation_timeout = 5.0 # let the first batch complete, but the next batch times out num_queries = 15 @@ -2317,7 +2317,7 @@ async def mock_call(*args, **kwargs): if isinstance(next_item, Exception): raise next_item else: - await asyncio.sleep(next_item) + await CrossSync.sleep(next_item) return [mock.Mock()] async with self._make_client() as client: diff --git a/packages/google-cloud-bigtable/tests/unit/data/_sync_autogen/test_client.py b/packages/google-cloud-bigtable/tests/unit/data/_sync_autogen/test_client.py index efd90c7a9c34..f8edea5e1a32 100644 --- a/packages/google-cloud-bigtable/tests/unit/data/_sync_autogen/test_client.py +++ b/packages/google-cloud-bigtable/tests/unit/data/_sync_autogen/test_client.py @@ -1928,7 +1928,7 @@ def test_read_rows_sharded_expirary(self): from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup - operation_timeout = 0.1 + operation_timeout = 5.0 num_queries = 15 sleeps = [0] * _CONCURRENCY_LIMIT + [DeadlineExceeded("times up")] * ( num_queries - _CONCURRENCY_LIMIT @@ -1939,7 +1939,7 @@ def mock_call(*args, **kwargs): if isinstance(next_item, Exception): raise next_item else: - asyncio.sleep(next_item) + CrossSync._Sync_Impl.sleep(next_item) return [mock.Mock()] with self._make_client() as client: