Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/09-object-types.md
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,8 @@ Configuration Attributes:
diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to GELF before disconnecting. Defaults to `10s`.
source | String | **Optional.** Source name for this instance. Defaults to `icinga2`.
enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events.
flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`.
flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
enable\_tls | Boolean | **Optional.** Whether to use a TLS stream. Defaults to `false`.
insecure\_noverify | Boolean | **Optional.** Disable TLS peer verification.
Expand Down Expand Up @@ -1350,6 +1352,8 @@ Configuration Attributes:
service\_name\_template | String | **Optional.** Metric prefix for service name. Defaults to `icinga2.$host.name$.services.$service.name$.$service.check_command$`.
enable\_send\_thresholds | Boolean | **Optional.** Send additional threshold metrics. Defaults to `false`.
enable\_send\_metadata | Boolean | **Optional.** Send additional metadata metrics. Defaults to `false`.
flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`.
flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.

Additional usage examples can be found [here](14-features.md#graphite-carbon-cache-writer).
Expand Down Expand Up @@ -1865,6 +1869,8 @@ Configuration Attributes:
--------------------------|-----------------------|----------------------------------
host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`.
port | Number | **Optional.** OpenTSDB port. Defaults to `4242`.
flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`.
flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`.
diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to OpenTSDB before disconnecting. Defaults to `10s`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
enable_generic_metrics | Boolean | **Optional.** Re-use metric names to store different perfdata values for a particular check. Use tags to distinguish perfdata instead of metric name. Defaults to `false`.
Expand Down
47 changes: 38 additions & 9 deletions lib/perfdata/gelfwriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: GPL-2.0-or-later

#include "perfdata/gelfwriter.hpp"
#include "base/defer.hpp"
#include "perfdata/gelfwriter-ti.cpp"
#include "icinga/service.hpp"
#include "icinga/notification.hpp"
Expand Down Expand Up @@ -90,6 +91,13 @@ void GelfWriter::Resume()
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });

/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = Timer::Create();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);

m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()};

/* Register event handlers. */
Expand All @@ -115,6 +123,8 @@ void GelfWriter::Pause()
m_HandleNotifications.disconnect();
m_HandleStateChanges.disconnect();

m_FlushTimer->Stop(true);

std::promise<void> queueDonePromise;

m_WorkQueue.Enqueue([&]() {
Expand Down Expand Up @@ -360,19 +370,38 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g
{
AssertOnWorkQueue();

std::ostringstream msgbuf;
msgbuf << gelfMessage;
msgbuf << '\0';
Log(LogDebug, "GelfWriter")
<< "Checkable '" << checkable->GetName() << "' sending message '" << gelfMessage << "'.";

auto log = msgbuf.str();
m_MsgBuf.GetData().reserve(m_MsgBuf.GetLength() + gelfMessage.GetLength() + 1);
m_MsgBuf += gelfMessage;
m_MsgBuf += '\0';

try {
Log(LogDebug, "GelfWriter")
<< "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
if (GetFlushThreshold() <= m_MsgBuf.GetLength()) {
Flush();
}
}

m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()});
/**
* Queues a Flush on the work-queue if none is queued yet.
*/
void GelfWriter::FlushTimeout()
{
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
return;
}

m_WorkQueue.Enqueue([&]() {
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
Flush();
});
}

void GelfWriter::Flush()
{
try {
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {})));
} catch (const PerfdataWriterConnection::Stopped& ex) {
Log(LogDebug, "GelfWriter") << ex.what();
return;
}
}
5 changes: 5 additions & 0 deletions lib/perfdata/gelfwriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class GelfWriter final : public ObjectImpl<GelfWriter>

private:
PerfdataWriterConnection::Ptr m_Connection;
Timer::Ptr m_FlushTimer;
std::atomic_bool m_FlushTimerInQueue{false};
String m_MsgBuf;
WorkQueue m_WorkQueue{10000000, 1};
Shared<boost::asio::ssl::context>::Ptr m_SslContext;

Expand All @@ -46,6 +49,8 @@ class GelfWriter final : public ObjectImpl<GelfWriter>

String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage);
void FlushTimeout();
void Flush();

void AssertOnWorkQueue();

Expand Down
6 changes: 6 additions & 0 deletions lib/perfdata/gelfwriter.ti
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ class GelfWriter : ConfigObject
[config] bool enable_send_perfdata {
default {{{ return false; }}}
};
[config] int flush_interval {
default {{{ return 15; }}}
};
[config] std::size_t flush_threshold {
default {{{ return 2 * 1024 * 1024; }}}
};

[config] double disconnect_timeout {
default {{{ return 10; }}}
Expand Down
55 changes: 44 additions & 11 deletions lib/perfdata/graphitewriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: GPL-2.0-or-later

#include "perfdata/graphitewriter.hpp"
#include "base/defer.hpp"
#include "perfdata/graphitewriter-ti.cpp"
#include "icinga/service.hpp"
#include "icinga/checkcommand.hpp"
Expand Down Expand Up @@ -85,6 +86,13 @@ void GraphiteWriter::Resume()
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });

/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = Timer::Create();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);

m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};

/* Register event handlers. */
Expand All @@ -101,6 +109,8 @@ void GraphiteWriter::Pause()
{
m_HandleCheckResults.disconnect();

m_FlushTimer->Stop(true);

std::promise<void> queueDonePromise;

m_WorkQueue.Enqueue([&]() {
Expand Down Expand Up @@ -199,10 +209,10 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
CONTEXT("Processing check result for '" << checkable->GetName() << "'");

for (auto& [name, val] : metadata) {
SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd());
AddMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd());
}

SendPerfdata(checkable, prefix + ".perfdata", cr);
AddPerfdata(checkable, prefix + ".perfdata", cr);
});
}

Expand All @@ -213,7 +223,7 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
* @param prefix Metric prefix string
* @param cr Check result including performance data
*/
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr)
void GraphiteWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();

Expand Down Expand Up @@ -245,17 +255,17 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
String escapedKey = EscapeMetricLabel(pdv->GetLabel());
double ts = cr->GetExecutionEnd();

SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
AddMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);

if (GetEnableSendThresholds()) {
if (!pdv->GetCrit().IsEmpty())
SendMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
AddMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
if (!pdv->GetWarn().IsEmpty())
SendMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
AddMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
if (!pdv->GetMin().IsEmpty())
SendMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
AddMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
if (!pdv->GetMax().IsEmpty())
SendMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
AddMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
}
}
}
Expand All @@ -269,7 +279,7 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
* @param value Metric value
* @param ts Timestamp when the check result was created
*/
void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
void GraphiteWriter::AddMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
{
AssertOnWorkQueue();

Expand All @@ -284,11 +294,34 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
// do not send \n to debug log
msgbuf << "\n";

m_MsgBuf += std::move(msgbuf).str();

if (GetFlushThreshold() <= m_MsgBuf.GetLength()) {
Flush();
}
}

/**
* Queues a Flush on the work-queue if none is queued yet.
*/
void GraphiteWriter::FlushTimeout()
{
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
return;
}

m_WorkQueue.Enqueue([&]() {
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
Flush();
});
}

void GraphiteWriter::Flush()
{
try {
m_Connection->Send(asio::buffer(msgbuf.str()));
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {})));
} catch (const PerfdataWriterConnection::Stopped& ex) {
Log(LogDebug, "GraphiteWriter") << ex.what();
return;
}
}

Expand Down
9 changes: 7 additions & 2 deletions lib/perfdata/graphitewriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,18 @@ class GraphiteWriter final : public ObjectImpl<GraphiteWriter>

private:
PerfdataWriterConnection::Ptr m_Connection;
Timer::Ptr m_FlushTimer;
std::atomic_bool m_FlushTimerInQueue{false};
String m_MsgBuf;
WorkQueue m_WorkQueue{10000000, 1};

boost::signals2::connection m_HandleCheckResults;

void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr);
void AddMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
void AddPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr);
void FlushTimeout();
void Flush();
static String EscapeMetric(const String& str);
static String EscapeMetricLabel(const String& str);
static Value EscapeMacroMetric(const Value& value);
Expand Down
10 changes: 8 additions & 2 deletions lib/perfdata/graphitewriter.ti
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ class GraphiteWriter : ConfigObject
[config] String service_name_template {
default {{{ return "icinga2.$host.name$.services.$service.name$.$service.check_command$"; }}}
};
[config] bool enable_send_thresholds;
[config] bool enable_send_metadata;
[config] bool enable_send_thresholds;
[config] bool enable_send_metadata;
[config] int flush_interval {
default {{{ return 15; }}}
};
[config] std::size_t flush_threshold {
default {{{ return 2 * 1024 * 1024; }}}
};

[config] double disconnect_timeout {
default {{{ return 10; }}}
Expand Down
33 changes: 30 additions & 3 deletions lib/perfdata/opentsdbwriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: GPL-2.0-or-later

#include "perfdata/opentsdbwriter.hpp"
#include "base/defer.hpp"
#include "perfdata/opentsdbwriter-ti.cpp"
#include "icinga/service.hpp"
#include "icinga/checkcommand.hpp"
Expand Down Expand Up @@ -88,6 +89,13 @@ void OpenTsdbWriter::Resume()
<< "Exception during OpenTsdb operation: " << DiagnosticInformation(exp);
});

/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = Timer::Create();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);

ReadConfigTemplate();

m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
Expand All @@ -104,6 +112,8 @@ void OpenTsdbWriter::Pause()
{
m_HandleCheckResults.disconnect();

m_FlushTimer->Stop(true);

std::promise<void> queueDonePromise;

m_WorkQueue.Enqueue([&]() {
Expand Down Expand Up @@ -282,7 +292,9 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);

SendMsgBuffer();
if (GetFlushThreshold() <= m_MsgBuf.GetLength()) {
SendMsgBuffer();
}
}
);
}
Expand Down Expand Up @@ -387,7 +399,22 @@ void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& me

/* do not send \n to debug log */
msgbuf << "\n";
m_MsgBuf.append(msgbuf.str());
m_MsgBuf += msgbuf.str();
}

/**
* Queues a Flush on the work-queue if none is queued yet.
*/
void OpenTsdbWriter::FlushTimeout()
{
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
return;
}

m_WorkQueue.Enqueue([&]() {
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
SendMsgBuffer();
});
}

void OpenTsdbWriter::SendMsgBuffer()
Expand All @@ -398,7 +425,7 @@ void OpenTsdbWriter::SendMsgBuffer()
<< "Flushing data buffer to OpenTsdb.";

try {
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{})));
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {})));
} catch (const PerfdataWriterConnection::Stopped& ex) {
Log(LogDebug, "OpenTsdbWriter") << ex.what();
return;
Expand Down
5 changes: 4 additions & 1 deletion lib/perfdata/opentsdbwriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ class OpenTsdbWriter final : public ObjectImpl<OpenTsdbWriter>

private:
WorkQueue m_WorkQueue{10000000, 1};
std::string m_MsgBuf;
Timer::Ptr m_FlushTimer;
std::atomic_bool m_FlushTimerInQueue{false};
String m_MsgBuf;
PerfdataWriterConnection::Ptr m_Connection;

boost::signals2::connection m_HandleCheckResults;
Expand All @@ -46,6 +48,7 @@ class OpenTsdbWriter final : public ObjectImpl<OpenTsdbWriter>
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void AddMetric(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, double value, double ts);
void FlushTimeout();
void SendMsgBuffer();
void AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts);
Expand Down
Loading
Loading