Skip to content

Commit 435bc08

Browse files
acrocacicoyle
authored andcommitted
Add support to bulk pubsub (#915)
* Add support to bulk pubsub Signed-off-by: Albert Callarisa <albert@diagrid.io> * lint Signed-off-by: Albert Callarisa <albert@diagrid.io> * install dapr last Signed-off-by: Albert Callarisa <albert@diagrid.io> * Fix tox dependencies install Signed-off-by: Albert Callarisa <albert@diagrid.io> * add callback logic Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * Fix bulk pubsub unit tests Signed-off-by: Albert Callarisa <albert@diagrid.io> * lint Signed-off-by: Albert Callarisa <albert@diagrid.io> * Fix type Signed-off-by: Albert Callarisa <albert@diagrid.io> * Add tests for some edge cases Signed-off-by: Albert Callarisa <albert@diagrid.io> * Add tests to cover more missed scenarios Signed-off-by: Albert Callarisa <albert@diagrid.io> --------- Signed-off-by: Albert Callarisa <albert@diagrid.io> Signed-off-by: Cassandra Coyle <cassie@diagrid.io> Co-authored-by: Cassandra Coyle <cassie@diagrid.io> (cherry picked from commit 4e0e38c)
1 parent 010bca5 commit 435bc08

11 files changed

Lines changed: 736 additions & 32 deletions

File tree

dapr/aio/clients/grpc/client.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from google.protobuf.any_pb2 import Any as GrpcAny
2828
from google.protobuf.empty_pb2 import Empty as GrpcEmpty
2929
from google.protobuf.message import Message as GrpcMessage
30+
from grpc import StatusCode # type: ignore
3031
from grpc.aio import ( # type: ignore
3132
AioRpcError,
3233
StreamStreamClientInterceptor,
@@ -69,6 +70,8 @@
6970
)
7071
from dapr.clients.grpc._response import (
7172
BindingResponse,
73+
BulkPublishResponse,
74+
BulkPublishResponseFailedEntry,
7275
BulkStateItem,
7376
BulkStatesResponse,
7477
ConfigurationResponse,
@@ -484,6 +487,96 @@ async def publish_event(
484487

485488
return DaprResponse(await call.initial_metadata())
486489

490+
async def publish_events(
491+
self,
492+
pubsub_name: str,
493+
topic_name: str,
494+
data: Sequence[Union[bytes, str]],
495+
publish_metadata: Dict[str, str] = {},
496+
data_content_type: Optional[str] = None,
497+
) -> BulkPublishResponse:
498+
"""Bulk publish multiple events to a given topic.
499+
This publishes multiple events to a specified topic and pubsub component.
500+
Each event can be bytes or str. The str data is encoded into bytes with
501+
default charset of utf-8.
502+
503+
The example publishes multiple string events to a topic:
504+
505+
from dapr.aio.clients import DaprClient
506+
async with DaprClient() as d:
507+
resp = await d.publish_events(
508+
pubsub_name='pubsub_1',
509+
topic_name='TOPIC_A',
510+
data=['message1', 'message2', 'message3'],
511+
data_content_type='text/plain',
512+
)
513+
# resp.failed_entries includes any entries that failed to publish.
514+
515+
Args:
516+
pubsub_name (str): the name of the pubsub component
517+
topic_name (str): the topic name to publish to
518+
data (Sequence[Union[bytes, str]]): sequence of events to publish;
519+
each event must be bytes or str
520+
publish_metadata (Dict[str, str], optional): Dapr metadata for the
521+
bulk publish request
522+
data_content_type (str, optional): content type of the event data
523+
524+
Returns:
525+
:class:`BulkPublishResponse` with any failed entries
526+
"""
527+
entries = []
528+
for event in data:
529+
entry_id = str(uuid.uuid4())
530+
if isinstance(event, bytes):
531+
event_data = event
532+
content_type = data_content_type or 'application/octet-stream'
533+
elif isinstance(event, str):
534+
event_data = event.encode('utf-8')
535+
content_type = data_content_type or 'text/plain'
536+
else:
537+
raise ValueError(f'invalid type for event {type(event)}')
538+
539+
entries.append(
540+
api_v1.BulkPublishRequestEntry(
541+
entry_id=entry_id,
542+
event=event_data,
543+
content_type=content_type,
544+
)
545+
)
546+
547+
req = api_v1.BulkPublishRequest(
548+
pubsub_name=pubsub_name,
549+
topic=topic_name,
550+
entries=entries,
551+
metadata=publish_metadata,
552+
)
553+
554+
try:
555+
call = self._stub.BulkPublishEvent(req)
556+
response = await call
557+
except AioRpcError as err:
558+
if err.code() == StatusCode.UNIMPLEMENTED:
559+
try:
560+
call = self._stub.BulkPublishEventAlpha1(req)
561+
response = await call
562+
except AioRpcError as err2:
563+
raise DaprGrpcError(err2) from err2
564+
else:
565+
raise DaprGrpcError(err) from err
566+
567+
failed_entries = [
568+
BulkPublishResponseFailedEntry(
569+
entry_id=entry.entry_id,
570+
error=entry.error,
571+
)
572+
for entry in response.failedEntries
573+
]
574+
575+
return BulkPublishResponse(
576+
failed_entries=failed_entries,
577+
headers=await call.initial_metadata(),
578+
)
579+
487580
async def subscribe(
488581
self,
489582
pubsub_name: str,

dapr/clients/grpc/_response.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,64 @@ def _read_subscribe_config(
723723
pass
724724

725725

726+
class BulkPublishResponseFailedEntry:
727+
"""A failed entry from the bulk publish response.
728+
729+
Attributes:
730+
entry_id (str): the entry ID that failed.
731+
error (str): the error message for the failure.
732+
"""
733+
734+
def __init__(self, entry_id: str, error: str):
735+
"""Initializes BulkPublishResponseFailedEntry.
736+
737+
Args:
738+
entry_id (str): the entry ID that failed.
739+
error (str): the error message for the failure.
740+
"""
741+
self._entry_id = entry_id
742+
self._error = error
743+
744+
@property
745+
def entry_id(self) -> str:
746+
"""Gets the entry ID."""
747+
return self._entry_id
748+
749+
@property
750+
def error(self) -> str:
751+
"""Gets the error message."""
752+
return self._error
753+
754+
755+
class BulkPublishResponse(DaprResponse):
756+
"""The response of publish_events (bulk publish) API.
757+
758+
This inherits from DaprResponse
759+
760+
Attributes:
761+
failed_entries (List[BulkPublishResponseFailedEntry]): the entries that failed to publish.
762+
"""
763+
764+
def __init__(
765+
self,
766+
failed_entries: List[BulkPublishResponseFailedEntry] = [],
767+
headers: MetadataTuple = (),
768+
):
769+
"""Initializes BulkPublishResponse from :obj:`runtime_v1.BulkPublishResponse`.
770+
771+
Args:
772+
failed_entries (List[BulkPublishResponseFailedEntry]): the entries that failed.
773+
headers (Tuple, optional): the headers from Dapr gRPC response.
774+
"""
775+
super(BulkPublishResponse, self).__init__(headers)
776+
self._failed_entries = failed_entries
777+
778+
@property
779+
def failed_entries(self) -> List[BulkPublishResponseFailedEntry]:
780+
"""Gets the failed entries."""
781+
return self._failed_entries
782+
783+
726784
class TopicEventResponseStatus(Enum):
727785
# success is the default behavior: message is acknowledged and not retried
728786
success = appcallback_v1.TopicEventResponse.TopicEventResponseStatus.SUCCESS

dapr/clients/grpc/client.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from google.protobuf.message import Message as GrpcMessage
3030
from grpc import ( # type: ignore
3131
RpcError,
32+
StatusCode,
3233
StreamStreamClientInterceptor,
3334
StreamUnaryClientInterceptor,
3435
UnaryStreamClientInterceptor,
@@ -58,6 +59,8 @@
5859
)
5960
from dapr.clients.grpc._response import (
6061
BindingResponse,
62+
BulkPublishResponse,
63+
BulkPublishResponseFailedEntry,
6164
BulkStateItem,
6265
BulkStatesResponse,
6366
ConfigurationResponse,
@@ -485,6 +488,96 @@ def publish_event(
485488

486489
return DaprResponse(call.initial_metadata())
487490

491+
def publish_events(
492+
self,
493+
pubsub_name: str,
494+
topic_name: str,
495+
data: Sequence[Union[bytes, str]],
496+
publish_metadata: Dict[str, str] = {},
497+
data_content_type: Optional[str] = None,
498+
) -> BulkPublishResponse:
499+
"""Bulk publish multiple events to a given topic.
500+
This publishes multiple events to a specified topic and pubsub component.
501+
Each event can be bytes or str. The str data is encoded into bytes with
502+
default charset of utf-8.
503+
504+
The example publishes multiple string events to a topic:
505+
506+
from dapr.clients import DaprClient
507+
with DaprClient() as d:
508+
resp = d.publish_events(
509+
pubsub_name='pubsub_1',
510+
topic_name='TOPIC_A',
511+
data=['message1', 'message2', 'message3'],
512+
data_content_type='text/plain',
513+
)
514+
# resp.failed_entries includes any entries that failed to publish.
515+
516+
Args:
517+
pubsub_name (str): the name of the pubsub component
518+
topic_name (str): the topic name to publish to
519+
data (Sequence[Union[bytes, str]]): sequence of events to publish;
520+
each event must be bytes or str
521+
publish_metadata (Dict[str, str], optional): Dapr metadata for the
522+
bulk publish request
523+
data_content_type (str, optional): content type of the event data
524+
525+
Returns:
526+
:class:`BulkPublishResponse` with any failed entries
527+
"""
528+
entries = []
529+
for event in data:
530+
entry_id = str(uuid.uuid4())
531+
if isinstance(event, bytes):
532+
event_data = event
533+
content_type = data_content_type or 'application/octet-stream'
534+
elif isinstance(event, str):
535+
event_data = event.encode('utf-8')
536+
content_type = data_content_type or 'text/plain'
537+
else:
538+
raise ValueError(f'invalid type for event {type(event)}')
539+
540+
entries.append(
541+
api_v1.BulkPublishRequestEntry(
542+
entry_id=entry_id,
543+
event=event_data,
544+
content_type=content_type,
545+
)
546+
)
547+
548+
req = api_v1.BulkPublishRequest(
549+
pubsub_name=pubsub_name,
550+
topic=topic_name,
551+
entries=entries,
552+
metadata=publish_metadata,
553+
)
554+
555+
try:
556+
response, call = self.retry_policy.run_rpc(self._stub.BulkPublishEvent.with_call, req)
557+
except RpcError as err:
558+
if err.code() == StatusCode.UNIMPLEMENTED:
559+
try:
560+
response, call = self.retry_policy.run_rpc(
561+
self._stub.BulkPublishEventAlpha1.with_call, req
562+
)
563+
except RpcError as err2:
564+
raise DaprGrpcError(err2) from err2
565+
else:
566+
raise DaprGrpcError(err) from err
567+
568+
failed_entries = [
569+
BulkPublishResponseFailedEntry(
570+
entry_id=entry.entry_id,
571+
error=entry.error,
572+
)
573+
for entry in response.failedEntries
574+
]
575+
576+
return BulkPublishResponse(
577+
failed_entries=failed_entries,
578+
headers=call.initial_metadata(),
579+
)
580+
488581
def subscribe(
489582
self,
490583
pubsub_name: str,

examples/pubsub-simple/README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Example - Publish and subscribe to messages
22

3-
This example utilizes a publisher and a subscriber to show the pubsub pattern, it also shows `PublishEvent`, `OnTopicEvent`, `GetTopicSubscriptions`, and `TopicEventResponse` functionality.
4-
It creates a publisher and calls the `publish_event` method in the `DaprClient`.
3+
This example utilizes a publisher and a subscriber to show the pubsub pattern, it also shows `PublishEvent`, `PublishEvents` (bulk), `OnTopicEvent`, `GetTopicSubscriptions`, and `TopicEventResponse` functionality.
4+
It creates a publisher and calls the `publish_event` and `publish_events` methods in the `DaprClient`.
55
It will create a gRPC subscriber and bind the `OnTopicEvent` method, which gets triggered after a message is published to the subscribed topic.
66
The subscriber will tell dapr to retry delivery of the first message it receives, logging that the message will be retried, and printing it at least once to standard output.
77

@@ -39,6 +39,9 @@ expected_stdout_lines:
3939
- '== APP == Dead-Letter Subscriber. Originally intended topic: TOPIC_D'
4040
- '== APP == Subscriber received: TOPIC_CE'
4141
- '== APP == Subscriber received a json cloud event: id=8, message="hello world", content_type="application/json"'
42+
- '== APP == Subscriber received: id=20, message="bulk event 1", content_type="application/json"'
43+
- '== APP == Subscriber received: id=21, message="bulk event 2", content_type="application/json"'
44+
- '== APP == Subscriber received: id=22, message="bulk event 3", content_type="application/json"'
4245
- '== APP == Subscriber received: TOPIC_CE'
4346
- '== APP == Subscriber received plain text cloud event: hello world, content_type="text/plain"'
4447
@@ -68,6 +71,7 @@ expected_stdout_lines:
6871
- "== APP == {'id': 6, 'message': 'hello world'}"
6972
- "== APP == {'id': 7, 'message': 'hello world'}"
7073
- "== APP == {'specversion': '1.0', 'type': 'com.example.event', 'source': 'my-service', 'id': 'abc-8', 'data': {'id': 8, 'message': 'hello world'}, 'datacontenttype': 'application/json'}"
74+
- "== APP == Bulk published 3 events. Failed entries: 0"
7175
- "== APP == {'specversion': '1.0', 'type': 'com.example.event', 'source': 'my-service', 'id': 'abc-10', 'data': 'hello world', 'datacontenttype': 'text/plain'}"
7276
background: true
7377
sleep: 15

examples/pubsub-simple/publisher.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,31 @@
9191

9292
time.sleep(0.5)
9393

94+
# Bulk publish multiple events at once using publish_events
95+
bulk_events = [
96+
json.dumps({'id': 20, 'message': 'bulk event 1'}),
97+
json.dumps({'id': 21, 'message': 'bulk event 2'}),
98+
json.dumps({'id': 22, 'message': 'bulk event 3'}),
99+
]
100+
101+
resp = d.publish_events(
102+
pubsub_name='pubsub',
103+
topic_name='TOPIC_A',
104+
data=bulk_events,
105+
data_content_type='application/json',
106+
)
107+
108+
print(
109+
f'Bulk published {len(bulk_events)} events. Failed entries: {len(resp.failed_entries)}',
110+
flush=True,
111+
)
112+
113+
if resp.failed_entries:
114+
for entry in resp.failed_entries:
115+
print(f' Failed entry_id={entry.entry_id}: {entry.error}', flush=True)
116+
117+
time.sleep(0.5)
118+
94119
# Send a cloud event with plain text data
95120
id = 10
96121
cloud_event = {

0 commit comments

Comments
 (0)