Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 93 additions & 60 deletions include/exec/trampoline_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include "../stdexec/execution.hpp"
#include "completion_behavior.hpp"

#include <cstddef>
#include <utility>
Expand All @@ -26,21 +27,22 @@ namespace experimental::execution
namespace __trampoline
{
using namespace STDEXEC;
class __scheduler;

template <class _Operation>
struct __trampoline_state
struct __state
{
static thread_local __trampoline_state* __current_;
static thread_local __state* __current_;

constexpr __trampoline_state(std::size_t __max_recursion_depth,
std::size_t __max_recursion_size) noexcept
constexpr __state(std::size_t __max_recursion_depth,
std::size_t __max_recursion_size) noexcept
: __max_recursion_size_(__max_recursion_size)
, __max_recursion_depth_(__max_recursion_depth)
{
__current_ = this;
}

constexpr ~__trampoline_state()
constexpr ~__state()
{
__current_ = nullptr;
}
Expand All @@ -59,36 +61,61 @@ namespace experimental::execution
_Operation* __tail_ = nullptr;
};

class __scheduler
struct __attrs
{
std::size_t const __max_recursion_size_;
std::size_t const __max_recursion_depth_;
template <__one_of<set_value_t, set_stopped_t> _Tag>
[[nodiscard]]
constexpr auto query(get_completion_scheduler_t<_Tag>) const noexcept -> __scheduler;

template <__one_of<set_value_t, set_stopped_t> _Tag, __queryable_with<get_domain_t> _Env>
[[nodiscard]]
constexpr auto query(get_completion_domain_t<_Tag>, _Env const &) const noexcept
{
return __domain_of_t<_Env>();
}

template <__one_of<set_value_t, set_stopped_t> _Tag>
[[nodiscard]]
constexpr auto query(exec::get_completion_behavior_t<_Tag>) const noexcept
{
return exec::completion_behavior::inline_completion
| exec::completion_behavior::asynchronous_affine;
}

constexpr auto operator==(__attrs const &) const noexcept -> bool = default;

std::size_t __max_recursion_depth_{4096};
};

class __scheduler : __attrs
{
std::size_t __max_recursion_size_;

public:
constexpr __scheduler() noexcept
: __max_recursion_size_(4096)
, __max_recursion_depth_(16)
: __attrs{16}
, __max_recursion_size_(4096)
{}

constexpr explicit __scheduler(std::size_t __max_recursion_depth) noexcept
: __max_recursion_size_(4096)
, __max_recursion_depth_(__max_recursion_depth)
: __attrs{__max_recursion_depth}
, __max_recursion_size_(4096)
{}

constexpr explicit __scheduler(std::size_t __max_recursion_depth,
std::size_t __max_recursion_size) noexcept
: __max_recursion_size_(__max_recursion_size)
, __max_recursion_depth_(__max_recursion_depth)
: __attrs{__max_recursion_depth}
, __max_recursion_size_(__max_recursion_size)
{}

private:
struct __operation_base
struct __opstate_base
{
using __execute_fn = void(__operation_base*) noexcept;
using __execute_fn = void(__opstate_base*) noexcept;

constexpr explicit __operation_base(__execute_fn* __execute,
std::size_t __max_size,
std::size_t __max_depth) noexcept
constexpr explicit __opstate_base(__execute_fn* __execute,
std::size_t __max_size,
std::size_t __max_depth) noexcept
: __execute_(__execute)
, __max_recursion_size_(__max_size)
, __max_recursion_depth_(__max_depth)
Expand All @@ -101,13 +128,12 @@ namespace experimental::execution

void start() & noexcept
{
auto* __current_state = __trampoline_state<__operation_base>::__current_;
auto* __current_state = __state<__opstate_base>::__current_;

if (__current_state == nullptr)
{
// origin schedule frame on this thread
__trampoline_state<__operation_base> __state{__max_recursion_depth_,
__max_recursion_size_};
__state<__opstate_base> __state{__max_recursion_depth_, __max_recursion_size_};
__execute();
__state.__drain();
}
Expand All @@ -131,43 +157,42 @@ namespace experimental::execution
// Exceeded recursion limit.

// push this recursive schedule to list tail
__prev_ = std::exchange(__current_state->__tail_,
static_cast<__operation_base*>(this));
__prev_ = std::exchange(__current_state->__tail_, static_cast<__opstate_base*>(this));
if (__prev_ != nullptr)
{
// was not empty
std::exchange(__prev_->__next_, static_cast<__operation_base*>(this));
std::exchange(__prev_->__next_, static_cast<__opstate_base*>(this));
}
else
{
// was empty
std::exchange(__current_state->__head_, static_cast<__operation_base*>(this));
std::exchange(__current_state->__head_, static_cast<__opstate_base*>(this));
}
}
}
}

__operation_base* __prev_ = nullptr;
__operation_base* __next_ = nullptr;
__opstate_base* __prev_ = nullptr;
__opstate_base* __next_ = nullptr;
__execute_fn* __execute_;
std::size_t const __max_recursion_size_;
std::size_t const __max_recursion_depth_;
};

template <class _Receiver>
struct __operation : __operation_base
struct __opstate : __opstate_base
{
constexpr explicit __operation(_Receiver __rcvr,
std::size_t __max_size,
std::size_t __max_depth) noexcept
: __operation_base(&__execute_impl, __max_size, __max_depth)
constexpr explicit __opstate(_Receiver __rcvr,
std::size_t __max_size,
std::size_t __max_depth) noexcept
: __opstate_base(&__execute_impl, __max_size, __max_depth)
, __rcvr_(static_cast<_Receiver&&>(__rcvr))
{}

static constexpr void __execute_impl(__operation_base* __op) noexcept
static constexpr void __execute_impl(__opstate_base* __op) noexcept
{
auto& __self = *static_cast<__operation*>(__op);
if (STDEXEC::unstoppable_token<stop_token_of_t<env_of_t<_Receiver&>>>)
auto& __self = *static_cast<__opstate*>(__op);
if constexpr (STDEXEC::unstoppable_token<stop_token_of_t<env_of_t<_Receiver&>>>)
{
STDEXEC::set_value(static_cast<_Receiver&&>(__self.__rcvr_));
}
Expand All @@ -188,40 +213,40 @@ namespace experimental::execution
_Receiver __rcvr_;
};

struct __schedule_sender;
friend __schedule_sender;

struct __schedule_sender
struct __sender
{
using sender_concept = STDEXEC::sender_t;
using completion_signatures =
STDEXEC::completion_signatures<set_value_t(), set_stopped_t()>;

constexpr explicit __schedule_sender(std::size_t __max_size,
std::size_t __max_depth) noexcept
constexpr explicit __sender(std::size_t __max_size, std::size_t __max_depth) noexcept
: __max_recursion_size_(__max_size)
, __max_recursion_depth_(__max_depth)
{}

template <receiver_of<completion_signatures> _Receiver>
constexpr auto connect(_Receiver __rcvr) const noexcept -> __operation<_Receiver>
template <class, class _Env>
static consteval auto get_completion_signatures() noexcept
{
return __operation<_Receiver>{static_cast<_Receiver&&>(__rcvr),
__max_recursion_size_,
__max_recursion_depth_};
if constexpr (unstoppable_token<stop_token_of_t<_Env>>)
{
return completion_signatures<set_value_t()>();
}
else
{
return completion_signatures<set_value_t(), set_stopped_t()>();
}
}

[[nodiscard]]
constexpr auto
query(get_completion_scheduler_t<set_value_t>, __ignore = {}) const noexcept -> __scheduler
template <class _Receiver>
constexpr auto connect(_Receiver __rcvr) const noexcept -> __opstate<_Receiver>
{
return __scheduler{__max_recursion_depth_};
return __opstate<_Receiver>{static_cast<_Receiver&&>(__rcvr),
__max_recursion_size_,
__max_recursion_depth_};
}

[[nodiscard]]
constexpr auto get_env() const noexcept -> __schedule_sender const &
constexpr auto get_env() const noexcept -> __attrs
{
return *this;
return __attrs{__max_recursion_depth_};
}

std::size_t const __max_recursion_size_;
Expand All @@ -230,20 +255,21 @@ namespace experimental::execution

public:
[[nodiscard]]
constexpr auto schedule() const noexcept -> __schedule_sender
constexpr auto schedule() const noexcept -> __sender
{
return __schedule_sender{__max_recursion_size_, __max_recursion_depth_};
return __sender{__max_recursion_size_, __max_recursion_depth_};
}

constexpr auto operator==(__scheduler const &) const noexcept -> bool = default;

using __attrs::query;
};

template <class _Operation>
thread_local __trampoline_state<_Operation>* __trampoline_state<_Operation>::__current_ =
nullptr;
thread_local __state<_Operation>* __state<_Operation>::__current_ = nullptr;

template <class _Operation>
constexpr void __trampoline_state<_Operation>::__drain() noexcept
constexpr void __state<_Operation>::__drain() noexcept
{
while (__head_ != nullptr)
{
Expand Down Expand Up @@ -277,6 +303,13 @@ namespace experimental::execution
__op->__execute();
}
}

template <__one_of<set_value_t, set_stopped_t> _Tag>
[[nodiscard]]
constexpr auto __attrs::query(get_completion_scheduler_t<_Tag>) const noexcept -> __scheduler
{
return __scheduler{__max_recursion_depth_};
}
} // namespace __trampoline

using trampoline_scheduler = __trampoline::__scheduler;
Expand Down
3 changes: 1 addition & 2 deletions include/stdexec/__detail/__domain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,7 @@ namespace STDEXEC
{
using __sch_t =
__call_result_t<get_completion_scheduler_t<_Tag>, _Attrs const &, _Env const &...>;
using X [[maybe_unused]] = decltype(__declval<__sch_t>().schedule());
using __read_query_t = typename get_completion_domain_t<set_value_t>::__read_query_t;
using __read_query_t = typename get_completion_domain_t<set_value_t>::__read_query_t;

if constexpr (__callable<__read_query_t, __sch_t, _Env const &...>)
{
Expand Down
15 changes: 10 additions & 5 deletions test/exec/test_repeat_until.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ namespace
static_assert(
std::same_as<ex::error_types_of_t<decltype(only_stopped)>, ex::__detail::__not_a_variant>,
"Expect no value completions");
static_assert(ex::sender_of<decltype(only_stopped), ex::set_stopped_t()>,
static_assert(ex::sender_of<decltype(only_stopped), ex::set_stopped_t(), ex::env<>>,
"Missing set_stopped_t() from upstream");

// operator| and sync_wait require valid completion signatures
Expand All @@ -331,12 +331,17 @@ namespace
static_assert(std::same_as<ex::error_types_of_t<decltype(only_error)>, std::variant<int>>,
"Unexpected added set_error_t(std::exception_ptr)");

// set_stopped_t is always added as a consequence of the internal trampoline_scheduler
using SC = ex::completion_signatures_of_t<ex::schedule_result_t<exec::trampoline_scheduler>>;
static_assert(!ex::sender_of<SC, ex::set_stopped_t()>
|| ex::sender_of<decltype(only_error), ex::set_stopped_t()>,
// set_stopped_t is added when the receiver's stop token is stoppable, as a
// consequence of the internal trampoline_scheduler
using stoppable_env_t = ex::prop<ex::get_stop_token_t, ex::inplace_stop_token>;
static_assert(ex::sender_of<decltype(only_error), ex::set_stopped_t(), stoppable_env_t>,
"Missing added set_stopped_t()");

// set_stopped_t is *not* added when the receiver's stop token is unstoppable.
static_assert(!ex::sender_of<decltype(only_error), ex::set_stopped_t(), ex::env<>>,
"set_stopped_t() should not be added when the receiver's stop token is "
"unstoppable");

// operator| and sync_wait require valid completion signatures
ex::sync_wait(only_error | ex::upon_error([](auto const) { return -1; }));
}
Expand Down
5 changes: 1 addition & 4 deletions test/exec/test_trampoline_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace ex = STDEXEC;

namespace
{

struct try_again
{};

Expand Down Expand Up @@ -78,9 +77,7 @@ namespace
"[schedulers][trampoline_scheduler]")
{
exec::trampoline_scheduler sched;
ex::run_loop loop;

auto recurse_deeply = retry(ex::on(sched, fails_alot{}));
auto recurse_deeply = retry(ex::on(sched, fails_alot{}));
ex::sync_wait(std::move(recurse_deeply));
}

Expand Down
Loading
Loading