SpECTRE Documentation Coverage Report
Current view: top level - IO/Observer - VolumeActions.hpp Hit Total Coverage
Commit: 1f2210958b4f38fdc0400907ee7c6d5af5111418 Lines: 5 13 38.5 %
Date: 2025-12-05 05:03:31
Legend: Lines: hit not hit

          Line data    Source code
       1           0 : // Distributed under the MIT License.
       2             : // See LICENSE.txt for details.
       3             : 
       4             : #pragma once
       5             : 
       6             : #include <algorithm>
       7             : #include <cmath>
       8             : #include <cstddef>
       9             : #include <iterator>
      10             : #include <limits>
      11             : #include <mutex>
      12             : #include <optional>
      13             : #include <string>
      14             : #include <unordered_map>
      15             : 
      16             : #include "DataStructures/DataBox/DataBox.hpp"
      17             : #include "DataStructures/Index.hpp"
      18             : #include "Domain/Creators/Tags/Domain.hpp"
      19             : #include "Domain/FunctionsOfTime/Tags.hpp"
      20             : #include "Domain/Tags.hpp"
      21             : #include "IO/H5/AccessType.hpp"
      22             : #include "IO/H5/File.hpp"
      23             : #include "IO/H5/TensorData.hpp"
      24             : #include "IO/H5/VolumeData.hpp"
      25             : #include "IO/Observer/Helpers.hpp"
      26             : #include "IO/Observer/ObservationId.hpp"
      27             : #include "IO/Observer/ObserverComponent.hpp"
      28             : #include "IO/Observer/Tags.hpp"
      29             : #include "IO/Observer/TypeOfObservation.hpp"
      30             : #include "Parallel/ArrayComponentId.hpp"
      31             : #include "Parallel/GlobalCache.hpp"
      32             : #include "Parallel/Info.hpp"
      33             : #include "Parallel/Invoke.hpp"
      34             : #include "Parallel/Local.hpp"
      35             : #include "Parallel/NodeLock.hpp"
      36             : #include "Parallel/ParallelComponentHelpers.hpp"
      37             : #include "Utilities/Algorithm.hpp"
      38             : #include "Utilities/ErrorHandling/Error.hpp"
      39             : #include "Utilities/Gsl.hpp"
      40             : #include "Utilities/Requires.hpp"
      41             : #include "Utilities/Serialization/Serialize.hpp"
      42             : #include "Utilities/StdHelpers.hpp"
      43             : #include "Utilities/TMPL.hpp"
      44             : #include "Utilities/TaggedTuple.hpp"
      45             : 
      46             : namespace observers {
      47             : /// \cond
      48             : namespace ThreadedActions {
      49             : struct ContributeVolumeDataToWriter;
      50             : }  // namespace ThreadedActions
      51             : /// \endcond
      52             : namespace Actions {
      53             : 
      54             : /*!
      55             :  * \ingroup ObserversGroup
      56             :  * \brief Send volume tensor data to the observer.
      57             :  *
      58             :  * The caller of this Action (which is to be invoked on the Observer parallel
      59             :  * component) must pass in an `observation_id` used to uniquely identify the
      60             :  * observation in time, the name of the `h5::VolumeData` subfile in the HDF5
      61             :  * file (e.g. `/element_data`, where the slash is important), the contributing
      62             :  * parallel component element's component id, and the `ElementVolumeData`
      63             :  * to be written to disk.
                     :  *
                     :  * An optional `dependency` may also be passed; it is forwarded unchanged to
                     :  * `ThreadedActions::ContributeVolumeDataToWriter`.
                     :  *
                     :  * The action raises an `ERROR` if the observation key was never registered,
                     :  * if the sender is not registered for that key, or if the sender has already
                     :  * contributed to this `observation_id`. Once the number of contributors for
                     :  * the `observation_id` equals the number of registered contributors, the
                     :  * collected data is moved to the local branch of the `ObserverWriter` and the
                     :  * entries stored for the `observation_id` are erased.
      64             :  */
      65           1 : struct ContributeVolumeData {
      66             :   template <typename ParallelComponent, typename DbTagsList,
      67             :             typename Metavariables, typename ArrayIndex>
      68           0 :   static void apply(
      69             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
      70             :       const ArrayIndex& array_index,
      71             :       const observers::ObservationId& observation_id,
      72             :       const std::string& subfile_name,
      73             :       const Parallel::ArrayComponentId& sender_array_id,
      74             :       ElementVolumeData&& received_volume_data,
      75             :       const std::optional<std::string>& dependency = std::nullopt) {
      76             :     db::mutate<Tags::TensorData, Tags::ContributorsOfTensorData>(
      77             :         [&array_index, &cache, &received_volume_data, &observation_id,
      78             :          &sender_array_id, &subfile_name, &dependency](
      79             :             const gsl::not_null<std::unordered_map<
      80             :                 observers::ObservationId,
      81             :                 std::unordered_map<Parallel::ArrayComponentId,
      82             :                                    ElementVolumeData>>*>
      83             :                 volume_data,
      84             :             const gsl::not_null<std::unordered_map<
      85             :                 ObservationId, std::unordered_set<Parallel::ArrayComponentId>>*>
      86             :                 contributed_volume_data_ids,
      87             :             const std::unordered_map<
      88             :                 ObservationKey,
      89             :                 std::unordered_set<Parallel::ArrayComponentId>>&
      90             :                 registered_array_component_ids) mutable {  // NOLINT(spectre-mutable)
      91             :           const ObservationKey& key{observation_id.observation_key()};
      92             :           if (UNLIKELY(registered_array_component_ids.find(key) ==
      93             :                        registered_array_component_ids.end())) {
      94             :             ERROR("Receiving data from observation id "
      95             :                   << observation_id << " that was never registered.");
      96             :           }
      97             :           const auto& registered_ids = registered_array_component_ids.at(key);
      98             :           if (UNLIKELY(registered_ids.find(sender_array_id) ==
      99             :                        registered_ids.end())) {
     100             :             ERROR("Receiving volume data from array component id "
     101             :                   << sender_array_id << " that is not registered.");
     102             :           }
     103             : 
     104             :           auto& contributed_array_ids =  // operator[] creates the (empty) set on first contribution
     105             :               (*contributed_volume_data_ids)[observation_id];
     106             :           if (UNLIKELY(contributed_array_ids.find(sender_array_id) !=
     107             :                        contributed_array_ids.end())) {
     108             :             ERROR("Already received volume data to observation id "
     109             :                   << observation_id << " from array component id "
     110             :                   << sender_array_id);
     111             :           }
     112             :           contributed_array_ids.insert(sender_array_id);
     113             : 
     114             :           if ((not volume_data->contains(observation_id)) or
     115             :               (not volume_data->at(observation_id).contains(sender_array_id))) {
     116             :             volume_data->operator[](observation_id)
     117             :                 .emplace(sender_array_id, std::move(received_volume_data));
     118             :           } else {  // NOTE(review): the duplicate-sender check above rejects repeat senders, so this merge branch looks reachable only if something else fills Tags::TensorData -- confirm.
     119             :             auto& current_data =
     120             :                 volume_data->at(observation_id).at(sender_array_id);
     121             :             if (UNLIKELY(not alg::equal(current_data.extents,
     122             :                                         received_volume_data.extents))) {
     123             :               ERROR(
     124             :                   "The extents from the same volume component at a specific "
     125             :                   "observation should always be the same. For example, the "
     126             :                   "extents of a dG element should be the same for all calls to "
     127             :                   "ContributeVolumeData that occur at the same time.");
     128             :             }
     129             :             current_data.tensor_components.insert(  // append the new components after the stored ones
     130             :                 current_data.tensor_components.end(),
     131             :                 std::make_move_iterator(
     132             :                     received_volume_data.tensor_components.begin()),
     133             :                 std::make_move_iterator(
     134             :                     received_volume_data.tensor_components.end()));
     135             :           }
     136             : 
     137             :           // Check if we have received all "volume" data from the registered
     138             :           // elements. If so we copy it to the nodegroup volume writer.
     139             :           if (contributed_array_ids.size() == registered_ids.size()) {
     140             :             auto& local_writer = *Parallel::local_branch(
     141             :                 Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
     142             :                     cache));
     143             :             Parallel::threaded_action<
     144             :                 ThreadedActions::ContributeVolumeDataToWriter>(
     145             :                 local_writer, observation_id,
     146             :                 Parallel::make_array_component_id<ParallelComponent>(
     147             :                     array_index),
     148             :                 subfile_name, std::move((*volume_data)[observation_id]),
     149             :                 dependency);
     150             :             volume_data->erase(observation_id);  // drop the moved-from entry
     151             :             contributed_volume_data_ids->erase(observation_id);  // invalidates `contributed_array_ids`
     152             :           }
     153             :         },
     154             :         make_not_null(&box),
     155             :         db::get<Tags::ExpectedContributorsForObservations>(box));
     156             :   }
     157             : };
     158             : }  // namespace Actions
     159             : 
     160             : namespace ThreadedActions {
     161             : namespace VolumeActions_detail {
     162             : void write_data(const std::string& h5_file_name,  // Declaration only; no definition appears in this header view.
     163             :                 const std::string& input_source,
     164             :                 const std::string& subfile_path,
     165             :                 const observers::ObservationId& observation_id,
     166             :                 std::vector<ElementVolumeData>&& volume_data);  // NOTE(review): presumably writes `volume_data` to `subfile_path` of `h5_file_name` -- confirm against the definition.
     167             : /*!
                     :  * \brief Flattens the volume data collected for one observation and writes it
                     :  * to the HDF5 volume file of the current node.
                     :  *
                     :  * \details `volume_data` maps contributor ids either to a single
                     :  * `ElementVolumeData` or to a container of them; all entries are copied into
                     :  * one `std::vector<ElementVolumeData>`. While `volume_file_lock` is held, the
                     :  * file named `Tags::VolumeFileName` + node number + `.h5` is opened with
                     :  * read-write access and the `h5::VolumeData` subfile `subfile_name`
                     :  * (version 0) is obtained via `try_insert`. The data is then written together
                     :  * with the serialized domain (only if the subfile has no domain yet) and,
                     :  * when `domain::Tags::FunctionsOfTime` is in the global cache, the serialized
                     :  * functions of time. The functions of time evaluated at the observation time
                     :  * are only written if the observation time lies within the time bounds of
                     :  * every function of time.
                     :  */
     168             : template <typename ParallelComponent, typename Metavariables,
     169             :           typename VolumeDataAtObsId>
     170             : void write_combined_volume_data(
     171             :     Parallel::GlobalCache<Metavariables>& cache,
     172             :     const observers::ObservationId& observation_id,
     173             :     const VolumeDataAtObsId& volume_data,
     174             :     const gsl::not_null<Parallel::NodeLock*> volume_file_lock,
     175             :     const std::string& subfile_name) {
     176             :   ASSERT(not volume_data.empty(),
     177             :          "Failed to populate volume_data before trying to write it.");
     178             : 
     179             :   std::vector<ElementVolumeData> volume_data_to_write;
     180             : 
     181             :   if constexpr (std::is_same_v<tmpl::at_c<VolumeDataAtObsId, 1>,  // second template argument of the map, i.e. its mapped type
     182             :                                ElementVolumeData>) {
     183             :     volume_data_to_write.reserve(volume_data.size());
     184             :     for (const auto& [id, element] : volume_data) {
     185             :       (void)id;  // avoid compiler warnings
     186             :       volume_data_to_write.push_back(element);  // copies: `volume_data` is a const reference
     187             :     }
     188             :   } else {
     189             :     size_t total_size = 0;
     190             :     for (const auto& [id, vec_elements] : volume_data) {
     191             :       (void)id;  // avoid compiler warnings
     192             :       total_size += vec_elements.size();
     193             :     }
     194             :     volume_data_to_write.reserve(total_size);
     195             : 
     196             :     for (const auto& [id, vec_elements] : volume_data) {
     197             :       (void)id;  // avoid compiler warnings
     198             :       volume_data_to_write.insert(volume_data_to_write.end(),
     199             :                                   vec_elements.begin(), vec_elements.end());
     200             :     }
     201             :   }
     202             : 
     203             :   // Write to file. We use a separate node lock because writing can be
     204             :   // very time consuming (it's network dependent, depends on how full the
     205             :   // disks are, what other users are doing, etc.) and we want to be able
     206             :   // to continue to work on the nodegroup while we are writing data to
     207             :   // disk.
     208             :   const std::lock_guard hold_lock(*volume_file_lock);
     209             :   {
     210             :     // Scoping is for closing HDF5 file before we release the lock.
     211             :     const auto& file_prefix = Parallel::get<Tags::VolumeFileName>(cache);
     212             :     auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
     213             :     h5::H5File<h5::AccessType::ReadWrite> h5file(
     214             :         file_prefix +
     215             :             std::to_string(
     216             :                 Parallel::my_node<int>(*Parallel::local_branch(my_proxy))) +
     217             :             ".h5",
     218             :         true, observers::input_source_from_cache(cache));  // NOTE(review): `true` presumably allows appending to an existing file -- confirm against h5::H5File.
     219             :     constexpr size_t version_number = 0;
     220             :     auto& volume_file =
     221             :         h5file.try_insert<h5::VolumeData>(subfile_name, version_number);
     222             : 
     223             :     // Serialize domain. See `Domain` docs for details on the serialization.
     224             :     // The domain is retrieved from the global cache using the standard
     225             :     // domain tag. If more flexibility is required here later, then the
     226             :     // domain can be passed along with the `ContributeVolumeData` action.
     227             :     std::optional<std::vector<char>> serialized_domain{};
     228             :     if (not volume_file.has_domain()) {
     229             :       serialized_domain = serialize(
     230             :           Parallel::get<domain::Tags::Domain<Metavariables::volume_dim>>(
     231             :               cache));
     232             :     }
     233             : 
     234             :     std::optional<std::vector<char>> serialized_global_functions_of_time =
     235             :         std::nullopt;
     236             :     std::optional<std::vector<char>> serialized_observation_functions_of_time =
     237             :         std::nullopt;
     238             :     if constexpr (Parallel::is_in_global_cache<Metavariables,
     239             :                                                domain::Tags::FunctionsOfTime>) {
     240             :       const auto& functions_of_time = get<domain::Tags::FunctionsOfTime>(cache);
     241             :       serialized_global_functions_of_time = serialize(functions_of_time);
     242             :       // NOLINTNEXTLINE(misc-const-correctness)
     243             :       domain::FunctionsOfTimeMap observation_functions_of_time{};
     244             :       const double obs_time = observation_id.value();
     245             :       // Generally, the functions of time should be valid when we
     246             :       // perform an observation.  The exception is when running in an
     247             :       // AtCleanup event, in which case the observation time is a
     248             :       // bogus value and we just skip writing the values.
     249             :       if (alg::all_of(functions_of_time, [&](const auto& fot) {
     250             :             const auto bounds = fot.second->time_bounds();
     251             :             return bounds[0] <= obs_time and obs_time <= bounds[1];
     252             :           })) {
     253             :         // create a new function of time, effectively truncating the history.
     254             :         for (const auto& [name, fot_ptr] : functions_of_time) {
     255             :           observation_functions_of_time[name] = fot_ptr->create_at_time(
     256             :               obs_time, obs_time + 100.0 *  // second argument: obs_time plus a small offset scaled by machine epsilon
     257             :                                        std::numeric_limits<double>::epsilon() *
     258             :                                        std::max(std::abs(obs_time), 1.0));
     259             :         }
     260             :         serialized_observation_functions_of_time =
     261             :             serialize(observation_functions_of_time);
     262             :       }
     263             :     }
     264             : 
     265             :     // Write the data to the file
     266             :     volume_file.write_volume_data(observation_id.hash(), observation_id.value(),
     267             :                                   volume_data_to_write, serialized_domain,
     268             :                                   serialized_observation_functions_of_time,
     269             :                                   serialized_global_functions_of_time);
     270             :   }
     271             : }
     272             : }  // namespace VolumeActions_detail
     273             : /*!
     274             :  * \ingroup ObserversGroup
     275             :  * \brief Move data to the observer writer for writing to disk.
     276             :  *
     277             :  * Once data from all cores is collected this action writes the data to disk if
     278             :  * there isn't a dependency. Or if there is a dependency and it has been
     279             :  * received already. If there is a dependency but it hasn't been received yet,
     280             :  * data will be written by a call to `ContributeDependency`.
                     :  *
                     :  * The two `apply` overloads differ only in the payload type: a map to
                     :  * `std::vector<ElementVolumeData>` is stored under
                     :  * `Tags::InterpolatorTensorData`, a map to a single `ElementVolumeData` under
                     :  * `Tags::TensorData`. An `ERROR` is raised if the observation key is not
                     :  * registered, if `observer_group_id` is not registered for that key, if
                     :  * `observer_group_id` has already contributed to `observation_id`, or if the
                     :  * passed `dependency` differs from the one stored by `ContributeDependency`.
                     :  *
                     :  * The DataBox is only touched while `node_lock` is held; the write to disk
                     :  * happens afterwards under the separate `Tags::H5FileLock`.
     281             :  */
     282           1 : struct ContributeVolumeDataToWriter {
     283             :   template <typename ParallelComponent, typename DbTagsList,
     284             :             typename Metavariables, typename ArrayIndex>
     285           0 :   static void apply(
     286             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
     287             :       const ArrayIndex& /*array_index*/,
     288             :       const gsl::not_null<Parallel::NodeLock*> node_lock,
     289             :       const observers::ObservationId& observation_id,
     290             :       Parallel::ArrayComponentId observer_group_id,
     291             :       const std::string& subfile_name,
     292             :       std::unordered_map<Parallel::ArrayComponentId,
     293             :                          std::vector<ElementVolumeData>>&& received_volume_data,
     294             :       const std::optional<std::string>& dependency = std::nullopt) {
     295             :     apply_impl<Tags::InterpolatorTensorData, ParallelComponent>(  // vector-valued payload
     296             :         box, cache, node_lock, observation_id, observer_group_id, subfile_name,
     297             :         std::move(received_volume_data), dependency);
     298             :   }
     299             : 
     300             :   template <typename ParallelComponent, typename DbTagsList,
     301             :             typename Metavariables, typename ArrayIndex>
     302           0 :   static void apply(
     303             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
     304             :       const ArrayIndex& /*array_index*/,
     305             :       const gsl::not_null<Parallel::NodeLock*> node_lock,
     306             :       const observers::ObservationId& observation_id,
     307             :       Parallel::ArrayComponentId observer_group_id,
     308             :       const std::string& subfile_name,
     309             :       std::unordered_map<Parallel::ArrayComponentId, ElementVolumeData>&&
     310             :           received_volume_data,
     311             :       const std::optional<std::string>& dependency = std::nullopt) {
     312             :     apply_impl<Tags::TensorData, ParallelComponent>(  // single-element payload
     313             :         box, cache, node_lock, observation_id, observer_group_id, subfile_name,
     314             :         std::move(received_volume_data), dependency);
     315             :   }
     316             : 
     317             :  private:
     318             :   template <typename TensorDataTag, typename ParallelComponent,
     319             :             typename DbTagsList, typename Metavariables,
     320             :             typename VolumeDataAtObsId>
     321           0 :   static void apply_impl(
     322             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
     323             :       const gsl::not_null<Parallel::NodeLock*> node_lock,
     324             :       const observers::ObservationId& observation_id,
     325             :       Parallel::ArrayComponentId observer_group_id,
     326             :       const std::string& subfile_name, VolumeDataAtObsId received_volume_data,
     327             :       const std::optional<std::string>& dependency = std::nullopt) {
     328             :     // The gymnastics with pointers below are done in order to minimize the
     329             :     // time spent locking the entire node, which is necessary because the
     330             :     // DataBox does not allow any function calls (neither get nor mutate)
     331             :     // during a mutate. We separate out writing from the operations that edit
     332             :     // the DataBox since writing to disk can be very slow, but moving data
     333             :     // around is comparatively quick.
     334             :     Parallel::NodeLock* volume_file_lock = nullptr;
     335             :     bool perform_write = false;
     336             :     VolumeDataAtObsId volume_data{};
     337             : 
     338             :     {
     339             :       const std::lock_guard hold_lock(*node_lock);
     340             : 
     341             :       // Set file lock for later
     342             :       db::mutate<Tags::H5FileLock>(
     343             :           [&volume_file_lock](
     344             :               const gsl::not_null<Parallel::NodeLock*> volume_file_lock_ptr) {
     345             :             volume_file_lock = &*volume_file_lock_ptr;
     346             :           },
     347             :           make_not_null(&box));
     348             : 
     349             :       ASSERT(volume_file_lock != nullptr,
     350             :              "Failed to set volume_file_lock in the mutate");
     351             : 
     352             :       const auto& observations_registered =
     353             :           db::get<Tags::ExpectedContributorsForObservations>(box);
     354             : 
     355             :       const ObservationKey& key = observation_id.observation_key();
     356             :       if (LIKELY(observations_registered.contains(key))) {
     357             :         if (UNLIKELY(not observations_registered.at(key).contains(
     358             :                 observer_group_id))) {
     359             :           ERROR("The observer group id "
     360             :                 << observer_group_id
     361             :                 << " was not registered for the observation id "
     362             :                 << observation_id);
     363             :         }
     364             :       } else {
     365             :         ERROR("key " << key
     366             :                      << " not in the registered group ids. Known keys are "
     367             :                      << keys_of(observations_registered));
     368             :       }
     369             : 
     370             :       const size_t observations_registered_with_id =
     371             :           observations_registered.at(key).size();
     372             : 
     373             :       // Ok because we have the node lock
     374             :       auto& volume_observers_contributed =
     375             :           db::get_mutable_reference<Tags::ContributorsOfTensorData>(
     376             :               make_not_null(&box));
     377             :       auto& all_volume_data =
     378             :           db::get_mutable_reference<TensorDataTag>(make_not_null(&box));
     379             :       auto& box_dependencies =
     380             :           db::get_mutable_reference<Tags::Dependencies>(make_not_null(&box));
     381             : 
     382             :       auto& contributed_group_ids =
     383             :           volume_observers_contributed[observation_id];
     384             : 
     385             :       if (UNLIKELY(contributed_group_ids.contains(observer_group_id))) {
     386             :         ERROR("Already received volume data to observation id "
     387             :               << observation_id << " from array component id "
     388             :               << observer_group_id);
     389             :       }
     390             :       contributed_group_ids.insert(observer_group_id);
     391             : 
     392             :       // Add received volume data to the box
     393             :       if (all_volume_data.contains(observation_id)) {
     394             :         auto& current_data = all_volume_data.at(observation_id);
     395             :         current_data.insert(  // map insert: entries whose key is already present are kept, not overwritten
     396             :             std::make_move_iterator(received_volume_data.begin()),
     397             :             std::make_move_iterator(received_volume_data.end()));
     398             :       } else {
     399             :         // No data has been stored for this observation id yet.
     400             :         all_volume_data[observation_id] = std::move(received_volume_data);
     401             :       }
     402             : 
     403             :       // Check if we have received all "volume" data from the Observer
     404             :       // group. If so we write to disk.
     405             :       if (volume_observers_contributed.at(observation_id).size() ==
     406             :           observations_registered_with_id) {
     407             :         // Check if
     408             :         //  1. there is an external dependency
     409             :         if (dependency.has_value()) {
     410             :           //  2. if there is, that we have received something at this time
     411             :           //  3. that the dependencies are the same
     412             :           if (box_dependencies.contains(observation_id)) {
     413             :             if (UNLIKELY(box_dependencies.at(observation_id).first !=
     414             :                          dependency.value())) {
     415             :               ERROR(
     416             :                   "The dependency that was sent to the ObserverWriter from the "
     417             :                   "elements ("
     418             :                   << dependency.value()
     419             :                   << ") does not match the dependency received from "
     420             :                      "ContributeDependency ("
     421             :                   << box_dependencies.at(observation_id).first << ").");
     422             :             }
     423             :             //  4. that we are writing the volume data to disk
     424             :             if (box_dependencies.at(observation_id).second) {
     425             :               perform_write = true;
     426             :               volume_data = std::move(all_volume_data[observation_id]);
     427             :             }
     428             : 
     429             :             // Whether or not we are writing data to disk, we clean up because
     430             :             // we have received both the volume data and the dependency
     431             :             all_volume_data.erase(observation_id);
     432             :             volume_observers_contributed.erase(observation_id);
     433             :             box_dependencies.erase(observation_id);
     434             :           }  // otherwise the data stays in the box until the dependency arrives
     435             :         } else {
     436             :           perform_write = true;
     437             :           volume_data = std::move(all_volume_data[observation_id]);
     438             :           all_volume_data.erase(observation_id);
     439             :           volume_observers_contributed.erase(observation_id);
     440             :         }
     441             :       }
     442             :     }
     443             : 
     444             :     if (perform_write) {  // the node lock has been released at this point
     445             :       VolumeActions_detail::write_combined_volume_data<ParallelComponent>(
     446             :           cache, observation_id, volume_data, make_not_null(volume_file_lock),
     447             :           subfile_name);
     448             :     }
     449             :   }
     450             : };
     451             : 
     452             : /*!
     453             :  * \brief Threaded action that will add a dependency to the ObserverWriter for a
     454             :  * given ObservationId ( \p time + \p volume_subfile_name).
     455             :  *
     456             :  * \details If not all the volume data for this ObservationId has been received
     457             :  * yet, then this will just add the dependency to the box and exit without
     458             :  * writing anything. If all volume data arrives before this action is called,
     459             :  * then it will write out the volume data (or remove it if we aren't writing).
                     :  *
                     :  * Steps, in the order the code performs them:
                     :  * - Normalize \p volume_subfile_name so that it starts with `/` and ends
                     :  *   with `.vol`, then build the ObservationId from \p time and that name.
                     :  * - While holding \p node_lock, store `{dependency, write_volume_data}` in
                     :  *   `Tags::Dependencies` under that ObservationId (replacing any previous
                     :  *   entry), and call `ERROR` if the observation key is not present in
                     :  *   `Tags::ExpectedContributorsForObservations`.
                     :  * - Still under the lock, return early unless both
                     :  *   `Tags::ContributorsOfTensorData` and `Tags::TensorData` have an entry
                     :  *   for the ObservationId.
                     :  * - Still under the lock, if the number of contributors equals the number
                     :  *   expected: move the data out when \p write_volume_data is true, then
                     :  *   erase the ObservationId from `Tags::TensorData`,
                     :  *   `Tags::ContributorsOfTensorData` and `Tags::Dependencies`.
                     :  * - After the lock scope ends, if data was moved out, pass it to
                     :  *   `VolumeActions_detail::write_combined_volume_data` together with a
                     :  *   pointer to the `Tags::H5FileLock` lock taken from the box.
                     :  *
                     :  * \param box DataBox holding the tags listed above.
                     :  * \param cache forwarded to `write_combined_volume_data`.
                     :  * \param node_lock held only while the DataBox is read and modified.
                     :  * \param time used together with the subfile name to build the
                     :  * ObservationId.
                     :  * \param dependency stored unmodified in `Tags::Dependencies`; how it is
                     :  * interpreted is not visible in this action.
                     :  * \param volume_subfile_name taken by value because it is normalized in
                     :  * place.
                     :  * \param write_volume_data stored with the dependency; when false the
                     :  * completed data is erased without being written.
     460             :  */
     461           1 : struct ContributeDependency {
     462             :   template <typename ParallelComponent, typename DbTagsList,
     463             :             typename Metavariables, typename ArrayIndex>
     464           0 :   static void apply(db::DataBox<DbTagsList>& box,
     465             :                     Parallel::GlobalCache<Metavariables>& cache,
     466             :                     const ArrayIndex& /*array_index*/,
     467             :                     const gsl::not_null<Parallel::NodeLock*> node_lock,
     468             :                     const double time, const std::string& dependency,
     469             :                     std::string volume_subfile_name,
     470             :                     const bool write_volume_data) {
     471             :     if (not volume_subfile_name.starts_with("/")) {
     472             :       volume_subfile_name = "/" + volume_subfile_name;
     473             :     }
     474             :     if (not volume_subfile_name.ends_with(".vol")) {
     475             :       volume_subfile_name += ".vol";
     476             :     }
     477             : 
     478             :     const ObservationId observation_id{time, volume_subfile_name};
     479             : 
     480             :     // The below gymnastics with pointers is done in order to minimize the
     481             :     // time spent locking the entire node, which is necessary because the
     482             :     // DataBox does not allow any function calls, either get or mutate, during
     483             :     // a mutate. We separate out writing from the operations that edit the
     484             :     // DataBox since writing to disk can be very slow, but moving data around is
     485             :     // comparatively quick.
     486             :     Parallel::NodeLock* volume_file_lock = nullptr;
     487             :     bool perform_write = false;
     488             :     std::unordered_map<Parallel::ArrayComponentId, ElementVolumeData>
     489             :         volume_data{};
     490             : 
     491             :     // For now just hold the entire node. We can optimize with different locks
     492             :     // later on
     493             :     {
     494             :       const std::lock_guard hold_lock(*node_lock);
     495             : 
     496             :       db::mutate<Tags::H5FileLock, Tags::Dependencies>(
     497             :           [&](const gsl::not_null<Parallel::NodeLock*> volume_file_lock_ptr,
     498             :               const gsl::not_null<std::unordered_map<
     499             :                   ObservationId, std::pair<std::string, bool>>*>
     500             :                   dependencies) {
     501             :             volume_file_lock = &*volume_file_lock_ptr;  // used after the lock
     502             :             (*dependencies)[observation_id] =
     503             :                 std::pair{dependency, write_volume_data};
     504             :           },
     505             :           make_not_null(&box));
     506             : 
     507             :       auto& volume_observers_contributed =
     508             :           db::get_mutable_reference<Tags::ContributorsOfTensorData>(
     509             :               make_not_null(&box));
     510             :       auto& all_volume_data =
     511             :           db::get_mutable_reference<Tags::TensorData>(make_not_null(&box));
     512             :       auto& box_dependencies =
     513             :           db::get_mutable_reference<Tags::Dependencies>(make_not_null(&box));
     514             :       const auto& expected_contributors =
     515             :           db::get<Tags::ExpectedContributorsForObservations>(box);
     516             : 
     517             :       if (not expected_contributors.contains(
     518             :               observation_id.observation_key())) {
     519             :         ERROR("Key " << observation_id.observation_key()
     520             :                      << " was not registered.");
     521             :       }
     522             : 
     523             :       // No entry for this ObservationId in one or both maps: there is nothing
     524             :       // to write or erase yet, so leave the stored dependency in the box.
     525             :       if (not(volume_observers_contributed.contains(observation_id) and
     526             :               all_volume_data.contains(observation_id))) {
     527             :         return;
     528             :       }
     529             : 
     530             :       // Once every expected contributor has reported: take the data for
     531             :       // writing if write_volume_data is set, and erase this ObservationId's
     532             :       // entries either way. With fewer contributors, nothing is changed here.
     533             :       if (volume_observers_contributed.at(observation_id).size() ==
     534             :           expected_contributors.at(observation_id.observation_key()).size()) {
     535             :         if (write_volume_data) {
     536             :           perform_write = true;
     537             :           volume_data = std::move(all_volume_data.at(observation_id));
     538             :         }
     539             : 
     540             :         all_volume_data.erase(observation_id);
     541             :         volume_observers_contributed.erase(observation_id);
     542             :         box_dependencies.erase(observation_id);
     543             :       }
     544             :     }
     545             : 
     546             :     if (perform_write) {  // the node_lock scope above has already ended
     547             :       VolumeActions_detail::write_combined_volume_data<ParallelComponent>(
     548             :           cache, observation_id, volume_data, make_not_null(volume_file_lock),
     549             :           volume_subfile_name);
     550             :     }
     551             :   }
     552             : };
     553             : 
     554             : /*!
     555             :  * \brief Write volume data (such as surface data) at a given time (specified by
     556             :  * an `ObservationId`) without the need to register or reduce anything, e.g.
     557             :  * from a singleton component or from a specific chare.
     558             :  *
     559             :  * Use `observers::Actions::ContributeVolumeDataToWriter` instead if you need to
     560             :  * write volume data from an array chare (e.g., writing volume data from all the
     561             :  * elements in a domain).
     562             :  *
     563             :  * Invoke this action on the `observers::ObserverWriter` component on node 0.
     564             :  * Pass the following arguments when invoking this action:
     565             :  *
     566             :  * - `h5_file_name`: the name of the HDF5 file where the volume data is to be
     567             :  * written (without the .h5 extension).
     568             :  * - `subfile_path`: the path where the volume data should be written in the
     569             :  *   HDF5 file. Include a leading slash, e.g., `/AhA`.
     570             :  * - `observation_id`: the ObservationId corresponding to the volume data.
     571             :  * - `volume_data`: the volume data to be written.
                     :  *
                     :  * There are two `apply` overloads with identical bodies. Each one takes the
                     :  * `Tags::H5FileLock` lock from the box, holds it until `apply` returns, and
                     :  * calls `VolumeActions_detail::write_data` with the file name, the result
                     :  * of `observers::input_source_from_cache(cache)`, the subfile path, the
                     :  * observation id and the moved-in volume data.
                     :  *
                     :  * The `node_lock` argument is not used by either overload. The overloads
                     :  * differ only in their leading parameters: the first takes
                     :  * `(box, cache, array_index, node_lock, ...)`, the second takes
                     :  * `(box, node_lock, cache, ...)`.
     572             :  */
     573           1 : struct WriteVolumeData {
     574             :   template <typename ParallelComponent, typename DbTagsList,
     575             :             typename Metavariables, typename ArrayIndex, typename... Ts,
     576             :             typename DataBox = db::DataBox<DbTagsList>>
     577           0 :   static void apply(db::DataBox<DbTagsList>& box,
     578             :                     Parallel::GlobalCache<Metavariables>& cache,
     579             :                     const ArrayIndex& /*array_index*/,
     580             :                     const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
     581             :                     const std::string& h5_file_name,
     582             :                     const std::string& subfile_path,
     583             :                     const observers::ObservationId& observation_id,
     584             :                     std::vector<ElementVolumeData>&& volume_data) {
     585             :     auto& volume_file_lock =
     586             :         db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
     587             :     const std::lock_guard hold_lock(volume_file_lock);  // held for the write
     588             :     VolumeActions_detail::write_data(
     589             :         h5_file_name, observers::input_source_from_cache(cache), subfile_path,
     590             :         observation_id, std::move(volume_data));
     591             :   }
     592             : 
     593             :   // For a local synchronous action
     594           0 :   using return_type = void;
     595             : 
     596             :   /// \brief The apply call for the local synchronous action
     597             :   template <typename ParallelComponent, typename DbTagsList,
     598             :             typename Metavariables, typename... Ts,
     599             :             typename DataBox = db::DataBox<DbTagsList>>
     600           1 :   static return_type apply(
     601             :       db::DataBox<DbTagsList>& box,
     602             :       const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
     603             :       Parallel::GlobalCache<Metavariables>& cache,
     604             :       const std::string& h5_file_name, const std::string& subfile_path,
     605             :       const observers::ObservationId& observation_id,
     606             :       std::vector<ElementVolumeData>&& volume_data) {
     607             :     auto& volume_file_lock =
     608             :         db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
     609             :     const std::lock_guard hold_lock(volume_file_lock);  // held for the write
     610             :     VolumeActions_detail::write_data(
     611             :         h5_file_name, observers::input_source_from_cache(cache), subfile_path,
     612             :         observation_id, std::move(volume_data));
     613             :   }
     614             : };
     615             : }  // namespace ThreadedActions
     616             : }  // namespace observers

Generated by: LCOV version 1.14