Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
15f8d18
entable the ub transport for mooncake store
zchuango May 13, 2026
d280957
modify some bug for client buffer
zchuango May 14, 2026
01953ce
add the ub transport allocator
zchuango May 16, 2026
1cd0df5
modify some bug for client buffer for free
zchuango May 19, 2026
4d93a48
Merge remote-tracking branch 'origin/ub_transport_store_dev' into ub_…
zchuango May 19, 2026
691321a
modify the urma version
zchuango May 20, 2026
b65dcc4
refine the ub allocator for client buffer
zchuango May 20, 2026
783dd26
Merge remote-tracking branch 'origin/ub_transport_store_dev' into ub_…
zchuango May 20, 2026
050b58c
fix the allocator free
zchuango May 21, 2026
7ef4b6f
fix the bug for mooncake ub allocator
zchuango May 21, 2026
c59670d
optimize the urma_endpoint polling method
zchuango May 21, 2026
96728a9
Merge branch 'main' into ub_transport_store_dev
zchuango May 23, 2026
ed39ab9
add the format for store
zchuango May 23, 2026
b62637b
Update mooncake-transfer-engine/src/transport/kunpeng_transport/ub_al…
zchuango May 23, 2026
70cf348
modify the condition bug for dev_attr_.port.state
zchuango May 23, 2026
7a198b8
add the deletePointByPtr method
zchuango May 26, 2026
4bf7ce9
add the deletePointByPtr method
zchuango May 26, 2026
804f5c8
Merge branch 'main' into ub_transport_store_dev
zchuango May 28, 2026
0bc3c62
Merge branch 'ub_transport_store_dev' into ub_transport_test
zchuango May 28, 2026
8646f02
resolve the conflicts and optimize the code comment
zchuango May 28, 2026
725cad0
Merge branch 'ub_transport_store_dev' into ub_transport_test
zchuango May 29, 2026
7768f15
fix the ub_transport_test bug
zchuango May 29, 2026
e742367
refine the client service initTransferEngine for ub
zchuango May 29, 2026
154df78
Merge branch 'ub_transport_store_dev' into ub_transport_test
zchuango May 30, 2026
a862d10
resolve the deadlock for urma_mock
zchuango May 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ jobs:
cd build
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib
ldconfig -v || echo "always continue"
MC_METADATA_SERVER=http://127.0.0.1:8080/metadata DEFAULT_KV_LEASE_TTL=500 ctest -j --output-on-failure -E ub_transport_test
MC_METADATA_SERVER=http://127.0.0.1:8080/metadata DEFAULT_KV_LEASE_TTL=500 ctest -j --output-on-failure
shell: bash

- name: Drain HTTP E2E test
Expand Down
2 changes: 1 addition & 1 deletion mooncake-common/FindUrma.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ include(FetchContent)
FetchContent_Declare(
urma
GIT_REPOSITORY https://atomgit.com/openeuler/umdk.git
GIT_TAG v25.12.0
GIT_TAG v25.12.0.B081
)

FetchContent_MakeAvailable(urma)
Expand Down
4 changes: 4 additions & 0 deletions mooncake-p2p-store/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ if [ -d "/usr/local/musa/lib" ]; then
EXT_LDFLAGS+=" -L/usr/local/musa/lib -lmusart"
fi

# Add the URMA library to the extra linker flags, but only when
# /usr/lib64/liburma.so exists on the build machine.
if [ -e "/usr/lib64/liburma.so" ]; then
EXT_LDFLAGS+=" -L/usr/lib64 -lurma"
fi

if [ "$USE_ETCD" = "ON" ]; then
if [ "$USE_ETCD_LEGACY" = "ON" ]; then
EXT_LDFLAGS+=" -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc"
Expand Down
11 changes: 11 additions & 0 deletions mooncake-store/include/real_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -766,11 +766,22 @@ class RealClient : public PyClient {
}
};

// Deleter used with std::unique_ptr<void, ...> for segments registered under
// the "ub" protocol. Member order matters: instances are brace-initialised
// with the size first (e.g. UbSegmentDeleter{mapped_size}).
struct UbSegmentDeleter {
    // Byte length recorded for the segment; a value of 0 disables the release.
    size_t size = 0;
    // Protocol tag handed to free_memory() when the segment is released.
    std::string protocol = "ub";

    // Hands `ptr` to free_memory() unless it is null or `size` is zero.
    void operator()(void *ptr) const {
        if (ptr == nullptr || size == 0) {
            return;
        }
        free_memory(protocol.c_str(), ptr);
    }
};

std::vector<std::unique_ptr<void, HugepageSegmentDeleter>>
hugepage_segment_ptrs_;
std::vector<std::unique_ptr<void, SegmentDeleter>> segment_ptrs_;
std::vector<std::unique_ptr<void, AscendSegmentDeleter>>
ascend_segment_ptrs_;
std::vector<std::unique_ptr<void, UbSegmentDeleter>> ub_segment_ptrs_;
std::string protocol;
std::string device_name;
std::string local_hostname;
Expand Down
17 changes: 17 additions & 0 deletions mooncake-store/src/client_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,23 @@ ErrorCode Client::InitTransferEngine(
LOG(ERROR) << "Failed to install CXL transport";
return ErrorCode::INTERNAL_ERROR;
}
} else if (protocol == "ub") {
auto deviceName = device_names.value_or("bonding_dev_0");
LOG(ERROR) << "ub protocol entable device names is " << deviceName;
auto devices = splitString(deviceName, ',', true);
auto topology = transfer_engine_->getLocalTopology();
if (topology) {
topology->discover(devices);
LOG(INFO) << "Topology discovery complete with specified "
"devices. Found "
<< topology->getHcaList().size() << " HCAs";
}
transport = transfer_engine_->installTransport("ub", nullptr);
if (!transport) {
LOG(ERROR) << "Failed to install ub transport with specified "
"devices";
return ErrorCode::INTERNAL_ERROR;
}
} else {
LOG(ERROR) << "unsupported_protocol protocol=" << protocol;
return ErrorCode::INVALID_PARAMS;
Expand Down
4 changes: 4 additions & 0 deletions mooncake-store/src/real_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,9 @@ tl::expected<void, ErrorCode> RealClient::setup_internal(
if (this->protocol == "ascend" || this->protocol == "ubshmem") {
ascend_segment_ptrs_.emplace_back(
ptr, AscendSegmentDeleter{this->protocol});
} else if (this->protocol == "ub") {
ub_segment_ptrs_.emplace_back(ptr,
UbSegmentDeleter{mapped_size});
} else if (!seg_numa_nodes.empty() || should_use_hugepage) {
// NUMA-segmented or hugepage: track as mmap allocation for
// munmap cleanup
Expand Down Expand Up @@ -1106,6 +1109,7 @@ tl::expected<void, ErrorCode> RealClient::tearDownAll_internal() {
client_buffer_allocator_.reset();
port_binder_.reset();
hugepage_segment_ptrs_.clear();
ub_segment_ptrs_.clear();
segment_ptrs_.clear();
local_hostname = "";
device_name = "";
Expand Down
13 changes: 12 additions & 1 deletion mooncake-store/src/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "mmap_arena.h"
#include "config.h"
#include "common.h"
#include "ub_allocator.h"

#include <Slab.h>
#include <gflags/gflags.h>
Expand Down Expand Up @@ -117,6 +118,11 @@ void *allocate_buffer_allocator_memory(size_t total_size,
return ascend_allocate_memory(total_size, protocol);
}
#endif
#if defined(USE_UB)
if (protocol == "ub") {
return mooncake::ub_allocate_memory(alignment, total_size);
}
#endif
#ifdef USE_NOF
if (use_spdk_dma && total_size > 0) {
return mooncake::SpdkWrapper::GetInstance().Alloc(total_size, alignment,
Expand Down Expand Up @@ -371,7 +377,12 @@ void free_memory(const std::string &protocol, void *ptr) {
return ascend_free_memory(protocol, ptr);
}
#endif

#if defined(USE_UB)
if (protocol == "ub") {
mooncake::ub_free_memory(ptr);
return;
}
#endif
free(ptr);
}

Expand Down
1 change: 1 addition & 0 deletions mooncake-transfer-engine/include/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ install(FILES multi_transport.h DESTINATION include)
install(FILES topology.h DESTINATION include)
install(FILES transfer_engine.h DESTINATION include)
install(FILES transfer_metadata.h DESTINATION include)
install(FILES ub_allocator.h DESTINATION include)
install(FILES common/base/status.h DESTINATION include/common/base)
install(FILES transport/transport.h DESTINATION include/transport)
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class UbEndpointStore {
virtual std::shared_ptr<UbEndPoint> insertEndpoint(
const std::string& peer_nic_path, UbContext* context) = 0;
virtual int deleteEndpoint(const std::string& peer_nic_path) = 0;
virtual int deleteEndpointByPtr(UbEndPoint* point_ptr) = 0;
virtual void evictEndpoint() = 0;
virtual void reclaimEndpoint() = 0;
virtual size_t getSize() = 0;
Expand All @@ -102,6 +103,7 @@ class UbSIEVEEndpointStore : public UbEndpointStore {
std::shared_ptr<UbEndPoint> insertEndpoint(const std::string& peer_nic_path,
UbContext* context) override;
int deleteEndpoint(const std::string& peer_nic_path) override;
int deleteEndpointByPtr(UbEndPoint* point_ptr) override;
void evictEndpoint() override;
void reclaimEndpoint() override;
size_t getSize() override;
Expand Down Expand Up @@ -229,6 +231,10 @@ class UbContext {
return endpoint_store_->deleteEndpoint(peer_nic_path);
}

// Forwards to the endpoint store so the endpoint is looked up by object
// identity (`point_ptr`) instead of by peer NIC path; the store's result is
// returned unchanged.
int deleteEndpointByPtr(UbEndPoint* point_ptr) {
    const int rc = endpoint_store_->deleteEndpointByPtr(point_ptr);
    return rc;
}

// Asks the endpoint store to disconnect and passes its return value back.
int disconnectAllEndpoints() {
    return endpoint_store_->disconnect();
}

// Device name, such as `mlx5_3`
Expand Down
1 change: 1 addition & 0 deletions mooncake-transfer-engine/include/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class Transport {
uint32_t max_retry_cnt;
void *r_seg;
void *l_seg;
void *endpoint;
} ub;
struct {
void *dest_addr;
Expand Down
11 changes: 11 additions & 0 deletions mooncake-transfer-engine/include/ub_allocator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

// <cstddef> provides std::size_t so this header compiles on its own,
// regardless of what the including translation unit pulled in before it.
#include <cstddef>

namespace mooncake {

/// @brief Allocates a buffer for the UB transport and records it in the
///        store-memory registry.
/// @param alignment  Requested alignment in bytes.
/// @param total_size Number of bytes to allocate.
/// @return Pointer to the new buffer, or nullptr when the allocation fails.
void* ub_allocate_memory(std::size_t alignment, std::size_t total_size);

/// @brief Releases a buffer previously returned by ub_allocate_memory() and
///        removes it from the store-memory registry.
/// @param ptr Base address of the buffer; nullptr is ignored.
void ub_free_memory(void* ptr);

/// @brief Reports whether [addr, addr + length) lies entirely inside a single
///        buffer that is currently registered by ub_allocate_memory().
/// @param addr   Start of the region to test.
/// @param length Size of the region in bytes.
/// @return true when the whole region is covered by one registered buffer;
///         false otherwise, including for a null `addr` or zero `length`.
bool ub_is_store_memory(void* addr, std::size_t length);

}  // namespace mooncake
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
file(GLOB UB_SOURCES "*.cpp" "urma/urma_endpoint.cpp")
file(GLOB UB_SOURCES "*.cpp" "urma/urma_endpoint.cpp" "ub_allocator.cpp")

# Check if liburma.so exists
find_library(URMA_LIBRARY urma PATHS /usr/lib64)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_map>
#include <vector>
#include <glog/logging.h>
#include <numa.h>

#include "ub_allocator.h"

namespace mooncake {
// One buffer handed out by ub_allocate_memory(): its base address and its
// length in bytes.
struct UbStoreMemRange {
void* base;
size_t size;
};
// Held around every read and write of g_ub_store_mem_ranges in this file.
std::mutex g_ub_store_mem_mutex;
// Registry of buffers allocated by ub_allocate_memory() that have not yet been
// removed by remove_store_memory_range().
std::vector<UbStoreMemRange> g_ub_store_mem_ranges;

// Unregisters the range whose base address is exactly `ptr`.
//
// Returns the byte length that was recorded for the range. When no range
// starts at `ptr`, an error is logged and 0 is returned. The registry mutex is
// held for the whole lookup-and-erase.
size_t remove_store_memory_range(void* ptr) {
    std::lock_guard<std::mutex> store_lock(g_ub_store_mem_mutex);

    for (auto entry = g_ub_store_mem_ranges.begin();
         entry != g_ub_store_mem_ranges.end(); ++entry) {
        if (entry->base != ptr) {
            continue;
        }
        // Read the size before erasing; the iterator is invalid afterwards.
        const size_t released = entry->size;
        g_ub_store_mem_ranges.erase(entry);
        return released;
    }

    LOG(ERROR) << "failed for UB protocol, addr at " << ptr;
    return 0;
}

void* ub_allocate_memory(size_t alignment, size_t total_size) {
void* ptr = numa_alloc_local(total_size);
if (!ptr) {
LOG(ERROR) << "failed for UB protocol, size=" << total_size
<< ", alignment : " << alignment;
return nullptr;
}
LOG(INFO) << "UB: allocated total size : " << total_size
<< ", alignment : " << alignment << " addr at " << ptr;

std::lock_guard<std::mutex> store_lock(g_ub_store_mem_mutex);
g_ub_store_mem_ranges.push_back({ptr, total_size});

return ptr;
}

// Releases a buffer that was returned by ub_allocate_memory().
//
// The buffer is first removed from the store-memory registry, which also
// yields the length needed by numa_free(). A null `ptr` is ignored.
//
// If `ptr` is not the base of a registered buffer (never allocated here, or
// already freed), nothing is released: the length is unknown, so the previous
// numa_free(ptr, 0) call could not unmap anything and the "freed" log line was
// misleading. remove_store_memory_range() has already logged the error.
void ub_free_memory(void* ptr) {
    if (!ptr) {
        return;
    }

    const size_t size = remove_store_memory_range(ptr);
    if (size == 0) {
        return;
    }

    numa_free(ptr, size);
    LOG(INFO) << "UB: freed " << size << " bytes at " << ptr;
}

// Reports whether the region [addr, addr + length) is fully contained in one
// buffer currently registered by ub_allocate_memory().
//
// Returns false for a null `addr` or a zero `length`. The registry mutex is
// held while the ranges are scanned.
//
// The containment test is written with subtractions only. Computing
// `addr + length` (or `base + size`) directly can wrap around uintptr_t for a
// very large `length`, which would make an out-of-range region look contained.
bool ub_is_store_memory(void* addr, size_t length) {
    if (!addr || length == 0) return false;

    const auto addr_start = reinterpret_cast<uintptr_t>(addr);

    std::lock_guard<std::mutex> lock(g_ub_store_mem_mutex);
    for (const auto& range : g_ub_store_mem_ranges) {
        const auto range_start = reinterpret_cast<uintptr_t>(range.base);
        // The region must start inside the range and be no longer than it.
        if (addr_start < range_start || length > range.size) {
            continue;
        }
        // offset + length <= size, rearranged so neither side can overflow.
        const uintptr_t offset = addr_start - range_start;
        if (offset <= range.size - length) {
            return true;
        }
    }
    return false;
}

} // namespace mooncake
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,29 @@ int UbSIEVEEndpointStore::deleteEndpoint(const std::string& peer_nic_path) {
return 0;
}

// Removes the endpoint whose object address equals `point_ptr`.
//
// Under the write lock the matching entry is deconstructed, parked in the
// waiting list, and then dropped from the FIFO list, the FIFO index and the
// endpoint map. Returns 0 whether or not a matching endpoint was found.
int UbSIEVEEndpointStore::deleteEndpointByPtr(UbEndPoint* point_ptr) {
    RWSpinlock::WriteGuard guard(endpoint_map_lock_);

    // Linear scan: the map is keyed by peer NIC path, not by pointer.
    auto match = endpoint_map_.begin();
    while (match != endpoint_map_.end() &&
           match->second.first.get() != point_ptr) {
        ++match;
    }
    if (match == endpoint_map_.end()) {
        return 0;
    }

    // Copy the key: it is still needed after the FIFO bookkeeping below and
    // must not dangle once the map entry is erased.
    const std::string nic_path = match->first;

    match->second.first->deconstruct();
    waiting_list_len_++;
    waiting_list_.insert(match->second.first);

    // NOTE(review): operator[] would insert a default iterator if `nic_path`
    // were absent from fifo_map_; presumably both maps are kept in sync —
    // confirm.
    auto fifo_pos = fifo_map_[nic_path];
    if (hand_.has_value() && hand_.value() == fifo_pos) {
        // Move the hand off the node being removed: to the previous node, or
        // to "unset" when the node is the first in the list.
        if (fifo_pos == fifo_list_.begin()) {
            hand_ = std::nullopt;
        } else {
            hand_ = std::prev(fifo_pos);
        }
    }
    fifo_list_.erase(fifo_pos);
    fifo_map_.erase(nic_path);
    endpoint_map_.erase(match);
    return 0;
}

void UbSIEVEEndpointStore::evictEndpoint() {
if (fifo_list_.empty()) {
return;
Expand Down Expand Up @@ -246,6 +269,12 @@ int UbWorkerPool::submitPostSend(
auto targetSegment =
peer_segment_desc->buffers[buffer_id].tseg[device_id];
slice->ub.r_seg = context_.retrieveRemoteSeg(targetSegment);
if (!slice->ub.r_seg) {
LOG(ERROR) << "[UB] retrieveRemoteSeg failed for target_id="
<< slice->target_id << " buffer_id=" << buffer_id
<< " device_id" << device_id
<< " dest_addr=" << slice->ub.dest_addr;
}
auto peer_nic_path =
MakeNicPath(peer_segment_desc->name,
peer_segment_desc->devices[device_id].name);
Expand Down Expand Up @@ -333,7 +362,7 @@ void UbWorkerPool::performPostSend(int thread_id) {
}
if (!endpoint->active()) {
if (endpoint->inactiveTime() > 1.0)
context_.deleteEndpoint(entry.first);
context_.deleteEndpointByPtr(endpoint.get());
// enable for re-establishation
for (auto& slice : entry.second) failed_slice_list.push_back(slice);
entry.second.clear();
Expand All @@ -355,6 +384,10 @@ void UbWorkerPool::performPostSend(int thread_id) {
entry.second.clear();
continue;
}
// Set endpoint pointer for each slice before submitting
for (auto& slice : entry.second) {
slice->ub.endpoint = endpoint.get();
}
endpoint->submitPostSend(entry.second, failed_slice_list);
#endif
}
Expand Down Expand Up @@ -392,9 +425,12 @@ void UbWorkerPool::performPoll(int thread_id) {
<< context_.nicPath() << ", mark it inactive";
context_.set_active(false);
}
context_.deleteEndpoint(slice->peer_nic_path);
slice->ub.retry_cnt++;
if (slice->ub.retry_cnt >= slice->ub.max_retry_cnt) {
if (slice->ub.endpoint) {
auto ptr = static_cast<UbEndPoint*>(slice->ub.endpoint);
context_.deleteEndpointByPtr(ptr);
}
slice->markFailed();
processed_slice_count_++;
} else {
Expand Down
Loading
Loading