Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions include/pingcap/kv/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ struct Cluster

Cluster()
: pd_client(std::make_shared<pd::MockPDClient>())
, rpc_client(std::make_unique<RpcClient>())
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(1))
, rpc_client(std::make_unique<RpcClient>(pd_client, ClusterConfig{}))
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(2))
, mpp_prober(std::make_unique<common::MPPProber>(this))
{
startBackgroundTasks();
Expand All @@ -44,11 +44,11 @@ struct Cluster
Cluster(const std::vector<std::string> & pd_addrs, const ClusterConfig & config)
: pd_client(std::make_shared<pd::CodecClient>(pd_addrs, config))
, region_cache(std::make_unique<RegionCache>(pd_client, config))
, rpc_client(std::make_unique<RpcClient>(config))
, rpc_client(std::make_unique<RpcClient>(pd_client, config))
, oracle(std::make_unique<pd::Oracle>(pd_client, std::chrono::milliseconds(oracle_update_interval)))
, lock_resolver(std::make_unique<LockResolver>(this))
, api_version(config.api_version)
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(2))
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(3))
, mpp_prober(std::make_unique<common::MPPProber>(this))
{
startBackgroundTasks();
Expand All @@ -64,6 +64,7 @@ struct Cluster
// (e.g. background threads) that cluster object holds so as to exit elegantly.
~Cluster()
{
rpc_client->stop();
mpp_prober->stop();
if (region_cache)
region_cache->stop();
Expand Down
6 changes: 3 additions & 3 deletions include/pingcap/kv/RegionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ struct RegionClient
}
std::string err_msg = rpc.errMsg(status);
log->warning(err_msg);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status);
continue;
}
if (resp->has_region_error())
Expand Down Expand Up @@ -199,15 +199,15 @@ struct RegionClient
}
std::string err_msg = rpc.errMsg(status);
log->warning(err_msg);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status);
}
}

protected:
void onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const errorpb::Error & err) const;

// Normally, this happens when a machine is down, the network between tidb and kv is partitioned, or a process has crashed.
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const;
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const;
};

} // namespace kv
Expand Down
34 changes: 33 additions & 1 deletion include/pingcap/kv/Rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@
#include <pingcap/kv/RegionCache.h>
#include <pingcap/kv/internal/type_traits.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <map>
#include <mutex>
#include <type_traits>
#include <utility>
#include <vector>

namespace pingcap
{
namespace kv
{
// Base interval between two connection scans; used as the default value of RpcClient::scan_interval.
constexpr auto rpc_conn_check_interval = std::chrono::minutes(10);
// Maximum extra delay added on top of the base interval when the next scan time is randomized.
constexpr auto rpc_conn_check_interval_jitter = std::chrono::minutes(5);

struct ConnArray
{
std::mutex mutex;
Expand All @@ -34,31 +41,56 @@ using GRPCMetaData = std::multimap<std::string, std::string>;
// RpcClient caches one ConnArray per store address and, via run(), periodically
// drops cached entries whose address is no longer reported by PD.
struct RpcClient
{
// Copy of the cluster configuration; replaced by update().
ClusterConfig config;
// PD client used by scanAndRemoveInvalidConns() to list the current stores.
// Left default-initialized by the constructors that do not take a PD client;
// the scan is skipped when it is empty or a mock.
pd::ClientPtr pd_client;

// Guards `conns` (and `config` in update()); run() also waits on scan_cv with this mutex.
std::mutex mutex;

// Cached connections, keyed by store address.
std::map<std::string, ConnArrayPtr> conns;

Logger * log = &Logger::get("pingcap.RpcClient");
// Base wait between two scans; run() randomizes the actual wait starting from this value.
std::chrono::minutes scan_interval = rpc_conn_check_interval;
// Set by stop(); polled by run() to leave its loop.
std::atomic<bool> stopped = false;
// Notified by stop() to wake run() out of its timed wait.
std::condition_variable scan_cv;

RpcClient() = default;

explicit RpcClient(const ClusterConfig & config_)
: config(config_)
{}

RpcClient(pd::ClientPtr pd_client_, const ClusterConfig & config_)
: config(config_)
, pd_client(std::move(pd_client_))
{}

// Replaces the configuration and discards every cached connection, under `mutex`.
void update(const ClusterConfig & config_)
{
std::unique_lock lk(mutex);
config = config_;
conns.clear();
}

// Scan loop: waits, then calls scanAndRemoveInvalidConns(), until stop() is requested.
void run();

// Requests run() to exit and wakes it up.
void stop();

// Removes cached connections whose address is not among the stores returned by PD.
void scanAndRemoveInvalidConns();

// Erases the cached connection for `addr`, if any.
void removeConn(const std::string & addr);

ConnArrayPtr getConnArray(const std::string & addr);

ConnArrayPtr createConnArray(const std::string & addr);
};

using RpcClientPtr = std::unique_ptr<RpcClient>;

// Drops the cached connection for `addr` when the gRPC call finished with
// status code UNAVAILABLE; any other status leaves the cache untouched.
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
    const bool is_unavailable = (status.error_code() == grpc::StatusCode::UNAVAILABLE);
    if (!is_unavailable)
        return;

    client->removeConn(addr);
}

// RpcCall holds the request and response, and delegates RPC calls.
template <typename T>
class RpcCall
Expand Down
3 changes: 3 additions & 0 deletions src/kv/Cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ void Cluster::startBackgroundTasks()
{
thread_pool->start();

thread_pool->enqueue([this] {
rpc_client->run();
});
thread_pool->enqueue([this] {
mpp_prober->run();
});
Expand Down
3 changes: 2 additions & 1 deletion src/kv/RegionClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ void RegionClient::onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const er
cluster->region_cache->dropRegion(rpc_ctx->region);
}

void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const
void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const
{
cluster->region_cache->onSendReqFail(rpc_ctx, e);
dropConnIfNeeded(cluster->rpc_client, rpc_ctx->addr, status);
// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
bo.backoff(boTiKVRPC, e);
Expand Down
104 changes: 103 additions & 1 deletion src/kv/Rpc.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,40 @@
#include <pingcap/Exception.h>
#include <pingcap/kv/Rpc.h>

#include <random>
#include <unordered_set>

namespace pingcap
{
namespace kv
{
namespace
{
// Collects the non-empty addresses of all stores returned by PD into a set.
// NOTE(review): the `true` argument presumably excludes tombstone stores — confirm against pd::Client.
std::unordered_set<std::string> getStoreAddresses(const pd::ClientPtr & pd_client)
{
    const auto stores = pd_client->getAllStores(true);

    std::unordered_set<std::string> addrs;
    addrs.reserve(stores.size());
    for (const auto & store : stores)
    {
        const auto & address = store.address();
        if (address.empty())
            continue; // stores without an address cannot match any cached connection
        addrs.insert(address);
    }
    return addrs;
}

// Picks a uniformly distributed wait time, in whole seconds, from the closed range
// [scan_interval, scan_interval + rpc_conn_check_interval_jitter].
std::chrono::seconds getRandomScanInterval(std::chrono::minutes scan_interval)
{
    using Seconds = std::chrono::seconds;

    // One generator per thread, seeded once from std::random_device.
    thread_local std::mt19937_64 rng(std::random_device{}());

    // minutes -> seconds is a lossless conversion, so it can be done implicitly.
    const Seconds lower_bound = scan_interval;
    const Seconds upper_bound = scan_interval + rpc_conn_check_interval_jitter;

    std::uniform_int_distribution<Seconds::rep> pick(lower_bound.count(), upper_bound.count());
    return Seconds(pick(rng));
}
} // namespace

ConnArray::ConnArray(size_t max_size, const std::string & addr, const ClusterConfig & config_)
: address(addr)
, index(0)
Expand All @@ -22,6 +53,78 @@ std::shared_ptr<KvConnClient> ConnArray::get()
return vec[index];
}

// Background loop that periodically prunes cached connections.
// Each iteration waits on scan_cv for a randomized interval derived from scan_interval
// (or until a stop request is observed), then calls scanAndRemoveInvalidConns().
// Returns once `stopped` is observed to be true.
void RpcClient::run()
{
while (!stopped.load())
{
{
const auto wait_interval = getRandomScanInterval(scan_interval);
// The lock lives only inside this inner scope: scanAndRemoveInvalidConns() below
// locks `mutex` itself, so it has to be released before the scan starts.
std::unique_lock lock(mutex);
// Returns early when woken up with `stopped` set; otherwise times out after wait_interval.
scan_cv.wait_for(lock, wait_interval, [this] {
return stopped.load();
});
}

// Re-check after the wait so that a stop request skips the scan.
if (stopped.load())
return;

try
{
scanAndRemoveInvalidConns();
}
catch (...)
{
// A failed scan is only logged; the loop keeps running and tries again after the next wait.
log->warning(getCurrentExceptionMsg("RpcClient scan conns failed: "));
}
}
}

void RpcClient::stop()
{
stopped.store(true);
scan_cv.notify_all();
}

// Removes every cached connection whose address is not among the store addresses
// currently returned by PD. Does nothing when there is no PD client, when the PD
// client is a mock, or when no connection is cached.
void RpcClient::scanAndRemoveInvalidConns()
{
    const bool has_real_pd = pd_client && !pd_client->isMock();
    if (!has_real_pd)
        return;

    // Copy the cached addresses under the lock so the PD request below runs without holding it.
    std::vector<std::string> cached_addrs;
    {
        std::lock_guard<std::mutex> guard(mutex);
        cached_addrs.reserve(conns.size());
        for (const auto & entry : conns)
            cached_addrs.push_back(entry.first);
    }

    if (cached_addrs.empty())
        return;

    const auto live_addrs = getStoreAddresses(pd_client);
    for (const auto & addr : cached_addrs)
    {
        // removeConn() takes the lock itself for each erased entry.
        if (live_addrs.count(addr) == 0)
            removeConn(addr);
    }
}

// Erases the cached connection for `addr` under `mutex`; logs only when an entry was actually removed.
void RpcClient::removeConn(const std::string & addr)
{
    std::lock_guard<std::mutex> guard(mutex);

    const auto erased_count = conns.erase(addr);
    if (erased_count == 0)
        return;

    log->information("delete invalid addr: " + addr);
}

ConnArrayPtr RpcClient::getConnArray(const std::string & addr)
{
std::lock_guard<std::mutex> lock(mutex);
Expand All @@ -39,6 +142,5 @@ ConnArrayPtr RpcClient::createConnArray(const std::string & addr)
conns[addr] = conn_array;
return conn_array;
}

} // namespace kv
} // namespace pingcap