2 * QEMU Block driver for RADOS (Ceph)
4 * Copyright (C) 2010 Christian Brunner <chb@muc.de>
6 * This work is licensed under the terms of the GNU GPL, version 2. See
7 * the COPYING file in the top-level directory.
11 #include "qemu-common.h"
12 #include "qemu-error.h"
14 #include "rbd_types.h"
15 #include "block_int.h"
17 #include <rados/librados.h>
22 * When specifying the image filename use:
24 * rbd:poolname/devicename
26 * poolname must be the name of an existing rados pool
28 * devicename is the basename for all objects used to
29 * emulate the raw device.
31 * Metadata information (image size, ...) is stored in an
32 * object with the name "devicename.rbd".
34 * The raw device is split into 4MB sized objects by default.
35 * The sequence number is encoded in a 12 byte long hex-string,
36 * and is attached to the devicename, separated by a dot.
37 * e.g. "devicename.1234567890ab"
41 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
43 typedef struct RBDAIOCB
{
44 BlockDriverAIOCB common
;
53 struct BDRVRBDState
*s
;
57 typedef struct RADOSCB
{
60 struct BDRVRBDState
*s
;
68 #define RBD_FD_WRITE 1
70 typedef struct BDRVRBDState
{
73 rados_pool_t header_pool
;
74 char name
[RBD_MAX_OBJ_NAME_SIZE
];
75 char block_name
[RBD_MAX_BLOCK_NAME_SIZE
];
83 typedef struct rbd_obj_header_ondisk RbdHeader1
;
85 static void rbd_aio_bh_cb(void *opaque
);
87 static int rbd_next_tok(char *dst
, int dst_len
,
88 char *src
, char delim
,
98 end
= strchr(src
, delim
);
106 error_report("%s too long", name
);
109 error_report("%s too short", name
);
113 pstrcpy(dst
, dst_len
, src
);
118 static int rbd_parsename(const char *filename
,
119 char *pool
, int pool_len
,
120 char *snap
, int snap_len
,
121 char *name
, int name_len
)
127 if (!strstart(filename
, "rbd:", &start
)) {
131 buf
= qemu_strdup(start
);
134 ret
= rbd_next_tok(pool
, pool_len
, p
, '/', "pool name", &p
);
139 ret
= rbd_next_tok(name
, name_len
, p
, '@', "object name", &p
);
148 ret
= rbd_next_tok(snap
, snap_len
, p
, '\0', "snap name", &p
);
155 static int create_tmap_op(uint8_t op
, const char *name
, char **tmap_desc
)
157 uint32_t len
= strlen(name
);
158 uint32_t len_le
= cpu_to_le32(len
);
159 /* total_len = encoding op + name + empty buffer */
160 uint32_t total_len
= 1 + (sizeof(uint32_t) + len
) + sizeof(uint32_t);
161 uint8_t *desc
= NULL
;
163 desc
= qemu_malloc(total_len
);
165 *tmap_desc
= (char *)desc
;
169 memcpy(desc
, &len_le
, sizeof(len_le
));
170 desc
+= sizeof(len_le
);
171 memcpy(desc
, name
, len
);
173 len
= 0; /* no need for endian conversion for 0 */
174 memcpy(desc
, &len
, sizeof(len
));
177 return (char *)desc
- *tmap_desc
;
180 static void free_tmap_op(char *tmap_desc
)
182 qemu_free(tmap_desc
);
185 static int rbd_register_image(rados_pool_t pool
, const char *name
)
188 const char *dir
= RBD_DIRECTORY
;
191 ret
= create_tmap_op(CEPH_OSD_TMAP_SET
, name
, &tmap_desc
);
196 ret
= rados_tmap_update(pool
, dir
, tmap_desc
, ret
);
197 free_tmap_op(tmap_desc
);
202 static int touch_rbd_info(rados_pool_t pool
, const char *info_oid
)
204 int r
= rados_write(pool
, info_oid
, 0, NULL
, 0);
211 static int rbd_assign_bid(rados_pool_t pool
, uint64_t *id
)
214 const char *info_oid
= RBD_INFO
;
218 int r
= touch_rbd_info(pool
, info_oid
);
223 r
= rados_exec(pool
, info_oid
, "rbd", "assign_bid", NULL
,
224 0, (char *)out
, sizeof(out
));
235 static int rbd_create(const char *filename
, QEMUOptionParameter
*options
)
241 uint8_t obj_order
= RBD_DEFAULT_OBJ_ORDER
;
242 char pool
[RBD_MAX_SEG_NAME_SIZE
];
243 char n
[RBD_MAX_SEG_NAME_SIZE
];
244 char name
[RBD_MAX_OBJ_NAME_SIZE
];
245 char snap_buf
[RBD_MAX_SEG_NAME_SIZE
];
253 if (rbd_parsename(filename
,
255 snap_buf
, sizeof(snap_buf
),
256 name
, sizeof(name
)) < 0) {
259 if (snap_buf
[0] != '\0') {
263 snprintf(n
, sizeof(n
), "%s%s", name
, RBD_SUFFIX
);
265 /* Read out options */
266 while (options
&& options
->name
) {
267 if (!strcmp(options
->name
, BLOCK_OPT_SIZE
)) {
268 bytes
= options
->value
.n
;
269 } else if (!strcmp(options
->name
, BLOCK_OPT_CLUSTER_SIZE
)) {
270 if (options
->value
.n
) {
271 objsize
= options
->value
.n
;
272 if ((objsize
- 1) & objsize
) { /* not a power of 2? */
273 error_report("obj size needs to be power of 2");
276 if (objsize
< 4096) {
277 error_report("obj size too small");
280 obj_order
= ffs(objsize
) - 1;
286 memset(&header
, 0, sizeof(header
));
287 pstrcpy(header
.text
, sizeof(header
.text
), RBD_HEADER_TEXT
);
288 pstrcpy(header
.signature
, sizeof(header
.signature
), RBD_HEADER_SIGNATURE
);
289 pstrcpy(header
.version
, sizeof(header
.version
), RBD_HEADER_VERSION
);
290 header
.image_size
= cpu_to_le64(bytes
);
291 header
.options
.order
= obj_order
;
292 header
.options
.crypt_type
= RBD_CRYPT_NONE
;
293 header
.options
.comp_type
= RBD_COMP_NONE
;
295 header
.snap_count
= 0;
297 if (rados_initialize(0, NULL
) < 0) {
298 error_report("error initializing");
302 if (rados_open_pool(pool
, &p
)) {
303 error_report("error opening pool %s", pool
);
304 rados_deinitialize();
308 /* check for existing rbd header file */
309 ret
= rados_stat(p
, n
, &size
, &mtime
);
315 ret
= rbd_assign_bid(p
, &bid
);
317 error_report("failed assigning block id");
318 rados_deinitialize();
322 lo
= bid
& 0xFFFFFFFF;
323 snprintf(header
.block_name
, sizeof(header
.block_name
), "rb.%x.%x", hi
, lo
);
325 /* create header file */
326 ret
= rados_write(p
, n
, 0, (const char *)&header
, sizeof(header
));
331 ret
= rbd_register_image(p
, name
);
334 rados_deinitialize();
340 * This aio completion is being called from rbd_aio_event_reader() and
341 * runs in qemu context. It schedules a bh, but only if the aio
342 * was not cancelled before.
344 static void rbd_complete_aio(RADOSCB
*rcb
)
346 RBDAIOCB
*acb
= rcb
->acb
;
351 if (acb
->cancelled
) {
353 qemu_vfree(acb
->bounce
);
354 qemu_aio_release(acb
);
365 } else if (!acb
->error
) {
366 acb
->ret
+= rcb
->segsize
;
370 memset(rcb
->buf
, 0, rcb
->segsize
);
372 acb
->ret
+= rcb
->segsize
;
375 memset(rcb
->buf
, 0, rcb
->segsize
);
378 } else if (r
< rcb
->segsize
) {
379 memset(rcb
->buf
+ r
, 0, rcb
->segsize
- r
);
381 acb
->ret
+= rcb
->segsize
;
383 } else if (!acb
->error
) {
387 /* Note that acb->bh can be NULL in case where the aio was cancelled */
389 acb
->bh
= qemu_bh_new(rbd_aio_bh_cb
, acb
);
390 qemu_bh_schedule(acb
->bh
);
397 * aio fd read handler. It runs in the qemu context and calls the
398 * completion handling of completed rados aio operations.
400 static void rbd_aio_event_reader(void *opaque
)
402 BDRVRBDState
*s
= opaque
;
407 char *p
= (char *)&s
->event_rcb
;
409 /* now read the rcb pointer that was sent from a non qemu thread */
410 if ((ret
= read(s
->fds
[RBD_FD_READ
], p
+ s
->event_reader_pos
,
411 sizeof(s
->event_rcb
) - s
->event_reader_pos
)) > 0) {
413 s
->event_reader_pos
+= ret
;
414 if (s
->event_reader_pos
== sizeof(s
->event_rcb
)) {
415 s
->event_reader_pos
= 0;
416 rbd_complete_aio(s
->event_rcb
);
417 s
->qemu_aio_count
--;
421 } while (ret
< 0 && errno
== EINTR
);
424 static int rbd_aio_flush_cb(void *opaque
)
426 BDRVRBDState
*s
= opaque
;
428 return (s
->qemu_aio_count
> 0);
432 static int rbd_set_snapc(rados_pool_t pool
, const char *snap
, RbdHeader1
*header
)
434 uint32_t snap_count
= le32_to_cpu(header
->snap_count
);
435 rados_snap_t
*snaps
= NULL
;
438 uint64_t snap_names_len
= le64_to_cpu(header
->snap_names_len
);
440 rados_snap_t snapid
= 0;
443 const char *header_snap
= (const char *)&header
->snaps
[snap_count
];
444 const char *end
= header_snap
+ snap_names_len
;
445 snaps
= qemu_malloc(sizeof(rados_snap_t
) * header
->snap_count
);
447 for (i
=0; i
< snap_count
; i
++) {
448 snaps
[i
] = le64_to_cpu(header
->snaps
[i
].id
);
450 if (snap
&& strcmp(snap
, header_snap
) == 0) {
454 header_snap
+= strlen(header_snap
) + 1;
455 if (header_snap
> end
) {
456 error_report("bad header, snapshot list broken");
461 if (snap
&& !snapid
) {
462 error_report("snapshot not found");
466 seq
= le32_to_cpu(header
->snap_seq
);
468 r
= rados_set_snap_context(pool
, seq
, snaps
, snap_count
);
470 rados_set_snap(pool
, snapid
);
477 #define BUF_READ_START_LEN 4096
479 static int rbd_read_header(BDRVRBDState
*s
, char **hbuf
)
482 char n
[RBD_MAX_SEG_NAME_SIZE
];
483 uint64_t len
= BUF_READ_START_LEN
;
486 snprintf(n
, sizeof(n
), "%s%s", s
->name
, RBD_SUFFIX
);
488 buf
= qemu_malloc(len
);
490 r
= rados_read(s
->header_pool
, n
, 0, buf
, len
);
500 buf
= qemu_malloc(len
);
502 r
= rados_stat(s
->header_pool
, n
, &len
, NULL
);
507 r
= rados_read(s
->header_pool
, n
, 0, buf
, len
);
521 static int rbd_open(BlockDriverState
*bs
, const char *filename
, int flags
)
523 BDRVRBDState
*s
= bs
->opaque
;
525 char pool
[RBD_MAX_SEG_NAME_SIZE
];
526 char snap_buf
[RBD_MAX_SEG_NAME_SIZE
];
531 if (rbd_parsename(filename
, pool
, sizeof(pool
),
532 snap_buf
, sizeof(snap_buf
),
533 s
->name
, sizeof(s
->name
)) < 0) {
536 if (snap_buf
[0] != '\0') {
540 if ((r
= rados_initialize(0, NULL
)) < 0) {
541 error_report("error initializing");
545 if ((r
= rados_open_pool(pool
, &s
->pool
))) {
546 error_report("error opening pool %s", pool
);
547 rados_deinitialize();
551 if ((r
= rados_open_pool(pool
, &s
->header_pool
))) {
552 error_report("error opening pool %s", pool
);
553 rados_deinitialize();
557 if ((r
= rbd_read_header(s
, &hbuf
)) < 0) {
558 error_report("error reading header from %s", s
->name
);
562 if (memcmp(hbuf
+ 64, RBD_HEADER_SIGNATURE
, 4)) {
563 error_report("Invalid header signature");
568 if (memcmp(hbuf
+ 68, RBD_HEADER_VERSION
, 8)) {
569 error_report("Unknown image version");
574 header
= (RbdHeader1
*) hbuf
;
575 s
->size
= le64_to_cpu(header
->image_size
);
576 s
->objsize
= 1ULL << header
->options
.order
;
577 memcpy(s
->block_name
, header
->block_name
, sizeof(header
->block_name
));
579 r
= rbd_set_snapc(s
->pool
, snap
, header
);
581 error_report("failed setting snap context: %s", strerror(-r
));
585 bs
->read_only
= (snap
!= NULL
);
587 s
->event_reader_pos
= 0;
588 r
= qemu_pipe(s
->fds
);
590 error_report("error opening eventfd");
593 fcntl(s
->fds
[0], F_SETFL
, O_NONBLOCK
);
594 fcntl(s
->fds
[1], F_SETFL
, O_NONBLOCK
);
595 qemu_aio_set_fd_handler(s
->fds
[RBD_FD_READ
], rbd_aio_event_reader
, NULL
,
596 rbd_aio_flush_cb
, NULL
, s
);
605 rados_close_pool(s
->header_pool
);
606 rados_close_pool(s
->pool
);
607 rados_deinitialize();
611 static void rbd_close(BlockDriverState
*bs
)
613 BDRVRBDState
*s
= bs
->opaque
;
617 qemu_aio_set_fd_handler(s
->fds
[RBD_FD_READ
], NULL
, NULL
, NULL
, NULL
,
620 rados_close_pool(s
->header_pool
);
621 rados_close_pool(s
->pool
);
622 rados_deinitialize();
626 * Cancel aio. Since we don't reference acb in non qemu threads,
627 * it is safe to access it here.
629 static void rbd_aio_cancel(BlockDriverAIOCB
*blockacb
)
631 RBDAIOCB
*acb
= (RBDAIOCB
*) blockacb
;
635 static AIOPool rbd_aio_pool
= {
636 .aiocb_size
= sizeof(RBDAIOCB
),
637 .cancel
= rbd_aio_cancel
,
641 * This is the callback function for rados_aio_read and _write
643 * Note: this function is being called from a non qemu thread so
644 * we need to be careful about what we do here. Generally we only
645 * write to the block notification pipe, and do the rest of the
646 * io completion handling from rbd_aio_event_reader() which
647 * runs in a qemu context.
649 static void rbd_finish_aiocb(rados_completion_t c
, RADOSCB
*rcb
)
652 rcb
->ret
= rados_aio_get_return_value(c
);
653 rados_aio_release(c
);
656 int fd
= rcb
->s
->fds
[RBD_FD_WRITE
];
658 /* send the rcb pointer to the qemu thread that is responsible
659 for the aio completion. Must do it in a qemu thread context */
660 ret
= write(fd
, (void *)&rcb
, sizeof(rcb
));
664 if (errno
== EINTR
) {
667 if (errno
!= EAGAIN
) {
674 ret
= select(fd
+ 1, NULL
, &wfd
, NULL
, NULL
);
675 } while (ret
< 0 && errno
== EINTR
);
679 error_report("failed writing to acb->s->fds\n");
684 /* Callback when all queued rados_aio requests are complete */
686 static void rbd_aio_bh_cb(void *opaque
)
688 RBDAIOCB
*acb
= opaque
;
691 qemu_iovec_from_buffer(acb
->qiov
, acb
->bounce
, acb
->qiov
->size
);
693 qemu_vfree(acb
->bounce
);
694 acb
->common
.cb(acb
->common
.opaque
, (acb
->ret
> 0 ? 0 : acb
->ret
));
695 qemu_bh_delete(acb
->bh
);
698 qemu_aio_release(acb
);
701 static BlockDriverAIOCB
*rbd_aio_rw_vector(BlockDriverState
*bs
,
705 BlockDriverCompletionFunc
*cb
,
706 void *opaque
, int write
)
710 rados_completion_t c
;
711 char n
[RBD_MAX_SEG_NAME_SIZE
];
712 int64_t segnr
, segoffs
, segsize
, last_segnr
;
716 BDRVRBDState
*s
= bs
->opaque
;
718 acb
= qemu_aio_get(&rbd_aio_pool
, bs
, cb
, opaque
);
721 acb
->bounce
= qemu_blockalign(bs
, qiov
->size
);
730 qemu_iovec_to_buffer(acb
->qiov
, acb
->bounce
);
735 off
= sector_num
* BDRV_SECTOR_SIZE
;
736 size
= nb_sectors
* BDRV_SECTOR_SIZE
;
737 segnr
= off
/ s
->objsize
;
738 segoffs
= off
% s
->objsize
;
739 segsize
= s
->objsize
- segoffs
;
741 last_segnr
= ((off
+ size
- 1) / s
->objsize
);
742 acb
->aiocnt
= (last_segnr
- segnr
) + 1;
744 s
->qemu_aio_count
+= acb
->aiocnt
; /* All the RADOSCB */
747 if (size
< segsize
) {
751 snprintf(n
, sizeof(n
), "%s.%012" PRIx64
, s
->block_name
,
754 rcb
= qemu_malloc(sizeof(RADOSCB
));
757 rcb
->segsize
= segsize
;
762 rados_aio_create_completion(rcb
, NULL
,
763 (rados_callback_t
) rbd_finish_aiocb
,
765 rados_aio_write(s
->pool
, n
, segoffs
, buf
, segsize
, c
);
767 rados_aio_create_completion(rcb
,
768 (rados_callback_t
) rbd_finish_aiocb
,
770 rados_aio_read(s
->pool
, n
, segoffs
, buf
, segsize
, c
);
776 segsize
= s
->objsize
;
783 static BlockDriverAIOCB
*rbd_aio_readv(BlockDriverState
* bs
,
784 int64_t sector_num
, QEMUIOVector
* qiov
,
786 BlockDriverCompletionFunc
* cb
,
789 return rbd_aio_rw_vector(bs
, sector_num
, qiov
, nb_sectors
, cb
, opaque
, 0);
792 static BlockDriverAIOCB
*rbd_aio_writev(BlockDriverState
* bs
,
793 int64_t sector_num
, QEMUIOVector
* qiov
,
795 BlockDriverCompletionFunc
* cb
,
798 return rbd_aio_rw_vector(bs
, sector_num
, qiov
, nb_sectors
, cb
, opaque
, 1);
801 static int rbd_getinfo(BlockDriverState
* bs
, BlockDriverInfo
* bdi
)
803 BDRVRBDState
*s
= bs
->opaque
;
804 bdi
->cluster_size
= s
->objsize
;
808 static int64_t rbd_getlength(BlockDriverState
* bs
)
810 BDRVRBDState
*s
= bs
->opaque
;
815 static int rbd_snap_create(BlockDriverState
*bs
, QEMUSnapshotInfo
*sn_info
)
817 BDRVRBDState
*s
= bs
->opaque
;
818 char inbuf
[512], outbuf
[128];
822 char *end
= inbuf
+ sizeof(inbuf
);
823 char n
[RBD_MAX_SEG_NAME_SIZE
];
827 if (sn_info
->name
[0] == '\0') {
828 return -EINVAL
; /* we need a name for rbd snapshots */
832 * rbd snapshots are using the name as the user controlled unique identifier
833 * we can't use the rbd snapid for that purpose, as it can't be set
835 if (sn_info
->id_str
[0] != '\0' &&
836 strcmp(sn_info
->id_str
, sn_info
->name
) != 0) {
840 if (strlen(sn_info
->name
) >= sizeof(sn_info
->id_str
)) {
844 r
= rados_selfmanaged_snap_create(s
->header_pool
, &snap_id
);
846 error_report("failed to create snap id: %s", strerror(-r
));
850 *(uint32_t *)p
= strlen(sn_info
->name
);
851 cpu_to_le32s((uint32_t *)p
);
852 p
+= sizeof(uint32_t);
853 strncpy(p
, sn_info
->name
, end
- p
);
855 if (p
+ sizeof(snap_id
) > end
) {
856 error_report("invalid input parameter");
860 *(uint64_t *)p
= snap_id
;
861 cpu_to_le64s((uint64_t *)p
);
863 snprintf(n
, sizeof(n
), "%s%s", s
->name
, RBD_SUFFIX
);
865 r
= rados_exec(s
->header_pool
, n
, "rbd", "snap_add", inbuf
,
866 sizeof(inbuf
), outbuf
, sizeof(outbuf
));
868 error_report("rbd.snap_add execution failed failed: %s", strerror(-r
));
872 sprintf(sn_info
->id_str
, "%s", sn_info
->name
);
874 r
= rbd_read_header(s
, &hbuf
);
876 error_report("failed reading header: %s", strerror(-r
));
880 header
= (RbdHeader1
*) hbuf
;
881 r
= rbd_set_snapc(s
->pool
, sn_info
->name
, header
);
883 error_report("failed setting snap context: %s", strerror(-r
));
894 static int decode32(char **p
, const char *end
, uint32_t *v
)
900 *v
= *(uint32_t *)(*p
);
906 static int decode64(char **p
, const char *end
, uint64_t *v
)
912 *v
= *(uint64_t *)(*p
);
918 static int decode_str(char **p
, const char *end
, char **s
)
923 if ((r
= decode32(p
, end
, &len
)) < 0) {
927 *s
= qemu_malloc(len
+ 1);
935 static int rbd_snap_list(BlockDriverState
*bs
, QEMUSnapshotInfo
**psn_tab
)
937 BDRVRBDState
*s
= bs
->opaque
;
938 char n
[RBD_MAX_SEG_NAME_SIZE
];
939 QEMUSnapshotInfo
*sn_info
, *sn_tab
= NULL
;
942 char *outbuf
= NULL
, *end
, *buf
;
948 /* read header to estimate how much space we need to read the snap
950 if ((r
= rbd_read_header(s
, &hbuf
)) < 0) {
953 header
= (RbdHeader1
*)hbuf
;
954 len
= le64_to_cpu(header
->snap_names_len
);
955 len
+= 1024; /* should have already been enough, but new snapshots might
956 have already been created since we read the header. Just allocate
957 a bit more, so that in most cases it'll suffice anyway */
960 snprintf(n
, sizeof(n
), "%s%s", s
->name
, RBD_SUFFIX
);
963 outbuf
= qemu_malloc(len
);
965 r
= rados_exec(s
->header_pool
, n
, "rbd", "snap_list", NULL
, 0,
968 error_report("rbd.snap_list execution failed failed: %s", strerror(-r
));
975 /* if we're here, we probably raced with some snaps creation */
981 if ((r
= decode64(&buf
, end
, &snap_seq
)) < 0) {
984 if ((r
= decode32(&buf
, end
, &snap_count
)) < 0) {
988 sn_tab
= qemu_mallocz(snap_count
* sizeof(QEMUSnapshotInfo
));
989 for (i
= 0; i
< snap_count
; i
++) {
990 uint64_t id
, image_size
;
993 if ((r
= decode64(&buf
, end
, &id
)) < 0) {
996 if ((r
= decode64(&buf
, end
, &image_size
)) < 0) {
999 if ((r
= decode_str(&buf
, end
, &snap_name
)) < 0) {
1003 sn_info
= sn_tab
+ i
;
1004 pstrcpy(sn_info
->id_str
, sizeof(sn_info
->id_str
), snap_name
);
1005 pstrcpy(sn_info
->name
, sizeof(sn_info
->name
), snap_name
);
1006 qemu_free(snap_name
);
1008 sn_info
->vm_state_size
= image_size
;
1009 sn_info
->date_sec
= 0;
1010 sn_info
->date_nsec
= 0;
1011 sn_info
->vm_clock_nsec
= 0;
1022 static QEMUOptionParameter rbd_create_options
[] = {
1024 .name
= BLOCK_OPT_SIZE
,
1026 .help
= "Virtual disk size"
1029 .name
= BLOCK_OPT_CLUSTER_SIZE
,
1031 .help
= "RBD object size"
1036 static BlockDriver bdrv_rbd
= {
1037 .format_name
= "rbd",
1038 .instance_size
= sizeof(BDRVRBDState
),
1039 .bdrv_file_open
= rbd_open
,
1040 .bdrv_close
= rbd_close
,
1041 .bdrv_create
= rbd_create
,
1042 .bdrv_get_info
= rbd_getinfo
,
1043 .create_options
= rbd_create_options
,
1044 .bdrv_getlength
= rbd_getlength
,
1045 .protocol_name
= "rbd",
1047 .bdrv_aio_readv
= rbd_aio_readv
,
1048 .bdrv_aio_writev
= rbd_aio_writev
,
1050 .bdrv_snapshot_create
= rbd_snap_create
,
1051 .bdrv_snapshot_list
= rbd_snap_list
,
1054 static void bdrv_rbd_init(void)
1056 bdrv_register(&bdrv_rbd
);
1059 block_init(bdrv_rbd_init
);