/*
 * AioContext multithreading tests
 *
 * Copyright Red Hat, Inc. 2016
 *
 * Paolo Bonzini <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU LGPL, version 2 or later.
 * See the COPYING.LIB file in the top-level directory.
 */
13 #include "qemu/osdep.h"
14 #include "block/aio.h"
15 #include "qemu/coroutine.h"
16 #include "qemu/thread.h"
17 #include "qemu/error-report.h"
20 /* AioContext management */
22 #define NUM_CONTEXTS 5
24 static IOThread
*threads
[NUM_CONTEXTS
];
25 static AioContext
*ctx
[NUM_CONTEXTS
];
26 static __thread
int id
= -1;
28 static QemuEvent done_event
;
30 /* Run a function synchronously on a remote iothread. */
32 typedef struct CtxRunData
{
37 static void ctx_run_bh_cb(void *opaque
)
39 CtxRunData
*data
= opaque
;
42 qemu_event_set(&done_event
);
45 static void ctx_run(int i
, QEMUBHFunc
*cb
, void *opaque
)
52 qemu_event_reset(&done_event
);
53 aio_bh_schedule_oneshot(ctx
[i
], ctx_run_bh_cb
, &data
);
54 qemu_event_wait(&done_event
);
57 /* Starting the iothreads. */
59 static void set_id_cb(void *opaque
)
66 static void create_aio_contexts(void)
70 for (i
= 0; i
< NUM_CONTEXTS
; i
++) {
71 threads
[i
] = iothread_new();
72 ctx
[i
] = iothread_get_aio_context(threads
[i
]);
75 qemu_event_init(&done_event
, false);
76 for (i
= 0; i
< NUM_CONTEXTS
; i
++) {
77 ctx_run(i
, set_id_cb
, &i
);
81 /* Stopping the iothreads. */
83 static void join_aio_contexts(void)
87 for (i
= 0; i
< NUM_CONTEXTS
; i
++) {
88 aio_context_ref(ctx
[i
]);
90 for (i
= 0; i
< NUM_CONTEXTS
; i
++) {
91 iothread_join(threads
[i
]);
93 for (i
= 0; i
< NUM_CONTEXTS
; i
++) {
94 aio_context_unref(ctx
[i
]);
96 qemu_event_destroy(&done_event
);
99 /* Basic test for the stuff above. */
101 static void test_lifecycle(void)
103 create_aio_contexts();
107 /* aio_co_schedule test. */
109 static Coroutine
*to_schedule
[NUM_CONTEXTS
];
110 static bool stop
[NUM_CONTEXTS
];
112 static int count_retry
;
113 static int count_here
;
114 static int count_other
;
116 static bool schedule_next(int n
)
120 co
= qatomic_xchg(&to_schedule
[n
], NULL
);
122 qatomic_inc(&count_retry
);
127 qatomic_inc(&count_here
);
129 qatomic_inc(&count_other
);
132 aio_co_schedule(ctx
[n
], co
);
136 static void finish_cb(void *opaque
)
142 static coroutine_fn
void test_multi_co_schedule_entry(void *opaque
)
144 g_assert(to_schedule
[id
] == NULL
);
147 * The next iteration will set to_schedule[id] again, but once finish_cb
148 * is scheduled there is no guarantee that it will actually be woken up,
149 * so at that point it must not go to sleep.
154 n
= g_test_rand_int_range(0, NUM_CONTEXTS
);
157 qatomic_mb_set(&to_schedule
[id
], qemu_coroutine_self());
158 /* finish_cb can run here. */
159 qemu_coroutine_yield();
160 g_assert(to_schedule
[id
] == NULL
);
165 static void test_multi_co_schedule(int seconds
)
169 count_here
= count_other
= count_retry
= 0;
171 create_aio_contexts();
172 for (i
= 0; i
< NUM_CONTEXTS
; i
++) {
173 Coroutine
*co1
= qemu_coroutine_create(test_multi_co_schedule_entry
, NULL
);
174 aio_co_schedule(ctx
[i
], co1
);
177 g_usleep(seconds
* 1000000);
179 /* Guarantee that each AioContext is woken up from its last wait. */
180 for (i
= 0; i
< NUM_CONTEXTS
; i
++) {
181 ctx_run(i
, finish_cb
, NULL
);
182 g_assert(to_schedule
[i
] == NULL
);
186 g_test_message("scheduled %d, queued %d, retry %d, total %d",
187 count_other
, count_here
, count_retry
,
188 count_here
+ count_other
+ count_retry
);
191 static void test_multi_co_schedule_1(void)
193 test_multi_co_schedule(1);
196 static void test_multi_co_schedule_10(void)
198 test_multi_co_schedule(10);
201 /* CoMutex thread-safety. */
203 static uint32_t atomic_counter
;
204 static uint32_t running
;
205 static uint32_t counter
;
206 static CoMutex comutex
;
207 static bool now_stopping
;
209 static void coroutine_fn
test_multi_co_mutex_entry(void *opaque
)
211 while (!qatomic_read(&now_stopping
)) {
212 qemu_co_mutex_lock(&comutex
);
214 qemu_co_mutex_unlock(&comutex
);
216 /* Increase atomic_counter *after* releasing the mutex. Otherwise
217 * there is a chance (it happens about 1 in 3 runs) that the iothread
218 * exits before the coroutine is woken up, causing a spurious
221 qatomic_inc(&atomic_counter
);
223 qatomic_dec(&running
);
226 static void test_multi_co_mutex(int threads
, int seconds
)
230 qemu_co_mutex_init(&comutex
);
233 now_stopping
= false;
235 create_aio_contexts();
236 assert(threads
<= NUM_CONTEXTS
);
238 for (i
= 0; i
< threads
; i
++) {
239 Coroutine
*co1
= qemu_coroutine_create(test_multi_co_mutex_entry
, NULL
);
240 aio_co_schedule(ctx
[i
], co1
);
243 g_usleep(seconds
* 1000000);
245 qatomic_set(&now_stopping
, true);
246 while (running
> 0) {
251 g_test_message("%d iterations/second", counter
/ seconds
);
252 g_assert_cmpint(counter
, ==, atomic_counter
);
255 /* Testing with NUM_CONTEXTS threads focuses on the queue. The mutex however
256 * is too contended (and the threads spend too much time in aio_poll)
257 * to actually stress the handoff protocol.
259 static void test_multi_co_mutex_1(void)
261 test_multi_co_mutex(NUM_CONTEXTS
, 1);
264 static void test_multi_co_mutex_10(void)
266 test_multi_co_mutex(NUM_CONTEXTS
, 10);
269 /* Testing with fewer threads stresses the handoff protocol too. Still, the
270 * case where the locker _can_ pick up a handoff is very rare, happening
271 * about 10 times in 1 million, so increase the runtime a bit compared to
272 * other "quick" testcases that only run for 1 second.
274 static void test_multi_co_mutex_2_3(void)
276 test_multi_co_mutex(2, 3);
279 static void test_multi_co_mutex_2_30(void)
281 test_multi_co_mutex(2, 30);
284 /* Same test with fair mutexes, for performance comparison. */
287 #include "qemu/futex.h"
289 /* The nodes for the mutex reside in this structure (on which we try to avoid
290 * false sharing). The head of the mutex is in the "mutex_head" variable.
295 } nodes
[NUM_CONTEXTS
] __attribute__((__aligned__(64)));
297 static int mutex_head
= -1;
299 static void mcs_mutex_lock(void)
304 nodes
[id
].locked
= 1;
305 prev
= qatomic_xchg(&mutex_head
, id
);
307 qatomic_set(&nodes
[prev
].next
, id
);
308 qemu_futex_wait(&nodes
[id
].locked
, 1);
312 static void mcs_mutex_unlock(void)
315 if (qatomic_read(&nodes
[id
].next
) == -1) {
316 if (qatomic_read(&mutex_head
) == id
&&
317 qatomic_cmpxchg(&mutex_head
, id
, -1) == id
) {
318 /* Last item in the list, exit. */
321 while (qatomic_read(&nodes
[id
].next
) == -1) {
322 /* mcs_mutex_lock did the xchg, but has not updated
323 * nodes[prev].next yet.
328 /* Wake up the next in line. */
329 next
= qatomic_read(&nodes
[id
].next
);
330 nodes
[next
].locked
= 0;
331 qemu_futex_wake(&nodes
[next
].locked
, 1);
334 static void test_multi_fair_mutex_entry(void *opaque
)
336 while (!qatomic_read(&now_stopping
)) {
340 qatomic_inc(&atomic_counter
);
342 qatomic_dec(&running
);
345 static void test_multi_fair_mutex(int threads
, int seconds
)
349 assert(mutex_head
== -1);
352 now_stopping
= false;
354 create_aio_contexts();
355 assert(threads
<= NUM_CONTEXTS
);
357 for (i
= 0; i
< threads
; i
++) {
358 Coroutine
*co1
= qemu_coroutine_create(test_multi_fair_mutex_entry
, NULL
);
359 aio_co_schedule(ctx
[i
], co1
);
362 g_usleep(seconds
* 1000000);
364 qatomic_set(&now_stopping
, true);
365 while (running
> 0) {
370 g_test_message("%d iterations/second", counter
/ seconds
);
371 g_assert_cmpint(counter
, ==, atomic_counter
);
374 static void test_multi_fair_mutex_1(void)
376 test_multi_fair_mutex(NUM_CONTEXTS
, 1);
379 static void test_multi_fair_mutex_10(void)
381 test_multi_fair_mutex(NUM_CONTEXTS
, 10);
385 /* Same test with pthread mutexes, for performance comparison and
388 static QemuMutex mutex
;
390 static void test_multi_mutex_entry(void *opaque
)
392 while (!qatomic_read(&now_stopping
)) {
393 qemu_mutex_lock(&mutex
);
395 qemu_mutex_unlock(&mutex
);
396 qatomic_inc(&atomic_counter
);
398 qatomic_dec(&running
);
401 static void test_multi_mutex(int threads
, int seconds
)
405 qemu_mutex_init(&mutex
);
408 now_stopping
= false;
410 create_aio_contexts();
411 assert(threads
<= NUM_CONTEXTS
);
413 for (i
= 0; i
< threads
; i
++) {
414 Coroutine
*co1
= qemu_coroutine_create(test_multi_mutex_entry
, NULL
);
415 aio_co_schedule(ctx
[i
], co1
);
418 g_usleep(seconds
* 1000000);
420 qatomic_set(&now_stopping
, true);
421 while (running
> 0) {
426 g_test_message("%d iterations/second", counter
/ seconds
);
427 g_assert_cmpint(counter
, ==, atomic_counter
);
430 static void test_multi_mutex_1(void)
432 test_multi_mutex(NUM_CONTEXTS
, 1);
435 static void test_multi_mutex_10(void)
437 test_multi_mutex(NUM_CONTEXTS
, 10);
442 int main(int argc
, char **argv
)
446 g_test_init(&argc
, &argv
, NULL
);
447 g_test_add_func("/aio/multi/lifecycle", test_lifecycle
);
448 if (g_test_quick()) {
449 g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1
);
450 g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_1
);
451 g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3
);
453 g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1
);
455 g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1
);
457 g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10
);
458 g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_10
);
459 g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30
);
461 g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10
);
463 g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10
);