From fb4a3ca93b1177bd59dc8944c85a195867c12c72 Mon Sep 17 00:00:00 2001 From: Casper Nielsen Date: Mon, 9 Feb 2026 10:59:40 +0100 Subject: [PATCH 1/4] fix(bug): ensure handling of shutdown when _response_stream is a generator. Future TODO: figure out if it's ever a grpc.Future. Signed-off-by: Casper Nielsen --- durabletask/worker.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 0ec2f66..5f2a81f 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -38,7 +38,6 @@ otel_tracer = None - class VersionNotRegisteredException(Exception): pass @@ -740,7 +739,13 @@ def stop(self): self._logger.info("Stopping gRPC worker...") if self._response_stream is not None: - self._response_stream.cancel() + if isinstance(self._response_stream, grpc.Future): + self._response_stream.cancel() + elif hasattr(self._response_stream, "call"): + # This is a generator returned by the gRPC stub + self._response_stream.call.cancel() + else: + self._logger.warning("Unknown response stream type, cannot cancel directly") self._shutdown.set() # Explicitly close the gRPC channel to ensure OTel interceptors and other resources are cleaned up if self._current_channel is not None: @@ -854,13 +859,15 @@ def _execute_activity( if otel_tracer is not None: span_context = otel_tracer.start_as_current_span( - name=f'activity: {req.name}', - context=otel_propagator.extract(carrier={"traceparent": req.parentTraceContext.traceParent}), + name=f"activity: {req.name}", + context=otel_propagator.extract( + carrier={"traceparent": req.parentTraceContext.traceParent} + ), attributes={ "durabletask.task.instance_id": instance_id, "durabletask.task.id": req.taskId, "durabletask.activity.name": req.name, - } + }, ) else: span_context = contextlib.nullcontext() From 0f8a1d6ebdf12336827cb938d8f5e71c8b298715 Mon Sep 17 00:00:00 2001 From: Casper Nielsen Date: Mon, 9 Feb 2026 10:59:52 +0100 Subject: [PATCH 2/4] chore(format): ruff Signed-off-by: Casper Nielsen --- durabletask/aio/client.py | 1 + durabletask/client.py | 2 ++ tests/durabletask/test_registry.py | 5 ++++- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/durabletask/aio/client.py b/durabletask/aio/client.py index a57fe01..b229541 100644 --- a/durabletask/aio/client.py +++ b/durabletask/aio/client.py @@ -27,6 +27,7 @@ # If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor try: from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient + GrpcInstrumentorClient().instrument() except ImportError: pass diff --git a/durabletask/client.py b/durabletask/client.py index 749f90d..2d9ca37 100644 --- a/durabletask/client.py +++ b/durabletask/client.py @@ -24,10 +24,12 @@ # If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor try: from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient + GrpcInstrumentorClient().instrument() except ImportError: pass + class OrchestrationStatus(Enum): """The status of an orchestration instance.""" diff --git a/tests/durabletask/test_registry.py b/tests/durabletask/test_registry.py index b5fcfa9..f5cf319 100644 --- a/tests/durabletask/test_registry.py +++ b/tests/durabletask/test_registry.py @@ -165,6 +165,7 @@ def activity2(ctx, input): assert registry.get_activity(name1) is activity1 assert registry.get_activity(name2) is activity2 + def test_registry_add_named_versioned_orchestrators(): """Test adding versioned orchestrators.""" registry = worker._Registry() @@ -179,7 +180,9 @@ def orchestrator3(ctx, input): return "two" registry.add_named_orchestrator(name="orchestrator", fn=orchestrator1, version_name="v1") - registry.add_named_orchestrator(name="orchestrator", fn=orchestrator2, version_name="v2", is_latest=True) + registry.add_named_orchestrator( + name="orchestrator", fn=orchestrator2, version_name="v2", is_latest=True + ) registry.add_named_orchestrator(name="orchestrator", fn=orchestrator3, version_name="v3") orquestrator, version = registry.get_orchestrator(name="orchestrator") From ef07dbd2529dbe44d7cf742a171731c5ad4d4783 Mon Sep 17 00:00:00 2001 From: Casper Nielsen Date: Mon, 9 Feb 2026 11:51:05 +0100 Subject: [PATCH 3/4] test: add test case for testing .stop() branches Signed-off-by: Casper Nielsen --- tests/durabletask/test_worker_stop.py | 57 +++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 tests/durabletask/test_worker_stop.py diff --git a/tests/durabletask/test_worker_stop.py b/tests/durabletask/test_worker_stop.py new file mode 100644 index 0000000..ec6812e --- /dev/null +++ b/tests/durabletask/test_worker_stop.py @@ -0,0 +1,57 @@ +from unittest.mock import MagicMock, patch + +import grpc + +from durabletask.worker import TaskHubGrpcWorker + + +# Helper to create a running worker with a mocked runLoop +def _make_running_worker(): + worker = TaskHubGrpcWorker() + worker._is_running = True + worker._runLoop = MagicMock() + worker._runLoop.is_alive.return_value = False + return worker + + +def test_stop_with_grpc_future(): + worker = _make_running_worker() + mock_future = MagicMock(spec=grpc.Future) + worker._response_stream = mock_future + worker.stop() + mock_future.cancel.assert_called_once() + + +def test_stop_with_generator_call(): + worker = _make_running_worker() + mock_call = MagicMock() + mock_stream = MagicMock() + mock_stream.call = mock_call + worker._response_stream = mock_stream + worker.stop() + mock_call.cancel.assert_called_once() + + +def test_stop_with_unknown_stream_type(caplog): + worker = _make_running_worker() + # Not a grpc.Future, no 'call' attribute + worker._response_stream = object() + with caplog.at_level("WARNING"): + worker.stop() + assert any("Unknown response stream type" in m for m in caplog.text.splitlines()) + + +def test_stop_with_none_stream(): + worker = _make_running_worker() + worker._response_stream = None + # Should not raise + worker.stop() + + +def test_stop_when_not_running(): + worker = TaskHubGrpcWorker() + worker._is_running = False + # Should return immediately, not set _shutdown + with patch.object(worker._shutdown, "set") as shutdown_set: + worker.stop() + shutdown_set.assert_not_called() From b98630e693aa9689ce9a943334fdaea6947fa980 Mon Sep 17 00:00:00 2001 From: Casper Nielsen Date: Mon, 9 Feb 2026 12:37:55 +0100 Subject: [PATCH 4/4] fix: set better type handler and handle both grpc.Future generator based and single grpc.Future _response_stream close methods Signed-off-by: Casper Nielsen --- durabletask/worker.py | 27 +++++++++++++++------------ tests/durabletask/test_worker_stop.py | 2 +- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 5f2a81f..7d05f01 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -12,7 +12,7 @@ from datetime import datetime, timedelta from threading import Event, Thread from types import GeneratorType -from typing import Any, Generator, Optional, Sequence, TypeVar, Union +from typing import Any, Generator, Iterator, Optional, Sequence, TypeVar, Union import grpc from google.protobuf import empty_pb2 @@ -282,7 +282,7 @@ class TaskHubGrpcWorker: activity function. """ - _response_stream: Optional[grpc.Future] = None + _response_stream: Optional[Union[Iterator[grpc.Future], grpc.Future]] = None _interceptors: Optional[list[shared.ClientInterceptor]] = None def __init__( @@ -420,9 +420,12 @@ def invalidate_connection(): # Cancel the response stream first to signal the reader thread to stop if self._response_stream is not None: try: - self._response_stream.cancel() - except Exception: - pass + if hasattr(self._response_stream, "call"): + self._response_stream.call.cancel() # type: ignore + else: + self._response_stream.cancel() # type: ignore + except Exception as e: + self._logger.warning(f"Error cancelling response stream: {e}") self._response_stream = None # Wait for the reader thread to finish @@ -739,13 +742,13 @@ def stop(self): self._logger.info("Stopping gRPC worker...") if self._response_stream is not None: - if isinstance(self._response_stream, grpc.Future): - self._response_stream.cancel() - elif hasattr(self._response_stream, "call"): - # This is a generator returned by the gRPC stub - self._response_stream.call.cancel() - else: - self._logger.warning("Unknown response stream type, cannot cancel directly") + try: + if hasattr(self._response_stream, "call"): + self._response_stream.call.cancel() # type: ignore + else: + self._response_stream.cancel() # type: ignore + except Exception as e: + self._logger.warning(f"Error cancelling response stream: {e}") self._shutdown.set() # Explicitly close the gRPC channel to ensure OTel interceptors and other resources are cleaned up if self._current_channel is not None: diff --git a/tests/durabletask/test_worker_stop.py b/tests/durabletask/test_worker_stop.py index ec6812e..c9cb70f 100644 --- a/tests/durabletask/test_worker_stop.py +++ b/tests/durabletask/test_worker_stop.py @@ -38,7 +38,7 @@ def test_stop_with_unknown_stream_type(caplog): worker._response_stream = object() with caplog.at_level("WARNING"): worker.stop() - assert any("Unknown response stream type" in m for m in caplog.text.splitlines()) + assert any("Error cancelling response stream: " in m for m in caplog.text.splitlines()) def test_stop_with_none_stream():