Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion kafka/partitioner/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from kafka.partitioner.default import DefaultPartitioner, murmur2
from kafka.partitioner.sticky import StickyPartitioner


__all__ = [
'DefaultPartitioner', 'murmur2'
'DefaultPartitioner', 'StickyPartitioner', 'murmur2'
]
121 changes: 121 additions & 0 deletions kafka/partitioner/sticky.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""KIP-480 sticky partitioner.

Records with a non-None key are hashed to a partition just like
:class:`~kafka.partitioner.default.DefaultPartitioner`. Records with a
None key go to a *sticky* partition — i.e. the same partition is reused
for every null-key record on a topic until KafkaProducer signals that a
batch has been completed (via :meth:`StickyPartitioner.on_new_batch`),
at which point a different partition is picked.

The goal is to give the RecordAccumulator larger, denser batches for
null-key sends so per-batch overhead (CRC, compression, broker
round-trip) is amortized across more records. Java's benchmark in
KIP-480 reported substantial throughput/latency improvements over the
default-random behavior, though kafka-python is unlikely to see similar
improvements while predominantly CPU-bound on per-record overhead.
"""

import random

from kafka.partitioner.default import murmur2


class StickyPartitioner:
"""Partitioner that sticks null-key records to one partition per
topic until ``on_new_batch`` rotates it.

Thread-safety: the underlying ``_sticky`` dict is mutated only by
individually-atomic Python ops (get / setitem / contains). Two
concurrent partitioners may pick different sticky partitions; the
last write wins and both choices are valid, so no lock is needed.
"""

def __init__(self):
self._sticky = {} # topic -> partition_id
# Java's accumulator distinguishes "first batch created on a
# partition" (no rotation) from "existing batch filled, new one
# being created" (rotate). Our accumulator collapses both into
# ``new_batch_created=True``, so the partitioner absorbs the
# *first* on_new_batch event per sticky and only rotates on the
# subsequent one. Without this, we'd rotate on every record
# whose partition has no existing batch, defeating stickiness.
self._sticky_seen_batch = set() # topics whose current sticky has had >=1 batch event

def partition(self, topic, key, all_partitions, available):
"""Choose a partition for the next record.

Arguments:
topic (str): topic to partition on.
key (bytes or None): partitioning key.
all_partitions (list[int]): every partition ID for the topic,
sorted ascending.
available (list[int]): partitions whose leader is currently
known (may be empty when metadata is stale).

Returns:
int: chosen partition ID.
"""
if key is not None:
idx = murmur2(key)
idx &= 0x7fffffff
idx %= len(all_partitions)
return all_partitions[idx]
# Null key: reuse the sticky partition if still valid.
partition = self._sticky.get(topic)
if partition is not None:
if available:
if partition in available:
return partition
elif partition in all_partitions:
return partition
# Stale (leader unavailable, topic shrunk); fall through to re-pick.
return self._pick_sticky(topic, all_partitions, available)

def on_new_batch(self, topic, all_partitions, prev_partition):
"""Hook called by ``KafkaProducer`` when the accumulator just
opened a new batch for ``topic`` on ``prev_partition``.

The *first* event per sticky is absorbed silently: it
corresponds to the first batch ever being created on the
partition we just picked, which is expected — we want
subsequent records to keep landing there. The *second* event
means the previous batch filled up and a new one was opened;
that's the signal to rotate to a different partition so the
next records build up a fresh dense batch elsewhere.
"""
if self._sticky.get(topic) != prev_partition:
# Someone else (or a key-routed send) already moved us off
# this partition; don't override their choice.
return
if topic not in self._sticky_seen_batch:
self._sticky_seen_batch.add(topic)
return
# Existing batch filled; rotate.
self._sticky_seen_batch.discard(topic)
self._pick_sticky(topic, all_partitions, None,
avoid=prev_partition)

def _pick_sticky(self, topic, all_partitions, available, avoid=None):
pool = available if available else all_partitions
candidates = [p for p in pool if p != avoid] if avoid is not None else pool
if not candidates:
# Single-partition topic, or only the avoid-partition is
# available — no rotation possible.
candidates = pool
partition = random.choice(candidates)
self._sticky[topic] = partition
# Reset the seen-batch flag; the new sticky has had no batches yet.
self._sticky_seen_batch.discard(topic)
return partition

# Compatibility shim: legacy code paths that treat partitioners as
# bare callables (key, all_partitions, available) still work, though
# they lose the per-topic stickiness.
def __call__(self, key, all_partitions, available):
if key is not None:
idx = murmur2(key)
idx &= 0x7fffffff
idx %= len(all_partitions)
return all_partitions[idx]
pool = available if available else all_partitions
return random.choice(pool)
31 changes: 27 additions & 4 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
from kafka.metrics import MetricConfig, Metrics
from kafka.partitioner.default import DefaultPartitioner
from kafka.partitioner.sticky import StickyPartitioner
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
from kafka.producer.sender import Sender
Expand Down Expand Up @@ -390,7 +391,7 @@ class KafkaProducer:
'retries': float('inf'),
'batch_size': 16384,
'linger_ms': 0,
'partitioner': DefaultPartitioner(),
'partitioner': StickyPartitioner(),
'connections_max_idle_ms': 9 * 60 * 1000,
'max_block_ms': 60000,
'max_request_size': 1048576,
Expand Down Expand Up @@ -876,6 +877,18 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
log.debug("%s: Waking up the sender since %s is either full or"
" getting a new batch", str(self), tp)
self._sender.wakeup()
# KIP-480: notify a sticky-aware partitioner that this null-key
# record opened a new batch on `partition`, so the next null-key
# send for `topic` rotates to a different partition. Keyed
# records hash deterministically and don't participate in sticky
# rotation, so skip the hook for them.
if new_batch_created and key_bytes is None:
partitioner = self.config['partitioner']
on_new_batch = getattr(partitioner, 'on_new_batch', None)
if on_new_batch is not None:
all_partitions = self._metadata.partitions_for_topic(topic)
if all_partitions is not None:
on_new_batch(topic, sorted(all_partitions), partition)
return future

def flush(self, timeout=None):
Expand Down Expand Up @@ -977,9 +990,19 @@ def _partition(self, topic, partition, key, value,
assert partition in all_partitions, 'Unrecognized partition'
return partition

return self.config['partitioner'](serialized_key,
sorted(all_partitions),
list(available))
# Prefer the topic-aware partition() method (KIP-480 sticky
# partitioner needs the topic for its per-topic stickiness).
# Fall back to the legacy callable interface so user-supplied
# custom partitioners written against pre-KIP-480 kafka-python
# continue to work unchanged.
partitioner = self.config['partitioner']
if hasattr(partitioner, 'partition'):
return partitioner.partition(topic, serialized_key,
sorted(all_partitions),
list(available))
return partitioner(serialized_key,
sorted(all_partitions),
list(available))

def metrics(self, raw=False):
"""Get metrics on producer performance.
Expand Down
97 changes: 96 additions & 1 deletion test/producer/test_partitioner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from kafka.partitioner import DefaultPartitioner, murmur2
from kafka.partitioner import DefaultPartitioner, StickyPartitioner, murmur2


def test_default_partitioner():
Expand Down Expand Up @@ -34,3 +34,98 @@ def test_murmur2_not_ascii():
# Verify no regression of murmur2() bug encoding py2 bytes that don't ascii encode
murmur2(b'\xa4')
murmur2(b'\x81' * 1000)


# KIP-480 sticky partitioner


class TestStickyPartitioner:
def test_keyed_records_hash_like_default(self):
sticky = StickyPartitioner()
default = DefaultPartitioner()
all_partitions = available = list(range(100))
assert (sticky.partition('t', b'foo', all_partitions, available)
== default(b'foo', all_partitions, available))
assert (sticky.partition('t', b'bar', all_partitions, available)
== default(b'bar', all_partitions, available))

def test_null_key_sticks_until_second_on_new_batch(self):
"""The *first* on_new_batch event is absorbed (it's the first
batch being opened on the newly-picked sticky — exactly what we
want). Rotation only happens on the *second* event, which
signals that the previous batch filled up. Without this, the
partitioner would rotate on every record whose partition had
no existing batch."""
sticky = StickyPartitioner()
all_partitions = available = list(range(10))
p1 = sticky.partition('t', None, all_partitions, available)
for _ in range(50):
assert sticky.partition('t', None, all_partitions, available) == p1

# First on_new_batch: opens the first batch on p1 — no rotation.
sticky.on_new_batch('t', all_partitions, p1)
assert sticky.partition('t', None, all_partitions, available) == p1

# Second on_new_batch: previous batch filled — rotate.
sticky.on_new_batch('t', all_partitions, p1)
p2 = sticky.partition('t', None, all_partitions, available)
assert p2 != p1, 'second on_new_batch should rotate'
for _ in range(50):
assert sticky.partition('t', None, all_partitions, available) == p2

def test_per_topic_state_independent(self):
"""Stickiness is per-topic; rotating one topic doesn't affect another."""
sticky = StickyPartitioner()
all_partitions = available = list(range(10))
p_a = sticky.partition('a', None, all_partitions, available)
p_b = sticky.partition('b', None, all_partitions, available)
# Two on_new_batch events on 'a' to actually rotate it.
sticky.on_new_batch('a', all_partitions, p_a)
sticky.on_new_batch('a', all_partitions, p_a)
# 'b' is untouched.
assert sticky.partition('b', None, all_partitions, available) == p_b

def test_unavailable_sticky_partition_repicks(self):
"""If the stuck partition's leader becomes unavailable, the next
partition() call repicks from the available set."""
sticky = StickyPartitioner()
all_partitions = list(range(10))
p1 = sticky.partition('t', None, all_partitions, all_partitions)
# Now only partitions != p1 are available.
available = [p for p in all_partitions if p != p1]
p2 = sticky.partition('t', None, all_partitions, available)
assert p2 != p1
assert p2 in available

def test_single_partition_topic_cannot_rotate(self):
"""on_new_batch on a single-partition topic just keeps the same
partition — there's nothing else to rotate to."""
sticky = StickyPartitioner()
all_partitions = available = [0]
assert sticky.partition('t', None, all_partitions, available) == 0
sticky.on_new_batch('t', all_partitions, 0)
assert sticky.partition('t', None, all_partitions, available) == 0

def test_on_new_batch_ignores_stale_prev_partition(self):
"""If a key-routed send or another caller already rotated the
sticky between when we picked and when on_new_batch fires, the
hook is a no-op (don't override their choice)."""
sticky = StickyPartitioner()
all_partitions = available = list(range(10))
p1 = sticky.partition('t', None, all_partitions, available)
# Simulate someone else rotating away.
sticky._sticky['t'] = (p1 + 1) % len(all_partitions)
current = sticky._sticky['t']
sticky.on_new_batch('t', all_partitions, prev_partition=p1)
assert sticky._sticky['t'] == current, 'should not overwrite live sticky'

def test_legacy_callable_interface_still_works(self):
"""A user-supplied custom partitioner written against the old
callable signature must keep working. We exercise that shim
via StickyPartitioner itself (it implements both forms)."""
sticky = StickyPartitioner()
all_partitions = available = list(range(100))
# __call__ (no topic) ignores stickiness; just verify it returns
# a valid partition for both keyed and null-key inputs.
assert sticky(b'foo', all_partitions, available) in all_partitions
assert sticky(None, all_partitions, available) in all_partitions
39 changes: 39 additions & 0 deletions test/producer/test_producer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import gc
import platform
import threading
from unittest.mock import MagicMock

import pytest

from kafka import KafkaProducer
from kafka.partitioner import DefaultPartitioner, StickyPartitioner
from kafka.producer.transaction_manager import TransactionManager, ProducerIdAndEpoch


Expand All @@ -25,6 +27,43 @@ def test_kafka_producer_context_manager_closes_on_exit():
assert threading.active_count() == threads


def test_partition_uses_topic_aware_api_when_available():
"""_partition routes through partitioner.partition(topic, ...) when
the configured partitioner exposes it (KIP-480 sticky path)."""
producer = KafkaProducer.__new__(KafkaProducer)
producer._metadata = MagicMock()
producer._metadata.partitions_for_topic.return_value = {0, 1, 2}
producer._metadata.available_partitions_for_topic.return_value = {0, 1, 2}

partitioner = MagicMock()
partitioner.partition.return_value = 1
producer.config = {'partitioner': partitioner}

result = producer._partition('t', None, None, None, b'key-bytes', b'val')
assert result == 1
partitioner.partition.assert_called_once_with('t', b'key-bytes', [0, 1, 2], [0, 1, 2])


def test_partition_falls_back_to_legacy_callable():
"""Custom partitioners written against the legacy callable signature
(no .partition method) keep working unchanged."""
producer = KafkaProducer.__new__(KafkaProducer)
producer._metadata = MagicMock()
producer._metadata.partitions_for_topic.return_value = {0, 1, 2}
producer._metadata.available_partitions_for_topic.return_value = {0, 1, 2}

# A plain function — no .partition attribute — must still work.
calls = []
def legacy_partitioner(key, all_partitions, available):
calls.append((key, all_partitions, available))
return 2
producer.config = {'partitioner': legacy_partitioner}

result = producer._partition('t', None, None, None, b'k', b'v')
assert result == 2
assert calls == [(b'k', [0, 1, 2], [0, 1, 2])]


def test_idempotent_producer_reset_producer_id(cluster):
transaction_manager = TransactionManager(
transactional_id=None,
Expand Down
Loading