/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/* Single producer single consumer lock-free and wait-free queue. */

#ifndef mozilla_LockFreeQueue_h
#define mozilla_LockFreeQueue_h

#include "mozilla/Assertions.h"
#include "mozilla/Attributes.h"
#include "mozilla/PodOperations.h"
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <limits>
#include <memory>
#include <thread>
#include <tuple>  // for std::ignore
#include <type_traits>

namespace mozilla {

namespace detail {
template <typename T, bool IsPod = std::is_trivial<T>::value>
struct MemoryOperations {
  /**
   * This allows zeroing (using memset) or default-constructing a number of
   * elements, calling the constructors if necessary.
   */
  static void ConstructDefault(T* aDestination, size_t aCount);
  /**
   * This allows either moving (if T supports it) or copying a number of
   * elements from a `aSource` pointer to a `aDestination` pointer.
   * If it is safe to do so and this call copies, this uses PodCopy. Otherwise,
   * constructors and destructors are called in a loop.
   */
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount);
};

template <typename T>
struct MemoryOperations<T, true> {
  static void ConstructDefault(T* aDestination, size_t aCount) {
    PodZero(aDestination, aCount);
  }
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    PodCopy(aDestination, aSource, aCount);
  }
};

template <typename T>
struct MemoryOperations<T, false> {
  static void ConstructDefault(T* aDestination, size_t aCount) {
    for (size_t i = 0; i < aCount; i++) {
      aDestination[i] = T();
    }
  }
  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
    std::move(aSource, aSource + aCount, aDestination);
  }
};
}  // namespace detail
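
// For instance, MemoryOperations<float>::MoveOrCopy selects the
// IsPod == true specialization above and lowers to PodCopy (a memcpy),
// whereas MemoryOperations<std::unique_ptr<int>>::MoveOrCopy selects the
// non-trivial specialization and moves elements one at a time.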

/**
 * This data structure allows producing data from one thread, and consuming it
 * on another thread, safely and without explicit synchronization.
 *
 * The roles of producer and consumer must be constant, i.e., the producer
 * should always be on one thread and the consumer should always be on another
 * thread.
 *
 * Some words about the inner workings of this class:
 * - Capacity is fixed. Only one allocation is performed, in the constructor.
 *   When reading and writing, the return value of the method allows checking
 *   if the ring buffer is empty or full.
 * - We always keep at least one slot free between the write index and the
 *   read index, so we can distinguish between an empty and a full ring
 *   buffer: an empty ring buffer is when the write index is at the same
 *   position as the read index. A full buffer is when the write index is
 *   exactly one position before the read index.
 * - We synchronize updates to the read index after having read the data, and
 *   updates to the write index after having written the data. This means that
 *   each thread can only touch a portion of the buffer that is not touched by
 *   the other thread.
 * - Callers are expected to provide buffers. When writing to the queue,
 *   elements are copied into the internal storage from the buffer passed in.
 *   When reading from the queue, the user is expected to provide a buffer.
 *   Because this is a ring buffer, data might not be contiguous in memory;
 *   providing an external buffer to copy into is an easy way to have linear
 *   data for further processing.
 */
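
/**
 * Example usage (a minimal sketch; the element type, capacity and block size
 * here are arbitrary):
 *
 *   SPSCQueue<float> queue(1024);
 *
 *   // On the producer thread:
 *   float input[128] = {};
 *   int written = queue.Enqueue(input, 128);  // count enqueued, 0 if full
 *
 *   // On the consumer thread:
 *   float output[128];
 *   int read = queue.Dequeue(output, 128);    // count dequeued, 0 if empty
 */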
template <typename T>
class SPSCRingBufferBase {
 public:
  /**
   * Constructor for a ring buffer.
   *
   * This performs an allocation on the heap, but is the only allocation that
   * will happen for the lifetime of a `SPSCRingBufferBase`.
   *
   * @param aCapacity The maximum number of elements this ring buffer will
   * hold.
   */
  explicit SPSCRingBufferBase(int aCapacity)
      : mReadIndex(0),
        mWriteIndex(0),
        /* One more element to distinguish between an empty and a full
         * buffer. */
        mCapacity(aCapacity + 1) {
    MOZ_RELEASE_ASSERT(aCapacity != std::numeric_limits<int>::max());
    MOZ_RELEASE_ASSERT(mCapacity > 0);

    mData = std::make_unique<T[]>(StorageCapacity());

    std::atomic_thread_fence(std::memory_order_seq_cst);
  }

  /**
   * Push `aCount` zero- or default-constructed elements into the array.
   *
   * Only safely called on the producer thread.
   *
   * @param aCount The number of elements to enqueue.
   * @return The number of elements enqueued.
   */
  [[nodiscard]] int EnqueueDefault(int aCount) {
    return Enqueue(nullptr, aCount);
  }

  /**
   * @brief Put an element in the queue.
   *
   * Only safely called on the producer thread.
   *
   * @param aElement The element to put in the queue.
   *
   * @return 1 if the element was inserted, 0 otherwise.
   */
  [[nodiscard]] int Enqueue(T& aElement) { return Enqueue(&aElement, 1); }

  /**
   * Push `aCount` elements into the ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param aElements A pointer to a buffer containing at least `aCount`
   * elements. If `aElements` is nullptr, zero or default constructed elements
   * are enqueued.
   * @param aCount The number of elements to read from `aElements`.
   * @return The number of elements successfully copied from `aElements` and
   * inserted into the ring buffer.
   */
  [[nodiscard]] int Enqueue(T* aElements, int aCount) {
#ifdef DEBUG
    AssertCorrectThread(mProducerId);
#endif

    int rdIdx = mReadIndex.load(std::memory_order_acquire);
    int wrIdx = mWriteIndex.load(std::memory_order_relaxed);

    if (IsFull(rdIdx, wrIdx)) {
      return 0;
    }

    int toWrite = std::min(AvailableWriteInternal(rdIdx, wrIdx), aCount);

    /* First part, from the write index to the end of the array. */
    int firstPart = std::min(StorageCapacity() - wrIdx, toWrite);
    /* Second part, from the beginning of the array. */
    int secondPart = toWrite - firstPart;

    if (aElements) {
      detail::MemoryOperations<T>::MoveOrCopy(mData.get() + wrIdx, aElements,
                                              firstPart);
      detail::MemoryOperations<T>::MoveOrCopy(
          mData.get(), aElements + firstPart, secondPart);
    } else {
      detail::MemoryOperations<T>::ConstructDefault(mData.get() + wrIdx,
                                                    firstPart);
      detail::MemoryOperations<T>::ConstructDefault(mData.get(), secondPart);
    }

    mWriteIndex.store(IncrementIndex(wrIdx, toWrite),
                      std::memory_order_release);

    return toWrite;
  }

  /**
   * Retrieve at most `aCount` elements from the ring buffer, and copy them to
   * `aElements`, if non-null.
   *
   * Only safely called on the consumer side.
   *
   * @param aElements A pointer to a buffer with space for at least `aCount`
   * elements. If `aElements` is `nullptr`, `aCount` elements will be
   * discarded.
   * @param aCount The maximum number of elements to dequeue.
   * @return The number of elements written to `aElements`.
   */
  [[nodiscard]] int Dequeue(T* aElements, int aCount) {
#ifdef DEBUG
    AssertCorrectThread(mConsumerId);
#endif

    int wrIdx = mWriteIndex.load(std::memory_order_acquire);
    int rdIdx = mReadIndex.load(std::memory_order_relaxed);

    if (IsEmpty(rdIdx, wrIdx)) {
      return 0;
    }

    int toRead = std::min(AvailableReadInternal(rdIdx, wrIdx), aCount);

    int firstPart = std::min(StorageCapacity() - rdIdx, toRead);
    int secondPart = toRead - firstPart;

    if (aElements) {
      detail::MemoryOperations<T>::MoveOrCopy(aElements, mData.get() + rdIdx,
                                              firstPart);
      detail::MemoryOperations<T>::MoveOrCopy(aElements + firstPart,
                                              mData.get(), secondPart);
    }

    mReadIndex.store(IncrementIndex(rdIdx, toRead), std::memory_order_release);

    return toRead;
  }

  /**
   * Get the number of available elements for consuming.
   *
   * This can be less than the actual number of elements in the queue, since
   * `mWriteIndex` is updated at the very end of the `Enqueue` method on the
   * producer thread, but it always returns a number of elements such that a
   * call to `Dequeue` will return this number of elements.
   *
   * @return The number of available elements for reading.
   */
  int AvailableRead() const {
    return AvailableReadInternal(mReadIndex.load(std::memory_order_relaxed),
                                 mWriteIndex.load(std::memory_order_relaxed));
  }

  /**
   * Get the number of available elements for writing.
   *
   * This can be less than the actual number of slots that are available,
   * because `mReadIndex` is updated at the very end of the `Dequeue` method.
   * It always returns a number such that a call to `Enqueue` with this number
   * will succeed in enqueuing this number of elements.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int AvailableWrite() const {
    return AvailableWriteInternal(mReadIndex.load(std::memory_order_relaxed),
                                  mWriteIndex.load(std::memory_order_relaxed));
  }

  /**
   * Get the total capacity of this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum capacity of this ring buffer.
   */
  int Capacity() const { return StorageCapacity() - 1; }

  /**
   * Reset the consumer thread id to the current thread. The caller must
   * guarantee that the last call to Dequeue() on the previous consumer thread
   * has completed, and that subsequent calls to Dequeue() will only happen on
   * the current thread.
   */
  void ResetConsumerThreadId() {
#ifdef DEBUG
    mConsumerId = std::this_thread::get_id();
#endif
    // When changing consumer from thread A to B, the last Dequeue on A
    // (synced by mReadIndex.store with memory_order_release) must be picked
    // up by B through an acquire operation.
    std::ignore = mReadIndex.load(std::memory_order_acquire);
  }

  /**
   * Reset the producer thread id to the current thread. The caller must
   * guarantee that the last call to Enqueue() on the previous producer thread
   * has completed, and that subsequent calls to Enqueue() will only happen on
   * the current thread.
   */
  void ResetProducerThreadId() {
#ifdef DEBUG
    mProducerId = std::this_thread::get_id();
#endif
    // When changing producer from thread A to B, the last Enqueue on A
    // (synced by mWriteIndex.store with memory_order_release) must be picked
    // up by B through an acquire operation.
    std::ignore = mWriteIndex.load(std::memory_order_acquire);
  }

 private:
  /** Return true if the ring buffer is empty.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is empty, false otherwise.
   */
  bool IsEmpty(int aReadIndex, int aWriteIndex) const {
    return aWriteIndex == aReadIndex;
  }

  /** Return true if the ring buffer is full.
   *
   * This happens if the write index is exactly one element behind the read
   * index.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is full, false otherwise.
   */
  bool IsFull(int aReadIndex, int aWriteIndex) const {
    return (aWriteIndex + 1) % StorageCapacity() == aReadIndex;
  }

  /**
   * Return the size of the storage. It is one more than the number of
   * elements that can be stored in the buffer.
   *
   * This can be called from any thread.
   *
   * @return the size of the internal storage.
   */
  int StorageCapacity() const { return mCapacity; }

  /**
   * Returns the number of elements available for reading.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableRead`.
   *
   * @return the number of available elements for reading.
   */
  int AvailableReadInternal(int aReadIndex, int aWriteIndex) const {
    if (aWriteIndex >= aReadIndex) {
      return aWriteIndex - aReadIndex;
    } else {
      return aWriteIndex + StorageCapacity() - aReadIndex;
    }
  }

  /**
   * Returns the number of empty slots, available for writing.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableWrite`.
   *
   * @return the number of elements that can be written into the array.
   */
  int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const {
    /* We subtract one element here to always keep at least one sample
     * free in the buffer, to distinguish between full and empty array. */
    int rv = aReadIndex - aWriteIndex - 1;
    if (aWriteIndex >= aReadIndex) {
      rv += StorageCapacity();
    }
    return rv;
  }
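
  // Worked example: with StorageCapacity() == 5 (so Capacity() == 4),
  // aReadIndex == 1 and aWriteIndex == 3, the buffer holds two elements
  // (AvailableReadInternal returns 3 - 1 == 2), and AvailableWriteInternal
  // returns 1 - 3 - 1 + 5 == 2 free slots.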

  /**
   * Increments an index, wrapping it around the storage.
   *
   * Incrementing `mWriteIndex` can be done on the producer thread.
   * Incrementing `mReadIndex` can be done on the consumer thread.
   *
   * @param aIndex the index to increment.
   * @param aIncrement the number by which `aIndex` is incremented.
   * @return the new index.
   */
  int IncrementIndex(int aIndex, int aIncrement) const {
    MOZ_ASSERT(aIncrement >= 0 && aIncrement < StorageCapacity() &&
               aIndex < StorageCapacity());
    return (aIndex + aIncrement) % StorageCapacity();
  }

  /**
   * @brief This allows checking that Enqueue (resp. Dequeue) is always
   * called by the right thread.
   *
   * The roles of the threads are assigned the first time they call Enqueue or
   * Dequeue, and cannot change, except via a ResetProducerThreadId /
   * ResetConsumerThreadId call.
   *
   * @param aId the id of the thread that called the calling method first.
   */
#ifdef DEBUG
  static void AssertCorrectThread(std::thread::id& aId) {
    if (aId == std::thread::id()) {
      aId = std::this_thread::get_id();
      return;
    }
    MOZ_ASSERT(aId == std::this_thread::get_id());
  }
#endif

  /** Index at which the oldest element is. */
  std::atomic<int> mReadIndex;
  /** Index at which to write new elements. Writes never advance `mWriteIndex`
   * onto `mReadIndex`: at least one slot is always kept free, so that a full
   * buffer can be distinguished from an empty one. */
  std::atomic<int> mWriteIndex;
  /** Size of the storage: one more than the maximum number of elements that
   * can be stored in the ring buffer. */
  const int mCapacity;
  /** Data storage, of size `mCapacity`. */
  std::unique_ptr<T[]> mData;
#ifdef DEBUG
  /** The id of the only thread that is allowed to read from the queue. */
  mutable std::thread::id mConsumerId;
  /** The id of the only thread that is allowed to write to the queue. */
  mutable std::thread::id mProducerId;
#endif
};

/**
 * Instantiation of the `SPSCRingBufferBase` type. This is safe to use
 * from two threads, one producer, one consumer (that never change role),
 * without explicit synchronization.
 */
template <typename T>
using SPSCQueue = SPSCRingBufferBase<T>;

}  // namespace mozilla

#endif  // mozilla_LockFreeQueue_h