From 1145b473a3f491a268b5ab5d9df8483b1989c63b Mon Sep 17 00:00:00 2001 From: "dongmao.zhang" Date: Tue, 6 Jan 2026 15:31:19 -0800 Subject: [PATCH] fix #179 --- setup.py | 4 +-- src/infinistore.cpp | 63 ++++++++++++++++++++++++++++----------------- 2 files changed, 40 insertions(+), 27 deletions(-) diff --git a/setup.py b/setup.py index a2a73f2..05a44e2 100644 --- a/setup.py +++ b/setup.py @@ -21,9 +21,7 @@ def get_version(): return f"{latest_tag}.{commit_count}" except subprocess.CalledProcessError: - raise Exception( - "Please make sure you have git installed, or you have a tag number" - ) + raise Exception("Please make sure you have git installed and have a tag number") # invoke the make command to build the shared library diff --git a/src/infinistore.cpp b/src/infinistore.cpp index d5cf5a3..3951fec 100644 --- a/src/infinistore.cpp +++ b/src/infinistore.cpp @@ -117,10 +117,8 @@ Client::~Client() { uv_poll_stop(&poll_handle_); } - if (handle_) { - free(handle_); - handle_ = NULL; - } + // uv_close will free handle_, weak ptr here + handle_ = NULL; if (send_mr_) { ibv_dereg_mr(send_mr_); @@ -155,28 +153,49 @@ Client::~Client() { tcp_recv_buffer_ = NULL; } + for (auto &item : outstanding_rdma_ops_queue_) { + delete[] item.first; + delete[] item.second; + } + destroy_rdma_context(&rdma_ctx_); } void on_close(uv_handle_t *handle) { client_t *client = (client_t *)handle->data; delete client; + free(handle); } struct BulkWriteCtx { - client_t *client; - uint32_t *header_buf; + client_t *client; // weak ptr + uint32_t header_buf[2]; boost::intrusive_ptr ptr; size_t offset; size_t total_size; + + BulkWriteCtx(client_t *client, boost::intrusive_ptr ptr) + : client(client), ptr(ptr), offset(0), total_size(ptr->size) { + header_buf[0] = FINISH; + header_buf[1] = static_cast(ptr->size); + } + BulkWriteCtx(BulkWriteCtx &) = delete; + ~BulkWriteCtx() = default; }; void on_chunk_write(uv_write_t *req, int status) { BulkWriteCtx *ctx = (BulkWriteCtx *)req->data; + uv_handle_t *handle = (uv_handle_t *)req->handle; + free(req); + + if (uv_is_closing(handle)) { + delete ctx; + return; + } + if (status < 0) { ERROR("Write error {}", uv_strerror(status)); - uv_close((uv_handle_t *)req->handle, on_close); - free(req); + uv_close(handle, on_close); delete ctx; return; } @@ -184,7 +203,6 @@ void on_chunk_write(uv_write_t *req, int status) { if (ctx->offset == ctx->total_size) { DEBUG("write done"); ctx->client->reset_client_read_state(); - free(req); delete ctx; return; } @@ -195,17 +213,22 @@ void on_chunk_write(uv_write_t *req, int status) { uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t)); write_req->data = ctx; uv_write(write_req, (uv_stream_t *)ctx->client->handle_, &buf, 1, on_chunk_write); - free(req); } void on_head_write(uv_write_t *req, int status) { BulkWriteCtx *ctx = (BulkWriteCtx *)req->data; + uv_handle_t *handle = (uv_handle_t *)req->handle; + free(req); + + if (uv_is_closing(handle)) { + delete ctx; + return; + } + if (status < 0) { ERROR("Write error {}", uv_strerror(status)); - free(ctx->header_buf); delete ctx; - uv_close((uv_handle_t *)req->handle, on_close); - free(req); + uv_close(handle, on_close); return; } @@ -217,7 +240,6 @@ void on_head_write(uv_write_t *req, int status) { uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t)); write_req->data = ctx; uv_write(write_req, (uv_stream_t *)ctx->client->handle_, &buf, 1, on_chunk_write); - free(req); } void evict_cache(float min_threshold, float max_threshold) { @@ -273,20 +295,13 @@ int Client::tcp_payload_request(const TCPPayloadRequest *req) { lru_queue.push_back(ptr); ptr->lru_it = --lru_queue.end(); - uint32_t *header_buf = (uint32_t *)malloc(sizeof(uint32_t) * 2); - header_buf[0] = FINISH; - header_buf[1] = static_cast(ptr->size); - uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t)); // safe PTR to prevent it from being deleted early. - write_req->data = new BulkWriteCtx{.client = this, - .header_buf = header_buf, - .ptr = ptr, - .offset = 0, - .total_size = ptr->size}; + write_req->data = new BulkWriteCtx(this, ptr); - uv_buf_t buf = uv_buf_init((char *)header_buf, sizeof(uint32_t) * 2); + uv_buf_t buf = uv_buf_init((char *)((BulkWriteCtx *)write_req->data)->header_buf, + sizeof(uint32_t) * 2); uv_write(write_req, (uv_stream_t *)handle_, &buf, 1, on_head_write);