-
-
Notifications
You must be signed in to change notification settings - Fork 998
fix: add missing retry to publisher.publish in gcpubsub _put #2465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2c97b72
3c28a62
e42766c
d7c57fa
ea640d0
c644840
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,12 +3,13 @@ | |
| from concurrent.futures import Future | ||
| from datetime import datetime | ||
| from queue import Empty | ||
| from unittest.mock import MagicMock, call, patch | ||
| from unittest.mock import ANY, MagicMock, call, patch | ||
|
|
||
| import pytest | ||
| from _socket import timeout as socket_timeout | ||
| from google.api_core.exceptions import (AlreadyExists, DeadlineExceeded, | ||
| PermissionDenied) | ||
| from google.api_core.retry import Retry | ||
| from google.pubsub_v1.types.pubsub import Subscription | ||
|
|
||
| from kombu.transport.gcpubsub import (AtomicCounter, Channel, QueueDescriptor, | ||
|
|
@@ -328,8 +329,37 @@ def test_put(self, channel): | |
| ) | ||
| channel._get_routing_key = MagicMock(return_value="test_key") | ||
| channel.publisher.publish = MagicMock() | ||
| channel.retry_timeout_seconds = 300 | ||
| channel._put(queue, message) | ||
| channel.publisher.publish.assert_called_once() | ||
| channel.publisher.publish.assert_called_once_with( | ||
| "topic_path", | ||
| ANY, | ||
| routing_key="test_key", | ||
| retry=ANY, | ||
| ) | ||
| call_kwargs = channel.publisher.publish.call_args[1] | ||
| assert isinstance(call_kwargs['retry'], Retry) | ||
| assert call_kwargs['retry']._timeout == 300 | ||
|
|
||
| def test_put_uses_custom_retry_timeout(self, channel): | ||
| queue = "test_queue" | ||
| message = { | ||
| "properties": {"delivery_info": {"routing_key": "test_key"}} | ||
| } | ||
| channel.entity_name = MagicMock(return_value=queue) | ||
| channel._queue_cache[channel.entity_name(queue)] = QueueDescriptor( | ||
| name=queue, | ||
| topic_path="topic_path", | ||
| subscription_id=queue, | ||
| subscription_path="subscription_path", | ||
| ) | ||
| channel._get_routing_key = MagicMock(return_value="test_key") | ||
| channel.publisher.publish = MagicMock() | ||
| channel.retry_timeout_seconds = 60 | ||
| channel._put(queue, message) | ||
| call_kwargs = channel.publisher.publish.call_args[1] | ||
| assert isinstance(call_kwargs['retry'], Retry) | ||
| assert call_kwargs['retry']._timeout == 60 | ||
|
Comment on lines
+360
to
+362
|
||
|
|
||
| def test_put_fanout(self, channel): | ||
| exchange = "test_exchange" | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test is asserting against
Retry's private attribute._timeout. This is not part of the public API and can change acrossgoogle-api-coreversions (and may not reflect the value passed viadeadline=). Prefer asserting via a stable interface (e.g., patchingkombu.transport.gcpubsub.Retryand checking it was constructed withdeadline=..., or asserting on a public property if one exists in the supported versions).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and this