Merge remote-tracking branch 'qemu-project/master'
[qemu/ar7.git] / block / rbd.c
blobb3a36045a18ecb6f82d0727273e2352bda754c6e
/*
 * QEMU Block driver for RADOS (Ceph)
 *
 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
 *                         Josh Durgin <josh.durgin@dreamhost.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
14 #include "qemu/osdep.h"
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block-io.h"
22 #include "block/block_int.h"
23 #include "block/qdict.h"
24 #include "crypto/secret.h"
25 #include "qemu/cutils.h"
26 #include "sysemu/replay.h"
27 #include "qapi/qmp/qstring.h"
28 #include "qapi/qmp/qdict.h"
29 #include "qapi/qmp/qjson.h"
30 #include "qapi/qmp/qlist.h"
31 #include "qapi/qobject-input-visitor.h"
32 #include "qapi/qapi-visit-block-core.h"
/*
 * When specifying the image filename use:
 *
 * rbd:poolname/[namespace/]devicename[@snapshotname][:option1=value1[:option2=value2...]]
 *
 * poolname must be the name of an existing rados pool.
 *
 * namespace is optional; when it is omitted an empty namespace is used.
 *
 * devicename is the name of the rbd image.
 *
 * Each option given is used to configure rados, and may be any valid
 * Ceph option, "id", or "conf".
 *
 * The "id" option indicates what user we should authenticate as to
 * the Ceph cluster.  If it is excluded we will use the Ceph default
 * (normally 'admin').
 *
 * The "conf" option specifies a Ceph configuration file to read.  If
 * it is not specified, we will read from the default Ceph locations
 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
 * file, specify conf=/dev/null.
 *
 * Configuration values containing :, @, or = can be escaped with a
 * leading "\".
 */
/*
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined anywhere in the visible
 * part of this file, so OBJ_MAX_SIZE only expands correctly if it is
 * provided elsewhere -- confirm or drop the macro.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

#define RBD_MAX_SNAPS 100

/* Number of leading header bytes compared against the tables below. */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8

/*
 * 8-byte header signatures.  The "LUKS"/"RBDL" prefix distinguishes the
 * plain and layered variants, the final byte the version (1 or 2).
 * Their consumers are outside the visible part of this file.
 */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};

static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};

static const char rbd_layered_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1
};

static const char rbd_layered_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2
};
/* Request kinds accepted by qemu_rbd_start_co(). */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES
} RBDAIOCmd;
93 typedef struct BDRVRBDState {
94 rados_t cluster;
95 rados_ioctx_t io_ctx;
96 rbd_image_t image;
97 char *image_name;
98 char *snap;
99 char *namespace;
100 uint64_t image_size;
101 uint64_t object_size;
102 } BDRVRBDState;
104 typedef struct RBDTask {
105 BlockDriverState *bs;
106 Coroutine *co;
107 bool complete;
108 int64_t ret;
109 } RBDTask;
/*
 * Offset/length/exists triple; its consumer is outside the visible part
 * of this file (presumably an rbd diff-iterate callback -- confirm).
 */
typedef struct RBDDiffIterateReq {
    uint64_t offs;
    uint64_t bytes;
    bool exists;
} RBDDiffIterateReq;
117 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
118 BlockdevOptionsRbd *opts, bool cache,
119 const char *keypairs, const char *secretid,
120 Error **errp);
/*
 * Find the first occurrence of @delim in @src that is not escaped.
 *
 * A backslash followed by any character hides that character from the
 * search.  Returns a pointer to the delimiter, or NULL if there is none.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }
        /* Step over an escape sequence as a single unit. */
        cur += (*cur == '\\' && cur[1] != '\0') ? 2 : 1;
    }

    return NULL;
}
/*
 * Cut the token that ends at the first unescaped @delim out of @src.
 *
 * The delimiter is overwritten with NUL and *@p is set to the character
 * after it, or to NULL when @src contains no unescaped delimiter.
 * Returns @src, which is now the NUL-terminated token.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *sep;

    *p = NULL;

    sep = qemu_rbd_strchr(src, delim);
    if (sep != NULL) {
        *sep = '\0';
        *p = sep + 1;
    }
    return src;
}
/*
 * Remove backslash escapes from @src in place.
 *
 * Each backslash that is followed by another character is dropped and the
 * following character is kept verbatim; a trailing lone backslash is kept.
 */
static void qemu_rbd_unescape(char *src)
{
    char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
166 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
167 Error **errp)
169 const char *start;
170 char *p, *buf;
171 QList *keypairs = NULL;
172 char *found_str, *image_name;
174 if (!strstart(filename, "rbd:", &start)) {
175 error_setg(errp, "File name must start with 'rbd:'");
176 return;
179 buf = g_strdup(start);
180 p = buf;
182 found_str = qemu_rbd_next_tok(p, '/', &p);
183 if (!p) {
184 error_setg(errp, "Pool name is required");
185 goto done;
187 qemu_rbd_unescape(found_str);
188 qdict_put_str(options, "pool", found_str);
190 if (qemu_rbd_strchr(p, '@')) {
191 image_name = qemu_rbd_next_tok(p, '@', &p);
193 found_str = qemu_rbd_next_tok(p, ':', &p);
194 qemu_rbd_unescape(found_str);
195 qdict_put_str(options, "snapshot", found_str);
196 } else {
197 image_name = qemu_rbd_next_tok(p, ':', &p);
199 /* Check for namespace in the image_name */
200 if (qemu_rbd_strchr(image_name, '/')) {
201 found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
202 qemu_rbd_unescape(found_str);
203 qdict_put_str(options, "namespace", found_str);
204 } else {
205 qdict_put_str(options, "namespace", "");
207 qemu_rbd_unescape(image_name);
208 qdict_put_str(options, "image", image_name);
209 if (!p) {
210 goto done;
213 /* The following are essentially all key/value pairs, and we treat
214 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. */
215 while (p) {
216 char *name, *value;
217 name = qemu_rbd_next_tok(p, '=', &p);
218 if (!p) {
219 error_setg(errp, "conf option %s has no value", name);
220 break;
223 qemu_rbd_unescape(name);
225 value = qemu_rbd_next_tok(p, ':', &p);
226 qemu_rbd_unescape(value);
228 if (!strcmp(name, "conf")) {
229 qdict_put_str(options, "conf", value);
230 } else if (!strcmp(name, "id")) {
231 qdict_put_str(options, "user", value);
232 } else {
234 * We pass these internally to qemu_rbd_set_keypairs(), so
235 * we can get away with the simpler list of [ "key1",
236 * "value1", "key2", "value2" ] rather than a raw dict
237 * { "key1": "value1", "key2": "value2" } where we can't
238 * guarantee order, or even a more correct but complex
239 * [ { "key1": "value1" }, { "key2": "value2" } ]
241 if (!keypairs) {
242 keypairs = qlist_new();
244 qlist_append_str(keypairs, name);
245 qlist_append_str(keypairs, value);
249 if (keypairs) {
250 qdict_put(options, "=keyvalue-pairs",
251 qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
254 done:
255 g_free(buf);
256 qobject_unref(keypairs);
257 return;
260 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
261 Error **errp)
263 char *key, *acr;
264 int r;
265 GString *accu;
266 RbdAuthModeList *auth;
268 if (opts->key_secret) {
269 key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
270 if (!key) {
271 return -EIO;
273 r = rados_conf_set(cluster, "key", key);
274 g_free(key);
275 if (r < 0) {
276 error_setg_errno(errp, -r, "Could not set 'key'");
277 return r;
281 if (opts->has_auth_client_required) {
282 accu = g_string_new("");
283 for (auth = opts->auth_client_required; auth; auth = auth->next) {
284 if (accu->str[0]) {
285 g_string_append_c(accu, ';');
287 g_string_append(accu, RbdAuthMode_str(auth->value));
289 acr = g_string_free(accu, FALSE);
290 r = rados_conf_set(cluster, "auth_client_required", acr);
291 g_free(acr);
292 if (r < 0) {
293 error_setg_errno(errp, -r,
294 "Could not set 'auth_client_required'");
295 return r;
299 return 0;
302 #pragma GCC diagnostic ignored "-Wsuggest-attribute=format"
303 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
304 Error **errp)
306 QList *keypairs;
307 QString *name;
308 QString *value;
309 const char *key;
310 size_t remaining;
311 int ret = 0;
313 if (!keypairs_json) {
314 return ret;
316 keypairs = qobject_to(QList,
317 qobject_from_json(keypairs_json, &error_abort));
318 remaining = qlist_size(keypairs) / 2;
319 assert(remaining);
321 while (remaining--) {
322 name = qobject_to(QString, qlist_pop(keypairs));
323 value = qobject_to(QString, qlist_pop(keypairs));
324 assert(name && value);
325 key = qstring_get_str(name);
327 ret = rados_conf_set(cluster, key, qstring_get_str(value));
328 qobject_unref(value);
329 if (ret < 0) {
330 error_setg_errno(errp, -ret, "invalid conf option %s", key);
331 qobject_unref(name);
332 ret = -EINVAL;
333 break;
335 qobject_unref(name);
338 qobject_unref(keypairs);
339 return ret;
342 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
343 static int qemu_rbd_convert_luks_options(
344 RbdEncryptionOptionsLUKSBase *luks_opts,
345 char **passphrase,
346 size_t *passphrase_len,
347 Error **errp)
349 return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
350 passphrase_len, errp);
353 static int qemu_rbd_convert_luks_create_options(
354 RbdEncryptionCreateOptionsLUKSBase *luks_opts,
355 rbd_encryption_algorithm_t *alg,
356 char **passphrase,
357 size_t *passphrase_len,
358 Error **errp)
360 int r = 0;
362 r = qemu_rbd_convert_luks_options(
363 qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
364 passphrase, passphrase_len, errp);
365 if (r < 0) {
366 return r;
369 if (luks_opts->has_cipher_alg) {
370 switch (luks_opts->cipher_alg) {
371 case QCRYPTO_CIPHER_ALG_AES_128: {
372 *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
373 break;
375 case QCRYPTO_CIPHER_ALG_AES_256: {
376 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
377 break;
379 default: {
380 r = -ENOTSUP;
381 error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
382 luks_opts->cipher_alg);
383 return r;
386 } else {
387 /* default alg */
388 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
391 return 0;
394 static int qemu_rbd_encryption_format(rbd_image_t image,
395 RbdEncryptionCreateOptions *encrypt,
396 Error **errp)
398 int r = 0;
399 g_autofree char *passphrase = NULL;
400 rbd_encryption_format_t format;
401 rbd_encryption_options_t opts;
402 rbd_encryption_luks1_format_options_t luks_opts;
403 rbd_encryption_luks2_format_options_t luks2_opts;
404 size_t opts_size;
405 uint64_t raw_size, effective_size;
407 r = rbd_get_size(image, &raw_size);
408 if (r < 0) {
409 error_setg_errno(errp, -r, "cannot get raw image size");
410 return r;
413 switch (encrypt->format) {
414 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
415 memset(&luks_opts, 0, sizeof(luks_opts));
416 format = RBD_ENCRYPTION_FORMAT_LUKS1;
417 opts = &luks_opts;
418 opts_size = sizeof(luks_opts);
419 r = qemu_rbd_convert_luks_create_options(
420 qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
421 &luks_opts.alg, &passphrase, &luks_opts.passphrase_size,
422 errp);
423 if (r < 0) {
424 return r;
426 luks_opts.passphrase = passphrase;
427 break;
429 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
430 memset(&luks2_opts, 0, sizeof(luks2_opts));
431 format = RBD_ENCRYPTION_FORMAT_LUKS2;
432 opts = &luks2_opts;
433 opts_size = sizeof(luks2_opts);
434 r = qemu_rbd_convert_luks_create_options(
435 qapi_RbdEncryptionCreateOptionsLUKS2_base(
436 &encrypt->u.luks2),
437 &luks2_opts.alg, &passphrase, &luks2_opts.passphrase_size,
438 errp);
439 if (r < 0) {
440 return r;
442 luks2_opts.passphrase = passphrase;
443 break;
445 default: {
446 r = -ENOTSUP;
447 error_setg_errno(
448 errp, -r, "unknown image encryption format: %u",
449 encrypt->format);
450 return r;
454 r = rbd_encryption_format(image, format, opts, opts_size);
455 if (r < 0) {
456 error_setg_errno(errp, -r, "encryption format fail");
457 return r;
460 r = rbd_get_size(image, &effective_size);
461 if (r < 0) {
462 error_setg_errno(errp, -r, "cannot get effective image size");
463 return r;
466 r = rbd_resize(image, raw_size + (raw_size - effective_size));
467 if (r < 0) {
468 error_setg_errno(errp, -r, "cannot resize image after format");
469 return r;
472 return 0;
475 static int qemu_rbd_encryption_load(rbd_image_t image,
476 RbdEncryptionOptions *encrypt,
477 Error **errp)
479 int r = 0;
480 g_autofree char *passphrase = NULL;
481 rbd_encryption_luks1_format_options_t luks_opts;
482 rbd_encryption_luks2_format_options_t luks2_opts;
483 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
484 rbd_encryption_luks_format_options_t luks_any_opts;
485 #endif
486 rbd_encryption_format_t format;
487 rbd_encryption_options_t opts;
488 size_t opts_size;
490 switch (encrypt->format) {
491 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
492 memset(&luks_opts, 0, sizeof(luks_opts));
493 format = RBD_ENCRYPTION_FORMAT_LUKS1;
494 opts = &luks_opts;
495 opts_size = sizeof(luks_opts);
496 r = qemu_rbd_convert_luks_options(
497 qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
498 &passphrase, &luks_opts.passphrase_size, errp);
499 if (r < 0) {
500 return r;
502 luks_opts.passphrase = passphrase;
503 break;
505 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
506 memset(&luks2_opts, 0, sizeof(luks2_opts));
507 format = RBD_ENCRYPTION_FORMAT_LUKS2;
508 opts = &luks2_opts;
509 opts_size = sizeof(luks2_opts);
510 r = qemu_rbd_convert_luks_options(
511 qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
512 &passphrase, &luks2_opts.passphrase_size, errp);
513 if (r < 0) {
514 return r;
516 luks2_opts.passphrase = passphrase;
517 break;
519 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
520 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
521 memset(&luks_any_opts, 0, sizeof(luks_any_opts));
522 format = RBD_ENCRYPTION_FORMAT_LUKS;
523 opts = &luks_any_opts;
524 opts_size = sizeof(luks_any_opts);
525 r = qemu_rbd_convert_luks_options(
526 qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any),
527 &passphrase, &luks_any_opts.passphrase_size, errp);
528 if (r < 0) {
529 return r;
531 luks_any_opts.passphrase = passphrase;
532 break;
534 #endif
535 default: {
536 r = -ENOTSUP;
537 error_setg_errno(
538 errp, -r, "unknown image encryption format: %u",
539 encrypt->format);
540 return r;
544 r = rbd_encryption_load(image, format, opts, opts_size);
545 if (r < 0) {
546 error_setg_errno(errp, -r, "encryption load fail");
547 return r;
550 return 0;
553 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
554 static int qemu_rbd_encryption_load2(rbd_image_t image,
555 RbdEncryptionOptions *encrypt,
556 Error **errp)
558 int r = 0;
559 int encrypt_count = 1;
560 int i;
561 RbdEncryptionOptions *curr_encrypt;
562 rbd_encryption_spec_t *specs;
563 rbd_encryption_luks1_format_options_t *luks_opts;
564 rbd_encryption_luks2_format_options_t *luks2_opts;
565 rbd_encryption_luks_format_options_t *luks_any_opts;
567 /* count encryption options */
568 for (curr_encrypt = encrypt->parent; curr_encrypt;
569 curr_encrypt = curr_encrypt->parent) {
570 ++encrypt_count;
573 specs = g_new0(rbd_encryption_spec_t, encrypt_count);
575 curr_encrypt = encrypt;
576 for (i = 0; i < encrypt_count; ++i) {
577 switch (curr_encrypt->format) {
578 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
579 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1;
581 luks_opts = g_new0(rbd_encryption_luks1_format_options_t, 1);
582 specs[i].opts = luks_opts;
583 specs[i].opts_size = sizeof(*luks_opts);
585 r = qemu_rbd_convert_luks_options(
586 qapi_RbdEncryptionOptionsLUKS_base(
587 &curr_encrypt->u.luks),
588 (char **)&luks_opts->passphrase,
589 &luks_opts->passphrase_size,
590 errp);
591 break;
593 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
594 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2;
596 luks2_opts = g_new0(rbd_encryption_luks2_format_options_t, 1);
597 specs[i].opts = luks2_opts;
598 specs[i].opts_size = sizeof(*luks2_opts);
600 r = qemu_rbd_convert_luks_options(
601 qapi_RbdEncryptionOptionsLUKS2_base(
602 &curr_encrypt->u.luks2),
603 (char **)&luks2_opts->passphrase,
604 &luks2_opts->passphrase_size,
605 errp);
606 break;
608 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
609 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS;
611 luks_any_opts = g_new0(rbd_encryption_luks_format_options_t, 1);
612 specs[i].opts = luks_any_opts;
613 specs[i].opts_size = sizeof(*luks_any_opts);
615 r = qemu_rbd_convert_luks_options(
616 qapi_RbdEncryptionOptionsLUKSAny_base(
617 &curr_encrypt->u.luks_any),
618 (char **)&luks_any_opts->passphrase,
619 &luks_any_opts->passphrase_size,
620 errp);
621 break;
623 default: {
624 r = -ENOTSUP;
625 error_setg_errno(
626 errp, -r, "unknown image encryption format: %u",
627 curr_encrypt->format);
631 if (r < 0) {
632 goto exit;
635 curr_encrypt = curr_encrypt->parent;
638 r = rbd_encryption_load2(image, specs, encrypt_count);
639 if (r < 0) {
640 error_setg_errno(errp, -r, "layered encryption load fail");
641 goto exit;
644 exit:
645 for (i = 0; i < encrypt_count; ++i) {
646 if (!specs[i].opts) {
647 break;
650 switch (specs[i].format) {
651 case RBD_ENCRYPTION_FORMAT_LUKS1: {
652 luks_opts = specs[i].opts;
653 g_free((void *)luks_opts->passphrase);
654 break;
656 case RBD_ENCRYPTION_FORMAT_LUKS2: {
657 luks2_opts = specs[i].opts;
658 g_free((void *)luks2_opts->passphrase);
659 break;
661 case RBD_ENCRYPTION_FORMAT_LUKS: {
662 luks_any_opts = specs[i].opts;
663 g_free((void *)luks_any_opts->passphrase);
664 break;
668 g_free(specs[i].opts);
670 g_free(specs);
671 return r;
673 #endif
674 #endif
676 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
677 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
678 const char *keypairs, const char *password_secret,
679 Error **errp)
681 BlockdevCreateOptionsRbd *opts = &options->u.rbd;
682 rados_t cluster;
683 rados_ioctx_t io_ctx;
684 int obj_order = 0;
685 int ret;
687 assert(options->driver == BLOCKDEV_DRIVER_RBD);
688 if (opts->location->snapshot) {
689 error_setg(errp, "Can't use snapshot name for image creation");
690 return -EINVAL;
693 #ifndef LIBRBD_SUPPORTS_ENCRYPTION
694 if (opts->encrypt) {
695 error_setg(errp, "RBD library does not support image encryption");
696 return -ENOTSUP;
698 #endif
700 if (opts->has_cluster_size) {
701 int64_t objsize = opts->cluster_size;
702 if ((objsize - 1) & objsize) { /* not a power of 2? */
703 error_setg(errp, "obj size needs to be power of 2");
704 return -EINVAL;
706 if (objsize < 4096) {
707 error_setg(errp, "obj size too small");
708 return -EINVAL;
710 obj_order = ctz32(objsize);
713 ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
714 password_secret, errp);
715 if (ret < 0) {
716 return ret;
719 ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
720 if (ret < 0) {
721 error_setg_errno(errp, -ret, "error rbd create");
722 goto out;
725 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
726 if (opts->encrypt) {
727 rbd_image_t image;
729 ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
730 if (ret < 0) {
731 error_setg_errno(errp, -ret,
732 "error opening image '%s' for encryption format",
733 opts->location->image);
734 goto out;
737 ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
738 rbd_close(image);
739 if (ret < 0) {
740 /* encryption format fail, try removing the image */
741 rbd_remove(io_ctx, opts->location->image);
742 goto out;
745 #endif
747 ret = 0;
748 out:
749 rados_ioctx_destroy(io_ctx);
750 rados_shutdown(cluster);
751 return ret;
754 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
756 return qemu_rbd_do_create(options, NULL, NULL, errp);
759 static int qemu_rbd_extract_encryption_create_options(
760 QemuOpts *opts,
761 RbdEncryptionCreateOptions **spec,
762 Error **errp)
764 QDict *opts_qdict;
765 QDict *encrypt_qdict;
766 Visitor *v;
767 int ret = 0;
769 opts_qdict = qemu_opts_to_qdict(opts, NULL);
770 qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
771 qobject_unref(opts_qdict);
772 if (!qdict_size(encrypt_qdict)) {
773 *spec = NULL;
774 goto exit;
777 /* Convert options into a QAPI object */
778 v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
779 if (!v) {
780 ret = -EINVAL;
781 goto exit;
784 visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
785 visit_free(v);
786 if (!*spec) {
787 ret = -EINVAL;
788 goto exit;
791 exit:
792 qobject_unref(encrypt_qdict);
793 return ret;
796 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
797 const char *filename,
798 QemuOpts *opts,
799 Error **errp)
801 BlockdevCreateOptions *create_options;
802 BlockdevCreateOptionsRbd *rbd_opts;
803 BlockdevOptionsRbd *loc;
804 RbdEncryptionCreateOptions *encrypt = NULL;
805 Error *local_err = NULL;
806 const char *keypairs, *password_secret;
807 QDict *options = NULL;
808 int ret = 0;
810 create_options = g_new0(BlockdevCreateOptions, 1);
811 create_options->driver = BLOCKDEV_DRIVER_RBD;
812 rbd_opts = &create_options->u.rbd;
814 rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
816 password_secret = qemu_opt_get(opts, "password-secret");
818 /* Read out options */
819 rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
820 BDRV_SECTOR_SIZE);
821 rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
822 BLOCK_OPT_CLUSTER_SIZE, 0);
823 rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
825 options = qdict_new();
826 qemu_rbd_parse_filename(filename, options, &local_err);
827 if (local_err) {
828 ret = -EINVAL;
829 error_propagate(errp, local_err);
830 goto exit;
833 ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
834 if (ret < 0) {
835 goto exit;
837 rbd_opts->encrypt = encrypt;
840 * Caution: while qdict_get_try_str() is fine, getting non-string
841 * types would require more care. When @options come from -blockdev
842 * or blockdev_add, its members are typed according to the QAPI
843 * schema, but when they come from -drive, they're all QString.
845 loc = rbd_opts->location;
846 loc->pool = g_strdup(qdict_get_try_str(options, "pool"));
847 loc->conf = g_strdup(qdict_get_try_str(options, "conf"));
848 loc->user = g_strdup(qdict_get_try_str(options, "user"));
849 loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
850 loc->image = g_strdup(qdict_get_try_str(options, "image"));
851 keypairs = qdict_get_try_str(options, "=keyvalue-pairs");
853 ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
854 if (ret < 0) {
855 goto exit;
858 exit:
859 qobject_unref(options);
860 qapi_free_BlockdevCreateOptions(create_options);
861 return ret;
864 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
866 const char **vals;
867 const char *host, *port;
868 char *rados_str;
869 InetSocketAddressBaseList *p;
870 int i, cnt;
872 if (!opts->has_server) {
873 return NULL;
876 for (cnt = 0, p = opts->server; p; p = p->next) {
877 cnt++;
880 vals = g_new(const char *, cnt + 1);
882 for (i = 0, p = opts->server; p; p = p->next, i++) {
883 host = p->value->host;
884 port = p->value->port;
886 if (strchr(host, ':')) {
887 vals[i] = g_strdup_printf("[%s]:%s", host, port);
888 } else {
889 vals[i] = g_strdup_printf("%s:%s", host, port);
892 vals[i] = NULL;
894 rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
895 g_strfreev((char **)vals);
896 return rados_str;
899 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
900 BlockdevOptionsRbd *opts, bool cache,
901 const char *keypairs, const char *secretid,
902 Error **errp)
904 char *mon_host = NULL;
905 Error *local_err = NULL;
906 int r;
908 if (secretid) {
909 if (opts->key_secret) {
910 error_setg(errp,
911 "Legacy 'password-secret' clashes with 'key-secret'");
912 return -EINVAL;
914 opts->key_secret = g_strdup(secretid);
917 mon_host = qemu_rbd_mon_host(opts, &local_err);
918 if (local_err) {
919 error_propagate(errp, local_err);
920 r = -EINVAL;
921 goto out;
924 r = rados_create(cluster, opts->user);
925 if (r < 0) {
926 error_setg_errno(errp, -r, "error initializing");
927 goto out;
930 /* try default location when conf=NULL, but ignore failure */
931 r = rados_conf_read_file(*cluster, opts->conf);
932 if (opts->conf && r < 0) {
933 error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
934 goto failed_shutdown;
937 r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
938 if (r < 0) {
939 goto failed_shutdown;
942 if (mon_host) {
943 r = rados_conf_set(*cluster, "mon_host", mon_host);
944 if (r < 0) {
945 goto failed_shutdown;
949 r = qemu_rbd_set_auth(*cluster, opts, errp);
950 if (r < 0) {
951 goto failed_shutdown;
955 * Fallback to more conservative semantics if setting cache
956 * options fails. Ignore errors from setting rbd_cache because the
957 * only possible error is that the option does not exist, and
958 * librbd defaults to no caching. If write through caching cannot
959 * be set up, fall back to no caching.
961 if (cache) {
962 rados_conf_set(*cluster, "rbd_cache", "true");
963 } else {
964 rados_conf_set(*cluster, "rbd_cache", "false");
967 r = rados_connect(*cluster);
968 if (r < 0) {
969 error_setg_errno(errp, -r, "error connecting");
970 goto failed_shutdown;
973 r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
974 if (r < 0) {
975 error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
976 goto failed_shutdown;
979 #ifdef HAVE_RBD_NAMESPACE_EXISTS
980 if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
981 bool exists;
983 r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
984 if (r < 0) {
985 error_setg_errno(errp, -r, "error checking namespace");
986 goto failed_ioctx_destroy;
989 if (!exists) {
990 error_setg(errp, "namespace '%s' does not exist",
991 opts->q_namespace);
992 r = -ENOENT;
993 goto failed_ioctx_destroy;
996 #endif
999 * Set the namespace after opening the io context on the pool,
1000 * if nspace == NULL or if nspace == "", it is just as we did nothing
1002 rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
1004 r = 0;
1005 goto out;
1007 #ifdef HAVE_RBD_NAMESPACE_EXISTS
1008 failed_ioctx_destroy:
1009 rados_ioctx_destroy(*io_ctx);
1010 #endif
1011 failed_shutdown:
1012 rados_shutdown(*cluster);
1013 out:
1014 g_free(mon_host);
1015 return r;
1018 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
1019 Error **errp)
1021 Visitor *v;
1023 /* Convert the remaining options into a QAPI object */
1024 v = qobject_input_visitor_new_flat_confused(options, errp);
1025 if (!v) {
1026 return -EINVAL;
1029 visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
1030 visit_free(v);
1031 if (!opts) {
1032 return -EINVAL;
1035 return 0;
1038 static int qemu_rbd_attempt_legacy_options(QDict *options,
1039 BlockdevOptionsRbd **opts,
1040 char **keypairs)
1042 char *filename;
1043 int r;
1045 filename = g_strdup(qdict_get_try_str(options, "filename"));
1046 if (!filename) {
1047 return -EINVAL;
1049 qdict_del(options, "filename");
1051 qemu_rbd_parse_filename(filename, options, NULL);
1053 /* keypairs freed by caller */
1054 *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1055 if (*keypairs) {
1056 qdict_del(options, "=keyvalue-pairs");
1059 r = qemu_rbd_convert_options(options, opts, NULL);
1061 g_free(filename);
1062 return r;
1065 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
1066 Error **errp)
1068 BDRVRBDState *s = bs->opaque;
1069 BlockdevOptionsRbd *opts = NULL;
1070 const QDictEntry *e;
1071 Error *local_err = NULL;
1072 char *keypairs, *secretid;
1073 rbd_image_info_t info;
1074 int r;
1076 keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1077 if (keypairs) {
1078 qdict_del(options, "=keyvalue-pairs");
1081 secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
1082 if (secretid) {
1083 qdict_del(options, "password-secret");
1086 r = qemu_rbd_convert_options(options, &opts, &local_err);
1087 if (local_err) {
1088 /* If keypairs are present, that means some options are present in
1089 * the modern option format. Don't attempt to parse legacy option
1090 * formats, as we won't support mixed usage. */
1091 if (keypairs) {
1092 error_propagate(errp, local_err);
1093 goto out;
1096 /* If the initial attempt to convert and process the options failed,
1097 * we may be attempting to open an image file that has the rbd options
1098 * specified in the older format consisting of all key/value pairs
1099 * encoded in the filename. Go ahead and attempt to parse the
1100 * filename, and see if we can pull out the required options. */
1101 r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
1102 if (r < 0) {
1103 /* Propagate the original error, not the legacy parsing fallback
1104 * error, as the latter was just a best-effort attempt. */
1105 error_propagate(errp, local_err);
1106 goto out;
1108 /* Take care whenever deciding to actually deprecate; once this ability
1109 * is removed, we will not be able to open any images with legacy-styled
1110 * backing image strings. */
1111 warn_report("RBD options encoded in the filename as keyvalue pairs "
1112 "is deprecated");
1115 /* Remove the processed options from the QDict (the visitor processes
1116 * _all_ options in the QDict) */
1117 while ((e = qdict_first(options))) {
1118 qdict_del(options, e->key);
1121 r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
1122 !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
1123 if (r < 0) {
1124 goto out;
1127 s->snap = g_strdup(opts->snapshot);
1128 s->image_name = g_strdup(opts->image);
1130 /* rbd_open is always r/w */
1131 r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
1132 if (r < 0) {
1133 error_setg_errno(errp, -r, "error reading header from %s",
1134 s->image_name);
1135 goto failed_open;
1138 if (opts->encrypt) {
1139 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
1140 if (opts->encrypt->parent) {
1141 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
1142 r = qemu_rbd_encryption_load2(s->image, opts->encrypt, errp);
1143 #else
1144 r = -ENOTSUP;
1145 error_setg(errp, "RBD library does not support layered encryption");
1146 #endif
1147 } else {
1148 r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
1150 if (r < 0) {
1151 goto failed_post_open;
1153 #else
1154 r = -ENOTSUP;
1155 error_setg(errp, "RBD library does not support image encryption");
1156 goto failed_post_open;
1157 #endif
1160 r = rbd_stat(s->image, &info, sizeof(info));
1161 if (r < 0) {
1162 error_setg_errno(errp, -r, "error getting image info from %s",
1163 s->image_name);
1164 goto failed_post_open;
1166 s->image_size = info.size;
1167 s->object_size = info.obj_size;
1169 /* If we are using an rbd snapshot, we must be r/o, otherwise
1170 * leave as-is */
1171 if (s->snap != NULL) {
1172 bdrv_graph_rdlock_main_loop();
1173 r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
1174 bdrv_graph_rdunlock_main_loop();
1175 if (r < 0) {
1176 goto failed_post_open;
1180 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1181 bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1182 #endif
1184 /* When extending regular files, we get zeros from the OS */
1185 bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1187 r = 0;
1188 goto out;
1190 failed_post_open:
1191 rbd_close(s->image);
1192 failed_open:
1193 rados_ioctx_destroy(s->io_ctx);
1194 g_free(s->snap);
1195 g_free(s->image_name);
1196 rados_shutdown(s->cluster);
1197 out:
1198 qapi_free_BlockdevOptionsRbd(opts);
1199 g_free(keypairs);
1200 g_free(secretid);
1201 return r;
1205 /* Since RBD is currently always opened R/W via the API,
1206 * we just need to check if we are using a snapshot or not, in
1207 * order to determine if we will allow it to be R/W */
1208 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1209 BlockReopenQueue *queue, Error **errp)
1211 BDRVRBDState *s = state->bs->opaque;
1212 int ret = 0;
1214 GRAPH_RDLOCK_GUARD_MAINLOOP();
1216 if (s->snap && state->flags & BDRV_O_RDWR) {
1217 error_setg(errp,
1218 "Cannot change node '%s' to r/w when using RBD snapshot",
1219 bdrv_get_device_or_node_name(state->bs));
1220 ret = -EINVAL;
1223 return ret;
1226 static void qemu_rbd_close(BlockDriverState *bs)
1228 BDRVRBDState *s = bs->opaque;
1230 rbd_close(s->image);
1231 rados_ioctx_destroy(s->io_ctx);
1232 g_free(s->snap);
1233 g_free(s->image_name);
1234 rados_shutdown(s->cluster);
1237 /* Resize the RBD image and update the 'image_size' with the current size */
1238 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1240 BDRVRBDState *s = bs->opaque;
1241 int r;
1243 r = rbd_resize(s->image, size);
1244 if (r < 0) {
1245 return r;
1248 s->image_size = size;
1250 return 0;
1253 static void qemu_rbd_finish_bh(void *opaque)
1255 RBDTask *task = opaque;
1256 task->complete = true;
1257 aio_co_wake(task->co);
1261 * This is the completion callback function for all rbd aio calls
1262 * started from qemu_rbd_start_co().
1264 * Note: this function is being called from a non qemu thread so
1265 * we need to be careful about what we do here. Generally we only
1266 * schedule a BH, and do the rest of the io completion handling
1267 * from qemu_rbd_finish_bh() which runs in a qemu context.
1269 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1271 task->ret = rbd_aio_get_return_value(c);
1272 rbd_aio_release(c);
1273 aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1274 qemu_rbd_finish_bh, task);
1277 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1278 uint64_t offset,
1279 uint64_t bytes,
1280 QEMUIOVector *qiov,
1281 int flags,
1282 RBDAIOCmd cmd)
1284 BDRVRBDState *s = bs->opaque;
1285 RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1286 rbd_completion_t c;
1287 int r;
1289 assert(!qiov || qiov->size == bytes);
1291 if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1293 * RBD APIs don't allow us to write more than actual size, so in order
1294 * to support growing images, we resize the image before write
1295 * operations that exceed the current size.
1297 if (offset + bytes > s->image_size) {
1298 r = qemu_rbd_resize(bs, offset + bytes);
1299 if (r < 0) {
1300 return r;
1305 r = rbd_aio_create_completion(&task,
1306 (rbd_callback_t) qemu_rbd_completion_cb, &c);
1307 if (r < 0) {
1308 return r;
1311 switch (cmd) {
1312 case RBD_AIO_READ:
1313 r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1314 break;
1315 case RBD_AIO_WRITE:
1316 r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1317 break;
1318 case RBD_AIO_DISCARD:
1319 r = rbd_aio_discard(s->image, offset, bytes, c);
1320 break;
1321 case RBD_AIO_FLUSH:
1322 r = rbd_aio_flush(s->image, c);
1323 break;
1324 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1325 case RBD_AIO_WRITE_ZEROES: {
1326 int zero_flags = 0;
1327 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1328 if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1329 zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1331 #endif
1332 r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1333 break;
1335 #endif
1336 default:
1337 r = -EINVAL;
1340 if (r < 0) {
1341 error_report("rbd request failed early: cmd %d offset %" PRIu64
1342 " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1343 bytes, flags, r, strerror(-r));
1344 rbd_aio_release(c);
1345 return r;
1348 while (!task.complete) {
1349 qemu_coroutine_yield();
1352 if (task.ret < 0) {
1353 error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1354 PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1355 bytes, flags, task.ret, strerror(-task.ret));
1356 return task.ret;
1359 /* zero pad short reads */
1360 if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1361 qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1364 return 0;
1367 static int
1368 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1369 int64_t bytes, QEMUIOVector *qiov,
1370 BdrvRequestFlags flags)
1372 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1375 static int
1376 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1377 int64_t bytes, QEMUIOVector *qiov,
1378 BdrvRequestFlags flags)
1380 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1383 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1385 return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1388 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1389 int64_t offset, int64_t bytes)
1391 return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * Write zeroes to @bytes starting at @offset: forwards to
 * qemu_rbd_start_co() as an RBD_AIO_WRITE_ZEROES request, passing @flags
 * through.  Only built when librbd supports write-zeroes.
 */
static int coroutine_fn
qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                          int64_t bytes, BdrvRequestFlags flags)
{
    return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
                             RBD_AIO_WRITE_ZEROES);
}
#endif
1404 static int coroutine_fn
1405 qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi)
1407 BDRVRBDState *s = bs->opaque;
1408 bdi->cluster_size = s->object_size;
1409 return 0;
1412 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1413 Error **errp)
1415 BDRVRBDState *s = bs->opaque;
1416 ImageInfoSpecific *spec_info;
1417 char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1418 int r;
1420 if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1421 r = rbd_read(s->image, 0,
1422 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1423 if (r < 0) {
1424 error_setg_errno(errp, -r, "cannot read image start for probe");
1425 return NULL;
1429 spec_info = g_new(ImageInfoSpecific, 1);
1430 *spec_info = (ImageInfoSpecific){
1431 .type = IMAGE_INFO_SPECIFIC_KIND_RBD,
1432 .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1435 if (memcmp(buf, rbd_luks_header_verification,
1436 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1437 spec_info->u.rbd.data->encryption_format =
1438 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1439 spec_info->u.rbd.data->has_encryption_format = true;
1440 } else if (memcmp(buf, rbd_luks2_header_verification,
1441 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1442 spec_info->u.rbd.data->encryption_format =
1443 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1444 spec_info->u.rbd.data->has_encryption_format = true;
1445 } else if (memcmp(buf, rbd_layered_luks_header_verification,
1446 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1447 spec_info->u.rbd.data->encryption_format =
1448 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1449 spec_info->u.rbd.data->has_encryption_format = true;
1450 } else if (memcmp(buf, rbd_layered_luks2_header_verification,
1451 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1452 spec_info->u.rbd.data->encryption_format =
1453 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1454 spec_info->u.rbd.data->has_encryption_format = true;
1455 } else {
1456 spec_info->u.rbd.data->has_encryption_format = false;
1459 return spec_info;
1463 * rbd_diff_iterate2 allows interrupting the execution by returning a negative
1464 * value in the callback routine. Choose a value that does not conflict with
1465 * an existing exitcode and return it if we want to prematurely stop the
1466 * execution because we detected a change in the allocation status.
/*
 * Private "stop iterating" code returned from the diff-iterate callback.
 * Parenthesized so the negative value stays a single operand wherever the
 * macro is expanded.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1470 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1471 int exists, void *opaque)
1473 RBDDiffIterateReq *req = opaque;
1475 assert(req->offs + req->bytes <= offs);
1477 /* treat a hole like an unallocated area and bail out */
1478 if (!exists) {
1479 return 0;
1482 if (!req->exists && offs > req->offs) {
1484 * we started in an unallocated area and hit the first allocated
1485 * block. req->bytes must be set to the length of the unallocated area
1486 * before the allocated area. stop further processing.
1488 req->bytes = offs - req->offs;
1489 return QEMU_RBD_EXIT_DIFF_ITERATE2;
1492 if (req->exists && offs > req->offs + req->bytes) {
1494 * we started in an allocated area and jumped over an unallocated area,
1495 * req->bytes contains the length of the allocated area before the
1496 * unallocated area. stop further processing.
1498 return QEMU_RBD_EXIT_DIFF_ITERATE2;
1501 req->bytes += len;
1502 req->exists = true;
1504 return 0;
1507 static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1508 bool want_zero, int64_t offset,
1509 int64_t bytes, int64_t *pnum,
1510 int64_t *map,
1511 BlockDriverState **file)
1513 BDRVRBDState *s = bs->opaque;
1514 int status, r;
1515 RBDDiffIterateReq req = { .offs = offset };
1516 uint64_t features, flags;
1517 uint64_t head = 0;
1519 assert(offset + bytes <= s->image_size);
1521 /* default to all sectors allocated */
1522 status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1523 *map = offset;
1524 *file = bs;
1525 *pnum = bytes;
1527 /* check if RBD image supports fast-diff */
1528 r = rbd_get_features(s->image, &features);
1529 if (r < 0) {
1530 return status;
1532 if (!(features & RBD_FEATURE_FAST_DIFF)) {
1533 return status;
1536 /* check if RBD fast-diff result is valid */
1537 r = rbd_get_flags(s->image, &flags);
1538 if (r < 0) {
1539 return status;
1541 if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1542 return status;
1545 #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1547 * librbd had a bug until early 2022 that affected all versions of ceph that
1548 * supported fast-diff. This bug results in reporting of incorrect offsets
1549 * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1550 * Work around this bug by rounding down the offset to object boundaries.
1551 * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1552 * However, this workaround only works for non cloned images with default
1553 * striping.
1555 * See: https://tracker.ceph.com/issues/53784
1558 /* check if RBD image has non-default striping enabled */
1559 if (features & RBD_FEATURE_STRIPINGV2) {
1560 return status;
1563 #pragma GCC diagnostic push
1564 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1566 * check if RBD image is a clone (= has a parent).
1568 * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1569 * replacement rbd_get_parent is not present in Luminous and Mimic.
1571 if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1572 return status;
1574 #pragma GCC diagnostic pop
1576 head = req.offs & (s->object_size - 1);
1577 req.offs -= head;
1578 bytes += head;
1579 #endif
1581 r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1582 qemu_rbd_diff_iterate_cb, &req);
1583 if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1584 return status;
1586 assert(req.bytes <= bytes);
1587 if (!req.exists) {
1588 if (r == 0) {
1590 * rbd_diff_iterate2 does not invoke callbacks for unallocated
1591 * areas. This here catches the case where no callback was
1592 * invoked at all (req.bytes == 0).
1594 assert(req.bytes == 0);
1595 req.bytes = bytes;
1597 status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1600 assert(req.bytes > head);
1601 *pnum = req.bytes - head;
1602 return status;
1605 static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs)
1607 BDRVRBDState *s = bs->opaque;
1608 int r;
1610 r = rbd_get_size(s->image, &s->image_size);
1611 if (r < 0) {
1612 return r;
1615 return s->image_size;
1618 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1619 int64_t offset,
1620 bool exact,
1621 PreallocMode prealloc,
1622 BdrvRequestFlags flags,
1623 Error **errp)
1625 int r;
1627 if (prealloc != PREALLOC_MODE_OFF) {
1628 error_setg(errp, "Unsupported preallocation mode '%s'",
1629 PreallocMode_str(prealloc));
1630 return -ENOTSUP;
1633 r = qemu_rbd_resize(bs, offset);
1634 if (r < 0) {
1635 error_setg_errno(errp, -r, "Failed to resize file");
1636 return r;
1639 return 0;
1642 static int qemu_rbd_snap_create(BlockDriverState *bs,
1643 QEMUSnapshotInfo *sn_info)
1645 BDRVRBDState *s = bs->opaque;
1646 int r;
1648 if (sn_info->name[0] == '\0') {
1649 return -EINVAL; /* we need a name for rbd snapshots */
1653 * rbd snapshots are using the name as the user controlled unique identifier
1654 * we can't use the rbd snapid for that purpose, as it can't be set
1656 if (sn_info->id_str[0] != '\0' &&
1657 strcmp(sn_info->id_str, sn_info->name) != 0) {
1658 return -EINVAL;
1661 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1662 return -ERANGE;
1665 r = rbd_snap_create(s->image, sn_info->name);
1666 if (r < 0) {
1667 error_report("failed to create snap: %s", strerror(-r));
1668 return r;
1671 return 0;
1674 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1675 const char *snapshot_id,
1676 const char *snapshot_name,
1677 Error **errp)
1679 BDRVRBDState *s = bs->opaque;
1680 int r;
1682 if (!snapshot_name) {
1683 error_setg(errp, "rbd need a valid snapshot name");
1684 return -EINVAL;
1687 /* If snapshot_id is specified, it must be equal to name, see
1688 qemu_rbd_snap_list() */
1689 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1690 error_setg(errp,
1691 "rbd do not support snapshot id, it should be NULL or "
1692 "equal to snapshot name");
1693 return -EINVAL;
1696 r = rbd_snap_remove(s->image, snapshot_name);
1697 if (r < 0) {
1698 error_setg_errno(errp, -r, "Failed to remove the snapshot");
1700 return r;
1703 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1704 const char *snapshot_name)
1706 BDRVRBDState *s = bs->opaque;
1708 return rbd_snap_rollback(s->image, snapshot_name);
1711 static int qemu_rbd_snap_list(BlockDriverState *bs,
1712 QEMUSnapshotInfo **psn_tab)
1714 BDRVRBDState *s = bs->opaque;
1715 QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1716 int i, snap_count;
1717 rbd_snap_info_t *snaps;
1718 int max_snaps = RBD_MAX_SNAPS;
1720 do {
1721 snaps = g_new(rbd_snap_info_t, max_snaps);
1722 snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1723 if (snap_count <= 0) {
1724 g_free(snaps);
1726 } while (snap_count == -ERANGE);
1728 if (snap_count <= 0) {
1729 goto done;
1732 sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1734 for (i = 0; i < snap_count; i++) {
1735 const char *snap_name = snaps[i].name;
1737 sn_info = sn_tab + i;
1738 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1739 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1741 sn_info->vm_state_size = snaps[i].size;
1742 sn_info->date_sec = 0;
1743 sn_info->date_nsec = 0;
1744 sn_info->vm_clock_nsec = 0;
1746 rbd_snap_list_end(snaps);
1747 g_free(snaps);
1749 done:
1750 *psn_tab = sn_tab;
1751 return snap_count;
1754 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1755 Error **errp)
1757 BDRVRBDState *s = bs->opaque;
1758 int r = rbd_invalidate_cache(s->image);
1759 if (r < 0) {
1760 error_setg_errno(errp, -r, "Failed to invalidate the cache");
1764 static QemuOptsList qemu_rbd_create_opts = {
1765 .name = "rbd-create-opts",
1766 .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1767 .desc = {
1769 .name = BLOCK_OPT_SIZE,
1770 .type = QEMU_OPT_SIZE,
1771 .help = "Virtual disk size"
1774 .name = BLOCK_OPT_CLUSTER_SIZE,
1775 .type = QEMU_OPT_SIZE,
1776 .help = "RBD object size"
1779 .name = "password-secret",
1780 .type = QEMU_OPT_STRING,
1781 .help = "ID of secret providing the password",
1784 .name = "encrypt.format",
1785 .type = QEMU_OPT_STRING,
1786 .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1789 .name = "encrypt.cipher-alg",
1790 .type = QEMU_OPT_STRING,
1791 .help = "Name of encryption cipher algorithm"
1792 " (allowed values: aes-128, aes-256)",
1795 .name = "encrypt.key-secret",
1796 .type = QEMU_OPT_STRING,
1797 .help = "ID of secret providing LUKS passphrase",
1799 { /* end of list */ }
/*
 * NULL-terminated list of option names installed as the driver's
 * .strong_runtime_opts.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    "pool",
    "namespace",
    "image",
    "conf",
    "snapshot",
    "user",
    "server.",
    "password-secret",

    NULL
};
1816 static BlockDriver bdrv_rbd = {
1817 .format_name = "rbd",
1818 .instance_size = sizeof(BDRVRBDState),
1819 .bdrv_parse_filename = qemu_rbd_parse_filename,
1820 .bdrv_file_open = qemu_rbd_open,
1821 .bdrv_close = qemu_rbd_close,
1822 .bdrv_reopen_prepare = qemu_rbd_reopen_prepare,
1823 .bdrv_co_create = qemu_rbd_co_create,
1824 .bdrv_co_create_opts = qemu_rbd_co_create_opts,
1825 .bdrv_has_zero_init = bdrv_has_zero_init_1,
1826 .bdrv_co_get_info = qemu_rbd_co_get_info,
1827 .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1828 .create_opts = &qemu_rbd_create_opts,
1829 .bdrv_co_getlength = qemu_rbd_co_getlength,
1830 .bdrv_co_truncate = qemu_rbd_co_truncate,
1831 .protocol_name = "rbd",
1833 .bdrv_co_preadv = qemu_rbd_co_preadv,
1834 .bdrv_co_pwritev = qemu_rbd_co_pwritev,
1835 .bdrv_co_flush_to_disk = qemu_rbd_co_flush,
1836 .bdrv_co_pdiscard = qemu_rbd_co_pdiscard,
1837 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1838 .bdrv_co_pwrite_zeroes = qemu_rbd_co_pwrite_zeroes,
1839 #endif
1840 .bdrv_co_block_status = qemu_rbd_co_block_status,
1842 .bdrv_snapshot_create = qemu_rbd_snap_create,
1843 .bdrv_snapshot_delete = qemu_rbd_snap_remove,
1844 .bdrv_snapshot_list = qemu_rbd_snap_list,
1845 .bdrv_snapshot_goto = qemu_rbd_snap_rollback,
1846 .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1848 .strong_runtime_opts = qemu_rbd_strong_runtime_opts,
1851 static void bdrv_rbd_init(void)
1853 bdrv_register(&bdrv_rbd);
1856 block_init(bdrv_rbd_init);