Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ jobs:
cd build
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib
ldconfig -v || echo "always continue"
MC_METADATA_SERVER=http://127.0.0.1:8080/metadata DEFAULT_KV_LEASE_TTL=500 ctest -j --output-on-failure -E ub_transport_test
MC_METADATA_SERVER=http://127.0.0.1:8080/metadata DEFAULT_KV_LEASE_TTL=500 ctest -j --output-on-failure
shell: bash

- name: Drain HTTP E2E test
Expand Down
2 changes: 1 addition & 1 deletion mooncake-common/FindUrma.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ include(FetchContent)
FetchContent_Declare(
urma
GIT_REPOSITORY https://atomgit.com/openeuler/umdk.git
GIT_TAG v25.12.0
GIT_TAG v25.12.0.B081
)

FetchContent_MakeAvailable(urma)
Expand Down
4 changes: 4 additions & 0 deletions mooncake-p2p-store/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ if [ -d "/usr/local/musa/lib" ]; then
EXT_LDFLAGS+=" -L/usr/local/musa/lib -lmusart"
fi

if [ -e "/usr/lib64/liburma.so" ]; then
EXT_LDFLAGS+=" -L/usr/lib64 -lurma"
fi

if [ "$USE_ETCD" = "ON" ]; then
if [ "$USE_ETCD_LEGACY" = "ON" ]; then
EXT_LDFLAGS+=" -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc"
Expand Down
11 changes: 11 additions & 0 deletions mooncake-store/include/real_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -766,11 +766,22 @@ class RealClient : public PyClient {
}
};

struct UbSegmentDeleter {
size_t size = 0;
std::string protocol = "ub";
void operator()(void *ptr) const {
if (ptr && size > 0) {
free_memory(protocol.c_str(), ptr);
}
}
};

std::vector<std::unique_ptr<void, HugepageSegmentDeleter>>
hugepage_segment_ptrs_;
std::vector<std::unique_ptr<void, SegmentDeleter>> segment_ptrs_;
std::vector<std::unique_ptr<void, AscendSegmentDeleter>>
ascend_segment_ptrs_;
std::vector<std::unique_ptr<void, UbSegmentDeleter>> ub_segment_ptrs_;
std::string protocol;
std::string device_name;
std::string local_hostname;
Expand Down
17 changes: 17 additions & 0 deletions mooncake-store/src/client_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,23 @@ ErrorCode Client::InitTransferEngine(
LOG(ERROR) << "Failed to install CXL transport";
return ErrorCode::INTERNAL_ERROR;
}
} else if (protocol == "ub") {
auto deviceName = device_names.value_or("bonding_dev_0");
LOG(ERROR) << "ub protocol entable device names is " << deviceName;
auto devices = splitString(deviceName, ',', true);
auto topology = transfer_engine_->getLocalTopology();
if (topology) {
topology->discover(devices);
LOG(INFO) << "Topology discovery complete with specified "
"devices. Found "
<< topology->getHcaList().size() << " HCAs";
}
transport = transfer_engine_->installTransport("ub", nullptr);
if (!transport) {
LOG(ERROR) << "Failed to install ub transport with specified "
"devices";
return ErrorCode::INTERNAL_ERROR;
}
} else {
LOG(ERROR) << "unsupported_protocol protocol=" << protocol;
return ErrorCode::INVALID_PARAMS;
Expand Down
4 changes: 4 additions & 0 deletions mooncake-store/src/real_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,9 @@ tl::expected<void, ErrorCode> RealClient::setup_internal(
if (this->protocol == "ascend" || this->protocol == "ubshmem") {
ascend_segment_ptrs_.emplace_back(
ptr, AscendSegmentDeleter{this->protocol});
} else if (this->protocol == "ub") {
ub_segment_ptrs_.emplace_back(ptr,
UbSegmentDeleter{mapped_size});
} else if (!seg_numa_nodes.empty() || should_use_hugepage) {
// NUMA-segmented or hugepage: track as mmap allocation for
// munmap cleanup
Expand Down Expand Up @@ -1106,6 +1109,7 @@ tl::expected<void, ErrorCode> RealClient::tearDownAll_internal() {
client_buffer_allocator_.reset();
port_binder_.reset();
hugepage_segment_ptrs_.clear();
ub_segment_ptrs_.clear();
segment_ptrs_.clear();
local_hostname = "";
device_name = "";
Expand Down
13 changes: 12 additions & 1 deletion mooncake-store/src/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "mmap_arena.h"
#include "config.h"
#include "common.h"
#include "ub_allocator.h"

#include <Slab.h>
#include <gflags/gflags.h>
Expand Down Expand Up @@ -117,6 +118,11 @@ void *allocate_buffer_allocator_memory(size_t total_size,
return ascend_allocate_memory(total_size, protocol);
}
#endif
#if defined(USE_UB)
if (protocol == "ub") {
return mooncake::ub_allocate_memory(alignment, total_size);
}
#endif
#ifdef USE_NOF
if (use_spdk_dma && total_size > 0) {
return mooncake::SpdkWrapper::GetInstance().Alloc(total_size, alignment,
Expand Down Expand Up @@ -371,7 +377,12 @@ void free_memory(const std::string &protocol, void *ptr) {
return ascend_free_memory(protocol, ptr);
}
#endif

#if defined(USE_UB)
if (protocol == "ub") {
mooncake::ub_free_memory(ptr);
return;
}
#endif
free(ptr);
}

Expand Down
1 change: 1 addition & 0 deletions mooncake-transfer-engine/include/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ install(FILES multi_transport.h DESTINATION include)
install(FILES topology.h DESTINATION include)
install(FILES transfer_engine.h DESTINATION include)
install(FILES transfer_metadata.h DESTINATION include)
install(FILES ub_allocator.h DESTINATION include)
install(FILES common/base/status.h DESTINATION include/common/base)
install(FILES transport/transport.h DESTINATION include/transport)
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class UbEndpointStore {
virtual std::shared_ptr<UbEndPoint> insertEndpoint(
const std::string& peer_nic_path, UbContext* context) = 0;
virtual int deleteEndpoint(const std::string& peer_nic_path) = 0;
virtual int deleteEndpointByPtr(UbEndPoint* point_ptr) = 0;
virtual void evictEndpoint() = 0;
virtual void reclaimEndpoint() = 0;
virtual size_t getSize() = 0;
Expand All @@ -102,6 +103,7 @@ class UbSIEVEEndpointStore : public UbEndpointStore {
std::shared_ptr<UbEndPoint> insertEndpoint(const std::string& peer_nic_path,
UbContext* context) override;
int deleteEndpoint(const std::string& peer_nic_path) override;
int deleteEndpointByPtr(UbEndPoint* point_ptr) override;
void evictEndpoint() override;
void reclaimEndpoint() override;
size_t getSize() override;
Expand Down Expand Up @@ -229,6 +231,10 @@ class UbContext {
return endpoint_store_->deleteEndpoint(peer_nic_path);
}

int deleteEndpointByPtr(UbEndPoint* point_ptr) {
return endpoint_store_->deleteEndpointByPtr(point_ptr);
}

int disconnectAllEndpoints() { return endpoint_store_->disconnect(); }

// Device name, such as `mlx5_3`
Expand Down
1 change: 1 addition & 0 deletions mooncake-transfer-engine/include/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class Transport {
uint32_t max_retry_cnt;
void *r_seg;
void *l_seg;
void *endpoint;
} ub;
struct {
void *dest_addr;
Expand Down
11 changes: 11 additions & 0 deletions mooncake-transfer-engine/include/ub_allocator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

namespace mooncake {

void* ub_allocate_memory(size_t alignment, size_t total_size);

void ub_free_memory(void* ptr);

bool ub_is_store_memory(void* addr, size_t length);

} // namespace mooncake
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
file(GLOB UB_SOURCES "*.cpp" "urma/urma_endpoint.cpp")
file(GLOB UB_SOURCES "*.cpp" "urma/urma_endpoint.cpp" "ub_allocator.cpp")

# Check if liburma.so exists
find_library(URMA_LIBRARY urma PATHS /usr/lib64)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include <algorithm>
#include <cstddef>
#include <mutex>
#include <unordered_map>
#include <vector>
#include <glog/logging.h>
#include <numa.h>

#include "ub_allocator.h"

namespace mooncake {
struct UbStoreMemRange {
void* base;
size_t size;
};
std::mutex g_ub_store_mem_mutex;
std::vector<UbStoreMemRange> g_ub_store_mem_ranges;

size_t remove_store_memory_range(void* ptr) {
std::lock_guard<std::mutex> store_lock(g_ub_store_mem_mutex);

auto it = std::find_if(
g_ub_store_mem_ranges.begin(), g_ub_store_mem_ranges.end(),
[ptr](const UbStoreMemRange& range) { return range.base == ptr; });

if (it == g_ub_store_mem_ranges.end()) {
LOG(ERROR) << "failed for UB protocol, addr at " << ptr;
return 0;
}

size_t sz = it->size;
g_ub_store_mem_ranges.erase(it);
return sz;
}

void* ub_allocate_memory(size_t alignment, size_t total_size) {
void* ptr = numa_alloc_local(total_size);
if (!ptr) {
LOG(ERROR) << "failed for UB protocol, size=" << total_size
<< ", alignment : " << alignment;
return nullptr;
}
LOG(INFO) << "UB: allocated total size : " << total_size
<< ", alignment : " << alignment << " addr at " << ptr;

std::lock_guard<std::mutex> store_lock(g_ub_store_mem_mutex);
g_ub_store_mem_ranges.push_back({ptr, total_size});

return ptr;
}

void ub_free_memory(void* ptr) {
if (!ptr) {
return;
}
auto size = remove_store_memory_range(ptr);
numa_free(ptr, size);
LOG(INFO) << "UB: freed bytes at " << ptr;
}

bool ub_is_store_memory(void* addr, size_t length) {
if (!addr || length == 0) return false;
auto addr_start = reinterpret_cast<uintptr_t>(addr);
uintptr_t addr_end = addr_start + length;
std::lock_guard<std::mutex> lock(g_ub_store_mem_mutex);
for (const auto& range : g_ub_store_mem_ranges) {
auto range_start = reinterpret_cast<uintptr_t>(range.base);
uintptr_t range_end = range_start + range.size;
if (addr_start >= range_start && addr_end <= range_end) {
return true;
}
}
return false;
}

} // namespace mooncake
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,29 @@ int UbSIEVEEndpointStore::deleteEndpoint(const std::string& peer_nic_path) {
return 0;
}

int UbSIEVEEndpointStore::deleteEndpointByPtr(UbEndPoint* point_ptr) {
RWSpinlock::WriteGuard guard(endpoint_map_lock_);
for (auto iter = endpoint_map_.begin(); iter != endpoint_map_.end();
iter++) {
if (iter->second.first.get() == point_ptr) {
std::string peer_nic_path = iter->first;
iter->second.first->deconstruct();
waiting_list_len_++;
waiting_list_.insert(iter->second.first);
auto fifo_iter = fifo_map_[peer_nic_path];
if (hand_.has_value() && hand_.value() == fifo_iter) {
fifo_iter == fifo_list_.begin() ? hand_ = std::nullopt
: hand_ = std::prev(fifo_iter);
}
fifo_list_.erase(fifo_iter);
fifo_map_.erase(peer_nic_path);
endpoint_map_.erase(iter);
return 0;
}
}
return 0;
}

void UbSIEVEEndpointStore::evictEndpoint() {
if (fifo_list_.empty()) {
return;
Expand Down Expand Up @@ -246,6 +269,12 @@ int UbWorkerPool::submitPostSend(
auto targetSegment =
peer_segment_desc->buffers[buffer_id].tseg[device_id];
slice->ub.r_seg = context_.retrieveRemoteSeg(targetSegment);
if (!slice->ub.r_seg) {
LOG(ERROR) << "[UB] retrieveRemoteSeg failed for target_id="
<< slice->target_id << " buffer_id=" << buffer_id
<< " device_id" << device_id
<< " dest_addr=" << slice->ub.dest_addr;
}
auto peer_nic_path =
MakeNicPath(peer_segment_desc->name,
peer_segment_desc->devices[device_id].name);
Expand Down Expand Up @@ -333,7 +362,7 @@ void UbWorkerPool::performPostSend(int thread_id) {
}
if (!endpoint->active()) {
if (endpoint->inactiveTime() > 1.0)
context_.deleteEndpoint(entry.first);
context_.deleteEndpointByPtr(endpoint.get());
// enable for re-establishation
for (auto& slice : entry.second) failed_slice_list.push_back(slice);
entry.second.clear();
Expand All @@ -355,6 +384,10 @@ void UbWorkerPool::performPostSend(int thread_id) {
entry.second.clear();
continue;
}
// Set endpoint pointer for each slice before submitting
for (auto& slice : entry.second) {
slice->ub.endpoint = endpoint.get();
}
endpoint->submitPostSend(entry.second, failed_slice_list);
#endif
}
Expand Down Expand Up @@ -392,9 +425,12 @@ void UbWorkerPool::performPoll(int thread_id) {
<< context_.nicPath() << ", mark it inactive";
context_.set_active(false);
}
context_.deleteEndpoint(slice->peer_nic_path);
slice->ub.retry_cnt++;
if (slice->ub.retry_cnt >= slice->ub.max_retry_cnt) {
if (slice->ub.endpoint) {
auto ptr = static_cast<UbEndPoint*>(slice->ub.endpoint);
context_.deleteEndpointByPtr(ptr);
}
slice->markFailed();
processed_slice_count_++;
} else {
Expand Down
Loading
Loading