Line data Source code
1 0 : // Distributed under the MIT License.
2 : // See LICENSE.txt for details.
3 :
4 : #pragma once
5 :
6 : #include <optional>
7 : #include <ostream>
8 : #include <pup.h>
9 : #include <type_traits>
10 : #include <unordered_map>
11 : #include <utility>
12 :
13 : #include "DataStructures/LinkedMessageId.hpp"
14 : #include "Utilities/ErrorHandling/Assert.hpp"
15 : #include "Utilities/Serialization/PupStlCpp17.hpp"
16 : #include "Utilities/TMPL.hpp"
17 : #include "Utilities/TaggedTuple.hpp"
18 :
19 : /// \cond
20 : template <typename Id, typename QueueTags>
21 : class LinkedMessageQueue;
22 : /// \endcond
23 :
24 : /// \ingroup DataStructuresGroup
25 : /// A collection of queues of asynchronously received data
26 : ///
27 : /// This class is designed to collect asynchronously received
28 : /// messages, possibly from multiple sources, until sufficient data
29 : /// has been received for processing. Each tag in \p QueueTags
30 : /// defines a queue of messages of type `Tag::type`.
31 : template <typename Id, typename... QueueTags>
32 1 : class LinkedMessageQueue<Id, tmpl::list<QueueTags...>> {
33 : static_assert(sizeof...(QueueTags) > 0,
34 : "Must have at least one message source.");
35 :
36 : public:
37 0 : using IdType = Id;
38 0 : using queue_tags_list = tmpl::list<QueueTags...>;
39 :
40 : /// Insert data into a given queue at a given ID. All queues must
41 : /// receive data with the same collection of \p id_and_previous, but
42 : /// are not required to receive them in the same order.
43 : template <typename Tag>
44 1 : void insert(const LinkedMessageId<Id>& id_and_previous,
45 : typename Tag::type message);
46 :
47 : /// Insert multiple data at once into a given queue at a given ID. All queues
48 : /// must receive data with the same collection of \p id_and_previous, but are
49 : /// not required to receive them in the same order.
50 : ///
51 : /// \details Tags are inserted in the order they are passed in. Duplicate tags
52 : /// are not allowed.
53 : template <typename Tag1, typename Tag2, typename... Tags>
54 1 : void insert(const LinkedMessageId<Id>& id_and_previous,
55 : typename Tag1::type message_1, typename Tag2::type message_2,
56 : typename Tags::type... messages);
57 :
58 : /// The next ID in the received sequence, if all queues have
59 : /// received messages at that ID.
60 1 : std::optional<Id> next_ready_id() const;
61 :
62 : /// All the messages received at the time indicated by
63 : /// `next_ready_id()`. It is an error to call this function if
64 : /// `next_ready_id()` would return an empty optional. This function
65 : /// removes the stored messages from the queues, advancing the
66 : /// internal sequence ID.
67 1 : tuples::TaggedTuple<QueueTags...> extract();
68 :
69 : // NOLINTNEXTLINE(google-runtime-references)
70 0 : void pup(PUP::er& p) {
71 : p | previous_id_;
72 : p | messages_;
73 : }
74 :
75 : private:
76 : template <typename Tag>
77 0 : struct Optional {
78 0 : using type = std::optional<typename Tag::type>;
79 : };
80 :
81 0 : using OptionalTuple = tuples::TaggedTuple<Optional<QueueTags>...>;
82 :
83 0 : std::optional<Id> previous_id_{};
84 : std::unordered_map<std::optional<Id>, std::pair<Id, OptionalTuple>>
85 0 : messages_{};
86 : };
87 :
88 : template <typename Id, typename... QueueTags>
89 : template <typename Tag>
90 : void LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::insert(
91 : const LinkedMessageId<Id>& id_and_previous, typename Tag::type message) {
92 : static_assert((... or std::is_same_v<Tag, QueueTags>), "Unrecognized tag.");
93 : const auto entry =
94 : messages_
95 : .insert({id_and_previous.previous,
96 : std::pair{id_and_previous.id, OptionalTuple{}}})
97 : .first;
98 : auto& [id, tuple] = entry->second;
99 : ASSERT(id_and_previous.id == id, "Received messages with different ids ("
100 : << id << " and " << id_and_previous.id
101 : << ") but the same previous id ("
102 : << id_and_previous.previous << ").");
103 : ASSERT(not tuples::get<Optional<Tag>>(tuple).has_value(),
104 : "Received duplicate messages at id "
105 : << id << " and previous id " << id_and_previous.previous << ".");
106 : tuples::get<Optional<Tag>>(tuple) = std::move(message);
107 : }
108 :
109 : template <typename Id, typename... QueueTags>
110 : template <typename Tag1, typename Tag2, typename... Tags>
111 : void LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::insert(
112 : const LinkedMessageId<Id>& id_and_previous, typename Tag1::type message_1,
113 : typename Tag2::type message_2, typename Tags::type... messages) {
114 : static_assert(
115 : tmpl::size<
116 : tmpl::remove_duplicates<tmpl::list<Tag1, Tag2, Tags...>>>::value ==
117 : sizeof...(Tags) + 2,
118 : "Must have unique tags in LinkedMessageQueue insert.");
119 : insert<Tag1>(id_and_previous, std::move(message_1));
120 : insert<Tag2>(id_and_previous, std::move(message_2));
121 :
122 : [[maybe_unused]] const auto insert_pack =
123 : [this, &id_and_previous](const auto& tag_v, auto message) {
124 : (void)this;
125 : using tag = std::decay_t<decltype(tag_v)>;
126 : insert<tag>(id_and_previous, std::move(message));
127 : };
128 :
129 : EXPAND_PACK_LEFT_TO_RIGHT(insert_pack(Tags{}, std::move(messages)));
130 : }
131 :
132 : template <typename Id, typename... QueueTags>
133 : std::optional<Id>
134 : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::next_ready_id() const {
135 : const auto current_entry = messages_.find(previous_id_);
136 : if (current_entry == messages_.end()) {
137 : return {};
138 : }
139 : const auto& current_value = current_entry->second;
140 : const auto& [current_id, current_messages] = current_value;
141 : if ((... and
142 : tuples::get<Optional<QueueTags>>(current_messages).has_value())) {
143 : return current_id;
144 : } else {
145 : return {};
146 : }
147 : }
148 :
149 : template <typename Id, typename... QueueTags>
150 : tuples::TaggedTuple<QueueTags...>
151 : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::extract() {
152 : ASSERT(next_ready_id().has_value(),
153 : "Cannot extract before all messages have been received.");
154 : const auto current_entry = messages_.find(previous_id_);
155 : auto& current_value = current_entry->second;
156 : auto& [current_id, current_messages] = current_value;
157 : tuples::TaggedTuple<QueueTags...> result{};
158 : (void)(..., (tuples::get<QueueTags>(result) = std::move(
159 : *tuples::get<Optional<QueueTags>>(current_messages))));
160 : previous_id_ = current_id;
161 : messages_.erase(current_entry);
162 : return result;
163 : }
|