Skip to content

Commit fed58ce

Browse files
committed
Python 3.9 specific test fix
1 parent cb9695e commit fed58ce

File tree

1 file changed

+65
-30
lines changed

1 file changed

+65
-30
lines changed

durabletask/worker.py

Lines changed: 65 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ class TaskHubGrpcWorker:
210210
ActivityNotRegisteredError: If an activity work item references an unregistered
211211
activity function.
212212
"""
213+
213214
_response_stream: Optional[grpc.Future] = None
214215
_interceptors: Optional[list[shared.ClientInterceptor]] = None
215216

@@ -1323,15 +1324,13 @@ def __init__(self, concurrency_options: ConcurrencyOptions):
13231324
self.concurrency_options = concurrency_options
13241325
self.activity_semaphore = None
13251326
self.orchestration_semaphore = None
1326-
self.activity_queue: asyncio.Queue = asyncio.Queue()
1327-
self.orchestration_queue: asyncio.Queue = asyncio.Queue()
1327+
# Don't create queues here - defer until we have an event loop
1328+
self.activity_queue: Optional[asyncio.Queue] = None
1329+
self.orchestration_queue: Optional[asyncio.Queue] = None
13281330
self._queue_event_loop: Optional[asyncio.AbstractEventLoop] = None
1329-
# Try to capture the current event loop when queues are created
1330-
try:
1331-
self._queue_event_loop = asyncio.get_running_loop()
1332-
except RuntimeError:
1333-
# No event loop running when manager was created
1334-
pass
1331+
# Store work items when no event loop is available
1332+
self._pending_activity_work: list = []
1333+
self._pending_orchestration_work: list = []
13351334
self.thread_pool = ThreadPoolExecutor(
13361335
max_workers=concurrency_options.maximum_thread_pool_workers,
13371336
thread_name_prefix="DurableTask",
@@ -1346,26 +1345,30 @@ def _ensure_queues_for_current_loop(self):
13461345
# No event loop running, can't create queues
13471346
return
13481347

1349-
if self._queue_event_loop is current_loop and hasattr(self, 'activity_queue') and hasattr(self, 'orchestration_queue'):
1350-
# Queues are already bound to the current loop and exist
1351-
return
1348+
# Check if queues are already properly set up for current loop
1349+
if self._queue_event_loop is current_loop:
1350+
if self.activity_queue is not None and self.orchestration_queue is not None:
1351+
# Queues are already bound to the current loop and exist
1352+
return
13521353

13531354
# Need to recreate queues for the current event loop
13541355
# First, preserve any existing work items
13551356
existing_activity_items = []
13561357
existing_orchestration_items = []
13571358

1358-
if hasattr(self, 'activity_queue'):
1359+
if self.activity_queue is not None:
13591360
try:
13601361
while not self.activity_queue.empty():
13611362
existing_activity_items.append(self.activity_queue.get_nowait())
13621363
except Exception:
13631364
pass
13641365

1365-
if hasattr(self, 'orchestration_queue'):
1366+
if self.orchestration_queue is not None:
13661367
try:
13671368
while not self.orchestration_queue.empty():
1368-
existing_orchestration_items.append(self.orchestration_queue.get_nowait())
1369+
existing_orchestration_items.append(
1370+
self.orchestration_queue.get_nowait()
1371+
)
13691372
except Exception:
13701373
pass
13711374

@@ -1380,6 +1383,16 @@ def _ensure_queues_for_current_loop(self):
13801383
for item in existing_orchestration_items:
13811384
self.orchestration_queue.put_nowait(item)
13821385

1386+
# Move pending work items to the queues
1387+
for item in self._pending_activity_work:
1388+
self.activity_queue.put_nowait(item)
1389+
for item in self._pending_orchestration_work:
1390+
self.orchestration_queue.put_nowait(item)
1391+
1392+
# Clear the pending work lists
1393+
self._pending_activity_work.clear()
1394+
self._pending_orchestration_work.clear()
1395+
13831396
async def run(self):
13841397
# Reset shutdown flag in case this manager is being reused
13851398
self._shutdown = False
@@ -1396,10 +1409,13 @@ async def run(self):
13961409
)
13971410

13981411
# Start background consumers for each work type
1399-
await asyncio.gather(
1400-
self._consume_queue(self.activity_queue, self.activity_semaphore),
1401-
self._consume_queue(self.orchestration_queue, self.orchestration_semaphore),
1402-
)
1412+
if self.activity_queue is not None and self.orchestration_queue is not None:
1413+
await asyncio.gather(
1414+
self._consume_queue(self.activity_queue, self.activity_semaphore),
1415+
self._consume_queue(
1416+
self.orchestration_queue, self.orchestration_semaphore
1417+
),
1418+
)
14031419

14041420
async def _consume_queue(self, queue: asyncio.Queue, semaphore: asyncio.Semaphore):
14051421
# List to track running tasks
@@ -1421,10 +1437,14 @@ async def _consume_queue(self, queue: asyncio.Queue, semaphore: asyncio.Semaphor
14211437

14221438
func, args, kwargs = work
14231439
# Create a concurrent task for processing
1424-
task = asyncio.create_task(self._process_work_item(semaphore, queue, func, args, kwargs))
1440+
task = asyncio.create_task(
1441+
self._process_work_item(semaphore, queue, func, args, kwargs)
1442+
)
14251443
running_tasks.add(task)
14261444

1427-
async def _process_work_item(self, semaphore: asyncio.Semaphore, queue: asyncio.Queue, func, args, kwargs):
1445+
async def _process_work_item(
1446+
self, semaphore: asyncio.Semaphore, queue: asyncio.Queue, func, args, kwargs
1447+
):
14281448
async with semaphore:
14291449
try:
14301450
await self._run_func(func, *args, **kwargs)
@@ -1437,17 +1457,32 @@ async def _run_func(self, func, *args, **kwargs):
14371457
else:
14381458
loop = asyncio.get_running_loop()
14391459
# Avoid submitting to executor after shutdown
1440-
if getattr(self, '_shutdown', False) and getattr(self, 'thread_pool', None) and getattr(self.thread_pool, '_shutdown', False):
1460+
if (
1461+
getattr(self, "_shutdown", False) and getattr(self, "thread_pool", None) and getattr(
1462+
self.thread_pool, "_shutdown", False)
1463+
):
14411464
return None
1442-
return await loop.run_in_executor(self.thread_pool, lambda: func(*args, **kwargs))
1465+
return await loop.run_in_executor(
1466+
self.thread_pool, lambda: func(*args, **kwargs)
1467+
)
14431468

14441469
def submit_activity(self, func, *args, **kwargs):
1470+
work_item = (func, args, kwargs)
14451471
self._ensure_queues_for_current_loop()
1446-
self.activity_queue.put_nowait((func, args, kwargs))
1472+
if self.activity_queue is not None:
1473+
self.activity_queue.put_nowait(work_item)
1474+
else:
1475+
# No event loop running, store in pending list
1476+
self._pending_activity_work.append(work_item)
14471477

14481478
def submit_orchestration(self, func, *args, **kwargs):
1479+
work_item = (func, args, kwargs)
14491480
self._ensure_queues_for_current_loop()
1450-
self.orchestration_queue.put_nowait((func, args, kwargs))
1481+
if self.orchestration_queue is not None:
1482+
self.orchestration_queue.put_nowait(work_item)
1483+
else:
1484+
# No event loop running, store in pending list
1485+
self._pending_orchestration_work.append(work_item)
14511486

14521487
def shutdown(self):
14531488
self._shutdown = True
@@ -1457,24 +1492,24 @@ def reset_for_new_run(self):
14571492
"""Reset the manager state for a new run."""
14581493
self._shutdown = False
14591494
# Clear any existing queues - they'll be recreated when needed
1460-
if hasattr(self, 'activity_queue'):
1495+
if self.activity_queue is not None:
14611496
# Clear existing queue by creating a new one
14621497
# This ensures no items from previous runs remain
14631498
try:
14641499
while not self.activity_queue.empty():
14651500
self.activity_queue.get_nowait()
14661501
except Exception:
14671502
pass
1468-
if hasattr(self, 'orchestration_queue'):
1503+
if self.orchestration_queue is not None:
14691504
try:
14701505
while not self.orchestration_queue.empty():
14711506
self.orchestration_queue.get_nowait()
14721507
except Exception:
14731508
pass
1509+
# Clear pending work lists
1510+
self._pending_activity_work.clear()
1511+
self._pending_orchestration_work.clear()
14741512

14751513

14761514
# Export public API
1477-
__all__ = [
1478-
'ConcurrencyOptions',
1479-
'TaskHubGrpcWorker'
1480-
]
1515+
__all__ = ["ConcurrencyOptions", "TaskHubGrpcWorker"]

0 commit comments

Comments
 (0)