Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/tmc/detail/ex_cpu_st.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#pragma once

#include "tmc/current.hpp"
#include "tmc/detail/compat.hpp"
#include "tmc/detail/hwloc_unique_bitmap.hpp"
#include "tmc/detail/impl.hpp" // IWYU pragma: keep
Expand Down Expand Up @@ -136,7 +137,7 @@ void ex_cpu_st::post(work_item&& Item, size_t Priority, size_t ThreadHint) {
clamp_priority(Priority);
bool fromExecThread =
tmc::detail::this_thread::executor() == &type_erased_this;
// A non-zero ThreadHint indicates that reschedule() was called. In that case
// A zero ThreadHint indicates that reschedule() was called. In that case
// we should use the external queue to force FIFO ordering.
if (fromExecThread && ThreadHint != 0) [[likely]] {
private_work[Priority].push_back(static_cast<work_item&&>(Item));
Expand Down
116 changes: 116 additions & 0 deletions include/tmc/detail/tiny_stack.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) 2023-2026 Logan McDougall
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include "tmc/detail/compat.hpp"
#include "tmc/work_item.hpp"

#include <cstddef>
#include <cstring>
#include <new>
#include <type_traits>

// A lightweight owner-only LIFO stack used for per-thread private queues.
// Only the owning worker thread reads/writes it - no synchronization needed.
// Assumes work_item is trivially copyable and trivially destructible, so no
// placement new or destructor calls are needed; growth is a single allocate +
// memcpy.

// Currently not a template since it's only used for tmc::work_item.

namespace tmc {
namespace detail {
/// Growable LIFO buffer of `work_item`s with no internal synchronization.
///
/// Elements are stored in a raw buffer obtained from `::operator new` and are
/// moved around with plain assignment / `memcpy`; the static_asserts below
/// reject any `work_item` for which that would be invalid. The buffer only
/// ever grows - `pop_back()` never releases memory.
class tiny_stack {
  // Number of slots allocated by the constructor.
  static constexpr size_t INITIAL_CAPACITY = 64;

  work_item* data; // buffer with room for `cap` elements
  size_t sz;       // count of live elements; the top is data[sz - 1]
  size_t cap;      // number of slots currently allocated in `data`

  static_assert(std::is_trivially_copyable_v<work_item>);
  static_assert(std::is_trivially_destructible_v<work_item>);

public:
  /// Allocates the initial buffer. The constructor is noexcept, so if the
  /// allocation throws, the program terminates.
  tiny_stack() noexcept
      : data(new_data(INITIAL_CAPACITY)), sz(0), cap(INITIAL_CAPACITY) {}

  /// Releases the buffer. No element destructors are run (work_item is
  /// trivially destructible).
  ~tiny_stack() { delete_data(data); }

  // Neither copyable nor movable: the buffer has exactly one owner.
  tiny_stack(const tiny_stack&) = delete;
  tiny_stack& operator=(const tiny_stack&) = delete;
  tiny_stack(tiny_stack&&) = delete;
  tiny_stack& operator=(tiny_stack&&) = delete;

  /// Returns true if the stack holds no elements.
  TMC_FORCE_INLINE bool empty() const noexcept { return sz == 0; }

  /// Returns the most recently pushed element.
  /// Precondition: !empty() - no bounds check is performed.
  TMC_FORCE_INLINE work_item& back() noexcept { return data[sz - 1]; }

  /// Discards the most recently pushed element.
  /// Precondition: !empty() - no bounds check is performed.
  TMC_FORCE_INLINE void pop_back() noexcept { --sz; }

  /// Pushes a single element, growing the buffer if it is full.
  /// Item is taken by value, so it stays valid even if growth reallocates.
  TMC_FORCE_INLINE void push_back(work_item Item) {
    if (sz == cap) [[unlikely]] {
      grow(sz + 1);
    }
    data[sz++] = Item;
  }

  /// Appends Count elements read from Items, preserving their order (the last
  /// element read becomes the new top of the stack).
  ///
  /// For raw `work_item*` / `const work_item*` input this is a single memcpy.
  /// For any other iterator type, a copy of the iterator is dereferenced and
  /// incremented Count times.
  ///
  /// Precondition: Items must not refer to this stack's own buffer, since the
  /// buffer may be reallocated before the elements are read.
  template <typename It>
  TMC_FORCE_INLINE void push_back_bulk(It&& Items, size_t Count) {
    if (sz + Count > cap) [[unlikely]] {
      grow(sz + Count);
    }
    using ItNoRef = std::decay_t<It>;
    if constexpr (std::is_pointer_v<ItNoRef> &&
                  std::is_same_v<
                    std::remove_cv_t<std::remove_pointer_t<ItNoRef>>,
                    work_item>) {
      // memcpy requires valid pointers even when copying zero bytes, and an
      // empty batch may legitimately come with a null Items. Same guard as in
      // grow().
      if (Count != 0) {
        std::memcpy(data + sz, Items, Count * sizeof(work_item));
      }
    } else {
      auto items = Items;
      for (size_t i = 0; i < Count; ++i) {
        data[sz + i] = *items;
        ++items;
      }
    }
    sz += Count;
  }

private:
  /// Out-of-line slow path. Doubles the capacity at least once, and keeps
  /// doubling until Needed elements fit, then moves the live elements into
  /// the new buffer and frees the old one.
  void grow(size_t Needed) {
    size_t newCap = cap * 2;
    while (newCap < Needed) {
      newCap *= 2;
    }
    // Allocate first: if this throws, the existing buffer is left untouched.
    auto* newData = new_data(newCap);
    if (sz != 0) {
      std::memcpy(newData, data, sz * sizeof(work_item));
    }
    delete_data(data);
    data = newData;
    cap = newCap;
  }

  /// Allocates uninitialized storage for Capacity work_items, using the
  /// aligned form of operator new only if work_item is over-aligned.
  /// Static because it is called from the constructor's initializer list and
  /// uses no instance state.
  TMC_FORCE_INLINE static work_item* new_data(size_t Capacity) {
    if constexpr (alignof(work_item) > __STDCPP_DEFAULT_NEW_ALIGNMENT__) {
      return static_cast<work_item*>(::operator new(
        sizeof(work_item) * Capacity, std::align_val_t(alignof(work_item))
      ));
    } else {
      return static_cast<work_item*>(
        ::operator new(sizeof(work_item) * Capacity)
      );
    }
  }

  /// Frees storage obtained from new_data(), using the operator delete form
  /// that matches the operator new form chosen there.
  TMC_FORCE_INLINE static void delete_data(work_item* Data) {
    if constexpr (alignof(work_item) > __STDCPP_DEFAULT_NEW_ALIGNMENT__) {
      ::operator delete(Data, std::align_val_t(alignof(work_item)));
    } else {
      ::operator delete(Data);
    }
  }
};

} // namespace detail
} // namespace tmc
11 changes: 5 additions & 6 deletions include/tmc/ex_cpu_st.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include "tmc/detail/impl.hpp" // IWYU pragma: keep

#include "tmc/aw_resume_on.hpp"
Expand All @@ -12,6 +13,7 @@
#include "tmc/detail/init_params.hpp"
#include "tmc/detail/qu_mpsc_blocking.hpp"
#include "tmc/detail/thread_locals.hpp"
#include "tmc/detail/tiny_stack.hpp"
#include "tmc/detail/tiny_vec.hpp"
#include "tmc/ex_any.hpp"
#include "tmc/topology.hpp"
Expand Down Expand Up @@ -44,7 +46,7 @@ class ex_cpu_st {
using task_queue_t = tmc::detail::qu_mpsc_blocking<work_item, qu_cfg>;
tmc::detail::tiny_vec<task_queue_t> work_queues; // size() == PRIORITY_COUNT

tmc::detail::tiny_vec<std::vector<work_item>>
tmc::detail::tiny_vec<tmc::detail::tiny_stack>
private_work; // size() == PRIORITY_COUNT
// stop_source for the single worker thread
std::stop_source thread_stopper;
Expand Down Expand Up @@ -222,13 +224,10 @@ class ex_cpu_st {
bool fromExecThread =
tmc::detail::this_thread::executor() == &type_erased_this;
if (Count > 0) [[likely]] {
// A non-zero ThreadHint indicates that reschedule() was called. In that
// A zero ThreadHint indicates that reschedule() was called. In that
// case we should use the external queue to force FIFO ordering.
if (fromExecThread && ThreadHint != 0) [[likely]] {
for (size_t i = 0; i < Count; ++i) {
private_work[Priority].push_back(std::move(*Items));
++Items;
}
private_work[Priority].push_back_bulk(static_cast<It&&>(Items), Count);
request_yield(Priority);
} else {
bool didWake =
Expand Down
Loading