SpECTRE Documentation Coverage Report
Current view: top level - IO/Observer - ReductionActions.hpp Hit Total Coverage
Commit: 1f2210958b4f38fdc0400907ee7c6d5af5111418 Lines: 7 14 50.0 %
Date: 2025-12-05 05:03:31
Legend: Lines: hit not hit

          Line data    Source code
       1           0 : // Distributed under the MIT License.
       2             : // See LICENSE.txt for details.
       3             : 
       4             : #pragma once
       5             : 
       6             : #include <cstddef>
       7             : #include <mutex>
       8             : #include <optional>
       9             : #include <string>
      10             : #include <tuple>
      11             : #include <unordered_map>
      12             : #include <utility>
      13             : #include <vector>
      14             : 
      15             : #include "DataStructures/DataBox/DataBox.hpp"
      16             : #include "IO/H5/AccessType.hpp"
      17             : #include "IO/H5/Dat.hpp"
      18             : #include "IO/H5/File.hpp"
      19             : #include "IO/Observer/Helpers.hpp"
      20             : #include "IO/Observer/ObservationId.hpp"
      21             : #include "IO/Observer/Protocols/ReductionDataFormatter.hpp"
      22             : #include "IO/Observer/Tags.hpp"
      23             : #include "Parallel/ArrayComponentId.hpp"
      24             : #include "Parallel/ArrayIndex.hpp"
      25             : #include "Parallel/GlobalCache.hpp"
      26             : #include "Parallel/Info.hpp"
      27             : #include "Parallel/Invoke.hpp"
      28             : #include "Parallel/Local.hpp"
      29             : #include "Parallel/NodeLock.hpp"
      30             : #include "Parallel/ParallelComponentHelpers.hpp"
      31             : #include "Parallel/Printf/Printf.hpp"
      32             : #include "Parallel/Reduction.hpp"
      33             : #include "Utilities/ErrorHandling/Assert.hpp"
      34             : #include "Utilities/ErrorHandling/Error.hpp"
      35             : #include "Utilities/GetOutput.hpp"
      36             : #include "Utilities/Gsl.hpp"
      37             : #include "Utilities/PrettyType.hpp"
      38             : #include "Utilities/ProtocolHelpers.hpp"
      39             : #include "Utilities/Requires.hpp"
      40             : #include "Utilities/Serialization/PupStlCpp17.hpp"
      41             : #include "Utilities/StdHelpers.hpp"
      42             : #include "Utilities/TMPL.hpp"
      43             : #include "Utilities/TaggedTuple.hpp"
      44             : 
      45             : namespace observers {
      46             : 
      47             : /// \cond
      48             : template <class Metavariables>
      49             : struct ObserverWriter;
      50             : /// \endcond
      51             : 
      52           0 : namespace ThreadedActions {
      53             : /// \cond
      54             : struct CollectReductionDataOnNode;
      55             : struct WriteReductionData;
      56             : /// \endcond
      57             : }  // namespace ThreadedActions
      58             : 
      59             : /// Indicates no formatter is selected
      60           1 : struct NoFormatter {
      61           0 :   void pup(PUP::er& /*p*/) {}
      62             : };
      63             : 
      64             : namespace Actions {
      65             : /// \cond
      66             : struct ContributeReductionDataToWriter;
      67             : /// \endcond
      68             : 
      69             : /*!
      70             :  * \ingroup ObserversGroup
      71             :  * \brief Send reduction data to the observer group.
      72             :  *
      73             :  * Once everything at a specific `ObservationId` has been contributed to the
      74             :  * reduction, the groups reduce to their local nodegroup.
      75             :  *
      76             :  * The caller of this Action (which is to be invoked on the Observer parallel
      77             :  * component) must pass in an `observation_id` used to uniquely identify the
      78             :  * observation in time, the name of the `h5::Dat` subfile in the HDF5 file (e.g.
      79             :  * `/element_data`, where the slash is important), a `std::vector<std::string>`
      80             :  * of names of the quantities being reduced (e.g. `{"Time", "L1ErrorDensity",
      81             :  * "L2ErrorDensity"}`), and the `Parallel::ReductionData` that holds the
      82             :  * `ReductionDatums` containing info on how to do the reduction.
      83             :  *
      84             :  * The observer components need to know all expected reduction data types at
      85             :  * compile time, so they rely on the
      86             :  * `Metavariables::observed_reduction_data_tags` alias to collect them in one
      87             :  * place. To this end, each Action that contributes reduction data must expose
      88             :  * the type alias as:
      89             :  *
      90             :  * \snippet ObserverHelpers.hpp make_reduction_data_tags
      91             :  *
      92             :  * Then, in the `Metavariables` collect them from all observing Actions using
      93             :  * the `observers::collect_reduction_data_tags` metafunction.
      94             :  *
      95             :  * This action also accepts a "formatter" that will be forwarded along with the
      96             :  * reduction data and used to print an informative message when the reduction is
      97             :  * complete. The formatter must conform to
      98             :  * `observers::protocols::ReductionDataFormatter`.
      99             :  *
     100             :  * This action also supports observing the intermediate stage of the reduction
     101             :  * over just the processing element that the element is currently on. This can
     102             :  * be useful e.g. to measure performance metrics to assess load balancing, such as
     103             :  * the number of grid points on each core. Enable per-core observations by
     104             :  * passing `true` for the `observe_per_core` argument (default: `false`). The
     105             :  * data will be written in one H5 file per _node_ prefixed with the
     106             :  * `observers::Tags::ReductionFileName`, in a `Core{core_id}` subfile, where
     107             :  * `core_id` is an integer identifying the core across all nodes (see
     108             :  * `Parallel::my_proc`). For example, when running on 2 nodes with 2 cores each
     109             :  * you will end up with `Reductions0.h5` containing `/Core0/{subfile_name}.dat`
     110             :  * and `/Core1/{subfile_name}.dat`, and `Reductions1.h5` containing
     111             :  * `/Core2/{subfile_name}.dat` and `/Core3/{subfile_name}.dat`. This is in
     112             :  * addition to the usual reduction output over _all_ registered elements,
     113             :  * written to `Reductions.h5` (no node ID suffix in the file name).
     114             :  */
     115           1 : struct ContributeReductionData {
     116             :   template <typename ParallelComponent, typename DbTagsList,
     117             :             typename Metavariables, typename ArrayIndex, typename... Ts,
     118             :             typename Formatter = observers::NoFormatter>
     119           0 :   static void apply(db::DataBox<DbTagsList>& box,
     120             :                     Parallel::GlobalCache<Metavariables>& cache,
     121             :                     const ArrayIndex& array_index,
     122             :                     const observers::ObservationId& observation_id,
     123             :                     const Parallel::ArrayComponentId& sender_array_id,
     124             :                     const std::string& subfile_name,
     125             :                     const std::vector<std::string>& reduction_names,
     126             :                     Parallel::ReductionData<Ts...>&& reduction_data,
     127             :                     std::optional<Formatter>&& formatter = std::nullopt,
     128             :                     const bool observe_per_core = false) {
     129             :     db::mutate<Tags::ReductionData<Ts...>, Tags::ReductionDataNames<Ts...>,
     130             :                Tags::ContributorsOfReductionData>(
     131             :         [&array_index, &cache, &observation_id,
     132             :          reduction_data = std::move(reduction_data), &reduction_names,
     133             :          &sender_array_id, &subfile_name, &formatter, &observe_per_core](
     134             :             const gsl::not_null<std::unordered_map<
     135             :                 ObservationId, Parallel::ReductionData<Ts...>>*>
     136             :                 reduction_data_map,
     137             :             const gsl::not_null<
     138             :                 std::unordered_map<ObservationId, std::vector<std::string>>*>
     139             :                 reduction_names_map,
     140             :             const gsl::not_null<std::unordered_map<
     141             :                 ObservationId, std::unordered_set<Parallel::ArrayComponentId>>*>
     142             :                 reduction_observers_contributed,
     143             :             const std::unordered_map<
     144             :                 ObservationKey,
     145             :                 std::unordered_set<Parallel::ArrayComponentId>>&
     146             :                 observations_registered) mutable {  // NOLINT(spectre-mutable)
     147             :           ASSERT(
     148             :               observations_registered.find(observation_id.observation_key()) !=
     149             :                   observations_registered.end(),
     150             :               "Couldn't find registration key "
     151             :                   << observation_id.observation_key()
     152             :                   << " in the registered observers. Known IDs are "
     153             :                   << keys_of(observations_registered)
     154             :                   << "\nIt could be that you are using a nodegroup DG "
     155             :                      "collection but the observation code for the event hasn't "
     156             :                      "been updated to work with the nodegroup yet.");
     157             : 
     158             :           auto& contributed_array_ids =
     159             :               (*reduction_observers_contributed)[observation_id];
     160             :           if (UNLIKELY(contributed_array_ids.find(sender_array_id) !=
     161             :                        contributed_array_ids.end())) {
     162             :             ERROR("Already received reduction data to observation id "
     163             :                   << observation_id << " from array component id "
     164             :                   << sender_array_id);
     165             :           }
     166             :           contributed_array_ids.insert(sender_array_id);
     167             : 
     168             :           if (reduction_data_map->count(observation_id) == 0) {
     169             :             reduction_data_map->emplace(observation_id,
     170             :                                         std::move(reduction_data));
     171             :             reduction_names_map->emplace(observation_id, reduction_names);
     172             :           } else {
     173             :             if (UNLIKELY(reduction_names_map->at(observation_id) !=
     174             :                          reduction_names)) {
     175             :               using ::operator<<;
     176             :               ERROR("Reduction names differ at ObservationId "
     177             :                     << observation_id << " with the expected names being "
     178             :                     << reduction_names_map->at(observation_id)
     179             :                     << " and the received names being " << reduction_names);
     180             :             }
     181             :             reduction_data_map->operator[](observation_id)
     182             :                 .combine(std::move(reduction_data));
     183             :           }
     184             : 
     185             :           // Check if we have received all reduction data from the registered
     186             :           // elements. If so, we reduce to the local ObserverWriter nodegroup.
     187             :           if (UNLIKELY(
     188             :                   contributed_array_ids.size() ==
     189             :                   observations_registered.at(observation_id.observation_key())
     190             :                       .size())) {
     191             :             auto& local_writer = *Parallel::local_branch(
     192             :                 Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
     193             :                     cache));
     194             :             auto& my_proxy =
     195             :                 Parallel::get_parallel_component<ParallelComponent>(cache);
     196             :             const std::optional<int> observe_with_core_id =
     197             :                 observe_per_core ? std::make_optional(Parallel::my_proc<int>(
     198             :                                        *Parallel::local_branch(my_proxy)))
     199             :                                  : std::nullopt;
     200             :             Parallel::threaded_action<
     201             :                 ThreadedActions::CollectReductionDataOnNode>(
     202             :                 local_writer, observation_id,
     203             :                 Parallel::make_array_component_id<ParallelComponent>(
     204             :                     array_index),
     205             :                 subfile_name, (*reduction_names_map)[observation_id],
     206             :                 std::move((*reduction_data_map)[observation_id]),
     207             :                 std::move(formatter), observe_with_core_id);
     208             :             reduction_data_map->erase(observation_id);
     209             :             reduction_names_map->erase(observation_id);
     210             :             reduction_observers_contributed->erase(observation_id);
     211             :           }
     212             :         },
     213             :         make_not_null(&box),
     214             :         db::get<Tags::ExpectedContributorsForObservations>(box));
     215             :     // Silence a gcc <= 9 unused-variable warning
     216             :     (void)observe_per_core;
     217             :   }
     218             : };
     219             : }  // namespace Actions
     220             : 
     221             : namespace ThreadedActions {
     222             : 
     223             : namespace ReductionActions_detail {
     224             : void append_to_reduction_data(
     225             :     const gsl::not_null<std::vector<double>*> all_reduction_data,
     226             :     const double t);
     227             : 
     228             : template <typename T>
     229             : void append_to_reduction_data(
     230             :     const gsl::not_null<std::vector<double>*> all_reduction_data,
     231             :     const std::vector<T>& t);
     232             : 
     233             : void append_to_reduction_data(
     234             :     gsl::not_null<std::vector<double>*> all_reduction_data,
     235             :     const std::array<double, 3>& t);
     236             : 
     237             : template <typename... Ts, size_t... Is>
     238             : void write_data(const std::string& subfile_name,
     239             :                 const std::string& input_source,
     240             :                 std::vector<std::string> legend, const std::tuple<Ts...>& data,
     241             :                 const std::string& file_prefix,
     242             :                 std::index_sequence<Is...> /*meta*/) {
     243             :   static_assert(sizeof...(Ts) > 0,
     244             :                 "Must be reducing at least one piece of data");
     245             :   std::vector<double> data_to_append{};
     246             :   EXPAND_PACK_LEFT_TO_RIGHT(
     247             :       append_to_reduction_data(&data_to_append, std::get<Is>(data)));
     248             : 
     249             :   if (legend.size() != data_to_append.size()) {
     250             :     ERROR(
     251             :         "There must be one name provided for each piece of data. You provided "
     252             :         << legend.size() << " names: '" << get_output(legend)
     253             :         << "' but there are " << data_to_append.size()
     254             :         << " pieces of data being reduced");
     255             :   }
     256             : 
     257             :   h5::H5File<h5::AccessType::ReadWrite> h5file(file_prefix + ".h5", true,
     258             :                                                input_source);
     259             :   constexpr size_t version_number = 0;
     260             :   auto& time_series_file = h5file.try_insert<h5::Dat>(
     261             :       subfile_name, std::move(legend), version_number);
     262             :   time_series_file.append(data_to_append);
     263             : }
     264             : }  // namespace ReductionActions_detail
     265             : 
     266             : /*!
     267             :  * \brief Gathers all the reduction data from all processing elements/cores on a
     268             :  * node.
     269             :  */
     270           1 : struct CollectReductionDataOnNode {
     271             :  public:
     272             :   template <typename ParallelComponent, typename DbTagsList,
     273             :             typename Metavariables, typename ArrayIndex,
     274             :             typename... ReductionDatums,
     275             :             typename Formatter = observers::NoFormatter>
     276           0 :   static void apply(
     277             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
     278             :       const ArrayIndex& /*array_index*/,
     279             :       const gsl::not_null<Parallel::NodeLock*> node_lock,
     280             :       const observers::ObservationId& observation_id,
     281             :       Parallel::ArrayComponentId observer_group_id,
     282             :       const std::string& subfile_name,
     283             :       std::vector<std::string>&& reduction_names,
     284             :       Parallel::ReductionData<ReductionDatums...>&& received_reduction_data,
     285             :       std::optional<Formatter>&& formatter = std::nullopt,
     286             :       const std::optional<int> observe_with_core_id = std::nullopt) {
     287             :     // The below gymnastics with pointers is done in order to minimize the
     288             :     // time spent locking the entire node, which is necessary because the
     289             :     // DataBox does not allow any function calls, neither get nor mutate, during
     290             :     // a mutate. This design choice in DataBox is necessary to guarantee a
     291             :     // consistent state throughout mutation. Here, however, we need to be
     292             :     // reasonably efficient in parallel and so we manually guarantee that
     293             :     // consistent state. To this end, we create pointers and assign to them
     294             :     // the data in the DataBox which is guaranteed to be pointer stable. The
     295             :     // data itself is guaranteed to be stable inside the ReductionDataLock.
     296             :     std::unordered_map<observers::ObservationId,
     297             :                        Parallel::ReductionData<ReductionDatums...>>*
     298             :         reduction_data = nullptr;
     299             :     std::unordered_map<ObservationId, std::vector<std::string>>*
     300             :         reduction_names_map = nullptr;
     301             :     std::unordered_map<observers::ObservationId,
     302             :                        std::unordered_set<Parallel::ArrayComponentId>>*
     303             :         reduction_observers_contributed = nullptr;
     304             :     Parallel::NodeLock* reduction_data_lock = nullptr;
     305             :     Parallel::NodeLock* reduction_file_lock = nullptr;
     306             :     size_t observations_registered_with_id = std::numeric_limits<size_t>::max();
     307             : 
     308             :     {
     309             :       const std::lock_guard hold_lock(*node_lock);
     310             :       db::mutate<Tags::ReductionData<ReductionDatums...>,
     311             :                  Tags::ReductionDataNames<ReductionDatums...>,
     312             :                  Tags::ContributorsOfReductionData, Tags::ReductionDataLock,
     313             :                  Tags::H5FileLock>(
     314             :           [&reduction_data, &reduction_names_map,
     315             :            &reduction_observers_contributed, &reduction_data_lock,
     316             :            &reduction_file_lock, &observation_id, &observer_group_id,
     317             :            &observations_registered_with_id](
     318             :               const gsl::not_null<std::unordered_map<
     319             :                   observers::ObservationId,
     320             :                   Parallel::ReductionData<ReductionDatums...>>*>
     321             :                   reduction_data_ptr,
     322             :               const gsl::not_null<
     323             :                   std::unordered_map<ObservationId, std::vector<std::string>>*>
     324             :                   reduction_names_map_ptr,
     325             :               const gsl::not_null<std::unordered_map<
     326             :                   observers::ObservationId,
     327             :                   std::unordered_set<Parallel::ArrayComponentId>>*>
     328             :                   reduction_observers_contributed_ptr,
     329             :               const gsl::not_null<Parallel::NodeLock*> reduction_data_lock_ptr,
     330             :               const gsl::not_null<Parallel::NodeLock*> reduction_file_lock_ptr,
     331             :               const std::unordered_map<
     332             :                   ObservationKey,
     333             :                   std::unordered_set<Parallel::ArrayComponentId>>&
     334             :                   observations_registered) {
     335             :             const ObservationKey& key{observation_id.observation_key()};
     336             :             const auto& registered_group_ids = observations_registered.at(key);
     337             :             if (UNLIKELY(registered_group_ids.find(observer_group_id) ==
     338             :                          registered_group_ids.end())) {
     339             :               ERROR("The observer group id "
     340             :                     << observer_group_id
     341             :                     << " was not registered for the observation id "
     342             :                     << observation_id);
     343             :             }
     344             :             reduction_data = &*reduction_data_ptr;
     345             :             reduction_names_map = &*reduction_names_map_ptr;
     346             :             reduction_observers_contributed =
     347             :                 &*reduction_observers_contributed_ptr;
     348             :             reduction_data_lock = &*reduction_data_lock_ptr;
     349             :             reduction_file_lock = &*reduction_file_lock_ptr;
     350             :             observations_registered_with_id =
     351             :                 observations_registered.at(key).size();
     352             :           },
     353             :           make_not_null(&box),
     354             :           db::get<Tags::ExpectedContributorsForObservations>(box));
     355             :     }
     356             : 
     357             :     ASSERT(
     358             :         observations_registered_with_id != std::numeric_limits<size_t>::max(),
     359             :         "Failed to set observations_registered_with_id when mutating the "
     360             :         "DataBox. This is a bug in the code.");
     361             : 
     362             :     bool send_data = false;
     363             :     // Now that we've retrieved pointers to the data in the DataBox we wish to
     364             :     // manipulate, lock the data and manipulate it.
     365             :     {
     366             :       const std::lock_guard hold_data_lock(*reduction_data_lock);
     367             :       auto& contributed_group_ids =
     368             :           (*reduction_observers_contributed)[observation_id];
     369             : 
     370             :       if (UNLIKELY(contributed_group_ids.find(observer_group_id) !=
     371             :                    contributed_group_ids.end())) {
     372             :         ERROR("Already received reduction data to observation id "
     373             :               << observation_id << " from array component id "
     374             :               << observer_group_id);
     375             :       }
     376             :       contributed_group_ids.insert(observer_group_id);
     377             : 
     378             :       // If requested, write the intermediate reduction data from the particular
     379             :       // core to one file per node. This allows measuring reduction data
     380             :       // per-core, e.g. performance metrics to assess load balancing.
     381             :       if (observe_with_core_id.has_value()) {
     382             :         auto reduction_data_this_core = received_reduction_data;
     383             :         reduction_data_this_core.finalize();
     384             :         auto reduction_names_this_core = reduction_names;
     385             :         auto& my_proxy =
     386             :             Parallel::get_parallel_component<ParallelComponent>(cache);
     387             :         const std::lock_guard hold_file_lock(*reduction_file_lock);
     388             :         ReductionActions_detail::write_data(
     389             :             "/Core" + std::to_string(observe_with_core_id.value()) +
     390             :                 subfile_name,
     391             :             observers::input_source_from_cache(cache),
     392             :             std::move(reduction_names_this_core),
     393             :             std::move(reduction_data_this_core.data()),
     394             :             Parallel::get<Tags::ReductionFileName>(cache) +
     395             :                 std::to_string(
     396             :                     Parallel::my_node<int>(*Parallel::local_branch(my_proxy))),
     397             :             std::make_index_sequence<sizeof...(ReductionDatums)>{});
     398             :       }
     399             : 
     400             :       if (reduction_data->find(observation_id) == reduction_data->end()) {
     401             :         // This Action has been called for the first time,
     402             :         // so all we need to do is move the input data to the
     403             :         // reduction_data in the DataBox.
     404             :         reduction_data->operator[](observation_id) =
     405             :             std::move(received_reduction_data);
     406             :       } else {
     407             :         // This Action is being called at least the second time
     408             :         // (but not the final time if on node 0).
     409             :         reduction_data->at(observation_id)
     410             :             .combine(std::move(received_reduction_data));
     411             :       }
     412             : 
     413             :       if (UNLIKELY(reduction_names.empty())) {
     414             :         ERROR(
     415             :             "The reduction names, which is a std::vector of the names of "
     416             :             "the columns in the file, must be non-empty.");
     417             :       }
     418             :       if (auto current_names = reduction_names_map->find(observation_id);
     419             :           current_names == reduction_names_map->end()) {
     420             :         reduction_names_map->emplace(observation_id,
     421             :                                      std::move(reduction_names));
     422             :       } else if (UNLIKELY(current_names->second != reduction_names)) {
     423             :         ERROR(
     424             :             "The reduction names passed in must match the currently "
     425             :             "known reduction names.");
     426             :       }
     427             : 
     428             :       // Check if we have received all reduction data from the Observer
     429             :       // group. If so we reduce to node 0 for writing to disk. We use a bool
     430             :       // `send_data` to allow us to defer the send call until after we've
     431             :       // unlocked the lock.
     432             :       if (reduction_observers_contributed->at(observation_id).size() ==
     433             :           observations_registered_with_id) {
     434             :         send_data = true;
     435             :         // We intentionally move the data out of the map and erase it
     436             :         // before calling `WriteReductionData` since if the call to
     437             :         // `WriteReductionData` is inlined and we erase data from the maps
     438             :         // afterward we would lose data.
     439             :         reduction_names =
     440             :             std::move(reduction_names_map->operator[](observation_id));
     441             :         received_reduction_data =
     442             :             std::move(reduction_data->operator[](observation_id));
     443             :         reduction_observers_contributed->erase(observation_id);
     444             :         reduction_data->erase(observation_id);
     445             :         reduction_names_map->erase(observation_id);
     446             :       }
     447             :     }
     448             : 
     449             :     if (send_data) {
     450             :       auto& my_proxy =
     451             :           Parallel::get_parallel_component<ParallelComponent>(cache);
     452             :       Parallel::threaded_action<WriteReductionData>(
     453             :           Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
     454             :               cache)[0],
     455             :           observation_id,
     456             :           Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy)),
     457             :           subfile_name,
     458             :           // NOLINTNEXTLINE(bugprone-use-after-move)
     459             :           std::move(reduction_names), std::move(received_reduction_data),
     460             :           std::move(formatter));
     461             :     }
     462             :   }
     463             : };
     464             : 
     465             : /*!
     466             :  * \ingroup ObserversGroup
     467             :  * \brief Write reduction data to disk from node 0.
                     :  *
                     :  * \details Each invocation delivers the contribution of one node
                     :  * (`sender_node_number`) for `observation_id`. `ERROR` is raised if the
                     :  * sender is not listed for the observation key in
                     :  * `Tags::NodesExpectedToContributeReductions`, if it has already contributed
                     :  * for this observation id, if `reduction_names` is empty, or if
                     :  * `reduction_names` differs from the names already stored for this id.
                     :  *
                     :  * The first contribution is stored and later ones are merged into it with
                     :  * `combine`. Once the number of contributing nodes equals the number of
                     :  * registered nodes, the stored entries are erased and, while holding the
                     :  * `Tags::H5FileLock`, the result is finalized, the output of `formatter` is
                     :  * printed if one was passed, and the data is handed to
                     :  * `ReductionActions_detail::write_data`.
     468             :  */
     469           1 : struct WriteReductionData {
     470             :   template <typename ParallelComponent, typename DbTagsList,
     471             :             typename Metavariables, typename ArrayIndex,
     472             :             typename... ReductionDatums,
     473             :             typename Formatter = observers::NoFormatter>
     474           0 :   static void apply(
     475             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
     476             :       const ArrayIndex& /*array_index*/,
     477             :       const gsl::not_null<Parallel::NodeLock*> node_lock,
     478             :       const observers::ObservationId& observation_id,
     479             :       const size_t sender_node_number, const std::string& subfile_name,
     480             :       std::vector<std::string>&& reduction_names,
     481             :       Parallel::ReductionData<ReductionDatums...>&& received_reduction_data,
     482             :       std::optional<Formatter>&& formatter = std::nullopt) {
     483             :     if constexpr (not std::is_same_v<Formatter, observers::NoFormatter>) {
     484             :       static_assert(
     485             :           tt::assert_conforms_to_v<Formatter,
     486             :                                    protocols::ReductionDataFormatter>);
     487             :       static_assert(
     488             :           std::is_same_v<typename Formatter::reduction_data,
     489             :                          Parallel::ReductionData<ReductionDatums...>>,
     490             :           "Mismatch between the formatter's `reduction_data` type alias and "
     491             :           "the reduction data that is being reduced.");
     492             :     }
     493             :     // The below gymnastics with pointers is done in order to minimize the
     494             :     // time spent locking the entire node, which is necessary because the
     495             :     // DataBox does not allow any function calls, both get and mutate, during
     496             :     // a mutate. This design choice in DataBox is necessary to guarantee a
     497             :     // consistent state throughout mutation. Here, however, we need to be
     498             :     // reasonably efficient in parallel and so we manually guarantee that
     499             :     // consistent state. To this end, we create pointers and assign to them
     500             :     // the data in the DataBox which is guaranteed to be pointer stable. The
     501             :     // data itself is guaranteed to be stable inside the ReductionDataLock.
     502             :     std::unordered_map<observers::ObservationId,
     503             :                        Parallel::ReductionData<ReductionDatums...>>*
     504             :         reduction_data = nullptr;
     505             :     std::unordered_map<ObservationId, std::vector<std::string>>*
     506             :         reduction_names_map = nullptr;
     507             :     std::unordered_map<observers::ObservationId, std::unordered_set<size_t>>*
     508             :         nodes_contributed = nullptr;
     509             :     Parallel::NodeLock* reduction_data_lock = nullptr;
     510             :     Parallel::NodeLock* reduction_file_lock = nullptr;
     511             :     size_t observations_registered_with_id = std::numeric_limits<size_t>::max();
     512             : 
     513             :     {
     514             :       const std::lock_guard hold_lock(*node_lock);
     515             :       db::mutate<Tags::ReductionData<ReductionDatums...>,
     516             :                  Tags::ReductionDataNames<ReductionDatums...>,
     517             :                  Tags::NodesThatContributedReductions, Tags::ReductionDataLock,
     518             :                  Tags::H5FileLock>(
     519             :           [&nodes_contributed, &reduction_data, &reduction_names_map,
     520             :            &reduction_data_lock, &reduction_file_lock, &observation_id,
     521             :            &observations_registered_with_id, &sender_node_number](
     522             :               const gsl::not_null<
     523             :                   typename Tags::ReductionData<ReductionDatums...>::type*>
     524             :                   reduction_data_ptr,
     525             :               const gsl::not_null<
     526             :                   std::unordered_map<ObservationId, std::vector<std::string>>*>
     527             :                   reduction_names_map_ptr,
     528             :               const gsl::not_null<std::unordered_map<
     529             :                   ObservationId, std::unordered_set<size_t>>*>
     530             :                   nodes_contributed_ptr,
     531             :               const gsl::not_null<Parallel::NodeLock*> reduction_data_lock_ptr,
     532             :               const gsl::not_null<Parallel::NodeLock*> reduction_file_lock_ptr,
     533             :               const std::unordered_map<ObservationKey, std::set<size_t>>&
     534             :                   nodes_registered_for_reductions) {
     535             :             const ObservationKey& key{observation_id.observation_key()};
     536             :             ASSERT(nodes_registered_for_reductions.find(key) !=
     537             :                        nodes_registered_for_reductions.end(),
     538             :                    "Performing reduction with unregistered ID key "
     539             :                        << observation_id.observation_key());
     540             :             const auto& registered_nodes =
     541             :                 nodes_registered_for_reductions.at(key);
     542             : 
     543             :             if (UNLIKELY(registered_nodes.find(sender_node_number) ==
     544             :                          registered_nodes.end())) {
     545             :               ERROR("Node " << sender_node_number
     546             :                             << " was not registered for the observation id "
     547             :                             << observation_id);
     548             :             }
     549             : 
     550             :             reduction_data = &*reduction_data_ptr;
     551             :             reduction_names_map = &*reduction_names_map_ptr;
     552             :             nodes_contributed = &*nodes_contributed_ptr;
     553             :             reduction_data_lock = &*reduction_data_lock_ptr;
     554             :             reduction_file_lock = &*reduction_file_lock_ptr;
     555             :             observations_registered_with_id =
     556             :                 nodes_registered_for_reductions.at(key).size();
     557             :           },
     558             :           make_not_null(&box),
     559             :           db::get<Tags::NodesExpectedToContributeReductions>(box));
     560             :     }
     561             : 
     562             :     ASSERT(
     563             :         observations_registered_with_id != std::numeric_limits<size_t>::max(),
     564             :         "Failed to set observations_registered_with_id when mutating the "
     565             :         "DataBox. This is a bug in the code.");
     566             : 
     567             :     bool write_to_disk = false;
     568             :     // Now that we've retrieved pointers to the data in the DataBox we wish to
     569             :     // manipulate, lock the data and manipulate it.
     570             :     {
     571             :       const std::lock_guard hold_lock(*reduction_data_lock);
     572             :       auto& nodes_contributed_to_observation =
     573             :           (*nodes_contributed)[observation_id];
     574             :       if (nodes_contributed_to_observation.find(sender_node_number) !=
     575             :           nodes_contributed_to_observation.end()) {
     576             :         ERROR("Already received reduction data at observation id "
     577             :               << observation_id << " from node " << sender_node_number);
     578             :       }
     579             :       nodes_contributed_to_observation.insert(sender_node_number);
     580             : 
     581             :       if (UNLIKELY(reduction_names.empty())) {
     582             :         ERROR(
     583             :             "The reduction names, which is a std::vector of the names of "
     584             :             "the columns in the file, must be non-empty.");
     585             :       }
     586             :       if (auto current_names = reduction_names_map->find(observation_id);
     587             :           current_names == reduction_names_map->end()) {
     588             :         reduction_names_map->emplace(observation_id,
     589             :                                      std::move(reduction_names));
     590             :       } else if (UNLIKELY(current_names->second != reduction_names)) {
     591             :         using ::operator<<;
     592             :         ERROR(
     593             :             "The reduction names passed in must match the currently "
     594             :             "known reduction names. Current ones are "
     595             :             << current_names->second << " while the received are "
     596             :             << reduction_names);
     597             :       }
     598             : 
     599             :       if (reduction_data->find(observation_id) == reduction_data->end()) {
     600             :         // This is the first contribution for this observation id,
     601             :         // so all we need to do is move the input data into the
     602             :         // reduction_data in the DataBox.
     603             :         reduction_data->operator[](observation_id) =
     604             :             std::move(received_reduction_data);
     605             :       } else {
     606             :         // A partial result already exists for this observation id,
     607             :         // so merge the incoming contribution into it.
     608             :         reduction_data->at(observation_id)
     609             :             .combine(std::move(received_reduction_data));
     610             :       }
     611             : 
     612             :       // We use a bool `write_to_disk` to allow us to defer the data writing
     613             :       // until after we've unlocked the lock. For the same reason, we move the
     614             :       // final, reduced result into `received_reduction_data` and
     615             :       // `reduction_names`.
     616             :       if (nodes_contributed_to_observation.size() ==
     617             :           observations_registered_with_id) {
     618             :         write_to_disk = true;
     619             :         received_reduction_data =
     620             :             std::move(reduction_data->operator[](observation_id));
     621             :         reduction_names =
     622             :             std::move(reduction_names_map->operator[](observation_id));
     623             :         reduction_data->erase(observation_id);
     624             :         reduction_names_map->erase(observation_id);
     625             :         nodes_contributed->erase(observation_id);
     626             :       }
     627             :     }
     628             : 
     629             :     if (write_to_disk) {
     630             :       const std::lock_guard hold_lock(*reduction_file_lock);
     631             :       // NOLINTNEXTLINE(bugprone-use-after-move)
     632             :       received_reduction_data.finalize();
     633             :       if constexpr (not std::is_same_v<Formatter, NoFormatter>) {
     634             :         if (formatter.has_value()) {
     635             :           Parallel::printf(
     636             :               std::apply(*formatter, received_reduction_data.data()));
     637             :         }
     638             :       }
     639             :       ReductionActions_detail::write_data(
     640             :           subfile_name, observers::input_source_from_cache(cache),
     641             :           // NOLINTNEXTLINE(bugprone-use-after-move)
     642             :           std::move(reduction_names), std::move(received_reduction_data.data()),
     643             :           Parallel::get<Tags::ReductionFileName>(cache),
     644             :           std::make_index_sequence<sizeof...(ReductionDatums)>{});
     645             :     }
     646             :   }
     647             : };
     648             : 
     649             : /*!
     650             :  * \brief Write a single row of data to the reductions file without the need to
     651             :  * register or reduce anything, e.g. from a singleton component or from a
     652             :  * specific chare.
     653             :  *
     654             :  * Use observers::Actions::ContributeReductionData instead if you need to
     655             :  * perform a reduction before writing to the file.
     656             :  *
     657             :  * Invoke this action on the observers::ObserverWriter component on node 0. Pass
     658             :  * the following arguments when invoking this action:
     659             :  *
     660             :  * - `subfile_name`: the name of the `h5::Dat` subfile in the HDF5 file. Include
     661             :  *   a leading slash, e.g., `/element_data`.
     662             :  * - `legend`: a `std::vector<std::string>` of column labels for the quantities
     663             :  *   being observed (e.g. `{"Time", "L1ErrorDensity", "L2ErrorDensity"}`).
     664             :  * - `reduction_data`: a `std::tuple<...>` with the data to write. The tuple can
     665             :  *   hold either `double`s or `std::vector<double>`s and is flattened before it
     666             :  *   is written to the file to form a single row of values. The total number of
     667             :  *   values must match the length of the `legend`.
                     :  *
                     :  * Two `apply` overloads are provided: the threaded-action overload forwards
                     :  * its arguments to the local synchronous overload, which holds the
                     :  * `Tags::H5FileLock` while calling `ReductionActions_detail::write_data`.
     668             :  */
     669           1 : struct WriteReductionDataRow {
     670             :   /// \brief The apply call for the threaded action (forwards to the local one)
     671             :   template <typename ParallelComponent, typename DbTagsList,
     672             :             typename Metavariables, typename ArrayIndex, typename... Ts>
     673           1 :   static void apply(db::DataBox<DbTagsList>& box,
     674             :                     Parallel::GlobalCache<Metavariables>& cache,
     675             :                     const ArrayIndex& /*array_index*/,
     676             :                     const gsl::not_null<Parallel::NodeLock*> node_lock,
     677             :                     const std::string& subfile_name,
     678             :                     std::vector<std::string> legend,
     679             :                     std::tuple<Ts...>&& reduction_data) {
     680             :     apply<ParallelComponent>(box, node_lock, cache, subfile_name,
     681             :                              std::move(legend), std::move(reduction_data));
     682             :   }
     683             : 
     684             :   /// \brief Return type of the local synchronous action
     685           0 :   using return_type = void;
     686             : 
     687             :   /// \brief The apply call for the local synchronous action
     688             :   template <typename ParallelComponent, typename DbTagList,
     689             :             typename Metavariables, typename... Ts>
     690           1 :   static return_type apply(
     691             :       db::DataBox<DbTagList>& box,
     692             :       const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
     693             :       Parallel::GlobalCache<Metavariables>& cache,
     694             :       const std::string& subfile_name, std::vector<std::string> legend,
     695             :       std::tuple<Ts...>&& reduction_data) {
     696             :     auto& reduction_file_lock =
     697             :         db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
     698             :     const std::lock_guard hold_lock(reduction_file_lock);
     699             :     ThreadedActions::ReductionActions_detail::write_data(
     700             :         subfile_name, observers::input_source_from_cache(cache),
     701             :         std::move(legend), std::move(reduction_data),
     702             :         Parallel::get<Tags::ReductionFileName>(cache),
     703             :         std::make_index_sequence<sizeof...(Ts)>{});
     704             :   }
     705             : };
     706             : 
     707             : }  // namespace ThreadedActions
     708             : }  // namespace observers

Generated by: LCOV version 1.14