diff --git a/libs/full/include/include/hpx/include/parallel_copy.hpp b/libs/full/include/include/hpx/include/parallel_copy.hpp index 9a6be3def307..ab47416c6cba 100644 --- a/libs/full/include/include/hpx/include/parallel_copy.hpp +++ b/libs/full/include/include/hpx/include/parallel_copy.hpp @@ -8,3 +8,4 @@ #pragma once #include +#include diff --git a/libs/full/segmented_algorithms/CMakeLists.txt b/libs/full/segmented_algorithms/CMakeLists.txt index a3acd7912b97..413c68e3f71b 100644 --- a/libs/full/segmented_algorithms/CMakeLists.txt +++ b/libs/full/segmented_algorithms/CMakeLists.txt @@ -13,6 +13,7 @@ set(segmented_algorithms_headers hpx/parallel/segmented_algorithms/adjacent_difference.hpp hpx/parallel/segmented_algorithms/adjacent_find.hpp hpx/parallel/segmented_algorithms/all_any_none.hpp + hpx/parallel/segmented_algorithms/copy.hpp hpx/parallel/segmented_algorithms/count.hpp hpx/parallel/segmented_algorithms/detail/dispatch.hpp hpx/parallel/segmented_algorithms/detail/reduce.hpp diff --git a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp index 195b04fdb55e..3fb9230c35d5 100644 --- a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp +++ b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithm.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include diff --git a/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/copy.hpp b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/copy.hpp new file mode 100644 index 000000000000..5d0c4c36c7b1 --- /dev/null +++ b/libs/full/segmented_algorithms/include/hpx/parallel/segmented_algorithms/copy.hpp @@ -0,0 +1,314 @@ +// Copyright (c) 2026 Arpit Khandelwal +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::parallel { + + /////////////////////////////////////////////////////////////////////////// + // segmented_copy + namespace detail { + /////////////////////////////////////////////////////////////////////// + /// \cond NOINTERNAL + + // sequential remote implementation + template + static util::detail::algorithm_result_t> + segmented_copy(Algo&& algo, ExPolicy const& policy, SegIter first, + SegIter last, OutIter dest, std::true_type) + { + using traits1 = hpx::traits::segmented_iterator_traits; + using traits2 = hpx::traits::segmented_iterator_traits; + using segment_iterator1 = typename traits1::segment_iterator; + using local_iterator_type1 = typename traits1::local_iterator; + using segment_iterator2 = typename traits2::segment_iterator; + using local_iterator_type2 = typename traits2::local_iterator; + + using result = util::detail::algorithm_result>; + + segment_iterator1 sit = traits1::segment(first); + segment_iterator1 send = traits1::segment(last); + segment_iterator2 sdest = traits2::segment(dest); + + if (sit == send) + { + // all elements are on the same partition + local_iterator_type1 beg = traits1::local(first); + local_iterator_type1 end = traits1::local(last); + local_iterator_type2 ldest = traits2::local(dest); + if (beg != end) + { + util::in_out_result + out = dispatch(traits2::get_id(sdest), algo, policy, + std::true_type(), beg, end, ldest); + last = traits1::compose(send, out.in); + dest = traits2::compose(sdest, out.out); + } + } + else + { + // handle the remaining part of the first partition + local_iterator_type1 beg = traits1::local(first); + local_iterator_type1 end = traits1::end(sit); + local_iterator_type2 ldest = traits2::local(dest); + util::in_out_result + out{beg, ldest}; + if (beg != end) + { + out = dispatch(traits2::get_id(sdest), algo, policy, + std::true_type(), beg, end, ldest); + } + + // handle all of the full partitions + for (++sit, ++sdest; sit != send; ++sit, ++sdest) + { + beg = traits1::begin(sit); + end = traits1::end(sit); + ldest = traits2::begin(sdest); + out = util::in_out_result{beg, ldest}; + if (beg != end) + { + out = dispatch(traits2::get_id(sdest), algo, policy, + std::true_type(), beg, end, ldest); + } + } + + // handle the beginning of the last partition + beg = traits1::begin(sit); + end = traits1::local(last); + ldest = traits2::begin(sdest); + out = util::in_out_result{beg, ldest}; + if (beg != end) + { + out = dispatch(traits2::get_id(sdest), algo, policy, + std::true_type(), beg, end, ldest); + } + last = traits1::compose(send, out.in); + dest = traits2::compose(sdest, out.out); + } + return result::get(util::in_out_result{ + HPX_MOVE(last), HPX_MOVE(dest)}); + } + + // parallel remote implementation + template + static util::detail::algorithm_result_t> + segmented_copy(Algo&& algo, ExPolicy const& policy, SegIter first, + SegIter last, OutIter dest, std::false_type) + { + using traits1 = hpx::traits::segmented_iterator_traits; + using traits2 = hpx::traits::segmented_iterator_traits; + using segment_iterator1 = typename traits1::segment_iterator; + using local_iterator_type1 = typename traits1::local_iterator; + using segment_iterator2 = typename traits2::segment_iterator; + using local_iterator_type2 = typename traits2::local_iterator; + + using result = util::detail::algorithm_result>; + + using forced_seq = std::integral_constant>; + + segment_iterator1 sit = traits1::segment(first); + segment_iterator1 send = traits1::segment(last); + segment_iterator2 sdest = traits2::segment(dest); + + using segment_type = + std::vector>>; + segment_type segments; + segments.reserve(std::distance(sit, send)); + + if (sit == send) + { + // all elements are on the same partition + local_iterator_type1 beg = traits1::local(first); + local_iterator_type1 end = traits1::local(last); + local_iterator_type2 ldest = traits2::local(dest); + if (beg != end) + { + segments.push_back(dispatch_async(traits2::get_id(sdest), + algo, policy, forced_seq(), beg, end, ldest)); + } + else + { + segments.push_back(hpx::make_ready_future( + util::in_out_result{beg, ldest})); + } + } + else + { + // handle the remaining part of the first partition + local_iterator_type1 beg = traits1::local(first); + local_iterator_type1 end = traits1::end(sit); + local_iterator_type2 ldest = traits2::local(dest); + if (beg != end) + { + segments.push_back(dispatch_async(traits2::get_id(sdest), + algo, policy, forced_seq(), beg, end, ldest)); + } + else + { + segments.push_back(hpx::make_ready_future( + util::in_out_result{beg, ldest})); + } + + // handle all of the full partitions + for (++sit, ++sdest; sit != send; ++sit, ++sdest) + { + beg = traits1::begin(sit); + end = traits1::end(sit); + ldest = traits2::begin(sdest); + if (beg != end) + { + segments.push_back( + dispatch_async(traits2::get_id(sdest), algo, policy, + forced_seq(), beg, end, ldest)); + } + else + { + segments.push_back(hpx::make_ready_future( + util::in_out_result{beg, ldest})); + } + } + + // handle the beginning of the last partition + beg = traits1::begin(sit); + end = traits1::local(last); + ldest = traits2::begin(sdest); + if (beg != end) + { + segments.push_back(dispatch_async(traits2::get_id(sdest), + algo, policy, forced_seq(), beg, end, ldest)); + } + else + { + segments.push_back(hpx::make_ready_future( + util::in_out_result{beg, ldest})); + } + } + + return result::get(dataflow( + [=](segment_type&& r) -> util::in_out_result { + // handle any remote exceptions, will throw on error + std::list errors; + parallel::util::detail::handle_remote_exceptions< + ExPolicy>::call(r, errors); + auto ft = r.back().get(); + auto olast = traits1::compose(send, ft.in); + auto odest = traits2::compose(sdest, ft.out); + return util::in_out_result{olast, odest}; + }, + HPX_MOVE(segments))); + } + + } // namespace detail + /// \endcond +} // namespace hpx::parallel + +// The segmented iterators we support all live in namespace hpx::segmented +namespace hpx::segmented { + + /////////////////////////////////////////////////////////////////////////// + // segmented copy — tag_invoke overloads + + // no-policy (sequential) overload + template + requires(hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v) + OutIter tag_invoke(hpx::copy_t, SegIter first, SegIter last, OutIter dest) + { + static_assert(hpx::traits::is_input_iterator_v, + "Requires at least input iterator."); + + if (first == last) + { + return HPX_MOVE(dest); + } + + using iterator_traits1 = + hpx::traits::segmented_iterator_traits; + using iterator_traits2 = + hpx::traits::segmented_iterator_traits; + + auto result = hpx::parallel::detail::segmented_copy( + hpx::parallel::detail::copy>(), + hpx::execution::seq, first, last, dest, std::true_type{}); + + return HPX_MOVE(result.out); + } + + // execution-policy overload + template + requires(hpx::is_execution_policy_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v && + hpx::traits::is_iterator_v && + hpx::traits::is_segmented_iterator_v) + hpx::parallel::util::detail::algorithm_result_t + tag_invoke(hpx::copy_t, ExPolicy&& policy, SegIter first, SegIter last, + OutIter dest) + { + static_assert(hpx::traits::is_forward_iterator_v, + "Requires at least forward iterator."); + + using is_seq = typename hpx::is_sequenced_execution_policy< + std::decay_t>::type; + + if (first == last) + { + using result = + hpx::parallel::util::detail::algorithm_result; + return result::get(HPX_MOVE(dest)); + } + + using iterator_traits1 = + hpx::traits::segmented_iterator_traits; + using iterator_traits2 = + hpx::traits::segmented_iterator_traits; + + return hpx::parallel::util::get_second_element( + hpx::parallel::detail::segmented_copy( + hpx::parallel::detail::copy>(), + HPX_FORWARD(ExPolicy, policy), first, last, dest, is_seq{})); + } + +} // namespace hpx::segmented