From d3c54a70a7113bcd90cbaf89641b4cd090e7188a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 20 May 2026 08:41:05 -0700 Subject: [PATCH] KIP-480: StickyPartitioner for KafkaProducer --- kafka/partitioner/__init__.py | 3 +- kafka/partitioner/sticky.py | 121 ++++++++++++++++++++++++++++++ kafka/producer/kafka.py | 31 +++++++- test/producer/test_partitioner.py | 97 +++++++++++++++++++++++- test/producer/test_producer.py | 39 ++++++++++ 5 files changed, 285 insertions(+), 6 deletions(-) create mode 100644 kafka/partitioner/sticky.py diff --git a/kafka/partitioner/__init__.py b/kafka/partitioner/__init__.py index eed1dca69..7bf64fd9d 100644 --- a/kafka/partitioner/__init__.py +++ b/kafka/partitioner/__init__.py @@ -1,6 +1,7 @@ from kafka.partitioner.default import DefaultPartitioner, murmur2 +from kafka.partitioner.sticky import StickyPartitioner __all__ = [ - 'DefaultPartitioner', 'murmur2' + 'DefaultPartitioner', 'StickyPartitioner', 'murmur2' ] diff --git a/kafka/partitioner/sticky.py b/kafka/partitioner/sticky.py new file mode 100644 index 000000000..a9e4cec18 --- /dev/null +++ b/kafka/partitioner/sticky.py @@ -0,0 +1,121 @@ +"""KIP-480 sticky partitioner. + +Records with a non-None key are hashed to a partition just like +:class:`~kafka.partitioner.default.DefaultPartitioner`. Records with a +None key go to a *sticky* partition — i.e. the same partition is reused +for every null-key record on a topic until KafkaProducer signals that a +batch has been completed (via :meth:`StickyPartitioner.on_new_batch`), +at which point a different partition is picked. + +The goal is to give the RecordAccumulator larger, denser batches for +null-key sends so per-batch overhead (CRC, compression, broker +round-trip) is amortized across more records. 
Java's benchmark in +KIP-480 reported substantial throughput/latency improvements over the +default-random behavior, though kafka-python is unlikely to see similar +improvements while predominantly CPU-bound on per-record overhead. +""" + +import random + +from kafka.partitioner.default import murmur2 + + +class StickyPartitioner: + """Partitioner that sticks null-key records to one partition per + topic until ``on_new_batch`` rotates it. + + Thread-safety: the underlying ``_sticky`` dict is mutated only by + individually-atomic Python ops (get / setitem / contains). Two + concurrent partitioners may pick different sticky partitions; the + last write wins and both choices are valid, so no lock is needed. + """ + + def __init__(self): + self._sticky = {} # topic -> partition_id + # Java's accumulator distinguishes "first batch created on a + # partition" (no rotation) from "existing batch filled, new one + # being created" (rotate). Our accumulator collapses both into + # ``new_batch_created=True``, so the partitioner absorbs the + # *first* on_new_batch event per sticky and only rotates on the + # subsequent one. Without this, we'd rotate on every record + # whose partition has no existing batch, defeating stickiness. + self._sticky_seen_batch = set() # topics whose current sticky has had >=1 batch event + + def partition(self, topic, key, all_partitions, available): + """Choose a partition for the next record. + + Arguments: + topic (str): topic to partition on. + key (bytes or None): partitioning key. + all_partitions (list[int]): every partition ID for the topic, + sorted ascending. + available (list[int]): partitions whose leader is currently + known (may be empty when metadata is stale). + + Returns: + int: chosen partition ID. + """ + if key is not None: + idx = murmur2(key) + idx &= 0x7fffffff + idx %= len(all_partitions) + return all_partitions[idx] + # Null key: reuse the sticky partition if still valid. 
+ partition = self._sticky.get(topic) + if partition is not None: + if available: + if partition in available: + return partition + elif partition in all_partitions: + return partition + # Stale (leader unavailable, topic shrunk); fall through to re-pick. + return self._pick_sticky(topic, all_partitions, available) + + def on_new_batch(self, topic, all_partitions, prev_partition): + """Hook called by ``KafkaProducer`` when the accumulator just + opened a new batch for ``topic`` on ``prev_partition``. + + The *first* event per sticky is absorbed silently: it + corresponds to the first batch ever being created on the + partition we just picked, which is expected — we want + subsequent records to keep landing there. The *second* event + means the previous batch filled up and a new one was opened; + that's the signal to rotate to a different partition so the + next records build up a fresh dense batch elsewhere. + """ + if self._sticky.get(topic) != prev_partition: + # Another caller or a stale re-pick already moved the sticky off + # this partition (or this was an explicit-partition send); keep it. + return + if topic not in self._sticky_seen_batch: + self._sticky_seen_batch.add(topic) + return + # Existing batch filled; rotate. + self._sticky_seen_batch.discard(topic) + self._pick_sticky(topic, all_partitions, None, + avoid=prev_partition) + + def _pick_sticky(self, topic, all_partitions, available, avoid=None): + pool = available if available else all_partitions + candidates = [p for p in pool if p != avoid] if avoid is not None else pool + if not candidates: + # Single-partition topic, or only the avoid-partition is + # available — no rotation possible. + candidates = pool + partition = random.choice(candidates) + self._sticky[topic] = partition + # Reset the seen-batch flag; the new sticky has had no batches yet. 
+ self._sticky_seen_batch.discard(topic) + return partition + + # Compatibility shim: legacy code paths that treat partitioners as + # bare callables (key, all_partitions, available) still work, though + # they lose the per-topic stickiness. + def __call__(self, key, all_partitions, available): + if key is not None: + idx = murmur2(key) + idx &= 0x7fffffff + idx %= len(all_partitions) + return all_partitions[idx] + pool = available if available else all_partitions + return random.choice(pool) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index f179c256e..a86a51c72 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -12,6 +12,7 @@ from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd from kafka.metrics import MetricConfig, Metrics from kafka.partitioner.default import DefaultPartitioner +from kafka.partitioner.sticky import StickyPartitioner from kafka.producer.future import FutureRecordMetadata, FutureProduceResult from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator from kafka.producer.sender import Sender @@ -390,7 +391,7 @@ class KafkaProducer: 'retries': float('inf'), 'batch_size': 16384, 'linger_ms': 0, - 'partitioner': DefaultPartitioner(), + 'partitioner': StickyPartitioner(), 'connections_max_idle_ms': 9 * 60 * 1000, 'max_block_ms': 60000, 'max_request_size': 1048576, @@ -876,6 +877,18 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest log.debug("%s: Waking up the sender since %s is either full or" " getting a new batch", str(self), tp) self._sender.wakeup() + # KIP-480: tell a sticky-aware partitioner that this null-key record + # opened a new batch on `partition`. StickyPartitioner absorbs the + # first such event per sticky pick and rotates `topic` on the next. + # Keyed records hash deterministically and don't participate in + # sticky rotation, so skip the hook for them. 
+ if new_batch_created and key_bytes is None: + partitioner = self.config['partitioner'] + on_new_batch = getattr(partitioner, 'on_new_batch', None) + if on_new_batch is not None: + all_partitions = self._metadata.partitions_for_topic(topic) + if all_partitions is not None: + on_new_batch(topic, sorted(all_partitions), partition) return future def flush(self, timeout=None): @@ -977,9 +990,19 @@ def _partition(self, topic, partition, key, value, assert partition in all_partitions, 'Unrecognized partition' return partition - return self.config['partitioner'](serialized_key, - sorted(all_partitions), - list(available)) + # Prefer the topic-aware partition() method (KIP-480 sticky + # partitioner needs the topic for its per-topic stickiness). + # Fall back to the legacy callable interface so user-supplied + # custom partitioners written against pre-KIP-480 kafka-python + # continue to work unchanged. + partitioner = self.config['partitioner'] + if hasattr(partitioner, 'partition'): + return partitioner.partition(topic, serialized_key, + sorted(all_partitions), + list(available)) + return partitioner(serialized_key, + sorted(all_partitions), + list(available)) def metrics(self, raw=False): """Get metrics on producer performance. 
diff --git a/test/producer/test_partitioner.py b/test/producer/test_partitioner.py index c347fe02c..7c094ebf7 100644 --- a/test/producer/test_partitioner.py +++ b/test/producer/test_partitioner.py @@ -1,6 +1,6 @@ import pytest -from kafka.partitioner import DefaultPartitioner, murmur2 +from kafka.partitioner import DefaultPartitioner, StickyPartitioner, murmur2 def test_default_partitioner(): @@ -34,3 +34,98 @@ def test_murmur2_not_ascii(): # Verify no regression of murmur2() bug encoding py2 bytes that don't ascii encode murmur2(b'\xa4') murmur2(b'\x81' * 1000) + + +# KIP-480 sticky partitioner + + +class TestStickyPartitioner: + def test_keyed_records_hash_like_default(self): + sticky = StickyPartitioner() + default = DefaultPartitioner() + all_partitions = available = list(range(100)) + assert (sticky.partition('t', b'foo', all_partitions, available) + == default(b'foo', all_partitions, available)) + assert (sticky.partition('t', b'bar', all_partitions, available) + == default(b'bar', all_partitions, available)) + + def test_null_key_sticks_until_second_on_new_batch(self): + """The *first* on_new_batch event is absorbed (it's the first + batch being opened on the newly-picked sticky — exactly what we + want). Rotation only happens on the *second* event, which + signals that the previous batch filled up. Without this, the + partitioner would rotate on every record whose partition had + no existing batch.""" + sticky = StickyPartitioner() + all_partitions = available = list(range(10)) + p1 = sticky.partition('t', None, all_partitions, available) + for _ in range(50): + assert sticky.partition('t', None, all_partitions, available) == p1 + + # First on_new_batch: opens the first batch on p1 — no rotation. + sticky.on_new_batch('t', all_partitions, p1) + assert sticky.partition('t', None, all_partitions, available) == p1 + + # Second on_new_batch: previous batch filled — rotate. 
+ sticky.on_new_batch('t', all_partitions, p1) + p2 = sticky.partition('t', None, all_partitions, available) + assert p2 != p1, 'second on_new_batch should rotate' + for _ in range(50): + assert sticky.partition('t', None, all_partitions, available) == p2 + + def test_per_topic_state_independent(self): + """Stickiness is per-topic; rotating one topic doesn't affect another.""" + sticky = StickyPartitioner() + all_partitions = available = list(range(10)) + p_a = sticky.partition('a', None, all_partitions, available) + p_b = sticky.partition('b', None, all_partitions, available) + # Two on_new_batch events on 'a' to actually rotate it. + sticky.on_new_batch('a', all_partitions, p_a) + sticky.on_new_batch('a', all_partitions, p_a) + # 'b' is untouched. + assert sticky.partition('b', None, all_partitions, available) == p_b + + def test_unavailable_sticky_partition_repicks(self): + """If the stuck partition's leader becomes unavailable, the next + partition() call repicks from the available set.""" + sticky = StickyPartitioner() + all_partitions = list(range(10)) + p1 = sticky.partition('t', None, all_partitions, all_partitions) + # Now only partitions != p1 are available. 
+ available = [p for p in all_partitions if p != p1] + p2 = sticky.partition('t', None, all_partitions, available) + assert p2 != p1 + assert p2 in available + + def test_single_partition_topic_cannot_rotate(self): + """on_new_batch on a single-partition topic just keeps the same + partition — there's nothing else to rotate to.""" + sticky = StickyPartitioner() + all_partitions = available = [0] + assert sticky.partition('t', None, all_partitions, available) == 0 + sticky.on_new_batch('t', all_partitions, 0) + assert sticky.partition('t', None, all_partitions, available) == 0 + + def test_on_new_batch_ignores_stale_prev_partition(self): + """If another caller or a stale re-pick already moved the + sticky between when we picked and when on_new_batch fires, the + hook is a no-op (don't override their choice).""" + sticky = StickyPartitioner() + all_partitions = available = list(range(10)) + p1 = sticky.partition('t', None, all_partitions, available) + # Simulate someone else rotating away. + sticky._sticky['t'] = (p1 + 1) % len(all_partitions) + current = sticky._sticky['t'] + sticky.on_new_batch('t', all_partitions, prev_partition=p1) + assert sticky._sticky['t'] == current, 'should not overwrite live sticky' + + def test_legacy_callable_interface_still_works(self): + """A user-supplied custom partitioner written against the old + callable signature must keep working. We exercise that shim + via StickyPartitioner itself (it implements both forms).""" + sticky = StickyPartitioner() + all_partitions = available = list(range(100)) + # __call__ (no topic) ignores stickiness; just verify it returns + # a valid partition for both keyed and null-key inputs. 
+ assert sticky(b'foo', all_partitions, available) in all_partitions + assert sticky(None, all_partitions, available) in all_partitions diff --git a/test/producer/test_producer.py b/test/producer/test_producer.py index 30dc32a48..a9227ad43 100644 --- a/test/producer/test_producer.py +++ b/test/producer/test_producer.py @@ -1,10 +1,12 @@ import gc import platform import threading +from unittest.mock import MagicMock import pytest from kafka import KafkaProducer +from kafka.partitioner import DefaultPartitioner, StickyPartitioner from kafka.producer.transaction_manager import TransactionManager, ProducerIdAndEpoch @@ -25,6 +27,43 @@ def test_kafka_producer_context_manager_closes_on_exit(): assert threading.active_count() == threads +def test_partition_uses_topic_aware_api_when_available(): + """_partition routes through partitioner.partition(topic, ...) when + the configured partitioner exposes it (KIP-480 sticky path).""" + producer = KafkaProducer.__new__(KafkaProducer) + producer._metadata = MagicMock() + producer._metadata.partitions_for_topic.return_value = {0, 1, 2} + producer._metadata.available_partitions_for_topic.return_value = {0, 1, 2} + + partitioner = MagicMock() + partitioner.partition.return_value = 1 + producer.config = {'partitioner': partitioner} + + result = producer._partition('t', None, None, None, b'key-bytes', b'val') + assert result == 1 + partitioner.partition.assert_called_once_with('t', b'key-bytes', [0, 1, 2], [0, 1, 2]) + + +def test_partition_falls_back_to_legacy_callable(): + """Custom partitioners written against the legacy callable signature + (no .partition method) keep working unchanged.""" + producer = KafkaProducer.__new__(KafkaProducer) + producer._metadata = MagicMock() + producer._metadata.partitions_for_topic.return_value = {0, 1, 2} + producer._metadata.available_partitions_for_topic.return_value = {0, 1, 2} + + # A plain function — no .partition attribute — must still work. 
+ calls = [] + def legacy_partitioner(key, all_partitions, available): + calls.append((key, all_partitions, available)) + return 2 + producer.config = {'partitioner': legacy_partitioner} + + result = producer._partition('t', None, None, None, b'k', b'v') + assert result == 2 + assert calls == [(b'k', [0, 1, 2], [0, 1, 2])] + + def test_idempotent_producer_reset_producer_id(cluster): transaction_manager = TransactionManager( transactional_id=None,