1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/SPSCQueue.h"
8 #include "mozilla/PodOperations.h"
20 using namespace mozilla
;
22 /* Generate a monotonically increasing sequence of numbers. */
24 class SequenceGenerator
{
26 SequenceGenerator() = default;
27 void Get(T
* aElements
, size_t aCount
) {
28 for (size_t i
= 0; i
< aCount
; i
++) {
29 aElements
[i
] = static_cast<T
>(mIndex
);
33 void Rewind(size_t aCount
) { mIndex
-= aCount
; }
39 /* Checks that a sequence is monotonically increasing. */
41 class SequenceVerifier
{
43 SequenceVerifier() = default;
44 void Check(T
* aElements
, size_t aCount
) {
45 for (size_t i
= 0; i
< aCount
; i
++) {
46 if (aElements
[i
] != static_cast<T
>(mIndex
)) {
47 std::cerr
<< "Element " << i
<< " is different. Expected "
48 << static_cast<T
>(mIndex
) << ", got " << aElements
[i
] << "."
50 MOZ_RELEASE_ASSERT(false);
60 const int BLOCK_SIZE
= 127;
63 void TestRing(int capacity
) {
64 SPSCQueue
<T
> buf(capacity
);
65 std::unique_ptr
<T
[]> seq(new T
[capacity
]);
66 SequenceGenerator
<T
> gen
;
67 SequenceVerifier
<T
> checker
;
69 int iterations
= 1002;
71 while (iterations
--) {
72 gen
.Get(seq
.get(), BLOCK_SIZE
);
73 int rv
= buf
.Enqueue(seq
.get(), BLOCK_SIZE
);
74 MOZ_RELEASE_ASSERT(rv
== BLOCK_SIZE
);
75 PodZero(seq
.get(), BLOCK_SIZE
);
76 rv
= buf
.Dequeue(seq
.get(), BLOCK_SIZE
);
77 MOZ_RELEASE_ASSERT(rv
== BLOCK_SIZE
);
78 checker
.Check(seq
.get(), BLOCK_SIZE
);
83 // On Windows and x86 Android, the timer resolution is so bad that, even if
84 // we used `timeBeginPeriod(1)`, any nonzero sleep from the test's inner loops
85 // would make this program take far too long.
88 #elif defined(ANDROID)
89 std::this_thread::sleep_for(std::chrono::microseconds(0));
91 std::this_thread::sleep_for(std::chrono::microseconds(10));
96 void TestRingMultiThread(int capacity
) {
97 SPSCQueue
<T
> buf(capacity
);
98 SequenceVerifier
<T
> checker
;
99 std::unique_ptr
<T
[]> outBuffer(new T
[capacity
]);
101 std::thread
t([&buf
, capacity
] {
102 int iterations
= 1002;
103 std::unique_ptr
<T
[]> inBuffer(new T
[capacity
]);
104 SequenceGenerator
<T
> gen
;
106 while (iterations
--) {
108 gen
.Get(inBuffer
.get(), BLOCK_SIZE
);
109 int rv
= buf
.Enqueue(inBuffer
.get(), BLOCK_SIZE
);
110 MOZ_RELEASE_ASSERT(rv
<= BLOCK_SIZE
);
111 if (rv
!= BLOCK_SIZE
) {
112 gen
.Rewind(BLOCK_SIZE
- rv
);
117 int remaining
= 1002;
119 while (remaining
--) {
121 int rv
= buf
.Dequeue(outBuffer
.get(), BLOCK_SIZE
);
122 MOZ_RELEASE_ASSERT(rv
<= BLOCK_SIZE
);
123 checker
.Check(outBuffer
.get(), rv
);
129 template <typename T
>
130 void BasicAPITest(T
& ring
) {
131 MOZ_RELEASE_ASSERT(ring
.Capacity() == 128);
133 MOZ_RELEASE_ASSERT(ring
.AvailableRead() == 0);
134 MOZ_RELEASE_ASSERT(ring
.AvailableWrite() == 128);
136 int rv
= ring
.EnqueueDefault(63);
138 MOZ_RELEASE_ASSERT(rv
== 63);
139 MOZ_RELEASE_ASSERT(ring
.AvailableRead() == 63);
140 MOZ_RELEASE_ASSERT(ring
.AvailableWrite() == 65);
142 rv
= ring
.EnqueueDefault(65);
144 MOZ_RELEASE_ASSERT(rv
== 65);
145 MOZ_RELEASE_ASSERT(ring
.AvailableRead() == 128);
146 MOZ_RELEASE_ASSERT(ring
.AvailableWrite() == 0);
148 rv
= ring
.Dequeue(nullptr, 63);
150 MOZ_RELEASE_ASSERT(ring
.AvailableRead() == 65);
151 MOZ_RELEASE_ASSERT(ring
.AvailableWrite() == 63);
153 rv
= ring
.Dequeue(nullptr, 65);
155 MOZ_RELEASE_ASSERT(ring
.AvailableRead() == 0);
156 MOZ_RELEASE_ASSERT(ring
.AvailableWrite() == 128);
159 const size_t RING_BUFFER_SIZE
= 128;
160 const size_t ENQUEUE_SIZE
= RING_BUFFER_SIZE
/ 2;
162 void TestResetAPI() {
163 SPSCQueue
<float> ring(RING_BUFFER_SIZE
);
164 std::thread
p([&ring
] {
165 std::unique_ptr
<float[]> inBuffer(new float[ENQUEUE_SIZE
]);
166 int rv
= ring
.Enqueue(inBuffer
.get(), ENQUEUE_SIZE
);
167 MOZ_RELEASE_ASSERT(rv
> 0);
172 std::thread
c([&ring
] {
173 std::unique_ptr
<float[]> outBuffer(new float[ENQUEUE_SIZE
]);
174 int rv
= ring
.Dequeue(outBuffer
.get(), ENQUEUE_SIZE
);
175 MOZ_RELEASE_ASSERT(rv
> 0);
180 // Enqueue with a different thread. We reset the thread ID in the ring buffer,
182 std::thread
p2([&ring
] {
183 ring
.ResetProducerThreadId();
184 std::unique_ptr
<float[]> inBuffer(new float[ENQUEUE_SIZE
]);
185 int rv
= ring
.Enqueue(inBuffer
.get(), ENQUEUE_SIZE
);
186 MOZ_RELEASE_ASSERT(rv
> 0);
191 // Dequeue with a different thread. We reset the thread ID in the ring buffer,
193 std::thread
c2([&ring
] {
194 ring
.ResetConsumerThreadId();
195 std::unique_ptr
<float[]> outBuffer(new float[ENQUEUE_SIZE
]);
196 int rv
= ring
.Dequeue(outBuffer
.get(), ENQUEUE_SIZE
);
197 MOZ_RELEASE_ASSERT(rv
> 0);
202 // Similarly, but do the Enqueues without a Dequeue in between, since a
203 // Dequeue could affect memory ordering.
206 ring
.ResetProducerThreadId();
207 std::unique_ptr
<float[]> inBuffer(new float[ENQUEUE_SIZE
]);
208 int rv
= ring
.Enqueue(inBuffer
.get(), ENQUEUE_SIZE
);
209 MOZ_RELEASE_ASSERT(rv
> 0);
210 p4
= std::thread([&ring
] {
211 ring
.ResetProducerThreadId();
212 std::unique_ptr
<float[]> inBuffer(new float[ENQUEUE_SIZE
]);
213 int rv
= ring
.Enqueue(inBuffer
.get(), ENQUEUE_SIZE
);
214 MOZ_RELEASE_ASSERT(rv
> 0);
223 ring
.ResetConsumerThreadId();
224 std::unique_ptr
<float[]> outBuffer(new float[ENQUEUE_SIZE
]);
225 int rv
= ring
.Dequeue(outBuffer
.get(), ENQUEUE_SIZE
);
226 MOZ_RELEASE_ASSERT(rv
> 0);
227 c4
= std::thread([&ring
] {
228 ring
.ResetConsumerThreadId();
229 std::unique_ptr
<float[]> outBuffer(new float[ENQUEUE_SIZE
]);
230 int rv
= ring
.Dequeue(outBuffer
.get(), ENQUEUE_SIZE
);
231 MOZ_RELEASE_ASSERT(rv
> 0);
240 const size_t ELEMENT_COUNT
= 16;
242 Thing() : mStr("") {}
243 explicit Thing(const std::string
& aStr
) : mStr(aStr
) {}
244 Thing(Thing
&& aOtherThing
) {
245 mStr
= std::move(aOtherThing
.mStr
);
246 // aOtherThing.mStr.clear();
248 Thing
& operator=(Thing
&& aOtherThing
) {
249 mStr
= std::move(aOtherThing
.mStr
);
255 std::vector
<Thing
> vec_in
;
256 std::vector
<Thing
> vec_out
;
258 for (uint32_t i
= 0; i
< ELEMENT_COUNT
; i
++) {
259 vec_in
.push_back(Thing(std::to_string(i
)));
260 vec_out
.push_back(Thing());
263 SPSCQueue
<Thing
> queue(ELEMENT_COUNT
);
265 int rv
= queue
.Enqueue(&vec_in
[0], ELEMENT_COUNT
);
266 MOZ_RELEASE_ASSERT(rv
== ELEMENT_COUNT
);
268 // Check that we've moved the std::string into the queue.
269 for (uint32_t i
= 0; i
< ELEMENT_COUNT
; i
++) {
270 MOZ_RELEASE_ASSERT(vec_in
[i
].mStr
.empty());
273 rv
= queue
.Dequeue(&vec_out
[0], ELEMENT_COUNT
);
274 MOZ_RELEASE_ASSERT(rv
== ELEMENT_COUNT
);
276 for (uint32_t i
= 0; i
< ELEMENT_COUNT
; i
++) {
277 MOZ_RELEASE_ASSERT(std::stoul(vec_out
[i
].mStr
) == i
);
282 const int minCapacity
= 199;
283 const int maxCapacity
= 1277;
284 const int capacityIncrement
= 27;
286 SPSCQueue
<float> q1(128);
288 SPSCQueue
<char> q2(128);
291 for (uint32_t i
= minCapacity
; i
< maxCapacity
; i
+= capacityIncrement
) {
292 TestRing
<uint32_t>(i
);
293 TestRingMultiThread
<uint32_t>(i
);
295 TestRingMultiThread
<float>(i
);