Skip to content

Commit b00280d

Browse files
committed
ISSUE-65 new cluster interface
1 parent 6d4fc7c commit b00280d

20 files changed

Lines changed: 443 additions & 410 deletions

src/app_util/AppInfo.cpp

Lines changed: 160 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,170 @@ See the License for the specific language governing permissions and
1111
limitations under the License.
1212
**************************************************************************/
1313

14+
#include <absl/strings/str_split.h>
15+
#include <boost/property_tree/ptree.hpp>
16+
#include <boost/property_tree/json_parser.hpp>
1417
#include <spdlog/spdlog.h>
1518

1619
#include "AppInfo.h"
1720

1821
namespace gringofts::app {
1922

20-
void AppInfo::init(const INIReader &reader) {
23+
bool AppClusterParser::checkHasRoute(const std::string &routeStr, uint64_t clusterId, uint64_t epoch) {
24+
using boost::property_tree::ptree;
25+
using boost::property_tree::read_json;
26+
using boost::property_tree::write_json;
27+
using boost::property_tree::json_parser_error;
28+
std::stringstream ss(routeStr);
29+
ptree globalRoute;
30+
try {
31+
read_json(ss, globalRoute);
32+
auto routeEpoch = std::stoi(globalRoute.get_child("epoch").data());
33+
if (routeEpoch < epoch) {
34+
SPDLOG_WARN("global epoch {} is less than local epoch {}", routeEpoch, epoch);
35+
}
36+
auto infos = globalRoute.get_child("routeInfos");
37+
for (auto &[k, v] : infos) {
38+
auto clusterNode = v.get_child("clusterId");
39+
auto id = std::stoi(clusterNode.data());
40+
if (clusterId == id) {
41+
std::stringstream sout;
42+
write_json(sout, v);
43+
SPDLOG_INFO("find route for cluster {} : {}", clusterId, sout.str());
44+
return true;
45+
}
46+
}
47+
return false;
48+
} catch (const json_parser_error &err) {
49+
SPDLOG_ERROR("error when parse json {} for {}", routeStr, err.message());
50+
return false;
51+
} catch (const std::exception &err) {
52+
SPDLOG_ERROR("error when parse json {} for {}", routeStr, err.what());
53+
return false;
54+
}
55+
}
56+
57+
std::tuple<NodeId, ClusterId, ClusterParser::ClusterMap> AppClusterParser::parse(const INIReader &iniReader) {
58+
std::string storeType = iniReader.Get("cluster", "persistence.type", "UNKNOWN");
59+
assert(storeType == "raft");
60+
bool externalEnabled = iniReader.GetBoolean("raft.external", "enable", false);
61+
if (!externalEnabled) {
62+
/// load from local config, the cluster id and node id must be specified
63+
auto clusterConf = iniReader.Get("cluster", "cluster.conf", "");
64+
auto allClusterInfo = parseToClusterInfo(clusterConf);
65+
auto myClusterId = iniReader.GetInteger("cluster", "self.clusterId", 0);
66+
auto myNodeId = iniReader.GetInteger("cluster", "self.nodeId", 0);
67+
bool hasMe = false;
68+
for (auto &[clusterId, info] : allClusterInfo) {
69+
if (myClusterId == clusterId) {
70+
for (auto &[nodeId, node] : info.getAllNodes()) {
71+
if (nodeId == myNodeId) {
72+
hasMe = true;
73+
break;
74+
}
75+
}
76+
}
77+
}
78+
assert(hasMe);
79+
80+
Signal::hub.handle<RouteSignal>([](const Signal &s) {
81+
const auto &signal = dynamic_cast<const RouteSignal &>(s);
82+
SPDLOG_WARN("for non-external controlled cluster direct start raft");
83+
signal.passValue(true);
84+
});
85+
86+
SPDLOG_INFO("read raft cluster conf from local, "
87+
"cluster.conf={}, self.clusterId={}, self.nodeId={}",
88+
clusterConf, myClusterId, myNodeId);
89+
return {myClusterId, myNodeId, allClusterInfo};
90+
} else {
91+
// if enable external kv store for cluster info, it must have kv factory
92+
assert(mKvFactory);
93+
std::string externalConfigFile = iniReader.Get("raft.external", "config.file", "");
94+
std::string clusterConfKey = iniReader.Get("raft.external", "cluster.conf.key", "");
95+
std::string clusterRouteKey = iniReader.Get("raft.external", "cluster.route.key", "");
96+
assert(!externalConfigFile.empty());
97+
assert(!clusterConfKey.empty());
98+
99+
/// init external client
100+
auto client = mKvFactory->produce(INIReader(externalConfigFile));
101+
/// read raft cluster conf from external
102+
std::string clusterConf;
103+
auto r = client->getValue(clusterConfKey, &clusterConf);
104+
assert(!clusterConfKey.empty());
105+
auto allClusterInfo = parseToClusterInfo(clusterConf);
106+
/// N.B.: when using external, the assumption is two nodes will never run on the same host,
107+
/// otherwise below logic will break.
108+
auto myHostname = Util::getHostname();
109+
std::optional<ClusterId> myClusterId = std::nullopt;
110+
std::optional<NodeId> myNodeId = std::nullopt;
111+
for (auto &[clusterId, info] : allClusterInfo) {
112+
for (auto &[nodeId, node] : info.getAllNodes()) {
113+
if (myHostname == node->hostName()) {
114+
myClusterId = clusterId;
115+
myNodeId = nodeId;
116+
break;
117+
}
118+
}
119+
}
120+
assert(myClusterId);
121+
assert(myNodeId);
122+
123+
SPDLOG_INFO("raft cluster conf passed from external, "
124+
"cluster.conf={}, hostname={}", clusterConf, myHostname);
125+
auto clusterId = *myClusterId;
126+
127+
Signal::hub.handle<RouteSignal>([client, clusterRouteKey, clusterId](const Signal &s) {
128+
const auto &signal = dynamic_cast<const RouteSignal &>(s);
129+
std::string val;
130+
SPDLOG_INFO("receive signal for query route, cluster {}, epoch {}", clusterId, signal.mEpoch);
131+
assert(clusterId == signal.mClusterId);
132+
assert(!clusterRouteKey.empty());
133+
client->getValue(clusterRouteKey, &val);
134+
signal.passValue(checkHasRoute(val, clusterId, signal.mEpoch));
135+
});
136+
137+
return {*myClusterId, *myNodeId, allClusterInfo};
138+
}
139+
}
140+
141+
std::unordered_map<ClusterId, Cluster> AppClusterParser::parseToClusterInfo(const std::string &infoStr) const {
142+
std::unordered_map<ClusterId, Cluster> result;
143+
std::vector<std::string> clusters = absl::StrSplit(infoStr, ";");
144+
for (auto &c : clusters) {
145+
Cluster info;
146+
std::pair<std::string, std::string> clusterIdWithNodes = absl::StrSplit(c, "#");
147+
info.setClusterId(std::stoi(clusterIdWithNodes.first));
148+
std::vector<std::string> nodes = absl::StrSplit(clusterIdWithNodes.second, ",");
149+
for (auto &n : nodes) {
150+
std::pair<std::string, std::string> hostWithPort = absl::StrSplit(n, ":");
151+
std::pair<std::string, std::string> idWithHost = absl::StrSplit(hostWithPort.first, "@");
152+
auto nodeId = std::stoi(idWithHost.first);
153+
auto hostname = idWithHost.second;
154+
std::shared_ptr<Node> node;
155+
if (hostWithPort.second.empty()) {
156+
SPDLOG_INFO("{} no specific port, using default one", hostWithPort.second);
157+
node = std::make_shared<AppNode>(nodeId, hostname);
158+
} else {
159+
std::vector<std::string> ports = absl::StrSplit(hostWithPort.second, "|");
160+
assert(ports.size() == 6);
161+
auto portForRaft = std::stoi(ports[0]);
162+
auto portForGateway = std::stoi(ports[1]);
163+
auto portForDumper = std::stoi(ports[2]);
164+
auto portForStream = std::stoi(ports[3]);
165+
auto portForNetAdmin = std::stoi(ports[4]);
166+
auto portForScale = std::stoi(ports[5]);
167+
node = std::make_shared<AppNode>(nodeId, hostname, portForRaft, portForStream,
168+
portForGateway, portForDumper, portForNetAdmin, portForScale);
169+
}
170+
info.addNode(node);
171+
}
172+
result[info.id()] = info;
173+
}
174+
return result;
175+
}
176+
177+
void AppInfo::init(const INIReader &reader, std::unique_ptr<ClusterParser> parser) {
21178
auto &appInfo = getInstance();
22179

23180
auto &initialized = appInfo.initialized;
@@ -28,7 +185,7 @@ void AppInfo::init(const INIReader &reader) {
28185
return;
29186
}
30187

31-
auto[myClusterId, myNodeId, allClusterInfo] = ClusterInfo::resolveAllClusters(reader, nullptr);
188+
auto[myClusterId, myNodeId, allClusterInfo] = parser->parse(reader);
32189
appInfo.mMyClusterId = myClusterId;
33190
appInfo.mMyNodeId = myNodeId;
34191
appInfo.mAllClusterInfo = allClusterInfo;
@@ -54,4 +211,5 @@ void AppInfo::init(const INIReader &reader) {
54211
appInfo.mMyClusterId,
55212
appInfo.mMyNodeId);
56213
}
214+
57215
} /// namespace gringofts::app

src/app_util/AppInfo.h

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,59 @@ limitations under the License.
1717

1818
#include <INIReader.h>
1919

20-
2120
#include "../infra/common_types.h"
22-
#include "../infra/util/ClusterInfo.h"
21+
#include "../infra/util/Cluster.h"
22+
#include "../infra/util/Signal.h"
23+
#include "../infra/util/KVClient.h"
2324

2425
namespace gringofts {
2526
namespace app {
2627

28+
struct RouteSignal : public FutureSignal<bool> {
29+
RouteSignal(uint64_t epoch, uint64_t clusterId) : mEpoch(epoch), mClusterId(clusterId) {}
30+
uint64_t mEpoch;
31+
uint64_t mClusterId;
32+
};
33+
34+
class AppNode : public RaftNode {
35+
static constexpr Port kDefaultGatewayPort = 50055;
36+
static constexpr Port kDefaultFetchPort = 50056;
37+
static constexpr Port kDefaultNetAdminPort = 50065;
38+
static constexpr Port kDefaultScalePort = 61203;
39+
public:
40+
AppNode(NodeId id, const HostName &hostName) : RaftNode(id, hostName) {}
41+
AppNode(NodeId id, const HostName &hostName,
42+
Port raftPort, Port streamPort,
43+
Port gateWayPort, Port dumperPort, Port netAdminPort, Port scalePort)
44+
: RaftNode(id, hostName, raftPort, streamPort),
45+
mPortForGateway(gateWayPort), mPortForFetch(dumperPort),
46+
mPortForNetAdmin(netAdminPort), mPortForScale(scalePort) {
47+
}
48+
inline Port gateWayPort() const { return mPortForGateway; }
49+
inline Port fetchPort() const { return mPortForFetch; }
50+
inline Port netAdminPort() const { return mPortForNetAdmin; }
51+
inline Port scalePort() const { return mPortForScale; }
52+
private:
53+
Port mPortForGateway = kDefaultGatewayPort;
54+
Port mPortForFetch = kDefaultFetchPort;
55+
Port mPortForNetAdmin = kDefaultNetAdminPort;
56+
Port mPortForScale = kDefaultScalePort;
57+
};
58+
59+
class AppClusterParser : public ClusterParser {
60+
public:
61+
AppClusterParser() : mKvFactory(nullptr) {}
62+
explicit AppClusterParser(std::unique_ptr<kv::ClientFactory> factory) : mKvFactory(std::move(factory)) {}
63+
std::tuple<NodeId, ClusterId, ClusterMap> parse(const INIReader &) override;
64+
65+
static bool checkHasRoute(const std::string &routeStr, uint64_t clusterId, uint64_t epoch);
66+
67+
private:
68+
std::unordered_map<ClusterId, Cluster> parseToClusterInfo(const std::string &infoStr) const;
69+
70+
std::unique_ptr<kv::ClientFactory> mKvFactory;
71+
};
72+
2773
class AppInfo final {
2874
public:
2975
~AppInfo() = default;
@@ -32,7 +78,8 @@ class AppInfo final {
3278
getInstance().initialized = false;
3379
}
3480

35-
static void init(const INIReader &reader);
81+
static void init(const INIReader &reader,
82+
std::unique_ptr<ClusterParser> parser = std::make_unique<AppClusterParser>());
3683

3784
/// disallow copy ctor and copy assignment
3885
AppInfo(const AppInfo &) = delete;
@@ -44,16 +91,16 @@ class AppInfo final {
4491
static bool stressTestEnabled() { return getInstance().mStressTestEnabled; }
4592
static std::string appVersion() { return getInstance().mAppVersion; }
4693

47-
static ClusterInfo getMyClusterInfo() {
94+
static Cluster getMyClusterInfo() {
4895
return getInstance().mAllClusterInfo[getInstance().mMyClusterId];
4996
}
5097

51-
static ClusterInfo::Node getMyNode() {
98+
static std::shared_ptr<AppNode> getMyNode() {
5299
assert(getInstance().initialized);
53-
return getMyClusterInfo().getAllNodeInfo()[getMyNodeId()];
100+
return std::dynamic_pointer_cast<AppNode>(getMyClusterInfo().getAllNodes()[getMyNodeId()]);
54101
}
55102

56-
static std::optional<ClusterInfo> getClusterInfo(uint64_t clusterId) {
103+
static std::optional<Cluster> getClusterInfo(uint64_t clusterId) {
57104
if (getInstance().mAllClusterInfo.count(clusterId)) {
58105
return getInstance().mAllClusterInfo[clusterId];
59106
} else {
@@ -65,20 +112,19 @@ class AppInfo final {
65112
static ClusterId getMyClusterId() { return getInstance().mMyClusterId; }
66113

67114
static Port netAdminPort() {
68-
auto node = getMyClusterInfo().getAllNodeInfo()[getMyNodeId()];
69-
return node.mPortForNetAdmin;
115+
return getMyNode()->netAdminPort();
70116
}
71117

72118
static Port scalePort() {
73-
return getMyNode().mPortForScale;
119+
return getMyNode()->scalePort();
74120
}
75121

76122
static Port gatewayPort() {
77-
return getMyNode().mPortForGateway;
123+
return getMyNode()->gateWayPort();
78124
}
79125

80126
static Port fetchPort() {
81-
return getMyNode().mPortForFetcher;
127+
return getMyNode()->fetchPort();
82128
}
83129

84130
private:
@@ -123,7 +169,7 @@ class AppInfo final {
123169
/**
124170
* Cluster Info
125171
*/
126-
std::map<ClusterId, ClusterInfo> mAllClusterInfo;
172+
std::unordered_map<ClusterId, Cluster> mAllClusterInfo;
127173
ClusterId mMyClusterId;
128174
NodeId mMyNodeId;
129175
};

src/app_util/NetAdminServer.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ using gringofts::raft::RaftRole;
6565
class NetAdminServer final : public AppNetAdmin::Service {
6666
public:
6767
NetAdminServer(const INIReader &reader,
68-
std::shared_ptr<NetAdminServiceProvider> netAdminProxy, uint64_t port = kDefaultNetAdminPort) :
68+
std::shared_ptr<NetAdminServiceProvider> netAdminProxy, uint64_t port = AppInfo::netAdminPort()) :
6969
mServiceProvider(netAdminProxy),
7070
mSnapshotTakenCounter(getCounter("snapshot_taken_counter", {})),
7171
mSnapshotFailedCounter(getCounter("snapshot_failed_counter", {})),
@@ -240,9 +240,9 @@ class NetAdminServer final : public AppNetAdmin::Service {
240240
}
241241

242242
std::vector<std::string> targets;
243-
for (const auto &nodeKV : fromClusterOpt->getAllNodeInfo()) {
244-
auto &node = nodeKV.second;
245-
auto targetHost = node.mHostName + ":" + std::to_string(node.mPortForStream);
243+
for (const auto &nodeKV : fromClusterOpt->getAllNodes()) {
244+
auto node = std::dynamic_pointer_cast<AppNode>(nodeKV.second);
245+
auto targetHost = node->hostName() + ":" + std::to_string(node->streamPort());
246246
SPDLOG_INFO("set up sync target {}", targetHost);
247247
targets.push_back(std::move(targetHost));
248248
}

src/app_util/control/CtrlState.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
#include <spdlog/spdlog.h>
1818

1919
#include "../AppInfo.h"
20-
#include "../../infra/util/ClusterInfo.h"
2120
#include "../../infra/util/Signal.h"
2221
#include "../../infra/raft/RaftSignal.h"
2322

@@ -82,7 +81,7 @@ void CtrlState::recoverForEAL(std::string_view str) {
8281
// for cluster Id > 0, need to start raft
8382
// for cluster = 0, direct start raft
8483
if (hasState() && mClusterId > 0) {
85-
auto routeSignal = std::make_shared<gringofts::RouteSignal>(mEpoch, mClusterId);
84+
auto routeSignal = std::make_shared<gringofts::app::RouteSignal>(mEpoch, mClusterId);
8685
// query route info to guarantee it can start raft
8786
Signal::hub << routeSignal;
8887
if (routeSignal->getFuture().get()) {

src/infra/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ set(GRINGOFTS_MONITOR_SRC
3333

3434
set(GRINGOFTS_UTIL_SRC
3535
util/BigDecimal.cpp
36-
util/ClusterInfo.cpp
36+
util/Cluster.cpp
3737
util/CryptoUtil.cpp
3838
util/FileUtil.cpp
3939
util/TrackingMemoryResource.cpp

src/infra/raft/RaftBuilder.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ namespace raft {
2929
inline std::shared_ptr<RaftInterface> buildRaftImpl(
3030
const char *configPath,
3131
const NodeId &nodeId,
32-
const ClusterInfo &clusterInfo,
32+
const Cluster &cluster,
3333
std::shared_ptr<DNSResolver> dnsResolver = nullptr,
3434
RaftRole role = RaftRole::Follower ) {
3535
INIReader iniReader(configPath);
@@ -46,7 +46,7 @@ inline std::shared_ptr<RaftInterface> buildRaftImpl(
4646
/// use default dns resolver
4747
dnsResolver = std::make_shared<DNSResolver>();
4848
}
49-
return std::make_shared<v2::RaftCore>(configPath, nodeId, clusterInfo, dnsResolver, role);
49+
return std::make_shared<v2::RaftCore>(configPath, nodeId, cluster, dnsResolver, role);
5050
} else {
5151
SPDLOG_ERROR("Unknown raft implement version {}.", version);
5252
exit(1);

0 commit comments

Comments
 (0)