From 9095e136e36b2d37de3e39c9a35783a94ff7fea7 Mon Sep 17 00:00:00 2001 From: thesues Date: Tue, 27 May 2025 21:36:15 +0000 Subject: [PATCH 1/2] fix: #179,fix double free uv_handle --- src/infinistore.cpp | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/infinistore.cpp b/src/infinistore.cpp index d5cf5a3..1af4670 100644 --- a/src/infinistore.cpp +++ b/src/infinistore.cpp @@ -117,11 +117,6 @@ Client::~Client() { uv_poll_stop(&poll_handle_); } - if (handle_) { - free(handle_); - handle_ = NULL; - } - if (send_mr_) { ibv_dereg_mr(send_mr_); send_mr_ = NULL; @@ -155,12 +150,24 @@ Client::~Client() { tcp_recv_buffer_ = NULL; } + while (!outstanding_rdma_ops_queue_.empty()) { + auto item = outstanding_rdma_ops_queue_.front(); + delete[] item.first; // ibv_send_wr array + delete[] item.second; // ibv_sge array + outstanding_rdma_ops_queue_.pop_front(); + } destroy_rdma_context(&rdma_ctx_); } void on_close(uv_handle_t *handle) { client_t *client = (client_t *)handle->data; - delete client; + + if (client) { + client->handle_ = NULL; + delete client; + } + + free(handle); } struct BulkWriteCtx { From a58ed7af2da55279cd9d47721c729e9f3871557b Mon Sep 17 00:00:00 2001 From: thesues Date: Fri, 6 Jun 2025 18:17:05 +0000 Subject: [PATCH 2/2] fix: remove all NULL --- src/infinistore.cpp | 62 +++++++++++++++++++++--------------------- src/libinfinistore.cpp | 40 +++++++++++++-------------- src/libinfinistore.h | 6 ++-- src/rdma.cpp | 22 +++++++-------- 4 files changed, 65 insertions(+), 65 deletions(-) diff --git a/src/infinistore.cpp b/src/infinistore.cpp index 1af4670..5e15d9a 100644 --- a/src/infinistore.cpp +++ b/src/infinistore.cpp @@ -53,10 +53,10 @@ const float ON_DEMAND_MIN_THRESHOLD = 0.8; const float ON_DEMAND_MAX_THRESHOLD = 0.95; struct Client { - uv_tcp_t *handle_ = NULL; // uv_stream_t - read_state_t state_; // state of the client, for parsing the request - size_t bytes_read_ = 0; // bytes read so far, for parsing the request - size_t expected_bytes_ = 0; // expected size of the body + uv_tcp_t *handle_ = nullptr; // uv_stream_t + read_state_t state_; // state of the client, for parsing the request + size_t bytes_read_ = 0; // bytes read so far, for parsing the request + size_t expected_bytes_ = 0; // expected size of the body header_t header_; boost::intrusive_ptr current_tcp_task_; @@ -66,14 +66,14 @@ struct Client { struct ibv_mr *recv_mr_[MAX_RECV_WR] = {}; // RDMA send buffer - char *send_buffer_ = NULL; - struct ibv_mr *send_mr_ = NULL; + char *send_buffer_ = nullptr; + struct ibv_mr *send_mr_ = nullptr; int outstanding_rdma_ops_ = 0; std::deque> outstanding_rdma_ops_queue_; // TCP send buffer - char *tcp_send_buffer_ = NULL; - char *tcp_recv_buffer_ = NULL; + char *tcp_send_buffer_ = nullptr; + char *tcp_recv_buffer_ = nullptr; rdma_conn_info_t remote_info_; rdma_conn_info_t local_info_; @@ -119,35 +119,35 @@ Client::~Client() { if (send_mr_) { ibv_dereg_mr(send_mr_); - send_mr_ = NULL; + send_mr_ = nullptr; } if (send_buffer_) { free(send_buffer_); - send_buffer_ = NULL; + send_buffer_ = nullptr; } for (int i = 0; i < MAX_RECV_WR; i++) { if (recv_mr_[i]) { - assert(recv_buffer_[i] != NULL); + assert(recv_buffer_[i] != nullptr); ibv_dereg_mr(recv_mr_[i]); - recv_mr_[i] = NULL; + recv_mr_[i] = nullptr; } if (recv_buffer_[i]) { free(recv_buffer_[i]); - recv_buffer_[i] = NULL; + recv_buffer_[i] = nullptr; } } if (tcp_send_buffer_) { free(tcp_send_buffer_); - tcp_send_buffer_ = NULL; + tcp_send_buffer_ = nullptr; } if (tcp_recv_buffer_) { free(tcp_recv_buffer_); - tcp_recv_buffer_ = NULL; + tcp_recv_buffer_ = nullptr; } while (!outstanding_rdma_ops_queue_.empty()) { @@ -163,7 +163,7 @@ void on_close(uv_handle_t *handle) { client_t *client = (client_t *)handle->data; if (client) { - client->handle_ = NULL; + client->handle_ = nullptr; delete client; } @@ -305,15 +305,15 @@ int Client::tcp_payload_request(const TCPPayloadRequest *req) { void Client::post_ack(int return_code) { // send an error code back - struct ibv_send_wr wr = {0}; - struct ibv_send_wr *bad_wr = NULL; + struct ibv_send_wr wr {}; + struct ibv_send_wr *bad_wr = nullptr; wr.wr_id = 0; wr.opcode = IBV_WR_SEND_WITH_IMM; wr.imm_data = return_code; wr.send_flags = 0; - wr.sg_list = NULL; + wr.sg_list = nullptr; wr.num_sge = 0; - wr.next = NULL; + wr.next = nullptr; int ret = ibv_post_send(rdma_ctx_.qp, &wr, &bad_wr); if (ret) { ERROR("Failed to send WITH_IMM message: {}", strerror(ret)); @@ -339,7 +339,7 @@ void Client::cq_poll_handle(uv_poll_t *handle, int status, int events) { ERROR("Failed to request CQ notification"); return; } - struct ibv_wc wc = {0}; + struct ibv_wc wc {}; while (ibv_poll_cq(cq, 1, &wc) > 0) { if (wc.status == IBV_WC_SUCCESS) { if (wc.opcode == IBV_WC_RECV) { // recv RDMA read/write request @@ -459,15 +459,15 @@ void extend_mempool() { } int Client::prepare_recv_rdma_request(int buf_idx) { - struct ibv_sge sge = {0}; - struct ibv_recv_wr rwr = {0}; - struct ibv_recv_wr *bad_wr = NULL; + struct ibv_sge sge {}; + struct ibv_recv_wr rwr {}; + struct ibv_recv_wr *bad_wr = nullptr; sge.addr = (uintptr_t)(recv_buffer_[buf_idx]); sge.length = PROTOCOL_BUFFER_SIZE; sge.lkey = recv_mr_[buf_idx]->lkey; rwr.wr_id = buf_idx; - rwr.next = NULL; + rwr.next = nullptr; rwr.sg_list = &sge; rwr.num_sge = 1; if (ibv_post_recv(rdma_ctx_.qp, &rwr, &bad_wr)) { @@ -762,7 +762,7 @@ int Client::rdma_exchange() { // send_resp send fixed size response to client. void Client::send_resp(int return_code, void *buf, size_t size) { if (size > 0) { - assert(buf != NULL); + assert(buf != nullptr); } uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t)); @@ -883,7 +883,7 @@ void handle_request(uv_stream_t *stream, client_t *client) { } if (error_code != 0) { - client->send_resp(error_code, NULL, 0); + client->send_resp(error_code, nullptr, 0); client->reset_client_read_state(); } @@ -931,7 +931,7 @@ void on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { } case READ_BODY: { - assert(client->tcp_recv_buffer_ != NULL); + assert(client->tcp_recv_buffer_ != nullptr); DEBUG("reading body, bytes_read: {}, expected_bytes: {}", client->bytes_read_, client->expected_bytes_); @@ -961,7 +961,7 @@ void on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { ptr->lru_it = --lru_queue.end(); client->current_tcp_task_.reset(); - client->send_resp(FINISH, NULL, 0); + client->send_resp(FINISH, nullptr, 0); client->reset_client_read_state(); } } @@ -990,7 +990,7 @@ void on_new_connection(uv_stream_t *server, int status) { uv_read_start((uv_stream_t *)client_handle, alloc_buffer, on_read); } else { - uv_close((uv_handle_t *)client_handle, NULL); + uv_close((uv_handle_t *)client_handle, nullptr); } } @@ -1009,7 +1009,7 @@ int register_server(unsigned long loop_ptr, server_config_t config) { loop = uv_default_loop(); loop = (uv_loop_t *)loop_ptr; - assert(loop != NULL); + assert(loop != nullptr); uv_tcp_init(loop, &server); struct sockaddr_in addr; uv_ip4_addr("0.0.0.0", config.service_port, &addr); diff --git a/src/libinfinistore.cpp b/src/libinfinistore.cpp index 3f5f79d..75bbd89 100644 --- a/src/libinfinistore.cpp +++ b/src/libinfinistore.cpp @@ -23,13 +23,13 @@ SendBuffer::SendBuffer(struct ibv_pd *pd, size_t size) { assert(false); } mr_ = ibv_reg_mr(pd, buffer_, PROTOCOL_BUFFER_SIZE, IBV_ACCESS_LOCAL_WRITE); - assert(mr_ != NULL); + assert(mr_ != nullptr); } SendBuffer::~SendBuffer() { DEBUG("destroying send buffer"); - assert(buffer_ != NULL); - assert(mr_ != NULL); + assert(buffer_ != nullptr); + assert(mr_ != nullptr); if (mr_) { ibv_dereg_mr(mr_); mr_ = nullptr; @@ -101,7 +101,7 @@ Connection::~Connection() { } void Connection::cq_handler() { - assert(ctx_.comp_channel != NULL); + assert(ctx_.comp_channel != nullptr); while (!stop_) { struct ibv_cq *ev_cq; @@ -179,7 +179,7 @@ void Connection::cq_handler() { SendBuffer *Connection::get_send_buffer() { /* - if send buffer list is empty,we just report error, and return NULL + if send buffer list is empty,we just report error, and return nullptr normal user should not have too many inflight requests, so we just report error */ assert(!send_buffers_.empty()); @@ -469,13 +469,13 @@ int Connection::delete_keys(const std::vector &keys) { } void Connection::post_recv_ack(rdma_info_base *info) { - struct ibv_recv_wr recv_wr = {0}; - struct ibv_recv_wr *bad_recv_wr = NULL; + struct ibv_recv_wr recv_wr {}; + struct ibv_recv_wr *bad_recv_wr = nullptr; recv_wr.wr_id = (uintptr_t)info; - recv_wr.next = NULL; - recv_wr.sg_list = NULL; + recv_wr.next = nullptr; + recv_wr.sg_list = nullptr; recv_wr.num_sge = 0; int ret = ibv_post_recv(ctx_.qp, &recv_wr, &bad_recv_wr); @@ -541,7 +541,7 @@ std::vector *Connection::r_tcp(const std::string &key) { } int Connection::w_tcp(const std::string &key, void *ptr, size_t size) { - assert(ptr != NULL); + assert(ptr != nullptr); FlatBufferBuilder builder(64 << 10); auto req = CreateTCPPayloadRequestDirect(builder, key.c_str(), size, OP_TCP_PUT); @@ -596,7 +596,7 @@ int Connection::w_tcp(const std::string &key, void *ptr, size_t size) { int Connection::w_rdma_async(const std::vector &keys, const std::vector offsets, int block_size, void *base_ptr, std::function callback) { - assert(base_ptr != NULL); + assert(base_ptr != nullptr); assert(offsets.size() == keys.size()); if (!local_mr_.count((uintptr_t)base_ptr)) { @@ -634,9 +634,9 @@ int Connection::w_rdma_async(const std::vector &keys, post_recv_ack(info); // send msg - struct ibv_sge sge = {0}; - struct ibv_send_wr wr = {0}; - struct ibv_send_wr *bad_wr = NULL; + struct ibv_sge sge {}; + struct ibv_send_wr wr {}; + struct ibv_send_wr *bad_wr = nullptr; sge.addr = (uintptr_t)builder.GetBufferPointer(); sge.length = builder.GetSize(); sge.lkey = send_buffer->mr_->lkey; @@ -659,7 +659,7 @@ int Connection::w_rdma_async(const std::vector &keys, int Connection::r_rdma_async(const std::vector &keys, const std::vector offsets, int block_size, void *base_ptr, std::function callback) { - assert(base_ptr != NULL); + assert(base_ptr != nullptr); if (!local_mr_.count((uintptr_t)base_ptr)) { ERROR("Please register memory first"); @@ -668,7 +668,7 @@ int Connection::r_rdma_async(const std::vector &keys, INFO("r_rdma,, block_size: {}, base_ptr: {}", block_size, base_ptr); struct ibv_mr *mr = local_mr_[(uintptr_t)base_ptr]; - assert(mr != NULL); + assert(mr != nullptr); auto *info = new rdma_read_info([callback](unsigned int code) { callback(code); }); post_recv_ack(info); @@ -700,13 +700,13 @@ int Connection::r_rdma_async(const std::vector &keys, builder.Finish(req); // send RDMA request - struct ibv_sge sge = {0}; + struct ibv_sge sge {}; sge.addr = (uintptr_t)builder.GetBufferPointer(); sge.length = builder.GetSize(); sge.lkey = send_buffer->mr_->lkey; - struct ibv_send_wr wr = {0}; - struct ibv_send_wr *bad_wr = NULL; + struct ibv_send_wr wr {}; + struct ibv_send_wr *bad_wr = nullptr; wr.wr_id = (uintptr_t)send_buffer; wr.opcode = IBV_WR_SEND; @@ -726,7 +726,7 @@ int Connection::r_rdma_async(const std::vector &keys, } int Connection::register_mr(void *base_ptr, size_t ptr_region_size) { - assert(base_ptr != NULL); + assert(base_ptr != nullptr); if (local_mr_.count((uintptr_t)base_ptr)) { WARN("this memory address is already registered!"); ibv_dereg_mr(local_mr_[(uintptr_t)base_ptr]); diff --git a/src/libinfinistore.h b/src/libinfinistore.h index e263a3f..1df9844 100644 --- a/src/libinfinistore.h +++ b/src/libinfinistore.h @@ -23,8 +23,8 @@ // because write_cache will be invoked asynchronously, // so each request will have a standalone send buffer. struct SendBuffer { - void *buffer_ = NULL; - struct ibv_mr *mr_ = NULL; + void *buffer_ = nullptr; + struct ibv_mr *mr_ = nullptr; SendBuffer(struct ibv_pd *pd, size_t size); SendBuffer(const SendBuffer &) = delete; @@ -78,7 +78,7 @@ class Connection { */ boost::lockfree::spsc_queue send_buffers_{MAX_RECV_WR}; - // struct ibv_comp_channel *comp_channel_ = NULL; + // struct ibv_comp_channel *comp_channel_ = nullptr; std::future cq_future_; // cq thread std::atomic stop_{false}; diff --git a/src/rdma.cpp b/src/rdma.cpp index b8dccf2..376334d 100644 --- a/src/rdma.cpp +++ b/src/rdma.cpp @@ -6,7 +6,7 @@ #include "utils.h" int close_rdma_device(struct rdma_device *rdma_dev) { - assert(rdma_dev != NULL); + assert(rdma_dev != nullptr); if (rdma_dev->pd) { ibv_dealloc_pd(rdma_dev->pd); @@ -39,7 +39,7 @@ int destroy_rdma_context(struct rdma_context *ctx) { int open_rdma_device(std::string dev_name, int ib_port, std::string link_type, int hint_gid_index, struct rdma_device *rdma_dev) { assert(link_type == "IB" || link_type == "Ethernet"); - assert(rdma_dev != NULL); + assert(rdma_dev != nullptr); rdma_dev->link_type = link_type; @@ -133,8 +133,8 @@ int open_rdma_device(std::string dev_name, int ib_port, std::string link_type, i } int init_rdma_context(struct rdma_context *ctx, struct rdma_device *rdma_dev) { - assert(ctx != NULL); - assert(rdma_dev != NULL); + assert(ctx != nullptr); + assert(rdma_dev != nullptr); ctx->comp_channel = ibv_create_comp_channel(rdma_dev->ib_ctx); if (!ctx->comp_channel) { @@ -144,7 +144,7 @@ int init_rdma_context(struct rdma_context *ctx, struct rdma_device *rdma_dev) { // Create Completion Queue ctx->cq = - ibv_create_cq(rdma_dev->ib_ctx, MAX_SEND_WR + MAX_RECV_WR, NULL, ctx->comp_channel, 0); + ibv_create_cq(rdma_dev->ib_ctx, MAX_SEND_WR + MAX_RECV_WR, nullptr, ctx->comp_channel, 0); if (!ctx->cq) { ERROR("Failed to create CQ"); return -1; @@ -192,7 +192,7 @@ int init_rdma_context(struct rdma_context *ctx, struct rdma_device *rdma_dev) { } rdma_conn_info_t get_rdma_conn_info(struct rdma_context *ctx, struct rdma_device *rdma_dev) { - assert(ctx != NULL); + assert(ctx != nullptr); rdma_conn_info_t conn_info = { .qpn = ctx->qp->qp_num, .psn = ctx->psn, @@ -204,8 +204,8 @@ rdma_conn_info_t get_rdma_conn_info(struct rdma_context *ctx, struct rdma_device } int modify_qp_to_init(struct rdma_context *ctx, struct rdma_device *rdma_dev) { - assert(ctx != NULL); - assert(rdma_dev != NULL); + assert(ctx != nullptr); + assert(rdma_dev != nullptr); struct ibv_qp_attr attr = {}; attr.qp_state = IBV_QPS_INIT; @@ -225,7 +225,7 @@ int modify_qp_to_init(struct rdma_context *ctx, struct rdma_device *rdma_dev) { } int modify_qp_to_rts(struct rdma_context *ctx) { - assert(ctx != NULL); + assert(ctx != nullptr); struct ibv_qp_attr attr = {}; attr.qp_state = IBV_QPS_RTS; @@ -248,8 +248,8 @@ int modify_qp_to_rts(struct rdma_context *ctx) { int modify_qp_to_rtr(struct rdma_context *ctx, struct rdma_device *rdma_dev, rdma_conn_info_t *remote_info) { - assert(ctx != NULL); - assert(rdma_dev != NULL); + assert(ctx != nullptr); + assert(rdma_dev != nullptr); struct ibv_qp_attr attr = {}; attr.qp_state = IBV_QPS_RTR;