4 * Copyright (c) 2003-2008 Fabrice Bellard
5 * Copyright (c) 2022 Red Hat, Inc.
7 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * of this software and associated documentation files (the "Software"), to deal
9 * in the Software without restriction, including without limitation the rights
10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 * copies of the Software, and to permit persons to whom the Software is
12 * furnished to do so, subject to the following conditions:
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 #include "qemu/osdep.h"
30 #include "monitor/monitor.h"
31 #include "qapi/error.h"
32 #include "qemu/error-report.h"
33 #include "qemu/option.h"
34 #include "qemu/sockets.h"
36 #include "qemu/main-loop.h"
37 #include "qemu/cutils.h"
38 #include "io/channel.h"
39 #include "io/channel-socket.h"
40 #include "io/net-listener.h"
41 #include "qapi/qapi-events-net.h"
42 #include "qapi/qapi-visit-sockets.h"
43 #include "qapi/clone-visitor.h"
45 typedef struct NetStreamState
{
47 QIOChannel
*listen_ioc
;
48 QIONetListener
*listener
;
53 unsigned int send_index
; /* number of bytes sent*/
59 static void net_stream_listen(QIONetListener
*listener
,
60 QIOChannelSocket
*cioc
,
62 static void net_stream_arm_reconnect(NetStreamState
*s
);
64 static gboolean
net_stream_writable(QIOChannel
*ioc
,
65 GIOCondition condition
,
68 NetStreamState
*s
= data
;
72 qemu_flush_queued_packets(&s
->nc
);
74 return G_SOURCE_REMOVE
;
77 static ssize_t
net_stream_receive(NetClientState
*nc
, const uint8_t *buf
,
80 NetStreamState
*s
= DO_UPCAST(NetStreamState
, nc
, nc
);
81 uint32_t len
= htonl(size
);
82 struct iovec iov
[] = {
85 .iov_len
= sizeof(len
),
87 .iov_base
= (void *)buf
,
91 struct iovec local_iov
[2];
92 unsigned int nlocal_iov
;
96 remaining
= iov_size(iov
, 2) - s
->send_index
;
97 nlocal_iov
= iov_copy(local_iov
, 2, iov
, 2, s
->send_index
, remaining
);
98 ret
= qio_channel_writev(s
->ioc
, local_iov
, nlocal_iov
, NULL
);
99 if (ret
== QIO_CHANNEL_ERR_BLOCK
) {
100 ret
= 0; /* handled further down */
106 if (ret
< (ssize_t
)remaining
) {
107 s
->send_index
+= ret
;
108 s
->ioc_write_tag
= qio_channel_add_watch(s
->ioc
, G_IO_OUT
,
109 net_stream_writable
, s
, NULL
);
116 static gboolean
net_stream_send(QIOChannel
*ioc
,
117 GIOCondition condition
,
120 static void net_stream_send_completed(NetClientState
*nc
, ssize_t len
)
122 NetStreamState
*s
= DO_UPCAST(NetStreamState
, nc
, nc
);
124 if (!s
->ioc_read_tag
) {
125 s
->ioc_read_tag
= qio_channel_add_watch(s
->ioc
, G_IO_IN
,
126 net_stream_send
, s
, NULL
);
130 static void net_stream_rs_finalize(SocketReadState
*rs
)
132 NetStreamState
*s
= container_of(rs
, NetStreamState
, rs
);
134 if (qemu_send_packet_async(&s
->nc
, rs
->buf
,
136 net_stream_send_completed
) == 0) {
137 if (s
->ioc_read_tag
) {
138 g_source_remove(s
->ioc_read_tag
);
144 static gboolean
net_stream_send(QIOChannel
*ioc
,
145 GIOCondition condition
,
148 NetStreamState
*s
= data
;
151 char buf1
[NET_BUFSIZE
];
154 size
= qio_channel_read(s
->ioc
, buf1
, sizeof(buf1
), NULL
);
156 if (errno
!= EWOULDBLOCK
) {
159 } else if (size
== 0) {
160 /* end of connection */
163 if (s
->ioc_write_tag
) {
164 g_source_remove(s
->ioc_write_tag
);
165 s
->ioc_write_tag
= 0;
168 qemu_set_info_str(&s
->nc
, "listening");
169 qio_net_listener_set_client_func(s
->listener
, net_stream_listen
,
172 object_unref(OBJECT(s
->ioc
));
175 net_socket_rs_init(&s
->rs
, net_stream_rs_finalize
, false);
176 s
->nc
.link_down
= true;
178 qapi_event_send_netdev_stream_disconnected(s
->nc
.name
);
179 net_stream_arm_reconnect(s
);
181 return G_SOURCE_REMOVE
;
185 ret
= net_fill_rstate(&s
->rs
, (const uint8_t *)buf
, size
);
191 return G_SOURCE_CONTINUE
;
194 static void net_stream_cleanup(NetClientState
*nc
)
196 NetStreamState
*s
= DO_UPCAST(NetStreamState
, nc
, nc
);
198 g_source_remove(s
->timer_tag
);
202 qapi_free_SocketAddress(s
->addr
);
206 if (QIO_CHANNEL_SOCKET(s
->ioc
)->fd
!= -1) {
207 if (s
->ioc_read_tag
) {
208 g_source_remove(s
->ioc_read_tag
);
211 if (s
->ioc_write_tag
) {
212 g_source_remove(s
->ioc_write_tag
);
213 s
->ioc_write_tag
= 0;
216 object_unref(OBJECT(s
->ioc
));
221 qio_net_listener_disconnect(s
->listener
);
222 object_unref(OBJECT(s
->listener
));
225 object_unref(OBJECT(s
->listen_ioc
));
226 s
->listen_ioc
= NULL
;
230 static NetClientInfo net_stream_info
= {
231 .type
= NET_CLIENT_DRIVER_STREAM
,
232 .size
= sizeof(NetStreamState
),
233 .receive
= net_stream_receive
,
234 .cleanup
= net_stream_cleanup
,
237 static void net_stream_listen(QIONetListener
*listener
,
238 QIOChannelSocket
*cioc
,
241 NetStreamState
*s
= opaque
;
245 object_ref(OBJECT(cioc
));
247 qio_net_listener_set_client_func(s
->listener
, NULL
, s
, NULL
);
249 s
->ioc
= QIO_CHANNEL(cioc
);
250 qio_channel_set_name(s
->ioc
, "stream-server");
251 s
->nc
.link_down
= false;
253 s
->ioc_read_tag
= qio_channel_add_watch(s
->ioc
, G_IO_IN
, net_stream_send
,
256 if (cioc
->localAddr
.ss_family
== AF_UNIX
) {
257 addr
= qio_channel_socket_get_local_address(cioc
, NULL
);
259 addr
= qio_channel_socket_get_remote_address(cioc
, NULL
);
261 g_assert(addr
!= NULL
);
262 uri
= socket_uri(addr
);
263 qemu_set_info_str(&s
->nc
, "%s", uri
);
265 qapi_event_send_netdev_stream_connected(s
->nc
.name
, addr
);
266 qapi_free_SocketAddress(addr
);
269 static void net_stream_server_listening(QIOTask
*task
, gpointer opaque
)
271 NetStreamState
*s
= opaque
;
272 QIOChannelSocket
*listen_sioc
= QIO_CHANNEL_SOCKET(s
->listen_ioc
);
277 if (qio_task_propagate_error(task
, &err
)) {
278 qemu_set_info_str(&s
->nc
, "error: %s", error_get_pretty(err
));
283 addr
= qio_channel_socket_get_local_address(listen_sioc
, NULL
);
284 g_assert(addr
!= NULL
);
285 ret
= qemu_socket_try_set_nonblock(listen_sioc
->fd
);
286 if (addr
->type
== SOCKET_ADDRESS_TYPE_FD
&& ret
< 0) {
287 qemu_set_info_str(&s
->nc
, "can't use file descriptor %s (errno %d)",
288 addr
->u
.fd
.str
, -ret
);
292 qapi_free_SocketAddress(addr
);
294 s
->nc
.link_down
= true;
295 s
->listener
= qio_net_listener_new();
297 qemu_set_info_str(&s
->nc
, "listening");
298 net_socket_rs_init(&s
->rs
, net_stream_rs_finalize
, false);
299 qio_net_listener_set_client_func(s
->listener
, net_stream_listen
, s
, NULL
);
300 qio_net_listener_add(s
->listener
, listen_sioc
);
303 static int net_stream_server_init(NetClientState
*peer
,
311 QIOChannelSocket
*listen_sioc
= qio_channel_socket_new();
313 nc
= qemu_new_net_client(&net_stream_info
, peer
, model
, name
);
314 s
= DO_UPCAST(NetStreamState
, nc
, nc
);
315 qemu_set_info_str(&s
->nc
, "initializing");
317 s
->listen_ioc
= QIO_CHANNEL(listen_sioc
);
318 qio_channel_socket_listen_async(listen_sioc
, addr
, 0,
319 net_stream_server_listening
, s
,
325 static void net_stream_client_connected(QIOTask
*task
, gpointer opaque
)
327 NetStreamState
*s
= opaque
;
328 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(s
->ioc
);
334 if (qio_task_propagate_error(task
, &err
)) {
335 qemu_set_info_str(&s
->nc
, "error: %s", error_get_pretty(err
));
340 addr
= qio_channel_socket_get_remote_address(sioc
, NULL
);
341 g_assert(addr
!= NULL
);
342 uri
= socket_uri(addr
);
343 qemu_set_info_str(&s
->nc
, "%s", uri
);
346 ret
= qemu_socket_try_set_nonblock(sioc
->fd
);
347 if (addr
->type
== SOCKET_ADDRESS_TYPE_FD
&& ret
< 0) {
348 qemu_set_info_str(&s
->nc
, "can't use file descriptor %s (errno %d)",
349 addr
->u
.fd
.str
, -ret
);
350 qapi_free_SocketAddress(addr
);
355 net_socket_rs_init(&s
->rs
, net_stream_rs_finalize
, false);
357 /* Disable Nagle algorithm on TCP sockets to reduce latency */
358 qio_channel_set_delay(s
->ioc
, false);
360 s
->ioc_read_tag
= qio_channel_add_watch(s
->ioc
, G_IO_IN
, net_stream_send
,
362 s
->nc
.link_down
= false;
363 qapi_event_send_netdev_stream_connected(s
->nc
.name
, addr
);
364 qapi_free_SocketAddress(addr
);
368 object_unref(OBJECT(s
->ioc
));
370 net_stream_arm_reconnect(s
);
373 static gboolean
net_stream_reconnect(gpointer data
)
375 NetStreamState
*s
= data
;
376 QIOChannelSocket
*sioc
;
380 sioc
= qio_channel_socket_new();
381 s
->ioc
= QIO_CHANNEL(sioc
);
382 qio_channel_socket_connect_async(sioc
, s
->addr
,
383 net_stream_client_connected
, s
,
385 return G_SOURCE_REMOVE
;
388 static void net_stream_arm_reconnect(NetStreamState
*s
)
390 if (s
->reconnect
&& s
->timer_tag
== 0) {
391 qemu_set_info_str(&s
->nc
, "connecting");
392 s
->timer_tag
= g_timeout_add_seconds(s
->reconnect
,
393 net_stream_reconnect
, s
);
397 static int net_stream_client_init(NetClientState
*peer
,
406 QIOChannelSocket
*sioc
= qio_channel_socket_new();
408 nc
= qemu_new_net_client(&net_stream_info
, peer
, model
, name
);
409 s
= DO_UPCAST(NetStreamState
, nc
, nc
);
410 qemu_set_info_str(&s
->nc
, "connecting");
412 s
->ioc
= QIO_CHANNEL(sioc
);
413 s
->nc
.link_down
= true;
415 s
->reconnect
= reconnect
;
417 s
->addr
= QAPI_CLONE(SocketAddress
, addr
);
419 qio_channel_socket_connect_async(sioc
, addr
,
420 net_stream_client_connected
, s
,
426 int net_init_stream(const Netdev
*netdev
, const char *name
,
427 NetClientState
*peer
, Error
**errp
)
429 const NetdevStreamOptions
*sock
;
431 assert(netdev
->type
== NET_CLIENT_DRIVER_STREAM
);
432 sock
= &netdev
->u
.stream
;
434 if (!sock
->has_server
|| !sock
->server
) {
435 return net_stream_client_init(peer
, "stream", name
, sock
->addr
,
436 sock
->has_reconnect
? sock
->reconnect
: 0,
439 if (sock
->has_reconnect
) {
440 error_setg(errp
, "'reconnect' option is incompatible with "
441 "socket in server mode");
444 return net_stream_server_init(peer
, "stream", name
, sock
->addr
, errp
);