From e240454a9fca7f01f71356942aa2b78892ce9acf Mon Sep 17 00:00:00 2001 From: FAQ Bot Date: Thu, 22 Jan 2026 23:51:15 +0000 Subject: [PATCH] NEW: How do you manually manage offsets in a Kafka consumer? --- ...manual-offset-management-kafka-consumer.md | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 _questions/data-engineering-zoomcamp/module-6/029_cef28035ac_manual-offset-management-kafka-consumer.md diff --git a/_questions/data-engineering-zoomcamp/module-6/029_cef28035ac_manual-offset-management-kafka-consumer.md b/_questions/data-engineering-zoomcamp/module-6/029_cef28035ac_manual-offset-management-kafka-consumer.md new file mode 100644 index 00000000..2382b345 --- /dev/null +++ b/_questions/data-engineering-zoomcamp/module-6/029_cef28035ac_manual-offset-management-kafka-consumer.md @@ -0,0 +1,34 @@ +--- +id: cef28035ac +question: How do you manually manage offsets in a Kafka consumer? +sort_order: 29 +--- + +How do you manually manage offsets in a Kafka consumer? + +Kafka allows you to manually control when message offsets are committed. +This is useful when you want to **commit offsets only after successful message processing**, ensuring reliable processing. +**Step 1: Disable Auto Commit** +```python +consumer = KafkaConsumer( +'taxi-trips', +bootstrap_servers=['localhost:9092'], +enable_auto_commit=False # Manual commit +) +``` +Setting `enable_auto_commit=False` prevents Kafka from automatically committing offsets. +--- +**Step 2: Process Messages and Commit Manually** +```python +for message in consumer: +try: +# Process the message +process_trip(message.value) +# Commit offset after successful processing +consumer.commit() +except Exception as e: +print(f"Error: {e}") +# Do not commit on failure → message will be reprocessed +``` +Offsets are committed **only after** the message is successfully processed. +If an error occurs, the offset is not committed, so the message can be consumed again. \ No newline at end of file