From 99ee85634f804deabe12f094c1aaa387a72c8569 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 24 Mar 2026 15:00:37 +0100 Subject: [PATCH] Fix deadlock in ~Consumer() during rd_kafka_consumer_close() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ~Consumer() called close(), which invoked rd_kafka_consumer_close() directly, bypassing rd_kafka_destroy_flags(). This meant rk_terminate was never set, so rd_kafka_consumer_close() took the blocking path, dispatched a rebalance callback, and handle_rebalance() called rd_kafka_assign() — a synchronous cross-thread RPC to the cgrp ops queue. The reply could get purged by rd_kafka_cgrp_terminated() racing on the internal main thread, deadlocking the caller forever. Replace close() with destroy_handle(), which calls rd_kafka_destroy_flags() directly — the same path librdkafka uses internally. This sets rk_terminate before calling rd_kafka_consumer_close(), enabling the fast non-blocking path when RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE is set. The handle stays valid in the unique_ptr during the destroy (so get_handle() works if rebalance callbacks fire), then is released to prevent double-destroy in ~KafkaHandleBase. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- include/cppkafka/kafka_handle_base.h | 1 + src/consumer.cpp | 38 +++++++++++----------------- src/kafka_handle_base.cpp | 12 +++++++++ 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index ebd97562..6f47b463 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -362,6 +362,7 @@ class CPPKAFKA_API KafkaHandleBase { KafkaHandleBase(Configuration config); void set_handle(rd_kafka_t* handle); + void destroy_handle(); void check_error(rd_kafka_resp_err_t error) const; void check_error(rd_kafka_resp_err_t error, const rd_kafka_topic_partition_list_t* list_ptr) const; diff --git a/src/consumer.cpp b/src/consumer.cpp index c328ab28..407c78ea 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -73,29 +73,21 @@ Consumer::Consumer(Configuration config) } Consumer::~Consumer() { - try { - // make sure to destroy the function closures. in case they hold kafka - // objects, they will need to be destroyed before we destroy the handle - assignment_callback_ = nullptr; - revocation_callback_ = nullptr; - rebalance_error_callback_ = nullptr; - close(); - } - catch (const HandleException& ex) { - ostringstream error_msg; - error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what(); - CallbackInvoker error_cb("error", get_configuration().get_error_callback(), this); - CallbackInvoker logger_cb("log", get_configuration().get_log_callback(), nullptr); - if (error_cb) { - error_cb(*this, static_cast(ex.get_error().get_error()), error_msg.str()); - } - else if (logger_cb) { - logger_cb(*this, static_cast(LogLevel::LogErr), "cppkafka", error_msg.str()); - } - else { - rd_kafka_log_print(get_handle(), static_cast(LogLevel::LogErr), "cppkafka", error_msg.str().c_str()); - } - } + // Make sure to destroy the function closures. 
In case they hold kafka + // objects, they will need to be destroyed before we destroy the handle. + assignment_callback_ = nullptr; + revocation_callback_ = nullptr; + rebalance_error_callback_ = nullptr; + // Destroy the handle via rd_kafka_destroy_flags() which sets rk_terminate + // before calling rd_kafka_consumer_close(). This is critical: without + // rk_terminate set, consumer_close() takes the blocking path, dispatches + // a rebalance callback, handle_rebalance calls rd_kafka_assign() (a + // synchronous cross-thread RPC), and the reply can get purged by + // rd_kafka_cgrp_terminated() — deadlocking the caller. + // The handle stays valid during the destroy (needed by rebalance callbacks + // if NO_CONSUMER_CLOSE is not set) and is released from the unique_ptr + // afterwards to prevent double-destroy in ~KafkaHandleBase. + destroy_handle(); } void Consumer::set_assignment_callback(AssignmentCallback callback) { diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index f88720ef..90c13b00 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -298,6 +298,18 @@ int KafkaHandleBase::get_destroy_flags() const { #endif +void KafkaHandleBase::destroy_handle() { + rd_kafka_t* handle = get_handle(); + if (handle) { +#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION + rd_kafka_destroy_flags(handle, get_destroy_flags()); +#else + rd_kafka_destroy(handle); +#endif + handle_.release(); + } +} + void KafkaHandleBase::HandleDeleter::operator()(rd_kafka_t* handle) { #if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION rd_kafka_destroy_flags(handle, handle_base_ptr_->get_destroy_flags());