Virtiofsd: fix memory leak on fuse queueinfo
[qemu/ar7.git] / tools / virtiofsd / fuse_virtio.c
blobfb8d6d1379e3569d90c6315214905f4aaf1c7d8e
1 /*
2 * virtio-fs glue for FUSE
3 * Copyright (C) 2018 Red Hat, Inc. and/or its affiliates
5 * Authors:
6 * Dave Gilbert <dgilbert@redhat.com>
8 * Implements the glue between libfuse and libvhost-user
10 * This program can be distributed under the terms of the GNU LGPLv2.
11 * See the file COPYING.LIB
14 #include "qemu/osdep.h"
15 #include "qemu/iov.h"
16 #include "qapi/error.h"
17 #include "fuse_i.h"
18 #include "standard-headers/linux/fuse.h"
19 #include "fuse_misc.h"
20 #include "fuse_opt.h"
21 #include "fuse_virtio.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <stdint.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <sys/eventfd.h>
30 #include <sys/socket.h>
31 #include <sys/types.h>
32 #include <sys/un.h>
33 #include <unistd.h>
35 #include "contrib/libvhost-user/libvhost-user.h"
struct fv_VuDev;

/* Per-queue state; one instance (and one service thread) per virtqueue */
struct fv_QueueInfo {
    pthread_t thread;
    /* Back-pointer to the owning device */
    struct fv_VuDev *virtio_dev;

    /* Our queue index, corresponds to array position */
    int qidx;
    int kick_fd; /* Guest kick eventfd; -1 while the queue is stopped */
    int kill_fd; /* For killing the thread */

    /* The element for the command currently being processed */
    VuVirtqElement *qe;
    /* True once a reply has been pushed for the current element */
    bool reply_sent;
};
/*
 * We pass the dev element into libvhost-user
 * and then use it to get back to the outer
 * container for other data.
 */
struct fv_VuDev {
    VuDev dev;
    struct fuse_session *se;

    /*
     * The following pair of fields are only accessed in the main
     * virtio_loop
     */
    size_t nqueues;
    struct fv_QueueInfo **qi; /* Array of per-queue state, length nqueues */
};
/* From spec */
struct virtio_fs_config {
    char tag[36];       /* Mount tag presented to the guest */
    uint32_t num_queues; /* Number of request queues */
};
75 /* Callback from libvhost-user */
76 static uint64_t fv_get_features(VuDev *dev)
78 return 1ULL << VIRTIO_F_VERSION_1;
/* Callback from libvhost-user: accept the negotiated features (nothing to do) */
static void fv_set_features(VuDev *dev, uint64_t features)
{
}
/*
 * Callback from libvhost-user if there's a new fd we're supposed to listen
 * to, typically a queue kick?
 * Not implemented: queue kicks are handled by per-queue threads instead.
 */
static void fv_set_watch(VuDev *dev, int fd, int condition, vu_watch_cb cb,
                         void *data)
{
    fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
}
/*
 * Callback from libvhost-user if we're no longer supposed to listen on an fd
 * Not implemented, see fv_set_watch.
 */
static void fv_remove_watch(VuDev *dev, int fd)
{
    fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
}
/* Callback from libvhost-user to panic: log and terminate the process */
static void fv_panic(VuDev *dev, const char *err)
{
    fuse_log(FUSE_LOG_ERR, "%s: libvhost-user: %s\n", __func__, err);
    /* TODO: Allow reconnects?? */
    exit(EXIT_FAILURE);
}
113 * Copy from an iovec into a fuse_buf (memory only)
114 * Caller must ensure there is space
116 static void copy_from_iov(struct fuse_buf *buf, size_t out_num,
117 const struct iovec *out_sg)
119 void *dest = buf->mem;
121 while (out_num) {
122 size_t onelen = out_sg->iov_len;
123 memcpy(dest, out_sg->iov_base, onelen);
124 dest += onelen;
125 out_sg++;
126 out_num--;
/*
 * Copy from one iov to another, the given number of bytes
 * The caller must have checked sizes.
 */
static void copy_iov(struct iovec *src_iov, int src_count,
                     struct iovec *dst_iov, int dst_count, size_t to_copy)
{
    size_t dst_offset = 0;

    /* Outer loop consumes one source element per iteration */
    while (to_copy) {
        assert(src_count);
        size_t src_offset = 0;
        size_t src_len = src_iov[0].iov_len;

        /* Never copy past the requested total */
        if (src_len > to_copy) {
            src_len = to_copy;
        }
        /* Inner loop spreads this source chunk over one or more dsts */
        while (src_len) {
            assert(dst_count);
            size_t chunk = dst_iov[0].iov_len - dst_offset;
            if (chunk > src_len) {
                chunk = src_len;
            }

            memcpy((char *)dst_iov[0].iov_base + dst_offset,
                   (const char *)src_iov[0].iov_base + src_offset, chunk);
            src_len -= chunk;
            to_copy -= chunk;
            src_offset += chunk;
            dst_offset += chunk;

            assert(dst_offset <= dst_iov[0].iov_len);
            /* Destination element exhausted: advance to the next one */
            if (dst_offset == dst_iov[0].iov_len) {
                dst_offset = 0;
                dst_iov++;
                dst_count--;
            }
        }
        src_iov++;
        src_count--;
    }
}
/*
 * Called back by ll whenever it wants to send a reply/message back
 * The 1st element of the iov starts with the fuse_out_header
 * 'unique'==0 means it's a notify message.
 *
 * Returns 0 on success, -E2BIG if the queue element is too small to
 * hold the reply.  On success the element is pushed back to the guest
 * and reply_sent is set so the queue thread won't recycle it again.
 */
int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
                    struct iovec *iov, int count)
{
    VuVirtqElement *elem;
    VuVirtq *q;
    int ret = 0;

    assert(count >= 1);
    assert(iov[0].iov_len >= sizeof(struct fuse_out_header));

    struct fuse_out_header *out = iov[0].iov_base;
    /* TODO: Endianness! */

    size_t tosend_len = iov_size(iov, count);

    /* unique == 0 is notification, which we don't support */
    assert(out->unique);

    /* For virtio we always have ch */
    assert(ch);
    assert(!ch->qi->reply_sent);
    /* Reply goes into the element currently being processed on this queue */
    elem = ch->qi->qe;
    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];

    /* The 'in' part of the elem is to qemu */
    unsigned int in_num = elem->in_num;
    struct iovec *in_sg = elem->in_sg;
    size_t in_len = iov_size(in_sg, in_num);
    fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
             __func__, elem->index, in_num, in_len);

    /*
     * The elem should have room for a 'fuse_out_header' (out from fuse)
     * plus the data based on the len in the header.
     */
    if (in_len < sizeof(struct fuse_out_header)) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
                 __func__, elem->index);
        ret = -E2BIG;
        goto err;
    }
    if (in_len < tosend_len) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
                 __func__, elem->index, tosend_len);
        ret = -E2BIG;
        goto err;
    }

    /* Copy the reply into guest memory, then hand the element back */
    copy_iov(iov, count, in_sg, in_num, tosend_len);
    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
    vu_queue_notify(&se->virtio_dev->dev, q);
    ch->qi->reply_sent = true;

err:
    return ret;
}
/*
 * Callback from fuse_send_data_iov_* when it's virtio and the buffer
 * is a single FD with FUSE_BUF_IS_FD | FUSE_BUF_FD_SEEK
 * We need send the iov and then the buffer.
 * Return 0 on success
 *
 * NOTE(review): error returns here are positive errno values (E2BIG, EIO),
 * whereas virtio_send_msg returns negative (-E2BIG) — confirm callers
 * handle both conventions.
 */
int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
                         struct iovec *iov, int count, struct fuse_bufvec *buf,
                         size_t len)
{
    int ret = 0;
    VuVirtqElement *elem;
    VuVirtq *q;

    assert(count >= 1);
    assert(iov[0].iov_len >= sizeof(struct fuse_out_header));

    struct fuse_out_header *out = iov[0].iov_base;
    /* TODO: Endianness! */

    size_t iov_len = iov_size(iov, count);
    size_t tosend_len = iov_len + len;

    out->len = tosend_len;

    fuse_log(FUSE_LOG_DEBUG, "%s: count=%d len=%zd iov_len=%zd\n", __func__,
             count, len, iov_len);

    /* unique == 0 is notification which we don't support */
    assert(out->unique);

    /* For virtio we always have ch */
    assert(ch);
    assert(!ch->qi->reply_sent);
    elem = ch->qi->qe;
    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];

    /* The 'in' part of the elem is to qemu */
    unsigned int in_num = elem->in_num;
    struct iovec *in_sg = elem->in_sg;
    size_t in_len = iov_size(in_sg, in_num);
    fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
             __func__, elem->index, in_num, in_len);

    /*
     * The elem should have room for a 'fuse_out_header' (out from fuse)
     * plus the data based on the len in the header.
     */
    if (in_len < sizeof(struct fuse_out_header)) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
                 __func__, elem->index);
        ret = E2BIG;
        goto err;
    }
    if (in_len < tosend_len) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
                 __func__, elem->index, tosend_len);
        ret = E2BIG;
        goto err;
    }

    /* TODO: Limit to 'len' */

    /* First copy the header data from iov->in_sg */
    copy_iov(iov, count, in_sg, in_num, iov_len);

    /*
     * Build a copy of the the in_sg iov so we can skip bits in it,
     * including changing the offsets
     * NOTE(review): calloc's (nmemb, size) arguments are swapped here;
     * harmless arithmetically, but unidiomatic.
     */
    struct iovec *in_sg_cpy = calloc(sizeof(struct iovec), in_num);
    assert(in_sg_cpy);
    memcpy(in_sg_cpy, in_sg, sizeof(struct iovec) * in_num);
    /* These get updated as we skip */
    struct iovec *in_sg_ptr = in_sg_cpy;
    int in_sg_cpy_count = in_num;

    /* skip over parts of in_sg that contained the header iov */
    size_t skip_size = iov_len;

    size_t in_sg_left = 0;
    do {
        /* Advance past skip_size bytes of already-filled descriptors */
        while (skip_size != 0 && in_sg_cpy_count) {
            if (skip_size >= in_sg_ptr[0].iov_len) {
                skip_size -= in_sg_ptr[0].iov_len;
                in_sg_ptr++;
                in_sg_cpy_count--;
            } else {
                in_sg_ptr[0].iov_len -= skip_size;
                in_sg_ptr[0].iov_base += skip_size;
                break;
            }
        }

        /* Recompute how much room remains in the (possibly trimmed) iov */
        int i;
        for (i = 0, in_sg_left = 0; i < in_sg_cpy_count; i++) {
            in_sg_left += in_sg_ptr[i].iov_len;
        }
        fuse_log(FUSE_LOG_DEBUG,
                 "%s: after skip skip_size=%zd in_sg_cpy_count=%d "
                 "in_sg_left=%zd\n",
                 __func__, skip_size, in_sg_cpy_count, in_sg_left);
        /* Read file data straight into the guest's descriptors */
        ret = preadv(buf->buf[0].fd, in_sg_ptr, in_sg_cpy_count,
                     buf->buf[0].pos);

        if (ret == -1) {
            ret = errno;
            fuse_log(FUSE_LOG_DEBUG, "%s: preadv failed (%m) len=%zd\n",
                     __func__, len);
            free(in_sg_cpy);
            goto err;
        }
        fuse_log(FUSE_LOG_DEBUG, "%s: preadv ret=%d len=%zd\n", __func__,
                 ret, len);
        /*
         * Short read: skip what we got and retry.
         * NOTE(review): 'ret < len' compares int against size_t — fine
         * here since ret >= 0, but worth keeping in mind.
         */
        if (ret < len && ret) {
            fuse_log(FUSE_LOG_DEBUG, "%s: ret < len\n", __func__);
            /* Skip over this much next time around */
            skip_size = ret;
            buf->buf[0].pos += ret;
            len -= ret;

            /* Lets do another read */
            continue;
        }
        if (!ret) {
            /* EOF case? */
            fuse_log(FUSE_LOG_DEBUG, "%s: !ret in_sg_left=%zd\n", __func__,
                     in_sg_left);
            break;
        }
        if (ret != len) {
            fuse_log(FUSE_LOG_DEBUG, "%s: ret!=len\n", __func__);
            ret = EIO;
            free(in_sg_cpy);
            goto err;
        }
        in_sg_left -= ret;
        len -= ret;
    } while (in_sg_left);
    free(in_sg_cpy);

    /* Need to fix out->len on EOF */
    if (len) {
        struct fuse_out_header *out_sg = in_sg[0].iov_base;

        tosend_len -= len;
        out_sg->len = tosend_len;
    }

    ret = 0;

    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
    vu_queue_notify(&se->virtio_dev->dev, q);

err:
    if (ret == 0) {
        ch->qi->reply_sent = true;
    }

    return ret;
}
/*
 * Thread function for individual queues, created when a queue is 'started'
 *
 * Loops waiting on the queue's kick eventfd (or the kill eventfd set by
 * fv_queue_cleanup_thread), pops each available element, copies the
 * request out of guest memory into a session-sized bounce buffer, and
 * hands it to fuse_session_process_buf_int.  FUSE_WRITE requests take a
 * fast path that references the data descriptors in place.
 */
static void *fv_queue_thread(void *opaque)
{
    struct fv_QueueInfo *qi = opaque;
    struct VuDev *dev = &qi->virtio_dev->dev;
    struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
    struct fuse_session *se = qi->virtio_dev->se;
    struct fuse_chan ch;
    struct fuse_buf fbuf;

    fbuf.mem = NULL;
    fbuf.flags = 0;

    fuse_mutex_init(&ch.lock);
    /* Poison value: the fd is never used on the virtio path */
    ch.fd = (int)0xdaff0d111;
    ch.qi = qi;

    fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
             qi->qidx, qi->kick_fd);
    while (1) {
        struct pollfd pf[2];
        pf[0].fd = qi->kick_fd;
        pf[0].events = POLLIN;
        pf[0].revents = 0;
        pf[1].fd = qi->kill_fd;
        pf[1].events = POLLIN;
        pf[1].revents = 0;

        fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for Queue %d event\n", __func__,
                 qi->qidx);
        int poll_res = ppoll(pf, 2, NULL, NULL);

        if (poll_res == -1) {
            if (errno == EINTR) {
                fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
                         __func__);
                continue;
            }
            fuse_log(FUSE_LOG_ERR, "fv_queue_thread ppoll: %m\n");
            break;
        }
        assert(poll_res >= 1);
        if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
            fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x Queue %d\n",
                     __func__, pf[0].revents, qi->qidx);
            break;
        }
        if (pf[1].revents & (POLLERR | POLLHUP | POLLNVAL)) {
            fuse_log(FUSE_LOG_ERR,
                     "%s: Unexpected poll revents %x Queue %d killfd\n",
                     __func__, pf[1].revents, qi->qidx);
            break;
        }
        /* Any readable event on kill_fd means "shut this thread down" */
        if (pf[1].revents) {
            fuse_log(FUSE_LOG_INFO, "%s: kill event on queue %d - quitting\n",
                     __func__, qi->qidx);
            break;
        }
        assert(pf[0].revents & POLLIN);
        fuse_log(FUSE_LOG_DEBUG, "%s: Got queue event on Queue %d\n", __func__,
                 qi->qidx);

        eventfd_t evalue;
        if (eventfd_read(qi->kick_fd, &evalue)) {
            fuse_log(FUSE_LOG_ERR, "Eventfd_read for queue: %m\n");
            break;
        }
        /* out is from guest, in is too guest */
        unsigned int in_bytes, out_bytes;
        vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);

        fuse_log(FUSE_LOG_DEBUG,
                 "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
                 __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);

        /* Drain every available element for this kick */
        while (1) {
            bool allocated_bufv = false;
            struct fuse_bufvec bufv;
            struct fuse_bufvec *pbufv;

            /*
             * An element contains one request and the space to send our
             * response They're spread over multiple descriptors in a
             * scatter/gather set and we can't trust the guest to keep them
             * still; so copy in/out.
             */
            VuVirtqElement *elem = vu_queue_pop(dev, q, sizeof(VuVirtqElement));
            if (!elem) {
                break;
            }

            qi->qe = elem;
            qi->reply_sent = false;

            /* Lazily allocate the bounce buffer; reused across requests */
            if (!fbuf.mem) {
                fbuf.mem = malloc(se->bufsize);
                assert(fbuf.mem);
                assert(se->bufsize > sizeof(struct fuse_in_header));
            }
            /* The 'out' part of the elem is from qemu */
            unsigned int out_num = elem->out_num;
            struct iovec *out_sg = elem->out_sg;
            size_t out_len = iov_size(out_sg, out_num);
            fuse_log(FUSE_LOG_DEBUG,
                     "%s: elem %d: with %d out desc of length %zd\n", __func__,
                     elem->index, out_num, out_len);

            /*
             * The elem should contain a 'fuse_in_header' (in to fuse)
             * plus the data based on the len in the header.
             */
            if (out_len < sizeof(struct fuse_in_header)) {
                fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
                         __func__, elem->index);
                assert(0); /* TODO */
            }
            if (out_len > se->bufsize) {
                fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n",
                         __func__, elem->index);
                assert(0); /* TODO */
            }
            /* Copy just the first element and look at it */
            copy_from_iov(&fbuf, 1, out_sg);

            if (out_num > 2 &&
                out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
                ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
                out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
                /*
                 * For a write we don't actually need to copy the
                 * data, we can just do it straight out of guest memory
                 * but we must still copy the headers in case the guest
                 * was nasty and changed them while we were using them.
                 */
                fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);

                /* copy the fuse_write_in header after the fuse_in_header */
                fbuf.mem += out_sg->iov_len;
                copy_from_iov(&fbuf, 1, out_sg + 1);
                fbuf.mem -= out_sg->iov_len;
                fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;

                /* Allocate the bufv, with space for the rest of the iov */
                allocated_bufv = true;
                pbufv = malloc(sizeof(struct fuse_bufvec) +
                               sizeof(struct fuse_buf) * (out_num - 2));
                if (!pbufv) {
                    /* Return the element unprocessed so nothing is lost */
                    vu_queue_unpop(dev, q, elem, 0);
                    free(elem);
                    fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
                             __func__);
                    goto out;
                }

                pbufv->count = 1;
                pbufv->buf[0] = fbuf;

                size_t iovindex, pbufvindex;
                iovindex = 2; /* 2 headers, separate iovs */
                pbufvindex = 1; /* 2 headers, 1 fusebuf */

                /* Reference the data descriptors in place (no copy) */
                for (; iovindex < out_num; iovindex++, pbufvindex++) {
                    pbufv->count++;
                    pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
                    pbufv->buf[pbufvindex].flags = 0;
                    pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
                    pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
                }
            } else {
                /* Normal (non fast write) path */

                /* Copy the rest of the buffer */
                fbuf.mem += out_sg->iov_len;
                copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
                fbuf.mem -= out_sg->iov_len;
                fbuf.size = out_len;

                /* TODO! Endianness of header */

                /* TODO: Add checks for fuse_session_exited */
                bufv.buf[0] = fbuf;
                bufv.count = 1;
                pbufv = &bufv;
            }
            pbufv->idx = 0;
            pbufv->off = 0;
            fuse_session_process_buf_int(se, pbufv, &ch);

            if (allocated_bufv) {
                free(pbufv);
            }

            if (!qi->reply_sent) {
                fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n",
                         __func__, elem->index);
                /* I think we've still got to recycle the element */
                vu_queue_push(dev, q, elem, 0);
                vu_queue_notify(dev, q);
            }
            qi->qe = NULL;
            free(elem);
            elem = NULL;
        }
    }
out:
    pthread_mutex_destroy(&ch.lock);
    free(fbuf.mem);

    return NULL;
}
608 static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
610 int ret;
611 struct fv_QueueInfo *ourqi;
613 assert(qidx < vud->nqueues);
614 ourqi = vud->qi[qidx];
616 /* Kill the thread */
617 if (eventfd_write(ourqi->kill_fd, 1)) {
618 fuse_log(FUSE_LOG_ERR, "Eventfd_write for queue %d: %s\n",
619 qidx, strerror(errno));
621 ret = pthread_join(ourqi->thread, NULL);
622 if (ret) {
623 fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
624 __func__, qidx, ret);
626 close(ourqi->kill_fd);
627 ourqi->kick_fd = -1;
628 free(vud->qi[qidx]);
629 vud->qi[qidx] = NULL;
/*
 * Callback from libvhost-user on start or stop of a queue
 *
 * On start: grow the qi array if needed, allocate per-queue state on
 * first use, record the kick fd, create a kill eventfd and spawn the
 * queue's service thread.  On stop: tear the thread down via
 * fv_queue_cleanup_thread.
 */
static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
{
    struct fv_VuDev *vud = container_of(dev, struct fv_VuDev, dev);
    struct fv_QueueInfo *ourqi;

    fuse_log(FUSE_LOG_INFO, "%s: qidx=%d started=%d\n", __func__, qidx,
             started);
    assert(qidx >= 0);

    /*
     * Ignore additional request queues for now. passthrough_ll.c must be
     * audited for thread-safety issues first. It was written with a
     * well-behaved client in mind and may not protect against all types of
     * races yet.
     */
    if (qidx > 1) {
        fuse_log(FUSE_LOG_ERR,
                 "%s: multiple request queues not yet implemented, please only "
                 "configure 1 request queue\n",
                 __func__);
        exit(EXIT_FAILURE);
    }

    if (started) {
        /* Fire up a thread to watch this queue */
        if (qidx >= vud->nqueues) {
            /* Grow the array; new slots are zeroed below */
            vud->qi = realloc(vud->qi, (qidx + 1) * sizeof(vud->qi[0]));
            assert(vud->qi);
            memset(vud->qi + vud->nqueues, 0,
                   sizeof(vud->qi[0]) * (1 + (qidx - vud->nqueues)));
            vud->nqueues = qidx + 1;
        }
        if (!vud->qi[qidx]) {
            vud->qi[qidx] = calloc(sizeof(struct fv_QueueInfo), 1);
            assert(vud->qi[qidx]);
            vud->qi[qidx]->virtio_dev = vud;
            vud->qi[qidx]->qidx = qidx;
        } else {
            /* Shouldn't have been started */
            assert(vud->qi[qidx]->kick_fd == -1);
        }
        ourqi = vud->qi[qidx];
        ourqi->kick_fd = dev->vq[qidx].kick_fd;

        ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
        assert(ourqi->kill_fd != -1);
        if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
            fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
                     __func__, qidx);
            assert(0);
        }
    } else {
        fv_queue_cleanup_thread(vud, qidx);
    }
}
/* Callback from libvhost-user: replies may be completed out of order */
static bool fv_queue_order(VuDev *dev, int qidx)
{
    return false;
}
/* vhost-user device callbacks handed to vu_init() */
static const VuDevIface fv_iface = {
    .get_features = fv_get_features,
    .set_features = fv_set_features,

    /* Don't need process message, we've not got any at vhost-user level */
    .queue_set_started = fv_queue_set_started,

    .queue_is_processed_in_order = fv_queue_order,
};
705 * Main loop; this mostly deals with events on the vhost-user
706 * socket itself, and not actual fuse data.
708 int virtio_loop(struct fuse_session *se)
710 fuse_log(FUSE_LOG_INFO, "%s: Entry\n", __func__);
712 while (!fuse_session_exited(se)) {
713 struct pollfd pf[1];
714 pf[0].fd = se->vu_socketfd;
715 pf[0].events = POLLIN;
716 pf[0].revents = 0;
718 fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for VU event\n", __func__);
719 int poll_res = ppoll(pf, 1, NULL, NULL);
721 if (poll_res == -1) {
722 if (errno == EINTR) {
723 fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
724 __func__);
725 continue;
727 fuse_log(FUSE_LOG_ERR, "virtio_loop ppoll: %m\n");
728 break;
730 assert(poll_res == 1);
731 if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
732 fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x\n", __func__,
733 pf[0].revents);
734 break;
736 assert(pf[0].revents & POLLIN);
737 fuse_log(FUSE_LOG_DEBUG, "%s: Got VU event\n", __func__);
738 if (!vu_dispatch(&se->virtio_dev->dev)) {
739 fuse_log(FUSE_LOG_ERR, "%s: vu_dispatch failed\n", __func__);
740 break;
744 fuse_log(FUSE_LOG_INFO, "%s: Exit\n", __func__);
746 return 0;
/* Replace every occurrence of 'old' in the NUL-terminated string with 'new' */
static void strreplace(char *s, char old, char new)
{
    for (size_t i = 0; s[i] != '\0'; i++) {
        if (s[i] == old) {
            s[i] = new;
        }
    }
}
/*
 * Claim exclusive use of the vhost-user socket path by writing a pidfile
 * under the local state "run/virtiofsd" directory.  Returns false if the
 * directory cannot be created or the pidfile is already held (i.e. another
 * virtiofsd appears to own this socket path).
 */
static bool fv_socket_lock(struct fuse_session *se)
{
    g_autofree gchar *sk_name = NULL;
    g_autofree gchar *pidfile = NULL;
    g_autofree gchar *dir = NULL;
    Error *local_err = NULL;

    dir = qemu_get_local_state_pathname("run/virtiofsd");

    if (g_mkdir_with_parents(dir, S_IRWXU) < 0) {
        fuse_log(FUSE_LOG_ERR, "%s: Failed to create directory %s: %s",
                 __func__, dir, strerror(errno));
        return false;
    }

    /* Flatten the socket path into a single pidfile name component */
    sk_name = g_strdup(se->vu_socket_path);
    strreplace(sk_name, '/', '.');
    pidfile = g_strdup_printf("%s/%s.pid", dir, sk_name);

    /* qemu_write_pidfile doubles as the exclusivity check */
    if (!qemu_write_pidfile(pidfile, &local_err)) {
        error_report_err(local_err);
        return false;
    }

    return true;
}
785 static int fv_create_listen_socket(struct fuse_session *se)
787 struct sockaddr_un un;
788 mode_t old_umask;
790 /* Nothing to do if fd is already initialized */
791 if (se->vu_listen_fd >= 0) {
792 return 0;
795 if (strlen(se->vu_socket_path) >= sizeof(un.sun_path)) {
796 fuse_log(FUSE_LOG_ERR, "Socket path too long\n");
797 return -1;
800 if (!strlen(se->vu_socket_path)) {
801 fuse_log(FUSE_LOG_ERR, "Socket path is empty\n");
802 return -1;
805 /* Check the vu_socket_path is already used */
806 if (!fv_socket_lock(se)) {
807 return -1;
811 * Create the Unix socket to communicate with qemu
812 * based on QEMU's vhost-user-bridge
814 unlink(se->vu_socket_path);
815 strcpy(un.sun_path, se->vu_socket_path);
816 size_t addr_len = sizeof(un);
818 int listen_sock = socket(AF_UNIX, SOCK_STREAM, 0);
819 if (listen_sock == -1) {
820 fuse_log(FUSE_LOG_ERR, "vhost socket creation: %m\n");
821 return -1;
823 un.sun_family = AF_UNIX;
826 * Unfortunately bind doesn't let you set the mask on the socket,
827 * so set umask to 077 and restore it later.
829 old_umask = umask(0077);
830 if (bind(listen_sock, (struct sockaddr *)&un, addr_len) == -1) {
831 fuse_log(FUSE_LOG_ERR, "vhost socket bind: %m\n");
832 umask(old_umask);
833 return -1;
835 umask(old_umask);
837 if (listen(listen_sock, 1) == -1) {
838 fuse_log(FUSE_LOG_ERR, "vhost socket listen: %m\n");
839 return -1;
842 se->vu_listen_fd = listen_sock;
843 return 0;
846 int virtio_session_mount(struct fuse_session *se)
848 int ret;
850 ret = fv_create_listen_socket(se);
851 if (ret < 0) {
852 return ret;
855 se->fd = -1;
857 fuse_log(FUSE_LOG_INFO, "%s: Waiting for vhost-user socket connection...\n",
858 __func__);
859 int data_sock = accept(se->vu_listen_fd, NULL, NULL);
860 if (data_sock == -1) {
861 fuse_log(FUSE_LOG_ERR, "vhost socket accept: %m\n");
862 close(se->vu_listen_fd);
863 return -1;
865 close(se->vu_listen_fd);
866 se->vu_listen_fd = -1;
867 fuse_log(FUSE_LOG_INFO, "%s: Received vhost-user socket connection\n",
868 __func__);
870 /* TODO: Some cleanup/deallocation! */
871 se->virtio_dev = calloc(sizeof(struct fv_VuDev), 1);
872 if (!se->virtio_dev) {
873 fuse_log(FUSE_LOG_ERR, "%s: virtio_dev calloc failed\n", __func__);
874 close(data_sock);
875 return -1;
878 se->vu_socketfd = data_sock;
879 se->virtio_dev->se = se;
880 vu_init(&se->virtio_dev->dev, 2, se->vu_socketfd, fv_panic, fv_set_watch,
881 fv_remove_watch, &fv_iface);
883 return 0;
886 void virtio_session_close(struct fuse_session *se)
888 close(se->vu_socketfd);
890 if (!se->virtio_dev) {
891 return;
894 free(se->virtio_dev->qi);
895 free(se->virtio_dev);
896 se->virtio_dev = NULL;