/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "block/block.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
#include "aio-posix.h"
/* Stop userspace polling on a handler if it isn't active for some time */
#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)
bool aio_poll_disabled(AioContext *ctx)
{
    return qatomic_read(&ctx->poll_disable_cnt);
}
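/*
 * Move a handler onto @ready_list so that aio_dispatch_ready_handlers() can
 * invoke its callbacks.  The handler is first removed from any ready list it
 * is already on, so re-adding it is safe.
 */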
void aio_add_ready_handler(AioHandlerList *ready_list,
                           AioHandler *node,
                           int revents)
{
    QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
    node->pfd.revents = revents;
    QLIST_INSERT_HEAD(ready_list, node, node_ready);
}
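/*
 * Look up the handler registered for @fd, skipping nodes that are already
 * marked deleted.  Returns NULL if no live handler exists for @fd.
 */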
static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd) {
            if (!QLIST_IS_INSERTED(node, node_deleted)) {
                return node;
            }
        }
    }

    return NULL;
}
static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /* If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    node->pfd.revents = 0;

    /* If the fd monitor has already marked it deleted, leave it alone */
    if (QLIST_IS_INSERTED(node, node_deleted)) {
        return false;
    }

    /* If a read is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }
    /* Otherwise, delete it for real.  We can't just mark it as
     * deleted because deleted nodes are only cleaned up while
     * no one is walking the handlers list.
     */
    QLIST_SAFE_REMOVE(node, node_poll);
    QLIST_REMOVE(node, node);
    return true;
}
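/*
 * Register, update, or remove the handlers for @fd.  Passing NULL for
 * io_read, io_write, and io_poll unregisters the fd; otherwise a new
 * AioHandler replaces any existing one for the same fd and the fd monitoring
 * implementation is told about it via fdmon_ops->update().  A caller might
 * use it roughly like this (read_cb and opaque are placeholders):
 *
 *     aio_set_fd_handler(ctx, fd, true, read_cb, NULL, NULL, opaque);
 *     ...
 *     aio_set_fd_handler(ctx, fd, true, NULL, NULL, NULL, NULL);
 */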
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        void *opaque)
{
    AioHandler *node;
    AioHandler *new_node = NULL;
    bool is_new = false;
    bool deleted = false;
    int poll_disable_change;

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }
        /* Clean events in order to unregister fd from the ctx epoll. */
        node->pfd.events = 0;

        poll_disable_change = -!node->io_poll;
    } else {
        poll_disable_change = !io_poll - (node && !node->io_poll);
        if (node == NULL) {
            is_new = true;
        }
        /* Alloc and insert if it's not already there */
        new_node = g_new0(AioHandler, 1);

        /* Update handler with latest information */
        new_node->io_read = io_read;
        new_node->io_write = io_write;
        new_node->io_poll = io_poll;
        new_node->opaque = opaque;
        new_node->is_external = is_external;

        if (is_new) {
            new_node->pfd.fd = fd;
        } else {
            new_node->pfd = node->pfd;
        }
        g_source_add_poll(&ctx->source, &new_node->pfd);

        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
    }

    /* No need to order poll_disable_cnt writes against other updates;
     * the counter is only used to avoid wasting time and latency on
     * iterated polling when the system call will be ultimately necessary.
     * Changing handlers is a rare event, and a little wasted polling until
     * the aio_notify below is not an issue.
     */
    qatomic_set(&ctx->poll_disable_cnt,
                qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);

    ctx->fdmon_ops->update(ctx, node, new_node);
    if (node) {
        deleted = aio_remove_fd_handler(ctx, node);
    }
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);

    if (deleted) {
        g_free(node);
    }
}
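/*
 * Install the io_poll_begin()/io_poll_end() notification callbacks for an fd
 * that already has a handler registered.  Does nothing if the fd is unknown.
 */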
void aio_set_fd_poll(AioContext *ctx, int fd,
                     IOHandler *io_poll_begin,
                     IOHandler *io_poll_end)
{
    AioHandler *node = find_aio_handler(ctx, fd);

    if (!node) {
        return;
    }

    node->io_poll_begin = io_poll_begin;
    node->io_poll_end = io_poll_end;
}
void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            bool is_external,
                            EventNotifierHandler *io_read,
                            AioPollFn *io_poll)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
                       (IOHandler *)io_read, NULL, io_poll, notifier);
}
void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
                    (IOHandler *)io_poll_begin,
                    (IOHandler *)io_poll_end);
}
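/*
 * Transition all polling handlers into (started == true) or out of
 * (started == false) poll mode by calling their io_poll_begin() or
 * io_poll_end() callbacks.  Returns true if the final poll performed when
 * leaving poll mode made progress.
 */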
static bool poll_set_started(AioContext *ctx, bool started)
{
    AioHandler *node;
    bool progress = false;

    if (started == ctx->poll_started) {
        return false;
    }

    ctx->poll_started = started;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
        IOHandler *fn;

        if (QLIST_IS_INSERTED(node, node_deleted)) {
            continue;
        }

        if (started) {
            fn = node->io_poll_begin;
        } else {
            fn = node->io_poll_end;
        }

        if (fn) {
            fn(node->opaque);
        }

        /* Poll one last time in case ->io_poll_end() raced with the event */
        if (!started) {
            progress = node->io_poll(node->opaque) || progress;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return progress;
}
bool aio_prepare(AioContext *ctx)
{
    /* Poll mode cannot be used with glib's event loop, disable it. */
    poll_set_started(ctx, false);

    return false;
}
bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return result;
}
static void aio_free_deleted_handlers(AioContext *ctx)
{
    AioHandler *node;

    if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
        return;
    }
    if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        return; /* we are nested, let the parent do the freeing */
    }

    while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
        QLIST_REMOVE(node, node);
        QLIST_REMOVE(node, node_deleted);
        QLIST_SAFE_REMOVE(node, node_poll);
        g_free(node);
    }

    qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}
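/*
 * Dispatch a single handler: move it onto the poll list if it has an
 * io_poll() callback, then invoke io_read()/io_write() as indicated by the
 * pending revents.  Returns true if progress was made; reads on the
 * AioContext's own notifier do not count.
 */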
static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
{
    bool progress = false;
    int revents;

    revents = node->pfd.revents & node->pfd.events;
    node->pfd.revents = 0;

    /*
     * Start polling AioHandlers when they become ready because activity is
     * likely to continue.  Note that starvation is theoretically possible when
     * fdmon_supports_polling(), but only until the fd fires for the first
     * time.
     */
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        !QLIST_IS_INSERTED(node, node_poll) &&
        node->io_poll) {
        trace_poll_add(ctx, node, node->pfd.fd, revents);
        if (ctx->poll_started && node->io_poll_begin) {
            node->io_poll_begin(node->opaque);
        }
        QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
    }

    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_read) {
        node->io_read(node->opaque);

        /* aio_notify() does not count as progress */
        if (node->opaque != &ctx->notifier) {
            progress = true;
        }
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_OUT | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_write) {
        node->io_write(node->opaque);
        progress = true;
    }

    return progress;
}
/*
 * If we have a list of ready handlers then this is more efficient than
 * scanning all handlers with aio_dispatch_handlers().
 */
static bool aio_dispatch_ready_handlers(AioContext *ctx,
                                        AioHandlerList *ready_list)
{
    bool progress = false;
    AioHandler *node;

    while ((node = QLIST_FIRST(ready_list))) {
        QLIST_REMOVE(node, node_ready);
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}
/* Slower than aio_dispatch_ready_handlers() but only used via glib */
static bool aio_dispatch_handlers(AioContext *ctx)
{
    AioHandler *node, *tmp;
    bool progress = false;

    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}
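/*
 * The glib dispatch path: run bottom halves, every registered fd handler,
 * and expired timers.  Userspace polling is not used here; aio_prepare()
 * already disabled it for the glib event loop.
 */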
void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx);
    aio_free_deleted_handlers(ctx);
    qemu_lockcnt_dec(&ctx->list_lock);

    timerlistgroup_run_timers(&ctx->tlg);
}
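/*
 * Run every polling handler once.  A successful poll pushes the handler's
 * idle deadline out by POLL_IDLE_INTERVAL_NS and clears *timeout so that the
 * caller skips the blocking wait.
 */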
static bool run_poll_handlers_once(AioContext *ctx,
                                   int64_t now,
                                   int64_t *timeout)
{
    bool progress = false;
    AioHandler *node;
    AioHandler *tmp;

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (aio_node_check(ctx, node->is_external) &&
            node->io_poll(node->opaque)) {
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;

            /*
             * Polling was successful, exit try_poll_mode immediately
             * to adjust the next polling time.
             */
            *timeout = 0;
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }

        /* Caller handles freeing deleted nodes.  Don't do it here. */
    }

    return progress;
}
static bool fdmon_supports_polling(AioContext *ctx)
{
    return ctx->fdmon_ops->need_wait != aio_poll_disabled;
}
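/*
 * Drop handlers that have not seen a successful poll within
 * POLL_IDLE_INTERVAL_NS from the poll list.  Returns true if the final
 * io_poll() call issued while removing a handler made progress.
 */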
static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now)
{
    AioHandler *node;
    AioHandler *tmp;
    bool progress = false;

    /*
     * File descriptor monitoring implementations without userspace polling
     * support suffer from starvation when a subset of handlers is polled
     * because fds will not be processed in a timely fashion.  Don't remove
     * idle poll handlers.
     */
    if (!fdmon_supports_polling(ctx)) {
        return false;
    }

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (node->poll_idle_timeout == 0LL) {
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
        } else if (now >= node->poll_idle_timeout) {
            trace_poll_remove(ctx, node, node->pfd.fd);
            node->poll_idle_timeout = 0LL;
            QLIST_SAFE_REMOVE(node, node_poll);
            if (ctx->poll_started && node->io_poll_end) {
                node->io_poll_end(node->opaque);

                /*
                 * Final poll in case ->io_poll_end() races with an event.
                 * Nevermind about re-adding the handler in the rare case where
                 * this causes progress.
                 */
                progress = node->io_poll(node->opaque) || progress;
            }
        }
    }

    return progress;
}
/* run_poll_handlers:
 * @ctx: the AioContext
 * @max_ns: maximum time to poll for, in nanoseconds
 *
 * Polls for a given time.
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
{
    bool progress;
    int64_t start_time, elapsed_time;

    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);

    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);

    /*
     * Optimization: ->io_poll() handlers often contain RCU read critical
     * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
     * -> rcu_read_lock() -> ... sequences with expensive memory
     * synchronization primitives.  Make the entire polling loop an RCU
     * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
     * are cheap.
     */
    RCU_READ_LOCK_GUARD();

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    do {
        progress = run_poll_handlers_once(ctx, start_time, timeout);
        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
        max_ns = qemu_soonest_timeout(*timeout, max_ns);
        assert(!(max_ns && progress));
    } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));

    if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) {
        *timeout = 0;
        progress = true;
    }

    /* If time has passed with no successful polling, adjust *timeout to
     * keep the same ending time.
     */
    if (*timeout != -1) {
        *timeout -= MIN(*timeout, elapsed_time);
    }

    trace_run_poll_handlers_end(ctx, progress, *timeout);
    return progress;
}
/* try_poll_mode:
 * @ctx: the AioContext
 * @timeout: timeout for blocking wait, computed by the caller and updated if
 *           polling succeeds.
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
{
    int64_t max_ns;

    if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
        return false;
    }

    max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
    if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
        poll_set_started(ctx, true);

        if (run_poll_handlers(ctx, max_ns, timeout)) {
            return true;
        }
    }

    if (poll_set_started(ctx, false)) {
        *timeout = 0;
        return true;
    }

    return false;
}
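/*
 * Run one iteration of the event loop: try userspace polling first, fall
 * back to the fdmon implementation's blocking wait, then dispatch bottom
 * halves, ready fd handlers, and timers.  Returns true if progress was made.
 */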
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
    int ret = 0;
    bool progress;
    bool use_notify_me;
    int64_t timeout;
    int64_t start = 0;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     */
    assert(in_aio_context_home_thread(ctx));

    qemu_lockcnt_inc(&ctx->list_lock);

    if (ctx->poll_max_ns) {
        start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;
    progress = try_poll_mode(ctx, &timeout);
    assert(!(timeout && progress));

    /*
     * aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    use_notify_me = timeout != 0;
    if (use_notify_me) {
        qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before reading ctx->notified.  Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();

        /* Don't block if aio_notify() was called */
        if (qatomic_read(&ctx->notified)) {
            timeout = 0;
        }
    }

    /* If polling is allowed, non-blocking aio_poll does not need the
     * system call---a single round of run_poll_handlers_once suffices.
     */
    if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
        ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
    }

    if (use_notify_me) {
        /* Finish the poll before clearing the flag.  */
        qatomic_store_release(&ctx->notify_me,
                              qatomic_read(&ctx->notify_me) - 2);
    }

    aio_notify_accept(ctx);

    /* Adjust polling time */
    if (ctx->poll_max_ns) {
        int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;

        if (block_ns <= ctx->poll_ns) {
            /* This is the sweet spot, no adjustment needed */
        } else if (block_ns > ctx->poll_max_ns) {
            /* We'd have to poll for too long, poll less */
            int64_t old = ctx->poll_ns;

            if (ctx->poll_shrink) {
                ctx->poll_ns /= ctx->poll_shrink;
            } else {
                ctx->poll_ns = 0;
            }

            trace_poll_shrink(ctx, old, ctx->poll_ns);
        } else if (ctx->poll_ns < ctx->poll_max_ns &&
                   block_ns < ctx->poll_max_ns) {
            /* There is room to grow, poll longer */
            int64_t old = ctx->poll_ns;
            int64_t grow = ctx->poll_grow;

            if (grow == 0) {
                grow = 2;
            }

            if (ctx->poll_ns) {
                ctx->poll_ns *= grow;
            } else {
                ctx->poll_ns = 4000; /* start polling at 4 microseconds */
            }

            if (ctx->poll_ns > ctx->poll_max_ns) {
                ctx->poll_ns = ctx->poll_max_ns;
            }

            trace_poll_grow(ctx, old, ctx->poll_ns);
        }
    }

    progress |= aio_bh_poll(ctx);

    if (ret > 0) {
        progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
    }

    aio_free_deleted_handlers(ctx);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}
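/*
 * Select the fd monitoring implementation at AioContext creation time.
 * fdmon_poll_ops is the default; io_uring takes over when it can be set up,
 * otherwise an epoll instance is prepared as an upgrade path.
 */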
void aio_context_setup(AioContext *ctx)
{
    ctx->fdmon_ops = &fdmon_poll_ops;
    ctx->epollfd = -1;

    /* Use the fastest fd monitoring implementation if available */
    if (fdmon_io_uring_setup(ctx)) {
        return;
    }

    fdmon_epoll_setup(ctx);
}
void aio_context_destroy(AioContext *ctx)
{
    fdmon_io_uring_destroy(ctx);
    fdmon_epoll_disable(ctx);
    aio_free_deleted_handlers(ctx);
}
void aio_context_use_g_source(AioContext *ctx)
{
    /*
     * Disable io_uring when the glib main loop is used because it doesn't
     * support mixed glib/aio_poll() usage.  It relies on aio_poll() being
     * called regularly so that changes to the monitored file descriptors are
     * submitted, otherwise a list of pending fd handlers builds up.
     */
    fdmon_io_uring_destroy(ctx);
    aio_free_deleted_handlers(ctx);
}
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    /* No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->poll_max_ns = max_ns;
    ctx->poll_ns = 0;
    ctx->poll_grow = grow;
    ctx->poll_shrink = shrink;

    aio_notify(ctx);
}