SentinelChannel: add backward-compatible fanout for mixed-version clusters#2457
SentinelChannel: add backward-compatible fanout for mixed-version clusters#2457TheCodingLand wants to merge 4 commits intocelery:mainfrom
Conversation
943395e to
8f53bff
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2457 +/- ##
==========================================
+ Coverage 81.11% 81.12% +0.01%
==========================================
Files 77 77
Lines 9738 9828 +90
Branches 1098 1119 +21
==========================================
+ Hits 7899 7973 +74
- Misses 1631 1636 +5
- Partials 208 219 +11 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Pull request overview
Adds an opt-in Redis Sentinel fanout compatibility mode so mixed kombu versions (pre/post 5.4.0 topic formatting change) can still exchange broadcast/control commands during rolling upgrades.
Changes:
- Introduces
sentinel_fanout_compattransport option onSentinelChannelto dual-publish and dual-subscribe to both formatted (/0.exchange) and legacy (/{db}.exchange) topics. - Implements deduplication logic to avoid double-delivery when listening on both topics.
- Adds unit tests covering default behavior, dual publish/subscribe, unsubscribe, and dedup behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
kombu/transport/redis.py |
Adds sentinel_fanout_compat option and SentinelChannel logic for dual topic fanout + dedup. |
t/unit/transport/test_redis.py |
Adds tests validating compat option behavior, topic selection, and dedup. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
8f53bff to
0ea47cd
Compare
|
I updated the PR based on review feedback |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # -- deduplicate using delivery_tag | ||
| tag = message.get('properties', {}).get( | ||
| 'delivery_tag') | ||
| if tag is None: | ||
| # Fallback for non-standard messages: use | ||
| # hash of the raw serialised bytes. | ||
| raw = payload['data'] | ||
| if isinstance(raw, str): | ||
| raw = raw.encode() | ||
| tag = hash(raw) | ||
| if tag in self._seen_fanout_payloads: |
There was a problem hiding this comment.
Deduplication keys are currently just delivery_tag (or hash(raw) fallback). If two different fanout exchanges ever reuse the same delivery_tag (or if the hash fallback collides / same payload bytes are published on two exchanges), messages could be incorrectly dropped. Consider namespacing the dedup key by exchange/channel (e.g., (exchange, delivery_tag)), and for the fallback prefer a stable digest of the raw bytes plus channel instead of Python’s salted hash().
There was a problem hiding this comment.
@TheCodingLand can you please cross check the suggestions? and share your findings
| super()._put_fanout(exchange, message, routing_key, **kwargs) | ||
| if self._compat_enabled: | ||
| legacy_topic = self._get_legacy_publish_topic( | ||
| exchange, routing_key) | ||
| with self.conn_or_acquire() as client: | ||
| client.publish(legacy_topic, dumps(message)) | ||
|
|
There was a problem hiding this comment.
When compat is enabled, _put_fanout publishes via super()._put_fanout(...) and then opens a second conn_or_acquire() context to publish the legacy topic. This results in two separate client acquisitions per fanout publish. Consider acquiring a client once and publishing to both topics on the same connection to reduce overhead.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
auvipy
left a comment
There was a problem hiding this comment.
also, please fix the merge conflict carefully
auvipy
left a comment
There was a problem hiding this comment.
please, fix the merge conflicts
Commit 06a3b23 fixed
SentinelChannelto formatkeyprefix_fanoutwith the actual database number (e.g./{db}.→/0.), aligning sentinel with the baseChannelbehavior.However, this changed the PUB/SUB topic that sentinel workers use for fanout (broadcast) messages, which means workers running kombu < 5.4.0 and >= 5.4.0 publish and subscribe to different Redis channels and can no longer exchange control commands (
ping,shutdown,rate_limit, etc.) during rolling upgrades.This commit adds a
sentinel_fanout_compattransport option (default False) that, when explicitly enabled:/0.exchange) and the legacy literal topic (/{db}.exchange)This allows mixed-version clusters to communicate during rolling upgrades. Users performing a rolling upgrade should set
sentinel_fanout_compat=Truein theirtransport_optionsuntil all workers run kombu >= 5.4.0, then remove it.Closes #2152