Skip to content

Commit c63be78

Browse files
committed
Fix tests
1 parent f61feda commit c63be78

File tree

2 files changed

+15
-10
lines changed

2 files changed

+15
-10
lines changed

tests/durabletask/test_worker_concurrency_loop.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ def fn(*args, **kwargs):
128128

129129
# Submit more work than concurrency allows
130130
for i in range(5):
131-
manager.submit_orchestration(make_work("orch", i))
132-
manager.submit_activity(make_work("act", i))
131+
manager.submit_orchestration(make_work("orch", i), lambda *a, **k: None)
132+
manager.submit_activity(make_work("act", i), lambda *a, **k: None)
133133

134134
# Run the manager loop in a thread (sync context)
135135
def run_manager():
@@ -139,6 +139,11 @@ def run_manager():
139139
t.start()
140140
time.sleep(1.5) # Let work process
141141
manager.shutdown()
142+
143+
# Ensure the queues have been started
144+
if (manager.activity_queue is None or manager.orchestration_queue is None):
145+
raise RuntimeError("Worker manager queues not initialized")
146+
142147
# Unblock the consumers by putting dummy items in the queues
143148
manager.activity_queue.put_nowait((lambda: None, (), {}))
144149
manager.orchestration_queue.put_nowait((lambda: None, (), {}))

tests/durabletask/test_worker_concurrency_loop_async.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,25 +46,25 @@ def test_worker_concurrency_loop_async():
4646
grpc_worker = TaskHubGrpcWorker(concurrency_options=options)
4747
stub = DummyStub()
4848

49-
async def dummy_orchestrator(req, stub, completionToken):
49+
async def dummy_orchestrator(self, req, stub, completionToken):
5050
await asyncio.sleep(0.1)
5151
stub.CompleteOrchestratorTask('ok')
5252

53-
async def cancel_dummy_orchestrator(req, stub, completionToken):
53+
async def cancel_dummy_orchestrator(self, req, stub, completionToken):
5454
pass
5555

56-
async def dummy_activity(req, stub, completionToken):
56+
async def dummy_activity(self, req, stub, completionToken):
5757
await asyncio.sleep(0.1)
5858
stub.CompleteActivityTask('ok')
5959

60-
async def cancel_dummy_activity(req, stub, completionToken):
60+
async def cancel_dummy_activity(self, req, stub, completionToken):
6161
pass
6262

6363
# Patch the worker's _execute_orchestrator and _execute_activity
64-
grpc_worker._execute_orchestrator = dummy_orchestrator
65-
grpc_worker._cancel_orchestrator = cancel_dummy_orchestrator
66-
grpc_worker._execute_activity = dummy_activity
67-
grpc_worker._cancel_activity = cancel_dummy_activity
64+
grpc_worker._execute_orchestrator = dummy_orchestrator.__get__(grpc_worker, TaskHubGrpcWorker)
65+
grpc_worker._cancel_orchestrator = cancel_dummy_orchestrator.__get__(grpc_worker, TaskHubGrpcWorker)
66+
grpc_worker._execute_activity = dummy_activity.__get__(grpc_worker, TaskHubGrpcWorker)
67+
grpc_worker._cancel_activity = cancel_dummy_activity.__get__(grpc_worker, TaskHubGrpcWorker)
6868

6969
orchestrator_requests = [DummyRequest('orchestrator', f'orch{i}') for i in range(3)]
7070
activity_requests = [DummyRequest('activity', f'act{i}') for i in range(4)]

0 commit comments

Comments
 (0)