// Distributed under the MIT License.
// See LICENSE.txt for details.

#pragma once

#include <array>
#include <cstddef>
#include <limits>
#include <mutex>
#include <optional>
#include <set>
#include <string>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "DataStructures/DataBox/DataBox.hpp"
#include "IO/H5/AccessType.hpp"
#include "IO/H5/Dat.hpp"
#include "IO/H5/File.hpp"
#include "IO/Observer/Helpers.hpp"
#include "IO/Observer/ObservationId.hpp"
#include "IO/Observer/Protocols/ReductionDataFormatter.hpp"
#include "IO/Observer/Tags.hpp"
#include "Parallel/ArrayComponentId.hpp"
#include "Parallel/ArrayIndex.hpp"
#include "Parallel/GlobalCache.hpp"
#include "Parallel/Info.hpp"
#include "Parallel/Invoke.hpp"
#include "Parallel/Local.hpp"
#include "Parallel/NodeLock.hpp"
#include "Parallel/ParallelComponentHelpers.hpp"
#include "Parallel/Printf/Printf.hpp"
#include "Parallel/Reduction.hpp"
#include "Utilities/ErrorHandling/Assert.hpp"
#include "Utilities/ErrorHandling/Error.hpp"
#include "Utilities/GetOutput.hpp"
#include "Utilities/Gsl.hpp"
#include "Utilities/PrettyType.hpp"
#include "Utilities/ProtocolHelpers.hpp"
#include "Utilities/Requires.hpp"
#include "Utilities/Serialization/PupStlCpp17.hpp"
#include "Utilities/StdHelpers.hpp"
#include "Utilities/TMPL.hpp"
#include "Utilities/TaggedTuple.hpp"

namespace observers {

/// \cond
template <class Metavariables>
struct ObserverWriter;
/// \endcond

namespace ThreadedActions {
/// \cond
struct CollectReductionDataOnNode;
struct WriteReductionData;
/// \endcond
} // namespace ThreadedActions

/// Indicates no formatter is selected
struct NoFormatter {
  void pup(PUP::er& /*p*/) {}
};

namespace Actions {
/// \cond
struct ContributeReductionDataToWriter;
/// \endcond

/*!
 * \ingroup ObserversGroup
 * \brief Send reduction data to the observer group.
 *
 * Once everything at a specific `ObservationId` has been contributed to the
 * reduction, the Observer groups reduce to their local ObserverWriter
 * nodegroup.
 *
 * The caller of this Action (which is to be invoked on the Observer parallel
 * component) must pass in an `observation_id` used to uniquely identify the
 * observation in time, the name of the `h5::Dat` subfile in the HDF5 file
 * (e.g. `/element_data`, where the slash is important), a
 * `std::vector<std::string>` of names of the quantities being reduced (e.g.
 * `{"Time", "L1ErrorDensity", "L2ErrorDensity"}`), and the
 * `Parallel::ReductionData` that holds the `ReductionDatums` containing info
 * on how to do the reduction.
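 *
 * As a rough illustration (not a verbatim snippet from the code base), an
 * observing Action might invoke this Action as in the following sketch, where
 * `local_observer`, `my_observation_id`, `my_reduction_data`, and the legend
 * entries are placeholders supplied by the caller:
 *
 * \code
 * // Hypothetical invocation; `local_observer` is the Observer group element
 * // on the calling processing element and the data are illustrative only.
 * Parallel::simple_action<observers::Actions::ContributeReductionData>(
 *     local_observer, my_observation_id,
 *     Parallel::make_array_component_id<ParallelComponent>(array_index),
 *     std::string{"/element_data"},
 *     std::vector<std::string>{"Time", "L1ErrorDensity", "L2ErrorDensity"},
 *     std::move(my_reduction_data));
 * \endcode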
 *
 * The observer components need to know all expected reduction data types at
 * compile time, so they rely on the
 * `Metavariables::observed_reduction_data_tags` alias to collect them in one
 * place. To this end, each Action that contributes reduction data must expose
 * the type alias as:
 *
 * \snippet ObserverHelpers.hpp make_reduction_data_tags
 *
 * Then, in the `Metavariables`, collect them from all observing Actions using
 * the `observers::collect_reduction_data_tags` metafunction.
 *
 * This action also accepts a "formatter" that will be forwarded along with
 * the reduction data and used to print an informative message when the
 * reduction is complete. The formatter must conform to
 * `observers::protocols::ReductionDataFormatter`.
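 *
 * As a rough illustration, a formatter might look like the following sketch.
 * The struct name, the reduction operations, and the message are purely
 * illustrative and are not taken from an existing formatter in the code base:
 *
 * \code
 * // Hypothetical formatter, shown only to illustrate the protocol.
 * struct ErrorFormatter
 *     : tt::ConformsTo<observers::protocols::ReductionDataFormatter> {
 *   using reduction_data = Parallel::ReductionData<
 *       Parallel::ReductionDatum<double, funcl::AssertEqual<>>,
 *       Parallel::ReductionDatum<double, funcl::Plus<>>>;
 *   std::string operator()(const double time, const double l2_error) const {
 *     return "L2 error at time " + std::to_string(time) + ": " +
 *            std::to_string(l2_error);
 *   }
 *   // Formatters are passed between components, so they must be serializable
 *   void pup([[maybe_unused]] PUP::er& p) {}
 * };
 * \endcode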
 *
 * This action also supports observing the intermediate stage of the reduction
 * over just the processing element that the element is currently on. This can
 * be useful, e.g., to measure performance metrics for assessing load
 * balancing, such as the number of grid points on each core. Enable per-core
 * observations by passing `true` for the `observe_per_core` argument
 * (default: `false`). The data will be written to one H5 file per _node_,
 * prefixed with the `observers::Tags::ReductionFileName`, in a
 * `Core{core_id}` subfile, where `core_id` is an integer identifying the core
 * across all nodes (see `Parallel::my_proc`). For example, when running on 2
 * nodes with 2 cores each, you will end up with `Reductions0.h5` containing
 * `/Core0/{subfile_name}.dat` and `/Core1/{subfile_name}.dat`, and
 * `Reductions1.h5` containing `/Core2/{subfile_name}.dat` and
 * `/Core3/{subfile_name}.dat`. This is in addition to the usual reduction
 * output over _all_ registered elements, written to `Reductions.h5` (no node
 * ID suffix in the file name).
 */
struct ContributeReductionData {
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex, typename... Ts,
            typename Formatter = observers::NoFormatter>
  static void apply(db::DataBox<DbTagsList>& box,
                    Parallel::GlobalCache<Metavariables>& cache,
                    const ArrayIndex& array_index,
                    const observers::ObservationId& observation_id,
                    const Parallel::ArrayComponentId& sender_array_id,
                    const std::string& subfile_name,
                    const std::vector<std::string>& reduction_names,
                    Parallel::ReductionData<Ts...>&& reduction_data,
                    std::optional<Formatter>&& formatter = std::nullopt,
                    const bool observe_per_core = false) {
    db::mutate<Tags::ReductionData<Ts...>, Tags::ReductionDataNames<Ts...>,
               Tags::ContributorsOfReductionData>(
        [&array_index, &cache, &observation_id,
         reduction_data = std::move(reduction_data), &reduction_names,
         &sender_array_id, &subfile_name, &formatter, &observe_per_core](
            const gsl::not_null<std::unordered_map<
                ObservationId, Parallel::ReductionData<Ts...>>*>
                reduction_data_map,
            const gsl::not_null<
                std::unordered_map<ObservationId, std::vector<std::string>>*>
                reduction_names_map,
            const gsl::not_null<std::unordered_map<
                ObservationId,
                std::unordered_set<Parallel::ArrayComponentId>>*>
                reduction_observers_contributed,
            const std::unordered_map<
                ObservationKey,
                std::unordered_set<Parallel::ArrayComponentId>>&
                observations_registered) mutable {  // NOLINT(spectre-mutable)
          ASSERT(
              observations_registered.find(observation_id.observation_key()) !=
                  observations_registered.end(),
              "Couldn't find registration key "
                  << observation_id.observation_key()
                  << " in the registered observers. Known IDs are "
                  << keys_of(observations_registered)
                  << "\nIt could be that you are using a nodegroup DG "
                     "collection but the observation code for the event "
                     "hasn't been updated to work with the nodegroup yet.");

          auto& contributed_array_ids =
              (*reduction_observers_contributed)[observation_id];
          if (UNLIKELY(contributed_array_ids.find(sender_array_id) !=
                       contributed_array_ids.end())) {
            ERROR("Already received reduction data to observation id "
                  << observation_id << " from array component id "
                  << sender_array_id);
          }
          contributed_array_ids.insert(sender_array_id);

          if (reduction_data_map->count(observation_id) == 0) {
            reduction_data_map->emplace(observation_id,
                                        std::move(reduction_data));
            reduction_names_map->emplace(observation_id, reduction_names);
          } else {
            if (UNLIKELY(reduction_names_map->at(observation_id) !=
                         reduction_names)) {
              using ::operator<<;
              ERROR("Reduction names differ at ObservationId "
                    << observation_id << " with the expected names being "
                    << reduction_names_map->at(observation_id)
                    << " and the received names being " << reduction_names);
            }
            reduction_data_map->operator[](observation_id)
                .combine(std::move(reduction_data));
          }

          // Check if we have received all reduction data from the registered
          // elements. If so, we reduce to the local ObserverWriter nodegroup.
          if (UNLIKELY(
                  contributed_array_ids.size() ==
                  observations_registered.at(observation_id.observation_key())
                      .size())) {
            auto& local_writer = *Parallel::local_branch(
                Parallel::get_parallel_component<
                    ObserverWriter<Metavariables>>(cache));
            auto& my_proxy =
                Parallel::get_parallel_component<ParallelComponent>(cache);
            const std::optional<int> observe_with_core_id =
                observe_per_core ? std::make_optional(Parallel::my_proc<int>(
                                       *Parallel::local_branch(my_proxy)))
                                 : std::nullopt;
            Parallel::threaded_action<
                ThreadedActions::CollectReductionDataOnNode>(
                local_writer, observation_id,
                Parallel::make_array_component_id<ParallelComponent>(
                    array_index),
                subfile_name, (*reduction_names_map)[observation_id],
                std::move((*reduction_data_map)[observation_id]),
                std::move(formatter), observe_with_core_id);
            reduction_data_map->erase(observation_id);
            reduction_names_map->erase(observation_id);
            reduction_observers_contributed->erase(observation_id);
          }
        },
        make_not_null(&box),
        db::get<Tags::ExpectedContributorsForObservations>(box));
    // Silence a gcc <= 9 unused-variable warning
    (void)observe_per_core;
  }
};
} // namespace Actions

namespace ThreadedActions {

namespace ReductionActions_detail {
void append_to_reduction_data(
    const gsl::not_null<std::vector<double>*> all_reduction_data,
    const double t);

template <typename T>
void append_to_reduction_data(
    const gsl::not_null<std::vector<double>*> all_reduction_data,
    const std::vector<T>& t);

void append_to_reduction_data(
    gsl::not_null<std::vector<double>*> all_reduction_data,
    const std::array<double, 3>& t);

template <typename... Ts, size_t... Is>
void write_data(const std::string& subfile_name,
                const std::string& input_source,
                std::vector<std::string> legend, const std::tuple<Ts...>& data,
                const std::string& file_prefix,
                std::index_sequence<Is...> /*meta*/) {
  static_assert(sizeof...(Ts) > 0,
                "Must be reducing at least one piece of data");
  std::vector<double> data_to_append{};
  EXPAND_PACK_LEFT_TO_RIGHT(
      append_to_reduction_data(&data_to_append, std::get<Is>(data)));

  if (legend.size() != data_to_append.size()) {
    ERROR(
        "There must be one name provided for each piece of data. You provided "
        << legend.size() << " names: '" << get_output(legend)
        << "' but there are " << data_to_append.size()
        << " pieces of data being reduced");
  }

  h5::H5File<h5::AccessType::ReadWrite> h5file(file_prefix + ".h5", true,
                                               input_source);
  constexpr size_t version_number = 0;
  auto& time_series_file = h5file.try_insert<h5::Dat>(
      subfile_name, std::move(legend), version_number);
  time_series_file.append(data_to_append);
}
} // namespace ReductionActions_detail

/*!
 * \brief Gathers all the reduction data from all processing elements/cores on a
 * node.
 */
struct CollectReductionDataOnNode {
 public:
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex,
            typename... ReductionDatums,
            typename Formatter = observers::NoFormatter>
  static void apply(
      db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
      const ArrayIndex& /*array_index*/,
      const gsl::not_null<Parallel::NodeLock*> node_lock,
      const observers::ObservationId& observation_id,
      Parallel::ArrayComponentId observer_group_id,
      const std::string& subfile_name,
      std::vector<std::string>&& reduction_names,
      Parallel::ReductionData<ReductionDatums...>&& received_reduction_data,
      std::optional<Formatter>&& formatter = std::nullopt,
      const std::optional<int> observe_with_core_id = std::nullopt) {
    // The pointer gymnastics below are done in order to minimize the time
    // spent locking the entire node, which is necessary because the DataBox
    // does not allow any function calls, neither get nor mutate, during a
    // mutate. This design choice in DataBox is necessary to guarantee a
    // consistent state throughout mutation. Here, however, we need to be
    // reasonably efficient in parallel and so we manually guarantee that
    // consistent state. To this end, we create pointers and assign to them
    // the data in the DataBox, which is guaranteed to be pointer stable. The
    // data itself is guaranteed to be stable inside the ReductionDataLock.
    std::unordered_map<observers::ObservationId,
                       Parallel::ReductionData<ReductionDatums...>>*
        reduction_data = nullptr;
    std::unordered_map<ObservationId, std::vector<std::string>>*
        reduction_names_map = nullptr;
    std::unordered_map<observers::ObservationId,
                       std::unordered_set<Parallel::ArrayComponentId>>*
        reduction_observers_contributed = nullptr;
    Parallel::NodeLock* reduction_data_lock = nullptr;
    Parallel::NodeLock* reduction_file_lock = nullptr;
    size_t observations_registered_with_id =
        std::numeric_limits<size_t>::max();

    {
      const std::lock_guard hold_lock(*node_lock);
      db::mutate<Tags::ReductionData<ReductionDatums...>,
                 Tags::ReductionDataNames<ReductionDatums...>,
                 Tags::ContributorsOfReductionData, Tags::ReductionDataLock,
                 Tags::H5FileLock>(
          [&reduction_data, &reduction_names_map,
           &reduction_observers_contributed, &reduction_data_lock,
           &reduction_file_lock, &observation_id, &observer_group_id,
           &observations_registered_with_id](
              const gsl::not_null<std::unordered_map<
                  observers::ObservationId,
                  Parallel::ReductionData<ReductionDatums...>>*>
                  reduction_data_ptr,
              const gsl::not_null<
                  std::unordered_map<ObservationId, std::vector<std::string>>*>
                  reduction_names_map_ptr,
              const gsl::not_null<std::unordered_map<
                  observers::ObservationId,
                  std::unordered_set<Parallel::ArrayComponentId>>*>
                  reduction_observers_contributed_ptr,
              const gsl::not_null<Parallel::NodeLock*> reduction_data_lock_ptr,
              const gsl::not_null<Parallel::NodeLock*> reduction_file_lock_ptr,
              const std::unordered_map<
                  ObservationKey,
                  std::unordered_set<Parallel::ArrayComponentId>>&
                  observations_registered) {
            const ObservationKey& key{observation_id.observation_key()};
            const auto& registered_group_ids = observations_registered.at(key);
            if (UNLIKELY(registered_group_ids.find(observer_group_id) ==
                         registered_group_ids.end())) {
              ERROR("The observer group id "
                    << observer_group_id
                    << " was not registered for the observation id "
                    << observation_id);
            }
            reduction_data = &*reduction_data_ptr;
            reduction_names_map = &*reduction_names_map_ptr;
            reduction_observers_contributed =
                &*reduction_observers_contributed_ptr;
            reduction_data_lock = &*reduction_data_lock_ptr;
            reduction_file_lock = &*reduction_file_lock_ptr;
            observations_registered_with_id =
                observations_registered.at(key).size();
          },
          make_not_null(&box),
          db::get<Tags::ExpectedContributorsForObservations>(box));
    }

    ASSERT(
        observations_registered_with_id != std::numeric_limits<size_t>::max(),
        "Failed to set observations_registered_with_id when mutating the "
        "DataBox. This is a bug in the code.");

    bool send_data = false;
    // Now that we've retrieved pointers to the data in the DataBox we wish to
    // manipulate, lock the data and manipulate it.
    {
      const std::lock_guard hold_data_lock(*reduction_data_lock);
      auto& contributed_group_ids =
          (*reduction_observers_contributed)[observation_id];

      if (UNLIKELY(contributed_group_ids.find(observer_group_id) !=
                   contributed_group_ids.end())) {
        ERROR("Already received reduction data to observation id "
              << observation_id << " from array component id "
              << observer_group_id);
      }
      contributed_group_ids.insert(observer_group_id);

      // If requested, write the intermediate reduction data from the
      // particular core to one file per node. This allows measuring reduction
      // data per-core, e.g. performance metrics to assess load balancing.
      if (observe_with_core_id.has_value()) {
        auto reduction_data_this_core = received_reduction_data;
        reduction_data_this_core.finalize();
        auto reduction_names_this_core = reduction_names;
        auto& my_proxy =
            Parallel::get_parallel_component<ParallelComponent>(cache);
        const std::lock_guard hold_file_lock(*reduction_file_lock);
        ReductionActions_detail::write_data(
            "/Core" + std::to_string(observe_with_core_id.value()) +
                subfile_name,
            observers::input_source_from_cache(cache),
            std::move(reduction_names_this_core),
            std::move(reduction_data_this_core.data()),
            Parallel::get<Tags::ReductionFileName>(cache) +
                std::to_string(
                    Parallel::my_node<int>(*Parallel::local_branch(my_proxy))),
            std::make_index_sequence<sizeof...(ReductionDatums)>{});
      }

      if (reduction_data->find(observation_id) == reduction_data->end()) {
        // This Action has been called for the first time,
        // so all we need to do is move the input data to the
        // reduction_data in the DataBox.
        reduction_data->operator[](observation_id) =
            std::move(received_reduction_data);
      } else {
        // This Action is being called at least the second time
        // (but not the final time if on node 0).
        reduction_data->at(observation_id)
            .combine(std::move(received_reduction_data));
      }

      if (UNLIKELY(reduction_names.empty())) {
        ERROR(
            "The reduction names, which is a std::vector of the names of "
            "the columns in the file, must be non-empty.");
      }
      if (auto current_names = reduction_names_map->find(observation_id);
          current_names == reduction_names_map->end()) {
        reduction_names_map->emplace(observation_id,
                                     std::move(reduction_names));
      } else if (UNLIKELY(current_names->second != reduction_names)) {
        ERROR(
            "The reduction names passed in must match the currently "
            "known reduction names.");
      }

      // Check if we have received all reduction data from the Observer
      // group. If so we reduce to node 0 for writing to disk. We use a bool
      // `send_data` to allow us to defer the send call until after we've
      // unlocked the lock.
      if (reduction_observers_contributed->at(observation_id).size() ==
          observations_registered_with_id) {
        send_data = true;
        // We intentionally move the data out of the maps and erase it before
        // calling `WriteReductionData`, since if the call to
        // `WriteReductionData` is inlined and we erased data from the maps
        // afterward, we would lose the data.
        reduction_names =
            std::move(reduction_names_map->operator[](observation_id));
        received_reduction_data =
            std::move(reduction_data->operator[](observation_id));
        reduction_observers_contributed->erase(observation_id);
        reduction_data->erase(observation_id);
        reduction_names_map->erase(observation_id);
      }
    }

    if (send_data) {
      auto& my_proxy =
          Parallel::get_parallel_component<ParallelComponent>(cache);
      Parallel::threaded_action<WriteReductionData>(
          Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
              cache)[0],
          observation_id,
          Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy)),
          subfile_name,
          // NOLINTNEXTLINE(bugprone-use-after-move)
          std::move(reduction_names), std::move(received_reduction_data),
          std::move(formatter));
    }
  }
};

/*!
 * \ingroup ObserversGroup
 * \brief Write reduction data to disk from node 0.
 */
struct WriteReductionData {
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex,
            typename... ReductionDatums,
            typename Formatter = observers::NoFormatter>
  static void apply(
      db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
      const ArrayIndex& /*array_index*/,
      const gsl::not_null<Parallel::NodeLock*> node_lock,
      const observers::ObservationId& observation_id,
      const size_t sender_node_number, const std::string& subfile_name,
      std::vector<std::string>&& reduction_names,
      Parallel::ReductionData<ReductionDatums...>&& received_reduction_data,
      std::optional<Formatter>&& formatter = std::nullopt) {
    if constexpr (not std::is_same_v<Formatter, observers::NoFormatter>) {
      static_assert(
          tt::assert_conforms_to_v<Formatter,
                                   protocols::ReductionDataFormatter>);
      static_assert(
          std::is_same_v<typename Formatter::reduction_data,
                         Parallel::ReductionData<ReductionDatums...>>,
          "Mismatch between the formatter's `reduction_data` type alias and "
          "the reduction data that is being reduced.");
    }
    // The pointer gymnastics below are done in order to minimize the time
    // spent locking the entire node, which is necessary because the DataBox
    // does not allow any function calls, neither get nor mutate, during a
    // mutate. This design choice in DataBox is necessary to guarantee a
    // consistent state throughout mutation. Here, however, we need to be
    // reasonably efficient in parallel and so we manually guarantee that
    // consistent state. To this end, we create pointers and assign to them
    // the data in the DataBox, which is guaranteed to be pointer stable. The
    // data itself is guaranteed to be stable inside the ReductionDataLock.
    std::unordered_map<observers::ObservationId,
                       Parallel::ReductionData<ReductionDatums...>>*
        reduction_data = nullptr;
    std::unordered_map<ObservationId, std::vector<std::string>>*
        reduction_names_map = nullptr;
    std::unordered_map<observers::ObservationId, std::unordered_set<size_t>>*
        nodes_contributed = nullptr;
    Parallel::NodeLock* reduction_data_lock = nullptr;
    Parallel::NodeLock* reduction_file_lock = nullptr;
    size_t observations_registered_with_id =
        std::numeric_limits<size_t>::max();

    {
      const std::lock_guard hold_lock(*node_lock);
      db::mutate<Tags::ReductionData<ReductionDatums...>,
                 Tags::ReductionDataNames<ReductionDatums...>,
                 Tags::NodesThatContributedReductions, Tags::ReductionDataLock,
                 Tags::H5FileLock>(
          [&nodes_contributed, &reduction_data, &reduction_names_map,
           &reduction_data_lock, &reduction_file_lock, &observation_id,
           &observations_registered_with_id, &sender_node_number](
              const gsl::not_null<
                  typename Tags::ReductionData<ReductionDatums...>::type*>
                  reduction_data_ptr,
              const gsl::not_null<
                  std::unordered_map<ObservationId, std::vector<std::string>>*>
                  reduction_names_map_ptr,
              const gsl::not_null<std::unordered_map<
                  ObservationId, std::unordered_set<size_t>>*>
                  nodes_contributed_ptr,
              const gsl::not_null<Parallel::NodeLock*> reduction_data_lock_ptr,
              const gsl::not_null<Parallel::NodeLock*> reduction_file_lock_ptr,
              const std::unordered_map<ObservationKey, std::set<size_t>>&
                  nodes_registered_for_reductions) {
            const ObservationKey& key{observation_id.observation_key()};
            ASSERT(nodes_registered_for_reductions.find(key) !=
                       nodes_registered_for_reductions.end(),
                   "Performing reduction with unregistered ID key "
                       << observation_id.observation_key());
            const auto& registered_nodes =
                nodes_registered_for_reductions.at(key);

            if (UNLIKELY(registered_nodes.find(sender_node_number) ==
                         registered_nodes.end())) {
              ERROR("Node " << sender_node_number
                            << " was not registered for the observation id "
                            << observation_id);
            }

            reduction_data = &*reduction_data_ptr;
            reduction_names_map = &*reduction_names_map_ptr;
            nodes_contributed = &*nodes_contributed_ptr;
            reduction_data_lock = &*reduction_data_lock_ptr;
            reduction_file_lock = &*reduction_file_lock_ptr;
            observations_registered_with_id =
                nodes_registered_for_reductions.at(key).size();
          },
          make_not_null(&box),
          db::get<Tags::NodesExpectedToContributeReductions>(box));
    }

    ASSERT(
        observations_registered_with_id != std::numeric_limits<size_t>::max(),
        "Failed to set observations_registered_with_id when mutating the "
        "DataBox. This is a bug in the code.");

    bool write_to_disk = false;
    // Now that we've retrieved pointers to the data in the DataBox we wish to
    // manipulate, lock the data and manipulate it.
    {
      const std::lock_guard hold_lock(*reduction_data_lock);
      auto& nodes_contributed_to_observation =
          (*nodes_contributed)[observation_id];
      if (nodes_contributed_to_observation.find(sender_node_number) !=
          nodes_contributed_to_observation.end()) {
        ERROR("Already received reduction data at observation id "
              << observation_id << " from node " << sender_node_number);
      }
      nodes_contributed_to_observation.insert(sender_node_number);

      if (UNLIKELY(reduction_names.empty())) {
        ERROR(
            "The reduction names, which is a std::vector of the names of "
            "the columns in the file, must be non-empty.");
      }
      if (auto current_names = reduction_names_map->find(observation_id);
          current_names == reduction_names_map->end()) {
        reduction_names_map->emplace(observation_id,
                                     std::move(reduction_names));
      } else if (UNLIKELY(current_names->second != reduction_names)) {
        using ::operator<<;
        ERROR(
            "The reduction names passed in must match the currently "
            "known reduction names. Current ones are "
            << current_names->second << " while the received are "
            << reduction_names);
      }

      if (reduction_data->find(observation_id) == reduction_data->end()) {
        // This Action has been called for the first time,
        // so all we need to do is move the input data to the
        // reduction_data in the DataBox.
        reduction_data->operator[](observation_id) =
            std::move(received_reduction_data);
      } else {
        // This Action is being called at least the second time
        // (but not the final time if on node 0).
        reduction_data->at(observation_id)
            .combine(std::move(received_reduction_data));
      }

      // We use a bool `write_to_disk` to allow us to defer the data writing
      // until after we've unlocked the lock. For the same reason, we move the
      // final, reduced result into `received_reduction_data` and
      // `reduction_names`.
      if (nodes_contributed_to_observation.size() ==
          observations_registered_with_id) {
        write_to_disk = true;
        received_reduction_data =
            std::move(reduction_data->operator[](observation_id));
        reduction_names =
            std::move(reduction_names_map->operator[](observation_id));
        reduction_data->erase(observation_id);
        reduction_names_map->erase(observation_id);
        nodes_contributed->erase(observation_id);
      }
    }

    if (write_to_disk) {
      const std::lock_guard hold_lock(*reduction_file_lock);
      // NOLINTNEXTLINE(bugprone-use-after-move)
      received_reduction_data.finalize();
      if constexpr (not std::is_same_v<Formatter, NoFormatter>) {
        if (formatter.has_value()) {
          Parallel::printf(
              std::apply(*formatter, received_reduction_data.data()));
        }
      }
      ReductionActions_detail::write_data(
          subfile_name, observers::input_source_from_cache(cache),
          // NOLINTNEXTLINE(bugprone-use-after-move)
          std::move(reduction_names), std::move(received_reduction_data.data()),
          Parallel::get<Tags::ReductionFileName>(cache),
          std::make_index_sequence<sizeof...(ReductionDatums)>{});
    }
  }
};

/*!
 * \brief Write a single row of data to the reductions file without the need
 * to register or reduce anything, e.g. from a singleton component or from a
 * specific chare.
 *
 * Use observers::Actions::ContributeReductionData instead if you need to
 * perform a reduction before writing to the file.
 *
 * Invoke this action on the observers::ObserverWriter component on node 0,
 * passing the following arguments (a rough invocation sketch follows the
 * list):
 *
 * - `subfile_name`: the name of the `h5::Dat` subfile in the HDF5 file.
 *   Include a leading slash, e.g. `/element_data`.
 * - `legend`: a `std::vector<std::string>` of column labels for the
 *   quantities being observed (e.g. `{"Time", "L1ErrorDensity",
 *   "L2ErrorDensity"}`).
 * - `reduction_data`: a `std::tuple<...>` with the data to write. The tuple
 *   can hold either `double`s or `std::vector<double>`s and is flattened
 *   before it is written to the file to form a single row of values. The
 *   total number of values must match the length of the `legend`.
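 *
 * As a rough illustration, a singleton might invoke this action as in the
 * following sketch; the subfile name, legend, and values are placeholders and
 * not taken from an existing use in the code base:
 *
 * \code
 * // Hypothetical invocation from some component's Action.
 * auto& writer = Parallel::get_parallel_component<
 *     observers::ObserverWriter<Metavariables>>(cache);
 * Parallel::threaded_action<
 *     observers::ThreadedActions::WriteReductionDataRow>(
 *     writer[0], std::string{"/my_singleton_data"},
 *     std::vector<std::string>{"Time", "Value"},
 *     std::make_tuple(current_time, current_value));
 * \endcode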
 */
struct WriteReductionDataRow {
  /// \brief The apply call for the threaded action
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex, typename... Ts>
  static void apply(db::DataBox<DbTagsList>& box,
                    Parallel::GlobalCache<Metavariables>& cache,
                    const ArrayIndex& /*array_index*/,
                    const gsl::not_null<Parallel::NodeLock*> node_lock,
                    const std::string& subfile_name,
                    std::vector<std::string> legend,
                    std::tuple<Ts...>&& reduction_data) {
    apply<ParallelComponent>(box, node_lock, cache, subfile_name,
                             std::move(legend), std::move(reduction_data));
  }

  // The local synchronous action
  using return_type = void;

  /// \brief The apply call for the local synchronous action
  template <typename ParallelComponent, typename DbTagList,
            typename Metavariables, typename... Ts>
  static return_type apply(
      db::DataBox<DbTagList>& box,
      const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
      Parallel::GlobalCache<Metavariables>& cache,
      const std::string& subfile_name, std::vector<std::string> legend,
      std::tuple<Ts...>&& reduction_data) {
    auto& reduction_file_lock =
        db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
    const std::lock_guard hold_lock(reduction_file_lock);
    ThreadedActions::ReductionActions_detail::write_data(
        subfile_name, observers::input_source_from_cache(cache),
        std::move(legend), std::move(reduction_data),
        Parallel::get<Tags::ReductionFileName>(cache),
        std::make_index_sequence<sizeof...(Ts)>{});
  }
};

} // namespace ThreadedActions
} // namespace observers