VolumeActions.hpp
1 // Distributed under the MIT License.
2 // See LICENSE.txt for details.
3 
4 #pragma once
5 
6 #include <cstddef>
7 #include <iterator>
8 #include <unordered_map>
9 
12 #include "DataStructures/Index.hpp"
13 #include "DataStructures/Tensor/TensorData.hpp"
14 #include "IO/H5/AccessType.hpp"
15 #include "IO/H5/File.hpp"
16 #include "IO/H5/VolumeData.hpp"
17 #include "IO/Observer/ArrayComponentId.hpp"
18 #include "IO/Observer/ObservationId.hpp"
19 #include "IO/Observer/ObserverComponent.hpp"
20 #include "IO/Observer/Tags.hpp"
21 #include "IO/Observer/TypeOfObservation.hpp"
23 #include "Parallel/Info.hpp"
24 #include "Parallel/Invoke.hpp"
25 #include "Utilities/Algorithm.hpp"
26 #include "Utilities/Requires.hpp"
28 
29 namespace observers {
30 namespace ThreadedActions {
31 /// \cond
32 struct WriteVolumeData;
33 /// \endcond
34 } // namespace ThreadedActions
35 
36 namespace Actions {
37 /// \cond
38 struct ContributeVolumeDataToWriter;
39 /// \endcond
40 
41 /*!
42  * \ingroup ObserverGroup
43  * \brief Send volume tensor data to the observer.
44  *
45  * The caller of this Action (which is to be invoked on the Observer parallel
46  * component) must pass in an `observation_id` used to uniquely identify the
47  * observation in time, the name of the `h5::VolumeData` subfile in the HDF5
48  * file (e.g. `/element_data`, where the slash is important), the contributing
49  * parallel component element's component id, a vector of the `TensorComponent`s
50  * to be written to disk, and an `Index<Dim>` of the extents of the volume.
51  *
52  * \warning Currently this action can only be called once per `observation_id`
53  * per `array_component_id`. Calling it more often is undefined behavior and may
54  * result in incomplete data being written and/or memory leaks. The reason is
55  * that we do not register the number of times a component with an ID at a
56  * specific observation id will call the observer. However, this will be a
57  * feature implemented in the future.
58  */
60  template <typename... DbTags, typename... InboxTags, typename Metavariables,
61  typename ArrayIndex, typename ActionList,
62  typename ParallelComponent, size_t Dim,
63  Requires<sizeof...(DbTags) != 0> = nullptr>
64  static void apply(db::DataBox<tmpl::list<DbTags...>>& box,
67  const ArrayIndex& /*array_index*/,
68  const ActionList /*meta*/,
69  const ParallelComponent* const /*meta*/,
70  const observers::ObservationId& observation_id,
71  const std::string& subfile_name,
72  const observers::ArrayComponentId& array_component_id,
73  std::vector<TensorComponent>&& in_received_tensor_data,
74  const Index<Dim>& received_extents) noexcept {
    // NOTE(review): the embedded line numbers jump (64->67, 80->83, 90->92,
    // 112->114), so parts of the parameter list and of the lambda below are
    // not visible in this listing. `cache` and `volume_data` are presumably
    // declared on those missing lines -- confirm against the original header.
    //
    // Updates the Tags::TensorData item in `box`. The value of
    // Tags::VolumeArrayComponentIds is read from the box and passed to the
    // lambda as its trailing argument, `volume_component_ids`.
75  db::mutate<Tags::TensorData>(
76  make_not_null(&box),
77  [
78  &cache, &observation_id, &array_component_id, &received_extents,
79  received_tensor_data = std::move(in_received_tensor_data), &
80  subfile_name
83  volume_component_ids) mutable noexcept {
    // No entry yet for this (observation_id, array_component_id) pair:
    // copy the extents into a std::vector<size_t> and store them together
    // with the received tensor components, which are moved in.
84  if (volume_data->count(observation_id) == 0 or
85  volume_data->at(observation_id).count(array_component_id) == 0) {
86  std::vector<size_t> extents(received_extents.begin(),
87  received_extents.end());
88  volume_data->operator[](observation_id)
89  .emplace(
90  array_component_id,
92  std::move(extents), std::move(received_tensor_data)));
93  } else {
    // An entry already exists for this pair: the ASSERT checks that the
    // stored extents equal the received ones, then the newly received
    // tensor components are moved onto the end of the stored ones.
94  auto& current_data =
95  volume_data->at(observation_id).at(array_component_id);
96  ASSERT(
97  alg::equal(current_data.extents, received_extents),
98  "The extents from the same volume component at a specific "
99  "observation should always be the same. For example, the "
100  "extents of a dG element should be the same for all calls to "
101  "ContributeVolumeData that occur at the same time.");
102  current_data.tensor_components.insert(
103  current_data.tensor_components.end(),
104  std::make_move_iterator(received_tensor_data.begin()),
105  std::make_move_iterator(received_tensor_data.end()));
106  }
107 
108  // Check if we have received all "volume" data from the registered
109  // elements. If so we copy it to the nodegroup volume writer.
    // "All received" means: the number of components stored for this
    // observation equals the number of entries in `volume_component_ids`.
110  if (volume_data->at(observation_id).size() ==
111  volume_component_ids.size()) {
    // The stored data for this observation is moved into the
    // ContributeVolumeDataToWriter simple action invoked on the object
    // returned by ckLocalBranch(); the moved-from entry is then erased.
112  auto& local_writer = *Parallel::get_parallel_component<
114  .ckLocalBranch();
115  Parallel::simple_action<ContributeVolumeDataToWriter>(
116  local_writer, observation_id, subfile_name,
117  std::move((*volume_data)[observation_id]));
118  volume_data->erase(observation_id);
119  }
120  },
121  db::get<Tags::VolumeArrayComponentIds>(box));
122  }
123 };
124 
125 /*!
126  * \ingroup ObserverGroup
127  * \brief Move data to the observer writer for writing to disk.
128  */
130  template <typename... DbTags, typename... InboxTags, typename Metavariables,
131  typename ArrayIndex, typename ActionList,
132  typename ParallelComponent,
133  Requires<sizeof...(DbTags) != 0> = nullptr>
134  static void apply(db::DataBox<tmpl::list<DbTags...>>& box,
137  const ArrayIndex& /*array_index*/,
138  const ActionList /*meta*/,
139  const ParallelComponent* const /*meta*/,
140  const observers::ObservationId& observation_id,
141  const std::string& subfile_name,
144  in_volume_data) noexcept {
    // NOTE(review): the embedded line numbers jump (134->137, 141->144,
    // 149->154, 155->157), so the declared types of `in_volume_data`,
    // `volume_data` and `volume_observers_contributed`, as well as the
    // declaration of `cache`, are not visible in this listing.
    //
    // Updates two items in `box`: Tags::TensorData (the data collected per
    // observation) and Tags::VolumeObserversContributed (a per-observation
    // count of how many contributions have arrived).
145  db::mutate<Tags::TensorData, Tags::VolumeObserversContributed>(
146  make_not_null(&box),
147  [
148  &cache, &observation_id, in_volume_data = std::move(in_volume_data), &
149  subfile_name
154  volume_data,
155  const gsl::not_null<
157  volume_observers_contributed) mutable noexcept {
    // First contribution for this observation: take over the incoming
    // data wholesale and start the contribution count at 1.
158  if (volume_data->count(observation_id) == 0) {
159  volume_data->operator[](observation_id) = std::move(in_volume_data);
160  (*volume_observers_contributed)[observation_id] = 1;
161  } else {
    // Later contributions are merged into the existing container and the
    // count is incremented.
    // NOTE(review): if `current_data` is a std::unordered_map (its type is
    // on a missing line), insert() leaves already-present keys untouched
    // rather than overwriting them -- confirm this is intended.
162  auto& current_data = volume_data->at(observation_id);
163  current_data.insert(std::make_move_iterator(in_volume_data.begin()),
164  std::make_move_iterator(in_volume_data.end()));
165  (*volume_observers_contributed)[observation_id]++;
166  }
167  // Check if we have received all "volume" data from the Observer
168  // group. If so we write to disk.
    // "All received" means: the contribution count for this observation
    // equals Parallel::procs_on_node() for the current node.
169  const auto procs_on_node =
170  static_cast<size_t>(Parallel::procs_on_node(Parallel::my_node()));
171  if (volume_observers_contributed->at(observation_id) ==
172  procs_on_node) {
    // Invoke the WriteVolumeData threaded action on the ObserverWriter
    // element indexed by this node, then erase the counter entry for this
    // observation. The collected data itself stays in Tags::TensorData
    // here; nothing in this function removes it.
173  Parallel::threaded_action<ThreadedActions::WriteVolumeData>(
174  Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
175  cache)[static_cast<size_t>(Parallel::my_node())],
176  observation_id, subfile_name);
177  volume_observers_contributed->erase(observation_id);
178  }
179  });
180  }
181 };
182 } // namespace Actions
183 
184 namespace ThreadedActions {
185 /*!
186  * \ingroup ObserverGroup
187  * \brief Writes volume data at the `observation_id` to disk.
188  */
190  template <typename... DbTags, typename... InboxTags, typename Metavariables,
191  typename ArrayIndex, typename ActionList,
192  typename ParallelComponent,
193  Requires<sizeof...(DbTags) != 0> = nullptr>
194  static auto apply(db::DataBox<tmpl::list<DbTags...>>& box,
197  const ArrayIndex& /*array_index*/,
198  const ActionList /*meta*/,
199  const ParallelComponent* const /*meta*/,
200  const gsl::not_null<CmiNodeLock*> node_lock,
201  const observers::ObservationId& observation_id,
202  const std::string& subfile_name) noexcept {
    // NOTE(review): the embedded line numbers jump (194->197, 204->206,
    // 211->213, 228->230), so the declaration of `cache`, the declared type
    // of `volume_data`, and the line that constructs `h5file` are not
    // visible in this listing.
    //
    // The visible body contains no return statement.
203  // Get data from the DataBox in a thread-safe manner
204  Parallel::lock(node_lock);
206  volume_data{};
207  CmiNodeLock file_lock;
    // While `node_lock` is held: move this observation's data out of
    // Tags::TensorData into the local `volume_data`, erase the entry from the
    // box, and copy the lock stored under Tags::H5FileLock into the local
    // `file_lock` so it can be used after `node_lock` has been released.
208  db::mutate<Tags::H5FileLock, Tags::TensorData>(
209  make_not_null(&box),
210  [&observation_id, &file_lock, &volume_data ](
211  const gsl::not_null<CmiNodeLock*> in_file_lock,
213  in_volume_data) noexcept {
214  volume_data = std::move((*in_volume_data)[observation_id]);
215  in_volume_data->erase(observation_id);
216  file_lock = *in_file_lock;
217  });
218  Parallel::unlock(node_lock);
219 
220  // Write to file. We use a separate node lock because writing can be very
221  // time consuming (it's network dependent, depends on how full the disks
222  // are, what other users are doing, etc.) and we want to be able to continue
223  // to work on the nodegroup while we are writing data to disk.
224  Parallel::lock(&file_lock);
225  {
226  // Scoping is for closing HDF5 file before we release the lock.
    // The file name is the OptionTags::VolumeFileName value followed by
    // this node's number and ".h5".
    // NOTE(review): the meaning of the trailing `true` argument cannot be
    // determined here because the line constructing `h5file` is missing --
    // confirm against the h5::H5File constructor.
227  const auto& file_prefix =
228  Parallel::get<OptionTags::VolumeFileName>(cache);
230  file_prefix + std::to_string(Parallel::my_node()) + ".h5", true);
231  constexpr size_t version_number = 0;
232  auto& volume_file =
233  h5file.try_insert<h5::VolumeData>(subfile_name, version_number);
    // Write every entry's extents and tensor components into the subfile,
    // keyed by this observation's hash() and value(). The map key of each
    // entry is not used in the loop body.
234  for (const auto& id_and_tensor_data_for_grid : volume_data) {
235  const auto& extents_and_tensors = id_and_tensor_data_for_grid.second;
236  volume_file.insert_tensor_data(
237  observation_id.hash(), observation_id.value(), extents_and_tensors);
238  }
239  }
240  Parallel::unlock(&file_lock);
241  }
242 };
243 } // namespace ThreadedActions
244 } // namespace observers
Definition: Actions.hpp:20
Move data to the observer writer for writing to disk.
Definition: VolumeActions.hpp:129
Defines class tuples::TaggedTuple.
Defines functions for interfacing with the parallelization framework.
int procs_on_node(const int node_index)
Number of processing elements on the given node.
Definition: Info.hpp:40
Defines class h5::H5File.
void insert_tensor_data(size_t observation_id, double observation_value, const ExtentsAndTensorVolumeData &extents_and_tensors) noexcept
Insert tensor components at observation_id with floating point value observation_value ...
#define ASSERT(a, m)
Assert that an expression should be true.
Definition: Assert.hpp:49
The nodegroup parallel component that is responsible for writing data to disk.
Definition: ObserverComponent.hpp:55
Defines the type alias Requires.
constexpr auto apply(F &&f, const DataBox< BoxTags > &box, Args &&... args)
Apply the function f with argument Tags TagsList from DataBox box
Definition: DataBox.hpp:1595
int my_node()
Index of my node.
Definition: Info.hpp:34
An ID type that identifies both the parallel component and the index in the parallel component...
Definition: ArrayComponentId.hpp:27
void unlock(const gsl::not_null< CmiNodeLock *> node_lock) noexcept
Unlock a converse CmiNodeLock.
Definition: NodeLock.hpp:62
Defines class template Index.
Defines enum for specifying whether the H5 file is ReadWrite or ReadOnly.
An associative container that is indexed by structs.
Definition: TaggedTuple.hpp:272
Defines classes and functions used for manipulating DataBox's.
A volume data subfile written inside an H5 file.
Definition: VolumeData.hpp:49
Definition: InterpolationTargetWedgeSectionTorus.hpp:24
A Charm++ chare that caches constant data once per Charm++ node.
Definition: ConstGlobalCache.hpp:76
decltype(auto) equal(const Container &lhs, const Container2 &rhs)
Convenience wrapper around std::equal, assumes containers lhs has at least as many elements as rhs...
Definition: Algorithm.hpp:249
Opens an HDF5 file for access and allows manipulation of data.
Definition: File.hpp:59
Send volume tensor data to the observer.
Definition: VolumeActions.hpp:59
void lock(const gsl::not_null< CmiNodeLock *> node_lock) noexcept
Lock a converse CmiNodeLock.
Definition: NodeLock.hpp:34
An integer multi-index.
Definition: Index.hpp:28
Holds the extents of the mesh and the tensor components on the mesh.
Definition: TensorData.hpp:55
auto get_parallel_component(ConstGlobalCache< Metavariables > &cache) noexcept -> Parallel::proxy_from_parallel_component< ConstGlobalCache_detail::get_component_if_mocked< typename Metavariables::component_list, ParallelComponentTag >> &
Access the Charm++ proxy associated with a ParallelComponent.
Definition: ConstGlobalCache.hpp:163
gsl::not_null< T * > make_not_null(T *ptr) noexcept
Construct a not_null from a pointer. Often this will be done as an implicit conversion, but it may be necessary to perform the conversion explicitly when type deduction is desired.
Definition: Gsl.hpp:863
typename Requires_detail::requires_impl< B >::template_error_type_failed_to_meet_requirements_on_template_parameters Requires
Express requirements on the template parameters of a function or class, replaces std::enable_if_t ...
Definition: Requires.hpp:67
Writes volume data at the observation_id to disk.
Definition: VolumeActions.hpp:189
Definition: SolvePoissonProblem.hpp:38
Defines classes SimpleTag, PrefixTag, ComputeTag and several functions for retrieving tag info...
A type-erased identifier that combines the identifier's type and hash used to uniquely identify an ob...
Definition: ObservationId.hpp:43
Defines class template ConstGlobalCache.
Definition: ComputeTimeDerivative.hpp:28
Require a pointer to not be a nullptr
Definition: ConservativeFromPrimitive.hpp:12