From 14a11e2626ed497690e14d3722a2d75e24d71898 Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Wed, 18 Jul 2018 09:45:03 -0700 Subject: [PATCH 01/18] Initial draft of async fork. --- include/pushmi/o/async.h | 251 +++++++++++++++++++++++++++++++++++++++ test/AsyncTest.cpp | 174 +++++++++++++++++++++++++++ test/CMakeLists.txt | 3 +- 3 files changed, 427 insertions(+), 1 deletion(-) create mode 100644 include/pushmi/o/async.h create mode 100644 test/AsyncTest.cpp diff --git a/include/pushmi/o/async.h b/include/pushmi/o/async.h new file mode 100644 index 0000000..32e5d7f --- /dev/null +++ b/include/pushmi/o/async.h @@ -0,0 +1,251 @@ +#pragma once +// Copyright (c) 2018-present, Facebook, Inc. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. + +#include "../piping.h" +#include "../executor.h" +#include "extension_operators.h" + +namespace pushmi { + +namespace detail { + +template +struct via_fn_data : public Out { + Executor exec; + + via_fn_data(Out out, Executor exec) : + Out(std::move(out)), exec(std::move(exec)) {} +}; + +template +auto make_via_fn_data(Out out, Executor ex) -> via_fn_data { + return {std::move(out), std::move(ex)}; +} + +struct via_fn { + PUSHMI_TEMPLATE(class ExecutorFactory) + (requires Invocable) + auto operator()(ExecutorFactory ef) const { + return constrain(lazy::Sender<_1>, [ef = std::move(ef)](auto in) { + using In = decltype(in); + return ::pushmi::detail::deferred_from>( + std::move(in), + ::pushmi::detail::submit_transform_out( + constrain(lazy::Receiver<_1>, [ef](auto out) { + using Out = decltype(out); + auto exec = ef(); + return ::pushmi::detail::out_from_fn()( + make_via_fn_data(std::move(out), std::move(exec)), + // copy 'f' to allow multiple calls to submit + ::pushmi::on_value([](auto& data, auto&& v) { + using V = decltype(v); + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + [v = (V&&)v, out = std::move(static_cast(data))](auto) mutable { + ::pushmi::set_value(out, std::move(v)); + } + ) + ); + }), + ::pushmi::on_error([](auto& data, auto e) noexcept { + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + [e = std::move(e), out = std::move(static_cast(data))](auto) mutable { + ::pushmi::set_error(out, std::move(e)); + } + ) + ); + }), + ::pushmi::on_done([](auto& data){ + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + [out = std::move(static_cast(data))](auto) mutable { + ::pushmi::set_done(out); + } + ) + ); + }) + ); + }) + ) + ); + }); + } +}; + +} // namespace detail + +namespace operators { +PUSHMI_INLINE_VAR constexpr detail::via_fn via{}; +} // namespace operators + +#if 0 + +namespace detail { + +template +class fsdvia { + using executor_factory_type = std::decay_t; + + executor_factory_type factory_; + + template + class start_via { + using in_type = std::decay_t; + + executor_factory_type factory_; + in_type in_; + + template + class out_via { + using out_type = std::decay_t; + using executor_type = std::decay_t; + + struct shared_type { + shared_type(out_type&& out) : out_(std::move(out)), stopped_(false) {} + out_type out_; + std::atomic_bool stopped_; + }; + + template + struct producer_proxy { + RefWrapper up_; + std::shared_ptr shared_; + + producer_proxy(RefWrapper p, std::shared_ptr s) + : up_(std::move(p)), shared_(std::move(s)) {} + + template + void value(V v) { + if (!!shared_->stopped_.exchange(true)) { + return; + } + up_.get().value(std::move(v)); + } + + template + void error(E e) { + if (!!shared_->stopped_.exchange(true)) { + return; + } + up_.get().error(std::move(e)); + } + }; + + bool done_; + std::shared_ptr shared_; + executor_type exec_; + std::shared_ptr> upProxy_; + + public: + explicit out_via(out_type&& out, executor_type&& exec) + : done_(false), + shared_(std::make_shared(std::move(out))), + exec_(std::move(exec)), + upProxy_() {} + + template + void value(T t) { + if (done_ || shared_->stopped_) { + done_ = true; + return; + } + if (!upProxy_) { + std::abort(); + } + done_ = true; + exec_ | execute([t = std::move(t), shared = shared_](auto) mutable { + shared->out_.value(std::move(t)); + }); + } + + template + void error(E e) { + if (done_ || shared_->stopped_) { + done_ = true; + return; + } + if (!upProxy_) { + std::abort(); + } + done_ = true; + exec_ | execute([e = std::move(e), shared = shared_](auto) mutable { + shared->out_.error(std::move(e)); + }); + } + + void stopping() { + if (done_) { + return; + } + if (!upProxy_) { + std::abort(); + } + done_ = true; + if (!shared_->stopped_.exchange(true)) { + exec_ | + // must keep out and upProxy alive until out is notified that it + // is unsafe + execute([shared = shared_](auto) mutable { + shared->out_.stopping(); + }); + } + } + + template + void starting(RefWrapper up) { + if (!!upProxy_) { + std::abort(); + } + upProxy_ = std::make_shared>(AnyNone<>{ + producer_proxy{std::move(up), shared_}}); + // must keep out and upProxy alive until out is notified that it is + // starting + exec_ | execute([shared = shared_, upProxy = upProxy_](auto) mutable { + shared->out_.starting(wrap_ref(*upProxy)); + }); + } + }; + + public: + start_via(executor_factory_type&& ef, in_type&& in) + : factory_(ef), in_(in) {} + + template + auto then(Out out) { + auto exec = factory_(); + in_.then(out_via{std::move(out), std::move(exec)}); + } + }; + + public: + explicit fsdvia(executor_factory_type&& ef) : factory_(std::move(ef)) {} + + template + auto operator()(In in) { + return start_via{std::move(factory_), std::move(in)}; + } +}; + +} // namespace detail + +namespace fsd { + +template +auto via(ExecutorFactory factory) { + return detail::fsdvia{std::move(factory)}; +} + +} // namespace fsd +#endif + +} // namespace pushmi diff --git a/test/AsyncTest.cpp b/test/AsyncTest.cpp new file mode 100644 index 0000000..2f0b450 --- /dev/null +++ b/test/AsyncTest.cpp @@ -0,0 +1,174 @@ +#include "catch.hpp" + +#include + +#include +#include + +using namespace std::literals; + +#include "pushmi/flow_single_deferred.h" +#include "pushmi/o/empty.h" +#include "pushmi/o/just.h" +#include "pushmi/o/on.h" +#include "pushmi/o/transform.h" +#include "pushmi/o/tap.h" +#include "pushmi/o/via.h" +#include "pushmi/o/submit.h" +#include "pushmi/o/extension_operators.h" + +#include "pushmi/trampoline.h" +#include "pushmi/new_thread.h" + +using namespace pushmi::aliases; + +template +struct AsyncToken { +public: + struct Data { + Data(ValueType v) : v_(std::move(v)) {} + ValueType v_; + std::condition_variable cv_; + std::mutex cvm_; + }; + + AsyncToken(ExecutorType e, ValueType v) : + e_{std::move(e)}, dataPtr_{std::make_shared(std::move(v))} {} + + ExecutorType e_; + std::shared_ptr dataPtr_; +}; + + +namespace pushmi { +namespace detail { + template + struct async_fork_fn_data : public Out { + Executor exec; + + async_fork_fn_data(Out out, Executor exec) : + Out(std::move(out)), exec(std::move(exec)) {} + }; + + template + auto make_async_fork_fn_data(Out out, Executor ex) -> async_fork_fn_data { + return {std::move(out), std::move(ex)}; + } + + struct async_fork_fn { + PUSHMI_TEMPLATE(class ExecutorFactory) + (requires Invocable) + auto operator()(ExecutorFactory ef) const { + return constrain(lazy::Sender<_1>, [ef = std::move(ef)](auto in) { + using In = decltype(in); + return ::pushmi::detail::deferred_from>( + std::move(in), + ::pushmi::detail::submit_transform_out( + constrain(lazy::Receiver<_1>, [ef](auto out) { + using Out = decltype(out); + auto exec = ef(); + return ::pushmi::detail::out_from_fn()( + make_async_fork_fn_data(std::move(out), std::move(exec)), + // copy 'f' to allow multiple calls to submit + ::pushmi::on_value([](auto& data, auto&& v) { + using V = decltype(v); + auto exec = data.exec; + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [v = (V&&)v, out = std::move(static_cast(data)), exec](auto) mutable { + // Token hard coded for this executor type at the moment + auto token = AsyncToken< + std::decay_t, std::decay_t>{ + exec, std::forward(v)}; + ::pushmi::set_value(out, std::move(token)); + } + ) + ); + }), + ::pushmi::on_error([](auto& data, auto e) noexcept { + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + [e = std::move(e), out = std::move(static_cast(data))](auto) mutable { + ::pushmi::set_error(out, std::move(e)); + } + ) + ); + }), + ::pushmi::on_done([](auto& data){ + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + [out = std::move(static_cast(data))](auto) mutable { + ::pushmi::set_done(out); + } + ) + ); + }) + ); + }) + ) + ); + }); + } + }; + +} // namespace detail + +namespace operators { +PUSHMI_INLINE_VAR constexpr detail::async_fork_fn async_fork{}; +} // namespace operators +} // namespace pushmi + + +SCENARIO( "async", "[async]" ) { + + GIVEN( "A new_thread time_single_deferred" ) { + auto nt = v::new_thread(); + using NT = decltype(nt); + + WHEN( "async task chain used with via" ) { + { + std::vector values; + + auto comparablething = op::just(2.0) | + op::via([&](){return nt;}) | + op::transform([exec = nt](auto v){ + return AsyncToken< + std::decay_t, std::decay_t>{ + exec, std::forward(v)}; + }) | + op::transform([](auto v){return v.dataPtr_->v_;}) | + op::blocking_submit(v::on_value([&](auto v) { values.push_back(std::to_string(v)); })); + + THEN( "only the first item was pushed" ) { + REQUIRE(values == std::vector{"2.000000"}); + } + } + + { + std::vector values; + auto realthing = op::just(2.0) | + op::async_fork([&](){return nt;}) | + op::transform([](auto v){return v.dataPtr_->v_;}) | + op::blocking_submit(v::on_value([&](auto v) { values.push_back(std::to_string(v)); })); + + THEN( "only the first item was pushed" ) { + REQUIRE(values == std::vector{"2.000000"}); + } + } + } + } +} + +/* +v::bulk_on_value( + [](size_t idx, auto& shared){shared += idx;}, + []() -> size_t { return 10; }, + [](size_t shape){ return 0; }, + [](auto& shared){return shared;}) +*/ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7f11184..a40c62a 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -2,6 +2,7 @@ add_executable(PushmiTest catch.cpp + AsyncTest.cpp CompileTest.cpp NewThreadTest.cpp TrampolineTest.cpp @@ -13,4 +14,4 @@ target_link_libraries(PushmiTest ) include(../external/Catch2/contrib/Catch.cmake) -catch_discover_tests(PushmiTest) \ No newline at end of file +catch_discover_tests(PushmiTest) From 920b989824f0c211988a276704a4002aaf671921 Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Wed, 18 Jul 2018 11:00:09 -0700 Subject: [PATCH 02/18] Added async_join --- test/AsyncTest.cpp | 108 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 107 insertions(+), 1 deletion(-) diff --git a/test/AsyncTest.cpp b/test/AsyncTest.cpp index 2f0b450..8b5b55d 100644 --- a/test/AsyncTest.cpp +++ b/test/AsyncTest.cpp @@ -82,6 +82,7 @@ namespace detail { auto token = AsyncToken< std::decay_t, std::decay_t>{ exec, std::forward(v)}; + token.dataPtr_->cv_.notify_all(); ::pushmi::set_value(out, std::move(token)); } ) @@ -125,6 +126,109 @@ PUSHMI_INLINE_VAR constexpr detail::async_fork_fn async_fork{}; } // namespace pushmi +namespace pushmi { +namespace detail { + template + struct async_join_fn_data : public Out { + async_join_fn_data(Out out) : + Out(std::move(out)) {} + }; + + template + auto make_async_join_fn_data(Out out) -> async_join_fn_data { + return {std::move(out)}; + } + + struct async_join_fn { + auto operator()() const { + return constrain(lazy::Sender<_1>, [](auto in) { + using In = decltype(in); + return ::pushmi::detail::deferred_from>( + std::move(in), + ::pushmi::detail::submit_transform_out( + constrain(lazy::Receiver<_1>, [](auto out) { + using Out = decltype(out); + return ::pushmi::detail::out_from_fn()( + make_async_join_fn_data(std::move(out)), + // copy 'f' to allow multiple calls to submit + ::pushmi::on_value([](auto& data, auto&& asyncToken) { + // Async version that does not - why not? + using V = decltype(asyncToken); + auto exec = asyncToken.e_; + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [asyncToken, out = std::move(static_cast(data)), exec](auto) mutable { + // Token hard coded for this executor type at the moment + std::thread t([ + exec, + asyncToken, + out]() mutable { + std::unique_lock lk(asyncToken.dataPtr_->cvm_); + // TODO: Currently this never wakes up. Fix to work properly. + //asyncToken.dataPtr_->cv_.wait(lk); + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [asyncToken, out, exec](auto) mutable { + // Token hard coded for this executor type at the moment + ::pushmi::set_value(out, std::move(asyncToken.dataPtr_->v_)); + } + )); + }); + t.detach(); + } + ) + ); + }), + ::pushmi::on_error([](auto& data, auto e) noexcept { + auto out = std::move(static_cast(data)); + ::pushmi::set_error(out, std::move(e)); + }), + ::pushmi::on_done([](auto& data){ + auto out = std::move(static_cast(data)); + ::pushmi::set_done(out); + }) + ); + }) + ) + ); + }); + } + }; + +} // namespace detail + +namespace operators { +PUSHMI_INLINE_VAR constexpr detail::async_join_fn async_join{}; +} // namespace operators +} // namespace pushmi + +// This should specialise using customisation points +// For now it just assumes that this executor deals with threads and creates a +// new thread for each node. +/* +auto async_join() { + return pushmi::make_single_deferred( + [](auto out){ + return v::on_value([out = std::move(out)](auto asyncToken){ + std::thread t([ + asyncToken = std::move(asyncToken), + out = std::move(out)](){ + std::unique_lock lk(asyncToken.data_->cvm_); + asyncToken.data_->cv_.wait(lk); + op::via([](){return asyncToken.e_;}) | + v::on_value([asyncToken, out = std::move(out)](auto&){ + ::pushmi::set_value(out, std::move(asyncToken.data_->v_)); + }); + }); + t.detach(); + }); + }); +}*/ + SCENARIO( "async", "[async]" ) { GIVEN( "A new_thread time_single_deferred" ) { @@ -142,6 +246,7 @@ SCENARIO( "async", "[async]" ) { std::decay_t, std::decay_t>{ exec, std::forward(v)}; }) | + op::transform([](auto v){auto ptr = v.dataPtr_; return v;}) | op::transform([](auto v){return v.dataPtr_->v_;}) | op::blocking_submit(v::on_value([&](auto v) { values.push_back(std::to_string(v)); })); @@ -154,7 +259,8 @@ SCENARIO( "async", "[async]" ) { std::vector values; auto realthing = op::just(2.0) | op::async_fork([&](){return nt;}) | - op::transform([](auto v){return v.dataPtr_->v_;}) | + op::transform([](auto v){auto ptr = v.dataPtr_; return v;}) | + op::async_join() | op::blocking_submit(v::on_value([&](auto v) { values.push_back(std::to_string(v)); })); THEN( "only the first item was pushed" ) { From b75e933e5f7a331d7668d406e5d68b8050a1737c Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Wed, 18 Jul 2018 11:08:12 -0700 Subject: [PATCH 03/18] Add flag --- test/AsyncTest.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/AsyncTest.cpp b/test/AsyncTest.cpp index 8b5b55d..759728b 100644 --- a/test/AsyncTest.cpp +++ b/test/AsyncTest.cpp @@ -30,6 +30,7 @@ struct AsyncToken { ValueType v_; std::condition_variable cv_; std::mutex cvm_; + bool flag_ = false; }; AsyncToken(ExecutorType e, ValueType v) : @@ -82,7 +83,7 @@ namespace detail { auto token = AsyncToken< std::decay_t, std::decay_t>{ exec, std::forward(v)}; - token.dataPtr_->cv_.notify_all(); + token.dataPtr_->flag_ = true; ::pushmi::set_value(out, std::move(token)); } ) @@ -166,8 +167,10 @@ namespace detail { asyncToken, out]() mutable { std::unique_lock lk(asyncToken.dataPtr_->cvm_); - // TODO: Currently this never wakes up. Fix to work properly. - //asyncToken.dataPtr_->cv_.wait(lk); + if(!asyncToken.dataPtr_->flag_) { + asyncToken.dataPtr_->cv_.wait( + lk, [&](){return asyncToken.dataPtr_->flag_;}); + } ::pushmi::submit( exec, ::pushmi::now(exec), From a245d2b3ff7809284694414b9191d299ae4d1951 Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Wed, 18 Jul 2018 11:22:11 -0700 Subject: [PATCH 04/18] Move async management to async header. --- include/pushmi/o/async.h | 382 ++++++++++++++++----------------------- test/AsyncTest.cpp | 204 +-------------------- 2 files changed, 168 insertions(+), 418 deletions(-) diff --git a/include/pushmi/o/async.h b/include/pushmi/o/async.h index 32e5d7f..223bc99 100644 --- a/include/pushmi/o/async.h +++ b/include/pushmi/o/async.h @@ -9,243 +9,179 @@ #include "extension_operators.h" namespace pushmi { - -namespace detail { - -template -struct via_fn_data : public Out { - Executor exec; - - via_fn_data(Out out, Executor exec) : - Out(std::move(out)), exec(std::move(exec)) {} -}; - -template -auto make_via_fn_data(Out out, Executor ex) -> via_fn_data { - return {std::move(out), std::move(ex)}; -} - -struct via_fn { - PUSHMI_TEMPLATE(class ExecutorFactory) - (requires Invocable) - auto operator()(ExecutorFactory ef) const { - return constrain(lazy::Sender<_1>, [ef = std::move(ef)](auto in) { - using In = decltype(in); - return ::pushmi::detail::deferred_from>( - std::move(in), - ::pushmi::detail::submit_transform_out( - constrain(lazy::Receiver<_1>, [ef](auto out) { - using Out = decltype(out); - auto exec = ef(); - return ::pushmi::detail::out_from_fn()( - make_via_fn_data(std::move(out), std::move(exec)), - // copy 'f' to allow multiple calls to submit - ::pushmi::on_value([](auto& data, auto&& v) { - using V = decltype(v); - ::pushmi::submit( - data.exec, - ::pushmi::now(data.exec), - ::pushmi::make_single( - [v = (V&&)v, out = std::move(static_cast(data))](auto) mutable { - ::pushmi::set_value(out, std::move(v)); - } - ) - ); - }), - ::pushmi::on_error([](auto& data, auto e) noexcept { - ::pushmi::submit( - data.exec, - ::pushmi::now(data.exec), - ::pushmi::make_single( - [e = std::move(e), out = std::move(static_cast(data))](auto) mutable { - ::pushmi::set_error(out, std::move(e)); - } - ) - ); - }), - ::pushmi::on_done([](auto& data){ - ::pushmi::submit( - data.exec, - ::pushmi::now(data.exec), - ::pushmi::make_single( - [out = std::move(static_cast(data))](auto) mutable { - ::pushmi::set_done(out); - } - ) - ); - }) - ); - }) - ) - ); - }); - } -}; - -} // namespace detail - -namespace operators { -PUSHMI_INLINE_VAR constexpr detail::via_fn via{}; -} // namespace operators - -#if 0 - namespace detail { -template -class fsdvia { - using executor_factory_type = std::decay_t; - - executor_factory_type factory_; - - template - class start_via { - using in_type = std::decay_t; - - executor_factory_type factory_; - in_type in_; - - template - class out_via { - using out_type = std::decay_t; - using executor_type = std::decay_t; - - struct shared_type { - shared_type(out_type&& out) : out_(std::move(out)), stopped_(false) {} - out_type out_; - std::atomic_bool stopped_; - }; - - template - struct producer_proxy { - RefWrapper up_; - std::shared_ptr shared_; - - producer_proxy(RefWrapper p, std::shared_ptr s) - : up_(std::move(p)), shared_(std::move(s)) {} - - template - void value(V v) { - if (!!shared_->stopped_.exchange(true)) { - return; - } - up_.get().value(std::move(v)); - } - - template - void error(E e) { - if (!!shared_->stopped_.exchange(true)) { - return; - } - up_.get().error(std::move(e)); - } - }; - - bool done_; - std::shared_ptr shared_; - executor_type exec_; - std::shared_ptr> upProxy_; - - public: - explicit out_via(out_type&& out, executor_type&& exec) - : done_(false), - shared_(std::make_shared(std::move(out))), - exec_(std::move(exec)), - upProxy_() {} + template + struct AsyncToken { + public: + struct Data { + Data(ValueType v) : v_(std::move(v)) {} + ValueType v_; + std::condition_variable cv_; + std::mutex cvm_; + bool flag_ = false; + }; - template - void value(T t) { - if (done_ || shared_->stopped_) { - done_ = true; - return; - } - if (!upProxy_) { - std::abort(); - } - done_ = true; - exec_ | execute([t = std::move(t), shared = shared_](auto) mutable { - shared->out_.value(std::move(t)); - }); - } + AsyncToken(ExecutorType e, ValueType v) : + e_{std::move(e)}, dataPtr_{std::make_shared(std::move(v))} {} - template - void error(E e) { - if (done_ || shared_->stopped_) { - done_ = true; - return; - } - if (!upProxy_) { - std::abort(); - } - done_ = true; - exec_ | execute([e = std::move(e), shared = shared_](auto) mutable { - shared->out_.error(std::move(e)); - }); - } + ExecutorType e_; + std::shared_ptr dataPtr_; + }; - void stopping() { - if (done_) { - return; - } - if (!upProxy_) { - std::abort(); - } - done_ = true; - if (!shared_->stopped_.exchange(true)) { - exec_ | - // must keep out and upProxy alive until out is notified that it - // is unsafe - execute([shared = shared_](auto) mutable { - shared->out_.stopping(); - }); - } - } + template + struct async_fork_fn_data : public Out { + Executor exec; - template - void starting(RefWrapper up) { - if (!!upProxy_) { - std::abort(); - } - upProxy_ = std::make_shared>(AnyNone<>{ - producer_proxy{std::move(up), shared_}}); - // must keep out and upProxy alive until out is notified that it is - // starting - exec_ | execute([shared = shared_, upProxy = upProxy_](auto) mutable { - shared->out_.starting(wrap_ref(*upProxy)); - }); - } - }; + async_fork_fn_data(Out out, Executor exec) : + Out(std::move(out)), exec(std::move(exec)) {} + }; - public: - start_via(executor_factory_type&& ef, in_type&& in) - : factory_(ef), in_(in) {} + template + auto make_async_fork_fn_data(Out out, Executor ex) -> async_fork_fn_data { + return {std::move(out), std::move(ex)}; + } - template - auto then(Out out) { - auto exec = factory_(); - in_.then(out_via{std::move(out), std::move(exec)}); + struct async_fork_fn { + PUSHMI_TEMPLATE(class ExecutorFactory) + (requires Invocable) + auto operator()(ExecutorFactory ef) const { + return constrain(lazy::Sender<_1>, [ef = std::move(ef)](auto in) { + using In = decltype(in); + return ::pushmi::detail::deferred_from>( + std::move(in), + ::pushmi::detail::submit_transform_out( + constrain(lazy::Receiver<_1>, [ef](auto out) { + using Out = decltype(out); + auto exec = ef(); + return ::pushmi::detail::out_from_fn()( + make_async_fork_fn_data(std::move(out), std::move(exec)), + // copy 'f' to allow multiple calls to submit + ::pushmi::on_value([](auto& data, auto&& v) { + using V = decltype(v); + auto exec = data.exec; + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [v = (V&&)v, out = std::move(static_cast(data)), exec](auto) mutable { + // Token hard coded for this executor type at the moment + auto token = AsyncToken< + std::decay_t, std::decay_t>{ + exec, std::forward(v)}; + token.dataPtr_->flag_ = true; + ::pushmi::set_value(out, std::move(token)); + } + ) + ); + }), + ::pushmi::on_error([](auto& data, auto e) noexcept { + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + [e = std::move(e), out = std::move(static_cast(data))](auto) mutable { + ::pushmi::set_error(out, std::move(e)); + } + ) + ); + }), + ::pushmi::on_done([](auto& data){ + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + [out = std::move(static_cast(data))](auto) mutable { + ::pushmi::set_done(out); + } + ) + ); + }) + ); + }) + ) + ); + }); } }; - public: - explicit fsdvia(executor_factory_type&& ef) : factory_(std::move(ef)) {} + template + struct async_join_fn_data : public Out { + async_join_fn_data(Out out) : + Out(std::move(out)) {} + }; - template - auto operator()(In in) { - return start_via{std::move(factory_), std::move(in)}; + template + auto make_async_join_fn_data(Out out) -> async_join_fn_data { + return {std::move(out)}; } -}; - -} // namespace detail -namespace fsd { - -template -auto via(ExecutorFactory factory) { - return detail::fsdvia{std::move(factory)}; -} + struct async_join_fn { + auto operator()() const { + return constrain(lazy::Sender<_1>, [](auto in) { + using In = decltype(in); + return ::pushmi::detail::deferred_from>( + std::move(in), + ::pushmi::detail::submit_transform_out( + constrain(lazy::Receiver<_1>, [](auto out) { + using Out = decltype(out); + return ::pushmi::detail::out_from_fn()( + make_async_join_fn_data(std::move(out)), + // copy 'f' to allow multiple calls to submit + ::pushmi::on_value([](auto& data, auto&& asyncToken) { + // Async version that does not - why not? + using V = decltype(asyncToken); + auto exec = asyncToken.e_; + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [asyncToken, out = std::move(static_cast(data)), exec](auto) mutable { + // Token hard coded for this executor type at the moment + std::thread t([ + exec, + asyncToken, + out]() mutable { + std::unique_lock lk(asyncToken.dataPtr_->cvm_); + if(!asyncToken.dataPtr_->flag_) { + asyncToken.dataPtr_->cv_.wait( + lk, [&](){return asyncToken.dataPtr_->flag_;}); + } + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [asyncToken, out, exec](auto) mutable { + // Token hard coded for this executor type at the moment + ::pushmi::set_value(out, std::move(asyncToken.dataPtr_->v_)); + } + )); + }); + t.detach(); + } + ) + ); + }), + ::pushmi::on_error([](auto& data, auto e) noexcept { + auto out = std::move(static_cast(data)); + ::pushmi::set_error(out, std::move(e)); + }), + ::pushmi::on_done([](auto& data){ + auto out = std::move(static_cast(data)); + ::pushmi::set_done(out); + }) + ); + }) + ) + ); + }); + } + }; -} // namespace fsd -#endif +} // namespace detail +namespace operators { +PUSHMI_INLINE_VAR constexpr detail::async_join_fn async_join{}; +PUSHMI_INLINE_VAR constexpr detail::async_fork_fn async_fork{}; +} // namespace operators } // namespace pushmi diff --git a/test/AsyncTest.cpp b/test/AsyncTest.cpp index 759728b..2b56b25 100644 --- a/test/AsyncTest.cpp +++ b/test/AsyncTest.cpp @@ -8,6 +8,7 @@ using namespace std::literals; #include "pushmi/flow_single_deferred.h" +#include "pushmi/o/async.h" #include "pushmi/o/empty.h" #include "pushmi/o/just.h" #include "pushmi/o/on.h" @@ -22,193 +23,6 @@ using namespace std::literals; using namespace pushmi::aliases; -template -struct AsyncToken { -public: - struct Data { - Data(ValueType v) : v_(std::move(v)) {} - ValueType v_; - std::condition_variable cv_; - std::mutex cvm_; - bool flag_ = false; - }; - - AsyncToken(ExecutorType e, ValueType v) : - e_{std::move(e)}, dataPtr_{std::make_shared(std::move(v))} {} - - ExecutorType e_; - std::shared_ptr dataPtr_; -}; - - -namespace pushmi { -namespace detail { - template - struct async_fork_fn_data : public Out { - Executor exec; - - async_fork_fn_data(Out out, Executor exec) : - Out(std::move(out)), exec(std::move(exec)) {} - }; - - template - auto make_async_fork_fn_data(Out out, Executor ex) -> async_fork_fn_data { - return {std::move(out), std::move(ex)}; - } - - struct async_fork_fn { - PUSHMI_TEMPLATE(class ExecutorFactory) - (requires Invocable) - auto operator()(ExecutorFactory ef) const { - return constrain(lazy::Sender<_1>, [ef = std::move(ef)](auto in) { - using In = decltype(in); - return ::pushmi::detail::deferred_from>( - std::move(in), - ::pushmi::detail::submit_transform_out( - constrain(lazy::Receiver<_1>, [ef](auto out) { - using Out = decltype(out); - auto exec = ef(); - return ::pushmi::detail::out_from_fn()( - make_async_fork_fn_data(std::move(out), std::move(exec)), - // copy 'f' to allow multiple calls to submit - ::pushmi::on_value([](auto& data, auto&& v) { - using V = decltype(v); - auto exec = data.exec; - ::pushmi::submit( - exec, - ::pushmi::now(exec), - ::pushmi::make_single( - [v = (V&&)v, out = std::move(static_cast(data)), exec](auto) mutable { - // Token hard coded for this executor type at the moment - auto token = AsyncToken< - std::decay_t, std::decay_t>{ - exec, std::forward(v)}; - token.dataPtr_->flag_ = true; - ::pushmi::set_value(out, std::move(token)); - } - ) - ); - }), - ::pushmi::on_error([](auto& data, auto e) noexcept { - ::pushmi::submit( - data.exec, - ::pushmi::now(data.exec), - ::pushmi::make_single( - [e = std::move(e), out = std::move(static_cast(data))](auto) mutable { - ::pushmi::set_error(out, std::move(e)); - } - ) - ); - }), - ::pushmi::on_done([](auto& data){ - ::pushmi::submit( - data.exec, - ::pushmi::now(data.exec), - ::pushmi::make_single( - [out = std::move(static_cast(data))](auto) mutable { - ::pushmi::set_done(out); - } - ) - ); - }) - ); - }) - ) - ); - }); - } - }; - -} // namespace detail - -namespace operators { -PUSHMI_INLINE_VAR constexpr detail::async_fork_fn async_fork{}; -} // namespace operators -} // namespace pushmi - - -namespace pushmi { -namespace detail { - template - struct async_join_fn_data : public Out { - async_join_fn_data(Out out) : - Out(std::move(out)) {} - }; - - template - auto make_async_join_fn_data(Out out) -> async_join_fn_data { - return {std::move(out)}; - } - - struct async_join_fn { - auto operator()() const { - return constrain(lazy::Sender<_1>, [](auto in) { - using In = decltype(in); - return ::pushmi::detail::deferred_from>( - std::move(in), - ::pushmi::detail::submit_transform_out( - constrain(lazy::Receiver<_1>, [](auto out) { - using Out = decltype(out); - return ::pushmi::detail::out_from_fn()( - make_async_join_fn_data(std::move(out)), - // copy 'f' to allow multiple calls to submit - ::pushmi::on_value([](auto& data, auto&& asyncToken) { - // Async version that does not - why not? - using V = decltype(asyncToken); - auto exec = asyncToken.e_; - ::pushmi::submit( - exec, - ::pushmi::now(exec), - ::pushmi::make_single( - [asyncToken, out = std::move(static_cast(data)), exec](auto) mutable { - // Token hard coded for this executor type at the moment - std::thread t([ - exec, - asyncToken, - out]() mutable { - std::unique_lock lk(asyncToken.dataPtr_->cvm_); - if(!asyncToken.dataPtr_->flag_) { - asyncToken.dataPtr_->cv_.wait( - lk, [&](){return asyncToken.dataPtr_->flag_;}); - } - ::pushmi::submit( - exec, - ::pushmi::now(exec), - ::pushmi::make_single( - [asyncToken, out, exec](auto) mutable { - // Token hard coded for this executor type at the moment - ::pushmi::set_value(out, std::move(asyncToken.dataPtr_->v_)); - } - )); - }); - t.detach(); - } - ) - ); - }), - ::pushmi::on_error([](auto& data, auto e) noexcept { - auto out = std::move(static_cast(data)); - ::pushmi::set_error(out, std::move(e)); - }), - ::pushmi::on_done([](auto& data){ - auto out = std::move(static_cast(data)); - ::pushmi::set_done(out); - }) - ); - }) - ) - ); - }); - } - }; - -} // namespace detail - -namespace operators { -PUSHMI_INLINE_VAR constexpr detail::async_join_fn async_join{}; -} // namespace operators -} // namespace pushmi - // This should specialise using customisation points // For now it just assumes that this executor deals with threads and creates a // new thread for each node. @@ -216,15 +30,15 @@ PUSHMI_INLINE_VAR constexpr detail::async_join_fn async_join{}; auto async_join() { return pushmi::make_single_deferred( [](auto out){ - return v::on_value([out = std::move(out)](auto asyncToken){ + return v::on_value([out = std::move(out)](auto pushmi::detail::AsyncToken){ std::thread t([ - asyncToken = std::move(asyncToken), + pushmi::detail::AsyncToken = std::move(pushmi::detail::AsyncToken), out = std::move(out)](){ - std::unique_lock lk(asyncToken.data_->cvm_); - asyncToken.data_->cv_.wait(lk); - op::via([](){return asyncToken.e_;}) | - v::on_value([asyncToken, out = std::move(out)](auto&){ - ::pushmi::set_value(out, std::move(asyncToken.data_->v_)); + std::unique_lock lk(pushmi::detail::AsyncToken.data_->cvm_); + pushmi::detail::AsyncToken.data_->cv_.wait(lk); + op::via([](){return pushmi::detail::AsyncToken.e_;}) | + v::on_value([pushmi::detail::AsyncToken, out = std::move(out)](auto&){ + ::pushmi::set_value(out, std::move(pushmi::detail::AsyncToken.data_->v_)); }); }); t.detach(); @@ -245,7 +59,7 @@ SCENARIO( "async", "[async]" ) { auto comparablething = op::just(2.0) | op::via([&](){return nt;}) | op::transform([exec = nt](auto v){ - return AsyncToken< + return pushmi::detail::AsyncToken< std::decay_t, std::decay_t>{ exec, std::forward(v)}; }) | From a237a5558345e1f8fda2ceb55e3352a834784abe Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Wed, 18 Jul 2018 11:58:49 -0700 Subject: [PATCH 05/18] async_transform --- include/pushmi/o/async.h | 95 +++++++++++++++++++++++++++++++++++++--- test/AsyncTest.cpp | 42 ++++++------------ 2 files changed, 103 insertions(+), 34 deletions(-) diff --git a/include/pushmi/o/async.h b/include/pushmi/o/async.h index 223bc99..bac7f8b 100644 --- a/include/pushmi/o/async.h +++ b/include/pushmi/o/async.h @@ -11,19 +11,20 @@ namespace pushmi { namespace detail { - template + template struct AsyncToken { public: + using ValueType = ValueType_; + using ExecutorType = ExecutorType_; struct Data { - Data(ValueType v) : v_(std::move(v)) {} ValueType v_; std::condition_variable cv_; std::mutex cvm_; bool flag_ = false; }; - AsyncToken(ExecutorType e, ValueType v) : - e_{std::move(e)}, dataPtr_{std::make_shared(std::move(v))} {} + AsyncToken(ExecutorType e) : + e_{std::move(e)}, dataPtr_{std::make_shared()} {} ExecutorType e_; std::shared_ptr dataPtr_; @@ -68,7 +69,8 @@ namespace detail { // Token hard coded for this executor type at the moment auto token = AsyncToken< std::decay_t, std::decay_t>{ - exec, std::forward(v)}; + exec}; + token.dataPtr_->v_ = std::forward(v); token.dataPtr_->flag_ = true; ::pushmi::set_value(out, std::move(token)); } @@ -178,10 +180,93 @@ namespace detail { } }; + + // extracted this to workaround cuda compiler failure to compute the static_asserts in the nested lambda context + template + struct async_transform_on_value { + F f_; + async_transform_on_value() = default; + constexpr explicit async_transform_on_value(F f) + : f_(std::move(f)) {} + template + auto operator()(Out& out, V&& inputToken) { + using Result = decltype(f_(std::declval())); + using Executor = typename V::ExecutorType; + static_assert(::pushmi::SemiMovable>, + "none of the functions supplied to transform can convert this value"); + static_assert(::pushmi::SingleReceiver>, + "Result of value transform cannot be delivered to Out"); + + AsyncToken outputToken{inputToken.e_}; + std::thread t([ + inputToken, + outputToken, + out, + func = this->f_]() mutable { + std::unique_lock inlk(inputToken.dataPtr_->cvm_); + // Wait for input value + if(!inputToken.dataPtr_->flag_) { + inputToken.dataPtr_->cv_.wait( + inlk, [&](){return inputToken.dataPtr_->flag_;}); + } + // Compute + auto result = func(inputToken.dataPtr_->v_); + // Move output and notify + std::unique_lock outlk(outputToken.dataPtr_->cvm_); + outputToken.dataPtr_->v_ = std::move(result); + outputToken.dataPtr_->flag_ = true; + outputToken.dataPtr_->cv_.notify_all(); + }); + + t.detach(); + ::pushmi::set_value(out, outputToken); + } + }; + + struct async_transform_fn { + template + auto operator()(FN... fn) const; + }; + + template + auto async_transform_fn::operator()(FN... fn) const { + auto f = ::pushmi::overload(std::move(fn)...); + return ::pushmi::constrain(::pushmi::lazy::Sender<::pushmi::_1>, [f = std::move(f)](auto in) { + using In = decltype(in); + // copy 'f' to allow multiple calls to connect to multiple 'in' + using F = decltype(f); + return ::pushmi::detail::deferred_from>( + std::move(in), + ::pushmi::detail::submit_transform_out( + ::pushmi::constrain(::pushmi::lazy::Receiver<::pushmi::_1>, [f](auto out) { + using Out = decltype(out); + return ::pushmi::detail::out_from_fn()( + std::move(out), + // copy 'f' to allow multiple calls to submit + ::pushmi::on_value( + async_transform_on_value(f) + // [f](Out& out, auto&& v) { + // using V = decltype(v); + // using Result = decltype(f((V&&) v)); + // static_assert(::pushmi::SemiMovable, + // "none of the functions supplied to transform can convert this value"); + // static_assert(::pushmi::SingleReceiver, + // "Result of value transform cannot be delivered to Out"); + // ::pushmi::set_value(out, f((V&&) v)); + // } + ) + ); + }) + ) + ); + }); + } + } // namespace detail namespace operators { PUSHMI_INLINE_VAR constexpr detail::async_join_fn async_join{}; PUSHMI_INLINE_VAR constexpr detail::async_fork_fn async_fork{}; +PUSHMI_INLINE_VAR constexpr detail::async_transform_fn async_transform{}; } // namespace operators } // namespace pushmi diff --git a/test/AsyncTest.cpp b/test/AsyncTest.cpp index 2b56b25..8a8f2f5 100644 --- a/test/AsyncTest.cpp +++ b/test/AsyncTest.cpp @@ -23,35 +23,14 @@ using namespace std::literals; using namespace pushmi::aliases; -// This should specialise using customisation points -// For now it just assumes that this executor deals with threads and creates a -// new thread for each node. -/* -auto async_join() { - return pushmi::make_single_deferred( - [](auto out){ - return v::on_value([out = std::move(out)](auto pushmi::detail::AsyncToken){ - std::thread t([ - pushmi::detail::AsyncToken = std::move(pushmi::detail::AsyncToken), - out = std::move(out)](){ - std::unique_lock lk(pushmi::detail::AsyncToken.data_->cvm_); - pushmi::detail::AsyncToken.data_->cv_.wait(lk); - op::via([](){return pushmi::detail::AsyncToken.e_;}) | - v::on_value([pushmi::detail::AsyncToken, out = std::move(out)](auto&){ - ::pushmi::set_value(out, std::move(pushmi::detail::AsyncToken.data_->v_)); - }); - }); - t.detach(); - }); - }); -}*/ - SCENARIO( "async", "[async]" ) { GIVEN( "A new_thread time_single_deferred" ) { auto nt = v::new_thread(); using NT = decltype(nt); + auto workerTask = [](auto v) mutable {return v + 1;}; + WHEN( "async task chain used with via" ) { { std::vector values; @@ -59,16 +38,21 @@ SCENARIO( "async", "[async]" ) { auto comparablething = op::just(2.0) | op::via([&](){return nt;}) | op::transform([exec = nt](auto v){ - return pushmi::detail::AsyncToken< + auto token = pushmi::detail::AsyncToken< std::decay_t, std::decay_t>{ - exec, std::forward(v)}; + exec}; + token.dataPtr_->v_ = std::forward(v); + return token; + }) | + op::transform([workerTask](auto v) mutable { + v.dataPtr_->v_ = workerTask(v.dataPtr_->v_); + return v; }) | - op::transform([](auto v){auto ptr = v.dataPtr_; return v;}) | op::transform([](auto v){return v.dataPtr_->v_;}) | op::blocking_submit(v::on_value([&](auto v) { values.push_back(std::to_string(v)); })); THEN( "only the first item was pushed" ) { - REQUIRE(values == std::vector{"2.000000"}); + REQUIRE(values == std::vector{"3.000000"}); } } @@ -76,12 +60,12 @@ SCENARIO( "async", "[async]" ) { std::vector values; auto realthing = op::just(2.0) | op::async_fork([&](){return nt;}) | - op::transform([](auto v){auto ptr = v.dataPtr_; return v;}) | + op::async_transform(workerTask) | op::async_join() | op::blocking_submit(v::on_value([&](auto v) { values.push_back(std::to_string(v)); })); THEN( "only the first item was pushed" ) { - REQUIRE(values == std::vector{"2.000000"}); + REQUIRE(values == std::vector{"3.000000"}); } } } From 28ca24a272cdd4358f4ae20b034929b93fe4bb5a Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Wed, 18 Jul 2018 14:09:13 -0700 Subject: [PATCH 06/18] Add customisation points. --- include/pushmi/o/async.h | 214 ++++++++++++++++++++++++++------------- test/AsyncTest.cpp | 2 +- 2 files changed, 143 insertions(+), 73 deletions(-) diff --git a/include/pushmi/o/async.h b/include/pushmi/o/async.h index bac7f8b..f4a5468 100644 --- a/include/pushmi/o/async.h +++ b/include/pushmi/o/async.h @@ -6,13 +6,14 @@ #include "../piping.h" #include "../executor.h" +#include "../new_thread.h" #include "extension_operators.h" namespace pushmi { namespace detail { template - struct AsyncToken { + struct NewThreadAsyncToken { public: using ValueType = ValueType_; using ExecutorType = ExecutorType_; @@ -23,15 +24,29 @@ namespace detail { bool flag_ = false; }; - AsyncToken(ExecutorType e) : + NewThreadAsyncToken(ExecutorType e) : e_{std::move(e)}, dataPtr_{std::make_shared()} {} ExecutorType e_; std::shared_ptr dataPtr_; }; + template + struct InlineAsyncToken { + public: + using ValueType = ValueType_; + using ExecutorType = ExecutorType_; + + InlineAsyncToken(ExecutorType e) : + e_{std::move(e)} {} + + ExecutorType e_; + ValueType value_; + }; + template struct async_fork_fn_data : public Out { + using out_t = Out; Executor exec; async_fork_fn_data(Out out, Executor exec) : @@ -43,6 +58,40 @@ namespace detail { return {std::move(out), std::move(ex)}; } + // Generic version + template + struct async_fork_on_value_impl { + void operator()(Executor exec, Data& data, Value&& value) { + static_assert(std::is_same::value, "Inline not yet implemented for fork"); + } + }; + + + // Customisation for NewThreadAsyncToken + template + struct async_fork_on_value_impl { + void operator()(decltype(new_thread()) exec, Data& data, Value&& value) { + + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [value = (Value&&)value, + out = std::move(static_cast::out_t&>(data)), + exec](auto) mutable { + // Token hard coded for this executor type at the moment + auto token = NewThreadAsyncToken< + std::decay_t, std::decay_t>{ + exec}; + token.dataPtr_->v_ = std::forward(value); + token.dataPtr_->flag_ = true; + ::pushmi::set_value(out, std::move(token)); + } + ) + ); + } + }; + struct async_fork_fn { PUSHMI_TEMPLATE(class ExecutorFactory) (requires Invocable) @@ -61,21 +110,8 @@ namespace detail { ::pushmi::on_value([](auto& data, auto&& v) { using V = decltype(v); auto exec = data.exec; - ::pushmi::submit( - exec, - ::pushmi::now(exec), - ::pushmi::make_single( - [v = (V&&)v, out = std::move(static_cast(data)), exec](auto) mutable { - // Token hard coded for this executor type at the moment - auto token = AsyncToken< - std::decay_t, std::decay_t>{ - exec}; - token.dataPtr_->v_ = std::forward(v); - token.dataPtr_->flag_ = true; - ::pushmi::set_value(out, std::move(token)); - } - ) - ); + async_fork_on_value_impl{}( + exec, data, std::forward(v)); }), ::pushmi::on_error([](auto& data, auto e) noexcept { ::pushmi::submit( @@ -109,6 +145,7 @@ namespace detail { template struct async_join_fn_data : public Out { + using out_t = Out; async_join_fn_data(Out out) : Out(std::move(out)) {} }; @@ -118,6 +155,55 @@ namespace detail { return {std::move(out)}; } + // Generic version + template + struct async_join_on_value_impl { + void operator()(Data& data, Token&& token) { + static_assert(std::is_same::value, "Inline not yet implemented for join"); + } + }; + + // Customisation for NewThreadAsyncToken + template + struct async_join_on_value_impl< + NewThreadAsyncToken, Data> { + using token_t = NewThreadAsyncToken; + void operator()(Data& data, token_t&& asyncToken) { + auto exec = asyncToken.e_; + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [asyncToken, + out = std::move(static_cast::out_t&>(data)), + exec](auto) mutable { + // Token hard coded for this executor type at the moment + std::thread t([ + exec, + asyncToken, + out]() mutable { + std::unique_lock lk(asyncToken.dataPtr_->cvm_); + if(!asyncToken.dataPtr_->flag_) { + asyncToken.dataPtr_->cv_.wait( + lk, [&](){return asyncToken.dataPtr_->flag_;}); + } + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [asyncToken, out, exec](auto) mutable { + // Token hard coded for this executor type at the moment + ::pushmi::set_value(out, std::move(asyncToken.dataPtr_->v_)); + } + )); + }); + t.detach(); + } + ) + ); + } + }; + struct async_join_fn { auto operator()() const { return constrain(lazy::Sender<_1>, [](auto in) { @@ -131,38 +217,10 @@ namespace detail { make_async_join_fn_data(std::move(out)), // copy 'f' to allow multiple calls to submit ::pushmi::on_value([](auto& data, auto&& asyncToken) { - // Async version that does not - why not? - using V = decltype(asyncToken); - auto exec = asyncToken.e_; - ::pushmi::submit( - exec, - ::pushmi::now(exec), - ::pushmi::make_single( - [asyncToken, out = std::move(static_cast(data)), exec](auto) mutable { - // Token hard coded for this executor type at the moment - std::thread t([ - exec, - asyncToken, - out]() mutable { - std::unique_lock lk(asyncToken.dataPtr_->cvm_); - if(!asyncToken.dataPtr_->flag_) { - asyncToken.dataPtr_->cv_.wait( - lk, [&](){return asyncToken.dataPtr_->flag_;}); - } - ::pushmi::submit( - exec, - ::pushmi::now(exec), - ::pushmi::make_single( - [asyncToken, out, exec](auto) mutable { - // Token hard coded for this executor type at the moment - ::pushmi::set_value(out, std::move(asyncToken.dataPtr_->v_)); - } - )); - }); - t.detach(); - } - ) - ); + async_join_on_value_impl< + std::decay_t, + std::decay_t>{}( + data, std::move(asyncToken)); }), ::pushmi::on_error([](auto& data, auto e) noexcept { auto out = std::move(static_cast(data)); @@ -180,24 +238,41 @@ namespace detail { } }; - - // extracted this to workaround cuda compiler failure to compute the static_asserts in the nested lambda context - template - struct async_transform_on_value { + // Generic version + template + struct async_transform_on_value_impl { F f_; - async_transform_on_value() = default; - constexpr explicit async_transform_on_value(F f) + async_transform_on_value_impl() = default; + constexpr explicit async_transform_on_value_impl(F f) : f_(std::move(f)) {} template auto operator()(Out& out, V&& inputToken) { - using Result = decltype(f_(std::declval())); - using Executor = typename V::ExecutorType; - static_assert(::pushmi::SemiMovable>, + static_assert(std::is_same::value, "Inline not yet implemented for transform"); + } + }; + + // Customisation for NewThreadAsyncToken + template + struct async_transform_on_value_impl< + F, NewThreadAsyncToken, Data> { + + using token_t = NewThreadAsyncToken; + F f_; + + async_transform_on_value_impl() = default; + constexpr explicit async_transform_on_value_impl(F f) + : f_(std::move(f)) {} + + template + auto operator()(Out& out, token_t&& inputToken) { + using Result = decltype(f_(std::declval())); + using Executor = typename token_t::ExecutorType; + static_assert(::pushmi::SemiMovable>, "none of the functions supplied to transform can convert this value"); - static_assert(::pushmi::SingleReceiver>, + static_assert(::pushmi::SingleReceiver>, "Result of value transform cannot be delivered to Out"); - AsyncToken outputToken{inputToken.e_}; + NewThreadAsyncToken outputToken{inputToken.e_}; std::thread t([ inputToken, outputToken, @@ -243,18 +318,13 @@ namespace detail { return ::pushmi::detail::out_from_fn()( std::move(out), // copy 'f' to allow multiple calls to submit - ::pushmi::on_value( - async_transform_on_value(f) - // [f](Out& out, auto&& v) { - // using V = decltype(v); - // using Result = decltype(f((V&&) v)); - // static_assert(::pushmi::SemiMovable, - // "none of the functions supplied to transform can convert this value"); - // static_assert(::pushmi::SingleReceiver, - // "Result of value transform cannot be delivered to Out"); - // ::pushmi::set_value(out, f((V&&) v)); - // } - ) + ::pushmi::on_value([f](auto& data, auto&& asyncToken) mutable { + async_transform_on_value_impl< + F, + std::decay_t, + std::decay_t>(std::move(f))( + data, std::move(asyncToken)); + }) ); }) ) diff --git a/test/AsyncTest.cpp b/test/AsyncTest.cpp index 8a8f2f5..222f153 100644 --- a/test/AsyncTest.cpp +++ b/test/AsyncTest.cpp @@ -38,7 +38,7 @@ SCENARIO( "async", "[async]" ) { auto comparablething = op::just(2.0) | op::via([&](){return nt;}) | op::transform([exec = nt](auto v){ - auto token = pushmi::detail::AsyncToken< + auto token = pushmi::detail::NewThreadAsyncToken< std::decay_t, std::decay_t>{ exec}; token.dataPtr_->v_ = std::forward(v); From ed58ce5a7bb8ab28f789fbd8e173ff94221584ce Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Wed, 18 Jul 2018 15:03:39 -0700 Subject: [PATCH 07/18] Add inline executor support to show customisation. --- include/pushmi/o/async.h | 40 +++++++++++--- test/AsyncTest.cpp | 110 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 139 insertions(+), 11 deletions(-) diff --git a/include/pushmi/o/async.h b/include/pushmi/o/async.h index f4a5468..f405c48 100644 --- a/include/pushmi/o/async.h +++ b/include/pushmi/o/async.h @@ -62,7 +62,23 @@ namespace detail { template struct async_fork_on_value_impl { void operator()(Executor exec, Data& data, Value&& value) { - static_assert(std::is_same::value, "Inline not yet implemented for fork"); + + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [value = (Value&&)value, + out = std::move(static_cast::out_t&>(data)), + exec](auto) mutable { + // Token hard coded for this executor type at the moment + auto token = InlineAsyncToken< + std::decay_t, std::decay_t>{ + exec}; + token.value_ = std::forward(value); + ::pushmi::set_value(out, std::move(token)); + } + ) + ); } }; @@ -71,7 +87,6 @@ namespace detail { template struct async_fork_on_value_impl { void operator()(decltype(new_thread()) exec, Data& data, Value&& value) { - ::pushmi::submit( exec, ::pushmi::now(exec), @@ -155,11 +170,14 @@ namespace detail { return {std::move(out)}; } - // Generic version + // Generic version, using inline execution template struct async_join_on_value_impl { void operator()(Data& data, Token&& token) { - static_assert(std::is_same::value, "Inline not yet implemented for join"); + + ::pushmi::set_value( + std::move(static_cast::out_t&>(data)), + std::move(token.value_)); } }; @@ -167,8 +185,10 @@ namespace detail { template struct async_join_on_value_impl< NewThreadAsyncToken, Data> { + using token_t = NewThreadAsyncToken; void operator()(Data& data, token_t&& asyncToken) { + auto exec = asyncToken.e_; ::pushmi::submit( exec, @@ -182,6 +202,7 @@ namespace detail { exec, asyncToken, out]() mutable { + std::unique_lock lk(asyncToken.dataPtr_->cvm_); if(!asyncToken.dataPtr_->flag_) { asyncToken.dataPtr_->cv_.wait( @@ -192,7 +213,6 @@ namespace detail { ::pushmi::now(exec), ::pushmi::make_single( [asyncToken, out, exec](auto) mutable { - // Token hard coded for this executor type at the moment ::pushmi::set_value(out, std::move(asyncToken.dataPtr_->v_)); } )); @@ -238,7 +258,7 @@ namespace detail { } }; - // Generic version + // Generic version implemented as inline template struct async_transform_on_value_impl { F f_; @@ -247,7 +267,11 @@ namespace detail { : f_(std::move(f)) {} template auto operator()(Out& out, V&& inputToken) { - static_assert(std::is_same::value, "Inline not yet implemented for transform"); + + auto outputToken = inputToken; + outputToken.value_ = f_(std::move(inputToken.value_)); + + ::pushmi::set_value(out, outputToken); } }; @@ -265,6 +289,7 @@ namespace detail { template auto operator()(Out& out, token_t&& inputToken) { + using Result = decltype(f_(std::declval())); using Executor = typename token_t::ExecutorType; static_assert(::pushmi::SemiMovable>, @@ -278,6 +303,7 @@ namespace detail { outputToken, out, func = this->f_]() mutable { + std::unique_lock inlk(inputToken.dataPtr_->cvm_); // Wait for input value if(!inputToken.dataPtr_->flag_) { diff --git a/test/AsyncTest.cpp b/test/AsyncTest.cpp index 222f153..236ea5d 100644 --- a/test/AsyncTest.cpp +++ b/test/AsyncTest.cpp @@ -4,6 +4,7 @@ #include #include +#include using namespace std::literals; @@ -21,19 +22,45 @@ using namespace std::literals; #include "pushmi/trampoline.h" #include "pushmi/new_thread.h" +// Pause ensures that the tasks take long enough that the enqueue task +// runs ahead and multiple threads are launched +void pause() { + std::this_thread::sleep_for(100ms); +} + using namespace pushmi::aliases; -SCENARIO( "async", "[async]" ) { +struct __inline_submit { + + template + void operator()(TP at, Out out) const { + auto tr = ::pushmi::trampoline(); + ::pushmi::submit(tr, std::move(at), std::move(out)); + } +}; + +inline auto inline_executor() { + return ::pushmi::make_time_single_deferred(__inline_submit{}); +} +SCENARIO( "async", "[async]" ) { + #if 1 GIVEN( "A new_thread time_single_deferred" ) { auto nt = v::new_thread(); using NT = decltype(nt); - auto workerTask = [](auto v) mutable {return v + 1;}; + std::mutex threads_mutex; WHEN( "async task chain used with via" ) { { std::vector values; + std::set threads; + auto workerTask = [&threads, &threads_mutex](auto v) mutable { + std::lock_guard lck(threads_mutex); + threads.insert(std::this_thread::get_id()); + pause(); + return v + 1; + }; auto comparablething = op::just(2.0) | op::via([&](){return nt;}) | @@ -48,24 +75,99 @@ SCENARIO( "async", "[async]" ) { v.dataPtr_->v_ = workerTask(v.dataPtr_->v_); return v; }) | + op::transform([workerTask](auto v) mutable { + v.dataPtr_->v_ = workerTask(v.dataPtr_->v_); + return v; + }) | + op::transform([workerTask](auto v) mutable { + v.dataPtr_->v_ = workerTask(v.dataPtr_->v_); + return v; + }) | + op::transform([workerTask](auto v) mutable { + v.dataPtr_->v_ = workerTask(v.dataPtr_->v_); + return v; + }) | + op::transform([workerTask](auto v) mutable { + v.dataPtr_->v_ = workerTask(v.dataPtr_->v_); + return v; + }) | + op::transform([workerTask](auto v) mutable { + v.dataPtr_->v_ = workerTask(v.dataPtr_->v_); + return v; + }) | op::transform([](auto v){return v.dataPtr_->v_;}) | op::blocking_submit(v::on_value([&](auto v) { values.push_back(std::to_string(v)); })); THEN( "only the first item was pushed" ) { - REQUIRE(values == std::vector{"3.000000"}); + REQUIRE(values == std::vector{"8.000000"}); + // Should be inline on a single thread as transform is literal + REQUIRE(threads.size() == 1); } } { std::vector values; + std::set threads; + auto workerTask = [&threads, &threads_mutex](auto v) mutable { + std::lock_guard lck(threads_mutex); + threads.insert(std::this_thread::get_id()); + std::this_thread::sleep_for(100ms); + pause(); + return v + 1; + }; + auto realthing = op::just(2.0) | op::async_fork([&](){return nt;}) | op::async_transform(workerTask) | + op::async_transform(workerTask) | + op::async_transform(workerTask) | + op::async_transform(workerTask) | + op::async_transform(workerTask) | + op::async_transform(workerTask) | + op::async_join() | + op::blocking_submit(v::on_value([&](auto v) { values.push_back(std::to_string(v)); })); + + THEN( "only the first item was pushed" ) { + REQUIRE(values == std::vector{"8.000000"}); + // Should be asynchronous on multiple threads + REQUIRE(threads.size() != 1); + } + } + } + } + #endif + + GIVEN( "An inline time_single_deferred" ) { + auto nt = inline_executor(); + using NT = decltype(nt); + std::mutex threads_mutex; + + WHEN( "async task chain used with via" ) { + { + std::vector values; + std::set threads; + auto workerTask = [&threads, &threads_mutex](auto v) mutable { + std::lock_guard lck(threads_mutex); + threads.insert(std::this_thread::get_id()); + pause(); + return v + 1; + }; + + auto realthing = op::just(2.0) | + op::async_fork([&](){return nt;}) | + op::async_transform(workerTask) | + op::async_transform(workerTask) | + op::async_transform(workerTask) | + op::async_transform(workerTask) | + op::async_transform(workerTask) | + op::async_transform(workerTask) | op::async_join() | op::blocking_submit(v::on_value([&](auto v) { values.push_back(std::to_string(v)); })); THEN( "only the first item was pushed" ) { - REQUIRE(values == std::vector{"3.000000"}); + REQUIRE(values == std::vector{"8.000000"}); + // Should have all been inline on a single thread + REQUIRE(threads.size() == 1); } } } From 22d3da4dfe97351e364dcb36dd48b941c9829169 Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Wed, 18 Jul 2018 15:20:54 -0700 Subject: [PATCH 08/18] Remove #if --- test/AsyncTest.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/AsyncTest.cpp b/test/AsyncTest.cpp index 236ea5d..a6cf8a7 100644 --- a/test/AsyncTest.cpp +++ b/test/AsyncTest.cpp @@ -44,7 +44,6 @@ inline auto inline_executor() { } SCENARIO( "async", "[async]" ) { - #if 1 GIVEN( "A new_thread time_single_deferred" ) { auto nt = v::new_thread(); using NT = decltype(nt); @@ -135,7 +134,6 @@ SCENARIO( "async", "[async]" ) { } } } - #endif GIVEN( "An inline time_single_deferred" ) { auto nt = inline_executor(); From c497ea9f2eb0e20e2388d3045009827e46bf1bb9 Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Thu, 19 Jul 2018 14:01:52 -0700 Subject: [PATCH 09/18] Default bulk definition --- include/pushmi/o/async.h | 90 ++++++++++++++++++++++++++++++++++++++++ test/AsyncTest.cpp | 39 ++++++++++++----- 2 files changed, 118 insertions(+), 11 deletions(-) diff --git a/include/pushmi/o/async.h b/include/pushmi/o/async.h index f405c48..e62256b 100644 --- a/include/pushmi/o/async.h +++ b/include/pushmi/o/async.h @@ -358,11 +358,101 @@ namespace detail { }); } + + // Generic version implemented as inline + template< + class ValueFunction, + class ShapeF, + class SharedF, + class ResultS, + class Token, + class Data> + struct async_bulk_on_value_impl { + ValueFunction f_; + ShapeF shapeF_; + SharedF sharedF_; + ResultS resultS_; + + async_bulk_on_value_impl() = default; + constexpr explicit async_bulk_on_value_impl( + ValueFunction f, + ShapeF shapeF, + SharedF sharedF, + ResultS resultS) + : f_(std::move(f)), + shapeF_(std::move(shapeF)), + sharedF_(std::move(sharedF)), + resultS_(std::move(resultS)) {} + template + auto operator()(Out& out, V&& inputToken) { + + auto shape = shapeF_(inputToken.value_); + auto shared = sharedF_(inputToken.value_, shape); + using ShapeType = decltype(shape); + for(ShapeType i{}; i <= shape; ++i) { + f_(inputToken.value_, i, shared); + } + auto outputToken = inputToken; + outputToken.value_ = resultS_(shared); + + ::pushmi::set_value(out, std::move(outputToken)); + } + }; + + struct async_bulk_fn { + template + auto operator()(ValueFunction, ShapeF, SharedF, ResultS) const; + }; + + template + auto async_bulk_fn::operator()( + ValueFunction vfn, ShapeF shapeF, SharedF sharedF, ResultS resultS) + const { + return ::pushmi::constrain( + ::pushmi::lazy::Sender<::pushmi::_1>, + [vfn = std::move(vfn), + shapeF = std::move(shapeF), + sharedF = std::move(sharedF), + resultS = std::move(resultS)](auto in) { + using In = decltype(in); + return ::pushmi::detail::deferred_from>( + std::move(in), + ::pushmi::detail::submit_transform_out( + ::pushmi::constrain( + ::pushmi::lazy::Receiver<::pushmi::_1>, + [vfn, shapeF, sharedF, resultS](auto out) { + using Out = decltype(out); + return ::pushmi::detail::out_from_fn()( + std::move(out), + // copy 'f' to allow multiple calls to submit + ::pushmi::on_value( + [vfn, shapeF, sharedF, resultS]( + auto& data, auto&& asyncToken) mutable { + async_bulk_on_value_impl< + ValueFunction, + ShapeF, + SharedF, + ResultS, + std::decay_t, + std::decay_t>( + std::move(vfn), + std::move(shapeF), + std::move(sharedF), + std::move(resultS))(data, std::move(asyncToken)); + }) + ); + }) + ) + ); + }); + } + } // namespace detail namespace operators { PUSHMI_INLINE_VAR constexpr detail::async_join_fn async_join{}; PUSHMI_INLINE_VAR constexpr detail::async_fork_fn async_fork{}; PUSHMI_INLINE_VAR constexpr detail::async_transform_fn async_transform{}; +PUSHMI_INLINE_VAR constexpr detail::async_bulk_fn async_bulk{}; } // namespace operators } // namespace pushmi diff --git a/test/AsyncTest.cpp b/test/AsyncTest.cpp index a6cf8a7..7b7ccac 100644 --- a/test/AsyncTest.cpp +++ b/test/AsyncTest.cpp @@ -136,8 +136,7 @@ SCENARIO( "async", "[async]" ) { } GIVEN( "An inline time_single_deferred" ) { - auto nt = inline_executor(); - using NT = decltype(nt); + auto inline_exec = inline_executor(); std::mutex threads_mutex; WHEN( "async task chain used with via" ) { @@ -152,7 +151,7 @@ SCENARIO( "async", "[async]" ) { }; auto realthing = op::just(2.0) | - op::async_fork([&](){return nt;}) | + op::async_fork([&](){return inline_exec;}) | op::async_transform(workerTask) | op::async_transform(workerTask) | op::async_transform(workerTask) | @@ -170,12 +169,30 @@ SCENARIO( "async", "[async]" ) { } } } -} -/* -v::bulk_on_value( - [](size_t idx, auto& shared){shared += idx;}, - []() -> size_t { return 10; }, - [](size_t shape){ return 0; }, - [](auto& shared){return shared;}) -*/ + + GIVEN( "An inline time_single_deferred" ) { + auto inline_exec = inline_executor(); + std::mutex threads_mutex; + + WHEN( "async bulk" ) { + { + std::vector values; + + auto realthing = op::just(2.0) | + op::async_fork([&](){return inline_exec;}) | + op::async_bulk( + [](const auto& input, size_t idx, auto& shared){shared += idx;}, // on_value + [](const auto& input) -> size_t { return 10; }, // shape factory + [](const auto& input, size_t shape){ return 0; }, // shared factory + [](auto& shared){return shared;}) | // result selector + op::async_join() | + op::blocking_submit(v::on_value([&](auto v) { values.push_back(std::to_string(v)); })); + + THEN( "only the first item was pushed" ) { + REQUIRE(values == std::vector{"55.000000"}); + } + } + } + } +} From 890887a040956bee411bc8e992f3ebbc742fa794 Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Thu, 19 Jul 2018 14:03:44 -0700 Subject: [PATCH 10/18] Cleanup --- test/AsyncTest.cpp | 6 ------ 1 file changed, 6 deletions(-) diff --git a/test/AsyncTest.cpp b/test/AsyncTest.cpp index 7b7ccac..66b5dae 100644 --- a/test/AsyncTest.cpp +++ b/test/AsyncTest.cpp @@ -168,12 +168,6 @@ SCENARIO( "async", "[async]" ) { } } } - } - - - GIVEN( "An inline time_single_deferred" ) { - auto inline_exec = inline_executor(); - std::mutex threads_mutex; WHEN( "async bulk" ) { { From 2c1cad2e2af7ea1824c21ddcbf6cd8eb10a06baa Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Thu, 2 Aug 2018 09:19:11 -0700 Subject: [PATCH 11/18] add async.h to the single header --- buildSingleHeader.cmake | 1 + include/pushmi.h | 575 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 576 insertions(+) diff --git a/buildSingleHeader.cmake b/buildSingleHeader.cmake index a1263ea..26aa376 100644 --- a/buildSingleHeader.cmake +++ b/buildSingleHeader.cmake @@ -48,6 +48,7 @@ set(header_files "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/extension_operators.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/submit.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/subject.h" + "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/async.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/empty.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/from.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/just.h" diff --git a/include/pushmi.h b/include/pushmi.h index 026357e..7335c1f 100644 --- a/include/pushmi.h +++ b/include/pushmi.h @@ -6522,6 +6522,581 @@ struct subject { } }; +} // namespace pushmi +//#pragma once +// Copyright (c) 2018-present, Facebook, Inc. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. + +//#include "../piping.h" +//#include "../executor.h" +//#include "../new_thread.h" +//#include "extension_operators.h" + +namespace pushmi { +namespace detail { + using new_thread_t = decltype(new_thread()); + + template + struct NewThreadAsyncToken { + public: + using ValueType = ValueType_; + using ExecutorType = ExecutorType_; + struct Data { + ValueType v_; + std::condition_variable cv_; + std::mutex cvm_; + bool flag_ = false; + }; + + NewThreadAsyncToken(ExecutorType e) : + e_{std::move(e)}, dataPtr_{std::make_shared()} {} + + ExecutorType e_; + std::shared_ptr dataPtr_; + }; + + template + struct InlineAsyncToken { + public: + using ValueType = ValueType_; + using ExecutorType = ExecutorType_; + + InlineAsyncToken(ExecutorType e) : + e_{std::move(e)} {} + + ExecutorType e_; + ValueType value_; + }; + + template + struct async_fork_fn_data : public Out { + using out_t = Out; + Executor exec; + + async_fork_fn_data(Out out, Executor exec) : + Out(std::move(out)), exec(std::move(exec)) {} + }; + + template + auto make_async_fork_fn_data(Out out, Executor ex) { + return async_fork_fn_data{std::move(out), std::move(ex)}; + } + + // Generic version + template + struct async_fork_on_value_impl { + void operator()(Executor exec, Data& data, Value&& value) { + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [value = (Value&&) value, + out = std::move(static_cast::out_t&>(data)), + exec](auto) mutable { + // Token hard coded for this executor type at the moment + auto token = InlineAsyncToken< + std::decay_t, std::decay_t>{exec}; + token.value_ = std::forward(value); + ::pushmi::set_value(out, std::move(token)); + } + ) + ); + } + }; + + // Customisation for NewThreadAsyncToken + template + struct async_fork_on_value_impl { + using out_t = typename std::decay_t::out_t; + void operator()(new_thread_t exec, Data& data, Value&& value) { + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [value = (Value&&)value, + out = std::move(static_cast(data)), + exec](auto) mutable { + // Token hard coded for this executor type at the moment + auto token = NewThreadAsyncToken< + std::decay_t, std::decay_t>{ + exec}; + token.dataPtr_->v_ = std::forward(value); + token.dataPtr_->flag_ = true; + ::pushmi::set_value(out, std::move(token)); + } + ) + ); + } + }; + + struct async_fork_fn { + private: + struct value_fn { + template + void operator()(Data& data, V&& v) const { + auto exec = data.exec; + async_fork_on_value_impl{}(exec, data, (V&&) v); + } + }; + template + struct error_fn { + private: + template + struct on_value_impl { + E e_; + Out out_; + void operator()(any) { + ::pushmi::set_error(out_, std::move(e_)); + } + }; + public: + template + void operator()(Data& data, E e) const noexcept { + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + on_value_impl{std::move(e), std::move(static_cast(data))} + ) + ); + } + }; + template + struct done_fn { + struct on_value_impl { + Out out_; + void operator()(any) { + ::pushmi::set_done(out_); + } + }; + template + void operator()(Data& data) const { + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + on_value_impl{std::move(static_cast(data))} + ) + ); + } + }; + template + struct out_impl { + ExecutorFactory ef_; + out_impl(ExecutorFactory ef) : ef_(std::move(ef)) {} + PUSHMI_TEMPLATE(class Out) + (requires Receiver) + auto operator()(Out out) const { + auto exec = ef_(); + return ::pushmi::detail::out_from_fn()( + make_async_fork_fn_data(std::move(out), std::move(exec)), + // copy 'f' to allow multiple calls to submit + value_fn{}, + error_fn{}, + done_fn{} + ); + } + }; + template + struct in_impl { + ExecutorFactory ef_; + in_impl(ExecutorFactory ef) : ef_(std::move(ef)) {} + PUSHMI_TEMPLATE(class In) + (requires Sender) + auto operator()(In in) const { + return ::pushmi::detail::deferred_from>( + std::move(in), + ::pushmi::detail::submit_transform_out( + out_impl{ef_} + ) + ); + } + }; + public: + PUSHMI_TEMPLATE(class ExecutorFactory) + (requires Invocable) + auto operator()(ExecutorFactory ef) const { + return in_impl{std::move(ef)}; + } + }; + + template + struct async_join_fn_data : public Out { + using out_t = Out; + async_join_fn_data(Out out) : + Out(std::move(out)) {} + }; + + template + auto make_async_join_fn_data(Out out) { + return async_join_fn_data{std::move(out)}; + } + + // Generic version, using inline execution + template + struct async_join_on_value_impl { + void operator()(Data& data, Token&& token) { + ::pushmi::set_value( + std::move(static_cast::out_t&>(data)), + std::move(token.value_)); + } + }; + + struct condition { + bool* flag_; + bool operator()() const { + return *flag_; + } + }; + + // Customisation for NewThreadAsyncToken + template + struct async_join_on_value_impl, Data> { + using token_t = NewThreadAsyncToken; + private: + using out_t = typename std::decay_t::out_t; + struct thread_fn { + struct on_value_fn { + token_t asyncToken_; + out_t out_; + void operator()(any) { + ::pushmi::set_value(out_, std::move(asyncToken_.dataPtr_->v_)); + } + }; + token_t asyncToken_; + out_t out_; + void operator()() { + std::unique_lock lk(asyncToken_.dataPtr_->cvm_); + if(!asyncToken_.dataPtr_->flag_) { + asyncToken_.dataPtr_->cv_.wait( + lk, condition{&asyncToken_.dataPtr_->flag_} + ); + } + ::pushmi::submit( + asyncToken_.e_, + ::pushmi::now(asyncToken_.e_), + ::pushmi::make_single(on_value_fn{asyncToken_, out_}) + ); + } + }; + struct on_value_fn { + token_t asyncToken_; + out_t out_; + void operator()(any) { + // Token hard coded for this executor type at the moment + std::thread t(thread_fn{asyncToken_, out_}); + t.detach(); + } + }; + public: + void operator()(Data& data, token_t&& asyncToken) { + ::pushmi::submit( + asyncToken.e_, + ::pushmi::now(asyncToken.e_), + ::pushmi::make_single( + on_value_fn{asyncToken, std::move(static_cast(data))} + ) + ); + } + }; + + struct async_join_fn { + private: + struct value_fn { + template + void operator()(Data& data, Token&& asyncToken) const { + async_join_on_value_impl, Data>{}( + data, + std::move(asyncToken) // BUGBUG this is suspect + ); + } + }; + template + struct error_fn { + template + void operator()(Data& data, E e) const noexcept { + auto out = std::move(static_cast(data)); + ::pushmi::set_error(out, std::move(e)); + } + }; + template + struct done_fn { + template + void operator()(Data& data) const noexcept { + auto out = std::move(static_cast(data)); + ::pushmi::set_done(out); + } + }; + template + struct out_impl { + PUSHMI_TEMPLATE (class Out) + (requires Receiver) + auto operator()(Out out) const { + return ::pushmi::detail::out_from_fn()( + make_async_join_fn_data(std::move(out)), + // copy 'f' to allow multiple calls to submit + ::pushmi::on_value(value_fn{}), + ::pushmi::on_error(error_fn{}), + ::pushmi::on_done(done_fn{}) + ); + } + }; + struct in_impl { + PUSHMI_TEMPLATE(class In) + (requires Sender) + auto operator()(In in) const { + return ::pushmi::detail::deferred_from>( + std::move(in), + ::pushmi::detail::submit_transform_out(out_impl{}) + ); + } + }; + public: + auto operator()() const { + return in_impl{}; + } + }; + + // Generic version implemented as inline + template + struct async_transform_on_value_impl { + F f_; + async_transform_on_value_impl() = default; + constexpr explicit async_transform_on_value_impl(F f) + : f_(std::move(f)) {} + + template + auto operator()(Out& out, V&& inputToken) { + auto outputToken = inputToken; + outputToken.value_ = f_(std::move(inputToken.value_)); + ::pushmi::set_value(out, outputToken); + } + }; + + // Customisation for NewThreadAsyncToken + template + struct async_transform_on_value_impl< + F, NewThreadAsyncToken, Data> { + + using token_t = NewThreadAsyncToken; + private: + using Result = invoke_result_t; + using Executor = typename token_t::ExecutorType; + static_assert(::pushmi::SemiMovable>, + "none of the functions supplied to transform can convert this value"); + using OutputToken = NewThreadAsyncToken; + + struct thread_fn { + token_t inputToken_; + OutputToken outputToken_; + F func_; + void operator()() { + std::unique_lock inlk(inputToken_.dataPtr_->cvm_); + // Wait for input value + if(!inputToken_.dataPtr_->flag_) { + inputToken_.dataPtr_->cv_.wait( + inlk, condition{&inputToken_.dataPtr_->flag_} + ); + } + // Compute + auto result = func_(inputToken_.dataPtr_->v_); + // Move output and notify + std::unique_lock outlk(outputToken_.dataPtr_->cvm_); + outputToken_.dataPtr_->v_ = std::move(result); + outputToken_.dataPtr_->flag_ = true; + outputToken_.dataPtr_->cv_.notify_all(); + } + }; + + F f_; + + public: + async_transform_on_value_impl() = default; + constexpr explicit async_transform_on_value_impl(F f) + : f_(std::move(f)) {} + + template + auto operator()(Out& out, token_t&& inputToken) { + static_assert(::pushmi::SingleReceiver>, + "Result of value transform cannot be delivered to Out"); + OutputToken outputToken{inputToken.e_}; + std::thread t(thread_fn{inputToken, outputToken, f_}); + t.detach(); + ::pushmi::set_value(out, outputToken); + } + }; + + struct async_transform_fn { + private: + template + struct on_value_fn { + F f_; + template + void operator()(Data& data, Token&& asyncToken) { + async_transform_on_value_impl, std::decay_t>( + std::move(f_))(data, std::move(asyncToken) + ); + } + }; + template + struct out_impl { + F f_; + out_impl(F f) : f_(f) {} + PUSHMI_TEMPLATE(class Out) + (requires Receiver) + auto operator()(Out out) const { + return ::pushmi::detail::out_from_fn()( + std::move(out), + // copy 'f' to allow multiple calls to submit + ::pushmi::on_value(on_value_fn{f_}) + ); + } + }; + template + struct in_impl { + F f_; + in_impl(F f) : f_(std::move(f)) {} + PUSHMI_TEMPLATE(class In) + (requires Sender) + auto operator()(In in) { + return ::pushmi::detail::deferred_from>( + std::move(in), + ::pushmi::detail::submit_transform_out(out_impl{f_}) + ); + } + }; + template + static auto make_in_impl(F f) { + return in_impl{std::move(f)}; + } + public: + template + auto operator()(FN... fn) const { + return make_in_impl(overload(std::move(fn)...)); + } + }; + + // Generic version implemented as inline + template< + class ValueFunction, + class ShapeF, + class SharedF, + class ResultS, + class Token, + class Data> + struct async_bulk_on_value_impl { + ValueFunction f_; + ShapeF shapeF_; + SharedF sharedF_; + ResultS resultS_; + + async_bulk_on_value_impl() = default; + constexpr explicit async_bulk_on_value_impl( + ValueFunction f, + ShapeF shapeF, + SharedF sharedF, + ResultS resultS) + : f_(std::move(f)), + shapeF_(std::move(shapeF)), + sharedF_(std::move(sharedF)), + resultS_(std::move(resultS)) {} + + template + auto operator()(Out& out, V&& inputToken) { + auto shape = shapeF_(inputToken.value_); + auto shared = sharedF_(inputToken.value_, shape); + using ShapeType = decltype(shape); + for(ShapeType i{}; i <= shape; ++i) { + f_(inputToken.value_, i, shared); + } + auto outputToken = inputToken; + outputToken.value_ = resultS_(shared); + ::pushmi::set_value(out, std::move(outputToken)); + } + }; + + struct async_bulk_fn { + private: + template + struct on_value_fn { + ValueFunction vfn_; + ShapeF shapeF_; + SharedF sharedF_; + ResultS resultS_; + template + void operator()(Data& data, Token&& asyncToken) { + async_bulk_on_value_impl< + ValueFunction, + ShapeF, + SharedF, + ResultS, + std::decay_t, + std::decay_t>( + std::move(vfn_), + std::move(shapeF_), + std::move(sharedF_), + std::move(resultS_))(data, std::move(asyncToken)); + } + }; + template + struct out_impl { + ValueFunction vfn_; + ShapeF shapeF_; + SharedF sharedF_; + ResultS resultS_; + PUSHMI_TEMPLATE (class Out) + (requires Receiver) + auto operator()(Out out) const { + return ::pushmi::detail::out_from_fn()( + std::move(out), + // copy 'f' to allow multiple calls to submit + ::pushmi::on_value( + on_value_fn{ + vfn_, shapeF_, sharedF_, resultS_} + ) + ); + } + }; + template + struct in_impl { + ValueFunction vfn_; + ShapeF shapeF_; + SharedF sharedF_; + ResultS resultS_; + PUSHMI_TEMPLATE (class In) + (requires Sender) + auto operator()(In in) const { + return ::pushmi::detail::deferred_from>( + std::move(in), + ::pushmi::detail::submit_transform_out( + out_impl{ + vfn_, shapeF_, sharedF_, resultS_} + ) + ); + } + }; + public: + template + auto operator()(ValueFunction, ShapeF, SharedF, ResultS) const; + }; + + template + auto async_bulk_fn::operator()( + ValueFunction vfn, ShapeF shapeF, SharedF sharedF, ResultS resultS) + const { + return in_impl{ + std::move(vfn), std::move(shapeF), std::move(sharedF), std::move(resultS)}; + } +} // namespace detail + +namespace operators { +PUSHMI_INLINE_VAR constexpr detail::async_join_fn async_join{}; +PUSHMI_INLINE_VAR constexpr detail::async_fork_fn async_fork{}; +PUSHMI_INLINE_VAR constexpr detail::async_transform_fn async_transform{}; +PUSHMI_INLINE_VAR constexpr detail::async_bulk_fn async_bulk{}; +} // namespace operators } // namespace pushmi // clang-format off // clang format does not support the '<>' in the lambda syntax yet.. []<>()->{} From 939f6a2562424d4b7c922985e37b65ce6b6f2a24 Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Thu, 2 Aug 2018 14:58:04 -0700 Subject: [PATCH 12/18] Move all code into a single customization point for fork. --- include/pushmi.h | 173 +++++++++++++++++++++++++++------------ include/pushmi/o/async.h | 173 +++++++++++++++++++++++++++------------ 2 files changed, 242 insertions(+), 104 deletions(-) diff --git a/include/pushmi.h b/include/pushmi.h index 7335c1f..81d58a0 100644 --- a/include/pushmi.h +++ b/include/pushmi.h @@ -6584,60 +6584,115 @@ namespace detail { return async_fork_fn_data{std::move(out), std::move(ex)}; } + // Generic version - template - struct async_fork_on_value_impl { - void operator()(Executor exec, Data& data, Value&& value) { - ::pushmi::submit( - exec, - ::pushmi::now(exec), - ::pushmi::make_single( - [value = (Value&&) value, - out = std::move(static_cast::out_t&>(data)), - exec](auto) mutable { - // Token hard coded for this executor type at the moment - auto token = InlineAsyncToken< - std::decay_t, std::decay_t>{exec}; - token.value_ = std::forward(value); - ::pushmi::set_value(out, std::move(token)); - } - ) - ); - } - }; + template + struct async_fork_customization { + private: + struct value_fn { + template + void operator()(Data& data, Value&& value) const { + auto exec = data.exec; + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [value = (Value&&) value, + out = std::move(static_cast::out_t&>(data)), + exec](auto) mutable { + // Token hard coded for this executor type at the moment + auto token = InlineAsyncToken< + std::decay_t, std::decay_t>{exec}; + token.value_ = std::forward(value); + ::pushmi::set_value(out, std::move(token)); + } + ) + ); + } + }; + template + struct error_fn { + private: + template + struct on_value_impl { + E e_; + Out out_; + void operator()(any) { + ::pushmi::set_error(out_, std::move(e_)); + } + }; + public: + template + void operator()(Data& data, E e) const noexcept { + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + on_value_impl{std::move(e), std::move(static_cast(data))} + ) + ); + } + }; + template + struct done_fn { + struct on_value_impl { + Out out_; + void operator()(any) { + ::pushmi::set_done(out_); + } + }; + template + void operator()(Data& data) const { + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + on_value_impl{std::move(static_cast(data))} + ) + ); + } + }; - // Customisation for NewThreadAsyncToken - template - struct async_fork_on_value_impl { - using out_t = typename std::decay_t::out_t; - void operator()(new_thread_t exec, Data& data, Value&& value) { - ::pushmi::submit( - exec, - ::pushmi::now(exec), - ::pushmi::make_single( - [value = (Value&&)value, - out = std::move(static_cast(data)), - exec](auto) mutable { - // Token hard coded for this executor type at the moment - auto token = NewThreadAsyncToken< - std::decay_t, std::decay_t>{ - exec}; - token.dataPtr_->v_ = std::forward(value); - token.dataPtr_->flag_ = true; - ::pushmi::set_value(out, std::move(token)); - } - ) + public: + template + auto operator()(Out out, Executor exec) { + return ::pushmi::detail::out_from_fn()( + make_async_fork_fn_data(std::move(out), std::move(exec)), + // copy 'f' to allow multiple calls to submit + value_fn{}, + error_fn{}, + done_fn{} ); } }; - struct async_fork_fn { + + // Customisation for NewThreadAsyncToken + template + struct async_fork_customization { private: + using Executor = new_thread_t; struct value_fn { - template - void operator()(Data& data, V&& v) const { + template + void operator()(Data& data, Value&& value) const { auto exec = data.exec; - async_fork_on_value_impl{}(exec, data, (V&&) v); + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [value = (Value&&)value, + out = std::move(static_cast::out_t&>(data)), + exec](auto) mutable { + // Token hard coded for this executor type at the moment + auto token = NewThreadAsyncToken< + std::decay_t, std::decay_t>{ + exec}; + token.dataPtr_->v_ = std::forward(value); + token.dataPtr_->flag_ = true; + ::pushmi::set_value(out, std::move(token)); + } + ) + ); } }; template @@ -6682,6 +6737,22 @@ namespace detail { ); } }; + + public: + template + auto operator()(Out out, Executor exec) { + return ::pushmi::detail::out_from_fn()( + make_async_fork_fn_data(std::move(out), std::move(exec)), + // copy 'f' to allow multiple calls to submit + value_fn{}, + error_fn{}, + done_fn{} + ); + } + }; + + struct async_fork_fn { + private: template struct out_impl { ExecutorFactory ef_; @@ -6690,13 +6761,11 @@ namespace detail { (requires Receiver) auto operator()(Out out) const { auto exec = ef_(); - return ::pushmi::detail::out_from_fn()( - make_async_fork_fn_data(std::move(out), std::move(exec)), - // copy 'f' to allow multiple calls to submit - value_fn{}, - error_fn{}, - done_fn{} - ); + // Call customization point for fork + // TODO: how should this actually customise. + // do we need the In parameter? + return async_fork_customization, In>{}( + out, exec); } }; template diff --git a/include/pushmi/o/async.h b/include/pushmi/o/async.h index b590dec..e131332 100644 --- a/include/pushmi/o/async.h +++ b/include/pushmi/o/async.h @@ -59,60 +59,115 @@ namespace detail { return async_fork_fn_data{std::move(out), std::move(ex)}; } + // Generic version - template - struct async_fork_on_value_impl { - void operator()(Executor exec, Data& data, Value&& value) { - ::pushmi::submit( - exec, - ::pushmi::now(exec), - ::pushmi::make_single( - [value = (Value&&) value, - out = std::move(static_cast::out_t&>(data)), - exec](auto) mutable { - // Token hard coded for this executor type at the moment - auto token = InlineAsyncToken< - std::decay_t, std::decay_t>{exec}; - token.value_ = std::forward(value); - ::pushmi::set_value(out, std::move(token)); - } - ) - ); - } - }; + template + struct async_fork_customization { + private: + struct value_fn { + template + void operator()(Data& data, Value&& value) const { + auto exec = data.exec; + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [value = (Value&&) value, + out = std::move(static_cast::out_t&>(data)), + exec](auto) mutable { + // Token hard coded for this executor type at the moment + auto token = InlineAsyncToken< + std::decay_t, std::decay_t>{exec}; + token.value_ = std::forward(value); + ::pushmi::set_value(out, std::move(token)); + } + ) + ); + } + }; + template + struct error_fn { + private: + template + struct on_value_impl { + E e_; + Out out_; + void operator()(any) { + ::pushmi::set_error(out_, std::move(e_)); + } + }; + public: + template + void operator()(Data& data, E e) const noexcept { + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + on_value_impl{std::move(e), std::move(static_cast(data))} + ) + ); + } + }; + template + struct done_fn { + struct on_value_impl { + Out out_; + void operator()(any) { + ::pushmi::set_done(out_); + } + }; + template + void operator()(Data& data) const { + ::pushmi::submit( + data.exec, + ::pushmi::now(data.exec), + ::pushmi::make_single( + on_value_impl{std::move(static_cast(data))} + ) + ); + } + }; - // Customisation for NewThreadAsyncToken - template - struct async_fork_on_value_impl { - using out_t = typename std::decay_t::out_t; - void operator()(new_thread_t exec, Data& data, Value&& value) { - ::pushmi::submit( - exec, - ::pushmi::now(exec), - ::pushmi::make_single( - [value = (Value&&)value, - out = std::move(static_cast(data)), - exec](auto) mutable { - // Token hard coded for this executor type at the moment - auto token = NewThreadAsyncToken< - std::decay_t, std::decay_t>{ - exec}; - token.dataPtr_->v_ = std::forward(value); - token.dataPtr_->flag_ = true; - ::pushmi::set_value(out, std::move(token)); - } - ) + public: + template + auto operator()(Out out, Executor exec) { + return ::pushmi::detail::out_from_fn()( + make_async_fork_fn_data(std::move(out), std::move(exec)), + // copy 'f' to allow multiple calls to submit + value_fn{}, + error_fn{}, + done_fn{} ); } }; - struct async_fork_fn { + + // Customisation for NewThreadAsyncToken + template + struct async_fork_customization { private: + using Executor = new_thread_t; struct value_fn { - template - void operator()(Data& data, V&& v) const { + template + void operator()(Data& data, Value&& value) const { auto exec = data.exec; - async_fork_on_value_impl{}(exec, data, (V&&) v); + ::pushmi::submit( + exec, + ::pushmi::now(exec), + ::pushmi::make_single( + [value = (Value&&)value, + out = std::move(static_cast::out_t&>(data)), + exec](auto) mutable { + // Token hard coded for this executor type at the moment + auto token = NewThreadAsyncToken< + std::decay_t, std::decay_t>{ + exec}; + token.dataPtr_->v_ = std::forward(value); + token.dataPtr_->flag_ = true; + ::pushmi::set_value(out, std::move(token)); + } + ) + ); } }; template @@ -157,6 +212,22 @@ namespace detail { ); } }; + + public: + template + auto operator()(Out out, Executor exec) { + return ::pushmi::detail::out_from_fn()( + make_async_fork_fn_data(std::move(out), std::move(exec)), + // copy 'f' to allow multiple calls to submit + value_fn{}, + error_fn{}, + done_fn{} + ); + } + }; + + struct async_fork_fn { + private: template struct out_impl { ExecutorFactory ef_; @@ -165,13 +236,11 @@ namespace detail { (requires Receiver) auto operator()(Out out) const { auto exec = ef_(); - return ::pushmi::detail::out_from_fn()( - make_async_fork_fn_data(std::move(out), std::move(exec)), - // copy 'f' to allow multiple calls to submit - value_fn{}, - error_fn{}, - done_fn{} - ); + // Call customization point for fork + // TODO: how should this actually customise. + // do we need the In parameter? + return async_fork_customization, In>{}( + out, exec); } }; template From 52ed27bebcac291e8ffd569bf10f81da1113f85d Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Thu, 2 Aug 2018 16:04:25 -0700 Subject: [PATCH 13/18] Switched to use functions for customisation. --- include/pushmi.h | 17 +++++++++++++---- include/pushmi/o/async.h | 20 ++++++++++++++++---- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/include/pushmi.h b/include/pushmi.h index 81d58a0..acfd29f 100644 --- a/include/pushmi.h +++ b/include/pushmi.h @@ -6587,7 +6587,7 @@ namespace detail { // Generic version template - struct async_fork_customization { + struct async_fork_customization_generic { private: struct value_fn { template @@ -6669,7 +6669,7 @@ namespace detail { // Customisation for NewThreadAsyncToken template - struct async_fork_customization { + struct async_fork_customization_new_thread_t { private: using Executor = new_thread_t; struct value_fn { @@ -6751,6 +6751,16 @@ namespace detail { } }; + template + auto async_fork_customization(Executor exec, Out out) { + return async_fork_customization_generic{}(out, exec); + } + + template + auto async_fork_customization(new_thread_t exec, Out out) { + return async_fork_customization_new_thread_t{}(out, exec); + } + struct async_fork_fn { private: template @@ -6764,8 +6774,7 @@ namespace detail { // Call customization point for fork // TODO: how should this actually customise. // do we need the In parameter? - return async_fork_customization, In>{}( - out, exec); + return async_fork_customization(exec, out); } }; template diff --git a/include/pushmi/o/async.h b/include/pushmi/o/async.h index e131332..a5a4e51 100644 --- a/include/pushmi/o/async.h +++ b/include/pushmi/o/async.h @@ -62,7 +62,7 @@ namespace detail { // Generic version template - struct async_fork_customization { + struct async_fork_customization_generic { private: struct value_fn { template @@ -144,7 +144,7 @@ namespace detail { // Customisation for NewThreadAsyncToken template - struct async_fork_customization { + struct async_fork_customization_new_thread_t { private: using Executor = new_thread_t; struct value_fn { @@ -226,6 +226,19 @@ namespace detail { } }; + // Generalisation to customise the entire protocol + // TODO: The In template parameter here makes this customisation point + // seem strange + template + auto async_fork_customization(Executor exec, Out out) { + return async_fork_customization_generic{}(out, exec); + } + + template + auto async_fork_customization(new_thread_t exec, Out out) { + return async_fork_customization_new_thread_t{}(out, exec); + } + struct async_fork_fn { private: template @@ -239,8 +252,7 @@ namespace detail { // Call customization point for fork // TODO: how should this actually customise. // do we need the In parameter? - return async_fork_customization, In>{}( - out, exec); + return async_fork_customization(exec, out); } }; template From 5e59a39b9414ebb05aa3511cca622b6920057de6 Mon Sep 17 00:00:00 2001 From: Lee Howes Date: Thu, 2 Aug 2018 16:06:39 -0700 Subject: [PATCH 14/18] Comments on other operators. --- include/pushmi.h | 13 +++++++++++++ include/pushmi/o/async.h | 10 ++++++++++ 2 files changed, 23 insertions(+) diff --git a/include/pushmi.h b/include/pushmi.h index acfd29f..86e7383 100644 --- a/include/pushmi.h +++ b/include/pushmi.h @@ -6751,6 +6751,9 @@ namespace detail { } }; + // Generalisation to customise the entire protocol + // TODO: The In template parameter here makes this customisation point + // seem strange template auto async_fork_customization(Executor exec, Out out) { return async_fork_customization_generic{}(out, exec); @@ -6880,6 +6883,9 @@ namespace detail { } }; + // TODO: This should be transformed to use a single customisation point as for + // fork. To do this we need to get the executor consistently rather than + // getting it from the value method. struct async_join_fn { private: struct value_fn { @@ -7006,6 +7012,9 @@ namespace detail { } }; + // TODO: This should be transformed to use a single customisation point as for + // fork. To do this we need to get the executor consistently rather than + // getting it from the value method. struct async_transform_fn { private: template @@ -7095,6 +7104,10 @@ namespace detail { } }; + + // TODO: This should be transformed to use a single customisation point as for + // fork. To do this we need to get the executor consistently rather than + // getting it from the value method. struct async_bulk_fn { private: template diff --git a/include/pushmi/o/async.h b/include/pushmi/o/async.h index a5a4e51..d61b944 100644 --- a/include/pushmi/o/async.h +++ b/include/pushmi/o/async.h @@ -358,6 +358,9 @@ namespace detail { } }; + // TODO: This should be transformed to use a single customisation point as for + // fork. To do this we need to get the executor consistently rather than + // getting it from the value method. struct async_join_fn { private: struct value_fn { @@ -484,6 +487,9 @@ namespace detail { } }; + // TODO: This should be transformed to use a single customisation point as for + // fork. To do this we need to get the executor consistently rather than + // getting it from the value method. struct async_transform_fn { private: template @@ -573,6 +579,10 @@ namespace detail { } }; + + // TODO: This should be transformed to use a single customisation point as for + // fork. To do this we need to get the executor consistently rather than + // getting it from the value method. struct async_bulk_fn { private: template From 5af2e2486bc4244a424669f8c1fad80f9dcaa8cc Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Fri, 3 Aug 2018 10:24:33 -0700 Subject: [PATCH 15/18] work around compiler bugs, async customization point no longer depends on the actual type of the Sender, only its properties --- include/pushmi/o/async.h | 87 ++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 47 deletions(-) diff --git a/include/pushmi/o/async.h b/include/pushmi/o/async.h index d61b944..0a2c720 100644 --- a/include/pushmi/o/async.h +++ b/include/pushmi/o/async.h @@ -24,7 +24,6 @@ namespace detail { std::mutex cvm_; bool flag_ = false; }; - NewThreadAsyncToken(ExecutorType e) : e_{std::move(e)}, dataPtr_{std::make_shared()} {} @@ -61,20 +60,19 @@ namespace detail { // Generic version - template + template struct async_fork_customization_generic { private: struct value_fn { template void operator()(Data& data, Value&& value) const { - auto exec = data.exec; ::pushmi::submit( - exec, - ::pushmi::now(exec), + data.exec, + ::pushmi::now(data.exec), ::pushmi::make_single( [value = (Value&&) value, - out = std::move(static_cast::out_t&>(data)), - exec](auto) mutable { + out = std::move(static_cast(data)), + exec = data.exec](auto) mutable { // Token hard coded for this executor type at the moment auto token = InlineAsyncToken< std::decay_t, std::decay_t>{exec}; @@ -131,7 +129,7 @@ namespace detail { public: template auto operator()(Out out, Executor exec) { - return ::pushmi::detail::out_from_fn()( + return MakeReceiver{}( make_async_fork_fn_data(std::move(out), std::move(exec)), // copy 'f' to allow multiple calls to submit value_fn{}, @@ -141,28 +139,26 @@ namespace detail { } }; - // Customisation for NewThreadAsyncToken - template + template struct async_fork_customization_new_thread_t { private: using Executor = new_thread_t; struct value_fn { template void operator()(Data& data, Value&& value) const { - auto exec = data.exec; ::pushmi::submit( - exec, - ::pushmi::now(exec), + data.exec, + ::pushmi::now(data.exec), ::pushmi::make_single( - [value = (Value&&)value, - out = std::move(static_cast::out_t&>(data)), - exec](auto) mutable { + [value = (Value&&) value, + out = std::move(static_cast(data)), + exec = data.exec](auto) mutable { // Token hard coded for this executor type at the moment auto token = NewThreadAsyncToken< std::decay_t, std::decay_t>{ - exec}; - token.dataPtr_->v_ = std::forward(value); + std::move(exec)}; + token.dataPtr_->v_ = std::move(value); token.dataPtr_->flag_ = true; ::pushmi::set_value(out, std::move(token)); } @@ -216,7 +212,7 @@ namespace detail { public: template auto operator()(Out out, Executor exec) { - return ::pushmi::detail::out_from_fn()( + return MakeReceiver{}( make_async_fork_fn_data(std::move(out), std::move(exec)), // copy 'f' to allow multiple calls to submit value_fn{}, @@ -227,45 +223,40 @@ namespace detail { }; // Generalisation to customise the entire protocol - // TODO: The In template parameter here makes this customisation point - // seem strange - template + template auto async_fork_customization(Executor exec, Out out) { - return async_fork_customization_generic{}(out, exec); + return async_fork_customization_generic{}(out, exec); } - template - auto async_fork_customization(new_thread_t exec, Out out) { - return async_fork_customization_new_thread_t{}(out, exec); + template + auto async_fork_customization(new_thread_t exec, Out out) { + return async_fork_customization_new_thread_t{}(out, exec); } struct async_fork_fn { private: - template + template struct out_impl { ExecutorFactory ef_; - out_impl(ExecutorFactory ef) : ef_(std::move(ef)) {} PUSHMI_TEMPLATE(class Out) (requires Receiver) auto operator()(Out out) const { auto exec = ef_(); // Call customization point for fork // TODO: how should this actually customise. - // do we need the In parameter? - return async_fork_customization(exec, out); + return async_fork_customization(exec, out); } }; template struct in_impl { ExecutorFactory ef_; - in_impl(ExecutorFactory ef) : ef_(std::move(ef)) {} PUSHMI_TEMPLATE(class In) (requires Sender) auto operator()(In in) const { return ::pushmi::detail::deferred_from>( std::move(in), ::pushmi::detail::submit_transform_out( - out_impl{ef_} + out_impl>{ef_} ) ); } @@ -295,7 +286,7 @@ namespace detail { struct async_join_on_value_impl { void operator()(Data& data, Token&& token) { ::pushmi::set_value( - std::move(static_cast::out_t&>(data)), + std::move(static_cast(data)), std::move(token.value_)); } }; @@ -312,7 +303,7 @@ namespace detail { struct async_join_on_value_impl, Data> { using token_t = NewThreadAsyncToken; private: - using out_t = typename std::decay_t::out_t; + using out_t = typename Data::out_t; struct thread_fn { struct on_value_fn { token_t asyncToken_; @@ -388,12 +379,12 @@ namespace detail { ::pushmi::set_done(out); } }; - template + template struct out_impl { PUSHMI_TEMPLATE (class Out) (requires Receiver) auto operator()(Out out) const { - return ::pushmi::detail::out_from_fn()( + return MakeReceiver{}( make_async_join_fn_data(std::move(out)), // copy 'f' to allow multiple calls to submit ::pushmi::on_value(value_fn{}), @@ -408,7 +399,9 @@ namespace detail { auto operator()(In in) const { return ::pushmi::detail::deferred_from>( std::move(in), - ::pushmi::detail::submit_transform_out(out_impl{}) + ::pushmi::detail::submit_transform_out( + out_impl>{} + ) ); } }; @@ -497,19 +490,18 @@ namespace detail { F f_; template void operator()(Data& data, Token&& asyncToken) { - async_transform_on_value_impl, std::decay_t>( + async_transform_on_value_impl, Data>( std::move(f_))(data, std::move(asyncToken) ); } }; - template + template struct out_impl { F f_; - out_impl(F f) : f_(f) {} PUSHMI_TEMPLATE(class Out) (requires Receiver) auto operator()(Out out) const { - return ::pushmi::detail::out_from_fn()( + return MakeReceiver{}( std::move(out), // copy 'f' to allow multiple calls to submit ::pushmi::on_value(on_value_fn{f_}) @@ -519,13 +511,14 @@ namespace detail { template struct in_impl { F f_; - in_impl(F f) : f_(std::move(f)) {} PUSHMI_TEMPLATE(class In) (requires Sender) auto operator()(In in) { return ::pushmi::detail::deferred_from>( std::move(in), - ::pushmi::detail::submit_transform_out(out_impl{f_}) + ::pushmi::detail::submit_transform_out( + out_impl>{f_} + ) ); } }; @@ -599,14 +592,14 @@ namespace detail { SharedF, ResultS, std::decay_t, - std::decay_t>( + Data>( std::move(vfn_), std::move(shapeF_), std::move(sharedF_), std::move(resultS_))(data, std::move(asyncToken)); } }; - template + template struct out_impl { ValueFunction vfn_; ShapeF shapeF_; @@ -615,7 +608,7 @@ namespace detail { PUSHMI_TEMPLATE (class Out) (requires Receiver) auto operator()(Out out) const { - return ::pushmi::detail::out_from_fn()( + return MakeReceiver{}( std::move(out), // copy 'f' to allow multiple calls to submit ::pushmi::on_value( @@ -637,7 +630,7 @@ namespace detail { return ::pushmi::detail::deferred_from>( std::move(in), ::pushmi::detail::submit_transform_out( - out_impl{ + out_impl>{ vfn_, shapeF_, sharedF_, resultS_} ) ); From b4d207fd563760c5f0365e8e34ff52feb948ccdc Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Fri, 3 Aug 2018 13:14:12 -0700 Subject: [PATCH 16/18] rename deferred to sender --- Readme.md | 84 +-- TARGETS | 4 +- benchmarks/PushmiBenchmarks.cpp | 2 +- buildSingleHeader.cmake | 10 +- examples/include/bulk.h | 4 +- examples/include/no_fail.h | 4 +- examples/include/pool.h | 2 +- examples/set_done/set_done_2.cpp | 8 +- examples/set_error/set_error_2.cpp | 4 +- examples/then_execute/then_execute_2.cpp | 4 +- examples/twoway_execute/twoway_execute_2.cpp | 4 +- include/pushmi.h | 530 +++++++++--------- include/pushmi/boosters.h | 10 +- include/pushmi/executor.h | 4 +- ...single_deferred.h => flow_single_sender.h} | 76 +-- include/pushmi/forwards.h | 10 +- .../pushmi/{many_deferred.h => many_sender.h} | 86 +-- include/pushmi/new_thread.h | 2 +- include/pushmi/o/defer.h | 4 +- include/pushmi/o/empty.h | 8 +- include/pushmi/o/error.h | 6 +- include/pushmi/o/extension_operators.h | 26 +- include/pushmi/o/filter.h | 2 +- include/pushmi/o/from.h | 4 +- include/pushmi/o/just.h | 4 +- include/pushmi/o/on.h | 2 +- include/pushmi/o/submit.h | 2 +- include/pushmi/o/switch_on_error.h | 2 +- include/pushmi/o/tap.h | 14 +- include/pushmi/o/transform.h | 2 +- include/pushmi/o/via.h | 2 +- include/pushmi/{deferred.h => sender.h} | 76 +-- .../{single_deferred.h => single_sender.h} | 86 +-- include/pushmi/subject.h | 4 +- ...single_deferred.h => time_single_sender.h} | 96 ++-- include/pushmi/trampoline.h | 2 +- test/CompileTest.cpp | 52 +- test/FlowTest.cpp | 26 +- test/NewThreadTest.cpp | 14 +- test/PushmiTest.cpp | 16 +- test/TrampolineTest.cpp | 14 +- 41 files changed, 656 insertions(+), 656 deletions(-) rename include/pushmi/{flow_single_deferred.h => flow_single_sender.h} (69%) rename include/pushmi/{many_deferred.h => many_sender.h} (69%) rename include/pushmi/{deferred.h => sender.h} (71%) rename include/pushmi/{single_deferred.h => single_sender.h} (68%) rename include/pushmi/{time_single_deferred.h => time_single_sender.h} (68%) diff --git a/Readme.md b/Readme.md index 707c095..d82a2b0 100644 --- a/Readme.md +++ b/Readme.md @@ -157,121 +157,121 @@ auto s0 = single{single{}}; auto s1 = single{single{}}; ``` -## `deferred` +## `sender` -The `deferred` type in the library provides simple ways to construct new implementations of the NoneSender concept. +The `sender` type in the library provides simple ways to construct new implementations of the NoneSender concept. construct a producer of nothing, aka `never()` ```cpp -deferred<> d; +sender<> d; ``` construct new type using one or more lambdas, or with designated initializers, use multiple lambdas to build overload sets ```cpp -auto d0 = deferred{on_submit{[](auto out){}}}; -auto d1 = deferred{[](auto out){}}; -auto d2 = deferred{on_submit{[](none<> out){}, [](auto out){}}}; +auto d0 = sender{on_submit{[](auto out){}}}; +auto d1 = sender{[](auto out){}}; +auto d2 = sender{on_submit{[](none<> out){}, [](auto out){}}}; ``` -construct a new type with shared state across the lambdas. very useful for building a filter on top of an existing deferred. The state must be a NoneSender, but can be a super-set with additional state for this filter. +construct a new type with shared state across the lambdas. very useful for building a filter on top of an existing sender. The state must be a NoneSender, but can be a super-set with additional state for this filter. ```cpp -auto d0 = deferred{deferred{}}; +auto d0 = sender{sender{}}; -auto d1 = deferred{deferred{}, on_submit{ - [](deferred<>& in, auto out){in | submit(out);}}}; +auto d1 = sender{sender{}, on_submit{ + [](sender<>& in, auto out){in | submit(out);}}}; -auto d2 = deferred{deferred{}, - [](deferred<>& in, auto out){in | submit(out);}}; +auto d2 = sender{sender{}, + [](sender<>& in, auto out){in | submit(out);}}; ``` construct a type-erased type for a particular E (which could be a std::variant of supported types). I have a plan to provide operators to collapse values and errors to variant or tuple and then expand from variant or tuple back to their constituent values/errors. ```cpp -auto d0 = deferred<>{deferred{}}; -auto d1 = deferred{deferred{}}; +auto d0 = sender<>{sender{}}; +auto d1 = sender{sender{}}; ``` -## `single_deferred` +## `single_sender` -The `single_deferred` type in the library provides simple ways to construct new implementations of the SingleSender concept. +The `single_sender` type in the library provides simple ways to construct new implementations of the SingleSender concept. construct a producer of nothing, aka `never()` ```cpp -single_deferred<> sd; +single_sender<> sd; ``` construct new type using one or more lambdas, or with designated initializers, use multiple lambdas to build overload sets ```cpp -auto sd0 = single_deferred{on_submit{[](auto out){}}}; -auto sd1 = single_deferred{[](auto out){}}; -auto sd2 = single_deferred{on_submit{[](single<> out){}, [](auto out){}}}; +auto sd0 = single_sender{on_submit{[](auto out){}}}; +auto sd1 = single_sender{[](auto out){}}; +auto sd2 = single_sender{on_submit{[](single<> out){}, [](auto out){}}}; ``` -construct a new type with shared state across the lambdas. very useful for building a filter on top of an existing single_deferred. The state must be a SingleSender, but can be a super-set with additional state for this filter. +construct a new type with shared state across the lambdas. very useful for building a filter on top of an existing single_sender. The state must be a SingleSender, but can be a super-set with additional state for this filter. ```cpp -auto sd0 = single_deferred{single_deferred{}}; +auto sd0 = single_sender{single_sender{}}; -auto sd1 = single_deferred{single_deferred{}, on_submit{ - [](single_deferred<>& in, auto out){in | submit(out);}}}; +auto sd1 = single_sender{single_sender{}, on_submit{ + [](single_sender<>& in, auto out){in | submit(out);}}}; -auto sd2 = single_deferred{single_deferred{}, - [](single_deferred<>& in, auto out){in | submit(out);}}; +auto sd2 = single_sender{single_sender{}, + [](single_sender<>& in, auto out){in | submit(out);}}; ``` construct a type-erased type for a particular T & E (which could be a std::variant of supported types). I have a plan to provide operators to collapse values and errors to variant or tuple and then expand from variant or tuple back to their constituent values/errors. ```cpp -auto sd0 = single_deferred{single_deferred{}}; -auto sd1 = single_deferred{single_deferred{}}; +auto sd0 = single_sender{single_sender{}}; +auto sd1 = single_sender{single_sender{}}; ``` -## `time_single_deferred` +## `time_single_sender` -The `time_single_deferred` type in the library provides simple ways to construct new implementations of the TimeSingleSender concept. +The `time_single_sender` type in the library provides simple ways to construct new implementations of the TimeSingleSender concept. construct a producer of nothing, aka `never()` ```cpp -time_single_deferred<> tsd; +time_single_sender<> tsd; ``` construct new type using one or more lambdas, or with designated initializers, use multiple lambdas to build overload sets ```cpp -auto tsd0 = time_single_deferred{on_submit{[](auto at, auto out){}}}; -auto tsd1 = time_single_deferred{[](auto at, auto out){}}; -auto tsd2 = time_single_deferred{on_submit{[](auto at, single<> out){}, [](auto at, auto out){}}}; +auto tsd0 = time_single_sender{on_submit{[](auto at, auto out){}}}; +auto tsd1 = time_single_sender{[](auto at, auto out){}}; +auto tsd2 = time_single_sender{on_submit{[](auto at, single<> out){}, [](auto at, auto out){}}}; ``` -construct a new type with shared state across the lambdas. very useful for building a filter on top of an existing time_single_deferred. The state must be a SingleSender, but can be a super-set with additional state for this filter. +construct a new type with shared state across the lambdas. very useful for building a filter on top of an existing time_single_sender. The state must be a SingleSender, but can be a super-set with additional state for this filter. ```cpp -auto tsd0 = time_single_deferred{single_deferred{}}; +auto tsd0 = time_single_sender{single_sender{}}; -auto tsd1 = time_single_deferred{single_deferred{}, on_submit{ - [](time_single_deferred<>& in, auto at, auto out){in | submit(at, out);}}}; +auto tsd1 = time_single_sender{single_sender{}, on_submit{ + [](time_single_sender<>& in, auto at, auto out){in | submit(at, out);}}}; -auto tsd2 = time_single_deferred{single_deferred{}, - [](time_single_deferred<>& in, auto at, auto out){in | submit(at, out);}}; +auto tsd2 = time_single_sender{single_sender{}, + [](time_single_sender<>& in, auto at, auto out){in | submit(at, out);}}; ``` construct a type-erased type for a particular T & E (which could be a std::variant of supported types). I have a plan to provide operators to collapse values and errors to variant or tuple and then expand from variant or tuple back to their constituent values/errors. ```cpp -auto tsd0 = time_single_deferred{time_single_deferred{}}; -auto tsd1 = time_single_deferred{time_single_deferred{}}; +auto tsd0 = time_single_sender{time_single_sender{}}; +auto tsd1 = time_single_sender{time_single_sender{}}; ``` ## put it all together with some algorithms diff --git a/TARGETS b/TARGETS index 1669ad2..3cb22b8 100644 --- a/TARGETS +++ b/TARGETS @@ -7,11 +7,11 @@ cpp_library( "include/executor.h", "include/extension_points.h", "include/flowsingle.h", - "include/flowsingledeferred.h", + "include/flowsinglesender.h", "include/none.h", "include/piping.h", "include/single.h", - "include/singledeferred.h", + "include/singlesender.h", "include/traits.h", "include/trampoline.h", ], diff --git a/benchmarks/PushmiBenchmarks.cpp b/benchmarks/PushmiBenchmarks.cpp index 996ca50..7a171c9 100644 --- a/benchmarks/PushmiBenchmarks.cpp +++ b/benchmarks/PushmiBenchmarks.cpp @@ -11,7 +11,7 @@ #include "pushmi/none.h" #include "pushmi/flow_single.h" -#include "pushmi/flow_single_deferred.h" +#include "pushmi/flow_single_sender.h" #include "pushmi/entangle.h" #include "pool.h" diff --git a/buildSingleHeader.cmake b/buildSingleHeader.cmake index a1263ea..ec3b5f5 100644 --- a/buildSingleHeader.cmake +++ b/buildSingleHeader.cmake @@ -34,15 +34,15 @@ set(header_files "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/boosters.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/piping.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/none.h" - "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/deferred.h" + "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/sender.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/single.h" - "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/single_deferred.h" - "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/time_single_deferred.h" + "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/single_sender.h" + "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/time_single_sender.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/executor.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/flow_single.h" - "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/flow_single_deferred.h" + "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/flow_single_sender.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/many.h" - "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/many_deferred.h" + "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/many_sender.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/trampoline.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/new_thread.h" "${CMAKE_CURRENT_SOURCE_DIR}/include/pushmi/o/extension_operators.h" diff --git a/examples/include/bulk.h b/examples/include/bulk.h index 1cc7c18..d13bd6b 100644 --- a/examples/include/bulk.h +++ b/examples/include/bulk.h @@ -5,7 +5,7 @@ // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. -#include +#include namespace pushmi { @@ -21,7 +21,7 @@ PUSHMI_INLINE_VAR constexpr struct bulk_fn { IF&& initFunc, RS&& selector) const { return [func, sb, se, driver, initFunc, selector](auto in){ - return make_single_deferred( + return make_single_sender( [in, func, sb, se, driver, initFunc, selector](auto out) mutable { submit(in, make_single(std::move(out), [func, sb, se, driver, initFunc, selector](auto& out, auto input) { diff --git a/examples/include/no_fail.h b/examples/include/no_fail.h index 2b0df49..ba0100a 100644 --- a/examples/include/no_fail.h +++ b/examples/include/no_fail.h @@ -4,7 +4,7 @@ // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. -#include +#include #include namespace pushmi { @@ -33,7 +33,7 @@ struct no_fail_fn { PUSHMI_TEMPLATE(class In) (requires Sender) auto operator()(In in) const { - return ::pushmi::detail::deferred_from( + return ::pushmi::detail::sender_from( std::move(in), ::pushmi::detail::submit_transform_out(out_impl{}) ); diff --git a/examples/include/pool.h b/examples/include/pool.h index adcd0ad..947eba0 100644 --- a/examples/include/pool.h +++ b/examples/include/pool.h @@ -45,7 +45,7 @@ class pool { inline auto executor() { auto exec = execution::require(p.executor(), execution::never_blocking, execution::oneway); - return MAKE(time_single_deferred)(__pool_submit{exec}); + return MAKE(time_single_sender)(__pool_submit{exec}); } inline void stop() {p.stop();} diff --git a/examples/set_done/set_done_2.cpp b/examples/set_done/set_done_2.cpp index 2fb86ec..e3cd434 100644 --- a/examples/set_done/set_done_2.cpp +++ b/examples/set_done/set_done_2.cpp @@ -14,7 +14,7 @@ using namespace pushmi::aliases; const bool setting_exists = false; auto get_setting() { - return mi::make_single_deferred( + return mi::make_single_sender( [](auto out){ if(setting_exists) { op::just(42) | op::submit(out); @@ -31,7 +31,7 @@ auto println = [](auto v){std::cout << v << std::endl;}; template auto concat = [](auto in){ - return mi::make_single_deferred( + return mi::make_single_sender( [in](auto out) mutable { ::pushmi::submit(in, mi::make_single(out, [](auto out, auto v){ @@ -54,9 +54,9 @@ int main() op::just(42) | op::transform([](int i) { if (i < 42) { - return mi::any_single_deferred{op::empty()}; + return mi::any_single_sender{op::empty()}; } - return mi::any_single_deferred{op::just(std::to_string(i))}; + return mi::any_single_sender{op::just(std::to_string(i))}; }) | concat | op::submit(println); diff --git a/examples/set_error/set_error_2.cpp b/examples/set_error/set_error_2.cpp index 2bfa540..e078b99 100644 --- a/examples/set_error/set_error_2.cpp +++ b/examples/set_error/set_error_2.cpp @@ -16,7 +16,7 @@ using namespace pushmi::aliases; template auto concat = [](auto in){ - return mi::make_single_deferred( + return mi::make_single_sender( [in](auto out) mutable { ::pushmi::submit(in, mi::make_single(out, [](auto out, auto v){ @@ -63,7 +63,7 @@ int main() op::just(42) | op::transform([](auto v) { - using r_t = mi::any_single_deferred; + using r_t = mi::any_single_sender; if (v < 40) { return r_t{op::error(std::exception_ptr{})}; } else { diff --git a/examples/then_execute/then_execute_2.cpp b/examples/then_execute/then_execute_2.cpp index 94ce6c2..67da81c 100644 --- a/examples/then_execute/then_execute_2.cpp +++ b/examples/then_execute/then_execute_2.cpp @@ -13,8 +13,8 @@ #include -#include -#include +#include +#include #include #include #include diff --git a/examples/twoway_execute/twoway_execute_2.cpp b/examples/twoway_execute/twoway_execute_2.cpp index f1d019c..d4ee172 100644 --- a/examples/twoway_execute/twoway_execute_2.cpp +++ b/examples/twoway_execute/twoway_execute_2.cpp @@ -12,8 +12,8 @@ #include -#include -#include +#include +#include #include using namespace pushmi::aliases; diff --git a/include/pushmi.h b/include/pushmi.h index 98a652f..b9be6ca 100644 --- a/include/pushmi.h +++ b/include/pushmi.h @@ -1150,7 +1150,7 @@ template class none; template -class deferred; +class sender; template class single; @@ -1159,19 +1159,19 @@ template class many; template -class single_deferred; +class single_sender; template -class many_deferred; +class many_sender; template -class time_single_deferred; +class time_single_sender; template class flow_single; template -class flow_single_deferred; +class flow_single_sender; template< class E = std::exception_ptr, @@ -2370,19 +2370,19 @@ template<> struct construct_deduced; template<> -struct construct_deduced; +struct construct_deduced; template<> -struct construct_deduced; +struct construct_deduced; template<> -struct construct_deduced; +struct construct_deduced; template<> -struct construct_deduced; +struct construct_deduced; template<> -struct construct_deduced; +struct construct_deduced; template