migration/rdma: Convert qemu_rdma_write() to Error
[qemu/armbru.git] / net / af-xdp.c
blob6c65028fb00aa42f9f9b2116fc5df4aebf4e18b5
1 /*
2 * AF_XDP network backend.
4 * Copyright (c) 2023 Red Hat, Inc.
6 * Authors:
7 * Ilya Maximets <i.maximets@ovn.org>
9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
10 * See the COPYING file in the top-level directory.
14 #include "qemu/osdep.h"
15 #include <bpf/bpf.h>
16 #include <inttypes.h>
17 #include <linux/if_link.h>
18 #include <linux/if_xdp.h>
19 #include <net/if.h>
20 #include <xdp/xsk.h>
22 #include "clients.h"
23 #include "monitor/monitor.h"
24 #include "net/net.h"
25 #include "qapi/error.h"
26 #include "qemu/cutils.h"
27 #include "qemu/error-report.h"
28 #include "qemu/iov.h"
29 #include "qemu/main-loop.h"
30 #include "qemu/memalign.h"
33 typedef struct AFXDPState {
34 NetClientState nc;
36 struct xsk_socket *xsk;
37 struct xsk_ring_cons rx;
38 struct xsk_ring_prod tx;
39 struct xsk_ring_cons cq;
40 struct xsk_ring_prod fq;
42 char ifname[IFNAMSIZ];
43 int ifindex;
44 bool read_poll;
45 bool write_poll;
46 uint32_t outstanding_tx;
48 uint64_t *pool;
49 uint32_t n_pool;
50 char *buffer;
51 struct xsk_umem *umem;
53 uint32_t n_queues;
54 uint32_t xdp_flags;
55 bool inhibit;
56 } AFXDPState;
/*
 * Maximum number of Rx descriptors taken from the ring per af_xdp_send()
 * call; also the number of buffers requested for the fill ring afterwards.
 */
#define AF_XDP_BATCH_SIZE 64

/* fd handlers, referenced by af_xdp_update_fd_handler() before definition. */
static void af_xdp_writable(void *opaque);
static void af_xdp_send(void *opaque);
63 /* Set the event-loop handlers for the af-xdp backend. */
64 static void af_xdp_update_fd_handler(AFXDPState *s)
66 qemu_set_fd_handler(xsk_socket__fd(s->xsk),
67 s->read_poll ? af_xdp_send : NULL,
68 s->write_poll ? af_xdp_writable : NULL,
69 s);
72 /* Update the read handler. */
73 static void af_xdp_read_poll(AFXDPState *s, bool enable)
75 if (s->read_poll != enable) {
76 s->read_poll = enable;
77 af_xdp_update_fd_handler(s);
81 /* Update the write handler. */
82 static void af_xdp_write_poll(AFXDPState *s, bool enable)
84 if (s->write_poll != enable) {
85 s->write_poll = enable;
86 af_xdp_update_fd_handler(s);
90 static void af_xdp_poll(NetClientState *nc, bool enable)
92 AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
94 if (s->read_poll != enable || s->write_poll != enable) {
95 s->write_poll = enable;
96 s->read_poll = enable;
97 af_xdp_update_fd_handler(s);
101 static void af_xdp_complete_tx(AFXDPState *s)
103 uint32_t idx = 0;
104 uint32_t done, i;
105 uint64_t *addr;
107 done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);
109 for (i = 0; i < done; i++) {
110 addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
111 s->pool[s->n_pool++] = *addr;
112 s->outstanding_tx--;
115 if (done) {
116 xsk_ring_cons__release(&s->cq, done);
121 * The fd_write() callback, invoked if the fd is marked as writable
122 * after a poll.
124 static void af_xdp_writable(void *opaque)
126 AFXDPState *s = opaque;
128 /* Try to recover buffers that are already sent. */
129 af_xdp_complete_tx(s);
132 * Unregister the handler, unless we still have packets to transmit
133 * and kernel needs a wake up.
135 if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
136 af_xdp_write_poll(s, false);
139 /* Flush any buffered packets. */
140 qemu_flush_queued_packets(&s->nc);
143 static ssize_t af_xdp_receive(NetClientState *nc,
144 const uint8_t *buf, size_t size)
146 AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
147 struct xdp_desc *desc;
148 uint32_t idx;
149 void *data;
151 /* Try to recover buffers that are already sent. */
152 af_xdp_complete_tx(s);
154 if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
155 /* We can't transmit packet this size... */
156 return size;
159 if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
161 * Out of buffers or space in tx ring. Poll until we can write.
162 * This will also kick the Tx, if it was waiting on CQ.
164 af_xdp_write_poll(s, true);
165 return 0;
168 desc = xsk_ring_prod__tx_desc(&s->tx, idx);
169 desc->addr = s->pool[--s->n_pool];
170 desc->len = size;
172 data = xsk_umem__get_data(s->buffer, desc->addr);
173 memcpy(data, buf, size);
175 xsk_ring_prod__submit(&s->tx, 1);
176 s->outstanding_tx++;
178 if (xsk_ring_prod__needs_wakeup(&s->tx)) {
179 af_xdp_write_poll(s, true);
182 return size;
186 * Complete a previous send (backend --> guest) and enable the
187 * fd_read callback.
189 static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
191 AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
193 af_xdp_read_poll(s, true);
196 static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
198 uint32_t i, idx = 0;
200 /* Leave one packet for Tx, just in case. */
201 if (s->n_pool < n + 1) {
202 n = s->n_pool;
205 if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
206 return;
209 for (i = 0; i < n; i++) {
210 *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
212 xsk_ring_prod__submit(&s->fq, n);
214 if (xsk_ring_prod__needs_wakeup(&s->fq)) {
215 /* Receive was blocked by not having enough buffers. Wake it up. */
216 af_xdp_read_poll(s, true);
220 static void af_xdp_send(void *opaque)
222 uint32_t i, n_rx, idx = 0;
223 AFXDPState *s = opaque;
225 n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
226 if (!n_rx) {
227 return;
230 for (i = 0; i < n_rx; i++) {
231 const struct xdp_desc *desc;
232 struct iovec iov;
234 desc = xsk_ring_cons__rx_desc(&s->rx, idx++);
236 iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
237 iov.iov_len = desc->len;
239 s->pool[s->n_pool++] = desc->addr;
241 if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
242 af_xdp_send_completed)) {
244 * The peer does not receive anymore. Packet is queued, stop
245 * reading from the backend until af_xdp_send_completed().
247 af_xdp_read_poll(s, false);
249 /* Return unused descriptors to not break the ring cache. */
250 xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
251 n_rx = i + 1;
252 break;
256 /* Release actually sent descriptors and try to re-fill. */
257 xsk_ring_cons__release(&s->rx, n_rx);
258 af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
261 /* Flush and close. */
262 static void af_xdp_cleanup(NetClientState *nc)
264 AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
266 qemu_purge_queued_packets(nc);
268 af_xdp_poll(nc, false);
270 xsk_socket__delete(s->xsk);
271 s->xsk = NULL;
272 g_free(s->pool);
273 s->pool = NULL;
274 xsk_umem__delete(s->umem);
275 s->umem = NULL;
276 qemu_vfree(s->buffer);
277 s->buffer = NULL;
279 /* Remove the program if it's the last open queue. */
280 if (!s->inhibit && nc->queue_index == s->n_queues - 1 && s->xdp_flags
281 && bpf_xdp_detach(s->ifindex, s->xdp_flags, NULL) != 0) {
282 fprintf(stderr,
283 "af-xdp: unable to remove XDP program from '%s', ifindex: %d\n",
284 s->ifname, s->ifindex);
288 static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
290 struct xsk_umem_config config = {
291 .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
292 .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
293 .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
294 .frame_headroom = 0,
296 uint64_t n_descs;
297 uint64_t size;
298 int64_t i;
299 int ret;
301 /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
302 n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
303 + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
304 size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;
306 s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
307 memset(s->buffer, 0, size);
309 if (sock_fd < 0) {
310 ret = xsk_umem__create(&s->umem, s->buffer, size,
311 &s->fq, &s->cq, &config);
312 } else {
313 ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
314 &s->fq, &s->cq, &config);
317 if (ret) {
318 qemu_vfree(s->buffer);
319 error_setg_errno(errp, errno,
320 "failed to create umem for %s queue_index: %d",
321 s->ifname, s->nc.queue_index);
322 return -1;
325 s->pool = g_new(uint64_t, n_descs);
326 /* Fill the pool in the opposite order, because it's a LIFO queue. */
327 for (i = n_descs; i >= 0; i--) {
328 s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
330 s->n_pool = n_descs;
332 af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);
334 return 0;
337 static int af_xdp_socket_create(AFXDPState *s,
338 const NetdevAFXDPOptions *opts, Error **errp)
340 struct xsk_socket_config cfg = {
341 .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
342 .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
343 .libxdp_flags = 0,
344 .bind_flags = XDP_USE_NEED_WAKEUP,
345 .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
347 int queue_id, error = 0;
349 s->inhibit = opts->has_inhibit && opts->inhibit;
350 if (s->inhibit) {
351 cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
354 if (opts->has_force_copy && opts->force_copy) {
355 cfg.bind_flags |= XDP_COPY;
358 queue_id = s->nc.queue_index;
359 if (opts->has_start_queue && opts->start_queue > 0) {
360 queue_id += opts->start_queue;
363 if (opts->has_mode) {
364 /* Specific mode requested. */
365 cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
366 ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
367 if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
368 s->umem, &s->rx, &s->tx, &cfg)) {
369 error = errno;
371 } else {
372 /* No mode requested, try native first. */
373 cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;
375 if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
376 s->umem, &s->rx, &s->tx, &cfg)) {
377 /* Can't use native mode, try skb. */
378 cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
379 cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;
381 if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
382 s->umem, &s->rx, &s->tx, &cfg)) {
383 error = errno;
388 if (error) {
389 error_setg_errno(errp, error,
390 "failed to create AF_XDP socket for %s queue_id: %d",
391 s->ifname, queue_id);
392 return -1;
395 s->xdp_flags = cfg.xdp_flags;
397 return 0;
400 /* NetClientInfo methods. */
401 static NetClientInfo net_af_xdp_info = {
402 .type = NET_CLIENT_DRIVER_AF_XDP,
403 .size = sizeof(AFXDPState),
404 .receive = af_xdp_receive,
405 .poll = af_xdp_poll,
406 .cleanup = af_xdp_cleanup,
409 static int *parse_socket_fds(const char *sock_fds_str,
410 int64_t n_expected, Error **errp)
412 gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
413 int64_t i, n_sock_fds = g_strv_length(substrings);
414 int *sock_fds = NULL;
416 if (n_sock_fds != n_expected) {
417 error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
418 n_expected, n_sock_fds);
419 goto exit;
422 sock_fds = g_new(int, n_sock_fds);
424 for (i = 0; i < n_sock_fds; i++) {
425 sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
426 if (sock_fds[i] < 0) {
427 g_free(sock_fds);
428 sock_fds = NULL;
429 goto exit;
433 exit:
434 g_strfreev(substrings);
435 return sock_fds;
439 * The exported init function.
441 * ... -netdev af-xdp,ifname="..."
443 int net_init_af_xdp(const Netdev *netdev,
444 const char *name, NetClientState *peer, Error **errp)
446 const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
447 NetClientState *nc, *nc0 = NULL;
448 unsigned int ifindex;
449 uint32_t prog_id = 0;
450 int *sock_fds = NULL;
451 int64_t i, queues;
452 Error *err = NULL;
453 AFXDPState *s;
455 ifindex = if_nametoindex(opts->ifname);
456 if (!ifindex) {
457 error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
458 opts->ifname);
459 return -1;
462 queues = opts->has_queues ? opts->queues : 1;
463 if (queues < 1) {
464 error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
465 queues, opts->ifname);
466 return -1;
469 if ((opts->has_inhibit && opts->inhibit) != !!opts->sock_fds) {
470 error_setg(errp, "'inhibit=on' requires 'sock-fds' and vice versa");
471 return -1;
474 if (opts->sock_fds) {
475 sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
476 if (!sock_fds) {
477 return -1;
481 for (i = 0; i < queues; i++) {
482 nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
483 qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
484 nc->queue_index = i;
486 if (!nc0) {
487 nc0 = nc;
490 s = DO_UPCAST(AFXDPState, nc, nc);
492 pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
493 s->ifindex = ifindex;
494 s->n_queues = queues;
496 if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, errp)
497 || af_xdp_socket_create(s, opts, errp)) {
498 /* Make sure the XDP program will be removed. */
499 s->n_queues = i;
500 error_propagate(errp, err);
501 goto err;
505 if (nc0) {
506 s = DO_UPCAST(AFXDPState, nc, nc0);
507 if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
508 error_setg_errno(errp, errno,
509 "no XDP program loaded on '%s', ifindex: %d",
510 s->ifname, s->ifindex);
511 goto err;
515 af_xdp_read_poll(s, true); /* Initially only poll for reads. */
517 return 0;
519 err:
520 g_free(sock_fds);
521 if (nc0) {
522 qemu_del_net_client(nc0);
525 return -1;