ObserverRegistration.hpp
1 // Distributed under the MIT License.
2 // See LICENSE.txt for details.
3 
4 #pragma once
5 
6 #include <cstddef>
7 #include <set>
8 #include <type_traits>
9 #include <unordered_map>
10 #include <unordered_set>
11 
14 #include "DataStructures/Index.hpp"
15 #include "IO/Observer/ArrayComponentId.hpp"
16 #include "IO/Observer/ObserverComponent.hpp"
17 #include "IO/Observer/Tags.hpp"
18 #include "IO/Observer/TypeOfObservation.hpp"
19 #include "Parallel/GlobalCache.hpp"
20 #include "Parallel/Info.hpp"
21 #include "Parallel/Invoke.hpp"
22 #include "Utilities/Gsl.hpp"
23 #include "Utilities/TMPL.hpp"
24 #include "Utilities/TaggedTuple.hpp"
25 
26 namespace observers {
27 /*!
28  * \ingroup ObserversGroup
29  * \brief %Actions used by the observer parallel component
30  */
31 namespace Actions {
32 /// \brief Register an `ArrayComponentId` with a specific
33 /// `ObservationIdRegistrationKey` that will call
34 /// `observers::ThreadedActions::ContributeVolumeData`.
35 ///
36 /// Should be invoked on ObserverWriter by the component that will be
37 /// contributing the data.
39  public:
40  template <typename ParallelComponent, typename DbTagsList,
41  typename Metavariables, typename ArrayIndex>
42  static void apply(db::DataBox<DbTagsList>& box,
43  const Parallel::GlobalCache<Metavariables>& /*cache*/,
44  const ArrayIndex& /*array_index*/,
45  const observers::ObservationKey& observation_key,
46  const ArrayComponentId& id_of_caller) noexcept {
47  if constexpr (tmpl::list_contains_v<
49  db::mutate<Tags::ExpectedContributorsForObservations>(
50  make_not_null(&box),
51  [&id_of_caller, &observation_key](
54  volume_observers_registered) noexcept {
55  if (volume_observers_registered->find(observation_key) ==
56  volume_observers_registered->end()) {
57  (*volume_observers_registered)[observation_key] =
58  std::unordered_set<ArrayComponentId>{};
59  }
60 
61  if (UNLIKELY(
62  volume_observers_registered->at(observation_key)
63  .find(id_of_caller) !=
64  volume_observers_registered->at(observation_key).end())) {
65  ERROR("Trying to insert a Observer component more than once: "
66  << id_of_caller);
67  }
68 
69  volume_observers_registered->at(observation_key)
70  .insert(id_of_caller);
71  });
72  } else {
73  (void)box;
74  (void)observation_key;
75  (void)id_of_caller;
76  ERROR(
77  "Could not find tag "
78  "observers::Tags::ExpectedContributorsForObservations in the "
79  "DataBox.");
80  }
81  }
82 };
83 
84 /*!
85  * \brief Register a node with the node that writes the reduction data to disk.
86  */
88  template <typename ParallelComponent, typename DbTagsList,
89  typename Metavariables, typename ArrayIndex>
90  static void apply(db::DataBox<DbTagsList>& box,
92  const ArrayIndex& /*array_index*/,
93  const observers::ObservationKey& observation_key,
94  const size_t caller_node_id) noexcept {
95  if constexpr (tmpl::list_contains_v<
97  const auto node_id = static_cast<size_t>(Parallel::my_node());
98  ASSERT(node_id == 0, "Only node zero, not node "
99  << node_id
100  << ", should be called from another node");
101 
102  db::mutate<Tags::NodesExpectedToContributeReductions>(
103  make_not_null(&box),
104  [&caller_node_id, &observation_key](
105  const gsl::not_null<
107  reduction_observers_registered_nodes) noexcept {
108  if (reduction_observers_registered_nodes->find(observation_key) ==
109  reduction_observers_registered_nodes->end()) {
110  (*reduction_observers_registered_nodes)[observation_key] =
111  std::set<size_t>{};
112  }
113  if (UNLIKELY(
114  reduction_observers_registered_nodes->at(observation_key)
115  .find(caller_node_id) !=
116  reduction_observers_registered_nodes->at(observation_key)
117  .end())) {
118  ERROR("Already registered node "
119  << caller_node_id << " for reduction observations.");
120  }
121  reduction_observers_registered_nodes->at(observation_key)
122  .insert(caller_node_id);
123  });
124  } else {
125  (void)box;
126  (void)observation_key;
127  (void)caller_node_id;
128  ERROR(
129  "Do not have tag "
130  "observers::Tags::NodesExpectedToContributeReductions "
131  "in the DataBox. This means components are registering for "
132  "reductions before initialization is complete.");
133  }
134  }
135 };
136 
137 /// \brief Register an `ArrayComponentId` that will call
138 /// `observers::ThreadedActions::WriteReductionData` or
139 /// `observers::ThreadedActions::ContributeReductionData` for a specific
140 /// `ObservationIdRegistrationKey`
141 ///
142 /// Should be invoked on ObserverWriter by the component that will be
143 /// contributing the data.
145  public:
146  template <typename ParallelComponent, typename DbTagsList,
147  typename Metavariables, typename ArrayIndex>
148  static void apply(db::DataBox<DbTagsList>& box,
150  const ArrayIndex& /*array_index*/,
151  const observers::ObservationKey& observation_key,
152  const ArrayComponentId& id_of_caller) noexcept {
153  if constexpr (tmpl::list_contains_v<
155  const auto node_id = static_cast<size_t>(Parallel::my_node());
156  db::mutate<Tags::ExpectedContributorsForObservations>(
157  make_not_null(&box),
158  [&cache, &id_of_caller, &node_id, &observation_key](
161  reduction_observers_registered) noexcept {
162  if (reduction_observers_registered->find(observation_key) ==
163  reduction_observers_registered->end()) {
164  (*reduction_observers_registered)[observation_key] =
165  std::unordered_set<ArrayComponentId>{};
170  observation_key, node_id);
171  }
172 
173  if (LIKELY(reduction_observers_registered->at(observation_key)
174  .find(id_of_caller) ==
175  reduction_observers_registered->at(observation_key)
176  .end())) {
177  reduction_observers_registered->at(observation_key)
178  .insert(id_of_caller);
179  } else {
180  ERROR("Trying to insert a Observer component more than once: "
181  << id_of_caller
182  << " with observation key: " << observation_key);
183  }
184  });
185  } else {
186  (void)box;
187  (void)cache;
188  (void)observation_key;
189  (void)id_of_caller;
190  ERROR(
191  "Could not find tag "
192  "observers::Tags::ExpectedContributorsForObservations in the "
193  "DataBox.");
194  }
195  }
196 };
197 
198 /*!
199  * \brief Register the `ArrayComponentId` that will send the data to the
200  * observer for the given `ObservationIdRegistrationKey`
201  *
202  * Should be invoked on the `Observer` by the contributing component.
203  */
205  template <typename ParallelComponent, typename DbTagList,
206  typename Metavariables, typename ArrayIndex>
207  static void apply(db::DataBox<DbTagList>& box,
209  const ArrayIndex& array_index,
210  const observers::ObservationKey& observation_key,
211  const observers::ArrayComponentId& component_id,
212  const TypeOfObservation& type_of_observation) noexcept {
213  if constexpr (tmpl::list_contains_v<
214  DbTagList,
216  bool observation_key_already_registered = true;
217  db::mutate<observers::Tags::ExpectedContributorsForObservations>(
218  make_not_null(&box),
219  [&component_id, &observation_key,
220  &observation_key_already_registered](
223  array_component_ids) noexcept {
224  observation_key_already_registered =
225  (array_component_ids->find(observation_key) !=
226  array_component_ids->end());
227  if (UNLIKELY(observation_key_already_registered and
228  array_component_ids->at(observation_key)
229  .find(component_id) !=
230  array_component_ids->at(observation_key).end())) {
231  ERROR(
232  "Trying to insert a component_id more than once for "
233  "observation. This means an element is registering itself "
234  "with the observers more than once. The component_id is "
235  << component_id << " and the observation key is "
236  << observation_key);
237  }
238  array_component_ids->operator[](observation_key)
239  .insert(component_id);
240  });
241 
242  if (observation_key_already_registered) {
243  // We only need to register with the observer writer on the first call
244  // of this action. Later calls will have already been registered.
245  return;
246  }
247 
248  auto& observer_writer =
251  .ckLocalBranch();
252 
253  switch (type_of_observation) {
254  case TypeOfObservation::Reduction:
257  observer_writer, observation_key,
259  Parallel::ArrayIndex<ArrayIndex>(array_index)});
260  return;
261  case TypeOfObservation::Volume:
264  observer_writer, observation_key,
266  Parallel::ArrayIndex<ArrayIndex>(array_index)});
267  return;
268  default:
269  ERROR(
270  "Registering an unknown TypeOfObservation. Should be one of "
271  "'Reduction' or 'Volume'");
272  };
273  } else {
274  ERROR(
275  "The DataBox must contain the tag "
276  "observers::Tags::ExpectedContributorsForObservations when the "
277  "action "
278  "RegisterContributorWithObserver is called.");
279  }
280  }
281 };
282 } // namespace Actions
283 } // namespace observers
DataBoxTag.hpp
UNLIKELY
#define UNLIKELY(x)
Definition: Gsl.hpp:73
Parallel::GlobalCache
Definition: ElementReceiveInterpPoints.hpp:16
observers::Actions::RegisterVolumeContributorWithObserverWriter
Register an ArrayComponentId with a specific ObservationIdRegistrationKey that will call observers::T...
Definition: ObserverRegistration.hpp:38
observers::Actions::RegisterReductionContributorWithObserverWriter
Register an ArrayComponentId that will call observers::ThreadedActions::WriteReductionData or observe...
Definition: ObserverRegistration.hpp:144
unordered_set
GlobalCache.hpp
Parallel::get_parallel_component
auto get_parallel_component(GlobalCache< Metavariables > &cache) noexcept -> Parallel::proxy_from_parallel_component< GlobalCache_detail::get_component_if_mocked< typename Metavariables::component_list, ParallelComponentTag >> &
Access the Charm++ proxy associated with a ParallelComponent.
Definition: GlobalCache.hpp:223
observers::ObservationKey
Used as a key in maps to keep track of how many elements have registered.
Definition: ObservationId.hpp:28
Info.hpp
observers::TypeOfObservation
TypeOfObservation
Specifies the type of observation.
Definition: TypeOfObservation.hpp:18
ERROR
#define ERROR(m)
prints an error message to the standard error stream and aborts the program.
Definition: Error.hpp:36
observers::ObserverWriter
The nodegroup parallel component that is responsible for writing data to disk.
Definition: ObserverComponent.hpp:48
db::apply
constexpr auto apply(F &&f, const DataBox< BoxTags > &box, Args &&... args) noexcept
Apply the invokable f with argument Tags TagsList from DataBox box
Definition: DataBox.hpp:1424
std::add_pointer_t
Parallel::my_node
int my_node()
Index of my node.
Definition: Info.hpp:34
DataBox.hpp
cstddef
observers::Tags::NodesExpectedToContributeReductions
The set of nodes that are registered with each ObservationIdRegistrationKey for writing reduction dat...
Definition: Tags.hpp:69
Index.hpp
observers::Actions::RegisterReductionNodeWithWritingNode
Register a node with the node that writes the reduction data to disk.
Definition: ObserverRegistration.hpp:87
observers::Tags::ExpectedContributorsForObservations
The set of ArrayComponentIds that will contribute to each ObservationId for reduction.
Definition: Tags.hpp:35
ASSERT
#define ASSERT(a, m)
Assert that an expression should be true.
Definition: Assert.hpp:51
Gsl.hpp
LIKELY
#define LIKELY(x)
Definition: Gsl.hpp:67
observers::ArrayComponentId
An ID type that identifies both the parallel component and the index in the parallel component.
Definition: ArrayComponentId.hpp:27
Parallel::simple_action
void simple_action(Proxy &&proxy) noexcept
Invoke a simple action on proxy
Definition: Invoke.hpp:82
make_not_null
gsl::not_null< T * > make_not_null(T *ptr) noexcept
Construct a not_null from a pointer. Often this will be done as an implicit conversion,...
Definition: Gsl.hpp:880
Parallel::ArrayIndex
The array index used for indexing Chare Arrays, mostly an implementation detail.
Definition: ArrayIndex.hpp:27
unordered_map
db::DataBox
Definition: InterpolationTargetWedgeSectionTorus.hpp:24
type_traits
set
TMPL.hpp
observers::Actions::RegisterContributorWithObserver
Register the ArrayComponentId that will send the data to the observer for the given ObservationIdRegi...
Definition: ObserverRegistration.hpp:204
gsl::not_null
Require a pointer to not be a nullptr
Definition: Gsl.hpp:183