Skip to content

Commit cb64ef1

Browse files
(WIP) Abstract out connection-handling for Perfdata-Writers
1 parent d545b74 commit cb64ef1

24 files changed

Lines changed: 827 additions & 601 deletions

lib/base/i2-base.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,10 @@
7575
#define BOOST_BIND_NO_PLACEHOLDERS
7676

7777
#include <functional>
78+
#include <chrono>
79+
80+
namespace icinga {
81+
using namespace std::chrono_literals;
82+
} // namespace icinga
7883

7984
#endif /* I2BASE_H */

lib/perfdata/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ set(perfdata_SOURCES
1818
influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
1919
opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
2020
perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
21+
perfdatabackendconnection.cpp
2122
)
2223

2324
if(ICINGA2_UNITY_BUILD)

lib/perfdata/elasticsearchwriter.cpp

Lines changed: 28 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,25 @@ void ElasticsearchWriter::Resume()
9191
/* Setup timer for periodically flushing m_DataBuffer */
9292
m_FlushTimer = Timer::Create();
9393
m_FlushTimer->SetInterval(GetFlushInterval());
94-
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
94+
m_FlushTimer->OnTimerExpired.connect([this](const Timer* const&) {
95+
m_WorkQueue.Enqueue([&]() { FlushTimeout(); });
96+
});
9597
m_FlushTimer->Start();
9698
m_FlushTimer->Reschedule(0);
9799

100+
Shared<boost::asio::ssl::context>::Ptr sslContext;
101+
if (GetEnableTls()) {
102+
try {
103+
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
104+
} catch (const std::exception& ex) {
105+
Log(LogWarning, GetReflectionType()->GetName())
106+
<< "Unable to create SSL context.";
107+
throw;
108+
}
109+
}
110+
111+
m_Connection = new PerfdataWriterConnection{GetHost(), GetPort(), sslContext};
112+
98113
/* Register for new metrics. */
99114
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
100115
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
@@ -118,6 +133,10 @@ void ElasticsearchWriter::Pause()
118133
m_HandleStateChanges.disconnect();
119134
m_HandleNotifications.disconnect();
120135

136+
m_Connection->StartDisconnectTimeout(
137+
std::chrono::milliseconds{static_cast<unsigned>(GetDisconnectTimeout() * 1000)}
138+
);
139+
121140
m_FlushTimer->Stop(true);
122141
m_WorkQueue.Join();
123142

@@ -465,22 +484,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
465484

466485
url->SetPath(path);
467486

468-
OptionalTlsStream stream;
469-
470-
try {
471-
stream = Connect();
472-
} catch (const std::exception& ex) {
473-
Log(LogWarning, "ElasticsearchWriter")
474-
<< "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
475-
return;
476-
}
477-
478-
Defer s ([&stream]() {
479-
if (stream.first) {
480-
stream.first->next_layer().shutdown();
481-
}
482-
});
483-
484487
http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
485488

486489
request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
@@ -510,37 +513,19 @@ void ElasticsearchWriter::SendRequest(const String& body)
510513
<< "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
511514
<< " to '" << url->Format() << "'.";
512515

516+
decltype(m_Connection->Send(request)) response;
513517
try {
514-
if (stream.first) {
515-
http::write(*stream.first, request);
516-
stream.first->flush();
517-
} else {
518-
http::write(*stream.second, request);
519-
stream.second->flush();
518+
response = m_Connection->Send(request);
519+
} catch (const std::exception& ex) {
520+
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
521+
se->code() == boost::system::errc::operation_canceled) {
522+
Log(LogDebug, "ElasticsearchWriter") << "Operation Cancelled.";
523+
return;
520524
}
521-
} catch (const std::exception&) {
522-
Log(LogWarning, "ElasticsearchWriter")
523-
<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
524-
throw;
525-
}
526-
527-
http::parser<false, http::string_body> parser;
528-
beast::flat_buffer buf;
529525

530-
try {
531-
if (stream.first) {
532-
http::read(*stream.first, buf, parser);
533-
} else {
534-
http::read(*stream.second, buf, parser);
535-
}
536-
} catch (const std::exception& ex) {
537-
Log(LogWarning, "ElasticsearchWriter")
538-
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
539-
throw;
526+
Log(LogCritical, "ElasticsearchWriter") << "Error sending Request: " << ex.what();
540527
}
541528

542-
auto& response (parser.get());
543-
544529
if (response.result_int() > 299) {
545530
if (response.result() == http::status::unauthorized) {
546531
/* More verbose error logging with Elasticsearch is hidden behind a proxy. */
@@ -588,66 +573,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
588573
}
589574
}
590575

591-
OptionalTlsStream ElasticsearchWriter::Connect()
592-
{
593-
Log(LogNotice, "ElasticsearchWriter")
594-
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
595-
596-
OptionalTlsStream stream;
597-
bool tls = GetEnableTls();
598-
599-
if (tls) {
600-
Shared<boost::asio::ssl::context>::Ptr sslContext;
601-
602-
try {
603-
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
604-
} catch (const std::exception&) {
605-
Log(LogWarning, "ElasticsearchWriter")
606-
<< "Unable to create SSL context.";
607-
throw;
608-
}
609-
610-
stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
611-
612-
} else {
613-
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
614-
}
615-
616-
try {
617-
icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
618-
} catch (const std::exception&) {
619-
Log(LogWarning, "ElasticsearchWriter")
620-
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
621-
throw;
622-
}
623-
624-
if (tls) {
625-
auto& tlsStream (stream.first->next_layer());
626-
627-
try {
628-
tlsStream.handshake(tlsStream.client);
629-
} catch (const std::exception&) {
630-
Log(LogWarning, "ElasticsearchWriter")
631-
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
632-
throw;
633-
}
634-
635-
if (!GetInsecureNoverify()) {
636-
if (!tlsStream.GetPeerCertificate()) {
637-
BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate."));
638-
}
639-
640-
if (!tlsStream.IsVerifyOK()) {
641-
BOOST_THROW_EXCEPTION(std::runtime_error(
642-
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
643-
));
644-
}
645-
}
646-
}
647-
648-
return stream;
649-
}
650-
651576
void ElasticsearchWriter::AssertOnWorkQueue()
652577
{
653578
ASSERT(m_WorkQueue.IsWorkerThread());

lib/perfdata/elasticsearchwriter.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "base/workqueue.hpp"
1010
#include "base/timer.hpp"
1111
#include "base/tlsstream.hpp"
12+
#include "perfdata/perfdatabackendconnection.hpp"
1213

1314
namespace icinga
1415
{
@@ -39,6 +40,8 @@ class ElasticsearchWriter final : public ObjectImpl<ElasticsearchWriter>
3940
std::vector<String> m_DataBuffer;
4041
std::mutex m_DataBufferMutex;
4142

43+
PerfdataWriterConnection::Ptr m_Connection;
44+
4245
void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
4346
void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
4447

@@ -50,7 +53,6 @@ class ElasticsearchWriter final : public ObjectImpl<ElasticsearchWriter>
5053
void Enqueue(const Checkable::Ptr& checkable, const String& type,
5154
const Dictionary::Ptr& fields, double ts);
5255

53-
OptionalTlsStream Connect();
5456
void AssertOnWorkQueue();
5557
void ExceptionHandler(boost::exception_ptr exp);
5658
void FlushTimeout();

lib/perfdata/elasticsearchwriter.ti

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ class ElasticsearchWriter : ConfigObject
3939
[config] String cert_path;
4040
[config] String key_path;
4141

42-
[config] int flush_interval {
42+
[config] double disconnect_timeout {
43+
default {{{ return 0.5; }}}
44+
};
45+
[config] double flush_interval {
4346
default {{{ return 10; }}}
4447
};
4548
[config] int flush_threshold {

0 commit comments

Comments
 (0)