From 85693a0d1b29e34febba5700598e14b20b72f8fd Mon Sep 17 00:00:00 2001 From: arpittkhandelwal Date: Wed, 6 May 2026 08:05:25 +0530 Subject: [PATCH 1/2] Add segmented_copy parallel algorithm --- .../include/hpx/include/parallel_copy.hpp | 1 + libs/full/segmented_algorithms/CMakeLists.txt | 1 + .../hpx/parallel/segmented_algorithm.hpp | 1 + .../parallel/segmented_algorithms/copy.hpp | 379 ++++++++++++++++++ 4 files changed, 382 insertions(+) create mode 100644 libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/copy.hpp diff --git a/libs/full/include/include/hpx/include/parallel_copy.hpp b/libs/full/include/include/hpx/include/parallel_copy.hpp index 9a6be3def307..ab47416c6cba 100644 --- a/libs/full/include/include/hpx/include/parallel_copy.hpp +++ b/libs/full/include/include/hpx/include/parallel_copy.hpp @@ -8,3 +8,4 @@ #pragma once #include +#include diff --git a/libs/full/segmented_algorithms/CMakeLists.txt b/libs/full/segmented_algorithms/CMakeLists.txt index a3acd7912b97..413c68e3f71b 100644 --- a/libs/full/segmented_algorithms/CMakeLists.txt +++ b/libs/full/segmented_algorithms/CMakeLists.txt @@ -13,6 +13,7 @@ set(segmented_algorithms_headers hpx/parallel/segmented_algorithms/adjacent_difference.hpp hpx/parallel/segmented_algorithms/adjacent_find.hpp hpx/parallel/segmented_algorithms/all_any_none.hpp + hpx/parallel/segmented_algorithms/copy.hpp hpx/parallel/segmented_algorithms/count.hpp hpx/parallel/segmented_algorithms/detail/dispatch.hpp hpx/parallel/segmented_algorithms/detail/reduce.hpp diff --git a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp index 195b04fdb55e..3fb9230c35d5 100644 --- a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp +++ b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include diff --git a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/copy.hpp b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/copy.hpp new file mode 100644 index 000000000000..c840bdd4989a --- /dev/null +++ b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/copy.hpp @@ -0,0 +1,379 @@ +// Copyright (c) 2026 Arpit Khandelwal +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::parallel { + + /////////////////////////////////////////////////////////////////////////// + // segmented_copy + namespace detail { + /////////////////////////////////////////////////////////////////////// + /// \cond NOINTERNAL + + // sequential remote implementation + template + static util::detail::algorithm_result_t> + segmented_copy(Algo&& algo, ExPolicy const& policy, SegIter first, + SegIter last, OutIter dest, std::true_type) + { + using traits1 = hpx::traits::segmented_iterator_traits; + using traits2 = hpx::traits::segmented_iterator_traits; + using segment_iterator1 = typename traits1::segment_iterator; + using local_iterator_type1 = typename traits1::local_iterator; + using segment_iterator2 = typename traits2::segment_iterator; + using local_iterator_type2 = typename traits2::local_iterator; + + using result = util::detail::algorithm_result>; + + segment_iterator1 sit = traits1::segment(first); + segment_iterator1 send = traits1::segment(last); + segment_iterator2 sdest = traits2::segment(dest); + + if (sit == send) + { + // all elements are on the same partition + local_iterator_type1 beg = traits1::local(first); + local_iterator_type1 end = traits1::local(last); + local_iterator_type2 ldest = traits2::local(dest); + if (beg != end) + { + util::in_out_result + out = dispatch(traits2::get_id(sdest), algo, policy, + std::true_type(), beg, end, ldest); + last = traits1::compose(send, out.in); + dest = traits2::compose(sdest, out.out); + } + } + else + { + // handle the remaining part of the first partition + local_iterator_type1 beg = traits1::local(first); + local_iterator_type1 end = traits1::end(sit); + local_iterator_type2 ldest = traits2::local(dest); + util::in_out_result + out{beg, ldest}; + if (beg != end) + { + out = dispatch(traits2::get_id(sdest), algo, policy, + std::true_type(), beg, end, ldest); + } + + // handle all of the full partitions + for (++sit, ++sdest; sit != send; ++sit, ++sdest) + { + beg = traits1::begin(sit); + end = traits1::end(sit); + ldest = traits2::begin(sdest); + out = util::in_out_result{beg, ldest}; + if (beg != end) + { + out = dispatch(traits2::get_id(sdest), algo, policy, + std::true_type(), beg, end, ldest); + } + } + + // handle the beginning of the last partition + beg = traits1::begin(sit); + end = traits1::local(last); + ldest = traits2::begin(sdest); + out = util::in_out_result{beg, ldest}; + if (beg != end) + { + out = dispatch(traits2::get_id(sdest), algo, policy, + std::true_type(), beg, end, ldest); + } + last = traits1::compose(send, out.in); + dest = traits2::compose(sdest, out.out); + } + return result::get(util::in_out_result{ + HPX_MOVE(last), HPX_MOVE(dest)}); + } + + // parallel remote implementation + template + static util::detail::algorithm_result_t> + segmented_copy(Algo&& algo, ExPolicy const& policy, SegIter first, + SegIter last, OutIter dest, std::false_type) + { + using traits1 = hpx::traits::segmented_iterator_traits; + using traits2 = hpx::traits::segmented_iterator_traits; + using segment_iterator1 = typename traits1::segment_iterator; + using local_iterator_type1 = typename traits1::local_iterator; + using segment_iterator2 = typename traits2::segment_iterator; + using local_iterator_type2 = typename traits2::local_iterator; + + using result = util::detail::algorithm_result>; + + using forced_seq = std::integral_constant>; + + segment_iterator1 sit = traits1::segment(first); + segment_iterator1 send = traits1::segment(last); + segment_iterator2 sdest = traits2::segment(dest); + + using segment_type = + std::vector>>; + segment_type segments; + segments.reserve(std::distance(sit, send)); + + if (sit == send) + { + // all elements are on the same partition + local_iterator_type1 beg = traits1::local(first); + local_iterator_type1 end = traits1::local(last); + local_iterator_type2 ldest = traits2::local(dest); + if (beg != end) + { + segments.push_back(dispatch_async(traits2::get_id(sdest), + algo, policy, forced_seq(), beg, end, ldest)); + } + else + { + segments.push_back(hpx::make_ready_future( + util::in_out_result{beg, ldest})); + } + } + else + { + // handle the remaining part of the first partition + local_iterator_type1 beg = traits1::local(first); + local_iterator_type1 end = traits1::end(sit); + local_iterator_type2 ldest = traits2::local(dest); + if (beg != end) + { + segments.push_back(dispatch_async(traits2::get_id(sdest), + algo, policy, forced_seq(), beg, end, ldest)); + } + else + { + segments.push_back(hpx::make_ready_future( + util::in_out_result{beg, ldest})); + } + + // handle all of the full partitions + for (++sit, ++sdest; sit != send; ++sit, ++sdest) + { + beg = traits1::begin(sit); + end = traits1::end(sit); + ldest = traits2::begin(sdest); + if (beg != end) + { + segments.push_back( + dispatch_async(traits2::get_id(sdest), algo, policy, + forced_seq(), beg, end, ldest)); + } + else + { + segments.push_back(hpx::make_ready_future( + util::in_out_result{beg, ldest})); + } + } + + // handle the beginning of the last partition + beg = traits1::begin(sit); + end = traits1::local(last); + ldest = traits2::begin(sdest); + if (beg != end) + { + segments.push_back(dispatch_async(traits2::get_id(sdest), + algo, policy, forced_seq(), beg, end, ldest)); + } + else + { + segments.push_back(hpx::make_ready_future( + util::in_out_result{beg, ldest})); + } + } + + return result::get(dataflow( + [=](segment_type&& r) -> util::in_out_result { + // handle any remote exceptions, will throw on error + std::list errors; + parallel::util::detail::handle_remote_exceptions< + ExPolicy>::call(r, errors); + auto ft = r.back().get(); + auto olast = traits1::compose(send, ft.in); + auto odest = traits2::compose(sdest, ft.out); + return util::in_out_result{olast, odest}; + }, + HPX_MOVE(segments))); + } + + } // namespace detail + /// \endcond +} // namespace hpx::parallel + +// The segmented iterators we support all live in namespace hpx::segmented +namespace hpx::segmented { + + /////////////////////////////////////////////////////////////////////////// + // segmented copy — tag_invoke overloads + + // no-policy (sequential) overload + template + requires(hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v) + OutIter tag_invoke(hpx::copy_t, SegIter first, SegIter last, OutIter dest) + { + static_assert(hpx::traits::is_input_iterator_v, + "Requires at least input iterator."); + + if (first == last) + { + return HPX_MOVE(dest); + } + + using iterator_traits1 = + hpx::traits::segmented_iterator_traits; + using iterator_traits2 = + hpx::traits::segmented_iterator_traits; + + auto result = hpx::parallel::detail::segmented_copy( + hpx::parallel::detail::copy>(), + hpx::execution::seq, first, last, dest, std::true_type{}); + + return HPX_MOVE(result.out); + } + + // execution-policy overload + template + requires(hpx::is_execution_policy_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v) + hpx::parallel::util::detail::algorithm_result_t + tag_invoke(hpx::copy_t, ExPolicy&& policy, SegIter first, SegIter last, + OutIter dest) + { + static_assert(hpx::traits::is_forward_iterator_v, + "Requires at least forward iterator."); + + using is_seq = typename hpx::is_sequenced_execution_policy< + std::decay_t>::type; + + if (first == last) + { + using result = + hpx::parallel::util::detail::algorithm_result; + return result::get(HPX_MOVE(dest)); + } + + using iterator_traits1 = + hpx::traits::segmented_iterator_traits; + using iterator_traits2 = + hpx::traits::segmented_iterator_traits; + + return hpx::parallel::util::get_second_element( + hpx::parallel::detail::segmented_copy( + hpx::parallel::detail::copy>(), + HPX_FORWARD(ExPolicy, policy), first, last, dest, is_seq{})); + } + + /////////////////////////////////////////////////////////////////////////// + // segmented copy_if — tag_invoke overloads + // + // copy_if must write accepted elements contiguously into the destination. + // Because the number of elements accepted per source segment is not known + // until the segment has been scanned, we cannot reset the destination + // iterator for each segment independently (that would cause gaps or + // overwrites). We therefore use a simple sequential element-by-element + // fallback that correctly advances a single destination cursor. + + // no-policy (sequential) overload + template + requires(hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v) + OutIter tag_invoke( + hpx::copy_if_t, SegIter first, SegIter last, OutIter dest, Pred&& pred) + { + static_assert(hpx::traits::is_input_iterator_v, + "Requires at least input iterator."); + + for (; first != last; ++first) + { + if (HPX_INVOKE(pred, *first)) + { + *dest = *first; + ++dest; + } + } + return dest; + } + + // execution-policy overload — sequential policies run the same loop; + // parallel policies also run sequentially here because a truly parallel + // segmented copy_if requires a two-phase (count + scan) approach that is + // deferred to a future PR. + template + requires(hpx::is_execution_policy_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v) + hpx::parallel::util::detail::algorithm_result_t + tag_invoke(hpx::copy_if_t, ExPolicy&& /* policy */, SegIter first, + SegIter last, OutIter dest, Pred&& pred) + { + static_assert(hpx::traits::is_forward_iterator_v, + "Requires at least forward iterator."); + + using result = + hpx::parallel::util::detail::algorithm_result; + + for (; first != last; ++first) + { + if (HPX_INVOKE(pred, *first)) + { + *dest = *first; + ++dest; + } + } + return result::get(HPX_MOVE(dest)); + } + +} // namespace hpx::segmented From 53081efb8befd081c16819b4edc7386f1863ef76 Mon Sep 17 00:00:00 2001 From: arpittkhandelwal Date: Wed, 6 May 2026 22:24:04 +0530 Subject: [PATCH 2/2] Remove copy_if from segmented_algorithms --- .../parallel/segmented_algorithms/copy.hpp | 65 ------------------- 1 file changed, 65 deletions(-) diff --git a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/copy.hpp b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/copy.hpp index c840bdd4989a..5d0c4c36c7b1 100644 --- a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/copy.hpp +++ b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/copy.hpp @@ -311,69 +311,4 @@ namespace hpx::segmented { HPX_FORWARD(ExPolicy, policy), first, last, dest, is_seq{})); } - /////////////////////////////////////////////////////////////////////////// - // segmented copy_if — tag_invoke overloads - // - // copy_if must write accepted elements contiguously into the destination. - // Because the number of elements accepted per source segment is not known - // until the segment has been scanned, we cannot reset the destination - // iterator for each segment independently (that would cause gaps or - // overwrites). We therefore use a simple sequential element-by-element - // fallback that correctly advances a single destination cursor. - - // no-policy (sequential) overload - template - requires(hpx::traits::is_iterator_v && - hpx::traits::is_segmented_iterator_v && - hpx::traits::is_iterator_v && - hpx::traits::is_segmented_iterator_v) - OutIter tag_invoke( - hpx::copy_if_t, SegIter first, SegIter last, OutIter dest, Pred&& pred) - { - static_assert(hpx::traits::is_input_iterator_v, - "Requires at least input iterator."); - - for (; first != last; ++first) - { - if (HPX_INVOKE(pred, *first)) - { - *dest = *first; - ++dest; - } - } - return dest; - } - - // execution-policy overload — sequential policies run the same loop; - // parallel policies also run sequentially here because a truly parallel - // segmented copy_if requires a two-phase (count + scan) approach that is - // deferred to a future PR. - template - requires(hpx::is_execution_policy_v && - hpx::traits::is_iterator_v && - hpx::traits::is_segmented_iterator_v && - hpx::traits::is_iterator_v && - hpx::traits::is_segmented_iterator_v) - hpx::parallel::util::detail::algorithm_result_t - tag_invoke(hpx::copy_if_t, ExPolicy&& /* policy */, SegIter first, - SegIter last, OutIter dest, Pred&& pred) - { - static_assert(hpx::traits::is_forward_iterator_v, - "Requires at least forward iterator."); - - using result = - hpx::parallel::util::detail::algorithm_result; - - for (; first != last; ++first) - { - if (HPX_INVOKE(pred, *first)) - { - *dest = *first; - ++dest; - } - } - return result::get(HPX_MOVE(dest)); - } - } // namespace hpx::segmented