Line data Source code
1 0 : // Distributed under the MIT License. 2 : // See LICENSE.txt for details. 3 : 4 : #pragma once 5 : 6 : #include <cstddef> 7 : #include <mutex> 8 : 9 : #include "DataStructures/DataBox/DataBox.hpp" 10 : #include "Domain/Structure/ElementId.hpp" 11 : #include "Parallel/GlobalCache.hpp" 12 : #include "Parallel/Invoke.hpp" 13 : #include "Parallel/Local.hpp" 14 : #include "Parallel/NodeLock.hpp" 15 : #include "Parallel/Phase.hpp" 16 : #include "Utilities/ErrorHandling/Error.hpp" 17 : #include "Utilities/Gsl.hpp" 18 : #include "Utilities/TaggedTuple.hpp" 19 : 20 : namespace Parallel::Actions { 21 : /// \brief Receive data for a specific element on the nodegroup. 22 : /// 23 : /// If `StartPhase` is `true` then `start_phase(phase)` is called on the 24 : /// `element_to_execute_on`, otherwise `perform_algorithm()` is called on the 25 : /// `element_to_execute_on`. 26 : template <bool StartPhase = false> 27 1 : struct ReceiveDataForElement { 28 : /// \brief Entry method called when receiving data from another node. 29 : template <typename ParallelComponent, typename DbTagsList, 30 : typename Metavariables, typename ArrayIndex, typename ReceiveData, 31 : typename ReceiveTag, size_t Dim, typename DistributedObject> 32 1 : static void apply(db::DataBox<DbTagsList>& box, 33 : Parallel::GlobalCache<Metavariables>& cache, 34 : const ArrayIndex& /*array_index*/, 35 : const gsl::not_null<Parallel::NodeLock*> /*node_lock*/, 36 : const DistributedObject* /*distributed_object*/, 37 : const ReceiveTag& /*meta*/, 38 : const ElementId<Dim>& element_to_execute_on, 39 : typename ReceiveTag::temporal_id instance, 40 : ReceiveData receive_data) { 41 : [[maybe_unused]] const size_t my_node = Parallel::my_node<size_t>(cache); 42 : auto& element_collection = db::get_mutable_reference< 43 : typename ParallelComponent::element_collection_tag>( 44 : make_not_null(&box)); 45 : 46 : ASSERT( 47 : element_collection.count(element_to_execute_on) == 1, 48 : "ElementId " << element_to_execute_on << " is not on node " << my_node); 49 : const bool run_algorithm = ReceiveTag::insert_into_inbox( 50 : make_not_null(&tuples::get<ReceiveTag>( 51 : element_collection.at(element_to_execute_on).inboxes())), 52 : instance, std::move(receive_data)); 53 : if (run_algorithm) { 54 : apply_impl<ParallelComponent>(cache, element_to_execute_on, 55 : make_not_null(&element_collection)); 56 : } 57 : } 58 : 59 : /// \brief Entry method call when receiving from same node. 60 : template <typename ParallelComponent, typename DbTagsList, 61 : typename Metavariables, typename ArrayIndex, size_t Dim, 62 : typename DistributedObject> 63 1 : static void apply(db::DataBox<DbTagsList>& box, 64 : Parallel::GlobalCache<Metavariables>& cache, 65 : const ArrayIndex& /*array_index*/, 66 : const gsl::not_null<Parallel::NodeLock*> /*node_lock*/, 67 : const DistributedObject* /*distributed_object*/, 68 : const ElementId<Dim>& element_to_execute_on) { 69 : auto& element_collection = db::get_mutable_reference< 70 : typename ParallelComponent::element_collection_tag>( 71 : make_not_null(&box)); 72 : apply_impl<ParallelComponent>(cache, element_to_execute_on, 73 : make_not_null(&element_collection)); 74 : } 75 : 76 : private: 77 : template <typename ParallelComponent, typename Metavariables, 78 : typename ElementCollection, size_t Dim> 79 0 : static void apply_impl( 80 : Parallel::GlobalCache<Metavariables>& cache, 81 : const ElementId<Dim>& element_to_execute_on, 82 : const gsl::not_null<ElementCollection*> element_collection) { 83 : if constexpr (StartPhase) { 84 : const Phase current_phase = 85 : Parallel::local_branch( 86 : Parallel::get_parallel_component<ParallelComponent>(cache)) 87 : ->phase(); 88 : ASSERT(element_collection->count(element_to_execute_on) == 1, 89 : "ElementId " << element_to_execute_on << " is not on node " 90 : << Parallel::my_node<size_t>(cache);); 91 : auto& element = element_collection->at(element_to_execute_on); 92 : const std::lock_guard element_lock(element.element_lock()); 93 : // We always force the phase to start because if a previous phase 94 : // terminated properly, then this does nothing. But if a phase didn't 95 : // terminate properly, then it likely went into deadlock analysis or 96 : // post-failure cleanup, in which case we want to run code after a 97 : // failure, so we must force the phase to start. 98 : element.start_phase(current_phase, true); 99 : } else { 100 : auto& element = element_collection->at(element_to_execute_on); 101 : std::unique_lock element_lock(element.element_lock(), std::defer_lock); 102 : if (element_lock.try_lock()) { 103 : element.perform_algorithm(); 104 : } else { 105 : const size_t my_node = Parallel::my_node<size_t>(cache); 106 : auto& my_proxy = 107 : Parallel::get_parallel_component<ParallelComponent>(cache); 108 : Parallel::threaded_action<Parallel::Actions::ReceiveDataForElement<>>( 109 : my_proxy[my_node], element_to_execute_on); 110 : } 111 : } 112 : } 113 : }; 114 : } // namespace Parallel::Actions