2 * QEMU I/O channels sockets driver
4 * Copyright (c) 2015 Red Hat, Inc.
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
20 #include "qemu/osdep.h"
21 #include "qapi/error.h"
22 #include "qapi/qapi-visit-sockets.h"
23 #include "qemu/module.h"
24 #include "io/channel-socket.h"
25 #include "io/channel-watch.h"
27 #include "qapi/clone-visitor.h"
29 #include <linux/errqueue.h>
30 #include <sys/socket.h>
32 #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
33 #define QEMU_MSG_ZEROCOPY
37 #define SOCKET_MAX_FDS 16
40 qio_channel_socket_get_local_address(QIOChannelSocket
*ioc
,
43 return socket_sockaddr_to_address(&ioc
->localAddr
,
49 qio_channel_socket_get_remote_address(QIOChannelSocket
*ioc
,
52 return socket_sockaddr_to_address(&ioc
->remoteAddr
,
58 qio_channel_socket_new(void)
60 QIOChannelSocket
*sioc
;
63 sioc
= QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET
));
65 sioc
->zero_copy_queued
= 0;
66 sioc
->zero_copy_sent
= 0;
68 ioc
= QIO_CHANNEL(sioc
);
69 qio_channel_set_feature(ioc
, QIO_CHANNEL_FEATURE_SHUTDOWN
);
72 ioc
->event
= CreateEvent(NULL
, FALSE
, FALSE
, NULL
);
75 trace_qio_channel_socket_new(sioc
);
82 qio_channel_socket_set_fd(QIOChannelSocket
*sioc
,
87 error_setg(errp
, "Socket is already open");
92 sioc
->remoteAddrLen
= sizeof(sioc
->remoteAddr
);
93 sioc
->localAddrLen
= sizeof(sioc
->localAddr
);
96 if (getpeername(fd
, (struct sockaddr
*)&sioc
->remoteAddr
,
97 &sioc
->remoteAddrLen
) < 0) {
98 if (errno
== ENOTCONN
) {
99 memset(&sioc
->remoteAddr
, 0, sizeof(sioc
->remoteAddr
));
100 sioc
->remoteAddrLen
= sizeof(sioc
->remoteAddr
);
102 error_setg_errno(errp
, errno
,
103 "Unable to query remote socket address");
108 if (getsockname(fd
, (struct sockaddr
*)&sioc
->localAddr
,
109 &sioc
->localAddrLen
) < 0) {
110 error_setg_errno(errp
, errno
,
111 "Unable to query local socket address");
116 if (sioc
->localAddr
.ss_family
== AF_UNIX
) {
117 QIOChannel
*ioc
= QIO_CHANNEL(sioc
);
118 qio_channel_set_feature(ioc
, QIO_CHANNEL_FEATURE_FD_PASS
);
125 sioc
->fd
= -1; /* Let the caller close FD on failure */
130 qio_channel_socket_new_fd(int fd
,
133 QIOChannelSocket
*ioc
;
135 ioc
= qio_channel_socket_new();
136 if (qio_channel_socket_set_fd(ioc
, fd
, errp
) < 0) {
137 object_unref(OBJECT(ioc
));
141 trace_qio_channel_socket_new_fd(ioc
, fd
);
147 int qio_channel_socket_connect_sync(QIOChannelSocket
*ioc
,
153 trace_qio_channel_socket_connect_sync(ioc
, addr
);
154 fd
= socket_connect(addr
, errp
);
156 trace_qio_channel_socket_connect_fail(ioc
);
160 trace_qio_channel_socket_connect_complete(ioc
, fd
);
161 if (qio_channel_socket_set_fd(ioc
, fd
, errp
) < 0) {
166 #ifdef QEMU_MSG_ZEROCOPY
168 ret
= setsockopt(fd
, SOL_SOCKET
, SO_ZEROCOPY
, &v
, sizeof(v
));
170 /* Zero copy available on host */
171 qio_channel_set_feature(QIO_CHANNEL(ioc
),
172 QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY
);
180 static void qio_channel_socket_connect_worker(QIOTask
*task
,
183 QIOChannelSocket
*ioc
= QIO_CHANNEL_SOCKET(qio_task_get_source(task
));
184 SocketAddress
*addr
= opaque
;
187 qio_channel_socket_connect_sync(ioc
, addr
, &err
);
189 qio_task_set_error(task
, err
);
193 void qio_channel_socket_connect_async(QIOChannelSocket
*ioc
,
195 QIOTaskFunc callback
,
197 GDestroyNotify destroy
,
198 GMainContext
*context
)
200 QIOTask
*task
= qio_task_new(
201 OBJECT(ioc
), callback
, opaque
, destroy
);
202 SocketAddress
*addrCopy
;
204 addrCopy
= QAPI_CLONE(SocketAddress
, addr
);
206 /* socket_connect() does a non-blocking connect(), but it
207 * still blocks in DNS lookups, so we must use a thread */
208 trace_qio_channel_socket_connect_async(ioc
, addr
);
209 qio_task_run_in_thread(task
,
210 qio_channel_socket_connect_worker
,
212 (GDestroyNotify
)qapi_free_SocketAddress
,
217 int qio_channel_socket_listen_sync(QIOChannelSocket
*ioc
,
224 trace_qio_channel_socket_listen_sync(ioc
, addr
, num
);
225 fd
= socket_listen(addr
, num
, errp
);
227 trace_qio_channel_socket_listen_fail(ioc
);
231 trace_qio_channel_socket_listen_complete(ioc
, fd
);
232 if (qio_channel_socket_set_fd(ioc
, fd
, errp
) < 0) {
236 qio_channel_set_feature(QIO_CHANNEL(ioc
), QIO_CHANNEL_FEATURE_LISTEN
);
242 struct QIOChannelListenWorkerData
{
244 int num
; /* amount of expected connections */
247 static void qio_channel_listen_worker_free(gpointer opaque
)
249 struct QIOChannelListenWorkerData
*data
= opaque
;
251 qapi_free_SocketAddress(data
->addr
);
255 static void qio_channel_socket_listen_worker(QIOTask
*task
,
258 QIOChannelSocket
*ioc
= QIO_CHANNEL_SOCKET(qio_task_get_source(task
));
259 struct QIOChannelListenWorkerData
*data
= opaque
;
262 qio_channel_socket_listen_sync(ioc
, data
->addr
, data
->num
, &err
);
264 qio_task_set_error(task
, err
);
268 void qio_channel_socket_listen_async(QIOChannelSocket
*ioc
,
271 QIOTaskFunc callback
,
273 GDestroyNotify destroy
,
274 GMainContext
*context
)
276 QIOTask
*task
= qio_task_new(
277 OBJECT(ioc
), callback
, opaque
, destroy
);
278 struct QIOChannelListenWorkerData
*data
;
280 data
= g_new0(struct QIOChannelListenWorkerData
, 1);
281 data
->addr
= QAPI_CLONE(SocketAddress
, addr
);
284 /* socket_listen() blocks in DNS lookups, so we must use a thread */
285 trace_qio_channel_socket_listen_async(ioc
, addr
, num
);
286 qio_task_run_in_thread(task
,
287 qio_channel_socket_listen_worker
,
289 qio_channel_listen_worker_free
,
294 int qio_channel_socket_dgram_sync(QIOChannelSocket
*ioc
,
295 SocketAddress
*localAddr
,
296 SocketAddress
*remoteAddr
,
301 trace_qio_channel_socket_dgram_sync(ioc
, localAddr
, remoteAddr
);
302 fd
= socket_dgram(remoteAddr
, localAddr
, errp
);
304 trace_qio_channel_socket_dgram_fail(ioc
);
308 trace_qio_channel_socket_dgram_complete(ioc
, fd
);
309 if (qio_channel_socket_set_fd(ioc
, fd
, errp
) < 0) {
318 struct QIOChannelSocketDGramWorkerData
{
319 SocketAddress
*localAddr
;
320 SocketAddress
*remoteAddr
;
324 static void qio_channel_socket_dgram_worker_free(gpointer opaque
)
326 struct QIOChannelSocketDGramWorkerData
*data
= opaque
;
327 qapi_free_SocketAddress(data
->localAddr
);
328 qapi_free_SocketAddress(data
->remoteAddr
);
332 static void qio_channel_socket_dgram_worker(QIOTask
*task
,
335 QIOChannelSocket
*ioc
= QIO_CHANNEL_SOCKET(qio_task_get_source(task
));
336 struct QIOChannelSocketDGramWorkerData
*data
= opaque
;
339 /* socket_dgram() blocks in DNS lookups, so we must use a thread */
340 qio_channel_socket_dgram_sync(ioc
, data
->localAddr
,
341 data
->remoteAddr
, &err
);
343 qio_task_set_error(task
, err
);
347 void qio_channel_socket_dgram_async(QIOChannelSocket
*ioc
,
348 SocketAddress
*localAddr
,
349 SocketAddress
*remoteAddr
,
350 QIOTaskFunc callback
,
352 GDestroyNotify destroy
,
353 GMainContext
*context
)
355 QIOTask
*task
= qio_task_new(
356 OBJECT(ioc
), callback
, opaque
, destroy
);
357 struct QIOChannelSocketDGramWorkerData
*data
= g_new0(
358 struct QIOChannelSocketDGramWorkerData
, 1);
360 data
->localAddr
= QAPI_CLONE(SocketAddress
, localAddr
);
361 data
->remoteAddr
= QAPI_CLONE(SocketAddress
, remoteAddr
);
363 trace_qio_channel_socket_dgram_async(ioc
, localAddr
, remoteAddr
);
364 qio_task_run_in_thread(task
,
365 qio_channel_socket_dgram_worker
,
367 qio_channel_socket_dgram_worker_free
,
373 qio_channel_socket_accept(QIOChannelSocket
*ioc
,
376 QIOChannelSocket
*cioc
;
378 cioc
= qio_channel_socket_new();
379 cioc
->remoteAddrLen
= sizeof(ioc
->remoteAddr
);
380 cioc
->localAddrLen
= sizeof(ioc
->localAddr
);
383 trace_qio_channel_socket_accept(ioc
);
384 cioc
->fd
= qemu_accept(ioc
->fd
, (struct sockaddr
*)&cioc
->remoteAddr
,
385 &cioc
->remoteAddrLen
);
387 if (errno
== EINTR
) {
390 error_setg_errno(errp
, errno
, "Unable to accept connection");
391 trace_qio_channel_socket_accept_fail(ioc
);
395 if (getsockname(cioc
->fd
, (struct sockaddr
*)&cioc
->localAddr
,
396 &cioc
->localAddrLen
) < 0) {
397 error_setg_errno(errp
, errno
,
398 "Unable to query local socket address");
403 if (cioc
->localAddr
.ss_family
== AF_UNIX
) {
404 QIOChannel
*ioc_local
= QIO_CHANNEL(cioc
);
405 qio_channel_set_feature(ioc_local
, QIO_CHANNEL_FEATURE_FD_PASS
);
409 trace_qio_channel_socket_accept_complete(ioc
, cioc
, cioc
->fd
);
413 object_unref(OBJECT(cioc
));
417 static void qio_channel_socket_init(Object
*obj
)
419 QIOChannelSocket
*ioc
= QIO_CHANNEL_SOCKET(obj
);
423 static void qio_channel_socket_finalize(Object
*obj
)
425 QIOChannelSocket
*ioc
= QIO_CHANNEL_SOCKET(obj
);
428 QIOChannel
*ioc_local
= QIO_CHANNEL(ioc
);
429 if (qio_channel_has_feature(ioc_local
, QIO_CHANNEL_FEATURE_LISTEN
)) {
432 socket_listen_cleanup(ioc
->fd
, &err
);
434 error_report_err(err
);
439 WSAEventSelect(ioc
->fd
, NULL
, 0);
441 closesocket(ioc
->fd
);
448 static void qio_channel_socket_copy_fds(struct msghdr
*msg
,
449 int **fds
, size_t *nfds
)
451 struct cmsghdr
*cmsg
;
456 for (cmsg
= CMSG_FIRSTHDR(msg
); cmsg
; cmsg
= CMSG_NXTHDR(msg
, cmsg
)) {
460 if (cmsg
->cmsg_len
< CMSG_LEN(sizeof(int)) ||
461 cmsg
->cmsg_level
!= SOL_SOCKET
||
462 cmsg
->cmsg_type
!= SCM_RIGHTS
) {
466 fd_size
= cmsg
->cmsg_len
- CMSG_LEN(0);
472 gotfds
= fd_size
/ sizeof(int);
473 *fds
= g_renew(int, *fds
, *nfds
+ gotfds
);
474 memcpy(*fds
+ *nfds
, CMSG_DATA(cmsg
), fd_size
);
476 for (i
= 0; i
< gotfds
; i
++) {
477 int fd
= (*fds
)[*nfds
+ i
];
482 /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
483 qemu_socket_set_block(fd
);
485 #ifndef MSG_CMSG_CLOEXEC
486 qemu_set_cloexec(fd
);
494 static ssize_t
qio_channel_socket_readv(QIOChannel
*ioc
,
495 const struct iovec
*iov
,
501 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
503 struct msghdr msg
= { NULL
, };
504 char control
[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS
)];
507 memset(control
, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS
));
509 msg
.msg_iov
= (struct iovec
*)iov
;
510 msg
.msg_iovlen
= niov
;
512 msg
.msg_control
= control
;
513 msg
.msg_controllen
= sizeof(control
);
514 #ifdef MSG_CMSG_CLOEXEC
515 sflags
|= MSG_CMSG_CLOEXEC
;
521 ret
= recvmsg(sioc
->fd
, &msg
, sflags
);
523 if (errno
== EAGAIN
) {
524 return QIO_CHANNEL_ERR_BLOCK
;
526 if (errno
== EINTR
) {
530 error_setg_errno(errp
, errno
,
531 "Unable to read from socket");
536 qio_channel_socket_copy_fds(&msg
, fds
, nfds
);
542 static ssize_t
qio_channel_socket_writev(QIOChannel
*ioc
,
543 const struct iovec
*iov
,
550 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
552 struct msghdr msg
= { NULL
, };
553 char control
[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS
)];
554 size_t fdsize
= sizeof(int) * nfds
;
555 struct cmsghdr
*cmsg
;
558 memset(control
, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS
));
560 msg
.msg_iov
= (struct iovec
*)iov
;
561 msg
.msg_iovlen
= niov
;
564 if (nfds
> SOCKET_MAX_FDS
) {
565 error_setg_errno(errp
, EINVAL
,
566 "Only %d FDs can be sent, got %zu",
567 SOCKET_MAX_FDS
, nfds
);
571 msg
.msg_control
= control
;
572 msg
.msg_controllen
= CMSG_SPACE(sizeof(int) * nfds
);
574 cmsg
= CMSG_FIRSTHDR(&msg
);
575 cmsg
->cmsg_len
= CMSG_LEN(fdsize
);
576 cmsg
->cmsg_level
= SOL_SOCKET
;
577 cmsg
->cmsg_type
= SCM_RIGHTS
;
578 memcpy(CMSG_DATA(cmsg
), fds
, fdsize
);
581 if (flags
& QIO_CHANNEL_WRITE_FLAG_ZERO_COPY
) {
582 #ifdef QEMU_MSG_ZEROCOPY
583 sflags
= MSG_ZEROCOPY
;
586 * We expect QIOChannel class entry point to have
587 * blocked this code path already
589 g_assert_not_reached();
594 ret
= sendmsg(sioc
->fd
, &msg
, sflags
);
598 return QIO_CHANNEL_ERR_BLOCK
;
602 if (flags
& QIO_CHANNEL_WRITE_FLAG_ZERO_COPY
) {
603 error_setg_errno(errp
, errno
,
604 "Process can't lock enough memory for using MSG_ZEROCOPY");
610 error_setg_errno(errp
, errno
,
611 "Unable to write to socket");
615 if (flags
& QIO_CHANNEL_WRITE_FLAG_ZERO_COPY
) {
616 sioc
->zero_copy_queued
++;
622 static ssize_t
qio_channel_socket_readv(QIOChannel
*ioc
,
623 const struct iovec
*iov
,
629 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
633 for (i
= 0; i
< niov
; i
++) {
641 if (errno
== EAGAIN
) {
645 return QIO_CHANNEL_ERR_BLOCK
;
647 } else if (errno
== EINTR
) {
650 error_setg_errno(errp
, errno
,
651 "Unable to read from socket");
656 if (ret
< iov
[i
].iov_len
) {
664 static ssize_t
qio_channel_socket_writev(QIOChannel
*ioc
,
665 const struct iovec
*iov
,
672 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
676 for (i
= 0; i
< niov
; i
++) {
684 if (errno
== EAGAIN
) {
688 return QIO_CHANNEL_ERR_BLOCK
;
690 } else if (errno
== EINTR
) {
693 error_setg_errno(errp
, errno
,
694 "Unable to write to socket");
699 if (ret
< iov
[i
].iov_len
) {
709 #ifdef QEMU_MSG_ZEROCOPY
710 static int qio_channel_socket_flush(QIOChannel
*ioc
,
713 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
714 struct msghdr msg
= {};
715 struct sock_extended_err
*serr
;
717 char control
[CMSG_SPACE(sizeof(*serr
))];
721 msg
.msg_control
= control
;
722 msg
.msg_controllen
= sizeof(control
);
723 memset(control
, 0, sizeof(control
));
725 while (sioc
->zero_copy_sent
< sioc
->zero_copy_queued
) {
726 received
= recvmsg(sioc
->fd
, &msg
, MSG_ERRQUEUE
);
730 /* Nothing on errqueue, wait until something is available */
731 qio_channel_wait(ioc
, G_IO_ERR
);
736 error_setg_errno(errp
, errno
,
737 "Unable to read errqueue");
742 cm
= CMSG_FIRSTHDR(&msg
);
743 if (cm
->cmsg_level
!= SOL_IP
&&
744 cm
->cmsg_type
!= IP_RECVERR
) {
745 error_setg_errno(errp
, EPROTOTYPE
,
746 "Wrong cmsg in errqueue");
750 serr
= (void *) CMSG_DATA(cm
);
751 if (serr
->ee_errno
!= SO_EE_ORIGIN_NONE
) {
752 error_setg_errno(errp
, serr
->ee_errno
,
756 if (serr
->ee_origin
!= SO_EE_ORIGIN_ZEROCOPY
) {
757 error_setg_errno(errp
, serr
->ee_origin
,
758 "Error not from zero copy");
762 /* No errors, count successfully finished sendmsg()*/
763 sioc
->zero_copy_sent
+= serr
->ee_data
- serr
->ee_info
+ 1;
765 /* If any sendmsg() succeeded using zero copy, return 0 at the end */
766 if (serr
->ee_code
!= SO_EE_CODE_ZEROCOPY_COPIED
) {
774 #endif /* QEMU_MSG_ZEROCOPY */
777 qio_channel_socket_set_blocking(QIOChannel
*ioc
,
781 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
784 qemu_socket_set_block(sioc
->fd
);
786 qemu_socket_set_nonblock(sioc
->fd
);
793 qio_channel_socket_set_delay(QIOChannel
*ioc
,
796 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
797 int v
= enabled
? 0 : 1;
800 IPPROTO_TCP
, TCP_NODELAY
,
806 qio_channel_socket_set_cork(QIOChannel
*ioc
,
809 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
810 int v
= enabled
? 1 : 0;
812 socket_set_cork(sioc
->fd
, v
);
817 qio_channel_socket_close(QIOChannel
*ioc
,
820 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
824 if (sioc
->fd
!= -1) {
826 WSAEventSelect(sioc
->fd
, NULL
, 0);
828 if (qio_channel_has_feature(ioc
, QIO_CHANNEL_FEATURE_LISTEN
)) {
829 socket_listen_cleanup(sioc
->fd
, errp
);
832 if (closesocket(sioc
->fd
) < 0) {
834 error_setg_errno(&err
, errno
, "Unable to close socket");
835 error_propagate(errp
, err
);
844 qio_channel_socket_shutdown(QIOChannel
*ioc
,
845 QIOChannelShutdown how
,
848 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
852 case QIO_CHANNEL_SHUTDOWN_READ
:
855 case QIO_CHANNEL_SHUTDOWN_WRITE
:
858 case QIO_CHANNEL_SHUTDOWN_BOTH
:
864 if (shutdown(sioc
->fd
, sockhow
) < 0) {
865 error_setg_errno(errp
, errno
,
866 "Unable to shutdown socket");
872 static void qio_channel_socket_set_aio_fd_handler(QIOChannel
*ioc
,
878 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
879 aio_set_fd_handler(ctx
, sioc
->fd
, false,
880 io_read
, io_write
, NULL
, NULL
, opaque
);
883 static GSource
*qio_channel_socket_create_watch(QIOChannel
*ioc
,
884 GIOCondition condition
)
886 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
887 return qio_channel_create_socket_watch(ioc
,
892 static void qio_channel_socket_class_init(ObjectClass
*klass
,
893 void *class_data G_GNUC_UNUSED
)
895 QIOChannelClass
*ioc_klass
= QIO_CHANNEL_CLASS(klass
);
897 ioc_klass
->io_writev
= qio_channel_socket_writev
;
898 ioc_klass
->io_readv
= qio_channel_socket_readv
;
899 ioc_klass
->io_set_blocking
= qio_channel_socket_set_blocking
;
900 ioc_klass
->io_close
= qio_channel_socket_close
;
901 ioc_klass
->io_shutdown
= qio_channel_socket_shutdown
;
902 ioc_klass
->io_set_cork
= qio_channel_socket_set_cork
;
903 ioc_klass
->io_set_delay
= qio_channel_socket_set_delay
;
904 ioc_klass
->io_create_watch
= qio_channel_socket_create_watch
;
905 ioc_klass
->io_set_aio_fd_handler
= qio_channel_socket_set_aio_fd_handler
;
906 #ifdef QEMU_MSG_ZEROCOPY
907 ioc_klass
->io_flush
= qio_channel_socket_flush
;
911 static const TypeInfo qio_channel_socket_info
= {
912 .parent
= TYPE_QIO_CHANNEL
,
913 .name
= TYPE_QIO_CHANNEL_SOCKET
,
914 .instance_size
= sizeof(QIOChannelSocket
),
915 .instance_init
= qio_channel_socket_init
,
916 .instance_finalize
= qio_channel_socket_finalize
,
917 .class_init
= qio_channel_socket_class_init
,
920 static void qio_channel_socket_register_types(void)
922 type_register_static(&qio_channel_socket_info
);
925 type_init(qio_channel_socket_register_types
);