buffer: add buffer_move
[qemu/kevin.git] / aio-posix.c
blob0467f23a6357cbf966f296b71edeeb96223ef1c0
1 /*
2 * QEMU aio implementation
4 * Copyright IBM, Corp. 2008
6 * Authors:
7 * Anthony Liguori <aliguori@us.ibm.com>
9 * This work is licensed under the terms of the GNU GPL, version 2. See
10 * the COPYING file in the top-level directory.
12 * Contributions after 2012-01-13 are licensed under the terms of the
13 * GNU GPL, version 2 or (at your option) any later version.
16 #include "qemu-common.h"
17 #include "block/block.h"
18 #include "qemu/queue.h"
19 #include "qemu/sockets.h"
21 struct AioHandler
23 GPollFD pfd;
24 IOHandler *io_read;
25 IOHandler *io_write;
26 int deleted;
27 void *opaque;
28 bool is_external;
29 QLIST_ENTRY(AioHandler) node;
32 static AioHandler *find_aio_handler(AioContext *ctx, int fd)
34 AioHandler *node;
36 QLIST_FOREACH(node, &ctx->aio_handlers, node) {
37 if (node->pfd.fd == fd)
38 if (!node->deleted)
39 return node;
42 return NULL;
45 void aio_set_fd_handler(AioContext *ctx,
46 int fd,
47 bool is_external,
48 IOHandler *io_read,
49 IOHandler *io_write,
50 void *opaque)
52 AioHandler *node;
54 node = find_aio_handler(ctx, fd);
56 /* Are we deleting the fd handler? */
57 if (!io_read && !io_write) {
58 if (node) {
59 g_source_remove_poll(&ctx->source, &node->pfd);
61 /* If the lock is held, just mark the node as deleted */
62 if (ctx->walking_handlers) {
63 node->deleted = 1;
64 node->pfd.revents = 0;
65 } else {
66 /* Otherwise, delete it for real. We can't just mark it as
67 * deleted because deleted nodes are only cleaned up after
68 * releasing the walking_handlers lock.
70 QLIST_REMOVE(node, node);
71 g_free(node);
74 } else {
75 if (node == NULL) {
76 /* Alloc and insert if it's not already there */
77 node = g_new0(AioHandler, 1);
78 node->pfd.fd = fd;
79 QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
81 g_source_add_poll(&ctx->source, &node->pfd);
83 /* Update handler with latest information */
84 node->io_read = io_read;
85 node->io_write = io_write;
86 node->opaque = opaque;
87 node->is_external = is_external;
89 node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
90 node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
93 aio_notify(ctx);
96 void aio_set_event_notifier(AioContext *ctx,
97 EventNotifier *notifier,
98 bool is_external,
99 EventNotifierHandler *io_read)
101 aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
102 is_external, (IOHandler *)io_read, NULL, notifier);
105 bool aio_prepare(AioContext *ctx)
107 return false;
110 bool aio_pending(AioContext *ctx)
112 AioHandler *node;
114 QLIST_FOREACH(node, &ctx->aio_handlers, node) {
115 int revents;
117 revents = node->pfd.revents & node->pfd.events;
118 if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
119 return true;
121 if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
122 return true;
126 return false;
129 bool aio_dispatch(AioContext *ctx)
131 AioHandler *node;
132 bool progress = false;
135 * If there are callbacks left that have been queued, we need to call them.
136 * Do not call select in this case, because it is possible that the caller
137 * does not need a complete flush (as is the case for aio_poll loops).
139 if (aio_bh_poll(ctx)) {
140 progress = true;
144 * We have to walk very carefully in case aio_set_fd_handler is
145 * called while we're walking.
147 node = QLIST_FIRST(&ctx->aio_handlers);
148 while (node) {
149 AioHandler *tmp;
150 int revents;
152 ctx->walking_handlers++;
154 revents = node->pfd.revents & node->pfd.events;
155 node->pfd.revents = 0;
157 if (!node->deleted &&
158 (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
159 node->io_read) {
160 node->io_read(node->opaque);
162 /* aio_notify() does not count as progress */
163 if (node->opaque != &ctx->notifier) {
164 progress = true;
167 if (!node->deleted &&
168 (revents & (G_IO_OUT | G_IO_ERR)) &&
169 node->io_write) {
170 node->io_write(node->opaque);
171 progress = true;
174 tmp = node;
175 node = QLIST_NEXT(node, node);
177 ctx->walking_handlers--;
179 if (!ctx->walking_handlers && tmp->deleted) {
180 QLIST_REMOVE(tmp, node);
181 g_free(tmp);
185 /* Run our timers */
186 progress |= timerlistgroup_run_timers(&ctx->tlg);
188 return progress;
191 /* These thread-local variables are used only in a small part of aio_poll
192 * around the call to the poll() system call. In particular they are not
193 * used while aio_poll is performing callbacks, which makes it much easier
194 * to think about reentrancy!
196 * Stack-allocated arrays would be perfect but they have size limitations;
197 * heap allocation is expensive enough that we want to reuse arrays across
198 * calls to aio_poll(). And because poll() has to be called without holding
199 * any lock, the arrays cannot be stored in AioContext. Thread-local data
200 * has none of the disadvantages of these three options.
202 static __thread GPollFD *pollfds;
203 static __thread AioHandler **nodes;
204 static __thread unsigned npfd, nalloc;
205 static __thread Notifier pollfds_cleanup_notifier;
207 static void pollfds_cleanup(Notifier *n, void *unused)
209 g_assert(npfd == 0);
210 g_free(pollfds);
211 g_free(nodes);
212 nalloc = 0;
215 static void add_pollfd(AioHandler *node)
217 if (npfd == nalloc) {
218 if (nalloc == 0) {
219 pollfds_cleanup_notifier.notify = pollfds_cleanup;
220 qemu_thread_atexit_add(&pollfds_cleanup_notifier);
221 nalloc = 8;
222 } else {
223 g_assert(nalloc <= INT_MAX);
224 nalloc *= 2;
226 pollfds = g_renew(GPollFD, pollfds, nalloc);
227 nodes = g_renew(AioHandler *, nodes, nalloc);
229 nodes[npfd] = node;
230 pollfds[npfd] = (GPollFD) {
231 .fd = node->pfd.fd,
232 .events = node->pfd.events,
234 npfd++;
237 bool aio_poll(AioContext *ctx, bool blocking)
239 AioHandler *node;
240 int i, ret;
241 bool progress;
242 int64_t timeout;
244 aio_context_acquire(ctx);
245 progress = false;
247 /* aio_notify can avoid the expensive event_notifier_set if
248 * everything (file descriptors, bottom halves, timers) will
249 * be re-evaluated before the next blocking poll(). This is
250 * already true when aio_poll is called with blocking == false;
251 * if blocking == true, it is only true after poll() returns,
252 * so disable the optimization now.
254 if (blocking) {
255 atomic_add(&ctx->notify_me, 2);
258 ctx->walking_handlers++;
260 assert(npfd == 0);
262 /* fill pollfds */
263 QLIST_FOREACH(node, &ctx->aio_handlers, node) {
264 if (!node->deleted && node->pfd.events
265 && aio_node_check(ctx, node->is_external)) {
266 add_pollfd(node);
270 timeout = blocking ? aio_compute_timeout(ctx) : 0;
272 /* wait until next event */
273 if (timeout) {
274 aio_context_release(ctx);
276 ret = qemu_poll_ns((GPollFD *)pollfds, npfd, timeout);
277 if (blocking) {
278 atomic_sub(&ctx->notify_me, 2);
280 if (timeout) {
281 aio_context_acquire(ctx);
284 aio_notify_accept(ctx);
286 /* if we have any readable fds, dispatch event */
287 if (ret > 0) {
288 for (i = 0; i < npfd; i++) {
289 nodes[i]->pfd.revents = pollfds[i].revents;
293 npfd = 0;
294 ctx->walking_handlers--;
296 /* Run dispatch even if there were no readable fds to run timers */
297 if (aio_dispatch(ctx)) {
298 progress = true;
301 aio_context_release(ctx);
303 return progress;