SpECTRE Documentation Coverage Report
Current view: top level - Parallel - StaticSpscQueue.hpp Hit Total Coverage
Commit: 2c4f624839e832d3d5b2abc37601f7e1f9a600c9 Lines: 12 28 42.9 %
Date: 2024-05-04 01:01:37
Legend: Lines: hit not hit

          Line data    Source code
       1           0 : // Distributed under the MIT License.
       2             : // See LICENSE.txt for details.
       3             : 
       4             : /*
       5             :  * The original code is distributed under the following copyright and license:
       6             :  *
       7             :  * Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se>
       8             :  *
       9             :  * Permission is hereby granted, free of charge, to any person obtaining a copy
      10             :  * of this software and associated documentation files (the "Software"), to deal
      11             :  * in the Software without restriction, including without limitation the rights
      12             :  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      13             :  * copies of the Software, and to permit persons to whom the Software is
      14             :  * furnished to do so, subject to the following conditions:
      15             :  *
      16             :  * The above copyright notice and this permission notice shall be included in
      17             :  * all copies or substantial portions of the Software.
      18             :  *
      19             :  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
      20             :  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
      21             :  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
      22             :  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
      23             :  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
      24             :  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
      25             :  * SOFTWARE.
      26             :  *
      27             :  *
      28             :  * SXS Modifications:
      29             :  * 1. Casing to match SpECTRE conventions
      30             :  * 2. Static capacity
      31             :  * 3. Storage is std::array
      32             :  * 4. Switch to west-const
      33             :  *
      34             :  */
      35             : 
      36             : #pragma once
      37             : 
      38             : #include <atomic>
      39             : #include <cassert>
      40             : #include <cstddef>
      41             : #include <new>  // Placement new
      42             : #include <stdexcept>
      43             : #include <type_traits>
      44             : 
      45             : #include "Utilities/ErrorHandling/Assert.hpp"
      46             : #include "Utilities/Requires.hpp"
      47             : 
      48             : namespace Parallel {
      49             : /*!
      50             :  * \brief A static capacity runtime-sized single-producer single-consumer
      51             :  * lockfree queue.
      52             :  *
      53             :  * The queue is threadsafe as long as at most one thread reads and at most
      54             :  * one thread writes at any given time. Which threads read and write can
      55             :  * change throughout program execution; the important thing is that there is
      56             :  * no instant during the execution where more than one thread tries to read
      57             :  * or more than one thread tries to write.
      58             :  *
      59             :  * \note This class is intentionally not serializable since handling
      60             :  * threadsafety around serialization requires careful thought of the individual
      61             :  * circumstances.
      62             :  */
      63             : template <typename T, size_t Capacity>
      64           1 : class StaticSpscQueue {
      65             :  private:
      66             : #ifdef __cpp_lib_hardware_interference_size
      67             :   static constexpr size_t cache_line_size_ =
      68             :       std::hardware_destructive_interference_size;
      69             : #else
      70           0 :   static constexpr size_t cache_line_size_ = 64;
      71             : #endif
      72             : 
      73             :   // Padding to avoid false sharing between data_ and adjacent allocations
      74           0 :   static constexpr size_t padding_ = (cache_line_size_ - 1) / sizeof(T) + 1;
      75             : 
      76             :  public:
      77           0 :   StaticSpscQueue() = default;
      78           0 :   ~StaticSpscQueue() {
      79             :     // Destruct objects in the buffer.
      80             :     while (front()) {
      81             :       pop();
      82             :     }
      83             :   }
      84             : 
      85           0 :   StaticSpscQueue(const StaticSpscQueue&) = delete;
      86           0 :   StaticSpscQueue& operator=(const StaticSpscQueue&) = delete;
      87           0 :   StaticSpscQueue(StaticSpscQueue&&) = delete;
      88           0 :   StaticSpscQueue& operator=(StaticSpscQueue&&) = delete;
      89             : 
      90             :   /// Construct a new element at the end of the queue in place.
      91             :   ///
      92             :   /// Uses placement new for in-place construction.
      93             :   ///
      94             :   /// \warning If the queue is full this spins until the consumer frees a slot
      95             :   /// by calling `pop()`; existing elements are never overwritten.
      96             :   template <typename... Args>
      97           1 :   void emplace(Args&&... args) noexcept(
      98             :       std::is_nothrow_constructible_v<T, Args&&...>) {
      99             :     static_assert(std::is_constructible_v<T, Args&&...>,
     100             :                   "T must be constructible with Args&&...");
     101             :     const auto write_index = write_index_.load(std::memory_order_relaxed);
     102             :     auto next_write_index = write_index + 1;
     103             :     if (next_write_index == capacity_) {
     104             :       next_write_index = 0;
     105             :     }
     106             :     while (next_write_index == read_index_cache_) {
     107             :       read_index_cache_ = read_index_.load(std::memory_order_acquire);
     108             :     }
     109             :     new (&data_[write_index + padding_]) T(std::forward<Args>(args)...);
     110             :     write_index_.store(next_write_index, std::memory_order_release);
     111             :   }
     112             : 
     113             :   /// Construct a new element at the end of the queue in place.
     114             :   ///
     115             :   /// Uses placement new for in-place construction.
     116             :   ///
     117             :   /// Returns `true` if the emplacement succeeded and `false` if it did
     118             :   /// not. If it failed then the queue is currently full.
     119             :   template <typename... Args>
     120           1 :   [[nodiscard]] bool try_emplace(Args&&... args) noexcept(
     121             :       std::is_nothrow_constructible_v<T, Args&&...>) {
     122             :     static_assert(std::is_constructible_v<T, Args&&...>,
     123             :                   "T must be constructible with Args&&...");
     124             :     const auto write_index = write_index_.load(std::memory_order_relaxed);
     125             :     auto next_write_index = write_index + 1;
     126             :     if (next_write_index == capacity_) {
     127             :       next_write_index = 0;
     128             :     }
     129             :     if (next_write_index == read_index_cache_) {
     130             :       read_index_cache_ = read_index_.load(std::memory_order_acquire);
     131             :       if (next_write_index == read_index_cache_) {
     132             :         return false;
     133             :       }
     134             :     }
     135             :     new (&data_[write_index + padding_]) T(std::forward<Args>(args)...);
     136             :     write_index_.store(next_write_index, std::memory_order_release);
     137             :     return true;
     138             :   }
     139             : 
     140             :   /// Push a new element to the end of the queue.
     141             :   ///
     142             :   /// Uses `emplace()` internally.
     143             :   ///
     144             :   /// \warning If the queue is full this spins until the consumer frees a slot
     145             :   /// by calling `pop()`; existing elements are never overwritten.
     146           1 :   void push(const T& v) noexcept(std::is_nothrow_copy_constructible_v<T>) {
     147             :     static_assert(std::is_copy_constructible_v<T>,
     148             :                   "T must be copy constructible");
     149             :     emplace(v);
     150             :   }
     151             : 
     152             :   /// Push a new element to the end of the queue.
     153             :   ///
     154             :   /// Uses `emplace()` internally.
     155             :   ///
     156             :   /// \warning If the queue is full this spins until the consumer frees a slot
     157             :   /// by calling `pop()`; existing elements are never overwritten.
     158             :   template <typename P, Requires<std::is_constructible_v<T, P&&>> = nullptr>
     159           1 :   void push(P&& v) noexcept(std::is_nothrow_constructible_v<T, P&&>) {
     160             :     emplace(std::forward<P>(v));
     161             :   }
     162             : 
     163             :   /// Push a new element to the end of the queue. Returns `false` if the queue
     164             :   /// is at capacity and does not push the new object, otherwise returns `true`.
     165             :   ///
     166             :   /// Uses `try_emplace()` internally.
     167           1 :   [[nodiscard]] bool try_push(const T& v) noexcept(
     168             :       std::is_nothrow_copy_constructible_v<T>) {
     169             :     static_assert(std::is_copy_constructible_v<T>,
     170             :                   "T must be copy constructible");
     171             :     return try_emplace(v);
     172             :   }
     173             : 
     174             :   /// Push a new element to the end of the queue. Returns `false` if the queue
     175             :   /// is at capacity and does not push the new object, otherwise returns `true`.
     176             :   ///
     177             :   /// Uses `try_emplace()` internally.
     178             :   template <typename P, Requires<std::is_constructible_v<T, P&&>> = nullptr>
     179           1 :   [[nodiscard]] bool try_push(P&& v) noexcept(
     180             :       std::is_nothrow_constructible_v<T, P&&>) {
     181             :     return try_emplace(std::forward<P>(v));
     182             :   }
     183             : 
     184             :   /// Returns a pointer to the first element of the queue without removing it.
     185             :   ///
     186             :   /// \note Returns `nullptr` if the queue is empty.
     187           1 :   [[nodiscard]] T* front() noexcept {
     188             :     const auto read_index = read_index_.load(std::memory_order_relaxed);
     189             :     if (read_index == write_index_cache_) {
     190             :       write_index_cache_ = write_index_.load(std::memory_order_acquire);
     191             :       if (write_index_cache_ == read_index) {
     192             :         return nullptr;
     193             :       }
     194             :     }
     195             :     return &data_[read_index + padding_];
     196             :   }
     197             : 
     198             :   /// Removes the first element from the queue.
     199           1 :   void pop() {
     200             :     static_assert(std::is_nothrow_destructible_v<T>,
     201             :                   "T must be nothrow destructible");
     202             :     const auto read_index = read_index_.load(std::memory_order_relaxed);
     203             : #ifdef SPECTRE_DEBUG
     204             :     const auto write_index = write_index_.load(std::memory_order_acquire);
     205             :     ASSERT(write_index != read_index,
     206             :            "Can't pop an element from an empty queue. read_index: "
     207             :                << read_index << " write_index " << write_index);
     208             : #endif  // SPECTRE_DEBUG
     209             :     data_[read_index + padding_].~T();
     210             :     auto next_read_index = read_index + 1;
     211             :     if (next_read_index == capacity_) {
     212             :       next_read_index = 0;
     213             :     }
     214             :     if (read_index == write_index_cache_) {
     215             :       write_index_cache_ = next_read_index;
     216             :     }
     217             :     read_index_.store(next_read_index, std::memory_order_release);
     218             :   }
     219             : 
     220             :   /// Returns the size of the queue at a particular hardware state.
     221             :   ///
     222             :   /// Note that while this can be checked in a threadsafe manner, it is up to
     223             :   /// the user to guarantee that another thread does not change the queue
     224             :   /// between when `size()` is called and how the result is used.
     225           1 :   [[nodiscard]] size_t size() const noexcept {
     226             :     std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(
     227             :                               write_index_.load(std::memory_order_acquire)) -
     228             :                           static_cast<std::ptrdiff_t>(
     229             :                               read_index_.load(std::memory_order_acquire));
     230             :     if (diff < 0) {
     231             :       diff += static_cast<std::ptrdiff_t>(capacity_);
     232             :     }
     233             :     return static_cast<size_t>(diff);
     234             :   }
     235             : 
     236             :   /// Returns `true` if the queue may be empty, otherwise `false`.
     237             :   ///
     238             :   /// Note that while this can be checked in a threadsafe manner, it is up to
     239             :   /// the user to guarantee that another thread does not change the queue
     240             :   /// between when `empty()` is called and how the result is used.
     241           1 :   [[nodiscard]] bool empty() const noexcept {
     242             :     return write_index_.load(std::memory_order_acquire) ==
     243             :            read_index_.load(std::memory_order_acquire);
     244             :   }
     245             : 
     246             :   /// Returns the capacity of the queue.
     247           1 :   [[nodiscard]] size_t capacity() const noexcept { return capacity_ - 1; }
     248             : 
     249             :  private:
     250           0 :   static constexpr size_t capacity_ = Capacity + 1;
     251           0 :   std::array<T, capacity_ + 2 * padding_> data_{};
     252             : 
     253             :   // Align to cache line size in order to avoid false sharing.
     254             :   // read_index_cache_ and write_index_cache_ are used to reduce the amount of
     255             :   // cache coherency traffic.
     256           0 :   alignas(cache_line_size_) std::atomic<size_t> write_index_{0};
     257           0 :   alignas(cache_line_size_) size_t read_index_cache_{0};
     258           0 :   alignas(cache_line_size_) std::atomic<size_t> read_index_{0};
     259           0 :   alignas(cache_line_size_) size_t write_index_cache_{0};
     260             : 
     261             :   // Padding to avoid adjacent allocations from sharing a cache line with
     262             :   // write_index_cache_
     263             :   // NOLINTNEXTLINE(modernize-avoid-c-arrays)
     264           0 :   char padding_data_[cache_line_size_ - sizeof(write_index_cache_)]{};
     265             : };
     266             : }  // namespace Parallel

Generated by: LCOV version 1.14