Line data Source code
1 0 : // Distributed under the MIT License.
2 : // See LICENSE.txt for details.
3 :
4 : #pragma once
5 :
6 : #include <algorithm>
7 : #include <boost/functional/hash.hpp>
8 : #include <cstddef>
9 : #include <cstdint>
10 : #include <iomanip>
11 : #include <limits>
12 : #include <map>
13 : #include <memory>
14 : #include <optional>
15 : #include <pup.h>
16 : #include <pup_stl.h>
17 : #include <sstream>
18 : #include <string>
19 : #include <tuple>
20 : #include <unordered_set>
21 : #include <utility>
22 : #include <vector>
23 :
24 : #include "DataStructures/DataBox/DataBox.hpp"
25 : #include "Options/String.hpp"
26 : #include "Parallel/AlgorithmExecution.hpp"
27 : #include "Parallel/GlobalCache.hpp"
28 : #include "Parallel/InboxInserters.hpp"
29 : #include "Parallel/Invoke.hpp"
30 : #include "Parallel/Local.hpp"
31 : #include "Parallel/Reduction.hpp"
32 : #include "ParallelAlgorithms/EventsAndTriggers/Event.hpp"
33 : #include "Time/ChangeSlabSize.hpp"
34 : #include "Time/StepChoosers/StepChooser.hpp"
35 : #include "Time/Tags/HistoryEvolvedVariables.hpp"
36 : #include "Time/Tags/TimeStepper.hpp"
37 : #include "Time/TimeStepId.hpp"
38 : #include "Time/TimeSteppers/TimeStepper.hpp"
39 : #include "Utilities/Algorithm.hpp"
40 : #include "Utilities/ErrorHandling/Assert.hpp"
41 : #include "Utilities/Functional.hpp"
42 : #include "Utilities/Serialization/CharmPupable.hpp"
43 : #include "Utilities/TMPL.hpp"
44 : #include "Utilities/TaggedTuple.hpp"
45 :
46 : /// \cond
47 : namespace Tags {
48 : struct DataBox;
49 : struct TimeStepId;
50 : } // namespace Tags
51 : /// \endcond
52 :
53 : namespace ChangeSlabSize_detail {
54 : struct NewSlabSizeInbox
55 : : public Parallel::InboxInserters::MemberInsert<NewSlabSizeInbox> {
56 : using temporal_id = int64_t;
57 : using type = std::map<temporal_id, std::unordered_multiset<double>>;
58 :
59 : static std::string output_inbox(const type& inbox,
60 : const size_t padding_size) {
61 : std::stringstream ss{};
62 : const std::string pad(padding_size, ' ');
63 :
64 : ss << std::scientific << std::setprecision(16);
65 : ss << pad << "NewSlabSizeInbox:\n";
66 : for (const auto& [slab_number, slab_sizes] : inbox) {
67 : ss << pad << " Slab number: " << slab_number
68 : << ", slab sizes: " << slab_sizes << "\n";
69 : }
70 :
71 : return ss.str();
72 : }
73 : };
74 :
75 : // This inbox doesn't receive any data, it just counts messages (using
76 : // the size of the multiset). Whenever a message is sent to the
77 : // NewSlabSizeInbox, another message is sent here synchronously, so
78 : // the count here is the number of messages we expect in the
79 : // NewSlabSizeInbox.
80 : struct NumberOfExpectedMessagesInbox
81 : : public Parallel::InboxInserters::MemberInsert<
82 : NumberOfExpectedMessagesInbox> {
83 : using temporal_id = int64_t;
84 : using NoData = std::tuple<>;
85 : using type = std::map<temporal_id,
86 : std::unordered_multiset<NoData, boost::hash<NoData>>>;
87 :
88 : static std::string output_inbox(const type& inbox,
89 : const size_t padding_size) {
90 : std::stringstream ss{};
91 : const std::string pad(padding_size, ' ');
92 :
93 : ss << std::scientific << std::setprecision(16);
94 : ss << pad << "SlabSizeNumberOfExpectedMessagesInbox:\n";
95 : for (const auto& [number_of_messages, no_data] : inbox) {
96 : (void)no_data;
97 : ss << pad << " Number of messages: " << number_of_messages << "\n";
98 : }
99 :
100 : return ss.str();
101 : }
102 : };
103 :
104 : struct StoreNewSlabSize {
105 : template <typename ParallelComponent, typename DbTags, typename Metavariables,
106 : typename ArrayIndex>
107 : static void apply(const db::DataBox<DbTags>& /*box*/,
108 : Parallel::GlobalCache<Metavariables>& cache,
109 : const ArrayIndex& array_index, const int64_t slab_number,
110 : const double slab_size) {
111 : Parallel::receive_data<ChangeSlabSize_detail::NewSlabSizeInbox>(
112 : *Parallel::local(Parallel::get_parallel_component<ParallelComponent>(
113 : cache)[array_index]),
114 : slab_number, slab_size);
115 : }
116 : };
117 : } // namespace ChangeSlabSize_detail
118 :
119 : namespace Actions {
120 : /// \ingroup ActionsGroup
121 : /// \ingroup TimeGroup
122 : /// Adjust the slab size based on previous executions of
123 : /// Events::ChangeSlabSize
124 : ///
125 : /// Uses:
126 : /// - DataBox:
127 : /// - Tags::HistoryEvolvedVariables
128 : /// - Tags::TimeStep
129 : /// - Tags::TimeStepId
130 : /// - Tags::TimeStepper<>
131 : ///
132 : /// DataBox changes:
133 : /// - Adds: nothing
134 : /// - Removes: nothing
135 : /// - Modifies:
136 : /// - Tags::Next<Tags::TimeStepId>
137 : /// - Tags::TimeStep
138 : /// - Tags::TimeStepId
139 1 : struct ChangeSlabSize {
140 0 : using inbox_tags =
141 : tmpl::list<ChangeSlabSize_detail::NumberOfExpectedMessagesInbox,
142 : ChangeSlabSize_detail::NewSlabSizeInbox>;
143 :
144 : template <typename DbTags, typename... InboxTags, typename Metavariables,
145 : typename ArrayIndex, typename ActionList,
146 : typename ParallelComponent>
147 0 : static Parallel::iterable_action_return_t apply(
148 : db::DataBox<DbTags>& box, tuples::TaggedTuple<InboxTags...>& inboxes,
149 : const Parallel::GlobalCache<Metavariables>& /*cache*/,
150 : const ArrayIndex& /*array_index*/, const ActionList /*meta*/,
151 : const ParallelComponent* const /*meta*/) {
152 : const auto& time_step_id = db::get<::Tags::TimeStepId>(box);
153 : if (not time_step_id.is_at_slab_boundary()) {
154 : return {Parallel::AlgorithmExecution::Continue, std::nullopt};
155 : }
156 :
157 : auto& message_count_inbox =
158 : tuples::get<ChangeSlabSize_detail::NumberOfExpectedMessagesInbox>(
159 : inboxes);
160 : if (message_count_inbox.empty() or
161 : message_count_inbox.begin()->first != time_step_id.slab_number()) {
162 : return {Parallel::AlgorithmExecution::Continue, std::nullopt};
163 : }
164 :
165 : auto& new_slab_size_inbox =
166 : tuples::get<ChangeSlabSize_detail::NewSlabSizeInbox>(inboxes);
167 :
168 : const auto slab_number = time_step_id.slab_number();
169 : const auto number_of_changes = [&slab_number](const auto& inbox) -> size_t {
170 : if (inbox.empty()) {
171 : return 0;
172 : }
173 : if (inbox.begin()->first == slab_number) {
174 : return inbox.begin()->second.size();
175 : }
176 : ASSERT(inbox.begin()->first >= slab_number,
177 : "Received data for a change at slab " << inbox.begin()->first
178 : << " but it is already slab " << slab_number);
179 : return 0;
180 : };
181 :
182 : const size_t expected_messages = number_of_changes(message_count_inbox);
183 : const size_t received_messages = number_of_changes(new_slab_size_inbox);
184 : ASSERT(expected_messages >= received_messages,
185 : "Received " << received_messages << " size change messages at slab "
186 : << slab_number << ", but only expected "
187 : << expected_messages);
188 : if (expected_messages != received_messages) {
189 : return {Parallel::AlgorithmExecution::Retry, std::nullopt};
190 : }
191 :
192 : message_count_inbox.erase(message_count_inbox.begin());
193 :
194 : const double new_slab_size =
195 : *alg::min_element(new_slab_size_inbox.begin()->second);
196 : new_slab_size_inbox.erase(new_slab_size_inbox.begin());
197 :
198 : const TimeStepper& time_stepper = db::get<::Tags::TimeStepper<>>(box);
199 :
200 : // Sometimes time steppers need to run with a fixed step size.
201 : // This is generally at the start of an evolution when the history
202 : // is in an unusual state.
203 : if (not time_stepper.can_change_step_size(
204 : time_step_id, db::get<::Tags::HistoryEvolvedVariables<>>(box))) {
205 : return {Parallel::AlgorithmExecution::Continue, std::nullopt};
206 : }
207 :
208 : const auto current_slab = time_step_id.step_time().slab();
209 :
210 : const double new_slab_end =
211 : time_step_id.time_runs_forward()
212 : ? current_slab.start().value() + new_slab_size
213 : : current_slab.end().value() - new_slab_size;
214 :
215 : change_slab_size(make_not_null(&box), new_slab_end);
216 :
217 : return {Parallel::AlgorithmExecution::Continue, std::nullopt};
218 : }
219 : };
220 : } // namespace Actions
221 :
222 : namespace Events {
223 : /// \ingroup TimeGroup
224 : /// %Trigger a slab size change.
225 : ///
226 : /// The new size will be the minimum suggested by any of the provided
227 : /// step choosers on any element. This requires a global reduction,
228 : /// so it is possible to delay the change until a later slab to avoid
229 : /// a global synchronization. The actual change is carried out by
230 : /// Actions::ChangeSlabSize.
231 : ///
232 : /// When running with global time-stepping, the slab size and step
233 : /// size are the same, so this adjusts the step size used by the time
234 : /// integration. With local time-stepping this controls the interval
235 : /// between times when the sequences of steps on all elements are
236 : /// forced to align.
237 1 : class ChangeSlabSize : public Event {
238 0 : using ReductionData = Parallel::ReductionData<
239 : Parallel::ReductionDatum<int64_t, funcl::AssertEqual<>>,
240 : Parallel::ReductionDatum<double, funcl::Min<>>>;
241 :
242 : public:
243 : /// \cond
244 : explicit ChangeSlabSize(CkMigrateMessage* /*unused*/) {}
245 : using PUP::able::register_constructor;
246 : WRAPPED_PUPable_decl_template(ChangeSlabSize); // NOLINT
247 : /// \endcond
248 :
249 0 : struct StepChoosers {
250 0 : static constexpr Options::String help = "Limits on slab size";
251 0 : using type =
252 : std::vector<std::unique_ptr<StepChooser<StepChooserUse::Slab>>>;
253 0 : static size_t lower_bound_on_size() { return 1; }
254 : };
255 :
256 0 : struct DelayChange {
257 0 : static constexpr Options::String help = "Slabs to wait before changing";
258 0 : using type = uint64_t;
259 : };
260 :
261 0 : using options = tmpl::list<StepChoosers, DelayChange>;
262 0 : static constexpr Options::String help =
263 : "Trigger a slab size change chosen by the provided step choosers.\n"
264 : "The actual changing of the slab size can be delayed until a later\n"
265 : "slab to improve parallelization.";
266 :
267 0 : ChangeSlabSize() = default;
268 0 : ChangeSlabSize(std::vector<std::unique_ptr<StepChooser<StepChooserUse::Slab>>>
269 : step_choosers,
270 : const uint64_t delay_change)
271 : : step_choosers_(std::move(step_choosers)), delay_change_(delay_change) {}
272 :
273 0 : using compute_tags_for_observation_box = tmpl::list<>;
274 :
275 0 : using return_tags = tmpl::list<>;
276 0 : using argument_tags = tmpl::list<::Tags::TimeStepId, ::Tags::DataBox>;
277 :
278 : template <typename DbTags, typename Metavariables, typename ArrayIndex,
279 : typename ParallelComponent>
280 0 : void operator()(const TimeStepId& time_step_id,
281 : const db::DataBox<DbTags>& box_for_step_choosers,
282 : Parallel::GlobalCache<Metavariables>& cache,
283 : const ArrayIndex& array_index,
284 : const ParallelComponent* const /*meta*/,
285 : const ObservationValue& /*observation_value*/) const {
286 : const auto next_changable_slab = time_step_id.is_at_slab_boundary()
287 : ? time_step_id.slab_number()
288 : : time_step_id.slab_number() + 1;
289 : const auto slab_to_change =
290 : next_changable_slab + static_cast<int64_t>(delay_change_);
291 :
292 : double desired_slab_size = std::numeric_limits<double>::infinity();
293 : bool synchronization_required = false;
294 : for (const auto& step_chooser : step_choosers_) {
295 : desired_slab_size = std::min(
296 : desired_slab_size,
297 : step_chooser
298 : ->desired_step(time_step_id.step_time().slab().duration().value(),
299 : box_for_step_choosers)
300 : .first);
301 : // We must synchronize if any step chooser requires it, not just
302 : // the limiting one, because choosers requiring synchronization
303 : // can be limiting on some processors and not others.
304 : if (not synchronization_required) {
305 : synchronization_required = step_chooser->uses_local_data();
306 : }
307 : }
308 :
309 : const auto& component_proxy =
310 : Parallel::get_parallel_component<ParallelComponent>(cache);
311 : const auto& self_proxy = component_proxy[array_index];
312 : // This message is sent synchronously, so it is guaranteed to
313 : // arrive before the ChangeSlabSize action is called.
314 : Parallel::receive_data<
315 : ChangeSlabSize_detail::NumberOfExpectedMessagesInbox>(
316 : *Parallel::local(self_proxy), slab_to_change,
317 : ChangeSlabSize_detail::NumberOfExpectedMessagesInbox::NoData{});
318 : if (synchronization_required) {
319 : Parallel::contribute_to_reduction<
320 : ChangeSlabSize_detail::StoreNewSlabSize>(
321 : ReductionData(slab_to_change, desired_slab_size), self_proxy,
322 : component_proxy);
323 : } else {
324 : Parallel::receive_data<ChangeSlabSize_detail::NewSlabSizeInbox>(
325 : *Parallel::local(self_proxy), slab_to_change, desired_slab_size);
326 : }
327 : }
328 :
329 0 : using is_ready_argument_tags = tmpl::list<>;
330 :
331 : template <typename Metavariables, typename ArrayIndex, typename Component>
332 0 : bool is_ready(Parallel::GlobalCache<Metavariables>& /*cache*/,
333 : const ArrayIndex& /*array_index*/,
334 : const Component* const /*meta*/) const {
335 : return true;
336 : }
337 :
338 1 : bool needs_evolved_variables() const override {
339 : // This depends on the chosen StepChoosers, but they don't have a
340 : // way to report this information so we just return true to be
341 : // safe.
342 : return true;
343 : }
344 :
345 : template <typename F>
346 0 : void for_each_step_chooser(F&& f) const {
347 : for (const auto& step_chooser : step_choosers_) {
348 : f(*step_chooser);
349 : }
350 : }
351 :
352 : // NOLINTNEXTLINE(google-runtime-references)
353 0 : void pup(PUP::er& p) override {
354 : Event::pup(p);
355 : p | step_choosers_;
356 : p | delay_change_;
357 : }
358 :
359 : private:
360 : std::vector<std::unique_ptr<StepChooser<StepChooserUse::Slab>>>
361 0 : step_choosers_;
362 0 : uint64_t delay_change_ = std::numeric_limits<uint64_t>::max();
363 : };
364 : } // namespace Events
|