// Distributed under the MIT License.
// See LICENSE.txt for details.

#pragma once

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <iterator>
#include <limits>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "DataStructures/DataBox/DataBox.hpp"
#include "DataStructures/Index.hpp"
#include "Domain/Creators/Tags/Domain.hpp"
#include "Domain/FunctionsOfTime/Tags.hpp"
#include "Domain/Tags.hpp"
#include "IO/H5/AccessType.hpp"
#include "IO/H5/File.hpp"
#include "IO/H5/TensorData.hpp"
#include "IO/H5/VolumeData.hpp"
#include "IO/Observer/Helpers.hpp"
#include "IO/Observer/ObservationId.hpp"
#include "IO/Observer/ObserverComponent.hpp"
#include "IO/Observer/Tags.hpp"
#include "IO/Observer/TypeOfObservation.hpp"
#include "Parallel/ArrayComponentId.hpp"
#include "Parallel/GlobalCache.hpp"
#include "Parallel/Info.hpp"
#include "Parallel/Invoke.hpp"
#include "Parallel/Local.hpp"
#include "Parallel/NodeLock.hpp"
#include "Parallel/ParallelComponentHelpers.hpp"
#include "Utilities/Algorithm.hpp"
#include "Utilities/ErrorHandling/Assert.hpp"
#include "Utilities/ErrorHandling/Error.hpp"
#include "Utilities/Gsl.hpp"
#include "Utilities/Requires.hpp"
#include "Utilities/Serialization/Serialize.hpp"
#include "Utilities/StdHelpers.hpp"
#include "Utilities/TMPL.hpp"
#include "Utilities/TaggedTuple.hpp"

namespace observers {
/// \cond
namespace ThreadedActions {
struct ContributeVolumeDataToWriter;
}  // namespace ThreadedActions
/// \endcond
namespace Actions {

/*!
 * \ingroup ObserversGroup
 * \brief Send volume tensor data to the observer.
 *
 * The caller of this action (which is invoked on the Observer parallel
 * component) must pass in an `observation_id` that uniquely identifies the
 * observation in time, the name of the `h5::VolumeData` subfile in the HDF5
 * file (e.g. `/element_data`, where the leading slash is important), the
 * `Parallel::ArrayComponentId` of the contributing element, and the
 * `ElementVolumeData` to be written to disk.
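 *
 * \par Example
 * A minimal sketch of contributing data from inside an element's observation
 * event, where `cache`, `array_index`, and `ParallelComponent` refer to the
 * calling element and `volume_data`, `observation_id`, and `subfile_name` are
 * assumed to exist in the calling scope:
 *
 * \code
 * auto& local_observer = *Parallel::local_branch(
 *     Parallel::get_parallel_component<observers::Observer<Metavariables>>(
 *         cache));
 * Parallel::simple_action<observers::Actions::ContributeVolumeData>(
 *     local_observer, observation_id, subfile_name,
 *     Parallel::make_array_component_id<ParallelComponent>(array_index),
 *     std::move(volume_data));
 * \endcode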
 */
struct ContributeVolumeData {
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex>
  static void apply(
      db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
      const ArrayIndex& array_index,
      const observers::ObservationId& observation_id,
      const std::string& subfile_name,
      const Parallel::ArrayComponentId& sender_array_id,
      ElementVolumeData&& received_volume_data,
      const std::optional<std::string>& dependency = std::nullopt) {
    db::mutate<Tags::TensorData, Tags::ContributorsOfTensorData>(
        [&array_index, &cache, &received_volume_data, &observation_id,
         &sender_array_id, &subfile_name, &dependency](
            const gsl::not_null<std::unordered_map<
                observers::ObservationId,
                std::unordered_map<Parallel::ArrayComponentId,
                                   ElementVolumeData>>*>
                volume_data,
            const gsl::not_null<std::unordered_map<
                ObservationId, std::unordered_set<Parallel::ArrayComponentId>>*>
                contributed_volume_data_ids,
            const std::unordered_map<
                ObservationKey,
                std::unordered_set<Parallel::ArrayComponentId>>&
                registered_array_component_ids) mutable {  // NOLINT(spectre-mutable)
          const ObservationKey& key{observation_id.observation_key()};
          if (UNLIKELY(registered_array_component_ids.find(key) ==
                       registered_array_component_ids.end())) {
            ERROR("Receiving data from observation id "
                  << observation_id << " that was never registered.");
          }
          const auto& registered_ids = registered_array_component_ids.at(key);
          if (UNLIKELY(registered_ids.find(sender_array_id) ==
                       registered_ids.end())) {
            ERROR("Receiving volume data from array component id "
                  << sender_array_id << " that is not registered.");
          }

          auto& contributed_array_ids =
              (*contributed_volume_data_ids)[observation_id];
          if (UNLIKELY(contributed_array_ids.find(sender_array_id) !=
                       contributed_array_ids.end())) {
            ERROR("Already received volume data for observation id "
                  << observation_id << " from array component id "
                  << sender_array_id);
          }
          contributed_array_ids.insert(sender_array_id);

          if ((not volume_data->contains(observation_id)) or
              (not volume_data->at(observation_id).contains(sender_array_id))) {
            volume_data->operator[](observation_id)
                .emplace(sender_array_id, std::move(received_volume_data));
          } else {
            auto& current_data =
                volume_data->at(observation_id).at(sender_array_id);
            if (UNLIKELY(not alg::equal(current_data.extents,
                                        received_volume_data.extents))) {
              ERROR(
                  "The extents from the same volume component at a specific "
                  "observation should always be the same. For example, the "
                  "extents of a DG element should be the same for all calls "
                  "to ContributeVolumeData that occur at the same time.");
            }
            current_data.tensor_components.insert(
                current_data.tensor_components.end(),
                std::make_move_iterator(
                    received_volume_data.tensor_components.begin()),
                std::make_move_iterator(
                    received_volume_data.tensor_components.end()));
          }

          // Check if we have received all "volume" data from the registered
          // elements. If so, we move it to the nodegroup volume writer.
          if (contributed_array_ids.size() == registered_ids.size()) {
            auto& local_writer = *Parallel::local_branch(
                Parallel::get_parallel_component<ObserverWriter<Metavariables>>(
                    cache));
            Parallel::threaded_action<
                ThreadedActions::ContributeVolumeDataToWriter>(
                local_writer, observation_id,
                Parallel::make_array_component_id<ParallelComponent>(
                    array_index),
                subfile_name, std::move((*volume_data)[observation_id]),
                dependency);
            volume_data->erase(observation_id);
            contributed_volume_data_ids->erase(observation_id);
          }
        },
        make_not_null(&box),
        db::get<Tags::ExpectedContributorsForObservations>(box));
  }
};
}  // namespace Actions

namespace ThreadedActions {
namespace VolumeActions_detail {
void write_data(const std::string& h5_file_name,
                const std::string& input_source,
                const std::string& subfile_path,
                const observers::ObservationId& observation_id,
                std::vector<ElementVolumeData>&& volume_data);

template <typename ParallelComponent, typename Metavariables,
          typename VolumeDataAtObsId>
void write_combined_volume_data(
    Parallel::GlobalCache<Metavariables>& cache,
    const observers::ObservationId& observation_id,
    const VolumeDataAtObsId& volume_data,
    const gsl::not_null<Parallel::NodeLock*> volume_file_lock,
    const std::string& subfile_name) {
  ASSERT(not volume_data.empty(),
         "Failed to populate volume_data before trying to write it.");

  std::vector<ElementVolumeData> volume_data_to_write;

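  // `tmpl::at_c<VolumeDataAtObsId, 1>` is the mapped type of the unordered
  // map: either a single ElementVolumeData per contributor, or a vector of
  // them (as sent by the interpolator overload below). The two branches
  // flatten both cases into a single std::vector for writing.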
  if constexpr (std::is_same_v<tmpl::at_c<VolumeDataAtObsId, 1>,
                               ElementVolumeData>) {
    volume_data_to_write.reserve(volume_data.size());
    for (const auto& [id, element] : volume_data) {
      (void)id;  // avoid compiler warnings
      volume_data_to_write.push_back(element);
    }
  } else {
    size_t total_size = 0;
    for (const auto& [id, vec_elements] : volume_data) {
      (void)id;  // avoid compiler warnings
      total_size += vec_elements.size();
    }
    volume_data_to_write.reserve(total_size);

    for (const auto& [id, vec_elements] : volume_data) {
      (void)id;  // avoid compiler warnings
      volume_data_to_write.insert(volume_data_to_write.end(),
                                  vec_elements.begin(), vec_elements.end());
    }
  }

  // Write to file. We use a separate node lock because writing can be
  // very time consuming (it's network dependent, depends on how full the
  // disks are, what other users are doing, etc.) and we want to be able
  // to continue to work on the nodegroup while we are writing data to
  // disk.
  const std::lock_guard hold_lock(*volume_file_lock);
  {
    // Scoping is for closing HDF5 file before we release the lock.
    const auto& file_prefix = Parallel::get<Tags::VolumeFileName>(cache);
    auto& my_proxy =
        Parallel::get_parallel_component<ParallelComponent>(cache);
    h5::H5File<h5::AccessType::ReadWrite> h5file(
        file_prefix +
            std::to_string(
                Parallel::my_node<int>(*Parallel::local_branch(my_proxy))) +
            ".h5",
        true, observers::input_source_from_cache(cache));
    constexpr size_t version_number = 0;
    auto& volume_file =
        h5file.try_insert<h5::VolumeData>(subfile_name, version_number);

    // Serialize domain. See `Domain` docs for details on the serialization.
    // The domain is retrieved from the global cache using the standard
    // domain tag. If more flexibility is required here later, then the
    // domain can be passed along with the `ContributeVolumeData` action.
    std::optional<std::vector<char>> serialized_domain{};
    if (not volume_file.has_domain()) {
      serialized_domain = serialize(
          Parallel::get<domain::Tags::Domain<Metavariables::volume_dim>>(
              cache));
    }

    std::optional<std::vector<char>> serialized_global_functions_of_time =
        std::nullopt;
    std::optional<std::vector<char>> serialized_observation_functions_of_time =
        std::nullopt;
    if constexpr (Parallel::is_in_global_cache<Metavariables,
                                               domain::Tags::FunctionsOfTime>) {
      const auto& functions_of_time =
          get<domain::Tags::FunctionsOfTime>(cache);
      serialized_global_functions_of_time = serialize(functions_of_time);
      // NOLINTNEXTLINE(misc-const-correctness)
      domain::FunctionsOfTimeMap observation_functions_of_time{};
      const double obs_time = observation_id.value();
      // Generally, the functions of time should be valid when we
      // perform an observation. The exception is when running in an
      // AtCleanup event, in which case the observation time is a
      // bogus value and we just skip writing the values.
      if (alg::all_of(functions_of_time, [&](const auto& fot) {
            const auto bounds = fot.second->time_bounds();
            return bounds[0] <= obs_time and obs_time <= bounds[1];
          })) {
        // create a new function of time, effectively truncating the history.
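        // The second argument is only about 100 machine epsilons past the
        // observation time (scaled by its magnitude), so the truncated
        // function of time is valid in a minimal window around this
        // observation.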
        for (const auto& [name, fot_ptr] : functions_of_time) {
          observation_functions_of_time[name] = fot_ptr->create_at_time(
              obs_time, obs_time + 100.0 *
                                       std::numeric_limits<double>::epsilon() *
                                       std::max(std::abs(obs_time), 1.0));
        }
        serialized_observation_functions_of_time =
            serialize(observation_functions_of_time);
      }
    }

    // Write the data to the file
    volume_file.write_volume_data(
        observation_id.hash(), observation_id.value(), volume_data_to_write,
        serialized_domain, serialized_observation_functions_of_time,
        serialized_global_functions_of_time);
  }
}
}  // namespace VolumeActions_detail
/*!
 * \ingroup ObserversGroup
 * \brief Move data to the observer writer for writing to disk.
 *
 * Once data from all cores is collected, this action writes the data to disk
 * if there is no dependency, or if there is a dependency that has already been
 * received. If there is a dependency that has not been received yet, the data
 * will be written by a later call to `ContributeDependency`.
 */
struct ContributeVolumeDataToWriter {
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex>
  static void apply(
      db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
      const ArrayIndex& /*array_index*/,
      const gsl::not_null<Parallel::NodeLock*> node_lock,
      const observers::ObservationId& observation_id,
      Parallel::ArrayComponentId observer_group_id,
      const std::string& subfile_name,
      std::unordered_map<Parallel::ArrayComponentId,
                         std::vector<ElementVolumeData>>&& received_volume_data,
      const std::optional<std::string>& dependency = std::nullopt) {
    apply_impl<Tags::InterpolatorTensorData, ParallelComponent>(
        box, cache, node_lock, observation_id, observer_group_id, subfile_name,
        std::move(received_volume_data), dependency);
  }

  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex>
  static void apply(
      db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
      const ArrayIndex& /*array_index*/,
      const gsl::not_null<Parallel::NodeLock*> node_lock,
      const observers::ObservationId& observation_id,
      Parallel::ArrayComponentId observer_group_id,
      const std::string& subfile_name,
      std::unordered_map<Parallel::ArrayComponentId, ElementVolumeData>&&
          received_volume_data,
      const std::optional<std::string>& dependency = std::nullopt) {
    apply_impl<Tags::TensorData, ParallelComponent>(
        box, cache, node_lock, observation_id, observer_group_id, subfile_name,
        std::move(received_volume_data), dependency);
  }

 private:
  template <typename TensorDataTag, typename ParallelComponent,
            typename DbTagsList, typename Metavariables,
            typename VolumeDataAtObsId>
  static void apply_impl(
      db::DataBox<DbTagsList>& box, Parallel::GlobalCache<Metavariables>& cache,
      const gsl::not_null<Parallel::NodeLock*> node_lock,
      const observers::ObservationId& observation_id,
      Parallel::ArrayComponentId observer_group_id,
      const std::string& subfile_name, VolumeDataAtObsId received_volume_data,
      const std::optional<std::string>& dependency = std::nullopt) {
    // The pointer gymnastics below are done to minimize the time spent
    // locking the entire node, which is necessary because the DataBox does
    // not allow any function calls, either get or mutate, during a mutate.
    // We separate writing from the operations that edit the DataBox since
    // writing to disk can be very slow, while moving data around is
    // comparatively quick.
    Parallel::NodeLock* volume_file_lock = nullptr;
    bool perform_write = false;
    VolumeDataAtObsId volume_data{};

    {
      const std::lock_guard hold_lock(*node_lock);

      // Set file lock for later
      db::mutate<Tags::H5FileLock>(
          [&volume_file_lock](
              const gsl::not_null<Parallel::NodeLock*> volume_file_lock_ptr) {
            volume_file_lock = &*volume_file_lock_ptr;
          },
          make_not_null(&box));

      ASSERT(volume_file_lock != nullptr,
             "Failed to set volume_file_lock in the mutate");

      const auto& observations_registered =
          db::get<Tags::ExpectedContributorsForObservations>(box);

      const ObservationKey& key = observation_id.observation_key();
      if (LIKELY(observations_registered.contains(key))) {
        if (UNLIKELY(not observations_registered.at(key).contains(
                observer_group_id))) {
          ERROR("The observer group id "
                << observer_group_id
                << " was not registered for the observation id "
                << observation_id);
        }
      } else {
        ERROR("Key " << key
                     << " not in the registered group ids. Known keys are "
                     << keys_of(observations_registered));
      }

      const size_t observations_registered_with_id =
          observations_registered.at(key).size();

      // OK because we hold the node lock
      auto& volume_observers_contributed =
          db::get_mutable_reference<Tags::ContributorsOfTensorData>(
              make_not_null(&box));
      auto& all_volume_data =
          db::get_mutable_reference<TensorDataTag>(make_not_null(&box));
      auto& box_dependencies =
          db::get_mutable_reference<Tags::Dependencies>(make_not_null(&box));

      auto& contributed_group_ids =
          volume_observers_contributed[observation_id];

      if (UNLIKELY(contributed_group_ids.contains(observer_group_id))) {
        ERROR("Already received volume data for observation id "
              << observation_id << " from array component id "
              << observer_group_id);
      }
      contributed_group_ids.insert(observer_group_id);

      // Add received volume data to the box
      if (all_volume_data.contains(observation_id)) {
        auto& current_data = all_volume_data.at(observation_id);
        current_data.insert(
            std::make_move_iterator(received_volume_data.begin()),
            std::make_move_iterator(received_volume_data.end()));
      } else {
        // We haven't been called before on this processing element.
        all_volume_data[observation_id] = std::move(received_volume_data);
      }

      // Check if we have received all "volume" data from the Observer
      // group. If so we write to disk.
      if (volume_observers_contributed.at(observation_id).size() ==
          observations_registered_with_id) {
        // Check if
        // 1. there is an external dependency
        if (dependency.has_value()) {
          // 2. if there is, that we have received something at this time
          // 3. that the dependencies are the same
          if (box_dependencies.contains(observation_id)) {
            if (UNLIKELY(box_dependencies.at(observation_id).first !=
                         dependency.value())) {
              ERROR(
                  "The dependency that was sent to the ObserverWriter from the "
                  "elements ("
                  << dependency.value()
                  << ") does not match the dependency received from "
                     "ContributeDependency ("
                  << box_dependencies.at(observation_id).first << ").");
            }
            // 4. that we are writing the volume data to disk
            if (box_dependencies.at(observation_id).second) {
              perform_write = true;
              volume_data = std::move(all_volume_data[observation_id]);
            }

            // Whether or not we are writing data to disk, we clean up because
            // we have received both the volume data and the dependency
            all_volume_data.erase(observation_id);
            volume_observers_contributed.erase(observation_id);
            box_dependencies.erase(observation_id);
          }
        } else {
          perform_write = true;
          volume_data = std::move(all_volume_data[observation_id]);
          all_volume_data.erase(observation_id);
          volume_observers_contributed.erase(observation_id);
        }
      }
    }

    if (perform_write) {
      VolumeActions_detail::write_combined_volume_data<ParallelComponent>(
          cache, observation_id, volume_data, make_not_null(volume_file_lock),
          subfile_name);
    }
  }
};

/*!
 * \brief Threaded action that will add a dependency to the ObserverWriter for
 * a given ObservationId (\p time + \p volume_subfile_name).
 *
 * \details If not all of the volume data for this ObservationId has been
 * received yet, then this will just add the dependency to the box and exit
 * without writing anything. If all of the volume data has already arrived when
 * this action is called, then it writes out the volume data (or discards it if
 * we aren't writing).
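 *
 * \par Example
 * A minimal sketch of invoking this action on an ObserverWriter proxy; the
 * proxy, `time`, and dependency name are illustrative and assumed to exist in
 * the calling scope. The leading slash and `.vol` extension of the subfile
 * name are added automatically if they are missing:
 *
 * \code
 * Parallel::threaded_action<observers::ThreadedActions::ContributeDependency>(
 *     observer_writer_proxy, time, std::string{"SurfaceData"},
 *     std::string{"element_data"}, true);
 * \endcode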
 */
struct ContributeDependency {
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex>
  static void apply(db::DataBox<DbTagsList>& box,
                    Parallel::GlobalCache<Metavariables>& cache,
                    const ArrayIndex& /*array_index*/,
                    const gsl::not_null<Parallel::NodeLock*> node_lock,
                    const double time, const std::string& dependency,
                    std::string volume_subfile_name,
                    const bool write_volume_data) {
    if (not volume_subfile_name.starts_with("/")) {
      volume_subfile_name = "/" + volume_subfile_name;
    }
    if (not volume_subfile_name.ends_with(".vol")) {
      volume_subfile_name += ".vol";
    }

    const ObservationId observation_id{time, volume_subfile_name};

    // The pointer gymnastics below are done to minimize the time spent
    // locking the entire node, which is necessary because the DataBox does
    // not allow any function calls, either get or mutate, during a mutate.
    // We separate writing from the operations that edit the DataBox since
    // writing to disk can be very slow, while moving data around is
    // comparatively quick.
    Parallel::NodeLock* volume_file_lock = nullptr;
    bool perform_write = false;
    std::unordered_map<Parallel::ArrayComponentId, ElementVolumeData>
        volume_data{};

    // For now just hold the entire node. We can optimize with different locks
    // later on.
    {
      const std::lock_guard hold_lock(*node_lock);

      db::mutate<Tags::H5FileLock, Tags::Dependencies>(
          [&](const gsl::not_null<Parallel::NodeLock*> volume_file_lock_ptr,
              const gsl::not_null<std::unordered_map<
                  ObservationId, std::pair<std::string, bool>>*>
                  dependencies) {
            volume_file_lock = &*volume_file_lock_ptr;
            (*dependencies)[observation_id] =
                std::pair{dependency, write_volume_data};
          },
          make_not_null(&box));

      auto& volume_observers_contributed =
          db::get_mutable_reference<Tags::ContributorsOfTensorData>(
              make_not_null(&box));
      auto& all_volume_data =
          db::get_mutable_reference<Tags::TensorData>(make_not_null(&box));
      auto& box_dependencies =
          db::get_mutable_reference<Tags::Dependencies>(make_not_null(&box));
      const auto& expected_contributors =
          db::get<Tags::ExpectedContributorsForObservations>(box);

      if (not expected_contributors.contains(
              observation_id.observation_key())) {
        ERROR("Key " << observation_id.observation_key()
                     << " was not registered.");
      }

      // We have not received any volume data at this time, so we can't do
      // anything.
      if (not(volume_observers_contributed.contains(observation_id) and
              all_volume_data.contains(observation_id))) {
        return;
      }

      // Check if we have received all "volume" data from the Observer
      // group. If so we write to disk. Then always delete it since the volume
      // data was waiting for this dependency to arrive to be written.
      if (volume_observers_contributed.at(observation_id).size() ==
          expected_contributors.at(observation_id.observation_key()).size()) {
        if (write_volume_data) {
          perform_write = true;
          volume_data = std::move(all_volume_data.at(observation_id));
        }

        all_volume_data.erase(observation_id);
        volume_observers_contributed.erase(observation_id);
        box_dependencies.erase(observation_id);
      }
    }

    if (perform_write) {
      VolumeActions_detail::write_combined_volume_data<ParallelComponent>(
          cache, observation_id, volume_data, make_not_null(volume_file_lock),
          volume_subfile_name);
    }
  }
};

/*!
 * \brief Write volume data (such as surface data) at a given time (specified
 * by an `ObservationId`) without the need to register or reduce anything,
 * e.g. from a singleton component or from a specific chare.
 *
 * Use `observers::Actions::ContributeVolumeData` instead if you need to write
 * volume data from an array chare (e.g., writing volume data from all the
 * elements in a domain).
 *
 * Invoke this action on the `observers::ObserverWriter` component on node 0.
 * Pass the following arguments when invoking this action:
 *
 * - `h5_file_name`: the name of the HDF5 file where the volume data is to be
 *   written (without the .h5 extension).
 * - `subfile_path`: the path where the volume data should be written in the
 *   HDF5 file. Include a leading slash, e.g., `/AhA`.
 * - `observation_id`: the ObservationId corresponding to the volume data.
 * - `volume_data`: the volume data to be written.
 */
struct WriteVolumeData {
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename ArrayIndex, typename... Ts,
            typename DataBox = db::DataBox<DbTagsList>>
  static void apply(db::DataBox<DbTagsList>& box,
                    Parallel::GlobalCache<Metavariables>& cache,
                    const ArrayIndex& /*array_index*/,
                    const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
                    const std::string& h5_file_name,
                    const std::string& subfile_path,
                    const observers::ObservationId& observation_id,
                    std::vector<ElementVolumeData>&& volume_data) {
    auto& volume_file_lock =
        db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
    const std::lock_guard hold_lock(volume_file_lock);
    VolumeActions_detail::write_data(
        h5_file_name, observers::input_source_from_cache(cache), subfile_path,
        observation_id, std::move(volume_data));
  }

  // For a local synchronous action
  using return_type = void;

  /// \brief The apply call for the local synchronous action
  template <typename ParallelComponent, typename DbTagsList,
            typename Metavariables, typename... Ts,
            typename DataBox = db::DataBox<DbTagsList>>
  static return_type apply(
      db::DataBox<DbTagsList>& box,
      const gsl::not_null<Parallel::NodeLock*> /*node_lock*/,
      Parallel::GlobalCache<Metavariables>& cache,
      const std::string& h5_file_name, const std::string& subfile_path,
      const observers::ObservationId& observation_id,
      std::vector<ElementVolumeData>&& volume_data) {
    auto& volume_file_lock =
        db::get_mutable_reference<Tags::H5FileLock>(make_not_null(&box));
    const std::lock_guard hold_lock(volume_file_lock);
    VolumeActions_detail::write_data(
        h5_file_name, observers::input_source_from_cache(cache), subfile_path,
        observation_id, std::move(volume_data));
  }
};
}  // namespace ThreadedActions
}  // namespace observers
|