Line data Source code
1 0 : // Distributed under the MIT License. 2 : // See LICENSE.txt for details. 3 : 4 : #pragma once 5 : 6 : #include <numeric> 7 : #include <string> 8 : #include <tuple> 9 : #include <utility> 10 : 11 : #include "DataStructures/DataBox/DataBox.hpp" 12 : #include "IO/Observer/ObserverComponent.hpp" 13 : #include "IO/Observer/ReductionActions.hpp" 14 : #include "Parallel/GlobalCache.hpp" 15 : #include "Parallel/Info.hpp" 16 : #include "Parallel/Invoke.hpp" 17 : #include "Parallel/Local.hpp" 18 : #include "Parallel/MemoryMonitor/MemoryMonitor.hpp" 19 : #include "Parallel/MemoryMonitor/Tags.hpp" 20 : #include "Utilities/ErrorHandling/Assert.hpp" 21 : #include "Utilities/GetOutput.hpp" 22 : #include "Utilities/Gsl.hpp" 23 : #include "Utilities/Serialization/Serialize.hpp" 24 : 25 : namespace mem_monitor { 26 : /*! 27 : * \brief Simple action meant to be run on the MemoryMonitor component that 28 : * collects sizes from Groups and Nodegroups. 29 : * 30 : * \details This action collects the sizes of all the local branches of a group 31 : * or nodegroup component, computes the total memory usage on a node for each, 32 : * then writes it to disk. For groups, the proc with the maximum memory usage is 33 : * also reported along with the size on the proc. 34 : * 35 : * The columns in the dat file for a nodegroup when running on 3 nodes will be 36 : * 37 : * - %Time 38 : * - Size on node 0 (MB) 39 : * - Size on node 1 (MB) 40 : * - Size on node 2 (MB) 41 : * - Average size per node (MB) 42 : * 43 : * The columns in the dat file for a group when running on 3 nodes will be 44 : * 45 : * - %Time 46 : * - Size on node 0 (MB) 47 : * - Size on node 1 (MB) 48 : * - Size on node 2 (MB) 49 : * - Proc of max size 50 : * - Size on proc of max size (MB) 51 : * - Average size per node (MB) 52 : * 53 : * The dat file will be placed in the `/MemoryMonitors/` group in the reduction 54 : * file. The name of the dat file is the `pretty_type::name` of the component. 55 : */ 56 : template <typename ContributingComponent> 57 1 : struct ContributeMemoryData { 58 : template <typename ParallelComponent, typename DbTags, typename Metavariables, 59 : typename ArrayIndex> 60 0 : static void apply(db::DataBox<DbTags>& box, 61 : Parallel::GlobalCache<Metavariables>& cache, 62 : const ArrayIndex& /*array_index*/, const double time, 63 : const int node_or_proc, const double size_in_megabytes) { 64 : static_assert(Parallel::is_group_v<ContributingComponent> or 65 : Parallel::is_nodegroup_v<ContributingComponent>); 66 : 67 : using tag = Tags::MemoryHolder; 68 : db::mutate<tag>( 69 : [&cache, &time, &node_or_proc, &size_in_megabytes]( 70 : const gsl::not_null<std::unordered_map< 71 : std::string, 72 : std::unordered_map<double, std::unordered_map<int, double>>>*> 73 : memory_holder_all) { 74 : auto memory_holder_pair = memory_holder_all->try_emplace( 75 : pretty_type::name<ContributingComponent>()); 76 : auto& memory_holder = (*memory_holder_pair.first).second; 77 : 78 : memory_holder.try_emplace(time); 79 : memory_holder.at(time)[node_or_proc] = size_in_megabytes; 80 : 81 : // If we have received data for every node/proc at a given 82 : // time, get all the data, write it to disk, then remove the current 83 : // time from the stored times as it's no longer needed 84 : 85 : auto& mem_monitor_proxy = 86 : Parallel::get_parallel_component<MemoryMonitor<Metavariables>>( 87 : cache); 88 : 89 : constexpr bool is_group = Parallel::is_group_v<ContributingComponent>; 90 : 91 : const size_t num_nodes = Parallel::number_of_nodes<size_t>( 92 : *Parallel::local(mem_monitor_proxy)); 93 : const size_t num_procs = Parallel::number_of_procs<size_t>( 94 : *Parallel::local(mem_monitor_proxy)); 95 : const size_t expected_number = is_group ? num_procs : num_nodes; 96 : ASSERT(memory_holder.at(time).size() <= expected_number, 97 : "ContributeMemoryData received more data than it was " 98 : "expecting. Was expecting " 99 : << expected_number << " calls but instead got " 100 : << memory_holder.at(time).size()); 101 : if (memory_holder.at(time).size() == expected_number) { 102 : // First column is always time 103 : std::vector<double> data_to_append{time}; 104 : std::vector<std::string> legend{{"Time"}}; 105 : 106 : // Append a column for each node, and keep track of cumulative 107 : // total. If we have proc data (from groups) do an additional loop 108 : // over the procs to get the total on that node and get the proc 109 : // of the maximum memory usage 110 : double avg_size_per_node = 0.0; 111 : double max_usage_on_proc = -std::numeric_limits<double>::max(); 112 : int proc_of_max = 0; 113 : for (size_t node = 0; node < num_nodes; node++) { 114 : double size_on_node = 0.0; 115 : if (not is_group) { 116 : size_on_node = memory_holder.at(time).at(node); 117 : } else { 118 : const int first_proc = Parallel::first_proc_on_node<int>( 119 : node, *Parallel::local(mem_monitor_proxy)); 120 : const int procs_on_node = Parallel::procs_on_node<int>( 121 : node, *Parallel::local(mem_monitor_proxy)); 122 : const int last_proc = first_proc + procs_on_node; 123 : for (int proc = first_proc; proc < last_proc; proc++) { 124 : size_on_node += memory_holder.at(time).at(proc); 125 : if (memory_holder.at(time).at(proc) > max_usage_on_proc) { 126 : max_usage_on_proc = memory_holder.at(time).at(proc); 127 : proc_of_max = proc; 128 : } 129 : } 130 : } 131 : 132 : data_to_append.push_back(size_on_node); 133 : avg_size_per_node += size_on_node; 134 : legend.emplace_back("Size on node " + get_output(node) + " (MB)"); 135 : } 136 : 137 : // If we have proc data, write the proc with the maximum usage to 138 : // disk along with how much memory it's using 139 : if (is_group) { 140 : data_to_append.push_back(static_cast<double>(proc_of_max)); 141 : data_to_append.push_back(max_usage_on_proc); 142 : legend.emplace_back("Proc of max size"); 143 : legend.emplace_back("Size on proc of max size (MB)"); 144 : } 145 : 146 : avg_size_per_node /= static_cast<double>(num_nodes); 147 : 148 : // Last column is average over all nodes 149 : data_to_append.push_back(avg_size_per_node); 150 : legend.emplace_back("Average size per node (MB)"); 151 : 152 : auto& observer_writer_proxy = Parallel::get_parallel_component< 153 : observers::ObserverWriter<Metavariables>>(cache); 154 : 155 : Parallel::threaded_action< 156 : observers::ThreadedActions::WriteReductionDataRow>( 157 : // Node 0 is always the writer 158 : observer_writer_proxy[0], subfile_name<ContributingComponent>(), 159 : legend, std::make_tuple(data_to_append)); 160 : 161 : // Clean up finished time 162 : auto finished_time_iter = memory_holder.find(time); 163 : memory_holder.erase(finished_time_iter); 164 : } 165 : }, 166 : make_not_null(&box)); 167 : } 168 : }; 169 : } // namespace mem_monitor