Line data Source code
1 0 : // Distributed under the MIT License.
2 : // See LICENSE.txt for details.
3 :
4 : #pragma once
5 :
6 : #include <charm++.h>
7 : #include <tuple>
8 :
9 : #include "Parallel/CharmRegistration.hpp"
10 : #include "Parallel/GlobalCache.hpp"
11 : #include "Parallel/Local.hpp"
12 : #include "Parallel/Section.hpp"
13 : #include "Parallel/TypeTraits.hpp"
14 : #include "Utilities/ConstantExpressions.hpp"
15 : #include "Utilities/Functional.hpp"
16 : #include "Utilities/Gsl.hpp"
17 : #include "Utilities/Requires.hpp"
18 : #include "Utilities/TypeTraits.hpp"
19 : #include "Utilities/TypeTraits/IsA.hpp"
20 :
21 : namespace Parallel {
22 : /// \cond
23 : template <class... Ts>
24 : struct ReductionData;
25 : /// \endcond
26 :
27 : namespace detail {
28 : /*!
29 : * \ingroup ParallelGroup
30 : * \brief Convert a `ReductionData` to a `CkReductionMsg`. Used in custom
31 : * reducers.
32 : */
33 : template <class... Ts>
34 : CkReductionMsg* new_reduction_msg(ReductionData<Ts...>& reduction_data) {
35 : return CkReductionMsg::buildNew(static_cast<int>(reduction_data.size()),
36 : reduction_data.packed().get());
37 : }
38 : } // namespace detail
39 :
40 : /*!
41 : * \ingroup ParallelGroup
42 : * \brief The data to be reduced, and invokables to be called whenever two
43 : * reduction messages are combined and after the reduction has been completed.
44 : *
45 : * `InvokeCombine` is a binary invokable that maps `(T current_state, T element)
46 : * -> T`, where the `current_state` is the result of reductions so far. The
47 : * `InvokeFinal` is an n-ary that takes as its first argument a `T
48 : * result_of_reduction` and is invoked once after the reduction is completed.
49 : * The additional arguments correspond to the resultant data of earlier
50 : * `ReductionDatum` template parameters in the `ReductionData`, and are
51 : * identified via the `InvokeFinalExtraArgsIndices`, which must be a
52 : * `std::index_sequence`. Specifically, say you want the third
53 : * `ReductionDatum`'s `InvokeFinal` to be passed the first `ReductionDatum` then
54 : * `std::index_sequence<0>` would be passed for `InvokeFinalExtraArgsIndices`.
55 : * Here is an example of computing the RMS error of the evolved variables `u`
56 : * and `v`:
57 : *
58 : * \snippet Test_AlgorithmReduction.cpp contribute_to_rms_reduction
59 : *
60 : * with the receiving action:
61 : *
62 : * \snippet Test_AlgorithmReduction.cpp reduce_rms_action
63 : */
64 : template <class T, class InvokeCombine, class InvokeFinal = funcl::Identity,
65 : class InvokeFinalExtraArgsIndices = std::index_sequence<>>
66 1 : struct ReductionDatum {
67 0 : using value_type = T;
68 0 : using invoke_combine = InvokeCombine;
69 0 : using invoke_final = InvokeFinal;
70 0 : using invoke_final_extra_args_indices = InvokeFinalExtraArgsIndices;
71 0 : T value;
72 : };
73 :
74 : /*!
75 : * \ingroup ParallelGroup
76 : * \brief Used for reducing a possibly heterogeneous collection of types in a
77 : * single reduction call
78 : */
79 : template <class... Ts, class... InvokeCombines, class... InvokeFinals,
80 : class... InvokeFinalExtraArgsIndices>
81 1 : struct ReductionData<ReductionDatum<Ts, InvokeCombines, InvokeFinals,
82 : InvokeFinalExtraArgsIndices>...> {
83 : static_assert(sizeof...(Ts) > 0,
84 : "Must be reducing at least one piece of data.");
85 0 : static constexpr size_t pack_size() { return sizeof...(Ts); }
86 0 : using datum_list = tmpl::list<ReductionDatum<Ts, InvokeCombines, InvokeFinals,
87 : InvokeFinalExtraArgsIndices>...>;
88 :
89 0 : explicit ReductionData(ReductionDatum<Ts, InvokeCombines, InvokeFinals,
90 : InvokeFinalExtraArgsIndices>... args);
91 :
92 0 : explicit ReductionData(Ts... args);
93 :
94 0 : ReductionData() = default;
95 0 : ReductionData(const ReductionData& /*rhs*/) = default;
96 0 : ReductionData& operator=(const ReductionData& /*rhs*/) = default;
97 0 : ReductionData(ReductionData&& /*rhs*/) = default;
98 0 : ReductionData& operator=(ReductionData&& /*rhs*/) = default;
99 0 : ~ReductionData() = default;
100 :
101 0 : explicit ReductionData(CkReductionMsg* const message) {
102 : PUP::fromMem creator(message->getData());
103 : creator | *this;
104 : }
105 :
106 0 : static CkReductionMsg* combine(int number_of_messages, CkReductionMsg** msgs);
107 :
108 0 : ReductionData& combine(ReductionData&& t) {
109 : ReductionData::combine_helper(this, std::move(t),
110 : std::make_index_sequence<sizeof...(Ts)>{});
111 : return *this;
112 : }
113 :
114 0 : ReductionData& finalize() {
115 : invoke_final_loop_over_tuple(std::make_index_sequence<sizeof...(Ts)>{});
116 : return *this;
117 : }
118 :
119 : /// \cond
120 : // clang-tidy: non-const reference
121 : void pup(PUP::er& p) { p | data_; } // NOLINT
122 :
123 : // NOLINTNEXTLINE(modernize-avoid-c-arrays)
124 : std::unique_ptr<char[]> packed();
125 :
126 : size_t size();
127 :
128 : const std::tuple<Ts...>& data() const { return data_; }
129 :
130 : std::tuple<Ts...>& data() { return data_; }
131 :
132 : private:
133 : template <size_t... Is>
134 : static void combine_helper(gsl::not_null<ReductionData*> reduced,
135 : ReductionData&& current,
136 : std::index_sequence<Is...> /*meta*/);
137 :
138 : template <size_t I, class InvokeFinal, size_t... Js>
139 : void invoke_final_helper(std::index_sequence<Js...> /*meta*/);
140 :
141 : template <size_t... Is>
142 : void invoke_final_loop_over_tuple(std::index_sequence<Is...> /*meta*/);
143 :
144 : std::tuple<Ts...> data_;
145 : /// \endcond
146 : };
147 :
148 : /// \cond
149 : template <class... Ts, class... InvokeCombines, class... InvokeFinals,
150 : class... InvokeFinalExtraArgsIndices>
151 : ReductionData<ReductionDatum<Ts, InvokeCombines, InvokeFinals,
152 : InvokeFinalExtraArgsIndices>...>::
153 : ReductionData(ReductionDatum<Ts, InvokeCombines, InvokeFinals,
154 : InvokeFinalExtraArgsIndices>... args)
155 : : data_(std::move(args.value)...) {}
156 :
157 : template <class... Ts, class... InvokeCombines, class... InvokeFinals,
158 : class... InvokeFinalExtraArgsIndices>
159 : ReductionData<
160 : ReductionDatum<Ts, InvokeCombines, InvokeFinals,
161 : InvokeFinalExtraArgsIndices>...>::ReductionData(Ts... args)
162 : : data_(std::move(args)...) {}
163 :
164 : template <class... Ts, class... InvokeCombines, class... InvokeFinals,
165 : class... InvokeFinalExtraArgsIndices>
166 : CkReductionMsg* ReductionData<ReductionDatum<Ts, InvokeCombines, InvokeFinals,
167 : InvokeFinalExtraArgsIndices>...>::
168 : combine(const int number_of_messages, CkReductionMsg** const msgs) {
169 : // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
170 : ReductionData reduced(msgs[0]);
171 : for (int msg_id = 1; msg_id < number_of_messages; ++msg_id) {
172 : // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
173 : ReductionData current(msgs[msg_id]);
174 : ReductionData::combine_helper(&reduced, std::move(current),
175 : std::make_index_sequence<sizeof...(Ts)>{});
176 : }
177 : return detail::new_reduction_msg(reduced);
178 : }
179 :
180 : template <class... Ts, class... InvokeCombines, class... InvokeFinals,
181 : class... InvokeFinalExtraArgsIndices>
182 : // NOLINTNEXTLINE(modernize-avoid-c-arrays)
183 : std::unique_ptr<char[]>
184 : ReductionData<ReductionDatum<Ts, InvokeCombines, InvokeFinals,
185 : InvokeFinalExtraArgsIndices>...>::packed() {
186 : // NOLINTNEXTLINE(modernize-avoid-c-arrays)
187 : auto result = std::make_unique<char[]>(size());
188 : PUP::toMem packer(result.get());
189 : packer | *this;
190 : return result;
191 : }
192 :
193 : template <class... Ts, class... InvokeCombines, class... InvokeFinals,
194 : class... InvokeFinalExtraArgsIndices>
195 : size_t ReductionData<ReductionDatum<Ts, InvokeCombines, InvokeFinals,
196 : InvokeFinalExtraArgsIndices>...>::size() {
197 : PUP::sizer size_pup;
198 : size_pup | *this;
199 : return size_pup.size();
200 : }
201 :
202 : template <class... Ts, class... InvokeCombines, class... InvokeFinals,
203 : class... InvokeFinalExtraArgsIndices>
204 : template <size_t... Is>
205 : void ReductionData<ReductionDatum<Ts, InvokeCombines, InvokeFinals,
206 : InvokeFinalExtraArgsIndices>...>::
207 : combine_helper(const gsl::not_null<ReductionData*> reduced,
208 : ReductionData&& current,
209 : std::index_sequence<Is...> /*meta*/) {
210 : EXPAND_PACK_LEFT_TO_RIGHT((std::get<Is>(reduced->data_) = InvokeCombines{}(
211 : std::move(std::get<Is>(reduced->data_)),
212 : std::move(std::get<Is>(current.data_)))));
213 : }
214 :
215 : template <class... Ts, class... InvokeCombines, class... InvokeFinals,
216 : class... InvokeFinalExtraArgsIndices>
217 : template <size_t I, class InvokeFinal, size_t... Js>
218 : void ReductionData<ReductionDatum<Ts, InvokeCombines, InvokeFinals,
219 : InvokeFinalExtraArgsIndices>...>::
220 : invoke_final_helper(std::index_sequence<Js...> /*meta*/) {
221 : std::get<I>(data_) = InvokeFinal{}(std::move(std::get<I>(data_)),
222 : std::as_const(std::get<Js>(data_))...);
223 : }
224 :
225 : template <class... Ts, class... InvokeCombines, class... InvokeFinals,
226 : class... InvokeFinalExtraArgsIndices>
227 : template <size_t... Is>
228 : void ReductionData<ReductionDatum<Ts, InvokeCombines, InvokeFinals,
229 : InvokeFinalExtraArgsIndices>...>::
230 : invoke_final_loop_over_tuple(std::index_sequence<Is...> /*meta*/) {
231 : EXPAND_PACK_LEFT_TO_RIGHT(
232 : invoke_final_helper<Is, InvokeFinals>(InvokeFinalExtraArgsIndices{}));
233 : }
234 : /// \endcond
235 :
/// Can be used instead of a `Parallel::Section` when no section is desired.
///
/// \see Parallel::contribute_to_reduction
struct NoSection {};

/// Returns a reference to a `Parallel::NoSection`, for use where a
/// `Parallel::Section` would otherwise be passed.
///
/// \see Parallel::contribute_to_reduction
NoSection& no_section();
243 :
244 : /*!
245 : * \ingroup ParallelGroup
246 : * \brief Perform a reduction from the `sender_component` (typically your own
247 : * parallel component) to the `target_component`, performing the `Action` upon
248 : * receiving the reduction.
249 : *
250 : * \par Section reductions
251 : * This function supports section reductions (see `Parallel::Section`). Pass
252 : * the `Parallel::Section` as the \p section argument, or pass
253 : * `Parallel::no_section()` to perform a reduction over the entire parallel
254 : * component (default). Here's an example of a section reduction:
255 : *
256 : * \snippet Test_SectionReductions.cpp section_reduction
257 : *
258 : * \warning Section reductions currently don't support migrating elements, i.e.
259 : * either load-balancing or restoring a checkpoint to a different number of PEs.
260 : * Support for migrating elements may require [updating the "section
261 : * cookie"](https://charm.readthedocs.io/en/latest/charm++/manual.html#section-operations-with-migrating-elements).
262 : * One possibility to update the section cookie is to broadcast a CkMulticast
263 : * message to the section elements and invoke `CkGetSectionInfo` within the
264 : * message.
265 : */
266 : template <class Action, class SenderProxy, class TargetProxy, class... Ts,
267 : class SectionType = NoSection>
268 1 : void contribute_to_reduction(ReductionData<Ts...> reduction_data,
269 : const SenderProxy& sender_component,
270 : const TargetProxy& target_component,
271 : [[maybe_unused]] const gsl::not_null<SectionType*>
272 : section = &no_section()) {
273 : (void)Parallel::charmxx::RegisterReducerFunction<
274 : &ReductionData<Ts...>::combine>::registrar;
275 : CkCallback callback(
276 : TargetProxy::index_t::template redn_wrapper_reduction_action<
277 : Action, std::decay_t<ReductionData<Ts...>>>(nullptr),
278 : target_component);
279 : const auto& charm_reducer_function =
280 : Parallel::charmxx::charm_reducer_functions.at(
281 : std::hash<Parallel::charmxx::ReducerFunctions>{}(
282 : &ReductionData<Ts...>::combine));
283 : if constexpr (std::is_same_v<SectionType, NoSection>) {
284 : if constexpr (is_array_element_proxy<SenderProxy>::value) {
285 : Parallel::local(sender_component)
286 : ->contribute(static_cast<int>(reduction_data.size()),
287 : reduction_data.packed().get(), charm_reducer_function,
288 : callback);
289 : } else {
290 : Parallel::local_branch(sender_component)
291 : ->contribute(static_cast<int>(reduction_data.size()),
292 : reduction_data.packed().get(), charm_reducer_function,
293 : callback);
294 : }
295 : } else {
296 : static_assert(
297 : tt::is_a_v<Section, SectionType>,
298 : "Either pass a 'Parallel::Section' for the 'section' argument or "
299 : "'Parallel::NoSection{}'. For the latter you can just omit the "
300 : "argument.");
301 : using SectionProxy = typename SectionType::cproxy_section;
302 : // Retrieve the section cookie that keeps track of the reduction
303 : auto& section_cookie = section->cookie();
304 : // Ideally we would update the section cookie here using
305 : // `CkGetSectionInfo()`. However, that only works with CkMulticast messages
306 : // (see
307 : // https://github.com/UIUC-PPL/charm/blob/99cda7a11108f503b89dc847b58e62bc74267440/src/ck-core/ckmulticast.C#L1180).
308 : // Dispatching a message to the `sender_component` doesn't help because
309 : // sending a message to a single element doesn't go through CkMulticast. Not
310 : // updating the section cookie seems to work, but might break when elements
311 : // migrate (see
312 : // https://charm.readthedocs.io/en/latest/charm++/manual.html#section-operations-with-migrating-elements).
313 : // In that case we can possibly broadcast a CkMulticast message to all
314 : // elements to update their section cookies.
315 : SectionProxy::contribute(static_cast<int>(reduction_data.size()),
316 : reduction_data.packed().get(),
317 : charm_reducer_function, section_cookie, callback);
318 : }
319 : }
320 : } // namespace Parallel
|