SpECTRE Documentation Coverage Report
Current view: top level - IO/Observer - ReductionActions.hpp Hit Total Coverage
Commit: 664546099c4dbf27a1b708fac45e39c82dd743d2 Lines: 5 12 41.7 %
Date: 2024-04-19 16:28:01
Legend: Lines: hit not hit

          Line data    Source code
       1           0 : // Distributed under the MIT License.
       2             : // See LICENSE.txt for details.
       3             : 
       4             : #pragma once
       5             : 
       6             : #include <cstddef>
       7             : #include <mutex>
       8             : #include <optional>
       9             : #include <string>
      10             : #include <tuple>
      11             : #include <unordered_map>
      12             : #include <utility>
      13             : #include <vector>
      14             : 
      15             : #include "DataStructures/DataBox/DataBox.hpp"
      16             : #include "IO/H5/AccessType.hpp"
      17             : #include "IO/H5/Dat.hpp"
      18             : #include "IO/H5/File.hpp"
      19             : #include "IO/Observer/Helpers.hpp"
      20             : #include "IO/Observer/ObservationId.hpp"
      21             : #include "IO/Observer/Protocols/ReductionDataFormatter.hpp"
      22             : #include "IO/Observer/Tags.hpp"
      23             : #include "Parallel/ArrayComponentId.hpp"
      24             : #include "Parallel/ArrayIndex.hpp"
      25             : #include "Parallel/GlobalCache.hpp"
      26             : #include "Parallel/Info.hpp"
      27             : #include "Parallel/Invoke.hpp"
      28             : #include "Parallel/Local.hpp"
      29             : #include "Parallel/NodeLock.hpp"
      30             : #include "Parallel/ParallelComponentHelpers.hpp"
      31             : #include "Parallel/Printf/Printf.hpp"
      32             : #include "Parallel/Reduction.hpp"
      33             : #include "Utilities/ErrorHandling/Assert.hpp"
      34             : #include "Utilities/ErrorHandling/Error.hpp"
      35             : #include "Utilities/GetOutput.hpp"
      36             : #include "Utilities/Gsl.hpp"
      37             : #include "Utilities/PrettyType.hpp"
      38             : #include "Utilities/ProtocolHelpers.hpp"
      39             : #include "Utilities/Requires.hpp"
      40             : #include "Utilities/Serialization/PupStlCpp17.hpp"
      41             : #include "Utilities/StdHelpers.hpp"
      42             : #include "Utilities/TMPL.hpp"
      43             : #include "Utilities/TaggedTuple.hpp"
      44             : 
      45             : namespace observers {
      46             : 
      47             : /// \cond
      48             : template <class Metavariables>
      49             : struct ObserverWriter;
      50             : /// \endcond
      51             : 
      52           0 : namespace ThreadedActions {  // forward declarations only
      53             : /// \cond
      54             : struct CollectReductionDataOnNode;  // invoked by ContributeReductionData
      55             : struct WriteReductionData;  // invoked by CollectReductionDataOnNode
      56             : /// \endcond
      57             : }  // namespace ThreadedActions
      58             : 
      59             : /// Placeholder type indicating that no reduction-data formatter is selected
      60           1 : struct NoFormatter {
      61           0 :   void pup(PUP::er& /*p*/) {}  // empty body: no members to pack or unpack
      62             : };
      63             : 
      64             : namespace Actions {
      65             : /// \cond
      66             : struct ContributeReductionDataToWriter;
      67             : /// \endcond
      68             : 
      69             : /*!
      70             :  * \ingroup ObserversGroup
      71             :  * \brief Send reduction data to the observer group.
      72             :  *
      73             :  * Once everything at a specific `ObservationId` has been contributed to the
      74             :  * reduction, the groups reduce to their local nodegroup.
      75             :  *
      76             :  * The caller of this Action (which is to be invoked on the Observer parallel
      77             :  * component) must pass in an `observation_id` used to uniquely identify the
      78             :  * observation in time, the name of the `h5::Dat` subfile in the HDF5 file (e.g.
      79             :  * `/element_data`, where the slash is important), a `std::vector<std::string>`
      80             :  * of names of the quantities being reduced (e.g. `{"Time", "L1ErrorDensity",
      81             :  * "L2ErrorDensity"}`), and the `Parallel::ReductionData` that holds the
      82             :  * `ReductionDatums` containing info on how to do the reduction.
      83             :  *
      84             :  * The observer components need to know all expected reduction data types by
      85             :  * compile-time, so they rely on the
      86             :  * `Metavariables::observed_reduction_data_tags` alias to collect them in one
      87             :  * place. To this end, each Action that contributes reduction data must expose
      88             :  * the type alias as:
      89             :  *
      90             :  * \snippet ObserverHelpers.hpp make_reduction_data_tags
      91             :  *
      92             :  * Then, in the `Metavariables` collect them from all observing Actions using
      93             :  * the `observers::collect_reduction_data_tags` metafunction.
      94             :  *
      95             :  * This action also accepts a "formatter" that will be forwarded along with the
      96             :  * reduction data and used to print an informative message when the reduction is
      97             :  * complete. The formatter must conform to
      98             :  * `observers::protocols::ReductionDataFormatter`.
      99             :  *
     100             :  * This action also supports observing the intermediate stage of the reduction
     101             :  * over just the processing element that the element is currently on. This can
     102             :  * be useful e.g. to measure performance metrics to assess load balance, such as
     103             :  * the number of grid points on each core. Enable per-core observations by
     104             :  * passing `true` for the `observe_per_core` argument (default: `false`). The
     105             :  * data will be written in one H5 file per _node_ prefixed with the
     106             :  * `observers::Tags::ReductionFileName`, in a `Core{core_id}` subfile, where
     107             :  * `core_id` is an integer identifying the core across all nodes (see
     108             :  * `Parallel::my_proc`). For example, when running on 2 nodes with 2 cores each
     109             :  * you will end up with `Reductions0.h5` containing `/Core0/{subfile_name}.dat`
     110             :  * and `/Core1/{subfile_name}.dat`, and `Reductions1.h5` containing
     111             :  * `/Core2/{subfile_name}.dat` and `/Core3/{subfile_name}.dat`. This is in
     112             :  * addition to the usual reduction output over _all_ registered elements,
     113             :  * written to `Reductions.h5` (no node ID suffix in the file name).
     114             :  */
     115           1 : struct ContributeReductionData {
     116             :   template <typename ParallelComponent, typename DbTagsList,
     117             :             typename Metavariables, typename ArrayIndex, typename... Ts,
     118             :             typename Formatter = observers::NoFormatter>
     119           0 :   static void apply(db::DataBox<DbTagsList>& box,
     120             :                     Parallel::GlobalCache<Metavariables>& cache,
     121             :                     const ArrayIndex& array_index,
     122             :                     const observers::ObservationId& observation_id,
     123             :                     const Parallel::ArrayComponentId& sender_array_id,
     124             :                     const std::string& subfile_name,
     125             :                     const std::vector<std::string>& reduction_names,
     126             :                     Parallel::ReductionData<Ts...>&& reduction_data,
     127             :                     std::optional<Formatter>&& formatter = std::nullopt,
     128             :                     const bool observe_per_core = false) {
     129             :     db::mutate<Tags::ReductionData<Ts...>, Tags::ReductionDataNames<Ts...>,
     130             :                Tags::ContributorsOfReductionData>(
     131             :         [&array_index, &cache, &observation_id,
     132             :          reduction_data = std::move(reduction_data), &reduction_names,
     133             :          &sender_array_id, &subfile_name, &formatter, &observe_per_core](
     134             :             const gsl::not_null<std::unordered_map<
     135             :                 ObservationId, Parallel::ReductionData<Ts...>>*>
     136             :                 reduction_data_map,
     137             :             const gsl::not_null<
     138             :                 std::unordered_map<ObservationId, std::vector<std::string>>*>
     139             :                 reduction_names_map,
     140             :             const gsl::not_null<std::unordered_map<
     141             :                 ObservationId, std::unordered_set<Parallel::ArrayComponentId>>*>
     142             :                 reduction_observers_contributed,
     143             :             const std::unordered_map<
     144             :                 ObservationKey,
     145             :                 std::unordered_set<Parallel::ArrayComponentId>>&
     146             :                 observations_registered) mutable {  // NOLINT(spectre-mutable)
     147             :           ASSERT(
     148             :               observations_registered.find(observation_id.observation_key()) !=
     149             :                   observations_registered.end(),
     150             :               "Couldn't find registration key "
     151             :                   << observation_id.observation_key()
     152             :                   << " in the registered observers. Known IDs are "
     153             :                   << keys_of(observations_registered)
     154             :                   << "\nIt could be that you are using a nodegroup DG "
     155             :                      "collection but the observation code for the event hasn't "
     156             :                      "been updated to work with the nodegroup yet.");
     157             : 
     158             :           auto& contributed_array_ids =
     159             :               (*reduction_observers_contributed)[observation_id];  // inserts an empty set on first use
     160             :           if (UNLIKELY(contributed_array_ids.find(sender_array_id) !=
     161             :                        contributed_array_ids.end())) {
     162             :             ERROR("Already received reduction data to observation id "
     163             :                   << observation_id << " from array component id "
     164             :                   << sender_array_id);
     165             :           }
     166             :           contributed_array_ids.insert(sender_array_id);  // remembered so a repeat sender hits the ERROR above
     167             : 
     168             :           if (reduction_data_map->count(observation_id) == 0) {  // first contribution for this id
     169             :             reduction_data_map->emplace(observation_id,
     170             :                                         std::move(reduction_data));
     171             :             reduction_names_map->emplace(observation_id, reduction_names);
     172             :           } else {  // later contribution: names must match the stored ones, then combine
     173             :             if (UNLIKELY(reduction_names_map->at(observation_id) !=
     174             :                          reduction_names)) {
     175             :               using ::operator<<;
     176             :               ERROR("Reduction names differ at ObservationId "
     177             :                     << observation_id << " with the expected names being "
     178             :                     << reduction_names_map->at(observation_id)
     179             :                     << " and the received names being " << reduction_names);
     180             :             }
     181             :             reduction_data_map->operator[](observation_id)
     182             :                 .combine(std::move(reduction_data));
     183             :           }
     184             : 
     185             :           // Once all components registered for this observation key have
     186             :           // contributed, send the combined data to the local ObserverWriter.
     187             :           if (UNLIKELY(
     188             :                   contributed_array_ids.size() ==
     189             :                   observations_registered.at(observation_id.observation_key())
     190             :                       .size())) {
     191             :             auto& local_writer = *Parallel::local_branch(
     192             :                 Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
     193             :                     cache));
     194             :             auto& my_proxy =
     195             :                 Parallel::get_parallel_component<ParallelComponent>(cache);
     196             :             const std::optional<int> observe_with_core_id =
     197             :                 observe_per_core ? std::make_optional(Parallel::my_proc<int>(
     198             :                                        *Parallel::local_branch(my_proxy)))
     199             :                                  : std::nullopt;  // nullopt: the writer skips per-core output
     200             :             Parallel::threaded_action<
     201             :                 ThreadedActions::CollectReductionDataOnNode>(
     202             :                 local_writer, observation_id,
     203             :                 Parallel::make_array_component_id<ParallelComponent>(
     204             :                     array_index),
     205             :                 subfile_name, (*reduction_names_map)[observation_id],
     206             :                 std::move((*reduction_data_map)[observation_id]),
     207             :                 std::move(formatter), observe_with_core_id);
     208             :             reduction_data_map->erase(observation_id);  // data was forwarded: drop local bookkeeping
     209             :             reduction_names_map->erase(observation_id);
     210             :             reduction_observers_contributed->erase(observation_id);
     211             :           }
     212             :         },
     213             :         make_not_null(&box),
     214             :         db::get<Tags::ExpectedContributorsForObservations>(box));
     215             :     // Silence a gcc <= 9 unused-variable warning
     216             :     (void)observe_per_core;
     217             :   }
     218             : };
     219             : }  // namespace Actions
     220             : 
     221             : namespace ThreadedActions {
     222             : 
     223             : namespace ReductionActions_detail {
     224             : void append_to_reduction_data(  // overload taking one double `t`
     225             :     const gsl::not_null<std::vector<double>*> all_reduction_data,
     226             :     const double t);  // declaration only; no definition in this part of the file
     227             : 
     228             : void append_to_reduction_data(  // overload taking a std::vector<double> `t`
     229             :     const gsl::not_null<std::vector<double>*> all_reduction_data,
     230             :     const std::vector<double>& t);  // called once per tuple element by write_data
     231             : 
     232             : template <typename... Ts, size_t... Is>  // Appends the flattened `data` to an h5::Dat subfile.
     233             : void write_data(const std::string& subfile_name,
     234             :                 const std::string& input_source,
     235             :                 std::vector<std::string> legend, const std::tuple<Ts...>& data,
     236             :                 const std::string& file_prefix,
     237             :                 std::index_sequence<Is...> /*meta*/) {  // only used to deduce the indices Is...
     238             :   static_assert(sizeof...(Ts) > 0,
     239             :                 "Must be reducing at least one piece of data");
     240             :   std::vector<double> data_to_append{};  // filled from every element of `data`
     241             :   EXPAND_PACK_LEFT_TO_RIGHT(
     242             :       append_to_reduction_data(&data_to_append, std::get<Is>(data)));
     243             : 
     244             :   if (legend.size() != data_to_append.size()) {  // need exactly one name per collected value
     245             :     ERROR(
     246             :         "There must be one name provided for each piece of data. You provided "
     247             :         << legend.size() << " names: '" << get_output(legend)
     248             :         << "' but there are " << data_to_append.size()
     249             :         << " pieces of data being reduced");
     250             :   }
     251             : 
     252             :   h5::H5File<h5::AccessType::ReadWrite> h5file(file_prefix + ".h5", true,
     253             :                                                input_source);  // NOTE(review): `true` presumably permits an existing file - confirm
     254             :   constexpr size_t version_number = 0;
     255             :   auto& time_series_file = h5file.try_insert<h5::Dat>(
     256             :       subfile_name, std::move(legend), version_number);  // NOTE(review): presumably reuses an existing subfile - confirm
     257             :   time_series_file.append(data_to_append);
     258             : }
     259             : }  // namespace ReductionActions_detail
     260             : 
     261             : /*!
     262             :  * \brief Gathers all the reduction data from all processing elements/cores on a
     263             :  * node.
     264             :  */
     265           1 : struct CollectReductionDataOnNode {
     266             :  public:
     267             :   template <typename ParallelComponent, typename DbTagsList,
     268             :             typename Metavariables, typename ArrayIndex,
     269             :             typename... ReductionDatums,
     270             :             typename Formatter = observers::NoFormatter>
     271           0 :   static void apply(
     272             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
     273             :       const ArrayIndex& /*array_index*/,
     274             :       const gsl::not_null<Parallel::NodeLock*> node_lock,
     275             :       const observers::ObservationId& observation_id,
     276             :       Parallel::ArrayComponentId observer_group_id,
     277             :       const std::string& subfile_name,
     278             :       std::vector<std::string>&& reduction_names,
     279             :       Parallel::ReductionData<ReductionDatums...>&& received_reduction_data,
     280             :       std::optional<Formatter>&& formatter = std::nullopt,
     281             :       const std::optional<int> observe_with_core_id = std::nullopt) {
     282             :     // The below gymnastics with pointers is done in order to minimize the
     283             :     // time spent locking the entire node, which is necessary because the
     284             :     // DataBox does not allow any function calls, both get and mutate, during
     285             :     // a mutate. This design choice in DataBox is necessary to guarantee a
     286             :     // consistent state throughout mutation. Here, however, we need to be
     287             :     // reasonably efficient in parallel and so we manually guarantee that
     288             :     // consistent state. To this end, we create pointers and assign to them
     289             :     // the data in the DataBox which is guaranteed to be pointer stable. The
     290             :     // data itself is guaranteed to be stable inside the ReductionDataLock.
     291             :     std::unordered_map<observers::ObservationId,
     292             :                        Parallel::ReductionData<ReductionDatums...>>*
     293             :         reduction_data = nullptr;
     294             :     std::unordered_map<ObservationId, std::vector<std::string>>*
     295             :         reduction_names_map = nullptr;
     296             :     std::unordered_map<observers::ObservationId,
     297             :                        std::unordered_set<Parallel::ArrayComponentId>>*
     298             :         reduction_observers_contributed = nullptr;
     299             :     Parallel::NodeLock* reduction_data_lock = nullptr;
     300             :     Parallel::NodeLock* reduction_file_lock = nullptr;
     301             :     size_t observations_registered_with_id = std::numeric_limits<size_t>::max();
     302             : 
     303             :     {
     304             :       const std::lock_guard hold_lock(*node_lock);  // released at the end of this scope
     305             :       db::mutate<Tags::ReductionData<ReductionDatums...>,
     306             :                  Tags::ReductionDataNames<ReductionDatums...>,
     307             :                  Tags::ContributorsOfReductionData, Tags::ReductionDataLock,
     308             :                  Tags::H5FileLock>(
     309             :           [&reduction_data, &reduction_names_map,
     310             :            &reduction_observers_contributed, &reduction_data_lock,
     311             :            &reduction_file_lock, &observation_id, &observer_group_id,
     312             :            &observations_registered_with_id](
     313             :               const gsl::not_null<std::unordered_map<
     314             :                   observers::ObservationId,
     315             :                   Parallel::ReductionData<ReductionDatums...>>*>
     316             :                   reduction_data_ptr,
     317             :               const gsl::not_null<
     318             :                   std::unordered_map<ObservationId, std::vector<std::string>>*>
     319             :                   reduction_names_map_ptr,
     320             :               const gsl::not_null<std::unordered_map<
     321             :                   observers::ObservationId,
     322             :                   std::unordered_set<Parallel::ArrayComponentId>>*>
     323             :                   reduction_observers_contributed_ptr,
     324             :               const gsl::not_null<Parallel::NodeLock*> reduction_data_lock_ptr,
     325             :               const gsl::not_null<Parallel::NodeLock*> reduction_file_lock_ptr,
     326             :               const std::unordered_map<
     327             :                   ObservationKey,
     328             :                   std::unordered_set<Parallel::ArrayComponentId>>&
     329             :                   observations_registered) {
     330             :             const ObservationKey& key{observation_id.observation_key()};
     331             :             const auto& registered_group_ids = observations_registered.at(key);
     332             :             if (UNLIKELY(registered_group_ids.find(observer_group_id) ==
     333             :                          registered_group_ids.end())) {
     334             :               ERROR("The observer group id "
     335             :                     << observer_group_id
     336             :                     << " was not registered for the observation id "
     337             :                     << observation_id);
     338             :             }
     339             :             reduction_data = &*reduction_data_ptr;  // export raw pointers for use after the mutate
     340             :             reduction_names_map = &*reduction_names_map_ptr;
     341             :             reduction_observers_contributed =
     342             :                 &*reduction_observers_contributed_ptr;
     343             :             reduction_data_lock = &*reduction_data_lock_ptr;
     344             :             reduction_file_lock = &*reduction_file_lock_ptr;
     345             :             observations_registered_with_id =
     346             :                 observations_registered.at(key).size();  // same set as registered_group_ids
     347             :           },
     348             :           make_not_null(&box),
     349             :           db::get<Tags::ExpectedContributorsForObservations>(box));
     350             :     }
     351             : 
     352             :     ASSERT(
     353             :         observations_registered_with_id != std::numeric_limits<size_t>::max(),
     354             :         "Failed to set observations_registered_with_id when mutating the "
     355             :         "DataBox. This is a bug in the code.");
     356             : 
     357             :     bool send_data = false;
     358             :     // Now that we've retrieved pointers to the data in the DataBox we wish to
     359             :     // manipulate, lock the data and manipulate it.
     360             :     {
     361             :       const std::lock_guard hold_data_lock(*reduction_data_lock);
     362             :       auto& contributed_group_ids =
     363             :           (*reduction_observers_contributed)[observation_id];
     364             : 
     365             :       if (UNLIKELY(contributed_group_ids.find(observer_group_id) !=
     366             :                    contributed_group_ids.end())) {
     367             :         ERROR("Already received reduction data to observation id "
     368             :               << observation_id << " from array component id "
     369             :               << observer_group_id);
     370             :       }
     371             :       contributed_group_ids.insert(observer_group_id);
     372             : 
     373             :       // If requested, write the intermediate reduction data from the particular
     374             :       // core to one file per node. This allows measuring reduction data
     375             :       // per-core, e.g. performance metrics to assess load balancing.
     376             :       if (observe_with_core_id.has_value()) {
     377             :         auto reduction_data_this_core = received_reduction_data;  // copy: the original is still combined below
     378             :         reduction_data_this_core.finalize();
     379             :         auto reduction_names_this_core = reduction_names;  // copy: the original is still needed below
     380             :         auto& my_proxy =
     381             :             Parallel::get_parallel_component<ParallelComponent>(cache);
     382             :         const std::lock_guard hold_file_lock(*reduction_file_lock);  // taken while the data lock is still held
     383             :         ReductionActions_detail::write_data(
     384             :             "/Core" + std::to_string(observe_with_core_id.value()) +
     385             :                 subfile_name,
     386             :             observers::input_source_from_cache(cache),
     387             :             std::move(reduction_names_this_core),
     388             :             std::move(reduction_data_this_core.data()),
     389             :             Parallel::get<Tags::ReductionFileName>(cache) +
     390             :                 std::to_string(
     391             :                     Parallel::my_node<int>(*Parallel::local_branch(my_proxy))),
     392             :             std::make_index_sequence<sizeof...(ReductionDatums)>{});
     393             :       }
     394             : 
     395             :       if (reduction_data->find(observation_id) == reduction_data->end()) {
     396             :         // This Action has been called for the first time,
     397             :         // so all we need to do is move the input data to the
     398             :         // reduction_data in the DataBox.
     399             :         reduction_data->operator[](observation_id) =
     400             :             std::move(received_reduction_data);
     401             :       } else {
     402             :         // This Action is being called at least the second time
     403             :         // (but not the final time if on node 0).
     404             :         reduction_data->at(observation_id)
     405             :             .combine(std::move(received_reduction_data));
     406             :       }
     407             : 
     408             :       if (UNLIKELY(reduction_names.empty())) {
     409             :         ERROR(
     410             :             "The reduction names, which is a std::vector of the names of "
     411             :             "the columns in the file, must be non-empty.");
     412             :       }
     413             :       if (auto current_names = reduction_names_map->find(observation_id);
     414             :           current_names == reduction_names_map->end()) {
     415             :         reduction_names_map->emplace(observation_id,
     416             :                                      std::move(reduction_names));
     417             :       } else if (UNLIKELY(current_names->second != reduction_names)) {
     418             :         ERROR(
     419             :             "The reduction names passed in must match the currently "
     420             :             "known reduction names.");
     421             :       }
     422             : 
     423             :       // Check if we have received all reduction data from the Observer
     424             :       // group. If so we reduce to node 0 for writing to disk. We use a bool
     425             :       // `send_data` to allow us to defer the send call until after we've
     426             :       // unlocked the lock.
     427             :       if (reduction_observers_contributed->at(observation_id).size() ==
     428             :           observations_registered_with_id) {
     429             :         send_data = true;
     430             :         // We intentionally move the data out of the map and erase it
     431             :         // before calling `WriteReductionData` since if the call to
     432             :         // `WriteReductionData` is inlined and we erase data from the maps
     433             :         // afterward we would lose data.
     434             :         reduction_names =
     435             :             std::move(reduction_names_map->operator[](observation_id));
     436             :         received_reduction_data =
     437             :             std::move(reduction_data->operator[](observation_id));
     438             :         reduction_observers_contributed->erase(observation_id);
     439             :         reduction_data->erase(observation_id);
     440             :         reduction_names_map->erase(observation_id);
     441             :       }
     442             :     }
     443             : 
     444             :     if (send_data) {  // the data lock has been released by this point
     445             :       auto& my_proxy =
     446             :           Parallel::get_parallel_component<ParallelComponent>(cache);
     447             :       Parallel::threaded_action<WriteReductionData>(
     448             :           Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
     449             :               cache)[0],
     450             :           observation_id,
     451             :           Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy)),
     452             :           subfile_name,
     453             :           // NOLINTNEXTLINE(bugprone-use-after-move)
     454             :           std::move(reduction_names), std::move(received_reduction_data),
     455             :           std::move(formatter));
     456             :     }
     457             :   }
     458             : };
     459             : 
     460             : /*!
     461             :  * \ingroup ObserversGroup
     462             :  * \brief Write reduction data to disk from node 0.
     463             :  */
     464           1 : struct WriteReductionData {
     465             :   template <typename ParallelComponent, typename DbTagsList,
     466             :             typename Metavariables, typename ArrayIndex,
     467             :             typename... ReductionDatums,
     468             :             typename Formatter = observers::NoFormatter>
     469           0 :   static void apply(
     470             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
     471             :       const ArrayIndex& /*array_index*/,
     472             :       const gsl::not_null<Parallel::NodeLock*> node_lock,
     473             :       const observers::ObservationId& observation_id,
     474             :       const size_t sender_node_number, const std::string& subfile_name,
     475             :       std::vector<std::string>&& reduction_names,
     476             :       Parallel::ReductionData<ReductionDatums...>&& received_reduction_data,
     477             :       std::optional<Formatter>&& formatter = std::nullopt) {
     478             :     if constexpr (not std::is_same_v<Formatter, observers::NoFormatter>) {
     479             :       static_assert(
     480             :           tt::assert_conforms_to_v<Formatter,
     481             :                                    protocols::ReductionDataFormatter>);
     482             :       static_assert(
     483             :           std::is_same_v<typename Formatter::reduction_data,
     484             :                          Parallel::ReductionData<ReductionDatums...>>,
     485             :           "Mismatch between the formatter's `reduction_data` type alias and "
     486             :           "the reduction data that is being reduced.");
     487             :     }
     488             :     // The below gymnastics with pointers is done in order to minimize the
     489             :     // time spent locking the entire node, which is necessary because the
     490             :     // DataBox does not allow any function calls, both get and mutate, during
     491             :     // a mutate. This design choice in DataBox is necessary to guarantee a
     492             :     // consistent state throughout mutation. Here, however, we need to be
     493             :     // reasonably efficient in parallel and so we manually guarantee that
     494             :     // consistent state. To this end, we create pointers and assign to them
     495             :     // the data in the DataBox which is guaranteed to be pointer stable. The
     496             :     // data itself is guaranteed to be stable inside the ReductionDataLock.
     497             :     std::unordered_map<observers::ObservationId,
     498             :                        Parallel::ReductionData<ReductionDatums...>>*
     499             :         reduction_data = nullptr;
     500             :     std::unordered_map<ObservationId, std::vector<std::string>>*
     501             :         reduction_names_map = nullptr;
     502             :     std::unordered_map<observers::ObservationId, std::unordered_set<size_t>>*
     503             :         nodes_contributed = nullptr;
     504             :     Parallel::NodeLock* reduction_data_lock = nullptr;
     505             :     Parallel::NodeLock* reduction_file_lock = nullptr;
     506             :     size_t observations_registered_with_id = std::numeric_limits<size_t>::max();
     507             : 
     508             :     {
     509             :       const std::lock_guard hold_lock(*node_lock);  // locks the entire node
     510             :       db::mutate<Tags::ReductionData<ReductionDatums...>,
     511             :                  Tags::ReductionDataNames<ReductionDatums...>,
     512             :                  Tags::NodesThatContributedReductions, Tags::ReductionDataLock,
     513             :                  Tags::H5FileLock>(
     514             :           [&nodes_contributed, &reduction_data, &reduction_names_map,
     515             :            &reduction_data_lock, &reduction_file_lock, &observation_id,
     516             :            &observations_registered_with_id, &sender_node_number](
     517             :               const gsl::not_null<
     518             :                   typename Tags::ReductionData<ReductionDatums...>::type*>
     519             :                   reduction_data_ptr,
     520             :               const gsl::not_null<
     521             :                   std::unordered_map<ObservationId, std::vector<std::string>>*>
     522             :                   reduction_names_map_ptr,
     523             :               const gsl::not_null<std::unordered_map<
     524             :                   ObservationId, std::unordered_set<size_t>>*>
     525             :                   nodes_contributed_ptr,
     526             :               const gsl::not_null<Parallel::NodeLock*> reduction_data_lock_ptr,
     527             :               const gsl::not_null<Parallel::NodeLock*> reduction_file_lock_ptr,
     528             :               const std::unordered_map<ObservationKey, std::set<size_t>>&
     529             :                   nodes_registered_for_reductions) {
     530             :             const ObservationKey& key{observation_id.observation_key()};
     531             :             ASSERT(nodes_registered_for_reductions.find(key) !=
     532             :                        nodes_registered_for_reductions.end(),
     533             :                    "Performing reduction with unregistered ID key "
     534             :                        << observation_id.observation_key());
     535             :             const auto& registered_nodes =
     536             :                 nodes_registered_for_reductions.at(key);
     537             : 
     538             :             if (UNLIKELY(registered_nodes.find(sender_node_number) ==
     539             :                          registered_nodes.end())) {
     540             :               ERROR("Node " << sender_node_number
     541             :                             << " was not registered for the observation id "
     542             :                             << observation_id);
     543             :             }
     544             : 
     545             :             reduction_data = &*reduction_data_ptr;
     546             :             reduction_names_map = &*reduction_names_map_ptr;
     547             :             nodes_contributed = &*nodes_contributed_ptr;
     548             :             reduction_data_lock = &*reduction_data_lock_ptr;
     549             :             reduction_file_lock = &*reduction_file_lock_ptr;
     550             :             observations_registered_with_id =
     551             :                 nodes_registered_for_reductions.at(key).size();
     552             :           },
     553             :           make_not_null(&box),
     554             :           db::get<Tags::NodesExpectedToContributeReductions>(box));
     555             :     }
     556             : 
     557             :     ASSERT(
     558             :         observations_registered_with_id != std::numeric_limits<size_t>::max(),
     559             :         "Failed to set observations_registered_with_id when mutating the "
     560             :         "DataBox. This is a bug in the code.");
     561             : 
     562             :     bool write_to_disk = false;
     563             :     // Now that we've retrieved pointers to the data in the DataBox we wish to
     564             :     // manipulate, lock the data and manipulate it.
     565             :     {
     566             :       const std::lock_guard hold_lock(*reduction_data_lock);
     567             :       auto& nodes_contributed_to_observation =
     568             :           (*nodes_contributed)[observation_id];
     569             :       if (nodes_contributed_to_observation.find(sender_node_number) !=
     570             :           nodes_contributed_to_observation.end()) {
     571             :         ERROR("Already received reduction data at observation id "
     572             :               << observation_id << " from node " << sender_node_number);
     573             :       }
     574             :       nodes_contributed_to_observation.insert(sender_node_number);
     575             : 
     576             :       if (UNLIKELY(reduction_names.empty())) {
     577             :         ERROR(
     578             :             "The reduction names, which is a std::vector of the names of "
     579             :             "the columns in the file, must be non-empty.");
     580             :       }
     581             :       if (auto current_names = reduction_names_map->find(observation_id);
     582             :           current_names == reduction_names_map->end()) {
     583             :         reduction_names_map->emplace(observation_id,
     584             :                                      std::move(reduction_names));
     585             :       } else if (UNLIKELY(current_names->second != reduction_names)) {
     586             :         using ::operator<<;
     587             :         ERROR(
     588             :             "The reduction names passed in must match the currently "
     589             :             "known reduction names. Current ones are "
     590             :             << current_names->second << " while the received are "
     591             :             << reduction_names);
     592             :       }
     593             : 
     594             :       if (reduction_data->find(observation_id) == reduction_data->end()) {
     595             :         // This Action has been called for the first time,
     596             :         // so all we need to do is move the input data to the
     597             :         // reduction_data in the DataBox.
     598             :         reduction_data->operator[](observation_id) =
     599             :             std::move(received_reduction_data);
     600             :       } else {
     601             :         // This Action is being called for at least the second time
     602             :         // (but not the final time if on node 0).
     603             :         reduction_data->at(observation_id)
     604             :             .combine(std::move(received_reduction_data));
     605             :       }
     606             : 
     607             :       // We use a bool `write_to_disk` to allow us to defer the data writing
     608             :       // until after we've unlocked the lock. For the same reason, we move the
     609             :       // final, reduced result into `received_reduction_data` and
     610             :       // `reduction_names`.
     611             :       if (nodes_contributed_to_observation.size() ==
     612             :           observations_registered_with_id) {
     613             :         write_to_disk = true;
     614             :         received_reduction_data =
     615             :             std::move(reduction_data->operator[](observation_id));
     616             :         reduction_names =
     617             :             std::move(reduction_names_map->operator[](observation_id));
     618             :         reduction_data->erase(observation_id);
     619             :         reduction_names_map->erase(observation_id);
     620             :         nodes_contributed->erase(observation_id);
     621             :       }
     622             :     }
     623             : 
     624             :     if (write_to_disk) {
     625             :       const std::lock_guard hold_lock(*reduction_file_lock);  // the H5FileLock
     626             :       // NOLINTNEXTLINE(bugprone-use-after-move)
     627             :       received_reduction_data.finalize();
     628             :       if constexpr (not std::is_same_v<Formatter, NoFormatter>) {
     629             :         if (formatter.has_value()) {
     630             :           Parallel::printf(
     631             :               std::apply(*formatter, received_reduction_data.data()) + "\n");
     632             :         }
     633             :       }
     634             :       ReductionActions_detail::write_data(
     635             :           subfile_name, observers::input_source_from_cache(cache),
     636             :           // NOLINTNEXTLINE(bugprone-use-after-move)
     637             :           std::move(reduction_names), std::move(received_reduction_data.data()),
     638             :           Parallel::get<Tags::ReductionFileName>(cache),
     639             :           std::make_index_sequence<sizeof...(ReductionDatums)>{});
     640             :     }
     641             :   }
     642             : };
     643             : 
     644             : /*!
     645             :  * \brief Write a single row of data to the reductions file without the need to
     646             :  * register or reduce anything, e.g. from a singleton component or from a
     647             :  * specific chare.
     648             :  *
     649             :  * Use observers::Actions::ContributeReductionData instead if you need to
     650             :  * perform a reduction before writing to the file.
     651             :  *
     652             :  * Invoke this action on the observers::ObserverWriter component on node 0. Pass
     653             :  * the following arguments when invoking this action:
     654             :  *
     655             :  * - `subfile_name`: the name of the `h5::Dat` subfile in the HDF5 file. Include
     656             :  *   a leading slash, e.g., `/element_data`.
     657             :  * - `legend`: a `std::vector<std::string>` of column labels for the quantities
     658             :  *   being observed (e.g. `{"Time", "L1ErrorDensity", "L2ErrorDensity"}`).
     659             :  * - `reduction_data`: a `std::tuple<...>` with the data to write. The tuple can
     660             :  *   hold either `double`s or `std::vector<double>`s and is flattened before it
     661             :  *   is written to the file to form a single row of values. The total number of
     662             :  *   values must match the length of the `legend`.
     663             :  */
     664           1 : struct WriteReductionDataRow {
     665             :   template <typename ParallelComponent, typename DbTagsList,
     666             :             typename Metavariables, typename ArrayIndex, typename... Ts,
     667             :             typename DataBox = db::DataBox<DbTagsList>>
     668           0 :   static void apply(db::DataBox<DbTagsList>& box,
     669             :                     Parallel::GlobalCache<Metavariables>& cache,
     670             :                     const ArrayIndex& /*array_index*/,
     671             :                     const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
     672             :                     const std::string& subfile_name,
     673             :                     std::vector<std::string>&& legend,
     674             :                     std::tuple<Ts...>&& reduction_data) {
     675             :     auto& reduction_file_lock =
     676             :         db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
     677             :     const std::lock_guard hold_lock(reduction_file_lock);  // held during write
     678             :     ThreadedActions::ReductionActions_detail::write_data(
     679             :         subfile_name, observers::input_source_from_cache(cache),
     680             :         std::move(legend), std::move(reduction_data),
     681             :         Parallel::get<Tags::ReductionFileName>(cache),
     682             :         std::make_index_sequence<sizeof...(Ts)>{});
     683             :   }
     684             : };
     685             : 
     686             : }  // namespace ThreadedActions
     687             : }  // namespace observers

Generated by: LCOV version 1.14