SpECTRE Documentation Coverage Report
Current view: top level - IO/Observer/Actions - ObserverRegistration.hpp Hit Total Coverage
Commit: 1bd361db2ecec890b34404958975897856517ca1 Lines: 8 17 47.1 %
Date: 2024-05-08 20:10:16
Legend: Lines: hit not hit

          Line data    Source code
       1           0 : // Distributed under the MIT License.
       2             : // See LICENSE.txt for details.
       3             : 
       4             : #pragma once
       5             : 
       6             : #include <cstddef>
       7             : #include <set>
       8             : #include <type_traits>
       9             : #include <unordered_map>
      10             : #include <unordered_set>
      11             : 
      12             : #include "DataStructures/DataBox/DataBox.hpp"
      13             : #include "DataStructures/Index.hpp"
      14             : #include "IO/Observer/ObserverComponent.hpp"
      15             : #include "IO/Observer/Tags.hpp"
      16             : #include "IO/Observer/TypeOfObservation.hpp"
      17             : #include "Parallel/ArrayComponentId.hpp"
      18             : #include "Parallel/GlobalCache.hpp"
      19             : #include "Parallel/Info.hpp"
      20             : #include "Parallel/Invoke.hpp"
      21             : #include "Parallel/Local.hpp"
      22             : #include "Utilities/Gsl.hpp"
      23             : #include "Utilities/TMPL.hpp"
      24             : #include "Utilities/TaggedTuple.hpp"
      25             : 
      26             : namespace observers {
      27             : /*!
      28             :  * \ingroup ObserversGroup
      29             :  * \brief %Actions used by the observer parallel component
      30             :  */
      31             : namespace Actions {
      32             : /// \brief Register an `ArrayComponentId` with a specific
      33             : /// `ObservationIdRegistrationKey` that will call
      34             : /// `observers::ThreadedActions::ContributeVolumeData`.
      35             : ///
      36             : /// Should be invoked on ObserverWriter by the component that will be
      37             : /// contributing the data.
      38           1 : struct RegisterVolumeContributorWithObserverWriter {
      39             :  public:
      40             :   template <typename ParallelComponent, typename DbTagsList,
      41             :             typename Metavariables, typename ArrayIndex>
      42           0 :   static void apply(db::DataBox<DbTagsList>& box,
      43             :                     const Parallel::GlobalCache<Metavariables>& /*cache*/,
      44             :                     const ArrayIndex& /*array_index*/,
      45             :                     const observers::ObservationKey& observation_key,
      46             :                     const Parallel::ArrayComponentId& id_of_caller) {
      47             :     db::mutate<Tags::ExpectedContributorsForObservations>(
      48             :         [&id_of_caller,
      49             :          &observation_key](const gsl::not_null<std::unordered_map<
      50             :                                ObservationKey,
      51             :                                std::unordered_set<Parallel::ArrayComponentId>>*>
      52             :                                volume_observers_registered) {
      53             :           if (volume_observers_registered->find(observation_key) ==
      54             :               volume_observers_registered->end()) {
      55             :             (*volume_observers_registered)[observation_key] =
      56             :                 std::unordered_set<Parallel::ArrayComponentId>{};
      57             :           }
      58             : 
      59             :           if (UNLIKELY(
      60             :                   volume_observers_registered->at(observation_key)
      61             :                       .find(id_of_caller) !=
      62             :                   volume_observers_registered->at(observation_key).end())) {
      63             :             ERROR("Trying to insert a Observer component more than once: "
      64             :                   << id_of_caller);
      65             :           }
      66             : 
      67             :           volume_observers_registered->at(observation_key).insert(id_of_caller);
      68             :         },
      69             :         make_not_null(&box));
      70             :   }
      71             : };
      72             : 
      73             : /// \brief Deregister an `ArrayComponentId` with a specific
      74             : /// `ObservationIdRegistrationKey` that will no longer call
      75             : /// `observers::ThreadedActions::ContributeVolumeData`
      76             : ///
      77             : /// Should be invoked on ObserverWriter by the component that was previously
      78             : /// registered with
      79             : /// `observers::Actions::RegisterVolumeContributorWithObserverWriter`
      80           1 : struct DeregisterVolumeContributorWithObserverWriter {
      81             :  public:
      82             :   template <typename ParallelComponent, typename DbTagsList,
      83             :             typename Metavariables, typename ArrayIndex>
      84           0 :   static void apply(db::DataBox<DbTagsList>& box,
      85             :                     const Parallel::GlobalCache<Metavariables>& /*cache*/,
      86             :                     const ArrayIndex& /*array_index*/,
      87             :                     const observers::ObservationKey& observation_key,
      88             :                     const Parallel::ArrayComponentId& id_of_caller) {
      89             :     db::mutate<Tags::ExpectedContributorsForObservations>(
      90             :         [&id_of_caller,
      91             :          &observation_key](const gsl::not_null<std::unordered_map<
      92             :                                ObservationKey,
      93             :                                std::unordered_set<Parallel::ArrayComponentId>>*>
      94             :                                volume_observers_registered) {
      95             :           if (UNLIKELY(volume_observers_registered->find(observation_key) ==
      96             :                        volume_observers_registered->end())) {
      97             :             ERROR(
      98             :                 "Trying to deregister a component associated with an "
      99             :                 "unregistered observation key: "
     100             :                 << observation_key);
     101             :           }
     102             : 
     103             :           if (UNLIKELY(
     104             :                   volume_observers_registered->at(observation_key)
     105             :                       .find(id_of_caller) ==
     106             :                   volume_observers_registered->at(observation_key).end())) {
     107             :             ERROR("Trying to deregister an unregistered component: "
     108             :                   << id_of_caller);
     109             :           }
     110             : 
     111             :           volume_observers_registered->at(observation_key).erase(id_of_caller);
     112             :           if (UNLIKELY(
     113             :                   volume_observers_registered->at(observation_key).size() ==
     114             :                   0)) {
     115             :             volume_observers_registered->erase(observation_key);
     116             :           }
     117             :         },
     118             :         make_not_null(&box));
     119             :   }
     120             : };
     121             : 
     122             : /*!
     123             :  * \brief Register a node with the node that writes the reduction data to disk.
     124             :  */
     125           1 : struct RegisterReductionNodeWithWritingNode {
     126             :   template <typename ParallelComponent, typename DbTagsList,
     127             :             typename Metavariables, typename ArrayIndex>
     128           0 :   static void apply(db::DataBox<DbTagsList>& box,
     129             :                     Parallel::GlobalCache<Metavariables>& cache,
     130             :                     const ArrayIndex& /*array_index*/,
     131             :                     const observers::ObservationKey& observation_key,
     132             :                     const size_t caller_node_id) {
     133             :     auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
     134             :     const auto node_id =
     135             :         Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy));
     136             :     ASSERT(node_id == 0, "Only node zero, not node "
     137             :                              << node_id
     138             :                              << ", should be called from another node");
     139             : 
     140             :     db::mutate<Tags::NodesExpectedToContributeReductions>(
     141             :         [&caller_node_id, &observation_key](
     142             :             const gsl::not_null<
     143             :                 std::unordered_map<ObservationKey, std::set<size_t>>*>
     144             :                 reduction_observers_registered_nodes) {
     145             :           if (reduction_observers_registered_nodes->find(observation_key) ==
     146             :               reduction_observers_registered_nodes->end()) {
     147             :             (*reduction_observers_registered_nodes)[observation_key] =
     148             :                 std::set<size_t>{};
     149             :           }
     150             :           auto& registered_nodes_for_key =
     151             :               reduction_observers_registered_nodes->at(observation_key);
     152             :           if (UNLIKELY(registered_nodes_for_key.find(caller_node_id) !=
     153             :                        registered_nodes_for_key.end())) {
     154             :             ERROR("Already registered node " << caller_node_id
     155             :                                              << " for reduction observations.");
     156             :           }
     157             :           registered_nodes_for_key.insert(caller_node_id);
     158             :         },
     159             :         make_not_null(&box));
     160             :   }
     161             : };
     162             : 
     163             : /*!
     164             :  * \brief Deregister a node with the node that writes the reduction data to
     165             :  * disk.
     166             :  */
     167           1 : struct DeregisterReductionNodeWithWritingNode {
     168             :   template <typename ParallelComponent, typename DbTagsList,
     169             :             typename Metavariables, typename ArrayIndex>
     170           0 :   static void apply(db::DataBox<DbTagsList>& box,
     171             :                     Parallel::GlobalCache<Metavariables>& cache,
     172             :                     const ArrayIndex& /*array_index*/,
     173             :                     const observers::ObservationKey& observation_key,
     174             :                     const size_t caller_node_id) {
     175             :     auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
     176             :     const auto node_id =
     177             :         Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy));
     178             :     ASSERT(node_id == 0,
     179             :            "Only node zero, not node "
     180             :                << node_id << " should deregister other nodes in the reduction");
     181             : 
     182             :     db::mutate<Tags::NodesExpectedToContributeReductions>(
     183             :         [&caller_node_id, &observation_key](
     184             :             const gsl::not_null<
     185             :                 std::unordered_map<ObservationKey, std::set<size_t>>*>
     186             :                 reduction_observers_registered_nodes) {
     187             :           if (UNLIKELY(
     188             :                   reduction_observers_registered_nodes->find(observation_key) ==
     189             :                   reduction_observers_registered_nodes->end())) {
     190             :             ERROR(
     191             :                 "Trying to deregister a node associated with an unregistered "
     192             :                 "observation key: "
     193             :                 << observation_key);
     194             :           }
     195             :           auto& registered_nodes_for_key =
     196             :               reduction_observers_registered_nodes->at(observation_key);
     197             :           if (UNLIKELY(registered_nodes_for_key.find(caller_node_id) ==
     198             :                        registered_nodes_for_key.end())) {
     199             :             ERROR("Trying to deregister an unregistered node: "
     200             :                   << caller_node_id);
     201             :           }
     202             :           registered_nodes_for_key.erase(caller_node_id);
     203             :           if (UNLIKELY(registered_nodes_for_key.size() == 0)) {
     204             :             reduction_observers_registered_nodes->erase(observation_key);
     205             :           }
     206             :         },
     207             :         make_not_null(&box));
     208             :   }
     209             : };
     210             : 
     211             : /// \brief Register an `ArrayComponentId` that will call
     212             : /// `observers::ThreadedActions::WriteReductionData` or
     213             : /// `observers::ThreadedActions::ContributeReductionData` for a specific
     214             : /// `ObservationIdRegistrationKey`
     215             : ///
     216             : /// Should be invoked on ObserverWriter by the component that will be
     217             : /// contributing the data.
     218           1 : struct RegisterReductionContributorWithObserverWriter {
     219             :  public:
     220             :   template <typename ParallelComponent, typename DbTagsList,
     221             :             typename Metavariables, typename ArrayIndex>
     222           0 :   static void apply(db::DataBox<DbTagsList>& box,
     223             :                     Parallel::GlobalCache<Metavariables>& cache,
     224             :                     const ArrayIndex& /*array_index*/,
     225             :                     const observers::ObservationKey& observation_key,
     226             :                     const Parallel::ArrayComponentId& id_of_caller) {
     227             :     auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
     228             :     const auto node_id =
     229             :         Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy));
     230             :     db::mutate<Tags::ExpectedContributorsForObservations>(
     231             :         [&cache, &id_of_caller, &node_id,
     232             :          &observation_key](const gsl::not_null<std::unordered_map<
     233             :                                ObservationKey,
     234             :                                std::unordered_set<Parallel::ArrayComponentId>>*>
     235             :                                reduction_observers_registered) {
     236             :           if (reduction_observers_registered->find(observation_key) ==
     237             :               reduction_observers_registered->end()) {
     238             :             (*reduction_observers_registered)[observation_key] =
     239             :                 std::unordered_set<Parallel::ArrayComponentId>{};
     240             :             Parallel::simple_action<
     241             :                 Actions::RegisterReductionNodeWithWritingNode>(
     242             :                 Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
     243             :                     cache)[0],
     244             :                 observation_key, node_id);
     245             :           }
     246             : 
     247             :           if (LIKELY(
     248             :                   reduction_observers_registered->at(observation_key)
     249             :                       .find(id_of_caller) ==
     250             :                   reduction_observers_registered->at(observation_key).end())) {
     251             :             reduction_observers_registered->at(observation_key)
     252             :                 .insert(id_of_caller);
     253             :           } else {
     254             :             ERROR("Trying to insert a Observer component more than once: "
     255             :                   << id_of_caller
     256             :                   << " with observation key: " << observation_key);
     257             :           }
     258             :         },
     259             :         make_not_null(&box));
     260             :   }
     261             : };
     262             : 
     263             : /// \brief Deregister an `ArrayComponentId` that will no longer call
     264             : /// `observers::ThreadedActions::WriteReductionData` or
     265             : /// `observers::ThreadedActions::ContributeReductionData` for a specific
     266             : /// `ObservationIdRegistrationKey`
     267             : ///
     268             : /// Should be invoked on ObserverWriter by the component that was previously
     269             : /// registered by
     270             : /// `observers::Actions::RegisterReductionContributorWithObserverWriter`.
     271           1 : struct DeregisterReductionContributorWithObserverWriter {
     272             :  public:
     273             :   template <typename ParallelComponent, typename DbTagsList,
     274             :             typename Metavariables, typename ArrayIndex>
     275           0 :   static void apply(db::DataBox<DbTagsList>& box,
     276             :                     Parallel::GlobalCache<Metavariables>& cache,
     277             :                     const ArrayIndex& /*array_index*/,
     278             :                     const observers::ObservationKey& observation_key,
     279             :                     const Parallel::ArrayComponentId& id_of_caller) {
     280             :     auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
     281             :     const auto node_id =
     282             :         Parallel::my_node<size_t>(*Parallel::local_branch(my_proxy));
     283             :     db::mutate<Tags::ExpectedContributorsForObservations>(
     284             :         [&cache, &id_of_caller, &node_id,
     285             :          &observation_key](const gsl::not_null<std::unordered_map<
     286             :                                ObservationKey,
     287             :                                std::unordered_set<Parallel::ArrayComponentId>>*>
     288             :                                reduction_observers_registered) {
     289             :           if (UNLIKELY(reduction_observers_registered->find(observation_key) ==
     290             :                        reduction_observers_registered->end())) {
     291             :             ERROR(
     292             :                 "Trying to deregister a component associated with an "
     293             :                 "unregistered observation key: "
     294             :                 << observation_key);
     295             :           }
     296             :           auto& contributors_for_key =
     297             :               reduction_observers_registered->at(observation_key);
     298             :           if (UNLIKELY(contributors_for_key.find(id_of_caller) ==
     299             :                        contributors_for_key.end())) {
     300             :             ERROR("Trying to deregister an unregistered component: "
     301             :                   << id_of_caller
     302             :                   << " with observation key: " << observation_key);
     303             :           }
     304             :           contributors_for_key.erase(id_of_caller);
     305             :           if (UNLIKELY(contributors_for_key.empty())) {
     306             :             Parallel::simple_action<
     307             :                 Actions::DeregisterReductionNodeWithWritingNode>(
     308             :                 Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
     309             :                     cache)[0],
     310             :                 observation_key, node_id);
     311             :             reduction_observers_registered->erase(observation_key);
     312             :           }
     313             :         },
     314             :         make_not_null(&box));
     315             :   }
     316             : };
     317             : 
     318             : /*!
     319             :  * \brief Register the `ArrayComponentId` that will send the data to the
     320             :  * observer for the given `ObservationIdRegistrationKey`
     321             :  *
     322             :  * Should be invoked on the `Observer` by the contributing component.
     323             :  */
     324           1 : struct RegisterContributorWithObserver {
     325             :   template <typename ParallelComponent, typename DbTagList,
     326             :             typename Metavariables, typename ArrayIndex>
     327           0 :   static void apply(db::DataBox<DbTagList>& box,
     328             :                     Parallel::GlobalCache<Metavariables>& cache,
     329             :                     const ArrayIndex& array_index,
     330             :                     const observers::ObservationKey& observation_key,
     331             :                     const Parallel::ArrayComponentId& component_id,
     332             :                     const TypeOfObservation& type_of_observation) {
     333             :     bool observation_key_already_registered = true;
     334             :     db::mutate<observers::Tags::ExpectedContributorsForObservations>(
     335             :         [&component_id, &observation_key, &observation_key_already_registered](
     336             :             const gsl::not_null<std::unordered_map<
     337             :                 ObservationKey,
     338             :                 std::unordered_set<Parallel::ArrayComponentId>>*>
     339             :                 array_component_ids) {
     340             :           observation_key_already_registered =
     341             :               (array_component_ids->find(observation_key) !=
     342             :                array_component_ids->end());
     343             :           if (UNLIKELY(
     344             :                   observation_key_already_registered and
     345             :                   array_component_ids->at(observation_key).find(component_id) !=
     346             :                       array_component_ids->at(observation_key).end())) {
     347             :             ERROR(
     348             :                 "Trying to insert a component_id more than once for "
     349             :                 "observation. This means an element is registering itself "
     350             :                 "with the observers more than once. The component_id is "
     351             :                 << component_id << " and the observation key is "
     352             :                 << observation_key);
     353             :           }
     354             :           array_component_ids->operator[](observation_key).insert(component_id);
     355             :         },
     356             :         make_not_null(&box));
     357             : 
     358             :     if (observation_key_already_registered) {
     359             :       // We only need to register with the observer writer on the first call
     360             :       // of this action. Later calls will have already been registered.
     361             :       return;
     362             :     }
     363             : 
     364             :     auto& observer_writer = *Parallel::local_branch(
     365             :         Parallel::get_parallel_component<
     366             :             observers::ObserverWriter<Metavariables>>(cache));
     367             : 
     368             :     switch (type_of_observation) {
     369             :       case TypeOfObservation::Reduction:
     370             :         Parallel::simple_action<
     371             :             Actions::RegisterReductionContributorWithObserverWriter>(
     372             :             observer_writer, observation_key,
     373             :             Parallel::make_array_component_id<ParallelComponent>(array_index));
     374             :         return;
     375             :       case TypeOfObservation::Volume:
     376             :         Parallel::simple_action<
     377             :             Actions::RegisterVolumeContributorWithObserverWriter>(
     378             :             observer_writer, observation_key,
     379             :             Parallel::make_array_component_id<ParallelComponent>(array_index));
     380             :         return;
     381             :       default:
     382             :         ERROR(
     383             :             "Registering an unknown TypeOfObservation. Should be one of "
     384             :             "'Reduction' or 'Volume'");
     385             :     };
     386             :   }
     387             : };
     388             : 
     389             : /*!
     390             :  * \brief Deregister the `ArrayComponentId` that will no longer send the data to
     391             :  * the observer for the given `ObservationIdRegistrationKey`
     392             :  *
     393             :  * Should be invoked on the `Observer` by the component that was previously
     394             :  * registered with `observers::Actions::RegisterContributorWithObserver`.
     395             :  */
     396           1 : struct DeregisterContributorWithObserver {
     397             :   template <typename ParallelComponent, typename DbTagList,
     398             :             typename Metavariables, typename ArrayIndex>
     399           0 :   static void apply(db::DataBox<DbTagList>& box,
     400             :                     Parallel::GlobalCache<Metavariables>& cache,
     401             :                     const ArrayIndex& array_index,
     402             :                     const observers::ObservationKey& observation_key,
     403             :                     const Parallel::ArrayComponentId& component_id,
     404             :                     const TypeOfObservation& type_of_observation) {
     405             :     bool all_array_components_have_been_deregistered = false;
     406             :     db::mutate<observers::Tags::ExpectedContributorsForObservations>(
     407             :         [&component_id, &observation_key,
     408             :          &all_array_components_have_been_deregistered](
     409             :             const gsl::not_null<std::unordered_map<
     410             :                 ObservationKey,
     411             :                 std::unordered_set<Parallel::ArrayComponentId>>*>
     412             :                 array_component_ids) {
     413             :           if (UNLIKELY(array_component_ids->find(observation_key) ==
     414             :                        array_component_ids->end())) {
     415             :             ERROR(
     416             :                 "Trying to deregister a component associated with an "
     417             :                 "unregistered observation key: "
     418             :                 << observation_key);
     419             :           }
     420             :           auto& component_ids_for_key =
     421             :               array_component_ids->at(observation_key);
     422             :           if (UNLIKELY(component_ids_for_key.find(component_id) ==
     423             :                        array_component_ids->at(observation_key).end())) {
     424             :             ERROR("Trying to deregister an unregistered component: "
     425             :                   << component_id
     426             :                   << " with observation key: " << observation_key);
     427             :           }
     428             :           component_ids_for_key.erase(component_id);
     429             :           if (UNLIKELY(component_ids_for_key.size() == 0)) {
     430             :             array_component_ids->erase(observation_key);
     431             :             all_array_components_have_been_deregistered = true;
     432             :           }
     433             :         },
     434             :         make_not_null(&box));
     435             : 
     436             :     if (not all_array_components_have_been_deregistered) {
     437             :       // Only deregister with the observer writer if this deregistration
     438             :       // removes the last component for the provided `observation_key`.
     439             :       return;
     440             :     }
     441             : 
     442             :     auto& observer_writer = *Parallel::local_branch(
     443             :         Parallel::get_parallel_component<
     444             :             observers::ObserverWriter<Metavariables>>(cache));
     445             : 
     446             :     switch (type_of_observation) {
     447             :       case TypeOfObservation::Reduction:
     448             :         Parallel::simple_action<
     449             :             Actions::DeregisterReductionContributorWithObserverWriter>(
     450             :             observer_writer, observation_key,
     451             :             Parallel::make_array_component_id<ParallelComponent>(array_index));
     452             :         return;
     453             :       case TypeOfObservation::Volume:
     454             :         Parallel::simple_action<
     455             :             Actions::DeregisterVolumeContributorWithObserverWriter>(
     456             :             observer_writer, observation_key,
     457             :             Parallel::make_array_component_id<ParallelComponent>(array_index));
     458             :         return;
     459             :       default:
     460             :         ERROR(
     461             :             "Attempting to deregister an unknown TypeOfObservation. "
     462             :             "Should be one of 'Reduction' or 'Volume'");
     463             :     };
     464             :   }
     465             : };
     466             : }  // namespace Actions
     467             : }  // namespace observers

Generated by: LCOV version 1.14