/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
#include "aio-posix.h"

/* Stop userspace polling on a handler if it isn't active for some time */
#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)

bool aio_poll_disabled(AioContext *ctx)
{
    return atomic_read(&ctx->poll_disable_cnt);
}

void aio_add_ready_handler(AioHandlerList *ready_list,
                           AioHandler *node,
                           int revents)
{
    QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
    node->pfd.revents = revents;
    QLIST_INSERT_HEAD(ready_list, node, node_ready);
}
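
/*
 * Return the live (not yet marked deleted) AioHandler registered for @fd,
 * or NULL if there is none.
 */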
static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd) {
            if (!QLIST_IS_INSERTED(node, node_deleted)) {
                return node;
            }
        }
    }

    return NULL;
}

static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /* If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    node->pfd.revents = 0;

    /* If the fd monitor has already marked it deleted, leave it alone */
    if (QLIST_IS_INSERTED(node, node_deleted)) {
        return false;
    }

    /* If a read is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }
    /* Otherwise, delete it for real.  We can't just mark it as
     * deleted because deleted nodes are only cleaned up while
     * no one is walking the handlers list.
     */
    QLIST_SAFE_REMOVE(node, node_poll);
    QLIST_REMOVE(node, node);
    return true;
}
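
/*
 * Register, replace, or remove the handler for @fd on @ctx.  Passing NULL
 * for io_read, io_write, and io_poll removes any existing handler.  A
 * typical usage pattern (illustrative only, hypothetical callback names):
 *
 *     aio_set_fd_handler(ctx, fd, true, my_read_cb, NULL, NULL, my_state);
 *     ...
 *     aio_set_fd_handler(ctx, fd, true, NULL, NULL, NULL, NULL);
 */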
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        void *opaque)
{
    AioHandler *node;
    AioHandler *new_node = NULL;
    bool is_new = false;
    bool deleted = false;
    int poll_disable_change;

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }
        /* Clean events in order to unregister fd from the ctx epoll. */
        node->pfd.events = 0;

        poll_disable_change = -!node->io_poll;
    } else {
        poll_disable_change = !io_poll - (node && !node->io_poll);
        if (node == NULL) {
            is_new = true;
        }
        /* Alloc and insert if it's not already there */
        new_node = g_new0(AioHandler, 1);

        /* Update handler with latest information */
        new_node->io_read = io_read;
        new_node->io_write = io_write;
        new_node->io_poll = io_poll;
        new_node->opaque = opaque;
        new_node->is_external = is_external;

        if (is_new) {
            new_node->pfd.fd = fd;
        } else {
            new_node->pfd = node->pfd;
        }
        g_source_add_poll(&ctx->source, &new_node->pfd);

        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
    }

    /* No need to order poll_disable_cnt writes against other updates;
     * the counter is only used to avoid wasting time and latency on
     * iterated polling when the system call will be ultimately necessary.
     * Changing handlers is a rare event, and a little wasted polling until
     * the aio_notify below is not an issue.
     */
    atomic_set(&ctx->poll_disable_cnt,
               atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);

    ctx->fdmon_ops->update(ctx, node, new_node);
    if (node) {
        deleted = aio_remove_fd_handler(ctx, node);
    }
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);

    if (deleted) {
        g_free(node);
    }
}

void aio_set_fd_poll(AioContext *ctx, int fd,
                     IOHandler *io_poll_begin,
                     IOHandler *io_poll_end)
{
    AioHandler *node = find_aio_handler(ctx, fd);

    if (!node) {
        return;
    }

    node->io_poll_begin = io_poll_begin;
    node->io_poll_end = io_poll_end;
}

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            bool is_external,
                            EventNotifierHandler *io_read,
                            AioPollFn *io_poll)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
                       (IOHandler *)io_read, NULL, io_poll, notifier);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
                    (IOHandler *)io_poll_begin,
                    (IOHandler *)io_poll_end);
}
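
/*
 * Enter or leave userspace polling mode for all handlers on the polling
 * list, invoking their ->io_poll_begin()/->io_poll_end() callbacks.  When
 * leaving polling mode, each handler is polled one final time; the return
 * value reports whether that final poll made progress.
 */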
static bool poll_set_started(AioContext *ctx, bool started)
{
    AioHandler *node;
    bool progress = false;

    if (started == ctx->poll_started) {
        return false;
    }

    ctx->poll_started = started;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
        IOHandler *fn;

        if (QLIST_IS_INSERTED(node, node_deleted)) {
            continue;
        }

        if (started) {
            fn = node->io_poll_begin;
        } else {
            fn = node->io_poll_end;
        }

        if (fn) {
            fn(node->opaque);
        }

        /* Poll one last time in case ->io_poll_end() raced with the event */
        if (!started) {
            progress = node->io_poll(node->opaque) || progress;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return progress;
}

bool aio_prepare(AioContext *ctx)
{
    /* Poll mode cannot be used with glib's event loop, disable it. */
    poll_set_started(ctx, false);

    return false;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return result;
}

static void aio_free_deleted_handlers(AioContext *ctx)
{
    AioHandler *node;

    if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
        return;
    }
    if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        return; /* we are nested, let the parent do the freeing */
    }

    while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
        QLIST_REMOVE(node, node);
        QLIST_REMOVE(node, node_deleted);
        QLIST_SAFE_REMOVE(node, node_poll);
        g_free(node);
    }

    qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}
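
/*
 * Dispatch a single handler: consume its pending revents, add it to the
 * polling list if it has an ->io_poll() callback, and invoke ->io_read()
 * and/or ->io_write() as indicated by revents.  Returns true if progress
 * was made.
 */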
static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
{
    bool progress = false;
    int revents;

    revents = node->pfd.revents & node->pfd.events;
    node->pfd.revents = 0;

    /*
     * Start polling AioHandlers when they become ready because activity is
     * likely to continue.  Note that starvation is theoretically possible when
     * fdmon_supports_polling(), but only until the fd fires for the first
     * time.
     */
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        !QLIST_IS_INSERTED(node, node_poll) &&
        node->io_poll) {
        trace_poll_add(ctx, node, node->pfd.fd, revents);
        if (ctx->poll_started && node->io_poll_begin) {
            node->io_poll_begin(node->opaque);
        }
        QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
    }

    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_read) {
        node->io_read(node->opaque);

        /* aio_notify() does not count as progress */
        if (node->opaque != &ctx->notifier) {
            progress = true;
        }
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_OUT | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_write) {
        node->io_write(node->opaque);
        progress = true;
    }

    return progress;
}

/*
 * If we have a list of ready handlers then this is more efficient than
 * scanning all handlers with aio_dispatch_handlers().
 */
static bool aio_dispatch_ready_handlers(AioContext *ctx,
                                        AioHandlerList *ready_list)
{
    bool progress = false;
    AioHandler *node;

    while ((node = QLIST_FIRST(ready_list))) {
        QLIST_REMOVE(node, node_ready);
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

/* Slower than aio_dispatch_ready_handlers() but only used via glib */
static bool aio_dispatch_handlers(AioContext *ctx)
{
    AioHandler *node, *tmp;
    bool progress = false;

    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx);
    aio_free_deleted_handlers(ctx);
    qemu_lockcnt_dec(&ctx->list_lock);

    timerlistgroup_run_timers(&ctx->tlg);
}
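
/*
 * Invoke each polling handler's ->io_poll() callback once.  A successful
 * poll renews that handler's idle deadline and clears *timeout so the
 * caller can skip the blocking wait.
 */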
static bool run_poll_handlers_once(AioContext *ctx,
                                   int64_t now,
                                   int64_t *timeout)
{
    bool progress = false;
    AioHandler *node;
    AioHandler *tmp;

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (aio_node_check(ctx, node->is_external) &&
            node->io_poll(node->opaque)) {
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;

            /*
             * Polling was successful, exit try_poll_mode immediately
             * to adjust the next polling time.
             */
            *timeout = 0;
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }

        /* Caller handles freeing deleted nodes.  Don't do it here. */
    }

    return progress;
}

static bool fdmon_supports_polling(AioContext *ctx)
{
    return ctx->fdmon_ops->need_wait != aio_poll_disabled;
}

static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now)
{
    AioHandler *node;
    AioHandler *tmp;
    bool progress = false;

    /*
     * File descriptor monitoring implementations without userspace polling
     * support suffer from starvation when a subset of handlers is polled
     * because fds will not be processed in a timely fashion.  Don't remove
     * idle poll handlers.
     */
    if (!fdmon_supports_polling(ctx)) {
        return false;
    }

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (node->poll_idle_timeout == 0LL) {
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
        } else if (now >= node->poll_idle_timeout) {
            trace_poll_remove(ctx, node, node->pfd.fd);
            node->poll_idle_timeout = 0LL;
            QLIST_SAFE_REMOVE(node, node_poll);
            if (ctx->poll_started && node->io_poll_end) {
                node->io_poll_end(node->opaque);

                /*
                 * Final poll in case ->io_poll_end() races with an event.
                 * Nevermind about re-adding the handler in the rare case where
                 * this causes progress.
                 */
                progress = node->io_poll(node->opaque) || progress;
            }
        }
    }

    return progress;
}

/* run_poll_handlers:
 * @ctx: the AioContext
 * @max_ns: maximum time to poll for, in nanoseconds
 *
 * Polls for a given time.
 *
 * Note that ctx->notify_me must be non-zero so this function can detect
 * aio_notify().
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
{
    bool progress;
    int64_t start_time, elapsed_time;

    assert(ctx->notify_me);
    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);

    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);

    /*
     * Optimization: ->io_poll() handlers often contain RCU read critical
     * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
     * -> rcu_read_lock() -> ... sequences with expensive memory
     * synchronization primitives.  Make the entire polling loop an RCU
     * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
     * are cheap.
     */
    RCU_READ_LOCK_GUARD();

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    do {
        progress = run_poll_handlers_once(ctx, start_time, timeout);
        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
        max_ns = qemu_soonest_timeout(*timeout, max_ns);
        assert(!(max_ns && progress));
    } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));

    if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) {
        *timeout = 0;
        progress = true;
    }

    /* If time has passed with no successful polling, adjust *timeout to
     * keep the same ending time.
     */
    if (*timeout != -1) {
        *timeout -= MIN(*timeout, elapsed_time);
    }

    trace_run_poll_handlers_end(ctx, progress, *timeout);
    return progress;
}

/* try_poll_mode:
 * @ctx: the AioContext
 * @timeout: timeout for blocking wait, computed by the caller and updated if
 *           polling succeeds.
 *
 * ctx->notify_me must be non-zero so this function can detect aio_notify().
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
{
    int64_t max_ns;

    if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
        return false;
    }

    max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
    if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
        poll_set_started(ctx, true);

        if (run_poll_handlers(ctx, max_ns, timeout)) {
            return true;
        }
    }

    if (poll_set_started(ctx, false)) {
        *timeout = 0;
        return true;
    }

    return false;
}
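
/*
 * Run one iteration of the event loop for @ctx: optionally poll in
 * userspace, wait for file descriptor events, then dispatch bottom halves,
 * ready fd handlers, and timers.  Returns true if progress was made.  The
 * full contract is documented with the declaration in include/block/aio.h.
 */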
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
    int ret = 0;
    bool progress;
    int64_t timeout;
    int64_t start = 0;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     */
    assert(in_aio_context_home_thread(ctx));

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before computing the timeout
         * (reading bottom half flags, etc.).  Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();
    }

    qemu_lockcnt_inc(&ctx->list_lock);

    if (ctx->poll_max_ns) {
        start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;
    progress = try_poll_mode(ctx, &timeout);
    assert(!(timeout && progress));

    /* If polling is allowed, non-blocking aio_poll does not need the
     * system call---a single round of run_poll_handlers_once suffices.
     */
    if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
        ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
    }

    if (blocking) {
        /* Finish the poll before clearing the flag.  */
        atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) - 2);
        aio_notify_accept(ctx);
    }

    /* Adjust polling time */
    if (ctx->poll_max_ns) {
        int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;

        if (block_ns <= ctx->poll_ns) {
            /* This is the sweet spot, no adjustment needed */
        } else if (block_ns > ctx->poll_max_ns) {
            /* We'd have to poll for too long, poll less */
            int64_t old = ctx->poll_ns;

            if (ctx->poll_shrink) {
                ctx->poll_ns /= ctx->poll_shrink;
            } else {
                ctx->poll_ns = 0;
            }

            trace_poll_shrink(ctx, old, ctx->poll_ns);
        } else if (ctx->poll_ns < ctx->poll_max_ns &&
                   block_ns < ctx->poll_max_ns) {
            /* There is room to grow, poll longer */
            int64_t old = ctx->poll_ns;
            int64_t grow = ctx->poll_grow;

            if (grow == 0) {
                grow = 2;
            }

            if (ctx->poll_ns) {
                ctx->poll_ns *= grow;
            } else {
                ctx->poll_ns = 4000; /* start polling at 4 microseconds */
            }

            if (ctx->poll_ns > ctx->poll_max_ns) {
                ctx->poll_ns = ctx->poll_max_ns;
            }

            trace_poll_grow(ctx, old, ctx->poll_ns);
        }
    }

    progress |= aio_bh_poll(ctx);

    if (ret > 0) {
        progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
    }

    aio_free_deleted_handlers(ctx);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}
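
/*
 * Illustration of the "Adjust polling time" logic above (example numbers,
 * assuming poll-max-ns=32768 and the default grow factor of 2): poll_ns
 * starts at 0, jumps to 4000 ns after the first blocking wait that finishes
 * within poll-max-ns, then doubles (8000, 16000, ...) up to the poll-max-ns
 * cap while waits stay short, and is divided by poll-shrink (or reset to 0)
 * once a wait exceeds poll-max-ns.
 */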

void aio_context_setup(AioContext *ctx)
{
    ctx->fdmon_ops = &fdmon_poll_ops;

    /* Use the fastest fd monitoring implementation if available */
    if (fdmon_io_uring_setup(ctx)) {
        return;
    }

    fdmon_epoll_setup(ctx);
}

void aio_context_destroy(AioContext *ctx)
{
    fdmon_io_uring_destroy(ctx);
    fdmon_epoll_disable(ctx);
    aio_free_deleted_handlers(ctx);
}

void aio_context_use_g_source(AioContext *ctx)
{
    /*
     * Disable io_uring when the glib main loop is used because it doesn't
     * support mixed glib/aio_poll() usage.  It relies on aio_poll() being
     * called regularly so that changes to the monitored file descriptors are
     * submitted, otherwise a list of pending fd handlers builds up.
     */
    fdmon_io_uring_destroy(ctx);
    aio_free_deleted_handlers(ctx);
}
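
/*
 * These values correspond to the poll-max-ns, poll-grow, and poll-shrink
 * properties of the IOThread object.
 */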
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    /* No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->poll_max_ns = max_ns;
    ctx->poll_ns = 0;
    ctx->poll_grow = grow;
    ctx->poll_shrink = shrink;

    aio_notify(ctx);
}
;