diff --git a/include/tmc/detail/compat.hpp b/include/tmc/detail/compat.hpp index 6a6e3ed9..1fd7b253 100644 --- a/include/tmc/detail/compat.hpp +++ b/include/tmc/detail/compat.hpp @@ -7,6 +7,7 @@ #include #include +#include #if defined(_MSC_VER) @@ -193,7 +194,11 @@ _Pragma("GCC diagnostic pop") #define NO_HINT static_cast(-1) -inline constexpr size_t TMC_PLATFORM_BITS = sizeof(size_t) * 8; // 32 or 64 +#if SIZE_MAX == 0xFFFFFFFFu +#define TMC_PLATFORM_BITS 32 +#else +#define TMC_PLATFORM_BITS 64 +#endif inline constexpr size_t TMC_MAX_PRIORITY_COUNT = 16; diff --git a/include/tmc/detail/ex_cpu.ipp b/include/tmc/detail/ex_cpu.ipp index e935bcef..a945624f 100644 --- a/include/tmc/detail/ex_cpu.ipp +++ b/include/tmc/detail/ex_cpu.ipp @@ -15,7 +15,7 @@ #include "tmc/detail/hwloc_unique_bitmap.hpp" #include "tmc/detail/impl.hpp" // IWYU pragma: keep #include "tmc/detail/matrix.hpp" -#include "tmc/detail/qu_lockfree.hpp" +#include "tmc/detail/qu_mc.hpp" #include "tmc/detail/thread_layout.hpp" #include "tmc/detail/thread_locals.hpp" #include "tmc/ex_any.hpp" @@ -288,29 +288,28 @@ INTERRUPT_DONE: } } -ex_cpu::task_queue_t::ExplicitProducer** ex_cpu::init_queue_iteration_order( +ex_cpu::cldq_t** ex_cpu::init_queue_iteration_order( std::vector> const& Forward ) { - // Calculate total size based on the global producerArrayOffset layout. - // Each priority level contributes (dequeueProducerCount + 1) slots. - // The array must be full-sized so that producerArrayOffset indexing works + // Calculate total size based on the global tlsArrayOffset layout. + // Each priority level contributes (threadCount + 1) slots. + // The array must be full-sized so that tlsArrayOffset indexing works // correctly, even if this thread doesn't handle all priorities. // Layout per priority: [self, cached, others...] size_t totalSize = 0; for (size_t prio = 0; prio < PRIORITY_COUNT; ++prio) { - totalSize += work_queues[prio].dequeueProducerCount + 1; + totalSize += q_ws[prio].threadCount + 1; } if (totalSize == 0) { return nullptr; } - task_queue_t::ExplicitProducer** producers = - new task_queue_t::ExplicitProducer*[totalSize]; + cldq_t** producers = new cldq_t*[totalSize]; for (size_t prio = 0; prio < PRIORITY_COUNT; ++prio) { - size_t offset = work_queues[prio].producerArrayOffset; - size_t slotCount = work_queues[prio].dequeueProducerCount + 1; + size_t offset = q_ws[prio].tlsArrayOffset; + size_t slotCount = q_ws[prio].threadCount + 1; if (Forward[prio].empty()) { // This thread doesn't handle this priority - null out all slots @@ -320,17 +319,16 @@ ex_cpu::task_queue_t::ExplicitProducer** ex_cpu::init_queue_iteration_order( continue; } - assert(Forward[prio].size() == work_queues[prio].dequeueProducerCount); + assert(Forward[prio].size() == q_ws[prio].threadCount); auto& thisMatrix = Forward[prio]; - // pointer to this thread's producer - producers[offset] = &work_queues[prio].staticProducers[thisMatrix[0]]; - // pointer to previously consumed-from producer (initially none) + // pointer to this thread's deque + producers[offset] = &q_ws[prio].deques[thisMatrix[0]]; + // pointer to previously consumed-from deque (initially none) producers[offset + 1] = nullptr; for (size_t i = 1; i < Forward[prio].size(); ++i) { - producers[offset + 1 + i] = - &work_queues[prio].staticProducers[thisMatrix[i]]; + producers[offset + 1 + i] = &q_ws[prio].deques[thisMatrix[i]]; } } return producers; @@ -389,25 +387,22 @@ TMC_FORCE_INLINE inline bool ex_cpu::try_run_some( const size_t PriorityRangeBegin, const size_t PriorityRangeEnd, size_t& PrevPriority, bool& Spinning ) { - size_t idxBase = work_queues[PriorityRangeBegin].producerArrayOffset; - task_queue_t::ExplicitProducer** producersBase = - static_cast( - tmc::detail::this_thread::producers() - ) + - idxBase; + size_t idxBase = q_ws[PriorityRangeBegin].tlsArrayOffset; + cldq_t** producersBase = + static_cast(tmc::detail::this_thread::producers()) + idxBase; TOP: if (ThreadStopToken.stop_requested()) [[unlikely]] { return false; } work_item item; - task_queue_t::ExplicitProducer** producers = producersBase; + cldq_t** producers = producersBase; // For priority 0, check private queue, then inbox, then try to steal // Lower priorities can just check private queue and steal - no inbox // Although this could be combined into the following loop (with an if // statement to remove the inbox check), it gives better codegen for the // fast path to keep it separate. - if ((*producers)->dequeue_lifo(item)) [[likely]] { + if ((*producers)->try_pop(item)) [[likely]] { run_one(item, Slot, PriorityRangeBegin, PrevPriority, Spinning); goto TOP; } @@ -422,21 +417,32 @@ TOP: goto TOP; } - if (work_queues[PriorityRangeBegin].try_dequeue_ex_cpu_steal(item, producers)) - [[likely]] { + // Check for new work coming from I/O or from the main thread + if (q_ingest[PriorityRangeBegin].try_dequeue(item)) { + run_one(item, Slot, PriorityRangeBegin, PrevPriority, Spinning); + goto TOP; + } + + // Try to steal from other threads in this executor + if (q_ws[PriorityRangeBegin].try_steal_in_order(item, producers)) [[likely]] { run_one(item, Slot, PriorityRangeBegin, PrevPriority, Spinning); goto TOP; } // Now check lower priority queues for (size_t prio = PriorityRangeBegin + 1; prio < PriorityRangeEnd; ++prio) { - if ((*producers)->dequeue_lifo(item)) { + if ((*producers)->try_pop(item)) { run_one(item, Slot, prio, PrevPriority, Spinning); goto TOP; } ++producers; - if (work_queues[prio].try_dequeue_ex_cpu_steal(item, producers)) { + if (q_ingest[prio].try_dequeue(item)) { + run_one(item, Slot, prio, PrevPriority, Spinning); + goto TOP; + } + + if (q_ws[prio].try_steal_in_order(item, producers)) { run_one(item, Slot, prio, PrevPriority, Spinning); goto TOP; } @@ -481,9 +487,13 @@ void ex_cpu::post(work_item&& Item, size_t Priority, size_t ThreadHint) { } } if (allowedPriority) [[likely]] { - work_queues[Priority].enqueue_ex_cpu(static_cast(Item)); + // Push to this thread's Chase-Lev deque for this priority. + cldq_t** producers = + static_cast(tmc::detail::this_thread::producers()); + cldq_t* myDeque = producers[q_ws[Priority].tlsArrayOffset]; + myDeque->push(static_cast(Item)); } else { - work_queues[Priority].enqueue(static_cast(Item)); + q_ingest[Priority].enqueue(static_cast(Item)); } notify_n(1, Priority, allowedPriority, true); @@ -509,7 +519,7 @@ ex_cpu::ex_cpu() auto ex_cpu::make_worker( tmc::topology::thread_info Info, size_t PriorityRangeBegin, - size_t PriorityRangeEnd, ex_cpu::task_queue_t::ExplicitProducer** StealOrder, + size_t PriorityRangeEnd, ex_cpu::cldq_t** StealOrder, std::atomic& InitThreadsBarrier, // will be nullptr if hwloc is not enabled [[maybe_unused]] tmc::detail::hwloc_unique_bitmap& CpuSet, @@ -595,14 +605,18 @@ auto ex_cpu::make_worker( // working threads. This prevents too many spinners in a lightly // loaded system. if (2 * spinningThreadCount <= workingThreadCount) { + cldq_t** producersBase = + static_cast(tmc::detail::this_thread::producers()); for (size_t i = 0; i < spins; ++i) { - TMC_CPU_PAUSE(); if (!thread_states[Slot].inbox->empty()) { goto TOP; } for (size_t prio = PriorityRangeBegin; prio < PriorityRangeEnd; ++prio) { - if (!work_queues[prio].empty()) { + if (!q_ingest[prio].empty()) { + goto TOP; + } + if (!q_ws[prio].steal_empty(producersBase)) { goto TOP; } } @@ -643,10 +657,18 @@ auto ex_cpu::make_worker( spinning_threads_bitset.set_bit(Slot); goto TOP; } - for (size_t prio = PriorityRangeBegin; prio < PriorityRangeEnd; ++prio) { - if (!work_queues[prio].empty()) { - spinning_threads_bitset.set_bit(Slot); - goto TOP; + { + cldq_t** producersBase = + static_cast(tmc::detail::this_thread::producers()); + for (size_t prio = PriorityRangeBegin; prio < PriorityRangeEnd; + ++prio) { + if (!q_ingest[prio].empty()) { + spinning_threads_bitset.set_bit(Slot); + goto TOP; + } + if (!q_ws[prio].steal_empty(producersBase)) { + goto TOP; + } } } @@ -665,9 +687,7 @@ auto ex_cpu::make_worker( } clear_thread_locals(); - delete[] static_cast( - tmc::detail::this_thread::producers() - ); + delete[] static_cast(tmc::detail::this_thread::producers()); tmc::detail::this_thread::producers() = nullptr; }; } @@ -990,25 +1010,24 @@ void ex_cpu::init() { std::vector threadInboxIndexes = tmc::detail::get_thread_inbox_indexes(inboxInputs); // Index is returned in ascending order - inboxes.resize(threadInboxIndexes.back() + 1); - inboxes.fill_default(); + q_inbox.resize(threadInboxIndexes.back() + 1); + q_inbox.fill_default(); for (size_t i = 0; i < thread_count(); ++i) { - thread_states[i].inbox = &inboxes[threadInboxIndexes[i]]; + thread_states[i].inbox = &q_inbox[threadInboxIndexes[i]]; } } - work_queues.resize(PRIORITY_COUNT); + q_ingest.resize(PRIORITY_COUNT); + q_ws.resize(PRIORITY_COUNT); size_t cumulativeOffset = 0; for (size_t prio = 0; prio < PRIORITY_COUNT; ++prio) { size_t threadCount = tidsByPrio[prio].size(); - work_queues.emplace_at(prio, threadCount + 1); - work_queues[prio].staticProducers = - new task_queue_t::ExplicitProducer[threadCount]; - for (size_t i = 0; i < threadCount; ++i) { - work_queues[prio].staticProducers[i].init(&work_queues[prio]); - } - work_queues[prio].dequeueProducerCount = threadCount; - work_queues[prio].producerArrayOffset = cumulativeOffset; + q_ingest.emplace_at(prio, 1u); + + // Allocates one Chase-Lev deque per thread that participates in this + // priority. + q_ws.emplace_at(prio, threadCount, cumulativeOffset); + // add 1 extra space for cached producer cumulativeOffset += threadCount + 1; } @@ -1284,15 +1303,13 @@ void ex_cpu::teardown() { } threads.clear(); thread_stoppers.clear(); - inboxes.clear(); + q_inbox.clear(); threads_by_priority_bitset.clear(); waker_matrix.clear(); - for (size_t i = 0; i < work_queues.size(); ++i) { - delete[] work_queues[i].staticProducers; - } - work_queues.clear(); + q_ws.clear(); + q_ingest.clear(); working_threads_bitset.clear(); spinning_threads_bitset.clear(); if (task_stopper_bitsets != nullptr) { diff --git a/include/tmc/detail/qu_chase_lev32.hpp b/include/tmc/detail/qu_chase_lev32.hpp new file mode 100644 index 00000000..ff4e2b42 --- /dev/null +++ b/include/tmc/detail/qu_chase_lev32.hpp @@ -0,0 +1,333 @@ +// Copyright (c) 2023-2026 Logan McDougall +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include "tmc/detail/compat.hpp" +#include "tmc/detail/tsan.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace tmc { +namespace detail { + +// A Chase-Lev work-stealing deque. +// +// The owner (single producer/consumer) calls push() and try_pop() at the tail. +// Any other thread may call steal() to take work from the head. +// +// Buffers can grow. When the owner detects its buffer is full, it allocates a +// new buffer of double the size, copies all elements over, and atomically +// publishes the new buffer. Old buffers are retained in a "leftovers" vector +// so that stealers that loaded the old buffer pointer can still access it +// safely. Leftovers are destroyed only when the deque itself is destroyed. +// +// T must be trivially copyable (and trivially destructible). This is sufficient +// for tmc::work_item (either std::coroutine_handle<> or tmc::coro_functor), +// both of which are non-owning handles whose underlying allocations are freed +// only by the unique copy that is actually executed. +// +// This version does not use any packing, so it is suitable for use on 32-bit. +template class chase_lev_deque { + static_assert( + std::is_trivially_copyable_v, + "chase_lev_deque requires a trivially copyable element type" + ); + static_assert( + std::is_trivially_destructible_v, + "chase_lev_deque requires a trivially destructible element type" + ); + + struct buffer { + size_t capacity; + size_t mask; + T* data; + + explicit buffer(size_t cap) : capacity(cap), mask(cap - 1) { + assert((cap & (cap - 1)) == 0 && "capacity must be a power of two"); + data = static_cast(::operator new(sizeof(T) * cap)); + } + + ~buffer() { ::operator delete(data); } + + buffer(const buffer&) = delete; + buffer& operator=(const buffer&) = delete; + }; + + // Top (head) - where steals take from. Owned by all threads. + // Bottom (tail) - where push/pop happen. Owned by the single owner. + // Storage is unsigned (defined wrap-around). All relational comparisons + // between top and bottom are performed by computing (b - t) in uint32_t + // and reinterpreting the difference as int32_t (the Chase-Lev signed- + // difference trick), so the algorithm is correct across 32-bit + // wraparound as long as |b - t| < 2^31. + alignas(TMC_CACHE_LINE_SIZE) std::atomic top_; + std::atomic bottom_; + alignas(TMC_CACHE_LINE_SIZE) std::atomic buffer_; + + // Owner-private cached pointer to the current buffer. Avoids one atomic load + // on the hot push path. + alignas(TMC_CACHE_LINE_SIZE) buffer* owner_buffer_; + + // Owner-private cached value of bottom_. The owner is the only writer of + // bottom_, so it can read this private copy instead of the atomic, avoiding + // a load from the contended top_/bottom_ cache line on every push/pop. + // Stays in sync with bottom_ via parallel updates in push/pop. + uint32_t owner_bottom_; + + // Buffers retained until destruction so stealers always have a valid pointer. + std::vector leftovers_; + + static constexpr size_t DEFAULT_INITIAL_CAPACITY = 64; + + // For T sizes that fit in a single hardware atomic word, we use + // std::atomic_ref so that push/pop/steal slot accesses are well-defined under + // the C++ memory model and TSAN-clean. The ordering of these atomics is + // relaxed; the actual happens-before edge for slot visibility is provided by + // the release fence / acquire-load on bottom_ / top_. + // + // For larger T (notably the 2-pointer coro_functor used when + // TMC_WORK_ITEM=FUNCORO), std::atomic_ref would either generate a + // multi-word CAS or fall back to an internal lock. We instead use the classic + // Chase-Lev "benign racy" approach which ignores invalid reads afterward, and + // simply disable TSan for the helper. + template + TMC_INLINE_OR_TSAN static void store_item(T* slot, U&& item) { + if constexpr (sizeof(T) <= sizeof(size_t)) { + T tmp(static_cast(item)); + std::atomic_ref(*slot).store(tmp, std::memory_order_relaxed); + } else { + new (slot) T(static_cast(item)); + } + } + + TMC_INLINE_OR_TSAN static void load_item(T& out, T* slot) { + if constexpr (sizeof(T) <= sizeof(size_t)) { + out = std::atomic_ref(*slot).load(std::memory_order_relaxed); + } else { + std::memcpy( + static_cast(&out), static_cast(slot), sizeof(T) + ); + } + } + +public: + chase_lev_deque() : chase_lev_deque(DEFAULT_INITIAL_CAPACITY) {} + + explicit chase_lev_deque(size_t initialCapacity) { + size_t cap = 1; + while (cap < initialCapacity) { + cap <<= 1; + } + auto* buf = new buffer(cap); + owner_buffer_ = buf; + owner_bottom_ = 0; + buffer_.store(buf, std::memory_order_relaxed); + top_.store(0, std::memory_order_relaxed); + bottom_.store(0, std::memory_order_relaxed); + leftovers_.push_back(buf); + } + + ~chase_lev_deque() { + for (buffer* b : leftovers_) { + delete b; + } + } + + chase_lev_deque(const chase_lev_deque&) = delete; + chase_lev_deque& operator=(const chase_lev_deque&) = delete; + chase_lev_deque(chase_lev_deque&&) = delete; + chase_lev_deque& operator=(chase_lev_deque&&) = delete; + + // Owner-only. Push an item at the bottom (tail) of the deque. + // Grows the buffer if it is full. + template TMC_FORCE_INLINE void push(U&& item) { + uint32_t b = owner_bottom_; + uint32_t t = top_.load(std::memory_order_acquire); + buffer* buf = owner_buffer_; + // (b - t) is the current queue size, computed in uint32_t (defined + // wraparound). Reinterpret as int32_t for the signed comparison + // against capacity (the Chase-Lev signed-difference trick). + if (static_cast(b - t) > static_cast(buf->capacity) - 1) + [[unlikely]] { + // Buffer full - grow. + buffer* nb = new buffer(buf->capacity * 2); + for (uint32_t i = t; i != b; ++i) { + std::memcpy( + static_cast(nb->data + (static_cast(i) & nb->mask)), + static_cast( + buf->data + (static_cast(i) & buf->mask) + ), + sizeof(T) + ); + } + leftovers_.push_back(nb); + owner_buffer_ = nb; + buffer_.store(nb, std::memory_order_release); + buf = nb; + } + store_item( + buf->data + (static_cast(b) & buf->mask), static_cast(item) + ); + bottom_.store(b + 1u, std::memory_order_release); + owner_bottom_ = b + 1u; + } + + // Owner-only. Push Count items at the bottom (tail) of the deque, taken + // from the iterator It (incremented Count times). Grows the buffer if it + // is full. Issues only a single release fence and a single store to + // bottom_, regardless of Count. + template + TMC_FORCE_INLINE void post_bulk(It&& Items, size_t Count) { + if (Count == 0) [[unlikely]] { + return; + } + uint32_t b = owner_bottom_; + uint32_t t = top_.load(std::memory_order_acquire); + buffer* buf = owner_buffer_; + // Compute (queue_size + Count) entirely in uint32_t (defined wrap), + // then reinterpret as int32_t for the signed comparison. + uint32_t needed_u = (b - t) + static_cast(Count); + if (static_cast(needed_u) > static_cast(buf->capacity)) + [[unlikely]] { + // Buffer too small - grow to fit. + size_t newCap = buf->capacity * 2; + size_t needed = static_cast(needed_u); + while (newCap < needed) { + newCap <<= 1; + } + buffer* nb = new buffer(newCap); + for (uint32_t i = t; i != b; ++i) { + std::memcpy( + static_cast(nb->data + (static_cast(i) & nb->mask)), + static_cast( + buf->data + (static_cast(i) & buf->mask) + ), + sizeof(T) + ); + } + leftovers_.push_back(nb); + owner_buffer_ = nb; + buffer_.store(nb, std::memory_order_release); + buf = nb; + } + It it = static_cast(Items); + for (size_t i = 0; i < Count; ++i) { + store_item( + buf->data + + (static_cast(b + static_cast(i)) & buf->mask), + static_cast(*it) + ); + ++it; + } + uint32_t newBottom = b + static_cast(Count); + bottom_.store(newBottom, std::memory_order_release); + owner_bottom_ = newBottom; + } + + // Owner-only. Pop an item from the bottom (tail) of the deque (LIFO). + // Returns true if an item was popped, false if the deque was empty. + TMC_FORCE_INLINE bool try_pop(T& out) { + // (owner_bottom_ - 1u) wraps to UINT32_MAX when owner_bottom_ is 0; + // that's the intended bit pattern - the (b - t) signed difference + // below still computes the correct logical sign. + uint32_t b = owner_bottom_ - 1u; + buffer* buf = owner_buffer_; + bottom_.store(b, std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_seq_cst); + uint32_t t = top_.load(std::memory_order_relaxed); + // (b - t) is computed in uint32_t (defined wraparound). Reinterpret as + // int32_t for the signed sign check (the Chase-Lev trick). + int32_t diff = static_cast(b - t); + if (diff >= 0) { + // Non-empty. + T* slot = buf->data + (static_cast(b) & buf->mask); + if (diff > 0) { + // More than one element - safe to take without racing stealers. + owner_bottom_ = b; + load_item(out, slot); + return true; + } + // Last element - race with stealers via CAS on top. + bool won = top_.compare_exchange_strong( + t, t + 1u, std::memory_order_seq_cst, std::memory_order_relaxed + ); + bottom_.store(b + 1u, std::memory_order_relaxed); + owner_bottom_ = b + 1u; + if (!won) { + return false; + } + load_item(out, slot); + return true; + } else { + // Empty. + bottom_.store(b + 1u, std::memory_order_relaxed); + owner_bottom_ = b + 1u; + return false; + } + } + + // Any thread. Steal an item from the top (head) of the deque (FIFO). + // Returns true if an item was stolen, false otherwise (deque empty or lost + // a race). + TMC_FORCE_INLINE bool steal(T& out) { + // Cheap empty-check first. If the deque appears empty, return without + // issuing the expensive seq_cst fence or CAS. Both loads are acquire so + // we still synchronize-with the owner's push, but we avoid the mfence + // and the CAS-induced cache-line ping-pong on top_ in the common + // "victim is empty" case. + uint32_t t = top_.load(std::memory_order_acquire); + uint32_t b = bottom_.load(std::memory_order_acquire); + // (b - t) is computed in uint32_t (defined wraparound). Reinterpret as + // int32_t for the signed sign check. + if (static_cast(b - t) <= 0) { + return false; + } + std::atomic_thread_fence(std::memory_order_seq_cst); + b = bottom_.load(std::memory_order_acquire); + if (static_cast(b - t) > 0) { + buffer* buf = buffer_.load(std::memory_order_acquire); + // Racy read - safe because T is trivially copyable and destructible. If + // we lose the CAS below, the caller will ignore the out value. + load_item(out, buf->data + (static_cast(t) & buf->mask)); + if (!top_.compare_exchange_strong( + t, t + 1u, std::memory_order_seq_cst, std::memory_order_relaxed + )) { + return false; + } + return true; + } + return false; + } + + // Approximate size. Safe to call from any thread. + size_t size_approx() const { + uint32_t b = bottom_.load(std::memory_order_relaxed); + uint32_t t = top_.load(std::memory_order_relaxed); + // (b - t) is computed in uint32_t (defined wraparound) and + // reinterpreted as int32_t for the signed sign check. + int32_t s = static_cast(b - t); + return s > 0 ? static_cast(s) : 0; + } + + // Approximate emptiness. Safe to call from any thread. + bool empty() const { + uint32_t b = bottom_.load(std::memory_order_relaxed); + uint32_t t = top_.load(std::memory_order_relaxed); + // (b - t) is computed in uint32_t (defined wraparound) and + // reinterpreted as int32_t for the signed sign check. + return static_cast(b - t) <= 0; + } +}; + +} // namespace detail +} // namespace tmc diff --git a/include/tmc/detail/qu_chase_lev64.hpp b/include/tmc/detail/qu_chase_lev64.hpp new file mode 100644 index 00000000..9bad6b37 --- /dev/null +++ b/include/tmc/detail/qu_chase_lev64.hpp @@ -0,0 +1,552 @@ +// Copyright (c) 2023-2026 Logan McDougall +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include "tmc/detail/compat.hpp" +#include "tmc/detail/tsan.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace tmc { +namespace detail { + +// A Chase-Lev work-stealing deque. +// +// The owner (single producer/consumer) calls push() and try_pop() at the tail. +// Any other thread may call steal() to take work from the head. +// +// Buffers can grow. When the owner detects its buffer is full, it allocates a +// new buffer of double the size, copies all elements over, and atomically +// publishes the new buffer. Old buffers are retained in a "leftovers" vector +// so that stealers that loaded the old buffer pointer can still access it +// safely. Leftovers are destroyed only when the deque itself is destroyed. +// +// T must be trivially copyable (and trivially destructible). This is sufficient +// for tmc::work_item (either std::coroutine_handle<> or tmc::coro_functor), +// both of which are non-owning handles whose underlying allocations are freed +// only by the unique copy that is actually executed. +// +// This variant packs the top (head) and bottom (tail) indices into a single +// 8-byte-aligned region: +// - bytes 0-3 (low 32 bits in a 64-bit load) = top +// - bytes 4-7 (high 32 bits in a 64-bit load) = bottom (owner writes this) +// Indices are 32 bits, so the deque capacity maxes out below 2^31 elements. +// Comparisons use signed 32-bit differences so the algorithm is correct +// across 32-bit wraparound as long as |b - t| < 2^31. +// +// Because the owner is the sole writer of bottom, push() and pop() update +// the bottom half via a plain aligned 32-bit store (no LOCK prefix). On x86 +// this preserves the cheap relaxed-store hot path of classic Chase-Lev while +// still letting stealers grab a consistent (top, bottom) snapshot in a single +// aligned 64-bit load. +// +// Last-element race: rather than have pop() CAS the top, steal() CAS-es the +// full 64-bit (top, bottom) word (incrementing only top). pop() decrements +// bottom before reading top, so any in-flight stealer with a snapshot of +// (t, t+1) will observe the decremented bottom and fail its full-word CAS. +// This means pop() always wins the last-element race once its bottom +// decrement is visible - it does not need its own CAS, only a plain store +// to publish the new empty (top, bottom) state. + +template class chase_lev_deque { + static_assert( + std::is_trivially_copyable_v, + "chase_lev_deque requires a trivially copyable element type" + ); + static_assert( + std::is_trivially_destructible_v, + "chase_lev_deque requires a trivially destructible element type" + ); + static_assert( + std::endian::native == std::endian::little, + "chase_lev_deque assumes little-endian" + ); + + // Data buffers are over-aligned to 64 bytes so the low 6 bits of the data + // pointer are free for use as a tag. We use 5 of those bits to encode + // log2(capacity) (capacity is always a power of two and is bounded by 2^31 + // due to the 32-bit indices), which lets us pack (data_ptr, mask) into a + // single 8-byte word that stealers can read with one atomic load. + static constexpr size_t BUFFER_DATA_ALIGN = 64; + static constexpr uintptr_t TAG_MASK = BUFFER_DATA_ALIGN - 1; + + struct buffer { + // Owning pointer to the data block. The capacity / mask are NOT stored + // here; they are recovered from the tagged active cell in active_data_ + // when needed. This field exists only so that ~buffer can free the block. + T* data; + + buffer() : data(nullptr) {} + + void init(size_t cap) { + assert((cap & (cap - 1)) == 0 && "capacity must be a power of two"); + data = static_cast( + ::operator new(sizeof(T) * cap, std::align_val_t{BUFFER_DATA_ALIGN}) + ); + assert( + (reinterpret_cast(data) & TAG_MASK) == 0 && + "data pointer must be aligned to BUFFER_DATA_ALIGN" + ); + } + + ~buffer() { + if (data != nullptr) { + ::operator delete(data, std::align_val_t{BUFFER_DATA_ALIGN}); + } + } + + buffer(const buffer&) = delete; + buffer& operator=(const buffer&) = delete; + }; + + // Packed (top, bottom) state. Logically two 32-bit fields packed into a + // single 8-byte word: + // bytes 0..3 (low 32 bits) = bottom (owner-written via push/pop) + // bytes 4..7 (high 32 bits) = top (CAS'd by stealers; some pop paths) + // Stealers must read both halves in a single 64-bit atomic load to get a + // consistent snapshot, so the canonical storage type is uint64_t. All + // full-word loads / stores / CAS / RMW go through state_full() as + // atomic_ref over this object directly - no aliasing. + alignas(TMC_CACHE_LINE_SIZE) uint64_t state_storage_; + + // The owner's fast-path bottom update is a single 32-bit aligned atomic + // store. This avoids the LOCK prefix a 64-bit RMW would incur on x86 and + // is safe to race with a stealer's full-word CAS (the CAS observes the + // changed bottom and fails). + // + // Expressing "atomic 32-bit store to the low half of a 64-bit atomic" is + // not possible in strictly standards-compliant C++: + // - std::atomic_ref requires the referenced object to be of type T, + // so atomic_ref over half of a uint64_t violates its + // preconditions. + // - Strict aliasing ([basic.lval]) does not list uint32_t as a type + // that may alias a uint64_t object. + // - A union of atomic/atomic trips the active-member + // rule (atomic has no guaranteed common initial sequence). + // - Replacing the 32-bit store with a 64-bit RMW would cost a LOCK and + // defeat the optimization (and would corrupt the top half when + // bottom == UINT32_MAX). + // We use the GCC/Clang __may_alias__ attribute to permit the access. This + // is the ONLY place in this file that accesses state_storage_ via a type + // other than its declared uint64_t. + using aliased_u32 __attribute__((__may_alias__)) = uint32_t; + + // Owner-private "active buffer*" - used only by grow() to find the previous + // buffer slot and advance to oldBuf + 1. No synchronization with stealers + // needed (relaxed accesses are fine). + alignas(TMC_CACHE_LINE_SIZE) buffer* active_buffer_; + + // Tagged (data_ptr | log2(capacity)) cell that push/pop/steal read with a + // single atomic load to recover both the active data pointer and its mask + // without an extra indirection. Owner updates this with a release-store in + // the constructor and grow(); stealers acquire-load. + std::atomic active_data_; + + // Buffers retained until destruction so stealers always have a valid pointer. + // Each slot's data pointer starts as nullptr and is allocated when the slot + // becomes the active buffer (initial construction or grow()). The doubling- + // on-grow behavior guarantees we never need more than 62 slots in practice. + std::array buffers_; + + static constexpr size_t DEFAULT_INITIAL_CAPACITY = 128; + + // Atomic accessors over the 8-byte state word. + // + // state_full() returns an atomic_ref over the canonical uint64_t storage - + // no aliasing, this is the natural type. Used for every load, store, CAS, + // and RMW in the file. + // + // state_bottom() returns an atomic_ref over the low 32 bits of the same + // word via the __may_alias__ uint32_t alias declared above. Used only for + // owner-only stores (push/post_bulk/pop's restore path); never for loads. + std::atomic_ref state_full() noexcept { + return std::atomic_ref(state_storage_); + } + std::atomic_ref state_full() const noexcept { + return std::atomic_ref(const_cast(state_storage_)); + } + std::atomic_ref state_bottom() noexcept { + // Little-endian (asserted above): bottom occupies bytes 0..3. The + // pointer is laundered through void* to avoid clang's + // -Wundefined-reinterpret-cast (the may_alias attribute permits the + // access at runtime but the direct uint64_t*->uint32_t* cast still + // trips the warning). + return std::atomic_ref( + *static_cast(static_cast(&state_storage_)) + ); + } + + // Return the top/bottom halves as uint32_t. All arithmetic on indices is + // performed in unsigned (defined-wrap) form; callers reinterpret the + // difference as int32_t only at relational comparisons (the Chase-Lev + // signed-difference trick). + static uint32_t unpack_top(uint64_t s) { + return static_cast(s >> 32); + } + static uint32_t unpack_bottom(uint64_t s) { return static_cast(s); } + + // Unpack the data pointer from the tagged (data | log2(capacity)) cell + // stored in active_data_. + static T* unpack_data(uintptr_t w) { + return reinterpret_cast(w & ~static_cast(TAG_MASK)); + } + + // Unpack the mask (capacity - 1) from the tagged + // (data | log2(capacity)) cell stored in active_data_. + static size_t unpack_mask(uintptr_t w) { + size_t lc = static_cast(w & TAG_MASK); + return (TMC_ONE_BIT << lc) - 1; + } + + // Acquire-load of the tagged active cell. Used by stealers, which need to + // synchronize-with grow()'s release-store to see the copied elements in + // the new buffer. The caller must unpack the result via unpack_data() and + // unpack_mask(). + uintptr_t load_active_acquire() const { + return active_data_.load(std::memory_order_acquire); + } + + // Relaxed load - safe for the owner, since the owner is the only writer. + // The caller must unpack the result via unpack_data() and unpack_mask(). + uintptr_t load_active_relaxed() const { + return active_data_.load(std::memory_order_relaxed); + } + + // Owner-only. Publish a new (data, mask) pair for stealers. The + // release-store synchronizes-with stealers' acquire-load above, ensuring + // any element copies performed by grow() into the new buffer become + // visible. + void publish_active(T* data, size_t cap) { + assert((cap & (cap - 1)) == 0 && "capacity must be a power of two"); + size_t lc = static_cast(std::countr_zero(cap)); + assert(lc <= TAG_MASK && "log2(capacity) does not fit in tag bits"); + uintptr_t tagged = reinterpret_cast(data) | lc; + active_data_.store(tagged, std::memory_order_release); + } + + // For T sizes that fit in a single hardware atomic word, we use + // std::atomic_ref so that push/pop/steal slot accesses are well-defined under + // the C++ memory model and TSAN-clean. The ordering of these atomics is + // relaxed; the actual happens-before edge for slot visibility is provided by + // the release-store / acquire-load on the packed state word. + // + // For larger T (notably the 2-pointer coro_functor used when + // TMC_WORK_ITEM=FUNCORO), std::atomic_ref would either generate a + // multi-word CAS or fall back to an internal lock. We instead use the classic + // Chase-Lev "benign racy" approach which ignores invalid reads afterward, and + // simply disable TSan for the helper. + template + TMC_INLINE_OR_TSAN static void store_item(T* slot, U&& item) { + if constexpr (sizeof(T) <= sizeof(size_t)) { + T tmp(static_cast(item)); + std::atomic_ref(*slot).store(tmp, std::memory_order_relaxed); + } else { + new (slot) T(static_cast(item)); + } + } + + TMC_INLINE_OR_TSAN static void load_item(T& out, T* slot) { + if constexpr (sizeof(T) <= sizeof(size_t)) { + out = std::atomic_ref(*slot).load(std::memory_order_relaxed); + } else { + std::memcpy( + static_cast(&out), static_cast(slot), sizeof(T) + ); + } + } + +public: + chase_lev_deque() : chase_lev_deque(DEFAULT_INITIAL_CAPACITY) {} + + explicit chase_lev_deque(size_t initialCapacity) { + size_t cap = 1; + while (cap < initialCapacity) { + cap <<= 1; + } + buffer* buf = &buffers_[0]; + buf->init(cap); + active_buffer_ = buf; + publish_active(buf->data, cap); + // start at 1 so we don't underflow when subtracting bottom right away + state_full().store( + (TMC_ONE_BIT << 32) | TMC_ONE_BIT, std::memory_order_relaxed + ); + } + + ~chase_lev_deque() = default; + + chase_lev_deque(const chase_lev_deque&) = delete; + chase_lev_deque& operator=(const chase_lev_deque&) = delete; + chase_lev_deque(chase_lev_deque&&) = delete; + chase_lev_deque& operator=(chase_lev_deque&&) = delete; + + void grow(size_t cap, uint32_t b, uint32_t t, uint32_t count) { + // Buffer too small - grow to fit. + cap *= 2; + // All index arithmetic is done in uint32_t so that wraparound is defined + // behavior (signed overflow would be UB). The (b - t) difference is the + // queue's current size in two's-complement representation and adding + // count likewise wraps cleanly. + size_t needed = static_cast(b - t + count); + while (cap < needed) { + cap <<= 1; + } + + // Recover the old buffer's data pointer and mask from the tagged active + // cell (owner-only access - relaxed load is safe). + uintptr_t old_aw = load_active_relaxed(); + T* old_data = unpack_data(old_aw); + size_t old_mask = unpack_mask(old_aw); + + buffer* oldBuf = active_buffer_; + buffer* nb = oldBuf + 1; + nb->init(cap); + size_t new_mask = cap - 1; + for (uint32_t i = t; i != b; ++i) { + std::memcpy( + static_cast(nb->data + (static_cast(i) & new_mask)), + static_cast( + old_data + (static_cast(i) & old_mask) + ), + sizeof(T) + ); + } + active_buffer_ = nb; + // Release-store synchronizes-with stealers' acquire-load of the tagged + // cell. The element memcpys above are sequenced-before this store, so + // any stealer that observes the new (data, mask) pair will also observe + // the copied elements. + publish_active(nb->data, cap); + } + + // Owner-only. Push an item at the bottom (tail) of the deque. + // Grows the buffer if it is full. + template TMC_FORCE_INLINE void push(U&& item) { + auto state = state_full().load(std::memory_order_acquire); + uint32_t b = unpack_bottom(state); + uint32_t t = unpack_top(state); + // Owner is the only writer of the tagged active cell, so a relaxed + // load is sufficient here. + uintptr_t aw = load_active_relaxed(); + T* data = unpack_data(aw); + size_t mask = unpack_mask(aw); + // (b - t) is computed in uint32_t (defined wraparound). Reinterpret as + // int32_t for the signed comparison against mask (the Chase-Lev trick). + if (static_cast(b - t) > static_cast(mask)) [[unlikely]] { + grow(mask + 1, b, t, 1); + // grow() just published a new active cell; re-read it (owner-only, + // relaxed) to pick up the new data pointer and mask. + aw = load_active_relaxed(); + data = unpack_data(aw); + mask = unpack_mask(aw); + } + + store_item(data + (static_cast(b) & mask), static_cast(item)); + // Plain 32-bit aligned store to the bottom half. release publishes the + // slot store to any stealer that subsequently observes the new bottom. + state_bottom().store(b + 1u, std::memory_order_release); + } + + // Owner-only. Push Count items at the bottom (tail) of the deque, taken + // from the iterator It (incremented Count times). Grows the buffer if it + // is full. Issues only a single bottom update regardless of Count. + template + TMC_FORCE_INLINE void post_bulk(It&& Items, size_t Count) { + if (Count == 0) [[unlikely]] { + return; + } + auto state = state_full().load(std::memory_order_acquire); + uint32_t b = unpack_bottom(state); + uint32_t t = unpack_top(state); + uintptr_t aw = load_active_relaxed(); + T* data = unpack_data(aw); + size_t mask = unpack_mask(aw); + size_t capacity = mask + 1; + // Compute (queue_size + Count) entirely in uint32_t (defined wrap), then + // reinterpret as int32_t for the signed comparison. + uint32_t needed = (b - t) + static_cast(Count); + if (static_cast(needed) > static_cast(capacity)) + [[unlikely]] { + grow(capacity, b, t, static_cast(Count)); + // grow() just published a new active cell; re-read it (owner-only, + // relaxed) to pick up the new data pointer and mask. + aw = load_active_relaxed(); + data = unpack_data(aw); + mask = unpack_mask(aw); + } + + It it = static_cast(Items); + for (size_t i = 0; i < Count; ++i) { + store_item( + data + (static_cast(b + static_cast(i)) & mask), + static_cast(*it) + ); + ++it; + } + uint32_t newBottom = b + static_cast(Count); + state_bottom().store(newBottom, std::memory_order_release); + } + + // Owner-only. Pop an item from the bottom (tail) of the deque (LIFO). + // Returns true if an item was popped, false if the deque was empty. + // + // Last-element race resolution: pop() does not CAS. It decrements bottom + // before the seq_cst fence, then reads top. Any concurrent stealer's + // full-word CAS will observe the decremented bottom and fail, so pop() + // wins as long as no stealer's CAS completed before the bottom decrement + // became visible (in which case pop sees top advanced and returns false). + TMC_FORCE_INLINE bool try_pop(T& out) { + uintptr_t aw = load_active_relaxed(); + T* data = unpack_data(aw); + size_t mask = unpack_mask(aw); + auto state = state_full().load(std::memory_order_acquire); + uint32_t b = unpack_bottom(state); + uint32_t t = unpack_top(state); + RETRY: + // (b - t) is computed in uint32_t (defined wraparound). Reinterpret as + // int32_t for the signed sign check (the Chase-Lev trick). + int32_t diff = static_cast(b - t); + if (diff <= 0) { + // Queue was empty + return false; + } + if (diff == 0) { + // Queue has one element. Try to claim it by advancing top via CAS. + uint64_t newState = + static_cast(b) | (static_cast(t + 1u) << 32); + if (state_full().compare_exchange_strong( + state, newState, std::memory_order_seq_cst, + std::memory_order_relaxed + )) { + load_item(out, data + (static_cast(b) & mask)); + } + return false; + } + // Queue has more than one element. Decrement bottom to claim it. + if (b != 0) [[likely]] { + // Happiest path - try to complete the entire operation in a single FAA + state = state_full().fetch_sub(1, std::memory_order_acq_rel); + --b; + t = unpack_top(state); + } else { + // Underflow case - bottom is at 0 and we need to decrement it. + // Use CAS to set bottom without affecting top. + // (b - 1u) wraps to UINT32_MAX, which is the intended bit pattern. + uint64_t newState = (static_cast(t) << 32) | (b - 1u); + if (state_full().compare_exchange_strong( + state, newState, std::memory_order_acq_rel, + std::memory_order_relaxed + )) { + --b; + } else { + // Stealer modified top + t = unpack_top(state); + goto RETRY; + } + } + diff = static_cast(b - t); + if (diff < 0) [[unlikely]] { + // Empty. Restore bottom. + state_bottom().store(b + 1u, std::memory_order_relaxed); + return false; + } + + load_item(out, data + (static_cast(b) & mask)); + if (diff > 0) { + // More than one element - safe to take without racing stealers. + return true; + } + // diff == 0 + // Last element - pop always wins. Take the element and publish the + // empty state (top=t+1=b+1, bottom=b+1) in a single full-word store. + // Any in-flight stealer with snapshot (t, t+1) will fail its full-word + // CAS because both halves have changed (and bottom was already + // decremented to t before this store). Advancing top here is essential + // to prevent ABA: without it a subsequent push would restore the + // packed state to (t, t+1), matching the old stealer's expected value. + uint64_t empty_state = + static_cast(b + 1u) | (static_cast(b + 1u) << 32); + state_full().store(empty_state, std::memory_order_relaxed); + return true; + } + + // Any thread. Steal an item from the top (head) of the deque (FIFO). + // Returns true if an item was stolen, false otherwise (deque empty or lost + // a race). + TMC_FORCE_INLINE bool steal(T& out) { + size_t retryCount = 0; + // Single 64-bit acquire load gives a consistent (top, bottom) snapshot. + uint64_t s = state_full().load(std::memory_order_acquire); + RETRY: + uint32_t t = unpack_top(s); + uint32_t b = unpack_bottom(s); + // (b - t) is computed in uint32_t (defined wraparound). Reinterpret as + // int32_t for the signed sign check. + if (static_cast(b - t) <= 0) { + return false; + } + // Single acquire-load of the tagged (data | log2(capacity)) cell. + // Synchronizes-with grow()'s release-store, ensuring any element copies + // performed into the new buffer are visible if we observe the new tag. + // If we observe the old tag, the old buffer is still alive (retained in + // buffers_) so the memcpy below is safe. + uintptr_t aw = load_active_acquire(); + T* data = unpack_data(aw); + size_t mask = unpack_mask(aw); + // Racy read - safe because T is trivially copyable and destructible. If + // we lose the CAS below, the caller will ignore the out value. + load_item(out, data + (static_cast(t) & mask)); + // Full-word CAS: increment top, leave bottom unchanged. This will fail + // if the owner has modified bottom (push or pop) since our snapshot, + // which is what lets pop() always win the last-element race without + // having to CAS itself - pop's bottom decrement is enough to invalidate + // any in-flight stealer's expected value here. The cost is that steal() + // may now spuriously fail when concurrent push()/pop() are happening. + uint64_t desired = static_cast(static_cast(s)) | + (static_cast(t + 1u) << 32); + if (!state_full().compare_exchange_strong( + s, desired, std::memory_order_seq_cst, std::memory_order_relaxed + )) { + if (retryCount == 3) { + return false; + } + for (size_t i = 0; i < retryCount; ++i) { + TMC_CPU_PAUSE(); + } + ++retryCount; + goto RETRY; + } + return true; + } + + // Approximate size. Safe to call from any thread. + size_t size_approx() const { + uint64_t s = state_full().load(std::memory_order_relaxed); + // (bottom - top) is computed in uint32_t (defined wraparound) and + // reinterpreted as int32_t for the signed sign check. + int32_t diff = static_cast(unpack_bottom(s) - unpack_top(s)); + return diff > 0 ? static_cast(diff) : 0; + } + + // Approximate emptiness. Safe to call from any thread. + bool empty() const { + uint64_t s = state_full().load(std::memory_order_relaxed); + // (bottom - top) is computed in uint32_t (defined wraparound) and + // reinterpreted as int32_t for the signed sign check. + return static_cast(unpack_bottom(s) - unpack_top(s)) <= 0; + } +}; + +} // namespace detail +} // namespace tmc diff --git a/include/tmc/detail/qu_lockfree.hpp b/include/tmc/detail/qu_mc.hpp similarity index 60% rename from include/tmc/detail/qu_lockfree.hpp rename to include/tmc/detail/qu_mc.hpp index 84518d9d..c5fb07f8 100644 --- a/include/tmc/detail/qu_lockfree.hpp +++ b/include/tmc/detail/qu_mc.hpp @@ -9,10 +9,10 @@ // This queue has been modified from the original at: // https://github.com/cameron314/concurrentqueue/blob/master/concurrentqueue.h -// for integration within TooManyCooks. It now contains a preallocated list of -// explicit producers which can be read from as thread-local members by ex_cpu. -// Each of these producers (which represents a work-stealing thread) may dequeue -// from its own queue as a stack (LIFO) but only FIFO from other producers. +// for integration within TooManyCooks: +// - Removed explicit producers. Only implicit producers remain. +// - Cleaned up / modernized code and made operations infallible. The only +// remaining failure mode is std::bad_alloc, which we do not attempt to handle. // The original software is offered under either the BSD or Boost license. // The Boost license has been chosen here for compatibility. @@ -20,7 +20,6 @@ #pragma once #include "tmc/detail/compat.hpp" -#include "tmc/detail/thread_locals.hpp" #if defined(__GNUC__) && !defined(__INTEL_COMPILER) // Disable -Wconversion warnings (spuriously triggered when Traits::size_t and @@ -322,13 +321,9 @@ struct ConcurrentQueueDefaultTraits { // TZCNT: setting this to 2 has a small positive impact on some benchmarks, // and negative impact on others. + // TODO re-test this now that this queue is used for ingest only static const size_t ELEM_INTERLEAVING = 1; - // How many full blocks can be expected for a single explicit producer? This - // should reflect that number's maximum for optimal performance. Must be a - // power of 2. - static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32; - // How many full blocks can be expected for a single implicit producer? This // should reflect that number's maximum for optimal performance. Must be a // power of 2. @@ -342,9 +337,7 @@ struct ConcurrentQueueDefaultTraits { // Whether to recycle dynamically-allocated blocks into an internal free list // or not. If false, only pre-allocated blocks (controlled by the constructor // arguments) will be recycled, and all others will be `free`d back to the - // heap. Note that blocks consumed by explicit producers are only freed on - // destruction of the queue (not following destruction of the token) - // regardless of this trait. + // heap. static const bool RECYCLE_ALLOCATED_BLOCKS = false; #ifndef MCDBGQ_USE_RELACY @@ -587,17 +580,12 @@ class ConcurrentQueue { ? 1 : (PRODUCER_BLOCK_SIZE / TMC_PLATFORM_BITS); - static constexpr size_t PLATFORM_CACHELINE_BYTES = 64; - static constexpr size_t ELEM_INTERLEAVING = Traits::ELEM_INTERLEAVING; static constexpr size_t ELEM_INTERLEAVING_MASK = ELEM_INTERLEAVING - 1; - static constexpr size_t ELEMS_PER_CACHELINE = - PLATFORM_CACHELINE_BYTES / sizeof(T); + static constexpr size_t ELEMS_PER_CACHELINE = TMC_CACHE_LINE_SIZE / sizeof(T); static constexpr size_t ELEM_INTERLEAVING_ALL_MASK = (ELEMS_PER_CACHELINE * ELEM_INTERLEAVING) - 1; - static constexpr size_t EXPLICIT_INITIAL_INDEX_SIZE = - static_cast(Traits::EXPLICIT_INITIAL_INDEX_SIZE); static constexpr size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast(Traits::IMPLICIT_INITIAL_INDEX_SIZE); static constexpr size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = @@ -616,12 +604,6 @@ class ConcurrentQueue { !(BLOCK_EMPTY_ARRAY_SIZE & (BLOCK_EMPTY_ARRAY_SIZE - 1)), "Traits::BLOCK_EMPTY_ARRAY_SIZE must be a power of 2 (and at least 1)" ); - static_assert( - (EXPLICIT_INITIAL_INDEX_SIZE > 1) && - !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), - "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and " - "greater than 1)" - ); static_assert( (IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), @@ -639,16 +621,13 @@ class ConcurrentQueue { ); public: - struct ExplicitProducer; - // Creates a queue with `capacity` preallocated blocks. This method is not // thread safe -- it is up to the user to ensure that the queue is fully // constructed before it starts being used by other threads (this includes // making the memory effects of construction visible, possibly with a memory // barrier). explicit ConcurrentQueue(size_t capacity = 1) - : producerListTail(nullptr), producerCount(0), initialBlockPoolIndex(0), - nextExplicitConsumerId(0), globalExplicitConsumerOffset(0) { + : producerListTail(nullptr), producerCount(0), initialBlockPoolIndex(0) { implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed); populate_initial_implicit_producer_hash(); populate_initial_block_list(capacity); @@ -658,7 +637,6 @@ class ConcurrentQueue { // each kind; this makes it possible to debug them starting from // the root queue object (otherwise wacky casts are needed that // don't compile in the debugger's expression evaluator). - explicitProducers.store(nullptr, std::memory_order_relaxed); implicitProducers.store(nullptr, std::memory_order_relaxed); #endif } @@ -667,9 +645,6 @@ class ConcurrentQueue { // being deleted. It's up to the user to synchronize this. // This method is not thread safe. ~ConcurrentQueue() { - // If explicit producers exist (by ex_cpu) they must be destroyed by the - // owning executor first. - // Destroy implicit producers auto ptr = producerListTail.load(std::memory_order_relaxed); while (ptr != nullptr) { @@ -713,12 +688,9 @@ class ConcurrentQueue { ConcurrentQueue(ConcurrentQueue&& other) = delete; inline ConcurrentQueue& operator=(ConcurrentQueue&& other) = delete; - // When used by ex_cpu, explicit or implicit producers may produce. - // However, only explicit producers may consume (as implicit producers cannot - // see the explicit producers - they aren't registered to the linked list). - - // When used by ex_braid, there are no explicit producers. Implicit producers - // may produce and consume. + // When used by ex_cpu, implicit producers may be external to the executor, or + // they may be other threads within the same executor posting to a priority + // that they cannot access. // Enqueues a single item via an implicit producer. template inline void enqueue(U&& item) { @@ -728,78 +700,14 @@ class ConcurrentQueue { ); } - // Enqueues a single item using this ex_cpu thread's explicit producer. - template TMC_FORCE_INLINE inline void enqueue_ex_cpu(U&& item) { - ExplicitProducer** producers = - static_cast(tmc::detail::this_thread::producers()); - ExplicitProducer* this_thread_prod = producers[producerArrayOffset]; - this_thread_prod->enqueue(static_cast(item)); - } - // Enqueues several items via an implicit producer. template void enqueue_bulk(It itemFirst, size_t count) { auto producer = get_or_add_implicit_producer(); producer->ConcurrentQueue::ImplicitProducer::enqueue_bulk(itemFirst, count); } - // Enqueues several items using this ex_cpu thread's explicit producer. - template - TMC_FORCE_INLINE void enqueue_bulk_ex_cpu(It itemFirst, size_t count) { - ExplicitProducer** producers = - static_cast(tmc::detail::this_thread::producers()); - ExplicitProducer* this_thread_prod = producers[producerArrayOffset]; - this_thread_prod->enqueue_bulk(itemFirst, count); - } - - // Attempts to dequeue from the queue's implicit producers. - // Returns false if all producer streams appeared empty at the time they - // were checked (so, the queue is likely but not guaranteed to be empty). - // Never allocates. Thread-safe. - template bool try_dequeue(U& item) { - // Instead of simply trying each producer in turn (which could cause - // needless contention on the first producer), we score them heuristically. - size_t nonEmptyCount = 0; - ImplicitProducer* best = nullptr; - size_t bestSize = 0; - for (auto ptr = producerListTail.load(std::memory_order_acquire); - nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) { - auto size = ptr->size_approx(); - if (size > 0) { - if (size > bestSize) { - bestSize = size; - best = ptr; - } - ++nonEmptyCount; - } - } - - // If there was at least one non-empty queue but it appears empty at the - // time we try to dequeue from it, we need to make sure every queue's been - // tried - if (nonEmptyCount > 0) { - if (best->dequeue(item)) [[likely]] { - return true; - } - for (auto ptr = producerListTail.load(std::memory_order_acquire); - ptr != nullptr; ptr = ptr->next_prod()) { - if (ptr != best && ptr->dequeue(item)) { - return true; - } - } - } - return false; - } - - // Attempts to dequeue from the queue's implicit producers. - // Returns false if all producer streams appeared empty at the time they - // were checked (so, the queue is likely but not guaranteed to be empty). - // This differs from the try_dequeue(item) method in that this one does - // not attempt to reduce contention by interleaving the order that producer - // streams are dequeued from. So, using this method can reduce overall - // throughput under contention, but will give more predictable results in - // single-threaded consumer scenarios. This is mostly only useful for internal - // unit tests. Never allocates. Thread-safe. - template bool try_dequeue_non_interleaved(U& item) { + // Equivalent of the original `try_dequeue_non_interleaved` + TMC_FORCE_INLINE bool try_dequeue(T& item) { for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) { if (ptr->dequeue(item)) { @@ -809,83 +717,12 @@ class ConcurrentQueue { return false; } - // TZCNT MODIFIED: New function, used only by ex_cpu threads. - // Uses precalculated iteration order to check other queues to steal. - // Precondition: producers is the second (cache) slot for this priority. - // Increments producers to the beginning of the next priority for use by the - // outer work stealing loop. - TMC_FORCE_INLINE bool - try_dequeue_ex_cpu_steal(T& item, ExplicitProducer**& producers) { - // CHECK the implicit producers (main thread, I/O, etc) - ImplicitProducer* implicit_prod = static_cast( - producerListTail.load(std::memory_order_acquire) - ); - while (implicit_prod != nullptr) { - if (implicit_prod->dequeue(item)) { - return true; - } - implicit_prod = - static_cast(implicit_prod->next_prod()); - } - - ExplicitProducer** end = producers + dequeueProducerCount; - - // producers[0] is the previously consumed-from (cached) producer for this - // priority. If we didn't find work last time, it will be null. - ExplicitProducer** cacheProd = producers; - if (*producers == nullptr) { - ++producers; - } - - // CHECK the remaining threads in the predefined order - for (; producers != end; ++producers) { - ExplicitProducer* prod = *producers; - if (prod->dequeue(item)) { - // update prev_prod - *cacheProd = prod; - return true; - } - } - - // Some synthetic benchmarks get 1-2% faster if this line is commented - // out, but I think that might have undesirable side effects - *cacheProd = nullptr; - return false; - } - - // Attempts to dequeue several elements from the queue's implicit producers. - // Returns the number of items actually dequeued. - // Returns 0 if all producer streams appeared empty at the time they - // were checked (so, the queue is likely but not guaranteed to be empty). - // Never allocates. Thread-safe. - template size_t try_dequeue_bulk(It itemFirst, size_t max) { - size_t count = 0; - for (auto ptr = producerListTail.load(std::memory_order_acquire); - ptr != nullptr; ptr = ptr->next_prod()) { - count += ptr->dequeue_bulk(itemFirst, max - count); - if (count == max) { - break; - } - } - return count; - } - // Returns an estimate of whether or not the queue is empty. This // estimate is only accurate if the queue has completely stabilized before it // is called (i.e. all enqueue and dequeue operations have completed and their // memory effects are visible on the calling thread, and no further operations // start while this method is being called). Thread-safe. bool empty() const { - // TODO make a producer thread version of this that uses this thread's - // static iteration order - auto static_producer_count = static_cast(dequeueProducerCount); - for (ptrdiff_t pidx = 0; pidx < static_producer_count; ++pidx) { - ExplicitProducer& prod = staticProducers[pidx]; - if (prod.size_approx() != 0) { - return false; - } - } - for (auto ptr = producerListTail.load(std::memory_order_seq_cst); ptr != nullptr; ptr = ptr->next_prod()) { if (ptr->size_approx() != 0) { @@ -909,7 +746,6 @@ class ConcurrentQueue { } private: - friend struct ExplicitProducer; struct ImplicitProducer; friend struct ImplicitProducer; friend class ConcurrentQueueTests; @@ -990,1263 +826,175 @@ class ConcurrentQueue { // Decrease refcount twice, once for our ref, and once for the list's // ref - head->freeListRefs.fetch_sub(2, std::memory_order_release); - return head; - } - - // OK, the head must have changed on us, but we still need to decrease - // the refcount we increased. Note that we don't need to release any - // memory effects, but we do need to ensure that the reference count - // decrement happens-after the CAS on the head. - refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel); - if (refs == SHOULD_BE_ON_FREELIST + 1) { - add_knowing_refcount_is_zero(prevHead); - } - } - - return nullptr; - } - - // Useful for traversing the list when there's no contention (e.g. to - // destroy remaining nodes) - N* head_unsafe() const { - return freeListHead.load(std::memory_order_relaxed); - } - - private: - inline void add_knowing_refcount_is_zero(N* node) { - // Since the refcount is zero, and nobody can increase it once it's zero - // (except us, and we run only one copy of this method per node at a time, - // i.e. the single thread case), then we know we can safely change the - // next pointer of the node; however, once the refcount is back above - // zero, then other threads could increase it (happens under heavy - // contention, when the refcount goes to zero in between a load and a - // refcount increment of a node in try_get, then back up to something - // non-zero, then the refcount increment is done by the other thread) -- - // so, if the CAS to add the node to the actual list fails, decrease the - // refcount and leave the add operation to the next thread who puts the - // refcount back at zero (which could be us, hence the loop). - auto head = freeListHead.load(std::memory_order_relaxed); - while (true) { - node->freeListNext.store(head, std::memory_order_relaxed); - node->freeListRefs.store(1, std::memory_order_release); - if (!freeListHead.compare_exchange_strong( - head, node, std::memory_order_release, std::memory_order_relaxed - )) { - // Hmm, the add failed, but we can only try again when the refcount - // goes back to zero - if (node->freeListRefs.fetch_add( - SHOULD_BE_ON_FREELIST - 1, std::memory_order_acq_rel - ) == 1) { - continue; - } - } - return; - } - } - - private: - // Implemented like a stack, but where node order doesn't matter (nodes are - // inserted out of order under contention) - std::atomic freeListHead; - - static const std::uint32_t REFS_MASK = 0x7FFFFFFF; - static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000; - -#ifdef MCDBGQ_NOLOCKFREE_FREELIST - debug::DebugMutex mutex; -#endif - }; - - /////////////////////////// - // Block - /////////////////////////// - - enum InnerQueueContext { implicit_context = 0, explicit_context = 1 }; - - struct Block { - Block() - : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), - freeListNext(nullptr), dynamicallyAllocated(true) { - for (size_t i = 0; i < BLOCK_EMPTY_ARRAY_SIZE; ++i) { - emptyFlags[i] = BLOCK_EMPTY_MASK; - } -#ifdef MCDBGQ_TRACKMEM - owner = nullptr; -#endif - } - - template inline bool is_empty() const { - if constexpr (context == explicit_context) { - for (size_t i = 0; i < BLOCK_EMPTY_ARRAY_SIZE; ++i) { -#ifdef TMC_MOODYCAMEL_HAS_TSAN - // This is necessary because TSan doesn't see the acquire fence below. - // This is a required synchronizing operation between dequeue and - // enqueue operations on explicit producers. - std::atomic* addr = - const_cast*>(&emptyFlags[i]); - __tsan_acquire(reinterpret_cast(addr)); -#endif - if (emptyFlags[i].load(std::memory_order_relaxed) != - BLOCK_EMPTY_MASK) { - return false; - } - } - - // Aha, empty; make sure we have all other memory effects that happened - // before the empty flags were set - std::atomic_thread_fence(std::memory_order_acquire); - return true; - } else { - // Check counter - if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == - PRODUCER_BLOCK_SIZE) { - std::atomic_thread_fence(std::memory_order_acquire); - return true; - } - assert( - elementsCompletelyDequeued.load(std::memory_order_relaxed) <= - PRODUCER_BLOCK_SIZE - ); - return false; - } - } - - // Returns true if the block is now empty (does not apply in explicit - // context) - template - inline bool set_empty([[maybe_unused]] index_t i) { - if constexpr (context == explicit_context) { - auto rawIndex = - static_cast(i & static_cast(BLOCK_MASK)); - size_t arrIndex, bitIndex; - arrIndex = rawIndex / BLOCK_EMPTY_ELEM_SIZE; - bitIndex = rawIndex & (BLOCK_EMPTY_ELEM_SIZE - 1); - size_t bit = TMC_ONE_BIT << bitIndex; - // Set flag - assert( - (emptyFlags[arrIndex].load(std::memory_order_relaxed) & bit) == 0 - ); - emptyFlags[arrIndex].fetch_or(bit, std::memory_order_release); - return false; - } else { - // Increment counter - auto prevVal = - elementsCompletelyDequeued.fetch_add(1, std::memory_order_acq_rel); - assert(prevVal < PRODUCER_BLOCK_SIZE); - return prevVal == BLOCK_MASK; - } - } - - // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping - // and count > 0). Returns true if the block is now empty (does not apply in - // explicit context). - // Unused and incomplete since the transition to empty bitmask - template - inline bool set_many_empty([[maybe_unused]] index_t i, size_t count) { - if constexpr (context == explicit_context) { - // Set flags - std::atomic_thread_fence(std::memory_order_release); - // TODO implement this with interleaving - i = static_cast(i & static_cast(BLOCK_MASK)); - auto rawIndexStart = - static_cast(i & static_cast(BLOCK_MASK)); - auto arrIndexStart = rawIndexStart / BLOCK_EMPTY_ELEM_SIZE; - auto bitIndexStart = rawIndexStart & (BLOCK_EMPTY_ELEM_SIZE - 1); - - auto rawIndexEnd = i + count; // this shouldn't wrap - auto arrIndexEnd = rawIndexEnd / BLOCK_EMPTY_ELEM_SIZE; - auto bitIndexEnd = rawIndexEnd & (BLOCK_EMPTY_ELEM_SIZE - 1); - - // Begin - auto arrIndex = arrIndexStart; - auto bitIndex = bitIndexStart; - if (arrIndex < arrIndexEnd) { - size_t bits = - -(TMC_ONE_BIT << bitIndex); // set all bits from bitIndex and higher - assert( - (emptyFlags[arrIndex].load(std::memory_order_relaxed) & bits) == 0 - ); - emptyFlags[arrIndex].fetch_or(bits, std::memory_order_relaxed); - count -= (BLOCK_EMPTY_ELEM_SIZE - bitIndex); - arrIndex++; - } - - // Middle - while (count > TMC_PLATFORM_BITS) { - assert(count % TMC_PLATFORM_BITS == 0); - assert(emptyFlags[arrIndex].load(std::memory_order_relaxed) == 0); - emptyFlags[arrIndex].fetch_or( - TMC_ALL_ONES, std::memory_order_relaxed - ); - count -= TMC_PLATFORM_BITS; - arrIndex++; - } - - // End - assert(arrIndex == arrIndexEnd); - size_t bits = ((TMC_ONE_BIT << count) - 1) << (bitIndexEnd + 1 - count); - assert( - (emptyFlags[arrIndex].load(std::memory_order_relaxed) & bits) == 0 - ); - emptyFlags[arrIndex].fetch_or(bits, std::memory_order_relaxed); - return false; - } else { - // Increment counter - auto prevVal = elementsCompletelyDequeued.fetch_add( - count, std::memory_order_acq_rel - ); - assert(prevVal + count <= PRODUCER_BLOCK_SIZE); - return prevVal + count == PRODUCER_BLOCK_SIZE; - } - } - - template inline void set_all_empty() { - if constexpr (context == explicit_context) { - for (size_t i = 0; i < BLOCK_EMPTY_ARRAY_SIZE; ++i) { - emptyFlags[i].store(BLOCK_EMPTY_MASK, std::memory_order_relaxed); - } - } else { - // Reset counter - elementsCompletelyDequeued.store( - PRODUCER_BLOCK_SIZE, std::memory_order_relaxed - ); - } - } - - template inline void reset_empty() { - if constexpr (context == explicit_context) { - for (size_t i = 0; i < BLOCK_EMPTY_ARRAY_SIZE; ++i) { - emptyFlags[i].store(0, std::memory_order_relaxed); - } - } else { - // Reset counter - elementsCompletelyDequeued.store(0, std::memory_order_relaxed); - } - } - - inline T* operator[](index_t idx) TMC_MOODYCAMEL_NOEXCEPT { - size_t raw_off = - static_cast(idx & static_cast(BLOCK_MASK)); - if constexpr (ELEM_INTERLEAVING > 1) { - size_t off_unaffected = raw_off & ~ELEM_INTERLEAVING_ALL_MASK; - size_t il_bits = raw_off & ELEM_INTERLEAVING_ALL_MASK; - size_t low_bits = il_bits & ELEM_INTERLEAVING_MASK; - size_t high_bits = il_bits / ELEM_INTERLEAVING; - size_t low_bits_in_place = low_bits * ELEMS_PER_CACHELINE; - size_t off = off_unaffected | high_bits | low_bits_in_place; - return static_cast(static_cast(elements)) + off; - } else { - return static_cast(static_cast(elements)) + raw_off; - } - } - - inline T const* operator[](index_t idx) const TMC_MOODYCAMEL_NOEXCEPT { - size_t raw_off = - static_cast(idx & static_cast(BLOCK_MASK)); - - if constexpr (ELEM_INTERLEAVING > 1) { - size_t off_unaffected = raw_off & ~ELEM_INTERLEAVING_ALL_MASK; - size_t il_bits = raw_off & ELEM_INTERLEAVING_ALL_MASK; - size_t low_bits = il_bits & ELEM_INTERLEAVING_MASK; - size_t high_bits = il_bits / ELEM_INTERLEAVING; - size_t low_bits_in_place = low_bits * ELEMS_PER_CACHELINE; - size_t off = off_unaffected | high_bits | low_bits_in_place; - return static_cast(static_cast(elements)) + off; - } else { - return static_cast(static_cast(elements)) + - raw_off; - } - } - - private: - static_assert( - std::alignment_of::value <= sizeof(T), - "The queue does not support types with an alignment greater " - "than their size at this time" - ); - TMC_MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * PRODUCER_BLOCK_SIZE], T) - elements; - - public: - TMC_DISABLE_WARNING_PADDED_BEGIN - alignas( - TMC_CACHE_LINE_SIZE - ) std::atomic emptyFlags[BLOCK_EMPTY_ARRAY_SIZE]; - TMC_DISABLE_WARNING_PADDED_END - Block* next; - std::atomic elementsCompletelyDequeued; - - public: - std::atomic freeListRefs; - std::atomic freeListNext; - bool dynamicallyAllocated; // Perhaps a better name for this would be - // 'isNotPartOfInitialBlockPool' - -#ifdef MCDBGQ_TRACKMEM - void* owner; -#endif - }; - static_assert( - std::alignment_of::value >= std::alignment_of::value, - "Internal error: Blocks must be at least as aligned as the " - "type they are wrapping" - ); - -#ifdef MCDBGQ_TRACKMEM -public: - struct MemStats; - -private: -#endif - - /////////////////////////// - // Explicit queue - /////////////////////////// -public: - struct alignas(TMC_CACHE_LINE_SIZE) ExplicitProducer { - explicit ExplicitProducer() - : tailIndex(0), headIndex(0), dequeueOptimisticCount(0), - dequeueOvercommit(0), tailBlock(nullptr), parent(nullptr), - blockIndex(nullptr), pr_blockIndexSlotsUsed(0), - pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1), - pr_blockIndexFront(0), pr_blockIndexFrontMax(0), - pr_blockIndexEntries(nullptr), pr_blockIndexRaw(nullptr) {} - - void init(ConcurrentQueue* parent_) { - parent = parent_; - size_t poolBasedIndexSize = - details::ceil_to_pow_2(parent_->initialBlockPoolSize) >> 1; - if (poolBasedIndexSize > pr_blockIndexSize) { - pr_blockIndexSize = poolBasedIndexSize; - } - - new_block_index(0); // This creates an index with double the number of - // current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE - } - - ~ExplicitProducer() { - // Destruct any elements not yet dequeued. - // Since we're in the destructor, we can assume all elements - // are either completely dequeued or completely not (no halfways). - if (this->tailBlock != nullptr) { // Note this means there must be a block - // index too - // First find the block that's partially dequeued, if any - Block* halfDequeuedBlock = nullptr; - if ((this->headIndex.load(std::memory_order_relaxed) & - static_cast(BLOCK_MASK)) != 0) { - // The head's not on a block boundary, meaning a block somewhere is - // partially dequeued (or the head block is the tail block and was - // fully dequeued, but the head/tail are still not on a boundary) - size_t i = (pr_blockIndexFrontMax - pr_blockIndexSlotsUsed) & - (pr_blockIndexSize - 1); - while (details::circular_less_than( - pr_blockIndexEntries[i].base + PRODUCER_BLOCK_SIZE, - this->headIndex.load(std::memory_order_relaxed) - )) { - i = (i + 1) & (pr_blockIndexSize - 1); - } - assert( - details::circular_less_than( - pr_blockIndexEntries[i].base, - this->headIndex.load(std::memory_order_relaxed) - ) - ); - halfDequeuedBlock = pr_blockIndexEntries[i].block; - } - - // Start at the head block (note the first line in the loop gives us the - // head from the tail on the first iteration) - auto block = this->tailBlock; - do { - block = block->next; - if (block->ConcurrentQueue::Block::template is_empty< - explicit_context>()) { - continue; - } - - size_t i = 0; // Offset into block - if (block == halfDequeuedBlock) { - i = static_cast( - this->headIndex.load(std::memory_order_relaxed) & - static_cast(BLOCK_MASK) - ); - } - - // Walk through all the items in the block; if this is the tail block, - // we need to stop when we reach the tail index - auto lastValidIndex = - (this->tailIndex.load(std::memory_order_relaxed) & - static_cast(BLOCK_MASK)) == 0 - ? PRODUCER_BLOCK_SIZE - : static_cast( - this->tailIndex.load(std::memory_order_relaxed) & - static_cast(BLOCK_MASK) - ); - while (i != PRODUCER_BLOCK_SIZE && - (block != this->tailBlock || i != lastValidIndex)) { - (*block)[i]->~T(); - ++i; - } - } while (block != this->tailBlock); - } - - // Destroy all blocks that we own - if (this->tailBlock != nullptr) { - auto block = this->tailBlock; - do { - auto nextBlock = block->next; - this->parent->add_block_to_free_list(block); - block = nextBlock; - } while (block != this->tailBlock); - } - - // Destroy the block indices - auto header = static_cast(pr_blockIndexRaw); - while (header != nullptr) { - auto prev = static_cast(header->prev); - header->~BlockIndexHeader(); - (Traits::free)(header); - header = prev; - } - } - - template TMC_FORCE_INLINE inline void enqueue(U&& element) { - index_t currentTailIndex = - this->tailIndex.load(std::memory_order_relaxed); - index_t newTailIndex = 1 + currentTailIndex; - if ((currentTailIndex & static_cast(BLOCK_MASK)) == 0) - [[unlikely]] { - // We reached the end of a block, start a new one - auto startBlock = this->tailBlock; - auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed; - if (this->tailBlock != nullptr && - this->tailBlock->next - ->ConcurrentQueue::Block::template is_empty()) { - // We can re-use the block ahead of us, it's empty! - this->tailBlock = this->tailBlock->next; - this->tailBlock - ->ConcurrentQueue::Block::template reset_empty(); - - // We'll put the block on the block index (guaranteed to be room since - // we're conceptually removing the last block from it first -- except - // instead of removing then adding, we can just overwrite). - } else { - // We're going to need a new block; ensure the block index has room - if (pr_blockIndexSlotsUsed == pr_blockIndexSize) { - // Hmm, the circular block index is already full -- we'll need - // to allocate a new index. - - new_block_index(pr_blockIndexSlotsUsed); - } - - // Insert a new block in the circular linked list - auto newBlock = this->parent->ConcurrentQueue::requisition_block(); -#ifdef MCDBGQ_TRACKMEM - newBlock->owner = this; -#endif - newBlock - ->ConcurrentQueue::Block::template reset_empty(); - if (this->tailBlock == nullptr) { - newBlock->next = newBlock; - } else { - newBlock->next = this->tailBlock->next; - this->tailBlock->next = newBlock; - } - this->tailBlock = newBlock; - ++pr_blockIndexSlotsUsed; - } - - if constexpr (!TMC_MOODYCAMEL_NOEXCEPT_CTOR(new ( - static_cast(nullptr) - ) T(static_cast(element)))) { - // The constructor may throw. We want the element not to appear in the - // queue in that case (without corrupting the queue): - TMC_MOODYCAMEL_TRY { - new ((*this->tailBlock)[currentTailIndex]) - T(static_cast(element)); - } - TMC_MOODYCAMEL_CATCH(...) { - // Revert change to the current block, but leave the new block - // available for next time - pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed; - this->tailBlock = - startBlock == nullptr ? this->tailBlock : startBlock; - TMC_MOODYCAMEL_RETHROW; - } - } else { - (void)startBlock; - (void)originalBlockIndexSlotsUsed; - } - - auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); - size_t nextFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1); - if (pr_blockIndexFrontMax == pr_blockIndexFront) { - pr_blockIndexFrontMax = nextFront; - // Add block to block index. We skip this if not at FrontMax (because - // we popped this off with dequeue_lifo), because this block already - // exists in the block index with the same base value, and it causes a - // TSan warning to write the same data again. However this is a false - // positive, and the below lines could be run unconditionally since - // the data being overwritten is identical. - auto& entry = localBlockIndex->entries[pr_blockIndexFront]; - entry.base = currentTailIndex; - entry.block = this->tailBlock; - } - localBlockIndex->front.store( - pr_blockIndexFront, std::memory_order_release - ); - pr_blockIndexFront = nextFront; - - if constexpr (!TMC_MOODYCAMEL_NOEXCEPT_CTOR(new ( - static_cast(nullptr) - ) T(static_cast(element)))) { - this->tailIndex.store(newTailIndex, std::memory_order_release); - return; - } - } - - // Enqueue - new ((*this->tailBlock)[currentTailIndex]) T(static_cast(element)); - - this->tailIndex.store(newTailIndex, std::memory_order_release); - } - - // TZCNT MODIFIED: Pops from tail (like a LIFO stack) instead of from head - // (like a FIFO queue) of this thread's locally owned producer. This must - // not be called on any other thread's producer. - // - // This is always called in exactly one place. TMC_FORCE_INLINE empirically - // determined to improve perf. - template TMC_FORCE_INLINE bool dequeue_lifo(U& element) { - // Since this is our own queue, just be optimistic and go for it - // without checking if there are actually any elements first. - auto prevIndex = this->tailIndex.fetch_sub(1, std::memory_order_seq_cst); - // StoreLoad barrier required to see other readers - // Overcommit must be loaded before optimistic for correct operation - auto myOvercommit = - this->dequeueOvercommit.load(std::memory_order_seq_cst); - auto myDequeueCount = - this->dequeueOptimisticCount.load(std::memory_order_seq_cst); - if (!details::circular_less_than( - myDequeueCount - myOvercommit, prevIndex - )) [[unlikely]] { - // Wasn't anything to dequeue after all; make the effective dequeue - // count eventually consistent. - // Note that the queue can spuriously appear empty multiple times in a - // row if another consumer incremented dequeueOptimisticCount but then - // its thread was pre-empted. - this->tailIndex.store(prevIndex, std::memory_order_release); - return false; - } - - auto index = prevIndex - 1; - // Relaxed loads are OK since these values are only written by this thread - auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); - auto currentTailBlockIndex = - localBlockIndex->front.load(std::memory_order_relaxed); - - assert(( - localBlockIndex->entries[currentTailBlockIndex].block == this->tailBlock - )); - Block* block = this->tailBlock; - if ((index & static_cast(BLOCK_MASK)) == 0) [[unlikely]] { - // tailIndex was pointing at the first (empty) element of a new block. - // index now points at the last element of the previous block. - // as we dequeue from index, we need to back up tailIndex so that it is - // also at the end of the previous block. - assert(currentTailBlockIndex != TMC_ALL_ONES); - assert((localBlockIndex->entries[currentTailBlockIndex].base == index)); - - // When backing up, we can underflow the array: - // (index wraps from 0 to pr_blockIndexSize - 1) - // or underflow the used slots: - // (index wraps from pr_blockIndexFrontMax - pr_blockIndexSlotsUsed - // to pr_blockIndexFrontMax - 1) - if (pr_blockIndexSlotsUsed > 1) { - size_t underflowFrom; - if (currentTailBlockIndex == - ((pr_blockIndexFrontMax - pr_blockIndexSlotsUsed) & - (pr_blockIndexSize - 1))) { - underflowFrom = pr_blockIndexFrontMax; - } else { - underflowFrom = currentTailBlockIndex; - } - auto blockBeforeTailBlockIndex = - (underflowFrom - 1) & (pr_blockIndexSize - 1); - Block* blockBeforeTailBlock = - localBlockIndex->entries[blockBeforeTailBlockIndex].block; - localBlockIndex->front.store( - blockBeforeTailBlockIndex, std::memory_order_release - ); - pr_blockIndexFront = currentTailBlockIndex; - - assert((blockBeforeTailBlock->next == this->tailBlock)); - this->tailBlock = blockBeforeTailBlock; - } - block - ->ConcurrentQueue::Block::template set_all_empty(); - } - - // Dequeue - T& el = *((*block)[index]); - if constexpr (!TMC_MOODYCAMEL_NOEXCEPT_ASSIGN( - element = static_cast(el) - )) { - struct Guard { - Block* block; - index_t index; - - ~Guard() { - (*block)[index]->~T(); - // set_empty() not needed here - that happens in the underflow block - } - } guard = {block, index}; - element = static_cast(el); // NOLINT - } else { - element = static_cast(el); // NOLINT - el.~T(); // NOLINT - // set_empty() not needed here - that happens in the underflow block - } - - return true; - } - - template bool dequeue(U& element) { - auto tail = this->tailIndex.load(std::memory_order_relaxed); - auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed); - if (!details::circular_less_than( - this->dequeueOptimisticCount.load(std::memory_order_relaxed) - - overcommit, - tail - )) { - return false; - } - // Might be something to dequeue, let's give it a try - - // Note that this if is purely for performance purposes in the common - // case when the queue is empty and the values are eventually consistent - // -- we may enter here spuriously. - - // Note that whatever the values of overcommit and tail are, they are - // not going to change (unless we change them) and must be the same - // value at this point (inside the if) as when the if condition was - // evaluated. - - // We insert an acquire fence here to synchronize-with the release upon - // incrementing dequeueOvercommit below. This ensures that whatever the - // value we got loaded into overcommit, the load of dequeueOptisticCount - // in the fetch_add below will result in a value at least as recent as - // that (and therefore at least as large). Note that I believe a - // compiler (signal) fence here would be sufficient due to the nature of - // fetch_add (all read-modify-write operations are guaranteed to work on - // the latest value in the modification order), but unfortunately that - // can't be shown to be correct using only the C++11 standard. See - // http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case - std::atomic_thread_fence(std::memory_order_acquire); - - // Increment optimistic counter, then check if it went over the boundary - // TZCNT MODIFIED: From relaxed to acq_rel. This is required to - // synchronize with dequeue_lifo (on explicit producers only - implicit - // producers don't have dequeue_lifo). - auto myDequeueCount = - this->dequeueOptimisticCount.fetch_add(1, std::memory_order_acq_rel); - - // Note that since dequeueOvercommit must be <= dequeueOptimisticCount - // (because dequeueOvercommit is only ever incremented after - // dequeueOptimisticCount -- this is enforced in the `else` block - // below), and since we now have a version of dequeueOptimisticCount - // that is at least as recent as overcommit (due to the release upon - // incrementing dequeueOvercommit and the acquire above that - // synchronizes with it), overcommit <= myDequeueCount. However, we - // can't assert this since both dequeueOptimisticCount and - // dequeueOvercommit may (independently) overflow; in such a case, - // though, the logic still holds since the difference between the two is - // maintained. - - // Note that we reload tail here in case it changed; it will be the same - // value as before or greater, since this load is sequenced after - // (happens after) the earlier load above. This is supported by - // read-read coherency (as defined in the standard), explained here: - // http://en.cppreference.com/w/cpp/atomic/memory_order - tail = this->tailIndex.load(std::memory_order_acquire); - if (!details::circular_less_than( - myDequeueCount - overcommit, tail - )) { - // Wasn't anything to dequeue after all; make the effective dequeue - // count eventually consistent - this->dequeueOvercommit.fetch_add(1, std::memory_order_release); - return false; - } - - // Guaranteed to be at least one element to dequeue! - // Get the index. Note that since there's guaranteed to be at least - // one element, this will never exceed tail. We need to do an - // acquire-release fence here since it's possible that whatever - // condition got us to this point was for an earlier enqueued element - // (that we already see the memory effects for), but that by the time - // we increment somebody else has incremented it, and we need to see - // the memory effects for *that* element, which is in such a case is - // necessarily visible on the thread that incremented it in the first - // place with the more current condition (they must have acquired a - // tail that is at least as recent). - auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel); - - // Determine which block the element is in - auto localBlockIndex = blockIndex.load(std::memory_order_acquire); - auto localBlockIndexHead = - localBlockIndex->front.load(std::memory_order_acquire); - - // We need to be careful here about subtracting and dividing because - // of index wrap-around. When an index wraps, we need to preserve the - // sign of the offset when dividing it by the block size (in order to - // get a correct signed block count offset in all cases): - auto headBase = localBlockIndex->entries[localBlockIndexHead].base; - auto blockBaseIndex = index & ~static_cast(BLOCK_MASK); - auto offset = static_cast( - static_cast::type>( - blockBaseIndex - headBase - ) / - static_cast::type>( - PRODUCER_BLOCK_SIZE - ) - ); - auto block = - localBlockIndex - ->entries - [(localBlockIndexHead + offset) & (localBlockIndex->size - 1)] - .block; - - // Dequeue - T& el = *((*block)[index]); - if constexpr (!TMC_MOODYCAMEL_NOEXCEPT_ASSIGN( - element = static_cast(el) - )) { - // Make sure the element is still fully dequeued and destroyed even - // if the assignment throws - struct Guard { - Block* block; - index_t index; - - ~Guard() { - (*block)[index]->~T(); - block->ConcurrentQueue::Block::template set_empty( - index - ); - } - } guard = {block, index}; - - element = static_cast(el); // NOLINT - } else { - element = static_cast(el); // NOLINT - el.~T(); // NOLINT - block->ConcurrentQueue::Block::template set_empty( - index - ); - } - - return true; - } - - template - TMC_FORCE_INLINE void TMC_MOODYCAMEL_NO_TSAN - enqueue_bulk(It itemFirst, size_t count) { - // static constexpr bool HasMoveConstructor = std::is_constructible_v< - // T, std::add_rvalue_reference_t>>; - static constexpr bool HasNoexceptMoveConstructor = - std::is_nothrow_constructible_v< - T, std::add_rvalue_reference_t>>; - - static constexpr bool HasCopyConstructor = std::is_constructible_v< - T, std::add_lvalue_reference_t>>; - static constexpr bool HasNoexceptCopyConstructor = - std::is_nothrow_constructible_v< - T, std::add_lvalue_reference_t>>; - - // Prefer constructors in this order: - // 1. Noexcept move constructor - // 2. Noexcept copy constructor - // 3. Copy constructor - // 4. Move constructor - - // For 3. and 4., prefer copy constructor even if move constructor is - // available because we may have to revert if there's an - // exception. - static constexpr bool UseMoveConstructor = - HasNoexceptMoveConstructor || !HasCopyConstructor; - - static constexpr bool IsConstructorNoexcept = - HasNoexceptMoveConstructor || HasNoexceptCopyConstructor; - - // First, we need to make sure we have enough room to enqueue all of the - // elements; this means pre-allocating blocks and putting them in the - // block index. - index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed); - auto startBlock = this->tailBlock; - auto originalBlockIndexFront = pr_blockIndexFront; - auto originalBlockIndexFrontMax = pr_blockIndexFrontMax; - auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed; - - Block* firstAllocatedBlock = nullptr; - - // Figure out how many blocks we'll need to allocate, and do so - size_t blockBaseDiff = - ((startTailIndex + count - 1) & ~static_cast(BLOCK_MASK)) - - ((startTailIndex - 1) & ~static_cast(BLOCK_MASK)); - index_t currentTailIndex = - (startTailIndex - 1) & ~static_cast(BLOCK_MASK); - if (blockBaseDiff > 0) [[unlikely]] { - auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); - // Allocate as many blocks as possible from ahead - while ( - blockBaseDiff > 0 && this->tailBlock != nullptr && - this->tailBlock->next != firstAllocatedBlock && - this->tailBlock->next - ->ConcurrentQueue::Block::template is_empty() - ) { - blockBaseDiff -= static_cast(PRODUCER_BLOCK_SIZE); - currentTailIndex += static_cast(PRODUCER_BLOCK_SIZE); - - this->tailBlock = this->tailBlock->next; - firstAllocatedBlock = firstAllocatedBlock == nullptr - ? this->tailBlock - : firstAllocatedBlock; - - size_t nextFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1); - if (pr_blockIndexFrontMax == pr_blockIndexFront) { - pr_blockIndexFrontMax = nextFront; - // Add block to block index. We skip this if not at FrontMax - // (because we popped this off with dequeue_lifo), because this - // block already exists in the block index with the same base value, - // and it causes a TSan warning to write the same data again. - // However this is a false positive, and the below lines could be - // run unconditionally since the data being overwritten is - // identical. - auto& entry = localBlockIndex->entries[pr_blockIndexFront]; - entry.base = currentTailIndex; - entry.block = this->tailBlock; - } - pr_blockIndexFront = nextFront; - } - - // Now allocate as many blocks as necessary from the block pool - while (blockBaseDiff > 0) { - blockBaseDiff -= static_cast(PRODUCER_BLOCK_SIZE); - currentTailIndex += static_cast(PRODUCER_BLOCK_SIZE); - if (pr_blockIndexSlotsUsed == pr_blockIndexSize) { - new_block_index(originalBlockIndexSlotsUsed); - - // pr_blockIndexFront is updated inside new_block_index, so we - // need to update our fallback value too (since we keep the new - // index even if we later fail) - originalBlockIndexFront = originalBlockIndexSlotsUsed; - originalBlockIndexFrontMax = originalBlockIndexSlotsUsed; - } - - // Insert a new block in the circular linked list - auto newBlock = this->parent->ConcurrentQueue::requisition_block(); - -#ifdef MCDBGQ_TRACKMEM - newBlock->owner = this; -#endif - newBlock->ConcurrentQueue::Block::template set_all_empty< - explicit_context>(); - if (this->tailBlock == nullptr) { - newBlock->next = newBlock; - } else { - newBlock->next = this->tailBlock->next; - this->tailBlock->next = newBlock; - } - this->tailBlock = newBlock; - firstAllocatedBlock = firstAllocatedBlock == nullptr - ? this->tailBlock - : firstAllocatedBlock; - - ++pr_blockIndexSlotsUsed; - - auto& entry = blockIndex.load(std::memory_order_relaxed) - ->entries[pr_blockIndexFront]; - entry.base = currentTailIndex; - entry.block = this->tailBlock; - bool frontMatched = pr_blockIndexFrontMax == pr_blockIndexFront; - pr_blockIndexFront = - (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1); - if (frontMatched) { - pr_blockIndexFrontMax = pr_blockIndexFront; - } - } - - // Reset each block's emptiness before we fill them up, and publish the - // new block index front - auto block = firstAllocatedBlock; - while (true) { - block - ->ConcurrentQueue::Block::template reset_empty(); - if (block == this->tailBlock) { - break; - } - block = block->next; - } - - if constexpr (IsConstructorNoexcept) { - blockIndex.load(std::memory_order_relaxed) - ->front.store( - (pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), - std::memory_order_release - ); - } - } - - // Enqueue, one block at a time - index_t newTailIndex = startTailIndex + static_cast(count); - currentTailIndex = startTailIndex; - auto endBlock = this->tailBlock; - this->tailBlock = startBlock; - assert( - (startTailIndex & static_cast(BLOCK_MASK)) != 0 || - firstAllocatedBlock != nullptr || count == 0 - ); - if ((startTailIndex & static_cast(BLOCK_MASK)) == 0 && - firstAllocatedBlock != nullptr) { - this->tailBlock = firstAllocatedBlock; - } - while (true) { - index_t stopIndex = - (currentTailIndex & ~static_cast(BLOCK_MASK)) + - static_cast(PRODUCER_BLOCK_SIZE); - if (details::circular_less_than(newTailIndex, stopIndex)) { - stopIndex = newTailIndex; - } - - if constexpr (IsConstructorNoexcept) { - while (currentTailIndex != stopIndex) { - if constexpr (UseMoveConstructor) { - new ((*this->tailBlock)[currentTailIndex]) - T(std::move(*itemFirst)); - } else { - new ((*this->tailBlock)[currentTailIndex]) - T(details::nomove(*itemFirst)); - } - ++currentTailIndex; - ++itemFirst; - } - } else { - TMC_MOODYCAMEL_TRY { - while (currentTailIndex != stopIndex) { - if constexpr (UseMoveConstructor) { - new ((*this->tailBlock)[currentTailIndex]) - T(std::move(*itemFirst)); - } else { - new ((*this->tailBlock)[currentTailIndex]) - T(details::nomove(*itemFirst)); - } - ++currentTailIndex; - ++itemFirst; - } - } - TMC_MOODYCAMEL_CATCH(...) { - // Oh dear, an exception's been thrown -- destroy the elements that - // were enqueued so far and revert the entire bulk operation (we'll - // keep any allocated blocks in our linked list for later, though). - auto constructedStopIndex = currentTailIndex; - auto lastBlockEnqueued = this->tailBlock; - - pr_blockIndexFront = originalBlockIndexFront; - pr_blockIndexFrontMax = originalBlockIndexFrontMax; - pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed; - this->tailBlock = - startBlock == nullptr ? firstAllocatedBlock : startBlock; - - if (!details::is_trivially_destructible::value) { - auto block = startBlock; - if ((startTailIndex & static_cast(BLOCK_MASK)) == 0) { - block = firstAllocatedBlock; - } - currentTailIndex = startTailIndex; - while (true) { - stopIndex = - (currentTailIndex & ~static_cast(BLOCK_MASK)) + - static_cast(PRODUCER_BLOCK_SIZE); - if (details::circular_less_than( - constructedStopIndex, stopIndex - )) { - stopIndex = constructedStopIndex; - } - while (currentTailIndex != stopIndex) { - (*block)[currentTailIndex]->~T(); - ++currentTailIndex; - } - if (block == lastBlockEnqueued) { - break; - } - block = block->next; - } - } - TMC_MOODYCAMEL_RETHROW; - } + head->freeListRefs.fetch_sub(2, std::memory_order_release); + return head; } - if (this->tailBlock == endBlock) { - assert(currentTailIndex == newTailIndex); - break; + // OK, the head must have changed on us, but we still need to decrease + // the refcount we increased. Note that we don't need to release any + // memory effects, but we do need to ensure that the reference count + // decrement happens-after the CAS on the head. + refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel); + if (refs == SHOULD_BE_ON_FREELIST + 1) { + add_knowing_refcount_is_zero(prevHead); } - this->tailBlock = this->tailBlock->next; - } - - if constexpr (!IsConstructorNoexcept) { - if (firstAllocatedBlock != nullptr) - blockIndex.load(std::memory_order_relaxed) - ->front.store( - (pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), - std::memory_order_release - ); } - this->tailIndex.store(newTailIndex, std::memory_order_release); + return nullptr; } - template size_t dequeue_bulk(It& itemFirst, size_t max) { - auto tail = this->tailIndex.load(std::memory_order_relaxed); - auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed); - auto desiredCount = static_cast( - tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - - overcommit) - ); - if (details::circular_less_than(0, desiredCount)) { - desiredCount = desiredCount < max ? desiredCount : max; - std::atomic_thread_fence(std::memory_order_acquire); - - auto myDequeueCount = this->dequeueOptimisticCount.fetch_add( - desiredCount, std::memory_order_relaxed - ); + // Useful for traversing the list when there's no contention (e.g. to + // destroy remaining nodes) + N* head_unsafe() const { + return freeListHead.load(std::memory_order_relaxed); + } - tail = this->tailIndex.load(std::memory_order_acquire); - auto actualCount = - static_cast(tail - (myDequeueCount - overcommit)); - if (details::circular_less_than(0, actualCount)) { - actualCount = desiredCount < actualCount ? desiredCount : actualCount; - if (actualCount < desiredCount) { - this->dequeueOvercommit.fetch_add( - desiredCount - actualCount, std::memory_order_release - ); + private: + inline void add_knowing_refcount_is_zero(N* node) { + // Since the refcount is zero, and nobody can increase it once it's zero + // (except us, and we run only one copy of this method per node at a time, + // i.e. the single thread case), then we know we can safely change the + // next pointer of the node; however, once the refcount is back above + // zero, then other threads could increase it (happens under heavy + // contention, when the refcount goes to zero in between a load and a + // refcount increment of a node in try_get, then back up to something + // non-zero, then the refcount increment is done by the other thread) -- + // so, if the CAS to add the node to the actual list fails, decrease the + // refcount and leave the add operation to the next thread who puts the + // refcount back at zero (which could be us, hence the loop). + auto head = freeListHead.load(std::memory_order_relaxed); + while (true) { + node->freeListNext.store(head, std::memory_order_relaxed); + node->freeListRefs.store(1, std::memory_order_release); + if (!freeListHead.compare_exchange_strong( + head, node, std::memory_order_release, std::memory_order_relaxed + )) { + // Hmm, the add failed, but we can only try again when the refcount + // goes back to zero + if (node->freeListRefs.fetch_add( + SHOULD_BE_ON_FREELIST - 1, std::memory_order_acq_rel + ) == 1) { + continue; } - - // Get the first index. Note that since there's guaranteed to be at - // least actualCount elements, this will never exceed tail. - auto firstIndex = - this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel); - - // Determine which block the first element is in - auto localBlockIndex = blockIndex.load(std::memory_order_acquire); - auto localBlockIndexHead = - localBlockIndex->front.load(std::memory_order_acquire); - - auto headBase = localBlockIndex->entries[localBlockIndexHead].base; - auto firstBlockBaseIndex = - firstIndex & ~static_cast(BLOCK_MASK); - auto offset = static_cast( - static_cast::type>( - firstBlockBaseIndex - headBase - ) / - static_cast::type>( - PRODUCER_BLOCK_SIZE - ) - ); - auto indexIndex = - (localBlockIndexHead + offset) & (localBlockIndex->size - 1); - - // Iterate the blocks and dequeue - auto index = firstIndex; - do { - auto firstIndexInBlock = index; - index_t endIndex = (index & ~static_cast(BLOCK_MASK)) + - static_cast(PRODUCER_BLOCK_SIZE); - endIndex = - details::circular_less_than( - firstIndex + static_cast(actualCount), endIndex - ) - ? firstIndex + static_cast(actualCount) - : endIndex; - auto block = localBlockIndex->entries[indexIndex].block; - if constexpr (TMC_MOODYCAMEL_NOEXCEPT_ASSIGN( - details::deref_noexcept(itemFirst) = - static_cast((*(*block)[index])) - )) { - while (index != endIndex) { - T& el = *((*block)[index]); - *itemFirst = static_cast(el); - el.~T(); - ++index; - ++itemFirst; - } - } else { - TMC_MOODYCAMEL_TRY { - while (index != endIndex) { - T& el = *((*block)[index]); - *itemFirst = static_cast(el); - ++itemFirst; - el.~T(); - ++index; - } - } - TMC_MOODYCAMEL_CATCH(...) { - // It's too late to revert the dequeue, but we can make sure - // that all the dequeued objects are properly destroyed and the - // block index (and empty count) are properly updated before we - // propagate the exception - do { - block = localBlockIndex->entries[indexIndex].block; - while (index != endIndex) { - (*block)[index]->~T(); - ++index; - } - block->ConcurrentQueue::Block::template set_many_empty< - explicit_context>( - firstIndexInBlock, - static_cast(endIndex - firstIndexInBlock) - ); - indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1); - - firstIndexInBlock = index; - endIndex = (index & ~static_cast(BLOCK_MASK)) + - static_cast(PRODUCER_BLOCK_SIZE); - endIndex = - details::circular_less_than( - firstIndex + static_cast(actualCount), endIndex - ) - ? firstIndex + static_cast(actualCount) - : endIndex; - } while (index != firstIndex + actualCount); - - TMC_MOODYCAMEL_RETHROW; - } - } - block->ConcurrentQueue::Block::template set_many_empty< - explicit_context>( - firstIndexInBlock, - static_cast(endIndex - firstIndexInBlock) - ); - indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1); - } while (index != firstIndex + actualCount); - - return actualCount; - } else { - // Wasn't anything to dequeue after all; make the effective dequeue - // count eventually consistent - this->dequeueOvercommit.fetch_add( - desiredCount, std::memory_order_release - ); } + return; } - - return 0; - } - - inline size_t size_approx() const { - auto tail = tailIndex.load(std::memory_order_relaxed); - auto head = headIndex.load(std::memory_order_relaxed); - return details::circular_less_than(head, tail) - ? static_cast(tail - head) - : 0; } private: - struct BlockIndexEntry { - index_t base; - Block* block; - }; + // Implemented like a stack, but where node order doesn't matter (nodes are + // inserted out of order under contention) + std::atomic freeListHead; - struct BlockIndexHeader { - size_t size; - std::atomic - front; // Current slot (not next, like pr_blockIndexFront) - BlockIndexEntry* entries; - void* prev; - }; + static const std::uint32_t REFS_MASK = 0x7FFFFFFF; + static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000; - void new_block_index(size_t numberOfFilledSlotsToExpose) { - auto prevBlockSizeMask = pr_blockIndexSize - 1; +#ifdef MCDBGQ_NOLOCKFREE_FREELIST + debug::DebugMutex mutex; +#endif + }; - // Create the new block - pr_blockIndexSize <<= 1; - auto newRawPtr = Traits::malloc( - sizeof(BlockIndexHeader) + std::alignment_of::value - - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize - ); + /////////////////////////// + // Block + /////////////////////////// - auto newBlockIndexEntries = - reinterpret_cast(details::align_for( - newRawPtr + sizeof(BlockIndexHeader) - )); - - assert(pr_blockIndexFront == pr_blockIndexFrontMax); - // Copy in all the old indices, if any - size_t j = 0; - if (pr_blockIndexSlotsUsed != 0) { - auto i = - (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask; - do { - newBlockIndexEntries[j] = pr_blockIndexEntries[i]; - ++j; - i = (i + 1) & prevBlockSizeMask; - } while (i != pr_blockIndexFront); + struct Block { + Block() + : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), + freeListNext(nullptr), dynamicallyAllocated(true) { + for (size_t i = 0; i < BLOCK_EMPTY_ARRAY_SIZE; ++i) { + emptyFlags[i] = BLOCK_EMPTY_MASK; } +#ifdef MCDBGQ_TRACKMEM + owner = nullptr; +#endif + } - // Update everything - auto header = new (newRawPtr) BlockIndexHeader; - header->size = pr_blockIndexSize; - header->front.store( - numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed + inline bool is_empty() const { + + // Check counter + if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == + PRODUCER_BLOCK_SIZE) { + std::atomic_thread_fence(std::memory_order_acquire); + return true; + } + assert( + elementsCompletelyDequeued.load(std::memory_order_relaxed) <= + PRODUCER_BLOCK_SIZE ); - header->entries = newBlockIndexEntries; - header->prev = pr_blockIndexRaw; // we link the new block to the old one - // so we can free it later - - pr_blockIndexFront = j; - pr_blockIndexFrontMax = j; - pr_blockIndexEntries = newBlockIndexEntries; - pr_blockIndexRaw = newRawPtr; - blockIndex.store(header, std::memory_order_release); + return false; } - private: - std::atomic tailIndex; // Where to enqueue to next - std::atomic headIndex; // Where to dequeue from next - - std::atomic dequeueOptimisticCount; - std::atomic dequeueOvercommit; + // Returns true if the block is now empty + inline bool set_empty([[maybe_unused]] index_t i) { + // Increment counter + auto prevVal = + elementsCompletelyDequeued.fetch_add(1, std::memory_order_acq_rel); + assert(prevVal < PRODUCER_BLOCK_SIZE); + return prevVal == BLOCK_MASK; + } - Block* tailBlock; + inline void reset_empty() { + // Reset counter + elementsCompletelyDequeued.store(0, std::memory_order_relaxed); + } - public: - ConcurrentQueue* parent; + inline T* operator[](index_t idx) TMC_MOODYCAMEL_NOEXCEPT { + size_t raw_off = + static_cast(idx & static_cast(BLOCK_MASK)); + if constexpr (ELEM_INTERLEAVING > 1) { + size_t off_unaffected = raw_off & ~ELEM_INTERLEAVING_ALL_MASK; + size_t il_bits = raw_off & ELEM_INTERLEAVING_ALL_MASK; + size_t low_bits = il_bits & ELEM_INTERLEAVING_MASK; + size_t high_bits = il_bits / ELEM_INTERLEAVING; + size_t low_bits_in_place = low_bits * ELEMS_PER_CACHELINE; + size_t off = off_unaffected | high_bits | low_bits_in_place; + return static_cast(static_cast(elements)) + off; + } else { + return static_cast(static_cast(elements)) + raw_off; + } + } private: - std::atomic blockIndex; + static_assert( + std::alignment_of::value <= sizeof(T), + "The queue does not support types with an alignment greater " + "than their size at this time" + ); + TMC_MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * PRODUCER_BLOCK_SIZE], T) + elements; - // To be used by producer only -- consumer must use the ones in referenced - // by blockIndex - size_t pr_blockIndexSlotsUsed; - size_t pr_blockIndexSize; - size_t pr_blockIndexFront; // Next slot (not current) - size_t - pr_blockIndexFrontMax; // Highest value of pr_blockIndexFront we've set - BlockIndexEntry* pr_blockIndexEntries; - void* pr_blockIndexRaw; + public: + TMC_DISABLE_WARNING_PADDED_BEGIN + alignas( + TMC_CACHE_LINE_SIZE + ) std::atomic emptyFlags[BLOCK_EMPTY_ARRAY_SIZE]; + TMC_DISABLE_WARNING_PADDED_END + Block* next; + std::atomic elementsCompletelyDequeued; -#ifdef TMC_MOODYCAMEL_QUEUE_INTERNAL_DEBUG public: - ExplicitProducer* nextExplicitProducer; + std::atomic freeListRefs; + std::atomic freeListNext; + bool dynamicallyAllocated; // Perhaps a better name for this would be + // 'isNotPartOfInitialBlockPool' - private: +#ifdef MCDBGQ_TRACKMEM + void* owner; #endif + }; + static_assert( + std::alignment_of::value >= std::alignment_of::value, + "Internal error: Blocks must be at least as aligned as the " + "type they are wrapping" + ); #ifdef MCDBGQ_TRACKMEM - friend struct MemStats; +public: + struct MemStats; + +private: #endif - TMC_DISABLE_WARNING_PADDED_BEGIN - }; - TMC_DISABLE_WARNING_PADDED_END private: ////////////////////////////////// @@ -2340,8 +1088,7 @@ class ConcurrentQueue { #ifdef MCDBGQ_TRACKMEM newBlock->owner = this; #endif - newBlock - ->ConcurrentQueue::Block::template reset_empty(); + newBlock->reset_empty(); if constexpr (!TMC_MOODYCAMEL_NOEXCEPT_CTOR(new ( static_cast(nullptr) @@ -2427,8 +1174,7 @@ class ConcurrentQueue { ~Guard() { (*block)[index]->~T(); - if (block->ConcurrentQueue::Block::template set_empty< - implicit_context>(index)) { + if (block->set_empty(index)) { entry->value.store(nullptr, std::memory_order_relaxed); parent->add_block_to_free_list(block); } @@ -2440,9 +1186,7 @@ class ConcurrentQueue { element = static_cast(el); // NOLINT el.~T(); // NOLINT - if (block->ConcurrentQueue::Block::template set_empty( - index - )) { + if (block->set_empty(index)) { { #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX debug::DebugLock lock(mutex); @@ -2531,8 +1275,7 @@ class ConcurrentQueue { #ifdef MCDBGQ_TRACKMEM newBlock->owner = this; #endif - newBlock - ->ConcurrentQueue::Block::template reset_empty(); + newBlock->reset_empty(); newBlock->next = nullptr; // Insert the new block into the index @@ -2731,8 +1474,7 @@ class ConcurrentQueue { ++index; } - if (block->ConcurrentQueue::Block::template set_many_empty< - implicit_context>( + if (block->set_many_empty( blockStartIndex, static_cast(endIndex - blockStartIndex) )) { @@ -2759,8 +1501,7 @@ class ConcurrentQueue { TMC_MOODYCAMEL_RETHROW; } } - if (block->ConcurrentQueue::Block::template set_many_empty< - implicit_context>( + if (block->set_many_empty( blockStartIndex, static_cast(endIndex - blockStartIndex) )) { @@ -3486,20 +2227,6 @@ class ConcurrentQueue { queue->implicit_producer_thread_exited(producer); } -public: - // this is not used by all classes, only by ex_cpu. so it is not managed by - // this' destructor, but by init() / teardown() of ex_cpu - // array of deqeueueProducerCount elements - ExplicitProducer* staticProducers; - // Count of staticProducers - size_t dequeueProducerCount = 0; - - // Offset into the thread-local flattened producers array - // (tmc::detail::this_thread::producers) for this priority. This is the sum of - // (dequeueProducerCount + 1) for all priorities < this one. The +1 is for the - // extra cache slot that is inserted into that array. - size_t producerArrayOffset = 0; - private: std::atomic producerListTail; std::atomic producerCount; diff --git a/include/tmc/detail/qu_work_stealing.hpp b/include/tmc/detail/qu_work_stealing.hpp new file mode 100644 index 00000000..5129fae7 --- /dev/null +++ b/include/tmc/detail/qu_work_stealing.hpp @@ -0,0 +1,103 @@ +// Copyright (c) 2023-2026 Logan McDougall +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include "tmc/detail/compat.hpp" +#include "tmc/work_item.hpp" + +#include +#include +#include +#include + +#if TMC_PLATFORM_BITS == 64 +#include "tmc/detail/qu_chase_lev64.hpp" +#else +#include "tmc/detail/qu_chase_lev32.hpp" +#endif + +// A collection of ThreadCount individual deques, each of which is "owned" by a +// single ex_cpu thread. This collection owns the lifecycle of the `deques` +// array, but the methods don't use it. Instead, threads have a TLS-cached steal +// order that they use to look for victims, which is passed in to the methods. + +namespace tmc { +namespace detail { +struct qu_work_stealing { + using cldq_t = chase_lev_deque; + cldq_t* deques; + size_t threadCount; + size_t tlsArrayOffset; + + qu_work_stealing(size_t ThreadCount, size_t CumulativeOffset) + : threadCount{ThreadCount}, tlsArrayOffset{CumulativeOffset} { + if (ThreadCount > 0) { + deques = new chase_lev_deque[ThreadCount]; + } + } + + ~qu_work_stealing() { + if (threadCount > 0) { + delete[] deques; + } + } + + // Try to steal an item from one of the other worker threads' Chase-Lev + // deques using the precalculated (TLS-cached) static iteration order. Returns + // true if an item was stolen. + // + // Precondition: producers points to the second (cache) slot for this thread's + // deque list on this priority. + // Postcondition: producers is advanced to the beginning of the next + // priority's slot block, ready to try_pop for this thread. + TMC_FORCE_INLINE inline bool + try_steal_in_order(tmc::work_item& item, cldq_t**& producers) { + cldq_t** end = producers + threadCount; + + // producers[0] is the previously consumed-from (cached) deque for this + // priority. If we didn't find work last time, it will be null. + cldq_t** cacheProd = producers; + if (*producers == nullptr) { + ++producers; + } + + // Check the remaining threads in the predefined order + for (; producers != end; ++producers) { + // Use a single pause to limit cross-core cache coherency traffic under + // load. + TMC_CPU_PAUSE(); + cldq_t* prod = *producers; + if (prod->steal(item)) { + // update prev_prod + *cacheProd = prod; + return true; + } + } + + *cacheProd = nullptr; + return false; + } + + // Per-priority TLS array layout: [self, cache, others...]. + // Skip both self and cache and start iterating from index 2. + // Check if all of `others` are empty. + TMC_FORCE_INLINE inline bool + steal_empty(tmc::detail::chase_lev_deque**& producersBase) { + cldq_t** producers = producersBase + tlsArrayOffset + 1; + size_t deqCount = threadCount; + for (size_t d = 1; d < deqCount; ++d) { + TMC_CPU_PAUSE(); + if (!producers[d]->empty()) { + // update cache so runner finds this first + producers[0] = producers[d]; + return false; + } + } + return true; + } +}; +} // namespace detail +} // namespace tmc diff --git a/include/tmc/detail/tsan.hpp b/include/tmc/detail/tsan.hpp new file mode 100644 index 00000000..3adc553d --- /dev/null +++ b/include/tmc/detail/tsan.hpp @@ -0,0 +1,22 @@ +#pragma once + +// The Chase-Lev deque has a racy read where it will memcpy data that may be +// invalid, and then check the validity by an atomic load afterward. If invalid, +// the potentially-garbage data is discarded. TSan doesn't like this. +// When the data size is pointer-sized (std::coroutine_handle) we can solve this +// by accessing it using a relaxed read from an atomic_ref, which TSan accepts. +// When the data size is 2 pointers (tmc::coro_functor), we would have to use a +// lot of workaround hacks. It's simpler to just disable TSan for the specific +// accesses in that case. +#if defined(__has_feature) +#if __has_feature(thread_sanitizer) +#define TMC_HAS_TSAN +#endif // TSAN +#endif // TSAN + +// no_sanitize suppression only works if the function is not inlined. +#ifdef TMC_HAS_TSAN +#define TMC_INLINE_OR_TSAN __attribute__((no_sanitize("thread"))) +#else +#define TMC_INLINE_OR_TSAN TMC_FORCE_INLINE +#endif diff --git a/include/tmc/ex_cpu.hpp b/include/tmc/ex_cpu.hpp index b3acfcea..144c465c 100644 --- a/include/tmc/ex_cpu.hpp +++ b/include/tmc/ex_cpu.hpp @@ -15,7 +15,8 @@ #include "tmc/detail/init_params.hpp" #include "tmc/detail/matrix.hpp" #include "tmc/detail/qu_inbox.hpp" -#include "tmc/detail/qu_lockfree.hpp" +#include "tmc/detail/qu_mc.hpp" +#include "tmc/detail/qu_work_stealing.hpp" #include "tmc/detail/thread_locals.hpp" #include "tmc/detail/tiny_vec.hpp" #include "tmc/ex_any.hpp" @@ -44,14 +45,20 @@ class ex_cpu { TMC_DISABLE_WARNING_PADDED_BEGIN }; TMC_DISABLE_WARNING_PADDED_END - using task_queue_t = tmc::queue::ConcurrentQueue; - // One inbox per thread group - tmc::detail::tiny_vec> inboxes; - tmc::detail::InitParams* init_params; // accessed only during init() - tmc::detail::tiny_vec threads; // size() == thread_count() + // size() == thread group count + tmc::detail::tiny_vec> q_inbox; + + // size() == PRIORITY_COUNT + tmc::detail::tiny_vec> q_ingest; + + // size() == PRIORITY_COUNT + // inner size = thread_count + tmc::detail::tiny_vec q_ws; + using cldq_t = tmc::detail::chase_lev_deque; + + tmc::detail::InitParams* init_params; // accessed only during init() tmc::ex_any type_erased_this; - tmc::detail::tiny_vec work_queues; // size() == PRIORITY_COUNT // stop_sources that correspond to this pool's threads tmc::detail::tiny_vec thread_stoppers; size_t spins; @@ -72,10 +79,8 @@ class ex_cpu { // here due to the possibility to construct an ex_cpu inside of a task. Tasks // don't support overaligned allocation by default without a compiler flag, // which would be a footgun. - char pad0[TMC_CACHE_LINE_SIZE - sizeof(size_t)]; - tmc::detail::atomic_bitmap working_threads_bitset; - tmc::detail::atomic_bitmap spinning_threads_bitset; - char pad1[TMC_CACHE_LINE_SIZE - 2 * sizeof(size_t)]; + tmc::detail::tiny_vec threads; // size() == thread_count() + char pad1[TMC_CACHE_LINE_SIZE - sizeof(size_t)]; // Last-chance wake handshake per priority. This avoids the fallback wake when // the priority's sentinel thread is not trying to sleep, and coalesces // simultaneous fallback producers so only one performs the wake syscall. @@ -90,6 +95,9 @@ class ex_cpu { // running, the join() call in the destructor will block until it completes. std::atomic ref_count; char pad2[TMC_CACHE_LINE_SIZE - sizeof(size_t)]; + tmc::detail::atomic_bitmap working_threads_bitset; + tmc::detail::atomic_bitmap spinning_threads_bitset; + char pad3[TMC_CACHE_LINE_SIZE - sizeof(size_t)]; // capitalized variables are constant while ex_cpu is initialized & running #ifdef TMC_PRIORITY_COUNT @@ -116,15 +124,14 @@ class ex_cpu { TMC_DECL void notify_hint(size_t Priority, size_t ThreadHint); TMC_DECL void init_thread_locals(size_t Slot); - TMC_DECL task_queue_t::ExplicitProducer** + TMC_DECL cldq_t** init_queue_iteration_order(std::vector> const& Forward); TMC_DECL void clear_thread_locals(); // Returns a lambda closure that is executed on a worker thread TMC_DECL auto make_worker( tmc::topology::thread_info Info, size_t PriorityRangeBegin, - size_t PriorityRangeEnd, - ex_cpu::task_queue_t::ExplicitProducer** StealOrder, + size_t PriorityRangeEnd, ex_cpu::cldq_t** StealOrder, std::atomic& InitThreadsBarrier, // will be nullptr if hwloc is not enabled tmc::detail::hwloc_unique_bitmap& CpuSet, @@ -367,11 +374,13 @@ class ex_cpu { } if (Count > 0) [[likely]] { if (allowedPriority) [[likely]] { - work_queues[Priority].enqueue_bulk_ex_cpu( - static_cast(Items), Count - ); + // Push to this thread's Chase-Lev deque for this priority. + cldq_t** producers = + static_cast(tmc::detail::this_thread::producers()); + cldq_t* myDeque = producers[q_ws[Priority].tlsArrayOffset]; + myDeque->post_bulk(static_cast(Items), Count); } else { - work_queues[Priority].enqueue_bulk(static_cast(Items), Count); + q_ingest[Priority].enqueue_bulk(static_cast(Items), Count); } notify_n(Count, Priority, allowedPriority, true); } diff --git a/include/tmc/work_item.hpp b/include/tmc/work_item.hpp index b5af8e76..e5370e9c 100644 --- a/include/tmc/work_item.hpp +++ b/include/tmc/work_item.hpp @@ -6,19 +6,16 @@ #pragma once // Selects the type of tmc::work_item based on the provided compile time -// parameter TMC_WORK_ITEM= -// CORO, FUNC, or FUNCORO +// parameter TMC_WORK_ITEM=CORO or TMC_WORK_ITEM=FUNCORO // CORO will be the default if undefined - -// Macro hackery to enable defines TMC_WORK_ITEM=CORO / TMC_WORK_ITEM=FUNC, etc #ifndef TMC_WORK_ITEM #define TMC_WORK_ITEM CORO #endif #define TMC_WORK_ITEM_CORO 0 -#define TMC_WORK_ITEM_FUNC 1 -#define TMC_WORK_ITEM_FUNCORO 2 +#define TMC_WORK_ITEM_FUNCORO 1 +#define TMC_WORK_ITEM_FUNC 2 #define TMC_CONCAT_impl(a, b) a##b #define TMC_CONCAT(a, b) TMC_CONCAT_impl(a, b) #define TMC_WORK_ITEM_IS_impl(WORK_ITEM_TYPE) \ @@ -33,21 +30,6 @@ using work_item = std::coroutine_handle<>; } #define TMC_WORK_ITEM_AS_STD_CORO(x) (x) -#elif TMC_WORK_ITEM_IS(FUNC) -#ifndef TMC_TRIVIAL_TASK -// This is because std::function requires its template type to be copyable. -static_assert( - false, "If TMC_WORK_ITEM=FUNC, then TMC_TRIVIAL_TASK must also be defined." -); -#endif -#include -#include -namespace tmc { -using work_item = std::function; -} -#define TMC_WORK_ITEM_AS_STD_CORO(x) \ - (*x.template target>()) - #elif TMC_WORK_ITEM_IS(FUNCORO) #include "tmc/detail/coro_functor.hpp" namespace tmc { @@ -55,4 +37,9 @@ using work_item = tmc::detail::coro_functor; } #define TMC_WORK_ITEM_AS_STD_CORO(x) (x.as_coroutine()) +#elif TMC_WORK_ITEM_IS(FUNC) +static_assert( + false, "TMC_WORK_ITEM=FUNC was removed in v2.0. Use TMC_WORK_ITEM=FUNCORO " + "instead." +); #endif