/*
 * AF_XDP network backend.
 *
 * Copyright (c) 2023 Red Hat, Inc.
 *
 * Authors:
 *  Ilya Maximets <i.maximets@ovn.org>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include <bpf/bpf.h>
#include <inttypes.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>
#include <net/if.h>
#include <xdp/xsk.h>

#include "clients.h"
#include "monitor/monitor.h"
#include "net/net.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "qemu/error-report.h"
#include "qemu/iov.h"
#include "qemu/main-loop.h"
#include "qemu/memalign.h"

typedef struct AFXDPState {
    NetClientState       nc;

    struct xsk_socket    *xsk;
    struct xsk_ring_cons rx;
    struct xsk_ring_prod tx;
    struct xsk_ring_cons cq;
    struct xsk_ring_prod fq;

    char                 ifname[IFNAMSIZ];
    int                  ifindex;
    bool                 read_poll;
    bool                 write_poll;
    uint32_t             outstanding_tx;

    uint64_t             *pool;   /* Free frame addresses, used as a LIFO. */
    uint32_t             n_pool;  /* Number of frames currently in the pool. */
    char                 *buffer; /* UMEM memory area. */
    struct xsk_umem      *umem;

    uint32_t             n_queues;
    uint32_t             xdp_flags;
    bool                 inhibit;
} AFXDPState;

#define AF_XDP_BATCH_SIZE 64
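
/*
 * Data path overview: all packet data lives in a single UMEM area split
 * into fixed-size frames; free frame addresses are kept in 's->pool' as a
 * LIFO stack.  Rx: addresses are posted to the fill ring (fq) and come
 * back with data on the rx ring.  Tx: addresses are submitted on the tx
 * ring and recycled through the completion ring (cq).  Sockets are bound
 * with XDP_USE_NEED_WAKEUP, so the kernel is kicked only when a ring
 * reports that it needs a wakeup.
 */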

static void af_xdp_send(void *opaque);
static void af_xdp_writable(void *opaque);

/* Set the event-loop handlers for the af-xdp backend. */
static void af_xdp_update_fd_handler(AFXDPState *s)
{
    qemu_set_fd_handler(xsk_socket__fd(s->xsk),
                        s->read_poll ? af_xdp_send : NULL,
                        s->write_poll ? af_xdp_writable : NULL,
                        s);
}

/* Update the read handler. */
static void af_xdp_read_poll(AFXDPState *s, bool enable)
{
    if (s->read_poll != enable) {
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

/* Update the write handler. */
static void af_xdp_write_poll(AFXDPState *s, bool enable)
{
    if (s->write_poll != enable) {
        s->write_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}
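
/*
 * NetClientInfo .poll hook: the net core uses it to enable or disable
 * both rx and tx processing at once.
 */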
static void af_xdp_poll(NetClientState *nc, bool enable)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    if (s->read_poll != enable || s->write_poll != enable) {
        s->write_poll = enable;
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

static void af_xdp_complete_tx(AFXDPState *s)
{
    uint32_t idx = 0;
    uint32_t done, i;
    uint64_t *addr;

    done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);

    for (i = 0; i < done; i++) {
        addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
        s->pool[s->n_pool++] = *addr;
        s->outstanding_tx--;
    }

    if (done) {
        xsk_ring_cons__release(&s->cq, done);
    }
}

/*
 * The fd_write() callback, invoked if the fd is marked as writable
 * after a poll.
 */
static void af_xdp_writable(void *opaque)
{
    AFXDPState *s = opaque;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    /*
     * Unregister the handler, unless we still have packets to transmit
     * and kernel needs a wake up.
     */
    if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, false);
    }

    /* Flush any buffered packets. */
    qemu_flush_queued_packets(&s->nc);
}
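
/*
 * Transmit a packet from the peer (guest --> external network).  Returning
 * 0 makes the net core queue the packet; af_xdp_writable() flushes that
 * queue once completions free up frames or tx ring space.
 */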
static ssize_t af_xdp_receive(NetClientState *nc,
                              const uint8_t *buf, size_t size)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
    struct xdp_desc *desc;
    uint32_t idx;
    void *data;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
        /* We can't transmit a packet of this size.  Drop it. */
        return size;
    }

    if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
        /*
         * Out of buffers or space in the tx ring.  Poll until we can write.
         * This will also kick the Tx, if it was waiting on the CQ.
         */
        af_xdp_write_poll(s, true);
        return 0;
    }

    desc = xsk_ring_prod__tx_desc(&s->tx, idx);
    desc->addr = s->pool[--s->n_pool];
    desc->len = size;

    data = xsk_umem__get_data(s->buffer, desc->addr);
    memcpy(data, buf, size);

    xsk_ring_prod__submit(&s->tx, 1);
    s->outstanding_tx++;

    if (xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, true);
    }

    return size;
}

/*
 * Complete a previous send (backend --> guest) and enable the
 * read poll again.
 */
static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    af_xdp_read_poll(s, true);
}

static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
{
    uint32_t i, idx = 0;

    /* Leave one packet for Tx, just in case. */
    if (s->n_pool < n + 1) {
        n = s->n_pool;
    }

    if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
        return;
    }

    for (i = 0; i < n; i++) {
        *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
    }
    xsk_ring_prod__submit(&s->fq, n);

    if (xsk_ring_prod__needs_wakeup(&s->fq)) {
        /* Receive was blocked by not having enough buffers.  Wake it up. */
        af_xdp_read_poll(s, true);
    }
}
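
/* The fd_read handler: forward received packets to the peer. */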
static void af_xdp_send(void *opaque)
{
    uint32_t i, n_rx, idx = 0;
    AFXDPState *s = opaque;

    n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
    if (!n_rx) {
        return;
    }

    for (i = 0; i < n_rx; i++) {
        const struct xdp_desc *desc;
        struct iovec iov;

        desc = xsk_ring_cons__rx_desc(&s->rx, idx++);

        iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
        iov.iov_len = desc->len;

        s->pool[s->n_pool++] = desc->addr;

        if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
                                     af_xdp_send_completed)) {
            /*
             * The peer does not receive anymore.  Packet is queued, stop
             * reading from the backend until af_xdp_send_completed().
             */
            af_xdp_read_poll(s, false);

            /* Return unused descriptors to not break the ring cache. */
            xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
            n_rx = i + 1;
            break;
        }
    }

    /* Release actually sent descriptors and try to re-fill. */
    xsk_ring_cons__release(&s->rx, n_rx);
    af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
}

/* Flush and close. */
static void af_xdp_cleanup(NetClientState *nc)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    qemu_purge_queued_packets(nc);

    af_xdp_poll(nc, false);

    xsk_socket__delete(s->xsk);
    s->xsk = NULL;
    g_free(s->pool);
    s->pool = NULL;
    xsk_umem__delete(s->umem);
    s->umem = NULL;
    qemu_vfree(s->buffer);
    s->buffer = NULL;

    /* Remove the program if it's the last open queue. */
    if (!s->inhibit && nc->queue_index == s->n_queues - 1 && s->xdp_flags
        && bpf_xdp_detach(s->ifindex, s->xdp_flags, NULL) != 0) {
        fprintf(stderr,
                "af-xdp: unable to remove XDP program from '%s', ifindex: %d\n",
                s->ifname, s->ifindex);
    }
}

static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
{
    struct xsk_umem_config config = {
        .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
    };
    uint64_t n_descs;
    uint64_t size;
    int64_t i;
    int ret;

    /* Number of descriptors if all 4 rings (rx, tx, cq, fq) are full. */
    n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
               + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
    size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;
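
    /*
     * With the usual libxdp defaults (2048 descriptors per ring and
     * 4096-byte frames) this is 8192 frames, i.e. a 32 MiB UMEM, per queue.
     */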
    s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
    memset(s->buffer, 0, size);

    if (sock_fd < 0) {
        ret = xsk_umem__create(&s->umem, s->buffer, size,
                               &s->fq, &s->cq, &config);
    } else {
        ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
                                       &s->fq, &s->cq, &config);
    }

    if (ret) {
        qemu_vfree(s->buffer);
        error_setg_errno(errp, errno,
                         "failed to create umem for %s queue_index: %d",
                         s->ifname, s->nc.queue_index);
        return -1;
    }

    s->pool = g_new(uint64_t, n_descs);
    /* Fill the pool in the opposite order, because it's a LIFO queue. */
    for (i = n_descs - 1; i >= 0; i--) {
        s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
    }
    s->n_pool = n_descs;

    af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);

    return 0;
}

static int af_xdp_socket_create(AFXDPState *s,
                                const NetdevAFXDPOptions *opts, Error **errp)
{
    struct xsk_socket_config cfg = {
        .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .libxdp_flags = 0,
        .bind_flags = XDP_USE_NEED_WAKEUP,
        .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
    };
    int queue_id, error = 0;

    s->inhibit = opts->has_inhibit && opts->inhibit;
    if (s->inhibit) {
        cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
    }

    if (opts->has_force_copy && opts->force_copy) {
        cfg.bind_flags |= XDP_COPY;
    }

    queue_id = s->nc.queue_index;
    if (opts->has_start_queue && opts->start_queue > 0) {
        queue_id += opts->start_queue;
    }
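
    /*
     * XDP_FLAGS_DRV_MODE attaches the program natively in the NIC driver,
     * while XDP_FLAGS_SKB_MODE uses the slower generic path that works
     * with any driver.
     */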
    if (opts->has_mode) {
        /* Specific mode requested. */
        cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
                         ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            error = errno;
        }
    } else {
        /* No mode requested, try native first. */
        cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;

        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            /* Can't use native mode, try skb. */
            cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
            cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;

            if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                                   s->umem, &s->rx, &s->tx, &cfg)) {
                error = errno;
            }
        }
    }

    if (error) {
        error_setg_errno(errp, error,
                         "failed to create AF_XDP socket for %s queue_id: %d",
                         s->ifname, queue_id);
        return -1;
    }

    s->xdp_flags = cfg.xdp_flags;

    return 0;
}

/* NetClientInfo methods. */
static NetClientInfo net_af_xdp_info = {
    .type = NET_CLIENT_DRIVER_AF_XDP,
    .size = sizeof(AFXDPState),
    .receive = af_xdp_receive,
    .poll = af_xdp_poll,
    .cleanup = af_xdp_cleanup,
};
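
/*
 * Parse a colon-separated list of pre-opened socket file descriptors;
 * each element is resolved with monitor_fd_param(), so it may be a number
 * or an fd name previously added via the monitor.
 */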
static int *parse_socket_fds(const char *sock_fds_str,
                             int64_t n_expected, Error **errp)
{
    gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
    int64_t i, n_sock_fds = g_strv_length(substrings);
    int *sock_fds = NULL;

    if (n_sock_fds != n_expected) {
        error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
                   n_expected, n_sock_fds);
        goto exit;
    }

    sock_fds = g_new(int, n_sock_fds);

    for (i = 0; i < n_sock_fds; i++) {
        sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
        if (sock_fds[i] < 0) {
            g_free(sock_fds);
            sock_fds = NULL;
            goto exit;
        }
    }

exit:
    g_strfreev(substrings);
    return sock_fds;
}

/*
 * The exported init function.
 *
 * ... -netdev af-xdp,ifname="..."
 */
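
/*
 * Illustrative example (interface and id names are placeholders):
 *
 *   -netdev af-xdp,id=net0,ifname=eth0,queues=1,mode=native
 *   -device virtio-net-pci,netdev=net0
 *
 * With inhibit=on, program loading is left to the caller: matching
 * pre-created socket fds must be passed via sock-fds, and an XDP program
 * must already be attached to the interface.
 */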
int net_init_af_xdp(const Netdev *netdev,
                    const char *name, NetClientState *peer, Error **errp)
{
    const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
    NetClientState *nc, *nc0 = NULL;
    unsigned int ifindex;
    uint32_t prog_id = 0;
    int *sock_fds = NULL;
    int64_t i, queues;
    Error *err = NULL;
    AFXDPState *s;

    ifindex = if_nametoindex(opts->ifname);
    if (!ifindex) {
        error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
                         opts->ifname);
        return -1;
    }

    queues = opts->has_queues ? opts->queues : 1;
    if (queues < 1) {
        error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
                   queues, opts->ifname);
        return -1;
    }

    if ((opts->has_inhibit && opts->inhibit) != !!opts->sock_fds) {
        error_setg(errp, "'inhibit=on' requires 'sock-fds' and vice versa");
        return -1;
    }

    if (opts->sock_fds) {
        sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
        if (!sock_fds) {
            return -1;
        }
    }

    for (i = 0; i < queues; i++) {
        nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
        qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
        nc->queue_index = i;

        if (!nc0) {
            nc0 = nc;
        }

        s = DO_UPCAST(AFXDPState, nc, nc);

        pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
        s->ifindex = ifindex;
        s->n_queues = queues;

        if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, errp)
            || af_xdp_socket_create(s, opts, errp)) {
            /* Make sure the XDP program will be removed. */
            s->n_queues = i;
            error_propagate(errp, err);
            goto err;
        }
    }

    s = DO_UPCAST(AFXDPState, nc, nc0);
    if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
        error_setg_errno(errp, errno,
                         "no XDP program loaded on '%s', ifindex: %d",
                         s->ifname, s->ifindex);
        goto err;
    }

    af_xdp_read_poll(s, true); /* Initially only poll for reads. */

    g_free(sock_fds);

    return 0;

err:
    g_free(sock_fds);
    if (nc0) {
        qemu_del_net_client(nc0);
    }

    return -1;
}