Skip to content

Commit ede26f4

Browse files
authored
NIFI-15464 Commit pending offsets for revoked partitions in ConsumeKafka (#10769)
Signed-off-by: David Handermann <exceptionfactory@apache.org>
1 parent de6f17c commit ede26f4

5 files changed

Lines changed: 817 additions & 11 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.kafka.processors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
import org.apache.nifi.kafka.service.consumer.Subscription;
import org.apache.nifi.logging.ComponentLog;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
56+
/**
57+
* Integration tests for verifying that ConsumeKafka correctly handles consumer group rebalances
58+
* without causing duplicate message processing.
59+
*/
60+
class ConsumeKafkaRebalanceIT extends AbstractConsumeKafkaIT {
61+
62+
private static final int NUM_PARTITIONS = 3;
63+
private static final int MESSAGES_PER_PARTITION = 20;
64+
65+
/**
66+
* Tests that when onPartitionsRevoked is called (simulating rebalance), the consumer
67+
* correctly commits offsets, and a subsequent consumer in the same group doesn't
68+
* re-consume the same messages (no duplicates).
69+
*
70+
* This test:
71+
* 1. Produces messages to a multi-partition topic
72+
* 2. Consumer 1 polls and processes messages
73+
* 3. Simulates rebalance by calling onPartitionsRevoked on Consumer 1
74+
* 4. Consumer 2 joins and continues consuming from committed offsets
75+
* 5. Verifies no duplicate messages were consumed
76+
*/
77+
@Test
78+
@Timeout(value = 60, unit = TimeUnit.SECONDS)
79+
void testRebalanceDoesNotCauseDuplicates() throws Exception {
80+
final String topic = "rebalance-test-" + UUID.randomUUID();
81+
final String groupId = "rebalance-group-" + UUID.randomUUID();
82+
final int totalMessages = NUM_PARTITIONS * MESSAGES_PER_PARTITION;
83+
84+
createTopic(topic, NUM_PARTITIONS);
85+
produceMessagesToTopic(topic, NUM_PARTITIONS, MESSAGES_PER_PARTITION);
86+
87+
final Set<String> consumedMessages = new HashSet<>();
88+
final AtomicInteger duplicateCount = new AtomicInteger(0);
89+
final ComponentLog mockLog = mock(ComponentLog.class);
90+
91+
final Properties props1 = getConsumerProperties(groupId);
92+
try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new KafkaConsumer<>(props1)) {
93+
final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
94+
final Kafka3ConsumerService service1 = new Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);
95+
96+
int consumer1Count = 0;
97+
int maxAttempts = 20;
98+
while (consumer1Count < totalMessages / 2 && maxAttempts-- > 0) {
99+
for (ByteRecord record : service1.poll(Duration.ofSeconds(2))) {
100+
final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
101+
if (!consumedMessages.add(messageId)) {
102+
duplicateCount.incrementAndGet();
103+
}
104+
consumer1Count++;
105+
}
106+
}
107+
108+
final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
109+
service1.onPartitionsRevoked(assignment);
110+
// Simulate processor committing offsets after successful session commit
111+
service1.commitOffsetsForRevokedPartitions();
112+
service1.close();
113+
}
114+
115+
final Properties props2 = getConsumerProperties(groupId);
116+
try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new KafkaConsumer<>(props2)) {
117+
final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
118+
final Kafka3ConsumerService service2 = new Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);
119+
120+
int emptyPolls = 0;
121+
while (emptyPolls < 5 && consumedMessages.size() < totalMessages) {
122+
boolean hasRecords = false;
123+
for (ByteRecord record : service2.poll(Duration.ofSeconds(2))) {
124+
hasRecords = true;
125+
final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
126+
if (!consumedMessages.add(messageId)) {
127+
duplicateCount.incrementAndGet();
128+
}
129+
}
130+
if (!hasRecords) {
131+
emptyPolls++;
132+
} else {
133+
emptyPolls = 0;
134+
}
135+
}
136+
137+
service2.close();
138+
}
139+
140+
assertEquals(0, duplicateCount.get(),
141+
"Expected no duplicate messages but found " + duplicateCount.get());
142+
assertEquals(totalMessages, consumedMessages.size(),
143+
"Expected to consume " + totalMessages + " unique messages but got " + consumedMessages.size());
144+
}
145+
146+
/**
147+
* Tests that offsets can be committed after rebalance when processor calls commitOffsetsForRevokedPartitions.
148+
*
149+
* This test:
150+
* 1. Creates a consumer and polls messages
151+
* 2. Manually invokes onPartitionsRevoked (simulating what Kafka does during rebalance)
152+
* 3. Calls commitOffsetsForRevokedPartitions (simulating processor committing after session commit)
153+
* 4. Verifies that offsets were committed to Kafka
154+
*/
155+
@Test
156+
@Timeout(value = 60, unit = TimeUnit.SECONDS)
157+
void testOffsetsCommittedDuringRebalance() throws Exception {
158+
final String topic = "rebalance-offset-test-" + UUID.randomUUID();
159+
final String groupId = "rebalance-offset-group-" + UUID.randomUUID();
160+
final int messagesPerPartition = 10;
161+
162+
createTopic(topic, NUM_PARTITIONS);
163+
produceMessagesToTopic(topic, NUM_PARTITIONS, messagesPerPartition);
164+
165+
final ComponentLog mockLog = mock(ComponentLog.class);
166+
final Properties props = getConsumerProperties(groupId);
167+
168+
try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props)) {
169+
final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
170+
final Kafka3ConsumerService service = new Kafka3ConsumerService(mockLog, kafkaConsumer, subscription);
171+
172+
int polledCount = 0;
173+
int maxAttempts = 20;
174+
while (polledCount < 15 && maxAttempts-- > 0) {
175+
for (ByteRecord ignored : service.poll(Duration.ofSeconds(2))) {
176+
polledCount++;
177+
}
178+
}
179+
180+
assertTrue(polledCount > 0, "Should have polled at least some messages");
181+
182+
final Set<TopicPartition> assignment = kafkaConsumer.assignment();
183+
assertFalse(assignment.isEmpty(), "Consumer should have partition assignments");
184+
185+
service.onPartitionsRevoked(assignment);
186+
// Simulate processor committing offsets after successful session commit
187+
service.commitOffsetsForRevokedPartitions();
188+
service.close();
189+
}
190+
191+
try (KafkaConsumer<byte[], byte[]> verifyConsumer = new KafkaConsumer<>(getConsumerProperties(groupId))) {
192+
final Set<TopicPartition> partitions = new HashSet<>();
193+
for (int i = 0; i < NUM_PARTITIONS; i++) {
194+
partitions.add(new TopicPartition(topic, i));
195+
}
196+
197+
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = verifyConsumer.committed(partitions);
198+
199+
long totalCommitted = committedOffsets.values().stream()
200+
.filter(o -> o != null)
201+
.mapToLong(OffsetAndMetadata::offset)
202+
.sum();
203+
204+
assertTrue(totalCommitted > 0,
205+
"Expected offsets to be committed after commitOffsetsForRevokedPartitions, but total committed offset was " + totalCommitted);
206+
}
207+
}
208+
209+
/**
210+
* Tests that records are NOT lost when a rebalance occurs before processing is complete.
211+
*
212+
* This test simulates the scenario where:
213+
* 1. Consumer polls and iterates through records (tracking offsets internally)
214+
* 2. Rebalance occurs (onPartitionsRevoked called) BEFORE the processor commits its session
215+
* 3. Consumer "fails" (simulating crash or processing failure) without committing offsets
216+
* 4. New consumer joins with the same group
217+
* 5. The new consumer receives the same records since they were never successfully processed
218+
*/
219+
@Test
220+
@Timeout(value = 60, unit = TimeUnit.SECONDS)
221+
void testNoDataLossWhenRebalanceOccursBeforeProcessingComplete() throws Exception {
222+
final String topic = "dataloss-test-" + UUID.randomUUID();
223+
final String groupId = "dataloss-group-" + UUID.randomUUID();
224+
final int messagesPerPartition = 10;
225+
final int totalMessages = NUM_PARTITIONS * messagesPerPartition;
226+
227+
createTopic(topic, NUM_PARTITIONS);
228+
produceMessagesToTopic(topic, NUM_PARTITIONS, messagesPerPartition);
229+
230+
final ComponentLog mockLog = mock(ComponentLog.class);
231+
int recordsPolledByFirstConsumer = 0;
232+
233+
// Consumer 1: Poll and iterate records, then rebalance occurs, but processing "fails"
234+
final Properties props1 = getConsumerProperties(groupId);
235+
try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new KafkaConsumer<>(props1)) {
236+
final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
237+
final Kafka3ConsumerService service1 = new Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);
238+
239+
// Poll and iterate through records - this tracks offsets internally
240+
int maxAttempts = 20;
241+
while (recordsPolledByFirstConsumer < totalMessages && maxAttempts-- > 0) {
242+
for (ByteRecord ignored : service1.poll(Duration.ofSeconds(2))) {
243+
recordsPolledByFirstConsumer++;
244+
}
245+
}
246+
247+
assertTrue(recordsPolledByFirstConsumer > 0, "First consumer should have polled some records");
248+
249+
// Simulate rebalance occurring before processor commits its session
250+
final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
251+
assertFalse(assignment.isEmpty(), "Consumer should have partition assignments");
252+
service1.onPartitionsRevoked(assignment);
253+
254+
// DO NOT call any "commit" or "process" method - simulating that the processor
255+
// never completed processing (e.g., session commit failed, process crashed, etc.)
256+
257+
service1.close();
258+
}
259+
260+
// Consumer 2: Should receive the SAME records because processing was never completed
261+
int recordsPolledBySecondConsumer = 0;
262+
final Properties props2 = getConsumerProperties(groupId);
263+
try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new KafkaConsumer<>(props2)) {
264+
final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
265+
final Kafka3ConsumerService service2 = new Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);
266+
267+
// Poll for records - if no data loss, we should get the same records again
268+
int emptyPolls = 0;
269+
while (emptyPolls < 5) {
270+
boolean hasRecords = false;
271+
for (ByteRecord ignored : service2.poll(Duration.ofSeconds(2))) {
272+
hasRecords = true;
273+
recordsPolledBySecondConsumer++;
274+
}
275+
if (!hasRecords) {
276+
emptyPolls++;
277+
} else {
278+
emptyPolls = 0;
279+
}
280+
}
281+
282+
service2.close();
283+
}
284+
285+
// Records should NOT be lost - the second consumer should receive
286+
// at least the records that were polled by the first consumer but never processed
287+
assertTrue(recordsPolledBySecondConsumer >= recordsPolledByFirstConsumer,
288+
"Data loss detected! First consumer polled " + recordsPolledByFirstConsumer +
289+
" records but second consumer only received " + recordsPolledBySecondConsumer +
290+
" records. Expected second consumer to receive at least " + recordsPolledByFirstConsumer +
291+
" records since processing was never completed.");
292+
}
293+
294+
/**
295+
* Produces messages to a specific topic with a given number of partitions.
296+
*/
297+
private void produceMessagesToTopic(final String topic, final int numPartitions, final int messagesPerPartition) throws Exception {
298+
final Properties producerProps = new Properties();
299+
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
300+
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
301+
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
302+
303+
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
304+
for (int partition = 0; partition < numPartitions; partition++) {
305+
for (int i = 0; i < messagesPerPartition; i++) {
306+
final String key = "key-" + partition + "-" + i;
307+
final String value = "value-" + partition + "-" + i;
308+
producer.send(new ProducerRecord<>(topic, partition, key, value)).get();
309+
}
310+
}
311+
}
312+
}
313+
314+
private void createTopic(final String topic, final int numPartitions) throws Exception {
315+
final Properties adminProps = new Properties();
316+
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
317+
318+
try (Admin admin = Admin.create(adminProps)) {
319+
final NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1);
320+
admin.createTopics(Collections.singletonList(newTopic)).all().get(30, TimeUnit.SECONDS);
321+
waitForTopicReady(admin, topic, numPartitions);
322+
}
323+
}
324+
325+
private void waitForTopicReady(final Admin admin, final String topic, final int expectedPartitions) throws Exception {
326+
final long startTime = System.currentTimeMillis();
327+
final long timeoutMillis = 30000;
328+
329+
while (System.currentTimeMillis() - startTime < timeoutMillis) {
330+
try {
331+
final Map<String, TopicDescription> descriptions = admin.describeTopics(Collections.singletonList(topic))
332+
.allTopicNames()
333+
.get(10, TimeUnit.SECONDS);
334+
final TopicDescription description = descriptions.get(topic);
335+
if (description != null && description.partitions().size() == expectedPartitions) {
336+
return;
337+
}
338+
} catch (ExecutionException ignored) {
339+
// Topic not ready yet, continue polling
340+
}
341+
Thread.sleep(100);
342+
}
343+
throw new RuntimeException("Topic " + topic + " not ready after " + timeoutMillis + "ms");
344+
}
345+
346+
private Properties getConsumerProperties(final String groupId) {
347+
final Properties props = new Properties();
348+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
349+
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
350+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
351+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
352+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
353+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
354+
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
355+
// Use shorter session timeout to speed up rebalance detection
356+
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
357+
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
358+
return props;
359+
}
360+
}

0 commit comments

Comments
 (0)