2 * QEMU I/O channels sockets driver
4 * Copyright (c) 2015 Red Hat, Inc.
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
20 #include "qemu/osdep.h"
21 #include "qapi/error.h"
22 #include "qapi/qapi-visit-sockets.h"
23 #include "qemu/module.h"
24 #include "io/channel-socket.h"
25 #include "io/channel-watch.h"
27 #include "qapi/clone-visitor.h"
29 #include <linux/errqueue.h>
30 #include <sys/socket.h>
/*
 * QEMU_MSG_ZEROCOPY is defined only when the host headers provide both
 * MSG_ZEROCOPY and SO_ZEROCOPY; the zero-copy paths in this file are
 * compiled only under #ifdef QEMU_MSG_ZEROCOPY.
 * SOCKET_MAX_FDS is the maximum number of file descriptors carried in one
 * SCM_RIGHTS control message; it sizes the readv/writev control buffers.
 */
32 #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
33 #define QEMU_MSG_ZEROCOPY
37 #define SOCKET_MAX_FDS 16
/*
 * Return the local address of @ioc by converting the cached
 * ioc->localAddr sockaddr with socket_sockaddr_to_address().
 * NOTE(review): the remaining arguments are not visible in this view.
 */
40 qio_channel_socket_get_local_address(QIOChannelSocket
*ioc
,
43 return socket_sockaddr_to_address(&ioc
->localAddr
,
/*
 * Return the peer address of @ioc by converting the cached
 * ioc->remoteAddr sockaddr with socket_sockaddr_to_address().
 * NOTE(review): the remaining arguments are not visible in this view.
 */
49 qio_channel_socket_get_remote_address(QIOChannelSocket
*ioc
,
52 return socket_sockaddr_to_address(&ioc
->remoteAddr
,
/*
 * Create a new socket channel object that has no socket attached yet.
 *
 * The function:
 *  - resets the zero-copy counters (zero_copy_queued / zero_copy_sent);
 *  - advertises QIO_CHANNEL_FEATURE_SHUTDOWN;
 *  - stores a handle from CreateEvent() in ioc->event.
 *
 * NOTE(review): CreateEvent() is a Windows API. The preprocessor guard
 * around it is not visible in this view; confirm it is WIN32-only.
 */
58 qio_channel_socket_new(void)
60 QIOChannelSocket
*sioc
;
63 sioc
= QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET
));
65 sioc
->zero_copy_queued
= 0;
66 sioc
->zero_copy_sent
= 0;
68 ioc
= QIO_CHANNEL(sioc
);
69 qio_channel_set_feature(ioc
, QIO_CHANNEL_FEATURE_SHUTDOWN
);
72 ioc
->event
= CreateEvent(NULL
, FALSE
, FALSE
, NULL
);
75 trace_qio_channel_socket_new(sioc
);
/*
 * Attach an already-open socket @fd to @sioc and cache its addresses.
 *
 * Steps:
 *  - Fail with "Socket is already open" if the channel already holds a socket.
 *  - Query the peer address with getpeername(). ENOTCONN is tolerated:
 *    remoteAddr is zeroed and remoteAddrLen is reset to the full size.
 *    Other failures report "Unable to query remote socket address".
 *  - Query the local address with getsockname().
 *  - Enable QIO_CHANNEL_FEATURE_FD_PASS for AF_UNIX sockets.
 *
 * On failure sioc->fd is reset to -1, so closing @fd stays the caller's job.
 */
82 qio_channel_socket_set_fd(QIOChannelSocket
*sioc
,
87 error_setg(errp
, "Socket is already open");
92 sioc
->remoteAddrLen
= sizeof(sioc
->remoteAddr
);
93 sioc
->localAddrLen
= sizeof(sioc
->localAddr
);
96 if (getpeername(fd
, (struct sockaddr
*)&sioc
->remoteAddr
,
97 &sioc
->remoteAddrLen
) < 0) {
98 if (errno
== ENOTCONN
) {
99 memset(&sioc
->remoteAddr
, 0, sizeof(sioc
->remoteAddr
));
100 sioc
->remoteAddrLen
= sizeof(sioc
->remoteAddr
);
102 error_setg_errno(errp
, errno
,
103 "Unable to query remote socket address");
108 if (getsockname(fd
, (struct sockaddr
*)&sioc
->localAddr
,
109 &sioc
->localAddrLen
) < 0) {
110 error_setg_errno(errp
, errno
,
111 "Unable to query local socket address");
116 if (sioc
->localAddr
.ss_family
== AF_UNIX
) {
117 QIOChannel
*ioc
= QIO_CHANNEL(sioc
);
118 qio_channel_set_feature(ioc
, QIO_CHANNEL_FEATURE_FD_PASS
);
125 sioc
->fd
= -1; /* Let the caller close FD on failure */
/*
 * Create a new socket channel and attach @fd to it with
 * qio_channel_socket_set_fd().
 *
 * If attaching fails, the half-built channel is released with
 * object_unref(). set_fd leaves @fd for the caller to close in that case.
 */
130 qio_channel_socket_new_fd(int fd
,
133 QIOChannelSocket
*ioc
;
135 ioc
= qio_channel_socket_new();
136 if (qio_channel_socket_set_fd(ioc
, fd
, errp
) < 0) {
137 object_unref(OBJECT(ioc
));
141 trace_qio_channel_socket_new_fd(ioc
, fd
);
/*
 * Synchronously connect to @addr with socket_connect() and attach the
 * resulting fd to @ioc. Trace points record the start, failure and
 * completion of the attempt.
 *
 * With QEMU_MSG_ZEROCOPY, SO_ZEROCOPY is requested on the socket with
 * setsockopt(). QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY is then advertised,
 * presumably only when setsockopt() succeeded; the condition is not
 * visible in this view.
 */
147 int qio_channel_socket_connect_sync(QIOChannelSocket
*ioc
,
153 trace_qio_channel_socket_connect_sync(ioc
, addr
);
154 fd
= socket_connect(addr
, errp
);
156 trace_qio_channel_socket_connect_fail(ioc
);
160 trace_qio_channel_socket_connect_complete(ioc
, fd
);
161 if (qio_channel_socket_set_fd(ioc
, fd
, errp
) < 0) {
166 #ifdef QEMU_MSG_ZEROCOPY
168 ret
= setsockopt(fd
, SOL_SOCKET
, SO_ZEROCOPY
, &v
, sizeof(v
));
170 /* Zero copy available on host */
171 qio_channel_set_feature(QIO_CHANNEL(ioc
),
172 QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY
);
/*
 * Thread body for qio_channel_socket_connect_async().
 * Runs the blocking qio_channel_socket_connect_sync() on the task's source
 * channel, using the SocketAddress passed in @opaque. Any error is stored
 * on @task with qio_task_set_error().
 */
180 static void qio_channel_socket_connect_worker(QIOTask
*task
,
183 QIOChannelSocket
*ioc
= QIO_CHANNEL_SOCKET(qio_task_get_source(task
));
184 SocketAddress
*addr
= opaque
;
187 qio_channel_socket_connect_sync(ioc
, addr
, &err
);
189 qio_task_set_error(task
, err
);
/*
 * Start an asynchronous connect of @ioc to @addr.
 *
 * A QIOTask is created with @callback, @opaque and @destroy. A private copy
 * of @addr (QAPI_CLONE) is handed to qio_channel_socket_connect_worker(),
 * which runs in a thread. The copy is released with
 * qapi_free_SocketAddress. A thread is used because DNS lookups in
 * socket_connect() block (see the comment below).
 */
193 void qio_channel_socket_connect_async(QIOChannelSocket
*ioc
,
195 QIOTaskFunc callback
,
197 GDestroyNotify destroy
,
198 GMainContext
*context
)
200 QIOTask
*task
= qio_task_new(
201 OBJECT(ioc
), callback
, opaque
, destroy
);
202 SocketAddress
*addrCopy
;
204 addrCopy
= QAPI_CLONE(SocketAddress
, addr
);
206 /* socket_connect() does a non-blocking connect(), but it
207 * still blocks in DNS lookups, so we must use a thread */
208 trace_qio_channel_socket_connect_async(ioc
, addr
);
209 qio_task_run_in_thread(task
,
210 qio_channel_socket_connect_worker
,
212 (GDestroyNotify
)qapi_free_SocketAddress
,
/*
 * Synchronously create a listening socket for @addr with socket_listen().
 * @num is passed through as the amount of expected connections.
 * The fd is attached to @ioc, and the channel is marked with
 * QIO_CHANNEL_FEATURE_LISTEN so that close/finalize run
 * socket_listen_cleanup().
 */
217 int qio_channel_socket_listen_sync(QIOChannelSocket
*ioc
,
224 trace_qio_channel_socket_listen_sync(ioc
, addr
, num
);
225 fd
= socket_listen(addr
, num
, errp
);
227 trace_qio_channel_socket_listen_fail(ioc
);
231 trace_qio_channel_socket_listen_complete(ioc
, fd
);
232 if (qio_channel_socket_set_fd(ioc
, fd
, errp
) < 0) {
236 qio_channel_set_feature(QIO_CHANNEL(ioc
), QIO_CHANNEL_FEATURE_LISTEN
);
/*
 * Arguments handed from qio_channel_socket_listen_async() to its thread
 * worker: the cloned listen address (addr, freed by
 * qio_channel_listen_worker_free) and the expected connection count.
 */
242 struct QIOChannelListenWorkerData
{
244 int num
; /* amount of expected connections */
/*
 * Destroy callback for QIOChannelListenWorkerData: frees the cloned
 * listen address.
 * NOTE(review): freeing @data itself is not visible in this view.
 */
247 static void qio_channel_listen_worker_free(gpointer opaque
)
249 struct QIOChannelListenWorkerData
*data
= opaque
;
251 qapi_free_SocketAddress(data
->addr
);
/*
 * Thread body for qio_channel_socket_listen_async().
 * Runs qio_channel_socket_listen_sync() on the task's source channel with
 * the address and count from the QIOChannelListenWorkerData in @opaque.
 * Any error is stored on @task.
 */
255 static void qio_channel_socket_listen_worker(QIOTask
*task
,
258 QIOChannelSocket
*ioc
= QIO_CHANNEL_SOCKET(qio_task_get_source(task
));
259 struct QIOChannelListenWorkerData
*data
= opaque
;
262 qio_channel_socket_listen_sync(ioc
, data
->addr
, data
->num
, &err
);
264 qio_task_set_error(task
, err
);
/*
 * Start an asynchronous listen on @addr in a worker thread.
 *
 * A zeroed QIOChannelListenWorkerData is allocated, and a clone of @addr
 * is stored in it. It is handed to qio_channel_socket_listen_worker() and
 * released with qio_channel_listen_worker_free(). @callback, @opaque and
 * @destroy are passed to qio_task_new().
 *
 * NOTE(review): the assignment of data->num is not visible in this view.
 * Confirm it is set from @num; g_new0 would otherwise leave it 0.
 */
268 void qio_channel_socket_listen_async(QIOChannelSocket
*ioc
,
271 QIOTaskFunc callback
,
273 GDestroyNotify destroy
,
274 GMainContext
*context
)
276 QIOTask
*task
= qio_task_new(
277 OBJECT(ioc
), callback
, opaque
, destroy
);
278 struct QIOChannelListenWorkerData
*data
;
280 data
= g_new0(struct QIOChannelListenWorkerData
, 1);
281 data
->addr
= QAPI_CLONE(SocketAddress
, addr
);
284 /* socket_listen() blocks in DNS lookups, so we must use a thread */
285 trace_qio_channel_socket_listen_async(ioc
, addr
, num
);
286 qio_task_run_in_thread(task
,
287 qio_channel_socket_listen_worker
,
289 qio_channel_listen_worker_free
,
/*
 * Synchronously create a datagram socket for @localAddr / @remoteAddr with
 * socket_dgram() and attach it to @ioc.
 * Note that socket_dgram() takes the remote address first and the local
 * address second.
 */
294 int qio_channel_socket_dgram_sync(QIOChannelSocket
*ioc
,
295 SocketAddress
*localAddr
,
296 SocketAddress
*remoteAddr
,
301 trace_qio_channel_socket_dgram_sync(ioc
, localAddr
, remoteAddr
);
302 fd
= socket_dgram(remoteAddr
, localAddr
, errp
);
304 trace_qio_channel_socket_dgram_fail(ioc
);
308 trace_qio_channel_socket_dgram_complete(ioc
, fd
);
309 if (qio_channel_socket_set_fd(ioc
, fd
, errp
) < 0) {
/*
 * Arguments handed from qio_channel_socket_dgram_async() to its thread
 * worker: cloned copies of the local and remote addresses.
 */
318 struct QIOChannelSocketDGramWorkerData
{
319 SocketAddress
*localAddr
;
320 SocketAddress
*remoteAddr
;
/*
 * Destroy callback for QIOChannelSocketDGramWorkerData: frees both cloned
 * addresses.
 * NOTE(review): freeing @data itself is not visible in this view.
 */
324 static void qio_channel_socket_dgram_worker_free(gpointer opaque
)
326 struct QIOChannelSocketDGramWorkerData
*data
= opaque
;
327 qapi_free_SocketAddress(data
->localAddr
);
328 qapi_free_SocketAddress(data
->remoteAddr
);
/*
 * Thread body for qio_channel_socket_dgram_async().
 * Runs qio_channel_socket_dgram_sync() on the task's source channel with
 * the addresses from the QIOChannelSocketDGramWorkerData in @opaque.
 * Any error is stored on @task.
 */
332 static void qio_channel_socket_dgram_worker(QIOTask
*task
,
335 QIOChannelSocket
*ioc
= QIO_CHANNEL_SOCKET(qio_task_get_source(task
));
336 struct QIOChannelSocketDGramWorkerData
*data
= opaque
;
339 /* socket_dgram() blocks in DNS lookups, so we must use a thread */
340 qio_channel_socket_dgram_sync(ioc
, data
->localAddr
,
341 data
->remoteAddr
, &err
);
343 qio_task_set_error(task
, err
);
/*
 * Start asynchronous creation of a datagram socket in a worker thread.
 * Both addresses are cloned into a zeroed QIOChannelSocketDGramWorkerData,
 * which qio_channel_socket_dgram_worker() consumes and
 * qio_channel_socket_dgram_worker_free() releases. @callback, @opaque and
 * @destroy are passed to qio_task_new().
 */
347 void qio_channel_socket_dgram_async(QIOChannelSocket
*ioc
,
348 SocketAddress
*localAddr
,
349 SocketAddress
*remoteAddr
,
350 QIOTaskFunc callback
,
352 GDestroyNotify destroy
,
353 GMainContext
*context
)
355 QIOTask
*task
= qio_task_new(
356 OBJECT(ioc
), callback
, opaque
, destroy
);
357 struct QIOChannelSocketDGramWorkerData
*data
= g_new0(
358 struct QIOChannelSocketDGramWorkerData
, 1);
360 data
->localAddr
= QAPI_CLONE(SocketAddress
, localAddr
);
361 data
->remoteAddr
= QAPI_CLONE(SocketAddress
, remoteAddr
);
363 trace_qio_channel_socket_dgram_async(ioc
, localAddr
, remoteAddr
);
364 qio_task_run_in_thread(task
,
365 qio_channel_socket_dgram_worker
,
367 qio_channel_socket_dgram_worker_free
,
/*
 * Accept one connection on listening channel @ioc and wrap it in a new
 * QIOChannelSocket (cioc).
 *
 * Steps:
 *  - qemu_accept() fills cioc->remoteAddr with the peer address.
 *  - EINTR is tested separately from other failures; its handling is not
 *    visible in this view.
 *  - Other failures report "Unable to accept connection".
 *  - The local address is read with getsockname().
 *  - AF_UNIX connections get QIO_CHANNEL_FEATURE_FD_PASS.
 *
 * NOTE(review): the address lengths are initialised from sizeof(ioc->...)
 * rather than sizeof(cioc->...). Both are QIOChannelSocket, so the value is
 * the same, but cioc would read more clearly.
 * NOTE(review): the object_unref(cioc) at the end presumably belongs to the
 * error path; the label is not visible in this view.
 */
373 qio_channel_socket_accept(QIOChannelSocket
*ioc
,
376 QIOChannelSocket
*cioc
;
378 cioc
= qio_channel_socket_new();
379 cioc
->remoteAddrLen
= sizeof(ioc
->remoteAddr
);
380 cioc
->localAddrLen
= sizeof(ioc
->localAddr
);
383 trace_qio_channel_socket_accept(ioc
);
384 cioc
->fd
= qemu_accept(ioc
->fd
, (struct sockaddr
*)&cioc
->remoteAddr
,
385 &cioc
->remoteAddrLen
);
387 if (errno
== EINTR
) {
390 error_setg_errno(errp
, errno
, "Unable to accept connection");
391 trace_qio_channel_socket_accept_fail(ioc
);
395 if (getsockname(cioc
->fd
, (struct sockaddr
*)&cioc
->localAddr
,
396 &cioc
->localAddrLen
) < 0) {
397 error_setg_errno(errp
, errno
,
398 "Unable to query local socket address");
403 if (cioc
->localAddr
.ss_family
== AF_UNIX
) {
404 QIOChannel
*ioc_local
= QIO_CHANNEL(cioc
);
405 qio_channel_set_feature(ioc_local
, QIO_CHANNEL_FEATURE_FD_PASS
);
409 trace_qio_channel_socket_accept_complete(ioc
, cioc
, cioc
->fd
);
413 object_unref(OBJECT(cioc
));
/*
 * QOM instance initializer for QIOChannelSocket.
 * NOTE(review): only the cast is visible in this view; the field
 * initialisation (presumably fd = -1) is not shown.
 */
417 static void qio_channel_socket_init(Object
*obj
)
419 QIOChannelSocket
*ioc
= QIO_CHANNEL_SOCKET(obj
);
/*
 * QOM finalizer: release the socket still owned by the channel.
 *
 * Steps:
 *  - Listening channels (QIO_CHANNEL_FEATURE_LISTEN) run
 *    socket_listen_cleanup(). Its error is printed with
 *    error_report_err(), because a finalizer has no caller to propagate to.
 *  - WSAEventSelect(fd, NULL, 0) clears the Winsock event association.
 *  - The socket is closed with closesocket().
 *
 * NOTE(review): the "fd still open" guard and the platform guards are not
 * visible in this view.
 */
423 static void qio_channel_socket_finalize(Object
*obj
)
425 QIOChannelSocket
*ioc
= QIO_CHANNEL_SOCKET(obj
);
428 QIOChannel
*ioc_local
= QIO_CHANNEL(ioc
);
429 if (qio_channel_has_feature(ioc_local
, QIO_CHANNEL_FEATURE_LISTEN
)) {
432 socket_listen_cleanup(ioc
->fd
, &err
);
434 error_report_err(err
);
439 WSAEventSelect(ioc
->fd
, NULL
, 0);
441 closesocket(ioc
->fd
);
/*
 * Collect file descriptors received via SCM_RIGHTS in @msg.
 *
 * Control messages are walked with CMSG_FIRSTHDR / CMSG_NXTHDR. Messages
 * that are shorter than one int, or are not SOL_SOCKET / SCM_RIGHTS, fail
 * the check at the top of the loop and are presumably skipped.
 *
 * For each SCM_RIGHTS payload:
 *  - *@fds is grown with g_renew() and the received ints are appended
 *    after the existing *@nfds entries;
 *  - each new fd is switched back to blocking mode, because O_NONBLOCK
 *    survives SCM_RIGHTS;
 *  - close-on-exec is set by hand when MSG_CMSG_CLOEXEC is unavailable.
 */
448 static void qio_channel_socket_copy_fds(struct msghdr
*msg
,
449 int **fds
, size_t *nfds
)
451 struct cmsghdr
*cmsg
;
456 for (cmsg
= CMSG_FIRSTHDR(msg
); cmsg
; cmsg
= CMSG_NXTHDR(msg
, cmsg
)) {
460 if (cmsg
->cmsg_len
< CMSG_LEN(sizeof(int)) ||
461 cmsg
->cmsg_level
!= SOL_SOCKET
||
462 cmsg
->cmsg_type
!= SCM_RIGHTS
) {
466 fd_size
= cmsg
->cmsg_len
- CMSG_LEN(0);
472 gotfds
= fd_size
/ sizeof(int);
473 *fds
= g_renew(int, *fds
, *nfds
+ gotfds
);
474 memcpy(*fds
+ *nfds
, CMSG_DATA(cmsg
), fd_size
);
476 for (i
= 0; i
< gotfds
; i
++) {
477 int fd
= (*fds
)[*nfds
+ i
];
482 /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
483 qemu_socket_set_block(fd
);
485 #ifndef MSG_CMSG_CLOEXEC
486 qemu_set_cloexec(fd
);
/*
 * recvmsg()-based readv: read into @iov / @niov and pick up passed fds.
 *
 * Steps:
 *  - A zeroed control buffer sized for SOCKET_MAX_FDS ints is attached to
 *    the msghdr.
 *  - MSG_CMSG_CLOEXEC is added to the recv flags where it exists.
 *  - On failure: EAGAIN returns QIO_CHANNEL_ERR_BLOCK, EINTR is
 *    special-cased (handling not visible in this view), and other errors
 *    report "Unable to read from socket".
 *  - Received descriptors are extracted with qio_channel_socket_copy_fds().
 */
494 static ssize_t
qio_channel_socket_readv(QIOChannel
*ioc
,
495 const struct iovec
*iov
,
501 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
503 struct msghdr msg
= { NULL
, };
504 char control
[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS
)];
507 memset(control
, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS
));
509 msg
.msg_iov
= (struct iovec
*)iov
;
510 msg
.msg_iovlen
= niov
;
512 msg
.msg_control
= control
;
513 msg
.msg_controllen
= sizeof(control
);
514 #ifdef MSG_CMSG_CLOEXEC
515 sflags
|= MSG_CMSG_CLOEXEC
;
521 ret
= recvmsg(sioc
->fd
, &msg
, sflags
);
523 if (errno
== EAGAIN
) {
524 return QIO_CHANNEL_ERR_BLOCK
;
526 if (errno
== EINTR
) {
530 error_setg_errno(errp
, errno
,
531 "Unable to read from socket");
536 qio_channel_socket_copy_fds(&msg
, fds
, nfds
);
/*
 * sendmsg()-based writev: write @iov / @niov, optionally passing @fds.
 *
 * Steps:
 *  - Up to SOCKET_MAX_FDS descriptors are attached as one SOL_SOCKET /
 *    SCM_RIGHTS control message. More than that fails with EINVAL
 *    ("Only %d FDs can be sent, got %zu").
 *  - With QEMU_MSG_ZEROCOPY and QIO_CHANNEL_WRITE_FLAG_ZERO_COPY, the send
 *    flags become MSG_ZEROCOPY.
 *  - On failure, QIO_CHANNEL_ERR_BLOCK is returned for the would-block
 *    case (the errno test is not visible in this view). While MSG_ZEROCOPY
 *    is set, a failure reports that the process cannot lock enough memory.
 *    Other errors report "Unable to write to socket".
 *
 * NOTE(review): `sflags = MSG_ZEROCOPY` assigns rather than ORs. Confirm no
 * previously set send flags need to be preserved.
 */
542 static ssize_t
qio_channel_socket_writev(QIOChannel
*ioc
,
543 const struct iovec
*iov
,
550 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
552 struct msghdr msg
= { NULL
, };
553 char control
[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS
)];
554 size_t fdsize
= sizeof(int) * nfds
;
555 struct cmsghdr
*cmsg
;
558 memset(control
, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS
));
560 msg
.msg_iov
= (struct iovec
*)iov
;
561 msg
.msg_iovlen
= niov
;
564 if (nfds
> SOCKET_MAX_FDS
) {
565 error_setg_errno(errp
, EINVAL
,
566 "Only %d FDs can be sent, got %zu",
567 SOCKET_MAX_FDS
, nfds
);
571 msg
.msg_control
= control
;
572 msg
.msg_controllen
= CMSG_SPACE(sizeof(int) * nfds
);
574 cmsg
= CMSG_FIRSTHDR(&msg
);
575 cmsg
->cmsg_len
= CMSG_LEN(fdsize
);
576 cmsg
->cmsg_level
= SOL_SOCKET
;
577 cmsg
->cmsg_type
= SCM_RIGHTS
;
578 memcpy(CMSG_DATA(cmsg
), fds
, fdsize
);
581 #ifdef QEMU_MSG_ZEROCOPY
582 if (flags
& QIO_CHANNEL_WRITE_FLAG_ZERO_COPY
) {
583 sflags
= MSG_ZEROCOPY
;
588 ret
= sendmsg(sioc
->fd
, &msg
, sflags
);
592 return QIO_CHANNEL_ERR_BLOCK
;
595 #ifdef QEMU_MSG_ZEROCOPY
597 if (sflags
& MSG_ZEROCOPY
) {
598 error_setg_errno(errp
, errno
,
599 "Process can't lock enough memory for using MSG_ZEROCOPY");
606 error_setg_errno(errp
, errno
,
607 "Unable to write to socket");
/*
 * Per-iovec readv variant: reads each element of @iov in turn.
 *
 * On failure: EAGAIN returns QIO_CHANNEL_ERR_BLOCK, EINTR is special-cased,
 * and other errors report "Unable to read from socket". A read shorter
 * than iov[i].iov_len is detected after each element (the action taken is
 * not visible in this view).
 *
 * NOTE(review): this is a second definition of qio_channel_socket_readv.
 * Presumably it is the WIN32 alternative selected by a preprocessor guard
 * not visible in this view.
 */
613 static ssize_t
qio_channel_socket_readv(QIOChannel
*ioc
,
614 const struct iovec
*iov
,
620 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
624 for (i
= 0; i
< niov
; i
++) {
632 if (errno
== EAGAIN
) {
636 return QIO_CHANNEL_ERR_BLOCK
;
638 } else if (errno
== EINTR
) {
641 error_setg_errno(errp
, errno
,
642 "Unable to read from socket");
647 if (ret
< iov
[i
].iov_len
) {
/*
 * Per-iovec writev variant: writes each element of @iov in turn.
 *
 * On failure: EAGAIN returns QIO_CHANNEL_ERR_BLOCK, EINTR is special-cased,
 * and other errors report "Unable to write to socket". A write shorter
 * than iov[i].iov_len is detected after each element (the action taken is
 * not visible in this view).
 *
 * NOTE(review): this is a second definition of qio_channel_socket_writev.
 * Presumably it is the WIN32 alternative selected by a preprocessor guard
 * not visible in this view.
 */
655 static ssize_t
qio_channel_socket_writev(QIOChannel
*ioc
,
656 const struct iovec
*iov
,
663 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
667 for (i
= 0; i
< niov
; i
++) {
675 if (errno
== EAGAIN
) {
679 return QIO_CHANNEL_ERR_BLOCK
;
681 } else if (errno
== EINTR
) {
684 error_setg_errno(errp
, errno
,
685 "Unable to write to socket");
690 if (ret
< iov
[i
].iov_len
) {
700 #ifdef QEMU_MSG_ZEROCOPY
/*
 * Wait until every queued zero-copy send on the socket has been reported
 * complete on the socket error queue (recvmsg with MSG_ERRQUEUE).
 *
 * The loop runs while zero_copy_sent < zero_copy_queued:
 *  - When nothing is queued, the channel waits for G_IO_ERR.
 *  - Each notification is checked for the expected cmsg, a zero ee_errno
 *    and origin SO_EE_ORIGIN_ZEROCOPY.
 *  - zero_copy_sent is then advanced by ee_data - ee_info + 1, i.e. the
 *    number of completed sendmsg() calls reported.
 *  - If any completion is not SO_EE_CODE_ZEROCOPY_COPIED (zero copy was
 *    really used), the function returns 0 at the end.
 */
701 static int qio_channel_socket_flush(QIOChannel
*ioc
,
704 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
705 struct msghdr msg
= {};
706 struct sock_extended_err
*serr
;
708 char control
[CMSG_SPACE(sizeof(*serr
))];
712 msg
.msg_control
= control
;
713 msg
.msg_controllen
= sizeof(control
);
714 memset(control
, 0, sizeof(control
));
716 while (sioc
->zero_copy_sent
< sioc
->zero_copy_queued
) {
717 received
= recvmsg(sioc
->fd
, &msg
, MSG_ERRQUEUE
);
721 /* Nothing on errqueue, wait until something is available */
722 qio_channel_wait(ioc
, G_IO_ERR
);
727 error_setg_errno(errp
, errno
,
728 "Unable to read errqueue");
/*
 * NOTE(review): CMSG_FIRSTHDR() can return NULL, and cm is dereferenced
 * without a check.
 * NOTE(review): the test uses `&&`, so a cmsg is rejected only when BOTH
 * level and type mismatch; `||` may be intended. IPv6 sockets would report
 * SOL_IPV6/IPV6_RECVERR here. Confirm.
 */
733 cm
= CMSG_FIRSTHDR(&msg
);
734 if (cm
->cmsg_level
!= SOL_IP
&&
735 cm
->cmsg_type
!= IP_RECVERR
) {
736 error_setg_errno(errp
, EPROTOTYPE
,
737 "Wrong cmsg in errqueue");
/*
 * NOTE(review): ee_errno is compared against SO_EE_ORIGIN_NONE (an origin
 * constant), and below ee_origin is passed as an errno value. Confirm the
 * intended comparisons and error codes.
 */
741 serr
= (void *) CMSG_DATA(cm
);
742 if (serr
->ee_errno
!= SO_EE_ORIGIN_NONE
) {
743 error_setg_errno(errp
, serr
->ee_errno
,
747 if (serr
->ee_origin
!= SO_EE_ORIGIN_ZEROCOPY
) {
748 error_setg_errno(errp
, serr
->ee_origin
,
749 "Error not from zero copy");
753 /* No errors, count successfully finished sendmsg()*/
754 sioc
->zero_copy_sent
+= serr
->ee_data
- serr
->ee_info
+ 1;
756 /* If any sendmsg() succeeded using zero copy, return 0 at the end */
757 if (serr
->ee_code
!= SO_EE_CODE_ZEROCOPY_COPIED
) {
765 #endif /* QEMU_MSG_ZEROCOPY */
/*
 * Switch sioc->fd between blocking and non-blocking mode using
 * qemu_socket_set_block() / qemu_socket_set_nonblock().
 * NOTE(review): the condition choosing between them (presumably the
 * `enabled` argument) is not visible in this view.
 */
768 qio_channel_socket_set_blocking(QIOChannel
*ioc
,
772 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
775 qemu_socket_set_block(sioc
->fd
);
777 qemu_socket_set_nonblock(sioc
->fd
);
/*
 * Enable or disable send delay through the IPPROTO_TCP / TCP_NODELAY
 * option. The value is inverted: enabling delay writes 0 (TCP_NODELAY
 * off) and disabling delay writes 1.
 */
784 qio_channel_socket_set_delay(QIOChannel
*ioc
,
787 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
788 int v
= enabled
? 0 : 1;
791 IPPROTO_TCP
, TCP_NODELAY
,
/*
 * Enable (1) or disable (0) corking on sioc->fd via socket_set_cork().
 */
797 qio_channel_socket_set_cork(QIOChannel
*ioc
,
800 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
801 int v
= enabled
? 1 : 0;
803 socket_set_cork(sioc
->fd
, v
);
/*
 * Close the channel's socket if one is open (sioc->fd != -1).
 *
 * Steps:
 *  - WSAEventSelect(fd, NULL, 0) clears the Winsock event association.
 *  - Listening channels run socket_listen_cleanup(), which reports into
 *    @errp.
 *  - closesocket() is called. Its failure is built in a local Error and
 *    merged into @errp with error_propagate(), so @errp is not set twice
 *    directly.
 */
808 qio_channel_socket_close(QIOChannel
*ioc
,
811 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
815 if (sioc
->fd
!= -1) {
817 WSAEventSelect(sioc
->fd
, NULL
, 0);
819 if (qio_channel_has_feature(ioc
, QIO_CHANNEL_FEATURE_LISTEN
)) {
820 socket_listen_cleanup(sioc
->fd
, errp
);
823 if (closesocket(sioc
->fd
) < 0) {
825 error_setg_errno(&err
, errno
, "Unable to close socket");
826 error_propagate(errp
, err
);
/*
 * Shut down one or both directions of the socket.
 * @how (READ / WRITE / BOTH) is mapped to a `sockhow` value for
 * shutdown(). The exact constants are not visible in this view.
 * A failure reports "Unable to shutdown socket".
 */
835 qio_channel_socket_shutdown(QIOChannel
*ioc
,
836 QIOChannelShutdown how
,
839 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
843 case QIO_CHANNEL_SHUTDOWN_READ
:
846 case QIO_CHANNEL_SHUTDOWN_WRITE
:
849 case QIO_CHANNEL_SHUTDOWN_BOTH
:
855 if (shutdown(sioc
->fd
, sockhow
) < 0) {
856 error_setg_errno(errp
, errno
,
857 "Unable to shutdown socket");
/*
 * Register @io_read / @io_write (with @opaque) for sioc->fd in @ctx via
 * aio_set_fd_handler(). The remaining arguments are passed as
 * false / NULL / NULL.
 */
863 static void qio_channel_socket_set_aio_fd_handler(QIOChannel
*ioc
,
869 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
870 aio_set_fd_handler(ctx
, sioc
->fd
, false,
871 io_read
, io_write
, NULL
, NULL
, opaque
);
/*
 * Create a GSource that watches this channel for @condition, using
 * qio_channel_create_socket_watch(). The remaining arguments are not
 * visible in this view.
 */
874 static GSource
*qio_channel_socket_create_watch(QIOChannel
*ioc
,
875 GIOCondition condition
)
877 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(ioc
);
878 return qio_channel_create_socket_watch(ioc
,
/*
 * Install the socket implementations into the QIOChannelClass vtable.
 * io_flush is provided only when QEMU_MSG_ZEROCOPY is defined.
 */
883 static void qio_channel_socket_class_init(ObjectClass
*klass
,
884 void *class_data G_GNUC_UNUSED
)
886 QIOChannelClass
*ioc_klass
= QIO_CHANNEL_CLASS(klass
);
888 ioc_klass
->io_writev
= qio_channel_socket_writev
;
889 ioc_klass
->io_readv
= qio_channel_socket_readv
;
890 ioc_klass
->io_set_blocking
= qio_channel_socket_set_blocking
;
891 ioc_klass
->io_close
= qio_channel_socket_close
;
892 ioc_klass
->io_shutdown
= qio_channel_socket_shutdown
;
893 ioc_klass
->io_set_cork
= qio_channel_socket_set_cork
;
894 ioc_klass
->io_set_delay
= qio_channel_socket_set_delay
;
895 ioc_klass
->io_create_watch
= qio_channel_socket_create_watch
;
896 ioc_klass
->io_set_aio_fd_handler
= qio_channel_socket_set_aio_fd_handler
;
897 #ifdef QEMU_MSG_ZEROCOPY
898 ioc_klass
->io_flush
= qio_channel_socket_flush
;
/*
 * QOM type description: TYPE_QIO_CHANNEL_SOCKET derives from
 * TYPE_QIO_CHANNEL, with instance size sizeof(QIOChannelSocket) and the
 * socket init / finalize / class_init hooks defined in this file.
 */
902 static const TypeInfo qio_channel_socket_info
= {
903 .parent
= TYPE_QIO_CHANNEL
,
904 .name
= TYPE_QIO_CHANNEL_SOCKET
,
905 .instance_size
= sizeof(QIOChannelSocket
),
906 .instance_init
= qio_channel_socket_init
,
907 .instance_finalize
= qio_channel_socket_finalize
,
908 .class_init
= qio_channel_socket_class_init
,
/*
 * Register qio_channel_socket_info with the QOM type system. The function
 * is hooked in via type_init().
 */
911 static void qio_channel_socket_register_types(void)
913 type_register_static(&qio_channel_socket_info
);
916 type_init(qio_channel_socket_register_types
);