diff --git a/include/tmc/detail/ex_cpu_st.ipp b/include/tmc/detail/ex_cpu_st.ipp index 06c982e7..56d5a408 100644 --- a/include/tmc/detail/ex_cpu_st.ipp +++ b/include/tmc/detail/ex_cpu_st.ipp @@ -5,6 +5,7 @@ #pragma once +#include "tmc/current.hpp" #include "tmc/detail/compat.hpp" #include "tmc/detail/hwloc_unique_bitmap.hpp" #include "tmc/detail/impl.hpp" // IWYU pragma: keep @@ -136,7 +137,7 @@ void ex_cpu_st::post(work_item&& Item, size_t Priority, size_t ThreadHint) { clamp_priority(Priority); bool fromExecThread = tmc::detail::this_thread::executor() == &type_erased_this; - // A non-zero ThreadHint indicates that reschedule() was called. In that case + // A zero ThreadHint indicates that reschedule() was called. In that case // we should use the external queue to force FIFO ordering. if (fromExecThread && ThreadHint != 0) [[likely]] { private_work[Priority].push_back(static_cast(Item)); diff --git a/include/tmc/detail/tiny_stack.hpp b/include/tmc/detail/tiny_stack.hpp new file mode 100644 index 00000000..a35d08b3 --- /dev/null +++ b/include/tmc/detail/tiny_stack.hpp @@ -0,0 +1,116 @@ +// Copyright (c) 2023-2026 Logan McDougall +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include "tmc/detail/compat.hpp" +#include "tmc/work_item.hpp" + +#include +#include +#include +#include + +// A lightweight owner-only LIFO stack used for per-thread private queues. +// Only the owning worker thread reads/writes it - no synchronization needed. +// Assumes work_item is trivially copyable and trivially destructible, so no +// placement new or destructor calls are needed; growth is a single allocate + +// memcpy. + +// Currently not a template since it's only used for tmc::work_item. + +namespace tmc { +namespace detail { +class tiny_stack { + work_item* data; + size_t sz; + size_t cap; + + static_assert(std::is_trivially_copyable_v); + static_assert(std::is_trivially_destructible_v); + +public: + tiny_stack() noexcept : data(new_data(64)), sz(0), cap(64) {} + + ~tiny_stack() { delete_data(data); } + + tiny_stack(const tiny_stack&) = delete; + tiny_stack& operator=(const tiny_stack&) = delete; + tiny_stack(tiny_stack&&) = delete; + tiny_stack& operator=(tiny_stack&&) = delete; + + TMC_FORCE_INLINE bool empty() const noexcept { return sz == 0; } + TMC_FORCE_INLINE work_item& back() noexcept { return data[sz - 1]; } + TMC_FORCE_INLINE void pop_back() noexcept { --sz; } + + TMC_FORCE_INLINE void push_back(work_item Item) { + if (sz == cap) [[unlikely]] { + grow(sz + 1); + } + data[sz++] = Item; + } + + // Bulk append. For raw `work_item*` input, collapses to a single memcpy. + // For other iterator types, iterates and copies. + template + TMC_FORCE_INLINE void push_back_bulk(It&& Items, size_t Count) { + if (sz + Count > cap) [[unlikely]] { + grow(sz + Count); + } + using ItNoRef = std::decay_t; + if constexpr (std::is_pointer_v && + std::is_same_v< + std::remove_cv_t>, + work_item>) { + std::memcpy(data + sz, Items, Count * sizeof(work_item)); + } else { + auto items = Items; + for (size_t i = 0; i < Count; ++i) { + data[sz + i] = *items; + ++items; + } + } + sz += Count; + } + +private: + // Out-of-line slow path. Doubles capacity until Needed fits. + void grow(size_t Needed) { + size_t newCap = cap * 2; + while (newCap < Needed) { + newCap *= 2; + } + auto* newData = new_data(newCap); + if (sz != 0) { + std::memcpy(newData, data, sz * sizeof(work_item)); + } + delete_data(data); + data = newData; + cap = newCap; + } + + TMC_FORCE_INLINE work_item* new_data(size_t Capacity) { + if constexpr (alignof(work_item) > __STDCPP_DEFAULT_NEW_ALIGNMENT__) { + return static_cast(::operator new( + sizeof(work_item) * Capacity, std::align_val_t(alignof(work_item)) + )); + } else { + return static_cast( + ::operator new(sizeof(work_item) * Capacity) + ); + } + } + + TMC_FORCE_INLINE void delete_data(work_item* Data) { + if constexpr (alignof(work_item) > __STDCPP_DEFAULT_NEW_ALIGNMENT__) { + ::operator delete(Data, std::align_val_t(alignof(work_item))); + } else { + ::operator delete(Data); + } + } +}; + +} // namespace detail +} // namespace tmc diff --git a/include/tmc/ex_cpu_st.hpp b/include/tmc/ex_cpu_st.hpp index ecac52bf..b471b2a8 100644 --- a/include/tmc/ex_cpu_st.hpp +++ b/include/tmc/ex_cpu_st.hpp @@ -4,6 +4,7 @@ // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt) #pragma once + #include "tmc/detail/impl.hpp" // IWYU pragma: keep #include "tmc/aw_resume_on.hpp" @@ -12,6 +13,7 @@ #include "tmc/detail/init_params.hpp" #include "tmc/detail/qu_mpsc_blocking.hpp" #include "tmc/detail/thread_locals.hpp" +#include "tmc/detail/tiny_stack.hpp" #include "tmc/detail/tiny_vec.hpp" #include "tmc/ex_any.hpp" #include "tmc/topology.hpp" @@ -44,7 +46,7 @@ class ex_cpu_st { using task_queue_t = tmc::detail::qu_mpsc_blocking; tmc::detail::tiny_vec work_queues; // size() == PRIORITY_COUNT - tmc::detail::tiny_vec> + tmc::detail::tiny_vec private_work; // size() == PRIORITY_COUNT // stop_source for the single worker thread std::stop_source thread_stopper; @@ -222,13 +224,10 @@ class ex_cpu_st { bool fromExecThread = tmc::detail::this_thread::executor() == &type_erased_this; if (Count > 0) [[likely]] { - // A non-zero ThreadHint indicates that reschedule() was called. In that + // A zero ThreadHint indicates that reschedule() was called. In that // case we should use the external queue to force FIFO ordering. if (fromExecThread && ThreadHint != 0) [[likely]] { - for (size_t i = 0; i < Count; ++i) { - private_work[Priority].push_back(std::move(*Items)); - ++Items; - } + private_work[Priority].push_back_bulk(static_cast(Items), Count); request_yield(Priority); } else { bool didWake =