/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/* Single producer single consumer lock-free and wait-free queue. */

#ifndef mozilla_LockFreeQueue_h
#define mozilla_LockFreeQueue_h

#include "mozilla/Assertions.h"
#include "mozilla/Attributes.h"
#include "mozilla/PodOperations.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <limits>
#include <memory>
#include <thread>

namespace mozilla {

namespace details {
template <typename T, bool IsPod = std::is_trivial<T>::value>
struct MemoryOperations {
  /**
   * This allows zeroing (using memset) or default-constructing a number of
   * elements, calling the constructors if necessary.
   */
  static void ConstructDefault(T* aDestination, size_t aCount);
  /**
   * This allows either moving (if T supports it) or copying a number of
   * elements from the `aSource` pointer to the `aDestination` pointer.
   * If it is safe to do so and this call copies, this uses PodCopy. Otherwise,
   * constructors and destructors are called in a loop.
   */
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount);
};

template <typename T>
struct MemoryOperations<T, true> {
  static void ConstructDefault(T* aDestination, size_t aCount) {
    PodZero(aDestination, aCount);
  }
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    PodCopy(aDestination, aSource, aCount);
  }
};

template <typename T>
struct MemoryOperations<T, false> {
  static void ConstructDefault(T* aDestination, size_t aCount) {
    for (size_t i = 0; i < aCount; i++) {
      aDestination[i] = T();
    }
  }
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    std::move(aSource, aSource + aCount, aDestination);
  }
};
} // namespace details

/**
 * This data structure allows producing data from one thread, and consuming it
 * on another thread, safely and without explicit synchronization.
 *
 * The roles of the producer and the consumer must be constant, i.e., the
 * producer should always be on one thread and the consumer should always be
 * on another thread.
 *
 * Some words about the inner workings of this class:
 * - Capacity is fixed. Only one allocation is performed, in the constructor.
 *   When reading and writing, the return value of the method allows checking
 *   if the ring buffer is empty or full.
 * - The storage holds one more slot than the capacity, and at least one slot
 *   is always kept free, so we can distinguish between an empty and a full
 *   ring buffer: an empty ring buffer is when the write index is at the same
 *   position as the read index; a full buffer is when the write index is
 *   exactly one position before the read index.
 * - We synchronize updates to the read index after having read the data, and
 *   the write index after having written the data. This means that each
 *   thread can only touch a portion of the buffer that is not touched by the
 *   other thread.
 * - Callers are expected to provide buffers. When writing to the queue,
 *   elements are copied into the internal storage from the buffer passed in.
 *   When reading from the queue, the user is expected to provide a buffer.
 *   Because this is a ring buffer, data might not be contiguous in memory;
 *   providing an external buffer to copy into is an easy way to have linear
 *   data for further processing.
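 *
 * A minimal usage sketch (an illustrative addition, not from the original
 * documentation; the element type and sizes are arbitrary):
 *
 *   SPSCQueue<float> queue(128);
 *
 *   // On the producer thread:
 *   float block[32] = {};
 *   int written = queue.Enqueue(block, 32);  // written <= 32
 *
 *   // On the consumer thread:
 *   float out[32];
 *   int read = queue.Dequeue(out, 32);  // read <= 32; `out` is contiguous
 */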
template <typename T>
class SPSCRingBufferBase {
 public:
  /**
   * Constructor for a ring buffer.
   *
   * This performs an allocation on the heap, but it is the only allocation
   * that will happen for the lifetime of a `SPSCRingBufferBase`.
   *
   * @param aCapacity The maximum number of elements this ring buffer will
   * hold.
   */
  explicit SPSCRingBufferBase(int aCapacity)
      : mReadIndex(0),
        mWriteIndex(0),
        /* One more element to distinguish between an empty and a full
         * buffer. */
        mCapacity(aCapacity + 1) {
    MOZ_ASSERT(StorageCapacity() < std::numeric_limits<int>::max() / 2,
               "buffer too large for the type of index used.");
    MOZ_ASSERT(mCapacity > 0 && aCapacity != std::numeric_limits<int>::max());

    mData = std::make_unique<T[]>(StorageCapacity());
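
    /* Full barrier: keep the initialization stores above from being
     * reordered past this point. Handing the queue object to the consumer
     * thread still requires its own synchronization. */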
    std::atomic_thread_fence(std::memory_order::memory_order_seq_cst);
  }
  /**
   * Push `aCount` zero or default constructed elements into the array.
   *
   * Only safely called on the producer thread.
   *
   * @param aCount The number of elements to enqueue.
   * @return The number of elements enqueued.
   */
  MOZ_MUST_USE
  int EnqueueDefault(int aCount) { return Enqueue(nullptr, aCount); }
  /**
   * @brief Put an element in the queue.
   *
   * Only safely called on the producer thread.
   *
   * @param aElement The element to put in the queue.
   *
   * @return 1 if the element was inserted, 0 otherwise.
   */
  MOZ_MUST_USE
  int Enqueue(T& aElement) { return Enqueue(&aElement, 1); }
  /**
   * Push `aCount` elements in the ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param aElements A pointer to a buffer containing at least `aCount`
   * elements. If `aElements` is nullptr, zero or default constructed elements
   * are enqueued.
   * @param aCount The number of elements to read from `aElements`.
   * @return The number of elements successfully copied from `aElements` and
   * inserted into the ring buffer.
   */
  MOZ_MUST_USE
  int Enqueue(T* aElements, int aCount) {
#ifdef DEBUG
    AssertCorrectThread(mProducerId);
#endif

    int rdIdx = mReadIndex.load(std::memory_order::memory_order_acquire);
    int wrIdx = mWriteIndex.load(std::memory_order::memory_order_relaxed);

    if (IsFull(rdIdx, wrIdx)) {
      return 0;
    }

    int toWrite = std::min(AvailableWriteInternal(rdIdx, wrIdx), aCount);

    /* First part, from the write index to the end of the array. */
    int firstPart = std::min(StorageCapacity() - wrIdx, toWrite);
    /* Second part, from the beginning of the array. */
    int secondPart = toWrite - firstPart;
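    /* Illustrative example: with StorageCapacity() == 8, wrIdx == 6 and
     * toWrite == 5, firstPart == 2 (slots 6 and 7) and secondPart == 3
     * (slots 0, 1 and 2): the write wraps around the end of the storage. */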

    if (aElements) {
      details::MemoryOperations<T>::MoveOrCopy(mData.get() + wrIdx, aElements,
                                               firstPart);
      details::MemoryOperations<T>::MoveOrCopy(
          mData.get(), aElements + firstPart, secondPart);
    } else {
      details::MemoryOperations<T>::ConstructDefault(mData.get() + wrIdx,
                                                     firstPart);
      details::MemoryOperations<T>::ConstructDefault(mData.get(), secondPart);
    }

    mWriteIndex.store(IncrementIndex(wrIdx, toWrite),
                      std::memory_order::memory_order_release);

    return toWrite;
  }
  /**
   * Retrieve at most `count` elements from the ring buffer, and copy them to
   * `elements`, if non-null.
   *
   * Only safely called on the consumer side.
   *
   * @param elements A pointer to a buffer with space for at least `count`
   * elements. If `elements` is `nullptr`, `count` elements will be discarded.
   * @param count The maximum number of elements to dequeue.
   * @return The number of elements written to `elements`.
   */
  MOZ_MUST_USE
  int Dequeue(T* elements, int count) {
#ifdef DEBUG
    AssertCorrectThread(mConsumerId);
#endif

    int wrIdx = mWriteIndex.load(std::memory_order::memory_order_acquire);
    int rdIdx = mReadIndex.load(std::memory_order::memory_order_relaxed);

    if (IsEmpty(rdIdx, wrIdx)) {
      return 0;
    }

    int toRead = std::min(AvailableReadInternal(rdIdx, wrIdx), count);

    int firstPart = std::min(StorageCapacity() - rdIdx, toRead);
    int secondPart = toRead - firstPart;

    if (elements) {
      details::MemoryOperations<T>::MoveOrCopy(elements, mData.get() + rdIdx,
                                               firstPart);
      details::MemoryOperations<T>::MoveOrCopy(elements + firstPart,
                                               mData.get(), secondPart);
    }

    mReadIndex.store(IncrementIndex(rdIdx, toRead),
                     std::memory_order::memory_order_release);

    return toRead;
  }
  /**
   * Get the number of available elements for consuming.
   *
   * Only safely called on the consumer thread. This can be less than the
   * actual number of elements in the queue, since `mWriteIndex` is updated at
   * the very end of the `Enqueue` method on the producer thread, but it
   * always returns a number of elements such that a call to `Dequeue` will
   * return this number of elements.
   *
   * @return The number of available elements for reading.
   */
  int AvailableRead() const {
#ifdef DEBUG
    AssertCorrectThread(mConsumerId);
#endif
    return AvailableReadInternal(
        mReadIndex.load(std::memory_order::memory_order_relaxed),
        mWriteIndex.load(std::memory_order::memory_order_relaxed));
  }
  /**
   * Get the number of available elements for writing.
   *
   * Only safely called on the producer thread. This can be less than the
   * actual number of slots that are available, because `mReadIndex` is
   * updated at the very end of the `Dequeue` method. It always returns a
   * number such that a call to `Enqueue` with this number will succeed in
   * enqueuing this number of elements.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int AvailableWrite() const {
#ifdef DEBUG
    AssertCorrectThread(mProducerId);
#endif
    return AvailableWriteInternal(
        mReadIndex.load(std::memory_order::memory_order_relaxed),
        mWriteIndex.load(std::memory_order::memory_order_relaxed));
  }
  /**
   * Get the total capacity for this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum capacity of this ring buffer.
   */
  int Capacity() const { return StorageCapacity() - 1; }
  /**
   * Reset the consumer and producer thread identifiers, in case the threads
   * are being changed. This has to be externally synchronized. This is a
   * no-op when asserts are disabled.
   */
  void ResetThreadIds() {
#ifdef DEBUG
    mConsumerId = mProducerId = std::thread::id();
#endif
  }

 private:
  /** Return true if the ring buffer is empty.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is empty, false otherwise.
   */
  bool IsEmpty(int aReadIndex, int aWriteIndex) const {
    return aWriteIndex == aReadIndex;
  }
  /** Return true if the ring buffer is full.
   *
   * This happens if the write index is exactly one element behind the read
   * index.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is full, false otherwise.
   */
  bool IsFull(int aReadIndex, int aWriteIndex) const {
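    /* Illustrative example: with StorageCapacity() == 8, aWriteIndex == 7
     * and aReadIndex == 0, the buffer is full, since (7 + 1) % 8 == 0. */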
    return (aWriteIndex + 1) % StorageCapacity() == aReadIndex;
  }
  /**
   * Return the size of the storage. It is one more than the number of
   * elements that can be stored in the buffer.
   *
   * This can be called from any thread.
   *
   * @return the size of the internal storage.
   */
  int StorageCapacity() const { return mCapacity; }
  /**
   * Returns the number of elements available for reading.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableRead`.
   *
   * @return the number of available elements for reading.
   */
  int AvailableReadInternal(int aReadIndex, int aWriteIndex) const {
    if (aWriteIndex >= aReadIndex) {
      return aWriteIndex - aReadIndex;
    } else {
      return aWriteIndex + StorageCapacity() - aReadIndex;
    }
  }
  /**
   * Returns the number of empty slots, available for writing.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableWrite`.
   *
   * @return the number of elements that can be written into the array.
   */
  int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const {
    /* We subtract one element here to always keep at least one sample
     * free in the buffer, to distinguish between full and empty array. */
    int rv = aReadIndex - aWriteIndex - 1;
    if (aWriteIndex >= aReadIndex) {
      rv += StorageCapacity();
    }
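    /* Illustrative example: with StorageCapacity() == 8, aReadIndex == 2 and
     * aWriteIndex == 5, rv == 2 - 5 - 1 + 8 == 4: slots 5, 6, 7 and 0 can be
     * written, and slot 1 stays free to mark the buffer as full. */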
    return rv;
  }
  /**
   * Increments an index, wrapping it around the storage.
   *
   * Incrementing `mWriteIndex` can be done on the producer thread.
   * Incrementing `mReadIndex` can be done on the consumer thread.
   *
   * @param aIndex the index to increment.
   * @param aIncrement the number by which `aIndex` is incremented.
   * @return the new index.
   */
  int IncrementIndex(int aIndex, int aIncrement) const {
    MOZ_ASSERT(aIncrement >= 0 && aIncrement < StorageCapacity() &&
               aIndex < StorageCapacity());
    return (aIndex + aIncrement) % StorageCapacity();
  }
  /**
   * @brief This allows checking that `Enqueue` (resp. `Dequeue`) is always
   * called by the correct thread.
   *
   * The roles of the threads are assigned the first time they call Enqueue
   * or Dequeue, and cannot change, except when ResetThreadIds is called.
   *
   * @param aId the id of the thread that called the calling method first.
   */
#ifdef DEBUG
  static void AssertCorrectThread(std::thread::id& aId) {
    if (aId == std::thread::id()) {
      aId = std::this_thread::get_id();
      return;
    }
    MOZ_ASSERT(aId == std::this_thread::get_id());
  }
#endif

  /** Index at which the oldest element is. */
  std::atomic<int> mReadIndex;
  /** Index at which to write new elements. At least one slot is always kept
   * free, so `mWriteIndex` never catches up with `mReadIndex` from behind. */
  std::atomic<int> mWriteIndex;
  /** Size of the internal storage: one more than the number of elements that
   * can be stored in the ring buffer. */
  const int mCapacity;
  /** Data storage, of size `mCapacity`. */
  std::unique_ptr<T[]> mData;
#ifdef DEBUG
  /** The id of the only thread that is allowed to read from the queue. */
  mutable std::thread::id mConsumerId;
  /** The id of the only thread that is allowed to write to the queue. */
  mutable std::thread::id mProducerId;
#endif
};

/**
 * Instantiation of the `SPSCRingBufferBase` type. This is safe to use
 * from two threads, one producer, one consumer (that never change role),
 * without explicit synchronization.
 */
template <typename T>
using SPSCQueue = SPSCRingBufferBase<T>;
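
/* A short sketch (illustrative, not part of the original header) showing
 * that non-trivial element types go through the move path of
 * `details::MemoryOperations`, so move-only types work:
 *
 *   SPSCQueue<std::unique_ptr<int>> q(16);
 *   std::unique_ptr<int> in = std::make_unique<int>(42);
 *   if (q.Enqueue(in)) {         // `in` is moved into the queue
 *     // ...later, on the consumer thread:
 *     std::unique_ptr<int> out;
 *     if (q.Dequeue(&out, 1)) {  // the element is moved back out
 *       // `out` now owns the integer
 *     }
 *   }
 */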

} // namespace mozilla

#endif // mozilla_LockFreeQueue_h