Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion include/tmc/detail/compat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <atomic>
#include <cstddef>
#include <cstdint>

#if defined(_MSC_VER)

Expand Down Expand Up @@ -193,7 +194,11 @@ _Pragma("GCC diagnostic pop")

#define NO_HINT static_cast<size_t>(-1)

inline constexpr size_t TMC_PLATFORM_BITS = sizeof(size_t) * 8; // 32 or 64
#if SIZE_MAX == 0xFFFFFFFFu
#define TMC_PLATFORM_BITS 32
#else
#define TMC_PLATFORM_BITS 64
#endif

inline constexpr size_t TMC_MAX_PRIORITY_COUNT = 16;

Expand Down
131 changes: 74 additions & 57 deletions include/tmc/detail/ex_cpu.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "tmc/detail/hwloc_unique_bitmap.hpp"
#include "tmc/detail/impl.hpp" // IWYU pragma: keep
#include "tmc/detail/matrix.hpp"
#include "tmc/detail/qu_lockfree.hpp"
#include "tmc/detail/qu_mc.hpp"
#include "tmc/detail/thread_layout.hpp"
#include "tmc/detail/thread_locals.hpp"
#include "tmc/ex_any.hpp"
Expand Down Expand Up @@ -288,29 +288,28 @@ INTERRUPT_DONE:
}
}

ex_cpu::task_queue_t::ExplicitProducer** ex_cpu::init_queue_iteration_order(
ex_cpu::cldq_t** ex_cpu::init_queue_iteration_order(
std::vector<std::vector<size_t>> const& Forward
) {
// Calculate total size based on the global producerArrayOffset layout.
// Each priority level contributes (dequeueProducerCount + 1) slots.
// The array must be full-sized so that producerArrayOffset indexing works
// Calculate total size based on the global tlsArrayOffset layout.
// Each priority level contributes (threadCount + 1) slots.
// The array must be full-sized so that tlsArrayOffset indexing works
// correctly, even if this thread doesn't handle all priorities.
// Layout per priority: [self, cached, others...]
size_t totalSize = 0;
for (size_t prio = 0; prio < PRIORITY_COUNT; ++prio) {
totalSize += work_queues[prio].dequeueProducerCount + 1;
totalSize += q_ws[prio].threadCount + 1;
}

if (totalSize == 0) {
return nullptr;
}

task_queue_t::ExplicitProducer** producers =
new task_queue_t::ExplicitProducer*[totalSize];
cldq_t** producers = new cldq_t*[totalSize];

for (size_t prio = 0; prio < PRIORITY_COUNT; ++prio) {
size_t offset = work_queues[prio].producerArrayOffset;
size_t slotCount = work_queues[prio].dequeueProducerCount + 1;
size_t offset = q_ws[prio].tlsArrayOffset;
size_t slotCount = q_ws[prio].threadCount + 1;

if (Forward[prio].empty()) {
// This thread doesn't handle this priority - null out all slots
Expand All @@ -320,17 +319,16 @@ ex_cpu::task_queue_t::ExplicitProducer** ex_cpu::init_queue_iteration_order(
continue;
}

assert(Forward[prio].size() == work_queues[prio].dequeueProducerCount);
assert(Forward[prio].size() == q_ws[prio].threadCount);

auto& thisMatrix = Forward[prio];
// pointer to this thread's producer
producers[offset] = &work_queues[prio].staticProducers[thisMatrix[0]];
// pointer to previously consumed-from producer (initially none)
// pointer to this thread's deque
producers[offset] = &q_ws[prio].deques[thisMatrix[0]];
// pointer to previously consumed-from deque (initially none)
producers[offset + 1] = nullptr;

for (size_t i = 1; i < Forward[prio].size(); ++i) {
producers[offset + 1 + i] =
&work_queues[prio].staticProducers[thisMatrix[i]];
producers[offset + 1 + i] = &q_ws[prio].deques[thisMatrix[i]];
}
}
return producers;
Expand Down Expand Up @@ -389,25 +387,22 @@ TMC_FORCE_INLINE inline bool ex_cpu::try_run_some(
const size_t PriorityRangeBegin, const size_t PriorityRangeEnd,
size_t& PrevPriority, bool& Spinning
) {
size_t idxBase = work_queues[PriorityRangeBegin].producerArrayOffset;
task_queue_t::ExplicitProducer** producersBase =
static_cast<task_queue_t::ExplicitProducer**>(
tmc::detail::this_thread::producers()
) +
idxBase;
size_t idxBase = q_ws[PriorityRangeBegin].tlsArrayOffset;
cldq_t** producersBase =
static_cast<cldq_t**>(tmc::detail::this_thread::producers()) + idxBase;
TOP:
if (ThreadStopToken.stop_requested()) [[unlikely]] {
return false;
}
work_item item;

task_queue_t::ExplicitProducer** producers = producersBase;
cldq_t** producers = producersBase;
// For priority 0, check private queue, then inbox, then try to steal
// Lower priorities can just check private queue and steal - no inbox
// Although this could be combined into the following loop (with an if
// statement to remove the inbox check), it gives better codegen for the
// fast path to keep it separate.
if ((*producers)->dequeue_lifo(item)) [[likely]] {
if ((*producers)->try_pop(item)) [[likely]] {
run_one(item, Slot, PriorityRangeBegin, PrevPriority, Spinning);
goto TOP;
}
Expand All @@ -422,21 +417,32 @@ TOP:
goto TOP;
}

if (work_queues[PriorityRangeBegin].try_dequeue_ex_cpu_steal(item, producers))
[[likely]] {
// Check for new work coming from I/O or from the main thread
if (q_ingest[PriorityRangeBegin].try_dequeue(item)) {
run_one(item, Slot, PriorityRangeBegin, PrevPriority, Spinning);
goto TOP;
}

// Try to steal from other threads in this executor
if (q_ws[PriorityRangeBegin].try_steal_in_order(item, producers)) [[likely]] {
run_one(item, Slot, PriorityRangeBegin, PrevPriority, Spinning);
goto TOP;
}

// Now check lower priority queues
for (size_t prio = PriorityRangeBegin + 1; prio < PriorityRangeEnd; ++prio) {
if ((*producers)->dequeue_lifo(item)) {
if ((*producers)->try_pop(item)) {
run_one(item, Slot, prio, PrevPriority, Spinning);
goto TOP;
}
++producers;

if (work_queues[prio].try_dequeue_ex_cpu_steal(item, producers)) {
if (q_ingest[prio].try_dequeue(item)) {
run_one(item, Slot, prio, PrevPriority, Spinning);
goto TOP;
}

if (q_ws[prio].try_steal_in_order(item, producers)) {
run_one(item, Slot, prio, PrevPriority, Spinning);
goto TOP;
}
Expand Down Expand Up @@ -481,9 +487,13 @@ void ex_cpu::post(work_item&& Item, size_t Priority, size_t ThreadHint) {
}
}
if (allowedPriority) [[likely]] {
work_queues[Priority].enqueue_ex_cpu(static_cast<work_item&&>(Item));
// Push to this thread's Chase-Lev deque for this priority.
cldq_t** producers =
static_cast<cldq_t**>(tmc::detail::this_thread::producers());
cldq_t* myDeque = producers[q_ws[Priority].tlsArrayOffset];
myDeque->push(static_cast<work_item&&>(Item));
} else {
work_queues[Priority].enqueue(static_cast<work_item&&>(Item));
q_ingest[Priority].enqueue(static_cast<work_item&&>(Item));
}
notify_n(1, Priority, allowedPriority, true);

Expand All @@ -509,7 +519,7 @@ ex_cpu::ex_cpu()

auto ex_cpu::make_worker(
tmc::topology::thread_info Info, size_t PriorityRangeBegin,
size_t PriorityRangeEnd, ex_cpu::task_queue_t::ExplicitProducer** StealOrder,
size_t PriorityRangeEnd, ex_cpu::cldq_t** StealOrder,
std::atomic<tmc::detail::atomic_wait_t>& InitThreadsBarrier,
// will be nullptr if hwloc is not enabled
[[maybe_unused]] tmc::detail::hwloc_unique_bitmap& CpuSet,
Expand Down Expand Up @@ -595,14 +605,18 @@ auto ex_cpu::make_worker(
// working threads. This prevents too many spinners in a lightly
// loaded system.
if (2 * spinningThreadCount <= workingThreadCount) {
cldq_t** producersBase =
static_cast<cldq_t**>(tmc::detail::this_thread::producers());
for (size_t i = 0; i < spins; ++i) {
TMC_CPU_PAUSE();
if (!thread_states[Slot].inbox->empty()) {
goto TOP;
}
for (size_t prio = PriorityRangeBegin; prio < PriorityRangeEnd;
++prio) {
if (!work_queues[prio].empty()) {
if (!q_ingest[prio].empty()) {
goto TOP;
}
if (!q_ws[prio].steal_empty(producersBase)) {
goto TOP;
}
}
Expand Down Expand Up @@ -643,10 +657,18 @@ auto ex_cpu::make_worker(
spinning_threads_bitset.set_bit(Slot);
goto TOP;
}
for (size_t prio = PriorityRangeBegin; prio < PriorityRangeEnd; ++prio) {
if (!work_queues[prio].empty()) {
spinning_threads_bitset.set_bit(Slot);
goto TOP;
{
cldq_t** producersBase =
static_cast<cldq_t**>(tmc::detail::this_thread::producers());
for (size_t prio = PriorityRangeBegin; prio < PriorityRangeEnd;
++prio) {
if (!q_ingest[prio].empty()) {
spinning_threads_bitset.set_bit(Slot);
goto TOP;
}
if (!q_ws[prio].steal_empty(producersBase)) {
goto TOP;
}
}
}

Expand All @@ -665,9 +687,7 @@ auto ex_cpu::make_worker(
}

clear_thread_locals();
delete[] static_cast<task_queue_t::ExplicitProducer**>(
tmc::detail::this_thread::producers()
);
delete[] static_cast<cldq_t**>(tmc::detail::this_thread::producers());
tmc::detail::this_thread::producers() = nullptr;
};
}
Expand Down Expand Up @@ -990,25 +1010,24 @@ void ex_cpu::init() {
std::vector<size_t> threadInboxIndexes =
tmc::detail::get_thread_inbox_indexes(inboxInputs);
// Index is returned in ascending order
inboxes.resize(threadInboxIndexes.back() + 1);
inboxes.fill_default();
q_inbox.resize(threadInboxIndexes.back() + 1);
q_inbox.fill_default();
for (size_t i = 0; i < thread_count(); ++i) {
thread_states[i].inbox = &inboxes[threadInboxIndexes[i]];
thread_states[i].inbox = &q_inbox[threadInboxIndexes[i]];
}
}

work_queues.resize(PRIORITY_COUNT);
q_ingest.resize(PRIORITY_COUNT);
q_ws.resize(PRIORITY_COUNT);
size_t cumulativeOffset = 0;
for (size_t prio = 0; prio < PRIORITY_COUNT; ++prio) {
size_t threadCount = tidsByPrio[prio].size();
work_queues.emplace_at(prio, threadCount + 1);
work_queues[prio].staticProducers =
new task_queue_t::ExplicitProducer[threadCount];
for (size_t i = 0; i < threadCount; ++i) {
work_queues[prio].staticProducers[i].init(&work_queues[prio]);
}
work_queues[prio].dequeueProducerCount = threadCount;
work_queues[prio].producerArrayOffset = cumulativeOffset;
q_ingest.emplace_at(prio, 1u);

// Allocates one Chase-Lev deque per thread that participates in this
// priority.
q_ws.emplace_at(prio, threadCount, cumulativeOffset);

// add 1 extra space for cached producer
cumulativeOffset += threadCount + 1;
}
Expand Down Expand Up @@ -1284,15 +1303,13 @@ void ex_cpu::teardown() {
}
threads.clear();
thread_stoppers.clear();
inboxes.clear();
q_inbox.clear();

threads_by_priority_bitset.clear();
waker_matrix.clear();

for (size_t i = 0; i < work_queues.size(); ++i) {
delete[] work_queues[i].staticProducers;
}
work_queues.clear();
q_ws.clear();
q_ingest.clear();
working_threads_bitset.clear();
spinning_threads_bitset.clear();
if (task_stopper_bitsets != nullptr) {
Expand Down
Loading
Loading