|
17 | 17 | #include "ApMonBackend.h" |
18 | 18 | #include <iostream> |
19 | 19 | #include <sstream> |
| 20 | +#include <vector> |
| 21 | +#include <map> |
| 22 | +#include <unistd.h> |
| 23 | +#include <limits.h> |
| 24 | +#include <cstdlib> |
20 | 25 | #include "../MonLogger.h" |
21 | 26 | #include "../Exceptions/MonitoringException.h" |
22 | 27 |
|
@@ -59,69 +64,125 @@ void ApMonBackend::addGlobalTag(std::string_view /*name*/, std::string_view valu |
59 | 64 | mEntity += value; |
60 | 65 | } |
61 | 66 |
|
62 | | -void ApMonBackend::send(const Metric& metric) |
| 67 | +std::string ApMonBackend::getNodeName() |
63 | 68 | { |
64 | | - std::string name = metric.getName(); |
65 | | - std::string entity = mEntity; |
66 | | - for (const auto& [key, value] : metric.getTags()) { |
67 | | - entity += ','; |
68 | | - entity += tags::TAG_KEY[key]; |
69 | | - entity += '='; |
70 | | - (value > 0) ? entity += tags::GetValue(value) : entity += std::to_string(0 - value); |
| 69 | + const char* env_p = std::getenv("ALIEN_PROC_ID"); |
| 70 | + if (env_p) { |
| 71 | + return std::string(env_p); |
71 | 72 | } |
72 | | - if (mRunNumber != 0) entity += (",run=" + std::to_string(mRunNumber)); |
73 | | - |
74 | | - int valueSize = metric.getValuesSize(); |
75 | | - char **paramNames, **paramValues; |
76 | | - int* valueTypes; |
77 | | - paramNames = (char**)std::malloc(valueSize * sizeof(char*)); |
78 | | - paramValues = (char**)std::malloc(valueSize * sizeof(char*)); |
79 | | - valueTypes = (int*)std::malloc(valueSize * sizeof(int)); |
80 | | - // the scope of values must be the same as sendTimedParameters method |
81 | | - int intValue; |
82 | | - double doubleValue; |
83 | | - std::string stringValue; |
84 | | - |
85 | | - auto& values = metric.getValues(); |
86 | | - |
87 | | - for (int i = 0; i < valueSize; i++) { |
88 | | - paramNames[i] = const_cast<char*>(values[i].first.c_str()); |
89 | | - std::visit(overloaded{ |
90 | | - [&](int value) { |
91 | | - valueTypes[i] = XDR_INT32; |
92 | | - intValue = value; |
93 | | - paramValues[i] = reinterpret_cast<char*>(&intValue); |
94 | | - }, |
95 | | - [&](double value) { |
96 | | - valueTypes[i] = XDR_REAL64; |
97 | | - doubleValue = value; |
98 | | - paramValues[i] = reinterpret_cast<char*>(&doubleValue); |
99 | | - }, |
100 | | - [&](const std::string& value) { |
101 | | - valueTypes[i] = XDR_STRING; |
102 | | - stringValue = value; |
103 | | - paramValues[i] = const_cast<char*>(stringValue.c_str()); |
104 | | - }, |
105 | | - [&](uint64_t value) { |
106 | | - valueTypes[i] = XDR_REAL64; |
107 | | - doubleValue = static_cast<double>(value); |
108 | | - paramValues[i] = reinterpret_cast<char*>(&doubleValue); |
109 | | - }, |
110 | | - }, values[i].second); |
| 73 | + |
| 74 | + char hostname[HOST_NAME_MAX]; |
| 75 | + if (gethostname(hostname, sizeof(hostname)) == 0) { |
| 76 | + hostname[sizeof(hostname) - 1] = '\0'; |
| 77 | + return std::string(hostname); |
111 | 78 | } |
112 | 79 |
|
113 | | - mApMon->sendTimedParameters(const_cast<char*>(name.c_str()), const_cast<char*>(entity.c_str()), |
114 | | - valueSize, paramNames, valueTypes, paramValues, convertTimestamp(metric.getTimestamp())); |
| 80 | + MonLogger::Get(Severity::Error) << "Failed to get hostname, using 'unknown'" << MonLogger::End(); |
| 81 | + return "unknown"; |
| 82 | +} |
| 83 | + |
| 84 | +void ApMonBackend::sendBatch(const std::vector<reference_wrapper<const Metric>>& metrics) |
| 85 | +{ |
| 86 | + std::string clusterName(mClusterName); |
| 87 | + std::string nodeName = getNodeName(); |
| 88 | + |
| 89 | + int totalValues = 0; |
| 90 | + for (const auto& metric : metrics) { |
| 91 | + totalValues += metric.get().getValuesSize(); |
| 92 | + } |
| 93 | + const int totalParams = totalValues * 2; |
| 94 | + std::vector<int> intValues; |
| 95 | + std::vector<double> doubleValues; |
| 96 | + std::vector<std::string> stringValues; |
| 97 | + std::vector<char*> paramNames; |
| 98 | + std::vector<char*> paramValues; |
| 99 | + std::vector<int> valueTypes; |
| 100 | + |
| 101 | + intValues.reserve(totalValues); |
| 102 | + doubleValues.reserve(totalValues); |
| 103 | + stringValues.reserve(metrics.size() * 3 + totalValues); |
| 104 | + paramNames.reserve(totalParams); |
| 105 | + paramValues.reserve(totalParams); |
| 106 | + valueTypes.reserve(totalParams); |
| 107 | + |
| 108 | + for (const auto& metric : metrics) { |
| 109 | + std::string entity = mEntity; |
| 110 | + for (const auto& [key, value] : metric.get().getTags()) { |
| 111 | + entity += ','; |
| 112 | + entity += tags::TAG_KEY[key]; |
| 113 | + entity += '='; |
| 114 | + (value > 0) ? entity += tags::GetValue(value) : entity += std::to_string(0 - value); |
| 115 | + } |
| 116 | + if (mRunNumber != 0) entity += (",run=" + std::to_string(mRunNumber)); |
| 117 | + |
| 118 | + auto& values = metric.get().getValues(); |
| 119 | + const int valueSize = metric.get().getValuesSize(); |
| 120 | + |
| 121 | + const std::string_view metricName = metric.get().getName(); |
| 122 | + stringValues.emplace_back(metricName); |
| 123 | + const char* metriNamePtr = stringValues.back().c_str(); |
| 124 | + stringValues.emplace_back(std::string(metricName) + "_src"); |
| 125 | + const char* metriNameSrcPtr = stringValues.back().c_str(); |
| 126 | + stringValues.push_back(std::move(entity)); |
| 127 | + const char* entityPtr = stringValues.back().c_str(); |
| 128 | + |
| 129 | + for (int i = 0; i < valueSize; ++i) { |
| 130 | + paramNames.push_back(const_cast<char*>(metriNamePtr)); |
| 131 | + std::visit(overloaded{ |
| 132 | + [&](int value) { |
| 133 | + valueTypes.push_back(XDR_INT32); |
| 134 | + intValues.push_back(value); |
| 135 | + paramValues.push_back(reinterpret_cast<char*>(&intValues.back())); |
| 136 | + }, |
| 137 | + [&](double value) { |
| 138 | + valueTypes.push_back(XDR_REAL64); |
| 139 | + doubleValues.push_back(value); |
| 140 | + paramValues.push_back(reinterpret_cast<char*>(&doubleValues.back())); |
| 141 | + }, |
| 142 | + [&](const std::string& value) { |
| 143 | + valueTypes.push_back(XDR_STRING); |
| 144 | + stringValues.push_back(value); |
| 145 | + paramValues.push_back(const_cast<char*>(stringValues.back().c_str())); |
| 146 | + }, |
| 147 | + [&](uint64_t value) { |
| 148 | + valueTypes.push_back(XDR_REAL64); |
| 149 | + doubleValues.push_back(static_cast<double>(value)); |
| 150 | + paramValues.push_back(reinterpret_cast<char*>(&doubleValues.back())); |
| 151 | + }, |
| 152 | + }, values[i].second); |
115 | 153 |
|
116 | | - std::free(paramNames); |
117 | | - std::free(paramValues); |
118 | | - std::free(valueTypes); |
| 154 | + paramNames.push_back(const_cast<char*>(metriNameSrcPtr)); |
| 155 | + valueTypes.push_back(XDR_STRING); |
| 156 | + paramValues.push_back(const_cast<char*>(entityPtr)); |
| 157 | + } |
| 158 | + } |
| 159 | + |
| 160 | + mApMon->sendTimedParameters( |
| 161 | + const_cast<char*>(clusterName.c_str()), |
| 162 | + const_cast<char*>(nodeName.c_str()), |
| 163 | + totalParams, paramNames.data(), valueTypes.data(), paramValues.data(), |
| 164 | + convertTimestamp(metrics[0].get().getTimestamp()) |
| 165 | + ); |
| 166 | +} |
| 167 | + |
| 168 | +void ApMonBackend::send(const Metric& metric) |
| 169 | +{ |
| 170 | + sendBatch(std::vector<std::reference_wrapper<const Metric>>{std::cref(metric)}); |
119 | 171 | } |
120 | 172 |
|
121 | 173 | void ApMonBackend::send(std::vector<Metric>&& metrics) |
122 | 174 | { |
123 | | - for (auto& metric : metrics) { |
124 | | - send(metric); |
| 175 | + if (metrics.empty()) { |
| 176 | + return; |
| 177 | + } |
| 178 | + |
| 179 | + std::map<int, std::vector<std::reference_wrapper<const Metric>>> metricsByTimestamp; |
| 180 | + for (const auto& metric : metrics) { |
| 181 | + metricsByTimestamp[convertTimestamp(metric.getTimestamp())].push_back(std::cref(metric)); |
| 182 | + } |
| 183 | + |
| 184 | + for (const auto& [timestamp, metricsGroup] : metricsByTimestamp) { |
| 185 | + sendBatch(metricsGroup); |
125 | 186 | } |
126 | 187 | } |
127 | 188 |
|
|
0 commit comments