/*
 * Copyright (c) 2015 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
 */
21 #include "qemu/osdep.h"
22 #include "io/channel.h"
23 #include "qapi/error.h"
24 #include "qemu/main-loop.h"
25 #include "qemu/module.h"
28 bool qio_channel_has_feature(QIOChannel
*ioc
,
29 QIOChannelFeature feature
)
31 return ioc
->features
& (1 << feature
);
35 void qio_channel_set_feature(QIOChannel
*ioc
,
36 QIOChannelFeature feature
)
38 ioc
->features
|= (1 << feature
);
42 void qio_channel_set_name(QIOChannel
*ioc
,
46 ioc
->name
= g_strdup(name
);
50 ssize_t
qio_channel_readv_full(QIOChannel
*ioc
,
51 const struct iovec
*iov
,
58 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
61 !qio_channel_has_feature(ioc
, QIO_CHANNEL_FEATURE_FD_PASS
)) {
62 error_setg_errno(errp
, EINVAL
,
63 "Channel does not support file descriptor passing");
67 if ((flags
& QIO_CHANNEL_READ_FLAG_MSG_PEEK
) &&
68 !qio_channel_has_feature(ioc
, QIO_CHANNEL_FEATURE_READ_MSG_PEEK
)) {
69 error_setg_errno(errp
, EINVAL
,
70 "Channel does not support peek read");
74 return klass
->io_readv(ioc
, iov
, niov
, fds
, nfds
, flags
, errp
);
78 ssize_t
qio_channel_writev_full(QIOChannel
*ioc
,
79 const struct iovec
*iov
,
86 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
89 if (!qio_channel_has_feature(ioc
, QIO_CHANNEL_FEATURE_FD_PASS
)) {
90 error_setg_errno(errp
, EINVAL
,
91 "Channel does not support file descriptor passing");
94 if (flags
& QIO_CHANNEL_WRITE_FLAG_ZERO_COPY
) {
95 error_setg_errno(errp
, EINVAL
,
96 "Zero Copy does not support file descriptor passing");
101 if ((flags
& QIO_CHANNEL_WRITE_FLAG_ZERO_COPY
) &&
102 !qio_channel_has_feature(ioc
, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY
)) {
103 error_setg_errno(errp
, EINVAL
,
104 "Requested Zero Copy feature is not available");
108 return klass
->io_writev(ioc
, iov
, niov
, fds
, nfds
, flags
, errp
);
112 int qio_channel_readv_all_eof(QIOChannel
*ioc
,
113 const struct iovec
*iov
,
117 return qio_channel_readv_full_all_eof(ioc
, iov
, niov
, NULL
, NULL
, errp
);
120 int qio_channel_readv_all(QIOChannel
*ioc
,
121 const struct iovec
*iov
,
125 return qio_channel_readv_full_all(ioc
, iov
, niov
, NULL
, NULL
, errp
);
128 int qio_channel_readv_full_all_eof(QIOChannel
*ioc
,
129 const struct iovec
*iov
,
131 int **fds
, size_t *nfds
,
135 struct iovec
*local_iov
= g_new(struct iovec
, niov
);
136 struct iovec
*local_iov_head
= local_iov
;
137 unsigned int nlocal_iov
= niov
;
138 int **local_fds
= fds
;
139 size_t *local_nfds
= nfds
;
140 bool partial
= false;
150 nlocal_iov
= iov_copy(local_iov
, nlocal_iov
,
152 0, iov_size(iov
, niov
));
154 while ((nlocal_iov
> 0) || local_fds
) {
156 len
= qio_channel_readv_full(ioc
, local_iov
, nlocal_iov
, local_fds
,
157 local_nfds
, 0, errp
);
158 if (len
== QIO_CHANNEL_ERR_BLOCK
) {
159 if (qemu_in_coroutine()) {
160 qio_channel_yield(ioc
, G_IO_IN
);
162 qio_channel_wait(ioc
, G_IO_IN
);
168 if (local_nfds
&& *local_nfds
) {
170 * Got some FDs, but no data yet. This isn't an EOF
171 * scenario (yet), so carry on to try to read data
172 * on next loop iteration
175 } else if (!partial
) {
176 /* No fds and no data - EOF before any data read */
182 "Unexpected end-of-file before all data were read");
183 /* Fallthrough into len < 0 handling */
188 /* Close any FDs we previously received */
191 for (i
= 0; i
< (*nfds
); i
++) {
202 iov_discard_front(&local_iov
, &nlocal_iov
, len
);
214 g_free(local_iov_head
);
218 int qio_channel_readv_full_all(QIOChannel
*ioc
,
219 const struct iovec
*iov
,
221 int **fds
, size_t *nfds
,
224 int ret
= qio_channel_readv_full_all_eof(ioc
, iov
, niov
, fds
, nfds
, errp
);
227 error_setg(errp
, "Unexpected end-of-file before all data were read");
237 int qio_channel_writev_all(QIOChannel
*ioc
,
238 const struct iovec
*iov
,
242 return qio_channel_writev_full_all(ioc
, iov
, niov
, NULL
, 0, 0, errp
);
245 int qio_channel_writev_full_all(QIOChannel
*ioc
,
246 const struct iovec
*iov
,
248 int *fds
, size_t nfds
,
249 int flags
, Error
**errp
)
252 struct iovec
*local_iov
= g_new(struct iovec
, niov
);
253 struct iovec
*local_iov_head
= local_iov
;
254 unsigned int nlocal_iov
= niov
;
256 nlocal_iov
= iov_copy(local_iov
, nlocal_iov
,
258 0, iov_size(iov
, niov
));
260 while (nlocal_iov
> 0) {
263 len
= qio_channel_writev_full(ioc
, local_iov
, nlocal_iov
, fds
,
266 if (len
== QIO_CHANNEL_ERR_BLOCK
) {
267 if (qemu_in_coroutine()) {
268 qio_channel_yield(ioc
, G_IO_OUT
);
270 qio_channel_wait(ioc
, G_IO_OUT
);
278 iov_discard_front(&local_iov
, &nlocal_iov
, len
);
286 g_free(local_iov_head
);
290 ssize_t
qio_channel_readv(QIOChannel
*ioc
,
291 const struct iovec
*iov
,
295 return qio_channel_readv_full(ioc
, iov
, niov
, NULL
, NULL
, 0, errp
);
299 ssize_t
qio_channel_writev(QIOChannel
*ioc
,
300 const struct iovec
*iov
,
304 return qio_channel_writev_full(ioc
, iov
, niov
, NULL
, 0, 0, errp
);
308 ssize_t
qio_channel_read(QIOChannel
*ioc
,
313 struct iovec iov
= { .iov_base
= buf
, .iov_len
= buflen
};
314 return qio_channel_readv_full(ioc
, &iov
, 1, NULL
, NULL
, 0, errp
);
318 ssize_t
qio_channel_write(QIOChannel
*ioc
,
323 struct iovec iov
= { .iov_base
= (char *)buf
, .iov_len
= buflen
};
324 return qio_channel_writev_full(ioc
, &iov
, 1, NULL
, 0, 0, errp
);
328 int qio_channel_read_all_eof(QIOChannel
*ioc
,
333 struct iovec iov
= { .iov_base
= buf
, .iov_len
= buflen
};
334 return qio_channel_readv_all_eof(ioc
, &iov
, 1, errp
);
338 int qio_channel_read_all(QIOChannel
*ioc
,
343 struct iovec iov
= { .iov_base
= buf
, .iov_len
= buflen
};
344 return qio_channel_readv_all(ioc
, &iov
, 1, errp
);
348 int qio_channel_write_all(QIOChannel
*ioc
,
353 struct iovec iov
= { .iov_base
= (char *)buf
, .iov_len
= buflen
};
354 return qio_channel_writev_all(ioc
, &iov
, 1, errp
);
358 int qio_channel_set_blocking(QIOChannel
*ioc
,
362 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
363 return klass
->io_set_blocking(ioc
, enabled
, errp
);
367 int qio_channel_close(QIOChannel
*ioc
,
370 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
371 return klass
->io_close(ioc
, errp
);
375 GSource
*qio_channel_create_watch(QIOChannel
*ioc
,
376 GIOCondition condition
)
378 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
379 GSource
*ret
= klass
->io_create_watch(ioc
, condition
);
382 g_source_set_name(ret
, ioc
->name
);
389 void qio_channel_set_aio_fd_handler(QIOChannel
*ioc
,
395 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
397 klass
->io_set_aio_fd_handler(ioc
, ctx
, io_read
, io_write
, opaque
);
400 guint
qio_channel_add_watch_full(QIOChannel
*ioc
,
401 GIOCondition condition
,
404 GDestroyNotify notify
,
405 GMainContext
*context
)
410 source
= qio_channel_create_watch(ioc
, condition
);
412 g_source_set_callback(source
, (GSourceFunc
)func
, user_data
, notify
);
414 id
= g_source_attach(source
, context
);
415 g_source_unref(source
);
420 guint
qio_channel_add_watch(QIOChannel
*ioc
,
421 GIOCondition condition
,
424 GDestroyNotify notify
)
426 return qio_channel_add_watch_full(ioc
, condition
, func
,
427 user_data
, notify
, NULL
);
430 GSource
*qio_channel_add_watch_source(QIOChannel
*ioc
,
431 GIOCondition condition
,
434 GDestroyNotify notify
,
435 GMainContext
*context
)
440 id
= qio_channel_add_watch_full(ioc
, condition
, func
,
441 user_data
, notify
, context
);
442 source
= g_main_context_find_source_by_id(context
, id
);
443 g_source_ref(source
);
448 int qio_channel_shutdown(QIOChannel
*ioc
,
449 QIOChannelShutdown how
,
452 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
454 if (!klass
->io_shutdown
) {
455 error_setg(errp
, "Data path shutdown not supported");
459 return klass
->io_shutdown(ioc
, how
, errp
);
463 void qio_channel_set_delay(QIOChannel
*ioc
,
466 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
468 if (klass
->io_set_delay
) {
469 klass
->io_set_delay(ioc
, enabled
);
474 void qio_channel_set_cork(QIOChannel
*ioc
,
477 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
479 if (klass
->io_set_cork
) {
480 klass
->io_set_cork(ioc
, enabled
);
485 off_t
qio_channel_io_seek(QIOChannel
*ioc
,
490 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
492 if (!klass
->io_seek
) {
493 error_setg(errp
, "Channel does not support random access");
497 return klass
->io_seek(ioc
, offset
, whence
, errp
);
500 int qio_channel_flush(QIOChannel
*ioc
,
503 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
505 if (!klass
->io_flush
||
506 !qio_channel_has_feature(ioc
, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY
)) {
510 return klass
->io_flush(ioc
, errp
);
514 static void qio_channel_restart_read(void *opaque
)
516 QIOChannel
*ioc
= opaque
;
517 Coroutine
*co
= ioc
->read_coroutine
;
519 /* Assert that aio_co_wake() reenters the coroutine directly */
520 assert(qemu_get_current_aio_context() ==
521 qemu_coroutine_get_aio_context(co
));
525 static void qio_channel_restart_write(void *opaque
)
527 QIOChannel
*ioc
= opaque
;
528 Coroutine
*co
= ioc
->write_coroutine
;
530 /* Assert that aio_co_wake() reenters the coroutine directly */
531 assert(qemu_get_current_aio_context() ==
532 qemu_coroutine_get_aio_context(co
));
536 static void qio_channel_set_aio_fd_handlers(QIOChannel
*ioc
)
538 IOHandler
*rd_handler
= NULL
, *wr_handler
= NULL
;
541 if (ioc
->read_coroutine
) {
542 rd_handler
= qio_channel_restart_read
;
544 if (ioc
->write_coroutine
) {
545 wr_handler
= qio_channel_restart_write
;
548 ctx
= ioc
->ctx
? ioc
->ctx
: iohandler_get_aio_context();
549 qio_channel_set_aio_fd_handler(ioc
, ctx
, rd_handler
, wr_handler
, ioc
);
552 void qio_channel_attach_aio_context(QIOChannel
*ioc
,
555 assert(!ioc
->read_coroutine
);
556 assert(!ioc
->write_coroutine
);
560 void qio_channel_detach_aio_context(QIOChannel
*ioc
)
562 ioc
->read_coroutine
= NULL
;
563 ioc
->write_coroutine
= NULL
;
564 qio_channel_set_aio_fd_handlers(ioc
);
568 void coroutine_fn
qio_channel_yield(QIOChannel
*ioc
,
569 GIOCondition condition
)
571 assert(qemu_in_coroutine());
572 if (condition
== G_IO_IN
) {
573 assert(!ioc
->read_coroutine
);
574 ioc
->read_coroutine
= qemu_coroutine_self();
575 } else if (condition
== G_IO_OUT
) {
576 assert(!ioc
->write_coroutine
);
577 ioc
->write_coroutine
= qemu_coroutine_self();
581 qio_channel_set_aio_fd_handlers(ioc
);
582 qemu_coroutine_yield();
584 /* Allow interrupting the operation by reentering the coroutine other than
585 * through the aio_fd_handlers. */
586 if (condition
== G_IO_IN
&& ioc
->read_coroutine
) {
587 ioc
->read_coroutine
= NULL
;
588 qio_channel_set_aio_fd_handlers(ioc
);
589 } else if (condition
== G_IO_OUT
&& ioc
->write_coroutine
) {
590 ioc
->write_coroutine
= NULL
;
591 qio_channel_set_aio_fd_handlers(ioc
);
596 static gboolean
qio_channel_wait_complete(QIOChannel
*ioc
,
597 GIOCondition condition
,
600 GMainLoop
*loop
= opaque
;
602 g_main_loop_quit(loop
);
607 void qio_channel_wait(QIOChannel
*ioc
,
608 GIOCondition condition
)
610 GMainContext
*ctxt
= g_main_context_new();
611 GMainLoop
*loop
= g_main_loop_new(ctxt
, TRUE
);
614 source
= qio_channel_create_watch(ioc
, condition
);
616 g_source_set_callback(source
,
617 (GSourceFunc
)qio_channel_wait_complete
,
621 g_source_attach(source
, ctxt
);
623 g_main_loop_run(loop
);
625 g_source_unref(source
);
626 g_main_loop_unref(loop
);
627 g_main_context_unref(ctxt
);
631 static void qio_channel_finalize(Object
*obj
)
633 QIOChannel
*ioc
= QIO_CHANNEL(obj
);
639 CloseHandle(ioc
->event
);
644 static const TypeInfo qio_channel_info
= {
645 .parent
= TYPE_OBJECT
,
646 .name
= TYPE_QIO_CHANNEL
,
647 .instance_size
= sizeof(QIOChannel
),
648 .instance_finalize
= qio_channel_finalize
,
650 .class_size
= sizeof(QIOChannelClass
),
654 static void qio_channel_register_types(void)
656 type_register_static(&qio_channel_info
);
660 type_init(qio_channel_register_types
);