Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1286,12 +1286,15 @@ async def read_rows_sharded(

# limit the number of concurrent requests using a semaphore
concurrency_sem = CrossSync.Semaphore(_CONCURRENCY_LIMIT)
# lock to ensure rpc_timeout_generator is thread-safe in sync version
gen_lock = CrossSync.Semaphore(1)

@CrossSync.convert
async def read_rows_with_semaphore(query):
async with concurrency_sem:
# calculate new timeout based on time left in overall operation
shard_timeout = next(rpc_timeout_generator)
async with gen_lock:
# calculate new timeout based on time left in overall operation
shard_timeout = next(rpc_timeout_generator)
if shard_timeout <= 0:
raise DeadlineExceeded(
"Operation timeout exceeded before starting query"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,10 +1049,12 @@ def read_rows_sharded(
operation_timeout, operation_timeout
)
concurrency_sem = CrossSync._Sync_Impl.Semaphore(_CONCURRENCY_LIMIT)
gen_lock = CrossSync._Sync_Impl.Semaphore(1)

def read_rows_with_semaphore(query):
with concurrency_sem:
shard_timeout = next(rpc_timeout_generator)
with gen_lock:
shard_timeout = next(rpc_timeout_generator)
if shard_timeout <= 0:
raise DeadlineExceeded(
"Operation timeout exceeded before starting query"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2304,7 +2304,7 @@ async def test_read_rows_sharded_expirary(self):
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
Comment thread
chalmerlowe marked this conversation as resolved.

operation_timeout = 0.1
operation_timeout = 5.0

# let the first batch complete, but the next batch times out
num_queries = 15
Expand All @@ -2317,7 +2317,7 @@ async def mock_call(*args, **kwargs):
if isinstance(next_item, Exception):
raise next_item
else:
await asyncio.sleep(next_item)
await CrossSync.sleep(next_item)
return [mock.Mock()]

async with self._make_client() as client:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ def test_read_rows_sharded_expirary(self):
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup

operation_timeout = 0.1
operation_timeout = 5.0
num_queries = 15
sleeps = [0] * _CONCURRENCY_LIMIT + [DeadlineExceeded("times up")] * (
num_queries - _CONCURRENCY_LIMIT
Expand All @@ -1939,7 +1939,7 @@ def mock_call(*args, **kwargs):
if isinstance(next_item, Exception):
raise next_item
else:
asyncio.sleep(next_item)
CrossSync._Sync_Impl.sleep(next_item)
return [mock.Mock()]

with self._make_client() as client:
Expand Down
Loading