Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 43 additions & 36 deletions src/infinistore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ const float ON_DEMAND_MIN_THRESHOLD = 0.8;
const float ON_DEMAND_MAX_THRESHOLD = 0.95;

struct Client {
uv_tcp_t *handle_ = NULL; // uv_stream_t
read_state_t state_; // state of the client, for parsing the request
size_t bytes_read_ = 0; // bytes read so far, for parsing the request
size_t expected_bytes_ = 0; // expected size of the body
uv_tcp_t *handle_ = nullptr; // uv_stream_t
read_state_t state_; // state of the client, for parsing the request
size_t bytes_read_ = 0; // bytes read so far, for parsing the request
size_t expected_bytes_ = 0; // expected size of the body
header_t header_;

boost::intrusive_ptr<PTR> current_tcp_task_;
Expand All @@ -66,14 +66,14 @@ struct Client {
struct ibv_mr *recv_mr_[MAX_RECV_WR] = {};

// RDMA send buffer
char *send_buffer_ = NULL;
struct ibv_mr *send_mr_ = NULL;
char *send_buffer_ = nullptr;
struct ibv_mr *send_mr_ = nullptr;
int outstanding_rdma_ops_ = 0;
std::deque<std::pair<struct ibv_send_wr *, struct ibv_sge *>> outstanding_rdma_ops_queue_;

// TCP send buffer
char *tcp_send_buffer_ = NULL;
char *tcp_recv_buffer_ = NULL;
char *tcp_send_buffer_ = nullptr;
char *tcp_recv_buffer_ = nullptr;

rdma_conn_info_t remote_info_;
rdma_conn_info_t local_info_;
Expand Down Expand Up @@ -117,50 +117,57 @@ Client::~Client() {
uv_poll_stop(&poll_handle_);
}

if (handle_) {
free(handle_);
handle_ = NULL;
}

if (send_mr_) {
ibv_dereg_mr(send_mr_);
send_mr_ = NULL;
send_mr_ = nullptr;
}

if (send_buffer_) {
free(send_buffer_);
send_buffer_ = NULL;
send_buffer_ = nullptr;
}

for (int i = 0; i < MAX_RECV_WR; i++) {
if (recv_mr_[i]) {
assert(recv_buffer_[i] != NULL);
assert(recv_buffer_[i] != nullptr);
ibv_dereg_mr(recv_mr_[i]);
recv_mr_[i] = NULL;
recv_mr_[i] = nullptr;
}

if (recv_buffer_[i]) {
free(recv_buffer_[i]);
recv_buffer_[i] = NULL;
recv_buffer_[i] = nullptr;
}
}

if (tcp_send_buffer_) {
free(tcp_send_buffer_);
tcp_send_buffer_ = NULL;
tcp_send_buffer_ = nullptr;
}

if (tcp_recv_buffer_) {
free(tcp_recv_buffer_);
tcp_recv_buffer_ = NULL;
tcp_recv_buffer_ = nullptr;
}

while (!outstanding_rdma_ops_queue_.empty()) {
auto item = outstanding_rdma_ops_queue_.front();
delete[] item.first; // ibv_send_wr array
delete[] item.second; // ibv_sge array
outstanding_rdma_ops_queue_.pop_front();
}
destroy_rdma_context(&rdma_ctx_);
}

void on_close(uv_handle_t *handle) {
client_t *client = (client_t *)handle->data;
delete client;

if (client) {
client->handle_ = nullptr;
delete client;
}

free(handle);
}

struct BulkWriteCtx {
Expand Down Expand Up @@ -298,15 +305,15 @@ int Client::tcp_payload_request(const TCPPayloadRequest *req) {

void Client::post_ack(int return_code) {
// send an error code back
struct ibv_send_wr wr = {0};
struct ibv_send_wr *bad_wr = NULL;
struct ibv_send_wr wr {};
struct ibv_send_wr *bad_wr = nullptr;
wr.wr_id = 0;
wr.opcode = IBV_WR_SEND_WITH_IMM;
wr.imm_data = return_code;
wr.send_flags = 0;
wr.sg_list = NULL;
wr.sg_list = nullptr;
wr.num_sge = 0;
wr.next = NULL;
wr.next = nullptr;
int ret = ibv_post_send(rdma_ctx_.qp, &wr, &bad_wr);
if (ret) {
ERROR("Failed to send WITH_IMM message: {}", strerror(ret));
Expand All @@ -332,7 +339,7 @@ void Client::cq_poll_handle(uv_poll_t *handle, int status, int events) {
ERROR("Failed to request CQ notification");
return;
}
struct ibv_wc wc = {0};
struct ibv_wc wc {};
while (ibv_poll_cq(cq, 1, &wc) > 0) {
if (wc.status == IBV_WC_SUCCESS) {
if (wc.opcode == IBV_WC_RECV) { // recv RDMA read/write request
Expand Down Expand Up @@ -452,15 +459,15 @@ void extend_mempool() {
}

int Client::prepare_recv_rdma_request(int buf_idx) {
struct ibv_sge sge = {0};
struct ibv_recv_wr rwr = {0};
struct ibv_recv_wr *bad_wr = NULL;
struct ibv_sge sge {};
struct ibv_recv_wr rwr {};
struct ibv_recv_wr *bad_wr = nullptr;
sge.addr = (uintptr_t)(recv_buffer_[buf_idx]);
sge.length = PROTOCOL_BUFFER_SIZE;
sge.lkey = recv_mr_[buf_idx]->lkey;

rwr.wr_id = buf_idx;
rwr.next = NULL;
rwr.next = nullptr;
rwr.sg_list = &sge;
rwr.num_sge = 1;
if (ibv_post_recv(rdma_ctx_.qp, &rwr, &bad_wr)) {
Expand Down Expand Up @@ -755,7 +762,7 @@ int Client::rdma_exchange() {
// send_resp send fixed size response to client.
void Client::send_resp(int return_code, void *buf, size_t size) {
if (size > 0) {
assert(buf != NULL);
assert(buf != nullptr);
}
uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t));

Expand Down Expand Up @@ -876,7 +883,7 @@ void handle_request(uv_stream_t *stream, client_t *client) {
}

if (error_code != 0) {
client->send_resp(error_code, NULL, 0);
client->send_resp(error_code, nullptr, 0);
client->reset_client_read_state();
}

Expand Down Expand Up @@ -924,7 +931,7 @@ void on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
}

case READ_BODY: {
assert(client->tcp_recv_buffer_ != NULL);
assert(client->tcp_recv_buffer_ != nullptr);

DEBUG("reading body, bytes_read: {}, expected_bytes: {}", client->bytes_read_,
client->expected_bytes_);
Expand Down Expand Up @@ -954,7 +961,7 @@ void on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
ptr->lru_it = --lru_queue.end();

client->current_tcp_task_.reset();
client->send_resp(FINISH, NULL, 0);
client->send_resp(FINISH, nullptr, 0);
client->reset_client_read_state();
}
}
Expand Down Expand Up @@ -983,7 +990,7 @@ void on_new_connection(uv_stream_t *server, int status) {
uv_read_start((uv_stream_t *)client_handle, alloc_buffer, on_read);
}
else {
uv_close((uv_handle_t *)client_handle, NULL);
uv_close((uv_handle_t *)client_handle, nullptr);
}
}

Expand All @@ -1002,7 +1009,7 @@ int register_server(unsigned long loop_ptr, server_config_t config) {
loop = uv_default_loop();

loop = (uv_loop_t *)loop_ptr;
assert(loop != NULL);
assert(loop != nullptr);
uv_tcp_init(loop, &server);
struct sockaddr_in addr;
uv_ip4_addr("0.0.0.0", config.service_port, &addr);
Expand Down
40 changes: 20 additions & 20 deletions src/libinfinistore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ SendBuffer::SendBuffer(struct ibv_pd *pd, size_t size) {
assert(false);
}
mr_ = ibv_reg_mr(pd, buffer_, PROTOCOL_BUFFER_SIZE, IBV_ACCESS_LOCAL_WRITE);
assert(mr_ != NULL);
assert(mr_ != nullptr);
}

SendBuffer::~SendBuffer() {
DEBUG("destroying send buffer");
assert(buffer_ != NULL);
assert(mr_ != NULL);
assert(buffer_ != nullptr);
assert(mr_ != nullptr);
if (mr_) {
ibv_dereg_mr(mr_);
mr_ = nullptr;
Expand Down Expand Up @@ -101,7 +101,7 @@ Connection::~Connection() {
}

void Connection::cq_handler() {
assert(ctx_.comp_channel != NULL);
assert(ctx_.comp_channel != nullptr);

while (!stop_) {
struct ibv_cq *ev_cq;
Expand Down Expand Up @@ -179,7 +179,7 @@ void Connection::cq_handler() {

SendBuffer *Connection::get_send_buffer() {
/*
if send buffer list is empty,we just report error, and return NULL
if send buffer list is empty,we just report error, and return nullptr
normal user should not have too many inflight requests, so we just report error
*/
assert(!send_buffers_.empty());
Expand Down Expand Up @@ -469,13 +469,13 @@ int Connection::delete_keys(const std::vector<std::string> &keys) {
}

void Connection::post_recv_ack(rdma_info_base *info) {
struct ibv_recv_wr recv_wr = {0};
struct ibv_recv_wr *bad_recv_wr = NULL;
struct ibv_recv_wr recv_wr {};
struct ibv_recv_wr *bad_recv_wr = nullptr;

recv_wr.wr_id = (uintptr_t)info;

recv_wr.next = NULL;
recv_wr.sg_list = NULL;
recv_wr.next = nullptr;
recv_wr.sg_list = nullptr;
recv_wr.num_sge = 0;

int ret = ibv_post_recv(ctx_.qp, &recv_wr, &bad_recv_wr);
Expand Down Expand Up @@ -541,7 +541,7 @@ std::vector<unsigned char> *Connection::r_tcp(const std::string &key) {
}

int Connection::w_tcp(const std::string &key, void *ptr, size_t size) {
assert(ptr != NULL);
assert(ptr != nullptr);

FlatBufferBuilder builder(64 << 10);
auto req = CreateTCPPayloadRequestDirect(builder, key.c_str(), size, OP_TCP_PUT);
Expand Down Expand Up @@ -596,7 +596,7 @@ int Connection::w_tcp(const std::string &key, void *ptr, size_t size) {
int Connection::w_rdma_async(const std::vector<std::string> &keys,
const std::vector<size_t> offsets, int block_size, void *base_ptr,
std::function<void(int)> callback) {
assert(base_ptr != NULL);
assert(base_ptr != nullptr);
assert(offsets.size() == keys.size());

if (!local_mr_.count((uintptr_t)base_ptr)) {
Expand Down Expand Up @@ -634,9 +634,9 @@ int Connection::w_rdma_async(const std::vector<std::string> &keys,
post_recv_ack(info);

// send msg
struct ibv_sge sge = {0};
struct ibv_send_wr wr = {0};
struct ibv_send_wr *bad_wr = NULL;
struct ibv_sge sge {};
struct ibv_send_wr wr {};
struct ibv_send_wr *bad_wr = nullptr;
sge.addr = (uintptr_t)builder.GetBufferPointer();
sge.length = builder.GetSize();
sge.lkey = send_buffer->mr_->lkey;
Expand All @@ -659,7 +659,7 @@ int Connection::w_rdma_async(const std::vector<std::string> &keys,
int Connection::r_rdma_async(const std::vector<std::string> &keys,
const std::vector<size_t> offsets, int block_size, void *base_ptr,
std::function<void(unsigned int code)> callback) {
assert(base_ptr != NULL);
assert(base_ptr != nullptr);

if (!local_mr_.count((uintptr_t)base_ptr)) {
ERROR("Please register memory first");
Expand All @@ -668,7 +668,7 @@ int Connection::r_rdma_async(const std::vector<std::string> &keys,

INFO("r_rdma,, block_size: {}, base_ptr: {}", block_size, base_ptr);
struct ibv_mr *mr = local_mr_[(uintptr_t)base_ptr];
assert(mr != NULL);
assert(mr != nullptr);

auto *info = new rdma_read_info([callback](unsigned int code) { callback(code); });
post_recv_ack(info);
Expand Down Expand Up @@ -700,13 +700,13 @@ int Connection::r_rdma_async(const std::vector<std::string> &keys,
builder.Finish(req);

// send RDMA request
struct ibv_sge sge = {0};
struct ibv_sge sge {};
sge.addr = (uintptr_t)builder.GetBufferPointer();
sge.length = builder.GetSize();
sge.lkey = send_buffer->mr_->lkey;

struct ibv_send_wr wr = {0};
struct ibv_send_wr *bad_wr = NULL;
struct ibv_send_wr wr {};
struct ibv_send_wr *bad_wr = nullptr;

wr.wr_id = (uintptr_t)send_buffer;
wr.opcode = IBV_WR_SEND;
Expand All @@ -726,7 +726,7 @@ int Connection::r_rdma_async(const std::vector<std::string> &keys,
}

int Connection::register_mr(void *base_ptr, size_t ptr_region_size) {
assert(base_ptr != NULL);
assert(base_ptr != nullptr);
if (local_mr_.count((uintptr_t)base_ptr)) {
WARN("this memory address is already registered!");
ibv_dereg_mr(local_mr_[(uintptr_t)base_ptr]);
Expand Down
6 changes: 3 additions & 3 deletions src/libinfinistore.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
// because write_cache will be invoked asynchronously,
// so each request will have a standalone send buffer.
struct SendBuffer {
void *buffer_ = NULL;
struct ibv_mr *mr_ = NULL;
void *buffer_ = nullptr;
struct ibv_mr *mr_ = nullptr;

SendBuffer(struct ibv_pd *pd, size_t size);
SendBuffer(const SendBuffer &) = delete;
Expand Down Expand Up @@ -78,7 +78,7 @@ class Connection {
*/
boost::lockfree::spsc_queue<SendBuffer *> send_buffers_{MAX_RECV_WR};

// struct ibv_comp_channel *comp_channel_ = NULL;
// struct ibv_comp_channel *comp_channel_ = nullptr;
std::future<void> cq_future_; // cq thread

std::atomic<bool> stop_{false};
Expand Down
Loading