Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion include/nebula/mclient/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <thread>
#include <unordered_map>
#include <vector>
#include <mutex>

#include "common/datatypes/HostAddr.h"
#include "common/thrift/ThriftTypes.h"
Expand Down Expand Up @@ -108,14 +109,19 @@ class MetaClient {
void getResponse(Request req,
RemoteFunc remoteFunc,
RespGenerator respGen,
folly::Promise<std::pair<bool, Response>> pro);
folly::Promise<std::pair<bool, Response>> pro,
int32_t retry = 0,
int32_t retry_limit = 3);
void updateLeader(HostAddr leader = HostAddr());

private:
std::vector<HostAddr> metaAddrs_;
MConfig mConfig_;
SpaceNameIdMap spaceIndexByName_;
SpaceEdgeNameTypeMap spaceEdgeIndexByName_;
SpaceTagNameTypeMap spaceTagIndexByName_;
std::mutex hostLock_;
HostAddr leader_;
std::unordered_map<std::pair<GraphSpaceID, PartitionID>, HostAddr, pair_hash> spacePartLeaderMap_;
std::unordered_map<GraphSpaceID, std::vector<PartitionID>> spacePartsMap_;
std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
Expand Down
116 changes: 94 additions & 22 deletions src/mclient/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ MetaClient::MetaClient(const std::vector<std::string>& metaAddrs, const MConfig&
metaAddrs_.emplace_back(ip_port[0], folly::to<int32_t>(ip_port[1]));
}
CHECK(!metaAddrs_.empty()) << "metaAddrs_ is empty";
leader_ = metaAddrs_.back();
mConfig_ = mConfig;
SSLConfig sslcfg;
sslcfg.enable_mtls = mConfig_.enableMTLS_;
Expand Down Expand Up @@ -234,36 +235,107 @@ void MetaClient::getResponse(Request req,
RespGenerator respGen,
folly::Promise<std::pair<bool, Response>> pro) {
auto* evb = DCHECK_NOTNULL(ioExecutor_)->getEventBase();
HostAddr host = metaAddrs_.back();

HostAddr host;
{
std::lock_guard<std::mutex> holder(hostLock_);
host = leader_;
}
folly::via(evb,
[host,
evb,
req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
retry,
retryLimit,
this]() mutable {
auto client = clientsMan_->client(host, evb, false, mConfig_.clientTimeoutInMs_);
LOG(INFO) << "Send request to meta " << host;
remoteFunc(client, req)
.via(evb)
.then([host, respGen = std::move(respGen), pro = std::move(pro)](
folly::Try<RpcResponse>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
LOG(ERROR) << "Send request to meta" << host << " failed";
pro.setValue(std::make_pair(false, Response()));
return;
}
auto&& resp = t.value();
if (resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED) {
// succeeded
pro.setValue(respGen(std::move(resp)));
return;
}
pro.setValue(std::make_pair(false, Response()));
}); // then
}); // via
auto client = clientsMan_->client(host, evb, false, mConfig_.clientTimeoutInMs_);
LOG(INFO) << "Send request to meta " << host;
remoteFunc(client, req)
.via(evb)
.then([host,
req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
retry,
retryLimit,
evb,
this](folly::Try<RpcResponse>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
updateLeader();
if (retry < retryLimit) {
evb->runAfterDelay(
[req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
retry,
retryLimit,
this]() mutable {
getResponse(std::move(req),
std::move(remoteFunc),
std::move(respGen),
std::move(pro),
retry + 1,
retryLimit);
},
1000);
return;
} else {
LOG(ERROR) << "Send request to " << host << ", exceed retry limit";
pro.setValue(std::make_pair(false, Response()));
}
return;
}

auto&& resp = t.value();
auto code = resp.get_code();
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
// succeeded
pro.setValue(respGen(std::move(resp)));
return;
} else if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED ||
code == nebula::cpp2::ErrorCode::E_MACHINE_NOT_FOUND) {
updateLeader(resp.get_leader());
if (retry < retryLimit) {
evb->runAfterDelay(
[req = std::move(req),
remoteFunc = std::move(remoteFunc),
respGen = std::move(respGen),
pro = std::move(pro),
retry,
retryLimit,
this]() mutable {
getResponse(std::move(req),
std::move(remoteFunc),
std::move(respGen),
std::move(pro),
retry + 1,
retryLimit);
},
1000);
return;
}
} else if (code == nebula::cpp2::ErrorCode::E_CLIENT_SERVER_INCOMPATIBLE) {
pro.setValue(respGen(std::move(resp)));
return;
}
pro.setValue(std::make_pair(false, Response()));
}); // then
}); // via
}

void MetaClient::updateLeader(HostAddr leader) {
std::lock_guard<std::mutex> holder(hostLock_);
if (leader != HostAddr("", 0)) {
leader_ = leader;
} else {
leader_ = addrs_[folly::Random::rand64(addrs_.size())];
}
}

} // namespace nebula