ObserverRegistration.hpp
1 // Distributed under the MIT License.
2 // See LICENSE.txt for details.
3 
4 #pragma once
5 
6 #include <cstddef>
7 #include <set>
8 #include <type_traits>
9 #include <unordered_map>
10 #include <unordered_set>
11 
13 #include "DataStructures/Index.hpp"
14 #include "IO/Observer/ArrayComponentId.hpp"
15 #include "IO/Observer/ObserverComponent.hpp"
16 #include "IO/Observer/Tags.hpp"
17 #include "IO/Observer/TypeOfObservation.hpp"
18 #include "Parallel/GlobalCache.hpp"
19 #include "Parallel/Info.hpp"
20 #include "Parallel/Invoke.hpp"
21 #include "Utilities/Gsl.hpp"
22 #include "Utilities/TMPL.hpp"
23 #include "Utilities/TaggedTuple.hpp"
24 
25 namespace observers {
26 /*!
27  * \ingroup ObserversGroup
28  * \brief %Actions used by the observer parallel component
29  */
30 namespace Actions {
31 /// \brief Register an `ArrayComponentId` with a specific
32 /// `ObservationIdRegistrationKey` that will call
33 /// `observers::ThreadedActions::ContributeVolumeData`.
34 ///
35 /// Should be invoked on ObserverWriter by the component that will be
36 /// contributing the data.
38  public:
39  template <typename ParallelComponent, typename DbTagsList,
40  typename Metavariables, typename ArrayIndex>
41  static void apply(db::DataBox<DbTagsList>& box,
42  const Parallel::GlobalCache<Metavariables>& /*cache*/,
43  const ArrayIndex& /*array_index*/,
44  const observers::ObservationKey& observation_key,
45  const ArrayComponentId& id_of_caller) noexcept {
46  if constexpr (tmpl::list_contains_v<
48  db::mutate<Tags::ExpectedContributorsForObservations>(
49  make_not_null(&box),
50  [&id_of_caller, &observation_key](
53  volume_observers_registered) noexcept {
54  if (volume_observers_registered->find(observation_key) ==
55  volume_observers_registered->end()) {
56  (*volume_observers_registered)[observation_key] =
57  std::unordered_set<ArrayComponentId>{};
58  }
59 
60  if (UNLIKELY(
61  volume_observers_registered->at(observation_key)
62  .find(id_of_caller) !=
63  volume_observers_registered->at(observation_key).end())) {
64  ERROR("Trying to insert a Observer component more than once: "
65  << id_of_caller);
66  }
67 
68  volume_observers_registered->at(observation_key)
69  .insert(id_of_caller);
70  });
71  } else {
72  (void)box;
73  (void)observation_key;
74  (void)id_of_caller;
75  ERROR(
76  "Could not find tag "
77  "observers::Tags::ExpectedContributorsForObservations in the "
78  "DataBox.");
79  }
80  }
81 };
82 
83 /*!
84  * \brief Register a node with the node that writes the reduction data to disk.
85  */
87  template <typename ParallelComponent, typename DbTagsList,
88  typename Metavariables, typename ArrayIndex>
89  static void apply(db::DataBox<DbTagsList>& box,
91  const ArrayIndex& /*array_index*/,
92  const observers::ObservationKey& observation_key,
93  const size_t caller_node_id) noexcept {
94  if constexpr (tmpl::list_contains_v<
96  const auto node_id = static_cast<size_t>(Parallel::my_node());
97  ASSERT(node_id == 0, "Only node zero, not node "
98  << node_id
99  << ", should be called from another node");
100 
101  db::mutate<Tags::NodesExpectedToContributeReductions>(
102  make_not_null(&box),
103  [&caller_node_id, &observation_key](
104  const gsl::not_null<
106  reduction_observers_registered_nodes) noexcept {
107  if (reduction_observers_registered_nodes->find(observation_key) ==
108  reduction_observers_registered_nodes->end()) {
109  (*reduction_observers_registered_nodes)[observation_key] =
110  std::set<size_t>{};
111  }
112  if (UNLIKELY(
113  reduction_observers_registered_nodes->at(observation_key)
114  .find(caller_node_id) !=
115  reduction_observers_registered_nodes->at(observation_key)
116  .end())) {
117  ERROR("Already registered node "
118  << caller_node_id << " for reduction observations.");
119  }
120  reduction_observers_registered_nodes->at(observation_key)
121  .insert(caller_node_id);
122  });
123  } else {
124  (void)box;
125  (void)observation_key;
126  (void)caller_node_id;
127  ERROR(
128  "Do not have tag "
129  "observers::Tags::NodesExpectedToContributeReductions "
130  "in the DataBox. This means components are registering for "
131  "reductions before initialization is complete.");
132  }
133  }
134 };
135 
136 /// \brief Register an `ArrayComponentId` that will call
137 /// `observers::ThreadedActions::WriteReductionData` or
138 /// `observers::ThreadedActions::ContributeReductionData` for a specific
139 /// `ObservationIdRegistrationKey`
140 ///
141 /// Should be invoked on ObserverWriter by the component that will be
142 /// contributing the data.
144  public:
145  template <typename ParallelComponent, typename DbTagsList,
146  typename Metavariables, typename ArrayIndex>
147  static void apply(db::DataBox<DbTagsList>& box,
149  const ArrayIndex& /*array_index*/,
150  const observers::ObservationKey& observation_key,
151  const ArrayComponentId& id_of_caller) noexcept {
152  if constexpr (tmpl::list_contains_v<
154  const auto node_id = static_cast<size_t>(Parallel::my_node());
155  db::mutate<Tags::ExpectedContributorsForObservations>(
156  make_not_null(&box),
157  [&cache, &id_of_caller, &node_id, &observation_key](
160  reduction_observers_registered) noexcept {
161  if (reduction_observers_registered->find(observation_key) ==
162  reduction_observers_registered->end()) {
163  (*reduction_observers_registered)[observation_key] =
164  std::unordered_set<ArrayComponentId>{};
169  observation_key, node_id);
170  }
171 
172  if (LIKELY(reduction_observers_registered->at(observation_key)
173  .find(id_of_caller) ==
174  reduction_observers_registered->at(observation_key)
175  .end())) {
176  reduction_observers_registered->at(observation_key)
177  .insert(id_of_caller);
178  } else {
179  ERROR("Trying to insert a Observer component more than once: "
180  << id_of_caller
181  << " with observation key: " << observation_key);
182  }
183  });
184  } else {
185  (void)box;
186  (void)cache;
187  (void)observation_key;
188  (void)id_of_caller;
189  ERROR(
190  "Could not find tag "
191  "observers::Tags::ExpectedContributorsForObservations in the "
192  "DataBox.");
193  }
194  }
195 };
196 
197 /*!
198  * \brief Register the `ArrayComponentId` that will send the data to the
199  * observer for the given `ObservationIdRegistrationKey`
200  *
201  * Should be invoked on the `Observer` by the contributing component.
202  */
204  template <typename ParallelComponent, typename DbTagList,
205  typename Metavariables, typename ArrayIndex>
206  static void apply(db::DataBox<DbTagList>& box,
208  const ArrayIndex& array_index,
209  const observers::ObservationKey& observation_key,
210  const observers::ArrayComponentId& component_id,
211  const TypeOfObservation& type_of_observation) noexcept {
212  if constexpr (tmpl::list_contains_v<
213  DbTagList,
215  bool observation_key_already_registered = true;
216  db::mutate<observers::Tags::ExpectedContributorsForObservations>(
217  make_not_null(&box),
218  [&component_id, &observation_key,
219  &observation_key_already_registered](
222  array_component_ids) noexcept {
223  observation_key_already_registered =
224  (array_component_ids->find(observation_key) !=
225  array_component_ids->end());
226  if (UNLIKELY(observation_key_already_registered and
227  array_component_ids->at(observation_key)
228  .find(component_id) !=
229  array_component_ids->at(observation_key).end())) {
230  ERROR(
231  "Trying to insert a component_id more than once for "
232  "observation. This means an element is registering itself "
233  "with the observers more than once. The component_id is "
234  << component_id << " and the observation key is "
235  << observation_key);
236  }
237  array_component_ids->operator[](observation_key)
238  .insert(component_id);
239  });
240 
241  if (observation_key_already_registered) {
242  // We only need to register with the observer writer on the first call
243  // of this action. Later calls will have already been registered.
244  return;
245  }
246 
247  auto& observer_writer =
250  .ckLocalBranch();
251 
252  switch (type_of_observation) {
253  case TypeOfObservation::Reduction:
256  observer_writer, observation_key,
258  Parallel::ArrayIndex<ArrayIndex>(array_index)});
259  return;
260  case TypeOfObservation::Volume:
263  observer_writer, observation_key,
265  Parallel::ArrayIndex<ArrayIndex>(array_index)});
266  return;
267  default:
268  ERROR(
269  "Registering an unknown TypeOfObservation. Should be one of "
270  "'Reduction' or 'Volume'");
271  };
272  } else {
273  ERROR(
274  "The DataBox must contain the tag "
275  "observers::Tags::ExpectedContributorsForObservations when the "
276  "action "
277  "RegisterContributorWithObserver is called.");
278  }
279  }
280 };
281 } // namespace Actions
282 } // namespace observers
UNLIKELY
#define UNLIKELY(x)
Definition: Gsl.hpp:73
Parallel::GlobalCache
Definition: ElementReceiveInterpPoints.hpp:15
observers::Actions::RegisterVolumeContributorWithObserverWriter
Register an ArrayComponentId with a specific ObservationIdRegistrationKey that will call observers::T...
Definition: ObserverRegistration.hpp:37
observers::Actions::RegisterReductionContributorWithObserverWriter
Register an ArrayComponentId that will call observers::ThreadedActions::WriteReductionData or observe...
Definition: ObserverRegistration.hpp:143
unordered_set
GlobalCache.hpp
Parallel::get_parallel_component
auto get_parallel_component(GlobalCache< Metavariables > &cache) noexcept -> Parallel::proxy_from_parallel_component< GlobalCache_detail::get_component_if_mocked< typename Metavariables::component_list, ParallelComponentTag >> &
Access the Charm++ proxy associated with a ParallelComponent.
Definition: GlobalCache.hpp:454
observers::ObservationKey
Used as a key in maps to keep track of how many elements have registered.
Definition: ObservationId.hpp:28
Info.hpp
observers::TypeOfObservation
TypeOfObservation
Specifies the type of observation.
Definition: TypeOfObservation.hpp:18
ERROR
#define ERROR(m)
prints an error message to the standard error stream and aborts the program.
Definition: Error.hpp:36
observers::ObserverWriter
The nodegroup parallel component that is responsible for writing data to disk.
Definition: ObserverComponent.hpp:51
std::add_pointer_t
Parallel::my_node
int my_node()
Index of my node.
Definition: Info.hpp:34
DataBox.hpp
cstddef
observers::Tags::NodesExpectedToContributeReductions
The set of nodes that are registered with each ObservationIdRegistrationKey for writing reduction dat...
Definition: Tags.hpp:69
Index.hpp
observers::Actions::RegisterReductionNodeWithWritingNode
Register a node with the node that writes the reduction data to disk.
Definition: ObserverRegistration.hpp:86
observers::Tags::ExpectedContributorsForObservations
The set of ArrayComponentIds that will contribute to each ObservationId for reduction.
Definition: Tags.hpp:35
ASSERT
#define ASSERT(a, m)
Assert that an expression should be true.
Definition: Assert.hpp:51
Gsl.hpp
LIKELY
#define LIKELY(x)
Definition: Gsl.hpp:67
observers::ArrayComponentId
An ID type that identifies both the parallel component and the index in the parallel component.
Definition: ArrayComponentId.hpp:27
Parallel::simple_action
void simple_action(Proxy &&proxy) noexcept
Invoke a simple action on proxy
Definition: Invoke.hpp:82
make_not_null
gsl::not_null< T * > make_not_null(T *ptr) noexcept
Construct a not_null from a pointer. Often this will be done as an implicit conversion,...
Definition: Gsl.hpp:880
Parallel::ArrayIndex
The array index used for indexing Chare Arrays, mostly an implementation detail.
Definition: ArrayIndex.hpp:27
unordered_map
type_traits
set
TMPL.hpp
observers::Actions::RegisterContributorWithObserver
Register the ArrayComponentId that will send the data to the observer for the given ObservationIdRegi...
Definition: ObserverRegistration.hpp:203
gsl::not_null
Require a pointer to not be a nullptr
Definition: Gsl.hpp:183