diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index ebd97562..6f47b463 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -362,6 +362,7 @@ class CPPKAFKA_API KafkaHandleBase { KafkaHandleBase(Configuration config); void set_handle(rd_kafka_t* handle); + void destroy_handle(); void check_error(rd_kafka_resp_err_t error) const; void check_error(rd_kafka_resp_err_t error, const rd_kafka_topic_partition_list_t* list_ptr) const; diff --git a/src/consumer.cpp b/src/consumer.cpp index c328ab28..407c78ea 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -73,29 +73,21 @@ Consumer::Consumer(Configuration config) } Consumer::~Consumer() { - try { - // make sure to destroy the function closures. in case they hold kafka - // objects, they will need to be destroyed before we destroy the handle - assignment_callback_ = nullptr; - revocation_callback_ = nullptr; - rebalance_error_callback_ = nullptr; - close(); - } - catch (const HandleException& ex) { - ostringstream error_msg; - error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what(); - CallbackInvoker error_cb("error", get_configuration().get_error_callback(), this); - CallbackInvoker logger_cb("log", get_configuration().get_log_callback(), nullptr); - if (error_cb) { - error_cb(*this, static_cast(ex.get_error().get_error()), error_msg.str()); - } - else if (logger_cb) { - logger_cb(*this, static_cast(LogLevel::LogErr), "cppkafka", error_msg.str()); - } - else { - rd_kafka_log_print(get_handle(), static_cast(LogLevel::LogErr), "cppkafka", error_msg.str().c_str()); - } - } + // Make sure to destroy the function closures. In case they hold kafka + // objects, they will need to be destroyed before we destroy the handle. + assignment_callback_ = nullptr; + revocation_callback_ = nullptr; + rebalance_error_callback_ = nullptr; + // Destroy the handle via rd_kafka_destroy_flags() which sets rk_terminate + // before calling rd_kafka_consumer_close(). This is critical: without + // rk_terminate set, consumer_close() takes the blocking path, dispatches + // a rebalance callback, handle_rebalance calls rd_kafka_assign() (a + // synchronous cross-thread RPC), and the reply can get purged by + // rd_kafka_cgrp_terminated() — deadlocking the caller. + // The handle stays valid during the destroy (needed by rebalance callbacks + // if NO_CONSUMER_CLOSE is not set) and is released from the unique_ptr + // afterwards to prevent double-destroy in ~KafkaHandleBase. + destroy_handle(); } void Consumer::set_assignment_callback(AssignmentCallback callback) { diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index f88720ef..90c13b00 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -298,6 +298,18 @@ int KafkaHandleBase::get_destroy_flags() const { #endif +void KafkaHandleBase::destroy_handle() { + rd_kafka_t* handle = get_handle(); + if (handle) { +#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION + rd_kafka_destroy_flags(handle, get_destroy_flags()); +#else + rd_kafka_destroy(handle); +#endif + handle_.release(); + } +} + void KafkaHandleBase::HandleDeleter::operator()(rd_kafka_t* handle) { #if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION rd_kafka_destroy_flags(handle, handle_base_ptr_->get_destroy_flags());