Skip to content
Draft
3 changes: 2 additions & 1 deletion .github/workflows/linux.bash
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ case "$DISTRO" in
)

ln -vs /usr/bin/cmake3 /usr/local/bin/cmake
ln -vs /usr/bin/ctest3 /usr/local/bin/ctest
ln -vs /usr/bin/ninja-build /usr/local/bin/ninja
CMAKE_OPTS+=(-DBOOST_{INCLUDEDIR=/boost_1_69_0,LIBRARYDIR=/boost_1_69_0/stage/lib})
export LD_LIBRARY_PATH=/boost_1_69_0/stage/lib
Expand Down Expand Up @@ -154,6 +155,6 @@ cd /icinga2/build

ninja -v

ninja test
ctest -j$(nproc) --output-on-failure
ninja install
icinga2 daemon -C
7 changes: 5 additions & 2 deletions Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,11 @@ RUN --mount=type=bind,source=.,target=/icinga2,readonly \
-DICINGA2_RUNDIR=/run \
-DICINGA2_WITH_COMPAT=OFF \
-DICINGA2_WITH_LIVESTATUS=OFF && \
make -j$([ "$MAKE_JOBS" = auto ] && nproc || echo "$MAKE_JOBS") && \
if [ "${ICINGA2_BUILD_TESTING}" = ON ]; then CTEST_OUTPUT_ON_FAILURE=1 make test; fi && \
JOBS=$([ "$MAKE_JOBS" = auto ] && nproc || echo "$MAKE_JOBS") && \
make -j"$JOBS" && \
if [ "${ICINGA2_BUILD_TESTING}" = ON ]; then \
ctest -j"$JOBS" --output-on-failure; \
fi && \
make install DESTDIR=/icinga2-install

RUN rm -rf /icinga2-install/etc/icinga2/features-enabled/mainlog.conf \
Expand Down
24 changes: 16 additions & 8 deletions lib/perfdata/perfdatawriterconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ bool PerfdataWriterConnection::IsStopped() const

void PerfdataWriterConnection::Disconnect()
{
if (m_Stopped.exchange(true, std::memory_order_relaxed)) {
if (m_Stopped.exchange(true)) {
return;
}

std::promise<void> promise;
SyncResult<void> ret;

IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
try {
Expand All @@ -76,23 +76,27 @@ void PerfdataWriterConnection::Disconnect()
* completion.
*/
std::visit(
[](const auto& stream) {
[&](const auto& stream) {
if (stream->lowest_layer().is_open()) {
stream->lowest_layer().cancel();
if (m_Connected) {
stream->lowest_layer().cancel();
} else {
stream->lowest_layer().close();
}
}
},
m_Stream
);
m_ReconnectTimer.cancel();

Disconnect(std::move(yc));
promise.set_value();
ret.SetValue();
} catch (const std::exception& ex) {
promise.set_exception(std::current_exception());
ret.SetException(std::current_exception());
}
});

promise.get_future().get();
ret.Get();
}

AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const
Expand Down Expand Up @@ -133,6 +137,10 @@ void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context&
::Connect(stream->lowest_layer(), m_Host, m_Port, yc);

if constexpr (std::is_same_v<std::decay_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
if (m_Stopped) {
BOOST_THROW_EXCEPTION(Stopped{});
}

using type = boost::asio::ssl::stream_base::handshake_type;

stream->next_layer().async_handshake(type::client, yc);
Expand All @@ -156,7 +164,7 @@ void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context&

void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc)
{
if (!m_Connected.exchange(false, std::memory_order_relaxed)) {
if (!m_Connected.exchange(false)) {
return;
}

Expand Down
69 changes: 63 additions & 6 deletions lib/perfdata/perfdatawriterconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp>
#include <future>
#include <utility>

namespace icinga {

Expand All @@ -21,6 +22,62 @@ class PerfdataWriterConnection final : public Object
static constexpr auto InitialRetryWait = 50ms;
static constexpr auto FinalRetryWait = 32s;

template<typename T>
class SyncResult
{
using ValueType = std::variant<std::monostate, std::conditional_t<std::is_void_v<T>, bool, T>, std::exception_ptr>;

public:
template<typename U, typename V = T, typename = std::enable_if_t<!std::is_void_v<V>>>
void SetValue(U&& v)
{
{
std::lock_guard lock(m_Mutex);
m_Value = std::forward<U>(v);
}
m_Cv.notify_one();
}

template<typename V = T, typename = std::enable_if_t<std::is_void_v<V>>>
void SetValue()
{
{
std::lock_guard lock(m_Mutex);
m_Value = true;
}
m_Cv.notify_one();
}

void SetException(std::exception_ptr ep)
{
{
std::lock_guard lock(m_Mutex);
m_Value = ValueType{ep};
}
m_Cv.notify_one();
}

T Get()
{
std::unique_lock l(m_Mutex);
m_Cv.wait(l, [&] { return !std::holds_alternative<std::monostate>(m_Value); });
if (std::holds_alternative<std::exception_ptr>(m_Value)) {
std::rethrow_exception(std::get<std::exception_ptr>(m_Value));
}

if constexpr (std::is_void_v<T>) {
return;
} else {
return std::move(std::get<T>(m_Value));
}
}

private:
std::mutex m_Mutex;
std::condition_variable m_Cv;
ValueType m_Value;
};

public:
DECLARE_PTR_TYPEDEFS(PerfdataWriterConnection);

Expand Down Expand Up @@ -66,7 +123,7 @@ class PerfdataWriterConnection final : public Object
}

using RetType = decltype(WriteMessage(std::declval<Buffer>(), std::declval<boost::asio::yield_context>()));
std::promise<RetType> promise;
SyncResult<RetType> ret;

IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
while (true) {
Expand All @@ -75,16 +132,16 @@ class PerfdataWriterConnection final : public Object

if constexpr (std::is_void_v<RetType>) {
WriteMessage(std::forward<Buffer>(buf), yc);
promise.set_value();
ret.SetValue();
} else {
promise.set_value(WriteMessage(std::forward<Buffer>(buf), yc));
ret.SetValue(WriteMessage(std::forward<Buffer>(buf), yc));
}

m_RetryTimeout = InitialRetryWait;
return;
} catch (const std::exception& ex) {
if (m_Stopped) {
promise.set_exception(std::make_exception_ptr(Stopped{}));
ret.SetException(std::make_exception_ptr(Stopped{}));
return;
}

Expand All @@ -98,14 +155,14 @@ class PerfdataWriterConnection final : public Object
try {
BackoffWait(yc);
} catch (const std::exception&) {
promise.set_exception(std::make_exception_ptr(Stopped{}));
ret.SetException(std::make_exception_ptr(Stopped{}));
return;
}
}
}
});

return promise.get_future().get();
return ret.Get();
}

void Disconnect();
Expand Down
12 changes: 6 additions & 6 deletions test/base-timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* Windows needs a special handicap to keep up with the other OSs.
*/
#ifdef _WIN32
static constexpr double timeMultiplier = 10;
static constexpr double timeMultiplier = 5;
#else //_WIN32
static constexpr double timeMultiplier = 1;
#endif //_WIN32
Expand Down Expand Up @@ -38,10 +38,10 @@ BOOST_AUTO_TEST_CASE(invoke)

Timer::Ptr timer = Timer::Create();
timer->OnTimerExpired.connect([&counter](const Timer* const&) { counter++; });
timer->SetInterval(.1 * timeMultiplier);
timer->SetInterval(.2 * timeMultiplier);

timer->Start();
Utility::Sleep(.55 * timeMultiplier);
Utility::Sleep(1.1 * timeMultiplier);
timer->Stop();

// At this point, the timer should have fired exactly 5 times (0.5 / 0.1) and the sixth time
Expand All @@ -55,12 +55,12 @@ BOOST_AUTO_TEST_CASE(scope)

Timer::Ptr timer = Timer::Create();
timer->OnTimerExpired.connect([&counter](const Timer* const&) { counter++; });
timer->SetInterval(.1 * timeMultiplier);
timer->SetInterval(.2 * timeMultiplier);

timer->Start();
Utility::Sleep(.55 * timeMultiplier);
Utility::Sleep(1.1 * timeMultiplier);
timer.reset();
Utility::Sleep(.1 * timeMultiplier);
Utility::Sleep(.2 * timeMultiplier);

// At this point, the timer should have fired exactly 5 times (0.5 / 0.1) and the sixth time
// should not have fired yet as we destroyed the timer after 0.55 seconds (0.6 would be needed),
Expand Down
4 changes: 2 additions & 2 deletions test/perfdata-elasticsearchwriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ BOOST_AUTO_TEST_CASE(pause_with_pending_work)
ResumeWriter();

// Process check-results until the writer is stuck.
BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck.");
BOOST_REQUIRE_MESSAGE(GetWriterStuck(20s), "Failed to get Writer stuck.");

// Now try to pause.
PauseWriter();

REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s);
REQUIRE_LOG_MESSAGE("'ElasticsearchWriter' paused\\.", 10s);
REQUIRE_LOG_MESSAGE("'ElasticsearchWriter' paused\\.", 1s);
}

BOOST_AUTO_TEST_SUITE_END()
4 changes: 2 additions & 2 deletions test/perfdata-gelfwriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ BOOST_AUTO_TEST_CASE(pause_with_pending_work)
ResumeWriter();

// Process check-results until the writer is stuck.
BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck.");
BOOST_REQUIRE_MESSAGE(GetWriterStuck(20s), "Failed to get Writer stuck.");

// Now stop reading and try to pause OpenTsdbWriter.
PauseWriter();

REQUIRE_LOG_MESSAGE("Connection stopped\\.", 1s);
REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s);
REQUIRE_LOG_MESSAGE("'GelfWriter' paused\\.", 1s);
}

Expand Down
4 changes: 2 additions & 2 deletions test/perfdata-graphitewriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ BOOST_AUTO_TEST_CASE(pause_with_pending_work)
ResumeWriter();

// Process check-results until the writer is stuck.
BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck.");
BOOST_REQUIRE_MESSAGE(GetWriterStuck(20s), "Failed to get Writer stuck.");

// Now stop reading and try to pause OpenTsdbWriter.
PauseWriter();

REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s);
REQUIRE_LOG_MESSAGE("'GraphiteWriter' paused\\.", 10s);
REQUIRE_LOG_MESSAGE("'GraphiteWriter' paused\\.", 1s);
}

BOOST_AUTO_TEST_SUITE_END()
2 changes: 1 addition & 1 deletion test/perfdata-influxdbwriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ BOOST_AUTO_TEST_CASE(pause_with_pending_work)
ResumeWriter();

// Process check-results until the writer is stuck.
BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck.");
BOOST_REQUIRE_MESSAGE(GetWriterStuck(20s), "Failed to get Writer stuck.");

// Now try to pause.
PauseWriter();
Expand Down
4 changes: 2 additions & 2 deletions test/perfdata-opentsdbwriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ BOOST_AUTO_TEST_CASE(pause_with_pending_work)
ResumeWriter();

// Process check-results until the writer is stuck.
BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck.");
BOOST_REQUIRE_MESSAGE(GetWriterStuck(20s), "Failed to get Writer stuck.");

// Now stop reading and try to pause OpenTsdbWriter.
PauseWriter();

REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s);
REQUIRE_LOG_MESSAGE("'OpenTsdbWriter' paused\\.", 10s);
REQUIRE_LOG_MESSAGE("'OpenTsdbWriter' paused\\.", 1s);
}

BOOST_AUTO_TEST_SUITE_END()
11 changes: 10 additions & 1 deletion test/perfdata-perfdatatargetfixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,20 @@ class PerfdataWriterTargetFixture
BOOST_REQUIRE(stream->next_layer().IsVerifyOK());
}

void Shutdown()
void Shutdown(bool wait = false)
{
BOOST_REQUIRE(std::holds_alternative<Shared<AsioTlsStream>::Ptr>(m_Stream));
auto& stream = std::get<Shared<AsioTlsStream>::Ptr>(m_Stream);
try {
if (wait) {
std::array<std::byte, 128> buf{};
boost::asio::mutable_buffer readBuf (buf.data(), buf.size());
boost::system::error_code ec;

do {
stream->read_some(readBuf, ec);
} while (!ec);
}
stream->next_layer().shutdown();
} catch (const std::exception& ex) {
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
Expand Down
Loading
Loading