Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions lib/base/tcpsocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <boost/asio/error.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/system/system_error.hpp>

namespace icinga
Expand Down Expand Up @@ -96,6 +97,55 @@
}
}

/**
 * Asynchronously resolves the given node/service pair and connects the socket
 * to one of the resolved endpoints, enabling TCP keep-alive on success.
 *
 * @param socket   Socket to connect. Must stay alive until the returned future
 *                 completes — that lifetime guarantee is part of this
 *                 function's contract, as the handlers capture it by reference.
 * @param node     Host name or address to resolve.
 * @param service  Service name or port number to resolve.
 * @param executor Executor the completion handlers are dispatched on.
 *
 * @return A future that becomes ready once the socket is connected, or carries
 *         a boost::system::system_error if resolving, connecting, or setting
 *         the keep-alive option failed.
 */
template<typename Socket, typename Executor>
std::future<void> Connect(Socket& socket, const String& node, const String& service, const Executor& executor)
{
	using boost::asio::ip::tcp;

	auto resolver = std::make_shared<tcp::resolver>(IoEngine::Get().GetIoContext());
	auto promise = std::make_shared<std::promise<void>>();
	auto future = promise->get_future();

	resolver->async_resolve(
		node.GetData(),
		service.GetData(),
		boost::asio::bind_executor(
			executor,
			// Capture the executor by value: executors are cheap, copyable handles, and a
			// reference to the caller's executor could dangle by the time this handler runs.
			[&socket, executor, resolver, promise = std::move(promise)](
				const boost::system::error_code& resolveEc, tcp::resolver::results_type results
			) {
				if (resolveEc) {
					promise->set_exception(std::make_exception_ptr(boost::system::system_error(resolveEc)));
					return;
				}

				boost::asio::async_connect(
					socket,
					results,
					boost::asio::bind_executor(
						executor,
						[&socket, promise](const boost::system::error_code& connectEc, const tcp::endpoint&) {
							if (connectEc) {
								promise->set_exception(std::make_exception_ptr(boost::system::system_error(connectEc)));
								return;
							}

							// Report a set_option() failure through the promise rather than
							// letting an exception escape the completion handler.
							boost::system::error_code optEc;
							socket.set_option(tcp::socket::keep_alive(true), optEc);
							if (optEc) {
								promise->set_exception(std::make_exception_ptr(boost::system::system_error(optEc)));
								return;
							}

							promise->set_value();
						}
					)
				);
			}
		)
	);

	return future;
}
}

#endif /* TCPSOCKET_H */
147 changes: 98 additions & 49 deletions lib/perfdata/perfdatawriterconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,40 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#include "perfdata/perfdatawriterconnection.hpp"
#include "base/tcpsocket.hpp"
#include <boost/asio/use_future.hpp>
#include <boost/beast/http/read.hpp>
#include <boost/beast/http/write.hpp>
#include <utility>

using namespace icinga;
using HttpResponse = PerfdataWriterConnection::HttpResponse;

/**
 * Runs an async operation started by fn() (which returns a std::future) with
 * the given lock released while waiting on the result.
 *
 * The lock is reacquired before returning — even when the future's get()
 * throws — and PerfdataWriterConnection::Stopped is thrown if the connection
 * was stopped while the lock was released.
 */
template<typename Fn, typename Lock>
static auto WrapOp(Lock& lock, bool& stopped, const Fn& fn)
{
	auto pending = fn();
	lock.unlock();

	// Safety net: reacquire the lock at scope exit if an exception
	// escaped before we relocked explicitly below.
	Defer reacquire{[&]() {
		if (!lock.owns_lock()) {
			lock.lock();
		}
	}};

	// Shared tail for both branches: take the lock back and bail out
	// if Disconnect() stopped the connection in the meantime.
	auto relockAndCheckStopped = [&]() {
		lock.lock();
		if (stopped) {
			BOOST_THROW_EXCEPTION(PerfdataWriterConnection::Stopped{});
		}
	};

	if constexpr (std::is_void_v<decltype(pending.get())>) {
		pending.get();
		relockAndCheckStopped();
	} else {
		auto result = pending.get();
		relockAndCheckStopped();
		return result;
	}
}

PerfdataWriterConnection::PerfdataWriterConnection(
const ConfigObject::Ptr& parent,
String host,
Expand Down Expand Up @@ -42,7 +67,6 @@ PerfdataWriterConnection::PerfdataWriterConnection(
m_Host(std::move(host)),
m_Port(std::move(port)),
m_ReconnectTimer(IoEngine::Get().GetIoContext()),
m_Strand(IoEngine::Get().GetIoContext()),
m_Stream(MakeStream())
{
}
Expand All @@ -62,37 +86,31 @@ bool PerfdataWriterConnection::IsStopped() const

void PerfdataWriterConnection::Disconnect()
{
if (m_Stopped.exchange(true, std::memory_order_relaxed)) {
std::unique_lock lock{m_Mutex};
if (std::exchange(m_Stopped, true)) {
return;
}

std::promise<void> promise;

IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
try {
/* Cancel any outstanding operations of the other coroutine.
* Since we're on the same strand we're hopefully guaranteed that all cancellations
* result in exceptions thrown by the yield_context, even if its already queued for
* completion.
*/
std::visit(
[](const auto& stream) {
if (stream->lowest_layer().is_open()) {
stream->lowest_layer().cancel();
}
},
m_Stream
);
m_ReconnectTimer.cancel();

Disconnect(std::move(yc));
promise.set_value();
} catch (const std::exception& ex) {
promise.set_exception(std::current_exception());
}
});
/* Cancel any outstanding operations of the other coroutine.
* Since we're on the same strand we're hopefully guaranteed that all cancellations
* result in exceptions thrown by the yield_context, even if its already queued for
* completion.
*/
std::visit(
[&](const auto& stream) {
if (stream->lowest_layer().is_open()) {
if (m_Connected) {
stream->lowest_layer().cancel();
} else {
stream->lowest_layer().close();
}
}
},
m_Stream
);
m_ReconnectTimer.cancel();

promise.get_future().get();
Disconnect(lock);
}

AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const
Expand All @@ -113,29 +131,41 @@ AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const
* The waits between retries are doubled for each failure, up to a maximum of 32s, until it is
* reset by a successful attempt.
*/
void PerfdataWriterConnection::BackoffWait(const boost::asio::yield_context& yc)
void PerfdataWriterConnection::BackoffWait(std::unique_lock<std::mutex>& lock)
{
m_ReconnectTimer.expires_after(m_RetryTimeout);
if (m_RetryTimeout <= FinalRetryWait / 2) {
m_RetryTimeout *= 2;
}
m_ReconnectTimer.async_wait(yc);

WrapOp(lock, m_Stopped, [this]() { return m_ReconnectTimer.async_wait(boost::asio::use_future); });
}

void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& yc)
void PerfdataWriterConnection::EnsureConnected(std::unique_lock<std::mutex>& lock)
{
if (m_Connected) {
return;
}

std::visit(
[&](auto& stream) {
::Connect(stream->lowest_layer(), m_Host, m_Port, yc);
boost::asio::ip::tcp::resolver resolver(IoEngine::Get().GetIoContext());
auto results = WrapOp(lock, m_Stopped, [&]() {
return resolver.async_resolve(m_Host.GetData(), m_Port.GetData(), boost::asio::use_future);
});

WrapOp(lock, m_Stopped, [&]() {
return boost::asio::async_connect(stream->lowest_layer(), results, boost::asio::use_future);
});

stream->lowest_layer().set_option(boost::asio::ip::tcp::socket::keep_alive(true));

if constexpr (std::is_same_v<std::decay_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
using type = boost::asio::ssl::stream_base::handshake_type;

stream->next_layer().async_handshake(type::client, yc);
WrapOp(lock, m_Stopped, [stream]() {
return stream->next_layer().async_handshake(type::client, boost::asio::use_future);
});

if (m_VerifyPeerCertificate) {
if (!stream->next_layer().IsVerifyOK()) {
Expand All @@ -154,55 +184,74 @@ void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context&
m_Connected = true;
}

void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc)
void PerfdataWriterConnection::Disconnect(std::unique_lock<std::mutex>& lock)
{
if (!m_Connected.exchange(false, std::memory_order_relaxed)) {
if (!std::exchange(m_Connected, false)) {
return;
}

std::visit(
[&](auto& stream) {
if constexpr (std::is_same_v<std::decay_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
stream->GracefulDisconnect(m_Strand, yc);
} else {
stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both);
stream->lowest_layer().close();
// Timeout shutdownTimeout(m_Strand, boost::posix_time::seconds(10), [&stream] {
// stream->ForceDisconnect();
// });

try {
WrapOp(lock, m_Stopped, [stream]() {
return stream->next_layer().async_shutdown(boost::asio::use_future);
});
} catch (const std::exception&) {
}
}

if (!stream->lowest_layer().is_open()) {
return;
}

boost::system::error_code ec;
stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both, ec);
stream->lowest_layer().close(ec);
},
m_Stream
);

m_Stream = MakeStream();
}

void PerfdataWriterConnection::WriteMessage(boost::asio::const_buffer buf, const boost::asio::yield_context& yc)
void PerfdataWriterConnection::WriteMessage(boost::asio::const_buffer buf, std::unique_lock<std::mutex>& lock)
{
std::visit(
[&](auto& stream) {
boost::asio::async_write(*stream, buf, yc);
stream->async_flush(yc);
WrapOp(lock, m_Stopped, [&]() { return boost::asio::async_write(*stream, buf, boost::asio::use_future); });
WrapOp(lock, m_Stopped, [&]() { return stream->async_flush(boost::asio::use_future); });
},
m_Stream
);
}

HttpResponse PerfdataWriterConnection::WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc)
HttpResponse PerfdataWriterConnection::WriteMessage(const HttpRequest& request, std::unique_lock<std::mutex>& lock)
{
boost::beast::http::response<boost::beast::http::string_body> response;
std::visit(
[&](auto& stream) {
boost::beast::http::request_serializer<boost::beast::http::string_body> sr{request};
boost::beast::http::async_write(*stream, sr, yc);
stream->async_flush(yc);
WrapOp(lock, m_Stopped, [&]() {
return boost::beast::http::async_write(*stream, sr, boost::asio::use_future);
});

WrapOp(lock, m_Stopped, [&]() { return stream->async_flush(boost::asio::use_future); });

boost::beast::flat_buffer buf;
boost::beast::http::async_read(*stream, buf, response, yc);
WrapOp(lock, m_Stopped, [&]() {
return boost::beast::http::async_read(*stream, buf, response, boost::asio::use_future);
});
},
m_Stream
);

if (!response.keep_alive()) {
Disconnect(yc);
Disconnect(lock);
}

return response;
Expand Down
Loading
Loading