SpECTRE Documentation Coverage Report
Current view: top level - IO/Observer - VolumeActions.hpp
Commit: f1ddee3e40d81480e49140855d2b0e66fafaa908
Date: 2020-12-02 17:35:08
Lines: 2 hit / 5 total (40.0 % coverage)
Legend: Lines: hit | not hit

          Line data    Source code
       1           0 : // Distributed under the MIT License.
       2             : // See LICENSE.txt for details.
       3             : 
       4             : #pragma once
       5             : 
       6             : #include <cstddef>
       7             : #include <iterator>
       8             : #include <unordered_map>
       9             : 
      10             : #include "DataStructures/DataBox/DataBox.hpp"
      11             : #include "DataStructures/Index.hpp"
      12             : #include "DataStructures/Tensor/TensorData.hpp"
      13             : #include "ErrorHandling/Error.hpp"
      14             : #include "IO/H5/AccessType.hpp"
      15             : #include "IO/H5/File.hpp"
      16             : #include "IO/H5/VolumeData.hpp"
      17             : #include "IO/Observer/ArrayComponentId.hpp"
      18             : #include "IO/Observer/ObservationId.hpp"
      19             : #include "IO/Observer/ObserverComponent.hpp"
      20             : #include "IO/Observer/Tags.hpp"
      21             : #include "IO/Observer/TypeOfObservation.hpp"
      22             : #include "Parallel/GlobalCache.hpp"
      23             : #include "Parallel/Info.hpp"
      24             : #include "Parallel/Invoke.hpp"
      25             : #include "Utilities/Algorithm.hpp"
      26             : #include "Utilities/Gsl.hpp"
      27             : #include "Utilities/Requires.hpp"
      28             : #include "Utilities/TMPL.hpp"
      29             : #include "Utilities/TaggedTuple.hpp"
      30             : 
      31             : namespace observers {
      32             : /// \cond
      33             : namespace ThreadedActions {
      34             : struct ContributeVolumeDataToWriter;
      35             : }  // namespace ThreadedActions
      36             : /// \endcond
      37             : namespace Actions {
      38             : 
      39             : /*!
      40             :  * \ingroup ObserversGroup
      41             :  * \brief Send volume tensor data to the observer.
      42             :  *
      43             :  * The caller of this Action (which is to be invoked on the Observer parallel
      44             :  * component) must pass in an `observation_id` used to uniquely identify the
      45             :  * observation in time, the name of the `h5::VolumeData` subfile in the HDF5
      46             :  * file (e.g. `/element_data`, where the leading slash is important), the
      47             :  * `ArrayComponentId` of the contributing parallel component element, a vector
      48             :  * of the `TensorComponent`s to be written to disk, an `Index<Dim>` of the
                     :  * extents of the volume, and the `Spectral::Basis` and `Spectral::Quadrature`
                     :  * used in each logical direction of the element.
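                     :  *
                     :  * \par Example
                     :  * A minimal sketch of how an element might contribute its data. The element
                     :  * component, `element_id`, `components` vector, and the `Mesh` accessors used
                     :  * to obtain the extents, bases, and quadratures are illustrative, not
                     :  * prescribed by this Action:
                     :  *
                     :  * \code
                     :  * auto& local_observer =
                     :  *     *Parallel::get_parallel_component<observers::Observer<Metavariables>>(
                     :  *          cache)
                     :  *          .ckLocalBranch();
                     :  * Parallel::simple_action<observers::Actions::ContributeVolumeData>(
                     :  *     local_observer, observation_id, std::string{"/element_data"},
                     :  *     observers::ArrayComponentId(
                     :  *         std::add_pointer_t<ParallelComponent>{nullptr},
                     :  *         Parallel::ArrayIndex<ElementId<Dim>>(element_id)),
                     :  *     std::move(components), mesh.extents(), mesh.basis(), mesh.quadrature());
                     :  * \endcode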
      49             :  */
      50           1 : struct ContributeVolumeData {
      51             :   template <
      52             :       typename ParallelComponent, typename DbTagsList, typename Metavariables,
      53             :       typename ArrayIndex, size_t Dim,
      54             :       Requires<tmpl::list_contains_v<DbTagsList, Tags::TensorData>> = nullptr>
      55           0 :   static void apply(db::DataBox<DbTagsList>& box,
      56             :                     Parallel::GlobalCache<Metavariables>& cache,
      57             :                     const ArrayIndex& array_index,
      58             :                     const observers::ObservationId& observation_id,
      59             :                     const std::string& subfile_name,
      60             :                     const observers::ArrayComponentId& sender_array_id,
      61             :                     std::vector<TensorComponent>&& received_tensor_data,
      62             :                     const Index<Dim>& received_extents,
      63             :                     const std::array<Spectral::Basis, Dim>& received_basis,
      64             :                     const std::array<Spectral::Quadrature, Dim>&
      65             :                         received_quadrature) noexcept {
      66             :     db::mutate<Tags::TensorData, Tags::ContributorsOfTensorData>(
      67             :         make_not_null(&box),
      68             :         [&array_index, &cache, &observation_id, &sender_array_id,
      69             :          &received_basis, &received_extents, &received_quadrature,
      70             :          &received_tensor_data, &subfile_name](
      71             :             const gsl::not_null<std::unordered_map<
      72             :                 observers::ObservationId,
      73             :                 std::unordered_map<observers::ArrayComponentId,
      74             :                                    ElementVolumeData>>*>
      75             :                 volume_data,
      76             :             const gsl::not_null<std::unordered_map<
      77             :                 ObservationId, std::unordered_set<ArrayComponentId>>*>
      78             :                 contributed_volume_data_ids,
      79             :             const std::unordered_map<ObservationKey,
      80             :                                      std::unordered_set<ArrayComponentId>>&
      81             :                 registered_array_component_ids) mutable noexcept {
      82             :           const ObservationKey& key{observation_id.observation_key()};
      83             :           if (UNLIKELY(registered_array_component_ids.find(key) ==
      84             :                        registered_array_component_ids.end())) {
       85             :             ERROR("Receiving data for observation id "
      86             :                   << observation_id << " that was never registered.");
      87             :           }
      88             :           const auto& registered_ids = registered_array_component_ids.at(key);
      89             :           if (UNLIKELY(registered_ids.find(sender_array_id) ==
      90             :                        registered_ids.end())) {
      91             :             ERROR("Receiving volume data from array component id "
      92             :                   << sender_array_id << " that is not registered.");
      93             :           }
      94             : 
      95             :           auto& contributed_array_ids =
      96             :               (*contributed_volume_data_ids)[observation_id];
      97             :           if (UNLIKELY(contributed_array_ids.find(sender_array_id) !=
      98             :                        contributed_array_ids.end())) {
       99             :             ERROR("Already received volume data for observation id "
     100             :                   << observation_id << " from array component id "
     101             :                   << sender_array_id);
     102             :           }
     103             :           contributed_array_ids.insert(sender_array_id);
     104             : 
     105             :           if (volume_data->count(observation_id) == 0 or
     106             :               volume_data->at(observation_id).count(sender_array_id) == 0) {
     107             :             std::vector<size_t> extents(received_extents.begin(),
     108             :                                         received_extents.end());
     109             :             std::vector<Spectral::Basis> bases(received_basis.begin(),
     110             :                                                received_basis.end());
     111             :             std::vector<Spectral::Quadrature> quadratures(
     112             :                 received_quadrature.begin(), received_quadrature.end());
     113             : 
      114             :             volume_data->operator[](observation_id)
      115             :                 .emplace(sender_array_id,
      116             :                          ElementVolumeData(std::move(extents),
      117             :                                            std::move(received_tensor_data),
      118             :                                            std::move(bases),
      119             :                                            std::move(quadratures)));
     122             :           } else {
     123             :             auto& current_data =
     124             :                 volume_data->at(observation_id).at(sender_array_id);
     125             :             if (UNLIKELY(
     126             :                     not alg::equal(current_data.extents, received_extents))) {
     127             :               ERROR(
     128             :                   "The extents from the same volume component at a specific "
     129             :                   "observation should always be the same. For example, the "
     130             :                   "extents of a dG element should be the same for all calls to "
     131             :                   "ContributeVolumeData that occur at the same time.");
     132             :             }
     133             :             current_data.tensor_components.insert(
     134             :                 current_data.tensor_components.end(),
     135             :                 std::make_move_iterator(received_tensor_data.begin()),
     136             :                 std::make_move_iterator(received_tensor_data.end()));
     137             :           }
     138             : 
     139             :           // Check if we have received all "volume" data from the registered
      140             :           // elements. If so we send it to the nodegroup volume writer.
     141             :           if (contributed_array_ids.size() == registered_ids.size()) {
     142             :             auto& local_writer = *Parallel::get_parallel_component<
     143             :                                       ObserverWriter<Metavariables>>(cache)
     144             :                                       .ckLocalBranch();
     145             :             Parallel::threaded_action<
     146             :                 ThreadedActions::ContributeVolumeDataToWriter>(
     147             :                 local_writer, observation_id,
     148             :                 ArrayComponentId{std::add_pointer_t<ParallelComponent>{nullptr},
     149             :                                  Parallel::ArrayIndex<ArrayIndex>(array_index)},
     150             :                 subfile_name, std::move((*volume_data)[observation_id]));
     151             :             volume_data->erase(observation_id);
     152             :           }
     153             :         },
     154             :         db::get<Tags::ExpectedContributorsForObservations>(box));
     155             :   }
     156             : };
     157             : }  // namespace Actions
     158             : 
     159             : namespace ThreadedActions {
     160             : /*!
     161             :  * \ingroup ObserversGroup
      162             :  * \brief Move volume data to the `ObserverWriter` for writing to disk.
      163             :  *
      164             :  * Once data from all `Observer` cores on the node has been collected, this
                     :  * action writes the data to disk.
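                     :  *
                     :  * \par Example
                     :  * This threaded action is invoked on the local branch of the `ObserverWriter`
                     :  * nodegroup, mirroring the call made by `Actions::ContributeVolumeData` above
                     :  * (the `volume_data_for_observation` map name is illustrative):
                     :  *
                     :  * \code
                     :  * auto& local_writer = *Parallel::get_parallel_component<
                     :  *                           ObserverWriter<Metavariables>>(cache)
                     :  *                           .ckLocalBranch();
                     :  * Parallel::threaded_action<ThreadedActions::ContributeVolumeDataToWriter>(
                     :  *     local_writer, observation_id, observer_group_id, subfile_name,
                     :  *     std::move(volume_data_for_observation));
                     :  * \endcode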
     165             :  */
     166           1 : struct ContributeVolumeDataToWriter {
     167             :   template <typename ParallelComponent, typename DbTagsList,
     168             :             typename Metavariables, typename ArrayIndex>
     169           0 :   static void apply(
     170             :       db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
     171             :       const ArrayIndex& /*array_index*/,
     172             :       const gsl::not_null<Parallel::NodeLock*> node_lock,
     173             :       const observers::ObservationId& observation_id,
     174             :       ArrayComponentId observer_group_id, const std::string& subfile_name,
     175             :       std::unordered_map<observers::ArrayComponentId, ElementVolumeData>&&
     176             :           received_volume_data) noexcept {
     177             :     if constexpr (tmpl::list_contains_v<DbTagsList, Tags::TensorData> and
     178             :                   tmpl::list_contains_v<DbTagsList,
     179             :                                         Tags::ContributorsOfTensorData> and
     180             :                   tmpl::list_contains_v<DbTagsList, Tags::VolumeDataLock> and
     181             :                   tmpl::list_contains_v<DbTagsList, Tags::H5FileLock>) {
     182             :       // The below gymnastics with pointers is done in order to minimize the
     183             :       // time spent locking the entire node, which is necessary because the
      184             :       // DataBox does not allow any function calls (get or mutate) during
     185             :       // a mutate. This design choice in DataBox is necessary to guarantee a
     186             :       // consistent state throughout mutation. Here, however, we need to be
      187             :       // reasonably efficient in parallel and so we manually guarantee that
     188             :       // consistent state. To this end, we create pointers and assign to them
     189             :       // the data in the DataBox which is guaranteed to be pointer stable. The
     190             :       // data itself is guaranteed to be stable inside the VolumeDataLock.
     191             :       std::unordered_map<
     192             :           observers::ObservationId,
     193             :           std::unordered_map<observers::ArrayComponentId, ElementVolumeData>>*
     194             :           all_volume_data = nullptr;
     195             :       std::unordered_map<observers::ArrayComponentId, ElementVolumeData>
     196             :           volume_data;
     197             :       Parallel::NodeLock* volume_file_lock = nullptr;
     198             :       std::unordered_map<ObservationId, std::unordered_set<ArrayComponentId>>*
     199             :           volume_observers_contributed = nullptr;
     200             :       Parallel::NodeLock* volume_data_lock = nullptr;
     201             :       size_t observations_registered_with_id =
     202             :           std::numeric_limits<size_t>::max();
     203             : 
     204             :       node_lock->lock();
     205             :       db::mutate<Tags::TensorData, Tags::ContributorsOfTensorData,
     206             :                  Tags::VolumeDataLock, Tags::H5FileLock>(
     207             :           make_not_null(&box),
     208             :           [&observation_id, &observations_registered_with_id,
     209             :            &observer_group_id, &all_volume_data, &volume_observers_contributed,
     210             :            &volume_data_lock, &volume_file_lock](
     211             :               const gsl::not_null<std::unordered_map<
     212             :                   observers::ObservationId,
     213             :                   std::unordered_map<observers::ArrayComponentId,
     214             :                                      ElementVolumeData>>*>
     215             :                   volume_data_ptr,
     216             :               const gsl::not_null<std::unordered_map<
     217             :                   ObservationId, std::unordered_set<ArrayComponentId>>*>
     218             :                   volume_observers_contributed_ptr,
     219             :               const gsl::not_null<Parallel::NodeLock*> volume_data_lock_ptr,
     220             :               const gsl::not_null<Parallel::NodeLock*> volume_file_lock_ptr,
     221             :               const std::unordered_map<ObservationKey,
     222             :                                        std::unordered_set<ArrayComponentId>>&
     223             :                   observations_registered) noexcept {
     224             :             const ObservationKey& key{observation_id.observation_key()};
     225             :             const auto& registered_group_ids = observations_registered.at(key);
     226             :             if (UNLIKELY(registered_group_ids.find(observer_group_id) ==
     227             :                          registered_group_ids.end())) {
     228             :               ERROR("The observer group id "
     229             :                     << observer_group_id
     230             :                     << " was not registered for the observation id "
     231             :                     << observation_id);
     232             :             }
     233             : 
     234             :             all_volume_data = &*volume_data_ptr;
     235             :             volume_observers_contributed = &*volume_observers_contributed_ptr;
     236             :             volume_data_lock = &*volume_data_lock_ptr;
     237             :             observations_registered_with_id =
     238             :                 observations_registered.at(key).size();
     239             :             volume_file_lock = &*volume_file_lock_ptr;
     240             :           },
     241             :           db::get<Tags::ExpectedContributorsForObservations>(box));
     242             :       node_lock->unlock();
     243             : 
     244             :       ASSERT(all_volume_data != nullptr,
     245             :              "Failed to set all_volume_data in the mutate");
     246             :       ASSERT(volume_file_lock != nullptr,
     247             :              "Failed to set volume_file_lock in the mutate");
     248             :       ASSERT(volume_observers_contributed != nullptr,
     249             :              "Failed to set volume_observers_contributed in the mutate");
     250             :       ASSERT(volume_data_lock != nullptr,
     251             :              "Failed to set volume_data_lock in the mutate");
     252             :       ASSERT(
     253             :           observations_registered_with_id != std::numeric_limits<size_t>::max(),
     254             :           "Failed to set observations_registered_with_id when mutating the "
     255             :           "DataBox. This is a bug in the code.");
     256             : 
     257             :       volume_data_lock->lock();
     258             :       auto& contributed_group_ids =
     259             :           (*volume_observers_contributed)[observation_id];
     260             : 
     261             :       if (UNLIKELY(contributed_group_ids.find(observer_group_id) !=
     262             :                    contributed_group_ids.end())) {
      263             :         ERROR("Already received volume data for observation id "
     264             :               << observation_id << " from array component id "
     265             :               << observer_group_id);
     266             :       }
     267             :       contributed_group_ids.insert(observer_group_id);
     268             : 
     269             :       if (all_volume_data->find(observation_id) == all_volume_data->end()) {
      270             :         // This is the first contribution for this observation id on this node.
     271             :         all_volume_data->operator[](observation_id) =
     272             :             std::move(received_volume_data);
     273             :       } else {
     274             :         auto& current_data = all_volume_data->at(observation_id);
     275             :         current_data.insert(
     276             :             std::make_move_iterator(received_volume_data.begin()),
     277             :             std::make_move_iterator(received_volume_data.end()));
     278             :       }
     279             :       // Check if we have received all "volume" data from the Observer
      280             :       // group elements on this node. If so we write to disk.
     281             :       bool perform_write = false;
     282             :       if (volume_observers_contributed->at(observation_id).size() ==
     283             :           observations_registered_with_id) {
     284             :         perform_write = true;
     285             :         volume_data = std::move(all_volume_data->operator[](observation_id));
     286             :         all_volume_data->erase(observation_id);
     287             :         volume_observers_contributed->erase(observation_id);
     288             :       }
     289             :       volume_data_lock->unlock();
     290             : 
     291             :       if (perform_write) {
     292             :         ASSERT(not volume_data.empty(),
     293             :                "Failed to populate volume_data before trying to write it.");
     294             :         // Write to file. We use a separate node lock because writing can be
     295             :         // very time consuming (it's network dependent, depends on how full the
     296             :         // disks are, what other users are doing, etc.) and we want to be able
     297             :         // to continue to work on the nodegroup while we are writing data to
     298             :         // disk.
     299             :         volume_file_lock->lock();
     300             :         {
     301             :           // Scoping is for closing HDF5 file before we release the lock.
     302             :           const auto& file_prefix = Parallel::get<Tags::VolumeFileName>(cache);
     303             :           h5::H5File<h5::AccessType::ReadWrite> h5file(
     304             :               file_prefix + std::to_string(Parallel::my_node()) + ".h5", true);
     305             :           constexpr size_t version_number = 0;
     306             :           auto& volume_file =
     307             :               h5file.try_insert<h5::VolumeData>(subfile_name, version_number);
     308             :           std::vector<ElementVolumeData> dg_elements;
     309             :           dg_elements.reserve(volume_data.size());
     310             :           for (const auto& id_and_element : volume_data) {
     311             :             dg_elements.push_back(id_and_element.second);
     312             :           }
     313             :           // Write the data to the file
     314             :           volume_file.write_volume_data(observation_id.hash(),
     315             :                                         observation_id.value(), dg_elements);
     316             :         }
     317             :         volume_file_lock->unlock();
     318             :       }
     319             :     } else {
     320             :       (void)node_lock;
     321             :       (void)observation_id;
     322             :       (void)observer_group_id;
     323             :       (void)subfile_name;
     324             :       (void)received_volume_data;
     325             :       ERROR(
     326             :           "Could not find one of the tags TensorData, "
     327             :           "ContributorsOfTensorData, "
     328             :           "VolumeDataLock, or H5FileLock in the DataBox.");
     329             :     }
     330             :   }
     331             : };
     332             : }  // namespace ThreadedActions
     333             : }  // namespace observers

Generated by: LCOV version 1.14