qom: eliminate identical functions
[qemu.git] / util / aio-posix.c
blob30f5354b1e974259e92e10133ae0a377371d34dd
/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
16 #include "qemu/osdep.h"
17 #include "block/block.h"
18 #include "qemu/main-loop.h"
19 #include "qemu/rcu.h"
20 #include "qemu/rcu_queue.h"
21 #include "qemu/sockets.h"
22 #include "qemu/cutils.h"
23 #include "trace.h"
24 #include "aio-posix.h"
26 /* Stop userspace polling on a handler if it isn't active for some time */
27 #define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)
29 bool aio_poll_disabled(AioContext *ctx)
31 return qatomic_read(&ctx->poll_disable_cnt);
34 void aio_add_ready_handler(AioHandlerList *ready_list,
35 AioHandler *node,
36 int revents)
38 QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
39 node->pfd.revents = revents;
40 QLIST_INSERT_HEAD(ready_list, node, node_ready);
43 static AioHandler *find_aio_handler(AioContext *ctx, int fd)
45 AioHandler *node;
47 QLIST_FOREACH(node, &ctx->aio_handlers, node) {
48 if (node->pfd.fd == fd) {
49 if (!QLIST_IS_INSERTED(node, node_deleted)) {
50 return node;
55 return NULL;
58 static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
60 /* If the GSource is in the process of being destroyed then
61 * g_source_remove_poll() causes an assertion failure. Skip
62 * removal in that case, because glib cleans up its state during
63 * destruction anyway.
65 if (!g_source_is_destroyed(&ctx->source)) {
66 g_source_remove_poll(&ctx->source, &node->pfd);
69 node->pfd.revents = 0;
71 /* If the fd monitor has already marked it deleted, leave it alone */
72 if (QLIST_IS_INSERTED(node, node_deleted)) {
73 return false;
76 /* If a read is in progress, just mark the node as deleted */
77 if (qemu_lockcnt_count(&ctx->list_lock)) {
78 QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
79 return false;
81 /* Otherwise, delete it for real. We can't just mark it as
82 * deleted because deleted nodes are only cleaned up while
83 * no one is walking the handlers list.
85 QLIST_SAFE_REMOVE(node, node_poll);
86 QLIST_REMOVE(node, node);
87 return true;
90 void aio_set_fd_handler(AioContext *ctx,
91 int fd,
92 bool is_external,
93 IOHandler *io_read,
94 IOHandler *io_write,
95 AioPollFn *io_poll,
96 void *opaque)
98 AioHandler *node;
99 AioHandler *new_node = NULL;
100 bool is_new = false;
101 bool deleted = false;
102 int poll_disable_change;
104 qemu_lockcnt_lock(&ctx->list_lock);
106 node = find_aio_handler(ctx, fd);
108 /* Are we deleting the fd handler? */
109 if (!io_read && !io_write && !io_poll) {
110 if (node == NULL) {
111 qemu_lockcnt_unlock(&ctx->list_lock);
112 return;
114 /* Clean events in order to unregister fd from the ctx epoll. */
115 node->pfd.events = 0;
117 poll_disable_change = -!node->io_poll;
118 } else {
119 poll_disable_change = !io_poll - (node && !node->io_poll);
120 if (node == NULL) {
121 is_new = true;
123 /* Alloc and insert if it's not already there */
124 new_node = g_new0(AioHandler, 1);
126 /* Update handler with latest information */
127 new_node->io_read = io_read;
128 new_node->io_write = io_write;
129 new_node->io_poll = io_poll;
130 new_node->opaque = opaque;
131 new_node->is_external = is_external;
133 if (is_new) {
134 new_node->pfd.fd = fd;
135 } else {
136 new_node->pfd = node->pfd;
138 g_source_add_poll(&ctx->source, &new_node->pfd);
140 new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
141 new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
143 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
146 /* No need to order poll_disable_cnt writes against other updates;
147 * the counter is only used to avoid wasting time and latency on
148 * iterated polling when the system call will be ultimately necessary.
149 * Changing handlers is a rare event, and a little wasted polling until
150 * the aio_notify below is not an issue.
152 qatomic_set(&ctx->poll_disable_cnt,
153 qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
155 ctx->fdmon_ops->update(ctx, node, new_node);
156 if (node) {
157 deleted = aio_remove_fd_handler(ctx, node);
159 qemu_lockcnt_unlock(&ctx->list_lock);
160 aio_notify(ctx);
162 if (deleted) {
163 g_free(node);
167 void aio_set_fd_poll(AioContext *ctx, int fd,
168 IOHandler *io_poll_begin,
169 IOHandler *io_poll_end)
171 AioHandler *node = find_aio_handler(ctx, fd);
173 if (!node) {
174 return;
177 node->io_poll_begin = io_poll_begin;
178 node->io_poll_end = io_poll_end;
181 void aio_set_event_notifier(AioContext *ctx,
182 EventNotifier *notifier,
183 bool is_external,
184 EventNotifierHandler *io_read,
185 AioPollFn *io_poll)
187 aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
188 (IOHandler *)io_read, NULL, io_poll, notifier);
191 void aio_set_event_notifier_poll(AioContext *ctx,
192 EventNotifier *notifier,
193 EventNotifierHandler *io_poll_begin,
194 EventNotifierHandler *io_poll_end)
196 aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
197 (IOHandler *)io_poll_begin,
198 (IOHandler *)io_poll_end);
201 static bool poll_set_started(AioContext *ctx, bool started)
203 AioHandler *node;
204 bool progress = false;
206 if (started == ctx->poll_started) {
207 return false;
210 ctx->poll_started = started;
212 qemu_lockcnt_inc(&ctx->list_lock);
213 QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
214 IOHandler *fn;
216 if (QLIST_IS_INSERTED(node, node_deleted)) {
217 continue;
220 if (started) {
221 fn = node->io_poll_begin;
222 } else {
223 fn = node->io_poll_end;
226 if (fn) {
227 fn(node->opaque);
230 /* Poll one last time in case ->io_poll_end() raced with the event */
231 if (!started) {
232 progress = node->io_poll(node->opaque) || progress;
235 qemu_lockcnt_dec(&ctx->list_lock);
237 return progress;
241 bool aio_prepare(AioContext *ctx)
243 /* Poll mode cannot be used with glib's event loop, disable it. */
244 poll_set_started(ctx, false);
246 return false;
249 bool aio_pending(AioContext *ctx)
251 AioHandler *node;
252 bool result = false;
255 * We have to walk very carefully in case aio_set_fd_handler is
256 * called while we're walking.
258 qemu_lockcnt_inc(&ctx->list_lock);
260 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
261 int revents;
263 revents = node->pfd.revents & node->pfd.events;
264 if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
265 aio_node_check(ctx, node->is_external)) {
266 result = true;
267 break;
269 if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
270 aio_node_check(ctx, node->is_external)) {
271 result = true;
272 break;
275 qemu_lockcnt_dec(&ctx->list_lock);
277 return result;
280 static void aio_free_deleted_handlers(AioContext *ctx)
282 AioHandler *node;
284 if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
285 return;
287 if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
288 return; /* we are nested, let the parent do the freeing */
291 while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
292 QLIST_REMOVE(node, node);
293 QLIST_REMOVE(node, node_deleted);
294 QLIST_SAFE_REMOVE(node, node_poll);
295 g_free(node);
298 qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
301 static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
303 bool progress = false;
304 int revents;
306 revents = node->pfd.revents & node->pfd.events;
307 node->pfd.revents = 0;
310 * Start polling AioHandlers when they become ready because activity is
311 * likely to continue. Note that starvation is theoretically possible when
312 * fdmon_supports_polling(), but only until the fd fires for the first
313 * time.
315 if (!QLIST_IS_INSERTED(node, node_deleted) &&
316 !QLIST_IS_INSERTED(node, node_poll) &&
317 node->io_poll) {
318 trace_poll_add(ctx, node, node->pfd.fd, revents);
319 if (ctx->poll_started && node->io_poll_begin) {
320 node->io_poll_begin(node->opaque);
322 QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
325 if (!QLIST_IS_INSERTED(node, node_deleted) &&
326 (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
327 aio_node_check(ctx, node->is_external) &&
328 node->io_read) {
329 node->io_read(node->opaque);
331 /* aio_notify() does not count as progress */
332 if (node->opaque != &ctx->notifier) {
333 progress = true;
336 if (!QLIST_IS_INSERTED(node, node_deleted) &&
337 (revents & (G_IO_OUT | G_IO_ERR)) &&
338 aio_node_check(ctx, node->is_external) &&
339 node->io_write) {
340 node->io_write(node->opaque);
341 progress = true;
344 return progress;
348 * If we have a list of ready handlers then this is more efficient than
349 * scanning all handlers with aio_dispatch_handlers().
351 static bool aio_dispatch_ready_handlers(AioContext *ctx,
352 AioHandlerList *ready_list)
354 bool progress = false;
355 AioHandler *node;
357 while ((node = QLIST_FIRST(ready_list))) {
358 QLIST_REMOVE(node, node_ready);
359 progress = aio_dispatch_handler(ctx, node) || progress;
362 return progress;
365 /* Slower than aio_dispatch_ready_handlers() but only used via glib */
366 static bool aio_dispatch_handlers(AioContext *ctx)
368 AioHandler *node, *tmp;
369 bool progress = false;
371 QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
372 progress = aio_dispatch_handler(ctx, node) || progress;
375 return progress;
378 void aio_dispatch(AioContext *ctx)
380 qemu_lockcnt_inc(&ctx->list_lock);
381 aio_bh_poll(ctx);
382 aio_dispatch_handlers(ctx);
383 aio_free_deleted_handlers(ctx);
384 qemu_lockcnt_dec(&ctx->list_lock);
386 timerlistgroup_run_timers(&ctx->tlg);
389 static bool run_poll_handlers_once(AioContext *ctx,
390 int64_t now,
391 int64_t *timeout)
393 bool progress = false;
394 AioHandler *node;
395 AioHandler *tmp;
397 QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
398 if (aio_node_check(ctx, node->is_external) &&
399 node->io_poll(node->opaque)) {
400 node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
403 * Polling was successful, exit try_poll_mode immediately
404 * to adjust the next polling time.
406 *timeout = 0;
407 if (node->opaque != &ctx->notifier) {
408 progress = true;
412 /* Caller handles freeing deleted nodes. Don't do it here. */
415 return progress;
418 static bool fdmon_supports_polling(AioContext *ctx)
420 return ctx->fdmon_ops->need_wait != aio_poll_disabled;
423 static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now)
425 AioHandler *node;
426 AioHandler *tmp;
427 bool progress = false;
430 * File descriptor monitoring implementations without userspace polling
431 * support suffer from starvation when a subset of handlers is polled
432 * because fds will not be processed in a timely fashion. Don't remove
433 * idle poll handlers.
435 if (!fdmon_supports_polling(ctx)) {
436 return false;
439 QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
440 if (node->poll_idle_timeout == 0LL) {
441 node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
442 } else if (now >= node->poll_idle_timeout) {
443 trace_poll_remove(ctx, node, node->pfd.fd);
444 node->poll_idle_timeout = 0LL;
445 QLIST_SAFE_REMOVE(node, node_poll);
446 if (ctx->poll_started && node->io_poll_end) {
447 node->io_poll_end(node->opaque);
450 * Final poll in case ->io_poll_end() races with an event.
451 * Nevermind about re-adding the handler in the rare case where
452 * this causes progress.
454 progress = node->io_poll(node->opaque) || progress;
459 return progress;
462 /* run_poll_handlers:
463 * @ctx: the AioContext
464 * @max_ns: maximum time to poll for, in nanoseconds
466 * Polls for a given time.
468 * Note that the caller must have incremented ctx->list_lock.
470 * Returns: true if progress was made, false otherwise
472 static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
474 bool progress;
475 int64_t start_time, elapsed_time;
477 assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
479 trace_run_poll_handlers_begin(ctx, max_ns, *timeout);
482 * Optimization: ->io_poll() handlers often contain RCU read critical
483 * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
484 * -> rcu_read_lock() -> ... sequences with expensive memory
485 * synchronization primitives. Make the entire polling loop an RCU
486 * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
487 * are cheap.
489 RCU_READ_LOCK_GUARD();
491 start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
492 do {
493 progress = run_poll_handlers_once(ctx, start_time, timeout);
494 elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
495 max_ns = qemu_soonest_timeout(*timeout, max_ns);
496 assert(!(max_ns && progress));
497 } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));
499 if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) {
500 *timeout = 0;
501 progress = true;
504 /* If time has passed with no successful polling, adjust *timeout to
505 * keep the same ending time.
507 if (*timeout != -1) {
508 *timeout -= MIN(*timeout, elapsed_time);
511 trace_run_poll_handlers_end(ctx, progress, *timeout);
512 return progress;
515 /* try_poll_mode:
516 * @ctx: the AioContext
517 * @timeout: timeout for blocking wait, computed by the caller and updated if
518 * polling succeeds.
520 * Note that the caller must have incremented ctx->list_lock.
522 * Returns: true if progress was made, false otherwise
524 static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
526 int64_t max_ns;
528 if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
529 return false;
532 max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
533 if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
534 poll_set_started(ctx, true);
536 if (run_poll_handlers(ctx, max_ns, timeout)) {
537 return true;
541 if (poll_set_started(ctx, false)) {
542 *timeout = 0;
543 return true;
546 return false;
549 bool aio_poll(AioContext *ctx, bool blocking)
551 AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
552 int ret = 0;
553 bool progress;
554 bool use_notify_me;
555 int64_t timeout;
556 int64_t start = 0;
559 * There cannot be two concurrent aio_poll calls for the same AioContext (or
560 * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
561 * We rely on this below to avoid slow locked accesses to ctx->notify_me.
563 * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
564 * is special in that it runs in the main thread, but that thread's context
565 * is qemu_aio_context.
567 assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
568 qemu_get_aio_context() : ctx));
570 qemu_lockcnt_inc(&ctx->list_lock);
572 if (ctx->poll_max_ns) {
573 start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
576 timeout = blocking ? aio_compute_timeout(ctx) : 0;
577 progress = try_poll_mode(ctx, &timeout);
578 assert(!(timeout && progress));
581 * aio_notify can avoid the expensive event_notifier_set if
582 * everything (file descriptors, bottom halves, timers) will
583 * be re-evaluated before the next blocking poll(). This is
584 * already true when aio_poll is called with blocking == false;
585 * if blocking == true, it is only true after poll() returns,
586 * so disable the optimization now.
588 use_notify_me = timeout != 0;
589 if (use_notify_me) {
590 qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
592 * Write ctx->notify_me before reading ctx->notified. Pairs with
593 * smp_mb in aio_notify().
595 smp_mb();
597 /* Don't block if aio_notify() was called */
598 if (qatomic_read(&ctx->notified)) {
599 timeout = 0;
603 /* If polling is allowed, non-blocking aio_poll does not need the
604 * system call---a single round of run_poll_handlers_once suffices.
606 if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
607 ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
610 if (use_notify_me) {
611 /* Finish the poll before clearing the flag. */
612 qatomic_store_release(&ctx->notify_me,
613 qatomic_read(&ctx->notify_me) - 2);
616 aio_notify_accept(ctx);
618 /* Adjust polling time */
619 if (ctx->poll_max_ns) {
620 int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
622 if (block_ns <= ctx->poll_ns) {
623 /* This is the sweet spot, no adjustment needed */
624 } else if (block_ns > ctx->poll_max_ns) {
625 /* We'd have to poll for too long, poll less */
626 int64_t old = ctx->poll_ns;
628 if (ctx->poll_shrink) {
629 ctx->poll_ns /= ctx->poll_shrink;
630 } else {
631 ctx->poll_ns = 0;
634 trace_poll_shrink(ctx, old, ctx->poll_ns);
635 } else if (ctx->poll_ns < ctx->poll_max_ns &&
636 block_ns < ctx->poll_max_ns) {
637 /* There is room to grow, poll longer */
638 int64_t old = ctx->poll_ns;
639 int64_t grow = ctx->poll_grow;
641 if (grow == 0) {
642 grow = 2;
645 if (ctx->poll_ns) {
646 ctx->poll_ns *= grow;
647 } else {
648 ctx->poll_ns = 4000; /* start polling at 4 microseconds */
651 if (ctx->poll_ns > ctx->poll_max_ns) {
652 ctx->poll_ns = ctx->poll_max_ns;
655 trace_poll_grow(ctx, old, ctx->poll_ns);
659 progress |= aio_bh_poll(ctx);
661 if (ret > 0) {
662 progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
665 aio_free_deleted_handlers(ctx);
667 qemu_lockcnt_dec(&ctx->list_lock);
669 progress |= timerlistgroup_run_timers(&ctx->tlg);
671 return progress;
674 void aio_context_setup(AioContext *ctx)
676 ctx->fdmon_ops = &fdmon_poll_ops;
677 ctx->epollfd = -1;
679 /* Use the fastest fd monitoring implementation if available */
680 if (fdmon_io_uring_setup(ctx)) {
681 return;
684 fdmon_epoll_setup(ctx);
687 void aio_context_destroy(AioContext *ctx)
689 fdmon_io_uring_destroy(ctx);
690 fdmon_epoll_disable(ctx);
691 aio_free_deleted_handlers(ctx);
694 void aio_context_use_g_source(AioContext *ctx)
697 * Disable io_uring when the glib main loop is used because it doesn't
698 * support mixed glib/aio_poll() usage. It relies on aio_poll() being
699 * called regularly so that changes to the monitored file descriptors are
700 * submitted, otherwise a list of pending fd handlers builds up.
702 fdmon_io_uring_destroy(ctx);
703 aio_free_deleted_handlers(ctx);
706 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
707 int64_t grow, int64_t shrink, Error **errp)
709 /* No thread synchronization here, it doesn't matter if an incorrect value
710 * is used once.
712 ctx->poll_max_ns = max_ns;
713 ctx->poll_ns = 0;
714 ctx->poll_grow = grow;
715 ctx->poll_shrink = shrink;
717 aio_notify(ctx);