Line data Source code
1 0 : // Distributed under the MIT License.
2 : // See LICENSE.txt for details.
3 :
4 : #pragma once
5 :
6 : #include <cstddef>
7 : #include <mutex>
8 : #include <optional>
9 : #include <string>
10 : #include <tuple>
11 : #include <unordered_map>
12 : #include <utility>
13 : #include <vector>
14 :
15 : #include "DataStructures/DataBox/DataBox.hpp"
16 : #include "IO/H5/AccessType.hpp"
17 : #include "IO/H5/Dat.hpp"
18 : #include "IO/H5/File.hpp"
19 : #include "IO/Observer/Helpers.hpp"
20 : #include "IO/Observer/ObservationId.hpp"
21 : #include "IO/Observer/Protocols/ReductionDataFormatter.hpp"
22 : #include "IO/Observer/Tags.hpp"
23 : #include "Parallel/ArrayComponentId.hpp"
24 : #include "Parallel/ArrayIndex.hpp"
25 : #include "Parallel/GlobalCache.hpp"
26 : #include "Parallel/Info.hpp"
27 : #include "Parallel/Invoke.hpp"
28 : #include "Parallel/Local.hpp"
29 : #include "Parallel/NodeLock.hpp"
30 : #include "Parallel/ParallelComponentHelpers.hpp"
31 : #include "Parallel/Printf/Printf.hpp"
32 : #include "Parallel/Reduction.hpp"
33 : #include "Utilities/ErrorHandling/Assert.hpp"
34 : #include "Utilities/ErrorHandling/Error.hpp"
35 : #include "Utilities/GetOutput.hpp"
36 : #include "Utilities/Gsl.hpp"
37 : #include "Utilities/PrettyType.hpp"
38 : #include "Utilities/ProtocolHelpers.hpp"
39 : #include "Utilities/Requires.hpp"
40 : #include "Utilities/Serialization/PupStlCpp17.hpp"
41 : #include "Utilities/StdHelpers.hpp"
42 : #include "Utilities/TMPL.hpp"
43 : #include "Utilities/TaggedTuple.hpp"
44 :
45 : namespace observers {
46 :
47 : /// \cond
48 : template <class Metavariables>
49 : struct ObserverWriter;
50 : /// \endcond
51 :
52 0 : namespace ThreadedActions {
53 : /// \cond
54 : struct CollectReductionDataOnNode;
55 : struct WriteReductionData;
56 : /// \endcond
57 : } // namespace ThreadedActions
58 :
59 : /// Indicates no formatter is selected
60 1 : struct NoFormatter {
61 0 : void pup(PUP::er& /*p*/) {}
62 : };
63 :
64 : namespace Actions {
65 : /// \cond
66 : struct ContributeReductionDataToWriter;
67 : /// \endcond
68 :
69 : /*!
70 : * \ingroup ObserversGroup
71 : * \brief Send reduction data to the observer group.
72 : *
73 : * Once everything at a specific `ObservationId` has been contributed to the
74 : * reduction, the groups reduce to their local nodegroup.
75 : *
76 : * The caller of this Action (which is to be invoked on the Observer parallel
77 : * component) must pass in an `observation_id` used to uniquely identify the
78 : * observation in time, the name of the `h5::Dat` subfile in the HDF5 file (e.g.
79 : * `/element_data`, where the slash is important), a `std::vector<std::string>`
80 : * of names of the quantities being reduced (e.g. `{"Time", "L1ErrorDensity",
81 : * "L2ErrorDensity"}`), and the `Parallel::ReductionData` that holds the
82 : * `ReductionDatums` containing info on how to do the reduction.
83 : *
84 : * The observer components need to know all expected reduction data types by
85 : * compile-time, so they rely on the
86 : * `Metavariables::observed_reduction_data_tags` alias to collect them in one
87 : * place. To this end, each Action that contributes reduction data must expose
88 : * the type alias as:
89 : *
90 : * \snippet ObserverHelpers.hpp make_reduction_data_tags
91 : *
92 : * Then, in the `Metavariables` collect them from all observing Actions using
93 : * the `observers::collect_reduction_data_tags` metafunction.
94 : *
95 : * This action also accepts a "formatter" that will be forwarded along with the
96 : * reduction data and used to print an informative message when the reduction is
97 : * complete. The formatter must conform to
98 : * `observers::protocols::ReductionDataFormatter`.
99 : *
100 : * This action also supports observing the intermediate stage of the reduction
101 : * over just the processing element that the element is currently on. This can
102 : * be useful e.g. to measure performance metric to assess load-balancing such as
103 : * the number of grid points on each core. Enable per-core observations by
104 : * passing `true` for the `observe_per_core` argument (default: `false`). The
105 : * data will be written in one H5 file per _node_ prefixed with the
106 : * `observers::Tags::ReductionFileName`, in a `Core{core_id}` subfile, where
107 : * `core_id` is an integer identifying the core across all nodes (see
108 : * `Parallel::my_proc`). For example, when running on 2 nodes with 2 cores each
109 : * you will end up with `Reductions0.h5` containing `/Core0/{subfile_name}.dat`
110 : * and `/Core1/{subfile_name}.dat`, and `Reductions1.h5` containing
111 : * `/Core2/{subfile_name}.dat` and `/Core3/{subfile_name}.dat`. This is in
112 : * addition to the usual reduction output over _all_ registered elements,
113 : * written to `Reductions.h5` (no node ID suffix in the file name).
114 : */
115 1 : struct ContributeReductionData {
116 : template <typename ParallelComponent, typename DbTagsList,
117 : typename Metavariables, typename ArrayIndex, typename... Ts,
118 : typename Formatter = observers::NoFormatter>
119 0 : static void apply(db::DataBox<DbTagsList>& box,
120 : Parallel::GlobalCache<Metavariables>& cache,
121 : const ArrayIndex& array_index,
122 : const observers::ObservationId& observation_id,
123 : const Parallel::ArrayComponentId& sender_array_id,
124 : const std::string& subfile_name,
125 : const std::vector<std::string>& reduction_names,
126 : Parallel::ReductionData<Ts...>&& reduction_data,
127 : std::optional<Formatter>&& formatter = std::nullopt,
128 : const bool observe_per_core = false) {
129 : db::mutate<Tags::ReductionData<Ts...>, Tags::ReductionDataNames<Ts...>,
130 : Tags::ContributorsOfReductionData>(
131 : [&array_index, &cache, &observation_id,
132 : reduction_data = std::move(reduction_data), &reduction_names,
133 : &sender_array_id, &subfile_name, &formatter, &observe_per_core](
134 : const gsl::not_null<std::unordered_map<
135 : ObservationId, Parallel::ReductionData<Ts...>>*>
136 : reduction_data_map,
137 : const gsl::not_null<
138 : std::unordered_map<ObservationId, std::vector<std::string>>*>
139 : reduction_names_map,
140 : const gsl::not_null<std::unordered_map<
141 : ObservationId, std::unordered_set<Parallel::ArrayComponentId>>*>
142 : reduction_observers_contributed,
143 : const std::unordered_map<
144 : ObservationKey,
145 : std::unordered_set<Parallel::ArrayComponentId>>&
146 : observations_registered) mutable { // NOLINT(spectre-mutable)
147 : ASSERT(
148 : observations_registered.find(observation_id.observation_key()) !=
149 : observations_registered.end(),
150 : "Couldn't find registration key "
151 : << observation_id.observation_key()
152 : << " in the registered observers. Known IDs are "
153 : << keys_of(observations_registered)
154 : << "\nIt could be that you are using a nodegroup DG "
155 : "collection but the observation code for the event hasn't "
156 : "been updated to work with the nodegroup yet.");
157 :
158 : auto& contributed_array_ids =
159 : (*reduction_observers_contributed)[observation_id];
160 : if (UNLIKELY(contributed_array_ids.find(sender_array_id) !=
161 : contributed_array_ids.end())) {
162 : ERROR("Already received reduction data to observation id "
163 : << observation_id << " from array component id "
164 : << sender_array_id);
165 : }
166 : contributed_array_ids.insert(sender_array_id);
167 :
168 : if (reduction_data_map->count(observation_id) == 0) {
169 : reduction_data_map->emplace(observation_id,
170 : std::move(reduction_data));
171 : reduction_names_map->emplace(observation_id, reduction_names);
172 : } else {
173 : if (UNLIKELY(reduction_names_map->at(observation_id) !=
174 : reduction_names)) {
175 : using ::operator<<;
176 : ERROR("Reduction names differ at ObservationId "
177 : << observation_id << " with the expected names being "
178 : << reduction_names_map->at(observation_id)
179 : << " and the received names being " << reduction_names);
180 : }
181 : reduction_data_map->operator[](observation_id)
182 : .combine(std::move(reduction_data));
183 : }
184 :
185 : // Check if we have received all reduction data from the registered
186 : // elements. If so, we reduce to the local ObserverWriter nodegroup.
187 : if (UNLIKELY(
188 : contributed_array_ids.size() ==
189 : observations_registered.at(observation_id.observation_key())
190 : .size())) {
191 : auto& local_writer = *Parallel::local_branch(
192 : Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
193 : cache));
194 : auto& my_proxy =
195 : Parallel::get_parallel_component<ParallelComponent>(cache);
196 : const std::optional<int> observe_with_core_id =
197 : observe_per_core ? std::make_optional(Parallel::my_proc<int>(
198 : *Parallel::local_branch(my_proxy)))
199 : : std::nullopt;
200 : Parallel::threaded_action<
201 : ThreadedActions::CollectReductionDataOnNode>(
202 : local_writer, observation_id,
203 : Parallel::make_array_component_id<ParallelComponent>(
204 : array_index),
205 : subfile_name, (*reduction_names_map)[observation_id],
206 : std::move((*reduction_data_map)[observation_id]),
207 : std::move(formatter), observe_with_core_id);
208 : reduction_data_map->erase(observation_id);
209 : reduction_names_map->erase(observation_id);
210 : reduction_observers_contributed->erase(observation_id);
211 : }
212 : },
213 : make_not_null(&box),
214 : db::get<Tags::ExpectedContributorsForObservations>(box));
215 : // Silence a gcc <= 9 unused-variable warning
216 : (void)observe_per_core;
217 : }
218 : };
219 : } // namespace Actions
220 :
221 : namespace ThreadedActions {
222 :
223 : namespace ReductionActions_detail {
224 : void append_to_reduction_data(
225 : const gsl::not_null<std::vector<double>*> all_reduction_data,
226 : const double t);
227 :
228 : void append_to_reduction_data(
229 : const gsl::not_null<std::vector<double>*> all_reduction_data,
230 : const std::vector<double>& t);
231 :
232 : template <typename... Ts, size_t... Is>
233 : void write_data(const std::string& subfile_name,
234 : const std::string& input_source,
235 : std::vector<std::string> legend, const std::tuple<Ts...>& data,
236 : const std::string& file_prefix,
237 : std::index_sequence<Is...> /*meta*/) {
238 : static_assert(sizeof...(Ts) > 0,
239 : "Must be reducing at least one piece of data");
240 : std::vector<double> data_to_append{};
241 : EXPAND_PACK_LEFT_TO_RIGHT(
242 : append_to_reduction_data(&data_to_append, std::get<Is>(data)));
243 :
244 : if (legend.size() != data_to_append.size()) {
245 : ERROR(
246 : "There must be one name provided for each piece of data. You provided "
247 : << legend.size() << " names: '" << get_output(legend)
248 : << "' but there are " << data_to_append.size()
249 : << " pieces of data being reduced");
250 : }
251 :
252 : h5::H5File<h5::AccessType::ReadWrite> h5file(file_prefix + ".h5", true,
253 : input_source);
254 : constexpr size_t version_number = 0;
255 : auto& time_series_file = h5file.try_insert<h5::Dat>(
256 : subfile_name, std::move(legend), version_number);
257 : time_series_file.append(data_to_append);
258 : }
259 : } // namespace ReductionActions_detail
260 :
261 : /*!
262 : * \brief Gathers all the reduction data from all processing elements/cores on a
263 : * node.
264 : */
265 1 : struct CollectReductionDataOnNode {
266 : public:
267 : template <typename ParallelComponent, typename DbTagsList,
268 : typename Metavariables, typename ArrayIndex,
269 : typename... ReductionDatums,
270 : typename Formatter = observers::NoFormatter>
271 0 : static void apply(
272 : db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
273 : const ArrayIndex& /*array_index*/,
274 : const gsl::not_null<Parallel::NodeLock*> node_lock,
275 : const observers::ObservationId& observation_id,
276 : Parallel::ArrayComponentId observer_group_id,
277 : const std::string& subfile_name,
278 : std::vector<std::string>&& reduction_names,
279 : Parallel::ReductionData<ReductionDatums...>&& received_reduction_data,
280 : std::optional<Formatter>&& formatter = std::nullopt,
281 : const std::optional<int> observe_with_core_id = std::nullopt) {
282 : // The below gymnastics with pointers is done in order to minimize the
283 : // time spent locking the entire node, which is necessary because the
284 : // DataBox does not allow any functions calls, both get and mutate, during
285 : // a mutate. This design choice in DataBox is necessary to guarantee a
286 : // consistent state throughout mutation. Here, however, we need to be
287 : // reasonable efficient in parallel and so we manually guarantee that
288 : // consistent state. To this end, we create pointers and assign to them
289 : // the data in the DataBox which is guaranteed to be pointer stable. The
290 : // data itself is guaranteed to be stable inside the ReductionDataLock.
291 : std::unordered_map<observers::ObservationId,
292 : Parallel::ReductionData<ReductionDatums...>>*
293 : reduction_data = nullptr;
294 : std::unordered_map<ObservationId, std::vector<std::string>>*
295 : reduction_names_map = nullptr;
296 : std::unordered_map<observers::ObservationId,
297 : std::unordered_set<Parallel::ArrayComponentId>>*
298 : reduction_observers_contributed = nullptr;
299 : Parallel::NodeLock* reduction_data_lock = nullptr;
300 : Parallel::NodeLock* reduction_file_lock = nullptr;
301 : size_t observations_registered_with_id = std::numeric_limits<size_t>::max();
302 :
303 : {
304 : const std::lock_guard hold_lock(*node_lock);
305 : db::mutate<Tags::ReductionData<ReductionDatums...>,
306 : Tags::ReductionDataNames<ReductionDatums...>,
307 : Tags::ContributorsOfReductionData, Tags::ReductionDataLock,
308 : Tags::H5FileLock>(
309 : [&reduction_data, &reduction_names_map,
310 : &reduction_observers_contributed, &reduction_data_lock,
311 : &reduction_file_lock, &observation_id, &observer_group_id,
312 : &observations_registered_with_id](
313 : const gsl::not_null<std::unordered_map<
314 : observers::ObservationId,
315 : Parallel::ReductionData<ReductionDatums...>>*>
316 : reduction_data_ptr,
317 : const gsl::not_null<
318 : std::unordered_map<ObservationId, std::vector<std::string>>*>
319 : reduction_names_map_ptr,
320 : const gsl::not_null<std::unordered_map<
321 : observers::ObservationId,
322 : std::unordered_set<Parallel::ArrayComponentId>>*>
323 : reduction_observers_contributed_ptr,
324 : const gsl::not_null<Parallel::NodeLock*> reduction_data_lock_ptr,
325 : const gsl::not_null<Parallel::NodeLock*> reduction_file_lock_ptr,
326 : const std::unordered_map<
327 : ObservationKey,
328 : std::unordered_set<Parallel::ArrayComponentId>>&
329 : observations_registered) {
330 : const ObservationKey& key{observation_id.observation_key()};
331 : const auto& registered_group_ids = observations_registered.at(key);
332 : if (UNLIKELY(registered_group_ids.find(observer_group_id) ==
333 : registered_group_ids.end())) {
334 : ERROR("The observer group id "
335 : << observer_group_id
336 : << " was not registered for the observation id "
337 : << observation_id);
338 : }
339 : reduction_data = &*reduction_data_ptr;
340 : reduction_names_map = &*reduction_names_map_ptr;
341 : reduction_observers_contributed =
342 : &*reduction_observers_contributed_ptr;
343 : reduction_data_lock = &*reduction_data_lock_ptr;
344 : reduction_file_lock = &*reduction_file_lock_ptr;
345 : observations_registered_with_id =
346 : observations_registered.at(key).size();
347 : },
348 : make_not_null(&box),
349 : db::get<Tags::ExpectedContributorsForObservations>(box));
350 : }
351 :
352 : ASSERT(
353 : observations_registered_with_id != std::numeric_limits<size_t>::max(),
354 : "Failed to set observations_registered_with_id when mutating the "
355 : "DataBox. This is a bug in the code.");
356 :
357 : bool send_data = false;
358 : // Now that we've retrieved pointers to the data in the DataBox we wish to
359 : // manipulate, lock the data and manipulate it.
360 : {
361 : const std::lock_guard hold_data_lock(*reduction_data_lock);
362 : auto& contributed_group_ids =
363 : (*reduction_observers_contributed)[observation_id];
364 :
365 : if (UNLIKELY(contributed_group_ids.find(observer_group_id) !=
366 : contributed_group_ids.end())) {
367 : ERROR("Already received reduction data to observation id "
368 : << observation_id << " from array component id "
369 : << observer_group_id);
370 : }
371 : contributed_group_ids.insert(observer_group_id);
372 :
373 : // If requested, write the intermediate reduction data from the particular
374 : // core to one file per node. This allows measuring reduction data
375 : // per-core, e.g. performance metrics to assess load balancing.
376 : if (observe_with_core_id.has_value()) {
377 : auto reduction_data_this_core = received_reduction_data;
378 : reduction_data_this_core.finalize();
379 : auto reduction_names_this_core = reduction_names;
380 : auto& my_proxy =
381 : Parallel::get_parallel_component<ParallelComponent>(cache);
382 : const std::lock_guard hold_file_lock(*reduction_file_lock);
383 : ReductionActions_detail::write_data(
384 : "/Core" + std::to_string(observe_with_core_id.value()) +
385 : subfile_name,
386 : observers::input_source_from_cache(cache),
387 : std::move(reduction_names_this_core),
388 : std::move(reduction_data_this_core.data()),
389 : Parallel::get<Tags::ReductionFileName>(cache) +
390 : std::to_string(
391 : Parallel::my_node<int>(*Parallel::local_branch(my_proxy))),
392 : std::make_index_sequence<sizeof...(ReductionDatums)>{});
393 : }
394 :
395 : if (reduction_data->find(observation_id) == reduction_data->end()) {
396 : // This Action has been called for the first time,
397 : // so all we need to do is move the input data to the
398 : // reduction_data in the DataBox.
399 : reduction_data->operator[](observation_id) =
400 : std::move(received_reduction_data);
401 : } else {
402 : // This Action is being called at least the second time
403 : // (but not the final time if on node 0).
404 : reduction_data->at(observation_id)
405 : .combine(std::move(received_reduction_data));
406 : }
407 :
408 : if (UNLIKELY(reduction_names.empty())) {
409 : ERROR(
410 : "The reduction names, which is a std::vector of the names of "
411 : "the columns in the file, must be non-empty.");
412 : }
413 : if (auto current_names = reduction_names_map->find(observation_id);
414 : current_names == reduction_names_map->end()) {
415 : reduction_names_map->emplace(observation_id,
416 : std::move(reduction_names));
417 : } else if (UNLIKELY(current_names->second != reduction_names)) {
418 : ERROR(
419 : "The reduction names passed in must match the currently "
420 : "known reduction names.");
421 : }
422 :
423 : // Check if we have received all reduction data from the Observer
424 : // group. If so we reduce to node 0 for writing to disk. We use a bool
425 : // `send_data` to allow us to defer the send call until after we've
426 : // unlocked the lock.
427 : if (reduction_observers_contributed->at(observation_id).size() ==
428 : observations_registered_with_id) {
429 : send_data = true;
430 : // We intentionally move the data out of the map and erase it
431 : // before call `WriteReductionData` since if the call to
432 : // `WriteReductionData` is inlined and we erase data from the maps
433 : // afterward we would lose data.
434 : reduction_names =
435 : std::move(reduction_names_map->operator[](observation_id));
436 : received_reduction_data =
437 : std::move(reduction_data->operator[](observation_id));
438 : reduction_observers_contributed->erase(observation_id);
439 : reduction_data->erase(observation_id);
440 : reduction_names_map->erase(observation_id);
441 : }
442 : }
443 :
444 : if (send_data) {
445 : auto& my_proxy =
446 : Parallel::get_parallel_component<ParallelComponent>(cache);
447 : Parallel::threaded_action<WriteReductionData>(
448 : Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
449 : cache)[0],
450 : observation_id,
451 : Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy)),
452 : subfile_name,
453 : // NOLINTNEXTLINE(bugprone-use-after-move)
454 : std::move(reduction_names), std::move(received_reduction_data),
455 : std::move(formatter));
456 : }
457 : }
458 : };
459 :
460 : /*!
461 : * \ingroup ObserversGroup
462 : * \brief Write reduction data to disk from node 0.
463 : */
464 1 : struct WriteReductionData {
465 : template <typename ParallelComponent, typename DbTagsList,
466 : typename Metavariables, typename ArrayIndex,
467 : typename... ReductionDatums,
468 : typename Formatter = observers::NoFormatter>
469 0 : static void apply(
470 : db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
471 : const ArrayIndex& /*array_index*/,
472 : const gsl::not_null<Parallel::NodeLock*> node_lock,
473 : const observers::ObservationId& observation_id,
474 : const size_t sender_node_number, const std::string& subfile_name,
475 : std::vector<std::string>&& reduction_names,
476 : Parallel::ReductionData<ReductionDatums...>&& received_reduction_data,
477 : std::optional<Formatter>&& formatter = std::nullopt) {
478 : if constexpr (not std::is_same_v<Formatter, observers::NoFormatter>) {
479 : static_assert(
480 : tt::assert_conforms_to_v<Formatter,
481 : protocols::ReductionDataFormatter>);
482 : static_assert(
483 : std::is_same_v<typename Formatter::reduction_data,
484 : Parallel::ReductionData<ReductionDatums...>>,
485 : "Mismatch between the formatter's `reduction_data` type alias and "
486 : "the reduction data that is being reduced.");
487 : }
488 : // The below gymnastics with pointers is done in order to minimize the
489 : // time spent locking the entire node, which is necessary because the
490 : // DataBox does not allow any functions calls, both get and mutate, during
491 : // a mutate. This design choice in DataBox is necessary to guarantee a
492 : // consistent state throughout mutation. Here, however, we need to be
493 : // reasonable efficient in parallel and so we manually guarantee that
494 : // consistent state. To this end, we create pointers and assign to them
495 : // the data in the DataBox which is guaranteed to be pointer stable. The
496 : // data itself is guaranteed to be stable inside the ReductionDataLock.
497 : std::unordered_map<observers::ObservationId,
498 : Parallel::ReductionData<ReductionDatums...>>*
499 : reduction_data = nullptr;
500 : std::unordered_map<ObservationId, std::vector<std::string>>*
501 : reduction_names_map = nullptr;
502 : std::unordered_map<observers::ObservationId, std::unordered_set<size_t>>*
503 : nodes_contributed = nullptr;
504 : Parallel::NodeLock* reduction_data_lock = nullptr;
505 : Parallel::NodeLock* reduction_file_lock = nullptr;
506 : size_t observations_registered_with_id = std::numeric_limits<size_t>::max();
507 :
508 : {
509 : const std::lock_guard hold_lock(*node_lock);
510 : db::mutate<Tags::ReductionData<ReductionDatums...>,
511 : Tags::ReductionDataNames<ReductionDatums...>,
512 : Tags::NodesThatContributedReductions, Tags::ReductionDataLock,
513 : Tags::H5FileLock>(
514 : [&nodes_contributed, &reduction_data, &reduction_names_map,
515 : &reduction_data_lock, &reduction_file_lock, &observation_id,
516 : &observations_registered_with_id, &sender_node_number](
517 : const gsl::not_null<
518 : typename Tags::ReductionData<ReductionDatums...>::type*>
519 : reduction_data_ptr,
520 : const gsl::not_null<
521 : std::unordered_map<ObservationId, std::vector<std::string>>*>
522 : reduction_names_map_ptr,
523 : const gsl::not_null<std::unordered_map<
524 : ObservationId, std::unordered_set<size_t>>*>
525 : nodes_contributed_ptr,
526 : const gsl::not_null<Parallel::NodeLock*> reduction_data_lock_ptr,
527 : const gsl::not_null<Parallel::NodeLock*> reduction_file_lock_ptr,
528 : const std::unordered_map<ObservationKey, std::set<size_t>>&
529 : nodes_registered_for_reductions) {
530 : const ObservationKey& key{observation_id.observation_key()};
531 : ASSERT(nodes_registered_for_reductions.find(key) !=
532 : nodes_registered_for_reductions.end(),
533 : "Performing reduction with unregistered ID key "
534 : << observation_id.observation_key());
535 : const auto& registered_nodes =
536 : nodes_registered_for_reductions.at(key);
537 :
538 : if (UNLIKELY(registered_nodes.find(sender_node_number) ==
539 : registered_nodes.end())) {
540 : ERROR("Node " << sender_node_number
541 : << " was not registered for the observation id "
542 : << observation_id);
543 : }
544 :
545 : reduction_data = &*reduction_data_ptr;
546 : reduction_names_map = &*reduction_names_map_ptr;
547 : nodes_contributed = &*nodes_contributed_ptr;
548 : reduction_data_lock = &*reduction_data_lock_ptr;
549 : reduction_file_lock = &*reduction_file_lock_ptr;
550 : observations_registered_with_id =
551 : nodes_registered_for_reductions.at(key).size();
552 : },
553 : make_not_null(&box),
554 : db::get<Tags::NodesExpectedToContributeReductions>(box));
555 : }
556 :
557 : ASSERT(
558 : observations_registered_with_id != std::numeric_limits<size_t>::max(),
559 : "Failed to set observations_registered_with_id when mutating the "
560 : "DataBox. This is a bug in the code.");
561 :
562 : bool write_to_disk = false;
563 : // Now that we've retrieved pointers to the data in the DataBox we wish to
564 : // manipulate, lock the data and manipulate it.
565 : {
566 : const std::lock_guard hold_lock(*reduction_data_lock);
567 : auto& nodes_contributed_to_observation =
568 : (*nodes_contributed)[observation_id];
569 : if (nodes_contributed_to_observation.find(sender_node_number) !=
570 : nodes_contributed_to_observation.end()) {
571 : ERROR("Already received reduction data at observation id "
572 : << observation_id << " from node " << sender_node_number);
573 : }
574 : nodes_contributed_to_observation.insert(sender_node_number);
575 :
576 : if (UNLIKELY(reduction_names.empty())) {
577 : ERROR(
578 : "The reduction names, which is a std::vector of the names of "
579 : "the columns in the file, must be non-empty.");
580 : }
581 : if (auto current_names = reduction_names_map->find(observation_id);
582 : current_names == reduction_names_map->end()) {
583 : reduction_names_map->emplace(observation_id,
584 : std::move(reduction_names));
585 : } else if (UNLIKELY(current_names->second != reduction_names)) {
586 : using ::operator<<;
587 : ERROR(
588 : "The reduction names passed in must match the currently "
589 : "known reduction names. Current ones are "
590 : << current_names->second << " while the received are "
591 : << reduction_names);
592 : }
593 :
594 : if (reduction_data->find(observation_id) == reduction_data->end()) {
595 : // This Action has been called for the first time,
596 : // so all we need to do is move the input data to the
597 : // reduction_data in the DataBox.
598 : reduction_data->operator[](observation_id) =
599 : std::move(received_reduction_data);
600 : } else {
601 : // This Action is being called at least the second time
602 : // (but not the final time if on node 0).
603 : reduction_data->at(observation_id)
604 : .combine(std::move(received_reduction_data));
605 : }
606 :
607 : // We use a bool `write_to_disk` to allow us to defer the data writing
608 : // until after we've unlocked the lock. For the same reason, we move the
609 : // final, reduced result into `received_reduction_data` and
610 : // `reduction_names`.
611 : if (nodes_contributed_to_observation.size() ==
612 : observations_registered_with_id) {
613 : write_to_disk = true;
614 : received_reduction_data =
615 : std::move(reduction_data->operator[](observation_id));
616 : reduction_names =
617 : std::move(reduction_names_map->operator[](observation_id));
618 : reduction_data->erase(observation_id);
619 : reduction_names_map->erase(observation_id);
620 : nodes_contributed->erase(observation_id);
621 : }
622 : }
623 :
624 : if (write_to_disk) {
625 : const std::lock_guard hold_lock(*reduction_file_lock);
626 : // NOLINTNEXTLINE(bugprone-use-after-move)
627 : received_reduction_data.finalize();
628 : if constexpr (not std::is_same_v<Formatter, NoFormatter>) {
629 : if (formatter.has_value()) {
630 : Parallel::printf(
631 : std::apply(*formatter, received_reduction_data.data()) + "\n");
632 : }
633 : }
634 : ReductionActions_detail::write_data(
635 : subfile_name, observers::input_source_from_cache(cache),
636 : // NOLINTNEXTLINE(bugprone-use-after-move)
637 : std::move(reduction_names), std::move(received_reduction_data.data()),
638 : Parallel::get<Tags::ReductionFileName>(cache),
639 : std::make_index_sequence<sizeof...(ReductionDatums)>{});
640 : }
641 : }
642 : };
643 :
644 : /*!
645 : * \brief Write a single row of data to the reductions file without the need to
646 : * register or reduce anything, e.g. from a singleton component or from a
647 : * specific chare.
648 : *
649 : * Use observers::Actions::ContributeReductionData instead if you need to
650 : * perform a reduction before writing to the file.
651 : *
652 : * Invoke this action on the observers::ObserverWriter component on node 0. Pass
653 : * the following arguments when invoking this action:
654 : *
655 : * - `subfile_name`: the name of the `h5::Dat` subfile in the HDF5 file. Include
656 : * a leading slash, e.g., `/element_data`.
657 : * - `legend`: a `std::vector<std::string>` of column labels for the quantities
658 : * being observed (e.g. `{"Time", "L1ErrorDensity", "L2ErrorDensity"}`).
659 : * - `reduction_data`: a `std::tuple<...>` with the data to write. The tuple can
660 : * hold either `double`s or `std::vector<double>`s and is flattened before it
661 : * is written to the file to form a single row of values. The total number of
662 : * values must match the length of the `legend`.
663 : */
664 1 : struct WriteReductionDataRow {
665 : template <typename ParallelComponent, typename DbTagsList,
666 : typename Metavariables, typename ArrayIndex, typename... Ts,
667 : typename DataBox = db::DataBox<DbTagsList>>
668 0 : static void apply(db::DataBox<DbTagsList>& box,
669 : Parallel::GlobalCache<Metavariables>& cache,
670 : const ArrayIndex& /*array_index*/,
671 : const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
672 : const std::string& subfile_name,
673 : std::vector<std::string>&& legend,
674 : std::tuple<Ts...>&& reduction_data) {
675 : auto& reduction_file_lock =
676 : db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
677 : const std::lock_guard hold_lock(reduction_file_lock);
678 : ThreadedActions::ReductionActions_detail::write_data(
679 : subfile_name, observers::input_source_from_cache(cache),
680 : std::move(legend), std::move(reduction_data),
681 : Parallel::get<Tags::ReductionFileName>(cache),
682 : std::make_index_sequence<sizeof...(Ts)>{});
683 : }
684 : };
685 :
686 : } // namespace ThreadedActions
687 : } // namespace observers
|