Line data Source code
1 0 : // Distributed under the MIT License.
2 : // See LICENSE.txt for details.
3 :
4 : #pragma once
5 :
6 : #include <algorithm>
7 : #include <cmath>
8 : #include <cstddef>
9 : #include <iterator>
10 : #include <limits>
11 : #include <mutex>
12 : #include <optional>
13 : #include <string>
14 : #include <unordered_map>
15 : #include <utility>
16 : #include <vector>
17 :
18 : #include "DataStructures/DataBox/DataBox.hpp"
19 : #include "DataStructures/Index.hpp"
20 : #include "Domain/Creators/Tags/Domain.hpp"
21 : #include "Domain/FunctionsOfTime/Tags.hpp"
22 : #include "Domain/Tags.hpp"
23 : #include "IO/H5/AccessType.hpp"
24 : #include "IO/H5/File.hpp"
25 : #include "IO/H5/TensorData.hpp"
26 : #include "IO/H5/VolumeData.hpp"
27 : #include "IO/Observer/Helpers.hpp"
28 : #include "IO/Observer/ObservationId.hpp"
29 : #include "IO/Observer/ObserverComponent.hpp"
30 : #include "IO/Observer/Tags.hpp"
31 : #include "IO/Observer/TypeOfObservation.hpp"
32 : #include "Parallel/ArrayComponentId.hpp"
33 : #include "Parallel/GlobalCache.hpp"
34 : #include "Parallel/Info.hpp"
35 : #include "Parallel/Invoke.hpp"
36 : #include "Parallel/Local.hpp"
37 : #include "Parallel/NodeLock.hpp"
38 : #include "Parallel/ParallelComponentHelpers.hpp"
39 : #include "Utilities/Algorithm.hpp"
40 : #include "Utilities/ErrorHandling/Assert.hpp"
41 : #include "Utilities/ErrorHandling/Error.hpp"
42 : #include "Utilities/Gsl.hpp"
43 : #include "Utilities/MakeVector.hpp"
44 : #include "Utilities/Requires.hpp"
45 : #include "Utilities/Serialization/Serialize.hpp"
46 : #include "Utilities/StdHelpers.hpp"
47 : #include "Utilities/TMPL.hpp"
48 : #include "Utilities/TaggedTuple.hpp"
49 :
50 : namespace observers {
51 : /// \cond
52 : namespace ThreadedActions {
53 : struct ContributeVolumeDataToWriter;
54 : } // namespace ThreadedActions
55 : /// \endcond
56 : namespace Actions {
57 :
58 : /*!
59 : * \ingroup ObserversGroup
60 : * \brief Send volume tensor data to the observer.
61 : *
62 : * The caller of this Action (which is to be invoked on the Observer parallel
63 : * component) must pass in an `observation_id` used to uniquely identify the
64 : * observation in time, the name of the `h5::VolumeData` subfile in the HDF5
65 : * file (e.g. `/element_data`, where the slash is important), the contributing
66 : * parallel component element's component id, and the `ElementVolumeData`
67 : * to be written to disk.
68 : */
69 1 : struct ContributeVolumeData {
70 : template <typename ParallelComponent, typename DbTagsList,
71 : typename Metavariables, typename ArrayIndex>
72 0 : static void apply(
73 : db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
74 : const ArrayIndex& array_index,
75 : const observers::ObservationId& observation_id,
76 : const std::string& subfile_name,
77 : const Parallel::ArrayComponentId& sender_array_id,
78 : ElementVolumeData&& received_volume_data,
79 : const std::optional<std::vector<char>>& serialized_functions_of_time =
80 : std::nullopt,
81 : const std::optional<std::string>& dependency = std::nullopt) {
82 : db::mutate<Tags::TensorData, Tags::ContributorsOfTensorData>(
83 : [&array_index, &cache, &received_volume_data, &observation_id,
84 : &sender_array_id, &subfile_name, &serialized_functions_of_time,
85 : &dependency](
86 : const gsl::not_null<std::unordered_map<
87 : observers::ObservationId,
88 : std::unordered_map<Parallel::ArrayComponentId,
89 : ElementVolumeData>>*>
90 : volume_data,
91 : const gsl::not_null<std::unordered_map<
92 : ObservationId, std::unordered_set<Parallel::ArrayComponentId>>*>
93 : contributed_volume_data_ids,
94 : const std::unordered_map<
95 : ObservationKey, std::unordered_set<Parallel::ArrayComponentId>>&
96 : registered_array_component_ids) {
97 : const ObservationKey& key{observation_id.observation_key()};
98 : if (UNLIKELY(registered_array_component_ids.find(key) ==
99 : registered_array_component_ids.end())) {
100 : ERROR("Receiving data from observation id "
101 : << observation_id << " that was never registered.");
102 : }
103 : const auto& registered_ids = registered_array_component_ids.at(key);
104 : if (UNLIKELY(registered_ids.find(sender_array_id) ==
105 : registered_ids.end())) {
106 : ERROR("Receiving volume data from array component id "
107 : << sender_array_id << " that is not registered.");
108 : }
109 :
110 : auto& contributed_array_ids =
111 : (*contributed_volume_data_ids)[observation_id];
112 : if (UNLIKELY(contributed_array_ids.find(sender_array_id) !=
113 : contributed_array_ids.end())) {
114 : ERROR("Already received volume data to observation id "
115 : << observation_id << " from array component id "
116 : << sender_array_id);
117 : }
118 : contributed_array_ids.insert(sender_array_id);
119 :
120 : if ((not volume_data->contains(observation_id)) or
121 : (not volume_data->at(observation_id).contains(sender_array_id))) {
122 : volume_data->operator[](observation_id)
123 : .emplace(sender_array_id, std::move(received_volume_data));
124 : } else {
125 : auto& current_data =
126 : volume_data->at(observation_id).at(sender_array_id);
127 : if (UNLIKELY(not alg::equal(current_data.extents,
128 : received_volume_data.extents))) {
129 : ERROR(
130 : "The extents from the same volume component at a specific "
131 : "observation should always be the same. For example, the "
132 : "extents of a dG element should be the same for all calls to "
133 : "ContributeVolumeData that occur at the same time.");
134 : }
135 : current_data.tensor_components.insert(
136 : current_data.tensor_components.end(),
137 : std::make_move_iterator(
138 : received_volume_data.tensor_components.begin()),
139 : std::make_move_iterator(
140 : received_volume_data.tensor_components.end()));
141 : }
142 :
143 : // Check if we have received all "volume" data from the registered
144 : // elements. If so we copy it to the nodegroup volume writer.
145 : if (contributed_array_ids.size() == registered_ids.size()) {
146 : auto& local_writer = *Parallel::local_branch(
147 : Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
148 : cache));
149 : Parallel::threaded_action<
150 : ThreadedActions::ContributeVolumeDataToWriter>(
151 : local_writer, observation_id,
152 : Parallel::make_array_component_id<ParallelComponent>(
153 : array_index),
154 : subfile_name, std::move((*volume_data)[observation_id]),
155 : serialized_functions_of_time, dependency);
156 : volume_data->erase(observation_id);
157 : contributed_volume_data_ids->erase(observation_id);
158 : }
159 : },
160 : make_not_null(&box),
161 : db::get<Tags::ExpectedContributorsForObservations>(box));
162 : }
163 : };
164 : } // namespace Actions
165 :
166 : namespace ThreadedActions {
167 : namespace VolumeActions_detail {
168 : void write_data(const std::string& h5_file_name,
169 : const std::string& input_source,
170 : const std::string& subfile_path,
171 : const observers::ObservationId& observation_id,
172 : std::vector<ElementVolumeData>&& volume_data);
173 :
174 : template <typename ParallelComponent, typename Metavariables,
175 : typename VolumeDataAtObsId>
176 : void write_combined_volume_data(
177 : Parallel::GlobalCache<Metavariables>& cache,
178 : const observers::ObservationId& observation_id,
179 : const VolumeDataAtObsId& volume_data,
180 : const gsl::not_null<Parallel::NodeLock*> volume_file_lock,
181 : const std::string& subfile_name,
182 : const std::optional<std::vector<char>>&
183 : serialized_observation_functions_of_time) {
184 : ASSERT(not volume_data.empty(),
185 : "Failed to populate volume_data before trying to write it.");
186 :
187 : std::vector<ElementVolumeData> volume_data_to_write;
188 :
189 : if constexpr (std::is_same_v<tmpl::at_c<VolumeDataAtObsId, 1>,
190 : ElementVolumeData>) {
191 : volume_data_to_write.reserve(volume_data.size());
192 : for (const auto& [id, element] : volume_data) {
193 : (void)id; // avoid compiler warnings
194 : volume_data_to_write.push_back(element);
195 : }
196 : } else {
197 : size_t total_size = 0;
198 : for (const auto& [id, vec_elements] : volume_data) {
199 : (void)id; // avoid compiler warnings
200 : total_size += vec_elements.size();
201 : }
202 : volume_data_to_write.reserve(total_size);
203 :
204 : for (const auto& [id, vec_elements] : volume_data) {
205 : (void)id; // avoid compiler warnings
206 : volume_data_to_write.insert(volume_data_to_write.end(),
207 : vec_elements.begin(), vec_elements.end());
208 : }
209 : }
210 :
211 : // Write to file. We use a separate node lock because writing can be
212 : // very time consuming (it's network dependent, depends on how full the
213 : // disks are, what other users are doing, etc.) and we want to be able
214 : // to continue to work on the nodegroup while we are writing data to
215 : // disk.
216 : const std::lock_guard hold_lock(*volume_file_lock);
217 : {
218 : // Scoping is for closing HDF5 file before we release the lock.
219 : const auto& file_prefix = Parallel::get<Tags::VolumeFileName>(cache);
220 : auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
221 : h5::H5File<h5::AccessType::ReadWrite> h5file(
222 : file_prefix +
223 : std::to_string(
224 : Parallel::my_node<int>(*Parallel::local_branch(my_proxy))) +
225 : ".h5",
226 : true, observers::input_source_from_cache(cache));
227 : constexpr size_t version_number = 0;
228 : auto& volume_file =
229 : h5file.try_insert<h5::VolumeData>(subfile_name, version_number);
230 :
231 : // Serialize domain. See `Domain` docs for details on the serialization.
232 : // The domain is retrieved from the global cache using the standard
233 : // domain tag. If more flexibility is required here later, then the
234 : // domain can be passed along with the `ContributeVolumeData` action.
235 : std::optional<std::vector<char>> serialized_domain{};
236 : if (not volume_file.has_domain()) {
237 : serialized_domain = serialize(
238 : Parallel::get<domain::Tags::Domain<Metavariables::volume_dim>>(
239 : cache));
240 : }
241 :
242 : std::optional<std::vector<char>> serialized_global_functions_of_time =
243 : std::nullopt;
244 : if constexpr (Parallel::is_in_global_cache<Metavariables,
245 : domain::Tags::FunctionsOfTime>) {
246 : const auto& functions_of_time = get<domain::Tags::FunctionsOfTime>(cache);
247 : serialized_global_functions_of_time = serialize(functions_of_time);
248 : }
249 :
250 : // Write the data to the file
251 : volume_file.write_volume_data(observation_id.hash(), observation_id.value(),
252 : volume_data_to_write, serialized_domain,
253 : serialized_observation_functions_of_time,
254 : serialized_global_functions_of_time);
255 : }
256 : }
257 : } // namespace VolumeActions_detail
258 : /*!
259 : * \ingroup ObserversGroup
260 : * \brief Move data to the observer writer for writing to disk.
261 : *
262 : * Once data from all cores is collected this action writes the data to disk if
263 : * there isn't a dependency. Or if there is a dependency and it has been
264 : * received already. If there is a dependency but it hasn't been received yet,
265 : * data will be written by a call to `ContributeDependency`.
266 : */
267 1 : struct ContributeVolumeDataToWriter {
268 : template <typename ParallelComponent, typename DbTagsList,
269 : typename Metavariables, typename ArrayIndex>
270 0 : static void apply(
271 : db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
272 : const ArrayIndex& /*array_index*/,
273 : const gsl::not_null<Parallel::NodeLock*> node_lock,
274 : const observers::ObservationId& observation_id,
275 : Parallel::ArrayComponentId observer_group_id,
276 : const std::string& subfile_name,
277 : std::unordered_map<Parallel::ArrayComponentId,
278 : std::vector<ElementVolumeData>>&& received_volume_data,
279 : const std::optional<std::vector<char>>& serialized_functions_of_time =
280 : std::nullopt,
281 : const std::optional<std::string>& dependency = std::nullopt) {
282 : apply_impl<Tags::InterpolatorTensorData, ParallelComponent>(
283 : box, cache, node_lock, observation_id, observer_group_id, subfile_name,
284 : std::move(received_volume_data), serialized_functions_of_time,
285 : dependency);
286 : }
287 :
288 : template <typename ParallelComponent, typename DbTagsList,
289 : typename Metavariables, typename ArrayIndex>
290 0 : static void apply(
291 : db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
292 : const ArrayIndex& /*array_index*/,
293 : const gsl::not_null<Parallel::NodeLock*> node_lock,
294 : const observers::ObservationId& observation_id,
295 : Parallel::ArrayComponentId observer_group_id,
296 : const std::string& subfile_name,
297 : std::unordered_map<Parallel::ArrayComponentId, ElementVolumeData>&&
298 : received_volume_data,
299 : const std::optional<std::vector<char>>& serialized_functions_of_time =
300 : std::nullopt,
301 : const std::optional<std::string>& dependency = std::nullopt) {
302 : apply_impl<Tags::TensorData, ParallelComponent>(
303 : box, cache, node_lock, observation_id, observer_group_id, subfile_name,
304 : std::move(received_volume_data), serialized_functions_of_time,
305 : dependency);
306 : }
307 :
308 : private:
309 : template <typename TensorDataTag, typename ParallelComponent,
310 : typename DbTagsList, typename Metavariables,
311 : typename VolumeDataAtObsId>
312 0 : static void apply_impl(
313 : db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
314 : const gsl::not_null<Parallel::NodeLock*> node_lock,
315 : const observers::ObservationId& observation_id,
316 : Parallel::ArrayComponentId observer_group_id,
317 : const std::string& subfile_name, VolumeDataAtObsId received_volume_data,
318 : const std::optional<std::vector<char>>& serialized_functions_of_time,
319 : const std::optional<std::string>& dependency) {
320 : // The below gymnastics with pointers is done in order to minimize the
321 : // time spent locking the entire node, which is necessary because the
322 : // DataBox does not allow any function calls, either get and mutate, during
323 : // a mutate. We separate out writing from the operations that edit the
324 : // DataBox since writing to disk can be very slow, but moving data around is
325 : // comparatively quick.
326 : Parallel::NodeLock* volume_file_lock = nullptr;
327 : bool perform_write = false;
328 : VolumeDataAtObsId volume_data{};
329 :
330 : {
331 : const std::lock_guard hold_lock(*node_lock);
332 :
333 : // Set file lock for later
334 : db::mutate<Tags::H5FileLock>(
335 : [&volume_file_lock](
336 : const gsl::not_null<Parallel::NodeLock*> volume_file_lock_ptr) {
337 : volume_file_lock = &*volume_file_lock_ptr;
338 : },
339 : make_not_null(&box));
340 :
341 : ASSERT(volume_file_lock != nullptr,
342 : "Failed to set volume_file_lock in the mutate");
343 :
344 : const auto& observations_registered =
345 : db::get<Tags::ExpectedContributorsForObservations>(box);
346 :
347 : const ObservationKey& key = observation_id.observation_key();
348 : if (LIKELY(observations_registered.contains(key))) {
349 : if (UNLIKELY(not observations_registered.at(key).contains(
350 : observer_group_id))) {
351 : ERROR("The observer group id "
352 : << observer_group_id
353 : << " was not registered for the observation id "
354 : << observation_id);
355 : }
356 : } else {
357 : ERROR("key " << key
358 : << " not in the registered group ids. Known keys are "
359 : << keys_of(observations_registered));
360 : }
361 :
362 : const size_t observations_registered_with_id =
363 : observations_registered.at(key).size();
364 :
365 : // Ok because we have the node lock
366 : auto& volume_observers_contributed =
367 : db::get_mutable_reference<Tags::ContributorsOfTensorData>(
368 : make_not_null(&box));
369 : auto& all_volume_data =
370 : db::get_mutable_reference<TensorDataTag>(make_not_null(&box));
371 : auto& all_serialized_functions_of_time =
372 : db::get_mutable_reference<Tags::SerializedFunctionsOfTime>(
373 : make_not_null(&box));
374 : auto& box_dependencies =
375 : db::get_mutable_reference<Tags::Dependencies>(make_not_null(&box));
376 :
377 : auto& contributed_group_ids =
378 : volume_observers_contributed[observation_id];
379 :
380 : if (UNLIKELY(contributed_group_ids.contains(observer_group_id))) {
381 : ERROR("Already received reduction data to observation id "
382 : << observation_id << " from array component id "
383 : << observer_group_id);
384 : }
385 : contributed_group_ids.insert(observer_group_id);
386 :
387 : // Add received volume data to the box
388 : if (all_volume_data.contains(observation_id)) {
389 : auto& current_data = all_volume_data.at(observation_id);
390 : current_data.insert(
391 : std::make_move_iterator(received_volume_data.begin()),
392 : std::make_move_iterator(received_volume_data.end()));
393 : ASSERT(all_serialized_functions_of_time.at(observation_id) ==
394 : serialized_functions_of_time,
395 : "Got different serialized functions of time from different "
396 : "elements.");
397 : } else {
398 : // We haven't been called before on this processing element.
399 : all_volume_data[observation_id] = std::move(received_volume_data);
400 : all_serialized_functions_of_time[observation_id] =
401 : serialized_functions_of_time;
402 : }
403 :
404 : // Check if we have received all "volume" data from the Observer
405 : // group. If so we write to disk.
406 : if (volume_observers_contributed.at(observation_id).size() ==
407 : observations_registered_with_id) {
408 : // Check if
409 : // 1. there is an external dependencies
410 : if (dependency.has_value()) {
411 : // 2. if there is, that we have received something at this time
412 : // 3. that the dependencies are the same
413 : if (box_dependencies.contains(observation_id)) {
414 : if (UNLIKELY(box_dependencies.at(observation_id).first !=
415 : dependency.value())) {
416 : ERROR(
417 : "The dependency that was sent to the ObserverWriter from the "
418 : "elements ("
419 : << dependency.value()
420 : << ") does not match the dependency received from "
421 : "ContributeDependency ("
422 : << box_dependencies.at(observation_id).first << ").");
423 : }
424 : // 4. that we are writing the volume data to disk
425 : if (box_dependencies.at(observation_id).second) {
426 : perform_write = true;
427 : volume_data = std::move(all_volume_data[observation_id]);
428 : }
429 :
430 : // Whether or not we are writing data to disk, we clean up because
431 : // we have received both the volume data and the dependency
432 : all_volume_data.erase(observation_id);
433 : all_serialized_functions_of_time.erase(observation_id);
434 : volume_observers_contributed.erase(observation_id);
435 : box_dependencies.erase(observation_id);
436 : }
437 : } else {
438 : perform_write = true;
439 : volume_data = std::move(all_volume_data[observation_id]);
440 : all_volume_data.erase(observation_id);
441 : all_serialized_functions_of_time.erase(observation_id);
442 : volume_observers_contributed.erase(observation_id);
443 : }
444 : }
445 : }
446 :
447 : if (perform_write) {
448 : VolumeActions_detail::write_combined_volume_data<ParallelComponent>(
449 : cache, observation_id, volume_data, make_not_null(volume_file_lock),
450 : subfile_name, serialized_functions_of_time);
451 : }
452 : }
453 : };
454 :
455 : /*!
456 : * \brief Threaded action that will add a dependency to the ObserverWriter for a
457 : * given ObservationId ( \p time + \p volume_subfile_name).
458 : *
459 : * \details If not all the volume data for this ObservationId has been received
460 : * yet, then this will just add the dependency to the box and exit without
461 : * writing anything. If all volume data arrives before this action is called,
462 : * then it will write out the volume data (or remove it if we aren't writing).
463 : */
464 1 : struct ContributeDependency {
465 : template <typename ParallelComponent, typename DbTagsList,
466 : typename Metavariables, typename ArrayIndex>
467 0 : static void apply(db::DataBox<DbTagsList>& box,
468 : Parallel::GlobalCache<Metavariables>& cache,
469 : const ArrayIndex& /*array_index*/,
470 : const gsl::not_null<Parallel::NodeLock*> node_lock,
471 : const double time, const std::string& dependency,
472 : std::string volume_subfile_name,
473 : const bool write_volume_data) {
474 : if (not volume_subfile_name.starts_with("/")) {
475 : volume_subfile_name = "/" + volume_subfile_name;
476 : }
477 : if (not volume_subfile_name.ends_with(".vol")) {
478 : volume_subfile_name += ".vol";
479 : }
480 :
481 : const ObservationId observation_id{time, volume_subfile_name};
482 :
483 : // The below gymnastics with pointers is done in order to minimize the
484 : // time spent locking the entire node, which is necessary because the
485 : // DataBox does not allow any function calls, either get and mutate, during
486 : // a mutate. We separate out writing from the operations that edit the
487 : // DataBox since writing to disk can be very slow, but moving data around is
488 : // comparatively quick.
489 : Parallel::NodeLock* volume_file_lock = nullptr;
490 : bool perform_write = false;
491 : std::unordered_map<Parallel::ArrayComponentId, ElementVolumeData>
492 : volume_data{};
493 : std::optional<std::vector<char>> serialized_functions_of_time{};
494 :
495 : // For now just hold the entire node. We can optimize with different locks
496 : // later on
497 : {
498 : const std::lock_guard hold_lock(*node_lock);
499 :
500 : db::mutate<Tags::H5FileLock, Tags::Dependencies>(
501 : [&](const gsl::not_null<Parallel::NodeLock*> volume_file_lock_ptr,
502 : const gsl::not_null<std::unordered_map<
503 : ObservationId, std::pair<std::string, bool>>*>
504 : dependencies) {
505 : volume_file_lock = &*volume_file_lock_ptr;
506 : (*dependencies)[observation_id] =
507 : std::pair{dependency, write_volume_data};
508 : },
509 : make_not_null(&box));
510 :
511 : auto& volume_observers_contributed =
512 : db::get_mutable_reference<Tags::ContributorsOfTensorData>(
513 : make_not_null(&box));
514 : auto& all_volume_data =
515 : db::get_mutable_reference<Tags::TensorData>(make_not_null(&box));
516 : auto& all_serialized_functions_of_time =
517 : db::get_mutable_reference<Tags::SerializedFunctionsOfTime>(
518 : make_not_null(&box));
519 : auto& box_dependencies =
520 : db::get_mutable_reference<Tags::Dependencies>(make_not_null(&box));
521 : const auto& expected_contributors =
522 : db::get<Tags::ExpectedContributorsForObservations>(box);
523 :
524 : if (not expected_contributors.contains(
525 : observation_id.observation_key())) {
526 : ERROR("Key " << observation_id.observation_key()
527 : << " was not registered.");
528 : }
529 :
530 : // We have not received any volume data at this time so we can't do
531 : // anything
532 : if (not(volume_observers_contributed.contains(observation_id) and
533 : all_volume_data.contains(observation_id))) {
534 : return;
535 : }
536 :
537 : // Check if we have received all "volume" data from the Observer
538 : // group. If so we write to disk. Then always delete it since the volume
539 : // data was waiting for this dependency to arrive to be written
540 : if (volume_observers_contributed.at(observation_id).size() ==
541 : expected_contributors.at(observation_id.observation_key()).size()) {
542 : if (write_volume_data) {
543 : perform_write = true;
544 : volume_data = std::move(all_volume_data.at(observation_id));
545 : serialized_functions_of_time =
546 : std::move(all_serialized_functions_of_time.at(observation_id));
547 : }
548 :
549 : all_volume_data.erase(observation_id);
550 : all_serialized_functions_of_time.erase(observation_id);
551 : volume_observers_contributed.erase(observation_id);
552 : box_dependencies.erase(observation_id);
553 : }
554 : }
555 :
556 : if (perform_write) {
557 : VolumeActions_detail::write_combined_volume_data<ParallelComponent>(
558 : cache, observation_id, volume_data, make_not_null(volume_file_lock),
559 : volume_subfile_name, serialized_functions_of_time);
560 : }
561 : }
562 : };
563 :
564 : /*!
565 : * \brief Write volume data (such as surface data) at a given time (specified by
566 : * an `ObservationId`) without the need to register or reduce anything, e.g.
567 : * from a singleton component or from a specific chare.
568 : *
569 : * Use `observers::Actions::ContributeVolumeDataToWriter` instead if you need to
570 : * write volume data from an array chare (e.g., writing volume data from all the
571 : * elements in a domain).
572 : *
573 : * Invoke this action on the `observers::ObserverWriter` component on node 0.
574 : * Pass the following arguments when invoking this action:
575 : *
576 : * - `h5_file_name`: the name of the HDF5 file where the volume data is to be
577 : * written (without the .h5 extension).
578 : * - `subfile_path`: the path where the volume data should be written in the
579 : * HDF5 file. Include a leading slash, e.g., `/AhA`.
580 : * - `observation_id`: the ObservationId corresponding to the volume data.
581 : * - `volume_data`: the volume data to be written.
582 : */
583 1 : struct WriteVolumeData {
584 : template <typename ParallelComponent, typename DbTagsList,
585 : typename Metavariables, typename ArrayIndex, typename... Ts,
586 : typename DataBox = db::DataBox<DbTagsList>>
587 0 : static void apply(db::DataBox<DbTagsList>& box,
588 : Parallel::GlobalCache<Metavariables>& cache,
589 : const ArrayIndex& /*array_index*/,
590 : const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
591 : const std::string& h5_file_name,
592 : const std::string& subfile_path,
593 : const observers::ObservationId& observation_id,
594 : std::vector<ElementVolumeData>&& volume_data) {
595 : auto& volume_file_lock =
596 : db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
597 : const std::lock_guard hold_lock(volume_file_lock);
598 : VolumeActions_detail::write_data(
599 : h5_file_name, observers::input_source_from_cache(cache), subfile_path,
600 : observation_id, std::move(volume_data));
601 : }
602 :
603 : // For a local synchronous action
604 0 : using return_type = void;
605 :
606 : /// \brief The apply call for the local synchronous action
607 : template <typename ParallelComponent, typename DbTagsList,
608 : typename Metavariables, typename... Ts,
609 : typename DataBox = db::DataBox<DbTagsList>>
610 1 : static return_type apply(
611 : db::DataBox<DbTagsList>& box,
612 : const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
613 : Parallel::GlobalCache<Metavariables>& cache,
614 : const std::string& h5_file_name, const std::string& subfile_path,
615 : const observers::ObservationId& observation_id,
616 : std::vector<ElementVolumeData>&& volume_data) {
617 : auto& volume_file_lock =
618 : db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
619 : const std::lock_guard hold_lock(volume_file_lock);
620 : VolumeActions_detail::write_data(
621 : h5_file_name, observers::input_source_from_cache(cache), subfile_path,
622 : observation_id, std::move(volume_data));
623 : }
624 : };
625 : } // namespace ThreadedActions
626 :
627 : /*!
628 : * \brief Contribute volume data for observing from an element.
629 : *
630 : * \tparam UseObserverComponent Whether to first send data to the
631 : * `observers::Observer` group component. Generally should be true if
632 : * using an implementation where elements are bound to cores, and
633 : * false if they are only bound to nodes.
634 : */
635 : template <bool UseObserverComponent, typename Metavariables>
636 1 : void contribute_volume_data(
637 : Parallel::GlobalCache<Metavariables>& cache,
638 : observers::ObservationId observation_id, std::string subfile_path,
639 : const Parallel::ArrayComponentId& array_component_id,
640 : ElementVolumeData element_volume_data,
641 : std::optional<std::string> dependency = std::nullopt) {
642 : std::optional<std::vector<char>> serialized_observation_functions_of_time{};
643 : if constexpr (Parallel::is_in_global_cache<Metavariables,
644 : domain::Tags::FunctionsOfTime>) {
645 : const auto& functions_of_time = get<domain::Tags::FunctionsOfTime>(cache);
646 : // NOLINTNEXTLINE(misc-const-correctness)
647 : domain::FunctionsOfTimeMap observation_functions_of_time{};
648 : const double obs_time = observation_id.value();
649 : // Generally, the functions of time should be valid when we
650 : // perform an observation. The exception is when running in an
651 : // AtCleanup event, in which case the observation time is a
652 : // bogus value and we just skip writing the values.
653 : if (alg::all_of(functions_of_time, [&](const auto& fot) {
654 : const auto bounds = fot.second->time_bounds();
655 : return bounds[0] <= obs_time and obs_time <= bounds[1];
656 : })) {
657 : // create a new function of time, effectively truncating the history.
658 : for (const auto& [name, fot_ptr] : functions_of_time) {
659 : observation_functions_of_time[name] = fot_ptr->create_at_time(
660 : obs_time, obs_time + 100.0 *
661 : std::numeric_limits<double>::epsilon() *
662 : std::max(std::abs(obs_time), 1.0));
663 : }
664 : serialized_observation_functions_of_time =
665 : serialize(observation_functions_of_time);
666 : }
667 : }
668 :
669 : if constexpr (UseObserverComponent) {
670 : // Send data to volume observer
671 : auto& local_observer = *Parallel::local_branch(
672 : Parallel::get_parallel_component<observers::Observer<Metavariables>>(
673 : cache));
674 :
675 : Parallel::simple_action<observers::Actions::ContributeVolumeData>(
676 : local_observer, std::move(observation_id), std::move(subfile_path),
677 : array_component_id, std::move(element_volume_data),
678 : std::move(serialized_observation_functions_of_time),
679 : std::move(dependency));
680 : } else {
681 : // Send data to reduction observer writer (nodegroup)
682 : auto& local_observer = *Parallel::local_branch(
683 : Parallel::get_parallel_component<
684 : observers::ObserverWriter<Metavariables>>(cache));
685 :
686 : std::unordered_map<Parallel::ArrayComponentId,
687 : std::vector<ElementVolumeData>>
688 : data_to_send{};
689 : data_to_send[array_component_id] =
690 : make_vector(std::move(element_volume_data));
691 : Parallel::threaded_action<
692 : observers::ThreadedActions::ContributeVolumeDataToWriter>(
693 : local_observer, std::move(observation_id), array_component_id,
694 : std::move(subfile_path), std::move(data_to_send),
695 : std::move(serialized_observation_functions_of_time),
696 : std::move(dependency));
697 : }
698 : }
699 : } // namespace observers
|