gluster: Convert aio routines into coroutines
[qemu/rayw.git] / block / gluster.c
blobf9aea0ea27e6bd8c6c648e30d841f13efec0a16d
1 /*
2 * GlusterFS backend for QEMU
4 * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com>
6 * Pipe handling mechanism in AIO implementation is derived from
7 * block/rbd.c. Hence,
9 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
10 * Josh Durgin <josh.durgin@dreamhost.com>
12 * This work is licensed under the terms of the GNU GPL, version 2. See
13 * the COPYING file in the top-level directory.
15 * Contributions after 2012-01-13 are licensed under the terms of the
16 * GNU GPL, version 2 or (at your option) any later version.
18 #include <glusterfs/api/glfs.h>
19 #include "block/block_int.h"
20 #include "qemu/sockets.h"
21 #include "qemu/uri.h"
/*
 * Per-request state shared between the coroutine that submits a gluster
 * async call and the completion callback that wakes it back up.
 */
typedef struct GlusterAIOCB {
    int64_t size;         /* expected transfer length in bytes */
    int ret;              /* 0 on success, negative errno on failure */
    QEMUBH *bh;           /* bottom half used to re-enter the coroutine */
    Coroutine *coroutine; /* coroutine yielded while the request is in flight */
} GlusterAIOCB;
/* Driver-private state: one gluster volume connection and one open file. */
typedef struct BDRVGlusterState {
    struct glfs *glfs;  /* handle for the connection to the gluster volume */
    struct glfs_fd *fd; /* open handle of the image file on that volume */
} BDRVGlusterState;
/*
 * Pipe-end indices from the old pipe-based AIO implementation.
 * NOTE(review): nothing visible in this file references these after the
 * coroutine conversion — candidates for removal; confirm no other user.
 */
#define GLUSTER_FD_READ 0
#define GLUSTER_FD_WRITE 1
/* Connection parameters parsed out of a gluster[+transport]:// URI. */
typedef struct GlusterConf {
    char *server;    /* glusterd host, or unix socket path for "unix" */
    int port;        /* glusterd port; 0 lets gluster pick its default */
    char *volname;   /* name of the gluster volume holding the image */
    char *image;     /* path of the image file inside the volume */
    char *transport; /* "tcp", "unix" or "rdma" */
} GlusterConf;
46 static void qemu_gluster_gconf_free(GlusterConf *gconf)
48 g_free(gconf->server);
49 g_free(gconf->volname);
50 g_free(gconf->image);
51 g_free(gconf->transport);
52 g_free(gconf);
55 static int parse_volume_options(GlusterConf *gconf, char *path)
57 char *p, *q;
59 if (!path) {
60 return -EINVAL;
63 /* volume */
64 p = q = path + strspn(path, "/");
65 p += strcspn(p, "/");
66 if (*p == '\0') {
67 return -EINVAL;
69 gconf->volname = g_strndup(q, p - q);
71 /* image */
72 p += strspn(p, "/");
73 if (*p == '\0') {
74 return -EINVAL;
76 gconf->image = g_strdup(p);
77 return 0;
81 * file=gluster[+transport]://[server[:port]]/volname/image[?socket=...]
83 * 'gluster' is the protocol.
85 * 'transport' specifies the transport type used to connect to gluster
86 * management daemon (glusterd). Valid transport types are
87 * tcp, unix and rdma. If a transport type isn't specified, then tcp
88 * type is assumed.
90 * 'server' specifies the server where the volume file specification for
91 * the given volume resides. This can be either hostname, ipv4 address
92 * or ipv6 address. ipv6 address needs to be within square brackets [ ].
 * If transport type is 'unix', then 'server' field should not be specified.
94 * The 'socket' field needs to be populated with the path to unix domain
95 * socket.
97 * 'port' is the port number on which glusterd is listening. This is optional
 * and if not specified, QEMU will send 0 which will make gluster use the
99 * default port. If the transport type is unix, then 'port' should not be
100 * specified.
102 * 'volname' is the name of the gluster volume which contains the VM image.
104 * 'image' is the path to the actual VM image that resides on gluster volume.
106 * Examples:
108 * file=gluster://1.2.3.4/testvol/a.img
109 * file=gluster+tcp://1.2.3.4/testvol/a.img
110 * file=gluster+tcp://1.2.3.4:24007/testvol/dir/a.img
111 * file=gluster+tcp://[1:2:3:4:5:6:7:8]/testvol/dir/a.img
112 * file=gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img
113 * file=gluster+tcp://server.domain.com:24007/testvol/dir/a.img
114 * file=gluster+unix:///testvol/dir/a.img?socket=/tmp/glusterd.socket
115 * file=gluster+rdma://1.2.3.4:24007/testvol/a.img
/*
 * Fill @gconf from a URI of the form
 * gluster[+transport]://[server[:port]]/volname/image[?socket=...]
 * (see the comment block above for the complete syntax).
 *
 * Returns 0 on success, -EINVAL for any malformed URI.
 */
static int qemu_gluster_parseuri(GlusterConf *gconf, const char *filename)
{
    URI *uri;
    QueryParams *qp = NULL;
    bool is_unix = false;
    int ret = 0;

    uri = uri_parse(filename);
    if (!uri) {
        return -EINVAL;
    }

    /* transport: a bare "gluster" scheme defaults to tcp */
    if (!strcmp(uri->scheme, "gluster")) {
        gconf->transport = g_strdup("tcp");
    } else if (!strcmp(uri->scheme, "gluster+tcp")) {
        gconf->transport = g_strdup("tcp");
    } else if (!strcmp(uri->scheme, "gluster+unix")) {
        gconf->transport = g_strdup("unix");
        is_unix = true;
    } else if (!strcmp(uri->scheme, "gluster+rdma")) {
        gconf->transport = g_strdup("rdma");
    } else {
        ret = -EINVAL;
        goto out;
    }

    /* volname and image come from the path component */
    ret = parse_volume_options(gconf, uri->path);
    if (ret < 0) {
        goto out;
    }

    /*
     * Exactly one query parameter ("socket") is allowed, and only for the
     * unix transport; every other transport must have none.
     */
    qp = query_params_parse(uri->query);
    if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
        ret = -EINVAL;
        goto out;
    }

    if (is_unix) {
        /* with a unix socket, the host/port parts must be absent */
        if (uri->server || uri->port) {
            ret = -EINVAL;
            goto out;
        }
        if (strcmp(qp->p[0].name, "socket")) {
            ret = -EINVAL;
            goto out;
        }
        /* for the unix transport, "server" carries the socket path */
        gconf->server = g_strdup(qp->p[0].value);
    } else {
        gconf->server = g_strdup(uri->server);
        gconf->port = uri->port;
    }

out:
    if (qp) {
        query_params_free(qp);
    }
    uri_free(uri);
    return ret;
}
/*
 * Parse @filename into @gconf, connect to the gluster volume it names and
 * return a ready-to-use glfs handle.  On failure returns NULL with errno
 * set: derived from the parse error, or left as set by the failing gfapi
 * call.
 */
static struct glfs *qemu_gluster_init(GlusterConf *gconf, const char *filename)
{
    struct glfs *glfs = NULL;
    int ret;
    int old_errno;

    ret = qemu_gluster_parseuri(gconf, filename);
    if (ret < 0) {
        error_report("Usage: file=gluster[+transport]://[server[:port]]/"
                     "volname/image[?socket=...]");
        errno = -ret;
        goto out;
    }

    glfs = glfs_new(gconf->volname);
    if (!glfs) {
        goto out;
    }

    ret = glfs_set_volfile_server(glfs, gconf->transport, gconf->server,
                                  gconf->port);
    if (ret < 0) {
        goto out;
    }

    /*
     * TODO: Use GF_LOG_ERROR instead of hard code value of 4 here when
     * GlusterFS makes GF_LOG_* macros available to libgfapi users.
     */
    ret = glfs_set_logging(glfs, "-", 4);
    if (ret < 0) {
        goto out;
    }

    ret = glfs_init(glfs);
    if (ret) {
        error_report("Gluster connection failed for server=%s port=%d "
                     "volume=%s image=%s transport=%s", gconf->server,
                     gconf->port, gconf->volname, gconf->image,
                     gconf->transport);
        goto out;
    }
    return glfs;

out:
    if (glfs) {
        /* glfs_fini() may clobber errno; preserve the original failure */
        old_errno = errno;
        glfs_fini(glfs);
        errno = old_errno;
    }
    return NULL;
}
230 static void qemu_gluster_complete_aio(void *opaque)
232 GlusterAIOCB *acb = (GlusterAIOCB *)opaque;
234 qemu_bh_delete(acb->bh);
235 acb->bh = NULL;
236 qemu_coroutine_enter(acb->coroutine, NULL);
/* TODO Convert to fine grained options */
/* Runtime options accepted by qemu_gluster_open(): only the gluster URI. */
static QemuOptsList runtime_opts = {
    .name = "gluster",
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
    .desc = {
        {
            .name = "filename",
            .type = QEMU_OPT_STRING,
            .help = "URL to the gluster image",
        },
        { /* end of list */ }
    },
};
/*
 * Open an image on a gluster volume.  The "filename" runtime option
 * carries the gluster URI; @bdrv_flags selects read/write access and
 * O_DIRECT caching.  Returns 0 on success, negative errno on failure.
 */
static int qemu_gluster_open(BlockDriverState *bs, QDict *options,
                             int bdrv_flags, Error **errp)
{
    BDRVGlusterState *s = bs->opaque;
    int open_flags = O_BINARY;
    int ret = 0;
    GlusterConf *gconf = g_malloc0(sizeof(GlusterConf));
    QemuOpts *opts;
    Error *local_err = NULL;
    const char *filename;

    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
    qemu_opts_absorb_qdict(opts, options, &local_err);
    if (error_is_set(&local_err)) {
        qerror_report_err(local_err);
        error_free(local_err);
        ret = -EINVAL;
        goto out;
    }

    filename = qemu_opt_get(opts, "filename");

    /* parse the URI and establish the connection to the volume */
    s->glfs = qemu_gluster_init(gconf, filename);
    if (!s->glfs) {
        ret = -errno;
        goto out;
    }

    if (bdrv_flags & BDRV_O_RDWR) {
        open_flags |= O_RDWR;
    } else {
        open_flags |= O_RDONLY;
    }

    if ((bdrv_flags & BDRV_O_NOCACHE)) {
        open_flags |= O_DIRECT;
    }

    s->fd = glfs_open(s->glfs, gconf->image, open_flags);
    if (!s->fd) {
        ret = -errno;
    }

out:
    /* opts and gconf are released on both success and failure */
    qemu_opts_del(opts);
    qemu_gluster_gconf_free(gconf);
    if (!ret) {
        return ret;
    }
    /* error path: undo whatever was set up before the failure */
    if (s->fd) {
        glfs_close(s->fd);
    }
    if (s->glfs) {
        glfs_fini(s->glfs);
    }
    return ret;
}
/*
 * Create a new image on a gluster volume: connect, create the file and
 * truncate it to the requested virtual size.  Returns 0 on success,
 * negative errno on failure.
 */
static int qemu_gluster_create(const char *filename,
                               QEMUOptionParameter *options, Error **errp)
{
    struct glfs *glfs;
    struct glfs_fd *fd;
    int ret = 0;
    int64_t total_size = 0;
    GlusterConf *gconf = g_malloc0(sizeof(GlusterConf));

    glfs = qemu_gluster_init(gconf, filename);
    if (!glfs) {
        ret = -errno;
        goto out;
    }

    /* only the size option is honoured; other options are ignored */
    while (options && options->name) {
        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
            /*
             * NOTE(review): integer division rounds the size down to a
             * whole number of sectors — confirm callers always pass
             * sector-aligned sizes.
             */
            total_size = options->value.n / BDRV_SECTOR_SIZE;
        }
        options++;
    }

    fd = glfs_creat(glfs, gconf->image,
                    O_WRONLY | O_CREAT | O_TRUNC | O_BINARY,
                    S_IRUSR | S_IWUSR);
    if (!fd) {
        ret = -errno;
    } else {
        if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) {
            ret = -errno;
        }
        /* close unconditionally; a close failure also fails the create */
        if (glfs_close(fd) != 0) {
            ret = -errno;
        }
    }
out:
    qemu_gluster_gconf_free(gconf);
    if (glfs) {
        glfs_fini(glfs);
    }
    return ret;
}
/*
 * AIO callback routine called from GlusterFS thread.
 *
 * Translates the gluster result into acb->ret (0, negative errno, or
 * -EIO for a short transfer) and schedules a BH to resume the waiting
 * coroutine in the QEMU main loop.
 */
static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
{
    GlusterAIOCB *acb = (GlusterAIOCB *)arg;

    if (!ret || ret == acb->size) {
        acb->ret = 0; /* Success */
    } else if (ret < 0) {
        acb->ret = ret; /* Read/Write failed */
    } else {
        acb->ret = -EIO; /* Partial read/write - fail it */
    }

    /*
     * NOTE(review): this runs on a gluster-owned thread; verify that
     * qemu_bh_new()/qemu_bh_schedule() are safe to call from outside the
     * QEMU main loop in this tree.
     */
    acb->bh = qemu_bh_new(qemu_gluster_complete_aio, acb);
    qemu_bh_schedule(acb->bh);
}
/*
 * Common coroutine read/write path: submit an async gluster transfer and
 * yield until gluster_finish_aiocb() reschedules us via a BH.
 * Returns 0 on success, negative errno otherwise.
 */
static coroutine_fn int qemu_gluster_co_rw(BlockDriverState *bs,
        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov, int write)
{
    int ret;
    GlusterAIOCB *acb = g_slice_new(GlusterAIOCB);
    BDRVGlusterState *s = bs->opaque;
    size_t size = nb_sectors * BDRV_SECTOR_SIZE;
    off_t offset = sector_num * BDRV_SECTOR_SIZE;

    /* expected byte count lets the callback detect short transfers */
    acb->size = size;
    acb->ret = 0;
    acb->coroutine = qemu_coroutine_self();

    if (write) {
        ret = glfs_pwritev_async(s->fd, qiov->iov, qiov->niov, offset, 0,
                                 &gluster_finish_aiocb, acb);
    } else {
        ret = glfs_preadv_async(s->fd, qiov->iov, qiov->niov, offset, 0,
                                &gluster_finish_aiocb, acb);
    }

    if (ret < 0) {
        /* submission failed: the callback will never run */
        ret = -errno;
        goto out;
    }

    /* sleep until the completion BH re-enters this coroutine */
    qemu_coroutine_yield();
    ret = acb->ret;

out:
    g_slice_free(GlusterAIOCB, acb);
    return ret;
}
406 static int qemu_gluster_truncate(BlockDriverState *bs, int64_t offset)
408 int ret;
409 BDRVGlusterState *s = bs->opaque;
411 ret = glfs_ftruncate(s->fd, offset);
412 if (ret < 0) {
413 return -errno;
416 return 0;
/* Coroutine read entry point: delegate to the common read/write path. */
static coroutine_fn int qemu_gluster_co_readv(BlockDriverState *bs,
        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov)
{
    return qemu_gluster_co_rw(bs, sector_num, nb_sectors, qiov, 0);
}
/* Coroutine write entry point: delegate to the common read/write path. */
static coroutine_fn int qemu_gluster_co_writev(BlockDriverState *bs,
        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov)
{
    return qemu_gluster_co_rw(bs, sector_num, nb_sectors, qiov, 1);
}
/*
 * Flush to stable storage: issue an async fsync on the gluster fd and
 * yield until the completion callback wakes us.  acb->size is 0, so in
 * gluster_finish_aiocb() only a zero result counts as success.
 */
static coroutine_fn int qemu_gluster_co_flush_to_disk(BlockDriverState *bs)
{
    int ret;
    GlusterAIOCB *acb = g_slice_new(GlusterAIOCB);
    BDRVGlusterState *s = bs->opaque;

    acb->size = 0;
    acb->ret = 0;
    acb->coroutine = qemu_coroutine_self();

    ret = glfs_fsync_async(s->fd, &gluster_finish_aiocb, acb);
    if (ret < 0) {
        /* submission failed: the callback will never run */
        ret = -errno;
        goto out;
    }

    qemu_coroutine_yield();
    ret = acb->ret;

out:
    g_slice_free(GlusterAIOCB, acb);
    return ret;
}
#ifdef CONFIG_GLUSTERFS_DISCARD
/*
 * Discard (unmap) a sector range: issue an async gluster discard and
 * yield until the completion callback wakes us.  acb->size is set to 0
 * (not the byte count), so only a zero result is treated as success.
 */
static coroutine_fn int qemu_gluster_co_discard(BlockDriverState *bs,
        int64_t sector_num, int nb_sectors)
{
    int ret;
    GlusterAIOCB *acb = g_slice_new(GlusterAIOCB);
    BDRVGlusterState *s = bs->opaque;
    size_t size = nb_sectors * BDRV_SECTOR_SIZE;
    off_t offset = sector_num * BDRV_SECTOR_SIZE;

    acb->size = 0;
    acb->ret = 0;
    acb->coroutine = qemu_coroutine_self();

    ret = glfs_discard_async(s->fd, offset, size, &gluster_finish_aiocb, acb);
    if (ret < 0) {
        /* submission failed: the callback will never run */
        ret = -errno;
        goto out;
    }

    qemu_coroutine_yield();
    ret = acb->ret;

out:
    g_slice_free(GlusterAIOCB, acb);
    return ret;
}
#endif
484 static int64_t qemu_gluster_getlength(BlockDriverState *bs)
486 BDRVGlusterState *s = bs->opaque;
487 int64_t ret;
489 ret = glfs_lseek(s->fd, 0, SEEK_END);
490 if (ret < 0) {
491 return -errno;
492 } else {
493 return ret;
497 static int64_t qemu_gluster_allocated_file_size(BlockDriverState *bs)
499 BDRVGlusterState *s = bs->opaque;
500 struct stat st;
501 int ret;
503 ret = glfs_fstat(s->fd, &st);
504 if (ret < 0) {
505 return -errno;
506 } else {
507 return st.st_blocks * 512;
511 static void qemu_gluster_close(BlockDriverState *bs)
513 BDRVGlusterState *s = bs->opaque;
515 if (s->fd) {
516 glfs_close(s->fd);
517 s->fd = NULL;
519 glfs_fini(s->glfs);
/*
 * Whether a freshly created image is guaranteed to read as zeroes.
 * We cannot promise that here, so always answer no.
 */
static int qemu_gluster_has_zero_init(BlockDriverState *bs)
{
    /* GlusterFS volume could be backed by a block device */
    return 0;
}
/* Creation options accepted by qemu_gluster_create(): only the size. */
static QEMUOptionParameter qemu_gluster_create_options[] = {
    {
        .name = BLOCK_OPT_SIZE,
        .type = OPT_SIZE,
        .help = "Virtual disk size"
    },
    { NULL }
};
/* Protocol driver for bare "gluster://" URIs (transport defaults to tcp). */
static BlockDriver bdrv_gluster = {
    .format_name = "gluster",
    .protocol_name = "gluster",
    .instance_size = sizeof(BDRVGlusterState),
    .bdrv_needs_filename = true,
    .bdrv_file_open = qemu_gluster_open,
    .bdrv_close = qemu_gluster_close,
    .bdrv_create = qemu_gluster_create,
    .bdrv_getlength = qemu_gluster_getlength,
    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
    .bdrv_truncate = qemu_gluster_truncate,
    .bdrv_co_readv = qemu_gluster_co_readv,
    .bdrv_co_writev = qemu_gluster_co_writev,
    .bdrv_co_flush_to_disk = qemu_gluster_co_flush_to_disk,
    .bdrv_has_zero_init = qemu_gluster_has_zero_init,
#ifdef CONFIG_GLUSTERFS_DISCARD
    .bdrv_co_discard = qemu_gluster_co_discard,
#endif
    .create_options = qemu_gluster_create_options,
};
/* Protocol driver for explicit "gluster+tcp://" URIs. */
static BlockDriver bdrv_gluster_tcp = {
    .format_name = "gluster",
    .protocol_name = "gluster+tcp",
    .instance_size = sizeof(BDRVGlusterState),
    .bdrv_needs_filename = true,
    .bdrv_file_open = qemu_gluster_open,
    .bdrv_close = qemu_gluster_close,
    .bdrv_create = qemu_gluster_create,
    .bdrv_getlength = qemu_gluster_getlength,
    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
    .bdrv_truncate = qemu_gluster_truncate,
    .bdrv_co_readv = qemu_gluster_co_readv,
    .bdrv_co_writev = qemu_gluster_co_writev,
    .bdrv_co_flush_to_disk = qemu_gluster_co_flush_to_disk,
    .bdrv_has_zero_init = qemu_gluster_has_zero_init,
#ifdef CONFIG_GLUSTERFS_DISCARD
    .bdrv_co_discard = qemu_gluster_co_discard,
#endif
    .create_options = qemu_gluster_create_options,
};
/* Protocol driver for "gluster+unix://" URIs (unix domain socket). */
static BlockDriver bdrv_gluster_unix = {
    .format_name = "gluster",
    .protocol_name = "gluster+unix",
    .instance_size = sizeof(BDRVGlusterState),
    .bdrv_needs_filename = true,
    .bdrv_file_open = qemu_gluster_open,
    .bdrv_close = qemu_gluster_close,
    .bdrv_create = qemu_gluster_create,
    .bdrv_getlength = qemu_gluster_getlength,
    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
    .bdrv_truncate = qemu_gluster_truncate,
    .bdrv_co_readv = qemu_gluster_co_readv,
    .bdrv_co_writev = qemu_gluster_co_writev,
    .bdrv_co_flush_to_disk = qemu_gluster_co_flush_to_disk,
    .bdrv_has_zero_init = qemu_gluster_has_zero_init,
#ifdef CONFIG_GLUSTERFS_DISCARD
    .bdrv_co_discard = qemu_gluster_co_discard,
#endif
    .create_options = qemu_gluster_create_options,
};
/* Protocol driver for "gluster+rdma://" URIs. */
static BlockDriver bdrv_gluster_rdma = {
    .format_name = "gluster",
    .protocol_name = "gluster+rdma",
    .instance_size = sizeof(BDRVGlusterState),
    .bdrv_needs_filename = true,
    .bdrv_file_open = qemu_gluster_open,
    .bdrv_close = qemu_gluster_close,
    .bdrv_create = qemu_gluster_create,
    .bdrv_getlength = qemu_gluster_getlength,
    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
    .bdrv_truncate = qemu_gluster_truncate,
    .bdrv_co_readv = qemu_gluster_co_readv,
    .bdrv_co_writev = qemu_gluster_co_writev,
    .bdrv_co_flush_to_disk = qemu_gluster_co_flush_to_disk,
    .bdrv_has_zero_init = qemu_gluster_has_zero_init,
#ifdef CONFIG_GLUSTERFS_DISCARD
    .bdrv_co_discard = qemu_gluster_co_discard,
#endif
    .create_options = qemu_gluster_create_options,
};
621 static void bdrv_gluster_init(void)
623 bdrv_register(&bdrv_gluster_rdma);
624 bdrv_register(&bdrv_gluster_unix);
625 bdrv_register(&bdrv_gluster_tcp);
626 bdrv_register(&bdrv_gluster);
629 block_init(bdrv_gluster_init);