ReductionActions.hpp
1 // Distributed under the MIT License.
2 // See LICENSE.txt for details.
3 
4 #pragma once
5 
6 #include <cstddef>
7 #include <string>
8 #include <tuple>
9 #include <unordered_map>
10 #include <utility>
11 #include <vector>
12 
14 #include "ErrorHandling/Assert.hpp"
15 #include "IO/H5/AccessType.hpp"
16 #include "IO/H5/Dat.hpp"
17 #include "IO/H5/File.hpp"
18 #include "IO/Observer/ArrayComponentId.hpp"
19 #include "IO/Observer/ObservationId.hpp"
20 #include "IO/Observer/Tags.hpp"
22 #include "Parallel/Info.hpp"
23 #include "Parallel/NodeLock.hpp"
24 #include "Parallel/Printf.hpp"
25 #include "Parallel/Reduction.hpp"
26 #include "Utilities/Gsl.hpp"
27 #include "Utilities/MakeString.hpp"
28 #include "Utilities/Requires.hpp"
29 #include "Utilities/StdHelpers.hpp"
30 #include "Utilities/TMPL.hpp"
32 
33 namespace observers {
// Forward declaration: the observer Action defined below dispatches to this
// threaded action, whose definition appears later in this header.
namespace ThreadedActions {
/// \cond
struct WriteReductionData;
/// \endcond
}  // namespace ThreadedActions
39 
40 namespace Actions {
// Forward declaration only, hidden from Doxygen by the \cond markers.
// NOTE(review): this name is not referenced anywhere else in this listing;
// confirm it is still used elsewhere before relying on it.
/// \cond
struct ContributeReductionDataToWriter;
/// \endcond
44 
45 /*!
46  * \ingroup ObserverGroup
47  * \brief Send reduction data to the observer group.
48  *
49  * Once everything at a specific `ObservationId` has been contributed to the
50  * reduction, the group forwards the combined data to its local `ObserverWriter` nodegroup.
51  *
52  * The caller of this Action (which is to be invoked on the Observer parallel
53  * component) must pass in an `observation_id` used to uniquely identify the
54  * observation in time, the name of the `h5::Dat` subfile in the HDF5 file (e.g.
55  * `/element_data`, where the slash is important), a `std::vector<std::string>`
56  * of names of the quantities being reduced (e.g. `{"Time", "L1ErrorDensity",
57  * "L2ErrorDensity"}`), and the `Parallel::ReductionData` that holds the
58  * `ReductionDatums` containing info on how to do the reduction.
59  *
60  * The observer components need to know all expected reduction data types at
61  * compile time, so they rely on the
62  * `Metavariables::observed_reduction_data_tags` alias to collect them in one
63  * place. To this end, each Action that contributes reduction data must expose
64  * the type alias as:
65  *
66  * \snippet ObserverHelpers.hpp make_reduction_data_tags
67  *
68  * Then, in the `Metavariables` collect them from all observing Actions like
69  * this:
70  *
71  * \snippet Test_Observe.cpp collect_reduction_data_tags
72  */
// Observer-group entry point: accumulate one contribution of reduction data
// for `observation_id` and, once every registered component has contributed,
// forward the combined data to the local ObserverWriter nodegroup branch.
//
// - The first contribution for an `observation_id` (no entry yet in
//   `reduction_data_map`) stores `reduction_data` and `reduction_names` and
//   sets the contribution count to 1.
// - Later contributions must carry the same `reduction_names` (checked by
//   ASSERT) and are `combine`d into the stored data; the count is incremented.
// - When the count equals `reduction_component_ids.size()` (the ids read from
//   `Tags::ReductionArrayComponentIds`), the stored data is sent to
//   `ThreadedActions::WriteReductionData` on the local writer branch and all
//   bookkeeping for `observation_id` is erased.
//
// NOTE(review): this listing is missing several original source lines: the
// enclosing struct header, the `cache` parameter (it is captured by the
// lambda below), the head of the `db::mutate` call that `make_not_null(&box)`
// belongs to, parts of the lambda's parameter types, the template argument
// and `(cache)` call of `get_parallel_component`, and the second branch of
// the names ternary. Restore them from the upstream header before use.
74  template <typename... DbTags, typename... InboxTags, typename Metavariables,
75  typename ArrayIndex, typename ActionList,
76  typename ParallelComponent, typename... Ts,
77  Requires<sizeof...(DbTags) != 0> = nullptr>
78  static auto apply(db::DataBox<tmpl::list<DbTags...>>& box,
79  const tuples::TaggedTuple<InboxTags...>& /*inboxes*/,
81  const ArrayIndex& /*array_index*/,
82  const ActionList /*meta*/,
83  const ParallelComponent* const /*meta*/,
84  const observers::ObservationId& observation_id,
85  const std::string& subfile_name,
86  const std::vector<std::string>& reduction_names,
87  Parallel::ReductionData<Ts...>&& reduction_data) noexcept {
90  make_not_null(&box),
91  [
92  &observation_id, reduction_data = std::move(reduction_data),
93  &reduction_names, &cache, &subfile_name
95  ObservationId, Parallel::ReductionData<Ts...>>*>
96  reduction_data_map,
97  const gsl::not_null<
99  reduction_names_map,
100  const gsl::not_null<
102  reduction_observers_contributed,
104  reduction_component_ids) mutable noexcept {
// Running count of contributions received for this observation.
105  auto& contribute_count =
106  (*reduction_observers_contributed)[observation_id];
// First contribution: take ownership of the data (moved out of the lambda's
// own capture, which is why the lambda is `mutable`) and record the names.
107  if (reduction_data_map->count(observation_id) == 0) {
108  reduction_data_map->emplace(observation_id,
109  std::move(reduction_data));
110  reduction_names_map->emplace(observation_id, reduction_names);
111  contribute_count = 1;
112  } else {
113  ASSERT(
114  reduction_names_map->at(observation_id) == reduction_names,
115  "Reduction names differ at ObservationId "
116  << observation_id
117  << " with the expected names being "
118  // Use MakeString to get around ADL for STL stream operators
119  // (MakeString is in global namespace).
120  << (MakeString{} << reduction_names_map->at(observation_id)
121  << " and the received names being "
122  << reduction_names));
// Merge this contribution into the data already stored.
123  reduction_data_map->operator[](observation_id)
124  .combine(std::move(reduction_data));
125  contribute_count++;
126  }
127 
128  // Check if we have received all reduction data from the registered
129  // elements. If so, we reduce to the local ObserverWriter nodegroup.
130  if (contribute_count == reduction_component_ids.size()) {
131  const auto node_id = Parallel::my_node();
132  auto& local_writer = *Parallel::get_parallel_component<
134  .ckLocalBranch();
// Only node 0 forwards the stored names (WriteReductionData uses them as
// the legend of the written subfile). Presumably other nodes send an empty
// vector, but that branch is on a line missing from this listing; confirm.
135  Parallel::threaded_action<ThreadedActions::WriteReductionData>(
136  local_writer, observation_id, subfile_name,
137  node_id == 0 ? std::move((*reduction_names_map)[observation_id])
139  std::move((*reduction_data_map)[observation_id]));
// Data and names were moved out above; drop all bookkeeping for this
// observation.
140  reduction_data_map->erase(observation_id);
141  reduction_names_map->erase(observation_id);
142  reduction_observers_contributed->erase(observation_id);
143  }
144  },
// Extra argument forwarded to the lambda as `reduction_component_ids`.
145  db::get<Tags::ReductionArrayComponentIds>(box));
146  }
147 };
148 } // namespace Actions
149 
150 namespace ThreadedActions {
151 /*!
152  * \ingroup ObserverGroup
153  * \brief Collect the reduction data from the Observer group on the
154  * ObserverWriter nodegroup before sending to node 0 for writing to disk.
155  *
156  * \note This action is also used for writing on node 0.
157  */
159  private:
160  template <typename... Ts, size_t... Is>
161  static void write_data(const std::string& subfile_name,
162  std::vector<std::string>&& legend,
163  std::tuple<Ts...>&& data,
164  const std::string& file_prefix,
165  std::index_sequence<Is...> /*meta*/) noexcept {
166  static_assert(sizeof...(Ts) > 0,
167  "Must be reducing at least one piece of data");
168  std::vector<double> data_to_append{
169  static_cast<double>(std::get<Is>(data))...};
170 
171  h5::H5File<h5::AccessType::ReadWrite> h5file(file_prefix + ".h5", true);
172  constexpr size_t version_number = 0;
173  auto& time_series_file = h5file.try_insert<h5::Dat>(
174  subfile_name, std::move(legend), version_number);
175  time_series_file.append(data_to_append);
176  }
177 
178  public:
// Threaded action run on the ObserverWriter nodegroup; it receives the
// nodegroup's `node_lock`. Collects reduction data for `observation_id`:
//
// - Single process on a single node: the incoming data is written directly.
// - Nodes other than 0: contributions are combined until `procs_on_node` of
//   them have arrived. The result is then sent to `ObserverWriter` element 0
//   (node 0) with an empty names vector.
// - Node 0: contributions from the local group and from the other nodes are
//   combined. Once `(procs_on_node - 1) + (number_of_nodes - 1)` are stored,
//   the next one triggers the write. The data is combined with the stored
//   data, finalized, and written to the file named by
//   `OptionTags::ReductionFileName`.
//
// The DataBox bookkeeping is mutated while holding `node_lock`. The disk
// write happens after `node_lock` is released, under a copy of the file lock
// passed to the mutate lambda as `reduction_file_lock`.
//
// NOTE(review): this listing is missing several original source lines: the
// `inboxes` and `cache` parameters (`cache` is used below), the remaining
// tags of the `db::mutate` template argument list, and parts of the lambda's
// parameter types. Restore them from the upstream header before use.
179  template <typename... DbTags, typename... InboxTags, typename Metavariables,
180  typename ArrayIndex, typename ActionList,
181  typename ParallelComponent, typename... ReductionDatums,
182  Requires<sizeof...(DbTags) != 0> = nullptr>
183  static void apply(db::DataBox<tmpl::list<DbTags...>>& box,
186  const ArrayIndex& /*array_index*/,
187  const ActionList /*meta*/,
188  const ParallelComponent* const /*meta*/,
189  const gsl::not_null<CmiNodeLock*> node_lock,
190  const observers::ObservationId& observation_id,
191  const std::string& subfile_name,
192  std::vector<std::string>&& reduction_names,
193  Parallel::ReductionData<ReductionDatums...>&&
194  in_reduction_data) noexcept {
// These are filled in while holding `node_lock` and consumed after it is
// released. `file_lock` is only assigned, and only used, when
// `write_to_disk` becomes true.
195  CmiNodeLock file_lock;
196  bool write_to_disk = false;
197  std::vector<std::string> legend{};
198  Parallel::lock(node_lock);
199  db::mutate<Tags::ReductionData<ReductionDatums...>,
200  Tags::ReductionDataNames<ReductionDatums...>,
202  make_not_null(&box),
203  [
204  &cache, &file_lock, &in_reduction_data, &legend, &observation_id,
205  &reduction_names, &subfile_name, &write_to_disk
206  ](const gsl::not_null<
207  db::item_type<Tags::ReductionData<ReductionDatums...>>*>
208  reduction_data,
209  const gsl::not_null<
211  reduction_names_map,
212  const gsl::not_null<
214  reduction_observers_contributed,
215  const gsl::not_null<CmiNodeLock*> reduction_file_lock) noexcept {
// NOTE(review): if `reduction_observers_contributed` is a std::unordered_map
// (its type is on a line missing from this listing), operator[] inserts a
// zero entry here. The single-process/single-node branch below never erases
// it, so one entry per observation would accumulate on that path. Confirm,
// and consider erasing it there.
216  auto& contribute_count =
217  (*reduction_observers_contributed)[observation_id];
218  const auto node_id = Parallel::my_node();
219  const auto number_of_nodes =
220  static_cast<size_t>(Parallel::number_of_nodes());
221  const auto procs_on_node =
222  static_cast<size_t>(Parallel::procs_on_node(node_id));
223 
// Only node 0 keeps the names; they become the legend of the written subfile.
224  if (node_id == 0 and not reduction_names.empty()) {
225  reduction_names_map->emplace(observation_id,
226  std::move(reduction_names));
227  }
228 
// Single process on a single node: nothing to combine, write directly.
229  if (UNLIKELY(procs_on_node == 1 and number_of_nodes == 1)) {
230  write_to_disk = true;
231  file_lock = *reduction_file_lock;
232  legend = std::move(reduction_names_map->operator[](observation_id));
233  reduction_names_map->erase(observation_id);
234  } else if (reduction_data->count(observation_id) == 0) {
// First contribution seen for this observation: store it.
235  reduction_data->operator[](observation_id) =
236  std::move(in_reduction_data);
237  contribute_count = 1;
238  } else if (contribute_count ==
239  (procs_on_node - 1) + (number_of_nodes - 1)) {
240  ASSERT(node_id == 0,
241  "Should only receive additional reduction data on node 0 "
242  "but received it on node "
243  << node_id);
244  // On node 0 we are collecting data from all other nodes so we
245  // should get procs_on_node data from the group on our node plus
246  // (number_of_nodes - 1) contributions from other nodes.
247  in_reduction_data.combine(
248  std::move(reduction_data->operator[](observation_id)));
249  reduction_data->erase(observation_id);
250  reduction_observers_contributed->erase(observation_id);
251  write_to_disk = true;
252  file_lock = *reduction_file_lock;
253  legend = std::move(reduction_names_map->operator[](observation_id));
254  reduction_names_map->erase(observation_id);
255  } else {
// Intermediate contribution: combine into the stored data.
256  reduction_data->at(observation_id)
257  .combine(std::move(in_reduction_data));
258  contribute_count++;
259  }
260 
261  // Check if we have received all reduction data from the Observer
262  // group. If so we reduce to node 0 for writing to disk.
263  if (node_id != 0 and reduction_observers_contributed->at(
264  observation_id) == procs_on_node) {
265  Parallel::threaded_action<WriteReductionData>(
266  Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
267  cache)[0],
268  observation_id, subfile_name, std::vector<std::string>{},
269  std::move(reduction_data->operator[](observation_id)));
270  reduction_observers_contributed->erase(observation_id);
271  reduction_data->erase(observation_id);
272  }
273  });
274  Parallel::unlock(node_lock);
275 
// Disk access is serialized with the file lock, outside `node_lock`.
276  if (write_to_disk) {
277  Parallel::lock(&file_lock);
278  in_reduction_data.finalize();
279  WriteReductionData::write_data(
280  subfile_name, std::move(legend), std::move(in_reduction_data.data()),
281  Parallel::get<OptionTags::ReductionFileName>(cache),
282  std::make_index_sequence<sizeof...(ReductionDatums)>{});
283  Parallel::unlock(&file_lock);
284  }
285  }
286 };
287 } // namespace ThreadedActions
288 } // namespace observers
Definition: Actions.hpp:20
Defines helper functions for the standard library.
The number of observer components that have contributed data at the observation ids.
Definition: Tags.hpp:95
Defines class tuples::TaggedTuple.
Defines functions for interfacing with the parallelization framework.
int procs_on_node(const int node_index)
Number of processing elements on the given node.
Definition: Info.hpp:40
void mutate(const gsl::not_null< DataBox< TagList > *> box, Invokable &&invokable, Args &&... args) noexcept
Allows changing the state of one or more non-computed elements in the DataBox.
Definition: DataBox.hpp:1099
Defines class h5::H5File.
#define UNLIKELY(x)
Definition: Gsl.hpp:72
#define ASSERT(a, m)
Assert that an expression should be true.
Definition: Assert.hpp:49
Represents a multicolumn dat file inside an HDF5 file.
Definition: Dat.hpp:42
The nodegroup parallel component that is responsible for writing data to disk.
Definition: ObserverComponent.hpp:55
Defines the type alias Requires.
constexpr auto apply(F &&f, const DataBox< BoxTags > &box, Args &&... args)
Apply the function f with argument Tags TagsList from DataBox box
Definition: DataBox.hpp:1595
int my_node()
Index of my node.
Definition: Info.hpp:34
Names of the reduction data to be written to disk.
Definition: Tags.hpp:71
void unlock(const gsl::not_null< CmiNodeLock *> node_lock) noexcept
Unlock a converse CmiNodeLock.
Definition: NodeLock.hpp:62
HIDDEN_SYMBOLS void append(const std::vector< double > &data)
Definition: Dat.cpp:144
Make a string by streaming into object.
Definition: MakeString.hpp:16
Defines enum for specifying whether the H5 file is ReadWrite or ReadOnly.
An associative container that is indexed by structs.
Definition: TaggedTuple.hpp:272
Defines classes and functions used for manipulating DataBox's.
Defines Parallel::printf for writing to stdout.
Definition: InterpolationTargetWedgeSectionTorus.hpp:24
A Charm++ chare that caches constant data once per Charm++ node.
Definition: ConstGlobalCache.hpp:76
Opens an HDF5 file for access and allows manipulation of data.
Definition: File.hpp:59
Reduction data to be written to disk.
Definition: Tags.hpp:62
Defines macro ASSERT.
void lock(const gsl::not_null< CmiNodeLock *> node_lock) noexcept
Lock a converse CmiNodeLock.
Definition: NodeLock.hpp:34
ObjectType & try_insert(const std::string &path, Args &&... args) noexcept
Inserts an object like insert if it does not exist, returns the object if it does.
Definition: File.hpp:253
Wraps the template metaprogramming library used (brigand)
auto get_parallel_component(ConstGlobalCache< Metavariables > &cache) noexcept -> Parallel::proxy_from_parallel_component< ConstGlobalCache_detail::get_component_if_mocked< typename Metavariables::component_list, ParallelComponentTag >> &
Access the Charm++ proxy associated with a ParallelComponent.
Definition: ConstGlobalCache.hpp:163
typename DataBox_detail::item_type_impl< TagList, Tag >::type item_type
Get the type that is returned by the Tag. If it is a base tag then a TagList must be passed as a seco...
Definition: DataBoxTag.hpp:410
Defines functions and classes from the GSL.
gsl::not_null< T * > make_not_null(T *ptr) noexcept
Construct a not_null from a pointer. Often this will be done as an implicit conversion, but it may be necessary to perform the conversion explicitly when type deduction is desired.
Definition: Gsl.hpp:863
Defines class h5::Dat.
typename Requires_detail::requires_impl< B >::template_error_type_failed_to_meet_requirements_on_template_parameters Requires
Express requirements on the template parameters of a function or class, replaces std::enable_if_t ...
Definition: Requires.hpp:67
Collect the reduction data from the Observer group on the ObserverWriter nodegroup before sending to node 0 for writing to disk.
Definition: ReductionActions.hpp:158
Definition: SolvePoissonProblem.hpp:38
A type-erased identifier that combines the identifier's type and hash used to uniquely identify an observation.
Definition: ObservationId.hpp:43
Defines class template ConstGlobalCache.
Send reduction data to the observer group.
Definition: ReductionActions.hpp:73
Node lock used when needing to read/write to H5 files on disk.
Definition: Tags.hpp:105
Definition: ComputeTimeDerivative.hpp:28
void write_data(const hid_t group_id, const std::vector< T > &data, const std::vector< size_t > &extents, const std::string &name) noexcept
Write a std::vector named name to the group group_id
Definition: Helpers.cpp:114
Require a pointer to not be a nullptr
Definition: ConservativeFromPrimitive.hpp:12
int number_of_nodes()
Number of nodes.
Definition: Info.hpp:28