Skip to content
This repository was archived by the owner on Aug 19, 2019. It is now read-only.

Commit 7fe7b36

Browse files
committed
Add support for watching and ingesting Services and Endponts.
For services and endpoints, we use generic code, since computing resource full name, type, location etc does not need to be type specific.
1 parent 757c49b commit 7fe7b36

3 files changed

Lines changed: 148 additions & 1 deletion

File tree

src/kubernetes.cc

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include <algorithm>
2020
#include <boost/algorithm/string/join.hpp>
21+
#include <boost/algorithm/string/split.hpp>
2122
#include <boost/asio/ip/host_name.hpp>
2223
#include <boost/network/protocol/http/client.hpp>
2324
#include <boost/range/iterator_range.hpp>
@@ -106,6 +107,86 @@ class KubernetesReader::NonRetriableError
106107
NonRetriableError(const std::string& what) : QueryException(what) {}
107108
};
108109

110+
const std::pair<std::string, std::string> TypeAndVersion(
111+
const std::string api_version, const std::string kind) {
112+
const int group_name_len = api_version.find('/');
113+
if (group_name_len == std::string::npos) {
114+
return std::make_pair("io.k8s." + kind, api_version);
115+
} else {
116+
std::vector<std::string> slash_split;
117+
boost::algorithm::split(
118+
slash_split, api_version, boost::algorithm::is_any_of("/"));
119+
const std::string group_name = slash_split[0];
120+
const std::string version = slash_split[1];
121+
return std::make_pair("io.k8s." + group_name + "." + kind, version);
122+
}
123+
}
124+
125+
const std::string KubernetesReader::FullResourceName(
126+
const std::string& self_link) const {
127+
std::vector<std::string> slash_split;
128+
boost::algorithm::split(
129+
slash_split, self_link, boost::algorithm::is_any_of("/"));
130+
131+
std::vector<std::string> link_components;
132+
if(slash_split[1] == "api") {
133+
// Core resources, start with "/api/<version>/..."
134+
link_components.assign(slash_split.begin() + 3, slash_split.end());
135+
} else {
136+
// Non-core resources, start with "/apis/<group-name>/<version>/..."
137+
const std::string group_name = slash_split[2];
138+
link_components.push_back(group_name);
139+
link_components.insert(link_components.end(),
140+
slash_split.begin() + 4, slash_split.end());
141+
142+
}
143+
const std::string relative_link =
144+
boost::algorithm::join(link_components, "/");
145+
const std::string cluster_full_name = ClusterFullName();
146+
return cluster_full_name + "/k8s/" + relative_link;
147+
}
148+
149+
MetadataUpdater::ResourceMetadata KubernetesReader::GetResourceMetadata(
150+
const json::Object* resource, Timestamp collected_at, bool is_deleted) const
151+
throw(json::Exception) {
152+
const std::string cluster_location = environment_.KubernetesClusterLocation();
153+
154+
const std::string kind = resource->Get<json::String>("kind");
155+
const std::string api_version = resource->Get<json::String>("apiVersion");
156+
const json::Object* metadata = resource->Get<json::Object>("metadata");
157+
const std::string self_link = metadata->Get<json::String>("selfLink");
158+
const std::pair<std::string, std::string> type_and_version =
159+
TypeAndVersion(api_version, kind);
160+
const std::string type = type_and_version.first;
161+
const std::string version = type_and_version.second;
162+
163+
const std::string schema =
164+
format::Substitute(std::string(kKubernetesSchemaNameFormat),
165+
{{"type", type}, {"version", version}});
166+
const std::string created_str =
167+
metadata->Get<json::String>("creationTimestamp");
168+
Timestamp created_at = time::rfc3339::FromString(created_str);
169+
const std::string resource_full_name = FullResourceName(self_link);
170+
171+
if (config_.VerboseLogging()) {
172+
LOG(INFO) << "Raw resource metadata for full name: " << resource_full_name
173+
<< ": " << *resource;
174+
}
175+
176+
const MonitoredResource dummy_mr("", {});
177+
return MetadataUpdater::ResourceMetadata(
178+
std::vector<std::string>{}, dummy_mr,
179+
resource_full_name,
180+
#ifdef ENABLE_KUBERNETES_METADATA
181+
MetadataStore::Metadata(type, cluster_location, version,
182+
schema, is_deleted, created_at, collected_at,
183+
resource->Clone())
184+
#else
185+
MetadataStore::Metadata::IGNORED()
186+
#endif
187+
);
188+
}
189+
109190
KubernetesReader::KubernetesReader(const Configuration& config,
110191
HealthChecker* health_checker)
111192
: config_(config), environment_(config), health_checker_(health_checker) {}
@@ -791,6 +872,41 @@ void KubernetesReader::ValidateDynamicConfiguration() const
791872
}
792873
}
793874

875+
void KubernetesReader::ResourceCallback(
876+
MetadataUpdater::UpdateCallback callback,
877+
const json::Object* resource, Timestamp collected_at, bool is_deleted) const
878+
throw(json::Exception) {
879+
std::vector<MetadataUpdater::ResourceMetadata> result_vector;
880+
result_vector.emplace_back(
881+
GetResourceMetadata(resource, collected_at, is_deleted));
882+
callback(std::move(result_vector));
883+
}
884+
885+
void KubernetesReader::WatchResources(
886+
const std::string& api_path, const std::string& name,
887+
MetadataUpdater::UpdateCallback callback) const {
888+
LOG(INFO) << "Watch thread (" << name << ") started";
889+
890+
try {
891+
// TODO: There seems to be a Kubernetes API bug with watch=true.
892+
WatchMaster(
893+
name, api_path,
894+
[=](const json::Object* resource, Timestamp collected_at,
895+
bool is_deleted) {
896+
ResourceCallback(callback, resource, collected_at, is_deleted);
897+
});
898+
} catch (const json::Exception& e) {
899+
LOG(ERROR) << e.what();
900+
LOG(ERROR) << "No more " << name << " metadata will be collected";
901+
} catch (const KubernetesReader::QueryException& e) {
902+
LOG(ERROR) << "No more " << name << " metadata will be collected";
903+
}
904+
if (health_checker_) {
905+
health_checker_->SetUnhealthy("kubernetes_node_thread");
906+
}
907+
LOG(INFO) << "Watch thread (" << name << ") exiting";
908+
}
909+
794910
void KubernetesReader::PodCallback(
795911
MetadataUpdater::UpdateCallback callback,
796912
const json::Object* pod, Timestamp collected_at, bool is_deleted) const
@@ -912,6 +1028,12 @@ void KubernetesUpdater::StartUpdater() {
9121028
pod_watch_thread_ = std::thread([=]() {
9131029
reader_.WatchPods(watched_node, cb);
9141030
});
1031+
service_watch_thread_ = std::thread([=]() {
1032+
reader_.WatchResources("/api/v1/services", "Service", cb);
1033+
});
1034+
endpoints_watch_thread_ = std::thread([=]() {
1035+
reader_.WatchResources("/api/v1/endpoints", "Endpoints", cb);
1036+
});
9151037
} else {
9161038
// Only try to poll if watch is disabled.
9171039
PollingMetadataUpdater::StartUpdater();

src/kubernetes.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ class KubernetesReader {
5555
void WatchPods(const std::string& node_name,
5656
MetadataUpdater::UpdateCallback callback) const;
5757

58+
// Generic Kubernetes resource watcher.
59+
void WatchResources(const std::string& api_path, const std::string& name,
60+
MetadataUpdater::UpdateCallback callback) const;
61+
5862
// Gets the name of the node the agent is running on.
5963
// Returns an empty string if unable to find the current node.
6064
const std::string& CurrentNode() const;
@@ -71,6 +75,9 @@ class KubernetesReader {
7175
};
7276
class NonRetriableError;
7377

78+
// Computes the full resource name given the self link.
79+
const std::string FullResourceName(const std::string& self_link) const;
80+
7481
// Issues a Kubernetes master API query at a given path and
7582
// returns a parsed JSON response. The path has to start with "/".
7683
json::value QueryMaster(const std::string& path) const
@@ -100,6 +107,15 @@ class KubernetesReader {
100107
MetadataUpdater::UpdateCallback callback, const json::Object* pod,
101108
Timestamp collected_at, bool is_deleted) const throw(json::Exception);
102109

110+
// Kubernetes resource watch callback.
111+
void ResourceCallback(
112+
MetadataUpdater::UpdateCallback callback, const json::Object* resource,
113+
Timestamp collected_at, bool is_deleted) const throw(json::Exception);
114+
115+
// Given a generic kubernetes object, return the associated metadata.
116+
MetadataUpdater::ResourceMetadata GetResourceMetadata(
117+
const json::Object* resource, Timestamp collected_at, bool is_deleted)
118+
const throw(json::Exception);
103119
// Given a node object, return the associated metadata.
104120
MetadataUpdater::ResourceMetadata GetNodeMetadata(
105121
const json::Object* node, Timestamp collected_at, bool is_deleted) const
@@ -163,6 +179,12 @@ class KubernetesUpdater : public PollingMetadataUpdater {
163179
if (pod_watch_thread_.joinable()) {
164180
pod_watch_thread_.join();
165181
}
182+
if (service_watch_thread_.joinable()) {
183+
service_watch_thread_.join();
184+
}
185+
if (endpoints_watch_thread_.joinable()) {
186+
endpoints_watch_thread_.join();
187+
}
166188
}
167189

168190
protected:
@@ -180,6 +202,8 @@ class KubernetesUpdater : public PollingMetadataUpdater {
180202
HealthChecker* health_checker_;
181203
std::thread node_watch_thread_;
182204
std::thread pod_watch_thread_;
205+
std::thread service_watch_thread_;
206+
std::thread endpoints_watch_thread_;
183207
};
184208

185209
}

src/sample_agent_config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
MetadataIngestionEndpointFormat: "https://staging-stackdriver.sandbox.googleapis.com/v1beta3/projects/{{project_id}}/resourceMetadata:publish"
15+
MetadataIngestionHost: "https://staging-stackdriver.sandbox.googleapis.com"
16+
KubernetesUseWatch: true
1617
#CredentialsFile: /tmp/token.json
1718
#ProjectId: "1234567890"
1819
#InstanceZone: "us-central1-a"

0 commit comments

Comments
 (0)