Skip to content

Commit 6592eae

Browse files
authored
Merge pull request #10668 from Icinga/perfdata-writers-connection-handling
Add PerfdatawriterConnection to handle network requests for Perfdata Writers
2 parents 41109e2 + 75b2ec6 commit 6592eae

34 files changed

Lines changed: 1750 additions & 760 deletions

doc/09-object-types.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,6 +1214,7 @@ Configuration Attributes:
12141214
port | Number | **Required.** Elasticsearch port. Defaults to `9200`.
12151215
index | String | **Required.** Prefix for the index names. Defaults to `icinga2`.
12161216
enable\_send\_perfdata | Boolean | **Optional.** Send parsed performance data metrics for check results. Defaults to `false`.
1217+
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Elasticsearch before disconnecting. Defaults to `10s`.
12171218
flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to Elasticsearch. Defaults to `10s`.
12181219
flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to Elasticsearch. Defaults to `1024`.
12191220
username | String | **Optional.** Basic auth username if Elasticsearch is hidden behind an HTTP proxy.
@@ -1310,6 +1311,7 @@ Configuration Attributes:
13101311
--------------------------|-----------------------|----------------------------------
13111312
host | String | **Optional.** GELF receiver host address. Defaults to `127.0.0.1`.
13121313
port | Number | **Optional.** GELF receiver port. Defaults to `12201`.
1314+
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to GELF before disconnecting. Defaults to `10s`.
13131315
source | String | **Optional.** Source name for this instance. Defaults to `icinga2`.
13141316
enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events.
13151317
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
@@ -1340,6 +1342,7 @@ Configuration Attributes:
13401342
--------------------------|-----------------------|----------------------------------
13411343
host | String | **Optional.** Graphite Carbon host address. Defaults to `127.0.0.1`.
13421344
port | Number | **Optional.** Graphite Carbon port. Defaults to `2003`.
1345+
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Graphite before disconnecting. Defaults to `10s`.
13431346
host\_name\_template | String | **Optional.** Metric prefix for host name. Defaults to `icinga2.$host.name$.host.$host.check_command$`.
13441347
service\_name\_template | String | **Optional.** Metric prefix for service name. Defaults to `icinga2.$host.name$.services.$service.name$.$service.check_command$`.
13451348
enable\_send\_thresholds | Boolean | **Optional.** Send additional threshold metrics. Defaults to `false`.
@@ -1682,6 +1685,7 @@ Configuration Attributes:
16821685
service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol.
16831686
enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data.
16841687
enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc.
1688+
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`.
16851689
flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`.
16861690
flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
16871691
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
@@ -1745,6 +1749,7 @@ Configuration Attributes:
17451749
service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol.
17461750
enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data.
17471751
enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc.
1752+
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`.
17481753
flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`.
17491754
flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
17501755
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
@@ -1860,6 +1865,7 @@ Configuration Attributes:
18601865
--------------------------|-----------------------|----------------------------------
18611866
host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`.
18621867
port | Number | **Optional.** OpenTSDB port. Defaults to `4242`.
1868+
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to OpenTSDB before disconnecting. Defaults to `10s`.
18631869
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
18641870
enable_generic_metrics | Boolean | **Optional.** Re-use metric names to store different perfdata values for a particular check. Use tags to distinguish perfdata instead of metric name. Defaults to `false`.
18651871
host_template | Dictionary | **Optional.** Specify additional tags to be included with host metrics. This requires a sub-dictionary named `tags`. Also specify a naming prefix by setting `metric`. More information can be found in [OpenTSDB custom tags](14-features.md#opentsdb-custom-tags) and [OpenTSDB Metric Prefix](14-features.md#opentsdb-metric-prefix). Defaults to an `empty Dictionary`.

lib/base/i2-base.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,10 @@
7676
#define BOOST_BIND_NO_PLACEHOLDERS
7777

7878
#include <functional>
79+
#include <chrono>
80+
81+
namespace icinga {
82+
using namespace std::chrono_literals;
83+
} // namespace icinga
7984

8085
#endif /* I2BASE_H */

lib/perfdata/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ set(perfdata_SOURCES
1919
influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
2020
opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
2121
perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
22+
perfdatawriterconnection.cpp perfdatawriterconnection.hpp
2223
)
2324

2425
if(ICINGA2_UNITY_BUILD)

lib/perfdata/elasticsearchwriter.cpp

Lines changed: 56 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,22 @@
22
// SPDX-License-Identifier: GPL-2.0-or-later
33

44
#include "perfdata/elasticsearchwriter.hpp"
5+
#include "base/defer.hpp"
56
#include "perfdata/elasticsearchwriter-ti.cpp"
67
#include "remote/url.hpp"
78
#include "icinga/compatutility.hpp"
89
#include "icinga/service.hpp"
910
#include "icinga/macroprocessor.hpp"
1011
#include "icinga/checkcommand.hpp"
1112
#include "base/application.hpp"
12-
#include "base/defer.hpp"
13-
#include "base/io-engine.hpp"
14-
#include "base/tcpsocket.hpp"
1513
#include "base/stream.hpp"
1614
#include "base/base64.hpp"
1715
#include "base/json.hpp"
1816
#include "base/utility.hpp"
19-
#include "base/networkstream.hpp"
2017
#include "base/perfdatavalue.hpp"
2118
#include "base/exception.hpp"
2219
#include "base/statsfunction.hpp"
2320
#include <boost/algorithm/string.hpp>
24-
#include <boost/asio/ssl/context.hpp>
25-
#include <boost/beast/core/flat_buffer.hpp>
26-
#include <boost/beast/http/field.hpp>
27-
#include <boost/beast/http/message.hpp>
28-
#include <boost/beast/http/parser.hpp>
29-
#include <boost/beast/http/read.hpp>
30-
#include <boost/beast/http/status.hpp>
31-
#include <boost/beast/http/string_body.hpp>
32-
#include <boost/beast/http/verb.hpp>
33-
#include <boost/beast/http/write.hpp>
34-
#include <boost/scoped_array.hpp>
35-
#include <memory>
3621
#include <string>
3722
#include <utility>
3823

@@ -78,12 +63,25 @@ void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::
7863
status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
7964
}
8065

66+
void ElasticsearchWriter::Start(bool runtimeCreated)
67+
{
68+
ObjectImpl::Start(runtimeCreated);
69+
70+
if (GetEnableTls()) {
71+
try {
72+
m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
73+
} catch (const std::exception& ex) {
74+
Log(LogCritical, "ElasticsearchWriter")
75+
<< "Unable to create SSL context: " << ex.what();
76+
throw;
77+
}
78+
}
79+
}
80+
8181
void ElasticsearchWriter::Resume()
8282
{
8383
ObjectImpl<ElasticsearchWriter>::Resume();
8484

85-
m_EventPrefix = "icinga2.event.";
86-
8785
Log(LogInformation, "ElasticsearchWriter")
8886
<< "'" << GetName() << "' resumed.";
8987

@@ -96,6 +94,8 @@ void ElasticsearchWriter::Resume()
9694
m_FlushTimer->Start();
9795
m_FlushTimer->Reschedule(0);
9896

97+
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()};
98+
9999
/* Register for new metrics. */
100100
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
101101
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
@@ -120,12 +120,17 @@ void ElasticsearchWriter::Pause()
120120
m_HandleNotifications.disconnect();
121121

122122
m_FlushTimer->Stop(true);
123-
m_WorkQueue.Join();
124123

125-
{
126-
std::unique_lock<std::mutex> lock (m_DataBufferMutex);
124+
std::promise<void> queueDonePromise;
125+
m_WorkQueue.Enqueue([&]() {
127126
Flush();
128-
}
127+
queueDonePromise.set_value();
128+
}, PriorityLow);
129+
130+
auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
131+
m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
132+
133+
m_WorkQueue.Join();
129134

130135
Log(LogInformation, "ElasticsearchWriter")
131136
<< "'" << GetName() << "' paused.";
@@ -269,6 +274,10 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co
269274
AddTemplateTags(fields, checkable, cr);
270275

271276
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
277+
if (m_Connection->IsStopped()) {
278+
return;
279+
}
280+
272281
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
273282

274283
AddCheckResult(fields, checkable, cr);
@@ -308,6 +317,10 @@ void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, co
308317
AddTemplateTags(fields, checkable, cr);
309318

310319
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
320+
if (m_Connection->IsStopped()) {
321+
return;
322+
}
323+
311324
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
312325

313326
AddCheckResult(fields, checkable, cr);
@@ -358,6 +371,10 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr
358371
AddTemplateTags(fields, checkable, cr);
359372

360373
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
374+
if (m_Connection->IsStopped()) {
375+
return;
376+
}
377+
361378
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
362379

363380
Log(LogDebug, "ElasticsearchWriter")
@@ -379,15 +396,10 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
379396
{
380397
AssertOnWorkQueue();
381398

382-
/* Atomically buffer the data point. */
383-
std::unique_lock<std::mutex> lock(m_DataBufferMutex);
384-
385399
/* Format the timestamps to dynamically select the date datatype inside the index. */
386400
fields->Set("@timestamp", FormatTimestamp(ts));
387401
fields->Set("timestamp", FormatTimestamp(ts));
388-
389-
String eventType = m_EventPrefix + type;
390-
fields->Set("type", eventType);
402+
fields->Set("type", "icinga2.event." + type);
391403

392404
/* Every payload needs a line describing the index.
393405
* We do it this way to avoid problems with a near full queue.
@@ -408,19 +420,21 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
408420
}
409421
}
410422

423+
/**
424+
* Queues a Flush on the work-queue if there isn't one queued already.
425+
*/
411426
void ElasticsearchWriter::FlushTimeout()
412427
{
413-
/* Prevent new data points from being added to the array, there is a
414-
* race condition where they could disappear.
415-
*/
416-
std::unique_lock<std::mutex> lock(m_DataBufferMutex);
428+
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
429+
return;
430+
}
417431

418-
/* Flush if there are any data available. */
419-
if (m_DataBuffer.size() > 0) {
420-
Log(LogDebug, "ElasticsearchWriter")
421-
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
432+
m_WorkQueue.Enqueue([&]() {
433+
Defer resetFlushTimer{
434+
[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }
435+
};
422436
Flush();
423-
}
437+
});
424438
}
425439

426440
void ElasticsearchWriter::Flush()
@@ -466,22 +480,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
466480

467481
url->SetPath(path);
468482

469-
OptionalTlsStream stream;
470-
471-
try {
472-
stream = Connect();
473-
} catch (const std::exception& ex) {
474-
Log(LogWarning, "ElasticsearchWriter")
475-
<< "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
476-
return;
477-
}
478-
479-
Defer s ([&stream]() {
480-
if (stream.first) {
481-
stream.first->next_layer().shutdown();
482-
}
483-
});
484-
485483
http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
486484

487485
request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
@@ -511,37 +509,14 @@ void ElasticsearchWriter::SendRequest(const String& body)
511509
<< "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
512510
<< " to '" << url->Format() << "'.";
513511

512+
decltype(m_Connection->Send(request)) response;
514513
try {
515-
if (stream.first) {
516-
http::write(*stream.first, request);
517-
stream.first->flush();
518-
} else {
519-
http::write(*stream.second, request);
520-
stream.second->flush();
521-
}
522-
} catch (const std::exception&) {
523-
Log(LogWarning, "ElasticsearchWriter")
524-
<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
525-
throw;
526-
}
527-
528-
http::parser<false, http::string_body> parser;
529-
beast::flat_buffer buf;
530-
531-
try {
532-
if (stream.first) {
533-
http::read(*stream.first, buf, parser);
534-
} else {
535-
http::read(*stream.second, buf, parser);
536-
}
537-
} catch (const std::exception& ex) {
538-
Log(LogWarning, "ElasticsearchWriter")
539-
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
540-
throw;
514+
response = m_Connection->Send(request);
515+
} catch (const PerfdataWriterConnection::Stopped& ex) {
516+
Log(LogDebug, "ElasticsearchWriter") << ex.what();
517+
return;
541518
}
542519

543-
auto& response (parser.get());
544-
545520
if (response.result_int() > 299) {
546521
if (response.result() == http::status::unauthorized) {
547522
/* More verbose error logging with Elasticsearch is hidden behind a proxy. */
@@ -589,66 +564,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
589564
}
590565
}
591566

592-
OptionalTlsStream ElasticsearchWriter::Connect()
593-
{
594-
Log(LogNotice, "ElasticsearchWriter")
595-
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
596-
597-
OptionalTlsStream stream;
598-
bool tls = GetEnableTls();
599-
600-
if (tls) {
601-
Shared<boost::asio::ssl::context>::Ptr sslContext;
602-
603-
try {
604-
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
605-
} catch (const std::exception&) {
606-
Log(LogWarning, "ElasticsearchWriter")
607-
<< "Unable to create SSL context.";
608-
throw;
609-
}
610-
611-
stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
612-
613-
} else {
614-
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
615-
}
616-
617-
try {
618-
icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
619-
} catch (const std::exception&) {
620-
Log(LogWarning, "ElasticsearchWriter")
621-
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
622-
throw;
623-
}
624-
625-
if (tls) {
626-
auto& tlsStream (stream.first->next_layer());
627-
628-
try {
629-
tlsStream.handshake(tlsStream.client);
630-
} catch (const std::exception&) {
631-
Log(LogWarning, "ElasticsearchWriter")
632-
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
633-
throw;
634-
}
635-
636-
if (!GetInsecureNoverify()) {
637-
if (!tlsStream.GetPeerCertificate()) {
638-
BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate."));
639-
}
640-
641-
if (!tlsStream.IsVerifyOK()) {
642-
BOOST_THROW_EXCEPTION(std::runtime_error(
643-
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
644-
));
645-
}
646-
}
647-
}
648-
649-
return stream;
650-
}
651-
652567
void ElasticsearchWriter::AssertOnWorkQueue()
653568
{
654569
ASSERT(m_WorkQueue.IsWorkerThread());

0 commit comments

Comments
 (0)