// Distributed under the MIT License.
// See LICENSE.txt for details.

#pragma once

#include <cstddef>
#include <mutex>

#include "DataStructures/DataBox/DataBox.hpp"
#include "Domain/Structure/ElementId.hpp"
#include "Parallel/GlobalCache.hpp"
#include "Parallel/Invoke.hpp"
#include "Parallel/Local.hpp"
#include "Parallel/NodeLock.hpp"
#include "Parallel/Phase.hpp"
#include "Utilities/ErrorHandling/Assert.hpp"
#include "Utilities/ErrorHandling/Error.hpp"
#include "Utilities/Gsl.hpp"
#include "Utilities/TaggedTuple.hpp"

namespace Parallel::Actions {
/// \brief Receive data for a specific element on the nodegroup.
///
/// If `StartPhase` is `true` then `start_phase(phase)` is called on the
/// `element_to_execute_on`, otherwise `perform_algorithm()` is called on the
/// `element_to_execute_on`.
template <bool StartPhase = false>
struct ReceiveDataForElement {
  /// \brief Entry method called when receiving data from another node.
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex, typename ReceiveData,
            typename ReceiveTag, size_t Dim, typename DistributedObject>
  static void apply(db::DataBox<DbTagsList>& box,
                    Parallel::GlobalCache<Metavariables>& cache,
                    const ArrayIndex& /*array_index*/,
                    const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
                    const DistributedObject* /*distributed_object*/,
                    const ReceiveTag& /*meta*/,
                    const ElementId<Dim>& element_to_execute_on,
                    typename ReceiveTag::temporal_id instance,
                    ReceiveData receive_data) {
    [[maybe_unused]] const size_t my_node = Parallel::my_node<size_t>(cache);
    auto& element_collection = db::get_mutable_reference<
        typename ParallelComponent::element_collection_tag>(
        make_not_null(&box));

    ASSERT(
        element_collection.count(element_to_execute_on) == 1,
        "ElementId " << element_to_execute_on << " is not on node " << my_node);
    ReceiveTag::insert_into_inbox(
        make_not_null(&tuples::get<ReceiveTag>(
            element_collection.at(element_to_execute_on).inboxes())),
        instance, std::move(receive_data));

    apply_impl<ParallelComponent>(cache, element_to_execute_on,
                                  make_not_null(&element_collection));
  }

  /// \brief Entry method called when receiving data from the same node.
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex, size_t Dim,
            typename DistributedObject>
  static void apply(db::DataBox<DbTagsList>& box,
                    Parallel::GlobalCache<Metavariables>& cache,
                    const ArrayIndex& /*array_index*/,
                    const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
                    const DistributedObject* /*distributed_object*/,
                    const ElementId<Dim>& element_to_execute_on) {
    auto& element_collection = db::get_mutable_reference<
        typename ParallelComponent::element_collection_tag>(
        make_not_null(&box));
    apply_impl<ParallelComponent>(cache, element_to_execute_on,
                                  make_not_null(&element_collection));
  }

 private:
  template <typename ParallelComponent, typename Metavariables,
            typename ElementCollection, size_t Dim>
  static void apply_impl(
      Parallel::GlobalCache<Metavariables>& cache,
      const ElementId<Dim>& element_to_execute_on,
      const gsl::not_null<ElementCollection*> element_collection) {
    if constexpr (StartPhase) {
      const Phase current_phase =
          Parallel::local_branch(
              Parallel::get_parallel_component<ParallelComponent>(cache))
              ->phase();
      ASSERT(element_collection->count(element_to_execute_on) == 1,
             "ElementId " << element_to_execute_on << " is not on node "
                          << Parallel::my_node<size_t>(cache));
      auto& element = element_collection->at(element_to_execute_on);
      const std::lock_guard element_lock(element.element_lock());
      // We always force the phase to start because if a previous phase
      // terminated properly, then this does nothing. But if a phase didn't
      // terminate properly, then it likely went into deadlock analysis or
      // post-failure cleanup, in which case we want to run code after a
      // failure, so we must force the phase to start.
      element.start_phase(current_phase, true);
    } else {
      auto& element = element_collection->at(element_to_execute_on);
      std::unique_lock element_lock(element.element_lock(), std::defer_lock);
      if (element_lock.try_lock()) {
        element.perform_algorithm();
      } else {
        // The element is currently locked by another core on this node, so
        // re-send this action to our own nodegroup branch and retry later.
        const size_t my_node = Parallel::my_node<size_t>(cache);
        auto& my_proxy =
            Parallel::get_parallel_component<ParallelComponent>(cache);
        Parallel::threaded_action<Parallel::Actions::ReceiveDataForElement<>>(
            my_proxy[my_node], element_to_execute_on);
      }
    }
  }
};
}  // namespace Parallel::Actions
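
// Illustrative sketch (not part of the header above; kept in a comment so it
// cannot affect compilation): one plausible way a sender might invoke the
// first `apply` overload to deliver inbox data to an element owned by another
// node. The nodegroup component `MyElementCollection`, the inbox tag
// `MyReceiveTag`, and the helper `send_data_to_element` are hypothetical names
// used only for illustration; the argument order after the proxy is assumed to
// mirror that overload's signature (receive tag, target ElementId, temporal
// id, receive data), following the `Parallel::threaded_action` call pattern
// used inside `apply_impl`.
//
//   template <typename Metavariables, size_t Dim, typename ReceiveData>
//   void send_data_to_element(
//       Parallel::GlobalCache<Metavariables>& cache,
//       const size_t node_holding_element,
//       const ElementId<Dim>& element_to_execute_on,
//       const typename MyReceiveTag::temporal_id& instance,
//       ReceiveData receive_data) {
//     auto& collection_proxy = Parallel::get_parallel_component<
//         MyElementCollection<Metavariables>>(cache);
//     Parallel::threaded_action<Parallel::Actions::ReceiveDataForElement<>>(
//         collection_proxy[node_holding_element], MyReceiveTag{},
//         element_to_execute_on, instance, std::move(receive_data));
//   }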