ObserverRegistration.hpp
1 // Distributed under the MIT License.
2 // See LICENSE.txt for details.
3 
4 #pragma once
5 
6 #include <cstddef>
7 #include <set>
8 #include <type_traits>
9 #include <unordered_map>
10 #include <unordered_set>
11 
13 #include "DataStructures/Index.hpp"
14 #include "IO/Observer/ArrayComponentId.hpp"
15 #include "IO/Observer/ObserverComponent.hpp"
16 #include "IO/Observer/Tags.hpp"
17 #include "IO/Observer/TypeOfObservation.hpp"
18 #include "Parallel/GlobalCache.hpp"
19 #include "Parallel/Info.hpp"
20 #include "Parallel/Invoke.hpp"
21 #include "Utilities/Gsl.hpp"
22 #include "Utilities/TMPL.hpp"
23 #include "Utilities/TaggedTuple.hpp"
24 
25 namespace observers {
26 /*!
27  * \ingroup ObserversGroup
28  * \brief %Actions used by the observer parallel component
29  */
30 namespace Actions {
31 /// \brief Register an `ArrayComponentId` with a specific
32 /// `ObservationIdRegistrationKey` that will call
33 /// `observers::ThreadedActions::ContributeVolumeData`.
34 ///
35 /// Should be invoked on ObserverWriter by the component that will be
36 /// contributing the data.
38  public:
39  template <typename ParallelComponent, typename DbTagsList,
40  typename Metavariables, typename ArrayIndex>
41  static void apply(db::DataBox<DbTagsList>& box,
42  const Parallel::GlobalCache<Metavariables>& /*cache*/,
43  const ArrayIndex& /*array_index*/,
44  const observers::ObservationKey& observation_key,
45  const ArrayComponentId& id_of_caller) noexcept {
46  if constexpr (tmpl::list_contains_v<
48  db::mutate<Tags::ExpectedContributorsForObservations>(
49  make_not_null(&box),
50  [&id_of_caller, &observation_key](
53  volume_observers_registered) noexcept {
54  if (volume_observers_registered->find(observation_key) ==
55  volume_observers_registered->end()) {
56  (*volume_observers_registered)[observation_key] =
57  std::unordered_set<ArrayComponentId>{};
58  }
59 
60  if (UNLIKELY(
61  volume_observers_registered->at(observation_key)
62  .find(id_of_caller) !=
63  volume_observers_registered->at(observation_key).end())) {
64  ERROR("Trying to insert a Observer component more than once: "
65  << id_of_caller);
66  }
67 
68  volume_observers_registered->at(observation_key)
69  .insert(id_of_caller);
70  });
71  } else {
72  (void)box;
73  (void)observation_key;
74  (void)id_of_caller;
75  ERROR(
76  "Could not find tag "
77  "observers::Tags::ExpectedContributorsForObservations in the "
78  "DataBox.");
79  }
80  }
81 };
82 
83 /// \brief Deregister an `ArrayComponentId` with a specific
84 /// `ObservationIdRegistrationKey` that will no longer call
85 /// `observers::ThreadedActions::ContributeVolumeData`
86 ///
87 /// Should be invoked on ObserverWriter by the component that was previously
88 /// registered with
89 /// `observers::Actions::RegisterVolumeContributorWithObserverWriter`
91  public:
92  template <typename ParallelComponent, typename DbTagsList,
93  typename Metavariables, typename ArrayIndex>
94  static void apply(db::DataBox<DbTagsList>& box,
95  const Parallel::GlobalCache<Metavariables>& /*cache*/,
96  const ArrayIndex& /*array_index*/,
97  const observers::ObservationKey& observation_key,
98  const ArrayComponentId& id_of_caller) noexcept {
99  if constexpr (tmpl::list_contains_v<
101  db::mutate<Tags::ExpectedContributorsForObservations>(
102  make_not_null(&box),
103  [&id_of_caller, &observation_key](
106  volume_observers_registered) noexcept {
107  if (UNLIKELY(volume_observers_registered->find(observation_key) ==
108  volume_observers_registered->end())) {
109  ERROR(
110  "Trying to deregister a component associated with an "
111  "unregistered observation key: "
112  << observation_key);
113  }
114 
115  if (UNLIKELY(
116  volume_observers_registered->at(observation_key)
117  .find(id_of_caller) ==
118  volume_observers_registered->at(observation_key).end())) {
119  ERROR("Trying to deregister an unregistered component: "
120  << id_of_caller);
121  }
122 
123  volume_observers_registered->at(observation_key)
124  .erase(id_of_caller);
125  if (UNLIKELY(
126  volume_observers_registered->at(observation_key).size() ==
127  0)) {
128  volume_observers_registered->erase(observation_key);
129  }
130  });
131  } else {
132  (void)box;
133  (void)observation_key;
134  (void)id_of_caller;
135  ERROR(
136  "Could not find tag "
137  "observers::Tags::ExpectedContributorsForObservations in the "
138  "DataBox.");
139  }
140  }
141 };
142 
143 /*!
144  * \brief Register a node with the node that writes the reduction data to disk.
145  */
147  template <typename ParallelComponent, typename DbTagsList,
148  typename Metavariables, typename ArrayIndex>
149  static void apply(db::DataBox<DbTagsList>& box,
151  const ArrayIndex& /*array_index*/,
152  const observers::ObservationKey& observation_key,
153  const size_t caller_node_id) noexcept {
154  if constexpr (tmpl::list_contains_v<
156  auto& my_proxy =
157  Parallel::get_parallel_component<ParallelComponent>(cache);
158  const auto node_id =
159  static_cast<size_t>(Parallel::my_node(*my_proxy.ckLocalBranch()));
160  ASSERT(node_id == 0, "Only node zero, not node "
161  << node_id
162  << ", should be called from another node");
163 
164  db::mutate<Tags::NodesExpectedToContributeReductions>(
165  make_not_null(&box),
166  [&caller_node_id, &observation_key](
167  const gsl::not_null<
169  reduction_observers_registered_nodes) noexcept {
170  if (reduction_observers_registered_nodes->find(observation_key) ==
171  reduction_observers_registered_nodes->end()) {
172  (*reduction_observers_registered_nodes)[observation_key] =
173  std::set<size_t>{};
174  }
175  auto& registered_nodes_for_key =
176  reduction_observers_registered_nodes->at(observation_key);
177  if (UNLIKELY(registered_nodes_for_key.find(caller_node_id) !=
178  registered_nodes_for_key.end())) {
179  ERROR("Already registered node "
180  << caller_node_id << " for reduction observations.");
181  }
182  registered_nodes_for_key.insert(caller_node_id);
183  });
184  } else {
185  (void)box;
186  (void)observation_key;
187  (void)caller_node_id;
188  ERROR(
189  "Do not have tag "
190  "observers::Tags::NodesExpectedToContributeReductions "
191  "in the DataBox. This means components are registering for "
192  "reductions before initialization is complete.");
193  }
194  }
195 };
196 
197 /*!
198  * \brief Deregister a node with the node that writes the reduction data to
199  * disk.
200  */
202  template <typename ParallelComponent, typename DbTagsList,
203  typename Metavariables, typename ArrayIndex>
204  static void apply(db::DataBox<DbTagsList>& box,
206  const ArrayIndex& /*array_index*/,
207  const observers::ObservationKey& observation_key,
208  const size_t caller_node_id) noexcept {
209  if constexpr (tmpl::list_contains_v<
211  auto& my_proxy =
212  Parallel::get_parallel_component<ParallelComponent>(cache);
213  const auto node_id =
214  static_cast<size_t>(Parallel::my_node(*my_proxy.ckLocalBranch()));
215  ASSERT(node_id == 0,
216  "Only node zero, not node "
217  << node_id
218  << " should deregister other nodes in the reduction");
219 
220  db::mutate<Tags::NodesExpectedToContributeReductions>(
221  make_not_null(&box),
222  [&caller_node_id, &observation_key](
223  const gsl::not_null<
225  reduction_observers_registered_nodes) noexcept {
226  if (UNLIKELY(reduction_observers_registered_nodes->find(
227  observation_key) ==
228  reduction_observers_registered_nodes->end())) {
229  ERROR(
230  "Trying to deregister a node associated with an unregistered "
231  "observation key: "
232  << observation_key);
233  }
234  auto& registered_nodes_for_key =
235  reduction_observers_registered_nodes->at(observation_key);
236  if (UNLIKELY(registered_nodes_for_key.find(caller_node_id) ==
237  registered_nodes_for_key.end())) {
238  ERROR("Trying to deregister an unregistered node: "
239  << caller_node_id);
240  }
241  registered_nodes_for_key.erase(caller_node_id);
242  if (UNLIKELY(registered_nodes_for_key.size() == 0)) {
243  reduction_observers_registered_nodes->erase(observation_key);
244  }
245  });
246  } else {
247  (void)box;
248  (void)observation_key;
249  (void)caller_node_id;
250  ERROR(
251  "Do not have tag "
252  "observers::Tags::NodesExpectedToContributeReductions "
253  "in the DataBox. This means components are registering for "
254  "reductions before initialization is complete.");
255  }
256  }
257 };
258 
259 /// \brief Register an `ArrayComponentId` that will call
260 /// `observers::ThreadedActions::WriteReductionData` or
261 /// `observers::ThreadedActions::ContributeReductionData` for a specific
262 /// `ObservationIdRegistrationKey`
263 ///
264 /// Should be invoked on ObserverWriter by the component that will be
265 /// contributing the data.
267  public:
268  template <typename ParallelComponent, typename DbTagsList,
269  typename Metavariables, typename ArrayIndex>
270  static void apply(db::DataBox<DbTagsList>& box,
272  const ArrayIndex& /*array_index*/,
273  const observers::ObservationKey& observation_key,
274  const ArrayComponentId& id_of_caller) noexcept {
275  if constexpr (tmpl::list_contains_v<
277  auto& my_proxy =
278  Parallel::get_parallel_component<ParallelComponent>(cache);
279  const auto node_id =
280  static_cast<size_t>(Parallel::my_node(*my_proxy.ckLocalBranch()));
281  db::mutate<Tags::ExpectedContributorsForObservations>(
282  make_not_null(&box),
283  [&cache, &id_of_caller, &node_id, &observation_key](
286  reduction_observers_registered) noexcept {
287  if (reduction_observers_registered->find(observation_key) ==
288  reduction_observers_registered->end()) {
289  (*reduction_observers_registered)[observation_key] =
290  std::unordered_set<ArrayComponentId>{};
295  observation_key, node_id);
296  }
297 
298  if (LIKELY(reduction_observers_registered->at(observation_key)
299  .find(id_of_caller) ==
300  reduction_observers_registered->at(observation_key)
301  .end())) {
302  reduction_observers_registered->at(observation_key)
303  .insert(id_of_caller);
304  } else {
305  ERROR("Trying to insert a Observer component more than once: "
306  << id_of_caller
307  << " with observation key: " << observation_key);
308  }
309  });
310  } else {
311  (void)box;
312  (void)cache;
313  (void)observation_key;
314  (void)id_of_caller;
315  ERROR(
316  "Could not find tag "
317  "observers::Tags::ExpectedContributorsForObservations in the "
318  "DataBox.");
319  }
320  }
321 };
322 
323 /// \brief Deregister an `ArrayComponentId` that will no longer call
324 /// `observers::ThreadedActions::WriteReductionData` or
325 /// `observers::ThreadedActions::ContributeReductionData` for a specific
326 /// `ObservationIdRegistrationKey`
327 ///
328 /// Should be invoked on ObserverWriter by the component that was previously
329 /// registered by
330 /// `observers::Actions::RegisterReductionContributorWithObserverWriter`.
332  public:
333  template <typename ParallelComponent, typename DbTagsList,
334  typename Metavariables, typename ArrayIndex>
335  static void apply(db::DataBox<DbTagsList>& box,
337  const ArrayIndex& /*array_index*/,
338  const observers::ObservationKey& observation_key,
339  const ArrayComponentId& id_of_caller) noexcept {
340  if constexpr (tmpl::list_contains_v<
342  auto& my_proxy =
343  Parallel::get_parallel_component<ParallelComponent>(cache);
344  const auto node_id =
345  static_cast<size_t>(Parallel::my_node(*my_proxy.ckLocalBranch()));
346  db::mutate<Tags::ExpectedContributorsForObservations>(
347  make_not_null(&box),
348  [&cache, &id_of_caller, &node_id, &observation_key](
351  reduction_observers_registered) noexcept {
352  if (UNLIKELY(
353  reduction_observers_registered->find(observation_key) ==
354  reduction_observers_registered->end())) {
355  ERROR(
356  "Trying to deregister a component associated with an "
357  "unregistered observation key: "
358  << observation_key);
359  }
360  auto& contributors_for_key =
361  reduction_observers_registered->at(observation_key);
362  if (UNLIKELY(contributors_for_key.find(id_of_caller) ==
363  contributors_for_key.end())) {
364  ERROR("Trying to deregister an unregistered component: "
365  << id_of_caller
366  << " with observation key: " << observation_key);
367  }
368  contributors_for_key.erase(id_of_caller);
369  if (UNLIKELY(contributors_for_key.size() == 0)) {
370  Parallel::simple_action<
371  Actions::DeregisterReductionNodeWithWritingNode>(
372  Parallel::get_parallel_component<
373  ObserverWriter<Metavariables>>(cache)[0],
374  observation_key, node_id);
375  reduction_observers_registered->erase(observation_key);
376  }
377  });
378  } else {
379  (void)box;
380  (void)cache;
381  (void)observation_key;
382  (void)id_of_caller;
383  ERROR(
384  "Could not find tag "
385  "observers::Tags::ExpectedContributorsForObservations in the "
386  "DataBox.");
387  }
388  }
389 };
390 
391 /*!
392  * \brief Register the `ArrayComponentId` that will send the data to the
393  * observer for the given `ObservationIdRegistrationKey`
394  *
395  * Should be invoked on the `Observer` by the contributing component.
396  */
398  template <typename ParallelComponent, typename DbTagList,
399  typename Metavariables, typename ArrayIndex>
400  static void apply(db::DataBox<DbTagList>& box,
402  const ArrayIndex& array_index,
403  const observers::ObservationKey& observation_key,
404  const observers::ArrayComponentId& component_id,
405  const TypeOfObservation& type_of_observation) noexcept {
406  if constexpr (tmpl::list_contains_v<
407  DbTagList,
409  bool observation_key_already_registered = true;
410  db::mutate<observers::Tags::ExpectedContributorsForObservations>(
411  make_not_null(&box),
412  [&component_id, &observation_key,
413  &observation_key_already_registered](
416  array_component_ids) noexcept {
417  observation_key_already_registered =
418  (array_component_ids->find(observation_key) !=
419  array_component_ids->end());
420  if (UNLIKELY(observation_key_already_registered and
421  array_component_ids->at(observation_key)
422  .find(component_id) !=
423  array_component_ids->at(observation_key).end())) {
424  ERROR(
425  "Trying to insert a component_id more than once for "
426  "observation. This means an element is registering itself "
427  "with the observers more than once. The component_id is "
428  << component_id << " and the observation key is "
429  << observation_key);
430  }
431  array_component_ids->operator[](observation_key)
432  .insert(component_id);
433  });
434 
435  if (observation_key_already_registered) {
436  // We only need to register with the observer writer on the first call
437  // of this action. Later calls will have already been registered.
438  return;
439  }
440 
441  auto& observer_writer =
444  .ckLocalBranch();
445 
446  switch (type_of_observation) {
447  case TypeOfObservation::Reduction:
450  observer_writer, observation_key,
452  Parallel::ArrayIndex<ArrayIndex>(array_index)});
453  return;
454  case TypeOfObservation::Volume:
457  observer_writer, observation_key,
459  Parallel::ArrayIndex<ArrayIndex>(array_index)});
460  return;
461  default:
462  ERROR(
463  "Registering an unknown TypeOfObservation. Should be one of "
464  "'Reduction' or 'Volume'");
465  };
466  } else {
467  ERROR(
468  "The DataBox must contain the tag "
469  "observers::Tags::ExpectedContributorsForObservations when the "
470  "action "
471  "RegisterContributorWithObserver is called.");
472  }
473  }
474 };
475 
476 /*!
477  * \brief Deregister the `ArrayComponentId` that will no longer send the data to
478  * the observer for the given `ObservationIdRegistrationKey`
479  *
480  * Should be invoked on the `Observer` by the component that was previously
481  * registered with `observers::Actions::RegisterContributorWithObserver`.
482  */
484  template <typename ParallelComponent, typename DbTagList,
485  typename Metavariables, typename ArrayIndex>
486  static void apply(db::DataBox<DbTagList>& box,
488  const ArrayIndex& array_index,
489  const observers::ObservationKey& observation_key,
490  const observers::ArrayComponentId& component_id,
491  const TypeOfObservation& type_of_observation) noexcept {
492  if constexpr (tmpl::list_contains_v<
493  DbTagList,
495  bool all_array_components_have_been_deregistered = false;
496  db::mutate<observers::Tags::ExpectedContributorsForObservations>(
497  make_not_null(&box),
498  [&component_id, &observation_key,
499  &all_array_components_have_been_deregistered](
502  array_component_ids) noexcept {
503  if (UNLIKELY(array_component_ids->find(observation_key) ==
504  array_component_ids->end())) {
505  ERROR(
506  "Trying to deregister a component associated with an "
507  "unregistered observation key: "
508  << observation_key);
509  }
510  auto& component_ids_for_key =
511  array_component_ids->at(observation_key);
512  if (UNLIKELY(component_ids_for_key.find(component_id) ==
513  array_component_ids->at(observation_key).end())) {
514  ERROR("Trying to deregister an unregistered component: "
515  << component_id
516  << " with observation key: " << observation_key);
517  }
518  component_ids_for_key.erase(component_id);
519  if (UNLIKELY(component_ids_for_key.size() == 0)) {
520  array_component_ids->erase(observation_key);
521  all_array_components_have_been_deregistered = true;
522  }
523  });
524 
525  if (not all_array_components_have_been_deregistered) {
526  // Only deregister with the observer writer if this deregistration
527  // removes the last component for the provided `observation_key`.
528  return;
529  }
530 
531  auto& observer_writer =
534  .ckLocalBranch();
535 
536  switch (type_of_observation) {
537  case TypeOfObservation::Reduction:
540  observer_writer, observation_key,
542  Parallel::ArrayIndex<ArrayIndex>(array_index)});
543  return;
544  case TypeOfObservation::Volume:
547  observer_writer, observation_key,
549  Parallel::ArrayIndex<ArrayIndex>(array_index)});
550  return;
551  default:
552  ERROR(
553  "Attempting to deregister an unknown TypeOfObservation. "
554  "Should be one of 'Reduction' or 'Volume'");
555  };
556  } else {
557  ERROR(
558  "The DataBox must contain the tag "
559  "observers::Tags::ExpectedContributorsForObservations when the "
560  "action DeregisterContributorWithObserver is called.");
561  }
562  }
563 };
564 } // namespace Actions
565 } // namespace observers
UNLIKELY
#define UNLIKELY(x)
Definition: Gsl.hpp:73
Parallel::GlobalCache
Definition: ElementReceiveInterpPoints.hpp:15
observers::Actions::RegisterVolumeContributorWithObserverWriter
Register an ArrayComponentId with a specific ObservationIdRegistrationKey that will call observers::T...
Definition: ObserverRegistration.hpp:37
observers::Actions::RegisterReductionContributorWithObserverWriter
Register an ArrayComponentId that will call observers::ThreadedActions::WriteReductionData or observe...
Definition: ObserverRegistration.hpp:266
unordered_set
GlobalCache.hpp
Parallel::get_parallel_component
auto get_parallel_component(GlobalCache< Metavariables > &cache) noexcept -> Parallel::proxy_from_parallel_component< GlobalCache_detail::get_component_if_mocked< typename Metavariables::component_list, ParallelComponentTag >> &
Access the Charm++ proxy associated with a ParallelComponent.
Definition: GlobalCache.hpp:535
observers::ObservationKey
Used as a key in maps to keep track of how many elements have registered.
Definition: ObservationId.hpp:28
Info.hpp
observers::TypeOfObservation
TypeOfObservation
Specifies the type of observation.
Definition: TypeOfObservation.hpp:18
observers::Actions::DeregisterReductionContributorWithObserverWriter
Deregister an ArrayComponentId that will no longer call observers::ThreadedActions::WriteReductionDat...
Definition: ObserverRegistration.hpp:331
observers::Actions::DeregisterContributorWithObserver
Deregister the ArrayComponentId that will no longer send the data to the observer for the given Obser...
Definition: ObserverRegistration.hpp:483
observers::Actions::DeregisterReductionNodeWithWritingNode
Deregister a node with the node that writes the reduction data to disk.
Definition: ObserverRegistration.hpp:201
ERROR
#define ERROR(m)
prints an error message to the standard error stream and aborts the program.
Definition: Error.hpp:37
observers::ObserverWriter
The nodegroup parallel component that is responsible for writing data to disk.
Definition: ObserverComponent.hpp:51
std::add_pointer_t
Parallel::my_node
int my_node(const DistribObject &distributed_object) noexcept
Index of my node.
Definition: Info.hpp:51
DataBox.hpp
cstddef
observers::Tags::NodesExpectedToContributeReductions
The set of nodes that are registered with each ObservationIdRegistrationKey for writing reduction dat...
Definition: Tags.hpp:69
Index.hpp
observers::Actions::RegisterReductionNodeWithWritingNode
Register a node with the node that writes the reduction data to disk.
Definition: ObserverRegistration.hpp:146
observers::Tags::ExpectedContributorsForObservations
The set of ArrayComponentIds that will contribute to each ObservationId for reduction.
Definition: Tags.hpp:35
ASSERT
#define ASSERT(a, m)
Assert that an expression should be true.
Definition: Assert.hpp:49
ActionTesting::cache
Parallel::GlobalCache< Metavariables > & cache(MockRuntimeSystem< Metavariables > &runner, const ArrayIndex &array_index) noexcept
Returns the GlobalCache of Component with index array_index.
Definition: MockRuntimeSystemFreeFunctions.hpp:382
Gsl.hpp
LIKELY
#define LIKELY(x)
Definition: Gsl.hpp:67
observers::ArrayComponentId
An ID type that identifies both the parallel component and the index in the parallel component.
Definition: ArrayComponentId.hpp:27
Parallel::simple_action
void simple_action(Proxy &&proxy) noexcept
Invoke a simple action on proxy
Definition: Invoke.hpp:62
observers::Actions::DeregisterVolumeContributorWithObserverWriter
Deregister an ArrayComponentId with a specific ObservationIdRegistrationKey that will no longer call ...
Definition: ObserverRegistration.hpp:90
make_not_null
gsl::not_null< T * > make_not_null(T *ptr) noexcept
Construct a not_null from a pointer. Often this will be done as an implicit conversion,...
Definition: Gsl.hpp:880
Parallel::ArrayIndex
The array index used for indexing Chare Arrays, mostly an implementation detail.
Definition: ArrayIndex.hpp:28
unordered_map
type_traits
set
TMPL.hpp
observers::Actions::RegisterContributorWithObserver
Register the ArrayComponentId that will send the data to the observer for the given ObservationIdRegi...
Definition: ObserverRegistration.hpp:397
gsl::not_null
Require a pointer to not be a nullptr
Definition: ReadSpecThirdOrderPiecewisePolynomial.hpp:13