/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/* Single producer single consumer lock-free and wait-free queue. */

#ifndef mozilla_LockFreeQueue_h
#define mozilla_LockFreeQueue_h
#include "mozilla/Assertions.h"
#include "mozilla/Attributes.h"
#include "mozilla/PodOperations.h"
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <limits>
#include <memory>
#include <thread>
#include <type_traits>

namespace mozilla {

namespace detail {
template <typename T, bool IsPod = std::is_trivial<T>::value>
struct MemoryOperations {
  /**
   * This allows zeroing (using memset) or default-constructing a number of
   * elements, calling the constructors if necessary.
   */
  static void ConstructDefault(T* aDestination, size_t aCount);
  /**
   * This allows either moving (if T supports it) or copying a number of
   * elements from an `aSource` pointer to an `aDestination` pointer.
   * If it is safe to do so and this call copies, this uses PodCopy. Otherwise,
   * constructors and destructors are called in a loop.
   */
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount);
};

template <typename T>
struct MemoryOperations<T, true> {
  static void ConstructDefault(T* aDestination, size_t aCount) {
    PodZero(aDestination, aCount);
  }
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    PodCopy(aDestination, aSource, aCount);
  }
};

template <typename T>
struct MemoryOperations<T, false> {
  static void ConstructDefault(T* aDestination, size_t aCount) {
    for (size_t i = 0; i < aCount; i++) {
      aDestination[i] = T();
    }
  }
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    std::move(aSource, aSource + aCount, aDestination);
  }
};
} // namespace detail
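
/* For instance, `detail::MemoryOperations<float>` selects the trivial
 * (`true`) specialization above, so buffers of floats are zeroed with PodZero
 * and copied with PodCopy, while a type with a non-trivial constructor
 * selects the `false` specialization and is default-constructed and moved
 * element by element. */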

/**
 * This data structure allows producing data from one thread, and consuming it
 * on another thread, safely and without explicit synchronization.
 *
 * The role for the producer and the consumer must be constant, i.e., the
 * producer should always be on one thread and the consumer should always be
 * on another thread.
 *
 * Some words about the inner workings of this class:
 * - Capacity is fixed. Only one allocation is performed, in the constructor.
 *   When reading and writing, the return value of the method allows checking
 *   if the ring buffer is empty or full.
 * - We always keep the read index at least one element ahead of the write
 *   index, so we can distinguish between an empty and a full ring buffer: an
 *   empty ring buffer is when the write index is at the same position as the
 *   read index. A full buffer is when the write index is exactly one position
 *   before the read index.
 * - We synchronize updates to the read index after having read the data, and
 *   to the write index after having written the data. This means that each
 *   thread only touches a portion of the buffer that is not touched by the
 *   other thread.
 * - Callers are expected to provide buffers. When writing to the queue,
 *   elements are copied into the internal storage from the buffer passed in.
 *   When reading from the queue, the user is expected to provide a buffer.
 *   Because this is a ring buffer, data might not be contiguous in memory;
 *   providing an external buffer to copy into is an easy way to have linear
 *   data for further processing.
 */
template <typename T>
class SPSCRingBufferBase {
 public:
  /**
   * Constructor for a ring buffer.
   *
   * This performs an allocation on the heap, but is the only allocation that
   * will happen for the lifetime of a `SPSCRingBufferBase`.
   *
   * @param aCapacity The maximum number of elements this ring buffer will
   * hold.
   */
  explicit SPSCRingBufferBase(int aCapacity)
      : mReadIndex(0),
        mWriteIndex(0),
        /* One more element to distinguish between an empty and a full
         * buffer. */
        mCapacity(aCapacity + 1) {
    MOZ_ASSERT(StorageCapacity() < std::numeric_limits<int>::max() / 2,
               "buffer too large for the type of index used.");
    MOZ_ASSERT(mCapacity > 0 && aCapacity != std::numeric_limits<int>::max());

    mData = std::make_unique<T[]>(StorageCapacity());

    std::atomic_thread_fence(std::memory_order::memory_order_seq_cst);
  }
  /**
   * Push `aCount` zero or default constructed elements in the array.
   *
   * Only safely called on the producer thread.
   *
   * @param aCount The number of elements to enqueue.
   * @return The number of elements enqueued.
   */
  MOZ_MUST_USE
  int EnqueueDefault(int aCount) { return Enqueue(nullptr, aCount); }
  /**
   * @brief Put an element in the queue.
   *
   * Only safely called on the producer thread.
   *
   * @param aElement The element to put in the queue.
   *
   * @return 1 if the element was inserted, 0 otherwise.
   */
  MOZ_MUST_USE
  int Enqueue(T& aElement) { return Enqueue(&aElement, 1); }
  /**
   * Push `aCount` elements in the ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param aElements A pointer to a buffer containing at least `aCount`
   * elements. If `aElements` is nullptr, zero or default constructed elements
   * are enqueued.
   * @param aCount The number of elements to read from `aElements`.
   * @return The number of elements successfully copied from `aElements` and
   * inserted into the ring buffer.
   */
  MOZ_MUST_USE
  int Enqueue(T* aElements, int aCount) {
#ifdef DEBUG
    AssertCorrectThread(mProducerId);
#endif

    int rdIdx = mReadIndex.load(std::memory_order::memory_order_acquire);
    int wrIdx = mWriteIndex.load(std::memory_order::memory_order_relaxed);

    if (IsFull(rdIdx, wrIdx)) {
      return 0;
    }

    int toWrite = std::min(AvailableWriteInternal(rdIdx, wrIdx), aCount);

    /* First part, from the write index to the end of the array. */
    int firstPart = std::min(StorageCapacity() - wrIdx, toWrite);
    /* Second part, from the beginning of the array. */
    int secondPart = toWrite - firstPart;
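
    /* Example: with StorageCapacity() == 8, wrIdx == 6 and toWrite == 5, the
     * first part covers indices 6 and 7 (firstPart == 2) and the second part
     * wraps around to indices 0 through 2 (secondPart == 3). */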

    if (aElements) {
      detail::MemoryOperations<T>::MoveOrCopy(mData.get() + wrIdx, aElements,
                                              firstPart);
      detail::MemoryOperations<T>::MoveOrCopy(
          mData.get(), aElements + firstPart, secondPart);
    } else {
      detail::MemoryOperations<T>::ConstructDefault(mData.get() + wrIdx,
                                                    firstPart);
      detail::MemoryOperations<T>::ConstructDefault(mData.get(), secondPart);
    }

    mWriteIndex.store(IncrementIndex(wrIdx, toWrite),
                      std::memory_order::memory_order_release);

    return toWrite;
  }
  /**
   * Retrieve at most `aCount` elements from the ring buffer, and copy them to
   * `aElements`, if non-null.
   *
   * Only safely called on the consumer side.
   *
   * @param aElements A pointer to a buffer with space for at least `aCount`
   * elements. If `aElements` is `nullptr`, `aCount` elements will be
   * discarded.
   * @param aCount The maximum number of elements to Dequeue.
   * @return The number of elements written to `aElements`.
   */
  MOZ_MUST_USE
  int Dequeue(T* aElements, int aCount) {
#ifdef DEBUG
    AssertCorrectThread(mConsumerId);
#endif

    int wrIdx = mWriteIndex.load(std::memory_order::memory_order_acquire);
    int rdIdx = mReadIndex.load(std::memory_order::memory_order_relaxed);

    if (IsEmpty(rdIdx, wrIdx)) {
      return 0;
    }

    int toRead = std::min(AvailableReadInternal(rdIdx, wrIdx), aCount);

    int firstPart = std::min(StorageCapacity() - rdIdx, toRead);
    int secondPart = toRead - firstPart;

    if (aElements) {
      detail::MemoryOperations<T>::MoveOrCopy(aElements, mData.get() + rdIdx,
                                              firstPart);
      detail::MemoryOperations<T>::MoveOrCopy(aElements + firstPart,
                                              mData.get(), secondPart);
    }

    mReadIndex.store(IncrementIndex(rdIdx, toRead),
                     std::memory_order::memory_order_release);

    return toRead;
  }
  /**
   * Get the number of available elements for consuming.
   *
   * Only safely called on the consumer thread. This can be less than the
   * actual number of elements in the queue, since `mWriteIndex` is updated at
   * the very end of the Enqueue method on the producer thread, but it always
   * returns a number of elements such that a call to Dequeue can return this
   * number of elements.
   *
   * @return The number of available elements for reading.
   */
  int AvailableRead() const {
#ifdef DEBUG
    AssertCorrectThread(mConsumerId);
#endif
    return AvailableReadInternal(
        mReadIndex.load(std::memory_order::memory_order_relaxed),
        mWriteIndex.load(std::memory_order::memory_order_relaxed));
  }
  /**
   * Get the number of available elements for writing.
   *
   * Only safely called on the producer thread. This can be less than the
   * actual number of slots that are available, because `mReadIndex` is
   * updated at the very end of the Dequeue method. It always returns a number
   * such that a call to Enqueue with this number will succeed in enqueuing
   * this number of elements.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int AvailableWrite() const {
#ifdef DEBUG
    AssertCorrectThread(mProducerId);
#endif
    return AvailableWriteInternal(
        mReadIndex.load(std::memory_order::memory_order_relaxed),
        mWriteIndex.load(std::memory_order::memory_order_relaxed));
  }
  /**
   * Get the total capacity of this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum capacity of this ring buffer.
   */
  int Capacity() const { return StorageCapacity() - 1; }
  /**
   * Reset the consumer and producer thread identifiers, in case the threads
   * are being changed. This has to be externally synchronized. This is a
   * no-op when asserts are disabled.
   */
  void ResetThreadIds() {
#ifdef DEBUG
    mConsumerId = mProducerId = std::thread::id();
#endif
  }

 private:
  /** Return true if the ring buffer is empty.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is empty, false otherwise.
   */
  bool IsEmpty(int aReadIndex, int aWriteIndex) const {
    return aWriteIndex == aReadIndex;
  }
  /** Return true if the ring buffer is full.
   *
   * This happens if the write index is exactly one element behind the read
   * index.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is full, false otherwise.
   */
  bool IsFull(int aReadIndex, int aWriteIndex) const {
    return (aWriteIndex + 1) % StorageCapacity() == aReadIndex;
  }
  /**
   * Return the size of the storage. It is one more than the number of
   * elements that can be stored in the buffer.
   *
   * This can be called from any thread.
   *
   * @return the size of the storage, in elements.
   */
  int StorageCapacity() const { return mCapacity; }
  /**
   * Returns the number of elements available for reading.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableRead`.
   *
   * @return the number of available elements for reading.
   */
  int AvailableReadInternal(int aReadIndex, int aWriteIndex) const {
    if (aWriteIndex >= aReadIndex) {
      return aWriteIndex - aReadIndex;
    }
    return aWriteIndex + StorageCapacity() - aReadIndex;
  }
  /**
   * Returns the number of empty elements, available for writing.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableWrite`.
   *
   * @return the number of elements that can be written into the array.
   */
  int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const {
    /* We subtract one element here to always keep at least one sample
     * free in the buffer, to distinguish between full and empty array. */
    int rv = aReadIndex - aWriteIndex - 1;
    if (aWriteIndex >= aReadIndex) {
      rv += StorageCapacity();
    }
    return rv;
  }
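  /* Example: with StorageCapacity() == 4, aReadIndex == 1 and
   * aWriteIndex == 1 (empty buffer), rv == 1 - 1 - 1 + 4 == 3, the full user
   * capacity; with aWriteIndex == 0 (full buffer), rv == 1 - 0 - 1 == 0. */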
  /**
   * Increments an index, wrapping it around the storage.
   *
   * Incrementing `mWriteIndex` can be done on the producer thread.
   * Incrementing `mReadIndex` can be done on the consumer thread.
   *
   * @param aIndex the index to increment.
   * @param aIncrement the number by which `aIndex` is incremented.
   * @return the new index.
   */
  int IncrementIndex(int aIndex, int aIncrement) const {
    MOZ_ASSERT(aIncrement >= 0 && aIncrement < StorageCapacity() &&
               aIndex < StorageCapacity());
    return (aIndex + aIncrement) % StorageCapacity();
  }
  /**
   * @brief This allows checking that Enqueue (resp. Dequeue) is always
   * called by the correct thread.
   *
   * The role of a thread is assigned the first time it calls Enqueue or
   * Dequeue, and cannot change, except when ResetThreadIds is called.
   *
   * @param aId the id of the thread that called the calling method first.
   */
  static void AssertCorrectThread(std::thread::id& aId) {
    if (aId == std::thread::id()) {
      aId = std::this_thread::get_id();
      return;
    }
    MOZ_ASSERT(aId == std::this_thread::get_id());
  }

  /** Index at which the oldest element is. */
  std::atomic<int> mReadIndex;
  /** Index at which to write new elements. The buffer always keeps at least
   * one slot free, so `mWriteIndex` never advances onto `mReadIndex`. */
  std::atomic<int> mWriteIndex;
  /** Size of the storage, one more than the maximum number of elements that
   * can be stored in the ring buffer. */
  const int mCapacity;
  /** Data storage, of size `mCapacity`. */
  std::unique_ptr<T[]> mData;

  /** The id of the only thread that is allowed to read from the queue. */
  mutable std::thread::id mConsumerId;
  /** The id of the only thread that is allowed to write to the queue. */
  mutable std::thread::id mProducerId;
};

/**
 * Instantiation of the `SPSCRingBufferBase` type. This is safe to use
 * from two threads, one producer, one consumer (that never change roles),
 * without explicit synchronization.
 */
template <typename T>
using SPSCQueue = SPSCRingBufferBase<T>;
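
/**
 * A minimal usage sketch: one thread enqueues blocks of floats, another
 * dequeues them, with no lock and no explicit synchronization. The element
 * type and block sizes are illustrative.
 *
 *   SPSCQueue<float> queue(128);
 *
 *   // Producer thread:
 *   float block[32] = {};
 *   int written = queue.Enqueue(block, 32);  // `written` can be < 32 if
 *                                            // the queue is nearly full.
 *
 *   // Consumer thread:
 *   float out[32];
 *   int read = queue.Dequeue(out, 32);       // `read` can be < 32 if the
 *                                            // queue is nearly empty.
 */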

} // namespace mozilla

#endif // mozilla_LockFreeQueue_h