Bug 1807268 - Fix verifyOpenAllInNewTabsOptionTest UI test r=ohorvath
[gecko.git] / mfbt / MPSCQueue.h
blob8edd260344f52d456fe92c8aed48931e51720b7d
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/*
 * Multiple Producer Single Consumer lock-free queue.
 * Allocation-free is guaranteed outside of the constructor.
 *
 * This is a direct C++ port from
 * https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#1-235
 * with the exception that we use an atomic uint64_t in order to have 15 slots
 * in the ring buffer (the Rust implementation has 5 slots; we want a bit more).
 */
17 #ifndef mozilla_MPSCQueue_h
18 #define mozilla_MPSCQueue_h
20 #include "mozilla/Assertions.h"
21 #include "mozilla/Attributes.h"
22 #include "mozilla/PodOperations.h"
23 #include <algorithm>
24 #include <atomic>
25 #include <cstddef>
26 #include <limits>
27 #include <memory>
28 #include <thread>
29 #include <type_traits>
30 #include <optional>
31 #include <inttypes.h>
33 namespace mozilla {
namespace detail {
// NOTE(review): SPSCQueue.h (referenced below in this file) presumably carries
// an identical mozilla::detail::MemoryOperations; if so, including both headers
// in one translation unit would be a redefinition -- confirm.

/**
 * Transfers `aCount` elements from `aSource` into `aDestination`.
 *
 * Dispatch happens on `IsPod` (defaulting to `std::is_trivial_v<T>`):
 * - trivial types are handed to PodCopy, because copying them is as good as
 *   moving them;
 * - all other types are move-assigned one element at a time, front to back,
 *   which leaves the source elements in a moved-from state.
 *
 * The primary template is only declared; one of the two partial
 * specializations below is always selected.
 */
template <typename T, bool IsPod = std::is_trivial_v<T>>
struct MemoryOperations {
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount);
};

// Trivial element types: a raw copy.
template <typename T>
struct MemoryOperations<T, /* IsPod = */ true> {
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    PodCopy(aDestination, aSource, aCount);
  }
};

// Non-trivial element types: element-wise move assignment, in order.
template <typename T>
struct MemoryOperations<T, /* IsPod = */ false> {
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    for (size_t offset = 0; offset != aCount; ++offset) {
      aDestination[offset] = std::move(aSource[offset]);
    }
  }
};
}  // namespace detail
// Compile-time switch: when true, the ring buffer traces its operations to
// stderr.
static constexpr bool MPSC_DEBUG = false;

// Exclusive upper bound for the capacity given to the MPSCRingBufferBase
// constructor, so a queue holds at most kMaxCapacity - 1 == 15 elements.
static constexpr size_t kMaxCapacity = 16;
66 /**
67 * This data structure allows producing data from several threads, and consuming
68 * it on one thread, safely and without performing memory allocations or
69 * locking.
71 * The role for the producers and the consumer must be constant, i.e., the
72 * producer should always be on one thread and the consumer should always be on
73 * another thread.
75 * Some words about the inner workings of this class:
76 * - Capacity is fixed. Only one allocation is performed, in the constructor.
77 * - Maximum capacity is 15 elements, with 0 being used to denote an empty set.
78 * This is a hard limitation from encoding indexes within the atomic uint64_t.
79 * - This is lock-free but not wait-free, it might spin a little until
80 * compare/exchange succeeds.
81 * - There is no guarantee of forward progression for individual threads.
82 * - This should be safe to use from a signal handler context.
84 template <typename T>
85 class MPSCRingBufferBase {
86 public:
87 explicit MPSCRingBufferBase(size_t aCapacity)
88 : mFree(0), mOccupied(0), mCapacity(aCapacity + 1) {
89 MOZ_RELEASE_ASSERT(aCapacity < kMaxCapacity);
91 if constexpr (MPSC_DEBUG) {
92 fprintf(stderr,
93 "[%s] this=%p { mCapacity=%zu, mBits=%" PRIu64
94 ", mMask=0x%" PRIx64 " }\n",
95 __PRETTY_FUNCTION__, this, mCapacity, mBits, mMask);
98 // Leave one empty space in the queue, used to distinguish an empty queue
99 // from a full one, as in the SPSCQueue.
100 // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#126
101 for (uint64_t i = 1; i < StorageCapacity(); ++i) {
102 MarkSlot(mFree, i);
105 // This should be the only allocation performed, thus it cannot be performed
106 // in a restricted context (e.g., signal handler, real-time thread)
107 mData = std::make_unique<T[]>(Capacity());
109 std::atomic_thread_fence(std::memory_order_seq_cst);
113 * @brief Put an element in the queue. The caller MUST check the return value
114 * and maybe loop to try again (or drop if acceptable).
116 * First it attempts to acuire a slot (storage index) that is known to be
117 * non used. If that is not successfull then 0 is returned. If that is
118 * successfull, the slot is ours (it has been exclusively acquired) and data
119 * can be copied into the ring buffer at that index.
121 * @param aElement The element to put in the queue.
123 * @return 0 if insertion could not be performed, inserted index otherwise
125 [[nodiscard]] int Send(T& aElement) {
126 std::optional<uint64_t> empty_idx = UnmarkSlot(mFree);
127 if (empty_idx.has_value()) {
128 detail::MemoryOperations<T>::MoveOrCopy(&mData[*empty_idx - 1], &aElement,
130 MarkSlot(mOccupied, *empty_idx);
131 return *empty_idx;
133 return 0;
137 * Retrieve one element from the ring buffer, and copy it to
138 * `aElement`, if non-null.
140 * It attempts to acquire a slot from the list of used ones. If that is not
141 * successfull, then 0 is returned. Once a slot has been exclusively acquired,
142 * data is copied from it into the non-null pointer passed in parameter.
144 * @param aElement A pointer to a `T` where data will be copied.
146 * @return The index from which data was copied, 0 if there was nothing in the
147 * ring buffer.
149 [[nodiscard]] int Recv(T* aElement) {
150 std::optional<uint64_t> idx = UnmarkSlot(mOccupied);
151 if (idx.has_value()) {
152 if (aElement) {
153 detail::MemoryOperations<T>::MoveOrCopy(aElement, &mData[*idx - 1], 1);
155 MarkSlot(mFree, *idx);
156 return *idx;
158 return 0;
161 size_t Capacity() const { return StorageCapacity() - 1; }
163 private:
165 * Get/Set manipulates the encoding within `aNumber` by storing the index as a
166 * number and shifting it to the left (set) or right (get).
168 * Initial `aNumber` value is 0.
170 * Set() with first index value (1), we store the index on mBits and we shift
171 * it to the left, e.g., as follows:
173 * aNumber=0b00000000000000000000000000000000000000000000000000000000000000
174 * aIndex=0 aValue=1
175 * aNumber=0b00000000000000000000000000000000000000000000000000000000000001
176 * aIndex=1 aValue=33
177 * aNumber=0b00000000000000000000000000000000000000000000000000000000100001
178 * aIndex=2 aValue=801
179 * aNumber=0b00000000000000000000000000000000000000000000000000001100100001
180 * aIndex=3 aValue=17185
181 * aNumber=0b00000000000000000000000000000000000000000000000100001100100001
182 * aIndex=4 aValue=344865
183 * aNumber=0b00000000000000000000000000000000000000000001010100001100100001
184 * aIndex=5 aValue=6636321
185 * aNumber=0b00000000000000000000000000000000000000011001010100001100100001
186 * aIndex=6 aValue=124076833
187 * aNumber=0b00000000000000000000000000000000000111011001010100001100100001
188 * aIndex=7 aValue=2271560481
189 * aNumber=0b00000000000000000000000000000010000111011001010100001100100001
190 * aIndex=8 aValue=40926266145
191 * aNumber=0b00000000000000000000000000100110000111011001010100001100100001
192 * aIndex=9 aValue=728121033505
193 * aNumber=0b00000000000000000000001010100110000111011001010100001100100001
194 * aIndex=10 aValue=12822748939041
195 * aNumber=0b00000000000000000010111010100110000111011001010100001100100001
196 * aIndex=11 aValue=223928981472033
197 * aNumber=0b00000000000000110010111010100110000111011001010100001100100001
198 * aIndex=12 aValue=3883103678710561
199 * aNumber=0b00000000001101110010111010100110000111011001010100001100100001
200 * aIndex=13 aValue=66933498461897505
201 * aNumber=0b00000011101101110010111010100110000111011001010100001100100001
202 * aIndex=14 aValue=1147797409030816545
204 [[nodiscard]] uint64_t Get(uint64_t aNumber, uint64_t aIndex) {
205 return (aNumber >> (mBits * aIndex)) & mMask;
208 [[nodiscard]] uint64_t Set(uint64_t aNumber, uint64_t aIndex,
209 uint64_t aValue) {
210 return (aNumber & ~(mMask << (mBits * aIndex))) |
211 (aValue << (mBits * aIndex));
215 * Enqueue a value in the ring buffer at aIndex.
217 * Takes the current uint64_t value from the atomic and try to acquire a non
218 * used slot in the ring buffer. If unsucessfull, 0 is returned, otherwise
219 * compute the new atomic value that holds the new state of usage of the
220 * slots, and use compare/exchange to perform lock-free synchronization:
221 * compare/exchanges succeeds when the current value and the modified one are
222 * equal, reflecting an acquired lock. If another thread was concurrent to
223 * this one, then it would fail to that operation, and go into the next
224 * iteration of the loop to read the new state value from the atomic, and
225 * acquire a different slot.
227 * @param aSlotStatus a uint64_t atomic that is used to perform lock-free
228 * thread exclusions
230 * @param aIndex the index where we want to enqueue. It should come from the
231 * empty queue
232 * */
233 void MarkSlot(std::atomic<uint64_t>& aSlotStatus, uint64_t aIndex) {
234 uint64_t current =
235 aSlotStatus.load(std::memory_order::memory_order_relaxed);
236 do {
237 // Attempts to find a slot that is available to enqueue, without
238 // cross-thread synchronization
239 auto empty = [&]() -> std::optional<uint64_t> {
240 for (uint64_t i = 0; i < Capacity(); ++i) {
241 if (Get(current, i) == 0) {
242 return i;
245 return {};
246 }();
247 if (!empty.has_value()) {
248 // Rust does expect() which would panic:
249 // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#62
250 // If there's no empty place, then it would be up to the caller to deal
251 // with that
252 MOZ_CRASH("No empty slot available");
254 uint64_t modified = Set(current, *empty, aIndex);
255 // This is where the lock-free synchronization happens ; if `current`
256 // matches the content of `aSlotStatus`, then store `modified` in
257 // aSlotStatus and succeeds. Upon success it means no other thread has
258 // tried to change the same value at the same time, so the lock was safely
259 // acquired.
261 // Upon failure, it means another thread tried at the same time to use the
262 // same slot, so a new iteration of the loop needs to be executed to try
263 // another slot.
265 // In case of success (`aSlotStatus`'s content is equal to `current`), we
266 // require memory_order_release for the read-modify-write operation
267 // because we want to make sure when acquiring a slot that any concurrent
268 // thread performing a write had a chance to do it.
270 // In case of failure we require memory_order_relaxed for the load
271 // operation because we dont need synchronization at that point.
272 if (aSlotStatus.compare_exchange_weak(
273 current, modified, std::memory_order::memory_order_release,
274 std::memory_order::memory_order_relaxed)) {
275 if constexpr (MPSC_DEBUG) {
276 fprintf(stderr,
277 "[enqueue] modified=0x%" PRIx64 " => index=%" PRIu64 "\n",
278 modified, aIndex);
280 return;
282 } while (true);
286 * Dequeue a value from the ring buffer.
288 * Takes the current value from the uint64_t atomic and read the current index
289 * out of it. If that index is 0 then we are facing a lack of slots and we
290 * return, the caller MUST check this and deal with the situation. If the
291 * index is non null we can try to acquire the matching slot in the ring
292 * buffer thanks to the compare/exchange loop. When the compare/exchange call
293 * succeeds, then the slot was acquired.
295 * @param aSlotStatus a uint64_t atomic that is used to perform lock-free
296 * thread exclusions
297 * */
298 [[nodiscard]] std::optional<uint64_t> UnmarkSlot(
299 std::atomic<uint64_t>& aSlotStatus) {
300 uint64_t current =
301 aSlotStatus.load(std::memory_order::memory_order_relaxed);
302 do {
303 uint64_t index = current & mMask;
304 if (index == 0) {
305 // Return a None
306 // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#77
307 // If we return None while dequeuing on mFree then we are full and the
308 // caller needs to deal with that.
309 return {};
311 uint64_t modified = current >> mBits;
312 // See the comment in MarkSlot for details
314 // In case of success (`aSlotStatus`'s content is equal to `current`), we
315 // require memory_order_acquire for the read-modify-write operation
316 // because we want to make sure when unmarking a slot that any concurrent
317 // thread performing a read will see the value we are writing.
319 // In case of failure we require memory_order_relaxed for the load
320 // operation because we dont need synchronization at that point.
321 if (aSlotStatus.compare_exchange_weak(
322 current, modified, std::memory_order::memory_order_acquire,
323 std::memory_order::memory_order_relaxed)) {
324 if constexpr (MPSC_DEBUG) {
325 fprintf(stderr,
326 "[dequeue] current=0x%" PRIx64 " => index=%" PRIu64 "\n",
327 current, index);
329 return index;
331 } while (true);
332 return {};
335 // Return the number of elements we can store within the ring buffer, whereas
336 // Capacity() will return the amount of elements in mData, including the 0
337 // value.
338 [[nodiscard]] size_t StorageCapacity() const { return mCapacity; }
340 // For the atomics below they are manipulated by Get()/Set(), and we are using
341 // them to store the IDs of the ring buffer usage (empty/full).
343 // We use mBits bits to store an ID (so we are limited to 16 and 0 is
344 // reserved) and append each of them to the atomics.
346 // A 0 value in one of those denotes we are full for the atomic, i.e.,
347 // mFree=0 means we are full and mOccupied=0 means we are empty.
349 // Holds the IDs of the free slots in the ring buffer
350 std::atomic<uint64_t> mFree;
352 // Holds the IDs of the occupied slots in the ring buffer
353 std::atomic<uint64_t> mOccupied;
355 const size_t mCapacity;
357 // The actual ring buffer
358 std::unique_ptr<T[]> mData;
360 // How we are using the uint64_t atomic above to store the IDs of the ring
361 // buffer.
362 static const uint64_t mBits = 4;
363 static const uint64_t mMask = 0b1111;
367 * Instantiation of the `MPSCRingBufferBase` type. This is safe to use from
368 * several producers threads and one one consumer (that never changes role),
369 * without explicit synchronization nor allocation (outside of the constructor).
371 template <typename T>
372 using MPSCQueue = MPSCRingBufferBase<T>;
374 } // namespace mozilla
376 #endif // mozilla_MPSCQueue_h