Backed out changeset f1500f288aba (bug 1845736) for causing bc failures @ browser_ale...
[gecko.git] / media / libcubeb / src / cubeb_ringbuffer.h
blobf0203515660745800c2f53b57abc7ac04134c374
1 /*
2 * Copyright © 2016 Mozilla Foundation
4 * This program is made available under an ISC-style license. See the
5 * accompanying file LICENSE for details.
6 */
8 #ifndef CUBEB_RING_BUFFER_H
9 #define CUBEB_RING_BUFFER_H
11 #include "cubeb_utils.h"
12 #include <algorithm>
13 #include <atomic>
14 #include <cstdint>
15 #include <memory>
16 #include <thread>
18 /**
19 * Single producer single consumer lock-free and wait-free ring buffer.
21 * This data structure allows producing data from one thread, and consuming it
22 * on another thread, safely and without explicit synchronization. If used on
23 * two threads, this data structure uses atomics for thread safety. It is
24 * possible to disable the use of atomics at compile time and only use this data
25 * structure on one thread.
27 * The role for the producer and the consumer must be constant, i.e., the
28 * producer should always be on one thread and the consumer should always be on
29 * another thread.
31 * Some words about the inner workings of this class:
32 * - Capacity is fixed. Only one allocation is performed, in the constructor.
33 * When reading and writing, the return value of the method allows checking if
34 * the ring buffer is empty or full.
35 * - We always keep the read index at least one element ahead of the write
36 * index, so we can distinguish between an empty and a full ring buffer: an
37 * empty ring buffer is when the write index is at the same position as the
38 * read index. A full buffer is when the write index is exactly one position
39 * before the read index.
40 * - We synchronize updates to the read index after having read the data, and
41 * the write index after having written the data. This means that the each
42 * thread can only touch a portion of the buffer that is not touched by the
43 * other thread.
44 * - Callers are expected to provide buffers. When writing to the queue,
45 * elements are copied into the internal storage from the buffer passed in.
46 * When reading from the queue, the user is expected to provide a buffer.
47 * Because this is a ring buffer, data might not be contiguous in memory,
48 * providing an external buffer to copy into is an easy way to have linear
49 * data for further processing.
51 template <typename T> class ring_buffer_base {
52 public:
53 /**
54 * Constructor for a ring buffer.
56 * This performs an allocation, but is the only allocation that will happen
57 * for the life time of a `ring_buffer_base`.
59 * @param capacity The maximum number of element this ring buffer will hold.
61 ring_buffer_base(int capacity)
62 /* One more element to distinguish from empty and full buffer. */
63 : capacity_(capacity + 1)
65 assert(storage_capacity() < std::numeric_limits<int>::max() / 2 &&
66 "buffer too large for the type of index used.");
67 assert(capacity_ > 0);
69 data_.reset(new T[storage_capacity()]);
70 /* If this queue is using atomics, initializing those members as the last
71 * action in the constructor acts as a full barrier, and allow capacity() to
72 * be thread-safe. */
73 write_index_ = 0;
74 read_index_ = 0;
76 /**
77 * Push `count` zero or default constructed elements in the array.
79 * Only safely called on the producer thread.
81 * @param count The number of elements to enqueue.
82 * @return The number of element enqueued.
84 int enqueue_default(int count) { return enqueue(nullptr, count); }
85 /**
86 * @brief Put an element in the queue
88 * Only safely called on the producer thread.
90 * @param element The element to put in the queue.
92 * @return 1 if the element was inserted, 0 otherwise.
94 int enqueue(T & element) { return enqueue(&element, 1); }
95 /**
96 * Push `count` elements in the ring buffer.
98 * Only safely called on the producer thread.
100 * @param elements a pointer to a buffer containing at least `count` elements.
101 * If `elements` is nullptr, zero or default constructed elements are
102 * enqueued.
103 * @param count The number of elements to read from `elements`
104 * @return The number of elements successfully coped from `elements` and
105 * inserted into the ring buffer.
107 int enqueue(T * elements, int count)
109 #ifndef NDEBUG
110 assert_correct_thread(producer_id);
111 #endif
113 int wr_idx = write_index_.load(std::memory_order_relaxed);
114 int rd_idx = read_index_.load(std::memory_order_acquire);
116 if (full_internal(rd_idx, wr_idx)) {
117 return 0;
120 int to_write = std::min(available_write_internal(rd_idx, wr_idx), count);
122 /* First part, from the write index to the end of the array. */
123 int first_part = std::min(storage_capacity() - wr_idx, to_write);
124 /* Second part, from the beginning of the array */
125 int second_part = to_write - first_part;
127 if (elements) {
128 Copy(data_.get() + wr_idx, elements, first_part);
129 Copy(data_.get(), elements + first_part, second_part);
130 } else {
131 ConstructDefault(data_.get() + wr_idx, first_part);
132 ConstructDefault(data_.get(), second_part);
135 write_index_.store(increment_index(wr_idx, to_write),
136 std::memory_order_release);
138 return to_write;
141 * Retrieve at most `count` elements from the ring buffer, and copy them to
142 * `elements`, if non-null.
144 * Only safely called on the consumer side.
146 * @param elements A pointer to a buffer with space for at least `count`
147 * elements. If `elements` is `nullptr`, `count` element will be discarded.
148 * @param count The maximum number of elements to dequeue.
149 * @return The number of elements written to `elements`.
151 int dequeue(T * elements, int count)
153 #ifndef NDEBUG
154 assert_correct_thread(consumer_id);
155 #endif
157 int rd_idx = read_index_.load(std::memory_order_relaxed);
158 int wr_idx = write_index_.load(std::memory_order_acquire);
160 if (empty_internal(rd_idx, wr_idx)) {
161 return 0;
164 int to_read = std::min(available_read_internal(rd_idx, wr_idx), count);
166 int first_part = std::min(storage_capacity() - rd_idx, to_read);
167 int second_part = to_read - first_part;
169 if (elements) {
170 Copy(elements, data_.get() + rd_idx, first_part);
171 Copy(elements + first_part, data_.get(), second_part);
174 read_index_.store(increment_index(rd_idx, to_read),
175 std::memory_order_release);
177 return to_read;
180 * Get the number of available element for consuming.
182 * Only safely called on the consumer thread.
184 * @return The number of available elements for reading.
186 int available_read() const
188 #ifndef NDEBUG
189 assert_correct_thread(consumer_id);
190 #endif
191 return available_read_internal(
192 read_index_.load(std::memory_order_relaxed),
193 write_index_.load(std::memory_order_acquire));
196 * Get the number of available elements for consuming.
198 * Only safely called on the producer thread.
200 * @return The number of empty slots in the buffer, available for writing.
202 int available_write() const
204 #ifndef NDEBUG
205 assert_correct_thread(producer_id);
206 #endif
207 return available_write_internal(
208 read_index_.load(std::memory_order_acquire),
209 write_index_.load(std::memory_order_relaxed));
212 * Get the total capacity, for this ring buffer.
214 * Can be called safely on any thread.
216 * @return The maximum capacity of this ring buffer.
218 int capacity() const { return storage_capacity() - 1; }
220 * Reset the consumer and producer thread identifier, in case the thread are
221 * being changed. This has to be externally synchronized. This is no-op when
222 * asserts are disabled.
224 void reset_thread_ids()
226 #ifndef NDEBUG
227 consumer_id = producer_id = std::thread::id();
228 #endif
231 private:
232 /** Return true if the ring buffer is empty.
234 * @param read_index the read index to consider
235 * @param write_index the write index to consider
236 * @return true if the ring buffer is empty, false otherwise.
238 bool empty_internal(int read_index, int write_index) const
240 return write_index == read_index;
242 /** Return true if the ring buffer is full.
244 * This happens if the write index is exactly one element behind the read
245 * index.
247 * @param read_index the read index to consider
248 * @param write_index the write index to consider
249 * @return true if the ring buffer is full, false otherwise.
251 bool full_internal(int read_index, int write_index) const
253 return (write_index + 1) % storage_capacity() == read_index;
256 * Return the size of the storage. It is one more than the number of elements
257 * that can be stored in the buffer.
259 * @return the number of elements that can be stored in the buffer.
261 int storage_capacity() const { return capacity_; }
263 * Returns the number of elements available for reading.
265 * @return the number of available elements for reading.
267 int available_read_internal(int read_index, int write_index) const
269 if (write_index >= read_index) {
270 return write_index - read_index;
271 } else {
272 return write_index + storage_capacity() - read_index;
276 * Returns the number of empty elements, available for writing.
278 * @return the number of elements that can be written into the array.
280 int available_write_internal(int read_index, int write_index) const
282 /* We substract one element here to always keep at least one sample
283 * free in the buffer, to distinguish between full and empty array. */
284 int rv = read_index - write_index - 1;
285 if (write_index >= read_index) {
286 rv += storage_capacity();
288 return rv;
291 * Increments an index, wrapping it around the storage.
293 * @param index a reference to the index to increment.
294 * @param increment the number by which `index` is incremented.
295 * @return the new index.
297 int increment_index(int index, int increment) const
299 assert(increment >= 0);
300 return (index + increment) % storage_capacity();
303 * @brief This allows checking that enqueue (resp. dequeue) are always called
304 * by the right thread.
306 * @param id the id of the thread that has called the calling method first.
308 #ifndef NDEBUG
309 static void assert_correct_thread(std::thread::id & id)
311 if (id == std::thread::id()) {
312 id = std::this_thread::get_id();
313 return;
315 assert(id == std::this_thread::get_id());
317 #endif
318 /** Index at which the oldest element is at, in samples. */
319 std::atomic<int> read_index_;
320 /** Index at which to write new elements. `write_index` is always at
321 * least one element ahead of `read_index_`. */
322 std::atomic<int> write_index_;
323 /** Maximum number of elements that can be stored in the ring buffer. */
324 const int capacity_;
325 /** Data storage */
326 std::unique_ptr<T[]> data_;
327 #ifndef NDEBUG
328 /** The id of the only thread that is allowed to read from the queue. */
329 mutable std::thread::id consumer_id;
330 /** The id of the only thread that is allowed to write from the queue. */
331 mutable std::thread::id producer_id;
332 #endif
336 * Adapter for `ring_buffer_base` that exposes an interface in frames.
338 template <typename T> class audio_ring_buffer_base {
339 public:
341 * @brief Constructor.
343 * @param channel_count Number of channels.
344 * @param capacity_in_frames The capacity in frames.
346 audio_ring_buffer_base(int channel_count, int capacity_in_frames)
347 : channel_count(channel_count),
348 ring_buffer(frames_to_samples(capacity_in_frames))
350 assert(channel_count > 0);
353 * @brief Enqueue silence.
355 * Only safely called on the producer thread.
357 * @param frame_count The number of frames of silence to enqueue.
358 * @return The number of frames of silence actually written to the queue.
360 int enqueue_default(int frame_count)
362 return samples_to_frames(
363 ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));
366 * @brief Enqueue `frames_count` frames of audio.
368 * Only safely called from the producer thread.
370 * @param [in] frames If non-null, the frames to enqueue.
371 * Otherwise, silent frames are enqueued.
372 * @param frame_count The number of frames to enqueue.
374 * @return The number of frames enqueued
377 int enqueue(T * frames, int frame_count)
379 return samples_to_frames(
380 ring_buffer.enqueue(frames, frames_to_samples(frame_count)));
384 * @brief Removes `frame_count` frames from the buffer, and
385 * write them to `frames` if it is non-null.
387 * Only safely called on the consumer thread.
389 * @param frames If non-null, the frames are copied to `frames`.
390 * Otherwise, they are dropped.
391 * @param frame_count The number of frames to remove.
393 * @return The number of frames actually dequeud.
395 int dequeue(T * frames, int frame_count)
397 return samples_to_frames(
398 ring_buffer.dequeue(frames, frames_to_samples(frame_count)));
401 * Get the number of available frames of audio for consuming.
403 * Only safely called on the consumer thread.
405 * @return The number of available frames of audio for reading.
407 int available_read() const
409 return samples_to_frames(ring_buffer.available_read());
412 * Get the number of available frames of audio for consuming.
414 * Only safely called on the producer thread.
416 * @return The number of empty slots in the buffer, available for writing.
418 int available_write() const
420 return samples_to_frames(ring_buffer.available_write());
423 * Get the total capacity, for this ring buffer.
425 * Can be called safely on any thread.
427 * @return The maximum capacity of this ring buffer.
429 int capacity() const { return samples_to_frames(ring_buffer.capacity()); }
431 private:
433 * @brief Frames to samples conversion.
435 * @param frames The number of frames.
437 * @return A number of samples.
439 int frames_to_samples(int frames) const { return frames * channel_count; }
441 * @brief Samples to frames conversion.
443 * @param samples The number of samples.
445 * @return A number of frames.
447 int samples_to_frames(int samples) const { return samples / channel_count; }
448 /** Number of channels of audio that will stream through this ring buffer. */
449 int channel_count;
450 /** The underlying ring buffer that is used to store the data. */
451 ring_buffer_base<T> ring_buffer;
455 * Lock-free instantiation of the `ring_buffer_base` type. This is safe to use
456 * from two threads, one producer, one consumer (that never change role),
457 * without explicit synchronization.
459 template <typename T> using lock_free_queue = ring_buffer_base<T>;
461 * Lock-free instantiation of the `audio_ring_buffer` type. This is safe to use
462 * from two threads, one producer, one consumer (that never change role),
463 * without explicit synchronization.
465 template <typename T>
466 using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;
468 #endif // CUBEB_RING_BUFFER_H