/*
 * AF_XDP network backend.
 *
 * Copyright (c) 2023 Red Hat, Inc.
 *
 * Authors:
 *  Ilya Maximets <i.maximets@ovn.org>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include <bpf/bpf.h>
#include <inttypes.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>
#include <net/if.h>
#include <xdp/xsk.h>

#include "clients.h"
#include "monitor/monitor.h"
#include "net/net.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "qemu/error-report.h"
#include "qemu/iov.h"
#include "qemu/main-loop.h"
#include "qemu/memalign.h"

typedef struct AFXDPState {
    NetClientState       nc;

    struct xsk_socket    *xsk;
    struct xsk_ring_cons rx;
    struct xsk_ring_prod tx;
    struct xsk_ring_cons cq;
    struct xsk_ring_prod fq;

    char                 ifname[IFNAMSIZ];
    int                  ifindex;
    bool                 read_poll;
    bool                 write_poll;
    uint32_t             outstanding_tx;

    uint64_t             *pool;
    uint32_t             n_pool;
    char                 *buffer;
    struct xsk_umem      *umem;

    uint32_t             n_queues;
    uint32_t             xdp_flags;
    bool                 inhibit;
} AFXDPState;

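/*
 * Upper bound on the number of Rx descriptors drained from the ring in one
 * af_xdp_send() pass, and the refill granularity for the fill queue.
 */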
#define AF_XDP_BATCH_SIZE 64

static void af_xdp_send(void *opaque);
static void af_xdp_writable(void *opaque);

/* Set the event-loop handlers for the af-xdp backend. */
static void af_xdp_update_fd_handler(AFXDPState *s)
{
    qemu_set_fd_handler(xsk_socket__fd(s->xsk),
                        s->read_poll ? af_xdp_send : NULL,
                        s->write_poll ? af_xdp_writable : NULL,
                        s);
}

/* Update the read handler. */
static void af_xdp_read_poll(AFXDPState *s, bool enable)
{
    if (s->read_poll != enable) {
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

/* Update the write handler. */
static void af_xdp_write_poll(AFXDPState *s, bool enable)
{
    if (s->write_poll != enable) {
        s->write_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

static void af_xdp_poll(NetClientState *nc, bool enable)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    if (s->read_poll != enable || s->write_poll != enable) {
        s->write_poll = enable;
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

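/*
 * Reclaim frames that the kernel has finished transmitting: every address
 * harvested from the completion queue goes back on top of the LIFO pool,
 * where it becomes available again for either Tx or the Rx fill queue.
 */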
static void af_xdp_complete_tx(AFXDPState *s)
{
    uint32_t idx = 0;
    uint32_t done, i;
    uint64_t *addr;

    done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);

    for (i = 0; i < done; i++) {
        addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
        s->pool[s->n_pool++] = *addr;
        s->outstanding_tx--;
    }

    if (done) {
        xsk_ring_cons__release(&s->cq, done);
    }
}

/*
 * The fd_write() callback, invoked if the fd is marked as writable
 * after a poll.
 */
static void af_xdp_writable(void *opaque)
{
    AFXDPState *s = opaque;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    /*
     * Unregister the handler, unless we still have packets to transmit
     * and kernel needs a wake up.
     */
    if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, false);
    }

    /* Flush any buffered packets. */
    qemu_flush_queued_packets(&s->nc);
}

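/*
 * Transmit path (guest --> backend): this is always a copy. A free frame
 * is popped from the pool, the packet payload is memcpy'd into the umem,
 * and a Tx descriptor pointing at that frame is submitted. With
 * XDP_USE_NEED_WAKEUP set, the kernel only has to be kicked when it asks
 * for it, which is what the needs_wakeup checks are for.
 */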
static ssize_t af_xdp_receive(NetClientState *nc,
                              const uint8_t *buf, size_t size)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
    struct xdp_desc *desc;
    uint32_t idx = 0;
    void *data;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
        /* We can't transmit packet this size... */
        return size;
    }

    if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
        /*
         * Out of buffers or space in tx ring. Poll until we can write.
         * This will also kick the Tx, if it was waiting on CQ.
         */
        af_xdp_write_poll(s, true);
        return 0;
    }

    desc = xsk_ring_prod__tx_desc(&s->tx, idx);
    desc->addr = s->pool[--s->n_pool];
    desc->len = size;

    data = xsk_umem__get_data(s->buffer, desc->addr);
    memcpy(data, buf, size);

    xsk_ring_prod__submit(&s->tx, 1);
    s->outstanding_tx++;

    if (xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, true);
    }

    return size;
}

/*
 * Complete a previous send (backend --> guest) and enable the
 * fd_read callback.
 */
static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    af_xdp_read_poll(s, true);
}

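/*
 * Move up to 'n' free frames from the pool into the kernel's fill queue,
 * where the driver picks them up to place incoming packets. Rx stalls
 * when this queue runs dry, hence the wakeup at the end.
 */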
static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
{
    uint32_t i, idx = 0;

    /* Leave one packet for Tx, just in case. */
    if (s->n_pool < n + 1) {
        n = s->n_pool ? s->n_pool - 1 : 0;
    }

    if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
        return;
    }

    for (i = 0; i < n; i++) {
        *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
    }
    xsk_ring_prod__submit(&s->fq, n);

    if (xsk_ring_prod__needs_wakeup(&s->fq)) {
        /* Receive was blocked by not having enough buffers. Wake it up. */
        af_xdp_read_poll(s, true);
    }
}

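/*
 * The fd_read() callback: drain up to AF_XDP_BATCH_SIZE received
 * descriptors, hand each packet to the peer as a single iovec, and
 * recycle the frame addresses into the pool. If the peer queues a packet
 * instead of consuming it, Rx polling stops until af_xdp_send_completed()
 * re-enables it.
 */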
static void af_xdp_send(void *opaque)
{
    uint32_t i, n_rx, idx = 0;
    AFXDPState *s = opaque;

    n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
    if (!n_rx) {
        return;
    }

    for (i = 0; i < n_rx; i++) {
        const struct xdp_desc *desc;
        struct iovec iov;

        desc = xsk_ring_cons__rx_desc(&s->rx, idx++);

        iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
        iov.iov_len = desc->len;

        s->pool[s->n_pool++] = desc->addr;

        if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
                                     af_xdp_send_completed)) {
            /*
             * The peer does not receive anymore. Packet is queued, stop
             * reading from the backend until af_xdp_send_completed().
             */
            af_xdp_read_poll(s, false);

            /* Return unused descriptors to not break the ring cache. */
            xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
            n_rx = i + 1;
            break;
        }
    }

    /* Release actually sent descriptors and try to re-fill. */
    xsk_ring_cons__release(&s->rx, n_rx);
    af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
}

/* Flush and close. */
static void af_xdp_cleanup(NetClientState *nc)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    qemu_purge_queued_packets(nc);

    af_xdp_poll(nc, false);

    xsk_socket__delete(s->xsk);
    s->xsk = NULL;

    g_free(s->pool);
    s->pool = NULL;

    xsk_umem__delete(s->umem);
    s->umem = NULL;

    qemu_vfree(s->buffer);
    s->buffer = NULL;

    /* Remove the program if it's the last open queue. */
    if (!s->inhibit && nc->queue_index == s->n_queues - 1 && s->xdp_flags
        && bpf_xdp_detach(s->ifindex, s->xdp_flags, NULL) != 0) {
        fprintf(stderr,
                "af-xdp: unable to remove XDP program from '%s', ifindex: %d\n",
                s->ifname, s->ifindex);
    }
}

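/*
 * Create the umem: one contiguous, page-aligned region sized so that all
 * four rings (rx, tx, cq, fq) could be full at once. As a rough worked
 * example, assuming libxdp's usual defaults of 2048 descriptors per ring
 * and 4 KiB frames, that is (2048 + 2048) * 2 = 8192 frames, i.e. 32 MiB
 * per queue; the actual numbers come from the XSK_* macros below.
 */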
static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
{
    struct xsk_umem_config config = {
        .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
        .frame_headroom = 0,
    };
    uint64_t n_descs;
    uint64_t size;
    int64_t i;
    int ret;

    /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
    n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
               + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
    size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;

    s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
    memset(s->buffer, 0, size);

    if (sock_fd < 0) {
        ret = xsk_umem__create(&s->umem, s->buffer, size,
                               &s->fq, &s->cq, &config);
    } else {
        ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
                                       &s->fq, &s->cq, &config);
    }

    if (ret) {
        qemu_vfree(s->buffer);
        error_setg_errno(errp, errno,
                         "failed to create umem for %s queue_index: %d",
                         s->ifname, s->nc.queue_index);
        return -1;
    }

    s->pool = g_new(uint64_t, n_descs);
    /* Fill the pool in the opposite order, because it's a LIFO queue. */
    for (i = n_descs - 1; i >= 0; i--) {
        s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
    }
    s->n_pool = n_descs;

    af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);

    return 0;
}

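/*
 * Create the AF_XDP socket and bind it to one queue of the interface.
 * Unless the user pinned a mode, native (driver) mode is tried first and
 * generic skb mode is the fallback, which works on any NIC but is slower.
 */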
static int af_xdp_socket_create(AFXDPState *s,
                                const NetdevAFXDPOptions *opts, Error **errp)
{
    struct xsk_socket_config cfg = {
        .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .libxdp_flags = 0,
        .bind_flags = XDP_USE_NEED_WAKEUP,
        .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
    };
    int queue_id, error = 0;

    s->inhibit = opts->has_inhibit && opts->inhibit;
    if (s->inhibit) {
        cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
    }

    if (opts->has_force_copy && opts->force_copy) {
        cfg.bind_flags |= XDP_COPY;
    }

    queue_id = s->nc.queue_index;
    if (opts->has_start_queue && opts->start_queue > 0) {
        queue_id += opts->start_queue;
    }

    if (opts->has_mode) {
        /* Specific mode requested. */
        cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
                         ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;

        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            error = errno;
        }
    } else {
        /* No mode requested, try native first. */
        cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;

        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            /* Can't use native mode, try skb. */
            cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
            cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;

            if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                                   s->umem, &s->rx, &s->tx, &cfg)) {
                error = errno;
            }
        }
    }

    if (error) {
        error_setg_errno(errp, error,
                         "failed to create AF_XDP socket for %s queue_id: %d",
                         s->ifname, queue_id);
        return -1;
    }

    s->xdp_flags = cfg.xdp_flags;

    return 0;
}

/* NetClientInfo methods. */
static NetClientInfo net_af_xdp_info = {
    .type = NET_CLIENT_DRIVER_AF_XDP,
    .size = sizeof(AFXDPState),
    .receive = af_xdp_receive,
    .poll = af_xdp_poll,
    .cleanup = af_xdp_cleanup,
};

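/*
 * Parse the 'sock-fds' option: a colon-separated list of pre-created
 * AF_XDP socket file descriptors (or fd names registered via the
 * monitor), one per queue. This is the companion of 'inhibit=on', where
 * an external, privileged process loads the XDP program and hands the
 * sockets to an otherwise unprivileged QEMU.
 */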
static int *parse_socket_fds(const char *sock_fds_str,
                             int64_t n_expected, Error **errp)
{
    gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
    int64_t i, n_sock_fds = g_strv_length(substrings);
    int *sock_fds = NULL;

    if (n_sock_fds != n_expected) {
        error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
                   n_expected, n_sock_fds);
        goto exit;
    }

    sock_fds = g_new(int, n_sock_fds);

    for (i = 0; i < n_sock_fds; i++) {
        sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
        if (sock_fds[i] < 0) {
            g_free(sock_fds);
            sock_fds = NULL;
            goto exit;
        }
    }

exit:
    g_strfreev(substrings);
    return sock_fds;
}

/*
 * The exported init function.
 *
 * ... -netdev af-xdp,ifname="..."
 */
int net_init_af_xdp(const Netdev *netdev,
                    const char *name, NetClientState *peer, Error **errp)
{
    const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
    NetClientState *nc, *nc0 = NULL;
    unsigned int ifindex;
    uint32_t prog_id = 0;
    int *sock_fds = NULL;
    int64_t i, queues;
    Error *err = NULL;
    AFXDPState *s;

    ifindex = if_nametoindex(opts->ifname);
    if (!ifindex) {
        error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
                         opts->ifname);
        return -1;
    }

    queues = opts->has_queues ? opts->queues : 1;
    if (queues < 1) {
        error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
                   queues, opts->ifname);
        return -1;
    }

    if ((opts->has_inhibit && opts->inhibit) != !!opts->sock_fds) {
        error_setg(errp, "'inhibit=on' requires 'sock-fds' and vice versa");
        return -1;
    }

    if (opts->sock_fds) {
        sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
        if (!sock_fds) {
            return -1;
        }
    }

    for (i = 0; i < queues; i++) {
        nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
        qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
        nc->queue_index = i;
        if (!nc0) {
            nc0 = nc;
        }

        s = DO_UPCAST(AFXDPState, nc, nc);

        pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
        s->ifindex = ifindex;
        s->n_queues = queues;

        if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, errp)
            || af_xdp_socket_create(s, opts, errp)) {
            /* Make sure the XDP program will be removed. */
            s->n_queues = i;
            error_propagate(errp, err);
            goto err;
        }
    }

    s = DO_UPCAST(AFXDPState, nc, nc0);
    if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
        error_setg_errno(errp, errno,
                         "no XDP program loaded on '%s', ifindex: %d",
                         s->ifname, s->ifindex);
        goto err;
    }

    af_xdp_read_poll(s, true); /* Initially only poll for reads. */

    g_free(sock_fds);
    return 0;

err:
    g_free(sock_fds);
    if (nc0) {
        qemu_del_net_client(nc0);
    }
    return -1;
}