SpECTRE Documentation Coverage Report
Current view: top level - DataStructures - LinkedMessageQueue.hpp Hit Total Coverage
Commit: d0fc80462417e83e5cddfa1b9901bb4a9b6af4d6 Lines: 4 13 30.8 %
Date: 2024-03-29 00:33:31
Legend: Lines: hit not hit

          Line data    Source code
       1           0 : // Distributed under the MIT License.
       2             : // See LICENSE.txt for details.
       3             : 
       4             : #pragma once
       5             : 
       6             : #include <optional>
       7             : #include <ostream>
       8             : #include <pup.h>
       9             : #include <unordered_map>
      10             : #include <utility>
      11             : 
      12             : #include "DataStructures/LinkedMessageId.hpp"
      13             : #include "Utilities/ErrorHandling/Assert.hpp"
      14             : #include "Utilities/Serialization/PupStlCpp17.hpp"
      15             : #include "Utilities/TMPL.hpp"
      16             : #include "Utilities/TaggedTuple.hpp"
      17             : 
      18             : /// \cond
      19             : template <typename Id, typename QueueTags>
      20             : class LinkedMessageQueue;
      21             : /// \endcond
      22             : 
      23             : /// \ingroup DataStructuresGroup
      24             : /// A collection of queues of asynchronously received data
      25             : ///
      26             : /// This class is designed to collect asynchronously received
      27             : /// messages, possibly from multiple sources, until sufficient data
      28             : /// has been received for processing.  Each tag in \p QueueTags
      29             : /// defines a queue of messages of type `Tag::type`.
      30             : template <typename Id, typename... QueueTags>
      31           1 : class LinkedMessageQueue<Id, tmpl::list<QueueTags...>> {
      32             :   static_assert(sizeof...(QueueTags) > 0,
      33             :                 "Must have at least one message source.");
      34             : 
      35             :  public:
      36           0 :   using IdType = Id;
      37           0 :   using queue_tags_list = tmpl::list<QueueTags...>;
      38             : 
      39             :   /// Insert data into a given queue at a given ID.  All queues must
      40             :   /// receive data with the same collection of \p id_and_previous, but
      41             :   /// are not required to receive them in the same order.
      42             :   template <typename Tag>
      43           1 :   void insert(const LinkedMessageId<Id>& id_and_previous,
      44             :               typename Tag::type message);
      45             : 
      46             :   /// The next ID in the received sequence, if all queues have
      47             :   /// received messages at that ID.
      48           1 :   std::optional<Id> next_ready_id() const;
      49             : 
      50             :   /// All the messages received at the time indicated by
      51             :   /// `next_ready_id()`.  It is an error to call this function if
      52             :   /// `next_ready_id()` would return an empty optional.  This function
      53             :   /// removes the stored messages from the queues, advancing the
      54             :   /// internal sequence ID.
      55           1 :   tuples::TaggedTuple<QueueTags...> extract();
      56             : 
      57             :   // NOLINTNEXTLINE(google-runtime-references)
      58           0 :   void pup(PUP::er& p) {
      59             :     p | previous_id_;
      60             :     p | messages_;
      61             :   }
      62             : 
      63             :  private:
      64             :   template <typename Tag>
      65           0 :   struct Optional {
      66           0 :     using type = std::optional<typename Tag::type>;
      67             :   };
      68             : 
      69           0 :   using OptionalTuple = tuples::TaggedTuple<Optional<QueueTags>...>;
      70             : 
      71           0 :   std::optional<Id> previous_id_{};
      72             :   std::unordered_map<std::optional<Id>, std::pair<Id, OptionalTuple>>
      73           0 :       messages_{};
      74             : };
      75             : 
      76             : template <typename Id, typename... QueueTags>
      77             : template <typename Tag>
      78             : void LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::insert(
      79             :     const LinkedMessageId<Id>& id_and_previous, typename Tag::type message) {
      80             :   static_assert((... or std::is_same_v<Tag, QueueTags>), "Unrecognized tag.");
      81             :   const auto entry =
      82             :       messages_
      83             :           .insert({id_and_previous.previous,
      84             :                    std::pair{id_and_previous.id, OptionalTuple{}}})
      85             :           .first;
      86             :   auto& [id, tuple] = entry->second;
      87             :   ASSERT(id_and_previous.id == id,
      88             :          "Received messages with different ids (" << id << " and "
      89             :          << id_and_previous.id << ") but the same previous id ("
      90             :          << id_and_previous.previous << ").");
      91             :   ASSERT(not tuples::get<Optional<Tag>>(tuple).has_value(),
      92             :          "Received duplicate messages at id " << id << " and previous id "
      93             :          << id_and_previous.previous << ".");
      94             :   tuples::get<Optional<Tag>>(tuple) = std::move(message);
      95             : }
      96             : 
      97             : template <typename Id, typename... QueueTags>
      98             : std::optional<Id>
      99             : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::next_ready_id() const {
     100             :   const auto current_entry = messages_.find(previous_id_);
     101             :   if (current_entry == messages_.end()) {
     102             :     return {};
     103             :   }
     104             :   const auto& current_value = current_entry->second;
     105             :   const auto& [current_id, current_messages] = current_value;
     106             :   if ((... and
     107             :        tuples::get<Optional<QueueTags>>(current_messages).has_value())) {
     108             :     return current_id;
     109             :   } else {
     110             :     return {};
     111             :   }
     112             : }
     113             : 
     114             : template <typename Id, typename... QueueTags>
     115             : tuples::TaggedTuple<QueueTags...>
     116             : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::extract() {
     117             :   ASSERT(next_ready_id().has_value(),
     118             :          "Cannot extract before all messages have been received.");
     119             :   const auto current_entry = messages_.find(previous_id_);
     120             :   auto& current_value = current_entry->second;
     121             :   auto& [current_id, current_messages] = current_value;
     122             :   tuples::TaggedTuple<QueueTags...> result{};
     123             :   (void)(..., (tuples::get<QueueTags>(result) = std::move(
     124             :                    *tuples::get<Optional<QueueTags>>(current_messages))));
     125             :   previous_id_ = current_id;
     126             :   messages_.erase(current_entry);
     127             :   return result;
     128             : }

Generated by: LCOV version 1.14