Line data Source code
1 0 : // Distributed under the MIT License. 2 : // See LICENSE.txt for details. 3 : 4 : #pragma once 5 : 6 : #include <optional> 7 : #include <ostream> 8 : #include <pup.h> 9 : #include <unordered_map> 10 : #include <utility> 11 : 12 : #include "DataStructures/LinkedMessageId.hpp" 13 : #include "Utilities/ErrorHandling/Assert.hpp" 14 : #include "Utilities/Serialization/PupStlCpp17.hpp" 15 : #include "Utilities/TMPL.hpp" 16 : #include "Utilities/TaggedTuple.hpp" 17 : 18 : /// \cond 19 : template <typename Id, typename QueueTags> 20 : class LinkedMessageQueue; 21 : /// \endcond 22 : 23 : /// \ingroup DataStructuresGroup 24 : /// A collection of queues of asynchronously received data 25 : /// 26 : /// This class is designed to collect asynchronously received 27 : /// messages, possibly from multiple sources, until sufficient data 28 : /// has been received for processing. Each tag in \p QueueTags 29 : /// defines a queue of messages of type `Tag::type`. 30 : template <typename Id, typename... QueueTags> 31 1 : class LinkedMessageQueue<Id, tmpl::list<QueueTags...>> { 32 : static_assert(sizeof...(QueueTags) > 0, 33 : "Must have at least one message source."); 34 : 35 : public: 36 0 : using IdType = Id; 37 0 : using queue_tags_list = tmpl::list<QueueTags...>; 38 : 39 : /// Insert data into a given queue at a given ID. All queues must 40 : /// receive data with the same collection of \p id_and_previous, but 41 : /// are not required to receive them in the same order. 42 : template <typename Tag> 43 1 : void insert(const LinkedMessageId<Id>& id_and_previous, 44 : typename Tag::type message); 45 : 46 : /// The next ID in the received sequence, if all queues have 47 : /// received messages at that ID. 48 1 : std::optional<Id> next_ready_id() const; 49 : 50 : /// All the messages received at the time indicated by 51 : /// `next_ready_id()`. It is an error to call this function if 52 : /// `next_ready_id()` would return an empty optional. This function 53 : /// removes the stored messages from the queues, advancing the 54 : /// internal sequence ID. 55 1 : tuples::TaggedTuple<QueueTags...> extract(); 56 : 57 : // NOLINTNEXTLINE(google-runtime-references) 58 0 : void pup(PUP::er& p) { 59 : p | previous_id_; 60 : p | messages_; 61 : } 62 : 63 : private: 64 : template <typename Tag> 65 0 : struct Optional { 66 0 : using type = std::optional<typename Tag::type>; 67 : }; 68 : 69 0 : using OptionalTuple = tuples::TaggedTuple<Optional<QueueTags>...>; 70 : 71 0 : std::optional<Id> previous_id_{}; 72 : std::unordered_map<std::optional<Id>, std::pair<Id, OptionalTuple>> 73 0 : messages_{}; 74 : }; 75 : 76 : template <typename Id, typename... QueueTags> 77 : template <typename Tag> 78 : void LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::insert( 79 : const LinkedMessageId<Id>& id_and_previous, typename Tag::type message) { 80 : static_assert((... or std::is_same_v<Tag, QueueTags>), "Unrecognized tag."); 81 : const auto entry = 82 : messages_ 83 : .insert({id_and_previous.previous, 84 : std::pair{id_and_previous.id, OptionalTuple{}}}) 85 : .first; 86 : auto& [id, tuple] = entry->second; 87 : ASSERT(id_and_previous.id == id, 88 : "Received messages with different ids (" << id << " and " 89 : << id_and_previous.id << ") but the same previous id (" 90 : << id_and_previous.previous << ")."); 91 : ASSERT(not tuples::get<Optional<Tag>>(tuple).has_value(), 92 : "Received duplicate messages at id " << id << " and previous id " 93 : << id_and_previous.previous << "."); 94 : tuples::get<Optional<Tag>>(tuple) = std::move(message); 95 : } 96 : 97 : template <typename Id, typename... QueueTags> 98 : std::optional<Id> 99 : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::next_ready_id() const { 100 : const auto current_entry = messages_.find(previous_id_); 101 : if (current_entry == messages_.end()) { 102 : return {}; 103 : } 104 : const auto& current_value = current_entry->second; 105 : const auto& [current_id, current_messages] = current_value; 106 : if ((... and 107 : tuples::get<Optional<QueueTags>>(current_messages).has_value())) { 108 : return current_id; 109 : } else { 110 : return {}; 111 : } 112 : } 113 : 114 : template <typename Id, typename... QueueTags> 115 : tuples::TaggedTuple<QueueTags...> 116 : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::extract() { 117 : ASSERT(next_ready_id().has_value(), 118 : "Cannot extract before all messages have been received."); 119 : const auto current_entry = messages_.find(previous_id_); 120 : auto& current_value = current_entry->second; 121 : auto& [current_id, current_messages] = current_value; 122 : tuples::TaggedTuple<QueueTags...> result{}; 123 : (void)(..., (tuples::get<QueueTags>(result) = std::move( 124 : *tuples::get<Optional<QueueTags>>(current_messages)))); 125 : previous_id_ = current_id; 126 : messages_.erase(current_entry); 127 : return result; 128 : }