From f69cb7986657ca785cb5210189f82cb44be260d5 Mon Sep 17 00:00:00 2001
From: Tuvie
Date: Sat, 22 Apr 2023 17:56:53 +0800
Subject: [PATCH 1/2] avoid rdma::GlobalRelease before event dispatcher is stopped

---
 src/brpc/rdma/rdma_helper.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp
index ef9107061b..f72aa755c3 100644
--- a/src/brpc/rdma/rdma_helper.cpp
+++ b/src/brpc/rdma/rdma_helper.cpp
@@ -510,6 +510,8 @@ static void GlobalRdmaInitializeOrDieImpl() {
         ExitWithError();
     }
 
+    atexit(GlobalRelease);
+
     SocketOptions opt;
     opt.fd = g_context->async_fd;
     butil::make_close_on_exec(opt.fd);
@@ -523,8 +525,6 @@ static void GlobalRdmaInitializeOrDieImpl() {
         ExitWithError();
     }
 
-    atexit(GlobalRelease);
-
     g_mem_alloc = butil::iobuf::blockmem_allocate;
     g_mem_dealloc = butil::iobuf::blockmem_deallocate;
     butil::iobuf::blockmem_allocate = BlockAllocate;

From 7de5e0f11b05b592466895367575c00c751e26ce Mon Sep 17 00:00:00 2001
From: Tuvie
Date: Tue, 30 May 2023 22:16:23 +0800
Subject: [PATCH 2/2] allow disabling send-side zerocopy for rdma

---
 example/rdma_performance/client.cpp |  8 ++++-
 src/brpc/rdma/rdma_endpoint.cpp     | 46 ++++++++++++++++++++++++++++++++++-------
 src/brpc/rdma/rdma_endpoint.h       |  2 ++
 src/brpc/rdma/rdma_helper.cpp       | 13 ++++----
 test/brpc_rdma_unittest.cpp         |  7 ++++-
 5 files changed, 58 insertions(+), 18 deletions(-)

diff --git a/example/rdma_performance/client.cpp b/example/rdma_performance/client.cpp
index e6013b55ca..a069de92e9 100644
--- a/example/rdma_performance/client.cpp
+++ b/example/rdma_performance/client.cpp
@@ -38,6 +38,7 @@ DEFINE_int32(expected_qps, 0, "The expected QPS");
 DEFINE_int32(max_thread_num, 16, "The max number of threads are used");
 DEFINE_int32(attachment_size, -1, "Attachment size is used (in Bytes)");
 DEFINE_bool(echo_attachment, false, "Select whether attachment should be echo");
+DEFINE_bool(attachment_as_userdata, false, "Append attachment as user_data");
 DEFINE_string(connection_type, "single", "Connection type of the channel");
 DEFINE_string(protocol, "baidu_std", "Protocol type.");
 DEFINE_string(servers, "0.0.0.0:8002+0.0.0.0:8002", "IP Address of servers");
@@ -86,7 +87,12 @@ class PerformanceTest {
         if (attachment_size > 0) {
             _addr = malloc(attachment_size);
             butil::fast_rand_bytes(_addr, attachment_size);
-            _attachment.append(_addr, attachment_size);
+            if (FLAGS_attachment_as_userdata) {
+                brpc::rdma::RegisterMemoryForRdma(_addr, (size_t)attachment_size);
+                _attachment.append_user_data(_addr, attachment_size, NULL);
+            } else {
+                _attachment.append(_addr, attachment_size);
+            }
         }
         _echo_attachment = echo_attachment;
     }
diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index d3a91560cd..6664e685f3 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -49,6 +49,7 @@ extern bool g_skip_rdma_init;
 
 DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
 DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_send_zerocopy, true, "Enable zerocopy for send side");
 DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
 DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
 DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
@@ -801,29 +802,45 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
         wr.sg_list = sglist;
         wr.opcode = IBV_WR_SEND_WITH_IMM;
 
-        RdmaIOBuf* data = (RdmaIOBuf*)from[current];
         size_t sge_index = 0;
         while (sge_index < (uint32_t)max_sge &&
                this_len < _remote_recv_block_size) {
-            if (data->size() == 0) {
+            if (from[current]->size() == 0) {
                 // The current IOBuf is empty, find next one
                 ++current;
                 if (current == ndata) {
                     break;
                 }
-                data = (RdmaIOBuf*)from[current];
                 continue;
             }
-            ssize_t len = data->cut_into_sglist_and_iobuf(
-                    sglist, &sge_index, to, max_sge,
-                    _remote_recv_block_size - this_len);
-            if (len < 0) {
-                return -1;
+            ssize_t len = 0;
+            if (FLAGS_rdma_send_zerocopy) {
+                len = ((RdmaIOBuf*)from[current])->cut_into_sglist_and_iobuf(
+                        sglist, &sge_index, to, max_sge,
+                        _remote_recv_block_size - this_len);
+                if (len < 0) {
+                    return -1;
+                }
+                this_len += len;
+                total_len += len;
+            } else {
+                len = _remote_recv_block_size - this_len;
+                void* buf = AllocBlock(len);
+                if (!buf) {
+                    return -1;
+                }
+                len = from[current]->copy_to(buf, len);
+                from[current]->cutn(to, len);
+                sglist[sge_index].length = len;
+                sglist[sge_index].addr = (uint64_t)buf;
+                sglist[sge_index].lkey = GetLKey(buf);
+                ++sge_index;
+                this_len += len;
+                total_len += len;
+                _sbuf_data[_sq_current] = buf;
+                break;
             }
-            CHECK(len > 0);
-            this_len += len;
-            total_len += len;
         }
         if (this_len == 0) {
             continue;
         }
@@ -951,6 +968,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
         uint32_t acks = butil::NetToHost32(wc.imm_data);
         uint32_t num = acks;
         while (num > 0) {
+            if (!FLAGS_rdma_send_zerocopy) {
+                DeallocBlock(_sbuf_data[_sq_sent]);
+            }
             _sbuf[_sq_sent++].clear();
             if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
                 _sq_sent = 0;
@@ -1139,6 +1159,10 @@ int RdmaEndpoint::AllocateResources() {
     if (_rbuf.size() != _rq_size) {
         return -1;
     }
+    _sbuf_data.resize(_sq_size, NULL);
+    if (_sbuf_data.size() != _sq_size) {
+        return -1;
+    }
     _rbuf_data.resize(_rq_size, NULL);
     if (_rbuf_data.size() != _rq_size) {
         return -1;
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index 10f9a57a97..7571ab9785 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -213,6 +213,8 @@ friend class brpc::Socket;
     // Act as sendbuf and recvbuf, but requires no memcpy
    std::vector<butil::IOBuf> _sbuf;
    std::vector<butil::IOBuf> _rbuf;
+    // Data address of _sbuf
+    std::vector<void*> _sbuf_data;
     // Data address of _rbuf
    std::vector<void*> _rbuf_data;
     // Remote block size for receiving
diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp
index cf1cce9505..df89094484 100644
--- a/src/brpc/rdma/rdma_helper.cpp
+++ b/src/brpc/rdma/rdma_helper.cpp
@@ -643,12 +643,15 @@ ibv_pd* GetRdmaPd() {
 }
 
 uint32_t GetLKey(void* buf) {
-    BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
-    ibv_mr** mr_ptr = g_user_mrs->seek(buf);
-    if (mr_ptr) {
-        return (*mr_ptr)->lkey;
+    uint32_t lkey = GetRegionId(buf);
+    if (lkey == 0) {
+        BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
+        ibv_mr** mr_ptr = g_user_mrs->seek(buf);
+        if (mr_ptr) {
+            return (*mr_ptr)->lkey;
+        }
     }
-    return 0;
+    return lkey;
 }
 
 ibv_gid GetRdmaGid() {
diff --git a/test/brpc_rdma_unittest.cpp b/test/brpc_rdma_unittest.cpp
index 5c52bd639e..f009ed037d 100644
--- a/test/brpc_rdma_unittest.cpp
+++ b/test/brpc_rdma_unittest.cpp
@@ -68,6 +68,7 @@ struct HelloMessage {
 };
 
 DECLARE_bool(rdma_trace_verbose);
+DECLARE_bool(rdma_send_zerocopy);
 DECLARE_int32(rdma_memory_pool_max_regions);
 extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
 extern int (*IbvDestroyCq)(ibv_cq*);
@@ -1873,7 +1874,11 @@ TEST_F(RdmaTest, send_rpcs_with_user_defined_iobuf) {
     google::protobuf::Closure* done = DoNothing();
     ::test::EchoService::Stub(&channel).Echo(&cntl[0], &req[0], &res[0], done);
     bthread_id_join(cntl[0].call_id());
-    ASSERT_EQ(ERDMAMEM, cntl[0].ErrorCode());
+    if (rdma::FLAGS_rdma_send_zerocopy) {
+        ASSERT_EQ(ERDMAMEM, cntl[0].ErrorCode());
+    } else {
+        ASSERT_EQ(0, cntl[0].ErrorCode());
+    }
     attach.clear();
     sleep(2); // wait for client recover from EHOSTDOWN
     cntl[0].Reset();
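
Usage sketch (not part of the patches above): the snippet mirrors the new
--attachment_as_userdata path in example/rdma_performance/client.cpp, which
registers a caller-owned buffer via brpc::rdma::RegisterMemoryForRdma() and
appends it to an IOBuf as user data with a NULL deleter. The helper name
MakeRdmaAttachment and the include paths are assumptions for illustration;
sending such an IOBuf with --rdma_send_zerocopy=false exercises the new
copy-into-registered-block branch in CutFromIOBufList instead of zerocopy.

#include <stdlib.h>                 // malloc
#include <butil/iobuf.h>            // butil::IOBuf (include path is an assumption)
#include <brpc/rdma/rdma_helper.h>  // brpc::rdma::RegisterMemoryForRdma (assumption)

// Build an attachment backed by a caller-owned, RDMA-registered buffer,
// following the FLAGS_attachment_as_userdata branch of the example client.
static butil::IOBuf MakeRdmaAttachment(void* addr, size_t size) {
    // Register the buffer so GetLKey() can resolve it to an lkey
    // (either via the memory-pool region id or the user MR table).
    brpc::rdma::RegisterMemoryForRdma(addr, size);
    butil::IOBuf attachment;
    // NULL deleter: ownership of addr stays with the caller, as in client.cpp.
    attachment.append_user_data(addr, size, NULL);
    return attachment;
}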