From b92cacccfd1da28c42ae382a21367f920586a798 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Thu, 12 Feb 2026 11:51:18 +0100 Subject: [PATCH 01/10] Add support to bulk pubsub Signed-off-by: Albert Callarisa --- dapr/aio/clients/grpc/client.py | 93 ++++++++++++++++++++++++++++ dapr/clients/grpc/_response.py | 58 ++++++++++++++++++ dapr/clients/grpc/client.py | 95 +++++++++++++++++++++++++++++ examples/pubsub-simple/README.md | 8 ++- examples/pubsub-simple/publisher.py | 23 +++++++ 5 files changed, 275 insertions(+), 2 deletions(-) diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index a17b5311a..28cf24143 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -27,6 +27,7 @@ from google.protobuf.any_pb2 import Any as GrpcAny from google.protobuf.empty_pb2 import Empty as GrpcEmpty from google.protobuf.message import Message as GrpcMessage +from grpc import StatusCode # type: ignore from grpc.aio import ( # type: ignore AioRpcError, StreamStreamClientInterceptor, @@ -69,6 +70,8 @@ ) from dapr.clients.grpc._response import ( BindingResponse, + BulkPublishResponse, + BulkPublishResponseFailedEntry, BulkStateItem, BulkStatesResponse, ConfigurationResponse, @@ -484,6 +487,96 @@ async def publish_event( return DaprResponse(await call.initial_metadata()) + async def publish_events( + self, + pubsub_name: str, + topic_name: str, + data: Sequence[Union[bytes, str]], + publish_metadata: Dict[str, str] = {}, + data_content_type: Optional[str] = None, + ) -> BulkPublishResponse: + """Bulk publish multiple events to a given topic. + This publishes multiple events to a specified topic and pubsub component. + Each event can be bytes or str. The str data is encoded into bytes with + default charset of utf-8. + + The example publishes multiple string events to a topic: + + from dapr.aio.clients import DaprClient + async with DaprClient() as d: + resp = await d.publish_events( + pubsub_name='pubsub_1', + topic_name='TOPIC_A', + data=['message1', 'message2', 'message3'], + data_content_type='text/plain', + ) + # resp.failed_entries includes any entries that failed to publish. + + Args: + pubsub_name (str): the name of the pubsub component + topic_name (str): the topic name to publish to + data (Sequence[Union[bytes, str]]): sequence of events to publish; + each event must be bytes or str + publish_metadata (Dict[str, str], optional): Dapr metadata for the + bulk publish request + data_content_type (str, optional): content type of the event data + + Returns: + :class:`BulkPublishResponse` with any failed entries + """ + entries = [] + for event in data: + entry_id = str(uuid.uuid4()) + if isinstance(event, bytes): + event_data = event + content_type = data_content_type or 'application/octet-stream' + elif isinstance(event, str): + event_data = event.encode('utf-8') + content_type = data_content_type or 'text/plain' + else: + raise ValueError(f'invalid type for event {type(event)}') + + entries.append( + api_v1.BulkPublishRequestEntry( + entry_id=entry_id, + event=event_data, + content_type=content_type, + ) + ) + + req = api_v1.BulkPublishRequest( + pubsub_name=pubsub_name, + topic=topic_name, + entries=entries, + metadata=publish_metadata, + ) + + try: + call = self._stub.BulkPublishEvent(req) + response = await call + except AioRpcError as err: + if err.code() == StatusCode.UNIMPLEMENTED: + try: + call = self._stub.BulkPublishEventAlpha1(req) + response = await call + except AioRpcError as err2: + raise DaprGrpcError(err2) from err2 + else: + raise DaprGrpcError(err) from err + + failed_entries = [ + BulkPublishResponseFailedEntry( + entry_id=entry.entry_id, + error=entry.error, + ) + for entry in response.failedEntries + ] + + return BulkPublishResponse( + failed_entries=failed_entries, + headers=await call.initial_metadata(), + ) + async def subscribe( self, pubsub_name: str, diff --git a/dapr/clients/grpc/_response.py b/dapr/clients/grpc/_response.py index 6898bc42d..1c1856e6f 100644 --- a/dapr/clients/grpc/_response.py +++ b/dapr/clients/grpc/_response.py @@ -723,6 +723,64 @@ def _read_subscribe_config( pass +class BulkPublishResponseFailedEntry: + """A failed entry from the bulk publish response. + + Attributes: + entry_id (str): the entry ID that failed. + error (str): the error message for the failure. + """ + + def __init__(self, entry_id: str, error: str): + """Initializes BulkPublishResponseFailedEntry. + + Args: + entry_id (str): the entry ID that failed. + error (str): the error message for the failure. + """ + self._entry_id = entry_id + self._error = error + + @property + def entry_id(self) -> str: + """Gets the entry ID.""" + return self._entry_id + + @property + def error(self) -> str: + """Gets the error message.""" + return self._error + + +class BulkPublishResponse(DaprResponse): + """The response of publish_events (bulk publish) API. + + This inherits from DaprResponse + + Attributes: + failed_entries (List[BulkPublishResponseFailedEntry]): the entries that failed to publish. + """ + + def __init__( + self, + failed_entries: List[BulkPublishResponseFailedEntry] = [], + headers: MetadataTuple = (), + ): + """Initializes BulkPublishResponse from :obj:`runtime_v1.BulkPublishResponse`. + + Args: + failed_entries (List[BulkPublishResponseFailedEntry]): the entries that failed. + headers (Tuple, optional): the headers from Dapr gRPC response. + """ + super(BulkPublishResponse, self).__init__(headers) + self._failed_entries = failed_entries + + @property + def failed_entries(self) -> List[BulkPublishResponseFailedEntry]: + """Gets the failed entries.""" + return self._failed_entries + + class TopicEventResponseStatus(Enum): # success is the default behavior: message is acknowledged and not retried success = appcallback_v1.TopicEventResponse.TopicEventResponseStatus.SUCCESS diff --git a/dapr/clients/grpc/client.py b/dapr/clients/grpc/client.py index faf356491..f1635604c 100644 --- a/dapr/clients/grpc/client.py +++ b/dapr/clients/grpc/client.py @@ -31,6 +31,7 @@ from google.protobuf.struct_pb2 import Struct as GrpcStruct from grpc import ( # type: ignore RpcError, + StatusCode, StreamStreamClientInterceptor, StreamUnaryClientInterceptor, UnaryStreamClientInterceptor, @@ -60,6 +61,8 @@ ) from dapr.clients.grpc._response import ( BindingResponse, + BulkPublishResponse, + BulkPublishResponseFailedEntry, BulkStateItem, BulkStatesResponse, ConfigurationResponse, @@ -487,6 +490,98 @@ def publish_event( return DaprResponse(call.initial_metadata()) + def publish_events( + self, + pubsub_name: str, + topic_name: str, + data: Sequence[Union[bytes, str]], + publish_metadata: Dict[str, str] = {}, + data_content_type: Optional[str] = None, + ) -> BulkPublishResponse: + """Bulk publish multiple events to a given topic. + This publishes multiple events to a specified topic and pubsub component. + Each event can be bytes or str. The str data is encoded into bytes with + default charset of utf-8. + + The example publishes multiple string events to a topic: + + from dapr.clients import DaprClient + with DaprClient() as d: + resp = d.publish_events( + pubsub_name='pubsub_1', + topic_name='TOPIC_A', + data=['message1', 'message2', 'message3'], + data_content_type='text/plain', + ) + # resp.failed_entries includes any entries that failed to publish. + + Args: + pubsub_name (str): the name of the pubsub component + topic_name (str): the topic name to publish to + data (Sequence[Union[bytes, str]]): sequence of events to publish; + each event must be bytes or str + publish_metadata (Dict[str, str], optional): Dapr metadata for the + bulk publish request + data_content_type (str, optional): content type of the event data + + Returns: + :class:`BulkPublishResponse` with any failed entries + """ + entries = [] + for event in data: + entry_id = str(uuid.uuid4()) + if isinstance(event, bytes): + event_data = event + content_type = data_content_type or 'application/octet-stream' + elif isinstance(event, str): + event_data = event.encode('utf-8') + content_type = data_content_type or 'text/plain' + else: + raise ValueError(f'invalid type for event {type(event)}') + + entries.append( + api_v1.BulkPublishRequestEntry( + entry_id=entry_id, + event=event_data, + content_type=content_type, + ) + ) + + req = api_v1.BulkPublishRequest( + pubsub_name=pubsub_name, + topic=topic_name, + entries=entries, + metadata=publish_metadata, + ) + + try: + response, call = self.retry_policy.run_rpc( + self._stub.BulkPublishEvent.with_call, req + ) + except RpcError as err: + if err.code() == StatusCode.UNIMPLEMENTED: + try: + response, call = self.retry_policy.run_rpc( + self._stub.BulkPublishEventAlpha1.with_call, req + ) + except RpcError as err2: + raise DaprGrpcError(err2) from err2 + else: + raise DaprGrpcError(err) from err + + failed_entries = [ + BulkPublishResponseFailedEntry( + entry_id=entry.entry_id, + error=entry.error, + ) + for entry in response.failedEntries + ] + + return BulkPublishResponse( + failed_entries=failed_entries, + headers=call.initial_metadata(), + ) + def subscribe( self, pubsub_name: str, diff --git a/examples/pubsub-simple/README.md b/examples/pubsub-simple/README.md index 8abfad96b..98ff8ccfc 100644 --- a/examples/pubsub-simple/README.md +++ b/examples/pubsub-simple/README.md @@ -1,7 +1,7 @@ # Example - Publish and subscribe to messages -This example utilizes a publisher and a subscriber to show the pubsub pattern, it also shows `PublishEvent`, `OnTopicEvent`, `GetTopicSubscriptions`, and `TopicEventResponse` functionality. -It creates a publisher and calls the `publish_event` method in the `DaprClient`. +This example utilizes a publisher and a subscriber to show the pubsub pattern, it also shows `PublishEvent`, `PublishEvents` (bulk), `OnTopicEvent`, `GetTopicSubscriptions`, and `TopicEventResponse` functionality. +It creates a publisher and calls the `publish_event` and `publish_events` methods in the `DaprClient`. It will create a gRPC subscriber and bind the `OnTopicEvent` method, which gets triggered after a message is published to the subscribed topic. The subscriber will tell dapr to retry delivery of the first message it receives, logging that the message will be retried, and printing it at least once to standard output. @@ -39,6 +39,9 @@ expected_stdout_lines: - '== APP == Dead-Letter Subscriber. Originally intended topic: TOPIC_D' - '== APP == Subscriber received: TOPIC_CE' - '== APP == Subscriber received a json cloud event: id=8, message="hello world", content_type="application/json"' + - '== APP == Subscriber received: id=20, message="bulk event 1", content_type="application/json"' + - '== APP == Subscriber received: id=21, message="bulk event 2", content_type="application/json"' + - '== APP == Subscriber received: id=22, message="bulk event 3", content_type="application/json"' - '== APP == Subscriber received: TOPIC_CE' - '== APP == Subscriber received plain text cloud event: hello world, content_type="text/plain"' @@ -68,6 +71,7 @@ expected_stdout_lines: - "== APP == {'id': 6, 'message': 'hello world'}" - "== APP == {'id': 7, 'message': 'hello world'}" - "== APP == {'specversion': '1.0', 'type': 'com.example.event', 'source': 'my-service', 'id': 'abc-8', 'data': {'id': 8, 'message': 'hello world'}, 'datacontenttype': 'application/json'}" + - "== APP == Bulk published 3 events. Failed entries: 0" - "== APP == {'specversion': '1.0', 'type': 'com.example.event', 'source': 'my-service', 'id': 'abc-10', 'data': 'hello world', 'datacontenttype': 'text/plain'}" background: true sleep: 15 diff --git a/examples/pubsub-simple/publisher.py b/examples/pubsub-simple/publisher.py index e5954c651..b1ef88da5 100644 --- a/examples/pubsub-simple/publisher.py +++ b/examples/pubsub-simple/publisher.py @@ -91,6 +91,29 @@ time.sleep(0.5) + # Bulk publish multiple events at once using publish_events + bulk_events = [ + json.dumps({'id': 20, 'message': 'bulk event 1'}), + json.dumps({'id': 21, 'message': 'bulk event 2'}), + json.dumps({'id': 22, 'message': 'bulk event 3'}), + ] + + resp = d.publish_events( + pubsub_name='pubsub', + topic_name='TOPIC_A', + data=bulk_events, + data_content_type='application/json', + ) + + print(f'Bulk published {len(bulk_events)} events. ' + f'Failed entries: {len(resp.failed_entries)}', flush=True) + + if resp.failed_entries: + for entry in resp.failed_entries: + print(f' Failed entry_id={entry.entry_id}: {entry.error}', flush=True) + + time.sleep(0.5) + # Send a cloud event with plain text data id = 10 cloud_event = { From ff6dafd5501850206c7b4cdfa4e71bf9e6cf2029 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Thu, 12 Feb 2026 12:03:54 +0100 Subject: [PATCH 02/10] lint Signed-off-by: Albert Callarisa --- dapr/clients/grpc/client.py | 4 +--- examples/pubsub-simple/publisher.py | 6 ++++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dapr/clients/grpc/client.py b/dapr/clients/grpc/client.py index f1635604c..fa69c359a 100644 --- a/dapr/clients/grpc/client.py +++ b/dapr/clients/grpc/client.py @@ -555,9 +555,7 @@ def publish_events( ) try: - response, call = self.retry_policy.run_rpc( - self._stub.BulkPublishEvent.with_call, req - ) + response, call = self.retry_policy.run_rpc(self._stub.BulkPublishEvent.with_call, req) except RpcError as err: if err.code() == StatusCode.UNIMPLEMENTED: try: diff --git a/examples/pubsub-simple/publisher.py b/examples/pubsub-simple/publisher.py index b1ef88da5..28232ec39 100644 --- a/examples/pubsub-simple/publisher.py +++ b/examples/pubsub-simple/publisher.py @@ -105,8 +105,10 @@ data_content_type='application/json', ) - print(f'Bulk published {len(bulk_events)} events. ' - f'Failed entries: {len(resp.failed_entries)}', flush=True) + print( + f'Bulk published {len(bulk_events)} events. Failed entries: {len(resp.failed_entries)}', + flush=True, + ) if resp.failed_entries: for entry in resp.failed_entries: From d25b53f3d2282f7b186e5d95f4036ed72ce33fe2 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Thu, 12 Feb 2026 12:05:44 +0100 Subject: [PATCH 03/10] install dapr last Signed-off-by: Albert Callarisa --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 1bdb17921..dbe093dbe 100644 --- a/tox.ini +++ b/tox.ini @@ -66,12 +66,12 @@ commands = ./validate.sh langgraph-checkpointer ./validate.sh ../ commands_pre = - pip3 install -e {toxinidir}/ pip3 install -e {toxinidir}/ext/dapr-ext-workflow/ pip3 install -e {toxinidir}/ext/dapr-ext-grpc/ pip3 install -e {toxinidir}/ext/dapr-ext-fastapi/ pip3 install -e {toxinidir}/ext/dapr-ext-langgraph/ pip3 install -e {toxinidir}/ext/dapr-ext-strands/ + pip3 install -e {toxinidir}/ allowlist_externals=* [testenv:example-component] From ffa0328f4240c11ee6f1f86ef18babe11c0c7657 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Thu, 12 Feb 2026 15:27:50 +0100 Subject: [PATCH 04/10] Fix tox dependencies install Signed-off-by: Albert Callarisa --- tox.ini | 68 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 39 insertions(+), 29 deletions(-) diff --git a/tox.ini b/tox.ini index dbe093dbe..a17f77c51 100644 --- a/tox.ini +++ b/tox.ini @@ -9,7 +9,14 @@ envlist = [testenv] setenv = PYTHONDONTWRITEBYTECODE=1 -deps = -rdev-requirements.txt +deps = + -rdev-requirements.txt + -e {toxinidir}/ext/dapr-ext-workflow/ + -e {toxinidir}/ext/dapr-ext-grpc/ + -e {toxinidir}/ext/dapr-ext-fastapi/ + -e {toxinidir}/ext/dapr-ext-langgraph/ + -e {toxinidir}/ext/dapr-ext-strands/ + -e {toxinidir}/ commands = coverage run -m unittest discover -v ./tests coverage run -a -m unittest discover -v ./ext/dapr-ext-workflow/tests @@ -19,14 +26,9 @@ commands = coverage run -a -m unittest discover -v ./ext/dapr-ext-strands/tests coverage run -a -m unittest discover -v ./ext/flask_dapr/tests coverage xml + commands_pre = - pip3 install -e {toxinidir}/ - pip3 install -e {toxinidir}/ext/dapr-ext-workflow/ - pip3 install -e {toxinidir}/ext/dapr-ext-grpc/ - pip3 install -e {toxinidir}/ext/dapr-ext-fastapi/ - pip3 install -e {toxinidir}/ext/dapr-ext-langgraph/ - pip3 install -e {toxinidir}/ext/dapr-ext-strands/ - pip3 install -e {toxinidir}/ext/flask_dapr/ + pip uninstall -y dapr dapr-ext-grpc dapr-ext-fastapi dapr-ext-langgraph dapr-ext-strands dapr-ext-flask dapr-ext-langgraph dapr-ext-strands [testenv:ruff] basepython = python3 @@ -41,6 +43,13 @@ basepython = python3 changedir = ./examples/ deps = mechanical-markdown + -e {toxinidir}/ext/dapr-ext-workflow/ + -e {toxinidir}/ext/dapr-ext-grpc/ + -e {toxinidir}/ext/dapr-ext-fastapi/ + -e {toxinidir}/ext/dapr-ext-langgraph/ + -e {toxinidir}/ext/dapr-ext-strands/ + -e {toxinidir}/ + commands = ./validate.sh conversation ./validate.sh crypto @@ -65,15 +74,11 @@ commands = ./validate.sh jobs ./validate.sh langgraph-checkpointer ./validate.sh ../ -commands_pre = - pip3 install -e {toxinidir}/ext/dapr-ext-workflow/ - pip3 install -e {toxinidir}/ext/dapr-ext-grpc/ - pip3 install -e {toxinidir}/ext/dapr-ext-fastapi/ - pip3 install -e {toxinidir}/ext/dapr-ext-langgraph/ - pip3 install -e {toxinidir}/ext/dapr-ext-strands/ - pip3 install -e {toxinidir}/ allowlist_externals=* +commands_pre = + pip uninstall -y dapr dapr-ext-grpc dapr-ext-fastapi dapr-ext-langgraph dapr-ext-strands dapr-ext-flask dapr-ext-langgraph dapr-ext-strands + [testenv:example-component] ; This environment is used to validate a specific example component. ; Usage: tox -e example-component -- component_name @@ -83,31 +88,36 @@ basepython = python3 changedir = ./examples/ deps = mechanical-markdown + -e {toxinidir}/ext/dapr-ext-workflow/ + -e {toxinidir}/ext/dapr-ext-grpc/ + -e {toxinidir}/ext/dapr-ext-fastapi/ + -e {toxinidir}/ext/dapr-ext-langgraph/ + -e {toxinidir}/ext/dapr-ext-strands/ + -e {toxinidir}/ commands = ./validate.sh {posargs} -commands_pre = - pip3 install -e {toxinidir}/ - pip3 install -e {toxinidir}/ext/dapr-ext-workflow/ - pip3 install -e {toxinidir}/ext/dapr-ext-grpc/ - pip3 install -e {toxinidir}/ext/dapr-ext-fastapi/ - pip3 install -e {toxinidir}/ext/dapr-ext-langgraph/ - pip3 install -e {toxinidir}/ext/dapr-ext-strands/ allowlist_externals=* +commands_pre = + pip uninstall -y dapr dapr-ext-grpc dapr-ext-fastapi dapr-ext-langgraph dapr-ext-strands dapr-ext-flask dapr-ext-langgraph dapr-ext-strands + [testenv:type] basepython = python3 usedevelop = False -deps = -rdev-requirements.txt +deps = + -rdev-requirements.txt + -e {toxinidir}/ext/dapr-ext-workflow/ + -e {toxinidir}/ext/dapr-ext-grpc/ + -e {toxinidir}/ext/dapr-ext-fastapi/ + -e {toxinidir}/ext/dapr-ext-langgraph/ + -e {toxinidir}/ext/dapr-ext-strands/ + -e {toxinidir}/ commands = mypy --config-file mypy.ini commands_pre = - pip3 install -e {toxinidir}/ - pip3 install -e {toxinidir}/ext/dapr-ext-workflow/ - pip3 install -e {toxinidir}/ext/dapr-ext-grpc/ - pip3 install -e {toxinidir}/ext/dapr-ext-fastapi/ - pip3 install -e {toxinidir}/ext/dapr-ext-langgraph/ - pip3 install -e {toxinidir}/ext/dapr-ext-strands/ + pip uninstall -y dapr dapr-ext-grpc dapr-ext-fastapi dapr-ext-langgraph dapr-ext-strands dapr-ext-flask dapr-ext-langgraph dapr-ext-strands + [testenv:doc] basepython = python3 usedevelop = False From 60a255099ab36db283a396be884ecc7fd4dfd2f2 Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Mon, 16 Feb 2026 18:50:25 -0600 Subject: [PATCH 05/10] add callback logic Signed-off-by: Cassandra Coyle --- ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py | 81 ++++++++++++++++++++ ext/dapr-ext-grpc/tests/test_servicier.py | 64 ++++++++++++++++ tests/clients/fake_dapr_server.py | 8 ++ tests/clients/test_dapr_grpc_client.py | 21 +++++ tests/clients/test_dapr_grpc_client_async.py | 21 +++++ 5 files changed, 195 insertions(+) diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py b/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py index 8de632f97..eddd8d417 100644 --- a/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py @@ -13,6 +13,7 @@ limitations under the License. """ +import warnings from typing import Callable, Dict, List, Optional, Tuple, Union from cloudevents.sdk.event import v1 # type: ignore @@ -29,6 +30,8 @@ from dapr.proto.runtime.v1.appcallback_pb2 import ( BindingEventRequest, JobEventRequest, + TopicEventBulkRequest, + TopicEventBulkResponse, TopicEventRequest, ) @@ -276,3 +279,81 @@ def OnJobEventAlpha1(self, request: JobEventRequest, context): # Return empty response return appcallback_v1.JobEventResponse() + + def _handle_bulk_topic_event( + self, request: TopicEventBulkRequest, context + ) -> TopicEventBulkResponse: + """Process bulk topic event request - routes each entry to the appropriate topic handler.""" + topic_key = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path + no_validation_key = request.pubsub_name + DELIMITER + request.path + + if topic_key not in self._topic_map and no_validation_key not in self._topic_map: + return None # we don't have a handler + + handler_key = topic_key if topic_key in self._topic_map else no_validation_key + cb = self._topic_map[handler_key] # callback + + statuses = [] + for entry in request.entries: + entry_id = entry.entry_id + try: + # Build event from entry & send req with many entries + event = v1.Event() + extensions = dict() + if entry.HasField('cloud_event') and entry.cloud_event: + ce = entry.cloud_event + event.SetEventType(ce.type) + event.SetEventID(ce.id) + event.SetSource(ce.source) + event.SetData(ce.data) + event.SetContentType(ce.data_content_type) + if ce.extensions: + for k, v in ce.extensions.items(): + extensions[k] = v + else: + event.SetEventID(entry_id) + event.SetData(entry.bytes if entry.HasField('bytes') else b'') + event.SetContentType(entry.content_type or '') + event.SetSubject(request.topic) + if entry.metadata: + for k, v in entry.metadata.items(): + extensions[k] = v + for k, v in context.invocation_metadata(): + extensions['_metadata_' + k] = v + if extensions: + event.SetExtensions(extensions) + + response = cb(event) # invoke app registered handler and send event + if isinstance(response, TopicEventResponse): + status = response.status.value + else: + status = appcallback_v1.TopicEventResponse.TopicEventResponseStatus.SUCCESS + except Exception: + status = appcallback_v1.TopicEventResponse.TopicEventResponseStatus.RETRY + statuses.append( + appcallback_v1.TopicEventBulkResponseEntry(entry_id=entry_id, status=status) + ) + return appcallback_v1.TopicEventBulkResponse(statuses=statuses) + + def OnBulkTopicEvent(self, request: TopicEventBulkRequest, context): + """Subscribes bulk events from Pubsub""" + response = self._handle_bulk_topic_event(request, context) + if response is None: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + raise NotImplementedError(f'bulk topic {request.topic} is not implemented!') + return response + + def OnBulkTopicEventAlpha1(self, request: TopicEventBulkRequest, context): + """Subscribes bulk events from Pubsub. + Deprecated: Use OnBulkTopicEvent instead. + """ + warnings.warn( + 'OnBulkTopicEventAlpha1 is deprecated. Use OnBulkTopicEvent instead.', + DeprecationWarning, + stacklevel=2, + ) + response = self._handle_bulk_topic_event(request, context) + if response is None: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + raise NotImplementedError(f'bulk topic {request.topic} is not implemented!') + return response diff --git a/ext/dapr-ext-grpc/tests/test_servicier.py b/ext/dapr-ext-grpc/tests/test_servicier.py index 325d9b6d6..ca6e2f9bc 100644 --- a/ext/dapr-ext-grpc/tests/test_servicier.py +++ b/ext/dapr-ext-grpc/tests/test_servicier.py @@ -183,6 +183,70 @@ def test_non_registered_topic(self): ) +class BulkTopicEventTests(unittest.TestCase): + def setUp(self): + self._servicer = _CallbackServicer() + self._topic_method = Mock() + self._topic_method.return_value = TopicEventResponse('success') + self._servicer.register_topic('pubsub1', 'topic1', self._topic_method, {'session': 'key'}) + + self.fake_context = MagicMock() + self.fake_context.invocation_metadata.return_value = ( + ('key1', 'value1'), + ('key2', 'value1'), + ) + + def test_on_bulk_topic_event(self): + from dapr.proto.runtime.v1.appcallback_pb2 import ( + TopicEventBulkRequest, + TopicEventBulkRequestEntry, + ) + + entry1 = TopicEventBulkRequestEntry( + entry_id='entry1', + bytes=b'hello', + content_type='text/plain', + ) + entry2 = TopicEventBulkRequestEntry( + entry_id='entry2', + bytes=b'{"a": 1}', + content_type='application/json', + ) + request = TopicEventBulkRequest( + id='bulk1', + pubsub_name='pubsub1', + topic='topic1', + path='', + entries=[entry1, entry2], + ) + resp = self._servicer.OnBulkTopicEvent(request, self.fake_context) + self.assertEqual(2, len(resp.statuses)) + self.assertEqual('entry1', resp.statuses[0].entry_id) + self.assertEqual('entry2', resp.statuses[1].entry_id) + self.assertEqual( + appcallback_v1.TopicEventResponse.TopicEventResponseStatus.SUCCESS, + resp.statuses[0].status, + ) + self.assertEqual(2, self._topic_method.call_count) + + def test_on_bulk_topic_event_non_registered(self): + from dapr.proto.runtime.v1.appcallback_pb2 import ( + TopicEventBulkRequest, + TopicEventBulkRequestEntry, + ) + + entry = TopicEventBulkRequestEntry(entry_id='entry1', bytes=b'hello') + request = TopicEventBulkRequest( + id='bulk1', + pubsub_name='pubsub1', + topic='unknown_topic', + path='', + entries=[entry], + ) + with self.assertRaises(NotImplementedError): + self._servicer.OnBulkTopicEvent(request, self.fake_context) + + class BindingTests(unittest.TestCase): def setUp(self): self._servicer = _CallbackServicer() diff --git a/tests/clients/fake_dapr_server.py b/tests/clients/fake_dapr_server.py index 742b96763..3f63ebefe 100644 --- a/tests/clients/fake_dapr_server.py +++ b/tests/clients/fake_dapr_server.py @@ -155,6 +155,14 @@ def PublishEvent(self, request, context): context.set_trailing_metadata(trailers) return empty_pb2.Empty() + def BulkPublishEvent(self, request, context): + self.check_for_exception(context) + return api_v1.BulkPublishResponse() + + def BulkPublishEventAlpha1(self, request, context): + self.check_for_exception(context) + return api_v1.BulkPublishResponse() + def SubscribeTopicEventsAlpha1(self, request_iterator, context): for request in request_iterator: if request.HasField('initial_request'): diff --git a/tests/clients/test_dapr_grpc_client.py b/tests/clients/test_dapr_grpc_client.py index a52bbeb0d..6902c7773 100644 --- a/tests/clients/test_dapr_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client.py @@ -271,6 +271,27 @@ def test_publish_error(self): data=111, ) + def test_publish_bulk_event(self): + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') + resp = dapr.publish_bulk_event( + pubsub_name='pubsub', + topic_name='example', + events=[ + {'entry_id': '1', 'event': b'{"key": "value1"}'}, + {'entry_id': '2', 'event': b'{"key": "value2"}'}, + ], + ) + self.assertEqual(0, len(resp.failed_entries)) + + def test_publish_bulk_event_invalid_event_type(self): + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') + with self.assertRaisesRegex(ValueError, 'invalid type for event data'): + dapr.publish_bulk_event( + pubsub_name='pubsub', + topic_name='example', + events=[{'entry_id': '1', 'event': 123}], + ) + def test_subscribe_topic(self): # The fake server we're using sends two messages and then closes the stream # The client should be able to read both messages, handle the stream closure and reconnect diff --git a/tests/clients/test_dapr_grpc_client_async.py b/tests/clients/test_dapr_grpc_client_async.py index 245c384dd..8406e3813 100644 --- a/tests/clients/test_dapr_grpc_client_async.py +++ b/tests/clients/test_dapr_grpc_client_async.py @@ -266,6 +266,27 @@ async def test_publish_error(self): data=111, ) + async def test_publish_bulk_event(self): + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') + resp = await dapr.publish_bulk_event( + pubsub_name='pubsub', + topic_name='example', + events=[ + {'entry_id': '1', 'event': b'{"key": "value1"}'}, + {'entry_id': '2', 'event': b'{"key": "value2"}'}, + ], + ) + self.assertEqual(0, len(resp.failed_entries)) + + async def test_publish_bulk_event_invalid_event_type(self): + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') + with self.assertRaisesRegex(ValueError, 'invalid type for event data'): + await dapr.publish_bulk_event( + pubsub_name='pubsub', + topic_name='example', + events=[{'entry_id': '1', 'event': 123}], + ) + async def test_subscribe_topic(self): # The fake server we're using sends two messages and then closes the stream # The client should be able to read both messages, handle the stream closure and reconnect From 23f5c034cd14f0366df1a417bdaba27769783e59 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Tue, 17 Feb 2026 11:31:42 +0100 Subject: [PATCH 06/10] Fix bulk pubsub unit tests Signed-off-by: Albert Callarisa --- tests/clients/test_dapr_grpc_client.py | 20 +++++++++++--------- tests/clients/test_dapr_grpc_client_async.py | 20 +++++++++++--------- tox.ini | 4 ++++ 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/tests/clients/test_dapr_grpc_client.py b/tests/clients/test_dapr_grpc_client.py index 6902c7773..75d417108 100644 --- a/tests/clients/test_dapr_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client.py @@ -271,25 +271,27 @@ def test_publish_error(self): data=111, ) - def test_publish_bulk_event(self): + def test_publish_events(self): dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') - resp = dapr.publish_bulk_event( + resp = dapr.publish_events( pubsub_name='pubsub', topic_name='example', - events=[ - {'entry_id': '1', 'event': b'{"key": "value1"}'}, - {'entry_id': '2', 'event': b'{"key": "value2"}'}, + data=[ + b'{"key": "value1"}', + b'{"key": "value2"}', ], ) self.assertEqual(0, len(resp.failed_entries)) - def test_publish_bulk_event_invalid_event_type(self): + def test_publish_events_invalid_event_type(self): dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') - with self.assertRaisesRegex(ValueError, 'invalid type for event data'): - dapr.publish_bulk_event( + with self.assertRaisesRegex(ValueError, "invalid type for event "): + dapr.publish_events( pubsub_name='pubsub', topic_name='example', - events=[{'entry_id': '1', 'event': 123}], + data=[ + {'entry_id': '1', 'event': 123}, + ], ) def test_subscribe_topic(self): diff --git a/tests/clients/test_dapr_grpc_client_async.py b/tests/clients/test_dapr_grpc_client_async.py index 8406e3813..bf078cbcc 100644 --- a/tests/clients/test_dapr_grpc_client_async.py +++ b/tests/clients/test_dapr_grpc_client_async.py @@ -266,25 +266,27 @@ async def test_publish_error(self): data=111, ) - async def test_publish_bulk_event(self): + async def test_publish_events(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') - resp = await dapr.publish_bulk_event( + resp = await dapr.publish_events( pubsub_name='pubsub', topic_name='example', - events=[ - {'entry_id': '1', 'event': b'{"key": "value1"}'}, - {'entry_id': '2', 'event': b'{"key": "value2"}'}, + data=[ + b'{"key": "value1"}', + b'{"key": "value2"}', ], ) self.assertEqual(0, len(resp.failed_entries)) - async def test_publish_bulk_event_invalid_event_type(self): + async def test_publish_events_invalid_event_type(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') - with self.assertRaisesRegex(ValueError, 'invalid type for event data'): - await dapr.publish_bulk_event( + with self.assertRaisesRegex(ValueError, "invalid type for event "): + await dapr.publish_events( pubsub_name='pubsub', topic_name='example', - events=[{'entry_id': '1', 'event': 123}], + data=[ + {'entry_id': '1', 'event': 123}, + ], ) async def test_subscribe_topic(self): diff --git a/tox.ini b/tox.ini index a17f77c51..65f26ea0d 100644 --- a/tox.ini +++ b/tox.ini @@ -16,6 +16,7 @@ deps = -e {toxinidir}/ext/dapr-ext-fastapi/ -e {toxinidir}/ext/dapr-ext-langgraph/ -e {toxinidir}/ext/dapr-ext-strands/ + -e {toxinidir}/ext/flask_dapr/ -e {toxinidir}/ commands = coverage run -m unittest discover -v ./tests @@ -48,6 +49,7 @@ deps = -e {toxinidir}/ext/dapr-ext-fastapi/ -e {toxinidir}/ext/dapr-ext-langgraph/ -e {toxinidir}/ext/dapr-ext-strands/ + -e {toxinidir}/ext/flask_dapr/ -e {toxinidir}/ commands = @@ -93,6 +95,7 @@ deps = -e {toxinidir}/ext/dapr-ext-fastapi/ -e {toxinidir}/ext/dapr-ext-langgraph/ -e {toxinidir}/ext/dapr-ext-strands/ + -e {toxinidir}/ext/flask_dapr/ -e {toxinidir}/ commands = ./validate.sh {posargs} @@ -112,6 +115,7 @@ deps = -e {toxinidir}/ext/dapr-ext-fastapi/ -e {toxinidir}/ext/dapr-ext-langgraph/ -e {toxinidir}/ext/dapr-ext-strands/ + -e {toxinidir}/ext/flask_dapr/ -e {toxinidir}/ commands = mypy --config-file mypy.ini From 5e84bfa827db6176f9bfbebd3897d0e6be1f6db0 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Tue, 17 Feb 2026 11:34:15 +0100 Subject: [PATCH 07/10] lint Signed-off-by: Albert Callarisa --- ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py b/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py index eddd8d417..a3c8146b7 100644 --- a/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py @@ -291,7 +291,7 @@ def _handle_bulk_topic_event( return None # we don't have a handler handler_key = topic_key if topic_key in self._topic_map else no_validation_key - cb = self._topic_map[handler_key] # callback + cb = self._topic_map[handler_key] # callback statuses = [] for entry in request.entries: @@ -323,7 +323,7 @@ def _handle_bulk_topic_event( if extensions: event.SetExtensions(extensions) - response = cb(event) # invoke app registered handler and send event + response = cb(event) # invoke app registered handler and send event if isinstance(response, TopicEventResponse): status = response.status.value else: From 91f1a8446c2517c73b28fada237b8abccf48731a Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Tue, 17 Feb 2026 11:44:55 +0100 Subject: [PATCH 08/10] Fix type Signed-off-by: Albert Callarisa --- ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py b/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py index a3c8146b7..3d9fcdb28 100644 --- a/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py @@ -282,7 +282,7 @@ def OnJobEventAlpha1(self, request: JobEventRequest, context): def _handle_bulk_topic_event( self, request: TopicEventBulkRequest, context - ) -> TopicEventBulkResponse: + ) -> Optional[TopicEventBulkResponse]: """Process bulk topic event request - routes each entry to the appropriate topic handler.""" topic_key = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path no_validation_key = request.pubsub_name + DELIMITER + request.path From 40021d4acd4c46e24a813e0ad45c83c495ba0c1e Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Wed, 18 Feb 2026 16:59:38 +0100 Subject: [PATCH 09/10] Add tests for some edge cases Signed-off-by: Albert Callarisa --- ext/dapr-ext-grpc/tests/test_servicier.py | 73 ++++++++++++++++++++ tests/clients/fake_dapr_server.py | 32 ++++++++- tests/clients/test_dapr_grpc_client.py | 15 ++++ tests/clients/test_dapr_grpc_client_async.py | 15 ++++ 4 files changed, 132 insertions(+), 3 deletions(-) diff --git a/ext/dapr-ext-grpc/tests/test_servicier.py b/ext/dapr-ext-grpc/tests/test_servicier.py index ca6e2f9bc..1d362ea52 100644 --- a/ext/dapr-ext-grpc/tests/test_servicier.py +++ b/ext/dapr-ext-grpc/tests/test_servicier.py @@ -246,6 +246,79 @@ def test_on_bulk_topic_event_non_registered(self): with self.assertRaises(NotImplementedError): self._servicer.OnBulkTopicEvent(request, self.fake_context) + def test_on_bulk_topic_event_cloud_event_entry(self): + """Covers the cloud_event branch in _handle_bulk_topic_event.""" + from dapr.proto.runtime.v1.appcallback_pb2 import ( + TopicEventBulkRequest, + TopicEventBulkRequestEntry, + TopicEventCERequest, + ) + + ce = TopicEventCERequest( + id='ce-1', + source='test', + type='test.type', + spec_version='1.0', + data_content_type='text/plain', + data=b'cloud event payload', + ) + entry = TopicEventBulkRequestEntry(entry_id='entry1', cloud_event=ce) + request = TopicEventBulkRequest( + id='bulk1', + pubsub_name='pubsub1', + topic='topic1', + path='', + entries=[entry], + ) + resp = self._servicer.OnBulkTopicEvent(request, self.fake_context) + self.assertEqual(1, len(resp.statuses)) + self.assertEqual('entry1', resp.statuses[0].entry_id) + self._topic_method.assert_called_once() + + def test_on_bulk_topic_event_handler_raises_retry(self): + """Covers the exception -> RETRY path in _handle_bulk_topic_event.""" + from dapr.proto.runtime.v1.appcallback_pb2 import ( + TopicEventBulkRequest, + TopicEventBulkRequestEntry, + ) + + self._topic_method.side_effect = RuntimeError('handler failed') + entry = TopicEventBulkRequestEntry(entry_id='entry1', bytes=b'hello') + request = TopicEventBulkRequest( + id='bulk1', + pubsub_name='pubsub1', + topic='topic1', + path='', + entries=[entry], + ) + resp = self._servicer.OnBulkTopicEvent(request, self.fake_context) + self.assertEqual(1, len(resp.statuses)) + self.assertEqual( + appcallback_v1.TopicEventResponse.TopicEventResponseStatus.RETRY, + resp.statuses[0].status, + ) + + def test_on_bulk_topic_event_alpha1(self): + """Covers OnBulkTopicEventAlpha1 (deprecated) delegates like OnBulkTopicEvent.""" + from dapr.proto.runtime.v1.appcallback_pb2 import ( + TopicEventBulkRequest, + TopicEventBulkRequestEntry, + ) + + entry = TopicEventBulkRequestEntry(entry_id='alpha1', bytes=b'data') + request = TopicEventBulkRequest( + id='bulk1', + pubsub_name='pubsub1', + topic='topic1', + path='', + entries=[entry], + ) + with self.assertWarns(DeprecationWarning): + resp = self._servicer.OnBulkTopicEventAlpha1(request, self.fake_context) + self.assertEqual(1, len(resp.statuses)) + self.assertEqual('alpha1', resp.statuses[0].entry_id) + self._topic_method.assert_called_once() + class BindingTests(unittest.TestCase): def setUp(self): diff --git a/tests/clients/fake_dapr_server.py b/tests/clients/fake_dapr_server.py index 3f63ebefe..77734481d 100644 --- a/tests/clients/fake_dapr_server.py +++ b/tests/clients/fake_dapr_server.py @@ -1,6 +1,6 @@ import json from concurrent import futures -from typing import Dict +from typing import Dict, Optional, Tuple import grpc from google.protobuf import empty_pb2, struct_pb2 @@ -32,6 +32,18 @@ def __init__(self, grpc_port: int = 50001, http_port: int = 8080): self.jobs: Dict[str, api_v1.Job] = {} self.job_overwrites: Dict[str, bool] = {} self._next_exception = None + # When set, the next BulkPublishEvent call returns this many entries as failed. + self._bulk_publish_fail_next: Optional[Tuple[int, str]] = None + + def set_bulk_publish_failed_entries_on_next_call( + self, failed_entry_count: int = 1, error_message: str = 'simulated failure' + ) -> None: + """Configure the next BulkPublishEvent/BulkPublishEventAlpha1 call to return failed entries. + + The first failed_entry_count entries from the request will be reported as failed. + Useful for testing BulkPublishResponse with non-empty failed_entries. + """ + self._bulk_publish_fail_next = (failed_entry_count, error_message) def start(self): self._grpc_server.add_insecure_port(f'[::]:{self.grpc_port}') @@ -155,13 +167,27 @@ def PublishEvent(self, request, context): context.set_trailing_metadata(trailers) return empty_pb2.Empty() + def _bulk_publish_response(self, request) -> api_v1.BulkPublishResponse: + if not self._bulk_publish_fail_next or not request.entries: + return api_v1.BulkPublishResponse() + count, error_message = self._bulk_publish_fail_next + self._bulk_publish_fail_next = None + failed = [ + api_v1.BulkPublishResponseFailedEntry( + entry_id=entry.entry_id, + error=error_message, + ) + for entry in request.entries[:count] + ] + return api_v1.BulkPublishResponse(failedEntries=failed) + def BulkPublishEvent(self, request, context): self.check_for_exception(context) - return api_v1.BulkPublishResponse() + return self._bulk_publish_response(request) def BulkPublishEventAlpha1(self, request, context): self.check_for_exception(context) - return api_v1.BulkPublishResponse() + return self._bulk_publish_response(request) def SubscribeTopicEventsAlpha1(self, request_iterator, context): for request in request_iterator: diff --git a/tests/clients/test_dapr_grpc_client.py b/tests/clients/test_dapr_grpc_client.py index 75d417108..eb048832a 100644 --- a/tests/clients/test_dapr_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client.py @@ -294,6 +294,21 @@ def test_publish_events_invalid_event_type(self): ], ) + def test_publish_events_with_failed_entries(self): + """Covers BulkPublishResponse with non-empty failed_entries.""" + self._fake_dapr_server.set_bulk_publish_failed_entries_on_next_call( + failed_entry_count=1, error_message='simulated failure' + ) + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') + resp = dapr.publish_events( + pubsub_name='pubsub', + topic_name='example', + data=[b'first', b'second'], + ) + self.assertEqual(1, len(resp.failed_entries)) + self.assertEqual('simulated failure', resp.failed_entries[0].error) + self.assertIsNotNone(resp.failed_entries[0].entry_id) + def test_subscribe_topic(self): # The fake server we're using sends two messages and then closes the stream # The client should be able to read both messages, handle the stream closure and reconnect diff --git a/tests/clients/test_dapr_grpc_client_async.py b/tests/clients/test_dapr_grpc_client_async.py index bf078cbcc..975fa3871 100644 --- a/tests/clients/test_dapr_grpc_client_async.py +++ b/tests/clients/test_dapr_grpc_client_async.py @@ -289,6 +289,21 @@ async def test_publish_events_invalid_event_type(self): ], ) + async def test_publish_events_with_failed_entries(self): + """Covers BulkPublishResponse with non-empty failed_entries.""" + self._fake_dapr_server.set_bulk_publish_failed_entries_on_next_call( + failed_entry_count=1, error_message='simulated failure' + ) + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') + resp = await dapr.publish_events( + pubsub_name='pubsub', + topic_name='example', + data=[b'first', b'second'], + ) + self.assertEqual(1, len(resp.failed_entries)) + self.assertEqual('simulated failure', resp.failed_entries[0].error) + self.assertIsNotNone(resp.failed_entries[0].entry_id) + async def test_subscribe_topic(self): # The fake server we're using sends two messages and then closes the stream # The client should be able to read both messages, handle the stream closure and reconnect From e3dacf9344ff57763160ec1e1903f1fe3db049c7 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Wed, 18 Feb 2026 17:11:12 +0100 Subject: [PATCH 10/10] Add tests to cover more missed scenarios Signed-off-by: Albert Callarisa --- tests/clients/fake_dapr_server.py | 20 +++++++++++ tests/clients/test_dapr_grpc_client.py | 35 +++++++++++++++++- tests/clients/test_dapr_grpc_client_async.py | 38 +++++++++++++++++++- 3 files changed, 91 insertions(+), 2 deletions(-) diff --git a/tests/clients/fake_dapr_server.py b/tests/clients/fake_dapr_server.py index 77734481d..2c3d9b685 100644 --- a/tests/clients/fake_dapr_server.py +++ b/tests/clients/fake_dapr_server.py @@ -34,6 +34,16 @@ def __init__(self, grpc_port: int = 50001, http_port: int = 8080): self._next_exception = None # When set, the next BulkPublishEvent call returns this many entries as failed. self._bulk_publish_fail_next: Optional[Tuple[int, str]] = None + # When True, the next BulkPublishEvent (stable) call returns UNIMPLEMENTED; Alpha1 is unchanged. + self._bulk_publish_stable_unimplemented_next: bool = False + + def set_bulk_publish_unimplemented_on_stable_next(self) -> None: + """Make the next BulkPublishEvent (stable) call return UNIMPLEMENTED. + + BulkPublishEventAlpha1 is unchanged, so clients can fall back to Alpha1 and succeed. + Useful for testing the UNIMPLEMENTED fallback path in publish_events. + """ + self._bulk_publish_stable_unimplemented_next = True def set_bulk_publish_failed_entries_on_next_call( self, failed_entry_count: int = 1, error_message: str = 'simulated failure' @@ -182,6 +192,16 @@ def _bulk_publish_response(self, request) -> api_v1.BulkPublishResponse: return api_v1.BulkPublishResponse(failedEntries=failed) def BulkPublishEvent(self, request, context): + if self._bulk_publish_stable_unimplemented_next: + self._bulk_publish_stable_unimplemented_next = False + context.abort_with_status( + rpc_status.to_status( + status_pb2.Status( + code=code_pb2.UNIMPLEMENTED, + message='BulkPublishEvent not implemented', + ) + ) + ) self.check_for_exception(context) return self._bulk_publish_response(request) diff --git a/tests/clients/test_dapr_grpc_client.py b/tests/clients/test_dapr_grpc_client.py index eb048832a..8930e328b 100644 --- a/tests/clients/test_dapr_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client.py @@ -271,7 +271,7 @@ def test_publish_error(self): data=111, ) - def test_publish_events(self): + def test_publish_events_bytes(self): dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.publish_events( pubsub_name='pubsub', @@ -283,6 +283,15 @@ def test_publish_events(self): ) self.assertEqual(0, len(resp.failed_entries)) + def test_publish_events_strings(self): + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') + resp = dapr.publish_events( + pubsub_name='pubsub', + topic_name='example', + data=['content1', 'content2'], + ) + self.assertEqual(0, len(resp.failed_entries)) + def test_publish_events_invalid_event_type(self): dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') with self.assertRaisesRegex(ValueError, "invalid type for event "): @@ -309,6 +318,30 @@ def test_publish_events_with_failed_entries(self): self.assertEqual('simulated failure', resp.failed_entries[0].error) self.assertIsNotNone(resp.failed_entries[0].entry_id) + def test_publish_events_fallback_to_alpha1_when_stable_unimplemented(self): + """Covers UNIMPLEMENTED -> BulkPublishEventAlpha1 fallback in publish_events.""" + self._fake_dapr_server.set_bulk_publish_unimplemented_on_stable_next() + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') + resp = dapr.publish_events( + pubsub_name='pubsub', + topic_name='example', + data=[b'msg1', b'msg2'], + ) + self.assertEqual(0, len(resp.failed_entries)) + + def test_publish_events_raises_on_non_unimplemented_error(self): + """Covers non-UNIMPLEMENTED RpcError path in publish_events (raises DaprGrpcError).""" + self._fake_dapr_server.raise_exception_on_next_call( + status_pb2.Status(code=code_pb2.INTERNAL, message='bulk publish failed') + ) + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') + with self.assertRaises(DaprGrpcError): + dapr.publish_events( + pubsub_name='pubsub', + topic_name='example', + data=[b'msg'], + ) + def test_subscribe_topic(self): # The fake server we're using sends two messages and then closes the stream # The client should be able to read both messages, handle the stream closure and reconnect diff --git a/tests/clients/test_dapr_grpc_client_async.py b/tests/clients/test_dapr_grpc_client_async.py index 975fa3871..e27b8dc52 100644 --- a/tests/clients/test_dapr_grpc_client_async.py +++ b/tests/clients/test_dapr_grpc_client_async.py @@ -266,7 +266,7 @@ async def test_publish_error(self): data=111, ) - async def test_publish_events(self): + async def test_publish_events_bytes(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.publish_events( pubsub_name='pubsub', @@ -278,6 +278,18 @@ async def test_publish_events(self): ) self.assertEqual(0, len(resp.failed_entries)) + async def test_publish_events_strings(self): + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') + resp = await dapr.publish_events( + pubsub_name='pubsub', + topic_name='example', + data=[ + 'content1', + 'content2', + ], + ) + self.assertEqual(0, len(resp.failed_entries)) + async def test_publish_events_invalid_event_type(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') with self.assertRaisesRegex(ValueError, "invalid type for event "): @@ -304,6 +316,30 @@ async def test_publish_events_with_failed_entries(self): self.assertEqual('simulated failure', resp.failed_entries[0].error) self.assertIsNotNone(resp.failed_entries[0].entry_id) + async def test_publish_events_fallback_to_alpha1_when_stable_unimplemented(self): + """Covers UNIMPLEMENTED -> BulkPublishEventAlpha1 fallback in publish_events.""" + self._fake_dapr_server.set_bulk_publish_unimplemented_on_stable_next() + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') + resp = await dapr.publish_events( + pubsub_name='pubsub', + topic_name='example', + data=[b'msg1', b'msg2'], + ) + self.assertEqual(0, len(resp.failed_entries)) + + async def test_publish_events_raises_on_non_unimplemented_error(self): + """Covers non-UNIMPLEMENTED AioRpcError path in publish_events (raises DaprGrpcError).""" + self._fake_dapr_server.raise_exception_on_next_call( + status_pb2.Status(code=code_pb2.INTERNAL, message='bulk publish failed') + ) + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') + with self.assertRaises(DaprGrpcError): + await dapr.publish_events( + pubsub_name='pubsub', + topic_name='example', + data=[b'msg'], + ) + async def test_subscribe_topic(self): # The fake server we're using sends two messages and then closes the stream # The client should be able to read both messages, handle the stream closure and reconnect