Line data Source code
1 0 : // Distributed under the MIT License. 2 : // See LICENSE.txt for details. 3 : 4 : /* 5 : *The original code is distributed under the following copyright and license: 6 : * 7 : * Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se> 8 : * 9 : * Permission is hereby granted, free of charge, to any person obtaining a copy 10 : * of this software and associated documentation files (the "Software"), to deal 11 : * in the Software without restriction, including without limitation the rights 12 : * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 13 : * copies of the Software, and to permit persons to whom the Software is 14 : * furnished to do so, subject to the following conditions: 15 : * 16 : * The above copyright notice and this permission notice shall be included in 17 : * all copies or substantial portions of the Software. 18 : * 19 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 20 : * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 21 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 22 : * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 23 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 24 : * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 25 : * SOFTWARE. 26 : * 27 : * 28 : * SXS Modifications: 29 : * 1. Casing to match SpECTRE conventions 30 : * 2. Static capacity 31 : * 3. Storage is std::array 32 : * 4. Switch to west-const 33 : * 34 : */ 35 : 36 : #pragma once 37 : 38 : #include <atomic> 39 : #include <cassert> 40 : #include <cstddef> 41 : #include <new> // Placement new 42 : #include <stdexcept> 43 : #include <type_traits> 44 : 45 : #include "Utilities/ErrorHandling/Assert.hpp" 46 : #include "Utilities/Requires.hpp" 47 : 48 : namespace Parallel { 49 : /*! 50 : * \brief A static capacity runtime-sized single-producer single-consumer 51 : * lockfree queue. 52 : * 53 : * As long as only one thread reads and writes simultaneously the queue is 54 : * threadsafe. Which threads read and write can change throughout program 55 : * execution, the important thing is that there is no instance during the 56 : * execution where more than one thread tries to read and where more than one 57 : * thread tries to write. 58 : * 59 : * \note This class is intentionally not serializable since handling 60 : * threadsafety around serialization requires careful thought of the individual 61 : * circumstances. 62 : */ 63 : template <typename T, size_t Capacity> 64 1 : class StaticSpscQueue { 65 : private: 66 : #ifdef __cpp_lib_hardware_interference_size 67 : static constexpr size_t cache_line_size_ = 68 : std::hardware_destructive_interference_size; 69 : #else 70 0 : static constexpr size_t cache_line_size_ = 64; 71 : #endif 72 : 73 : // Padding to avoid false sharing between slots_ and adjacent allocations 74 0 : static constexpr size_t padding_ = (cache_line_size_ - 1) / sizeof(T) + 1; 75 : 76 : public: 77 0 : StaticSpscQueue() = default; 78 0 : ~StaticSpscQueue() { 79 : // Destruct objects in the buffer. 80 : while (front()) { 81 : pop(); 82 : } 83 : } 84 : 85 0 : StaticSpscQueue(const StaticSpscQueue&) = delete; 86 0 : StaticSpscQueue& operator=(const StaticSpscQueue&) = delete; 87 0 : StaticSpscQueue(StaticSpscQueue&&) = delete; 88 0 : StaticSpscQueue& operator=(StaticSpscQueue&&) = delete; 89 : 90 : /// Construct a new element at the end of the queue in place. 91 : /// 92 : /// Uses placement new for in-place construction. 93 : /// 94 : /// \warning This may overwrite existing elements if `capacity()` is 95 : /// exceeded without warning. 96 : template <typename... Args> 97 1 : void emplace(Args&&... args) noexcept( 98 : std::is_nothrow_constructible_v<T, Args&&...>) { 99 : static_assert(std::is_constructible_v<T, Args&&...>, 100 : "T must be constructible with Args&&..."); 101 : const auto write_index = write_index_.load(std::memory_order_relaxed); 102 : auto next_write_index = write_index + 1; 103 : if (next_write_index == capacity_) { 104 : next_write_index = 0; 105 : } 106 : while (next_write_index == read_index_cache_) { 107 : read_index_cache_ = read_index_.load(std::memory_order_acquire); 108 : } 109 : new (&data_[write_index + padding_]) T(std::forward<Args>(args)...); 110 : write_index_.store(next_write_index, std::memory_order_release); 111 : } 112 : 113 : /// Construct a new element at the end of the queue in place. 114 : /// 115 : /// Uses placement new for in-place construction. 116 : /// 117 : /// Returns `true` if the emplacement succeeded and `false` if it did 118 : /// not. If it failed then the queue is currently full. 119 : template <typename... Args> 120 1 : [[nodiscard]] bool try_emplace(Args&&... args) noexcept( 121 : std::is_nothrow_constructible_v<T, Args&&...>) { 122 : static_assert(std::is_constructible_v<T, Args&&...>, 123 : "T must be constructible with Args&&..."); 124 : const auto write_index = write_index_.load(std::memory_order_relaxed); 125 : auto next_write_index = write_index + 1; 126 : if (next_write_index == capacity_) { 127 : next_write_index = 0; 128 : } 129 : if (next_write_index == read_index_cache_) { 130 : read_index_cache_ = read_index_.load(std::memory_order_acquire); 131 : if (next_write_index == read_index_cache_) { 132 : return false; 133 : } 134 : } 135 : new (&data_[write_index + padding_]) T(std::forward<Args>(args)...); 136 : write_index_.store(next_write_index, std::memory_order_release); 137 : return true; 138 : } 139 : 140 : /// Push a new element to the end of the queue. 141 : /// 142 : /// Uses `emplace()` internally. 143 : /// 144 : /// \warning This may overwrite existing elements if `capacity()` is 145 : /// exceeded without warning. 146 1 : void push(const T& v) noexcept(std::is_nothrow_copy_constructible_v<T>) { 147 : static_assert(std::is_copy_constructible_v<T>, 148 : "T must be copy constructible"); 149 : emplace(v); 150 : } 151 : 152 : /// Push a new element to the end of the queue. 153 : /// 154 : /// Uses `emplace()` internally. 155 : /// 156 : /// \warning This may overwrite existing elements if `capacity()` is 157 : /// exceeded without warning. 158 : template <typename P, Requires<std::is_constructible_v<T, P&&>> = nullptr> 159 1 : void push(P&& v) noexcept(std::is_nothrow_constructible_v<T, P&&>) { 160 : emplace(std::forward<P>(v)); 161 : } 162 : 163 : /// Push a new element to the end of the queue. Returns `false` if the queue 164 : /// is at capacity and does not push the new object, otherwise returns `true`. 165 : /// 166 : /// Uses `try_emplace()` internally. 167 1 : [[nodiscard]] bool try_push(const T& v) noexcept( 168 : std::is_nothrow_copy_constructible_v<T>) { 169 : static_assert(std::is_copy_constructible_v<T>, 170 : "T must be copy constructible"); 171 : return try_emplace(v); 172 : } 173 : 174 : /// Push a new element to the end of the queue. Returns `false` if the queue 175 : /// is at capacity and does not push the new object, otherwise returns `true`. 176 : /// 177 : /// Uses `try_emplace()` internally. 178 : template <typename P, Requires<std::is_constructible_v<T, P&&>> = nullptr> 179 1 : [[nodiscard]] bool try_push(P&& v) noexcept( 180 : std::is_nothrow_constructible_v<T, P&&>) { 181 : return try_emplace(std::forward<P>(v)); 182 : } 183 : 184 : /// Returns the first element from the queue. 185 : /// 186 : /// \note Returns `nullptr` if the queue is empty. 187 1 : [[nodiscard]] T* front() noexcept { 188 : const auto read_index = read_index_.load(std::memory_order_relaxed); 189 : if (read_index == write_index_cache_) { 190 : write_index_cache_ = write_index_.load(std::memory_order_acquire); 191 : if (write_index_cache_ == read_index) { 192 : return nullptr; 193 : } 194 : } 195 : return &data_[read_index + padding_]; 196 : } 197 : 198 : /// Removes the first element from the queue. 199 1 : void pop() { 200 : static_assert(std::is_nothrow_destructible_v<T>, 201 : "T must be nothrow destructible"); 202 : const auto read_index = read_index_.load(std::memory_order_relaxed); 203 : #ifdef SPECTRE_DEBUG 204 : const auto write_index = write_index_.load(std::memory_order_acquire); 205 : ASSERT(write_index != read_index, 206 : "Can't pop an element from an empty queue. read_index: " 207 : << read_index << " write_index " << write_index); 208 : #endif // SPECTRE_DEBUG 209 : data_[read_index + padding_].~T(); 210 : auto next_read_index = read_index + 1; 211 : if (next_read_index == capacity_) { 212 : next_read_index = 0; 213 : } 214 : if (read_index == write_index_cache_) { 215 : write_index_cache_ = next_read_index; 216 : } 217 : read_index_.store(next_read_index, std::memory_order_release); 218 : } 219 : 220 : /// Returns the size of the queue at a particular hardware state. 221 : /// 222 : /// Note that while this can be checked in a threadsafe manner, it is up to 223 : /// the user to guarantee that another thread does not change the queue 224 : /// between when `size()` is called and how the result is used. 225 1 : [[nodiscard]] size_t size() const noexcept { 226 : std::ptrdiff_t diff = static_cast<std::ptrdiff_t>( 227 : write_index_.load(std::memory_order_acquire)) - 228 : static_cast<std::ptrdiff_t>( 229 : read_index_.load(std::memory_order_acquire)); 230 : if (diff < 0) { 231 : diff += static_cast<std::ptrdiff_t>(capacity_); 232 : } 233 : return static_cast<size_t>(diff); 234 : } 235 : 236 : /// Returns `true` if the queue may be empty, otherwise `false`. 237 : /// 238 : /// Note that while this can be checked in a threadsafe manner, it is up to 239 : /// the user to guarantee that another thread does not change the queue 240 : /// between when `empty()` is called and how the result is used. 241 1 : [[nodiscard]] bool empty() const noexcept { 242 : return write_index_.load(std::memory_order_acquire) == 243 : read_index_.load(std::memory_order_acquire); 244 : } 245 : 246 : /// Returns the capacity of the queue. 247 1 : [[nodiscard]] size_t capacity() const noexcept { return capacity_ - 1; } 248 : 249 : private: 250 0 : static constexpr size_t capacity_ = Capacity + 1; 251 0 : std::array<T, capacity_ + 2 * padding_> data_{}; 252 : 253 : // Align to cache line size in order to avoid false sharing 254 : // read_index_cache_ and write_index_cache_ is used to reduce the amount of 255 : // cache coherency traffic 256 0 : alignas(cache_line_size_) std::atomic<size_t> write_index_{0}; 257 0 : alignas(cache_line_size_) size_t read_index_cache_{0}; 258 0 : alignas(cache_line_size_) std::atomic<size_t> read_index_{0}; 259 0 : alignas(cache_line_size_) size_t write_index_cache_{0}; 260 : 261 : // Padding to avoid adjacent allocations from sharing a cache line with 262 : // write_index_cache_ 263 : // NOLINTNEXTLINE(modernize-avoid-c-arrays) 264 0 : char padding_data_[cache_line_size_ - sizeof(write_index_cache_)]{}; 265 : }; 266 : } // namespace Parallel