Backed out changeset 06f41c22f3a6 (bug 1888460) for causing linux xpcshell failures...
[gecko.git] / dom / media / MPSCQueue.h
blobea7848154ffec3cb80c83b1982305c4ff3faa6bf
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 #ifndef mozilla_dom_MPSCQueue_h
6 #define mozilla_dom_MPSCQueue_h
8 namespace mozilla {
10 // This class implements a lock-free multiple producer single consumer queue of
11 // fixed size log messages, with the following characteristics:
12 // - Unbounded (uses a intrinsic linked list)
13 // - Allocates on Push. Push can be called on any thread.
14 // - Deallocates on Pop. Pop MUST always be called on the same thread for the
15 // life-time of the queue.
17 // In our scenario, the producer threads are real-time, they can't block. The
18 // consummer thread runs every now and then and empties the queue to a log
19 // file, on disk.
20 const size_t MPSC_MSG_RESERVED = sizeof(std::atomic<void*>);
22 template <typename T>
23 class MPSCQueue {
24 public:
25 struct Message {
26 Message() { mNext.store(nullptr, std::memory_order_relaxed); }
27 Message(const Message& aMessage) = delete;
28 void operator=(const Message& aMessage) = delete;
30 std::atomic<Message*> mNext;
31 T data;
34 // Creates a new MPSCQueue. Initially, the queue has a single sentinel node,
35 // pointed to by both mHead and mTail.
36 MPSCQueue()
37 // At construction, the initial message points to nullptr (it has no
38 // successor). It is a sentinel node, that does not contain meaningful
39 // data.
40 : mHead(new Message()), mTail(mHead.load(std::memory_order_relaxed)) {}
42 ~MPSCQueue() {
43 Message dummy;
44 while (Pop(&dummy.data)) {
46 Message* front = mHead.load(std::memory_order_relaxed);
47 delete front;
50 void Push(MPSCQueue<T>::Message* aMessage) {
51 // The next two non-commented line are called A and B in this paragraph.
52 // Producer threads i, i-1, etc. are numbered in the order they reached
53 // A in time, thread i being the thread that has reached A first.
54 // Atomically, on line A the new `mHead` is set to be the node that was
55 // just allocated, with strong memory order. From now on, any thread
56 // that reaches A will see that the node just allocated is
57 // effectively the head of the list, and will make itself the new head
58 // of the list.
59 // In a bad case (when thread i executes A and then
60 // is not scheduled for a long time), it is possible that thread i-1 and
61 // subsequent threads create a seemingly disconnected set of nodes, but
62 // they all have the correct value for the next node to set as their
63 // mNext member on their respective stacks (in `prev`), and this is
64 // always correct. When the scheduler resumes, and line B is executed,
65 // the correct linkage is resumed.
66 // Before line B, since mNext for the node was the last element of
67 // the queue still has an mNext of nullptr, Pop will not see the node
68 // added.
69 // For line A, it's critical to have strong ordering both ways (since
70 // it's going to possibly be read and write repeatidly by multiple
71 // threads)
72 // Line B can have weaker guarantees, it's only going to be written by a
73 // single thread, and we just need to ensure it's read properly by a
74 // single other one.
75 Message* prev = mHead.exchange(aMessage, std::memory_order_acq_rel);
76 prev->mNext.store(aMessage, std::memory_order_release);
79 // Copy the content of the first message of the queue to aOutput, and
80 // frees the message. Returns true if there was a message, in which case
81 // `aOutput` contains a valid value. If the queue was empty, returns false,
82 // in which case `aOutput` is left untouched.
83 bool Pop(T* aOutput) {
84 // Similarly, in this paragraph, the two following lines are called A
85 // and B, and threads are called thread i, i-1, etc. in order of
86 // execution of line A.
87 // On line A, the first element of the queue is acquired. It is simply a
88 // sentinel node.
89 // On line B, we acquire the node that has the data we want. If B is
90 // null, then only the sentinel node was present in the queue, we can
91 // safely return false.
92 // mTail can be loaded with relaxed ordering, since it's not written nor
93 // read by any other thread (this queue is single consumer).
94 // mNext can be written to by one of the producer, so it's necessary to
95 // ensure those writes are seen, hence the stricter ordering.
96 Message* tail = mTail.load(std::memory_order_relaxed);
97 Message* next = tail->mNext.load(std::memory_order_acquire);
99 if (next == nullptr) {
100 return false;
103 *aOutput = next->data;
105 // Simply shift the queue one node further, so that the sentinel node is
106 // now pointing to the correct most ancient node. It contains stale data,
107 // but this data will never be read again.
108 // It's only necessary to ensure the previous load on this thread is not
109 // reordered past this line, so release ordering is sufficient here.
110 mTail.store(next, std::memory_order_release);
112 // This thread is now the only thing that points to `tail`, it can be
113 // safely deleted.
114 delete tail;
116 return true;
119 private:
120 // An atomic pointer to the most recent message in the queue.
121 std::atomic<Message*> mHead;
122 // An atomic pointer to a sentinel node, that points to the oldest message
123 // in the queue.
124 std::atomic<Message*> mTail;
126 MPSCQueue(const MPSCQueue&) = delete;
127 void operator=(const MPSCQueue&) = delete;
130 } // namespace mozilla
132 #endif // mozilla_dom_MPSCQueue_h