/*
 * Copyright (c) 2015 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
 *
 */
21 #include "qemu/osdep.h"
22 #include "io/channel.h"
23 #include "qapi/error.h"
24 #include "qemu/main-loop.h"
27 bool qio_channel_has_feature(QIOChannel
*ioc
,
28 QIOChannelFeature feature
)
30 return ioc
->features
& (1 << feature
);
34 void qio_channel_set_feature(QIOChannel
*ioc
,
35 QIOChannelFeature feature
)
37 ioc
->features
|= (1 << feature
);
41 void qio_channel_set_name(QIOChannel
*ioc
,
45 ioc
->name
= g_strdup(name
);
49 ssize_t
qio_channel_readv_full(QIOChannel
*ioc
,
50 const struct iovec
*iov
,
56 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
59 !qio_channel_has_feature(ioc
, QIO_CHANNEL_FEATURE_FD_PASS
)) {
60 error_setg_errno(errp
, EINVAL
,
61 "Channel does not support file descriptor passing");
65 return klass
->io_readv(ioc
, iov
, niov
, fds
, nfds
, errp
);
69 ssize_t
qio_channel_writev_full(QIOChannel
*ioc
,
70 const struct iovec
*iov
,
76 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
79 !qio_channel_has_feature(ioc
, QIO_CHANNEL_FEATURE_FD_PASS
)) {
80 error_setg_errno(errp
, EINVAL
,
81 "Channel does not support file descriptor passing");
85 return klass
->io_writev(ioc
, iov
, niov
, fds
, nfds
, errp
);
89 int qio_channel_readv_all_eof(QIOChannel
*ioc
,
90 const struct iovec
*iov
,
95 struct iovec
*local_iov
= g_new(struct iovec
, niov
);
96 struct iovec
*local_iov_head
= local_iov
;
97 unsigned int nlocal_iov
= niov
;
100 nlocal_iov
= iov_copy(local_iov
, nlocal_iov
,
102 0, iov_size(iov
, niov
));
104 while (nlocal_iov
> 0) {
106 len
= qio_channel_readv(ioc
, local_iov
, nlocal_iov
, errp
);
107 if (len
== QIO_CHANNEL_ERR_BLOCK
) {
108 if (qemu_in_coroutine()) {
109 qio_channel_yield(ioc
, G_IO_IN
);
111 qio_channel_wait(ioc
, G_IO_IN
);
114 } else if (len
< 0) {
116 } else if (len
== 0) {
119 "Unexpected end-of-file before all bytes were read");
127 iov_discard_front(&local_iov
, &nlocal_iov
, len
);
133 g_free(local_iov_head
);
137 int qio_channel_readv_all(QIOChannel
*ioc
,
138 const struct iovec
*iov
,
142 int ret
= qio_channel_readv_all_eof(ioc
, iov
, niov
, errp
);
147 "Unexpected end-of-file before all bytes were read");
148 } else if (ret
== 1) {
154 int qio_channel_writev_all(QIOChannel
*ioc
,
155 const struct iovec
*iov
,
160 struct iovec
*local_iov
= g_new(struct iovec
, niov
);
161 struct iovec
*local_iov_head
= local_iov
;
162 unsigned int nlocal_iov
= niov
;
164 nlocal_iov
= iov_copy(local_iov
, nlocal_iov
,
166 0, iov_size(iov
, niov
));
168 while (nlocal_iov
> 0) {
170 len
= qio_channel_writev(ioc
, local_iov
, nlocal_iov
, errp
);
171 if (len
== QIO_CHANNEL_ERR_BLOCK
) {
172 if (qemu_in_coroutine()) {
173 qio_channel_yield(ioc
, G_IO_OUT
);
175 qio_channel_wait(ioc
, G_IO_OUT
);
183 iov_discard_front(&local_iov
, &nlocal_iov
, len
);
188 g_free(local_iov_head
);
192 ssize_t
qio_channel_readv(QIOChannel
*ioc
,
193 const struct iovec
*iov
,
197 return qio_channel_readv_full(ioc
, iov
, niov
, NULL
, NULL
, errp
);
201 ssize_t
qio_channel_writev(QIOChannel
*ioc
,
202 const struct iovec
*iov
,
206 return qio_channel_writev_full(ioc
, iov
, niov
, NULL
, 0, errp
);
210 ssize_t
qio_channel_read(QIOChannel
*ioc
,
215 struct iovec iov
= { .iov_base
= buf
, .iov_len
= buflen
};
216 return qio_channel_readv_full(ioc
, &iov
, 1, NULL
, NULL
, errp
);
220 ssize_t
qio_channel_write(QIOChannel
*ioc
,
225 struct iovec iov
= { .iov_base
= (char *)buf
, .iov_len
= buflen
};
226 return qio_channel_writev_full(ioc
, &iov
, 1, NULL
, 0, errp
);
230 int qio_channel_read_all_eof(QIOChannel
*ioc
,
235 struct iovec iov
= { .iov_base
= buf
, .iov_len
= buflen
};
236 return qio_channel_readv_all_eof(ioc
, &iov
, 1, errp
);
240 int qio_channel_read_all(QIOChannel
*ioc
,
245 struct iovec iov
= { .iov_base
= buf
, .iov_len
= buflen
};
246 return qio_channel_readv_all(ioc
, &iov
, 1, errp
);
250 int qio_channel_write_all(QIOChannel
*ioc
,
255 struct iovec iov
= { .iov_base
= (char *)buf
, .iov_len
= buflen
};
256 return qio_channel_writev_all(ioc
, &iov
, 1, errp
);
260 int qio_channel_set_blocking(QIOChannel
*ioc
,
264 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
265 return klass
->io_set_blocking(ioc
, enabled
, errp
);
269 int qio_channel_close(QIOChannel
*ioc
,
272 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
273 return klass
->io_close(ioc
, errp
);
277 GSource
*qio_channel_create_watch(QIOChannel
*ioc
,
278 GIOCondition condition
)
280 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
281 GSource
*ret
= klass
->io_create_watch(ioc
, condition
);
284 g_source_set_name(ret
, ioc
->name
);
291 void qio_channel_set_aio_fd_handler(QIOChannel
*ioc
,
297 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
299 klass
->io_set_aio_fd_handler(ioc
, ctx
, io_read
, io_write
, opaque
);
302 guint
qio_channel_add_watch(QIOChannel
*ioc
,
303 GIOCondition condition
,
306 GDestroyNotify notify
)
311 source
= qio_channel_create_watch(ioc
, condition
);
313 g_source_set_callback(source
, (GSourceFunc
)func
, user_data
, notify
);
315 id
= g_source_attach(source
, NULL
);
316 g_source_unref(source
);
322 int qio_channel_shutdown(QIOChannel
*ioc
,
323 QIOChannelShutdown how
,
326 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
328 if (!klass
->io_shutdown
) {
329 error_setg(errp
, "Data path shutdown not supported");
333 return klass
->io_shutdown(ioc
, how
, errp
);
337 void qio_channel_set_delay(QIOChannel
*ioc
,
340 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
342 if (klass
->io_set_delay
) {
343 klass
->io_set_delay(ioc
, enabled
);
348 void qio_channel_set_cork(QIOChannel
*ioc
,
351 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
353 if (klass
->io_set_cork
) {
354 klass
->io_set_cork(ioc
, enabled
);
359 off_t
qio_channel_io_seek(QIOChannel
*ioc
,
364 QIOChannelClass
*klass
= QIO_CHANNEL_GET_CLASS(ioc
);
366 if (!klass
->io_seek
) {
367 error_setg(errp
, "Channel does not support random access");
371 return klass
->io_seek(ioc
, offset
, whence
, errp
);
375 static void qio_channel_set_aio_fd_handlers(QIOChannel
*ioc
);
377 static void qio_channel_restart_read(void *opaque
)
379 QIOChannel
*ioc
= opaque
;
380 Coroutine
*co
= ioc
->read_coroutine
;
382 ioc
->read_coroutine
= NULL
;
383 qio_channel_set_aio_fd_handlers(ioc
);
387 static void qio_channel_restart_write(void *opaque
)
389 QIOChannel
*ioc
= opaque
;
390 Coroutine
*co
= ioc
->write_coroutine
;
392 ioc
->write_coroutine
= NULL
;
393 qio_channel_set_aio_fd_handlers(ioc
);
397 static void qio_channel_set_aio_fd_handlers(QIOChannel
*ioc
)
399 IOHandler
*rd_handler
= NULL
, *wr_handler
= NULL
;
402 if (ioc
->read_coroutine
) {
403 rd_handler
= qio_channel_restart_read
;
405 if (ioc
->write_coroutine
) {
406 wr_handler
= qio_channel_restart_write
;
409 ctx
= ioc
->ctx
? ioc
->ctx
: iohandler_get_aio_context();
410 qio_channel_set_aio_fd_handler(ioc
, ctx
, rd_handler
, wr_handler
, ioc
);
413 void qio_channel_attach_aio_context(QIOChannel
*ioc
,
416 assert(!ioc
->read_coroutine
);
417 assert(!ioc
->write_coroutine
);
421 void qio_channel_detach_aio_context(QIOChannel
*ioc
)
423 ioc
->read_coroutine
= NULL
;
424 ioc
->write_coroutine
= NULL
;
425 qio_channel_set_aio_fd_handlers(ioc
);
429 void coroutine_fn
qio_channel_yield(QIOChannel
*ioc
,
430 GIOCondition condition
)
432 assert(qemu_in_coroutine());
433 if (condition
== G_IO_IN
) {
434 assert(!ioc
->read_coroutine
);
435 ioc
->read_coroutine
= qemu_coroutine_self();
436 } else if (condition
== G_IO_OUT
) {
437 assert(!ioc
->write_coroutine
);
438 ioc
->write_coroutine
= qemu_coroutine_self();
442 qio_channel_set_aio_fd_handlers(ioc
);
443 qemu_coroutine_yield();
447 static gboolean
qio_channel_wait_complete(QIOChannel
*ioc
,
448 GIOCondition condition
,
451 GMainLoop
*loop
= opaque
;
453 g_main_loop_quit(loop
);
458 void qio_channel_wait(QIOChannel
*ioc
,
459 GIOCondition condition
)
461 GMainContext
*ctxt
= g_main_context_new();
462 GMainLoop
*loop
= g_main_loop_new(ctxt
, TRUE
);
465 source
= qio_channel_create_watch(ioc
, condition
);
467 g_source_set_callback(source
,
468 (GSourceFunc
)qio_channel_wait_complete
,
472 g_source_attach(source
, ctxt
);
474 g_main_loop_run(loop
);
476 g_source_unref(source
);
477 g_main_loop_unref(loop
);
478 g_main_context_unref(ctxt
);
482 static void qio_channel_finalize(Object
*obj
)
484 QIOChannel
*ioc
= QIO_CHANNEL(obj
);
490 CloseHandle(ioc
->event
);
495 static const TypeInfo qio_channel_info
= {
496 .parent
= TYPE_OBJECT
,
497 .name
= TYPE_QIO_CHANNEL
,
498 .instance_size
= sizeof(QIOChannel
),
499 .instance_finalize
= qio_channel_finalize
,
501 .class_size
= sizeof(QIOChannelClass
),
505 static void qio_channel_register_types(void)
507 type_register_static(&qio_channel_info
);
511 type_init(qio_channel_register_types
);