SpECTRE Documentation Coverage Report
Current view: top level - DataStructures - LinkedMessageQueue.hpp Hit Total Coverage
Commit: 9f349d3c09e1c03107f00c2135ca40e209d3b84c Lines: 4 12 33.3 %
Date: 2023-06-09 21:05:06
Legend: Lines: hit not hit

          Line data    Source code
       1           0 : // Distributed under the MIT License.
       2             : // See LICENSE.txt for details.
       3             : 
       4             : #pragma once
       5             : 
       6             : #include <optional>
       7             : #include <ostream>
       8             : #include <pup.h>
       9             : #include <unordered_map>
      10             : #include <utility>
      11             : 
      12             : #include "DataStructures/LinkedMessageId.hpp"
      13             : #include "Utilities/ErrorHandling/Assert.hpp"
      14             : #include "Utilities/Serialization/PupStlCpp17.hpp"
      15             : #include "Utilities/TMPL.hpp"
      16             : #include "Utilities/TaggedTuple.hpp"
      17             : 
      18             : /// \cond
      19             : template <typename Id, typename QueueTags>
      20             : class LinkedMessageQueue;  // Declared only; the tmpl::list specialization below is the one defined.
      21             : /// \endcond
      22             : 
      23             : /// \ingroup DataStructuresGroup
      24             : /// A collection of queues of asynchronously received data
      25             : ///
      26             : /// This class is designed to collect asynchronously received
      27             : /// messages, possibly from multiple sources, until sufficient data
      28             : /// has been received for processing.  Each tag in \p QueueTags
      29             : /// defines a queue of messages of type `Tag::type`.
      30             : template <typename Id, typename... QueueTags>
      31           1 : class LinkedMessageQueue<Id, tmpl::list<QueueTags...>> {
      32             :   static_assert(sizeof...(QueueTags) > 0,
      33             :                 "Must have at least one message source.");
      34             : 
      35             :  public:
      36           0 :   using IdType = Id;  // Type of the IDs that sequence the messages.
      37             : 
      38             :   /// Insert data into a given queue at a given ID.  All queues must
      39             :   /// receive data with the same collection of \p id_and_previous, but
      40             :   /// are not required to receive them in the same order.
      41             :   template <typename Tag>  // Tag must be one of QueueTags.
      42           1 :   void insert(const LinkedMessageId<Id>& id_and_previous,
      43             :               typename Tag::type message);
      44             : 
      45             :   /// The next ID in the received sequence, if all queues have
      46             :   /// received messages at that ID.  Otherwise an empty optional.
      47           1 :   std::optional<Id> next_ready_id() const;
      48             : 
      49             :   /// All the messages received at the time indicated by
      50             :   /// `next_ready_id()`.  It is an error to call this function if
      51             :   /// `next_ready_id()` would return an empty optional.  This function
      52             :   /// removes the stored messages from the queues, advancing the
      53             :   /// internal sequence ID.
      54           1 :   tuples::TaggedTuple<QueueTags...> extract();
      55             : 
      56             :   // NOLINTNEXTLINE(google-runtime-references)
      57           0 :   void pup(PUP::er& p) {  // Serializes all member state via the PUP::er.
      58             :     p | previous_id_;
      59             :     p | messages_;
      60             :   }
      61             : 
      62             :  private:
      63             :   template <typename Tag>
      64           0 :   struct Optional {  // Tag whose type wraps Tag::type in std::optional.
      65           0 :     using type = std::optional<typename Tag::type>;
      66             :   };
      67             :   // One slot per queue; a slot is empty until that queue's message arrives.
      68           0 :   using OptionalTuple = tuples::TaggedTuple<Optional<QueueTags>...>;
      69             :   // ID of the message set most recently removed by extract(); initially empty.
      70           0 :   std::optional<Id> previous_id_{};
      71             :   std::unordered_map<std::optional<Id>, std::pair<Id, OptionalTuple>>
      72           0 :       messages_{};  // Keyed by previous ID; maps to (ID, per-queue slots).
      73             : };
      74             : 
      75             : template <typename Id, typename... QueueTags>
      76             : template <typename Tag>  // Tag selects which queue's slot receives the message.
      77             : void LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::insert(
      78             :     const LinkedMessageId<Id>& id_and_previous, typename Tag::type message) {
      79             :   static_assert((... or std::is_same_v<Tag, QueueTags>), "Unrecognized tag.");
      80             :   const auto entry =  // Find or create the entry keyed by the previous ID.
      81             :       messages_
      82             :           .insert({id_and_previous.previous,
      83             :                    std::pair{id_and_previous.id, OptionalTuple{}}})
      84             :           .first;  // insert() leaves an existing entry untouched.
      85             :   auto& [id, tuple] = entry->second;  // ID and slots stored under this key.
      86             :   ASSERT(id_and_previous.id == id,  // Same previous ID must imply same ID.
      87             :          "Received messages with different ids (" << id << " and "
      88             :          << id_and_previous.id << ") but the same previous id ("
      89             :          << id_and_previous.previous << ").");
      90             :   ASSERT(not tuples::get<Optional<Tag>>(tuple).has_value(),  // One message per queue per ID.
      91             :          "Received duplicate messages at id " << id << " and previous id "
      92             :          << id_and_previous.previous << ".");
      93             :   tuples::get<Optional<Tag>>(tuple) = std::move(message);  // Fill Tag's slot.
      94             : }
      95             : 
      96             : template <typename Id, typename... QueueTags>
      97             : std::optional<Id>
      98             : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::next_ready_id() const {
      99             :   const auto current_entry = messages_.find(previous_id_);  // Entry that follows the last extracted ID.
     100             :   if (current_entry == messages_.end()) {  // Nothing received yet for the next ID.
     101             :     return {};
     102             :   }
     103             :   const auto& current_value = current_entry->second;
     104             :   const auto& [current_id, current_messages] = current_value;
     105             :   if ((... and  // Ready only when every queue's slot holds a message.
     106             :        tuples::get<Optional<QueueTags>>(current_messages).has_value())) {
     107             :     return current_id;
     108             :   } else {
     109             :     return {};
     110             :   }
     111             : }
     112             : 
     113             : template <typename Id, typename... QueueTags>
     114             : tuples::TaggedTuple<QueueTags...>
     115             : LinkedMessageQueue<Id, tmpl::list<QueueTags...>>::extract() {
     116             :   ASSERT(next_ready_id().has_value(),  // Precondition: the next entry is complete.
     117             :          "Cannot extract before all messages have been received.");
     118             :   const auto current_entry = messages_.find(previous_id_);  // Same lookup as next_ready_id().
     119             :   auto& current_value = current_entry->second;
     120             :   auto& [current_id, current_messages] = current_value;
     121             :   tuples::TaggedTuple<QueueTags...> result{};
     122             :   (void)(..., (tuples::get<QueueTags>(result) = std::move(  // Move each queue's message into the result.
     123             :                    *tuples::get<Optional<QueueTags>>(current_messages))));
     124             :   previous_id_ = current_id;  // Copy the ID out before erase() destroys the entry it refers to.
     125             :   messages_.erase(current_entry);
     126             :   return result;
     127             : }

Generated by: LCOV version 1.14