ObserverRegistration.hpp
1 // Distributed under the MIT License.
2 // See LICENSE.txt for details.
3 
4 #pragma once
5 
6 #include <cstddef>
7 #include <set>
8 #include <type_traits>
9 #include <unordered_map>
10 #include <unordered_set>
11 
13 #include "DataStructures/Index.hpp"
14 #include "IO/Observer/ArrayComponentId.hpp"
15 #include "IO/Observer/ObserverComponent.hpp"
16 #include "IO/Observer/Tags.hpp"
17 #include "IO/Observer/TypeOfObservation.hpp"
18 #include "Parallel/GlobalCache.hpp"
19 #include "Parallel/Info.hpp"
20 #include "Parallel/Invoke.hpp"
21 #include "Utilities/Gsl.hpp"
22 #include "Utilities/TMPL.hpp"
23 #include "Utilities/TaggedTuple.hpp"
24 
25 namespace observers {
26 /*!
27  * \ingroup ObserversGroup
28  * \brief %Actions used by the observer parallel component
29  */
30 namespace Actions {
31 /// \brief Register an `ArrayComponentId` with a specific
32 /// `ObservationIdRegistrationKey` that will call
33 /// `observers::ThreadedActions::ContributeVolumeData`.
34 ///
35 /// Should be invoked on ObserverWriter by the component that will be
36 /// contributing the data.
38  public:
39  template <typename ParallelComponent, typename DbTagsList,
40  typename Metavariables, typename ArrayIndex>
41  static void apply(db::DataBox<DbTagsList>& box,
42  const Parallel::GlobalCache<Metavariables>& /*cache*/,
43  const ArrayIndex& /*array_index*/,
44  const observers::ObservationKey& observation_key,
45  const ArrayComponentId& id_of_caller) noexcept {
46  if constexpr (tmpl::list_contains_v<
48  db::mutate<Tags::ExpectedContributorsForObservations>(
49  make_not_null(&box),
50  [&id_of_caller, &observation_key](
53  volume_observers_registered) noexcept {
54  if (volume_observers_registered->find(observation_key) ==
55  volume_observers_registered->end()) {
56  (*volume_observers_registered)[observation_key] =
57  std::unordered_set<ArrayComponentId>{};
58  }
59 
60  if (UNLIKELY(
61  volume_observers_registered->at(observation_key)
62  .find(id_of_caller) !=
63  volume_observers_registered->at(observation_key).end())) {
64  ERROR("Trying to insert a Observer component more than once: "
65  << id_of_caller);
66  }
67 
68  volume_observers_registered->at(observation_key)
69  .insert(id_of_caller);
70  });
71  } else {
72  (void)box;
73  (void)observation_key;
74  (void)id_of_caller;
75  ERROR(
76  "Could not find tag "
77  "observers::Tags::ExpectedContributorsForObservations in the "
78  "DataBox.");
79  }
80  }
81 };
82 
83 /*!
84  * \brief Register a node with the node that writes the reduction data to disk.
85  */
87  template <typename ParallelComponent, typename DbTagsList,
88  typename Metavariables, typename ArrayIndex>
89  static void apply(db::DataBox<DbTagsList>& box,
91  const ArrayIndex& /*array_index*/,
92  const observers::ObservationKey& observation_key,
93  const size_t caller_node_id) noexcept {
94  if constexpr (tmpl::list_contains_v<
96  auto& my_proxy =
97  Parallel::get_parallel_component<ParallelComponent>(cache);
98  const auto node_id =
99  static_cast<size_t>(Parallel::my_node(*my_proxy.ckLocalBranch()));
100  ASSERT(node_id == 0, "Only node zero, not node "
101  << node_id
102  << ", should be called from another node");
103 
104  db::mutate<Tags::NodesExpectedToContributeReductions>(
105  make_not_null(&box),
106  [&caller_node_id, &observation_key](
107  const gsl::not_null<
109  reduction_observers_registered_nodes) noexcept {
110  if (reduction_observers_registered_nodes->find(observation_key) ==
111  reduction_observers_registered_nodes->end()) {
112  (*reduction_observers_registered_nodes)[observation_key] =
113  std::set<size_t>{};
114  }
115  if (UNLIKELY(
116  reduction_observers_registered_nodes->at(observation_key)
117  .find(caller_node_id) !=
118  reduction_observers_registered_nodes->at(observation_key)
119  .end())) {
120  ERROR("Already registered node "
121  << caller_node_id << " for reduction observations.");
122  }
123  reduction_observers_registered_nodes->at(observation_key)
124  .insert(caller_node_id);
125  });
126  } else {
127  (void)box;
128  (void)observation_key;
129  (void)caller_node_id;
130  ERROR(
131  "Do not have tag "
132  "observers::Tags::NodesExpectedToContributeReductions "
133  "in the DataBox. This means components are registering for "
134  "reductions before initialization is complete.");
135  }
136  }
137 };
138 
139 /// \brief Register an `ArrayComponentId` that will call
140 /// `observers::ThreadedActions::WriteReductionData` or
141 /// `observers::ThreadedActions::ContributeReductionData` for a specific
142 /// `ObservationIdRegistrationKey`
143 ///
144 /// Should be invoked on ObserverWriter by the component that will be
145 /// contributing the data.
147  public:
148  template <typename ParallelComponent, typename DbTagsList,
149  typename Metavariables, typename ArrayIndex>
150  static void apply(db::DataBox<DbTagsList>& box,
152  const ArrayIndex& /*array_index*/,
153  const observers::ObservationKey& observation_key,
154  const ArrayComponentId& id_of_caller) noexcept {
155  if constexpr (tmpl::list_contains_v<
157  auto& my_proxy =
158  Parallel::get_parallel_component<ParallelComponent>(cache);
159  const auto node_id =
160  static_cast<size_t>(Parallel::my_node(*my_proxy.ckLocalBranch()));
161  db::mutate<Tags::ExpectedContributorsForObservations>(
162  make_not_null(&box),
163  [&cache, &id_of_caller, &node_id, &observation_key](
166  reduction_observers_registered) noexcept {
167  if (reduction_observers_registered->find(observation_key) ==
168  reduction_observers_registered->end()) {
169  (*reduction_observers_registered)[observation_key] =
170  std::unordered_set<ArrayComponentId>{};
175  observation_key, node_id);
176  }
177 
178  if (LIKELY(reduction_observers_registered->at(observation_key)
179  .find(id_of_caller) ==
180  reduction_observers_registered->at(observation_key)
181  .end())) {
182  reduction_observers_registered->at(observation_key)
183  .insert(id_of_caller);
184  } else {
185  ERROR("Trying to insert a Observer component more than once: "
186  << id_of_caller
187  << " with observation key: " << observation_key);
188  }
189  });
190  } else {
191  (void)box;
192  (void)cache;
193  (void)observation_key;
194  (void)id_of_caller;
195  ERROR(
196  "Could not find tag "
197  "observers::Tags::ExpectedContributorsForObservations in the "
198  "DataBox.");
199  }
200  }
201 };
202 
203 /*!
204  * \brief Register the `ArrayComponentId` that will send the data to the
205  * observer for the given `ObservationIdRegistrationKey`
206  *
207  * Should be invoked on the `Observer` by the contributing component.
208  */
210  template <typename ParallelComponent, typename DbTagList,
211  typename Metavariables, typename ArrayIndex>
212  static void apply(db::DataBox<DbTagList>& box,
214  const ArrayIndex& array_index,
215  const observers::ObservationKey& observation_key,
216  const observers::ArrayComponentId& component_id,
217  const TypeOfObservation& type_of_observation) noexcept {
218  if constexpr (tmpl::list_contains_v<
219  DbTagList,
221  bool observation_key_already_registered = true;
222  db::mutate<observers::Tags::ExpectedContributorsForObservations>(
223  make_not_null(&box),
224  [&component_id, &observation_key,
225  &observation_key_already_registered](
228  array_component_ids) noexcept {
229  observation_key_already_registered =
230  (array_component_ids->find(observation_key) !=
231  array_component_ids->end());
232  if (UNLIKELY(observation_key_already_registered and
233  array_component_ids->at(observation_key)
234  .find(component_id) !=
235  array_component_ids->at(observation_key).end())) {
236  ERROR(
237  "Trying to insert a component_id more than once for "
238  "observation. This means an element is registering itself "
239  "with the observers more than once. The component_id is "
240  << component_id << " and the observation key is "
241  << observation_key);
242  }
243  array_component_ids->operator[](observation_key)
244  .insert(component_id);
245  });
246 
247  if (observation_key_already_registered) {
248  // We only need to register with the observer writer on the first call
249  // of this action. Later calls will have already been registered.
250  return;
251  }
252 
253  auto& observer_writer =
256  .ckLocalBranch();
257 
258  switch (type_of_observation) {
259  case TypeOfObservation::Reduction:
262  observer_writer, observation_key,
264  Parallel::ArrayIndex<ArrayIndex>(array_index)});
265  return;
266  case TypeOfObservation::Volume:
269  observer_writer, observation_key,
271  Parallel::ArrayIndex<ArrayIndex>(array_index)});
272  return;
273  default:
274  ERROR(
275  "Registering an unknown TypeOfObservation. Should be one of "
276  "'Reduction' or 'Volume'");
277  };
278  } else {
279  ERROR(
280  "The DataBox must contain the tag "
281  "observers::Tags::ExpectedContributorsForObservations when the "
282  "action "
283  "RegisterContributorWithObserver is called.");
284  }
285  }
286 };
287 } // namespace Actions
288 } // namespace observers
UNLIKELY
#define UNLIKELY(x)
Definition: Gsl.hpp:73
Parallel::GlobalCache
Definition: ElementReceiveInterpPoints.hpp:15
observers::Actions::RegisterVolumeContributorWithObserverWriter
Register an ArrayComponentId with a specific ObservationIdRegistrationKey that will call observers::T...
Definition: ObserverRegistration.hpp:37
observers::Actions::RegisterReductionContributorWithObserverWriter
Register an ArrayComponentId that will call observers::ThreadedActions::WriteReductionData or observe...
Definition: ObserverRegistration.hpp:146
unordered_set
GlobalCache.hpp
Parallel::get_parallel_component
auto get_parallel_component(GlobalCache< Metavariables > &cache) noexcept -> Parallel::proxy_from_parallel_component< GlobalCache_detail::get_component_if_mocked< typename Metavariables::component_list, ParallelComponentTag >> &
Access the Charm++ proxy associated with a ParallelComponent.
Definition: GlobalCache.hpp:507
observers::ObservationKey
Used as a key in maps to keep track of how many elements have registered.
Definition: ObservationId.hpp:28
Info.hpp
observers::TypeOfObservation
TypeOfObservation
Specifies the type of observation.
Definition: TypeOfObservation.hpp:18
ERROR
#define ERROR(m)
prints an error message to the standard error stream and aborts the program.
Definition: Error.hpp:36
observers::ObserverWriter
The nodegroup parallel component that is responsible for writing data to disk.
Definition: ObserverComponent.hpp:51
std::add_pointer_t
Parallel::my_node
int my_node(const DistribObject &distributed_object) noexcept
Index of my node.
Definition: Info.hpp:51
DataBox.hpp
cstddef
observers::Tags::NodesExpectedToContributeReductions
The set of nodes that are registered with each ObservationIdRegistrationKey for writing reduction dat...
Definition: Tags.hpp:69
Index.hpp
observers::Actions::RegisterReductionNodeWithWritingNode
Register a node with the node that writes the reduction data to disk.
Definition: ObserverRegistration.hpp:86
observers::Tags::ExpectedContributorsForObservations
The set of ArrayComponentIds that will contribute to each ObservationId for reduction.
Definition: Tags.hpp:35
ASSERT
#define ASSERT(a, m)
Assert that an expression should be true.
Definition: Assert.hpp:49
ActionTesting::cache
Parallel::GlobalCache< Metavariables > & cache(MockRuntimeSystem< Metavariables > &runner, const ArrayIndex &array_index) noexcept
Returns the GlobalCache of Component with index array_index.
Definition: MockRuntimeSystemFreeFunctions.hpp:362
Gsl.hpp
LIKELY
#define LIKELY(x)
Definition: Gsl.hpp:67
observers::ArrayComponentId
An ID type that identifies both the parallel component and the index in the parallel component.
Definition: ArrayComponentId.hpp:27
Parallel::simple_action
void simple_action(Proxy &&proxy) noexcept
Invoke a simple action on proxy
Definition: Invoke.hpp:84
make_not_null
gsl::not_null< T * > make_not_null(T *ptr) noexcept
Construct a not_null from a pointer. Often this will be done as an implicit conversion,...
Definition: Gsl.hpp:880
Parallel::ArrayIndex
The array index used for indexing Chare Arrays, mostly an implementation detail.
Definition: ArrayIndex.hpp:28
unordered_map
type_traits
set
TMPL.hpp
observers::Actions::RegisterContributorWithObserver
Register the ArrayComponentId that will send the data to the observer for the given ObservationIdRegi...
Definition: ObserverRegistration.hpp:209
gsl::not_null
Require a pointer to not be a nullptr
Definition: ReadSpecThirdOrderPiecewisePolynomial.hpp:13