Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/cppkafka/kafka_handle_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ class CPPKAFKA_API KafkaHandleBase {
KafkaHandleBase(Configuration config);

void set_handle(rd_kafka_t* handle);
void destroy_handle();
void check_error(rd_kafka_resp_err_t error) const;
void check_error(rd_kafka_resp_err_t error,
const rd_kafka_topic_partition_list_t* list_ptr) const;
Expand Down
38 changes: 15 additions & 23 deletions src/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,21 @@ Consumer::Consumer(Configuration config)
}

Consumer::~Consumer() {
try {
// make sure to destroy the function closures. in case they hold kafka
// objects, they will need to be destroyed before we destroy the handle
assignment_callback_ = nullptr;
revocation_callback_ = nullptr;
rebalance_error_callback_ = nullptr;
close();
}
catch (const HandleException& ex) {
ostringstream error_msg;
error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what();
CallbackInvoker<Configuration::ErrorCallback> error_cb("error", get_configuration().get_error_callback(), this);
CallbackInvoker<Configuration::LogCallback> logger_cb("log", get_configuration().get_log_callback(), nullptr);
if (error_cb) {
error_cb(*this, static_cast<int>(ex.get_error().get_error()), error_msg.str());
}
else if (logger_cb) {
logger_cb(*this, static_cast<int>(LogLevel::LogErr), "cppkafka", error_msg.str());
}
else {
rd_kafka_log_print(get_handle(), static_cast<int>(LogLevel::LogErr), "cppkafka", error_msg.str().c_str());
}
}
// Make sure to destroy the function closures. In case they hold kafka
// objects, they will need to be destroyed before we destroy the handle.
assignment_callback_ = nullptr;
revocation_callback_ = nullptr;
rebalance_error_callback_ = nullptr;
// Destroy the handle via rd_kafka_destroy_flags() which sets rk_terminate
// before calling rd_kafka_consumer_close(). This is critical: without
// rk_terminate set, consumer_close() takes the blocking path, dispatches
// a rebalance callback, handle_rebalance calls rd_kafka_assign() (a
// synchronous cross-thread RPC), and the reply can get purged by
// rd_kafka_cgrp_terminated() — deadlocking the caller.
// The handle stays valid during the destroy (needed by rebalance callbacks
// if NO_CONSUMER_CLOSE is not set) and is released from the unique_ptr
// afterwards to prevent double-destroy in ~KafkaHandleBase.
destroy_handle();
Comment thread
azat marked this conversation as resolved.
}

void Consumer::set_assignment_callback(AssignmentCallback callback) {
Expand Down
12 changes: 12 additions & 0 deletions src/kafka_handle_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,18 @@ int KafkaHandleBase::get_destroy_flags() const {
#endif


void KafkaHandleBase::destroy_handle() {
rd_kafka_t* handle = get_handle();
if (handle) {
#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
rd_kafka_destroy_flags(handle, get_destroy_flags());
#else
rd_kafka_destroy(handle);
#endif
handle_.release();
}
}

void KafkaHandleBase::HandleDeleter::operator()(rd_kafka_t* handle) {
#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
rd_kafka_destroy_flags(handle, handle_base_ptr_->get_destroy_flags());
Expand Down