Line data Source code
1 0 : // Distributed under the MIT License. 2 : // See LICENSE.txt for details. 3 : 4 : #pragma once 5 : 6 : #include <limits> 7 : #include <optional> 8 : #include <pup.h> 9 : #include <string> 10 : #include <type_traits> 11 : #include <utility> 12 : 13 : #include "Options/Auto.hpp" 14 : #include "Options/Options.hpp" 15 : #include "Parallel/AlgorithmMetafunctions.hpp" 16 : #include "Parallel/ExitCode.hpp" 17 : #include "Parallel/GlobalCache.hpp" 18 : #include "Parallel/Phase.hpp" 19 : #include "Parallel/PhaseControl/ContributeToPhaseChangeReduction.hpp" 20 : #include "Parallel/PhaseControl/PhaseChange.hpp" 21 : #include "Utilities/ErrorHandling/Assert.hpp" 22 : #include "Utilities/Functional.hpp" 23 : #include "Utilities/Serialization/CharmPupable.hpp" 24 : #include "Utilities/System/ParallelInfo.hpp" 25 : #include "Utilities/TMPL.hpp" 26 : #include "Utilities/TaggedTuple.hpp" 27 : #include "Utilities/UtcTime.hpp" 28 : 29 1 : namespace PhaseControl { 30 : 31 0 : namespace Tags { 32 : /// Storage in the phase change decision tuple so that the Main chare can record 33 : /// the phase to go to when restarting the run from a checkpoint file. 34 : /// 35 : /// \note This tag is not intended to participate in any of the reduction 36 : /// procedures, so will error if the combine method is called. 37 1 : struct RestartPhase { 38 0 : using type = std::optional<Parallel::Phase>; 39 : 40 0 : struct combine_method { 41 0 : [[noreturn]] std::optional<Parallel::Phase> operator()( 42 : const std::optional<Parallel::Phase> /*first_phase*/, 43 : const std::optional<Parallel::Phase>& /*second_phase*/); 44 : }; 45 : 46 0 : using main_combine_method = combine_method; 47 : }; 48 : 49 : /// Stores whether the checkpoint and exit has been requested. 50 : /// 51 : /// Combinations are performed via `funcl::Or`, as the phase in question should 52 : /// be chosen if any component requests the jump. 53 1 : struct CheckpointAndExitRequested { 54 0 : using type = bool; 55 : 56 0 : using combine_method = funcl::Or<>; 57 0 : using main_combine_method = funcl::Or<>; 58 : }; 59 : 60 : } // namespace Tags 61 : 62 : /*! 63 : * \brief Phase control object that runs the WriteCheckpoint and Exit phases 64 : * after a specified amount of wallclock time has elapsed. 65 : * 66 : * When the executable exits from here, it does so with 67 : * `Parallel::ExitCode::ContinueFromCheckpoint`. 68 : * 69 : * This phase control is useful for running SpECTRE executables performing 70 : * lengthy computations that may exceed a supercomputer's wallclock limits. 71 : * Writing a single checkpoint at the end of the job's allocated time allows 72 : * the computation to be continued, while minimizing the disc space taken up by 73 : * checkpoint files. 74 : * 75 : * When restarting from the checkpoint, this phase control sends the control 76 : * flow to a UpdateOptionsAtRestartFromCheckpoint phase, allowing the user to 77 : * update (some) simulation parameters for the continuation of the run. 78 : * 79 : * Note that this phase control is not a trigger on wallclock time. Rather, 80 : * it checks the elapsed wallclock time when called, likely from a global sync 81 : * point triggered by some other mechanism, e.g., at some slab boundary. 82 : * Therefore, the WriteCheckpoint and Exit phases will run the first time 83 : * this phase control is called after the specified wallclock time has been 84 : * reached. 85 : * 86 : * \warning the global sync points _must_ be triggered often enough to ensure 87 : * there will be at least one sync point (i.e., one call to this phase control) 88 : * in the window between the requested checkpoint-and-exit time and the time at 89 : * which the batch system will kill the executable. To make this more concrete, 90 : * consider this example: when running on a 12-hour queue with a 91 : * checkpoint-and-exit requested after 11.5 hours, there is a 0.5-hour window 92 : * for a global sync to occur, the checkpoint files to be written to disc, and 93 : * the executable to clean up. In this case, triggering a global sync every 94 : * 2-10 minutes might be desirable. Matching the global sync frequency with the 95 : * time window for checkpoint and exit is the responsibility of the user! 96 : * 97 : * \parblock 98 : * \warning If modifying the phase-change logic on a 99 : * checkpoint-restart, this PhaseChange must remain in the list after 100 : * modification so that the end of the restart logic will run. The 101 : * WallclockHours can be changed to None to disable further restarts. 102 : * \endparblock 103 : */ 104 1 : struct CheckpointAndExitAfterWallclock : public PhaseChange { 105 0 : CheckpointAndExitAfterWallclock(const std::optional<double> wallclock_hours, 106 : const Options::Context& context = {}); 107 : 108 0 : explicit CheckpointAndExitAfterWallclock(CkMigrateMessage* msg); 109 : 110 : /// \cond 111 : CheckpointAndExitAfterWallclock() = default; 112 : using PUP::able::register_constructor; 113 : WRAPPED_PUPable_decl_template(CheckpointAndExitAfterWallclock); // NOLINT 114 : /// \endcond 115 : 116 0 : struct WallclockHours { 117 0 : using type = Options::Auto<double, Options::AutoLabel::None>; 118 0 : static constexpr Options::String help = { 119 : "Time in hours after which to write the checkpoint and exit. " 120 : "If 'None' is specified, no action will be taken."}; 121 : }; 122 : 123 0 : using options = tmpl::list<WallclockHours>; 124 0 : static constexpr Options::String help{ 125 : "Once the wallclock time has exceeded the specified amount, trigger " 126 : "writing a checkpoint and then exit with the 'ContinueFromCheckpoint' " 127 : "exit code."}; 128 : 129 0 : using argument_tags = tmpl::list<>; 130 0 : using return_tags = tmpl::list<>; 131 : 132 0 : using phase_change_tags_and_combines = 133 : tmpl::list<Tags::RestartPhase, Tags::CheckpointAndExitRequested>; 134 : 135 : template <typename Metavariables> 136 0 : using participating_components = typename Metavariables::component_list; 137 : 138 : template <typename... DecisionTags> 139 0 : void initialize_phase_data_impl( 140 : const gsl::not_null<tuples::TaggedTuple<DecisionTags...>*> 141 : phase_change_decision_data) const; 142 : 143 : template <typename ParallelComponent, typename ArrayIndex, 144 : typename Metavariables> 145 0 : void contribute_phase_data_impl(Parallel::GlobalCache<Metavariables>& cache, 146 : const ArrayIndex& array_index) const; 147 : 148 : template <typename... DecisionTags, typename Metavariables> 149 : typename std::optional<std::pair<Parallel::Phase, ArbitrationStrategy>> 150 0 : arbitrate_phase_change_impl( 151 : const gsl::not_null<tuples::TaggedTuple<DecisionTags...>*> 152 : phase_change_decision_data, 153 : const Parallel::Phase current_phase, 154 : const Parallel::GlobalCache<Metavariables>& /*cache*/) const; 155 : 156 0 : void pup(PUP::er& p) override; 157 : 158 : private: 159 0 : std::optional<double> wallclock_hours_for_checkpoint_and_exit_ = std::nullopt; 160 : // This flag is set during arbitration when the class decides to 161 : // halt the run. As it is not checkpointed, this distinguishes the 162 : // state immediately after writing the checkpoint from that 163 : // immediately after reading it during the restart. 164 : // 165 : // Phase arbitration is only run from Main, so there are no 166 : // threading issues here. 167 : // NOLINTNEXTLINE(spectre-mutable) 168 0 : mutable bool halting_ = false; 169 : }; 170 : 171 : template <typename... DecisionTags> 172 : void CheckpointAndExitAfterWallclock::initialize_phase_data_impl( 173 : const gsl::not_null<tuples::TaggedTuple<DecisionTags...>*> 174 : phase_change_decision_data) const { 175 : tuples::get<Tags::RestartPhase>(*phase_change_decision_data) = std::nullopt; 176 : tuples::get<Tags::CheckpointAndExitRequested>(*phase_change_decision_data) = 177 : false; 178 : } 179 : 180 : template <typename ParallelComponent, typename ArrayIndex, 181 : typename Metavariables> 182 : void CheckpointAndExitAfterWallclock::contribute_phase_data_impl( 183 : Parallel::GlobalCache<Metavariables>& cache, 184 : const ArrayIndex& array_index) const { 185 : if constexpr (std::is_same_v<typename ParallelComponent::chare_type, 186 : Parallel::Algorithms::Array>) { 187 : Parallel::contribute_to_phase_change_reduction<ParallelComponent>( 188 : tuples::TaggedTuple<Tags::CheckpointAndExitRequested>{true}, cache, 189 : array_index); 190 : } else { 191 : Parallel::contribute_to_phase_change_reduction<ParallelComponent>( 192 : tuples::TaggedTuple<Tags::CheckpointAndExitRequested>{true}, cache); 193 : } 194 : } 195 : 196 : template <typename... DecisionTags, typename Metavariables> 197 : typename std::optional<std::pair<Parallel::Phase, ArbitrationStrategy>> 198 : CheckpointAndExitAfterWallclock::arbitrate_phase_change_impl( 199 : const gsl::not_null<tuples::TaggedTuple<DecisionTags...>*> 200 : phase_change_decision_data, 201 : const Parallel::Phase current_phase, 202 : const Parallel::GlobalCache<Metavariables>& /*cache*/) const { 203 : const double elapsed_hours = sys::wall_time() / 3600.0; 204 : 205 : auto& restart_phase = 206 : tuples::get<Tags::RestartPhase>(*phase_change_decision_data); 207 : auto& exit_code = 208 : tuples::get<Parallel::Tags::ExitCode>(*phase_change_decision_data); 209 : if (restart_phase.has_value()) { 210 : // This `if` branch, where restart_phase has a value, is the 211 : // post-checkpoint call to arbitrate_phase_change. 212 : if (halting_) { 213 : // Preserve restart_phase for use after restarting from the checkpoint 214 : exit_code = Parallel::ExitCode::ContinueFromCheckpoint; 215 : return std::make_pair(Parallel::Phase::Exit, 216 : ArbitrationStrategy::RunPhaseImmediately); 217 : } else { 218 : // if current_phase is WriteCheckpoint, we follow with updating options 219 : if (current_phase == Parallel::Phase::WriteCheckpoint) { 220 : Parallel::printf("Restarting from checkpoint. Date and time: %s\n", 221 : utc_time()); 222 : return std::make_pair( 223 : Parallel::Phase::UpdateOptionsAtRestartFromCheckpoint, 224 : ArbitrationStrategy::PermitAdditionalJumps); 225 : } else if (current_phase == 226 : Parallel::Phase::UpdateOptionsAtRestartFromCheckpoint) { 227 : return std::make_pair(Parallel::Phase::Restart, 228 : ArbitrationStrategy::PermitAdditionalJumps); 229 : } 230 : // Reset restart_phase until it is needed for the next checkpoint 231 : const auto result = restart_phase; 232 : restart_phase.reset(); 233 : return std::make_pair(result.value(), 234 : ArbitrationStrategy::PermitAdditionalJumps); 235 : } 236 : } 237 : 238 : auto& checkpoint_and_exit_requested = 239 : tuples::get<Tags::CheckpointAndExitRequested>( 240 : *phase_change_decision_data); 241 : if (checkpoint_and_exit_requested) { 242 : checkpoint_and_exit_requested = false; 243 : if (elapsed_hours >= wallclock_hours_for_checkpoint_and_exit_.value_or( 244 : std::numeric_limits<double>::infinity())) { 245 : // Record phase and actual elapsed time for determining following phase 246 : restart_phase = current_phase; 247 : ASSERT(not halting_, "Halting for checkpoint recursively"); 248 : halting_ = true; 249 : return std::make_pair(Parallel::Phase::WriteCheckpoint, 250 : ArbitrationStrategy::RunPhaseImmediately); 251 : } 252 : } 253 : return std::nullopt; 254 : } 255 : } // namespace PhaseControl