Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ aee392a046a26ae2340849fe98e38332d9537397

# new formatting style for improved readability
0cccd586b8d75c64b289e38f334e95846dfb4f33
72666721b5787c1adcc100dd86c26fbbe8bda82f
75 changes: 45 additions & 30 deletions examples/benchmark/asio_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
#include <exec/asio/asio_thread_pool.hpp>
#include <exec/start_detached.hpp>

struct RunThread {
struct RunThread
{
void operator()(exec::asio::asio_thread_pool& pool,
std::size_t total_scheds,
std::size_t tid,
Expand All @@ -27,49 +28,62 @@ struct RunThread {
std::span<char> buffer,
#endif
std::atomic<bool>& stop,
exec::numa_policy numa) {
exec::numa_policy numa)
{
int numa_node = numa.thread_index_to_node(tid);
numa.bind_to_node(numa_node);
auto scheduler = pool.get_scheduler();
std::mutex mut;
auto scheduler = pool.get_scheduler();
std::mutex mut;
std::condition_variable cv;
while (true) {
while (true)
{
barrier.arrive_and_wait();
if (stop.load()) {
if (stop.load())
{
break;
}
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
pmr::monotonic_buffer_resource resource{
buffer.data(), buffer.size(), pmr::null_memory_resource()};
pmr::monotonic_buffer_resource resource{buffer.data(),
buffer.size(),
pmr::null_memory_resource()};
pmr::polymorphic_allocator<char> alloc(&resource);
auto [start, end] = exec::_pool_::even_share(total_scheds, tid, pool.available_parallelism());
std::size_t scheds = end - start;
std::size_t scheds = end - start;
std::atomic<std::size_t> counter{scheds};
auto env = stdexec::prop{stdexec::get_allocator, alloc};
while (scheds) {
exec::start_detached(
stdexec::schedule(scheduler) | stdexec::then([&] {
auto prev = counter.fetch_sub(1);
if (prev == 1) {
std::lock_guard lock{mut};
cv.notify_one();
}
}),
env);
auto env = stdexec::prop{stdexec::get_allocator, alloc};
while (scheds)
{
exec::start_detached(stdexec::schedule(scheduler)
| stdexec::then(
[&]
{
auto prev = counter.fetch_sub(1);
if (prev == 1)
{
std::lock_guard lock{mut};
cv.notify_one();
}
}),
env);
--scheds;
}
#else
auto [start, end] = exec::_pool_::even_share(total_scheds, tid, pool.available_parallelism());
std::size_t scheds = end - start;
std::size_t scheds = end - start;
std::atomic<std::size_t> counter{scheds};
while (scheds) {
exec::start_detached(stdexec::schedule(scheduler) | stdexec::then([&] {
auto prev = counter.fetch_sub(1);
if (prev == 1) {
std::lock_guard lock{mut};
cv.notify_one();
}
}));
while (scheds)
{
exec::start_detached(stdexec::schedule(scheduler)
| stdexec::then(
[&]
{
auto prev = counter.fetch_sub(1);
if (prev == 1)
{
std::lock_guard lock{mut};
cv.notify_one();
}
}));
--scheds;
}
#endif
Expand All @@ -81,6 +95,7 @@ struct RunThread {
}
};

auto main(int argc, char** argv) -> int {
auto main(int argc, char** argv) -> int
{
my_main<exec::asio::asio_thread_pool, RunThread>(argc, argv);
}
82 changes: 50 additions & 32 deletions examples/benchmark/fibonacci.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
#include <exec/any_sender_of.hpp>
#include <stdexec/execution.hpp>

auto serial_fib(long n) -> long {
auto serial_fib(long n) -> long
{
return n < 2 ? n : serial_fib(n - 1) + serial_fib(n - 2);
}

Expand All @@ -35,41 +36,48 @@ using any_sender_of =
using fib_sender = any_sender_of<stdexec::set_value_t(long)>;

template <typename Scheduler>
struct fib_s {
using sender_concept = stdexec::sender_t;
struct fib_s
{
using sender_concept = stdexec::sender_t;
using completion_signatures = stdexec::completion_signatures<stdexec::set_value_t(long)>;

long cutoff;
long n;
long cutoff;
long n;
Scheduler sched;

template <class Receiver>
struct operation {
Receiver rcvr_;
long cutoff;
long n;
struct operation
{
Receiver rcvr_;
long cutoff;
long n;
Scheduler sched;

void start() & noexcept {
if (n < cutoff) {
void start() & noexcept
{
if (n < cutoff)
{
stdexec::set_value(static_cast<Receiver&&>(rcvr_), serial_fib(n));
} else {
auto mkchild = [&](long n) {
}
else
{
auto mkchild = [&](long n)
{
return stdexec::starts_on(sched, fib_sender(fib_s{cutoff, n, sched}));
};

exec::start_detached(
stdexec::when_all(mkchild(n - 1), mkchild(n - 2))
| stdexec::then([rcvr = static_cast<Receiver&&>(rcvr_)](long a, long b) mutable {
stdexec::set_value(static_cast<Receiver&&>(rcvr), a + b);
}));
| stdexec::then([rcvr = static_cast<Receiver&&>(rcvr_)](long a, long b) mutable
{ stdexec::set_value(static_cast<Receiver&&>(rcvr), a + b); }));
}
}
};

template <stdexec::receiver_of<completion_signatures> Receiver>
[[nodiscard]]
auto connect(Receiver rcvr) const -> operation<Receiver> {
auto connect(Receiver rcvr) const -> operation<Receiver>
{
return {static_cast<Receiver&&>(rcvr), cutoff, n, sched};
}
};
Expand All @@ -78,44 +86,54 @@ template <class Scheduler>
fib_s(long cutoff, long n, Scheduler sched) -> fib_s<Scheduler>;

template <typename duration, typename F>
auto measure(F&& f) {
auto measure(F&& f)
{
std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
f();
return std::chrono::duration_cast<duration>(std::chrono::steady_clock::now() - start).count();
}

auto main(int argc, char** argv) -> int {
if (argc < 5) {
auto main(int argc, char** argv) -> int
{
if (argc < 5)
{
std::cerr << "Usage: example.benchmark.fibonacci cutoff n nruns {tbb|static}" << std::endl;
return -1;
}

// skip 'warmup' iterations for performance measurements
static constexpr size_t warmup = 1;

long cutoff = std::strtol(argv[1], nullptr, 10);
long n = std::strtol(argv[2], nullptr, 10);
std::size_t nruns = std::strtoul(argv[3], nullptr, 10);
long cutoff = std::strtol(argv[1], nullptr, 10);
long n = std::strtol(argv[2], nullptr, 10);
std::size_t nruns = std::strtoul(argv[3], nullptr, 10);

if (nruns <= warmup) {
if (nruns <= warmup)
{
std::cerr << "nruns should be >= " << warmup << std::endl;
return -1;
}

std::variant<exec::tbb::tbb_thread_pool, exec::static_thread_pool> pool;

if (argv[4] == std::string_view("tbb")) {
if (argv[4] == std::string_view("tbb"))
{
pool.emplace<exec::tbb::tbb_thread_pool>(static_cast<int>(std::thread::hardware_concurrency()));
} else {
pool.emplace<exec::static_thread_pool>(
std::thread::hardware_concurrency(), exec::bwos_params{}, exec::get_numa_policy());
}
else
{
pool.emplace<exec::static_thread_pool>(std::thread::hardware_concurrency(),
exec::bwos_params{},
exec::get_numa_policy());
}

std::vector<unsigned long> times;
long result;
for (unsigned long i = 0; i < nruns; ++i) {
auto snd = std::visit(
[&](auto&& pool) { return fib_sender(fib_s{cutoff, n, pool.get_scheduler()}); }, pool);
long result;
for (unsigned long i = 0; i < nruns; ++i)
{
auto snd = std::visit([&](auto&& pool)
{ return fib_sender(fib_s{cutoff, n, pool.get_scheduler()}); },
pool);

auto time = measure<std::chrono::milliseconds>(
[&] { std::tie(result) = stdexec::sync_wait(std::move(snd)).value(); });
Expand Down
94 changes: 55 additions & 39 deletions examples/benchmark/static_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,74 @@
#include <exec/start_detached.hpp>
#include <exec/static_thread_pool.hpp>

struct RunThread {
void operator()(
exec::static_thread_pool& pool,
std::size_t total_scheds,
std::size_t tid,
std::barrier<>& barrier,
struct RunThread
{
void operator()(exec::static_thread_pool& pool,
std::size_t total_scheds,
std::size_t tid,
std::barrier<>& barrier,
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::span<char> buffer,
std::span<char> buffer,
#endif
std::atomic<bool>& stop,
exec::numa_policy numa) {
std::atomic<bool>& stop,
exec::numa_policy numa)
{
int numa_node = numa.thread_index_to_node(tid);
numa.bind_to_node(numa_node);
exec::nodemask mask{};
mask.set(static_cast<std::size_t>(numa_node));
auto scheduler = pool.get_constrained_scheduler(&mask);
std::mutex mut;
auto scheduler = pool.get_constrained_scheduler(&mask);
std::mutex mut;
std::condition_variable cv;
while (true) {
while (true)
{
barrier.arrive_and_wait();
if (stop.load()) {
if (stop.load())
{
break;
}
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
pmr::monotonic_buffer_resource resource{
buffer.data(), buffer.size(), pmr::null_memory_resource()};
pmr::monotonic_buffer_resource resource{buffer.data(),
buffer.size(),
pmr::null_memory_resource()};
pmr::polymorphic_allocator<char> alloc(&resource);
auto [start, end] = exec::_pool_::even_share(total_scheds, tid, pool.available_parallelism());
std::size_t scheds = end - start;
std::size_t scheds = end - start;
std::atomic<std::size_t> counter{scheds};
auto env = stdexec::prop{stdexec::get_allocator, alloc};
while (scheds) {
exec::start_detached(
stdexec::schedule(scheduler) | stdexec::then([&] {
auto prev = counter.fetch_sub(1);
if (prev == 1) {
std::lock_guard lock{mut};
cv.notify_one();
}
}),
env);
auto env = stdexec::prop{stdexec::get_allocator, alloc};
while (scheds)
{
exec::start_detached(stdexec::schedule(scheduler)
| stdexec::then(
[&]
{
auto prev = counter.fetch_sub(1);
if (prev == 1)
{
std::lock_guard lock{mut};
cv.notify_one();
}
}),
env);
--scheds;
}
#else
auto [start, end] = exec::_pool_::even_share(total_scheds, tid, pool.available_parallelism());
std::size_t scheds = end - start;
std::size_t scheds = end - start;
std::atomic<std::size_t> counter{scheds};
while (scheds) {
exec::start_detached(stdexec::schedule(scheduler) | stdexec::then([&] {
auto prev = counter.fetch_sub(1);
if (prev == 1) {
std::lock_guard lock{mut};
cv.notify_one();
}
}));
while (scheds)
{
exec::start_detached(stdexec::schedule(scheduler)
| stdexec::then(
[&]
{
auto prev = counter.fetch_sub(1);
if (prev == 1)
{
std::lock_guard lock{mut};
cv.notify_one();
}
}));
--scheds;
}
#endif
Expand All @@ -84,14 +97,17 @@ struct RunThread {
}
};

struct my_numa_distribution : public exec::default_numa_policy {
struct my_numa_distribution : public exec::default_numa_policy
{
[[nodiscard]]
auto thread_index_to_node(std::size_t index) const noexcept -> int {
auto thread_index_to_node(std::size_t index) const noexcept -> int
{
return exec::default_numa_policy::thread_index_to_node(2 * index);
}
};

auto main(int argc, char** argv) -> int {
auto main(int argc, char** argv) -> int
{
my_numa_distribution numa{};
my_main<exec::static_thread_pool, RunThread>(argc, argv, numa);
}
Loading
Loading