From 160c7127ca6df0d2b766429f7d399de8a54f2dd8 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Thu, 2 Apr 2026 13:32:43 +0200 Subject: [PATCH 1/5] Read until end of file in response reading test-cases --- test/perfdata-perfdatatargetfixture.hpp | 11 ++++++++++- test/perfdata-perfdatawriterconnection.cpp | 4 ++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/test/perfdata-perfdatatargetfixture.hpp b/test/perfdata-perfdatatargetfixture.hpp index bac1c504de9..8c2a381bcf8 100644 --- a/test/perfdata-perfdatatargetfixture.hpp +++ b/test/perfdata-perfdatatargetfixture.hpp @@ -71,11 +71,20 @@ class PerfdataWriterTargetFixture BOOST_REQUIRE(stream->next_layer().IsVerifyOK()); } - void Shutdown() + void Shutdown(bool wait = false) { BOOST_REQUIRE(std::holds_alternative::Ptr>(m_Stream)); auto& stream = std::get::Ptr>(m_Stream); try { + if (wait) { + std::array buf{}; + boost::asio::mutable_buffer readBuf (buf.data(), buf.size()); + boost::system::error_code ec; + + do { + stream->read_some(readBuf, ec); + } while (!ec); + } stream->next_layer().shutdown(); } catch (const std::exception& ex) { if (const auto* se = dynamic_cast(&ex); diff --git a/test/perfdata-perfdatawriterconnection.cpp b/test/perfdata-perfdatawriterconnection.cpp index 16ed299a947..0f2435198d9 100644 --- a/test/perfdata-perfdatawriterconnection.cpp +++ b/test/perfdata-perfdatawriterconnection.cpp @@ -207,7 +207,7 @@ BOOST_AUTO_TEST_CASE(stuck_reading_response) requestReadPromise.set_value(); // Do not send a response but react to the shutdown to be polite. 
shutdownPromise.get_future().get(); - Shutdown(); + Shutdown(true); }}; TestThread timeoutThread{[&]() { @@ -315,7 +315,7 @@ BOOST_AUTO_TEST_CASE(http_send_retry) SendResponse(); - Shutdown(); + Shutdown(true); }}; boost::beast::http::request request{boost::beast::http::verb::post, "foo", 10}; From aecb3fe7d5a9b421b6b140aafee9b565993d3e47 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Wed, 22 Apr 2026 11:38:29 +0200 Subject: [PATCH 2/5] Fix a race-condition when perfdata writer is stuck in handshake The issue occurs when ::Connect in `EnsureConnected()` returns after `Disconnect()` has already set `m_Stopped` to true. By adding a check and throwing an exception before entering `async_handshake()` the behavior should now always be consistent. --- lib/perfdata/perfdatawriterconnection.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp index 46000c28f47..e8a56e89980 100644 --- a/lib/perfdata/perfdatawriterconnection.cpp +++ b/lib/perfdata/perfdatawriterconnection.cpp @@ -62,7 +62,7 @@ bool PerfdataWriterConnection::IsStopped() const void PerfdataWriterConnection::Disconnect() { - if (m_Stopped.exchange(true, std::memory_order_relaxed)) { + if (m_Stopped.exchange(true)) { return; } @@ -133,6 +133,10 @@ void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& ::Connect(stream->lowest_layer(), m_Host, m_Port, yc); if constexpr (std::is_same_v, Shared::Ptr>) { + if (m_Stopped) { + BOOST_THROW_EXCEPTION(Stopped{}); + } + using type = boost::asio::ssl::stream_base::handshake_type; stream->next_layer().async_handshake(type::client, yc); From 864c79ef3b649c993e7cff0049e728d0cb1627c3 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Wed, 22 Apr 2026 11:42:31 +0200 Subject: [PATCH 3/5] Fix ineffective cancel() when stuck in perfdata writer handshake --- lib/perfdata/perfdatawriterconnection.cpp | 10 +++++++--- 1 file changed, 7 
insertions(+), 3 deletions(-) diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp index e8a56e89980..c8cc80e9dbe 100644 --- a/lib/perfdata/perfdatawriterconnection.cpp +++ b/lib/perfdata/perfdatawriterconnection.cpp @@ -76,9 +76,13 @@ void PerfdataWriterConnection::Disconnect() * completion. */ std::visit( - [](const auto& stream) { + [&](const auto& stream) { if (stream->lowest_layer().is_open()) { - stream->lowest_layer().cancel(); + if (m_Connected) { + stream->lowest_layer().cancel(); + } else { + stream->lowest_layer().close(); + } } }, m_Stream @@ -160,7 +164,7 @@ void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc) { - if (!m_Connected.exchange(false, std::memory_order_relaxed)) { + if (!m_Connected.exchange(false)) { return; } From df5bb46a022ad5183ac0c30576a2cf60ff6beab7 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Wed, 13 May 2026 11:04:19 +0200 Subject: [PATCH 4/5] Implement `icinga::Connect()` overload that returns a `std::future` --- lib/base/tcpsocket.hpp | 50 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/lib/base/tcpsocket.hpp b/lib/base/tcpsocket.hpp index 40a93c86044..159fe5dea03 100644 --- a/lib/base/tcpsocket.hpp +++ b/lib/base/tcpsocket.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include namespace icinga @@ -96,6 +97,55 @@ void Connect(Socket& socket, const String& node, const String& service, boost::a } } +template +std::future Connect(Socket& socket, const String& node, const String& service, const Executor& executor) +{ + using boost::asio::ip::tcp; + + auto resolver = std::make_shared(IoEngine::Get().GetIoContext()); + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + resolver->async_resolve( + node.GetData(), + service.GetData(), + boost::asio::bind_executor( + executor, + [&socket, &executor, 
resolver, promise = std::move(promise)]( + const boost::system::error_code& ec, tcp::resolver::results_type results + ) { + if (ec) { + promise->set_exception(std::make_exception_ptr(boost::system::system_error(ec))); + return; + } + + boost::asio::async_connect( + socket, + results, + boost::asio::bind_executor( + executor, [&socket, promise](const boost::system::error_code& ec, const tcp::endpoint&) { + if (ec) { + promise->set_exception(std::make_exception_ptr(boost::system::system_error(ec))); + return; + } + + boost::system::error_code optEc; + socket.set_option(tcp::socket::keep_alive(true), optEc); + if (optEc) { + promise->set_exception(std::make_exception_ptr(boost::system::system_error(optEc))); + return; + } + + promise->set_value(); + } + ) + ); + } + ) + ); + + return future; +} } #endif /* TCPSOCKET_H */ From cd891c9acedd637f778a6c4b2b090d7b406c7d20 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Wed, 13 May 2026 16:02:32 +0200 Subject: [PATCH 5/5] Replace Coroutines with `use_future` in `PerfdataWriterConnection` --- lib/perfdata/perfdatawriterconnection.cpp | 155 ++++++++++++++-------- lib/perfdata/perfdatawriterconnection.hpp | 78 +++++------ 2 files changed, 131 insertions(+), 102 deletions(-) diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp index c8cc80e9dbe..a22adffad9c 100644 --- a/lib/perfdata/perfdatawriterconnection.cpp +++ b/lib/perfdata/perfdatawriterconnection.cpp @@ -2,8 +2,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "perfdata/perfdatawriterconnection.hpp" -#include "base/tcpsocket.hpp" -#include #include #include #include @@ -11,6 +9,33 @@ using namespace icinga; using HttpResponse = PerfdataWriterConnection::HttpResponse; +template +static auto WrapOp(Lock& lock, bool& stopped, const Fn& fn) +{ + auto fut = fn(); + lock.unlock(); + Defer maybeLock{[&]() { + if (!lock.owns_lock()) { + lock.lock(); + } + }}; + + if constexpr (std::is_void_v) { + fut.get(); + 
lock.lock(); + if (stopped) { + BOOST_THROW_EXCEPTION(PerfdataWriterConnection::Stopped{}); + } + } else { + auto ret = fut.get(); + lock.lock(); + if (stopped) { + BOOST_THROW_EXCEPTION(PerfdataWriterConnection::Stopped{}); + } + return ret; + } +} + PerfdataWriterConnection::PerfdataWriterConnection( const ConfigObject::Ptr& parent, String host, @@ -42,7 +67,6 @@ PerfdataWriterConnection::PerfdataWriterConnection( m_Host(std::move(host)), m_Port(std::move(port)), m_ReconnectTimer(IoEngine::Get().GetIoContext()), - m_Strand(IoEngine::Get().GetIoContext()), m_Stream(MakeStream()) { } @@ -62,41 +86,31 @@ bool PerfdataWriterConnection::IsStopped() const void PerfdataWriterConnection::Disconnect() { - if (m_Stopped.exchange(true)) { + std::unique_lock lock{m_Mutex}; + if (std::exchange(m_Stopped, true)) { return; } - std::promise promise; - - IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) { - try { - /* Cancel any outstanding operations of the other coroutine. - * Since we're on the same strand we're hopefully guaranteed that all cancellations - * result in exceptions thrown by the yield_context, even if its already queued for - * completion. - */ - std::visit( - [&](const auto& stream) { - if (stream->lowest_layer().is_open()) { - if (m_Connected) { - stream->lowest_layer().cancel(); - } else { - stream->lowest_layer().close(); - } - } - }, - m_Stream - ); - m_ReconnectTimer.cancel(); - - Disconnect(std::move(yc)); - promise.set_value(); - } catch (const std::exception& ex) { - promise.set_exception(std::current_exception()); - } - }); + /* Cancel any outstanding operations of a concurrent Send(). That caller waits on the + * future of each operation with m_Mutex released (see WrapOp()) and checks m_Stopped + * again once it holds the mutex, so Send() ends with Stopped whether the cancelled + * operation succeeded or failed. 
+ */ + std::visit( + [&](const auto& stream) { + if (stream->lowest_layer().is_open()) { + if (m_Connected) { + stream->lowest_layer().cancel(); + } else { + stream->lowest_layer().close(); + } + } + }, + m_Stream + ); + m_ReconnectTimer.cancel(); - promise.get_future().get(); + Disconnect(lock); } AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const @@ -117,16 +131,17 @@ AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const * The waits between retries are doubled for each failure, up to a maximum of 32s, until it is * reset by a successful attempt. */ -void PerfdataWriterConnection::BackoffWait(const boost::asio::yield_context& yc) +void PerfdataWriterConnection::BackoffWait(std::unique_lock& lock) { m_ReconnectTimer.expires_after(m_RetryTimeout); if (m_RetryTimeout <= FinalRetryWait / 2) { m_RetryTimeout *= 2; } - m_ReconnectTimer.async_wait(yc); + + WrapOp(lock, m_Stopped, [this]() { return m_ReconnectTimer.async_wait(boost::asio::use_future); }); } -void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& yc) +void PerfdataWriterConnection::EnsureConnected(std::unique_lock& lock) { if (m_Connected) { return; @@ -134,16 +149,23 @@ void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& std::visit( [&](auto& stream) { - ::Connect(stream->lowest_layer(), m_Host, m_Port, yc); + boost::asio::ip::tcp::resolver resolver(IoEngine::Get().GetIoContext()); + auto results = WrapOp(lock, m_Stopped, [&]() { + return resolver.async_resolve(m_Host.GetData(), m_Port.GetData(), boost::asio::use_future); + }); - if constexpr (std::is_same_v, Shared::Ptr>) { - if (m_Stopped) { - BOOST_THROW_EXCEPTION(Stopped{}); - } + WrapOp(lock, m_Stopped, [&]() { + return boost::asio::async_connect(stream->lowest_layer(), results, boost::asio::use_future); + }); + + stream->lowest_layer().set_option(boost::asio::ip::tcp::socket::keep_alive(true)); + if constexpr (std::is_same_v, Shared::Ptr>) { using type = 
boost::asio::ssl::stream_base::handshake_type; - stream->next_layer().async_handshake(type::client, yc); + WrapOp(lock, m_Stopped, [stream]() { + return stream->next_layer().async_handshake(type::client, boost::asio::use_future); + }); if (m_VerifyPeerCertificate) { if (!stream->next_layer().IsVerifyOK()) { @@ -162,20 +184,34 @@ void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& m_Connected = true; } -void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc) +void PerfdataWriterConnection::Disconnect(std::unique_lock& lock) { - if (!m_Connected.exchange(false)) { + if (!std::exchange(m_Connected, false)) { return; } std::visit( [&](auto& stream) { if constexpr (std::is_same_v, Shared::Ptr>) { - stream->GracefulDisconnect(m_Strand, yc); - } else { - stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both); - stream->lowest_layer().close(); + // Timeout shutdownTimeout(m_Strand, boost::posix_time::seconds(10), [&stream] { + // stream->ForceDisconnect(); + // }); + + try { + WrapOp(lock, m_Stopped, [stream]() { + return stream->next_layer().async_shutdown(boost::asio::use_future); + }); + } catch (const std::exception&) { + } } + + if (!stream->lowest_layer().is_open()) { + return; + } + + boost::system::error_code ec; + stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both, ec); + stream->lowest_layer().close(ec); }, m_Stream ); @@ -183,34 +219,39 @@ void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc) m_Stream = MakeStream(); } -void PerfdataWriterConnection::WriteMessage(boost::asio::const_buffer buf, const boost::asio::yield_context& yc) +void PerfdataWriterConnection::WriteMessage(boost::asio::const_buffer buf, std::unique_lock& lock) { std::visit( [&](auto& stream) { - boost::asio::async_write(*stream, buf, yc); - stream->async_flush(yc); + WrapOp(lock, m_Stopped, [&]() { return boost::asio::async_write(*stream, buf, boost::asio::use_future); }); + WrapOp(lock, 
m_Stopped, [&]() { return stream->async_flush(boost::asio::use_future); }); }, m_Stream ); } -HttpResponse PerfdataWriterConnection::WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc) +HttpResponse PerfdataWriterConnection::WriteMessage(const HttpRequest& request, std::unique_lock& lock) { boost::beast::http::response response; std::visit( [&](auto& stream) { boost::beast::http::request_serializer sr{request}; - boost::beast::http::async_write(*stream, sr, yc); - stream->async_flush(yc); + WrapOp(lock, m_Stopped, [&]() { + return boost::beast::http::async_write(*stream, sr, boost::asio::use_future); + }); + + WrapOp(lock, m_Stopped, [&]() { return stream->async_flush(boost::asio::use_future); }); boost::beast::flat_buffer buf; - boost::beast::http::async_read(*stream, buf, response, yc); + WrapOp(lock, m_Stopped, [&]() { + return boost::beast::http::async_read(*stream, buf, response, boost::asio::use_future); + }); }, m_Stream ); if (!response.keep_alive()) { - Disconnect(yc); + Disconnect(lock); } return response; diff --git a/lib/perfdata/perfdatawriterconnection.hpp b/lib/perfdata/perfdatawriterconnection.hpp index 729878a2960..ae1b8b50dcf 100644 --- a/lib/perfdata/perfdatawriterconnection.hpp +++ b/lib/perfdata/perfdatawriterconnection.hpp @@ -3,10 +3,12 @@ #pragma once +#include "base/defer.hpp" #include "base/io-engine.hpp" #include "base/tlsstream.hpp" #include #include +#include #include #include #include @@ -61,51 +63,37 @@ class PerfdataWriterConnection final : public Object template auto Send(Buffer&& buf) { + std::unique_lock lock(m_Mutex); if (m_Stopped) { BOOST_THROW_EXCEPTION(Stopped{}); } - using RetType = decltype(WriteMessage(std::declval(), std::declval())); - std::promise promise; + Defer resetRetryTimeout{[this]() { m_RetryTimeout = InitialRetryWait; }}; + + while (true) { + try { + EnsureConnected(lock); + + return WriteMessage(std::forward(buf), lock); + } catch (const std::exception& ex) { + if (m_Stopped) { + 
BOOST_THROW_EXCEPTION(Stopped{}); + } + + Log(LogCritical, m_LogFacility) + << "Error while " << (m_Connected ? "sending" : "connecting") << " to '" << m_Host << ":" << m_Port + << "' for '" << m_ParentName << "': " << ex.what(); + + m_Stream = MakeStream(); + m_Connected = false; - IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) { - while (true) { try { - EnsureConnected(yc); - - if constexpr (std::is_void_v) { - WriteMessage(std::forward(buf), yc); - promise.set_value(); - } else { - promise.set_value(WriteMessage(std::forward(buf), yc)); - } - - m_RetryTimeout = InitialRetryWait; - return; - } catch (const std::exception& ex) { - if (m_Stopped) { - promise.set_exception(std::make_exception_ptr(Stopped{})); - return; - } - - Log(LogCritical, m_LogFacility) - << "Error while " << (m_Connected ? "sending" : "connecting") << " to '" << m_Host << ":" - << m_Port << "' for '" << m_ParentName << "': " << ex.what(); - - m_Stream = MakeStream(); - m_Connected = false; - - try { - BackoffWait(yc); - } catch (const std::exception&) { - promise.set_exception(std::make_exception_ptr(Stopped{})); - return; - } + BackoffWait(lock); + } catch (const std::exception&) { + BOOST_THROW_EXCEPTION(Stopped{}); } } - }); - - return promise.get_future().get(); + } } void Disconnect(); @@ -130,15 +118,15 @@ class PerfdataWriterConnection final : public Object private: AsioTlsOrTcpStream MakeStream() const; - void BackoffWait(const boost::asio::yield_context& yc); - void EnsureConnected(const boost::asio::yield_context& yc); - void Disconnect(boost::asio::yield_context yc); + void BackoffWait(std::unique_lock& lock); + void EnsureConnected(std::unique_lock& lock); + void Disconnect(std::unique_lock& lock); - void WriteMessage(boost::asio::const_buffer, const boost::asio::yield_context& yc); - HttpResponse WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc); + void WriteMessage(boost::asio::const_buffer, std::unique_lock& lock); + 
HttpResponse WriteMessage(const HttpRequest& request, std::unique_lock& lock); - std::atomic_bool m_Stopped{false}; - std::atomic_bool m_Connected{false}; + bool m_Stopped{}; + bool m_Connected{}; bool m_VerifyPeerCertificate; Shared::Ptr m_SslContext; @@ -150,7 +138,7 @@ class PerfdataWriterConnection final : public Object std::chrono::milliseconds m_RetryTimeout{InitialRetryWait}; boost::asio::steady_timer m_ReconnectTimer; - boost::asio::io_context::strand m_Strand; + std::mutex m_Mutex; AsioTlsOrTcpStream m_Stream; };