1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 * Multiple Producer Single Consumer lock-free queue.
9 * Allocation-free is guaranteed outside of the constructor.
11 * This is a direct C++ port from
12 * https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#1-235
 * with the exception that we use an atomic uint64_t to have 15 slots in the
 * ring buffer (the Rust implementation has 5 slots; we want a bit more).
17 #ifndef mozilla_MPSCQueue_h
18 #define mozilla_MPSCQueue_h
20 #include "mozilla/Assertions.h"
21 #include "mozilla/Attributes.h"
22 #include "mozilla/PodOperations.h"
29 #include <type_traits>
36 template <typename T
, bool IsPod
= std::is_trivial
<T
>::value
>
37 struct MemoryOperations
{
39 * This allows either moving (if T supports it) or copying a number of
 * elements from an `aSource` pointer to an `aDestination` pointer.
41 * If it is safe to do so and this call copies, this uses PodCopy. Otherwise,
42 * constructors and destructors are called in a loop.
44 static void MoveOrCopy(T
* aDestination
, T
* aSource
, size_t aCount
);
48 struct MemoryOperations
<T
, true> {
49 static void MoveOrCopy(T
* aDestination
, T
* aSource
, size_t aCount
) {
50 PodCopy(aDestination
, aSource
, aCount
);
55 struct MemoryOperations
<T
, false> {
56 static void MoveOrCopy(T
* aDestination
, T
* aSource
, size_t aCount
) {
57 std::move(aSource
, aSource
+ aCount
, aDestination
);
// Compile-time switch for the diagnostic output guarded by
// `if constexpr (MPSC_DEBUG)`; disabled by default.
static constexpr bool MPSC_DEBUG{false};
// Exclusive upper bound on the capacity accepted by the ring buffer
// constructor, which release-asserts `aCapacity < kMaxCapacity`.
static constexpr size_t kMaxCapacity{16};
67 * This data structure allows producing data from several threads, and consuming
68 * it on one thread, safely and without performing memory allocations or
71 * The role for the producers and the consumer must be constant, i.e., the
72 * producer should always be on one thread and the consumer should always be on
75 * Some words about the inner workings of this class:
76 * - Capacity is fixed. Only one allocation is performed, in the constructor.
77 * - Maximum capacity is 15 elements, with 0 being used to denote an empty set.
78 * This is a hard limitation from encoding indexes within the atomic uint64_t.
79 * - This is lock-free but not wait-free, it might spin a little until
80 * compare/exchange succeeds.
81 * - There is no guarantee of forward progression for individual threads.
82 * - This should be safe to use from a signal handler context.
85 class MPSCRingBufferBase
{
87 explicit MPSCRingBufferBase(size_t aCapacity
)
88 : mFree(0), mOccupied(0), mCapacity(aCapacity
+ 1) {
89 MOZ_RELEASE_ASSERT(aCapacity
< kMaxCapacity
);
91 if constexpr (MPSC_DEBUG
) {
93 "[%s] this=%p { mCapacity=%zu, mBits=%" PRIu64
94 ", mMask=0x%" PRIx64
" }\n",
95 __PRETTY_FUNCTION__
, this, mCapacity
, mBits
, mMask
);
98 // Leave one empty space in the queue, used to distinguish an empty queue
99 // from a full one, as in the SPSCQueue.
100 // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#126
101 for (uint64_t i
= 1; i
< StorageCapacity(); ++i
) {
105 // This should be the only allocation performed, thus it cannot be performed
106 // in a restricted context (e.g., signal handler, real-time thread)
107 mData
= std::make_unique
<T
[]>(Capacity());
109 std::atomic_thread_fence(std::memory_order_seq_cst
);
113 * @brief Put an element in the queue. The caller MUST check the return value
114 * and maybe loop to try again (or drop if acceptable).
 * First it attempts to acquire a slot (storage index) that is known to be
 * unused. If that is not successful then 0 is returned. If that is
 * successful, the slot is ours (it has been exclusively acquired) and data
 * can be copied into the ring buffer at that index.
121 * @param aElement The element to put in the queue.
123 * @return 0 if insertion could not be performed, inserted index otherwise
125 [[nodiscard
]] int Send(T
& aElement
) {
126 std::optional
<uint64_t> empty_idx
= UnmarkSlot(mFree
);
127 if (empty_idx
.has_value()) {
128 detail::MemoryOperations
<T
>::MoveOrCopy(&mData
[*empty_idx
- 1], &aElement
,
130 MarkSlot(mOccupied
, *empty_idx
);
137 * Retrieve one element from the ring buffer, and copy it to
138 * `aElement`, if non-null.
140 * It attempts to acquire a slot from the list of used ones. If that is not
 * successful, then 0 is returned. Once a slot has been exclusively acquired,
142 * data is copied from it into the non-null pointer passed in parameter.
144 * @param aElement A pointer to a `T` where data will be copied.
146 * @return The index from which data was copied, 0 if there was nothing in the
149 [[nodiscard
]] int Recv(T
* aElement
) {
150 std::optional
<uint64_t> idx
= UnmarkSlot(mOccupied
);
151 if (idx
.has_value()) {
153 detail::MemoryOperations
<T
>::MoveOrCopy(aElement
, &mData
[*idx
- 1], 1);
155 MarkSlot(mFree
, *idx
);
161 size_t Capacity() const { return StorageCapacity() - 1; }
165 * Get/Set manipulates the encoding within `aNumber` by storing the index as a
166 * number and shifting it to the left (set) or right (get).
168 * Initial `aNumber` value is 0.
170 * Set() with first index value (1), we store the index on mBits and we shift
171 * it to the left, e.g., as follows:
173 * aNumber=0b00000000000000000000000000000000000000000000000000000000000000
175 * aNumber=0b00000000000000000000000000000000000000000000000000000000000001
177 * aNumber=0b00000000000000000000000000000000000000000000000000000000100001
178 * aIndex=2 aValue=801
179 * aNumber=0b00000000000000000000000000000000000000000000000000001100100001
180 * aIndex=3 aValue=17185
181 * aNumber=0b00000000000000000000000000000000000000000000000100001100100001
182 * aIndex=4 aValue=344865
183 * aNumber=0b00000000000000000000000000000000000000000001010100001100100001
184 * aIndex=5 aValue=6636321
185 * aNumber=0b00000000000000000000000000000000000000011001010100001100100001
186 * aIndex=6 aValue=124076833
187 * aNumber=0b00000000000000000000000000000000000111011001010100001100100001
188 * aIndex=7 aValue=2271560481
189 * aNumber=0b00000000000000000000000000000010000111011001010100001100100001
190 * aIndex=8 aValue=40926266145
191 * aNumber=0b00000000000000000000000000100110000111011001010100001100100001
192 * aIndex=9 aValue=728121033505
193 * aNumber=0b00000000000000000000001010100110000111011001010100001100100001
194 * aIndex=10 aValue=12822748939041
195 * aNumber=0b00000000000000000010111010100110000111011001010100001100100001
196 * aIndex=11 aValue=223928981472033
197 * aNumber=0b00000000000000110010111010100110000111011001010100001100100001
198 * aIndex=12 aValue=3883103678710561
199 * aNumber=0b00000000001101110010111010100110000111011001010100001100100001
200 * aIndex=13 aValue=66933498461897505
201 * aNumber=0b00000011101101110010111010100110000111011001010100001100100001
202 * aIndex=14 aValue=1147797409030816545
204 [[nodiscard
]] uint64_t Get(uint64_t aNumber
, uint64_t aIndex
) {
205 return (aNumber
>> (mBits
* aIndex
)) & mMask
;
208 [[nodiscard
]] uint64_t Set(uint64_t aNumber
, uint64_t aIndex
,
210 return (aNumber
& ~(mMask
<< (mBits
* aIndex
))) |
211 (aValue
<< (mBits
* aIndex
));
215 * Enqueue a value in the ring buffer at aIndex.
 * Takes the current uint64_t value from the atomic and tries to acquire an
 * unused slot in the ring buffer. If unsuccessful, 0 is returned, otherwise
 * compute the new atomic value that holds the new state of usage of the
 * slots, and use compare/exchange to perform lock-free synchronization:
 * compare/exchange succeeds when the current value and the modified one are
 * equal, reflecting an acquired lock. If another thread was concurrent to
 * this one, then it would fail that operation, and go into the next
 * iteration of the loop to read the new state value from the atomic, and
 * acquire a different slot.
227 * @param aSlotStatus a uint64_t atomic that is used to perform lock-free
230 * @param aIndex the index where we want to enqueue. It should come from the
233 void MarkSlot(std::atomic
<uint64_t>& aSlotStatus
, uint64_t aIndex
) {
235 aSlotStatus
.load(std::memory_order::memory_order_relaxed
);
237 // Attempts to find a slot that is available to enqueue, without
238 // cross-thread synchronization
239 auto empty
= [&]() -> std::optional
<uint64_t> {
240 for (uint64_t i
= 0; i
< Capacity(); ++i
) {
241 if (Get(current
, i
) == 0) {
247 if (!empty
.has_value()) {
248 // Rust does expect() which would panic:
249 // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#62
250 // If there's no empty place, then it would be up to the caller to deal
252 MOZ_CRASH("No empty slot available");
254 uint64_t modified
= Set(current
, *empty
, aIndex
);
// This is where the lock-free synchronization happens; if `current`
// matches the content of `aSlotStatus`, then store `modified` in
// aSlotStatus and succeed. Upon success it means no other thread has
258 // tried to change the same value at the same time, so the lock was safely
261 // Upon failure, it means another thread tried at the same time to use the
262 // same slot, so a new iteration of the loop needs to be executed to try
265 // In case of success (`aSlotStatus`'s content is equal to `current`), we
266 // require memory_order_release for the read-modify-write operation
267 // because we want to make sure when acquiring a slot that any concurrent
268 // thread performing a write had a chance to do it.
270 // In case of failure we require memory_order_relaxed for the load
// operation because we don't need synchronization at that point.
272 if (aSlotStatus
.compare_exchange_weak(
273 current
, modified
, std::memory_order::memory_order_release
,
274 std::memory_order::memory_order_relaxed
)) {
275 if constexpr (MPSC_DEBUG
) {
277 "[enqueue] modified=0x%" PRIx64
" => index=%" PRIu64
"\n",
286 * Dequeue a value from the ring buffer.
288 * Takes the current value from the uint64_t atomic and read the current index
289 * out of it. If that index is 0 then we are facing a lack of slots and we
290 * return, the caller MUST check this and deal with the situation. If the
 * index is non-zero we can try to acquire the matching slot in the ring
292 * buffer thanks to the compare/exchange loop. When the compare/exchange call
293 * succeeds, then the slot was acquired.
295 * @param aSlotStatus a uint64_t atomic that is used to perform lock-free
298 [[nodiscard
]] std::optional
<uint64_t> UnmarkSlot(
299 std::atomic
<uint64_t>& aSlotStatus
) {
301 aSlotStatus
.load(std::memory_order::memory_order_relaxed
);
303 uint64_t index
= current
& mMask
;
306 // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#77
307 // If we return None while dequeuing on mFree then we are full and the
308 // caller needs to deal with that.
311 uint64_t modified
= current
>> mBits
;
312 // See the comment in MarkSlot for details
314 // In case of success (`aSlotStatus`'s content is equal to `current`), we
315 // require memory_order_acquire for the read-modify-write operation
316 // because we want to make sure when unmarking a slot that any concurrent
317 // thread performing a read will see the value we are writing.
319 // In case of failure we require memory_order_relaxed for the load
// operation because we don't need synchronization at that point.
321 if (aSlotStatus
.compare_exchange_weak(
322 current
, modified
, std::memory_order::memory_order_acquire
,
323 std::memory_order::memory_order_relaxed
)) {
324 if constexpr (MPSC_DEBUG
) {
326 "[dequeue] current=0x%" PRIx64
" => index=%" PRIu64
"\n",
335 // Return the number of elements we can store within the ring buffer, whereas
336 // Capacity() will return the amount of elements in mData, including the 0
338 [[nodiscard
]] size_t StorageCapacity() const { return mCapacity
; }
340 // For the atomics below they are manipulated by Get()/Set(), and we are using
341 // them to store the IDs of the ring buffer usage (empty/full).
343 // We use mBits bits to store an ID (so we are limited to 16 and 0 is
344 // reserved) and append each of them to the atomics.
346 // A 0 value in one of those denotes we are full for the atomic, i.e.,
347 // mFree=0 means we are full and mOccupied=0 means we are empty.
349 // Holds the IDs of the free slots in the ring buffer
350 std::atomic
<uint64_t> mFree
;
352 // Holds the IDs of the occupied slots in the ring buffer
353 std::atomic
<uint64_t> mOccupied
;
355 const size_t mCapacity
;
357 // The actual ring buffer
358 std::unique_ptr
<T
[]> mData
;
360 // How we are using the uint64_t atomic above to store the IDs of the ring
// Width, in bits, of one slot ID packed into the 64-bit atomics.
static constexpr uint64_t mBits = 4;
// Mask selecting a single packed slot ID: the low mBits bits set (0b1111).
static constexpr uint64_t mMask = (uint64_t{1} << mBits) - 1;
367 * Instantiation of the `MPSCRingBufferBase` type. This is safe to use from
 * several producer threads and one consumer (that never changes role),
369 * without explicit synchronization nor allocation (outside of the constructor).
371 template <typename T
>
372 using MPSCQueue
= MPSCRingBufferBase
<T
>;
374 } // namespace mozilla
376 #endif // mozilla_MPSCQueue_h