6 changes: 6 additions & 0 deletions doc/09-object-types.md
@@ -1214,6 +1214,7 @@ Configuration Attributes:
port | Number | **Required.** Elasticsearch port. Defaults to `9200`.
index | String | **Required.** Prefix for the index names. Defaults to `icinga2`.
enable\_send\_perfdata | Boolean | **Optional.** Send parsed performance data metrics for check results. Defaults to `false`.
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Elasticsearch before disconnecting. Defaults to `10s`.
flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to Elasticsearch. Defaults to `10s`.
flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to Elasticsearch. Defaults to `1024`.
username | String | **Optional.** Basic auth username if Elasticsearch is hidden behind an HTTP proxy.
@@ -1310,6 +1311,7 @@ Configuration Attributes:
--------------------------|-----------------------|----------------------------------
host | String | **Optional.** GELF receiver host address. Defaults to `127.0.0.1`.
port | Number | **Optional.** GELF receiver port. Defaults to `12201`.
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to GELF before disconnecting. Defaults to `10s`.
source | String | **Optional.** Source name for this instance. Defaults to `icinga2`.
enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
@@ -1340,6 +1342,7 @@ Configuration Attributes:
--------------------------|-----------------------|----------------------------------
host | String | **Optional.** Graphite Carbon host address. Defaults to `127.0.0.1`.
port | Number | **Optional.** Graphite Carbon port. Defaults to `2003`.
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Graphite before disconnecting. Defaults to `10s`.
host\_name\_template | String | **Optional.** Metric prefix for host name. Defaults to `icinga2.$host.name$.host.$host.check_command$`.
service\_name\_template | String | **Optional.** Metric prefix for service name. Defaults to `icinga2.$host.name$.services.$service.name$.$service.check_command$`.
enable\_send\_thresholds | Boolean | **Optional.** Send additional threshold metrics. Defaults to `false`.
@@ -1682,6 +1685,7 @@ Configuration Attributes:
service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol.
enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data.
enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc.
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`.
flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`.
flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
@@ -1745,6 +1749,7 @@ Configuration Attributes:
service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol.
enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data.
enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc.
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`.
flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`.
flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
@@ -1860,6 +1865,7 @@ Configuration Attributes:
--------------------------|-----------------------|----------------------------------
host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`.
port | Number | **Optional.** OpenTSDB port. Defaults to `4242`.
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to OpenTSDB before disconnecting. Defaults to `10s`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
enable_generic_metrics | Boolean | **Optional.** Re-use metric names to store different perfdata values for a particular check. Use tags to distinguish perfdata instead of metric name. Defaults to `false`.
host_template | Dictionary | **Optional.** Specify additional tags to be included with host metrics. This requires a sub-dictionary named `tags`. Also specify a naming prefix by setting `metric`. More information can be found in [OpenTSDB custom tags](14-features.md#opentsdb-custom-tags) and [OpenTSDB Metric Prefix](14-features.md#opentsdb-metric-prefix). Defaults to an `empty Dictionary`.
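The documentation hunks above add the same `disconnect_timeout` attribute to every perfdata writer. As a sketch of how a feature definition could use it (host, port, and the `30s` value are placeholders; attribute semantics per the tables above):

```
object ElasticsearchWriter "elasticsearch" {
  host = "127.0.0.1"
  port = 9200
  index = "icinga2"

  /* Allow up to 30 seconds for buffered events to be flushed
   * before disconnecting; the default added by this PR is 10s. */
  disconnect_timeout = 30s
}
```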
5 changes: 5 additions & 0 deletions lib/base/i2-base.hpp
@@ -76,5 +76,10 @@
#define BOOST_BIND_NO_PLACEHOLDERS

#include <functional>
#include <chrono>

namespace icinga {
using namespace std::chrono_literals;
} // namespace icinga

#endif /* I2BASE_H */
1 change: 1 addition & 0 deletions lib/perfdata/CMakeLists.txt
@@ -19,6 +19,7 @@ set(perfdata_SOURCES
influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
perfdatawriterconnection.cpp perfdatawriterconnection.hpp
)

if(ICINGA2_UNITY_BUILD)
197 changes: 56 additions & 141 deletions lib/perfdata/elasticsearchwriter.cpp
@@ -2,37 +2,22 @@
// SPDX-License-Identifier: GPL-2.0-or-later

#include "perfdata/elasticsearchwriter.hpp"
#include "base/defer.hpp"
#include "perfdata/elasticsearchwriter-ti.cpp"
#include "remote/url.hpp"
#include "icinga/compatutility.hpp"
#include "icinga/service.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/checkcommand.hpp"
#include "base/application.hpp"
#include "base/defer.hpp"
#include "base/io-engine.hpp"
#include "base/tcpsocket.hpp"
#include "base/stream.hpp"
#include "base/base64.hpp"
#include "base/json.hpp"
#include "base/utility.hpp"
#include "base/networkstream.hpp"
#include "base/perfdatavalue.hpp"
#include "base/exception.hpp"
#include "base/statsfunction.hpp"
#include <boost/algorithm/string.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/http/field.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/parser.hpp>
#include <boost/beast/http/read.hpp>
#include <boost/beast/http/status.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/verb.hpp>
#include <boost/beast/http/write.hpp>
#include <boost/scoped_array.hpp>
#include <memory>
#include <string>
#include <utility>

@@ -78,12 +63,25 @@ void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::
status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
}

void ElasticsearchWriter::Start(bool runtimeCreated)
{
ObjectImpl::Start(runtimeCreated);

if (GetEnableTls()) {
try {
m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception& ex) {
Log(LogCritical, "ElasticsearchWriter")
<< "Unable to create SSL context: " << ex.what();
throw;
}
}
}

void ElasticsearchWriter::Resume()
{
ObjectImpl<ElasticsearchWriter>::Resume();

m_EventPrefix = "icinga2.event.";

Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' resumed.";

@@ -96,6 +94,8 @@ void ElasticsearchWriter::Resume()
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);

m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()};

/* Register for new metrics. */
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
@@ -120,12 +120,17 @@ void ElasticsearchWriter::Pause()
m_HandleNotifications.disconnect();

m_FlushTimer->Stop(true);
-	m_WorkQueue.Join();

-	{
-		std::unique_lock<std::mutex> lock (m_DataBufferMutex);
+	std::promise<void> queueDonePromise;
+	m_WorkQueue.Enqueue([&]() {
 		Flush();
-	}
+		queueDonePromise.set_value();
+	}, PriorityLow);
+
+	auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
+	m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
+
+	m_WorkQueue.Join();

Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' paused.";
@@ -269,6 +274,10 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co
AddTemplateTags(fields, checkable, cr);

m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
if (m_Connection->IsStopped()) {
return;
}

CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");

AddCheckResult(fields, checkable, cr);
@@ -308,6 +317,10 @@ void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, co
AddTemplateTags(fields, checkable, cr);

m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
if (m_Connection->IsStopped()) {
return;
}

CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");

AddCheckResult(fields, checkable, cr);
@@ -358,6 +371,10 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr
AddTemplateTags(fields, checkable, cr);

m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
if (m_Connection->IsStopped()) {
return;
}

CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");

Log(LogDebug, "ElasticsearchWriter")
@@ -379,15 +396,10 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
{
AssertOnWorkQueue();

/* Atomically buffer the data point. */
std::unique_lock<std::mutex> lock(m_DataBufferMutex);

/* Format the timestamps to dynamically select the date datatype inside the index. */
fields->Set("@timestamp", FormatTimestamp(ts));
fields->Set("timestamp", FormatTimestamp(ts));

String eventType = m_EventPrefix + type;
fields->Set("type", eventType);
fields->Set("type", "icinga2.event." + type);

/* Every payload needs a line describing the index.
* We do it this way to avoid problems with a near full queue.
Expand All @@ -408,19 +420,21 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
}
}

+/**
+ * Queues a Flush on the work-queue if there isn't one queued already.
+ */
 void ElasticsearchWriter::FlushTimeout()
 {
-	/* Prevent new data points from being added to the array, there is a
-	 * race condition where they could disappear.
-	 */
-	std::unique_lock<std::mutex> lock(m_DataBufferMutex);
+	if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
+		return;
+	}

-	/* Flush if there are any data available. */
-	if (m_DataBuffer.size() > 0) {
-		Log(LogDebug, "ElasticsearchWriter")
-			<< "Timer expired writing " << m_DataBuffer.size() << " data points";
+	m_WorkQueue.Enqueue([&]() {
+		Defer resetFlushTimer{
+			[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }
+		};
 		Flush();
-	}
+	});
 }

void ElasticsearchWriter::Flush()
@@ -466,22 +480,6 @@ void ElasticsearchWriter::SendRequest(const String& body)

url->SetPath(path);

OptionalTlsStream stream;

try {
stream = Connect();
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticsearchWriter")
<< "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
return;
}

Defer s ([&stream]() {
if (stream.first) {
stream.first->next_layer().shutdown();
}
});

http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);

request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
@@ -511,37 +509,14 @@ void ElasticsearchWriter::SendRequest(const String& body)
<< "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
<< " to '" << url->Format() << "'.";

+	decltype(m_Connection->Send(request)) response;
 	try {
-		if (stream.first) {
-			http::write(*stream.first, request);
-			stream.first->flush();
-		} else {
-			http::write(*stream.second, request);
-			stream.second->flush();
-		}
-	} catch (const std::exception&) {
-		Log(LogWarning, "ElasticsearchWriter")
-			<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
-		throw;
-	}
-
-	http::parser<false, http::string_body> parser;
-	beast::flat_buffer buf;
-
-	try {
-		if (stream.first) {
-			http::read(*stream.first, buf, parser);
-		} else {
-			http::read(*stream.second, buf, parser);
-		}
-	} catch (const std::exception& ex) {
-		Log(LogWarning, "ElasticsearchWriter")
-			<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
-		throw;
+		response = m_Connection->Send(request);
+	} catch (const PerfdataWriterConnection::Stopped& ex) {
+		Log(LogDebug, "ElasticsearchWriter") << ex.what();
+		return;
 	}

-	auto& response (parser.get());

if (response.result_int() > 299) {
if (response.result() == http::status::unauthorized) {
/* More verbose error logging with Elasticsearch is hidden behind a proxy. */
@@ -589,66 +564,6 @@
}
}

OptionalTlsStream ElasticsearchWriter::Connect()
{
Log(LogNotice, "ElasticsearchWriter")
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";

OptionalTlsStream stream;
bool tls = GetEnableTls();

if (tls) {
Shared<boost::asio::ssl::context>::Ptr sslContext;

try {
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchWriter")
<< "Unable to create SSL context.";
throw;
}

stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());

} else {
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
}

try {
icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchWriter")
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}

if (tls) {
auto& tlsStream (stream.first->next_layer());

try {
tlsStream.handshake(tlsStream.client);
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchWriter")
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
throw;
}

if (!GetInsecureNoverify()) {
if (!tlsStream.GetPeerCertificate()) {
BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate."));
}

if (!tlsStream.IsVerifyOK()) {
BOOST_THROW_EXCEPTION(std::runtime_error(
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
));
}
}
}

return stream;
}

void ElasticsearchWriter::AssertOnWorkQueue()
{
ASSERT(m_WorkQueue.IsWorkerThread());