From 958a4588e66531d0500cd9afe09a3435679cecbd Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Tue, 3 Mar 2026 14:34:05 -0800 Subject: [PATCH 1/3] make `trampoline_scheduler` conditionally infallible and specify its completion behavior --- include/exec/trampoline_scheduler.hpp | 153 ++++++++++++++---------- include/stdexec/__detail/__domain.hpp | 1 - test/exec/test_repeat_until.cpp | 15 ++- test/exec/test_trampoline_scheduler.cpp | 3 - test/test_common/retry.hpp | 28 +++-- 5 files changed, 120 insertions(+), 80 deletions(-) diff --git a/include/exec/trampoline_scheduler.hpp b/include/exec/trampoline_scheduler.hpp index 86ae0bc08..0281b812a 100644 --- a/include/exec/trampoline_scheduler.hpp +++ b/include/exec/trampoline_scheduler.hpp @@ -17,6 +17,7 @@ #pragma once #include "../stdexec/execution.hpp" +#include "completion_behavior.hpp" #include #include @@ -26,21 +27,22 @@ namespace experimental::execution namespace __trampoline { using namespace STDEXEC; + class __scheduler; template - struct __trampoline_state + struct __state { - static thread_local __trampoline_state* __current_; + static thread_local __state* __current_; - constexpr __trampoline_state(std::size_t __max_recursion_depth, - std::size_t __max_recursion_size) noexcept + constexpr __state(std::size_t __max_recursion_depth, + std::size_t __max_recursion_size) noexcept : __max_recursion_size_(__max_recursion_size) , __max_recursion_depth_(__max_recursion_depth) { __current_ = this; } - constexpr ~__trampoline_state() + constexpr ~__state() { __current_ = nullptr; } @@ -59,36 +61,61 @@ namespace experimental::execution _Operation* __tail_ = nullptr; }; - class __scheduler + struct __attrs { - std::size_t const __max_recursion_size_; - std::size_t const __max_recursion_depth_; + template <__one_of _Tag> + [[nodiscard]] + constexpr auto query(get_completion_scheduler_t<_Tag>) const noexcept -> __scheduler; + + template <__one_of _Tag, __queryable_with _Env> + [[nodiscard]] + constexpr auto query(get_completion_domain_t<_Tag>, _Env const &) const noexcept + { + return __domain_of_t<_Env>(); + } + + template <__one_of _Tag> + [[nodiscard]] + constexpr auto query(exec::get_completion_behavior_t<_Tag>) const noexcept + { + return exec::completion_behavior::inline_completion + | exec::completion_behavior::asynchronous_affine; + } + + constexpr auto operator==(__attrs const &) const noexcept -> bool = default; + + std::size_t __max_recursion_depth_{4096}; + }; + + class __scheduler : __attrs + { + std::size_t __max_recursion_size_; public: constexpr __scheduler() noexcept - : __max_recursion_size_(4096) - , __max_recursion_depth_(16) + : __attrs{16} + , __max_recursion_size_(4096) {} constexpr explicit __scheduler(std::size_t __max_recursion_depth) noexcept - : __max_recursion_size_(4096) - , __max_recursion_depth_(__max_recursion_depth) + : __attrs{__max_recursion_depth} + , __max_recursion_size_(4096) {} constexpr explicit __scheduler(std::size_t __max_recursion_depth, std::size_t __max_recursion_size) noexcept - : __max_recursion_size_(__max_recursion_size) - , __max_recursion_depth_(__max_recursion_depth) + : __attrs{__max_recursion_depth} + , __max_recursion_size_(__max_recursion_size) {} private: - struct __operation_base + struct __opstate_base { - using __execute_fn = void(__operation_base*) noexcept; + using __execute_fn = void(__opstate_base*) noexcept; - constexpr explicit __operation_base(__execute_fn* __execute, - std::size_t __max_size, - std::size_t __max_depth) noexcept + constexpr explicit __opstate_base(__execute_fn* __execute, + std::size_t __max_size, + std::size_t __max_depth) noexcept : __execute_(__execute) , __max_recursion_size_(__max_size) , __max_recursion_depth_(__max_depth) @@ -101,13 +128,12 @@ namespace experimental::execution void start() & noexcept { - auto* __current_state = __trampoline_state<__operation_base>::__current_; + auto* __current_state = __state<__opstate_base>::__current_; if (__current_state == nullptr) { // origin schedule frame on this thread - __trampoline_state<__operation_base> __state{__max_recursion_depth_, - __max_recursion_size_}; + __state<__opstate_base> __state{__max_recursion_depth_, __max_recursion_size_}; __execute(); __state.__drain(); } @@ -131,43 +157,42 @@ namespace experimental::execution // Exceeded recursion limit. // push this recursive schedule to list tail - __prev_ = std::exchange(__current_state->__tail_, - static_cast<__operation_base*>(this)); + __prev_ = std::exchange(__current_state->__tail_, static_cast<__opstate_base*>(this)); if (__prev_ != nullptr) { // was not empty - std::exchange(__prev_->__next_, static_cast<__operation_base*>(this)); + std::exchange(__prev_->__next_, static_cast<__opstate_base*>(this)); } else { // was empty - std::exchange(__current_state->__head_, static_cast<__operation_base*>(this)); + std::exchange(__current_state->__head_, static_cast<__opstate_base*>(this)); } } } } - __operation_base* __prev_ = nullptr; - __operation_base* __next_ = nullptr; + __opstate_base* __prev_ = nullptr; + __opstate_base* __next_ = nullptr; __execute_fn* __execute_; std::size_t const __max_recursion_size_; std::size_t const __max_recursion_depth_; }; template - struct __operation : __operation_base + struct __opstate : __opstate_base { - constexpr explicit __operation(_Receiver __rcvr, - std::size_t __max_size, - std::size_t __max_depth) noexcept - : __operation_base(&__execute_impl, __max_size, __max_depth) + constexpr explicit __opstate(_Receiver __rcvr, + std::size_t __max_size, + std::size_t __max_depth) noexcept + : __opstate_base(&__execute_impl, __max_size, __max_depth) , __rcvr_(static_cast<_Receiver&&>(__rcvr)) {} - static constexpr void __execute_impl(__operation_base* __op) noexcept + static constexpr void __execute_impl(__opstate_base* __op) noexcept { - auto& __self = *static_cast<__operation*>(__op); - if (STDEXEC::unstoppable_token>>) + auto& __self = *static_cast<__opstate*>(__op); + if constexpr (STDEXEC::unstoppable_token>>) { STDEXEC::set_value(static_cast<_Receiver&&>(__self.__rcvr_)); } @@ -188,40 +213,40 @@ namespace experimental::execution _Receiver __rcvr_; }; - struct __schedule_sender; - friend __schedule_sender; - - struct __schedule_sender + struct __sender { using sender_concept = STDEXEC::sender_t; - using completion_signatures = - STDEXEC::completion_signatures; - constexpr explicit __schedule_sender(std::size_t __max_size, - std::size_t __max_depth) noexcept + constexpr explicit __sender(std::size_t __max_size, std::size_t __max_depth) noexcept : __max_recursion_size_(__max_size) , __max_recursion_depth_(__max_depth) {} - template _Receiver> - constexpr auto connect(_Receiver __rcvr) const noexcept -> __operation<_Receiver> + template + static consteval auto get_completion_signatures() noexcept { - return __operation<_Receiver>{static_cast<_Receiver&&>(__rcvr), - __max_recursion_size_, - __max_recursion_depth_}; + if constexpr (unstoppable_token>) + { + return completion_signatures(); + } + else + { + return completion_signatures(); + } } - [[nodiscard]] - constexpr auto - query(get_completion_scheduler_t, __ignore = {}) const noexcept -> __scheduler + template + constexpr auto connect(_Receiver __rcvr) const noexcept -> __opstate<_Receiver> { - return __scheduler{__max_recursion_depth_}; + return __opstate<_Receiver>{static_cast<_Receiver&&>(__rcvr), + __max_recursion_size_, + __max_recursion_depth_}; } [[nodiscard]] - constexpr auto get_env() const noexcept -> __schedule_sender const & + constexpr auto get_env() const noexcept -> __attrs { - return *this; + return __attrs{__max_recursion_depth_}; } std::size_t const __max_recursion_size_; @@ -230,20 +255,21 @@ namespace experimental::execution public: [[nodiscard]] - constexpr auto schedule() const noexcept -> __schedule_sender + constexpr auto schedule() const noexcept -> __sender { - return __schedule_sender{__max_recursion_size_, __max_recursion_depth_}; + return __sender{__max_recursion_size_, __max_recursion_depth_}; } constexpr auto operator==(__scheduler const &) const noexcept -> bool = default; + + using __attrs::query; }; template - thread_local __trampoline_state<_Operation>* __trampoline_state<_Operation>::__current_ = - nullptr; + thread_local __state<_Operation>* __state<_Operation>::__current_ = nullptr; template - constexpr void __trampoline_state<_Operation>::__drain() noexcept + constexpr void __state<_Operation>::__drain() noexcept { while (__head_ != nullptr) { @@ -277,6 +303,13 @@ namespace experimental::execution __op->__execute(); } } + + template <__one_of _Tag> + [[nodiscard]] + constexpr auto __attrs::query(get_completion_scheduler_t<_Tag>) const noexcept -> __scheduler + { + return __scheduler{__max_recursion_depth_}; + } } // namespace __trampoline using trampoline_scheduler = __trampoline::__scheduler; diff --git a/include/stdexec/__detail/__domain.hpp b/include/stdexec/__detail/__domain.hpp index 162d59db6..3544dcaf2 100644 --- a/include/stdexec/__detail/__domain.hpp +++ b/include/stdexec/__detail/__domain.hpp @@ -342,7 +342,6 @@ namespace STDEXEC { using __sch_t = __call_result_t, _Attrs const &, _Env const &...>; - using X [[maybe_unused]] = decltype(__declval<__sch_t>().schedule()); using __read_query_t = typename get_completion_domain_t::__read_query_t; if constexpr (__callable<__read_query_t, __sch_t, _Env const &...>) diff --git a/test/exec/test_repeat_until.cpp b/test/exec/test_repeat_until.cpp index 0435bc33c..8c6765f66 100644 --- a/test/exec/test_repeat_until.cpp +++ b/test/exec/test_repeat_until.cpp @@ -315,7 +315,7 @@ namespace static_assert( std::same_as, ex::__detail::__not_a_variant>, "Expect no value completions"); - static_assert(ex::sender_of, + static_assert(ex::sender_of>, "Missing set_stopped_t() from upstream"); // operator| and sync_wait require valid completion signatures @@ -331,12 +331,17 @@ namespace static_assert(std::same_as, std::variant>, "Unexpected added set_error_t(std::exception_ptr)"); - // set_stopped_t is always added as a consequence of the internal trampoline_scheduler - using SC = ex::completion_signatures_of_t>; - static_assert(!ex::sender_of - || ex::sender_of, + // set_stopped_t is added when the receiver's stop token is stoppable, as a + // consequence of the internal trampoline_scheduler + using stoppable_env_t = ex::prop; + static_assert(ex::sender_of, "Missing added set_stopped_t()"); + // set_stopped_t is *not* added when the receiver's stop token is unstoppable. + static_assert(!ex::sender_of>, + "set_stopped_t() should not be added when the receiver's stop token is " + "unstoppable"); + // operator| and sync_wait require valid completion signatures ex::sync_wait(only_error | ex::upon_error([](auto const) { return -1; })); } diff --git a/test/exec/test_trampoline_scheduler.cpp b/test/exec/test_trampoline_scheduler.cpp index 7ade514ab..9a0582b4b 100644 --- a/test/exec/test_trampoline_scheduler.cpp +++ b/test/exec/test_trampoline_scheduler.cpp @@ -26,7 +26,6 @@ namespace ex = STDEXEC; namespace { - struct try_again {}; @@ -78,8 +77,6 @@ namespace "[schedulers][trampoline_scheduler]") { exec::trampoline_scheduler sched; - ex::run_loop loop; - auto recurse_deeply = retry(ex::on(sched, fails_alot{})); ex::sync_wait(std::move(recurse_deeply)); } diff --git a/test/test_common/retry.hpp b/test/test_common/retry.hpp index 033b2eb82..cca53ac20 100644 --- a/test/test_common/retry.hpp +++ b/test/test_common/retry.hpp @@ -17,6 +17,7 @@ #pragma once // Pull in the reference implementation of P2300: +#include #include #include @@ -26,7 +27,6 @@ namespace { - template using _copy_cvref_t = STDEXEC::__copy_cvref_t; @@ -165,13 +165,14 @@ namespace using _keep_values = STDEXEC::completion_signatures; template Self, class... Env> - static consteval auto get_completion_signatures() -> STDEXEC::transform_completion_signatures< - STDEXEC::completion_signatures_of_t, - STDEXEC::completion_signatures, - _keep_values, - _swallow_errors> + static consteval auto get_completion_signatures() { - return {}; + return exec::transform_completion_signatures( + STDEXEC::get_completion_signatures(), + exec::keep_completion(), + exec::ignore_completion(), + {}, + STDEXEC::completion_signatures()); } template @@ -186,9 +187,14 @@ namespace } }; - template - auto retry(S s) -> STDEXEC::sender auto + struct retry_t { - return _retry_sender{static_cast(s)}; - } + template + auto operator()(S s) const -> _retry_sender + { + return _retry_sender{static_cast(s)}; + } + }; + + inline constexpr retry_t retry{}; } // namespace From d4a8497c05fbe0930ca64ef4fcb38bf3935e0550 Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Tue, 3 Mar 2026 14:37:30 -0800 Subject: [PATCH 2/3] formatting --- include/stdexec/__detail/__domain.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/stdexec/__detail/__domain.hpp b/include/stdexec/__detail/__domain.hpp index 3544dcaf2..711da8efd 100644 --- a/include/stdexec/__detail/__domain.hpp +++ b/include/stdexec/__detail/__domain.hpp @@ -342,7 +342,7 @@ namespace STDEXEC { using __sch_t = __call_result_t, _Attrs const &, _Env const &...>; - using __read_query_t = typename get_completion_domain_t::__read_query_t; + using __read_query_t = typename get_completion_domain_t::__read_query_t; if constexpr (__callable<__read_query_t, __sch_t, _Env const &...>) { From c59fe2fa0187a692ae43df74a747f883dcb9aff7 Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Tue, 3 Mar 2026 15:22:48 -0800 Subject: [PATCH 3/3] formatting --- test/exec/test_trampoline_scheduler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/exec/test_trampoline_scheduler.cpp b/test/exec/test_trampoline_scheduler.cpp index 9a0582b4b..3cd3cb4ec 100644 --- a/test/exec/test_trampoline_scheduler.cpp +++ b/test/exec/test_trampoline_scheduler.cpp @@ -77,7 +77,7 @@ namespace "[schedulers][trampoline_scheduler]") { exec::trampoline_scheduler sched; - auto recurse_deeply = retry(ex::on(sched, fails_alot{})); + auto recurse_deeply = retry(ex::on(sched, fails_alot{})); ex::sync_wait(std::move(recurse_deeply)); }