|
17 | 17 | #include "ApMonBackend.h" |
18 | 18 | #include <iostream> |
19 | 19 | #include <sstream> |
| 20 | +#include <vector> |
| 21 | +#include <map> |
20 | 22 | #include <unistd.h> |
21 | 23 | #include <limits.h> |
| 24 | +#include <cstdlib> |
22 | 25 | #include "../MonLogger.h" |
23 | 26 | #include "../Exceptions/MonitoringException.h" |
24 | 27 |
|
@@ -78,84 +81,108 @@ std::string ApMonBackend::getNodeName() |
78 | 81 | return "unknown"; |
79 | 82 | } |
80 | 83 |
|
81 | | -void ApMonBackend::send(const Metric& metric) |
| 84 | +void ApMonBackend::sendBatch(const std::vector<reference_wrapper<const Metric>>& metrics) |
82 | 85 | { |
83 | 86 | std::string clusterName(mClusterName); |
84 | | - std::string metricName = metric.getName(); |
85 | 87 | std::string nodeName = getNodeName(); |
86 | | - std::string entity = mEntity; |
87 | | - for (const auto& [key, value] : metric.getTags()) { |
88 | | - entity += ','; |
89 | | - entity += tags::TAG_KEY[key]; |
90 | | - entity += '='; |
91 | | - (value > 0) ? entity += tags::GetValue(value) : entity += std::to_string(0 - value); |
| 88 | + |
| 89 | + int totalValues = 0; |
| 90 | + for (const auto& metric : metrics) { |
| 91 | + totalValues += metric.get().getValuesSize(); |
92 | 92 | } |
93 | | - if (mRunNumber != 0) entity += (",run=" + std::to_string(mRunNumber)); |
94 | | - |
95 | | - int valueSize = metric.getValuesSize(); |
96 | | - int totalParams = valueSize * 2; // each metric value has a source parameter |
97 | | - char **paramNames, **paramValues; |
98 | | - int* valueTypes; |
99 | | - paramNames = (char**)std::malloc(totalParams * sizeof(char*)); |
100 | | - paramValues = (char**)std::malloc(totalParams * sizeof(char*)); |
101 | | - valueTypes = (int*)std::malloc(totalParams * sizeof(int)); |
102 | | - // the scope of values must be the same as sendTimedParameters method |
103 | | - int intValue; |
104 | | - double doubleValue; |
105 | | - std::string stringValue; |
106 | | - |
107 | | - auto& values = metric.getValues(); |
108 | | - std::string sourceName = metricName + "_src"; |
109 | | - |
110 | | - for (int i = 0; i < valueSize; ++i) { |
111 | | - int metricIdx = i * 2; |
112 | | - int sourceIdx = metricIdx + 1; |
113 | | - paramNames[metricIdx] = const_cast<char*>(metricName.c_str()); |
114 | | - std::visit(overloaded{ |
115 | | - [&](int value) { |
116 | | - valueTypes[metricIdx] = XDR_INT32; |
117 | | - intValue = value; |
118 | | - paramValues[metricIdx] = reinterpret_cast<char*>(&intValue); |
119 | | - }, |
120 | | - [&](double value) { |
121 | | - valueTypes[metricIdx] = XDR_REAL64; |
122 | | - doubleValue = value; |
123 | | - paramValues[metricIdx] = reinterpret_cast<char*>(&doubleValue); |
124 | | - }, |
125 | | - [&](const std::string& value) { |
126 | | - valueTypes[metricIdx] = XDR_STRING; |
127 | | - stringValue = value; |
128 | | - paramValues[metricIdx] = const_cast<char*>(stringValue.c_str()); |
129 | | - }, |
130 | | - [&](uint64_t value) { |
131 | | - valueTypes[metricIdx] = XDR_REAL64; |
132 | | - doubleValue = static_cast<double>(value); |
133 | | - paramValues[metricIdx] = reinterpret_cast<char*>(&doubleValue); |
134 | | - }, |
135 | | - }, values[metricIdx].second); |
136 | | - |
137 | | - paramNames[sourceIdx] = const_cast<char*>(sourceName.c_str()); |
138 | | - valueTypes[sourceIdx] = XDR_STRING; |
139 | | - stringValue = entity; |
140 | | - paramValues[sourceIdx] = const_cast<char*>(stringValue.c_str()); |
| 93 | + const int totalParams = totalValues * 2; |
| 94 | + std::vector<int> intValues; |
| 95 | + std::vector<double> doubleValues; |
| 96 | + std::vector<std::string> stringValues; |
| 97 | + std::vector<char*> paramNames; |
| 98 | + std::vector<char*> paramValues; |
| 99 | + std::vector<int> valueTypes; |
| 100 | + |
| 101 | + intValues.reserve(totalValues); |
| 102 | + doubleValues.reserve(totalValues); |
| 103 | + stringValues.reserve(metrics.size() * 3 + totalValues); |
| 104 | + paramNames.reserve(totalParams); |
| 105 | + paramValues.reserve(totalParams); |
| 106 | + valueTypes.reserve(totalParams); |
| 107 | + |
| 108 | + for (const auto& metric : metrics) { |
| 109 | + std::string entity = mEntity; |
| 110 | + for (const auto& [key, value] : metric.get().getTags()) { |
| 111 | + entity += ','; |
| 112 | + entity += tags::TAG_KEY[key]; |
| 113 | + entity += '='; |
| 114 | + (value > 0) ? entity += tags::GetValue(value) : entity += std::to_string(0 - value); |
| 115 | + } |
| 116 | + if (mRunNumber != 0) entity += (",run=" + std::to_string(mRunNumber)); |
| 117 | + |
| 118 | + auto& values = metric.get().getValues(); |
| 119 | + const int valueSize = metric.get().getValuesSize(); |
| 120 | + |
| 121 | + const std::string_view metricName = metric.get().getName(); |
| 122 | + stringValues.emplace_back(metricName); |
| 123 | + const char* metriNamePtr = stringValues.back().c_str(); |
| 124 | + stringValues.emplace_back(std::string(metricName) + "_src"); |
| 125 | + const char* metriNameSrcPtr = stringValues.back().c_str(); |
| 126 | + stringValues.push_back(std::move(entity)); |
| 127 | + const char* entityPtr = stringValues.back().c_str(); |
| 128 | + |
| 129 | + for (int i = 0; i < valueSize; ++i) { |
| 130 | + paramNames.push_back(const_cast<char*>(metriNamePtr)); |
| 131 | + std::visit(overloaded{ |
| 132 | + [&](int value) { |
| 133 | + valueTypes.push_back(XDR_INT32); |
| 134 | + intValues.push_back(value); |
| 135 | + paramValues.push_back(reinterpret_cast<char*>(&intValues.back())); |
| 136 | + }, |
| 137 | + [&](double value) { |
| 138 | + valueTypes.push_back(XDR_REAL64); |
| 139 | + doubleValues.push_back(value); |
| 140 | + paramValues.push_back(reinterpret_cast<char*>(&doubleValues.back())); |
| 141 | + }, |
| 142 | + [&](const std::string& value) { |
| 143 | + valueTypes.push_back(XDR_STRING); |
| 144 | + stringValues.push_back(value); |
| 145 | + paramValues.push_back(const_cast<char*>(stringValues.back().c_str())); |
| 146 | + }, |
| 147 | + [&](uint64_t value) { |
| 148 | + valueTypes.push_back(XDR_REAL64); |
| 149 | + doubleValues.push_back(static_cast<double>(value)); |
| 150 | + paramValues.push_back(reinterpret_cast<char*>(&doubleValues.back())); |
| 151 | + }, |
| 152 | + }, values[i].second); |
| 153 | + |
| 154 | + paramNames.push_back(const_cast<char*>(metriNameSrcPtr)); |
| 155 | + valueTypes.push_back(XDR_STRING); |
| 156 | + paramValues.push_back(const_cast<char*>(entityPtr)); |
| 157 | + } |
141 | 158 | } |
142 | 159 |
|
143 | 160 | mApMon->sendTimedParameters( |
144 | 161 | const_cast<char*>(clusterName.c_str()), |
145 | 162 | const_cast<char*>(nodeName.c_str()), |
146 | | - totalParams, paramNames, valueTypes, paramValues, |
147 | | - convertTimestamp(metric.getTimestamp()) |
| 163 | + totalParams, paramNames.data(), valueTypes.data(), paramValues.data(), |
| 164 | + convertTimestamp(metrics[0].get().getTimestamp()) |
148 | 165 | ); |
| 166 | +} |
149 | 167 |
|
150 | | - std::free(paramNames); |
151 | | - std::free(paramValues); |
152 | | - std::free(valueTypes); |
| 168 | +void ApMonBackend::send(const Metric& metric) |
| 169 | +{ |
| 170 | + sendBatch(std::vector<std::reference_wrapper<const Metric>>{std::cref(metric)}); |
153 | 171 | } |
154 | 172 |
|
155 | 173 | void ApMonBackend::send(std::vector<Metric>&& metrics) |
156 | 174 | { |
157 | | - for (auto& metric : metrics) { |
158 | | - send(metric); |
| 175 | + if (metrics.empty()) { |
| 176 | + return; |
| 177 | + } |
| 178 | + |
| 179 | + std::map<int, std::vector<std::reference_wrapper<const Metric>>> metricsByTimestamp; |
| 180 | + for (const auto& metric : metrics) { |
| 181 | + metricsByTimestamp[convertTimestamp(metric.getTimestamp())].push_back(std::cref(metric)); |
| 182 | + } |
| 183 | + |
| 184 | + for (const auto& [timestamp, metricsGroup] : metricsByTimestamp) { |
| 185 | + sendBatch(metricsGroup); |
159 | 186 | } |
160 | 187 | } |
161 | 188 |
|
|
0 commit comments