SpECTRE Documentation Coverage Report
Current view: top level - IO/Observer - VolumeActions.hpp Hit Total Coverage
Commit: 3ffcbc8ecf43797401b60bcca17d6040ee06f013 Lines: 6 14 42.9 %
Date: 2026-03-03 02:01:44
Legend: Lines: hit not hit

          Line data    Source code
       1           0 : // Distributed under the MIT License.
       2             : // See LICENSE.txt for details.
       3             : 
       4             : #pragma once
       5             : 
       6             : #include <algorithm>
       7             : #include <cmath>
       8             : #include <cstddef>
       9             : #include <iterator>
      10             : #include <limits>
      11             : #include <mutex>
      12             : #include <optional>
      13             : #include <string>
      14             : #include <unordered_map>
      15             : #include <utility>
      16             : #include <vector>
      17             : 
      18             : #include "DataStructures/DataBox/DataBox.hpp"
      19             : #include "DataStructures/Index.hpp"
      20             : #include "Domain/Creators/Tags/Domain.hpp"
      21             : #include "Domain/FunctionsOfTime/Tags.hpp"
      22             : #include "Domain/Tags.hpp"
      23             : #include "IO/H5/AccessType.hpp"
      24             : #include "IO/H5/File.hpp"
      25             : #include "IO/H5/TensorData.hpp"
      26             : #include "IO/H5/VolumeData.hpp"
      27             : #include "IO/Observer/Helpers.hpp"
      28             : #include "IO/Observer/ObservationId.hpp"
      29             : #include "IO/Observer/ObserverComponent.hpp"
      30             : #include "IO/Observer/Tags.hpp"
      31             : #include "IO/Observer/TypeOfObservation.hpp"
      32             : #include "Parallel/ArrayComponentId.hpp"
      33             : #include "Parallel/GlobalCache.hpp"
      34             : #include "Parallel/Info.hpp"
      35             : #include "Parallel/Invoke.hpp"
      36             : #include "Parallel/Local.hpp"
      37             : #include "Parallel/NodeLock.hpp"
      38             : #include "Parallel/ParallelComponentHelpers.hpp"
      39             : #include "Utilities/Algorithm.hpp"
      40             : #include "Utilities/ErrorHandling/Assert.hpp"
      41             : #include "Utilities/ErrorHandling/Error.hpp"
      42             : #include "Utilities/Gsl.hpp"
      43             : #include "Utilities/MakeVector.hpp"
      44             : #include "Utilities/Requires.hpp"
      45             : #include "Utilities/Serialization/Serialize.hpp"
      46             : #include "Utilities/StdHelpers.hpp"
      47             : #include "Utilities/TMPL.hpp"
      48             : #include "Utilities/TaggedTuple.hpp"
      49             : 
      50             : namespace observers {
      51             : /// \cond
      52             : namespace ThreadedActions {
      53             : struct ContributeVolumeDataToWriter;
      54             : }  // namespace ThreadedActions
      55             : /// \endcond
      56             : namespace Actions {
      57             : 
      58             : /*!
      59             :  * \ingroup ObserversGroup
      60             :  * \brief Send volume tensor data to the observer.
      61             :  *
      62             :  * The caller of this Action (which is to be invoked on the Observer parallel
      63             :  * component) must pass in an `observation_id` used to uniquely identify the
      64             :  * observation in time, the name of the `h5::VolumeData` subfile in the HDF5
      65             :  * file (e.g. `/element_data`, where the slash is important), the contributing
      66             :  * parallel component element's component id, and the `ElementVolumeData`
      67             :  * to be written to disk.
      68             :  */
struct ContributeVolumeData {
  // Accumulates one element's volume data for `observation_id` into
  // `Tags::TensorData` on this Observer branch and records the sender in
  // `Tags::ContributorsOfTensorData`. Once every registered sender for the
  // observation key has contributed, the collected data is forwarded to the
  // ObserverWriter nodegroup via the
  // `ThreadedActions::ContributeVolumeDataToWriter` threaded action and the
  // local buffers for this observation id are erased.
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex>
  static void apply(
      db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
      const ArrayIndex& array_index,
      const observers::ObservationId& observation_id,
      const std::string& subfile_name,
      const Parallel::ArrayComponentId& sender_array_id,
      ElementVolumeData&& received_volume_data,
      const std::optional<std::vector<char>>& serialized_functions_of_time =
          std::nullopt,
      const std::optional<std::string>& dependency = std::nullopt) {
    // Mutate the accumulation buffers; the registration map
    // (Tags::ExpectedContributorsForObservations) is passed read-only as the
    // extra argument after make_not_null(&box).
    db::mutate<Tags::TensorData, Tags::ContributorsOfTensorData>(
        [&array_index, &cache, &received_volume_data, &observation_id,
         &sender_array_id, &subfile_name, &serialized_functions_of_time,
         &dependency](
            const gsl::not_null<std::unordered_map<
                observers::ObservationId,
                std::unordered_map<Parallel::ArrayComponentId,
                                   ElementVolumeData>>*>
                volume_data,
            const gsl::not_null<std::unordered_map<
                ObservationId, std::unordered_set<Parallel::ArrayComponentId>>*>
                contributed_volume_data_ids,
            const std::unordered_map<
                ObservationKey, std::unordered_set<Parallel::ArrayComponentId>>&
                registered_array_component_ids) {
          // Hard error if the observation key itself was never registered.
          const ObservationKey& key{observation_id.observation_key()};
          if (UNLIKELY(registered_array_component_ids.find(key) ==
                       registered_array_component_ids.end())) {
            ERROR("Receiving data from observation id "
                  << observation_id << " that was never registered.");
          }
          // Hard error if this particular sender did not register for the key.
          const auto& registered_ids = registered_array_component_ids.at(key);
          if (UNLIKELY(registered_ids.find(sender_array_id) ==
                       registered_ids.end())) {
            ERROR("Receiving volume data from array component id "
                  << sender_array_id << " that is not registered.");
          }

          // Each sender may contribute at most once per observation id; a
          // duplicate indicates a logic error upstream.
          auto& contributed_array_ids =
              (*contributed_volume_data_ids)[observation_id];
          if (UNLIKELY(contributed_array_ids.find(sender_array_id) !=
                       contributed_array_ids.end())) {
            ERROR("Already received volume data to observation id "
                  << observation_id << " from array component id "
                  << sender_array_id);
          }
          contributed_array_ids.insert(sender_array_id);

          if ((not volume_data->contains(observation_id)) or
              (not volume_data->at(observation_id).contains(sender_array_id))) {
            // First contribution from this sender at this observation id:
            // move the data in wholesale.
            volume_data->operator[](observation_id)
                .emplace(sender_array_id, std::move(received_volume_data));
          } else {
            // A later call from the same sender appends additional tensor
            // components onto the already-stored element data. The grid
            // extents must agree across such calls.
            auto& current_data =
                volume_data->at(observation_id).at(sender_array_id);
            if (UNLIKELY(not alg::equal(current_data.extents,
                                        received_volume_data.extents))) {
              ERROR(
                  "The extents from the same volume component at a specific "
                  "observation should always be the same. For example, the "
                  "extents of a dG element should be the same for all calls to "
                  "ContributeVolumeData that occur at the same time.");
            }
            current_data.tensor_components.insert(
                current_data.tensor_components.end(),
                std::make_move_iterator(
                    received_volume_data.tensor_components.begin()),
                std::make_move_iterator(
                    received_volume_data.tensor_components.end()));
          }

          // Check if we have received all "volume" data from the registered
          // elements. If so we copy it to the nodegroup volume writer.
          if (contributed_array_ids.size() == registered_ids.size()) {
            auto& local_writer = *Parallel::local_branch(
                Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
                    cache));
            Parallel::threaded_action<
                ThreadedActions::ContributeVolumeDataToWriter>(
                local_writer, observation_id,
                Parallel::make_array_component_id<ParallelComponent>(
                    array_index),
                subfile_name, std::move((*volume_data)[observation_id]),
                serialized_functions_of_time, dependency);
            // Free the buffers; subsequent contributions for this id would be
            // an error anyway (they were never registered twice).
            volume_data->erase(observation_id);
            contributed_volume_data_ids->erase(observation_id);
          }
        },
        make_not_null(&box),
        db::get<Tags::ExpectedContributorsForObservations>(box));
  }
};
     164             : }  // namespace Actions
     165             : 
     166             : namespace ThreadedActions {
     167             : namespace VolumeActions_detail {
// Writes `volume_data` into the `h5::VolumeData` subfile `subfile_path` of the
// HDF5 file `h5_file_name` for the given `observation_id`. Defined
// out-of-line (in the corresponding .cpp file). NOTE(review): `input_source`
// is presumably the parsed input-file text stored as file metadata, matching
// `input_source_from_cache` usage below -- confirm against the definition.
void write_data(const std::string& h5_file_name,
                const std::string& input_source,
                const std::string& subfile_path,
                const observers::ObservationId& observation_id,
                std::vector<ElementVolumeData>&& volume_data);
     173             : 
// Flattens the per-array-component volume data collected for `observation_id`
// into a single `std::vector<ElementVolumeData>` and writes it, together with
// the serialized domain and functions of time, into the node-local HDF5
// volume file. The caller must pass the H5 file lock; the entire write
// happens while holding it.
//
// `VolumeDataAtObsId` is an unordered map from `Parallel::ArrayComponentId`
// to either a single `ElementVolumeData` or a
// `std::vector<ElementVolumeData>`; the two cases are distinguished below via
// the map's mapped type.
template <typename ParallelComponent, typename Metavariables,
          typename VolumeDataAtObsId>
void write_combined_volume_data(
    Parallel::GlobalCache<Metavariables>& cache,
    const observers::ObservationId& observation_id,
    const VolumeDataAtObsId& volume_data,
    const gsl::not_null<Parallel::NodeLock*> volume_file_lock,
    const std::string& subfile_name,
    const std::optional<std::vector<char>>&
        serialized_observation_functions_of_time) {
  ASSERT(not volume_data.empty(),
         "Failed to populate volume_data before trying to write it.");

  std::vector<ElementVolumeData> volume_data_to_write;

  // `tmpl::at_c<VolumeDataAtObsId, 1>` extracts the mapped type (second
  // template parameter) of the map to select the flattening strategy.
  if constexpr (std::is_same_v<tmpl::at_c<VolumeDataAtObsId, 1>,
                               ElementVolumeData>) {
    // One element per component id: copy each into the flat vector.
    volume_data_to_write.reserve(volume_data.size());
    for (const auto& [id, element] : volume_data) {
      (void)id;  // avoid compiler warnings
      volume_data_to_write.push_back(element);
    }
  } else {
    // Vector of elements per component id: size the flat vector first so
    // there is a single allocation, then concatenate.
    size_t total_size = 0;
    for (const auto& [id, vec_elements] : volume_data) {
      (void)id;  // avoid compiler warnings
      total_size += vec_elements.size();
    }
    volume_data_to_write.reserve(total_size);

    for (const auto& [id, vec_elements] : volume_data) {
      (void)id;  // avoid compiler warnings
      volume_data_to_write.insert(volume_data_to_write.end(),
                                  vec_elements.begin(), vec_elements.end());
    }
  }

  // Write to file. We use a separate node lock because writing can be
  // very time consuming (it's network dependent, depends on how full the
  // disks are, what other users are doing, etc.) and we want to be able
  // to continue to work on the nodegroup while we are writing data to
  // disk.
  const std::lock_guard hold_lock(*volume_file_lock);
  {
    // Scoping is for closing HDF5 file before we release the lock.
    const auto& file_prefix = Parallel::get<Tags::VolumeFileName>(cache);
    auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
    // One volume file per node: the node number is appended to the prefix.
    h5::H5File<h5::AccessType::ReadWrite> h5file(
        file_prefix +
            std::to_string(
                Parallel::my_node<int>(*Parallel::local_branch(my_proxy))) +
            ".h5",
        true, observers::input_source_from_cache(cache));
    constexpr size_t version_number = 0;
    auto& volume_file =
        h5file.try_insert<h5::VolumeData>(subfile_name, version_number);

    // Serialize domain. See `Domain` docs for details on the serialization.
    // The domain is retrieved from the global cache using the standard
    // domain tag. If more flexibility is required here later, then the
    // domain can be passed along with the `ContributeVolumeData` action.
    std::optional<std::vector<char>> serialized_domain{};
    if (not volume_file.has_domain()) {
      serialized_domain = serialize(
          Parallel::get<domain::Tags::Domain<Metavariables::volume_dim>>(
              cache));
    }

    // Also serialize the global-cache functions of time when the
    // metavariables provide them; otherwise std::nullopt is written.
    std::optional<std::vector<char>> serialized_global_functions_of_time =
        std::nullopt;
    if constexpr (Parallel::is_in_global_cache<Metavariables,
                                               domain::Tags::FunctionsOfTime>) {
      const auto& functions_of_time = get<domain::Tags::FunctionsOfTime>(cache);
      serialized_global_functions_of_time = serialize(functions_of_time);
    }

    // Write the data to the file
    volume_file.write_volume_data(observation_id.hash(), observation_id.value(),
                                  volume_data_to_write, serialized_domain,
                                  serialized_observation_functions_of_time,
                                  serialized_global_functions_of_time);
  }
}
     257             : }  // namespace VolumeActions_detail
     258             : /*!
     259             :  * \ingroup ObserversGroup
     260             :  * \brief Move data to the observer writer for writing to disk.
     261             :  *
     262             :  * Once data from all cores is collected this action writes the data to disk if
     263             :  * there isn't a dependency. Or if there is a dependency and it has been
     264             :  * received already. If there is a dependency but it hasn't been received yet,
     265             :  * data will be written by a call to `ContributeDependency`.
     266             :  */
     267           1 : struct ContributeVolumeDataToWriter {
     268             :   template <typename ParallelComponent, typename DbTagsList,
     269             :             typename Metavariables, typename ArrayIndex>
     270           0 :   static void apply(
     271             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
     272             :       const ArrayIndex& /*array_index*/,
     273             :       const gsl::not_null<Parallel::NodeLock*> node_lock,
     274             :       const observers::ObservationId& observation_id,
     275             :       Parallel::ArrayComponentId observer_group_id,
     276             :       const std::string& subfile_name,
     277             :       std::unordered_map<Parallel::ArrayComponentId,
     278             :                          std::vector<ElementVolumeData>>&& received_volume_data,
     279             :       const std::optional<std::vector<char>>& serialized_functions_of_time =
     280             :           std::nullopt,
     281             :       const std::optional<std::string>& dependency = std::nullopt) {
     282             :     apply_impl<Tags::InterpolatorTensorData, ParallelComponent>(
     283             :         box, cache, node_lock, observation_id, observer_group_id, subfile_name,
     284             :         std::move(received_volume_data), serialized_functions_of_time,
     285             :         dependency);
     286             :   }
     287             : 
     288             :   template <typename ParallelComponent, typename DbTagsList,
     289             :             typename Metavariables, typename ArrayIndex>
     290           0 :   static void apply(
     291             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
     292             :       const ArrayIndex& /*array_index*/,
     293             :       const gsl::not_null<Parallel::NodeLock*> node_lock,
     294             :       const observers::ObservationId& observation_id,
     295             :       Parallel::ArrayComponentId observer_group_id,
     296             :       const std::string& subfile_name,
     297             :       std::unordered_map<Parallel::ArrayComponentId, ElementVolumeData>&&
     298             :           received_volume_data,
     299             :       const std::optional<std::vector<char>>& serialized_functions_of_time =
     300             :           std::nullopt,
     301             :       const std::optional<std::string>& dependency = std::nullopt) {
     302             :     apply_impl<Tags::TensorData, ParallelComponent>(
     303             :         box, cache, node_lock, observation_id, observer_group_id, subfile_name,
     304             :         std::move(received_volume_data), serialized_functions_of_time,
     305             :         dependency);
     306             :   }
     307             : 
     308             :  private:
     309             :   template <typename TensorDataTag, typename ParallelComponent,
     310             :             typename DbTagsList, typename Metavariables,
     311             :             typename VolumeDataAtObsId>
     312           0 :   static void apply_impl(
     313             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
     314             :       const gsl::not_null<Parallel::NodeLock*> node_lock,
     315             :       const observers::ObservationId& observation_id,
     316             :       Parallel::ArrayComponentId observer_group_id,
     317             :       const std::string& subfile_name, VolumeDataAtObsId received_volume_data,
     318             :       const std::optional<std::vector<char>>& serialized_functions_of_time,
     319             :       const std::optional<std::string>& dependency) {
     320             :     // The below gymnastics with pointers is done in order to minimize the
     321             :     // time spent locking the entire node, which is necessary because the
     322             :     // DataBox does not allow any function calls, either get and mutate, during
     323             :     // a mutate. We separate out writing from the operations that edit the
     324             :     // DataBox since writing to disk can be very slow, but moving data around is
     325             :     // comparatively quick.
     326             :     Parallel::NodeLock* volume_file_lock = nullptr;
     327             :     bool perform_write = false;
     328             :     VolumeDataAtObsId volume_data{};
     329             : 
     330             :     {
     331             :       const std::lock_guard hold_lock(*node_lock);
     332             : 
     333             :       // Set file lock for later
     334             :       db::mutate<Tags::H5FileLock>(
     335             :           [&volume_file_lock](
     336             :               const gsl::not_null<Parallel::NodeLock*> volume_file_lock_ptr) {
     337             :             volume_file_lock = &*volume_file_lock_ptr;
     338             :           },
     339             :           make_not_null(&box));
     340             : 
     341             :       ASSERT(volume_file_lock != nullptr,
     342             :              "Failed to set volume_file_lock in the mutate");
     343             : 
     344             :       const auto& observations_registered =
     345             :           db::get<Tags::ExpectedContributorsForObservations>(box);
     346             : 
     347             :       const ObservationKey& key = observation_id.observation_key();
     348             :       if (LIKELY(observations_registered.contains(key))) {
     349             :         if (UNLIKELY(not observations_registered.at(key).contains(
     350             :                 observer_group_id))) {
     351             :           ERROR("The observer group id "
     352             :                 << observer_group_id
     353             :                 << " was not registered for the observation id "
     354             :                 << observation_id);
     355             :         }
     356             :       } else {
     357             :         ERROR("key " << key
     358             :                      << " not in the registered group ids. Known keys are "
     359             :                      << keys_of(observations_registered));
     360             :       }
     361             : 
     362             :       const size_t observations_registered_with_id =
     363             :           observations_registered.at(key).size();
     364             : 
     365             :       // Ok because we have the node lock
     366             :       auto& volume_observers_contributed =
     367             :           db::get_mutable_reference<Tags::ContributorsOfTensorData>(
     368             :               make_not_null(&box));
     369             :       auto& all_volume_data =
     370             :           db::get_mutable_reference<TensorDataTag>(make_not_null(&box));
     371             :       auto& all_serialized_functions_of_time =
     372             :           db::get_mutable_reference<Tags::SerializedFunctionsOfTime>(
     373             :               make_not_null(&box));
     374             :       auto& box_dependencies =
     375             :           db::get_mutable_reference<Tags::Dependencies>(make_not_null(&box));
     376             : 
     377             :       auto& contributed_group_ids =
     378             :           volume_observers_contributed[observation_id];
     379             : 
     380             :       if (UNLIKELY(contributed_group_ids.contains(observer_group_id))) {
     381             :         ERROR("Already received reduction data to observation id "
     382             :               << observation_id << " from array component id "
     383             :               << observer_group_id);
     384             :       }
     385             :       contributed_group_ids.insert(observer_group_id);
     386             : 
     387             :       // Add received volume data to the box
     388             :       if (all_volume_data.contains(observation_id)) {
     389             :         auto& current_data = all_volume_data.at(observation_id);
     390             :         current_data.insert(
     391             :             std::make_move_iterator(received_volume_data.begin()),
     392             :             std::make_move_iterator(received_volume_data.end()));
     393             :         ASSERT(all_serialized_functions_of_time.at(observation_id) ==
     394             :                    serialized_functions_of_time,
     395             :                "Got different serialized functions of time from different "
     396             :                "elements.");
     397             :       } else {
     398             :         // We haven't been called before on this processing element.
     399             :         all_volume_data[observation_id] = std::move(received_volume_data);
     400             :         all_serialized_functions_of_time[observation_id] =
     401             :             serialized_functions_of_time;
     402             :       }
     403             : 
     404             :       // Check if we have received all "volume" data from the Observer
     405             :       // group. If so we write to disk.
     406             :       if (volume_observers_contributed.at(observation_id).size() ==
     407             :           observations_registered_with_id) {
     408             :         // Check if
     409             :         //  1. there is an external dependencies
     410             :         if (dependency.has_value()) {
     411             :           //  2. if there is, that we have received something at this time
     412             :           //  3. that the dependencies are the same
     413             :           if (box_dependencies.contains(observation_id)) {
     414             :             if (UNLIKELY(box_dependencies.at(observation_id).first !=
     415             :                          dependency.value())) {
     416             :               ERROR(
     417             :                   "The dependency that was sent to the ObserverWriter from the "
     418             :                   "elements ("
     419             :                   << dependency.value()
     420             :                   << ") does not match the dependency received from "
     421             :                      "ContributeDependency ("
     422             :                   << box_dependencies.at(observation_id).first << ").");
     423             :             }
     424             :             //  4. that we are writing the volume data to disk
     425             :             if (box_dependencies.at(observation_id).second) {
     426             :               perform_write = true;
     427             :               volume_data = std::move(all_volume_data[observation_id]);
     428             :             }
     429             : 
     430             :             // Whether or not we are writing data to disk, we clean up because
     431             :             // we have received both the volume data and the dependency
     432             :             all_volume_data.erase(observation_id);
     433             :             all_serialized_functions_of_time.erase(observation_id);
     434             :             volume_observers_contributed.erase(observation_id);
     435             :             box_dependencies.erase(observation_id);
     436             :           }
     437             :         } else {
     438             :           perform_write = true;
     439             :           volume_data = std::move(all_volume_data[observation_id]);
     440             :           all_volume_data.erase(observation_id);
     441             :           all_serialized_functions_of_time.erase(observation_id);
     442             :           volume_observers_contributed.erase(observation_id);
     443             :         }
     444             :       }
     445             :     }
     446             : 
     447             :     if (perform_write) {
     448             :       VolumeActions_detail::write_combined_volume_data<ParallelComponent>(
     449             :           cache, observation_id, volume_data, make_not_null(volume_file_lock),
     450             :           subfile_name, serialized_functions_of_time);
     451             :     }
     452             :   }
     453             : };
     454             : 
/*!
 * \brief Threaded action that will add a dependency to the ObserverWriter for a
 * given ObservationId ( \p time + \p volume_subfile_name).
 *
 * \details If not all the volume data for this ObservationId has been received
 * yet, then this will just add the dependency to the box and exit without
 * writing anything. If all volume data arrives before this action is called,
 * then it will write out the volume data (or remove it if we aren't writing).
 */
struct ContributeDependency {
  /// \brief Record the dependency for `time` + `volume_subfile_name` and, if
  /// all expected volume data has already arrived, write (or discard) it.
  ///
  /// \param node_lock lock serializing access to this nodegroup's DataBox
  /// \param time observation time used to build the ObservationId
  /// \param dependency name of the dependency to record
  /// \param volume_subfile_name subfile path; normalized below to "/Name.vol"
  /// \param write_volume_data if false, the buffered data is deleted instead
  /// of written once all contributions are in
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex>
  static void apply(db::DataBox<DbTagsList>& box,
                    Parallel::GlobalCache<Metavariables>& cache,
                    const ArrayIndex& /*array_index*/,
                    const gsl::not_null<Parallel::NodeLock*> node_lock,
                    const double time, const std::string& dependency,
                    std::string volume_subfile_name,
                    const bool write_volume_data) {
    // Normalize the subfile name to canonical "/Name.vol" form so the
    // ObservationId built from it matches the one used when the volume data
    // was contributed. (starts_with/ends_with are C++20.)
    if (not volume_subfile_name.starts_with("/")) {
      volume_subfile_name = "/" + volume_subfile_name;
    }
    if (not volume_subfile_name.ends_with(".vol")) {
      volume_subfile_name += ".vol";
    }

    const ObservationId observation_id{time, volume_subfile_name};

    // The below gymnastics with pointers is done in order to minimize the
    // time spent locking the entire node, which is necessary because the
    // DataBox does not allow any function calls, either get and mutate, during
    // a mutate. We separate out writing from the operations that edit the
    // DataBox since writing to disk can be very slow, but moving data around is
    // comparatively quick.
    Parallel::NodeLock* volume_file_lock = nullptr;
    bool perform_write = false;
    std::unordered_map<Parallel::ArrayComponentId, ElementVolumeData>
        volume_data{};
    std::optional<std::vector<char>> serialized_functions_of_time{};

    // For now just hold the entire node. We can optimize with different locks
    // later on
    {
      const std::lock_guard hold_lock(*node_lock);

      // Record the dependency and grab a pointer to the per-H5-file lock so
      // the (slow) write below can happen after the node lock is released.
      db::mutate<Tags::H5FileLock, Tags::Dependencies>(
          [&](const gsl::not_null<Parallel::NodeLock*> volume_file_lock_ptr,
              const gsl::not_null<std::unordered_map<
                  ObservationId, std::pair<std::string, bool>>*>
                  dependencies) {
            volume_file_lock = &*volume_file_lock_ptr;
            (*dependencies)[observation_id] =
                std::pair{dependency, write_volume_data};
          },
          make_not_null(&box));

      // Mutable references into the box; safe here because the node lock is
      // held for the whole scope.
      auto& volume_observers_contributed =
          db::get_mutable_reference<Tags::ContributorsOfTensorData>(
              make_not_null(&box));
      auto& all_volume_data =
          db::get_mutable_reference<Tags::TensorData>(make_not_null(&box));
      auto& all_serialized_functions_of_time =
          db::get_mutable_reference<Tags::SerializedFunctionsOfTime>(
              make_not_null(&box));
      auto& box_dependencies =
          db::get_mutable_reference<Tags::Dependencies>(make_not_null(&box));
      const auto& expected_contributors =
          db::get<Tags::ExpectedContributorsForObservations>(box);

      if (not expected_contributors.contains(
              observation_id.observation_key())) {
        ERROR("Key " << observation_id.observation_key()
                     << " was not registered.");
      }

      // We have not received any volume data at this time so we can't do
      // anything
      if (not(volume_observers_contributed.contains(observation_id) and
              all_volume_data.contains(observation_id))) {
        return;
      }

      // Check if we have received all "volume" data from the Observer
      // group. If so we write to disk. Then always delete it since the volume
      // data was waiting for this dependency to arrive to be written
      if (volume_observers_contributed.at(observation_id).size() ==
          expected_contributors.at(observation_id.observation_key()).size()) {
        if (write_volume_data) {
          perform_write = true;
          // Move the buffered data out so the actual disk write can be done
          // outside the node lock.
          volume_data = std::move(all_volume_data.at(observation_id));
          serialized_functions_of_time =
              std::move(all_serialized_functions_of_time.at(observation_id));
        }

        all_volume_data.erase(observation_id);
        all_serialized_functions_of_time.erase(observation_id);
        volume_observers_contributed.erase(observation_id);
        box_dependencies.erase(observation_id);
      }
    }

    // Slow disk write happens after the node lock is released, serialized
    // only by the per-file lock obtained above.
    if (perform_write) {
      VolumeActions_detail::write_combined_volume_data<ParallelComponent>(
          cache, observation_id, volume_data, make_not_null(volume_file_lock),
          volume_subfile_name, serialized_functions_of_time);
    }
  }
};
     563             : 
     564             : /*!
     565             :  * \brief Write volume data (such as surface data) at a given time (specified by
     566             :  * an `ObservationId`) without the need to register or reduce anything, e.g.
     567             :  * from a singleton component or from a specific chare.
     568             :  *
     569             :  * Use `observers::Actions::ContributeVolumeDataToWriter` instead if you need to
     570             :  * write volume data from an array chare (e.g., writing volume data from all the
     571             :  * elements in a domain).
     572             :  *
     573             :  * Invoke this action on the `observers::ObserverWriter` component on node 0.
     574             :  * Pass the following arguments when invoking this action:
     575             :  *
     576             :  * - `h5_file_name`: the name of the HDF5 file where the volume data is to be
     577             :  * written (without the .h5 extension).
     578             :  * - `subfile_path`: the path where the volume data should be written in the
     579             :  *   HDF5 file. Include a leading slash, e.g., `/AhA`.
     580             :  * - `observation_id`: the ObservationId corresponding to the volume data.
     581             :  * - `volume_data`: the volume data to be written.
     582             :  */
     583           1 : struct WriteVolumeData {
     584             :   template <typename ParallelComponent, typename DbTagsList,
     585             :             typename Metavariables, typename ArrayIndex, typename... Ts,
     586             :             typename DataBox = db::DataBox<DbTagsList>>
     587           0 :   static void apply(db::DataBox<DbTagsList>& box,
     588             :                     Parallel::GlobalCache<Metavariables>& cache,
     589             :                     const ArrayIndex& /*array_index*/,
     590             :                     const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
     591             :                     const std::string& h5_file_name,
     592             :                     const std::string& subfile_path,
     593             :                     const observers::ObservationId& observation_id,
     594             :                     std::vector<ElementVolumeData>&& volume_data) {
     595             :     auto& volume_file_lock =
     596             :         db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
     597             :     const std::lock_guard hold_lock(volume_file_lock);
     598             :     VolumeActions_detail::write_data(
     599             :         h5_file_name, observers::input_source_from_cache(cache), subfile_path,
     600             :         observation_id, std::move(volume_data));
     601             :   }
     602             : 
     603             :   // For a local synchronous action
     604           0 :   using return_type = void;
     605             : 
     606             :   /// \brief The apply call for the local synchronous action
     607             :   template <typename ParallelComponent, typename DbTagsList,
     608             :             typename Metavariables, typename... Ts,
     609             :             typename DataBox = db::DataBox<DbTagsList>>
     610           1 :   static return_type apply(
     611             :       db::DataBox<DbTagsList>& box,
     612             :       const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
     613             :       Parallel::GlobalCache<Metavariables>& cache,
     614             :       const std::string& h5_file_name, const std::string& subfile_path,
     615             :       const observers::ObservationId& observation_id,
     616             :       std::vector<ElementVolumeData>&& volume_data) {
     617             :     auto& volume_file_lock =
     618             :         db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
     619             :     const std::lock_guard hold_lock(volume_file_lock);
     620             :     VolumeActions_detail::write_data(
     621             :         h5_file_name, observers::input_source_from_cache(cache), subfile_path,
     622             :         observation_id, std::move(volume_data));
     623             :   }
     624             : };
     625             : }  // namespace ThreadedActions
     626             : 
     627             : /*!
     628             :  * \brief Contribute volume data for observing from an element.
     629             :  *
     630             :  * \tparam UseObserverComponent Whether to first send data to the
     631             :  * `observers::Observer` group component.  Generally should be true if
     632             :  * using an implementation where elements are bound to cores, and
     633             :  * false if they are only bound to nodes.
     634             :  */
     635             : template <bool UseObserverComponent, typename Metavariables>
     636           1 : void contribute_volume_data(
     637             :     Parallel::GlobalCache<Metavariables>& cache,
     638             :     observers::ObservationId observation_id, std::string subfile_path,
     639             :     const Parallel::ArrayComponentId& array_component_id,
     640             :     ElementVolumeData element_volume_data,
     641             :     std::optional<std::string> dependency = std::nullopt) {
     642             :   std::optional<std::vector<char>> serialized_observation_functions_of_time{};
     643             :   if constexpr (Parallel::is_in_global_cache<Metavariables,
     644             :                                              domain::Tags::FunctionsOfTime>) {
     645             :     const auto& functions_of_time = get<domain::Tags::FunctionsOfTime>(cache);
     646             :     // NOLINTNEXTLINE(misc-const-correctness)
     647             :     domain::FunctionsOfTimeMap observation_functions_of_time{};
     648             :     const double obs_time = observation_id.value();
     649             :     // Generally, the functions of time should be valid when we
     650             :     // perform an observation.  The exception is when running in an
     651             :     // AtCleanup event, in which case the observation time is a
     652             :     // bogus value and we just skip writing the values.
     653             :     if (alg::all_of(functions_of_time, [&](const auto& fot) {
     654             :           const auto bounds = fot.second->time_bounds();
     655             :           return bounds[0] <= obs_time and obs_time <= bounds[1];
     656             :         })) {
     657             :       // create a new function of time, effectively truncating the history.
     658             :       for (const auto& [name, fot_ptr] : functions_of_time) {
     659             :         observation_functions_of_time[name] = fot_ptr->create_at_time(
     660             :             obs_time, obs_time + 100.0 *
     661             :                                      std::numeric_limits<double>::epsilon() *
     662             :                                      std::max(std::abs(obs_time), 1.0));
     663             :       }
     664             :       serialized_observation_functions_of_time =
     665             :           serialize(observation_functions_of_time);
     666             :     }
     667             :   }
     668             : 
     669             :   if constexpr (UseObserverComponent) {
     670             :     // Send data to volume observer
     671             :     auto& local_observer = *Parallel::local_branch(
     672             :         Parallel::get_parallel_component<observers::Observer<Metavariables>>(
     673             :             cache));
     674             : 
     675             :     Parallel::simple_action<observers::Actions::ContributeVolumeData>(
     676             :         local_observer, std::move(observation_id), std::move(subfile_path),
     677             :         array_component_id, std::move(element_volume_data),
     678             :         std::move(serialized_observation_functions_of_time),
     679             :         std::move(dependency));
     680             :   } else {
     681             :     // Send data to reduction observer writer (nodegroup)
     682             :     auto& local_observer = *Parallel::local_branch(
     683             :         Parallel::get_parallel_component<
     684             :             observers::ObserverWriter<Metavariables>>(cache));
     685             : 
     686             :     std::unordered_map<Parallel::ArrayComponentId,
     687             :                        std::vector<ElementVolumeData>>
     688             :         data_to_send{};
     689             :     data_to_send[array_component_id] =
     690             :         make_vector(std::move(element_volume_data));
     691             :     Parallel::threaded_action<
     692             :         observers::ThreadedActions::ContributeVolumeDataToWriter>(
     693             :         local_observer, std::move(observation_id), array_component_id,
     694             :         std::move(subfile_path), std::move(data_to_send),
     695             :         std::move(serialized_observation_functions_of_time),
     696             :         std::move(dependency));
     697             :   }
     698             : }
     699             : }  // namespace observers

Generated by: LCOV version 1.14