rbd: pass length, not op for osd completions
drivers/block/rbd.c
/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */
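/*
 * Illustrative mapping command (hypothetical monitor address and key;
 * the ABI document above is the authoritative reference):
 *
 *	# echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *	#	> /sys/bus/rbd/add
 *
 * rbd_add() below parses such a string and creates the block device.
 */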
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have these defined elsewhere */

#define	U8_MAX	((u8) (~0U))
#define	U16_MAX	((u16) (~0U))
#define	U32_MAX	((u32) (~0U))
#define	U64_MAX	((u64) (~0ULL))
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
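/*
 * Rough arithmetic behind the 510 limit (illustrative): a snapshot
 * context is a small fixed header plus one 64-bit snapshot id per
 * snapshot, so 510 ids occupy 510 * 8 = 4080 bytes, leaving room
 * for the header within a single 4096-byte page.
 */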
#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	(0)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
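/*
 * Worked example: with 4-byte ints the formula gives (5 * 4) / 2 + 1
 * = 11 characters, enough for "-2147483648" (sign plus ten digits).
 */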
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	s32			result;
	atomic_t		done;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
struct rbd_img_request {
	struct request		*rq;
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	bool			write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64		snap_id;		/* for reads */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
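/*
 * Illustrative use of the iterators above (hypothetical caller):
 *
 *	struct rbd_obj_request *obj_request;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		dout("obj %p is request %u\n", obj_request,
 *			obj_request->which);
 */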
struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};
/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to remove
 * the client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX) {
			/* Free the prefix allocated above before bailing */
			kfree(header->object_prefix);
			header->object_prefix = NULL;

			return -EIO;
		}
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
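/*
 * Worked example (illustrative values): with an obj_order of 22 each
 * object covers 4 MB (0x400000 bytes).  An image offset of 0x12345678
 * falls in segment 0x48 at segment offset 0x345678, so a 1 MB
 * (0x100000) request starting there is clipped by rbd_segment_length()
 * to 0x400000 - 0x345678 = 0xba988 bytes, the remainder of the object.
 */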
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
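/*
 * Sketch of typical use (hypothetical values): cloning a request's
 * data one piece at a time.
 *
 *	struct bio *bio = rq->bio;
 *	unsigned int offset = 0;
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&bio, &offset, 4096, GFP_NOIO);
 *
 * After the call, "bio" and "offset" identify the first byte not yet
 * cloned, so repeated calls walk the source chain front to back.
 */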
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}
static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
{
	struct ceph_osd_req_op *op;
	va_list args;
	size_t size;

	op = kzalloc(sizeof (*op), GFP_NOIO);
	if (!op)
		return NULL;
	op->op = opcode;
	va_start(args, opcode);
	switch (opcode) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		/* rbd_osd_req_op_create(READ, offset, length) */
		/* rbd_osd_req_op_create(WRITE, offset, length) */
		op->extent.offset = va_arg(args, u64);
		op->extent.length = va_arg(args, u64);
		if (opcode == CEPH_OSD_OP_WRITE)
			op->payload_len = op->extent.length;
		break;
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_CALL:
		/* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
		op->cls.class_name = va_arg(args, char *);
		size = strlen(op->cls.class_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.class_len = size;
		op->payload_len = size;

		op->cls.method_name = va_arg(args, char *);
		size = strlen(op->cls.method_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.method_len = size;
		op->payload_len += size;

		op->cls.argc = 0;
		op->cls.indata = va_arg(args, void *);
		size = va_arg(args, size_t);
		rbd_assert(size <= (size_t) U32_MAX);
		op->cls.indata_len = (u32) size;
		op->payload_len += size;
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		/* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
		/* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
		op->watch.cookie = va_arg(args, u64);
		op->watch.ver = va_arg(args, u64);
		op->watch.ver = cpu_to_le64(op->watch.ver);
		if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
			op->watch.flag = (u8) 1;
		break;
	default:
		rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
		kfree(op);
		op = NULL;
		break;
	}
	va_end(args);

	return op;
}

static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
	kfree(op);
}
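/*
 * The variadic argument list must match the opcode, as documented in
 * the switch above.  For illustration (hypothetical values):
 *
 *	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
 *	op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH, cookie, ver, 1);
 *	...
 *	rbd_osd_req_op_destroy(op);
 */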
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);
	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}
static void obj_request_done_init(struct rbd_obj_request *obj_request)
{
	atomic_set(&obj_request->done, 0);
	smp_wmb();
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	int done;

	done = atomic_inc_return(&obj_request->done);
	if (done > 1) {
		struct rbd_img_request *img_request = obj_request->img_request;
		struct rbd_device *rbd_dev;

		rbd_dev = img_request ? img_request->rbd_dev : NULL;
		rbd_warn(rbd_dev, "obj_request %p was already done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return atomic_read(&obj_request->done) != 0;
}
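/*
 * The smp_wmb() in obj_request_done_init() is intended to pair with
 * the smp_mb() in obj_request_done_test(), so a request can never
 * appear "done" to a tester before its done counter has been
 * visibly initialized.
 */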
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
		obj_request->result, obj_request->xferred, obj_request->length);
	/*
	 * ENOENT means a hole in the object; a short read likewise
	 * implies zero-fill from the transferred count to the end.
	 */
	if (obj_request->result == (s32) -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
	} else if (obj_request->xferred < obj_request->length &&
			!obj_request->result) {
		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
		obj_request->xferred = obj_request->length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
		obj_request->result, obj_request->xferred, obj_request->length);

	/* A short write really shouldn't occur.  Warn if we see one */

	if (obj_request->xferred != obj_request->length) {
		struct rbd_img_request *img_request = obj_request->img_request;
		struct rbd_device *rbd_dev;

		rbd_dev = img_request ? img_request->rbd_dev : NULL;
		rbd_warn(rbd_dev, "wrote %llu want %llu\n",
			obj_request->xferred, obj_request->length);
	}

	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	struct ceph_osd_reply_head *reply_head;
	struct ceph_osd_op *op;
	u32 num_ops;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	rbd_assert(!!obj_request->img_request ^
				(obj_request->which == BAD_WHICH));

	reply_head = msg->front.iov_base;
	obj_request->result = (s32) le32_to_cpu(reply_head->result);
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	num_ops = le32_to_cpu(reply_head->num_ops);
	WARN_ON(num_ops != 1);	/* For now */

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	op = &reply_head->ops[0];
	obj_request->xferred = le64_to_cpu(op->extent.length);
	rbd_assert(obj_request->xferred < (u64) UINT_MAX);

	opcode = le16_to_cpu(op->op);
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request,
					struct ceph_osd_req_op *op)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	struct timespec now;
	struct timespec *mtime;
	u64 snap_id = CEPH_NOSNAP;
	u64 offset = obj_request->offset;
	u64 length = obj_request->length;

	if (img_request) {
		rbd_assert(img_request->write_request == write_request);
		if (img_request->write_request)
			snapc = img_request->snapc;
		else
			snap_id = img_request->snap_id;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		rbd_assert(obj_request->bio_list != NULL);
		osd_req->r_bio = obj_request->bio_list;
		break;
	case OBJ_REQUEST_PAGES:
		osd_req->r_pages = obj_request->pages;
		osd_req->r_num_pages = obj_request->page_count;
		osd_req->r_page_alignment = offset & ~PAGE_MASK;
		break;
	}

	if (write_request) {
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
		now = CURRENT_TIME;
		mtime = &now;
	} else {
		osd_req->r_flags = CEPH_OSD_FLAG_READ;
		mtime = NULL;	/* not needed for reads */
		offset = 0;	/* These are not used... */
		length = 0;	/* ...for osd read requests */
	}

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	/* osd_req will get its own reference to snapc (if non-null) */

	ceph_osdc_build_request(osd_req, offset, length, 1, op,
				snapc, snap_id, mtime);

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	obj_request_done_init(obj_request);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}
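/*
 * Note the single allocation above: the object name is copied into
 * space allocated past the end of the structure itself, so the name
 * lives and dies with the rbd_obj_request and needs no separate
 * kfree().
 */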
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->write_request = write_request;
	if (write_request)
		img_request->snapc = snapc;
	else
		img_request->snap_id = rbd_dev->spec->snap_id;
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request->write_request)
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}
static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	unsigned int bio_offset;
	u64 image_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p bio %p\n", __func__, img_request, bio_list);

	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
					      : CEPH_OSD_OP_READ;
	bio_offset = 0;
	image_offset = img_request->offset;
	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	rbd_assert(resid > 0);
	while (resid) {
		const char *object_name;
		unsigned int clone_size;
		struct ceph_osd_req_op *op;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, image_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, image_offset);
		length = rbd_segment_length(rbd_dev, image_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		/*
		 * Build up the op to use in building the osd
		 * request.  Note that the contents of the op are
		 * copied by rbd_osd_req_create().
		 */
		op = rbd_osd_req_op_create(opcode, offset, length);
		if (!op)
			goto out_partial;
		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
						img_request->write_request,
						obj_request, op);
		rbd_osd_req_op_destroy(op);
		if (!obj_request->osd_req)
			goto out_partial;
		/* status and version are initially zero-filled */

		rbd_img_obj_request_add(img_request, obj_request);

		image_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->rq != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		unsigned int xferred;
		int result;

		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;

		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
		xferred = (unsigned int) obj_request->xferred;
		result = (int) obj_request->result;
		if (result)
			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
				img_request->write_request ? "write" : "read",
				result, xferred);

		more = blk_end_request(img_request->rq, result, xferred);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
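/*
 * Note on ordering: object requests may finish out of order, but
 * blk_end_request() is only called above for the request at
 * next_completion.  A later request that finishes early just marks
 * itself done and returns; it is accounted for once its predecessors
 * complete, so the block layer always sees bytes completed in
 * image-offset order.
 */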
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request(img_request, obj_request) {
		int ret;

		obj_request->callback = rbd_img_obj_callback;
		ret = rbd_obj_request_submit(osdc, obj_request);
		if (ret)
			return ret;
		/*
		 * The image request has its own reference to each
		 * of its object requests, so we can safely drop the
		 * initial one here.
		 */
		rbd_obj_request_put(obj_request);
	}

	return 0;
}
static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op *op;
	struct ceph_osd_client *osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
	if (!op)
		goto out;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	obj_request->callback = rbd_obj_request_put;
	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		rbd_warn(rbd_dev, "got notification but failed to "
			   "update snaps: %d\n", rc);

	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
}
/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op *op;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie,
				rbd_dev->header.obj_version, start);
	if (!op)
		goto out_cancel;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
							obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
/*
 * Synchronous osd object method call
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     u64 *version)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc;
	struct ceph_osd_req_op *op;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations but they
	 * don't involve object data (so no offset or length).
	 * The result should be placed into the inbound buffer
	 * provided.  They also supply outbound data--parameters for
	 * the object method.  Currently if this is present it will
	 * be a snapshot id.
	 */
	page_count = (u32) calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, 0,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
					method_name, outbound, outbound_size);
	if (!op)
		goto out;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;
	ret = 0;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
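/*
 * Illustrative caller, modeled on the v2 image helpers elsewhere in
 * this driver (buffer names here are hypothetical):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	__le64 size_buf;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				(char *) &snapid, sizeof (snapid),
 *				(char *) &size_buf, sizeof (size_buf),
 *				NULL);
 */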
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (WARN_ON(offset && length > U64_MAX - offset + 1))
			goto end_request;	/* Shouldn't happen */

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill_bio(img_request, rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "obj_request %s result %d\n",
				write_request ? "write" : "read", result);
			__blk_end_request_all(rq, result);
		}
	}
}
2054 * a queue callback. Makes sure that we don't create a bio that spans across
2055 * multiple osd objects. One exception would be with a single page bios,
2056 * which we handle later at bio_chain_clone_range()
2058 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2059 struct bio_vec *bvec)
2061 struct rbd_device *rbd_dev = q->queuedata;
2062 sector_t sector_offset;
2063 sector_t sectors_per_obj;
2064 sector_t obj_sector_offset;
2065 int ret;
2068 * Find how far into its rbd object the bio's start sector
2069 * falls, after converting the partition-relative sector to
2070 * one relative to the enclosing (whole) device.
2072 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2073 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2074 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2077 * Compute the number of bytes from that offset to the end
2078 * of the object. Account for what's already used by the bio.
2080 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2081 if (ret > bmd->bi_size)
2082 ret -= bmd->bi_size;
2083 else
2084 ret = 0;
2087 * Don't send back more than was asked for. And if the bio
2088 * was empty, let the whole thing through because: "Note
2089 * that a block device *must* allow a single page to be
2090 * added to an empty bio."
2092 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2093 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2094 ret = (int) bvec->bv_len;
2096 return ret;
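/*
 * The boundary arithmetic above, as a minimal sketch assuming the
 * common 4 MiB objects (obj_order 22): an I/O starting 1 MiB into an
 * object has 3 MiB left before the object boundary.
 * rbd_bytes_to_obj_end() is a hypothetical helper:
 */
static inline u64 rbd_bytes_to_obj_end(u64 dev_offset, u8 obj_order)
{
	u64 obj_size = 1ULL << obj_order;	/* e.g. 1 << 22 = 4 MiB */

	return obj_size - (dev_offset & (obj_size - 1));
}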
2099 static void rbd_free_disk(struct rbd_device *rbd_dev)
2101 struct gendisk *disk = rbd_dev->disk;
2103 if (!disk)
2104 return;
2106 if (disk->flags & GENHD_FL_UP)
2107 del_gendisk(disk);
2108 if (disk->queue)
2109 blk_cleanup_queue(disk->queue);
2110 put_disk(disk);
2113 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2114 const char *object_name,
2115 u64 offset, u64 length,
2116 char *buf, u64 *version)
2119 struct ceph_osd_req_op *op;
2120 struct rbd_obj_request *obj_request;
2121 struct ceph_osd_client *osdc;
2122 struct page **pages = NULL;
2123 u32 page_count;
2124 size_t size;
2125 int ret;
2127 page_count = (u32) calc_pages_for(offset, length);
2128 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2129 if (IS_ERR(pages))
2130 return PTR_ERR(pages);
2132 ret = -ENOMEM;
2133 obj_request = rbd_obj_request_create(object_name, offset, length,
2134 OBJ_REQUEST_PAGES);
2135 if (!obj_request)
2136 goto out;
2138 obj_request->pages = pages;
2139 obj_request->page_count = page_count;
2141 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2142 if (!op)
2143 goto out;
2144 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2145 obj_request, op);
2146 rbd_osd_req_op_destroy(op);
2147 if (!obj_request->osd_req)
2148 goto out;
2150 osdc = &rbd_dev->rbd_client->client->osdc;
2151 ret = rbd_obj_request_submit(osdc, obj_request);
2152 if (ret)
2153 goto out;
2154 ret = rbd_obj_request_wait(obj_request);
2155 if (ret)
2156 goto out;
2158 ret = obj_request->result;
2159 if (ret < 0)
2160 goto out;
2162 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2163 size = (size_t) obj_request->xferred;
2164 ceph_copy_from_page_vector(pages, buf, 0, size);
2165 rbd_assert(size <= (size_t) INT_MAX);
2166 ret = (int) size;
2167 if (version)
2168 *version = obj_request->version;
2169 out:
2170 if (obj_request)
2171 rbd_obj_request_put(obj_request);
2172 else
2173 ceph_release_page_vector(pages, page_count);
2175 return ret;
2179 * Read the complete header for the given rbd device.
2181 * Returns a pointer to a dynamically-allocated buffer containing
2182 * the complete and validated header. Caller can pass the address
2183 * of a variable that will be filled in with the version of the
2184 * header object at the time it was read.
2186 * Returns a pointer-coded errno if a failure occurs.
2188 static struct rbd_image_header_ondisk *
2189 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2191 struct rbd_image_header_ondisk *ondisk = NULL;
2192 u32 snap_count = 0;
2193 u64 names_size = 0;
2194 u32 want_count;
2195 int ret;
2198 * The complete header will include an array of its 64-bit
2199 * snapshot ids, followed by the names of those snapshots as
2200 * a contiguous block of NUL-terminated strings. Note that
2201 * the number of snapshots could change by the time we read
2202 * it in, in which case we re-read it.
2204 do {
2205 size_t size;
2207 kfree(ondisk);
2209 size = sizeof (*ondisk);
2210 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2211 size += names_size;
2212 ondisk = kmalloc(size, GFP_KERNEL);
2213 if (!ondisk)
2214 return ERR_PTR(-ENOMEM);
2216 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2217 0, size,
2218 (char *) ondisk, version);
2219 if (ret < 0)
2220 goto out_err;
2221 if (WARN_ON((size_t) ret < size)) {
2222 ret = -ENXIO;
2223 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2224 size, ret);
2225 goto out_err;
2227 if (!rbd_dev_ondisk_valid(ondisk)) {
2228 ret = -ENXIO;
2229 rbd_warn(rbd_dev, "invalid header");
2230 goto out_err;
2233 names_size = le64_to_cpu(ondisk->snap_names_len);
2234 want_count = snap_count;
2235 snap_count = le32_to_cpu(ondisk->snap_count);
2236 } while (snap_count != want_count);
2238 return ondisk;
2240 out_err:
2241 kfree(ondisk);
2243 return ERR_PTR(ret);
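/*
 * The size computed by the re-read loop above, expressed as a sketch
 * (rbd_v1_header_size() is a hypothetical helper): the fixed ondisk
 * struct, one ondisk entry per snapshot, then the packed
 * NUL-terminated snapshot names.
 */
static inline size_t rbd_v1_header_size(u32 snap_count, u64 names_size)
{
	return sizeof (struct rbd_image_header_ondisk) +
		snap_count * sizeof (struct rbd_image_snap_ondisk) +
		(size_t) names_size;
}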
2247 * Reload the on-disk header
2249 static int rbd_read_header(struct rbd_device *rbd_dev,
2250 struct rbd_image_header *header)
2252 struct rbd_image_header_ondisk *ondisk;
2253 u64 ver = 0;
2254 int ret;
2256 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2257 if (IS_ERR(ondisk))
2258 return PTR_ERR(ondisk);
2259 ret = rbd_header_from_disk(header, ondisk);
2260 if (ret >= 0)
2261 header->obj_version = ver;
2262 kfree(ondisk);
2264 return ret;
2267 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2269 struct rbd_snap *snap;
2270 struct rbd_snap *next;
2272 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2273 rbd_remove_snap_dev(snap);
2276 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2278 sector_t size;
2280 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2281 return;
2283 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2284 dout("setting size to %llu sectors", (unsigned long long) size);
2285 rbd_dev->mapping.size = (u64) size;
2286 set_capacity(rbd_dev->disk, size);
2290 * Refresh the format 1 image header, including its snapshot context
2292 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2294 int ret;
2295 struct rbd_image_header h;
2297 ret = rbd_read_header(rbd_dev, &h);
2298 if (ret < 0)
2299 return ret;
2301 down_write(&rbd_dev->header_rwsem);
2303 /* Update image size, and check for resize of mapped image */
2304 rbd_dev->header.image_size = h.image_size;
2305 rbd_update_mapping_size(rbd_dev);
2307 /* rbd_dev->header.object_prefix shouldn't change */
2308 kfree(rbd_dev->header.snap_sizes);
2309 kfree(rbd_dev->header.snap_names);
2310 /* osd requests may still refer to snapc */
2311 ceph_put_snap_context(rbd_dev->header.snapc);
2313 if (hver)
2314 *hver = h.obj_version;
2315 rbd_dev->header.obj_version = h.obj_version;
2316 rbd_dev->header.image_size = h.image_size;
2317 rbd_dev->header.snapc = h.snapc;
2318 rbd_dev->header.snap_names = h.snap_names;
2319 rbd_dev->header.snap_sizes = h.snap_sizes;
2320 /* Free the extra copy of the object prefix */
2321 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2322 kfree(h.object_prefix);
2324 ret = rbd_dev_snaps_update(rbd_dev);
2325 if (!ret)
2326 ret = rbd_dev_snaps_register(rbd_dev);
2328 up_write(&rbd_dev->header_rwsem);
2330 return ret;
2333 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2335 int ret;
2337 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2338 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2339 if (rbd_dev->image_format == 1)
2340 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2341 else
2342 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2343 mutex_unlock(&ctl_mutex);
2345 return ret;
2348 static int rbd_init_disk(struct rbd_device *rbd_dev)
2350 struct gendisk *disk;
2351 struct request_queue *q;
2352 u64 segment_size;
2354 /* create gendisk info */
2355 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2356 if (!disk)
2357 return -ENOMEM;
2359 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2360 rbd_dev->dev_id);
2361 disk->major = rbd_dev->major;
2362 disk->first_minor = 0;
2363 disk->fops = &rbd_bd_ops;
2364 disk->private_data = rbd_dev;
2366 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2367 if (!q)
2368 goto out_disk;
2370 /* We use the default size, but let's be explicit about it. */
2371 blk_queue_physical_block_size(q, SECTOR_SIZE);
2373 /* set io sizes to object size */
2374 segment_size = rbd_obj_bytes(&rbd_dev->header);
2375 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2376 blk_queue_max_segment_size(q, segment_size);
2377 blk_queue_io_min(q, segment_size);
2378 blk_queue_io_opt(q, segment_size);
2380 blk_queue_merge_bvec(q, rbd_merge_bvec);
2381 disk->queue = q;
2383 q->queuedata = rbd_dev;
2385 rbd_dev->disk = disk;
2387 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2389 return 0;
2390 out_disk:
2391 put_disk(disk);
2393 return -ENOMEM;
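/*
 * Illustrative numbers for the queue limits set above, assuming the
 * default object order of 22: rbd_obj_bytes() is then 4 MiB, so
 * max_hw_sectors is 4 MiB / 512 = 8192 sectors, and io_min, io_opt
 * and the maximum segment size are all 4 MiB.  A hypothetical helper
 * for the sector conversion:
 */
static inline unsigned int rbd_obj_sectors(struct rbd_image_header *header)
{
	return (unsigned int) (rbd_obj_bytes(header) >> SECTOR_SHIFT);
}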
2397 sysfs
2400 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2402 return container_of(dev, struct rbd_device, dev);
2405 static ssize_t rbd_size_show(struct device *dev,
2406 struct device_attribute *attr, char *buf)
2408 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2409 sector_t size;
2411 down_read(&rbd_dev->header_rwsem);
2412 size = get_capacity(rbd_dev->disk);
2413 up_read(&rbd_dev->header_rwsem);
2415 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2419 * Note this shows the features for whatever's mapped, which is not
2420 * necessarily the base image.
2422 static ssize_t rbd_features_show(struct device *dev,
2423 struct device_attribute *attr, char *buf)
2425 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2427 return sprintf(buf, "0x%016llx\n",
2428 (unsigned long long) rbd_dev->mapping.features);
2431 static ssize_t rbd_major_show(struct device *dev,
2432 struct device_attribute *attr, char *buf)
2434 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2436 return sprintf(buf, "%d\n", rbd_dev->major);
2439 static ssize_t rbd_client_id_show(struct device *dev,
2440 struct device_attribute *attr, char *buf)
2442 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2444 return sprintf(buf, "client%lld\n",
2445 ceph_client_id(rbd_dev->rbd_client->client));
2448 static ssize_t rbd_pool_show(struct device *dev,
2449 struct device_attribute *attr, char *buf)
2451 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2453 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2456 static ssize_t rbd_pool_id_show(struct device *dev,
2457 struct device_attribute *attr, char *buf)
2459 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2461 return sprintf(buf, "%llu\n",
2462 (unsigned long long) rbd_dev->spec->pool_id);
2465 static ssize_t rbd_name_show(struct device *dev,
2466 struct device_attribute *attr, char *buf)
2468 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2470 if (rbd_dev->spec->image_name)
2471 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2473 return sprintf(buf, "(unknown)\n");
2476 static ssize_t rbd_image_id_show(struct device *dev,
2477 struct device_attribute *attr, char *buf)
2479 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2481 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2485 * Shows the name of the currently-mapped snapshot (or
2486 * RBD_SNAP_HEAD_NAME for the base image).
2488 static ssize_t rbd_snap_show(struct device *dev,
2489 struct device_attribute *attr,
2490 char *buf)
2492 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2494 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2498 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2499 * for the parent image. If there is no parent, simply shows
2500 * "(no parent image)".
2502 static ssize_t rbd_parent_show(struct device *dev,
2503 struct device_attribute *attr,
2504 char *buf)
2506 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2507 struct rbd_spec *spec = rbd_dev->parent_spec;
2508 int count;
2509 char *bufp = buf;
2511 if (!spec)
2512 return sprintf(buf, "(no parent image)\n");
2514 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2515 (unsigned long long) spec->pool_id, spec->pool_name);
2516 if (count < 0)
2517 return count;
2518 bufp += count;
2520 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2521 spec->image_name ? spec->image_name : "(unknown)");
2522 if (count < 0)
2523 return count;
2524 bufp += count;
2526 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2527 (unsigned long long) spec->snap_id, spec->snap_name);
2528 if (count < 0)
2529 return count;
2530 bufp += count;
2532 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2533 if (count < 0)
2534 return count;
2535 bufp += count;
2537 return (ssize_t) (bufp - buf);
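/*
 * A read of the "parent" attribute for a mapped clone might produce
 * output like this (all values made up):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1014b2ae8944a
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */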
2540 static ssize_t rbd_image_refresh(struct device *dev,
2541 struct device_attribute *attr,
2542 const char *buf,
2543 size_t size)
2545 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2546 int ret;
2548 ret = rbd_dev_refresh(rbd_dev, NULL);
2550 return ret < 0 ? ret : size;
2553 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2554 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2555 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2556 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2557 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2558 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2559 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2560 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2561 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2562 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2563 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2565 static struct attribute *rbd_attrs[] = {
2566 &dev_attr_size.attr,
2567 &dev_attr_features.attr,
2568 &dev_attr_major.attr,
2569 &dev_attr_client_id.attr,
2570 &dev_attr_pool.attr,
2571 &dev_attr_pool_id.attr,
2572 &dev_attr_name.attr,
2573 &dev_attr_image_id.attr,
2574 &dev_attr_current_snap.attr,
2575 &dev_attr_parent.attr,
2576 &dev_attr_refresh.attr,
2577 NULL
2580 static struct attribute_group rbd_attr_group = {
2581 .attrs = rbd_attrs,
2584 static const struct attribute_group *rbd_attr_groups[] = {
2585 &rbd_attr_group,
2586 NULL
2589 static void rbd_sysfs_dev_release(struct device *dev)
2593 static struct device_type rbd_device_type = {
2594 .name = "rbd",
2595 .groups = rbd_attr_groups,
2596 .release = rbd_sysfs_dev_release,
2601 sysfs - snapshots
2604 static ssize_t rbd_snap_size_show(struct device *dev,
2605 struct device_attribute *attr,
2606 char *buf)
2608 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2610 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2613 static ssize_t rbd_snap_id_show(struct device *dev,
2614 struct device_attribute *attr,
2615 char *buf)
2617 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2619 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2622 static ssize_t rbd_snap_features_show(struct device *dev,
2623 struct device_attribute *attr,
2624 char *buf)
2626 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2628 return sprintf(buf, "0x%016llx\n",
2629 (unsigned long long) snap->features);
2632 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2633 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2634 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2636 static struct attribute *rbd_snap_attrs[] = {
2637 &dev_attr_snap_size.attr,
2638 &dev_attr_snap_id.attr,
2639 &dev_attr_snap_features.attr,
2640 NULL,
2643 static struct attribute_group rbd_snap_attr_group = {
2644 .attrs = rbd_snap_attrs,
2647 static void rbd_snap_dev_release(struct device *dev)
2649 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2650 kfree(snap->name);
2651 kfree(snap);
2654 static const struct attribute_group *rbd_snap_attr_groups[] = {
2655 &rbd_snap_attr_group,
2656 NULL
2659 static struct device_type rbd_snap_device_type = {
2660 .groups = rbd_snap_attr_groups,
2661 .release = rbd_snap_dev_release,
2664 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2666 kref_get(&spec->kref);
2668 return spec;
2671 static void rbd_spec_free(struct kref *kref);
2672 static void rbd_spec_put(struct rbd_spec *spec)
2674 if (spec)
2675 kref_put(&spec->kref, rbd_spec_free);
2678 static struct rbd_spec *rbd_spec_alloc(void)
2680 struct rbd_spec *spec;
2682 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2683 if (!spec)
2684 return NULL;
2685 kref_init(&spec->kref);
2687 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2689 return spec;
2692 static void rbd_spec_free(struct kref *kref)
2694 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2696 kfree(spec->pool_name);
2697 kfree(spec->image_id);
2698 kfree(spec->image_name);
2699 kfree(spec->snap_name);
2700 kfree(spec);
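/*
 * The rbd_spec reference pattern used throughout this file, sketched;
 * rbd_spec_lifecycle_example() is hypothetical:
 */
static void rbd_spec_lifecycle_example(void)
{
	struct rbd_spec *spec = rbd_spec_alloc();	/* refcount 1 */

	if (!spec)
		return;
	(void) rbd_spec_get(spec);	/* a second holder: refcount 2 */
	rbd_spec_put(spec);		/* that holder is done: refcount 1 */
	rbd_spec_put(spec);		/* last put: rbd_spec_free() runs */
}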
2703 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2704 struct rbd_spec *spec)
2706 struct rbd_device *rbd_dev;
2708 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2709 if (!rbd_dev)
2710 return NULL;
2712 spin_lock_init(&rbd_dev->lock);
2713 rbd_dev->flags = 0;
2714 INIT_LIST_HEAD(&rbd_dev->node);
2715 INIT_LIST_HEAD(&rbd_dev->snaps);
2716 init_rwsem(&rbd_dev->header_rwsem);
2718 rbd_dev->spec = spec;
2719 rbd_dev->rbd_client = rbdc;
2721 /* Initialize the layout used for all rbd requests */
2723 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2724 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2725 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2726 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2728 return rbd_dev;
2731 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2733 rbd_spec_put(rbd_dev->parent_spec);
2734 kfree(rbd_dev->header_name);
2735 rbd_put_client(rbd_dev->rbd_client);
2736 rbd_spec_put(rbd_dev->spec);
2737 kfree(rbd_dev);
2740 static bool rbd_snap_registered(struct rbd_snap *snap)
2742 bool ret = snap->dev.type == &rbd_snap_device_type;
2743 bool reg = device_is_registered(&snap->dev);
2745 rbd_assert(!ret ^ reg);
2747 return ret;
2750 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2752 list_del(&snap->node);
2753 if (device_is_registered(&snap->dev))
2754 device_unregister(&snap->dev);
2757 static int rbd_register_snap_dev(struct rbd_snap *snap,
2758 struct device *parent)
2760 struct device *dev = &snap->dev;
2761 int ret;
2763 dev->type = &rbd_snap_device_type;
2764 dev->parent = parent;
2765 dev->release = rbd_snap_dev_release;
2766 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2767 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2769 ret = device_register(dev);
2771 return ret;
2774 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2775 const char *snap_name,
2776 u64 snap_id, u64 snap_size,
2777 u64 snap_features)
2779 struct rbd_snap *snap;
2780 int ret;
2782 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2783 if (!snap)
2784 return ERR_PTR(-ENOMEM);
2786 ret = -ENOMEM;
2787 snap->name = kstrdup(snap_name, GFP_KERNEL);
2788 if (!snap->name)
2789 goto err;
2791 snap->id = snap_id;
2792 snap->size = snap_size;
2793 snap->features = snap_features;
2795 return snap;
2797 err:
2798 kfree(snap->name);
2799 kfree(snap);
2801 return ERR_PTR(ret);
2804 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2805 u64 *snap_size, u64 *snap_features)
2807 char *snap_name;
2809 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2811 *snap_size = rbd_dev->header.snap_sizes[which];
2812 *snap_features = 0; /* No features for v1 */
2814 /* Skip over names until we find the one we are looking for */
2816 snap_name = rbd_dev->header.snap_names;
2817 while (which--)
2818 snap_name += strlen(snap_name) + 1;
2820 return snap_name;
2824 * Get the size and object order for an image snapshot, or if
2825 * snap_id is CEPH_NOSNAP, gets this information for the base
2826 * image.
2828 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2829 u8 *order, u64 *snap_size)
2831 __le64 snapid = cpu_to_le64(snap_id);
2832 int ret;
2833 struct {
2834 u8 order;
2835 __le64 size;
2836 } __attribute__ ((packed)) size_buf = { 0 };
2838 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2839 "rbd", "get_size",
2840 (char *) &snapid, sizeof (snapid),
2841 (char *) &size_buf, sizeof (size_buf), NULL);
2842 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2843 if (ret < 0)
2844 return ret;
2846 *order = size_buf.order;
2847 *snap_size = le64_to_cpu(size_buf.size);
2849 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2850 (unsigned long long) snap_id, (unsigned int) *order,
2851 (unsigned long long) *snap_size);
2853 return 0;
2856 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2858 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2859 &rbd_dev->header.obj_order,
2860 &rbd_dev->header.image_size);
2863 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2865 void *reply_buf;
2866 int ret;
2867 void *p;
2869 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2870 if (!reply_buf)
2871 return -ENOMEM;
2873 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2874 "rbd", "get_object_prefix",
2875 NULL, 0,
2876 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2877 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2878 if (ret < 0)
2879 goto out;
2881 p = reply_buf;
2882 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2883 p + RBD_OBJ_PREFIX_LEN_MAX,
2884 NULL, GFP_NOIO);
2886 if (IS_ERR(rbd_dev->header.object_prefix)) {
2887 ret = PTR_ERR(rbd_dev->header.object_prefix);
2888 rbd_dev->header.object_prefix = NULL;
2889 } else {
2890 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
2893 out:
2894 kfree(reply_buf);
2896 return ret;
2899 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2900 u64 *snap_features)
2902 __le64 snapid = cpu_to_le64(snap_id);
2903 struct {
2904 __le64 features;
2905 __le64 incompat;
2906 } features_buf = { 0 };
2907 u64 incompat;
2908 int ret;
2910 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2911 "rbd", "get_features",
2912 (char *) &snapid, sizeof (snapid),
2913 (char *) &features_buf, sizeof (features_buf),
2914 NULL);
2915 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2916 if (ret < 0)
2917 return ret;
2919 incompat = le64_to_cpu(features_buf.incompat);
2920 if (incompat & ~RBD_FEATURES_ALL)
2921 return -ENXIO;
2923 *snap_features = le64_to_cpu(features_buf.features);
2925 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2926 (unsigned long long) snap_id,
2927 (unsigned long long) *snap_features,
2928 (unsigned long long) le64_to_cpu(features_buf.incompat));
2930 return 0;
2933 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2935 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2936 &rbd_dev->header.features);
2939 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2941 struct rbd_spec *parent_spec;
2942 size_t size;
2943 void *reply_buf = NULL;
2944 __le64 snapid;
2945 void *p;
2946 void *end;
2947 char *image_id;
2948 u64 overlap;
2949 int ret;
2951 parent_spec = rbd_spec_alloc();
2952 if (!parent_spec)
2953 return -ENOMEM;
2955 size = sizeof (__le64) + /* pool_id */
2956 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2957 sizeof (__le64) + /* snap_id */
2958 sizeof (__le64); /* overlap */
2959 reply_buf = kmalloc(size, GFP_KERNEL);
2960 if (!reply_buf) {
2961 ret = -ENOMEM;
2962 goto out_err;
2965 snapid = cpu_to_le64(CEPH_NOSNAP);
2966 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2967 "rbd", "get_parent",
2968 (char *) &snapid, sizeof (snapid),
2969 (char *) reply_buf, size, NULL);
2970 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2971 if (ret < 0)
2972 goto out_err;
2974 ret = -ERANGE;
2975 p = reply_buf;
2976 end = (char *) reply_buf + size;
2977 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2978 if (parent_spec->pool_id == CEPH_NOPOOL)
2979 goto out; /* No parent? No problem. */
2981 /* The ceph file layout needs to fit pool id in 32 bits */
2983 ret = -EIO;
2984 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2985 goto out;
2987 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2988 if (IS_ERR(image_id)) {
2989 ret = PTR_ERR(image_id);
2990 goto out_err;
2992 parent_spec->image_id = image_id;
2993 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2994 ceph_decode_64_safe(&p, end, overlap, out_err);
2996 rbd_dev->parent_overlap = overlap;
2997 rbd_dev->parent_spec = parent_spec;
2998 parent_spec = NULL; /* rbd_dev now owns this */
2999 out:
3000 ret = 0;
3001 out_err:
3002 kfree(reply_buf);
3003 rbd_spec_put(parent_spec);
3005 return ret;
3008 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3010 size_t image_id_size;
3011 char *image_id;
3012 void *p;
3013 void *end;
3014 size_t size;
3015 void *reply_buf = NULL;
3016 size_t len = 0;
3017 char *image_name = NULL;
3018 int ret;
3020 rbd_assert(!rbd_dev->spec->image_name);
3022 len = strlen(rbd_dev->spec->image_id);
3023 image_id_size = sizeof (__le32) + len;
3024 image_id = kmalloc(image_id_size, GFP_KERNEL);
3025 if (!image_id)
3026 return NULL;
3028 p = image_id;
3029 end = (char *) image_id + image_id_size;
3030 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3032 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3033 reply_buf = kmalloc(size, GFP_KERNEL);
3034 if (!reply_buf)
3035 goto out;
3037 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3038 "rbd", "dir_get_name",
3039 image_id, image_id_size,
3040 (char *) reply_buf, size, NULL);
3041 if (ret < 0)
3042 goto out;
3043 p = reply_buf;
3044 end = (char *) reply_buf + size;
3045 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3046 if (IS_ERR(image_name))
3047 image_name = NULL;
3048 else
3049 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3050 out:
3051 kfree(reply_buf);
3052 kfree(image_id);
3054 return image_name;
3058 * When a parent image gets probed, we only have the pool, image,
3059 * and snapshot ids but not the names of any of them. This call
3060 * is made later to fill in those names. It has to be done after
3061 * rbd_dev_snaps_update() has completed because some of the
3062 * information (in particular, snapshot name) is not available
3063 * until then.
3065 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3067 struct ceph_osd_client *osdc;
3068 const char *name;
3069 void *reply_buf = NULL;
3070 int ret;
3072 if (rbd_dev->spec->pool_name)
3073 return 0; /* Already have the names */
3075 /* Look up the pool name */
3077 osdc = &rbd_dev->rbd_client->client->osdc;
3078 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3079 if (!name) {
3080 rbd_warn(rbd_dev, "there is no pool with id %llu",
3081 rbd_dev->spec->pool_id); /* Really a BUG() */
3082 return -EIO;
3085 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3086 if (!rbd_dev->spec->pool_name)
3087 return -ENOMEM;
3089 /* Fetch the image name; tolerate failure here */
3091 name = rbd_dev_image_name(rbd_dev);
3092 if (name)
3093 rbd_dev->spec->image_name = (char *) name;
3094 else
3095 rbd_warn(rbd_dev, "unable to get image name");
3097 /* Look up the snapshot name. */
3099 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3100 if (!name) {
3101 rbd_warn(rbd_dev, "no snapshot with id %llu",
3102 rbd_dev->spec->snap_id); /* Really a BUG() */
3103 ret = -EIO;
3104 goto out_err;
3105 ret = -ENOMEM;
3106 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3107 if (!rbd_dev->spec->snap_name)
3108 goto out_err;
3110 return 0;
3111 out_err:
3112 kfree(reply_buf);
3113 kfree(rbd_dev->spec->pool_name);
3114 rbd_dev->spec->pool_name = NULL;
3116 return ret;
3119 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3121 size_t size;
3122 int ret;
3123 void *reply_buf;
3124 void *p;
3125 void *end;
3126 u64 seq;
3127 u32 snap_count;
3128 struct ceph_snap_context *snapc;
3129 u32 i;
3132 * We'll need room for the seq value (maximum snapshot id),
3133 * snapshot count, and array of that many snapshot ids.
3134 * For now we have a fixed upper limit on the number we're
3135 * prepared to receive.
3137 size = sizeof (__le64) + sizeof (__le32) +
3138 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3139 reply_buf = kzalloc(size, GFP_KERNEL);
3140 if (!reply_buf)
3141 return -ENOMEM;
3143 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3144 "rbd", "get_snapcontext",
3145 NULL, 0,
3146 reply_buf, size, ver);
3147 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3148 if (ret < 0)
3149 goto out;
3151 ret = -ERANGE;
3152 p = reply_buf;
3153 end = (char *) reply_buf + size;
3154 ceph_decode_64_safe(&p, end, seq, out);
3155 ceph_decode_32_safe(&p, end, snap_count, out);
3158 * Make sure the reported number of snapshot ids wouldn't go
3159 * beyond the end of our buffer. But before checking that,
3160 * make sure the computed size of the snapshot context we
3161 * allocate is representable in a size_t.
3163 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3164 / sizeof (u64)) {
3165 ret = -EINVAL;
3166 goto out;
3168 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3169 goto out;
3171 size = sizeof (struct ceph_snap_context) +
3172 snap_count * sizeof (snapc->snaps[0]);
3173 snapc = kmalloc(size, GFP_KERNEL);
3174 if (!snapc) {
3175 ret = -ENOMEM;
3176 goto out;
3179 atomic_set(&snapc->nref, 1);
3180 snapc->seq = seq;
3181 snapc->num_snaps = snap_count;
3182 for (i = 0; i < snap_count; i++)
3183 snapc->snaps[i] = ceph_decode_64(&p);
3185 rbd_dev->header.snapc = snapc;
3187 dout(" snap context seq = %llu, snap_count = %u\n",
3188 (unsigned long long) seq, (unsigned int) snap_count);
3189 ret = 0;
3190 out:
3191 kfree(reply_buf);
3193 return ret;
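/*
 * For reference, the get_snapcontext reply decoded above has this
 * wire layout (a sketch, not a struct the driver defines; snapshot
 * ids arrive highest first):
 *
 *	__le64 seq;
 *	__le32 snap_count;
 *	__le64 snaps[snap_count];
 */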
3196 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3198 size_t size;
3199 void *reply_buf;
3200 __le64 snap_id;
3201 int ret;
3202 void *p;
3203 void *end;
3204 char *snap_name;
3206 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3207 reply_buf = kmalloc(size, GFP_KERNEL);
3208 if (!reply_buf)
3209 return ERR_PTR(-ENOMEM);
3211 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3212 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3213 "rbd", "get_snapshot_name",
3214 (char *) &snap_id, sizeof (snap_id),
3215 reply_buf, size, NULL);
3216 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3217 if (ret < 0)
3218 goto out;
3220 p = reply_buf;
3221 end = (char *) reply_buf + size;
3222 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3223 if (IS_ERR(snap_name)) {
3224 ret = PTR_ERR(snap_name);
3225 goto out;
3226 } else {
3227 dout(" snap_id 0x%016llx snap_name = %s\n",
3228 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3230 kfree(reply_buf);
3232 return snap_name;
3233 out:
3234 kfree(reply_buf);
3236 return ERR_PTR(ret);
3239 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3240 u64 *snap_size, u64 *snap_features)
3242 u64 snap_id;
3243 u8 order;
3244 int ret;
3246 snap_id = rbd_dev->header.snapc->snaps[which];
3247 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3248 if (ret)
3249 return ERR_PTR(ret);
3250 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3251 if (ret)
3252 return ERR_PTR(ret);
3254 return rbd_dev_v2_snap_name(rbd_dev, which);
3257 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3258 u64 *snap_size, u64 *snap_features)
3260 if (rbd_dev->image_format == 1)
3261 return rbd_dev_v1_snap_info(rbd_dev, which,
3262 snap_size, snap_features);
3263 if (rbd_dev->image_format == 2)
3264 return rbd_dev_v2_snap_info(rbd_dev, which,
3265 snap_size, snap_features);
3266 return ERR_PTR(-EINVAL);
3269 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3271 int ret;
3272 __u8 obj_order;
3274 down_write(&rbd_dev->header_rwsem);
3276 /* Grab old order first, to see if it changes */
3278 obj_order = rbd_dev->header.obj_order;
3279 ret = rbd_dev_v2_image_size(rbd_dev);
3280 if (ret)
3281 goto out;
3282 if (rbd_dev->header.obj_order != obj_order) {
3283 ret = -EIO;
3284 goto out;
3286 rbd_update_mapping_size(rbd_dev);
3288 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3289 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3290 if (ret)
3291 goto out;
3292 ret = rbd_dev_snaps_update(rbd_dev);
3293 dout("rbd_dev_snaps_update returned %d\n", ret);
3294 if (ret)
3295 goto out;
3296 ret = rbd_dev_snaps_register(rbd_dev);
3297 dout("rbd_dev_snaps_register returned %d\n", ret);
3298 out:
3299 up_write(&rbd_dev->header_rwsem);
3301 return ret;
3305 * Scan the rbd device's current snapshot list and compare it to the
3306 * newly-received snapshot context. Remove any existing snapshots
3307 * not present in the new snapshot context. Add a new snapshot for
3308 * any snapshots in the snapshot context not in the current list.
3309 * And verify there are no changes to snapshots we already know
3310 * about.
3312 * Assumes the snapshots in the snapshot context are sorted by
3313 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3314 * are also maintained in that order.)
3316 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3318 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3319 const u32 snap_count = snapc->num_snaps;
3320 struct list_head *head = &rbd_dev->snaps;
3321 struct list_head *links = head->next;
3322 u32 index = 0;
3324 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3325 while (index < snap_count || links != head) {
3326 u64 snap_id;
3327 struct rbd_snap *snap;
3328 char *snap_name;
3329 u64 snap_size = 0;
3330 u64 snap_features = 0;
3332 snap_id = index < snap_count ? snapc->snaps[index]
3333 : CEPH_NOSNAP;
3334 snap = links != head ? list_entry(links, struct rbd_snap, node)
3335 : NULL;
3336 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3338 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3339 struct list_head *next = links->next;
3342 * A previously-existing snapshot is not in
3343 * the new snap context.
3345 * If the now missing snapshot is the one the
3346 * image is mapped to, clear its exists flag
3347 * so we can avoid sending any more requests
3348 * to it.
3350 if (rbd_dev->spec->snap_id == snap->id)
3351 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3352 rbd_remove_snap_dev(snap);
3353 dout("%ssnap id %llu has been removed\n",
3354 rbd_dev->spec->snap_id == snap->id ?
3355 "mapped " : "",
3356 (unsigned long long) snap->id);
3358 /* Done with this list entry; advance */
3360 links = next;
3361 continue;
3364 snap_name = rbd_dev_snap_info(rbd_dev, index,
3365 &snap_size, &snap_features);
3366 if (IS_ERR(snap_name))
3367 return PTR_ERR(snap_name);
3369 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3370 (unsigned long long) snap_id);
3371 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3372 struct rbd_snap *new_snap;
3374 /* We haven't seen this snapshot before */
3376 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3377 snap_id, snap_size, snap_features);
3378 if (IS_ERR(new_snap)) {
3379 int err = PTR_ERR(new_snap);
3381 dout(" failed to add dev, error %d\n", err);
3383 return err;
3386 /* New goes before existing, or at end of list */
3388 dout(" added dev%s\n", snap ? "" : " at end\n");
3389 if (snap)
3390 list_add_tail(&new_snap->node, &snap->node);
3391 else
3392 list_add_tail(&new_snap->node, head);
3393 } else {
3394 /* Already have this one */
3396 dout(" already present\n");
3398 rbd_assert(snap->size == snap_size);
3399 rbd_assert(!strcmp(snap->name, snap_name));
3400 rbd_assert(snap->features == snap_features);
3402 /* Done with this list entry; advance */
3404 links = links->next;
3407 /* Advance to the next entry in the snapshot context */
3409 index++;
3411 dout("%s: done\n", __func__);
3413 return 0;
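/*
 * A worked example of the merge above, with made-up ids listed
 * highest first: existing list {12, 7, 3}, new context {12, 9, 3}.
 * 12 is in both and is kept; 9 is new and is inserted ahead of 7's
 * position; 7 is absent from the context and is removed (clearing
 * the EXISTS flag if 7 happened to be the mapped snapshot); 3 is in
 * both and is kept.
 */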
3417 * Scan the list of snapshots and register the devices for any that
3418 * have not already been registered.
3420 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3422 struct rbd_snap *snap;
3423 int ret = 0;
3425 dout("%s:\n", __func__);
3426 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3427 return -EIO;
3429 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3430 if (!rbd_snap_registered(snap)) {
3431 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3432 if (ret < 0)
3433 break;
3436 dout("%s: returning %d\n", __func__, ret);
3438 return ret;
3441 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3443 struct device *dev;
3444 int ret;
3446 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3448 dev = &rbd_dev->dev;
3449 dev->bus = &rbd_bus_type;
3450 dev->type = &rbd_device_type;
3451 dev->parent = &rbd_root_dev;
3452 dev->release = rbd_dev_release;
3453 dev_set_name(dev, "%d", rbd_dev->dev_id);
3454 ret = device_register(dev);
3456 mutex_unlock(&ctl_mutex);
3458 return ret;
3461 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3463 device_unregister(&rbd_dev->dev);
3466 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3469 * Get a unique rbd identifier for the given new rbd_dev, and add
3470 * the rbd_dev to the global list. The minimum rbd id is 1.
3472 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3474 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3476 spin_lock(&rbd_dev_list_lock);
3477 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3478 spin_unlock(&rbd_dev_list_lock);
3479 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3480 (unsigned long long) rbd_dev->dev_id);
3484 * Remove an rbd_dev from the global list, and record that its
3485 * identifier is no longer in use.
3487 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3489 struct list_head *tmp;
3490 int rbd_id = rbd_dev->dev_id;
3491 int max_id;
3493 rbd_assert(rbd_id > 0);
3495 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3496 (unsigned long long) rbd_dev->dev_id);
3497 spin_lock(&rbd_dev_list_lock);
3498 list_del_init(&rbd_dev->node);
3501 * If the id being "put" is not the current maximum, there
3502 * is nothing special we need to do.
3504 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3505 spin_unlock(&rbd_dev_list_lock);
3506 return;
3510 * We need to update the current maximum id. Search the
3511 * list to find out what it is. We're more likely to find
3512 * the maximum at the end, so search the list backward.
3514 max_id = 0;
3515 list_for_each_prev(tmp, &rbd_dev_list) {
3516 struct rbd_device *rbd_dev;
3518 rbd_dev = list_entry(tmp, struct rbd_device, node);
3519 if (rbd_dev->dev_id > max_id)
3520 max_id = rbd_dev->dev_id;
3522 spin_unlock(&rbd_dev_list_lock);
3525 * The max id could have been updated by rbd_dev_id_get(), in
3526 * which case it now accurately reflects the new maximum.
3527 * Be careful not to overwrite the maximum value in that
3528 * case.
3530 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3531 dout(" max dev id has been reset\n");
3535 * Skips over white space at *buf, and updates *buf to point to the
3536 * first found non-space character (if any). Returns the length of
3537 * the token (string of non-white space characters) found. Note
3538 * that *buf must be terminated with '\0'.
3540 static inline size_t next_token(const char **buf)
3543 * These are the characters that produce nonzero for
3544 * isspace() in the "C" and "POSIX" locales.
3546 const char *spaces = " \f\n\r\t\v";
3548 *buf += strspn(*buf, spaces); /* Find start of token */
3550 return strcspn(*buf, spaces); /* Return token length */
3554 * Finds the next token in *buf, and if the provided token buffer is
3555 * big enough, copies the found token into it. The result, if
3556 * copied, is guaranteed to be terminated with '\0'. Note that *buf
3557 * must be terminated with '\0' on entry.
3559 * Returns the length of the token found (not including the '\0').
3560 * Return value will be 0 if no token is found, and it will be >=
3561 * token_size if the token would not fit.
3563 * The *buf pointer will be updated to point beyond the end of the
3564 * found token. Note that this occurs even if the token buffer is
3565 * too small to hold it.
3567 static inline size_t copy_token(const char **buf,
3568 char *token,
3569 size_t token_size)
3571 size_t len;
3573 len = next_token(buf);
3574 if (len < token_size) {
3575 memcpy(token, *buf, len);
3576 *(token + len) = '\0';
3578 *buf += len;
3580 return len;
3584 * Finds the next token in *buf, dynamically allocates a buffer big
3585 * enough to hold a copy of it, and copies the token into the new
3586 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3587 * that a duplicate buffer is created even for a zero-length token.
3589 * Returns a pointer to the newly-allocated duplicate, or a null
3590 * pointer if memory for the duplicate was not available. If
3591 * the lenp argument is a non-null pointer, the length of the token
3592 * (not including the '\0') is returned in *lenp.
3594 * If successful, the *buf pointer will be updated to point beyond
3595 * the end of the found token.
3597 * Note: uses GFP_KERNEL for allocation.
3599 static inline char *dup_token(const char **buf, size_t *lenp)
3601 char *dup;
3602 size_t len;
3604 len = next_token(buf);
3605 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3606 if (!dup)
3607 return NULL;
3608 *(dup + len) = '\0';
3609 *buf += len;
3611 if (lenp)
3612 *lenp = len;
3614 return dup;
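/*
 * Tokenizer usage, sketched; rbd_token_example() is hypothetical and
 * error handling is elided:
 */
static void rbd_token_example(void)
{
	const char *buf = "  rbd foo";
	char *pool = dup_token(&buf, NULL);	/* pool = "rbd", buf -> " foo" */
	char *image = dup_token(&buf, NULL);	/* image = "foo", buf -> "" */

	kfree(image);
	kfree(pool);
}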
3618 * Parse the options provided for an "rbd add" (i.e., rbd image
3619 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3620 * and the data written is passed here via a NUL-terminated buffer.
3621 * Returns 0 if successful or an error code otherwise.
3623 * The information extracted from these options is recorded in
3624 * the other parameters which return dynamically-allocated
3625 * structures:
3626 * ceph_opts
3627 * The address of a pointer that will refer to a ceph options
3628 * structure. Caller must release the returned pointer using
3629 * ceph_destroy_options() when it is no longer needed.
3630 * rbd_opts
3631 * Address of an rbd options pointer. Fully initialized by
3632 * this function; caller must release with kfree().
3633 * spec
3634 * Address of an rbd image specification pointer. Fully
3635 * initialized by this function based on parsed options.
3636 * Caller must release with rbd_spec_put().
3638 * The options passed take this form:
3639 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
3640 * where:
3641 * <mon_addrs>
3642 * A comma-separated list of one or more monitor addresses.
3643 * A monitor address is an ip address, optionally followed
3644 * by a port number (separated by a colon).
3645 * I.e.: ip1[:port1][,ip2[:port2]...]
3646 * <options>
3647 * A comma-separated list of ceph and/or rbd options.
3648 * <pool_name>
3649 * The name of the rados pool containing the rbd image.
3650 * <image_name>
3651 * The name of the image in that pool to map.
3652 * <snap_name>
3653 * An optional snapshot name. If provided, the mapping will
3654 * present data from the image at the time that snapshot was
3655 * created. The image head is used if no snapshot name is
3656 * provided. Snapshot mappings are always read-only.
3658 static int rbd_add_parse_args(const char *buf,
3659 struct ceph_options **ceph_opts,
3660 struct rbd_options **opts,
3661 struct rbd_spec **rbd_spec)
3663 size_t len;
3664 char *options;
3665 const char *mon_addrs;
3666 size_t mon_addrs_size;
3667 struct rbd_spec *spec = NULL;
3668 struct rbd_options *rbd_opts = NULL;
3669 struct ceph_options *copts;
3670 int ret;
3672 /* The first four tokens are required */
3674 len = next_token(&buf);
3675 if (!len) {
3676 rbd_warn(NULL, "no monitor address(es) provided");
3677 return -EINVAL;
3679 mon_addrs = buf;
3680 mon_addrs_size = len + 1;
3681 buf += len;
3683 ret = -EINVAL;
3684 options = dup_token(&buf, NULL);
3685 if (!options)
3686 return -ENOMEM;
3687 if (!*options) {
3688 rbd_warn(NULL, "no options provided");
3689 goto out_err;
3692 spec = rbd_spec_alloc();
3693 if (!spec)
3694 goto out_mem;
3696 spec->pool_name = dup_token(&buf, NULL);
3697 if (!spec->pool_name)
3698 goto out_mem;
3699 if (!*spec->pool_name) {
3700 rbd_warn(NULL, "no pool name provided");
3701 goto out_err;
3704 spec->image_name = dup_token(&buf, NULL);
3705 if (!spec->image_name)
3706 goto out_mem;
3707 if (!*spec->image_name) {
3708 rbd_warn(NULL, "no image name provided");
3709 goto out_err;
3713 * Snapshot name is optional; default is to use "-"
3714 * (indicating the head/no snapshot).
3716 len = next_token(&buf);
3717 if (!len) {
3718 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3719 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3720 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3721 ret = -ENAMETOOLONG;
3722 goto out_err;
3724 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3725 if (!spec->snap_name)
3726 goto out_mem;
3727 *(spec->snap_name + len) = '\0';
3729 /* Initialize all rbd options to the defaults */
3731 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3732 if (!rbd_opts)
3733 goto out_mem;
3735 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3737 copts = ceph_parse_options(options, mon_addrs,
3738 mon_addrs + mon_addrs_size - 1,
3739 parse_rbd_opts_token, rbd_opts);
3740 if (IS_ERR(copts)) {
3741 ret = PTR_ERR(copts);
3742 goto out_err;
3744 kfree(options);
3746 *ceph_opts = copts;
3747 *opts = rbd_opts;
3748 *rbd_spec = spec;
3750 return 0;
3751 out_mem:
3752 ret = -ENOMEM;
3753 out_err:
3754 kfree(rbd_opts);
3755 rbd_spec_put(spec);
3756 kfree(options);
3758 return ret;
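/*
 * An example of a well-formed "rbd add" buffer the parser above
 * accepts (addresses, key and names made up):
 *
 *	1.2.3.4:6789,1.2.3.5:6789 name=admin,secret=AQBx... rbd myimage mysnap
 *
 * The monitor list and option string go to ceph_parse_options(); the
 * pool, image and (optional) snapshot names end up in the rbd_spec.
 */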
3762 * An rbd format 2 image has a unique identifier, distinct from the
3763 * name given to it by the user. Internally, that identifier is
3764 * what's used to specify the names of objects related to the image.
3766 * A special "rbd id" object is used to map an rbd image name to its
3767 * id. If that object doesn't exist, then there is no v2 rbd image
3768 * with the supplied name.
3770 * This function will record the given rbd_dev's image_id field if
3771 * it can be determined, and in that case will return 0. If any
3772 * errors occur a negative errno will be returned and the rbd_dev's
3773 * image_id field will be unchanged (and should be NULL).
3775 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3777 int ret;
3778 size_t size;
3779 char *object_name;
3780 void *response;
3781 void *p;
3784 * When probing a parent image, the image id is already
3785 * known (and the image name likely is not). There's no
3786 * need to fetch the image id again in this case.
3788 if (rbd_dev->spec->image_id)
3789 return 0;
3792 * First, see if the format 2 image id file exists, and if
3793 * so, get the image's persistent id from it.
3795 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3796 object_name = kmalloc(size, GFP_NOIO);
3797 if (!object_name)
3798 return -ENOMEM;
3799 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3800 dout("rbd id object name is %s\n", object_name);
3802 /* Response will be an encoded string, which includes a length */
3804 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3805 response = kzalloc(size, GFP_NOIO);
3806 if (!response) {
3807 ret = -ENOMEM;
3808 goto out;
3811 ret = rbd_obj_method_sync(rbd_dev, object_name,
3812 "rbd", "get_id",
3813 NULL, 0,
3814 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3815 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3816 if (ret < 0)
3817 goto out;
3819 p = response;
3820 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3821 p + RBD_IMAGE_ID_LEN_MAX,
3822 NULL, GFP_NOIO);
3823 if (IS_ERR(rbd_dev->spec->image_id)) {
3824 ret = PTR_ERR(rbd_dev->spec->image_id);
3825 rbd_dev->spec->image_id = NULL;
3826 } else {
3827 dout("image_id is %s\n", rbd_dev->spec->image_id);
3829 out:
3830 kfree(response);
3831 kfree(object_name);
3833 return ret;
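/*
 * Example: probing an image named "foo" consults the id object
 * "rbd_id.foo" (RBD_ID_PREFIX followed by the image name), and a
 * successful reply decodes to an id string such as "1014b2ae8944a"
 * (value made up).
 */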
3836 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3838 int ret;
3839 size_t size;
3841 /* Version 1 images have no id; empty string is used */
3843 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3844 if (!rbd_dev->spec->image_id)
3845 return -ENOMEM;
3847 /* Record the header object name for this rbd image. */
3849 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3850 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3851 if (!rbd_dev->header_name) {
3852 ret = -ENOMEM;
3853 goto out_err;
3855 sprintf(rbd_dev->header_name, "%s%s",
3856 rbd_dev->spec->image_name, RBD_SUFFIX);
3858 /* Populate rbd image metadata */
3860 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3861 if (ret < 0)
3862 goto out_err;
3864 /* Version 1 images have no parent (no layering) */
3866 rbd_dev->parent_spec = NULL;
3867 rbd_dev->parent_overlap = 0;
3869 rbd_dev->image_format = 1;
3871 dout("discovered version 1 image, header name is %s\n",
3872 rbd_dev->header_name);
3874 return 0;
3876 out_err:
3877 kfree(rbd_dev->header_name);
3878 rbd_dev->header_name = NULL;
3879 kfree(rbd_dev->spec->image_id);
3880 rbd_dev->spec->image_id = NULL;
3882 return ret;
3885 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3887 size_t size;
3888 int ret;
3889 u64 ver = 0;
3892 * Image id was filled in by the caller. Record the header
3893 * object name for this rbd image.
3895 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3896 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3897 if (!rbd_dev->header_name)
3898 return -ENOMEM;
3899 sprintf(rbd_dev->header_name, "%s%s",
3900 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3902 /* Get the size and object order for the image */
3904 ret = rbd_dev_v2_image_size(rbd_dev);
3905 if (ret < 0)
3906 goto out_err;
3908 /* Get the object prefix (a.k.a. block_name) for the image */
3910 ret = rbd_dev_v2_object_prefix(rbd_dev);
3911 if (ret < 0)
3912 goto out_err;
3914 /* Get and check the features for the image */
3916 ret = rbd_dev_v2_features(rbd_dev);
3917 if (ret < 0)
3918 goto out_err;
3920 /* If the image supports layering, get the parent info */
3922 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3923 ret = rbd_dev_v2_parent_info(rbd_dev);
3924 if (ret < 0)
3925 goto out_err;
3928 /* crypto and compression type aren't (yet) supported for v2 images */
3930 rbd_dev->header.crypt_type = 0;
3931 rbd_dev->header.comp_type = 0;
3933 /* Get the snapshot context, plus the header version */
3935 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3936 if (ret)
3937 goto out_err;
3938 rbd_dev->header.obj_version = ver;
3940 rbd_dev->image_format = 2;
3942 dout("discovered version 2 image, header name is %s\n",
3943 rbd_dev->header_name);
3945 return 0;
3946 out_err:
3947 rbd_dev->parent_overlap = 0;
3948 rbd_spec_put(rbd_dev->parent_spec);
3949 rbd_dev->parent_spec = NULL;
3950 kfree(rbd_dev->header_name);
3951 rbd_dev->header_name = NULL;
3952 kfree(rbd_dev->header.object_prefix);
3953 rbd_dev->header.object_prefix = NULL;
3955 return ret;
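/*
 * Header object naming by example (image name "foo" and image id
 * "1014b2ae8944a" are made up): the version 1 probe above builds
 * "foo.rbd" (name plus RBD_SUFFIX), while the version 2 probe builds
 * "rbd_header.1014b2ae8944a" (RBD_HEADER_PREFIX plus image id).
 */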
3958 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3960 int ret;
3962 /* no need to lock here, as rbd_dev is not registered yet */
3963 ret = rbd_dev_snaps_update(rbd_dev);
3964 if (ret)
3965 return ret;
3967 ret = rbd_dev_probe_update_spec(rbd_dev);
3968 if (ret)
3969 goto err_out_snaps;
3971 ret = rbd_dev_set_mapping(rbd_dev);
3972 if (ret)
3973 goto err_out_snaps;
3975 /* generate unique id: find highest unique id, add one */
3976 rbd_dev_id_get(rbd_dev);
3978 /* Fill in the device name, now that we have its id. */
3979 BUILD_BUG_ON(DEV_NAME_LEN
3980 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3981 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3983 /* Get our block major device number. */
3985 ret = register_blkdev(0, rbd_dev->name);
3986 if (ret < 0)
3987 goto err_out_id;
3988 rbd_dev->major = ret;
3990 /* Set up the blkdev mapping. */
3992 ret = rbd_init_disk(rbd_dev);
3993 if (ret)
3994 goto err_out_blkdev;
3996 ret = rbd_bus_add_dev(rbd_dev);
3997 if (ret)
3998 goto err_out_disk;
4001 * At this point cleanup in the event of an error is the job
4002 * of the sysfs code (initiated by rbd_bus_del_dev()).
4004 down_write(&rbd_dev->header_rwsem);
4005 ret = rbd_dev_snaps_register(rbd_dev);
4006 up_write(&rbd_dev->header_rwsem);
4007 if (ret)
4008 goto err_out_bus;
4010 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4011 if (ret)
4012 goto err_out_bus;
4014 /* Everything's ready. Announce the disk to the world. */
4016 add_disk(rbd_dev->disk);
4018 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4019 (unsigned long long) rbd_dev->mapping.size);
4021 return ret;
4022 err_out_bus:
4023 /* this will also clean up rest of rbd_dev stuff */
4025 rbd_bus_del_dev(rbd_dev);
4027 return ret;
4028 err_out_disk:
4029 rbd_free_disk(rbd_dev);
4030 err_out_blkdev:
4031 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4032 err_out_id:
4033 rbd_dev_id_put(rbd_dev);
4034 err_out_snaps:
4035 rbd_remove_all_snaps(rbd_dev);
4037 return ret;
4041 * Probe for the existence of the header object for the given rbd
4042 * device. For format 2 images this includes determining the image
4043 * id.
4045 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4047 int ret;
4050 * Get the id from the image id object. If it's not a
4051 * format 2 image, we'll get ENOENT back, and we'll assume
4052 * it's a format 1 image.
4054 ret = rbd_dev_image_id(rbd_dev);
4055 if (ret)
4056 ret = rbd_dev_v1_probe(rbd_dev);
4057 else
4058 ret = rbd_dev_v2_probe(rbd_dev);
4059 if (ret) {
4060 dout("probe failed, returning %d\n", ret);
4062 return ret;
4065 ret = rbd_dev_probe_finish(rbd_dev);
4066 if (ret)
4067 rbd_header_free(&rbd_dev->header);
4069 return ret;
4072 static ssize_t rbd_add(struct bus_type *bus,
4073 const char *buf,
4074 size_t count)
4076 struct rbd_device *rbd_dev = NULL;
4077 struct ceph_options *ceph_opts = NULL;
4078 struct rbd_options *rbd_opts = NULL;
4079 struct rbd_spec *spec = NULL;
4080 struct rbd_client *rbdc;
4081 struct ceph_osd_client *osdc;
4082 int rc = -ENOMEM;
4084 if (!try_module_get(THIS_MODULE))
4085 return -ENODEV;
4087 /* parse add command */
4088 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4089 if (rc < 0)
4090 goto err_out_module;
4092 rbdc = rbd_get_client(ceph_opts);
4093 if (IS_ERR(rbdc)) {
4094 rc = PTR_ERR(rbdc);
4095 goto err_out_args;
4097 ceph_opts = NULL; /* rbd_dev client now owns this */
4099 /* pick the pool */
4100 osdc = &rbdc->client->osdc;
4101 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4102 if (rc < 0)
4103 goto err_out_client;
4104 spec->pool_id = (u64) rc;
4106 /* The ceph file layout needs to fit pool id in 32 bits */
4108 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4109 rc = -EIO;
4110 goto err_out_client;
4113 rbd_dev = rbd_dev_create(rbdc, spec);
4114 if (!rbd_dev)
4115 goto err_out_client;
4116 rbdc = NULL; /* rbd_dev now owns this */
4117 spec = NULL; /* rbd_dev now owns this */
4119 rbd_dev->mapping.read_only = rbd_opts->read_only;
4120 kfree(rbd_opts);
4121 rbd_opts = NULL; /* done with this */
4123 rc = rbd_dev_probe(rbd_dev);
4124 if (rc < 0)
4125 goto err_out_rbd_dev;
4127 return count;
4128 err_out_rbd_dev:
4129 rbd_dev_destroy(rbd_dev);
4130 err_out_client:
4131 rbd_put_client(rbdc);
4132 err_out_args:
4133 if (ceph_opts)
4134 ceph_destroy_options(ceph_opts);
4135 kfree(rbd_opts);
4136 rbd_spec_put(spec);
4137 err_out_module:
4138 module_put(THIS_MODULE);
4140 dout("Error adding device %s\n", buf);
4142 return (ssize_t) rc;
4145 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4147 struct list_head *tmp;
4148 struct rbd_device *rbd_dev;
4150 spin_lock(&rbd_dev_list_lock);
4151 list_for_each(tmp, &rbd_dev_list) {
4152 rbd_dev = list_entry(tmp, struct rbd_device, node);
4153 if (rbd_dev->dev_id == dev_id) {
4154 spin_unlock(&rbd_dev_list_lock);
4155 return rbd_dev;
4158 spin_unlock(&rbd_dev_list_lock);
4159 return NULL;
4162 static void rbd_dev_release(struct device *dev)
4164 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4166 if (rbd_dev->watch_event)
4167 rbd_dev_header_watch_sync(rbd_dev, 0);
4169 /* clean up and free blkdev */
4170 rbd_free_disk(rbd_dev);
4171 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4173 /* release allocated disk header fields */
4174 rbd_header_free(&rbd_dev->header);
4176 /* done with the id, and with the rbd_dev */
4177 rbd_dev_id_put(rbd_dev);
4178 rbd_assert(rbd_dev->rbd_client != NULL);
4179 rbd_dev_destroy(rbd_dev);
4181 /* release module ref */
4182 module_put(THIS_MODULE);
4185 static ssize_t rbd_remove(struct bus_type *bus,
4186 const char *buf,
4187 size_t count)
4189 struct rbd_device *rbd_dev = NULL;
4190 int target_id, rc;
4191 unsigned long ul;
4192 int ret = count;
4194 rc = strict_strtoul(buf, 10, &ul);
4195 if (rc)
4196 return rc;
4198 /* convert to int; abort if we lost anything in the conversion */
4199 target_id = (int) ul;
4200 if (target_id != ul)
4201 return -EINVAL;
4203 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4205 rbd_dev = __rbd_get_dev(target_id);
4206 if (!rbd_dev) {
4207 ret = -ENOENT;
4208 goto done;
4211 spin_lock_irq(&rbd_dev->lock);
4212 if (rbd_dev->open_count)
4213 ret = -EBUSY;
4214 else
4215 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4216 spin_unlock_irq(&rbd_dev->lock);
4217 if (ret < 0)
4218 goto done;
4220 rbd_remove_all_snaps(rbd_dev);
4221 rbd_bus_del_dev(rbd_dev);
4223 done:
4224 mutex_unlock(&ctl_mutex);
4226 return ret;
4230 * create control files in sysfs
4231 * /sys/bus/rbd/...
4233 static int rbd_sysfs_init(void)
4235 int ret;
4237 ret = device_register(&rbd_root_dev);
4238 if (ret < 0)
4239 return ret;
4241 ret = bus_register(&rbd_bus_type);
4242 if (ret < 0)
4243 device_unregister(&rbd_root_dev);
4245 return ret;
4248 static void rbd_sysfs_cleanup(void)
4250 bus_unregister(&rbd_bus_type);
4251 device_unregister(&rbd_root_dev);
4254 static int __init rbd_init(void)
4256 int rc;
4258 if (!libceph_compatible(NULL)) {
4259 rbd_warn(NULL, "libceph incompatibility (quitting)");
4261 return -EINVAL;
4263 rc = rbd_sysfs_init();
4264 if (rc)
4265 return rc;
4266 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4267 return 0;
4270 static void __exit rbd_exit(void)
4272 rbd_sysfs_cleanup();
4275 module_init(rbd_init);
4276 module_exit(rbd_exit);
4278 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4279 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4280 MODULE_DESCRIPTION("rados block device");
4282 /* following authorship retained from original osdblk.c */
4283 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4285 MODULE_LICENSE("GPL");