diff --git a/doc/09-object-types.md b/doc/09-object-types.md index 3fe29702f2d..791b2581141 100644 --- a/doc/09-object-types.md +++ b/doc/09-object-types.md @@ -1317,6 +1317,8 @@ Configuration Attributes: diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to GELF before disconnecting. Defaults to `10s`. source | String | **Optional.** Source name for this instance. Defaults to `icinga2`. enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events. + flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`. + flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. enable\_tls | Boolean | **Optional.** Whether to use a TLS stream. Defaults to `false`. insecure\_noverify | Boolean | **Optional.** Disable TLS peer verification. @@ -1350,6 +1352,8 @@ Configuration Attributes: service\_name\_template | String | **Optional.** Metric prefix for service name. Defaults to `icinga2.$host.name$.services.$service.name$.$service.check_command$`. enable\_send\_thresholds | Boolean | **Optional.** Send additional threshold metrics. Defaults to `false`. enable\_send\_metadata | Boolean | **Optional.** Send additional metadata metrics. Defaults to `false`. + flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`. + flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. Additional usage examples can be found [here](14-features.md#graphite-carbon-cache-writer). @@ -1865,6 +1869,8 @@ Configuration Attributes: --------------------------|-----------------------|---------------------------------- host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`. port | Number | **Optional.** OpenTSDB port. Defaults to `4242`. + flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`. + flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`. diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to OpenTSDB before disconnecting. Defaults to `10s`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. enable_generic_metrics | Boolean | **Optional.** Re-use metric names to store different perfdata values for a particular check. Use tags to distinguish perfdata instead of metric name. Defaults to `false`. diff --git a/lib/perfdata/gelfwriter.cpp b/lib/perfdata/gelfwriter.cpp index 6f8567f7073..0bb18dbf161 100644 --- a/lib/perfdata/gelfwriter.cpp +++ b/lib/perfdata/gelfwriter.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/gelfwriter.hpp" +#include "base/defer.hpp" #include "perfdata/gelfwriter-ti.cpp" #include "icinga/service.hpp" #include "icinga/notification.hpp" @@ -90,6 +91,13 @@ void GelfWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); + /* Setup timer for periodically flushing m_DataBuffer */ + m_FlushTimer = Timer::Create(); + m_FlushTimer->SetInterval(GetFlushInterval()); + m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); }); + m_FlushTimer->Start(); + m_FlushTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()}; /* Register event handlers. */ @@ -115,6 +123,8 @@ void GelfWriter::Pause() m_HandleNotifications.disconnect(); m_HandleStateChanges.disconnect(); + m_FlushTimer->Stop(true); + std::promise queueDonePromise; m_WorkQueue.Enqueue([&]() { @@ -360,19 +370,38 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g { AssertOnWorkQueue(); - std::ostringstream msgbuf; - msgbuf << gelfMessage; - msgbuf << '\0'; + Log(LogDebug, "GelfWriter") + << "Checkable '" << checkable->GetName() << "' sending message '" << gelfMessage << "'."; - auto log = msgbuf.str(); + m_MsgBuf.GetData().reserve(m_MsgBuf.GetLength() + gelfMessage.GetLength() + 1); + m_MsgBuf += gelfMessage; + m_MsgBuf += '\0'; - try { - Log(LogDebug, "GelfWriter") - << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'."; + if (GetFlushThreshold() <= m_MsgBuf.GetLength()) { + Flush(); + } +} - m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()}); +/** + * Queues a Flush on the work-queue if none is queued yet. + */ +void GelfWriter::FlushTimeout() +{ + if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { + return; + } + + m_WorkQueue.Enqueue([&]() { + Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }}; + Flush(); + }); +} + +void GelfWriter::Flush() +{ + try { + m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {}))); } catch (const PerfdataWriterConnection::Stopped& ex) { Log(LogDebug, "GelfWriter") << ex.what(); - return; } } diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp index f7d2a10c339..f5f3b7d6a82 100644 --- a/lib/perfdata/gelfwriter.hpp +++ b/lib/perfdata/gelfwriter.hpp @@ -34,6 +34,9 @@ class GelfWriter final : public ObjectImpl private: PerfdataWriterConnection::Ptr m_Connection; + Timer::Ptr m_FlushTimer; + std::atomic_bool m_FlushTimerInQueue{false}; + String m_MsgBuf; WorkQueue m_WorkQueue{10000000, 1}; Shared::Ptr m_SslContext; @@ -46,6 +49,8 @@ class GelfWriter final : public ObjectImpl String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts); void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage); + void FlushTimeout(); + void Flush(); void AssertOnWorkQueue(); diff --git a/lib/perfdata/gelfwriter.ti b/lib/perfdata/gelfwriter.ti index 46c194d1a4a..1b3e516a9c5 100644 --- a/lib/perfdata/gelfwriter.ti +++ b/lib/perfdata/gelfwriter.ti @@ -24,6 +24,12 @@ class GelfWriter : ConfigObject [config] bool enable_send_perfdata { default {{{ return false; }}} }; + [config] int flush_interval { + default {{{ return 15; }}} + }; + [config] std::size_t flush_threshold { + default {{{ return 2 * 1024 * 1024; }}} + }; [config] double disconnect_timeout { default {{{ return 10; }}} diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp index e00cd927589..28c684db87d 100644 --- a/lib/perfdata/graphitewriter.cpp +++ b/lib/perfdata/graphitewriter.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/graphitewriter.hpp" +#include "base/defer.hpp" #include "perfdata/graphitewriter-ti.cpp" #include "icinga/service.hpp" #include "icinga/checkcommand.hpp" @@ -85,6 +86,13 @@ void GraphiteWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); + /* Setup timer for periodically flushing m_DataBuffer */ + m_FlushTimer = Timer::Create(); + m_FlushTimer->SetInterval(GetFlushInterval()); + m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); }); + m_FlushTimer->Start(); + m_FlushTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; /* Register event handlers. */ @@ -101,6 +109,8 @@ void GraphiteWriter::Pause() { m_HandleCheckResults.disconnect(); + m_FlushTimer->Stop(true); + std::promise queueDonePromise; m_WorkQueue.Enqueue([&]() { @@ -199,10 +209,10 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C CONTEXT("Processing check result for '" << checkable->GetName() << "'"); for (auto& [name, val] : metadata) { - SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd()); + AddMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd()); } - SendPerfdata(checkable, prefix + ".perfdata", cr); + AddPerfdata(checkable, prefix + ".perfdata", cr); }); } @@ -213,7 +223,7 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C * @param prefix Metric prefix string * @param cr Check result including performance data */ -void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr) +void GraphiteWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr) { AssertOnWorkQueue(); @@ -245,17 +255,17 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& String escapedKey = EscapeMetricLabel(pdv->GetLabel()); double ts = cr->GetExecutionEnd(); - SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts); + AddMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts); if (GetEnableSendThresholds()) { if (!pdv->GetCrit().IsEmpty()) - SendMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts); + AddMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts); if (!pdv->GetWarn().IsEmpty()) - SendMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts); + AddMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts); if (!pdv->GetMin().IsEmpty()) - SendMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts); + AddMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts); if (!pdv->GetMax().IsEmpty()) - SendMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts); + AddMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts); } } } @@ -269,7 +279,7 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& * @param value Metric value * @param ts Timestamp when the check result was created */ -void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts) +void GraphiteWriter::AddMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts) { AssertOnWorkQueue(); @@ -284,11 +294,34 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p // do not send \n to debug log msgbuf << "\n"; + m_MsgBuf += std::move(msgbuf).str(); + + if (GetFlushThreshold() <= m_MsgBuf.GetLength()) { + Flush(); + } +} + +/** + * Queues a Flush on the work-queue if none is queued yet. + */ +void GraphiteWriter::FlushTimeout() +{ + if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { + return; + } + + m_WorkQueue.Enqueue([&]() { + Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }}; + Flush(); + }); +} + +void GraphiteWriter::Flush() +{ try { - m_Connection->Send(asio::buffer(msgbuf.str())); + m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {}))); } catch (const PerfdataWriterConnection::Stopped& ex) { Log(LogDebug, "GraphiteWriter") << ex.what(); - return; } } diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp index 470fcc07dac..5f1eda9ce19 100644 --- a/lib/perfdata/graphitewriter.hpp +++ b/lib/perfdata/graphitewriter.hpp @@ -36,13 +36,18 @@ class GraphiteWriter final : public ObjectImpl private: PerfdataWriterConnection::Ptr m_Connection; + Timer::Ptr m_FlushTimer; + std::atomic_bool m_FlushTimerInQueue{false}; + String m_MsgBuf; WorkQueue m_WorkQueue{10000000, 1}; boost::signals2::connection m_HandleCheckResults; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts); - void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr); + void AddMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts); + void AddPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr); + void FlushTimeout(); + void Flush(); static String EscapeMetric(const String& str); static String EscapeMetricLabel(const String& str); static Value EscapeMacroMetric(const Value& value); diff --git a/lib/perfdata/graphitewriter.ti b/lib/perfdata/graphitewriter.ti index f0d9bfb8056..2cafdf38dbb 100644 --- a/lib/perfdata/graphitewriter.ti +++ b/lib/perfdata/graphitewriter.ti @@ -24,8 +24,14 @@ class GraphiteWriter : ConfigObject [config] String service_name_template { default {{{ return "icinga2.$host.name$.services.$service.name$.$service.check_command$"; }}} }; - [config] bool enable_send_thresholds; - [config] bool enable_send_metadata; + [config] bool enable_send_thresholds; + [config] bool enable_send_metadata; + [config] int flush_interval { + default {{{ return 15; }}} + }; + [config] std::size_t flush_threshold { + default {{{ return 2 * 1024 * 1024; }}} + }; [config] double disconnect_timeout { default {{{ return 10; }}} diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp index 1b2f82a7d9c..e5ea9ea5138 100644 --- a/lib/perfdata/opentsdbwriter.cpp +++ b/lib/perfdata/opentsdbwriter.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/opentsdbwriter.hpp" +#include "base/defer.hpp" #include "perfdata/opentsdbwriter-ti.cpp" #include "icinga/service.hpp" #include "icinga/checkcommand.hpp" @@ -88,6 +89,13 @@ void OpenTsdbWriter::Resume() << "Exception during OpenTsdb operation: " << DiagnosticInformation(exp); }); + /* Setup timer for periodically flushing m_DataBuffer */ + m_FlushTimer = Timer::Create(); + m_FlushTimer->SetInterval(GetFlushInterval()); + m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); }); + m_FlushTimer->Start(); + m_FlushTimer->Reschedule(0); + ReadConfigTemplate(); m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; @@ -104,6 +112,8 @@ void OpenTsdbWriter::Pause() { m_HandleCheckResults.disconnect(); + m_FlushTimer->Stop(true); + std::promise queueDonePromise; m_WorkQueue.Enqueue([&]() { @@ -282,7 +292,9 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); - SendMsgBuffer(); + if (GetFlushThreshold() <= m_MsgBuf.GetLength()) { + SendMsgBuffer(); + } } ); } @@ -387,7 +399,22 @@ void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& me /* do not send \n to debug log */ msgbuf << "\n"; - m_MsgBuf.append(msgbuf.str()); + m_MsgBuf += msgbuf.str(); +} + +/** + * Queues a Flush on the work-queue if none is queued yet. + */ +void OpenTsdbWriter::FlushTimeout() +{ + if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { + return; + } + + m_WorkQueue.Enqueue([&]() { + Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }}; + SendMsgBuffer(); + }); } void OpenTsdbWriter::SendMsgBuffer() @@ -398,7 +425,7 @@ void OpenTsdbWriter::SendMsgBuffer() << "Flushing data buffer to OpenTsdb."; try { - m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{}))); + m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {}))); } catch (const PerfdataWriterConnection::Stopped& ex) { Log(LogDebug, "OpenTsdbWriter") << ex.what(); return; diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp index 5db2985400a..04fe0779ef1 100644 --- a/lib/perfdata/opentsdbwriter.hpp +++ b/lib/perfdata/opentsdbwriter.hpp @@ -35,7 +35,9 @@ class OpenTsdbWriter final : public ObjectImpl private: WorkQueue m_WorkQueue{10000000, 1}; - std::string m_MsgBuf; + Timer::Ptr m_FlushTimer; + std::atomic_bool m_FlushTimerInQueue{false}; + String m_MsgBuf; PerfdataWriterConnection::Ptr m_Connection; boost::signals2::connection m_HandleCheckResults; @@ -46,6 +48,7 @@ class OpenTsdbWriter final : public ObjectImpl void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void AddMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts); + void FlushTimeout(); void SendMsgBuffer(); void AddPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts); diff --git a/lib/perfdata/opentsdbwriter.ti b/lib/perfdata/opentsdbwriter.ti index dcad571682b..c0935a8874c 100644 --- a/lib/perfdata/opentsdbwriter.ti +++ b/lib/perfdata/opentsdbwriter.ti @@ -31,6 +31,12 @@ class OpenTsdbWriter : ConfigObject [config] bool enable_generic_metrics { default {{{ return false; }}} }; + [config] int flush_interval { + default {{{ return 15; }}} + }; + [config] std::size_t flush_threshold { + default {{{ return 2 * 1024 * 1024; }}} + }; [config] double disconnect_timeout { default {{{ return 10; }}} };