Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
)
from kafka.producer import KafkaProducer
from kafka.serializer import Serializer, Deserializer
from kafka.structs import TopicPartition, TopicPartitionReplica, OffsetAndMetadata
from kafka.structs import (
ConsumerGroupMetadata, OffsetAndMetadata,
TopicPartition, TopicPartitionReplica,
)
from kafka.protocol.consumer import IsolationLevel, OffsetSpec


__all__ = [
'KafkaAdminClient', 'KafkaConsumer', 'KafkaProducer',
'AsyncConsumerRebalanceListener', 'ConsumerRebalanceListener',
'Serializer', 'Deserializer',
'TopicPartition', 'TopicPartitionReplica', 'OffsetAndMetadata',
'ConsumerGroupMetadata', 'OffsetAndMetadata',
'TopicPartition', 'TopicPartitionReplica',
'IsolationLevel', 'OffsetSpec',
]
18 changes: 17 additions & 1 deletion kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from kafka.metrics import MetricConfig, Metrics
from kafka.net.compat import KafkaNetClient
from kafka.protocol.consumer import OffsetResetStrategy
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.structs import ConsumerGroupMetadata, OffsetAndMetadata, TopicPartition
from kafka.util import Timer
from kafka.version import __version__

Expand Down Expand Up @@ -631,6 +631,22 @@ def commit(self, offsets=None, timeout_ms=None):
offsets = self._subscription.all_consumed_offsets()
self._coordinator.commit_offsets_sync(offsets, timeout_ms=timeout_ms)

def group_metadata(self):
"""Return a snapshot of this consumer's group membership (KIP-447).

Pass the result to KafkaProducer.send_offsets_to_transaction() so the
broker can fence stale instances of this consumer when committing
offsets inside a transaction. The snapshot is always safe to call:
if no group_id is configured (manual assignment) the returned
ConsumerGroupMetadata has group_id=None.

Returns:
ConsumerGroupMetadata
"""
if self.config['group_id'] is None:
return ConsumerGroupMetadata(group_id=None)
return self._coordinator.group_metadata()

def committed(self, partition, metadata=False, timeout_ms=None):
"""Get the last committed offset for the given partition.

Expand Down
18 changes: 18 additions & 0 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest,
DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID,
)
from kafka.structs import ConsumerGroupMetadata
from kafka.util import Timer

log = logging.getLogger('kafka.coordinator')
Expand Down Expand Up @@ -797,6 +798,23 @@ def generation_if_stable(self):
return None
return self._generation

def group_metadata(self):
"""Return a snapshot of this member's group identity (KIP-447).

Returns the current generation_id / member_id / group_instance_id even
when the group is not stable; the caller (typically
KafkaProducer.send_offsets_to_transaction) needs whatever is current
so the broker can fence stale instances. If the consumer has never
joined, the snapshot has the no-generation defaults.
"""
with self._lock:
return ConsumerGroupMetadata(
group_id=self.group_id,
generation_id=self._generation.generation_id,
member_id=self._generation.member_id,
group_instance_id=self.group_instance_id,
)

# deprecated
def generation(self):
warnings.warn("Function coordinator.generation() has been renamed to generation_if_stable()",
Expand Down
9 changes: 6 additions & 3 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ def begin_transaction(self):
raise Errors.IllegalStateError("Cannot use transactional methods without enabling transactions")
self._transaction_manager.begin_transaction()

def send_offsets_to_transaction(self, offsets, consumer_group_id):
def send_offsets_to_transaction(self, offsets, group_metadata):
"""
Sends a list of consumed offsets to the consumer group coordinator, and also marks
those offsets as part of the current transaction. These offsets will be considered
Expand All @@ -750,7 +750,10 @@ def send_offsets_to_transaction(self, offsets, consumer_group_id):
Arguments:
offsets ({TopicPartition: OffsetAndMetadata}): map of topic-partition -> offsets to commit
as part of current transaction.
consumer_group_id (str): Name of consumer group for offsets commit.
group_metadata (ConsumerGroupMetadata or str): full group metadata from
KafkaConsumer.group_metadata() (preferred — enables broker-side fencing
of stale consumer instances per KIP-447 against Kafka 2.5+ brokers), or
a bare consumer_group_id str for backwards compatibility.

Raises:
IllegalStateError: if no transactional_id, or transaction has not been started.
Expand All @@ -764,7 +767,7 @@ def send_offsets_to_transaction(self, offsets, consumer_group_id):
"""
if not self._transaction_manager:
raise Errors.IllegalStateError("Cannot use transactional methods without enabling transactions")
result = self._transaction_manager.send_offsets_to_transaction(offsets, consumer_group_id)
result = self._transaction_manager.send_offsets_to_transaction(offsets, group_metadata)
self._sender.wakeup()
result.wait()

Expand Down
52 changes: 42 additions & 10 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest,
EndTxnRequest, InitProducerIdRequest, TxnOffsetCommitRequest,
)
from kafka.structs import TopicPartition
from kafka.structs import ConsumerGroupMetadata, TopicPartition


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -210,15 +210,35 @@ def _begin_completing_transaction(self, committed):
self._enqueue_request(handler)
return handler.result

def send_offsets_to_transaction(self, offsets, consumer_group_id):
def send_offsets_to_transaction(self, offsets, group_metadata):
"""Send consumer-group offsets as part of the current transaction.

Arguments:
offsets ({TopicPartition: OffsetAndMetadata}): offsets to commit.
group_metadata (ConsumerGroupMetadata or str): full group metadata
from KafkaConsumer.group_metadata() (preferred — enables
broker-side fencing per KIP-447), or a bare group_id string
for backwards compatibility (broker treats it as v0–v2).

Returns:
FutureRecordMetadata-style Future that completes once the offsets
are durably committed (or fails fatally).
"""
if isinstance(group_metadata, str):
group_metadata = ConsumerGroupMetadata(group_id=group_metadata)
elif not isinstance(group_metadata, ConsumerGroupMetadata):
raise TypeError(
"send_offsets_to_transaction expects group_metadata to be a "
"ConsumerGroupMetadata or a group_id str, got %r" % (type(group_metadata),))

with self._lock:
self._ensure_transactional()
self._maybe_fail_with_error()
if self._current_state != TransactionState.IN_TRANSACTION:
raise Errors.KafkaError("Cannot send offsets to transaction because the producer is not in an active transaction")

log.debug("Begin adding offsets %s for consumer group %s to transaction", offsets, consumer_group_id)
handler = AddOffsetsToTxnHandler(self, consumer_group_id, offsets)
log.debug("Begin adding offsets %s for consumer group %s to transaction", offsets, group_metadata.group_id)
handler = AddOffsetsToTxnHandler(self, group_metadata, offsets)
self._enqueue_request(handler)
return handler.result

Expand Down Expand Up @@ -1128,11 +1148,14 @@ def handle_response(self, response):


class AddOffsetsToTxnHandler(TxnRequestHandler):
def __init__(self, transaction_manager, consumer_group_id, offsets):
def __init__(self, transaction_manager, group_metadata, offsets):
super().__init__(transaction_manager)

self.consumer_group_id = consumer_group_id
self.group_metadata = group_metadata
self.consumer_group_id = group_metadata.group_id
self.offsets = offsets
# max_version=3 is the highest we know how to drive (v4 is KIP-890).
# The connection negotiates the actual wire version against the broker.
self.request = AddOffsetsToTxnRequest(
transactional_id=self.transactional_id,
producer_id=self.producer_id,
Expand All @@ -1154,7 +1177,7 @@ def handle_response(self, response):
# note the result is not completed until the TxnOffsetCommit returns
for tp, offset in self.offsets.items():
self.transaction_manager._pending_txn_offset_commits[tp] = offset
handler = TxnOffsetCommitHandler(self.transaction_manager, self.consumer_group_id,
handler = TxnOffsetCommitHandler(self.transaction_manager, self.group_metadata,
self.transaction_manager._pending_txn_offset_commits, self._result)
self.transaction_manager._enqueue_request(handler)
self.transaction_manager._transaction_started = True
Expand All @@ -1177,14 +1200,20 @@ def handle_response(self, response):


class TxnOffsetCommitHandler(TxnRequestHandler):
def __init__(self, transaction_manager, consumer_group_id, offsets, result):
def __init__(self, transaction_manager, group_metadata, offsets, result):
super().__init__(transaction_manager, result=result)

self.consumer_group_id = consumer_group_id
self.group_metadata = group_metadata
self.consumer_group_id = group_metadata.group_id
self.offsets = offsets
self.request = self._build_request()

def _build_request(self):
# KIP-447: v3+ carries member_id / generation_id / group_instance_id
# so the broker can fence stale consumer instances. We always set them
# — the protocol drops them when the connection negotiates v0-v2
# against an older broker. max_version is the highest version this
# client knows how to drive: v4/v5 belong to KIP-890.
Topic = TxnOffsetCommitRequest.TxnOffsetCommitRequestTopic
Partition = Topic.TxnOffsetCommitRequestPartition

Expand All @@ -1201,9 +1230,12 @@ def _build_request(self):
group_id=self.consumer_group_id,
producer_id=self.producer_id,
producer_epoch=self.producer_epoch,
generation_id=self.group_metadata.generation_id,
member_id=self.group_metadata.member_id,
group_instance_id=self.group_metadata.group_instance_id,
topics=[Topic(name=topic, partitions=partitions)
for topic, partitions in topic_data.items()],
max_version=2,
max_version=3,
)

@property
Expand Down
18 changes: 18 additions & 0 deletions kafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,21 @@
timestamp (int): The timestamp associated to the offset
leader_epoch (int): The last known epoch from the leader / broker
"""


ConsumerGroupMetadata = namedtuple("ConsumerGroupMetadata",
["group_id", "generation_id", "member_id", "group_instance_id"],
defaults=[-1, '', None])
ConsumerGroupMetadata.__doc__ = """A snapshot of a consumer's group membership (KIP-447).

Passed to KafkaProducer.send_offsets_to_transaction() so the broker can fence
stale consumer instances when committing offsets inside a transaction. The
broker uses member_id + generation_id + group_instance_id to verify the
producer is acting on behalf of the current group generation.

Keyword Arguments:
group_id (str): The consumer group id.
generation_id (int): The current generation id (-1 if unjoined).
member_id (str): The current member id ('' if unjoined).
group_instance_id (str): The static membership instance id, or None.
"""
26 changes: 25 additions & 1 deletion test/consumer/test_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from kafka import KafkaConsumer, TopicPartition
from kafka import ConsumerGroupMetadata, KafkaConsumer, TopicPartition
from kafka.errors import KafkaConfigurationError, IllegalStateError


Expand Down Expand Up @@ -68,3 +68,27 @@ def test_context_manager_suppresses_autocommit_on_exception():
with consumer:
raise RuntimeError('boom')
assert consumer._closed is True


def test_group_metadata_without_group_id_returns_empty_snapshot():
"""Manual-assignment consumers have no group_id; group_metadata() must
still return a valid snapshot so users can pass it through to
producer.send_offsets_to_transaction without first checking."""
consumer = KafkaConsumer(api_version=(0, 10, 0))
gm = consumer.group_metadata()
assert isinstance(gm, ConsumerGroupMetadata)
assert gm.group_id is None
assert gm.generation_id == -1
assert gm.member_id == ''
consumer.close()


def test_group_metadata_with_group_id_delegates_to_coordinator():
"""When configured with a group_id, group_metadata() returns the live
coordinator snapshot (unjoined defaults until poll() drives a join)."""
consumer = KafkaConsumer(api_version=(0, 10, 0), group_id='grp')
gm = consumer.group_metadata()
assert gm.group_id == 'grp'
assert gm.generation_id == -1
assert gm.member_id == ''
consumer.close()
30 changes: 30 additions & 0 deletions test/consumer/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,36 @@ def test_protocol_type(coordinator):
assert coordinator.protocol_type() == 'consumer'


def test_group_metadata_unjoined(coordinator):
"""Before any join, group_metadata() returns the unjoined defaults so
KafkaProducer can still send v3 TxnOffsetCommit (broker will treat the
-1 generation as unfenced) without raising."""
gm = coordinator.group_metadata()
assert gm.group_id == coordinator.group_id
# NO_GENERATION constants from coordinator.base
assert gm.member_id == '' # UNKNOWN_MEMBER_ID is ''
assert gm.generation_id == -1 # DEFAULT_GENERATION_ID
assert gm.group_instance_id is None


def test_group_metadata_after_join(coordinator):
"""After joining, group_metadata() reflects the live generation."""
coordinator._generation = Generation(generation_id=42,
member_id='mbr-1',
protocol='range')
coordinator.state = MemberState.STABLE
gm = coordinator.group_metadata()
assert gm.generation_id == 42
assert gm.member_id == 'mbr-1'
# group_instance_id comes from config (None by default for this fixture).
assert gm.group_instance_id is None

# Still returns the snapshot even while rebalancing — the producer needs
# *something* to send and the broker handles fencing.
coordinator.state = MemberState.REBALANCING
assert coordinator.group_metadata().generation_id == 42


def test_group_protocols(coordinator):
# Requires a subscription
try:
Expand Down
Loading
Loading