SpECTRE Documentation Coverage Report
Current view: top level - DataStructures - LinkedMessageQueue.hpp Hit Total Coverage
Commit: 1f2210958b4f38fdc0400907ee7c6d5af5111418 Lines: 5 14 35.7 %
Date: 2025-12-05 05:03:31
Legend: Lines: hit not hit

          Line data    Source code
       1           0 : // Distributed under the MIT License.
       2             : // See LICENSE.txt for details.
       3             : 
       4             : #pragma once
       5             : 
       6             : #include <optional>
       7             : #include <ostream>
       8             : #include <pup.h>
       9             : #include <type_traits>
      10             : #include <unordered_map>
      11             : #include <utility>
      12             : 
      13             : #include "DataStructures/LinkedMessageId.hpp"
      14             : #include "Utilities/ErrorHandling/Assert.hpp"
      15             : #include "Utilities/Serialization/PupStlCpp17.hpp"
      16             : #include "Utilities/TMPL.hpp"
      17             : #include "Utilities/TaggedTuple.hpp"
      18             : 
      19             : /// \cond
      20             : template <typename Id, typename QueueTags>
      21             : class LinkedMessageQueue;
      22             : /// \endcond
      23             : 
      24             : /// \ingroup DataStructuresGroup
      25             : /// A collection of queues of asynchronously received data
      26             : ///
      27             : /// This class is designed to collect asynchronously received
      28             : /// messages, possibly from multiple sources, until sufficient data
      29             : /// has been received for processing.  Each tag in \p QueueTags
      30             : /// defines a queue of messages of type `Tag::type`.
      31             : template <typename Id, typename... QueueTags>
      32           1 : class LinkedMessageQueue<Id, tmpl::list<QueueTags...>> {
      33             :   static_assert(sizeof...(QueueTags) > 0,
      34             :                 "Must have at least one message source.");
      35             : 
      36             :  public:
      37           0 :   using IdType = Id;
      38           0 :   using queue_tags_list = tmpl::list<QueueTags...>;
      39             : 
      40             :   /// Insert data into a given queue at a given ID.  All queues must
      41             :   /// receive data with the same collection of \p id_and_previous, but
      42             :   /// are not required to receive them in the same order.
      43             :   template <typename Tag>
      44           1 :   void insert(const LinkedMessageId<Id>& id_and_previous,
      45             :               typename Tag::type message);
      46             : 
      47             :   /// Insert multiple data at once into a given queue at a given ID.  All queues
      48             :   /// must receive data with the same collection of \p id_and_previous, but are
      49             :   /// not required to receive them in the same order.
      50             :   ///
      51             :   /// \details Tags are inserted in the order they are passed in. Duplicate tags
      52             :   /// are not allowed.
      53             :   template <typename Tag1, typename Tag2, typename... Tags>
      54           1 :   void insert(const LinkedMessageId<Id>& id_and_previous,
      55             :               typename Tag1::type message_1, typename Tag2::type message_2,
      56             :               typename Tags::type... messages);
      57             : 
      58             :   /// The next ID in the received sequence, if all queues have
      59             :   /// received messages at that ID.
      60           1 :   std::optional<Id> next_ready_id() const;
      61             : 
      62             :   /// All the messages received at the time indicated by
      63             :   /// `next_ready_id()`.  It is an error to call this function if
      64             :   /// `next_ready_id()` would return an empty optional.  This function
      65             :   /// removes the stored messages from the queues, advancing the
      66             :   /// internal sequence ID.
      67           1 :   tuples::TaggedTuple<QueueTags...> extract();
      68             : 
      69             :   // NOLINTNEXTLINE(google-runtime-references)
      70           0 :   void pup(PUP::er& p) {
      71             :     p | previous_id_;
      72             :     p | messages_;
      73             :   }
      74             : 
      75             :  private:
      76             :   template <typename Tag>
      77           0 :   struct Optional {
      78           0 :     using type = std::optional<typename Tag::type>;
      79             :   };
      80             : 
      81           0 :   using OptionalTuple = tuples::TaggedTuple<Optional<QueueTags>...>;
      82             : 
      83           0 :   std::optional<Id> previous_id_{};
      84             :   std::unordered_map<std::optional<Id>, std::pair<Id, OptionalTuple>>
      85           0 :       messages_{};
      86             : };
      87             : 
      88             : template <typename Id, typename... QueueTags>
      89             : template <typename Tag>
      90             : void LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::insert(
      91             :     const LinkedMessageId<Id>& id_and_previous, typename Tag::type message) {
      92             :   static_assert((... or std::is_same_v<Tag, QueueTags>), "Unrecognized tag.");
      93             :   const auto entry =
      94             :       messages_
      95             :           .insert({id_and_previous.previous,
      96             :                    std::pair{id_and_previous.id, OptionalTuple{}}})
      97             :           .first;
      98             :   auto& [id, tuple] = entry->second;
      99             :   ASSERT(id_and_previous.id == id, "Received messages with different ids ("
     100             :                                        << id << " and " << id_and_previous.id
     101             :                                        << ") but the same previous id ("
     102             :                                        << id_and_previous.previous << ").");
     103             :   ASSERT(not tuples::get<Optional<Tag>>(tuple).has_value(),
     104             :          "Received duplicate messages at id "
     105             :              << id << " and previous id " << id_and_previous.previous << ".");
     106             :   tuples::get<Optional<Tag>>(tuple) = std::move(message);
     107             : }
     108             : 
     109             : template <typename Id, typename... QueueTags>
     110             : template <typename Tag1, typename Tag2, typename... Tags>
     111             : void LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::insert(
     112             :     const LinkedMessageId<Id>& id_and_previous, typename Tag1::type message_1,
     113             :     typename Tag2::type message_2, typename Tags::type... messages) {
     114             :   static_assert(
     115             :       tmpl::size<
     116             :           tmpl::remove_duplicates<tmpl::list<Tag1, Tag2, Tags...>>>::value ==
     117             :           sizeof...(Tags) + 2,
     118             :       "Must have unique tags in LinkedMessageQueue insert.");
     119             :   insert<Tag1>(id_and_previous, std::move(message_1));
     120             :   insert<Tag2>(id_and_previous, std::move(message_2));
     121             : 
     122             :   [[maybe_unused]] const auto insert_pack =
     123             :       [this, &id_and_previous](const auto& tag_v, auto message) {
     124             :         (void)this;
     125             :         using tag = std::decay_t<decltype(tag_v)>;
     126             :         insert<tag>(id_and_previous, std::move(message));
     127             :       };
     128             : 
     129             :   EXPAND_PACK_LEFT_TO_RIGHT(insert_pack(Tags{}, std::move(messages)));
     130             : }
     131             : 
     132             : template <typename Id, typename... QueueTags>
     133             : std::optional<Id>
     134             : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::next_ready_id() const {
     135             :   const auto current_entry = messages_.find(previous_id_);
     136             :   if (current_entry == messages_.end()) {
     137             :     return {};
     138             :   }
     139             :   const auto& current_value = current_entry->second;
     140             :   const auto& [current_id, current_messages] = current_value;
     141             :   if ((... and
     142             :        tuples::get<Optional<QueueTags>>(current_messages).has_value())) {
     143             :     return current_id;
     144             :   } else {
     145             :     return {};
     146             :   }
     147             : }
     148             : 
     149             : template <typename Id, typename... QueueTags>
     150             : tuples::TaggedTuple<QueueTags...>
     151             : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::extract() {
     152             :   ASSERT(next_ready_id().has_value(),
     153             :          "Cannot extract before all messages have been received.");
     154             :   const auto current_entry = messages_.find(previous_id_);
     155             :   auto& current_value = current_entry->second;
     156             :   auto& [current_id, current_messages] = current_value;
     157             :   tuples::TaggedTuple<QueueTags...> result{};
     158             :   (void)(..., (tuples::get<QueueTags>(result) = std::move(
     159             :                    *tuples::get<Optional<QueueTags>>(current_messages))));
     160             :   previous_id_ = current_id;
     161             :   messages_.erase(current_entry);
     162             :   return result;
     163             : }

Generated by: LCOV version 1.14