Line data Source code
1 0 : // Distributed under the MIT License. 2 : // See LICENSE.txt for details. 3 : 4 : #pragma once 5 : 6 : #include <atomic> 7 : #include <concepts> 8 : #include <cstddef> 9 : #include <cstdint> 10 : #include <memory> 11 : #include <mutex> 12 : #include <new> // for hardware_destructive_interference_size 13 : #include <stdexcept> 14 : #include <utility> 15 : #include <vector> 16 : 17 : #include "Parallel/MultiReaderSpinlock.hpp" 18 : #include "Utilities/ErrorHandling/Assert.hpp" 19 : #include "Utilities/Gsl.hpp" 20 : 21 : namespace Parallel { 22 : /*! 23 : * \brief A threadsafe parallel first-in-first-out cache. 24 : */ 25 : template <class T> 26 1 : class FifoCache { 27 0 : using stored_type = std::pair<MultiReaderSpinlock, T>; 28 0 : using value_type = std::vector<stored_type>; 29 : 30 : public: 31 : /*! 32 : * \brief Wrapper type used as the result from `find` to ensure correct 33 : * thread safety. 34 : * 35 : * Use `.value()` to get the stored value. 36 : * 37 : * Use `.has_value()` to check if a value is stored. 38 : */ 39 1 : struct Cached { 40 : public: 41 0 : Cached() = delete; 42 0 : explicit Cached(const stored_type* t); 43 : /// NOLINTNEXTLINE(google-explicit-constructor) 44 1 : Cached(const std::nullopt_t /*unused*/) : Cached{nullptr} {} 45 0 : Cached(const Cached& rhs); 46 0 : Cached& operator=(const Cached& rhs); 47 0 : Cached(Cached&& rhs) noexcept(true); 48 0 : Cached& operator=(Cached&& rhs) noexcept(true); 49 0 : ~Cached() noexcept(true); 50 : 51 : /// \brief Returns a reference to the held object. 52 : /// 53 : /// \throws std::runtime_error if no value 54 1 : auto value() const -> const T&; 55 : 56 : /// \brief Returns `true` if a value is stored, otherwise returns `false`. 57 1 : bool has_value() const { return t_ != nullptr; } 58 : 59 : private: 60 0 : stored_type* t_; 61 : }; 62 : 63 : /// \brief Create a FifoCache that has \p capacity. 64 1 : explicit FifoCache(std::unsigned_integral auto capacity); 65 : 66 0 : FifoCache() = delete; 67 0 : FifoCache(const FifoCache& rhs) = delete; 68 0 : FifoCache& operator=(const FifoCache& rhs) = delete; 69 0 : FifoCache(FifoCache&& rhs) = delete; 70 0 : FifoCache& operator=(FifoCache&& rhs) = delete; 71 0 : ~FifoCache() = default; 72 : 73 : /*! 74 : * \brief Pushes the entry computed by `compute_value()` to the front of the 75 : * queue, ejecting the last entry if the capacity is reached. The inserted 76 : * entry is returned. 77 : * 78 : * This function allows lazy computation of the value, which means the 79 : * computation is elided if another thread pushes the new cache entry before 80 : * this one. 81 : * 82 : * The predicate must satisfy `predicate(t) == true` to avoid inserting 83 : * duplicates. This is best guaranteed by passing the same predicate that 84 : * would be passed to calls to `find()`. 85 : */ 86 : template <class ComputeValue, class UnaryPredicate> 87 1 : auto push(ComputeValue&& compute_value, const UnaryPredicate& predicate) 88 : -> Cached; 89 : 90 : /*! 91 : * \brief Pushes the entry `t` to the front of the queue, 92 : * ejecting the last entry if the capacity is reached. The inserted 93 : * entry is returned. 94 : * 95 : * The predicate must satisfy `predicate(t) == true` to avoid inserting 96 : * duplicates. This is best guaranteed by passing the same predicate that 97 : * would be passed to calls to `find()`. 98 : */ 99 : template <class UnaryPredicate> 100 1 : auto push(T t, const UnaryPredicate& predicate) -> Cached; 101 : 102 : /*! 103 : * \brief Get the first element that matches `predicate`. 104 : * 105 : * If no value in the cache matches the predicate then `result.has_value()` 106 : * is `false` and `result.value()` will throw. 107 : * 108 : * The return type is designed to handle the locking and unlocking 109 : * of the data to ensure thread safety. 110 : */ 111 : template <class UnaryPredicate> 112 1 : [[nodiscard]] auto find(const UnaryPredicate& predicate) const -> Cached; 113 : 114 : private: 115 : #if defined(__cpp_lib_hardware_interference_size) 116 : static constexpr size_t cache_line_size_ = 117 : std::hardware_destructive_interference_size; 118 : #else 119 0 : static constexpr size_t cache_line_size_ = 64; 120 : #endif 121 : 122 : // std::vector does not have an atomic size so in order to prevent tearing 123 : // we have to track the size separately. 124 0 : alignas(cache_line_size_) std::atomic<std::uint64_t> size_{0}; 125 0 : alignas(cache_line_size_) std::mutex write_lock_{}; 126 0 : alignas(cache_line_size_) value_type data_{}; 127 : // Ensure we pad the end to avoid false sharing. Unlikely to happen because 128 : // of the layout, but better to be safe. 129 : #if defined(__clang__) 130 : #pragma GCC diagnostic push 131 : #pragma GCC diagnostic ignored "-Wunused-private-field" 132 : #endif 133 : // NOLINTNEXTLINE(modernize-avoid-c-arrays) 134 0 : char padding_[cache_line_size_ - sizeof(value_type) % cache_line_size_] = {}; 135 : #if defined(__clang__) 136 : #pragma GCC diagnostic pop 137 : #endif 138 : }; 139 : 140 : template <class T> 141 : FifoCache<T>::FifoCache(const std::unsigned_integral auto capacity) 142 : : data_(capacity) { 143 : ASSERT(capacity > 0, "Must have a positive capacity but got " << capacity); 144 : } 145 : 146 : template <class T> 147 : template <class ComputeValue, class UnaryPredicate> 148 : auto FifoCache<T>::push(ComputeValue&& compute_value, 149 : const UnaryPredicate& predicate) -> Cached { 150 : std::lock_guard guard(write_lock_); 151 : auto size = size_.load(std::memory_order_acquire); 152 : for (size_t i = 0; i < size; ++i) { 153 : Cached vt{std::addressof(data_[i])}; 154 : if (predicate(vt.value())) { 155 : return {vt}; 156 : } 157 : } 158 : 159 : // Compute the new value _before_ locking data to minimize locked time. 160 : T new_value = std::forward<ComputeValue>(compute_value)(); 161 : if (size < data_.capacity()) { 162 : ++size; 163 : } 164 : data_[size - 1].first.write_lock(); 165 : for (size_t i = size - 1; i > 0; --i) { 166 : data_[i - 1].first.write_lock(); 167 : data_[i].second = std::move(data_[i - 1].second); 168 : } 169 : data_[0].second = std::move(new_value); 170 : for (size_t i = 0; i < size; ++i) { 171 : data_[i].first.write_unlock(); 172 : } 173 : size_.store(size, std::memory_order_release); 174 : return find(predicate); 175 : } 176 : 177 : template <class T> 178 : template <class UnaryPredicate> 179 : auto FifoCache<T>::push(T t, const UnaryPredicate& predicate) -> Cached { 180 : return push( 181 : [t_local = std::move(t)]() mutable -> T // NOLINT(spectre-mutable) 182 : { return std::move(t_local); }, 183 : predicate); 184 : } 185 : 186 : template <class T> 187 : template <class UnaryPredicate> 188 : [[nodiscard]] auto FifoCache<T>::find(const UnaryPredicate& predicate) const 189 : -> Cached { 190 : const auto size = size_.load(std::memory_order_relaxed); 191 : for (size_t i = 0; i < size; ++i) { 192 : Cached vt{std::addressof(data_[i])}; 193 : if (predicate(vt.value())) { 194 : return vt; 195 : } 196 : } 197 : return Cached{nullptr}; 198 : } 199 : 200 : template <class T> 201 : FifoCache<T>::Cached::Cached(const stored_type* t) 202 : // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) 203 : : t_(const_cast<stored_type*>(t)) { 204 : if (t_ != nullptr) { 205 : t_->first.read_lock(); 206 : } 207 : } 208 : 209 : template <class T> 210 : FifoCache<T>::Cached::Cached(const Cached& rhs) : t_(rhs.t_) { 211 : if (t_ != nullptr) { 212 : t_->first.read_lock(); 213 : } 214 : } 215 : 216 : template <class T> 217 : typename FifoCache<T>::Cached& FifoCache<T>::Cached::operator=( 218 : const Cached& rhs) { 219 : if (&rhs == this) { 220 : return *this; 221 : } 222 : if (t_ != nullptr) { 223 : t_->first.read_unlock(); 224 : } 225 : t_ = rhs.t_; 226 : if (t_ != nullptr) { 227 : t_->first.read_lock(); 228 : } 229 : return *this; 230 : } 231 : 232 : template <class T> 233 : FifoCache<T>::Cached::Cached(Cached&& rhs) noexcept(true) : t_(rhs.t_) { 234 : rhs.t_ = nullptr; 235 : } 236 : 237 : template <class T> 238 : typename FifoCache<T>::Cached& FifoCache<T>::Cached::operator=( 239 : Cached&& rhs) noexcept(true) { 240 : if (&rhs == this) { 241 : return *this; 242 : } 243 : if (t_ != nullptr) { 244 : t_->first.read_unlock(); 245 : } 246 : t_ = rhs.t_; 247 : rhs.t_ = nullptr; 248 : return *this; 249 : } 250 : 251 : template <class T> 252 : FifoCache<T>::Cached::~Cached() noexcept(true) { 253 : if (t_ != nullptr) { 254 : t_->first.read_unlock(); 255 : } 256 : } 257 : 258 : template <class T> 259 : auto FifoCache<T>::Cached::value() const -> const T& { 260 : if (UNLIKELY(not has_value())) { 261 : throw std::runtime_error{"No value in FifoCache."}; 262 : } 263 : return t_->second; 264 : } 265 : } // namespace Parallel