diff --git a/kafka/__init__.py b/kafka/__init__.py index 29131ace4..b4a4ae396 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -17,7 +17,10 @@ ) from kafka.producer import KafkaProducer from kafka.serializer import Serializer, Deserializer -from kafka.structs import TopicPartition, TopicPartitionReplica, OffsetAndMetadata +from kafka.structs import ( + ConsumerGroupMetadata, OffsetAndMetadata, + TopicPartition, TopicPartitionReplica, +) from kafka.protocol.consumer import IsolationLevel, OffsetSpec @@ -25,6 +28,7 @@ 'KafkaAdminClient', 'KafkaConsumer', 'KafkaProducer', 'AsyncConsumerRebalanceListener', 'ConsumerRebalanceListener', 'Serializer', 'Deserializer', - 'TopicPartition', 'TopicPartitionReplica', 'OffsetAndMetadata', + 'ConsumerGroupMetadata', 'OffsetAndMetadata', + 'TopicPartition', 'TopicPartitionReplica', 'IsolationLevel', 'OffsetSpec', ] diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 1b4c95b23..9d041a323 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -16,7 +16,7 @@ from kafka.metrics import MetricConfig, Metrics from kafka.net.compat import KafkaNetClient from kafka.protocol.consumer import OffsetResetStrategy -from kafka.structs import OffsetAndMetadata, TopicPartition +from kafka.structs import ConsumerGroupMetadata, OffsetAndMetadata, TopicPartition from kafka.util import Timer from kafka.version import __version__ @@ -631,6 +631,22 @@ def commit(self, offsets=None, timeout_ms=None): offsets = self._subscription.all_consumed_offsets() self._coordinator.commit_offsets_sync(offsets, timeout_ms=timeout_ms) + def group_metadata(self): + """Return a snapshot of this consumer's group membership (KIP-447). + + Pass the result to KafkaProducer.send_offsets_to_transaction() so the + broker can fence stale instances of this consumer when committing + offsets inside a transaction. 
This method is always safe to call: + if no group_id is configured (manual assignment) the returned + ConsumerGroupMetadata has group_id=None. + + Returns: + ConsumerGroupMetadata + """ + if self.config['group_id'] is None: + return ConsumerGroupMetadata(group_id=None) + return self._coordinator.group_metadata() + def committed(self, partition, metadata=False, timeout_ms=None): """Get the last committed offset for the given partition. diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 1d310bb73..654c5f038 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -16,6 +16,7 @@ HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, ) +from kafka.structs import ConsumerGroupMetadata from kafka.util import Timer log = logging.getLogger('kafka.coordinator') @@ -797,6 +798,23 @@ def generation_if_stable(self): return None return self._generation + def group_metadata(self): + """Return a snapshot of this member's group identity (KIP-447). + + Returns the current generation_id / member_id / group_instance_id even + when the group is not stable; the caller (typically + KafkaProducer.send_offsets_to_transaction) needs whatever is current + so the broker can fence stale instances. If the consumer has never + joined, the snapshot has the no-generation defaults. 
+ """ + with self._lock: + return ConsumerGroupMetadata( + group_id=self.group_id, + generation_id=self._generation.generation_id, + member_id=self._generation.member_id, + group_instance_id=self.group_instance_id, + ) + # deprecated def generation(self): warnings.warn("Function coordinator.generation() has been renamed to generation_if_stable()", diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index a86a51c72..441dd2317 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -738,7 +738,7 @@ def begin_transaction(self): raise Errors.IllegalStateError("Cannot use transactional methods without enabling transactions") self._transaction_manager.begin_transaction() - def send_offsets_to_transaction(self, offsets, consumer_group_id): + def send_offsets_to_transaction(self, offsets, group_metadata): """ Sends a list of consumed offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered @@ -750,7 +750,10 @@ def send_offsets_to_transaction(self, offsets, consumer_group_id): Arguments: offsets ({TopicPartition: OffsetAndMetadata}): map of topic-partition -> offsets to commit as part of current transaction. - consumer_group_id (str): Name of consumer group for offsets commit. + group_metadata (ConsumerGroupMetadata or str): full group metadata from + KafkaConsumer.group_metadata() (preferred — enables broker-side fencing + of stale consumer instances per KIP-447 against Kafka 2.5+ brokers), or + a bare consumer_group_id str for backwards compatibility. Raises: IllegalStateError: if no transactional_id, or transaction has not been started. 
@@ -764,7 +767,7 @@ def send_offsets_to_transaction(self, offsets, consumer_group_id): """ if not self._transaction_manager: raise Errors.IllegalStateError("Cannot use transactional methods without enabling transactions") - result = self._transaction_manager.send_offsets_to_transaction(offsets, consumer_group_id) + result = self._transaction_manager.send_offsets_to_transaction(offsets, group_metadata) self._sender.wakeup() result.wait() diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index d2cbe7956..40f4df9e1 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -11,7 +11,7 @@ AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, EndTxnRequest, InitProducerIdRequest, TxnOffsetCommitRequest, ) -from kafka.structs import TopicPartition +from kafka.structs import ConsumerGroupMetadata, TopicPartition log = logging.getLogger(__name__) @@ -210,15 +210,35 @@ def _begin_completing_transaction(self, committed): self._enqueue_request(handler) return handler.result - def send_offsets_to_transaction(self, offsets, consumer_group_id): + def send_offsets_to_transaction(self, offsets, group_metadata): + """Send consumer-group offsets as part of the current transaction. + + Arguments: + offsets ({TopicPartition: OffsetAndMetadata}): offsets to commit. + group_metadata (ConsumerGroupMetadata or str): full group metadata + from KafkaConsumer.group_metadata() (preferred — enables + broker-side fencing per KIP-447), or a bare group_id string + for backwards compatibility (wrapped with the unjoined defaults). + + Returns: + Result object (handler.result) that is not completed until the + follow-up TxnOffsetCommit returns (or fails fatally). 
+ """ + if isinstance(group_metadata, str): + group_metadata = ConsumerGroupMetadata(group_id=group_metadata) + elif not isinstance(group_metadata, ConsumerGroupMetadata): + raise TypeError( + "send_offsets_to_transaction expects group_metadata to be a " + "ConsumerGroupMetadata or a group_id str, got %r" % (type(group_metadata),)) + with self._lock: self._ensure_transactional() self._maybe_fail_with_error() if self._current_state != TransactionState.IN_TRANSACTION: raise Errors.KafkaError("Cannot send offsets to transaction because the producer is not in an active transaction") - log.debug("Begin adding offsets %s for consumer group %s to transaction", offsets, consumer_group_id) - handler = AddOffsetsToTxnHandler(self, consumer_group_id, offsets) + log.debug("Begin adding offsets %s for consumer group %s to transaction", offsets, group_metadata.group_id) + handler = AddOffsetsToTxnHandler(self, group_metadata, offsets) self._enqueue_request(handler) return handler.result @@ -1128,11 +1148,14 @@ def handle_response(self, response): class AddOffsetsToTxnHandler(TxnRequestHandler): - def __init__(self, transaction_manager, consumer_group_id, offsets): + def __init__(self, transaction_manager, group_metadata, offsets): super().__init__(transaction_manager) - self.consumer_group_id = consumer_group_id + self.group_metadata = group_metadata + self.consumer_group_id = group_metadata.group_id self.offsets = offsets + # max_version=3 is the highest we know how to drive (v4 is KIP-890). + # The connection negotiates the actual wire version against the broker. 
self.request = AddOffsetsToTxnRequest( transactional_id=self.transactional_id, producer_id=self.producer_id, @@ -1154,7 +1177,7 @@ def handle_response(self, response): # note the result is not completed until the TxnOffsetCommit returns for tp, offset in self.offsets.items(): self.transaction_manager._pending_txn_offset_commits[tp] = offset - handler = TxnOffsetCommitHandler(self.transaction_manager, self.consumer_group_id, + handler = TxnOffsetCommitHandler(self.transaction_manager, self.group_metadata, self.transaction_manager._pending_txn_offset_commits, self._result) self.transaction_manager._enqueue_request(handler) self.transaction_manager._transaction_started = True @@ -1177,14 +1200,20 @@ def handle_response(self, response): class TxnOffsetCommitHandler(TxnRequestHandler): - def __init__(self, transaction_manager, consumer_group_id, offsets, result): + def __init__(self, transaction_manager, group_metadata, offsets, result): super().__init__(transaction_manager, result=result) - self.consumer_group_id = consumer_group_id + self.group_metadata = group_metadata + self.consumer_group_id = group_metadata.group_id self.offsets = offsets self.request = self._build_request() def _build_request(self): + # KIP-447: v3+ carries member_id / generation_id / group_instance_id + # so the broker can fence stale consumer instances. We always set them + # — the protocol drops them when the connection negotiates v0-v2 + # against an older broker. max_version is the highest version this + # client knows how to drive: v4/v5 belong to KIP-890. 
Topic = TxnOffsetCommitRequest.TxnOffsetCommitRequestTopic Partition = Topic.TxnOffsetCommitRequestPartition @@ -1201,9 +1230,12 @@ def _build_request(self): group_id=self.consumer_group_id, producer_id=self.producer_id, producer_epoch=self.producer_epoch, + generation_id=self.group_metadata.generation_id, + member_id=self.group_metadata.member_id, + group_instance_id=self.group_metadata.group_instance_id, topics=[Topic(name=topic, partitions=partitions) for topic, partitions in topic_data.items()], - max_version=2, + max_version=3, ) @property diff --git a/kafka/structs.py b/kafka/structs.py index 4189cc1bc..a6b590e97 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -49,3 +49,21 @@ timestamp (int): The timestamp associated to the offset leader_epoch (int): The last known epoch from the leader / broker """ + + +ConsumerGroupMetadata = namedtuple("ConsumerGroupMetadata", + ["group_id", "generation_id", "member_id", "group_instance_id"], + defaults=[-1, '', None]) +ConsumerGroupMetadata.__doc__ = """A snapshot of a consumer's group membership (KIP-447). + +Passed to KafkaProducer.send_offsets_to_transaction() so the broker can fence +stale consumer instances when committing offsets inside a transaction. The +broker uses member_id + generation_id + group_instance_id to verify the +producer is acting on behalf of the current group generation. + +Keyword Arguments: + group_id (str): The consumer group id (None if no group is configured). + generation_id (int): The current generation id (-1 if unjoined). + member_id (str): The current member id ('' if unjoined). + group_instance_id (str): The static membership instance id, or None. 
+""" diff --git a/test/consumer/test_consumer.py b/test/consumer/test_consumer.py index 3e416df5a..b96f6b75a 100644 --- a/test/consumer/test_consumer.py +++ b/test/consumer/test_consumer.py @@ -1,6 +1,6 @@ import pytest -from kafka import KafkaConsumer, TopicPartition +from kafka import ConsumerGroupMetadata, KafkaConsumer, TopicPartition from kafka.errors import KafkaConfigurationError, IllegalStateError @@ -68,3 +68,27 @@ def test_context_manager_suppresses_autocommit_on_exception(): with consumer: raise RuntimeError('boom') assert consumer._closed is True + + +def test_group_metadata_without_group_id_returns_empty_snapshot(): + """Manual-assignment consumers have no group_id; group_metadata() must + still return a valid snapshot so users can pass it through to + producer.send_offsets_to_transaction without first checking.""" + consumer = KafkaConsumer(api_version=(0, 10, 0)) + gm = consumer.group_metadata() + assert isinstance(gm, ConsumerGroupMetadata) + assert gm.group_id is None + assert gm.generation_id == -1 + assert gm.member_id == '' + consumer.close() + + +def test_group_metadata_with_group_id_delegates_to_coordinator(): + """When configured with a group_id, group_metadata() returns the live + coordinator snapshot (unjoined defaults until poll() drives a join).""" + consumer = KafkaConsumer(api_version=(0, 10, 0), group_id='grp') + gm = consumer.group_metadata() + assert gm.group_id == 'grp' + assert gm.generation_id == -1 + assert gm.member_id == '' + consumer.close() diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py index 27b1d6621..0c9acb5bc 100644 --- a/test/consumer/test_coordinator.py +++ b/test/consumer/test_coordinator.py @@ -61,6 +61,36 @@ def test_protocol_type(coordinator): assert coordinator.protocol_type() == 'consumer' +def test_group_metadata_unjoined(coordinator): + """Before any join, group_metadata() returns the unjoined defaults so + KafkaProducer can still send v3 TxnOffsetCommit (broker will treat the 
+ -1 generation as unfenced) without raising.""" + gm = coordinator.group_metadata() + assert gm.group_id == coordinator.group_id + # NO_GENERATION constants from coordinator.base + assert gm.member_id == '' # UNKNOWN_MEMBER_ID is '' + assert gm.generation_id == -1 # DEFAULT_GENERATION_ID + assert gm.group_instance_id is None + + +def test_group_metadata_after_join(coordinator): + """After joining, group_metadata() reflects the live generation.""" + coordinator._generation = Generation(generation_id=42, + member_id='mbr-1', + protocol='range') + coordinator.state = MemberState.STABLE + gm = coordinator.group_metadata() + assert gm.generation_id == 42 + assert gm.member_id == 'mbr-1' + # group_instance_id comes from config (None by default for this fixture). + assert gm.group_instance_id is None + + # Still returns the snapshot even while rebalancing — the producer needs + # *something* to send and the broker handles fencing. + coordinator.state = MemberState.REBALANCING + assert coordinator.group_metadata().generation_id == 42 + + def test_group_protocols(coordinator): # Requires a subscription try: diff --git a/test/producer/test_transaction_manager_mock_broker.py b/test/producer/test_transaction_manager_mock_broker.py index df95f334b..258c06f22 100644 --- a/test/producer/test_transaction_manager_mock_broker.py +++ b/test/producer/test_transaction_manager_mock_broker.py @@ -39,7 +39,7 @@ TxnOffsetCommitResponse, ) from kafka.record import MemoryRecords -from kafka.structs import OffsetAndMetadata, TopicPartition +from kafka.structs import ConsumerGroupMetadata, OffsetAndMetadata, TopicPartition from test.mock_broker import MockBroker from test.test_mock_broker import _poll_for_future @@ -836,7 +836,8 @@ def _enqueue_add_offsets(self, tm, group_id='my-group', TopicPartition('foo', 0): OffsetAndMetadata(offset=10, metadata='', leader_epoch=-1), } - handler = AddOffsetsToTxnHandler(tm, group_id, offsets) + group_metadata = ConsumerGroupMetadata(group_id=group_id) + 
handler = AddOffsetsToTxnHandler(tm, group_metadata, offsets) tm._enqueue_request(handler) return handler, group_id, offsets @@ -974,7 +975,8 @@ def _enqueue_offset_commit(self, tm, group_id='my-group', tm._pending_txn_offset_commits.update(offsets) from kafka.producer.transaction_manager import TransactionalRequestResult result = TransactionalRequestResult() - handler = TxnOffsetCommitHandler(tm, group_id, offsets, result) + group_metadata = ConsumerGroupMetadata(group_id=group_id) + handler = TxnOffsetCommitHandler(tm, group_metadata, offsets, result) tm._enqueue_request(handler) return handler, tp @@ -1105,7 +1107,7 @@ def test_partial_retriable_retries_only_failed(self, broker, client): tm._pending_txn_offset_commits.update(offsets) from kafka.producer.transaction_manager import TransactionalRequestResult result = TransactionalRequestResult() - handler = TxnOffsetCommitHandler(tm, 'my-group', offsets, result) + handler = TxnOffsetCommitHandler(tm, ConsumerGroupMetadata('my-group'), offsets, result) tm._enqueue_request(handler) broker.respond( @@ -1126,6 +1128,119 @@ def test_partial_retriable_retries_only_failed(self, broker, client): assert not result.is_done +# --------------------------------------------------------------------------- +# KIP-447: ConsumerGroupMetadata threading +# --------------------------------------------------------------------------- + + +class TestKip447ConsumerGroupMetadata: + """KIP-447: TxnOffsetCommit v3+ must carry generation_id / member_id / + group_instance_id so the broker can fence stale consumer instances. + + The TxnOffsetCommit request uses the modern style (max_version=3 cap + + per-connection api_versions negotiation), so we assert the v3 fields + are *populated* on the request object — the wire encoding drops them + automatically when the negotiated version is v0-v2. 
+ """ + + def _offsets(self): + return {TopicPartition('foo', 0): + OffsetAndMetadata(offset=10, metadata='', leader_epoch=-1)} + + def test_v3_request_carries_member_and_generation(self, broker, client): + tm = _make_manager(client) + tm._current_state = TransactionState.IN_TRANSACTION + tm._consumer_group_coordinator = 0 + gm = ConsumerGroupMetadata(group_id='g', generation_id=42, + member_id='m-1', group_instance_id='inst-A') + from kafka.producer.transaction_manager import TransactionalRequestResult + result = TransactionalRequestResult() + handler = TxnOffsetCommitHandler(tm, gm, self._offsets(), result) + + # max_version caps at 3; connection negotiates the actual version. + assert handler.request._max_version == 3 + assert handler.request.API_VERSION is None + assert handler.request.generation_id == 42 + assert handler.request.member_id == 'm-1' + assert handler.request.group_instance_id == 'inst-A' + assert handler.request.group_id == 'g' + + def test_request_negotiates_down_to_v2_against_old_broker(self, broker, client): + """When the broker only supports v0-v2, the connection negotiates v2 + and the v3-only fields drop off the wire.""" + from kafka.protocol.broker_version_data import BrokerVersionData + tm = _make_manager(client) + tm._current_state = TransactionState.IN_TRANSACTION + gm = ConsumerGroupMetadata(group_id='g', generation_id=42, + member_id='m-1', group_instance_id='inst') + from kafka.producer.transaction_manager import TransactionalRequestResult + handler = TxnOffsetCommitHandler(tm, gm, self._offsets(), + TransactionalRequestResult()) + + # Simulate a 2.1-era broker capped at TxnOffsetCommit v2. + broker_data = BrokerVersionData(api_versions={28: (0, 2)}) + assert broker_data.api_version(handler.request) == 2 + + # And a 2.5+ broker negotiates v3. 
+ broker_data = BrokerVersionData(api_versions={28: (0, 3)}) + assert broker_data.api_version(handler.request) == 3 + + def test_send_offsets_to_transaction_accepts_bare_string(self, broker, client): + """Back-compat: legacy callers pass a group_id str; wrap into metadata + with no-generation defaults so broker treats it as a non-fenced + commit (which is the v0–v2 behavior on older brokers anyway).""" + tm = _make_manager(client) + tm._current_state = TransactionState.IN_TRANSACTION + tm._consumer_group_coordinator = 0 + result = tm.send_offsets_to_transaction(self._offsets(), 'legacy-group') + + # Find the enqueued AddOffsetsToTxnHandler and inspect its metadata. + handlers = [h for h in _pending_handlers(tm) + if isinstance(h, AddOffsetsToTxnHandler)] + assert len(handlers) == 1 + assert handlers[0].consumer_group_id == 'legacy-group' + assert handlers[0].group_metadata.generation_id == -1 + assert handlers[0].group_metadata.member_id == '' + assert handlers[0].group_metadata.group_instance_id is None + # Cleanup: not driving through to completion in this test. 
+ assert not result.is_done + + def test_send_offsets_to_transaction_rejects_garbage(self, broker, client): + tm = _make_manager(client) + tm._current_state = TransactionState.IN_TRANSACTION + with pytest.raises(TypeError): + tm.send_offsets_to_transaction(self._offsets(), 42) + + def test_group_metadata_propagates_through_add_offsets_to_commit_handler( + self, broker, client): + """The AddOffsetsToTxn -> TxnOffsetCommit chain must preserve the + full metadata; otherwise v3 would silently lose member/generation.""" + tm = _make_manager(client) + tm._current_state = TransactionState.IN_TRANSACTION + tm._consumer_group_coordinator = 0 + gm = ConsumerGroupMetadata(group_id='g', generation_id=7, + member_id='m', group_instance_id='inst') + offsets = self._offsets() + handler = AddOffsetsToTxnHandler(tm, gm, offsets) + tm._enqueue_request(handler) + + # Drive the AddOffsetsToTxn round-trip; success should enqueue a + # TxnOffsetCommitHandler initialized from the same metadata. + broker.respond(AddOffsetsToTxnResponse, + AddOffsetsToTxnResponse(throttle_time_ms=0, error_code=0)) + _, future = _dispatch_next(client, tm) + _poll_for_future(client, future) + + commit_handlers = [h for h in _pending_handlers(tm) + if isinstance(h, TxnOffsetCommitHandler)] + assert len(commit_handlers) == 1 + assert commit_handlers[0].group_metadata == gm + assert commit_handlers[0].request._max_version == 3 + assert commit_handlers[0].request.generation_id == 7 + assert commit_handlers[0].request.member_id == 'm' + assert commit_handlers[0].request.group_instance_id == 'inst' + + # --------------------------------------------------------------------------- # Idempotent producer ordering on retry # ---------------------------------------------------------------------------