Description
In confluent-kafka version 2.13.0, the behavior of the consumer's `.consume()` method changed: the `num_messages` and `timeout` parameters appear to have no effect. Previously, `.consume()` blocked until either `num_messages` messages had been gathered or `timeout` seconds had elapsed; in 2.13.0 it returns a batch as soon as any messages are available on the broker. Version 2.12.2 works as expected.
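For reference, the pre-2.13 batching contract can be emulated on top of single-message polling; a minimal sketch of the expected semantics (the helper name and `poll_fn` parameter are mine, not library API):

```python
import time

def consume_batch(poll_fn, num_messages, timeout):
    """Emulate the pre-2.13 consume() contract: return early only once
    num_messages have been gathered; otherwise keep polling until the
    deadline and return whatever arrived by then."""
    deadline = time.monotonic() + timeout
    batch = []
    while len(batch) < num_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # In real code: msg = consumer.poll(timeout=remaining)
        msg = poll_fn(remaining)
        if msg is not None:
            batch.append(msg)
    return batch
```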
Script to reproduce
```python
import json
import logging
import sys
import threading
import time
from itertools import batched

from confluent_kafka import Consumer, Producer, version
from confluent_kafka.admin import AdminClient, NewTopic

BOOTSTRAP_SERVERS = '127.0.0.1:29092'
CONSUMER_CONFIG = {
    'bootstrap.servers': BOOTSTRAP_SERVERS,
    'group.id': 'test-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}
KAFKA_TOPIC = 'test-topic'
PRODUCER_BATCHES_COUNT = 10
PRODUCER_BATCH_SIZE = 4
PRODUCER_TIMEOUT = 4
CONSUMER_BATCH_SIZE = 10
CONSUMER_TIMEOUT = 10


def create_topic(topic: str, partitions: int = 1):
    admin_client = AdminClient({'bootstrap.servers': BOOTSTRAP_SERVERS})
    cluster_metadata = admin_client.list_topics()
    if topic not in cluster_metadata.topics:
        topic_instance = NewTopic(topic=topic, num_partitions=partitions)
        create_topic_futures = admin_client.create_topics(new_topics=[topic_instance])
        create_topic_futures[topic].result()


def generate_messages(messages_count: int) -> list[dict[str, str]]:
    messages = [
        {'key': json.dumps({'key': i}), 'value': json.dumps({'value': i * 10})}
        for i in range(messages_count)
    ]
    return messages


def produce_messages():
    producer = Producer({'bootstrap.servers': BOOTSTRAP_SERVERS})
    messages = generate_messages(messages_count=PRODUCER_BATCHES_COUNT * PRODUCER_BATCH_SIZE)
    for messages_batch in batched(messages, PRODUCER_BATCH_SIZE):
        producer.produce_batch(topic=KAFKA_TOPIC, messages=list(messages_batch))
        producer.flush()
        logging.info(f'[Producer] produced messages count: {len(messages_batch)}')
        time.sleep(PRODUCER_TIMEOUT)


def consume_messages():
    consumer = Consumer(CONSUMER_CONFIG)
    try:
        consumer.subscribe(topics=[KAFKA_TOPIC])
        while True:
            start_time = time.perf_counter()
            logging.info('[Consumer] start message batch consuming')
            messages = consumer.consume(num_messages=CONSUMER_BATCH_SIZE, timeout=CONSUMER_TIMEOUT)
            elapsed_time = time.perf_counter() - start_time
            logging.info(f'[Consumer] consumed messages count: {len(messages)}; elapsed time: {elapsed_time:.2f}s')
            consumer.commit(asynchronous=False)
    finally:
        consumer.close()


def main():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', stream=sys.stdout)
    logging.info(f'python version: {sys.version}')
    logging.info(f'confluent-kafka version: {version()}')
    create_topic(topic=KAFKA_TOPIC)
    producer_thread = threading.Thread(target=produce_messages)
    producer_thread.start()
    consumer_thread = threading.Thread(target=consume_messages)
    consumer_thread.start()
    for thread in (producer_thread, consumer_thread):
        thread.join()


if __name__ == '__main__':
    main()
```

Script logs
v2.12.2 (as expected)
```
2026-02-26 22:41:45,760 - python version: 3.13.12 (main, Feb 4 2026, 00:00:00) [GCC 15.2.1 20260123 (Red Hat 15.2.1-7)]
2026-02-26 22:41:45,760 - confluent-kafka version: 2.12.2
2026-02-26 22:41:45,837 - [Consumer] start message batch consuming
2026-02-26 22:41:45,838 - [Producer] produced messages count: 4
2026-02-26 22:41:49,840 - [Producer] produced messages count: 4
2026-02-26 22:41:53,842 - [Producer] produced messages count: 4
2026-02-26 22:41:53,842 - [Consumer] consumed messages count: 10; elapsed time: 8.01s
2026-02-26 22:41:53,844 - [Consumer] start message batch consuming
2026-02-26 22:41:57,843 - [Producer] produced messages count: 4
2026-02-26 22:42:01,845 - [Producer] produced messages count: 4
2026-02-26 22:42:01,845 - [Consumer] consumed messages count: 10; elapsed time: 8.00s
2026-02-26 22:42:01,847 - [Consumer] start message batch consuming
2026-02-26 22:42:05,847 - [Producer] produced messages count: 4
2026-02-26 22:42:09,848 - [Producer] produced messages count: 4
2026-02-26 22:42:11,848 - [Consumer] consumed messages count: 8; elapsed time: 10.00s
```
v2.13.0 (changed)
```
2026-02-26 22:43:42,903 - python version: 3.13.12 (main, Feb 4 2026, 00:00:00) [GCC 15.2.1 20260123 (Red Hat 15.2.1-7)]
2026-02-26 22:43:42,903 - confluent-kafka version: 2.13.0
2026-02-26 22:43:42,982 - [Consumer] start message batch consuming
2026-02-26 22:43:42,984 - [Producer] produced messages count: 4
2026-02-26 22:43:46,185 - [Consumer] consumed messages count: 4; elapsed time: 3.20s
2026-02-26 22:43:46,187 - [Consumer] start message batch consuming
2026-02-26 22:43:46,986 - [Producer] produced messages count: 4
2026-02-26 22:43:46,989 - [Consumer] consumed messages count: 4; elapsed time: 0.80s
2026-02-26 22:43:46,990 - [Consumer] start message batch consuming
2026-02-26 22:43:50,988 - [Producer] produced messages count: 4
2026-02-26 22:43:50,993 - [Consumer] consumed messages count: 4; elapsed time: 4.00s
2026-02-26 22:43:50,994 - [Consumer] start message batch consuming
2026-02-26 22:43:54,990 - [Producer] produced messages count: 4
2026-02-26 22:43:54,996 - [Consumer] consumed messages count: 4; elapsed time: 4.00s
```
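The v2.12.2 timings line up with the expected contract: with 4 messages produced every 4 s and the first batch at t=0, the earliest moment 10 messages exist is 8 s in, matching the observed 8.01 s and 8.00 s waits. A quick sanity check of that arithmetic (constants copied from the script; the helper function is mine):

```python
import math

PRODUCER_BATCH_SIZE = 4   # messages per producer batch (from the script)
PRODUCER_TIMEOUT = 4      # seconds between producer batches
CONSUMER_BATCH_SIZE = 10  # num_messages requested by consume()

def time_until_available(n: int, batch_size: int, interval: float) -> float:
    """Seconds from the first produce until at least n messages exist,
    assuming batch_size messages arrive every `interval` seconds,
    with the first batch at t=0."""
    batches_needed = math.ceil(n / batch_size)
    return (batches_needed - 1) * interval

print(time_until_available(CONSUMER_BATCH_SIZE, PRODUCER_BATCH_SIZE, PRODUCER_TIMEOUT))  # prints 8
```

In the v2.13.0 run, by contrast, each consume() call returns whatever 4-message batch happens to be available, never waiting to fill the requested 10.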