Line data Source code
1 0 : // Distributed under the MIT License.
2 : // See LICENSE.txt for details.
3 :
4 : #pragma once
5 :
6 : #include <cstddef>
7 : #include <set>
8 : #include <type_traits>
9 : #include <unordered_map>
10 : #include <unordered_set>
11 :
12 : #include "DataStructures/DataBox/DataBox.hpp"
13 : #include "DataStructures/Index.hpp"
14 : #include "IO/Observer/ObserverComponent.hpp"
15 : #include "IO/Observer/Tags.hpp"
16 : #include "IO/Observer/TypeOfObservation.hpp"
17 : #include "Parallel/ArrayComponentId.hpp"
18 : #include "Parallel/GlobalCache.hpp"
19 : #include "Parallel/Info.hpp"
20 : #include "Parallel/Invoke.hpp"
21 : #include "Parallel/Local.hpp"
22 : #include "Utilities/Gsl.hpp"
23 : #include "Utilities/TMPL.hpp"
24 : #include "Utilities/TaggedTuple.hpp"
25 :
26 : namespace observers {
27 : /*!
28 : * \ingroup ObserversGroup
29 : * \brief %Actions used by the observer parallel component
30 : */
31 : namespace Actions {
32 : /// \brief Register an `ArrayComponentId` with a specific
33 : /// `ObservationIdRegistrationKey` that will call
34 : /// `observers::ThreadedActions::ContributeVolumeData`.
35 : ///
36 : /// Should be invoked on ObserverWriter by the component that will be
37 : /// contributing the data.
38 1 : struct RegisterVolumeContributorWithObserverWriter {
39 : public:
40 : template <typename ParallelComponent, typename DbTagsList,
41 : typename Metavariables, typename ArrayIndex>
42 0 : static void apply(db::DataBox<DbTagsList>& box,
43 : const Parallel::GlobalCache<Metavariables>& /*cache*/,
44 : const ArrayIndex& /*array_index*/,
45 : const observers::ObservationKey& observation_key,
46 : const Parallel::ArrayComponentId& id_of_caller) {
47 : db::mutate<Tags::ExpectedContributorsForObservations>(
48 : [&id_of_caller,
49 : &observation_key](const gsl::not_null<std::unordered_map<
50 : ObservationKey,
51 : std::unordered_set<Parallel::ArrayComponentId>>*>
52 : volume_observers_registered) {
53 : if (volume_observers_registered->find(observation_key) ==
54 : volume_observers_registered->end()) {
55 : (*volume_observers_registered)[observation_key] =
56 : std::unordered_set<Parallel::ArrayComponentId>{};
57 : }
58 :
59 : if (UNLIKELY(
60 : volume_observers_registered->at(observation_key)
61 : .find(id_of_caller) !=
62 : volume_observers_registered->at(observation_key).end())) {
63 : ERROR("Trying to insert a Observer component more than once: "
64 : << id_of_caller);
65 : }
66 :
67 : volume_observers_registered->at(observation_key).insert(id_of_caller);
68 : },
69 : make_not_null(&box));
70 : }
71 : };
72 :
73 : /// \brief Deregister an `ArrayComponentId` with a specific
74 : /// `ObservationIdRegistrationKey` that will no longer call
75 : /// `observers::ThreadedActions::ContributeVolumeData`
76 : ///
77 : /// Should be invoked on ObserverWriter by the component that was previously
78 : /// registered with
79 : /// `observers::Actions::RegisterVolumeContributorWithObserverWriter`
80 1 : struct DeregisterVolumeContributorWithObserverWriter {
81 : public:
82 : template <typename ParallelComponent, typename DbTagsList,
83 : typename Metavariables, typename ArrayIndex>
84 0 : static void apply(db::DataBox<DbTagsList>& box,
85 : const Parallel::GlobalCache<Metavariables>& /*cache*/,
86 : const ArrayIndex& /*array_index*/,
87 : const observers::ObservationKey& observation_key,
88 : const Parallel::ArrayComponentId& id_of_caller) {
89 : db::mutate<Tags::ExpectedContributorsForObservations>(
90 : [&id_of_caller,
91 : &observation_key](const gsl::not_null<std::unordered_map<
92 : ObservationKey,
93 : std::unordered_set<Parallel::ArrayComponentId>>*>
94 : volume_observers_registered) {
95 : if (UNLIKELY(volume_observers_registered->find(observation_key) ==
96 : volume_observers_registered->end())) {
97 : ERROR(
98 : "Trying to deregister a component associated with an "
99 : "unregistered observation key: "
100 : << observation_key);
101 : }
102 :
103 : if (UNLIKELY(
104 : volume_observers_registered->at(observation_key)
105 : .find(id_of_caller) ==
106 : volume_observers_registered->at(observation_key).end())) {
107 : ERROR("Trying to deregister an unregistered component: "
108 : << id_of_caller);
109 : }
110 :
111 : volume_observers_registered->at(observation_key).erase(id_of_caller);
112 : if (UNLIKELY(
113 : volume_observers_registered->at(observation_key).size() ==
114 : 0)) {
115 : volume_observers_registered->erase(observation_key);
116 : }
117 : },
118 : make_not_null(&box));
119 : }
120 : };
121 :
122 : /*!
123 : * \brief Register a node with the node that writes the reduction data to disk.
124 : */
125 1 : struct RegisterReductionNodeWithWritingNode {
126 : template <typename ParallelComponent, typename DbTagsList,
127 : typename Metavariables, typename ArrayIndex>
128 0 : static void apply(db::DataBox<DbTagsList>& box,
129 : Parallel::GlobalCache<Metavariables>& cache,
130 : const ArrayIndex& /*array_index*/,
131 : const observers::ObservationKey& observation_key,
132 : const size_t caller_node_id) {
133 : auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
134 : const auto node_id =
135 : Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy));
136 : ASSERT(node_id == 0, "Only node zero, not node "
137 : << node_id
138 : << ", should be called from another node");
139 :
140 : db::mutate<Tags::NodesExpectedToContributeReductions>(
141 : [&caller_node_id, &observation_key](
142 : const gsl::not_null<
143 : std::unordered_map<ObservationKey, std::set<size_t>>*>
144 : reduction_observers_registered_nodes) {
145 : if (reduction_observers_registered_nodes->find(observation_key) ==
146 : reduction_observers_registered_nodes->end()) {
147 : (*reduction_observers_registered_nodes)[observation_key] =
148 : std::set<size_t>{};
149 : }
150 : auto& registered_nodes_for_key =
151 : reduction_observers_registered_nodes->at(observation_key);
152 : if (UNLIKELY(registered_nodes_for_key.find(caller_node_id) !=
153 : registered_nodes_for_key.end())) {
154 : ERROR("Already registered node " << caller_node_id
155 : << " for reduction observations.");
156 : }
157 : registered_nodes_for_key.insert(caller_node_id);
158 : },
159 : make_not_null(&box));
160 : }
161 : };
162 :
163 : /*!
164 : * \brief Deregister a node with the node that writes the reduction data to
165 : * disk.
166 : */
167 1 : struct DeregisterReductionNodeWithWritingNode {
168 : template <typename ParallelComponent, typename DbTagsList,
169 : typename Metavariables, typename ArrayIndex>
170 0 : static void apply(db::DataBox<DbTagsList>& box,
171 : Parallel::GlobalCache<Metavariables>& cache,
172 : const ArrayIndex& /*array_index*/,
173 : const observers::ObservationKey& observation_key,
174 : const size_t caller_node_id) {
175 : auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
176 : const auto node_id =
177 : Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy));
178 : ASSERT(node_id == 0,
179 : "Only node zero, not node "
180 : << node_id << " should deregister other nodes in the reduction");
181 :
182 : db::mutate<Tags::NodesExpectedToContributeReductions>(
183 : [&caller_node_id, &observation_key](
184 : const gsl::not_null<
185 : std::unordered_map<ObservationKey, std::set<size_t>>*>
186 : reduction_observers_registered_nodes) {
187 : if (UNLIKELY(
188 : reduction_observers_registered_nodes->find(observation_key) ==
189 : reduction_observers_registered_nodes->end())) {
190 : ERROR(
191 : "Trying to deregister a node associated with an unregistered "
192 : "observation key: "
193 : << observation_key);
194 : }
195 : auto& registered_nodes_for_key =
196 : reduction_observers_registered_nodes->at(observation_key);
197 : if (UNLIKELY(registered_nodes_for_key.find(caller_node_id) ==
198 : registered_nodes_for_key.end())) {
199 : ERROR("Trying to deregister an unregistered node: "
200 : << caller_node_id);
201 : }
202 : registered_nodes_for_key.erase(caller_node_id);
203 : if (UNLIKELY(registered_nodes_for_key.size() == 0)) {
204 : reduction_observers_registered_nodes->erase(observation_key);
205 : }
206 : },
207 : make_not_null(&box));
208 : }
209 : };
210 :
211 : /// \brief Register an `ArrayComponentId` that will call
212 : /// `observers::ThreadedActions::WriteReductionData` or
213 : /// `observers::ThreadedActions::ContributeReductionData` for a specific
214 : /// `ObservationIdRegistrationKey`
215 : ///
216 : /// Should be invoked on ObserverWriter by the component that will be
217 : /// contributing the data.
218 1 : struct RegisterReductionContributorWithObserverWriter {
219 : public:
220 : template <typename ParallelComponent, typename DbTagsList,
221 : typename Metavariables, typename ArrayIndex>
222 0 : static void apply(db::DataBox<DbTagsList>& box,
223 : Parallel::GlobalCache<Metavariables>& cache,
224 : const ArrayIndex& /*array_index*/,
225 : const observers::ObservationKey& observation_key,
226 : const Parallel::ArrayComponentId& id_of_caller) {
227 : auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
228 : const auto node_id =
229 : Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy));
230 : db::mutate<Tags::ExpectedContributorsForObservations>(
231 : [&cache, &id_of_caller, &node_id,
232 : &observation_key](const gsl::not_null<std::unordered_map<
233 : ObservationKey,
234 : std::unordered_set<Parallel::ArrayComponentId>>*>
235 : reduction_observers_registered) {
236 : if (reduction_observers_registered->find(observation_key) ==
237 : reduction_observers_registered->end()) {
238 : (*reduction_observers_registered)[observation_key] =
239 : std::unordered_set<Parallel::ArrayComponentId>{};
240 : Parallel::simple_action<
241 : Actions::RegisterReductionNodeWithWritingNode>(
242 : Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
243 : cache)[0],
244 : observation_key, node_id);
245 : }
246 :
247 : if (LIKELY(
248 : reduction_observers_registered->at(observation_key)
249 : .find(id_of_caller) ==
250 : reduction_observers_registered->at(observation_key).end())) {
251 : reduction_observers_registered->at(observation_key)
252 : .insert(id_of_caller);
253 : } else {
254 : ERROR("Trying to insert a Observer component more than once: "
255 : << id_of_caller
256 : << " with observation key: " << observation_key);
257 : }
258 : },
259 : make_not_null(&box));
260 : }
261 : };
262 :
263 : /// \brief Deregister an `ArrayComponentId` that will no longer call
264 : /// `observers::ThreadedActions::WriteReductionData` or
265 : /// `observers::ThreadedActions::ContributeReductionData` for a specific
266 : /// `ObservationIdRegistrationKey`
267 : ///
268 : /// Should be invoked on ObserverWriter by the component that was previously
269 : /// registered by
270 : /// `observers::Actions::RegisterReductionContributorWithObserverWriter`.
271 1 : struct DeregisterReductionContributorWithObserverWriter {
272 : public:
273 : template <typename ParallelComponent, typename DbTagsList,
274 : typename Metavariables, typename ArrayIndex>
275 0 : static void apply(db::DataBox<DbTagsList>& box,
276 : Parallel::GlobalCache<Metavariables>& cache,
277 : const ArrayIndex& /*array_index*/,
278 : const observers::ObservationKey& observation_key,
279 : const Parallel::ArrayComponentId& id_of_caller) {
280 : auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
281 : const auto node_id =
282 : Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy));
283 : db::mutate<Tags::ExpectedContributorsForObservations>(
284 : [&cache, &id_of_caller, &node_id,
285 : &observation_key](const gsl::not_null<std::unordered_map<
286 : ObservationKey,
287 : std::unordered_set<Parallel::ArrayComponentId>>*>
288 : reduction_observers_registered) {
289 : if (UNLIKELY(reduction_observers_registered->find(observation_key) ==
290 : reduction_observers_registered->end())) {
291 : ERROR(
292 : "Trying to deregister a component associated with an "
293 : "unregistered observation key: "
294 : << observation_key);
295 : }
296 : auto& contributors_for_key =
297 : reduction_observers_registered->at(observation_key);
298 : if (UNLIKELY(contributors_for_key.find(id_of_caller) ==
299 : contributors_for_key.end())) {
300 : ERROR("Trying to deregister an unregistered component: "
301 : << id_of_caller
302 : << " with observation key: " << observation_key);
303 : }
304 : contributors_for_key.erase(id_of_caller);
305 : if (UNLIKELY(contributors_for_key.empty())) {
306 : Parallel::simple_action<
307 : Actions::DeregisterReductionNodeWithWritingNode>(
308 : Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
309 : cache)[0],
310 : observation_key, node_id);
311 : reduction_observers_registered->erase(observation_key);
312 : }
313 : },
314 : make_not_null(&box));
315 : }
316 : };
317 :
318 : /*!
319 : * \brief Register the `ArrayComponentId` that will send the data to the
320 : * observer for the given `ObservationIdRegistrationKey`
321 : *
322 : * Should be invoked on the `Observer` by the contributing component.
323 : */
324 1 : struct RegisterContributorWithObserver {
325 : template <typename ParallelComponent, typename DbTagList,
326 : typename Metavariables, typename ArrayIndex>
327 0 : static void apply(db::DataBox<DbTagList>& box,
328 : Parallel::GlobalCache<Metavariables>& cache,
329 : const ArrayIndex& array_index,
330 : const observers::ObservationKey& observation_key,
331 : const Parallel::ArrayComponentId& component_id,
332 : const TypeOfObservation& type_of_observation) {
333 : bool observation_key_already_registered = true;
334 : db::mutate<observers::Tags::ExpectedContributorsForObservations>(
335 : [&component_id, &observation_key, &observation_key_already_registered](
336 : const gsl::not_null<std::unordered_map<
337 : ObservationKey,
338 : std::unordered_set<Parallel::ArrayComponentId>>*>
339 : array_component_ids) {
340 : observation_key_already_registered =
341 : (array_component_ids->find(observation_key) !=
342 : array_component_ids->end());
343 : if (UNLIKELY(
344 : observation_key_already_registered and
345 : array_component_ids->at(observation_key).find(component_id) !=
346 : array_component_ids->at(observation_key).end())) {
347 : ERROR(
348 : "Trying to insert a component_id more than once for "
349 : "observation. This means an element is registering itself "
350 : "with the observers more than once. The component_id is "
351 : << component_id << " and the observation key is "
352 : << observation_key);
353 : }
354 : array_component_ids->operator[](observation_key).insert(component_id);
355 : },
356 : make_not_null(&box));
357 :
358 : if (observation_key_already_registered) {
359 : // We only need to register with the observer writer on the first call
360 : // of this action. Later calls will have already been registered.
361 : return;
362 : }
363 :
364 : auto& observer_writer = *Parallel::local_branch(
365 : Parallel::get_parallel_component<
366 : observers::ObserverWriter<Metavariables>>(cache));
367 :
368 : switch (type_of_observation) {
369 : case TypeOfObservation::Reduction:
370 : Parallel::simple_action<
371 : Actions::RegisterReductionContributorWithObserverWriter>(
372 : observer_writer, observation_key,
373 : Parallel::make_array_component_id<ParallelComponent>(array_index));
374 : return;
375 : case TypeOfObservation::Volume:
376 : Parallel::simple_action<
377 : Actions::RegisterVolumeContributorWithObserverWriter>(
378 : observer_writer, observation_key,
379 : Parallel::make_array_component_id<ParallelComponent>(array_index));
380 : return;
381 : default:
382 : ERROR(
383 : "Registering an unknown TypeOfObservation. Should be one of "
384 : "'Reduction' or 'Volume'");
385 : };
386 : }
387 : };
388 :
389 : /*!
390 : * \brief Deregister the `ArrayComponentId` that will no longer send the data to
391 : * the observer for the given `ObservationIdRegistrationKey`
392 : *
393 : * Should be invoked on the `Observer` by the component that was previously
394 : * registered with `observers::Actions::RegisterContributorWithObserver`.
395 : */
396 1 : struct DeregisterContributorWithObserver {
397 : template <typename ParallelComponent, typename DbTagList,
398 : typename Metavariables, typename ArrayIndex>
399 0 : static void apply(db::DataBox<DbTagList>& box,
400 : Parallel::GlobalCache<Metavariables>& cache,
401 : const ArrayIndex& array_index,
402 : const observers::ObservationKey& observation_key,
403 : const Parallel::ArrayComponentId& component_id,
404 : const TypeOfObservation& type_of_observation) {
405 : bool all_array_components_have_been_deregistered = false;
406 : db::mutate<observers::Tags::ExpectedContributorsForObservations>(
407 : [&component_id, &observation_key,
408 : &all_array_components_have_been_deregistered](
409 : const gsl::not_null<std::unordered_map<
410 : ObservationKey,
411 : std::unordered_set<Parallel::ArrayComponentId>>*>
412 : array_component_ids) {
413 : if (UNLIKELY(array_component_ids->find(observation_key) ==
414 : array_component_ids->end())) {
415 : ERROR(
416 : "Trying to deregister a component associated with an "
417 : "unregistered observation key: "
418 : << observation_key);
419 : }
420 : auto& component_ids_for_key =
421 : array_component_ids->at(observation_key);
422 : if (UNLIKELY(component_ids_for_key.find(component_id) ==
423 : array_component_ids->at(observation_key).end())) {
424 : ERROR("Trying to deregister an unregistered component: "
425 : << component_id
426 : << " with observation key: " << observation_key);
427 : }
428 : component_ids_for_key.erase(component_id);
429 : if (UNLIKELY(component_ids_for_key.size() == 0)) {
430 : array_component_ids->erase(observation_key);
431 : all_array_components_have_been_deregistered = true;
432 : }
433 : },
434 : make_not_null(&box));
435 :
436 : if (not all_array_components_have_been_deregistered) {
437 : // Only deregister with the observer writer if this deregistration
438 : // removes the last component for the provided `observation_key`.
439 : return;
440 : }
441 :
442 : auto& observer_writer = *Parallel::local_branch(
443 : Parallel::get_parallel_component<
444 : observers::ObserverWriter<Metavariables>>(cache));
445 :
446 : switch (type_of_observation) {
447 : case TypeOfObservation::Reduction:
448 : Parallel::simple_action<
449 : Actions::DeregisterReductionContributorWithObserverWriter>(
450 : observer_writer, observation_key,
451 : Parallel::make_array_component_id<ParallelComponent>(array_index));
452 : return;
453 : case TypeOfObservation::Volume:
454 : Parallel::simple_action<
455 : Actions::DeregisterVolumeContributorWithObserverWriter>(
456 : observer_writer, observation_key,
457 : Parallel::make_array_component_id<ParallelComponent>(array_index));
458 : return;
459 : default:
460 : ERROR(
461 : "Attempting to deregister an unknown TypeOfObservation. "
462 : "Should be one of 'Reduction' or 'Volume'");
463 : };
464 : }
465 : };
466 : } // namespace Actions
467 : } // namespace observers
|