Line data Source code
1 0 : // Distributed under the MIT License. 2 : // See LICENSE.txt for details. 3 : 4 : #pragma once 5 : 6 : #include <optional> 7 : #include <ostream> 8 : #include <pup.h> 9 : #include <unordered_map> 10 : #include <utility> 11 : 12 : #include "DataStructures/LinkedMessageId.hpp" 13 : #include "Utilities/ErrorHandling/Assert.hpp" 14 : #include "Utilities/Serialization/PupStlCpp17.hpp" 15 : #include "Utilities/TMPL.hpp" 16 : #include "Utilities/TaggedTuple.hpp" 17 : 18 : /// \cond 19 : template <typename Id, typename QueueTags> 20 : class LinkedMessageQueue; 21 : /// \endcond 22 : 23 : /// \ingroup DataStructuresGroup 24 : /// A collection of queues of asynchronously received data 25 : /// 26 : /// This class is designed to collect asynchronously received 27 : /// messages, possibly from multiple sources, until sufficient data 28 : /// has been received for processing. Each tag in \p QueueTags 29 : /// defines a queue of messages of type `Tag::type`. 30 : template <typename Id, typename... QueueTags> 31 1 : class LinkedMessageQueue<Id, tmpl::list<QueueTags...>> { 32 : static_assert(sizeof...(QueueTags) > 0, 33 : "Must have at least one message source."); 34 : 35 : public: 36 0 : using IdType = Id; 37 : 38 : /// Insert data into a given queue at a given ID. All queues must 39 : /// receive data with the same collection of \p id_and_previous, but 40 : /// are not required to receive them in the same order. 41 : template <typename Tag> 42 1 : void insert(const LinkedMessageId<Id>& id_and_previous, 43 : typename Tag::type message); 44 : 45 : /// The next ID in the received sequence, if all queues have 46 : /// received messages at that ID. 47 1 : std::optional<Id> next_ready_id() const; 48 : 49 : /// All the messages received at the time indicated by 50 : /// `next_ready_id()`. It is an error to call this function if 51 : /// `next_ready_id()` would return an empty optional. This function 52 : /// removes the stored messages from the queues, advancing the 53 : /// internal sequence ID. 54 1 : tuples::TaggedTuple<QueueTags...> extract(); 55 : 56 : // NOLINTNEXTLINE(google-runtime-references) 57 0 : void pup(PUP::er& p) { 58 : p | previous_id_; 59 : p | messages_; 60 : } 61 : 62 : private: 63 : template <typename Tag> 64 0 : struct Optional { 65 0 : using type = std::optional<typename Tag::type>; 66 : }; 67 : 68 0 : using OptionalTuple = tuples::TaggedTuple<Optional<QueueTags>...>; 69 : 70 0 : std::optional<Id> previous_id_{}; 71 : std::unordered_map<std::optional<Id>, std::pair<Id, OptionalTuple>> 72 0 : messages_{}; 73 : }; 74 : 75 : template <typename Id, typename... QueueTags> 76 : template <typename Tag> 77 : void LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::insert( 78 : const LinkedMessageId<Id>& id_and_previous, typename Tag::type message) { 79 : static_assert((... or std::is_same_v<Tag, QueueTags>), "Unrecognized tag."); 80 : const auto entry = 81 : messages_ 82 : .insert({id_and_previous.previous, 83 : std::pair{id_and_previous.id, OptionalTuple{}}}) 84 : .first; 85 : auto& [id, tuple] = entry->second; 86 : ASSERT(id_and_previous.id == id, 87 : "Received messages with different ids (" << id << " and " 88 : << id_and_previous.id << ") but the same previous id (" 89 : << id_and_previous.previous << ")."); 90 : ASSERT(not tuples::get<Optional<Tag>>(tuple).has_value(), 91 : "Received duplicate messages at id " << id << " and previous id " 92 : << id_and_previous.previous << "."); 93 : tuples::get<Optional<Tag>>(tuple) = std::move(message); 94 : } 95 : 96 : template <typename Id, typename... QueueTags> 97 : std::optional<Id> 98 : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::next_ready_id() const { 99 : const auto current_entry = messages_.find(previous_id_); 100 : if (current_entry == messages_.end()) { 101 : return {}; 102 : } 103 : const auto& current_value = current_entry->second; 104 : const auto& [current_id, current_messages] = current_value; 105 : if ((... and 106 : tuples::get<Optional<QueueTags>>(current_messages).has_value())) { 107 : return current_id; 108 : } else { 109 : return {}; 110 : } 111 : } 112 : 113 : template <typename Id, typename... QueueTags> 114 : tuples::TaggedTuple<QueueTags...> 115 : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::extract() { 116 : ASSERT(next_ready_id().has_value(), 117 : "Cannot extract before all messages have been received."); 118 : const auto current_entry = messages_.find(previous_id_); 119 : auto& current_value = current_entry->second; 120 : auto& [current_id, current_messages] = current_value; 121 : tuples::TaggedTuple<QueueTags...> result{}; 122 : (void)(..., (tuples::get<QueueTags>(result) = std::move( 123 : *tuples::get<Optional<QueueTags>>(current_messages)))); 124 : previous_id_ = current_id; 125 : messages_.erase(current_entry); 126 : return result; 127 : }