4 * Copyright (c) 2015 Red Hat, Inc.
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
21 #include "qemu/osdep.h"
22 #include "io/channel.h"
23 #include "qapi/error.h"
24 #include "qemu/main-loop.h"
25 #include "qemu/module.h"
/*
 * Report whether @feature is enabled on @ioc.
 *
 * The channel keeps its features as a bitmask in ioc->features; the
 * result is the test of bit (1 << feature) in that mask.
 */
28 bool qio_channel_has_feature(QIOChannel
*ioc
,
29 QIOChannelFeature feature
)
31 return ioc
->features
& (1 << feature
);
/*
 * Enable @feature on @ioc by OR-ing bit (1 << feature) into
 * ioc->features.  Bits that are already set are left untouched.
 */
35 void qio_channel_set_feature(QIOChannel
*ioc
,
36 QIOChannelFeature feature
)
38 ioc
->features
|= (1 << feature
);
/*
 * Give @ioc a name: ioc->name is set to a g_strdup() copy of the
 * supplied string, so the channel holds its own copy of the text.
 *
 * NOTE(review): the rest of the parameter list and any release of a
 * previously stored name are not visible in this excerpt -- confirm
 * that an earlier ioc->name is freed before being overwritten.
 */
42 void qio_channel_set_name(QIOChannel
*ioc
,
46 ioc
->name
= g_strdup(name
);
/*
 * Vectored read that can also carry file descriptors.
 *
 * The call is dispatched to the channel class's io_readv() hook with
 * @iov, niov, fds, nfds and errp passed through unchanged, and that
 * hook's result is returned.
 *
 * Before dispatching, a guard involving the
 * QIO_CHANNEL_FEATURE_FD_PASS feature reports EINVAL through errp with
 * "Channel does not support file descriptor passing".
 * NOTE(review): the first half of that guard condition and the value
 * returned on that path are not visible in this excerpt.
 */
50 ssize_t
qio_channel_readv_full(QIOChannel
*ioc
,
51 const struct iovec
*iov
,
57 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
60 !qio_channel_has_feature(ioc
, QIO_CHANNEL_FEATURE_FD_PASS
)) {
61 error_setg_errno(errp
, EINVAL
,
62 "Channel does not support file descriptor passing");
/* All checks passed: hand the request to the class implementation. */
66 return klass
->io_readv(ioc
, iov
, niov
, fds
, nfds
, errp
);
/*
 * Vectored write that can also carry file descriptors.
 *
 * The call is dispatched to the channel class's io_writev() hook with
 * @iov, niov, fds, nfds and errp passed through unchanged, and that
 * hook's result is returned.
 *
 * Before dispatching, a guard involving the
 * QIO_CHANNEL_FEATURE_FD_PASS feature reports EINVAL through errp with
 * "Channel does not support file descriptor passing".
 * NOTE(review): the first half of that guard condition and the value
 * returned on that path are not visible in this excerpt.
 */
70 ssize_t
qio_channel_writev_full(QIOChannel
*ioc
,
71 const struct iovec
*iov
,
77 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
80 !qio_channel_has_feature(ioc
, QIO_CHANNEL_FEATURE_FD_PASS
)) {
81 error_setg_errno(errp
, EINVAL
,
82 "Channel does not support file descriptor passing");
/* All checks passed: hand the request to the class implementation. */
86 return klass
->io_writev(ioc
, iov
, niov
, fds
, nfds
, errp
);
/*
 * Keep reading from @ioc until the whole of @iov has been filled.
 *
 * qio_channel_readv() is called in a loop on a private working copy of
 * the vector; the loop runs while that copy still has elements
 * (nlocal_iov > 0).  A QIO_CHANNEL_ERR_BLOCK result makes the function
 * wait for G_IO_IN, a negative result and a zero-length result are each
 * handled in their own branch.
 *
 * NOTE(review): the assignments to the return value and the bodies of
 * the error/EOF branches are not visible in this excerpt; only the
 * "Unexpected end-of-file before all bytes were read" message of the
 * zero-length branch can be seen.
 */
90 int qio_channel_readv_all_eof(QIOChannel
*ioc
,
91 const struct iovec
*iov
,
/*
 * The caller's vector is const, so progress is tracked in a heap copy.
 * local_iov is passed by address to iov_discard_front() below and may
 * therefore move; local_iov_head keeps the original allocation so it
 * can be handed to g_free() at the end.
 */
96 struct iovec
*local_iov
= g_new(struct iovec
, niov
);
97 struct iovec
*local_iov_head
= local_iov
;
98 unsigned int nlocal_iov
= niov
;
/* Fill the working copy, starting at offset 0 and spanning iov_size(iov, niov). */
101 nlocal_iov
= iov_copy(local_iov
, nlocal_iov
,
103 0, iov_size(iov
, niov
));
105 while (nlocal_iov
> 0) {
107 len
= qio_channel_readv(ioc
, local_iov
, nlocal_iov
, errp
);
/*
 * Nothing available right now: yield if we are running in a coroutine,
 * otherwise block in qio_channel_wait() until the channel is readable.
 */
108 if (len
== QIO_CHANNEL_ERR_BLOCK
) {
109 if (qemu_in_coroutine()) {
110 qio_channel_yield(ioc
, G_IO_IN
);
112 qio_channel_wait(ioc
, G_IO_IN
);
115 } else if (len
< 0) {
117 } else if (len
== 0) {
120 "Unexpected end-of-file before all bytes were read");
/* Account for the len bytes just read by trimming the working vector. */
128 iov_discard_front(&local_iov
, &nlocal_iov
, len
);
/* Free via the saved head pointer, not the possibly-advanced local_iov. */
134 g_free(local_iov_head
);
/*
 * Read the whole of @iov from @ioc, built on top of
 * qio_channel_readv_all_eof().
 *
 * The helper's result is post-processed: one branch reports
 * "Unexpected end-of-file before all bytes were read", another handles
 * a result of 1.
 * NOTE(review): the condition guarding the error message and the
 * values ultimately returned are not visible in this excerpt.
 */
138 int qio_channel_readv_all(QIOChannel
*ioc
,
139 const struct iovec
*iov
,
143 int ret
= qio_channel_readv_all_eof(ioc
, iov
, niov
, errp
);
148 "Unexpected end-of-file before all bytes were read");
149 } else if (ret
== 1) {
/*
 * Keep writing to @ioc until the whole of @iov has been sent.
 *
 * qio_channel_writev() is called in a loop on a private working copy of
 * the vector; the loop runs while that copy still has elements
 * (nlocal_iov > 0).  A QIO_CHANNEL_ERR_BLOCK result makes the function
 * wait for G_IO_OUT before the next attempt.
 *
 * NOTE(review): error handling and the return value are not visible in
 * this excerpt.
 */
155 int qio_channel_writev_all(QIOChannel
*ioc
,
156 const struct iovec
*iov
,
/*
 * The caller's vector is const, so progress is tracked in a heap copy.
 * local_iov is passed by address to iov_discard_front() below and may
 * therefore move; local_iov_head keeps the original allocation so it
 * can be handed to g_free() at the end.
 */
161 struct iovec
*local_iov
= g_new(struct iovec
, niov
);
162 struct iovec
*local_iov_head
= local_iov
;
163 unsigned int nlocal_iov
= niov
;
/* Fill the working copy, starting at offset 0 and spanning iov_size(iov, niov). */
165 nlocal_iov
= iov_copy(local_iov
, nlocal_iov
,
167 0, iov_size(iov
, niov
));
169 while (nlocal_iov
> 0) {
171 len
= qio_channel_writev(ioc
, local_iov
, nlocal_iov
, errp
);
/*
 * Channel cannot accept data right now: yield if we are running in a
 * coroutine, otherwise block in qio_channel_wait() until it is writable.
 */
172 if (len
== QIO_CHANNEL_ERR_BLOCK
) {
173 if (qemu_in_coroutine()) {
174 qio_channel_yield(ioc
, G_IO_OUT
);
176 qio_channel_wait(ioc
, G_IO_OUT
);
/* Account for the len bytes just written by trimming the working vector. */
184 iov_discard_front(&local_iov
, &nlocal_iov
, len
);
/* Free via the saved head pointer, not the possibly-advanced local_iov. */
189 g_free(local_iov_head
);
/*
 * Vectored read without file descriptor passing: forwards to
 * qio_channel_readv_full() with NULL for both the fds and nfds
 * arguments and returns its result.
 */
193 ssize_t
qio_channel_readv(QIOChannel
*ioc
,
194 const struct iovec
*iov
,
198 return qio_channel_readv_full(ioc
, iov
, niov
, NULL
, NULL
, errp
);
/*
 * Vectored write without file descriptor passing: forwards to
 * qio_channel_writev_full() with a NULL fds array and a count of 0 and
 * returns its result.
 */
202 ssize_t
qio_channel_writev(QIOChannel
*ioc
,
203 const struct iovec
*iov
,
207 return qio_channel_writev_full(ioc
, iov
, niov
, NULL
, 0, errp
);
/*
 * Single-buffer read: wraps buf/buflen in a one-element iovec and calls
 * qio_channel_readv_full() with no file descriptor arguments (NULL,
 * NULL), returning its result.
 */
211 ssize_t
qio_channel_read(QIOChannel
*ioc
,
216 struct iovec iov
= { .iov_base
= buf
, .iov_len
= buflen
};
217 return qio_channel_readv_full(ioc
, &iov
, 1, NULL
, NULL
, errp
);
/*
 * Single-buffer write: wraps buf/buflen in a one-element iovec and
 * calls qio_channel_writev_full() with no file descriptors (NULL, 0),
 * returning its result.
 *
 * buf is cast to (char *) when stored in iov_base.
 */
221 ssize_t
qio_channel_write(QIOChannel
*ioc
,
226 struct iovec iov
= { .iov_base
= (char *)buf
, .iov_len
= buflen
};
227 return qio_channel_writev_full(ioc
, &iov
, 1, NULL
, 0, errp
);
/*
 * Single-buffer form of qio_channel_readv_all_eof(): wraps buf/buflen
 * in a one-element iovec and returns that function's result unchanged.
 */
231 int qio_channel_read_all_eof(QIOChannel
*ioc
,
236 struct iovec iov
= { .iov_base
= buf
, .iov_len
= buflen
};
237 return qio_channel_readv_all_eof(ioc
, &iov
, 1, errp
);
/*
 * Single-buffer form of qio_channel_readv_all(): wraps buf/buflen in a
 * one-element iovec and returns that function's result unchanged.
 */
241 int qio_channel_read_all(QIOChannel
*ioc
,
246 struct iovec iov
= { .iov_base
= buf
, .iov_len
= buflen
};
247 return qio_channel_readv_all(ioc
, &iov
, 1, errp
);
/*
 * Single-buffer form of qio_channel_writev_all(): wraps buf/buflen in a
 * one-element iovec and returns that function's result unchanged.
 *
 * buf is cast to (char *) when stored in iov_base.
 */
251 int qio_channel_write_all(QIOChannel
*ioc
,
256 struct iovec iov
= { .iov_base
= (char *)buf
, .iov_len
= buflen
};
257 return qio_channel_writev_all(ioc
, &iov
, 1, errp
);
/*
 * Switch @ioc between blocking and non-blocking mode by calling the
 * channel class's io_set_blocking() hook with enabled and errp; the
 * hook's result is returned as-is.
 */
261 int qio_channel_set_blocking(QIOChannel
*ioc
,
265 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
266 return klass
->io_set_blocking(ioc
, enabled
, errp
);
/*
 * Close @ioc by calling the channel class's io_close() hook; the hook's
 * result is returned as-is and errors are reported through errp.
 */
270 int qio_channel_close(QIOChannel
*ioc
,
273 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
274 return klass
->io_close(ioc
, errp
);
/*
 * Create a GSource that watches @ioc for @condition.
 *
 * The source comes from the channel class's io_create_watch() hook and
 * is labelled with the channel's name via g_source_set_name().
 * NOTE(review): any guard around the naming call (for an unset
 * ioc->name) and the return statement are not visible in this excerpt.
 */
278 GSource
*qio_channel_create_watch(QIOChannel
*ioc
,
279 GIOCondition condition
)
281 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
282 GSource
*ret
= klass
->io_create_watch(ioc
, condition
);
285 g_source_set_name(ret
, ioc
->name
);
/*
 * Install read/write handlers for @ioc in an AioContext by forwarding
 * ctx, io_read, io_write and opaque to the channel class's
 * io_set_aio_fd_handler() hook.
 */
292 void qio_channel_set_aio_fd_handler(QIOChannel
*ioc
,
298 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
300 klass
->io_set_aio_fd_handler(ioc
, ctx
, io_read
, io_write
, opaque
);
/*
 * Create a watch on @ioc for @condition and attach it to @context.
 *
 * func is installed as the source callback together with user_data and
 * the notify destructor.  The id produced by g_source_attach() is
 * stored in id.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably that id is what is returned.
 */
303 guint
qio_channel_add_watch_full(QIOChannel
*ioc
,
304 GIOCondition condition
,
307 GDestroyNotify notify
,
308 GMainContext
*context
)
313 source
= qio_channel_create_watch(ioc
, condition
);
/* The channel-specific callback type is cast to the generic GSourceFunc. */
315 g_source_set_callback(source
, (GSourceFunc
)func
, user_data
, notify
);
317 id
= g_source_attach(source
, context
);
/* Drop the local reference obtained from qio_channel_create_watch(). */
318 g_source_unref(source
);
/*
 * Convenience form of qio_channel_add_watch_full() that passes NULL as
 * the GMainContext and returns that function's result.
 */
323 guint
qio_channel_add_watch(QIOChannel
*ioc
,
324 GIOCondition condition
,
327 GDestroyNotify notify
)
329 return qio_channel_add_watch_full(ioc
, condition
, func
,
330 user_data
, notify
, NULL
);
/*
 * Like qio_channel_add_watch_full(), but works with the GSource itself
 * rather than only its numeric id.
 *
 * The watch is added to @context, then looked up again by id with
 * g_main_context_find_source_by_id() and an extra reference is taken on
 * it with g_source_ref().
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably the referenced source is returned and the caller owns
 * that reference.
 */
333 GSource
*qio_channel_add_watch_source(QIOChannel
*ioc
,
334 GIOCondition condition
,
337 GDestroyNotify notify
,
338 GMainContext
*context
)
343 id
= qio_channel_add_watch_full(ioc
, condition
, func
,
344 user_data
, notify
, context
);
345 source
= g_main_context_find_source_by_id(context
, id
);
346 g_source_ref(source
);
/*
 * Shut down the data path of @ioc in the direction(s) given by @how.
 *
 * io_shutdown is an optional class hook: when the class does not
 * provide it, "Data path shutdown not supported" is reported through
 * errp; otherwise the hook is called and its result returned.
 * NOTE(review): the value returned on the unsupported path is not
 * visible in this excerpt.
 */
351 int qio_channel_shutdown(QIOChannel
*ioc
,
352 QIOChannelShutdown how
,
355 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
357 if (!klass
->io_shutdown
) {
358 error_setg(errp
, "Data path shutdown not supported");
362 return klass
->io_shutdown(ioc
, how
, errp
);
/*
 * Forward the "delay" setting to the channel class.
 *
 * io_set_delay is an optional hook: it is called with enabled only when
 * the class provides it, otherwise the request is silently ignored.
 */
366 void qio_channel_set_delay(QIOChannel
*ioc
,
369 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
371 if (klass
->io_set_delay
) {
372 klass
->io_set_delay(ioc
, enabled
);
/*
 * Forward the "cork" setting to the channel class.
 *
 * io_set_cork is an optional hook: it is called with enabled only when
 * the class provides it, otherwise the request is silently ignored.
 */
377 void qio_channel_set_cork(QIOChannel
*ioc
,
380 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
382 if (klass
->io_set_cork
) {
383 klass
->io_set_cork(ioc
, enabled
);
/*
 * Reposition @ioc using offset and whence.
 *
 * io_seek is an optional class hook: when the class does not provide
 * it, "Channel does not support random access" is reported through
 * errp; otherwise the hook is called and its result returned.
 * NOTE(review): the value returned on the unsupported path is not
 * visible in this excerpt.
 */
388 off_t
qio_channel_io_seek(QIOChannel
*ioc
,
393 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
395 if (!klass
->io_seek
) {
396 error_setg(errp
, "Channel does not support random access");
400 return klass
->io_seek(ioc
, offset
, whence
, errp
);
/*
 * Read-side fd handler installed by qio_channel_set_aio_fd_handlers().
 *
 * opaque is the QIOChannel; the coroutine recorded in
 * ioc->read_coroutine is fetched and the function asserts that it
 * belongs to the AioContext the handler is currently running in.
 * NOTE(review): the statement that actually wakes the coroutine is not
 * visible in this excerpt (the comment below refers to aio_co_wake()).
 */
404 static void qio_channel_restart_read(void *opaque
)
406 QIOChannel
*ioc
= opaque
;
407 Coroutine
*co
= ioc
->read_coroutine
;
409 /* Assert that aio_co_wake() reenters the coroutine directly */
410 assert(qemu_get_current_aio_context() ==
411 qemu_coroutine_get_aio_context(co
));
/*
 * Write-side fd handler installed by qio_channel_set_aio_fd_handlers().
 *
 * opaque is the QIOChannel; the coroutine recorded in
 * ioc->write_coroutine is fetched and the function asserts that it
 * belongs to the AioContext the handler is currently running in.
 * NOTE(review): the statement that actually wakes the coroutine is not
 * visible in this excerpt (the comment below refers to aio_co_wake()).
 */
415 static void qio_channel_restart_write(void *opaque
)
417 QIOChannel
*ioc
= opaque
;
418 Coroutine
*co
= ioc
->write_coroutine
;
420 /* Assert that aio_co_wake() reenters the coroutine directly */
421 assert(qemu_get_current_aio_context() ==
422 qemu_coroutine_get_aio_context(co
));
/*
 * Bring the installed fd handlers in line with the coroutines currently
 * waiting on @ioc.
 *
 * A read handler is installed only while ioc->read_coroutine is set and
 * a write handler only while ioc->write_coroutine is set; a direction
 * with no waiter gets a NULL handler.  The channel itself is passed as
 * the handlers' opaque argument.
 */
426 static void qio_channel_set_aio_fd_handlers(QIOChannel
*ioc
)
428 IOHandler
*rd_handler
= NULL
, *wr_handler
= NULL
;
431 if (ioc
->read_coroutine
) {
432 rd_handler
= qio_channel_restart_read
;
434 if (ioc
->write_coroutine
) {
435 wr_handler
= qio_channel_restart_write
;
/* Use the channel's own context if it has one, else iohandler_get_aio_context(). */
438 ctx
= ioc
->ctx
? ioc
->ctx
: iohandler_get_aio_context();
439 qio_channel_set_aio_fd_handler(ioc
, ctx
, rd_handler
, wr_handler
, ioc
);
/*
 * Attach @ioc to an AioContext.
 *
 * The function asserts that no coroutine is currently recorded as
 * waiting for either reads or writes on the channel.
 * NOTE(review): the context parameter and the statement that stores it
 * are not visible in this excerpt.
 */
442 void qio_channel_attach_aio_context(QIOChannel
*ioc
,
445 assert(!ioc
->read_coroutine
);
446 assert(!ioc
->write_coroutine
);
/*
 * Detach @ioc from its AioContext.
 *
 * Both waiting-coroutine slots are cleared first, so the following
 * qio_channel_set_aio_fd_handlers() call installs NULL read and write
 * handlers.
 * NOTE(review): any statements after that call are not visible in this
 * excerpt.
 */
450 void qio_channel_detach_aio_context(QIOChannel
*ioc
)
452 ioc
->read_coroutine
= NULL
;
453 ioc
->write_coroutine
= NULL
;
454 qio_channel_set_aio_fd_handlers(ioc
);
/*
 * Suspend the calling coroutine until @ioc is ready for @condition.
 *
 * Must be called from coroutine context (asserted).  For G_IO_IN the
 * caller is recorded in ioc->read_coroutine, for G_IO_OUT in
 * ioc->write_coroutine; the matching slot must be empty beforehand
 * (asserted).  The fd handlers are then refreshed and the coroutine
 * yields.
 * NOTE(review): handling of any other condition value is not visible
 * in this excerpt.
 */
458 void coroutine_fn
qio_channel_yield(QIOChannel
*ioc
,
459 GIOCondition condition
)
461 assert(qemu_in_coroutine());
462 if (condition
== G_IO_IN
) {
463 assert(!ioc
->read_coroutine
);
464 ioc
->read_coroutine
= qemu_coroutine_self();
465 } else if (condition
== G_IO_OUT
) {
466 assert(!ioc
->write_coroutine
);
467 ioc
->write_coroutine
= qemu_coroutine_self();
/* Install the handler for the slot just filled, then go to sleep. */
471 qio_channel_set_aio_fd_handlers(ioc
);
472 qemu_coroutine_yield();
474 /* Allow interrupting the operation by reentering the coroutine other than
475 * through the aio_fd_handlers. */
/*
 * If our slot is still set after resuming, clear it here and refresh
 * the handlers so the channel stops watching on our behalf.
 */
476 if (condition
== G_IO_IN
&& ioc
->read_coroutine
) {
477 ioc
->read_coroutine
= NULL
;
478 qio_channel_set_aio_fd_handlers(ioc
);
479 } else if (condition
== G_IO_OUT
&& ioc
->write_coroutine
) {
480 ioc
->write_coroutine
= NULL
;
481 qio_channel_set_aio_fd_handlers(ioc
);
/*
 * Watch callback used by qio_channel_wait(): opaque is the GMainLoop
 * that is being run, and the callback stops it with g_main_loop_quit().
 * NOTE(review): the opaque parameter declaration and the returned
 * gboolean are not visible in this excerpt.
 */
486 static gboolean
qio_channel_wait_complete(QIOChannel
*ioc
,
487 GIOCondition condition
,
490 GMainLoop
*loop
= opaque
;
492 g_main_loop_quit(loop
);
/*
 * Block the caller until @ioc signals @condition.
 *
 * A fresh GMainContext and GMainLoop are created just for this call, a
 * watch for @condition is attached to that context with
 * qio_channel_wait_complete() as its callback, and the loop is run.
 * Once g_main_loop_run() returns, the source, loop and context are all
 * unreferenced.
 */
497 void qio_channel_wait(QIOChannel
*ioc
,
498 GIOCondition condition
)
500 GMainContext
*ctxt
= g_main_context_new();
501 GMainLoop
*loop
= g_main_loop_new(ctxt
, TRUE
);
504 source
= qio_channel_create_watch(ioc
, condition
);
/* The channel-specific callback type is cast to the generic GSourceFunc. */
506 g_source_set_callback(source
,
507 (GSourceFunc
)qio_channel_wait_complete
,
511 g_source_attach(source
, ctxt
);
513 g_main_loop_run(loop
);
/* Release everything created above. */
515 g_source_unref(source
);
516 g_main_loop_unref(loop
);
517 g_main_context_unref(ctxt
);
/*
 * Instance finalizer for the channel type (see .instance_finalize in
 * qio_channel_info): casts obj to QIOChannel and closes the handle
 * stored in ioc->event with CloseHandle().
 * NOTE(review): CloseHandle() is a Win32 API, so this call is
 * presumably inside a platform guard; the guard and any other cleanup
 * are not visible in this excerpt.
 */
521 static void qio_channel_finalize(Object
*obj
)
523 QIOChannel
*ioc
= QIO_CHANNEL(obj
);
529 CloseHandle(ioc
->event
);
/*
 * Type description for TYPE_QIO_CHANNEL: it derives from TYPE_OBJECT,
 * instances are sizeof(QIOChannel), classes are sizeof(QIOChannelClass),
 * and qio_channel_finalize() runs as the instance finalizer.
 */
534 static const TypeInfo qio_channel_info
= {
535 .parent
= TYPE_OBJECT
,
536 .name
= TYPE_QIO_CHANNEL
,
537 .instance_size
= sizeof(QIOChannel
),
538 .instance_finalize
= qio_channel_finalize
,
540 .class_size
= sizeof(QIOChannelClass
),
/* Register the channel type described by qio_channel_info. */
544 static void qio_channel_register_types(void)
546 type_register_static(&qio_channel_info
);
/* Hand the registration function above to the type_init() hook. */
550 type_init(qio_channel_register_types
);