Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ def get_version():

return f"{latest_tag}.{commit_count}"
except subprocess.CalledProcessError:
raise Exception(
"Please make sure you have git installed, or you have a tag number"
)
raise Exception("Please make sure you have git installed and have a tag number")


# invoke the make command to build the shared library
Expand Down
63 changes: 39 additions & 24 deletions src/infinistore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,8 @@ Client::~Client() {
uv_poll_stop(&poll_handle_);
}

if (handle_) {
free(handle_);
handle_ = NULL;
}
// uv_close will free handle_, weak ptr here
handle_ = NULL;

if (send_mr_) {
ibv_dereg_mr(send_mr_);
Expand Down Expand Up @@ -155,36 +153,56 @@ Client::~Client() {
tcp_recv_buffer_ = NULL;
}

for (auto &item : outstanding_rdma_ops_queue_) {
delete[] item.first;
delete[] item.second;
}

destroy_rdma_context(&rdma_ctx_);
}

void on_close(uv_handle_t *handle) {
client_t *client = (client_t *)handle->data;
delete client;
free(handle);
}

struct BulkWriteCtx {
client_t *client;
uint32_t *header_buf;
client_t *client; // weak ptr
uint32_t header_buf[2];
boost::intrusive_ptr<PTR> ptr;
size_t offset;
size_t total_size;

BulkWriteCtx(client_t *client, boost::intrusive_ptr<PTR> ptr)
: client(client), ptr(ptr), offset(0), total_size(ptr->size) {
header_buf[0] = FINISH;
header_buf[1] = static_cast<uint32_t>(ptr->size);
}
BulkWriteCtx(BulkWriteCtx &) = delete;
~BulkWriteCtx() = default;
};

void on_chunk_write(uv_write_t *req, int status) {
BulkWriteCtx *ctx = (BulkWriteCtx *)req->data;
uv_handle_t *handle = (uv_handle_t *)req->handle;
free(req);

if (uv_is_closing(handle)) {
delete ctx;
return;
}

if (status < 0) {
ERROR("Write error {}", uv_strerror(status));
uv_close((uv_handle_t *)req->handle, on_close);
free(req);
uv_close(handle, on_close);
delete ctx;
return;
}

if (ctx->offset == ctx->total_size) {
DEBUG("write done");
ctx->client->reset_client_read_state();
free(req);
delete ctx;
return;
}
Expand All @@ -195,17 +213,22 @@ void on_chunk_write(uv_write_t *req, int status) {
uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t));
write_req->data = ctx;
uv_write(write_req, (uv_stream_t *)ctx->client->handle_, &buf, 1, on_chunk_write);
free(req);
}

void on_head_write(uv_write_t *req, int status) {
BulkWriteCtx *ctx = (BulkWriteCtx *)req->data;
uv_handle_t *handle = (uv_handle_t *)req->handle;
free(req);

if (uv_is_closing(handle)) {
delete ctx;
return;
}

if (status < 0) {
ERROR("Write error {}", uv_strerror(status));
free(ctx->header_buf);
delete ctx;
uv_close((uv_handle_t *)req->handle, on_close);
free(req);
uv_close(handle, on_close);
return;
}

Expand All @@ -217,7 +240,6 @@ void on_head_write(uv_write_t *req, int status) {
uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t));
write_req->data = ctx;
uv_write(write_req, (uv_stream_t *)ctx->client->handle_, &buf, 1, on_chunk_write);
free(req);
}

void evict_cache(float min_threshold, float max_threshold) {
Expand Down Expand Up @@ -273,20 +295,13 @@ int Client::tcp_payload_request(const TCPPayloadRequest *req) {
lru_queue.push_back(ptr);
ptr->lru_it = --lru_queue.end();

uint32_t *header_buf = (uint32_t *)malloc(sizeof(uint32_t) * 2);
header_buf[0] = FINISH;
header_buf[1] = static_cast<uint32_t>(ptr->size);

uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t));

// safe PTR to prevent it from being deleted early.
write_req->data = new BulkWriteCtx{.client = this,
.header_buf = header_buf,
.ptr = ptr,
.offset = 0,
.total_size = ptr->size};
write_req->data = new BulkWriteCtx(this, ptr);

uv_buf_t buf = uv_buf_init((char *)header_buf, sizeof(uint32_t) * 2);
uv_buf_t buf = uv_buf_init((char *)((BulkWriteCtx *)write_req->data)->header_buf,
sizeof(uint32_t) * 2);

uv_write(write_req, (uv_stream_t *)handle_, &buf, 1, on_head_write);

Expand Down