Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/tmc/all_headers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "tmc/latch.hpp" // IWYU pragma: export
#include "tmc/manual_reset_event.hpp" // IWYU pragma: export
#include "tmc/mutex.hpp" // IWYU pragma: export
#include "tmc/qu_mpsc_bounded.hpp" // IWYU pragma: export
#include "tmc/qu_mpsc_unbounded.hpp" // IWYU pragma: export
#include "tmc/qu_spsc_bounded.hpp" // IWYU pragma: export
#include "tmc/qu_spsc_unbounded.hpp" // IWYU pragma: export
Expand Down
78 changes: 11 additions & 67 deletions include/tmc/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "tmc/current.hpp"
#include "tmc/detail/compat.hpp"
#include "tmc/detail/concepts_awaitable.hpp"
#include "tmc/detail/qu_storage.hpp"
#include "tmc/detail/tiny_lock.hpp"
#include "tmc/ex_any.hpp"
#include "tmc/task.hpp"
Expand All @@ -36,6 +37,7 @@
#include <cassert>
#include <climits>
#include <coroutine>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
Expand All @@ -45,65 +47,6 @@
#include <vector>

namespace tmc {
namespace detail {
// Allocates elements without constructing them, to be constructed later using
// placement new. T need not be default, copy, or move constructible.
// The caller must track whether the element exists, and manually invoke the
// destructor if necessary.
template <typename T> struct channel_storage {
union alignas(alignof(T)) {
T value;
};
#ifndef NDEBUG
bool exists = false;
#endif

channel_storage() noexcept {}

template <typename... ConstructArgs>
void emplace(ConstructArgs&&... Args) noexcept {
#ifndef NDEBUG
assert(!exists);
exists = true;
#endif
::new (static_cast<void*>(&value)) T(static_cast<ConstructArgs&&>(Args)...);
}

void destroy() noexcept {
#ifndef NDEBUG
assert(exists);
exists = false;
#endif
value.~T();
}

// Precondition: Other.value must exist
channel_storage(channel_storage&& Other) noexcept {
emplace(static_cast<T&&>(Other.value));
Other.destroy();
}
channel_storage& operator=(channel_storage&& Other) noexcept {
emplace(static_cast<T&&>(Other.value));
Other.destroy();
return *this;
}

// If data was present, the caller is responsible for destroying it.
#ifndef NDEBUG
~channel_storage() { assert(!exists); }
#else
~channel_storage()
requires(std::is_trivially_destructible_v<T>)
= default;
~channel_storage()
requires(!std::is_trivially_destructible_v<T>)
{}
#endif

channel_storage(const channel_storage&) = delete;
channel_storage& operator=(const channel_storage&) = delete;
};
} // namespace detail

struct chan_default_config {
/// The number of elements that can be stored in each block in the channel
Expand Down Expand Up @@ -244,14 +187,14 @@ TMC_DISABLE_WARNING_PADDED_END
template <typename T> class chan_zc_scope {
using hazard_ptr = tmc::detail::hazard_ptr;
hazard_ptr* haz_ptr;
tmc::detail::channel_storage<T>* data;
tmc::detail::qu_storage<T>* data;
size_t release_idx;

template <typename U, typename Config> friend class tmc::channel;
template <typename U, typename Config> friend class tmc::chan_tok;

chan_zc_scope(
hazard_ptr* Haz, tmc::detail::channel_storage<T>* Data, size_t ReleaseIdx
hazard_ptr* Haz, tmc::detail::qu_storage<T>* Data, size_t ReleaseIdx
) noexcept
: haz_ptr{Haz}, data{Data}, release_idx{ReleaseIdx} {}

Expand Down Expand Up @@ -362,10 +305,10 @@ template <typename T, typename Config> class channel {
consumer_base* consumer;

public:
tmc::detail::channel_storage<T> data;
tmc::detail::qu_storage<T> data;

static constexpr size_t UNPADLEN =
sizeof(size_t) + sizeof(void*) + sizeof(tmc::detail::channel_storage<T>);
sizeof(size_t) + sizeof(void*) + sizeof(tmc::detail::qu_storage<T>);
static constexpr size_t WANTLEN = (UNPADLEN + TMC_CACHE_LINE_SIZE - 1) &
static_cast<size_t>(
0 - TMC_CACHE_LINE_SIZE
Expand Down Expand Up @@ -433,13 +376,17 @@ template <typename T, typename Config> class channel {

// Same API as element_t
struct packed_element_t {
// Upper bits encode the consumer_base* (low 2 bits guaranteed 0 by
// alignment).
static inline constexpr uintptr_t DATA_BIT = TMC_ONE_BIT;
static inline constexpr uintptr_t CONS_BIT = TMC_ONE_BIT << 1;
static inline constexpr uintptr_t BOTH_BITS = DATA_BIT | CONS_BIT;
std::atomic<void*> flags;

static_assert(alignof(consumer_base) >= 4);

public:
tmc::detail::channel_storage<T> data;
tmc::detail::qu_storage<T> data;

// If this returns false, data is ready and consumer should not wait.
bool try_wait(consumer_base* Cons) noexcept {
Expand Down Expand Up @@ -519,9 +466,6 @@ template <typename T, typename Config> class channel {
data_block() noexcept : data_block(0) {}
};

static_assert(std::atomic<size_t>::is_always_lock_free);
static_assert(std::atomic<data_block*>::is_always_lock_free);

static inline constexpr size_t WRITE_CLOSING_BIT = TMC_ONE_BIT;
static inline constexpr size_t WRITE_CLOSED_BIT = TMC_ONE_BIT << 1;
static inline constexpr size_t READ_CLOSED_BIT = TMC_ONE_BIT << 2;
Expand Down
7 changes: 7 additions & 0 deletions include/tmc/detail/compat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <atomic>
#include <cstddef>
#include <cstdint>

#if defined(_MSC_VER)

Expand Down Expand Up @@ -231,6 +232,12 @@ using atomic_waker_t = size_t;
} // namespace detail
} // namespace tmc

static_assert(std::atomic<void*>::is_always_lock_free);
static_assert(std::atomic<uintptr_t>::is_always_lock_free);
static_assert(std::atomic<size_t>::is_always_lock_free);
static_assert(std::atomic<tmc::detail::atomic_wait_t>::is_always_lock_free);
static_assert(std::atomic<tmc::detail::atomic_waker_t>::is_always_lock_free);

#ifdef TMC_NODISCARD_AWAIT
#define TMC_AWAIT_RESUME [[nodiscard]]
#else
Expand Down
11 changes: 3 additions & 8 deletions include/tmc/detail/qu_mpsc_blocking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class qu_mpsc_blocking {
static_assert(std::is_nothrow_move_constructible_v<T>);

private:
struct element_t {
struct element {
static inline constexpr tmc::detail::atomic_wait_t EMPTY = 0;
static inline constexpr tmc::detail::atomic_wait_t WAITING = 1;
static inline constexpr tmc::detail::atomic_wait_t DATA = 2;
Expand Down Expand Up @@ -179,7 +179,6 @@ class qu_mpsc_blocking {
void reset() noexcept { flags.store(EMPTY, std::memory_order_relaxed); }
};

using element = element_t;
static_assert(Config::PackingLevel < 2);

struct data_block {
Expand All @@ -202,10 +201,6 @@ class qu_mpsc_blocking {
data_block() noexcept : data_block(0) {}
};

static_assert(std::atomic<size_t>::is_always_lock_free);
static_assert(std::atomic<data_block*>::is_always_lock_free);
static_assert(std::atomic<tmc::detail::atomic_wait_t>::is_always_lock_free);

char pad0[TMC_CACHE_LINE_SIZE - sizeof(size_t)];
std::atomic<size_t> write_offset;
char pad1[TMC_CACHE_LINE_SIZE - sizeof(size_t)];
Expand Down Expand Up @@ -473,8 +468,8 @@ class qu_mpsc_blocking {
// seq_cst and acq_rel exchanges are identical on modern x86/ARM, so we
// use a single ordering for consistency.
tmc::detail::atomic_wait_t prev =
Elem->flags.exchange(element_t::DATA, std::memory_order_seq_cst);
if (prev != element_t::WAITING) {
Elem->flags.exchange(element::DATA, std::memory_order_seq_cst);
if (prev != element::WAITING) {
return false;
}

Expand Down
79 changes: 79 additions & 0 deletions include/tmc/detail/qu_storage.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2023-2026 Logan McDougall
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

// Storage type used internally by several public-API queues:
// - qu_spsc_bounded
// - qu_spsc_unbounded
// - qu_mpsc_bounded
// - qu_mpsc_unbounded
// - channel

// Allocates elements without constructing them, to be constructed later using
// placement new. T need not be default, copy, or move constructible.
// The caller must track whether the element exists, and manually invoke the
// destructor if necessary.

#include <cassert>
#include <type_traits>

namespace tmc {
namespace detail {
template <typename T> struct qu_storage {
union alignas(alignof(T)) {
T value;
};
#ifndef NDEBUG
bool exists = false;
#endif

qu_storage() noexcept {}

template <typename... ConstructArgs>
void emplace(ConstructArgs&&... Args) noexcept {
#ifndef NDEBUG
assert(!exists);
exists = true;
#endif
::new (static_cast<void*>(&value)) T(static_cast<ConstructArgs&&>(Args)...);
}

void destroy() noexcept {
#ifndef NDEBUG
assert(exists);
exists = false;
#endif
value.~T();
}

// Precondition: Other.value must exist
qu_storage(qu_storage&& Other) noexcept {
emplace(static_cast<T&&>(Other.value));
Other.destroy();
}
qu_storage& operator=(qu_storage&& Other) noexcept {
emplace(static_cast<T&&>(Other.value));
Other.destroy();
return *this;
}

// If data was present, the caller is responsible for destroying it.
#ifndef NDEBUG
~qu_storage() { assert(!exists); }
#else
~qu_storage()
requires(std::is_trivially_destructible_v<T>)
= default;
~qu_storage()
requires(!std::is_trivially_destructible_v<T>)
{}
#endif

qu_storage(const qu_storage&) = delete;
qu_storage& operator=(const qu_storage&) = delete;
};
} // namespace detail
} // namespace tmc
Loading
Loading