4 * Copyright (c) 2003-2008 Fabrice Bellard
5 * Copyright (c) 2022 Red Hat, Inc.
7 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * of this software and associated documentation files (the "Software"), to deal
9 * in the Software without restriction, including without limitation the rights
10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 * copies of the Software, and to permit persons to whom the Software is
12 * furnished to do so, subject to the following conditions:
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26 #include "qemu/osdep.h"
30 #include "monitor/monitor.h"
31 #include "qapi/error.h"
32 #include "qemu/error-report.h"
33 #include "qemu/option.h"
34 #include "qemu/sockets.h"
36 #include "qemu/main-loop.h"
37 #include "qemu/cutils.h"
38 #include "io/channel.h"
39 #include "io/channel-socket.h"
40 #include "io/net-listener.h"
41 #include "qapi/qapi-events-net.h"
43 typedef struct NetStreamState
{
45 QIOChannel
*listen_ioc
;
46 QIONetListener
*listener
;
51 unsigned int send_index
; /* number of bytes sent */
54 static void net_stream_listen(QIONetListener
*listener
,
55 QIOChannelSocket
*cioc
,
58 static gboolean
net_stream_writable(QIOChannel
*ioc
,
59 GIOCondition condition
,
62 NetStreamState
*s
= data
;
66 qemu_flush_queued_packets(&s
->nc
);
68 return G_SOURCE_REMOVE
;
71 static ssize_t
net_stream_receive(NetClientState
*nc
, const uint8_t *buf
,
74 NetStreamState
*s
= DO_UPCAST(NetStreamState
, nc
, nc
);
75 uint32_t len
= htonl(size
);
76 struct iovec iov
[] = {
79 .iov_len
= sizeof(len
),
81 .iov_base
= (void *)buf
,
85 struct iovec local_iov
[2];
86 unsigned int nlocal_iov
;
90 remaining
= iov_size(iov
, 2) - s
->send_index
;
91 nlocal_iov
= iov_copy(local_iov
, 2, iov
, 2, s
->send_index
, remaining
);
92 ret
= qio_channel_writev(s
->ioc
, local_iov
, nlocal_iov
, NULL
);
93 if (ret
== QIO_CHANNEL_ERR_BLOCK
) {
94 ret
= 0; /* handled further down */
100 if (ret
< (ssize_t
)remaining
) {
101 s
->send_index
+= ret
;
102 s
->ioc_write_tag
= qio_channel_add_watch(s
->ioc
, G_IO_OUT
,
103 net_stream_writable
, s
, NULL
);
110 static gboolean
net_stream_send(QIOChannel
*ioc
,
111 GIOCondition condition
,
114 static void net_stream_send_completed(NetClientState
*nc
, ssize_t len
)
116 NetStreamState
*s
= DO_UPCAST(NetStreamState
, nc
, nc
);
118 if (!s
->ioc_read_tag
) {
119 s
->ioc_read_tag
= qio_channel_add_watch(s
->ioc
, G_IO_IN
,
120 net_stream_send
, s
, NULL
);
124 static void net_stream_rs_finalize(SocketReadState
*rs
)
126 NetStreamState
*s
= container_of(rs
, NetStreamState
, rs
);
128 if (qemu_send_packet_async(&s
->nc
, rs
->buf
,
130 net_stream_send_completed
) == 0) {
131 if (s
->ioc_read_tag
) {
132 g_source_remove(s
->ioc_read_tag
);
138 static gboolean
net_stream_send(QIOChannel
*ioc
,
139 GIOCondition condition
,
142 NetStreamState
*s
= data
;
145 char buf1
[NET_BUFSIZE
];
148 size
= qio_channel_read(s
->ioc
, buf1
, sizeof(buf1
), NULL
);
150 if (errno
!= EWOULDBLOCK
) {
153 } else if (size
== 0) {
154 /* end of connection */
157 if (s
->ioc_write_tag
) {
158 g_source_remove(s
->ioc_write_tag
);
159 s
->ioc_write_tag
= 0;
162 qio_net_listener_set_client_func(s
->listener
, net_stream_listen
,
165 object_unref(OBJECT(s
->ioc
));
168 net_socket_rs_init(&s
->rs
, net_stream_rs_finalize
, false);
169 s
->nc
.link_down
= true;
170 qemu_set_info_str(&s
->nc
, "%s", "");
172 qapi_event_send_netdev_stream_disconnected(s
->nc
.name
);
174 return G_SOURCE_REMOVE
;
178 ret
= net_fill_rstate(&s
->rs
, (const uint8_t *)buf
, size
);
184 return G_SOURCE_CONTINUE
;
187 static void net_stream_cleanup(NetClientState
*nc
)
189 NetStreamState
*s
= DO_UPCAST(NetStreamState
, nc
, nc
);
191 if (QIO_CHANNEL_SOCKET(s
->ioc
)->fd
!= -1) {
192 if (s
->ioc_read_tag
) {
193 g_source_remove(s
->ioc_read_tag
);
196 if (s
->ioc_write_tag
) {
197 g_source_remove(s
->ioc_write_tag
);
198 s
->ioc_write_tag
= 0;
201 object_unref(OBJECT(s
->ioc
));
206 qio_net_listener_disconnect(s
->listener
);
207 object_unref(OBJECT(s
->listener
));
210 object_unref(OBJECT(s
->listen_ioc
));
211 s
->listen_ioc
= NULL
;
215 static NetClientInfo net_stream_info
= {
216 .type
= NET_CLIENT_DRIVER_STREAM
,
217 .size
= sizeof(NetStreamState
),
218 .receive
= net_stream_receive
,
219 .cleanup
= net_stream_cleanup
,
222 static void net_stream_listen(QIONetListener
*listener
,
223 QIOChannelSocket
*cioc
,
226 NetStreamState
*s
= opaque
;
230 object_ref(OBJECT(cioc
));
232 qio_net_listener_set_client_func(s
->listener
, NULL
, s
, NULL
);
234 s
->ioc
= QIO_CHANNEL(cioc
);
235 qio_channel_set_name(s
->ioc
, "stream-server");
236 s
->nc
.link_down
= false;
238 s
->ioc_read_tag
= qio_channel_add_watch(s
->ioc
, G_IO_IN
, net_stream_send
,
241 if (cioc
->localAddr
.ss_family
== AF_UNIX
) {
242 addr
= qio_channel_socket_get_local_address(cioc
, NULL
);
244 addr
= qio_channel_socket_get_remote_address(cioc
, NULL
);
246 g_assert(addr
!= NULL
);
247 uri
= socket_uri(addr
);
248 qemu_set_info_str(&s
->nc
, "%s", uri
);
250 qapi_event_send_netdev_stream_connected(s
->nc
.name
, addr
);
251 qapi_free_SocketAddress(addr
);
254 static void net_stream_server_listening(QIOTask
*task
, gpointer opaque
)
256 NetStreamState
*s
= opaque
;
257 QIOChannelSocket
*listen_sioc
= QIO_CHANNEL_SOCKET(s
->listen_ioc
);
261 if (listen_sioc
->fd
< 0) {
262 qemu_set_info_str(&s
->nc
, "connection error");
266 addr
= qio_channel_socket_get_local_address(listen_sioc
, NULL
);
267 g_assert(addr
!= NULL
);
268 ret
= qemu_socket_try_set_nonblock(listen_sioc
->fd
);
269 if (addr
->type
== SOCKET_ADDRESS_TYPE_FD
&& ret
< 0) {
270 qemu_set_info_str(&s
->nc
, "can't use file descriptor %s (errno %d)",
271 addr
->u
.fd
.str
, -ret
);
275 qapi_free_SocketAddress(addr
);
277 s
->nc
.link_down
= true;
278 s
->listener
= qio_net_listener_new();
280 net_socket_rs_init(&s
->rs
, net_stream_rs_finalize
, false);
281 qio_net_listener_set_client_func(s
->listener
, net_stream_listen
, s
, NULL
);
282 qio_net_listener_add(s
->listener
, listen_sioc
);
285 static int net_stream_server_init(NetClientState
*peer
,
293 QIOChannelSocket
*listen_sioc
= qio_channel_socket_new();
295 nc
= qemu_new_net_client(&net_stream_info
, peer
, model
, name
);
296 s
= DO_UPCAST(NetStreamState
, nc
, nc
);
298 s
->listen_ioc
= QIO_CHANNEL(listen_sioc
);
299 qio_channel_socket_listen_async(listen_sioc
, addr
, 0,
300 net_stream_server_listening
, s
,
306 static void net_stream_client_connected(QIOTask
*task
, gpointer opaque
)
308 NetStreamState
*s
= opaque
;
309 QIOChannelSocket
*sioc
= QIO_CHANNEL_SOCKET(s
->ioc
);
315 qemu_set_info_str(&s
->nc
, "connection error");
319 addr
= qio_channel_socket_get_remote_address(sioc
, NULL
);
320 g_assert(addr
!= NULL
);
321 uri
= socket_uri(addr
);
322 qemu_set_info_str(&s
->nc
, "%s", uri
);
325 ret
= qemu_socket_try_set_nonblock(sioc
->fd
);
326 if (addr
->type
== SOCKET_ADDRESS_TYPE_FD
&& ret
< 0) {
327 qemu_set_info_str(&s
->nc
, "can't use file descriptor %s (errno %d)",
328 addr
->u
.fd
.str
, -ret
);
329 qapi_free_SocketAddress(addr
);
334 net_socket_rs_init(&s
->rs
, net_stream_rs_finalize
, false);
336 /* Disable Nagle algorithm on TCP sockets to reduce latency */
337 qio_channel_set_delay(s
->ioc
, false);
339 s
->ioc_read_tag
= qio_channel_add_watch(s
->ioc
, G_IO_IN
, net_stream_send
,
341 s
->nc
.link_down
= false;
342 qapi_event_send_netdev_stream_connected(s
->nc
.name
, addr
);
343 qapi_free_SocketAddress(addr
);
347 object_unref(OBJECT(s
->ioc
));
351 static int net_stream_client_init(NetClientState
*peer
,
359 QIOChannelSocket
*sioc
= qio_channel_socket_new();
361 nc
= qemu_new_net_client(&net_stream_info
, peer
, model
, name
);
362 s
= DO_UPCAST(NetStreamState
, nc
, nc
);
364 s
->ioc
= QIO_CHANNEL(sioc
);
365 s
->nc
.link_down
= true;
367 qio_channel_socket_connect_async(sioc
, addr
,
368 net_stream_client_connected
, s
,
374 int net_init_stream(const Netdev
*netdev
, const char *name
,
375 NetClientState
*peer
, Error
**errp
)
377 const NetdevStreamOptions
*sock
;
379 assert(netdev
->type
== NET_CLIENT_DRIVER_STREAM
);
380 sock
= &netdev
->u
.stream
;
382 if (!sock
->has_server
|| !sock
->server
) {
383 return net_stream_client_init(peer
, "stream", name
, sock
->addr
, errp
);
385 return net_stream_server_init(peer
, "stream", name
, sock
->addr
, errp
);