From 15f8d18126cf93634d6333ef4ae371f4172662a3 Mon Sep 17 00:00:00 2001 From: zchuango Date: Wed, 13 May 2026 22:15:17 +0800 Subject: [PATCH 01/16] enable the ub transport for mooncake store --- mooncake-common/FindUrma.cmake | 2 +- mooncake-p2p-store/build.sh | 4 ++++ mooncake-store/src/client_buffer.cpp | 2 +- mooncake-store/src/client_service.cpp | 15 +++++++++++++++ .../kunpeng_transport/urma/urma_endpoint.cpp | 11 ++++++++--- scripts/build_wheel.sh | 1 + 6 files changed, 30 insertions(+), 5 deletions(-) diff --git a/mooncake-common/FindUrma.cmake b/mooncake-common/FindUrma.cmake index d2d93cc38b..fbc50ce61c 100644 --- a/mooncake-common/FindUrma.cmake +++ b/mooncake-common/FindUrma.cmake @@ -4,7 +4,7 @@ include(FetchContent) FetchContent_Declare( urma GIT_REPOSITORY https://atomgit.com/openeuler/umdk.git - GIT_TAG v25.12.0 + GIT_TAG master ) FetchContent_MakeAvailable(urma) diff --git a/mooncake-p2p-store/build.sh b/mooncake-p2p-store/build.sh index 9b98a73e7d..f2f1df7ebf 100644 --- a/mooncake-p2p-store/build.sh +++ b/mooncake-p2p-store/build.sh @@ -52,6 +52,10 @@ if [ -d "/usr/local/musa/lib" ]; then EXT_LDFLAGS+=" -L/usr/local/musa/lib -lmusart" fi +if [ -e "/usr/lib64/liburma.so" ]; then + EXT_LDFLAGS+=" -L/usr/lib64 -lurma" +fi + if [ "$USE_ETCD" = "ON" ]; then if [ "$USE_ETCD_LEGACY" = "ON" ]; then EXT_LDFLAGS+=" -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc" diff --git a/mooncake-store/src/client_buffer.cpp b/mooncake-store/src/client_buffer.cpp index c1f62816df..ae4df97312 100644 --- a/mooncake-store/src/client_buffer.cpp +++ b/mooncake-store/src/client_buffer.cpp @@ -34,7 +34,7 @@ ClientBufferAllocator::ClientBufferAllocator(size_t size, return; } // Align to 64 bytes(cache line size) for better cache performance - constexpr size_t alignment = 64; + constexpr size_t alignment = 4096; if (use_hugepage_) { buffer_ = allocate_buffer_mmap_memory(size, alignment); } else { diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp 
index 1373f59019..0da7a6355e 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -524,6 +524,21 @@ ErrorCode Client::InitTransferEngine( LOG(ERROR) << "Failed to install CXL transport"; return ErrorCode::INTERNAL_ERROR; } + } else if (protocol == "ub") { + if (!device_names.has_value() || device_names->empty()) { + LOG(ERROR) << "ub protocol requires device names when auto " + "discovery is disabled"; + return ErrorCode::INVALID_PARAMS; + } + auto deviceName = device_names.value_or("bonding_dev_0"); + auto devices = splitString(deviceName, ',', true); + transfer_engine_->getLocalTopology()->discover(devices); + transport = transfer_engine_->installTransport("ub", nullptr); + if (!transport) { + LOG(ERROR) << "Failed to install ub transport with specified " + "devices"; + return ErrorCode::INTERNAL_ERROR; + } } else { LOG(ERROR) << "unsupported_protocol protocol=" << protocol; return ErrorCode::INVALID_PARAMS; diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp index f7080a764b..9b7cba9fbb 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp @@ -420,13 +420,16 @@ int UrmaContext::openDevice(const std::string& device_name, uint8_t port, return ERR_CONTEXT; } for (int p = 0; p < MAX_PORT_CNT; p++) { - if (dev_attr_.port_attr[p].state == URMA_PORT_ACTIVE) { + auto port_attr = dev_attr_.port_attr[p]; + if (port_attr.state == URMA_PORT_ACTIVE || + port_attr.state == URMA_PORT_ACTIVE_DEFER) { port_ = p; break; } } - if (dev_attr_.port_cnt != 0 && - dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE) { + if (dev_attr_.port_cnt != 0 && ( /* reject only when the selected port is in neither active state; with '||' this test was true for every state */ + dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE && + dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE_DEFER)) { LOG(WARNING) << "Device 
" << device_name << " not found active port"; if (urma_delete_context(context)) { @@ -966,6 +969,8 @@ int UrmaEndpoint::doSetupConnection(int jetty_index, rjetty.jetty_id.eid = eid; rjetty.trans_mode = URMA_TM_RC; rjetty.type = URMA_JETTY; + rjetty.tp_type = URMA_CTP; + rjetty.flag.value = 0; LOG(INFO) << "Peer jetty id = " << peer_jetty_num; urma_target_jetty_t* imported_jetty = urma_import_jetty(context_->urma_context_, &rjetty, &urma_token); diff --git a/scripts/build_wheel.sh b/scripts/build_wheel.sh index 3b0456ab0b..b1788e7fa5 100755 --- a/scripts/build_wheel.sh +++ b/scripts/build_wheel.sh @@ -328,6 +328,7 @@ auditwheel repair ${OUTPUT_DIR}/*.whl \ --exclude libllm_datadist*.so \ --exclude ascend_transport*.so \ --exclude libaccl_barex.so* \ + --exclude liburma.so* \ -w ${REPAIRED_DIR}/ --plat ${PLATFORM_TAG} # Inject CUDA extensions into the repaired wheel. patchelf (used by auditwheel) From d2809573ca8e4fa94ec3685e568f05914a451e1b Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 14 May 2026 22:08:25 +0800 Subject: [PATCH 02/16] modify some bug for client buffer --- mooncake-store/include/client_buffer.hpp | 7 ++++ mooncake-store/include/real_client.h | 14 ++++++++ mooncake-store/include/utils.h | 4 +-- mooncake-store/src/client_buffer.cpp | 30 +++++++++++++++++ mooncake-store/src/real_client.cpp | 22 ++++++++++--- mooncake-store/src/utils.cpp | 32 +++++++++++++++++-- .../kunpeng_transport/ub_context.cpp | 6 ++++ .../kunpeng_transport/urma/urma_endpoint.cpp | 4 +++ 8 files changed, 110 insertions(+), 9 deletions(-) diff --git a/mooncake-store/include/client_buffer.hpp b/mooncake-store/include/client_buffer.hpp index 44bb6f0680..ac26f5b887 100644 --- a/mooncake-store/include/client_buffer.hpp +++ b/mooncake-store/include/client_buffer.hpp @@ -32,6 +32,11 @@ class ClientBufferAllocator size_t size, const std::string& protocol = "", bool use_hugepage = false); + // Create with explicit NUMA node (for UB protocol) + static std::shared_ptr create( + size_t 
size, const std::string& protocol, + bool use_hugepage, int numa_node); + // Create for shared memory static std::shared_ptr create( void* addr, size_t size, const std::string& protocol = ""); @@ -62,6 +67,8 @@ class ClientBufferAllocator private: ClientBufferAllocator(size_t size, const std::string& protocol, bool use_hugepage); + ClientBufferAllocator(size_t size, const std::string& protocol, + bool use_hugepage, int numa_node); std::shared_ptr allocator_; diff --git a/mooncake-store/include/real_client.h b/mooncake-store/include/real_client.h index 0c6088e551..e92ad8de2b 100644 --- a/mooncake-store/include/real_client.h +++ b/mooncake-store/include/real_client.h @@ -4,7 +4,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -755,11 +757,23 @@ class RealClient : public PyClient { } }; + struct NumaSegmentDeleter { + size_t size = 0; + void operator()(void *ptr) const { + if (ptr && size > 0) { + munmap(ptr, size); + numa_free(ptr, size); + } + } + }; + std::vector> hugepage_segment_ptrs_; std::vector> segment_ptrs_; std::vector> ascend_segment_ptrs_; + std::vector> + numa_segment_ptrs_; std::string protocol; std::string device_name; std::string local_hostname; diff --git a/mooncake-store/include/utils.h b/mooncake-store/include/utils.h index 5bed64d99f..9a48630837 100644 --- a/mooncake-store/include/utils.h +++ b/mooncake-store/include/utils.h @@ -294,7 +294,7 @@ constexpr double BYTES_PER_GIB = static_cast(SZ_1GB); */ void* allocate_buffer_allocator_memory( size_t total_size, const std::string& protocol = "", - size_t alignment = facebook::cachelib::Slab::kSize); + size_t alignment = facebook::cachelib::Slab::kSize, int numa_node=-1); inline size_t align_up(size_t size, size_t alignment) { if (alignment == 0) { @@ -408,7 +408,7 @@ void* allocate_buffer_numa_segments(size_t total_size, size_t page_size = 0); void free_memory(const std::string& protocol, void* ptr); - +void free_memory(const std::string& protocol, void* ptr, 
size_t size); // Network utility functions /** diff --git a/mooncake-store/src/client_buffer.cpp b/mooncake-store/src/client_buffer.cpp index ae4df97312..e0e7d10bd7 100644 --- a/mooncake-store/src/client_buffer.cpp +++ b/mooncake-store/src/client_buffer.cpp @@ -18,12 +18,42 @@ std::shared_ptr ClientBufferAllocator::create( new ClientBufferAllocator(size, protocol, use_hugepage)); } +std::shared_ptr ClientBufferAllocator::create( + size_t size, const std::string& protocol, bool use_hugepage, int numa_node) { + return std::shared_ptr( + new ClientBufferAllocator(size, protocol, use_hugepage, numa_node)); +} + std::shared_ptr ClientBufferAllocator::create( void* addr, size_t size, const std::string& protocol) { return std::shared_ptr( new ClientBufferAllocator(addr, size, protocol)); } +ClientBufferAllocator::ClientBufferAllocator(size_t size, + const std::string& protocol, + bool use_hugepage, + int numa_node) + : protocol(protocol), buffer_size_(size), use_hugepage_(use_hugepage) { + if (size == 0) { + buffer_ = nullptr; + allocator_ = nullptr; + return; + } + // Align to 64 bytes(cache line size) for better cache performance + constexpr size_t alignment = 4096; + if (use_hugepage_) { + buffer_ = allocate_buffer_mmap_memory(size, alignment); + } else { + buffer_ = allocate_buffer_allocator_memory(size, protocol, alignment, numa_node); + } + if (!buffer_) { + throw std::bad_alloc(); + } + + allocator_ = mooncake::offset_allocator::OffsetAllocator::create( + reinterpret_cast(buffer_), size); +} ClientBufferAllocator::ClientBufferAllocator(size_t size, const std::string& protocol, bool use_hugepage) diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 3303168944..a578890f43 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -716,13 +716,25 @@ tl::expected RealClient::setup_internal( } } + // Determine NUMA node for UB protocol + int ub_numa_node = -1; + if (protocol == "ub") { + auto 
nic_numa_nodes = client_->GetNicNumaNodes(); + if (!nic_numa_nodes.empty()) { + ub_numa_node = nic_numa_nodes[0]; + LOG(INFO) << "UB: using NUMA node " << ub_numa_node << " for memory allocation"; + } else { + LOG(WARNING) << "UB: cannot detect NIC NUMA node, defaulting to 0"; + } + } + // Local_buffer_size is allowed to be 0, but we only register memory when // local_buffer_size > 0. Invoke ibv_reg_mr() with size=0 is UB, and may // fail in some rdma implementations. // Dummy Client can create shm and share it with Real Client, so Real Client // can create client buffer allocator on the shared memory later. client_buffer_allocator_ = ClientBufferAllocator::create( - local_buffer_size, this->protocol, should_use_hugepage); + local_buffer_size, this->protocol, should_use_hugepage, ub_numa_node); if (local_buffer_size > 0 && protocol != "cxl") { LOG(INFO) << "Registering local memory: " << local_buffer_size << " bytes"; @@ -1080,7 +1092,7 @@ tl::expected RealClient::tearDownAll_internal() { } for (const auto &entry : records_to_free) { if (entry.second.base) { - free_memory(entry.second.protocol, entry.second.base); + free_memory(entry.second.protocol, entry.second.base, entry.second.size); } } @@ -1089,6 +1101,7 @@ tl::expected RealClient::tearDownAll_internal() { client_buffer_allocator_.reset(); port_binder_.reset(); hugepage_segment_ptrs_.clear(); + numa_segment_ptrs_.clear(); segment_ptrs_.clear(); local_hostname = ""; device_name = ""; @@ -1343,7 +1356,7 @@ int RealClient::allocateAndMountSegment( client_->MountSegmentAndGetId(ptr, chunk_size, protocol, location); if (!result.has_value()) { LOG(ERROR) << "MountSegmentAndGetId failed"; - free_memory(protocol, ptr); + free_memory(protocol, ptr, chunk_size); break; } @@ -1362,7 +1375,8 @@ int RealClient::allocateAndMountSegment( } if (allocated_records[i].base) { free_memory(allocated_records[i].protocol, - allocated_records[i].base); + allocated_records[i].base, + allocated_records[i].size); } } 
out_segment_ids.clear(); diff --git a/mooncake-store/src/utils.cpp b/mooncake-store/src/utils.cpp index 70a8b209b5..b2a798181a 100644 --- a/mooncake-store/src/utils.cpp +++ b/mooncake-store/src/utils.cpp @@ -13,6 +13,8 @@ #include #include #include +#include +#include #include #include @@ -101,7 +103,8 @@ AutoPortBinder::~AutoPortBinder() { void *allocate_buffer_allocator_memory(size_t total_size, const std::string &protocol, - size_t alignment) { + size_t alignment, + int numa_node) { const size_t default_alignment = facebook::cachelib::Slab::kSize; // Ensure total_size is a multiple of alignment if (alignment == default_alignment && total_size < alignment) { @@ -113,7 +116,20 @@ void *allocate_buffer_allocator_memory(size_t total_size, return ascend_allocate_memory(total_size, protocol); } #endif - +#if defined(USE_UB) + if (protocol == "ub") { + int node = (numa_node >= 0) ? numa_node: 0; + void *ptr = numa_alloc_onnode(total_size, node); + if (!ptr) { + LOG(ERROR) << "numa_alloc_onnode failed for UB protocal, size=" + << total_size; + } else { + LOG(INFO) << "UB: numa_alloc_onnode allocated " << total_size + << " bytes at " << ptr; + } + return ptr; + } +#endif // Allocate aligned memory return aligned_alloc(alignment, total_size); } @@ -357,12 +373,22 @@ void *allocate_buffer_numa_segments(size_t total_size, } void free_memory(const std::string &protocol, void *ptr) { + free_memory(protocol, ptr, 0); +} + +void free_memory(const std::string &protocol, void *ptr, size_t size) { #if defined(USE_ASCEND_DIRECT) || defined(USE_UBSHMEM) if (protocol == "ascend" || protocol == "ubshmem") { return ascend_free_memory(protocol, ptr); } #endif - +#if defined(USE_UB) + if (protocol == "ub") { + munmap(ptr, size); // for urma + numa_free(ptr, size); + return; + } +#endif free(ptr); } diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp index 4814a183f8..def82b9a1e 
100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp @@ -246,6 +246,12 @@ int UbWorkerPool::submitPostSend( auto targetSegment = peer_segment_desc->buffers[buffer_id].tseg[device_id]; slice->ub.r_seg = context_.retrieveRemoteSeg(targetSegment); + if (!slice->ub.r_seg) { + LOG(ERROR) << "[UB] retrieveRemoteSeg failed for target_id=" + << slice->target_id << " buffer_id=" << buffer_id + << " device_id" << device_id + << " dest_addr=" << slice->ub.dest_addr; + } auto peer_nic_path = MakeNicPath(peer_segment_desc->name, peer_segment_desc->devices[device_id].name); diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp index 9b7cba9fbb..9591d92eb5 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp @@ -889,6 +889,10 @@ int UrmaEndpoint::submitPostSend( wr.flag.bs.inline_flag = 0; // Check if the jetty is in the imported_jetty_map_ auto it = imported_jetty_map_.find(jetty_list_[jetty_index]); + if (it == imported_jetty_map_.end()) { + LOG(ERROR) << "Jetty not imported for endpoint, tjetty is nullptr" << jetty_index + << ", local_nic="; + } if (it != imported_jetty_map_.end()) { wr.tjetty = it->second; } else { From 01953ce9a3b3032615ed873ba19a179a8b11d814 Mon Sep 17 00:00:00 2001 From: zchuango Date: Sat, 16 May 2026 09:12:34 +0000 Subject: [PATCH 03/16] add the ub transport allocator --- mooncake-store/src/utils.cpp | 15 +--- .../include/CMakeLists.txt | 1 + .../include/ub_allocator.h | 13 +++ .../kunpeng_transport/CMakeLists.txt | 2 +- .../kunpeng_transport/ub_allocator.cpp | 79 +++++++++++++++++++ 5 files changed, 97 insertions(+), 13 deletions(-) create mode 100644 mooncake-transfer-engine/include/ub_allocator.h 
create mode 100644 mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp diff --git a/mooncake-store/src/utils.cpp b/mooncake-store/src/utils.cpp index b2a798181a..e9d2a35f5e 100644 --- a/mooncake-store/src/utils.cpp +++ b/mooncake-store/src/utils.cpp @@ -2,6 +2,7 @@ #include "mmap_arena.h" #include "config.h" #include "common.h" +#include "ub_allocator.h" #include #include @@ -118,16 +119,7 @@ void *allocate_buffer_allocator_memory(size_t total_size, #endif #if defined(USE_UB) if (protocol == "ub") { - int node = (numa_node >= 0) ? numa_node: 0; - void *ptr = numa_alloc_onnode(total_size, node); - if (!ptr) { - LOG(ERROR) << "numa_alloc_onnode failed for UB protocal, size=" - << total_size; - } else { - LOG(INFO) << "UB: numa_alloc_onnode allocated " << total_size - << " bytes at " << ptr; - } - return ptr; + return mooncake::ub_allocate_memory(total_size, numa_node); } #endif // Allocate aligned memory @@ -384,8 +376,7 @@ void free_memory(const std::string &protocol, void *ptr, size_t size) { #endif #if defined(USE_UB) if (protocol == "ub") { - munmap(ptr, size); // for urma - numa_free(ptr, size); + mooncake::ub_free_memory(ptr, size); return; } #endif diff --git a/mooncake-transfer-engine/include/CMakeLists.txt b/mooncake-transfer-engine/include/CMakeLists.txt index 4f52e1e5cd..56929077df 100644 --- a/mooncake-transfer-engine/include/CMakeLists.txt +++ b/mooncake-transfer-engine/include/CMakeLists.txt @@ -7,5 +7,6 @@ install(FILES multi_transport.h DESTINATION include) install(FILES topology.h DESTINATION include) install(FILES transfer_engine.h DESTINATION include) install(FILES transfer_metadata.h DESTINATION include) +install(FILES ub_allocator.h DESTINATION include) install(FILES common/base/status.h DESTINATION include/common/base) install(FILES transport/transport.h DESTINATION include/transport) diff --git a/mooncake-transfer-engine/include/ub_allocator.h b/mooncake-transfer-engine/include/ub_allocator.h new file mode 100644 index 
0000000000..fbe34caf19 --- /dev/null +++ b/mooncake-transfer-engine/include/ub_allocator.h @@ -0,0 +1,13 @@ +#pragma once + +#include + +namespace mooncake { + +void* ub_allocate_memory(size_t total_size, int numa_node = -1); + +void ub_free_memory(void* ptr, size_t size); + +bool ub_is_store_memory(void* addr, size_t length); + +} // namespace mooncake \ No newline at end of file diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt b/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt index c861696859..5583b7d813 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt @@ -1,4 +1,4 @@ -file(GLOB UB_SOURCES "*.cpp" "urma/urma_endpoint.cpp") +file(GLOB UB_SOURCES "*.cpp" "urma/urma_endpoint.cpp" "ub_allocator.cpp") # Check if liburma.so exists find_library(URMA_LIBRARY urma PATHS /usr/lib64) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp new file mode 100644 index 0000000000..25872fe18c --- /dev/null +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp @@ -0,0 +1,79 @@ +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "ub_allocator.h" + +namespace mooncake { +namespace { + +struct StoreMemRange { + void* base; + size_t size; +}; + +std::mutex g_store_mem_mutex; +std::vector g_store_mem_ranges; + +void remove_store_memory_range(void* ptr) { + std::lock_guard store_lock(g_store_mem_mutex); + auto it = std::remove_if( + g_store_mem_ranges.begin(), g_store_mem_ranges.end(), + [ptr](const StoreMemRange& range) { return range.base == ptr; }); + g_store_mem_ranges.erase(it, g_store_mem_ranges.end()); +} + +} // namespace + +void* ub_allocate_memory(size_t total_size, int numa_node) { + int node = (numa_node >= 0) ? 
numa_node : 0; + void* ptr = numa_alloc_onnode(total_size, node); + if (!ptr) { + LOG(ERROR) << "numa_alloc_onnode failed for UB protocol, size=" + << total_size << ", numa_node=" << node; + return nullptr; + } + LOG(INFO) << "UB: numa_alloc_onnode allocated " << total_size + << " bytes at " << ptr << " on NUMA node " << node; + + std::lock_guard store_lock(g_store_mem_mutex); + g_store_mem_ranges.push_back({ptr, total_size}); + + return ptr; +} + +void ub_free_memory(void* ptr, size_t size) { + if (!ptr) { + return; + } + + remove_store_memory_range(ptr); + + munmap(ptr, size); + numa_free(ptr, size); + + LOG(INFO) << "UB: freed " << size << " bytes at " << ptr; +} + +bool ub_is_store_memory(void* addr, size_t length) { + if (!addr || length == 0) return false; + auto addr_start = reinterpret_cast(addr); + uintptr_t addr_end = addr_start + length; + std::lock_guard lock(g_store_mem_mutex); + for (const auto& range : g_store_mem_ranges) { + auto range_start = reinterpret_cast(range.base); + uintptr_t range_end = range_start + range.size; + if (addr_start < range_end && addr_end > range_start) { + return true; + } + } + return false; +} + +} // namespace mooncake \ No newline at end of file From 1cd0df5a08eb7d2c4a094ea3217548de1f4123a2 Mon Sep 17 00:00:00 2001 From: zchuango Date: Tue, 19 May 2026 12:04:32 +0800 Subject: [PATCH 04/16] modify some bug for client buffer for free --- mooncake-store/src/client_buffer.cpp | 4 ++-- mooncake-store/src/real_client.cpp | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/mooncake-store/src/client_buffer.cpp b/mooncake-store/src/client_buffer.cpp index e0e7d10bd7..d8b4f19932 100644 --- a/mooncake-store/src/client_buffer.cpp +++ b/mooncake-store/src/client_buffer.cpp @@ -41,7 +41,7 @@ ClientBufferAllocator::ClientBufferAllocator(size_t size, return; } // Align to 64 bytes(cache line size) for better cache performance - constexpr size_t alignment = 4096; + constexpr size_t alignment = 64; if (use_hugepage_) { 
buffer_ = allocate_buffer_mmap_memory(size, alignment); } else { @@ -93,7 +93,7 @@ ClientBufferAllocator::~ClientBufferAllocator() { if (use_hugepage_) { free_buffer_mmap_memory(buffer_, buffer_size_); } else { - free_memory(protocol, buffer_); + free_memory(protocol, buffer_, buffer_size_); } } } diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index a578890f43..c11548a5c1 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -837,6 +837,9 @@ tl::expected RealClient::setup_internal( if (this->protocol == "ascend" || this->protocol == "ubshmem") { ascend_segment_ptrs_.emplace_back( ptr, AscendSegmentDeleter{this->protocol}); + } else if (this->protocol == "ub") { + numa_segment_ptrs_.emplace_back( + ptr, NumaSegmentDeleter{mapped_size}); } else if (!seg_numa_nodes.empty() || should_use_hugepage) { // NUMA-segmented or hugepage: track as mmap allocation for // munmap cleanup From 691321a4cd6367d78e47488616289c7dc3f51e4a Mon Sep 17 00:00:00 2001 From: zchuango Date: Wed, 20 May 2026 06:55:38 +0000 Subject: [PATCH 05/16] modify the urma version --- mooncake-common/FindUrma.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mooncake-common/FindUrma.cmake b/mooncake-common/FindUrma.cmake index fbc50ce61c..0af8d1a7ca 100644 --- a/mooncake-common/FindUrma.cmake +++ b/mooncake-common/FindUrma.cmake @@ -4,7 +4,7 @@ include(FetchContent) FetchContent_Declare( urma GIT_REPOSITORY https://atomgit.com/openeuler/umdk.git - GIT_TAG master + GIT_TAG v25.12.0.B081 ) FetchContent_MakeAvailable(urma) From b65dcc4d312a1dd04f154516adbb75a1ed6cf5df Mon Sep 17 00:00:00 2001 From: zchuango Date: Wed, 20 May 2026 20:06:18 +0800 Subject: [PATCH 06/16] refine the ub allocator for client buffer --- mooncake-store/include/client_buffer.hpp | 7 -- mooncake-store/include/real_client.h | 12 ++-- mooncake-store/include/utils.h | 4 +- mooncake-store/src/client_buffer.cpp | 34 +--------- 
mooncake-store/src/real_client.cpp | 27 ++------ mooncake-store/src/utils.cpp | 13 +--- .../include/ub_allocator.h | 6 +- .../kunpeng_transport/ub_allocator.cpp | 65 ++++++++----------- 8 files changed, 48 insertions(+), 120 deletions(-) diff --git a/mooncake-store/include/client_buffer.hpp b/mooncake-store/include/client_buffer.hpp index ac26f5b887..44bb6f0680 100644 --- a/mooncake-store/include/client_buffer.hpp +++ b/mooncake-store/include/client_buffer.hpp @@ -32,11 +32,6 @@ class ClientBufferAllocator size_t size, const std::string& protocol = "", bool use_hugepage = false); - // Create with explicit NUMA node (for UB protocol) - static std::shared_ptr create( - size_t size, const std::string& protocol, - bool use_hugepage, int numa_node); - // Create for shared memory static std::shared_ptr create( void* addr, size_t size, const std::string& protocol = ""); @@ -67,8 +62,6 @@ class ClientBufferAllocator private: ClientBufferAllocator(size_t size, const std::string& protocol, bool use_hugepage); - ClientBufferAllocator(size_t size, const std::string& protocol, - bool use_hugepage, int numa_node); std::shared_ptr allocator_; diff --git a/mooncake-store/include/real_client.h b/mooncake-store/include/real_client.h index e92ad8de2b..5c8844a794 100644 --- a/mooncake-store/include/real_client.h +++ b/mooncake-store/include/real_client.h @@ -4,9 +4,7 @@ #include #include #include -#include #include -#include #include #include #include @@ -757,12 +755,12 @@ class RealClient : public PyClient { } }; - struct NumaSegmentDeleter { + struct UbSegmentDeleter { size_t size = 0; + std::string protocol = "ub"; void operator()(void *ptr) const { if (ptr && size > 0) { - munmap(ptr, size); - numa_free(ptr, size); + free_memory(protocol.c_str(), ptr); } } }; @@ -772,8 +770,8 @@ class RealClient : public PyClient { std::vector> segment_ptrs_; std::vector> ascend_segment_ptrs_; - std::vector> - numa_segment_ptrs_; + std::vector> + ub_segment_ptrs_; std::string protocol; std::string 
device_name; std::string local_hostname; diff --git a/mooncake-store/include/utils.h b/mooncake-store/include/utils.h index 9a48630837..5bed64d99f 100644 --- a/mooncake-store/include/utils.h +++ b/mooncake-store/include/utils.h @@ -294,7 +294,7 @@ constexpr double BYTES_PER_GIB = static_cast(SZ_1GB); */ void* allocate_buffer_allocator_memory( size_t total_size, const std::string& protocol = "", - size_t alignment = facebook::cachelib::Slab::kSize, int numa_node=-1); + size_t alignment = facebook::cachelib::Slab::kSize); inline size_t align_up(size_t size, size_t alignment) { if (alignment == 0) { @@ -408,7 +408,7 @@ void* allocate_buffer_numa_segments(size_t total_size, size_t page_size = 0); void free_memory(const std::string& protocol, void* ptr); -void free_memory(const std::string& protocol, void* ptr, size_t size); + // Network utility functions /** diff --git a/mooncake-store/src/client_buffer.cpp b/mooncake-store/src/client_buffer.cpp index d8b4f19932..c1f62816df 100644 --- a/mooncake-store/src/client_buffer.cpp +++ b/mooncake-store/src/client_buffer.cpp @@ -18,42 +18,12 @@ std::shared_ptr ClientBufferAllocator::create( new ClientBufferAllocator(size, protocol, use_hugepage)); } -std::shared_ptr ClientBufferAllocator::create( - size_t size, const std::string& protocol, bool use_hugepage, int numa_node) { - return std::shared_ptr( - new ClientBufferAllocator(size, protocol, use_hugepage, numa_node)); -} - std::shared_ptr ClientBufferAllocator::create( void* addr, size_t size, const std::string& protocol) { return std::shared_ptr( new ClientBufferAllocator(addr, size, protocol)); } -ClientBufferAllocator::ClientBufferAllocator(size_t size, - const std::string& protocol, - bool use_hugepage, - int numa_node) - : protocol(protocol), buffer_size_(size), use_hugepage_(use_hugepage) { - if (size == 0) { - buffer_ = nullptr; - allocator_ = nullptr; - return; - } - // Align to 64 bytes(cache line size) for better cache performance - constexpr size_t alignment = 64; - 
if (use_hugepage_) { - buffer_ = allocate_buffer_mmap_memory(size, alignment); - } else { - buffer_ = allocate_buffer_allocator_memory(size, protocol, alignment, numa_node); - } - if (!buffer_) { - throw std::bad_alloc(); - } - - allocator_ = mooncake::offset_allocator::OffsetAllocator::create( - reinterpret_cast(buffer_), size); -} ClientBufferAllocator::ClientBufferAllocator(size_t size, const std::string& protocol, bool use_hugepage) @@ -64,7 +34,7 @@ ClientBufferAllocator::ClientBufferAllocator(size_t size, return; } // Align to 64 bytes(cache line size) for better cache performance - constexpr size_t alignment = 4096; + constexpr size_t alignment = 64; if (use_hugepage_) { buffer_ = allocate_buffer_mmap_memory(size, alignment); } else { @@ -93,7 +63,7 @@ ClientBufferAllocator::~ClientBufferAllocator() { if (use_hugepage_) { free_buffer_mmap_memory(buffer_, buffer_size_); } else { - free_memory(protocol, buffer_, buffer_size_); + free_memory(protocol, buffer_); } } } diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index c11548a5c1..8fcf185d9f 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -716,25 +716,13 @@ tl::expected RealClient::setup_internal( } } - // Determine NUMA node for UB protocol - int ub_numa_node = -1; - if (protocol == "ub") { - auto nic_numa_nodes = client_->GetNicNumaNodes(); - if (!nic_numa_nodes.empty()) { - ub_numa_node = nic_numa_nodes[0]; - LOG(INFO) << "UB: using NUMA node " << ub_numa_node << " for memory allocation"; - } else { - LOG(WARNING) << "UB: cannot detect NIC NUMA node, defaulting to 0"; - } - } - // Local_buffer_size is allowed to be 0, but we only register memory when // local_buffer_size > 0. Invoke ibv_reg_mr() with size=0 is UB, and may // fail in some rdma implementations. // Dummy Client can create shm and share it with Real Client, so Real Client // can create client buffer allocator on the shared memory later. 
client_buffer_allocator_ = ClientBufferAllocator::create( - local_buffer_size, this->protocol, should_use_hugepage, ub_numa_node); + local_buffer_size, this->protocol, should_use_hugepage); if (local_buffer_size > 0 && protocol != "cxl") { LOG(INFO) << "Registering local memory: " << local_buffer_size << " bytes"; @@ -838,8 +826,8 @@ tl::expected RealClient::setup_internal( ascend_segment_ptrs_.emplace_back( ptr, AscendSegmentDeleter{this->protocol}); } else if (this->protocol == "ub") { - numa_segment_ptrs_.emplace_back( - ptr, NumaSegmentDeleter{mapped_size}); + ub_segment_ptrs_.emplace_back( + ptr, UbSegmentDeleter{mapped_size}); } else if (!seg_numa_nodes.empty() || should_use_hugepage) { // NUMA-segmented or hugepage: track as mmap allocation for // munmap cleanup @@ -1095,7 +1083,7 @@ tl::expected RealClient::tearDownAll_internal() { } for (const auto &entry : records_to_free) { if (entry.second.base) { - free_memory(entry.second.protocol, entry.second.base, entry.second.size); + free_memory(entry.second.protocol, entry.second.base); } } @@ -1104,7 +1092,7 @@ tl::expected RealClient::tearDownAll_internal() { client_buffer_allocator_.reset(); port_binder_.reset(); hugepage_segment_ptrs_.clear(); - numa_segment_ptrs_.clear(); + ub_segment_ptrs_.clear(); segment_ptrs_.clear(); local_hostname = ""; device_name = ""; @@ -1359,7 +1347,7 @@ int RealClient::allocateAndMountSegment( client_->MountSegmentAndGetId(ptr, chunk_size, protocol, location); if (!result.has_value()) { LOG(ERROR) << "MountSegmentAndGetId failed"; - free_memory(protocol, ptr, chunk_size); + free_memory(protocol, ptr); break; } @@ -1378,8 +1366,7 @@ int RealClient::allocateAndMountSegment( } if (allocated_records[i].base) { free_memory(allocated_records[i].protocol, - allocated_records[i].base, - allocated_records[i].size); + allocated_records[i].base); } } out_segment_ids.clear(); diff --git a/mooncake-store/src/utils.cpp b/mooncake-store/src/utils.cpp index e9d2a35f5e..7dd19e9b6e 100644 --- 
a/mooncake-store/src/utils.cpp +++ b/mooncake-store/src/utils.cpp @@ -14,8 +14,6 @@ #include #include #include -#include -#include #include #include @@ -104,8 +102,7 @@ AutoPortBinder::~AutoPortBinder() { void *allocate_buffer_allocator_memory(size_t total_size, const std::string &protocol, - size_t alignment, - int numa_node) { + size_t alignment) { const size_t default_alignment = facebook::cachelib::Slab::kSize; // Ensure total_size is a multiple of alignment if (alignment == default_alignment && total_size < alignment) { @@ -119,7 +116,7 @@ void *allocate_buffer_allocator_memory(size_t total_size, #endif #if defined(USE_UB) if (protocol == "ub") { - return mooncake::ub_allocate_memory(total_size, numa_node); + return mooncake::ub_allocate_memory(alignment, total_size); } #endif // Allocate aligned memory @@ -365,10 +362,6 @@ void *allocate_buffer_numa_segments(size_t total_size, } void free_memory(const std::string &protocol, void *ptr) { - free_memory(protocol, ptr, 0); -} - -void free_memory(const std::string &protocol, void *ptr, size_t size) { #if defined(USE_ASCEND_DIRECT) || defined(USE_UBSHMEM) if (protocol == "ascend" || protocol == "ubshmem") { return ascend_free_memory(protocol, ptr); @@ -376,7 +369,7 @@ void free_memory(const std::string &protocol, void *ptr, size_t size) { #endif #if defined(USE_UB) if (protocol == "ub") { - mooncake::ub_free_memory(ptr, size); + mooncake::ub_free_memory(ptr); return; } #endif diff --git a/mooncake-transfer-engine/include/ub_allocator.h b/mooncake-transfer-engine/include/ub_allocator.h index fbe34caf19..a753f42165 100644 --- a/mooncake-transfer-engine/include/ub_allocator.h +++ b/mooncake-transfer-engine/include/ub_allocator.h @@ -1,12 +1,10 @@ #pragma once -#include - namespace mooncake { -void* ub_allocate_memory(size_t total_size, int numa_node = -1); +void* ub_allocate_memory(size_t alignment, size_t total_size); -void ub_free_memory(void* ptr, size_t size); +void ub_free_memory(void* ptr); bool 
ub_is_store_memory(void* addr, size_t length); diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp index 25872fe18c..e010929302 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp @@ -2,71 +2,60 @@ #include #include #include - #include -#include -#include -#include #include "ub_allocator.h" namespace mooncake { -namespace { - -struct StoreMemRange { +struct UbStoreMemRange { void* base; size_t size; }; - -std::mutex g_store_mem_mutex; -std::vector g_store_mem_ranges; +std::mutex g_ub_store_mem_mutex; +std::vector g_ub_store_mem_ranges; void remove_store_memory_range(void* ptr) { - std::lock_guard store_lock(g_store_mem_mutex); + std::lock_guard store_lock(g_ub_store_mem_mutex); auto it = std::remove_if( - g_store_mem_ranges.begin(), g_store_mem_ranges.end(), - [ptr](const StoreMemRange& range) { return range.base == ptr; }); - g_store_mem_ranges.erase(it, g_store_mem_ranges.end()); + g_ub_store_mem_ranges.begin(), g_ub_store_mem_ranges.end(), + [ptr](const UbStoreMemRange& range) { return range.base == ptr; }); + g_ub_store_mem_ranges.erase(it, g_ub_store_mem_ranges.end()); } -} // namespace - -void* ub_allocate_memory(size_t total_size, int numa_node) { - int node = (numa_node >= 0) ? 
numa_node : 0; - void* ptr = numa_alloc_onnode(total_size, node); - if (!ptr) { - LOG(ERROR) << "numa_alloc_onnode failed for UB protocol, size=" - << total_size << ", numa_node=" << node; +void* ub_allocate_memory(size_t alignment, size_t total_size) { + size_t page_size = getpagesize(); + size_t ub_alignment = std::max(alignment / page_size * page_size, page_size); + void** ptr = nullptr; + auto ret = posix_memalign(ptr, ub_alignment, total_size); + if (!ret) { + LOG(ERROR) << "failed for UB protocol, size=" + << total_size << ", alignment : " << alignment; return nullptr; } - LOG(INFO) << "UB: numa_alloc_onnode allocated " << total_size - << " bytes at " << ptr << " on NUMA node " << node; - - std::lock_guard store_lock(g_store_mem_mutex); - g_store_mem_ranges.push_back({ptr, total_size}); - - return ptr; + LOG(INFO) << "UB: allocated total size : " << total_size + << ", alignment : " << alignment << " addr at " << *ptr; + + std::lock_guard store_lock(g_ub_store_mem_mutex); + g_ub_store_mem_ranges.push_back({*ptr, total_size}); + + return *ptr; } -void ub_free_memory(void* ptr, size_t size) { +void ub_free_memory(void* ptr) { if (!ptr) { return; } - remove_store_memory_range(ptr); - - munmap(ptr, size); - numa_free(ptr, size); - - LOG(INFO) << "UB: freed " << size << " bytes at " << ptr; + free(ptr); + LOG(INFO) << "UB: freed bytes at " << ptr; } bool ub_is_store_memory(void* addr, size_t length) { if (!addr || length == 0) return false; auto addr_start = reinterpret_cast(addr); uintptr_t addr_end = addr_start + length; - std::lock_guard lock(g_store_mem_mutex); - for (const auto& range : g_store_mem_ranges) { + std::lock_guard lock(g_ub_store_mem_mutex); + for (const auto& range : g_ub_store_mem_ranges) { auto range_start = reinterpret_cast(range.base); uintptr_t range_end = range_start + range.size; if (addr_start < range_end && addr_end > range_start) { From 050b58c8645d9fab3991f2232ffc59ede8703584 Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 21 May 
2026 03:18:31 +0000 Subject: [PATCH 07/16] fix the allocator free --- .../kunpeng_transport/ub_allocator.cpp | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp index e010929302..a21d3cc68c 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp @@ -1,8 +1,10 @@ #include +#include #include #include #include #include +#include #include "ub_allocator.h" @@ -14,39 +16,41 @@ struct UbStoreMemRange { std::mutex g_ub_store_mem_mutex; std::vector g_ub_store_mem_ranges; -void remove_store_memory_range(void* ptr) { +size_t remove_store_memory_range(void* ptr) { std::lock_guard store_lock(g_ub_store_mem_mutex); auto it = std::remove_if( g_ub_store_mem_ranges.begin(), g_ub_store_mem_ranges.end(), [ptr](const UbStoreMemRange& range) { return range.base == ptr; }); g_ub_store_mem_ranges.erase(it, g_ub_store_mem_ranges.end()); + if (it != g_ub_store_mem_ranges.end()) { + return it->size; + } + LOG(ERROR) << "failed for UB protocol, addr at " << ptr; + return 0; } void* ub_allocate_memory(size_t alignment, size_t total_size) { - size_t page_size = getpagesize(); - size_t ub_alignment = std::max(alignment / page_size * page_size, page_size); - void** ptr = nullptr; - auto ret = posix_memalign(ptr, ub_alignment, total_size); - if (!ret) { + void* ptr = numa_alloc_local(total_size); + if (!ptr) { LOG(ERROR) << "failed for UB protocol, size=" << total_size << ", alignment : " << alignment; return nullptr; } LOG(INFO) << "UB: allocated total size : " << total_size - << ", alignment : " << alignment << " addr at " << *ptr; + << ", alignment : " << alignment << " addr at " << ptr; std::lock_guard store_lock(g_ub_store_mem_mutex); - g_ub_store_mem_ranges.push_back({*ptr, total_size}); + 
g_ub_store_mem_ranges.push_back({ptr, total_size}); - return *ptr; + return ptr; } void ub_free_memory(void* ptr) { if (!ptr) { return; } - remove_store_memory_range(ptr); - free(ptr); + auto size = remove_store_memory_range(ptr); + numa_free(ptr, size); LOG(INFO) << "UB: freed bytes at " << ptr; } From 7ef4b6faee697bbb5678d528cd8d6a9c4810ca57 Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 21 May 2026 06:57:36 +0000 Subject: [PATCH 08/16] fix the bug for mooncake ub allocator --- .../kunpeng_transport/ub_allocator.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp index a21d3cc68c..48e7191e49 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp @@ -18,15 +18,20 @@ std::vector g_ub_store_mem_ranges; size_t remove_store_memory_range(void* ptr) { std::lock_guard store_lock(g_ub_store_mem_mutex); - auto it = std::remove_if( - g_ub_store_mem_ranges.begin(), g_ub_store_mem_ranges.end(), + + auto it = std::find_if( + g_ub_store_mem_ranges.begin(), + g_ub_store_mem_ranges.end(), [ptr](const UbStoreMemRange& range) { return range.base == ptr; }); - g_ub_store_mem_ranges.erase(it, g_ub_store_mem_ranges.end()); - if (it != g_ub_store_mem_ranges.end()) { - return it->size; + + if (it == g_ub_store_mem_ranges.end()) { + LOG(ERROR) << "failed for UB protocol, addr at " << ptr; + return 0; } - LOG(ERROR) << "failed for UB protocol, addr at " << ptr; - return 0; + + size_t sz = it->size; // 先保存 size + g_ub_store_mem_ranges.erase(it); // 再删除 + return sz; } void* ub_allocate_memory(size_t alignment, size_t total_size) { From c59670d35330b22b46fbd16d03503849723f62ff Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 21 May 2026 12:37:31 +0000 Subject: [PATCH 09/16] optimize the 
urma_endpoint polling method --- .../src/transport/kunpeng_transport/ub_context.cpp | 2 +- .../src/transport/kunpeng_transport/urma/urma_endpoint.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp index def82b9a1e..7eb5eafed9 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp @@ -398,9 +398,9 @@ void UbWorkerPool::performPoll(int thread_id) { << context_.nicPath() << ", mark it inactive"; context_.set_active(false); } - context_.deleteEndpoint(slice->peer_nic_path); slice->ub.retry_cnt++; if (slice->ub.retry_cnt >= slice->ub.max_retry_cnt) { + context_.deleteEndpoint(slice->peer_nic_path); slice->markFailed(); processed_slice_count_++; } else { diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp index 9591d92eb5..3dd4898f7e 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp @@ -531,9 +531,9 @@ int UrmaContext::poll(int num_entries, Transport::Slice** slices, if (!slice) { continue; } + slices[i] = slice; if (cr[i].status == URMA_CR_SUCCESS) { slice->markSuccess(); - slices[i] = slice; continue; } if (cr[i].status != URMA_CR_WR_FLUSH_ERR || From ed39ab9d72f2763c7cc43be0b76096695ecfb8fd Mon Sep 17 00:00:00 2001 From: zchuango Date: Sat, 23 May 2026 03:00:53 +0000 Subject: [PATCH 10/16] add the format for store --- mooncake-store/include/real_client.h | 3 +-- mooncake-store/src/real_client.cpp | 4 ++-- .../transport/kunpeng_transport/ub_allocator.cpp | 15 +++++++-------- .../kunpeng_transport/urma/urma_endpoint.cpp | 10 +++++----- 4 files changed, 
15 insertions(+), 17 deletions(-) diff --git a/mooncake-store/include/real_client.h b/mooncake-store/include/real_client.h index d557e36bf8..0f73edddb5 100644 --- a/mooncake-store/include/real_client.h +++ b/mooncake-store/include/real_client.h @@ -781,8 +781,7 @@ class RealClient : public PyClient { std::vector> segment_ptrs_; std::vector> ascend_segment_ptrs_; - std::vector> - ub_segment_ptrs_; + std::vector> ub_segment_ptrs_; std::string protocol; std::string device_name; std::string local_hostname; diff --git a/mooncake-store/src/real_client.cpp b/mooncake-store/src/real_client.cpp index 773f535910..943f969b8b 100644 --- a/mooncake-store/src/real_client.cpp +++ b/mooncake-store/src/real_client.cpp @@ -827,8 +827,8 @@ tl::expected RealClient::setup_internal( ascend_segment_ptrs_.emplace_back( ptr, AscendSegmentDeleter{this->protocol}); } else if (this->protocol == "ub") { - ub_segment_ptrs_.emplace_back( - ptr, UbSegmentDeleter{mapped_size}); + ub_segment_ptrs_.emplace_back(ptr, + UbSegmentDeleter{mapped_size}); } else if (!seg_numa_nodes.empty() || should_use_hugepage) { // NUMA-segmented or hugepage: track as mmap allocation for // munmap cleanup diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp index 48e7191e49..d47133de6c 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp @@ -18,10 +18,9 @@ std::vector g_ub_store_mem_ranges; size_t remove_store_memory_range(void* ptr) { std::lock_guard store_lock(g_ub_store_mem_mutex); - + auto it = std::find_if( - g_ub_store_mem_ranges.begin(), - g_ub_store_mem_ranges.end(), + g_ub_store_mem_ranges.begin(), g_ub_store_mem_ranges.end(), [ptr](const UbStoreMemRange& range) { return range.base == ptr; }); if (it == g_ub_store_mem_ranges.end()) { @@ -29,20 +28,20 @@ size_t 
remove_store_memory_range(void* ptr) { return 0; } - size_t sz = it->size; // 先保存 size - g_ub_store_mem_ranges.erase(it); // 再删除 + size_t sz = it->size; // 先保存 size + g_ub_store_mem_ranges.erase(it); // 再删除 return sz; } void* ub_allocate_memory(size_t alignment, size_t total_size) { void* ptr = numa_alloc_local(total_size); if (!ptr) { - LOG(ERROR) << "failed for UB protocol, size=" - << total_size << ", alignment : " << alignment; + LOG(ERROR) << "failed for UB protocol, size=" << total_size + << ", alignment : " << alignment; return nullptr; } LOG(INFO) << "UB: allocated total size : " << total_size - << ", alignment : " << alignment << " addr at " << ptr; + << ", alignment : " << alignment << " addr at " << ptr; std::lock_guard store_lock(g_ub_store_mem_mutex); g_ub_store_mem_ranges.push_back({ptr, total_size}); diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp index 3dd4898f7e..f2bffe1ebd 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp @@ -427,9 +427,9 @@ int UrmaContext::openDevice(const std::string& device_name, uint8_t port, break; } } - if (dev_attr_.port_cnt != 0 && ( - dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE || - dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE_DEFER)) { + if (dev_attr_.port_cnt != 0 && + (dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE || + dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE_DEFER)) { LOG(WARNING) << "Device " << device_name << " not found active port"; if (urma_delete_context(context)) { @@ -890,8 +890,8 @@ int UrmaEndpoint::submitPostSend( // Check if the jetty is in the imported_jetty_map_ auto it = imported_jetty_map_.find(jetty_list_[jetty_index]); if (it == imported_jetty_map_.end()) { - LOG(ERROR) << "Jetty not imported for endpoint, tjetty 
is nullptr" << jetty_index - << ", local_nic="; + LOG(ERROR) << "Jetty not imported for endpoint, tjetty is nullptr" + << jetty_index << ", local_nic="; } if (it != imported_jetty_map_.end()) { wr.tjetty = it->second; From b62637b9e2f7da0e7c61ce089f11117ba295119f Mon Sep 17 00:00:00 2001 From: Chuang Zhang Date: Sat, 23 May 2026 16:49:32 +0800 Subject: [PATCH 11/16] Update mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../src/transport/kunpeng_transport/ub_allocator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp index d47133de6c..19e4311b0a 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp @@ -66,7 +66,7 @@ bool ub_is_store_memory(void* addr, size_t length) { for (const auto& range : g_ub_store_mem_ranges) { auto range_start = reinterpret_cast(range.base); uintptr_t range_end = range_start + range.size; - if (addr_start < range_end && addr_end > range_start) { + if (addr_start >= range_start && addr_end <= range_end) { return true; } } From 70cf34861b2a465a15448b53f7bdf5862be0eca2 Mon Sep 17 00:00:00 2001 From: zchuango Date: Sat, 23 May 2026 08:56:39 +0000 Subject: [PATCH 12/16] modify the condition bug for dev_attr_.port.state --- .../src/transport/kunpeng_transport/urma/urma_endpoint.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp index f2bffe1ebd..1525fe4c53 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp +++ 
b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp @@ -428,8 +428,8 @@ int UrmaContext::openDevice(const std::string& device_name, uint8_t port, } } if (dev_attr_.port_cnt != 0 && - (dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE || - dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE_DEFER)) { + dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE && + dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE_DEFER) { LOG(WARNING) << "Device " << device_name << " not found active port"; if (urma_delete_context(context)) { From 4bf7ce9f1ec1cd59ac8b5082497f74bc5ad5449a Mon Sep 17 00:00:00 2001 From: zchuango Date: Tue, 26 May 2026 09:29:24 +0000 Subject: [PATCH 13/16] add the deleteEndpointByPtr method --- .../transport/kunpeng_transport/ub_context.h | 6 ++++ .../include/transport/transport.h | 1 + .../kunpeng_transport/ub_context.cpp | 34 +++++++++++++++++-- .../kunpeng_transport/urma/urma_endpoint.cpp | 2 ++ 4 files changed, 41 insertions(+), 2 deletions(-) diff --git a/mooncake-transfer-engine/include/transport/kunpeng_transport/ub_context.h b/mooncake-transfer-engine/include/transport/kunpeng_transport/ub_context.h index 7240ff7bbd..fd7f937141 100644 --- a/mooncake-transfer-engine/include/transport/kunpeng_transport/ub_context.h +++ b/mooncake-transfer-engine/include/transport/kunpeng_transport/ub_context.h @@ -83,6 +83,7 @@ class UbEndpointStore { virtual std::shared_ptr insertEndpoint( const std::string& peer_nic_path, UbContext* context) = 0; virtual int deleteEndpoint(const std::string& peer_nic_path) = 0; + virtual int deleteEndpointByPtr(UbEndPoint* point_ptr) = 0; virtual void evictEndpoint() = 0; virtual void reclaimEndpoint() = 0; virtual size_t getSize() = 0; @@ -102,6 +103,7 @@ class UbSIEVEEndpointStore : public UbEndpointStore { std::shared_ptr insertEndpoint(const std::string& peer_nic_path, UbContext* context) override; int deleteEndpoint(const std::string& peer_nic_path) override; + int
deleteEndpointByPtr(UbEndPoint* point_ptr) override; void evictEndpoint() override; void reclaimEndpoint() override; size_t getSize() override; @@ -229,6 +231,10 @@ class UbContext { return endpoint_store_->deleteEndpoint(peer_nic_path); } + int deleteEndpointByPtr(UbEndPoint* point_ptr) { + return endpoint_store_->deleteEndpointByPtr(point_ptr); + } + int disconnectAllEndpoints() { return endpoint_store_->disconnect(); } // Device name, such as `mlx5_3` diff --git a/mooncake-transfer-engine/include/transport/transport.h b/mooncake-transfer-engine/include/transport/transport.h index 60f46fba39..aca0a0d15c 100644 --- a/mooncake-transfer-engine/include/transport/transport.h +++ b/mooncake-transfer-engine/include/transport/transport.h @@ -132,6 +132,7 @@ class Transport { uint32_t max_retry_cnt; void *r_seg; void *l_seg; + void *endpoint; } ub; struct { void *dest_addr; diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp index 7eb5eafed9..ac10faa515 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_context.cpp @@ -80,6 +80,29 @@ int UbSIEVEEndpointStore::deleteEndpoint(const std::string& peer_nic_path) { return 0; } +int UbSIEVEEndpointStore::deleteEndpointByPtr(UbEndPoint* point_ptr) { + RWSpinlock::WriteGuard guard(endpoint_map_lock_); + for (auto iter = endpoint_map_.begin(); iter != endpoint_map_.end(); + iter++) { + if (iter->second.first.get() == point_ptr) { + std::string peer_nic_path = iter->first; + iter->second.first->deconstruct(); + waiting_list_len_++; + waiting_list_.insert(iter->second.first); + auto fifo_iter = fifo_map_[peer_nic_path]; + if (hand_.has_value() && hand_.value() == fifo_iter) { + fifo_iter == fifo_list_.begin() ? 
hand_ = std::nullopt + : hand_ = std::prev(fifo_iter); + } + fifo_list_.erase(fifo_iter); + fifo_map_.erase(peer_nic_path); + endpoint_map_.erase(iter); + return 0; + } + } + return 0; +} + void UbSIEVEEndpointStore::evictEndpoint() { if (fifo_list_.empty()) { return; @@ -339,7 +362,7 @@ void UbWorkerPool::performPostSend(int thread_id) { } if (!endpoint->active()) { if (endpoint->inactiveTime() > 1.0) - context_.deleteEndpoint(entry.first); + context_.deleteEndpointByPtr(endpoint.get()); // enable for re-establishation for (auto& slice : entry.second) failed_slice_list.push_back(slice); entry.second.clear(); @@ -361,6 +384,10 @@ void UbWorkerPool::performPostSend(int thread_id) { entry.second.clear(); continue; } + // Set endpoint pointer for each slice before submitting + for (auto& slice : entry.second) { + slice->ub.endpoint = endpoint.get(); + } endpoint->submitPostSend(entry.second, failed_slice_list); #endif } @@ -400,7 +427,10 @@ void UbWorkerPool::performPoll(int thread_id) { } slice->ub.retry_cnt++; if (slice->ub.retry_cnt >= slice->ub.max_retry_cnt) { - context_.deleteEndpoint(slice->peer_nic_path); + if (slice->ub.endpoint) { + auto ptr = static_cast(slice->ub.endpoint); + context_.deleteEndpointByPtr(ptr); + } slice->markFailed(); processed_slice_count_++; } else { diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp index 1525fe4c53..0d9b3c61a1 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp @@ -902,6 +902,8 @@ int UrmaEndpoint::submitPostSend( slice->ts = getCurrentTimeInNano(); slice->status = Transport::Slice::POSTED; slice->ub.jetty_depth = &wr_depth_list_[jetty_index]; + // Set endpoint pointer for each slice before submitting + slice->ub.endpoint = this; } 
__sync_fetch_and_add(&wr_depth_list_[jetty_index], wr_count); __sync_fetch_and_add(jfc_outstanding_, wr_count); From 8646f0272e05d63ada12da1c75c08ab28d344721 Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 28 May 2026 14:44:00 +0800 Subject: [PATCH 14/16] resolve the conflicts and optimize the code comment --- mooncake-store/src/utils.cpp | 1 + .../src/transport/kunpeng_transport/ub_allocator.cpp | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/mooncake-store/src/utils.cpp b/mooncake-store/src/utils.cpp index e88022ea47..810a7cf871 100644 --- a/mooncake-store/src/utils.cpp +++ b/mooncake-store/src/utils.cpp @@ -2,6 +2,7 @@ #include "mmap_arena.h" #include "config.h" #include "common.h" +#include "ub_allocator.h" #include #include diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp index 19e4311b0a..609ecf9843 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_allocator.cpp @@ -28,8 +28,8 @@ size_t remove_store_memory_range(void* ptr) { return 0; } - size_t sz = it->size; // 先保存 size - g_ub_store_mem_ranges.erase(it); // 再删除 + size_t sz = it->size; + g_ub_store_mem_ranges.erase(it); return sz; } From e742367e1fbeea87b546e6f03948b0f352fb3e2e Mon Sep 17 00:00:00 2001 From: zchuango Date: Fri, 29 May 2026 03:41:43 +0000 Subject: [PATCH 15/16] refine the client service initTransferEngine for ub --- mooncake-store/src/client_service.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/mooncake-store/src/client_service.cpp b/mooncake-store/src/client_service.cpp index fa00bcb4a6..f69d25c6c8 100644 --- a/mooncake-store/src/client_service.cpp +++ b/mooncake-store/src/client_service.cpp @@ -570,14 +570,16 @@ ErrorCode Client::InitTransferEngine( return ErrorCode::INTERNAL_ERROR; } } else if (protocol == "ub") 
{ - if (!device_names.has_value() || device_names->empty()) { - LOG(ERROR) << "ub protocol requires device names when auto " - "discovery is disabled"; - return ErrorCode::INVALID_PARAMS; - } auto deviceName = device_names.value_or("bonding_dev_0"); + LOG(ERROR) << "ub protocol entable device names is " << deviceName; auto devices = splitString(deviceName, ',', true); - transfer_engine_->getLocalTopology()->discover(devices); + auto topology = transfer_engine_->getLocalTopology(); + if (topology) { + topology->discover(devices); + LOG(INFO) << "Topology discovery complete with specified " + "devices. Found " + << topology->getHcaList().size() << " HCAs"; + } transport = transfer_engine_->installTransport("ub", nullptr); if (!transport) { LOG(ERROR) << "Failed to install ub transport with specified " From db7e32ec4e2954ce6329509b55d3adf6b613ff17 Mon Sep 17 00:00:00 2001 From: zchuango Date: Sat, 30 May 2026 12:08:42 +0000 Subject: [PATCH 16/16] fix the ub_transport_test bug --- .github/workflows/ci.yml | 2 +- .../kunpeng_transport/urma/mock_urma.cpp | 261 +++++++++++------- 2 files changed, 167 insertions(+), 96 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 27555d70cb..003c1c0098 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -184,7 +184,7 @@ jobs: cd build export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib ldconfig -v || echo "always continue" - MC_METADATA_SERVER=http://127.0.0.1:8080/metadata DEFAULT_KV_LEASE_TTL=500 ctest -j --output-on-failure -E ub_transport_test + MC_METADATA_SERVER=http://127.0.0.1:8080/metadata DEFAULT_KV_LEASE_TTL=500 ctest -j --output-on-failure shell: bash - name: Drain HTTP E2E test diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp index 020c3a6b19..c392577a6b 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp +++ 
b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp @@ -1,16 +1,26 @@ #include "urma_api.h" -#include -#include +#include +#include #include +#include +#include #include +#include +#include namespace { -std::mutex mock_mutex; + +struct JfcState { + std::mutex mutex; + std::deque pending_ctx; +}; + +std::shared_mutex g_rw_mutex; bool initialized = false; std::vector device_list; std::map context_map; std::map jfce_map; -std::map> jfc_user_ctx_map; +std::map jfc_state_map; std::map jfr_map; std::map seg_map; std::map jetty_map; @@ -32,10 +42,11 @@ urma_eid_info_t mock_eid_info = { .eid = {{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}}, .eid_index = 0}; + } // namespace urma_status_t urma_init(urma_init_attr_t *init_attr) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (initialized) { return URMA_EEXIST; } @@ -44,7 +55,7 @@ urma_status_t urma_init(urma_init_attr_t *init_attr) { } urma_status_t urma_uninit(void) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); initialized = false; for (auto device : device_list) { delete device; @@ -52,7 +63,10 @@ urma_status_t urma_uninit(void) { device_list.clear(); context_map.clear(); jfce_map.clear(); - jfc_user_ctx_map.clear(); + for (auto &kv : jfc_state_map) { + delete kv.second; + } + jfc_state_map.clear(); jfr_map.clear(); seg_map.clear(); jetty_map.clear(); @@ -61,53 +75,81 @@ urma_status_t urma_uninit(void) { } urma_device_t **urma_get_device_list(int *num_devices) { - std::lock_guard lock(mock_mutex); - if (!initialized) { - *num_devices = 0; - return nullptr; - } - - if (device_list.empty()) { - urma_device_t *device = new urma_device_t; - strcpy(device->name, "mock_urma_device"); - strcpy(device->path, "/sys/class/infiniband/mock_device"); - device->type = URMA_TRANSPORT_UB; - device->ops = nullptr; - device->sysfs_dev = nullptr; - device_list.push_back(device); + { + std::shared_lock 
lock(g_rw_mutex); + if (!initialized) { + *num_devices = 0; + return nullptr; + } + if (!device_list.empty()) { + *num_devices = device_list.size(); + urma_device_t **devices = new urma_device_t *[device_list.size()]; + for (size_t i = 0; i < device_list.size(); ++i) { + devices[i] = device_list[i]; + } + return devices; + } } - - *num_devices = device_list.size(); - urma_device_t **devices = new urma_device_t *[device_list.size()]; - for (size_t i = 0; i < device_list.size(); ++i) { - devices[i] = device_list[i]; + { + std::unique_lock write_lock(g_rw_mutex); + if (!initialized) { + *num_devices = 0; + return nullptr; + } + if (device_list.empty()) { + urma_device_t *device = new urma_device_t; + strcpy(device->name, "mock_urma_device"); + strcpy(device->path, "/sys/class/infiniband/mock_device"); + device->type = URMA_TRANSPORT_UB; + device->ops = nullptr; + device->sysfs_dev = nullptr; + device_list.push_back(device); + } + *num_devices = device_list.size(); + urma_device_t **devices = new urma_device_t *[device_list.size()]; + for (size_t i = 0; i < device_list.size(); ++i) { + devices[i] = device_list[i]; + } + return devices; } - return devices; } urma_device_t *urma_get_device_by_name(const char *name) { - std::lock_guard lock(mock_mutex); - if (!initialized) { - return nullptr; - } - - if (device_list.empty()) { - auto *device = new urma_device_t; - strcpy(device->name, "mock_urma_device"); - strcpy(device->path, "/sys/class/infiniband/mock_device"); - device->type = URMA_TRANSPORT_UB; - device->ops = nullptr; - device->sysfs_dev = nullptr; - device_list.push_back(device); + { + std::shared_lock lock(g_rw_mutex); + if (!initialized) { + return nullptr; + } + if (!device_list.empty()) { + for (auto device : device_list) { + if (strcmp(device->name, name) == 0) { + return device; + } + } + return device_list[0]; + } } - - for (auto device : device_list) { - if (strcmp(device->name, name) == 0) { - return device; + { + std::unique_lock write_lock(g_rw_mutex); 
+ if (!initialized) { + return nullptr; + } + if (device_list.empty()) { + auto *device = new urma_device_t; + strcpy(device->name, "mock_urma_device"); + strcpy(device->path, "/sys/class/infiniband/mock_device"); + device->type = URMA_TRANSPORT_UB; + device->ops = nullptr; + device->sysfs_dev = nullptr; + device_list.push_back(device); } + for (auto device : device_list) { + if (strcmp(device->name, name) == 0) { + return device; + } + } + return device_list.empty() ? nullptr : device_list[0]; } - - return device_list.empty() ? nullptr : device_list[0]; } void urma_free_device_list(urma_device_t **device_list) { @@ -118,7 +160,6 @@ void urma_free_device_list(urma_device_t **device_list) { urma_status_t urma_query_device(urma_device_t *device, urma_device_attr_t *attr) { - std::lock_guard lock(mock_mutex); if (!device || !attr) { return URMA_EINVAL; } @@ -129,7 +170,6 @@ urma_status_t urma_query_device(urma_device_t *device, } urma_eid_info_t *urma_get_eid_list(urma_device_t *device, uint32_t *eid_cnt) { - std::lock_guard lock(mock_mutex); if (!device || !eid_cnt) { return nullptr; } @@ -146,7 +186,7 @@ void urma_free_eid_list(urma_eid_info_t *eid_list) { } urma_context_t *urma_create_context(urma_device_t *device, uint32_t eid_index) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!device) { return nullptr; } @@ -158,7 +198,7 @@ urma_context_t *urma_create_context(urma_device_t *device, uint32_t eid_index) { } urma_status_t urma_delete_context(urma_context_t *ctx) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!ctx || context_map.find(ctx) == context_map.end()) { return URMA_EINVAL; } @@ -168,7 +208,7 @@ urma_status_t urma_delete_context(urma_context_t *ctx) { } urma_jfce_t *urma_create_jfce(urma_context_t *ctx) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!ctx || context_map.find(ctx) == context_map.end()) { return nullptr; } @@ -178,7 +218,7 @@ urma_jfce_t 
*urma_create_jfce(urma_context_t *ctx) { } urma_status_t urma_delete_jfce(urma_jfce_t *jfce) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!jfce || jfce_map.find(jfce) == jfce_map.end()) { return URMA_EINVAL; } @@ -188,7 +228,7 @@ urma_status_t urma_delete_jfce(urma_jfce_t *jfce) { } urma_jfc_t *urma_create_jfc(urma_context_t *ctx, urma_jfc_cfg_t *cfg) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!ctx || !cfg || context_map.find(ctx) == context_map.end()) { return nullptr; } @@ -201,22 +241,23 @@ urma_jfc_t *urma_create_jfc(urma_context_t *ctx, urma_jfc_cfg_t *cfg) { jfc->comp_events_acked = 0; jfc->async_events_acked = 0; jfc->jfc_cfg = *cfg; - jfc_user_ctx_map[jfc] = std::vector(); + jfc_state_map[jfc] = new JfcState(); return jfc; } urma_status_t urma_delete_jfc(urma_jfc_t *jfc) { - std::lock_guard lock(mock_mutex); - if (!jfc || jfc_user_ctx_map.find(jfc) == jfc_user_ctx_map.end()) { + std::unique_lock lock(g_rw_mutex); + if (!jfc || jfc_state_map.find(jfc) == jfc_state_map.end()) { return URMA_EINVAL; } - jfc_user_ctx_map.erase(jfc); + delete jfc_state_map[jfc]; + jfc_state_map.erase(jfc); delete jfc; return URMA_SUCCESS; } urma_jfr_t *urma_create_jfr(urma_context_t *ctx, urma_jfr_cfg_t *cfg) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!ctx || !cfg || context_map.find(ctx) == context_map.end()) { return nullptr; } @@ -226,7 +267,7 @@ urma_jfr_t *urma_create_jfr(urma_context_t *ctx, urma_jfr_cfg_t *cfg) { } urma_status_t urma_delete_jfr(urma_jfr_t *jfr) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!jfr || jfr_map.find(jfr) == jfr_map.end()) { return URMA_EINVAL; } @@ -236,7 +277,7 @@ urma_status_t urma_delete_jfr(urma_jfr_t *jfr) { } urma_target_seg_t *urma_register_seg(urma_context_t *ctx, urma_seg_cfg_t *cfg) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!ctx || !cfg || context_map.find(ctx) 
== context_map.end()) { return nullptr; } @@ -252,7 +293,7 @@ urma_target_seg_t *urma_register_seg(urma_context_t *ctx, urma_seg_cfg_t *cfg) { } urma_status_t urma_unregister_seg(urma_target_seg_t *seg) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!seg || seg_map.find(seg) == seg_map.end()) { return URMA_EINVAL; } @@ -264,7 +305,7 @@ urma_status_t urma_unregister_seg(urma_target_seg_t *seg) { urma_target_seg_t *urma_import_seg(urma_context_t *ctx, urma_seg_t *seg, urma_token_t *token_value, uint64_t addr, urma_import_seg_flag_t flag) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!ctx || !seg || !token_value || context_map.find(ctx) == context_map.end()) { return nullptr; @@ -277,7 +318,7 @@ urma_target_seg_t *urma_import_seg(urma_context_t *ctx, urma_seg_t *seg, } urma_status_t urma_unimport_seg(urma_target_seg_t *tseg) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!tseg || seg_map.find(tseg) == seg_map.end()) { return URMA_EINVAL; } @@ -288,8 +329,11 @@ urma_status_t urma_unimport_seg(urma_target_seg_t *tseg) { urma_status_t urma_get_async_event(urma_context_t *ctx, urma_async_event_t *event) { - std::lock_guard lock(mock_mutex); - if (!ctx || !event || context_map.find(ctx) == context_map.end()) { + if (!ctx || !event) { + return URMA_EINVAL; + } + std::shared_lock lock(g_rw_mutex); + if (context_map.find(ctx) == context_map.end()) { return URMA_EINVAL; } return URMA_ETIMEOUT; @@ -298,7 +342,7 @@ urma_status_t urma_get_async_event(urma_context_t *ctx, void urma_ack_async_event(urma_async_event_t *event) {} urma_jetty_t *urma_create_jetty(urma_context_t *ctx, urma_jetty_cfg_t *cfg) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!ctx || !cfg || context_map.find(ctx) == context_map.end()) { return nullptr; } @@ -314,7 +358,7 @@ urma_jetty_t *urma_create_jetty(urma_context_t *ctx, urma_jetty_cfg_t *cfg) { } urma_status_t 
urma_delete_jetty(urma_jetty_t *jetty) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!jetty || jetty_map.find(jetty) == jetty_map.end()) { return URMA_EINVAL; } @@ -324,7 +368,7 @@ urma_status_t urma_delete_jetty(urma_jetty_t *jetty) { } urma_status_t urma_unbind_jetty(urma_jetty_t *jetty) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!jetty || jetty_map.find(jetty) == jetty_map.end()) { return URMA_EINVAL; } @@ -335,7 +379,7 @@ urma_status_t urma_unbind_jetty(urma_jetty_t *jetty) { urma_target_jetty_t *urma_import_jetty(urma_context_t *ctx, urma_rjetty_t *rjetty, urma_token_t *token_value) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!ctx || !rjetty || !token_value || context_map.find(ctx) == context_map.end()) { return nullptr; @@ -348,7 +392,7 @@ urma_target_jetty_t *urma_import_jetty(urma_context_t *ctx, } urma_status_t urma_unimport_jetty(urma_target_jetty_t *tjetty) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!tjetty || target_jetty_map.find(tjetty) == target_jetty_map.end()) { return URMA_EINVAL; } @@ -359,7 +403,7 @@ urma_status_t urma_unimport_jetty(urma_target_jetty_t *tjetty) { urma_status_t urma_bind_jetty(urma_jetty_t *jetty, urma_target_jetty_t *tjetty) { - std::lock_guard lock(mock_mutex); + std::unique_lock lock(g_rw_mutex); if (!jetty || !tjetty || jetty_map.find(jetty) == jetty_map.end() || target_jetty_map.find(tjetty) == target_jetty_map.end()) { return URMA_EINVAL; @@ -369,7 +413,7 @@ urma_status_t urma_bind_jetty(urma_jetty_t *jetty, } urma_status_t urma_modify_jetty(urma_jetty_t *jetty, urma_jetty_attr_t *attr) { - std::lock_guard lock(mock_mutex); + std::shared_lock lock(g_rw_mutex); if (!jetty || !attr || jetty_map.find(jetty) == jetty_map.end()) { return URMA_EINVAL; } @@ -378,19 +422,37 @@ urma_status_t urma_modify_jetty(urma_jetty_t *jetty, urma_jetty_attr_t *attr) { urma_status_t 
urma_post_jetty_send_wr(urma_jetty_t *jetty, urma_jfs_wr_t *wr, urma_jfs_wr_t **bad_wr) { - std::lock_guard lock(mock_mutex); - if (!jetty || !wr || jetty_map.find(jetty) == jetty_map.end()) { - if (bad_wr) { - *bad_wr = wr; + { + std::shared_lock lock(g_rw_mutex); + if (!jetty || !wr || jetty_map.find(jetty) == jetty_map.end()) { + if (bad_wr) { + *bad_wr = wr; + } + return URMA_EINVAL; } - return URMA_EINVAL; } - urma_jfs_wr_t *current_wr = wr; - while (current_wr) { - jfc_user_ctx_map[jetty->jetty_cfg.jfs_cfg.jfc].push_back( - current_wr->user_ctx); - current_wr = current_wr->next; + urma_jfc_t *jfc = jetty->jetty_cfg.jfs_cfg.jfc; + JfcState *state = nullptr; + { + std::shared_lock lock(g_rw_mutex); + auto it = jfc_state_map.find(jfc); + if (it == jfc_state_map.end()) { + if (bad_wr) { + *bad_wr = wr; + } + return URMA_EINVAL; + } + state = it->second; + } + + { + std::lock_guard jfc_lock(state->mutex); + urma_jfs_wr_t *current_wr = wr; + while (current_wr) { + state->pending_ctx.push_back(current_wr->user_ctx); + current_wr = current_wr->next; + } } if (bad_wr) { @@ -400,18 +462,27 @@ urma_status_t urma_post_jetty_send_wr(urma_jetty_t *jetty, urma_jfs_wr_t *wr, } int urma_poll_jfc(urma_jfc_t *jfc, int num_entries, urma_cr_t *cr_list) { - std::lock_guard lock(mock_mutex); - if (!jfc || !cr_list || - jfc_user_ctx_map.find(jfc) == jfc_user_ctx_map.end()) { - return -1; - } - int available = jfc_user_ctx_map[jfc].size(); - int num_completed = std::min(num_entries, available); - for (int i = 0; i < num_completed; ++i) { - cr_list[i].status = URMA_CR_SUCCESS; - cr_list[i].user_ctx = jfc_user_ctx_map[jfc][i]; - } - jfc_user_ctx_map[jfc].erase(jfc_user_ctx_map[jfc].begin(), - jfc_user_ctx_map[jfc].begin() + num_completed); + JfcState *state = nullptr; + { + std::shared_lock lock(g_rw_mutex); + auto it = jfc_state_map.find(jfc); + if (it == jfc_state_map.end()) { + return -1; + } + state = it->second; + } + + int num_completed = 0; + { + std::lock_guard 
jfc_lock(state->mutex); + int available = static_cast(state->pending_ctx.size()); + num_completed = std::min(num_entries, available); + for (int i = 0; i < num_completed; ++i) { + cr_list[i].status = URMA_CR_SUCCESS; + cr_list[i].user_ctx = state->pending_ctx[i]; + } + state->pending_ctx.erase(state->pending_ctx.begin(), + state->pending_ctx.begin() + num_completed); + } return num_completed; }