Skip to content
This repository was archived by the owner on Aug 19, 2019. It is now read-only.

Commit a8e609c

Browse files
committed
Add support for watching and ingesting more Kubernetes resources. (#194)
The threads to watch resources other than Node and Pod are started based on the Kubernetes resource kind and API version provided in a static vector. This works for all resource types that use the generic callback.
1 parent 52dedaa commit a8e609c

2 files changed

Lines changed: 46 additions & 30 deletions

File tree

src/kubernetes.cc

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -951,6 +951,28 @@ KubernetesUpdater::KubernetesUpdater(const Configuration& config,
951951
config.KubernetesUpdaterIntervalSeconds(),
952952
[=]() { return reader_.MetadataQuery(); }) { }
953953

954+
const std::vector<KubernetesUpdater::WatchId>&
955+
KubernetesUpdater::ClusterLevelObjectTypes() {
956+
static const std::vector<WatchId>* cluster_level_object_types =
957+
new std::vector<WatchId>{
958+
{"cronjobs", "batch/v1beta1"},
959+
{"daemonsets", "apps/v1"},
960+
{"daemonsets", "extensions/v1beta1"},
961+
{"deployments", "apps/v1"},
962+
{"deployments", "extensions/v1beta1"},
963+
{"endpoints", "v1"},
964+
{"ingresses", "extensions/v1beta1"},
965+
{"jobs", "batch/v1"},
966+
{"namespaces", "v1"},
967+
{"replicasets", "apps/v1"},
968+
{"replicasets", "extensions/v1beta1"},
969+
{"replicationcontrollers", "v1"},
970+
{"services", "v1"},
971+
{"statefulsets", "apps/v1"},
972+
};
973+
return *cluster_level_object_types;
974+
}
975+
954976
void KubernetesUpdater::ValidateDynamicConfiguration() const
955977
throw(ConfigurationValidationError) {
956978
PollingMetadataUpdater::ValidateDynamicConfiguration();
@@ -983,20 +1005,20 @@ void KubernetesUpdater::StartUpdater() {
9831005
auto cb = [=](std::vector<MetadataUpdater::ResourceMetadata>&& results) {
9841006
MetadataCallback(std::move(results));
9851007
};
986-
node_watch_thread_ = std::thread([=]() {
1008+
object_watch_threads_.emplace(WatchId("nodes", "v1"), std::thread([=]() {
9871009
reader_.WatchNodes(watched_node, cb);
988-
});
989-
pod_watch_thread_ = std::thread([=]() {
1010+
}));
1011+
object_watch_threads_.emplace(WatchId("pods", "v1"), std::thread([=]() {
9901012
reader_.WatchPods(watched_node, cb);
991-
});
992-
if (config().KubernetesClusterLevelMetadata() &&
993-
config().KubernetesServiceMetadata()) {
994-
service_watch_thread_ = std::thread([=]() {
995-
reader_.WatchObjects("services", "v1", cb);
996-
});
997-
endpoints_watch_thread_ = std::thread([=]() {
998-
reader_.WatchObjects("endpoints", "v1", cb);
999-
});
1013+
}));
1014+
if (config().KubernetesClusterLevelMetadata()) {
1015+
for (const auto& watch_id: ClusterLevelObjectTypes()) {
1016+
const std::string& plural_kind = watch_id.first;
1017+
const std::string& api_version = watch_id.second;
1018+
object_watch_threads_.emplace(watch_id, std::thread([=]() {
1019+
reader_.WatchObjects(plural_kind, api_version, cb);
1020+
}));
1021+
}
10001022
}
10011023
} else {
10021024
// Only try to poll if watch is disabled.

src/kubernetes.h

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,6 @@ class KubernetesReader {
174174
mutable std::string current_node_;
175175
mutable std::string kubernetes_api_token_;
176176
mutable std::string kubernetes_namespace_;
177-
// A memoized map from version to a map from kind to name.
178-
mutable std::map<std::string, std::map<std::string, std::string>>
179-
version_to_kind_to_name_;
180177

181178
const Configuration& config_;
182179
HealthChecker* health_checker_;
@@ -189,17 +186,11 @@ class KubernetesUpdater : public PollingMetadataUpdater {
189186
KubernetesUpdater(const Configuration& config, HealthChecker* health_checker,
190187
MetadataStore* store);
191188
~KubernetesUpdater() {
192-
if (node_watch_thread_.joinable()) {
193-
node_watch_thread_.join();
194-
}
195-
if (pod_watch_thread_.joinable()) {
196-
pod_watch_thread_.join();
197-
}
198-
if (service_watch_thread_.joinable()) {
199-
service_watch_thread_.join();
200-
}
201-
if (endpoints_watch_thread_.joinable()) {
202-
endpoints_watch_thread_.join();
189+
for (auto& thread_it: object_watch_threads_) {
190+
std::thread& watch_thread = thread_it.second;
191+
if (watch_thread.joinable()) {
192+
watch_thread.join();
193+
}
203194
}
204195
}
205196

@@ -211,15 +202,18 @@ class KubernetesUpdater : public PollingMetadataUpdater {
211202
void NotifyStopUpdater();
212203

213204
private:
205+
// WatchId combines the plural Kubernetes kind and API version.
206+
using WatchId = std::pair<std::string, std::string>;
207+
// List of cluster level objects to watch.
208+
static const std::vector<WatchId>& ClusterLevelObjectTypes();
209+
214210
// Metadata watcher callback.
215211
void MetadataCallback(std::vector<ResourceMetadata>&& result_vector);
216212

217213
KubernetesReader reader_;
218214
HealthChecker* health_checker_;
219-
std::thread node_watch_thread_;
220-
std::thread pod_watch_thread_;
221-
std::thread service_watch_thread_;
222-
std::thread endpoints_watch_thread_;
215+
// Map from the watch IDs to the respective threads.
216+
std::map<WatchId, std::thread> object_watch_threads_;
223217
};
224218

225219
}

0 commit comments

Comments
 (0)