rbd: implement sync object read with new code
drivers/block/rbd.c
/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
/* It might be useful to have these defined elsewhere */

#define U8_MAX	((u8) (~0U))
#define U16_MAX	((u16) (~0U))
#define U32_MAX	((u32) (~0U))
#define U64_MAX	((u64) (~0ULL))
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
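/*
 * Worked example of the bound above: with 4-byte ints it evaluates to
 * (5 * 4) / 2 + 1 = 11 characters, enough for the 10 decimal digits a
 * 32-bit value can need plus a sign (or a trailing NUL).
 */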
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type { OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES };
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	s32			result;
	atomic_t		done;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
struct rbd_img_request {
	struct request		*rq;
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	bool			write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64		snap_id;		/* for reads */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
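/*
 * Illustrative use of the iterators above (a sketch; see
 * rbd_img_request_submit() and rbd_img_request_destroy() below for
 * the real callers).  Note the _safe variant walks the list in
 * reverse, which allows entries to be deleted while iterating:
 *
 *	struct rbd_obj_request *obj_request;
 *	struct rbd_obj_request *next_obj_request;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		submit_one(obj_request);	// in submission order
 *	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
 *		rbd_img_obj_request_del(img_request, obj_request);
 */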
struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};
/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	atomic_t		exists;
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;
};
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	rbd_dev->open_count++;
	mutex_unlock(&ctl_mutex);

	return 0;
}
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbd_assert(rbd_dev->open_count > 0);
	rbd_dev->open_count--;
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	return ERR_PTR(ret);
}
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
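/*
 * Example: when an image is mapped with the option string "ro" (or
 * its long form "read_only"), match_token() resolves the
 * Opt_read_only token and the switch above sets
 * rbd_opts->read_only = true; "rw" or "read_write" clears it again.
 */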
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself, so the
 * caller must *not* already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX) {
			/* Don't leak the object prefix copied above */
			kfree(header->object_prefix);
			header->object_prefix = NULL;
			return -EIO;
		}
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	atomic_set(&rbd_dev->exists, 1);
done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
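/*
 * Worked example of the segment math (assuming the common obj_order
 * of 22, i.e. 4 MiB objects): an image byte offset of 0x00500000
 * falls in segment 1 (0x00500000 >> 22), at offset 0x00100000 within
 * that segment.  A 4 MiB request starting there would be clipped by
 * rbd_segment_length() to 0x00300000 bytes, the remainder of the
 * segment; the rest becomes a separate object request.
 */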
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1ULL << header->obj_order;	/* don't shift a 32-bit 1 */
}
/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
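/*
 * Typical use (a sketch of what rbd_img_request_fill_bio() does
 * below): walk an incoming request's bio chain, carving off one
 * clone chain per object segment; the in-out bio/offset pair keeps
 * track of how far into the source chain we've consumed:
 *
 *	struct bio *bio = rq->bio;
 *	unsigned int offset = 0;
 *
 *	while (resid) {
 *		u64 length = rbd_segment_length(rbd_dev, image_offset, resid);
 *		struct bio *clone;
 *
 *		clone = bio_chain_clone_range(&bio, &offset,
 *					(unsigned int) length, GFP_ATOMIC);
 *		// ...attach clone to a per-object request...
 *	}
 */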
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	obj_request->which = img_request->obj_request_count++;
	rbd_assert(obj_request->which != BAD_WHICH);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);
	obj_request->which = BAD_WHICH;
	list_del(&obj_request->links);
	rbd_assert(obj_request->img_request == img_request);
	obj_request->callback = NULL;
	obj_request->img_request = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}
struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
{
	struct ceph_osd_req_op *op;
	va_list args;
	size_t size;

	op = kzalloc(sizeof (*op), GFP_NOIO);
	if (!op)
		return NULL;
	op->op = opcode;
	va_start(args, opcode);
	switch (opcode) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		/* rbd_osd_req_op_create(READ, offset, length) */
		/* rbd_osd_req_op_create(WRITE, offset, length) */
		op->extent.offset = va_arg(args, u64);
		op->extent.length = va_arg(args, u64);
		if (opcode == CEPH_OSD_OP_WRITE)
			op->payload_len = op->extent.length;
		break;
	case CEPH_OSD_OP_CALL:
		/* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
		op->cls.class_name = va_arg(args, char *);
		size = strlen(op->cls.class_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.class_len = size;
		op->payload_len = size;

		op->cls.method_name = va_arg(args, char *);
		size = strlen(op->cls.method_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.method_len = size;
		op->payload_len += size;

		op->cls.argc = 0;
		op->cls.indata = va_arg(args, void *);
		size = va_arg(args, size_t);
		rbd_assert(size <= (size_t) U32_MAX);
		op->cls.indata_len = (u32) size;
		op->payload_len += size;
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		/* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
		/* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
		op->watch.cookie = va_arg(args, u64);
		op->watch.ver = va_arg(args, u64);
		op->watch.ver = cpu_to_le64(op->watch.ver);
		if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
			op->watch.flag = (u8) 1;
		break;
	default:
		rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
		kfree(op);
		op = NULL;
		break;
	}
	va_end(args);

	return op;
}

static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
	kfree(op);
}
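/*
 * The variadic convention above in practice (sketches; these mirror
 * the real callers elsewhere in this file):
 *
 *	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
 *	op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
 *				method_name, outbound, outbound_size);
 *	op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
 *	...
 *	rbd_osd_req_op_destroy(op);
 */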
/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *op,
			  void (*rbd_cb)(struct ceph_osd_request *,
					 struct ceph_msg *),
			  u64 *ver)
{
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	struct timespec mtime = CURRENT_TIME;
	int ret;

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n",
		object_name, (unsigned long long) ofs,
		(unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
	if (!osd_req)
		return -ENOMEM;

	osd_req->r_flags = flags;
	osd_req->r_pages = pages;
	if (bio) {
		osd_req->r_bio = bio;
		bio_get(osd_req->r_bio);
	}

	osd_req->r_callback = rbd_cb;
	osd_req->r_priv = NULL;

	strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
	osd_req->r_oid_len = strlen(osd_req->r_oid);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */
	osd_req->r_num_pages = calc_pages_for(ofs, len);
	osd_req->r_page_alignment = ofs & ~PAGE_MASK;

	ceph_osdc_build_request(osd_req, ofs, len, 1, op,
				snapc, snapid, &mtime);

	if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
		ceph_osdc_set_request_linger(osdc, osd_req);
		rbd_dev->watch_request = osd_req;
	}

	ret = ceph_osdc_start_request(osdc, osd_req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		u64 version;

		ret = ceph_osdc_wait_request(osdc, osd_req);
		version = le64_to_cpu(osd_req->r_reassert_version.version);
		if (ver)
			*ver = version;
		dout("reassert_ver=%llu\n", (unsigned long long) version);
		ceph_osdc_put_request(osd_req);
	}
	return ret;

done_err:
	if (bio)
		bio_chain_put(osd_req->r_bio);
	ceph_osdc_put_request(osd_req);

	return ret;
}
static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	ceph_osdc_put_request(osd_req);
}
/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   int flags,
			   struct ceph_osd_req_op *op,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(op != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  object_name, ofs, inbound_size, NULL,
			  pages, num_pages,
			  flags,
			  op,
			  NULL,
			  ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	return wait_for_completion_interruptible(&obj_request->completion);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}
/*
 * Synchronously read a range from an object into a provided buffer
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			  const char *object_name,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	struct ceph_osd_req_op *op;
	int ret;

	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, ofs, len);
	if (!op)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ,
			       op, object_name, ofs, len, buf, ver);
	rbd_osd_req_op_destroy(op);

	return ret;
}
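/*
 * Note: with the new object request code, the v1 header read below
 * goes through rbd_obj_read_sync() instead; this older helper is
 * currently only referenced to keep the compiler from warning about
 * an unused function (see rbd_dev_v1_header_read()).
 */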
/*
 * Acknowledge a watch notification from the osd
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *op;
	int ret;

	op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
	if (!op)
		return -ENOMEM;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  op,
			  rbd_simple_req_cb, NULL);

	rbd_osd_req_op_destroy(op);

	return ret;
}
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		rbd_warn(rbd_dev, "got notification but failed to "
			   "update snaps: %d\n", rc);

	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}
/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_req_op *op;
	int ret = 0;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
	}

	op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie,
				rbd_dev->header.obj_version, start);
	if (op)
		ret = rbd_req_sync_op(rbd_dev,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      op, rbd_dev->header_name,
			      0, 0, NULL, NULL);
	else
		ret = -ENOMEM;	/* don't report success if no op was built */

	/* Cancel the event if we're tearing down, or on error */

	if (!start || !op || ret < 0) {
		ceph_osdc_cancel_event(rbd_dev->watch_event);
		rbd_dev->watch_event = NULL;
	}
	rbd_osd_req_op_destroy(op);

	return ret;
}
/*
 * Synchronous osd object method call
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     u64 *ver)
{
	struct ceph_osd_req_op *op;
	int ret;

	/*
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
	 */
	op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
					method_name, outbound, outbound_size);
	if (!op)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
			       object_name, 0, inbound_size, inbound,
			       ver);

	rbd_osd_req_op_destroy(op);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
				struct ceph_osd_op *op)
{
	u64 xferred;

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	xferred = le64_to_cpu(op->extent.length);
	rbd_assert(xferred < (u64) UINT_MAX);
	if (obj_request->result == (s32) -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
	} else if (xferred < obj_request->length && !obj_request->result) {
		zero_bio_chain(obj_request->bio_list, xferred);
		xferred = obj_request->length;
	}
	obj_request->xferred = xferred;
	atomic_set(&obj_request->done, 1);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
				struct ceph_osd_op *op)
{
	obj_request->xferred = le64_to_cpu(op->extent.length);
	atomic_set(&obj_request->done, 1);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	struct ceph_osd_reply_head *reply_head;
	struct ceph_osd_op *op;
	u32 num_ops;
	u16 opcode;

	rbd_assert(osd_req == obj_request->osd_req);
	rbd_assert(!!obj_request->img_request ^
				(obj_request->which == BAD_WHICH));

	obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
	reply_head = msg->front.iov_base;
	obj_request->result = (s32) le32_to_cpu(reply_head->result);
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	num_ops = le32_to_cpu(reply_head->num_ops);
	WARN_ON(num_ops != 1);	/* For now */

	op = &reply_head->ops[0];
	opcode = le16_to_cpu(op->op);
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request, op);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request, op);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (atomic_read(&obj_request->done))
		rbd_obj_request_complete(obj_request);
}
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request,
					struct ceph_osd_req_op *op)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	struct timespec now;
	struct timespec *mtime;
	u64 snap_id = CEPH_NOSNAP;
	u64 offset = obj_request->offset;
	u64 length = obj_request->length;

	if (img_request) {
		rbd_assert(img_request->write_request == write_request);
		if (img_request->write_request)
			snapc = img_request->snapc;
		else
			snap_id = img_request->snap_id;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_BIO:
		rbd_assert(obj_request->bio_list != NULL);
		osd_req->r_bio = obj_request->bio_list;
		bio_get(osd_req->r_bio);
		/* osd client requires "num pages" even for bio */
		osd_req->r_num_pages = calc_pages_for(offset, length);
		break;
	case OBJ_REQUEST_PAGES:
		osd_req->r_pages = obj_request->pages;
		osd_req->r_num_pages = obj_request->page_count;
		osd_req->r_page_alignment = offset & ~PAGE_MASK;
		break;
	}

	if (write_request) {
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
		now = CURRENT_TIME;
		mtime = &now;
	} else {
		osd_req->r_flags = CEPH_OSD_FLAG_READ;
		mtime = NULL;	/* not needed for reads */
		offset = 0;	/* These are not used... */
		length = 0;	/* ...for osd read requests */
	}

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	/* osd_req will get its own reference to snapc (if non-null) */

	ceph_osdc_build_request(osd_req, offset, length, 1, op,
				snapc, snap_id, mtime);

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	atomic_set(&obj_request->done, 0);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	return obj_request;
}
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->write_request = write_request;
	if (write_request)
		img_request->snapc = snapc;
	else
		img_request->snap_id = rbd_dev->spec->snap_id;
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);

	if (img_request->write_request)
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}
static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	unsigned int bio_offset;
	u64 image_offset;
	u64 resid;
	u16 opcode;

	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
					      : CEPH_OSD_OP_READ;
	bio_offset = 0;
	image_offset = img_request->offset;
	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	while (resid) {
		const char *object_name;
		unsigned int clone_size;
		struct ceph_osd_req_op *op;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, image_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, image_offset);
		length = rbd_segment_length(rbd_dev, image_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		/*
		 * Build up the op to use in building the osd
		 * request.  Note that the contents of the op are
		 * copied by rbd_osd_req_create().
		 */
		op = rbd_osd_req_op_create(opcode, offset, length);
		if (!op)
			goto out_partial;
		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
						img_request->write_request,
						obj_request, op);
		rbd_osd_req_op_destroy(op);
		if (!obj_request->osd_req)
			goto out_partial;
		/* status and version are initially zero-filled */

		rbd_img_obj_request_add(img_request, obj_request);

		image_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->rq != NULL);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		unsigned int xferred;
		int result;

		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!atomic_read(&obj_request->done))
			break;

		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
		xferred = (unsigned int) obj_request->xferred;
		result = (int) obj_request->result;
		if (result)
			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
				img_request->write_request ? "write" : "read",
				result, xferred);

		more = blk_end_request(img_request->rq, result, xferred);
		which++;
	}
	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;

	for_each_obj_request(img_request, obj_request) {
		int ret;

		obj_request->callback = rbd_img_obj_callback;
		ret = rbd_obj_request_submit(osdc, obj_request);
		if (ret)
			return ret;
		/*
		 * The image request has its own reference to each
		 * of its object requests, so we can safely drop the
		 * initial one here.
		 */
		rbd_obj_request_put(obj_request);
	}

	return 0;
}
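/*
 * Putting the pieces together, the image I/O path (as driven by
 * rbd_request_fn() below) is roughly:
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *						write_request);
 *	img_request->rq = rq;
 *	ret = rbd_img_request_fill_bio(img_request, rq->bio);
 *	if (!ret)
 *		ret = rbd_img_request_submit(img_request);
 *
 * Completion then flows back through rbd_osd_req_callback() and
 * rbd_img_obj_callback(), which ends the block layer request in
 * object-submission order.
 */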
static void rbd_request_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/* Quit early if the snapshot has disappeared */

		if (!atomic_read(&rbd_dev->exists)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		result = -EINVAL;
		if (WARN_ON(offset && length > U64_MAX - offset + 1))
			goto end_request;	/* Shouldn't happen */

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill_bio(img_request, rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "obj_request %s result %d\n",
				write_request ? "write" : "read", result);
			__blk_end_request_all(rq, result);
		}
	}
}
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with single-page bios,
 * which we handle later at bio_chain_clone_range()
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Convert the partition-relative bio start sector to an
	 * offset relative to the enclosing device, then find how
	 * far into its rbd object that sector falls.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
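/*
 * Example of the arithmetic above (assuming 4 MiB objects): if a bio
 * already holds 8 KiB of data and starts 12 KiB before an object
 * boundary, only 4 KiB remain in the object, so a further 4 KiB bvec
 * is accepted in full, while ret is clipped to the remaining bytes
 * for any bvec that would cross the boundary.
 */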
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length,
				char *buf, u64 *version)
{
	struct ceph_osd_req_op *op;
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);	/* don't proceed with an ERR_PTR */

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
	if (!op)
		goto out;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;
	ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
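/*
 * Usage sketch (this is what rbd_dev_v1_header_read() below does):
 * read the image header object synchronously into a kernel buffer,
 * returning the number of bytes transferred on success:
 *
 *	ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
 *				0, size, (char *) ondisk, version);
 *	if (ret < 0)
 *		goto out_err;
 */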
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		(void) rbd_req_sync_read;	/* avoid a warning */
		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
2148 * Reload the on-disk header into the in-memory image header
2150 static int rbd_read_header(struct rbd_device *rbd_dev,
2151 struct rbd_image_header *header)
2153 struct rbd_image_header_ondisk *ondisk;
2154 u64 ver = 0;
2155 int ret;
2157 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2158 if (IS_ERR(ondisk))
2159 return PTR_ERR(ondisk);
2160 ret = rbd_header_from_disk(header, ondisk);
2161 if (ret >= 0)
2162 header->obj_version = ver;
2163 kfree(ondisk);
2165 return ret;
2168 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2170 struct rbd_snap *snap;
2171 struct rbd_snap *next;
2173 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2174 rbd_remove_snap_dev(snap);
2177 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2179 sector_t size;
2181 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2182 return;
2184 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2185 dout("setting size to %llu sectors", (unsigned long long) size);
2186 rbd_dev->mapping.size = (u64) size;
2187 set_capacity(rbd_dev->disk, size);
2191 * Re-read the complete v1 on-disk header (including snapshot info) and update the in-memory copy
2193 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2195 int ret;
2196 struct rbd_image_header h;
2198 ret = rbd_read_header(rbd_dev, &h);
2199 if (ret < 0)
2200 return ret;
2202 down_write(&rbd_dev->header_rwsem);
2204 /* Update image size, and check for resize of mapped image */
2205 rbd_dev->header.image_size = h.image_size;
2206 rbd_update_mapping_size(rbd_dev);
2208 /* rbd_dev->header.object_prefix shouldn't change */
2209 kfree(rbd_dev->header.snap_sizes);
2210 kfree(rbd_dev->header.snap_names);
2211 /* osd requests may still refer to snapc */
2212 ceph_put_snap_context(rbd_dev->header.snapc);
2214 if (hver)
2215 *hver = h.obj_version;
2216 rbd_dev->header.obj_version = h.obj_version;
2217 rbd_dev->header.image_size = h.image_size;
2218 rbd_dev->header.snapc = h.snapc;
2219 rbd_dev->header.snap_names = h.snap_names;
2220 rbd_dev->header.snap_sizes = h.snap_sizes;
2221 /* Free the extra copy of the object prefix */
2222 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2223 kfree(h.object_prefix);
2225 ret = rbd_dev_snaps_update(rbd_dev);
2226 if (!ret)
2227 ret = rbd_dev_snaps_register(rbd_dev);
2229 up_write(&rbd_dev->header_rwsem);
2231 return ret;
2234 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2236 int ret;
2238 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2239 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2240 if (rbd_dev->image_format == 1)
2241 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2242 else
2243 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2244 mutex_unlock(&ctl_mutex);
2246 return ret;
2249 static int rbd_init_disk(struct rbd_device *rbd_dev)
2251 struct gendisk *disk;
2252 struct request_queue *q;
2253 u64 segment_size;
2255 /* create gendisk info */
2256 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2257 if (!disk)
2258 return -ENOMEM;
2260 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2261 rbd_dev->dev_id);
2262 disk->major = rbd_dev->major;
2263 disk->first_minor = 0;
2264 disk->fops = &rbd_bd_ops;
2265 disk->private_data = rbd_dev;
2267 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2268 if (!q)
2269 goto out_disk;
2271 /* We use the default size, but let's be explicit about it. */
2272 blk_queue_physical_block_size(q, SECTOR_SIZE);
2274 /* set io sizes to object size */
2275 segment_size = rbd_obj_bytes(&rbd_dev->header);
2276 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2277 blk_queue_max_segment_size(q, segment_size);
2278 blk_queue_io_min(q, segment_size);
2279 blk_queue_io_opt(q, segment_size);
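/*
 * For example, with the common 4 MiB (order 22) object size this
 * permits I/Os of up to 4194304 / 512 = 8192 sectors.
 */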
2281 blk_queue_merge_bvec(q, rbd_merge_bvec);
2282 disk->queue = q;
2284 q->queuedata = rbd_dev;
2286 rbd_dev->disk = disk;
2288 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2290 return 0;
2291 out_disk:
2292 put_disk(disk);
2294 return -ENOMEM;
2298 sysfs
2301 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2303 return container_of(dev, struct rbd_device, dev);
2306 static ssize_t rbd_size_show(struct device *dev,
2307 struct device_attribute *attr, char *buf)
2309 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2310 sector_t size;
2312 down_read(&rbd_dev->header_rwsem);
2313 size = get_capacity(rbd_dev->disk);
2314 up_read(&rbd_dev->header_rwsem);
2316 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2320 * Note this shows the features for whatever's mapped, which is not
2321 * necessarily the base image.
2323 static ssize_t rbd_features_show(struct device *dev,
2324 struct device_attribute *attr, char *buf)
2326 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2328 return sprintf(buf, "0x%016llx\n",
2329 (unsigned long long) rbd_dev->mapping.features);
2332 static ssize_t rbd_major_show(struct device *dev,
2333 struct device_attribute *attr, char *buf)
2335 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2337 return sprintf(buf, "%d\n", rbd_dev->major);
2340 static ssize_t rbd_client_id_show(struct device *dev,
2341 struct device_attribute *attr, char *buf)
2343 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2345 return sprintf(buf, "client%lld\n",
2346 ceph_client_id(rbd_dev->rbd_client->client));
2349 static ssize_t rbd_pool_show(struct device *dev,
2350 struct device_attribute *attr, char *buf)
2352 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2354 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2357 static ssize_t rbd_pool_id_show(struct device *dev,
2358 struct device_attribute *attr, char *buf)
2360 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2362 return sprintf(buf, "%llu\n",
2363 (unsigned long long) rbd_dev->spec->pool_id);
2366 static ssize_t rbd_name_show(struct device *dev,
2367 struct device_attribute *attr, char *buf)
2369 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2371 if (rbd_dev->spec->image_name)
2372 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2374 return sprintf(buf, "(unknown)\n");
2377 static ssize_t rbd_image_id_show(struct device *dev,
2378 struct device_attribute *attr, char *buf)
2380 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2382 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2386 * Shows the name of the currently-mapped snapshot (or
2387 * RBD_SNAP_HEAD_NAME for the base image).
2389 static ssize_t rbd_snap_show(struct device *dev,
2390 struct device_attribute *attr,
2391 char *buf)
2393 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2395 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2399 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2400 * for the parent image. If there is no parent, simply shows
2401 * "(no parent image)".
2403 static ssize_t rbd_parent_show(struct device *dev,
2404 struct device_attribute *attr,
2405 char *buf)
2407 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2408 struct rbd_spec *spec = rbd_dev->parent_spec;
2409 int count;
2410 char *bufp = buf;
2412 if (!spec)
2413 return sprintf(buf, "(no parent image)\n");
2415 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2416 (unsigned long long) spec->pool_id, spec->pool_name);
2417 if (count < 0)
2418 return count;
2419 bufp += count;
2421 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2422 spec->image_name ? spec->image_name : "(unknown)");
2423 if (count < 0)
2424 return count;
2425 bufp += count;
2427 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2428 (unsigned long long) spec->snap_id, spec->snap_name);
2429 if (count < 0)
2430 return count;
2431 bufp += count;
2433 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2434 if (count < 0)
2435 return count;
2436 bufp += count;
2438 return (ssize_t) (bufp - buf);
2441 static ssize_t rbd_image_refresh(struct device *dev,
2442 struct device_attribute *attr,
2443 const char *buf,
2444 size_t size)
2446 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2447 int ret;
2449 ret = rbd_dev_refresh(rbd_dev, NULL);
2451 return ret < 0 ? ret : size;
2454 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2455 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2456 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2457 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2458 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2459 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2460 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2461 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2462 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2463 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2464 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
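/*
 * These attributes are accessed from user space under
 * /sys/bus/rbd/devices/<id>/, e.g. (illustrative):
 *
 *	$ cat /sys/bus/rbd/devices/0/size
 *	$ echo 1 > /sys/bus/rbd/devices/0/refresh
 */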
2466 static struct attribute *rbd_attrs[] = {
2467 &dev_attr_size.attr,
2468 &dev_attr_features.attr,
2469 &dev_attr_major.attr,
2470 &dev_attr_client_id.attr,
2471 &dev_attr_pool.attr,
2472 &dev_attr_pool_id.attr,
2473 &dev_attr_name.attr,
2474 &dev_attr_image_id.attr,
2475 &dev_attr_current_snap.attr,
2476 &dev_attr_parent.attr,
2477 &dev_attr_refresh.attr,
2478 NULL
2481 static struct attribute_group rbd_attr_group = {
2482 .attrs = rbd_attrs,
2485 static const struct attribute_group *rbd_attr_groups[] = {
2486 &rbd_attr_group,
2487 NULL
2490 static void rbd_sysfs_dev_release(struct device *dev)
2494 static struct device_type rbd_device_type = {
2495 .name = "rbd",
2496 .groups = rbd_attr_groups,
2497 .release = rbd_sysfs_dev_release,
2502 sysfs - snapshots
2505 static ssize_t rbd_snap_size_show(struct device *dev,
2506 struct device_attribute *attr,
2507 char *buf)
2509 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2511 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2514 static ssize_t rbd_snap_id_show(struct device *dev,
2515 struct device_attribute *attr,
2516 char *buf)
2518 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2520 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2523 static ssize_t rbd_snap_features_show(struct device *dev,
2524 struct device_attribute *attr,
2525 char *buf)
2527 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2529 return sprintf(buf, "0x%016llx\n",
2530 (unsigned long long) snap->features);
2533 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2534 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2535 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2537 static struct attribute *rbd_snap_attrs[] = {
2538 &dev_attr_snap_size.attr,
2539 &dev_attr_snap_id.attr,
2540 &dev_attr_snap_features.attr,
2541 NULL,
2544 static struct attribute_group rbd_snap_attr_group = {
2545 .attrs = rbd_snap_attrs,
2548 static void rbd_snap_dev_release(struct device *dev)
2550 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2551 kfree(snap->name);
2552 kfree(snap);
2555 static const struct attribute_group *rbd_snap_attr_groups[] = {
2556 &rbd_snap_attr_group,
2557 NULL
2560 static struct device_type rbd_snap_device_type = {
2561 .groups = rbd_snap_attr_groups,
2562 .release = rbd_snap_dev_release,
2565 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2567 kref_get(&spec->kref);
2569 return spec;
2572 static void rbd_spec_free(struct kref *kref);
2573 static void rbd_spec_put(struct rbd_spec *spec)
2575 if (spec)
2576 kref_put(&spec->kref, rbd_spec_free);
2579 static struct rbd_spec *rbd_spec_alloc(void)
2581 struct rbd_spec *spec;
2583 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2584 if (!spec)
2585 return NULL;
2586 kref_init(&spec->kref);
2588 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2590 return spec;
2593 static void rbd_spec_free(struct kref *kref)
2595 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2597 kfree(spec->pool_name);
2598 kfree(spec->image_id);
2599 kfree(spec->image_name);
2600 kfree(spec->snap_name);
2601 kfree(spec);
2604 struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2605 struct rbd_spec *spec)
2607 struct rbd_device *rbd_dev;
2609 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2610 if (!rbd_dev)
2611 return NULL;
2613 spin_lock_init(&rbd_dev->lock);
2614 atomic_set(&rbd_dev->exists, 0);
2615 INIT_LIST_HEAD(&rbd_dev->node);
2616 INIT_LIST_HEAD(&rbd_dev->snaps);
2617 init_rwsem(&rbd_dev->header_rwsem);
2619 rbd_dev->spec = spec;
2620 rbd_dev->rbd_client = rbdc;
2622 /* Initialize the layout used for all rbd requests */
2624 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2625 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2626 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2627 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2629 return rbd_dev;
2632 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2634 rbd_spec_put(rbd_dev->parent_spec);
2635 kfree(rbd_dev->header_name);
2636 rbd_put_client(rbd_dev->rbd_client);
2637 rbd_spec_put(rbd_dev->spec);
2638 kfree(rbd_dev);
2641 static bool rbd_snap_registered(struct rbd_snap *snap)
2643 bool ret = snap->dev.type == &rbd_snap_device_type;
2644 bool reg = device_is_registered(&snap->dev);
2646 rbd_assert(!ret ^ reg);
2648 return ret;
2651 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2653 list_del(&snap->node);
2654 if (device_is_registered(&snap->dev))
2655 device_unregister(&snap->dev);
2658 static int rbd_register_snap_dev(struct rbd_snap *snap,
2659 struct device *parent)
2661 struct device *dev = &snap->dev;
2662 int ret;
2664 dev->type = &rbd_snap_device_type;
2665 dev->parent = parent;
2666 dev->release = rbd_snap_dev_release;
2667 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2668 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2670 ret = device_register(dev);
2672 return ret;
2675 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2676 const char *snap_name,
2677 u64 snap_id, u64 snap_size,
2678 u64 snap_features)
2680 struct rbd_snap *snap;
2681 int ret;
2683 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2684 if (!snap)
2685 return ERR_PTR(-ENOMEM);
2687 ret = -ENOMEM;
2688 snap->name = kstrdup(snap_name, GFP_KERNEL);
2689 if (!snap->name)
2690 goto err;
2692 snap->id = snap_id;
2693 snap->size = snap_size;
2694 snap->features = snap_features;
2696 return snap;
2698 err:
2699 kfree(snap->name);
2700 kfree(snap);
2702 return ERR_PTR(ret);
2705 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2706 u64 *snap_size, u64 *snap_features)
2708 char *snap_name;
2710 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2712 *snap_size = rbd_dev->header.snap_sizes[which];
2713 *snap_features = 0; /* No features for v1 */
2715 /* Skip over names until we find the one we are looking for */
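/* E.g. with snap_names = "one\0two\0" and which == 1, this skips
 * past "one" and returns a pointer to "two". */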
2717 snap_name = rbd_dev->header.snap_names;
2718 while (which--)
2719 snap_name += strlen(snap_name) + 1;
2721 return snap_name;
2725 * Get the size and object order for an image snapshot, or if
2726 * snap_id is CEPH_NOSNAP, gets this information for the base
2727 * image.
2729 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2730 u8 *order, u64 *snap_size)
2732 __le64 snapid = cpu_to_le64(snap_id);
2733 int ret;
2734 struct {
2735 u8 order;
2736 __le64 size;
2737 } __attribute__ ((packed)) size_buf = { 0 };
2739 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2740 "rbd", "get_size",
2741 (char *) &snapid, sizeof (snapid),
2742 (char *) &size_buf, sizeof (size_buf), NULL);
2743 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2744 if (ret < 0)
2745 return ret;
2747 *order = size_buf.order;
2748 *snap_size = le64_to_cpu(size_buf.size);
2750 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2751 (unsigned long long) snap_id, (unsigned int) *order,
2752 (unsigned long long) *snap_size);
2754 return 0;
2757 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2759 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2760 &rbd_dev->header.obj_order,
2761 &rbd_dev->header.image_size);
2764 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2766 void *reply_buf;
2767 int ret;
2768 void *p;
2770 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2771 if (!reply_buf)
2772 return -ENOMEM;
2774 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2775 "rbd", "get_object_prefix",
2776 NULL, 0,
2777 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2778 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2779 if (ret < 0)
2780 goto out;
2781 ret = 0; /* rbd_req_sync_exec() can return positive */
2783 p = reply_buf;
2784 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2785 p + RBD_OBJ_PREFIX_LEN_MAX,
2786 NULL, GFP_NOIO);
2788 if (IS_ERR(rbd_dev->header.object_prefix)) {
2789 ret = PTR_ERR(rbd_dev->header.object_prefix);
2790 rbd_dev->header.object_prefix = NULL;
2791 } else {
2792 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
2795 out:
2796 kfree(reply_buf);
2798 return ret;
2801 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2802 u64 *snap_features)
2804 __le64 snapid = cpu_to_le64(snap_id);
2805 struct {
2806 __le64 features;
2807 __le64 incompat;
2808 } features_buf = { 0 };
2809 u64 incompat;
2810 int ret;
2812 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2813 "rbd", "get_features",
2814 (char *) &snapid, sizeof (snapid),
2815 (char *) &features_buf, sizeof (features_buf),
2816 NULL);
2817 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2818 if (ret < 0)
2819 return ret;
2821 incompat = le64_to_cpu(features_buf.incompat);
2822 if (incompat & ~RBD_FEATURES_ALL)
2823 return -ENXIO;
2825 *snap_features = le64_to_cpu(features_buf.features);
2827 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2828 (unsigned long long) snap_id,
2829 (unsigned long long) *snap_features,
2830 (unsigned long long) le64_to_cpu(features_buf.incompat));
2832 return 0;
2835 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2837 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2838 &rbd_dev->header.features);
2841 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2843 struct rbd_spec *parent_spec;
2844 size_t size;
2845 void *reply_buf = NULL;
2846 __le64 snapid;
2847 void *p;
2848 void *end;
2849 char *image_id;
2850 u64 overlap;
2851 int ret;
2853 parent_spec = rbd_spec_alloc();
2854 if (!parent_spec)
2855 return -ENOMEM;
2857 size = sizeof (__le64) + /* pool_id */
2858 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2859 sizeof (__le64) + /* snap_id */
2860 sizeof (__le64); /* overlap */
2861 reply_buf = kmalloc(size, GFP_KERNEL);
2862 if (!reply_buf) {
2863 ret = -ENOMEM;
2864 goto out_err;
2867 snapid = cpu_to_le64(CEPH_NOSNAP);
2868 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2869 "rbd", "get_parent",
2870 (char *) &snapid, sizeof (snapid),
2871 (char *) reply_buf, size, NULL);
2872 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2873 if (ret < 0)
2874 goto out_err;
2876 ret = -ERANGE;
2877 p = reply_buf;
2878 end = (char *) reply_buf + size;
2879 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2880 if (parent_spec->pool_id == CEPH_NOPOOL)
2881 goto out; /* No parent? No problem. */
2883 /* The ceph file layout needs to fit pool id in 32 bits */
2885 ret = -EIO;
2886 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2887 goto out;
2889 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2890 if (IS_ERR(image_id)) {
2891 ret = PTR_ERR(image_id);
2892 goto out_err;
2894 parent_spec->image_id = image_id;
2895 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2896 ceph_decode_64_safe(&p, end, overlap, out_err);
2898 rbd_dev->parent_overlap = overlap;
2899 rbd_dev->parent_spec = parent_spec;
2900 parent_spec = NULL; /* rbd_dev now owns this */
2901 out:
2902 ret = 0;
2903 out_err:
2904 kfree(reply_buf);
2905 rbd_spec_put(parent_spec);
2907 return ret;
2910 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2912 size_t image_id_size;
2913 char *image_id;
2914 void *p;
2915 void *end;
2916 size_t size;
2917 void *reply_buf = NULL;
2918 size_t len = 0;
2919 char *image_name = NULL;
2920 int ret;
2922 rbd_assert(!rbd_dev->spec->image_name);
2924 len = strlen(rbd_dev->spec->image_id);
2925 image_id_size = sizeof (__le32) + len;
2926 image_id = kmalloc(image_id_size, GFP_KERNEL);
2927 if (!image_id)
2928 return NULL;
2930 p = image_id;
2931 end = (char *) image_id + image_id_size;
2932 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2934 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2935 reply_buf = kmalloc(size, GFP_KERNEL);
2936 if (!reply_buf)
2937 goto out;
2939 ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
2940 "rbd", "dir_get_name",
2941 image_id, image_id_size,
2942 (char *) reply_buf, size, NULL);
2943 if (ret < 0)
2944 goto out;
2945 p = reply_buf;
2946 end = (char *) reply_buf + size;
2947 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2948 if (IS_ERR(image_name))
2949 image_name = NULL;
2950 else
2951 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2952 out:
2953 kfree(reply_buf);
2954 kfree(image_id);
2956 return image_name;
2960 * When a parent image gets probed, we only have the pool, image,
2961 * and snapshot ids but not the names of any of them. This call
2962 * is made later to fill in those names. It has to be done after
2963 * rbd_dev_snaps_update() has completed because some of the
2964 * information (in particular, snapshot name) is not available
2965 * until then.
2967 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2969 struct ceph_osd_client *osdc;
2970 const char *name;
2971 void *reply_buf = NULL;
2972 int ret;
2974 if (rbd_dev->spec->pool_name)
2975 return 0; /* Already have the names */
2977 /* Look up the pool name */
2979 osdc = &rbd_dev->rbd_client->client->osdc;
2980 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2981 if (!name) {
2982 rbd_warn(rbd_dev, "there is no pool with id %llu",
2983 rbd_dev->spec->pool_id); /* Really a BUG() */
2984 return -EIO;
2987 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2988 if (!rbd_dev->spec->pool_name)
2989 return -ENOMEM;
2991 /* Fetch the image name; tolerate failure here */
2993 name = rbd_dev_image_name(rbd_dev);
2994 if (name)
2995 rbd_dev->spec->image_name = (char *) name;
2996 else
2997 rbd_warn(rbd_dev, "unable to get image name");
2999 /* Look up the snapshot name. */
3001 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3002 if (!name) {
3003 rbd_warn(rbd_dev, "no snapshot with id %llu",
3004 rbd_dev->spec->snap_id); /* Really a BUG() */
3005 ret = -EIO;
3006 goto out_err;
3008 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3009 if (!rbd_dev->spec->snap_name) {
3010 ret = -ENOMEM;
3011 goto out_err;
3012 return 0;
3013 out_err:
3014 kfree(reply_buf);
3015 kfree(rbd_dev->spec->pool_name);
3016 rbd_dev->spec->pool_name = NULL;
3018 return ret;
3021 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3023 size_t size;
3024 int ret;
3025 void *reply_buf;
3026 void *p;
3027 void *end;
3028 u64 seq;
3029 u32 snap_count;
3030 struct ceph_snap_context *snapc;
3031 u32 i;
3034 * We'll need room for the seq value (maximum snapshot id),
3035 * snapshot count, and array of that many snapshot ids.
3036 * For now we have a fixed upper limit on the number we're
3037 * prepared to receive.
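 *
 * The encoded reply we size for looks like this (illustrative):
 *
 *	__le64 snap_seq;		maximum snapshot id
 *	__le32 snap_count;
 *	__le64 snap_ids[snap_count];	highest id first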
3039 size = sizeof (__le64) + sizeof (__le32) +
3040 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3041 reply_buf = kzalloc(size, GFP_KERNEL);
3042 if (!reply_buf)
3043 return -ENOMEM;
3045 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3046 "rbd", "get_snapcontext",
3047 NULL, 0,
3048 reply_buf, size, ver);
3049 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3050 if (ret < 0)
3051 goto out;
3053 ret = -ERANGE;
3054 p = reply_buf;
3055 end = (char *) reply_buf + size;
3056 ceph_decode_64_safe(&p, end, seq, out);
3057 ceph_decode_32_safe(&p, end, snap_count, out);
3060 * Make sure the reported number of snapshot ids wouldn't go
3061 * beyond the end of our buffer. But before checking that,
3062 * make sure the computed size of the snapshot context we
3063 * allocate is representable in a size_t.
3065 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3066 / sizeof (u64)) {
3067 ret = -EINVAL;
3068 goto out;
3070 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3071 goto out;
3073 size = sizeof (struct ceph_snap_context) +
3074 snap_count * sizeof (snapc->snaps[0]);
3075 snapc = kmalloc(size, GFP_KERNEL);
3076 if (!snapc) {
3077 ret = -ENOMEM;
3078 goto out;
3081 atomic_set(&snapc->nref, 1);
3082 snapc->seq = seq;
3083 snapc->num_snaps = snap_count;
3084 for (i = 0; i < snap_count; i++)
3085 snapc->snaps[i] = ceph_decode_64(&p);
3087 rbd_dev->header.snapc = snapc;
3088 ret = 0;
3089 dout(" snap context seq = %llu, snap_count = %u\n",
3090 (unsigned long long) seq, (unsigned int) snap_count);
3092 out:
3093 kfree(reply_buf);
3095 return ret;
3098 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3100 size_t size;
3101 void *reply_buf;
3102 __le64 snap_id;
3103 int ret;
3104 void *p;
3105 void *end;
3106 char *snap_name;
3108 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3109 reply_buf = kmalloc(size, GFP_KERNEL);
3110 if (!reply_buf)
3111 return ERR_PTR(-ENOMEM);
3113 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3114 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3115 "rbd", "get_snapshot_name",
3116 (char *) &snap_id, sizeof (snap_id),
3117 reply_buf, size, NULL);
3118 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3119 if (ret < 0)
3120 goto out;
3122 p = reply_buf;
3123 end = (char *) reply_buf + size;
3124 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3125 if (IS_ERR(snap_name)) {
3126 ret = PTR_ERR(snap_name);
3127 goto out;
3128 } else {
3129 dout(" snap_id 0x%016llx snap_name = %s\n",
3130 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3132 kfree(reply_buf);
3134 return snap_name;
3135 out:
3136 kfree(reply_buf);
3138 return ERR_PTR(ret);
3141 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3142 u64 *snap_size, u64 *snap_features)
3144 u64 snap_id;
3145 u8 order;
3146 int ret;
3148 snap_id = rbd_dev->header.snapc->snaps[which];
3149 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3150 if (ret)
3151 return ERR_PTR(ret);
3152 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3153 if (ret)
3154 return ERR_PTR(ret);
3156 return rbd_dev_v2_snap_name(rbd_dev, which);
3159 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3160 u64 *snap_size, u64 *snap_features)
3162 if (rbd_dev->image_format == 1)
3163 return rbd_dev_v1_snap_info(rbd_dev, which,
3164 snap_size, snap_features);
3165 if (rbd_dev->image_format == 2)
3166 return rbd_dev_v2_snap_info(rbd_dev, which,
3167 snap_size, snap_features);
3168 return ERR_PTR(-EINVAL);
3171 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3173 int ret;
3174 __u8 obj_order;
3176 down_write(&rbd_dev->header_rwsem);
3178 /* Grab old order first, to see if it changes */
3180 obj_order = rbd_dev->header.obj_order;
3181 ret = rbd_dev_v2_image_size(rbd_dev);
3182 if (ret)
3183 goto out;
3184 if (rbd_dev->header.obj_order != obj_order) {
3185 ret = -EIO;
3186 goto out;
3188 rbd_update_mapping_size(rbd_dev);
3190 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3191 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3192 if (ret)
3193 goto out;
3194 ret = rbd_dev_snaps_update(rbd_dev);
3195 dout("rbd_dev_snaps_update returned %d\n", ret);
3196 if (ret)
3197 goto out;
3198 ret = rbd_dev_snaps_register(rbd_dev);
3199 dout("rbd_dev_snaps_register returned %d\n", ret);
3200 out:
3201 up_write(&rbd_dev->header_rwsem);
3203 return ret;
3207 * Scan the rbd device's current snapshot list and compare it to the
3208 * newly-received snapshot context. Remove any existing snapshots
3209 * not present in the new snapshot context. Add a new snapshot for
3210 * any snapshots in the snapshot context not in the current list.
3211 * And verify there are no changes to snapshots we already know
3212 * about.
3214 * Assumes the snapshots in the snapshot context are sorted by
3215 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3216 * are also maintained in that order.)
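 *
 * For example (illustrative): given an existing list {12, 7, 3}
 * and a new context {12, 9, 3}, snapshot 7 is removed, snapshot 9
 * is added, and snapshots 12 and 3 are verified to be unchanged.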
3218 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3220 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3221 const u32 snap_count = snapc->num_snaps;
3222 struct list_head *head = &rbd_dev->snaps;
3223 struct list_head *links = head->next;
3224 u32 index = 0;
3226 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3227 while (index < snap_count || links != head) {
3228 u64 snap_id;
3229 struct rbd_snap *snap;
3230 char *snap_name;
3231 u64 snap_size = 0;
3232 u64 snap_features = 0;
3234 snap_id = index < snap_count ? snapc->snaps[index]
3235 : CEPH_NOSNAP;
3236 snap = links != head ? list_entry(links, struct rbd_snap, node)
3237 : NULL;
3238 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3240 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3241 struct list_head *next = links->next;
3243 /* Existing snapshot not in the new snap context */
3245 if (rbd_dev->spec->snap_id == snap->id)
3246 atomic_set(&rbd_dev->exists, 0);
3247 rbd_remove_snap_dev(snap);
3248 dout("%ssnap id %llu has been removed\n",
3249 rbd_dev->spec->snap_id == snap->id ?
3250 "mapped " : "",
3251 (unsigned long long) snap->id);
3253 /* Done with this list entry; advance */
3255 links = next;
3256 continue;
3259 snap_name = rbd_dev_snap_info(rbd_dev, index,
3260 &snap_size, &snap_features);
3261 if (IS_ERR(snap_name))
3262 return PTR_ERR(snap_name);
3264 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3265 (unsigned long long) snap_id);
3266 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3267 struct rbd_snap *new_snap;
3269 /* We haven't seen this snapshot before */
3271 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3272 snap_id, snap_size, snap_features);
3273 if (IS_ERR(new_snap)) {
3274 int err = PTR_ERR(new_snap);
3276 dout(" failed to add dev, error %d\n", err);
3278 return err;
3281 /* New goes before existing, or at end of list */
3283 dout(" added dev%s\n", snap ? "" : " at end\n");
3284 if (snap)
3285 list_add_tail(&new_snap->node, &snap->node);
3286 else
3287 list_add_tail(&new_snap->node, head);
3288 } else {
3289 /* Already have this one */
3291 dout(" already present\n");
3293 rbd_assert(snap->size == snap_size);
3294 rbd_assert(!strcmp(snap->name, snap_name));
3295 rbd_assert(snap->features == snap_features);
3297 /* Done with this list entry; advance */
3299 links = links->next;
3302 /* Advance to the next entry in the snapshot context */
3304 index++;
3306 dout("%s: done\n", __func__);
3308 return 0;
3312 * Scan the list of snapshots and register the devices for any that
3313 * have not already been registered.
3315 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3317 struct rbd_snap *snap;
3318 int ret = 0;
3320 dout("%s called\n", __func__);
3321 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3322 return -EIO;
3324 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3325 if (!rbd_snap_registered(snap)) {
3326 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3327 if (ret < 0)
3328 break;
3331 dout("%s: returning %d\n", __func__, ret);
3333 return ret;
3336 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3338 struct device *dev;
3339 int ret;
3341 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3343 dev = &rbd_dev->dev;
3344 dev->bus = &rbd_bus_type;
3345 dev->type = &rbd_device_type;
3346 dev->parent = &rbd_root_dev;
3347 dev->release = rbd_dev_release;
3348 dev_set_name(dev, "%d", rbd_dev->dev_id);
3349 ret = device_register(dev);
3351 mutex_unlock(&ctl_mutex);
3353 return ret;
3356 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3358 device_unregister(&rbd_dev->dev);
3361 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3364 * Get a unique rbd identifier for the given new rbd_dev, and add
3365 * the rbd_dev to the global list. The minimum rbd id is 1.
3367 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3369 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3371 spin_lock(&rbd_dev_list_lock);
3372 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3373 spin_unlock(&rbd_dev_list_lock);
3374 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3375 (unsigned long long) rbd_dev->dev_id);
3379 * Remove an rbd_dev from the global list, and record that its
3380 * identifier is no longer in use.
3382 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3384 struct list_head *tmp;
3385 int rbd_id = rbd_dev->dev_id;
3386 int max_id;
3388 rbd_assert(rbd_id > 0);
3390 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3391 (unsigned long long) rbd_dev->dev_id);
3392 spin_lock(&rbd_dev_list_lock);
3393 list_del_init(&rbd_dev->node);
3396 * If the id being "put" is not the current maximum, there
3397 * is nothing special we need to do.
3399 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3400 spin_unlock(&rbd_dev_list_lock);
3401 return;
3405 * We need to update the current maximum id. Search the
3406 * list to find out what it is. We're more likely to find
3407 * the maximum at the end, so search the list backward.
3409 max_id = 0;
3410 list_for_each_prev(tmp, &rbd_dev_list) {
3411 struct rbd_device *rbd_dev;
3413 rbd_dev = list_entry(tmp, struct rbd_device, node);
3414 if (rbd_dev->dev_id > max_id)
3415 max_id = rbd_dev->dev_id;
3417 spin_unlock(&rbd_dev_list_lock);
3420 * The max id could have been updated by rbd_dev_id_get(), in
3421 * which case it now accurately reflects the new maximum.
3422 * Be careful not to overwrite the maximum value in that
3423 * case.
3425 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3426 dout(" max dev id has been reset\n");
3430 * Skips over white space at *buf, and updates *buf to point to the
3431 * first found non-space character (if any). Returns the length of
3432 * the token (string of non-white space characters) found. Note
3433 * that *buf must be terminated with '\0'.
3435 static inline size_t next_token(const char **buf)
3438 * These are the characters that produce nonzero for
3439 * isspace() in the "C" and "POSIX" locales.
3441 const char *spaces = " \f\n\r\t\v";
3443 *buf += strspn(*buf, spaces); /* Find start of token */
3445 return strcspn(*buf, spaces); /* Return token length */
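/*
 * For example (illustrative): with *buf pointing at "  pool img",
 * next_token() advances *buf to point at "pool img" and returns 4,
 * the length of "pool".
 */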
3449 * Finds the next token in *buf, and if the provided token buffer is
3450 * big enough, copies the found token into it. The result, if
3451 * copied, is guaranteed to be terminated with '\0'. Note that *buf
3452 * must be terminated with '\0' on entry.
3454 * Returns the length of the token found (not including the '\0').
3455 * Return value will be 0 if no token is found, and it will be >=
3456 * token_size if the token would not fit.
3458 * The *buf pointer will be updated to point beyond the end of the
3459 * found token. Note that this occurs even if the token buffer is
3460 * too small to hold it.
3462 static inline size_t copy_token(const char **buf,
3463 char *token,
3464 size_t token_size)
3466 size_t len;
3468 len = next_token(buf);
3469 if (len < token_size) {
3470 memcpy(token, *buf, len);
3471 *(token + len) = '\0';
3473 *buf += len;
3475 return len;
3479 * Finds the next token in *buf, dynamically allocates a buffer big
3480 * enough to hold a copy of it, and copies the token into the new
3481 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3482 * that a duplicate buffer is created even for a zero-length token.
3484 * Returns a pointer to the newly-allocated duplicate, or a null
3485 * pointer if memory for the duplicate was not available. If
3486 * the lenp argument is a non-null pointer, the length of the token
3487 * (not including the '\0') is returned in *lenp.
3489 * If successful, the *buf pointer will be updated to point beyond
3490 * the end of the found token.
3492 * Note: uses GFP_KERNEL for allocation.
3494 static inline char *dup_token(const char **buf, size_t *lenp)
3496 char *dup;
3497 size_t len;
3499 len = next_token(buf);
3500 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3501 if (!dup)
3502 return NULL;
3503 *(dup + len) = '\0';
3504 *buf += len;
3506 if (lenp)
3507 *lenp = len;
3509 return dup;
3513 * Parse the options provided for an "rbd add" (i.e., rbd image
3514 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3515 * and the data written is passed here via a NUL-terminated buffer.
3516 * Returns 0 if successful or an error code otherwise.
3518 * The information extracted from these options is recorded in
3519 * the other parameters which return dynamically-allocated
3520 * structures:
3521 * ceph_opts
3522 * The address of a pointer that will refer to a ceph options
3523 * structure. Caller must release the returned pointer using
3524 * ceph_destroy_options() when it is no longer needed.
3525 * rbd_opts
3526 * Address of an rbd options pointer. Fully initialized by
3527 * this function; caller must release with kfree().
3528 * spec
3529 * Address of an rbd image specification pointer. Fully
3530 * initialized by this function based on parsed options.
3531 * Caller must release with rbd_spec_put().
3533 * The options passed take this form:
3534 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
3535 * where:
3536 * <mon_addrs>
3537 * A comma-separated list of one or more monitor addresses.
3538 * A monitor address is an ip address, optionally followed
3539 * by a port number (separated by a colon).
3540 * I.e.: ip1[:port1][,ip2[:port2]...]
3541 * <options>
3542 * A comma-separated list of ceph and/or rbd options.
3543 * <pool_name>
3544 * The name of the rados pool containing the rbd image.
3545 * <image_name>
3546 * The name of the image in that pool to map.
3547 * <snap_name>
3548 * An optional snapshot name. If provided, the mapping will
3549 * present data from the image at the time that snapshot was
3550 * created. The image head is used if no snapshot name is
3551 * provided. Snapshot mappings are always read-only.
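 *
 * For example (illustrative values only):
 *
 *	$ echo "1.2.3.4:6789 name=admin rbd myimage mysnap" \
 *		> /sys/bus/rbd/add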
3553 static int rbd_add_parse_args(const char *buf,
3554 struct ceph_options **ceph_opts,
3555 struct rbd_options **opts,
3556 struct rbd_spec **rbd_spec)
3558 size_t len;
3559 char *options;
3560 const char *mon_addrs;
3561 size_t mon_addrs_size;
3562 struct rbd_spec *spec = NULL;
3563 struct rbd_options *rbd_opts = NULL;
3564 struct ceph_options *copts;
3565 int ret;
3567 /* The first four tokens are required */
3569 len = next_token(&buf);
3570 if (!len) {
3571 rbd_warn(NULL, "no monitor address(es) provided");
3572 return -EINVAL;
3574 mon_addrs = buf;
3575 mon_addrs_size = len + 1;
3576 buf += len;
3578 ret = -EINVAL;
3579 options = dup_token(&buf, NULL);
3580 if (!options)
3581 return -ENOMEM;
3582 if (!*options) {
3583 rbd_warn(NULL, "no options provided");
3584 goto out_err;
3587 spec = rbd_spec_alloc();
3588 if (!spec)
3589 goto out_mem;
3591 spec->pool_name = dup_token(&buf, NULL);
3592 if (!spec->pool_name)
3593 goto out_mem;
3594 if (!*spec->pool_name) {
3595 rbd_warn(NULL, "no pool name provided");
3596 goto out_err;
3599 spec->image_name = dup_token(&buf, NULL);
3600 if (!spec->image_name)
3601 goto out_mem;
3602 if (!*spec->image_name) {
3603 rbd_warn(NULL, "no image name provided");
3604 goto out_err;
3608 * Snapshot name is optional; default is to use "-"
3609 * (indicating the head/no snapshot).
3611 len = next_token(&buf);
3612 if (!len) {
3613 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3614 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3615 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3616 ret = -ENAMETOOLONG;
3617 goto out_err;
3619 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3620 if (!spec->snap_name)
3621 goto out_mem;
3622 *(spec->snap_name + len) = '\0';
3624 /* Initialize all rbd options to the defaults */
3626 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3627 if (!rbd_opts)
3628 goto out_mem;
3630 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3632 copts = ceph_parse_options(options, mon_addrs,
3633 mon_addrs + mon_addrs_size - 1,
3634 parse_rbd_opts_token, rbd_opts);
3635 if (IS_ERR(copts)) {
3636 ret = PTR_ERR(copts);
3637 goto out_err;
3639 kfree(options);
3641 *ceph_opts = copts;
3642 *opts = rbd_opts;
3643 *rbd_spec = spec;
3645 return 0;
3646 out_mem:
3647 ret = -ENOMEM;
3648 out_err:
3649 kfree(rbd_opts);
3650 rbd_spec_put(spec);
3651 kfree(options);
3653 return ret;
3657 * An rbd format 2 image has a unique identifier, distinct from the
3658 * name given to it by the user. Internally, that identifier is
3659 * what's used to specify the names of objects related to the image.
3661 * A special "rbd id" object is used to map an rbd image name to its
3662 * id. If that object doesn't exist, then there is no v2 rbd image
3663 * with the supplied name.
3665 * This function will record the given rbd_dev's image_id field if
3666 * it can be determined, and in that case will return 0. If any
3667 * errors occur a negative errno will be returned and the rbd_dev's
3668 * image_id field will be unchanged (and should be NULL).
3670 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3672 int ret;
3673 size_t size;
3674 char *object_name;
3675 void *response;
3676 void *p;
3679 * When probing a parent image, the image id is already
3680 * known (and the image name likely is not). There's no
3681 * need to fetch the image id again in this case.
3683 if (rbd_dev->spec->image_id)
3684 return 0;
3687 * First, see if the format 2 image id file exists, and if
3688 * so, get the image's persistent id from it.
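 * (For an image named "foo" that object is RBD_ID_PREFIX "foo",
 * i.e. "rbd_id.foo".)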
3690 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3691 object_name = kmalloc(size, GFP_NOIO);
3692 if (!object_name)
3693 return -ENOMEM;
3694 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3695 dout("rbd id object name is %s\n", object_name);
3697 /* Response will be an encoded string, which includes a length */
3699 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3700 response = kzalloc(size, GFP_NOIO);
3701 if (!response) {
3702 ret = -ENOMEM;
3703 goto out;
3706 ret = rbd_req_sync_exec(rbd_dev, object_name,
3707 "rbd", "get_id",
3708 NULL, 0,
3709 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3710 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3711 if (ret < 0)
3712 goto out;
3713 ret = 0; /* rbd_req_sync_exec() can return positive */
3715 p = response;
3716 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3717 p + RBD_IMAGE_ID_LEN_MAX,
3718 NULL, GFP_NOIO);
3719 if (IS_ERR(rbd_dev->spec->image_id)) {
3720 ret = PTR_ERR(rbd_dev->spec->image_id);
3721 rbd_dev->spec->image_id = NULL;
3722 } else {
3723 dout("image_id is %s\n", rbd_dev->spec->image_id);
3725 out:
3726 kfree(response);
3727 kfree(object_name);
3729 return ret;
3732 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3734 int ret;
3735 size_t size;
3737 /* Version 1 images have no id; empty string is used */
3739 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3740 if (!rbd_dev->spec->image_id)
3741 return -ENOMEM;
3743 /* Record the header object name for this rbd image. */
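/* (For image "foo" this is "foo" RBD_SUFFIX, i.e. "foo.rbd") */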
3745 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3746 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3747 if (!rbd_dev->header_name) {
3748 ret = -ENOMEM;
3749 goto out_err;
3751 sprintf(rbd_dev->header_name, "%s%s",
3752 rbd_dev->spec->image_name, RBD_SUFFIX);
3754 /* Populate rbd image metadata */
3756 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3757 if (ret < 0)
3758 goto out_err;
3760 /* Version 1 images have no parent (no layering) */
3762 rbd_dev->parent_spec = NULL;
3763 rbd_dev->parent_overlap = 0;
3765 rbd_dev->image_format = 1;
3767 dout("discovered version 1 image, header name is %s\n",
3768 rbd_dev->header_name);
3770 return 0;
3772 out_err:
3773 kfree(rbd_dev->header_name);
3774 rbd_dev->header_name = NULL;
3775 kfree(rbd_dev->spec->image_id);
3776 rbd_dev->spec->image_id = NULL;
3778 return ret;
3781 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3783 size_t size;
3784 int ret;
3785 u64 ver = 0;
3788 * Image id was filled in by the caller. Record the header
3789 * object name for this rbd image.
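 * (That name is RBD_HEADER_PREFIX followed by the image id,
 * e.g. "rbd_header.<image_id>".)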
3791 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3792 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3793 if (!rbd_dev->header_name)
3794 return -ENOMEM;
3795 sprintf(rbd_dev->header_name, "%s%s",
3796 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3798 /* Get the size and object order for the image */
3800 ret = rbd_dev_v2_image_size(rbd_dev);
3801 if (ret < 0)
3802 goto out_err;
3804 /* Get the object prefix (a.k.a. block_name) for the image */
3806 ret = rbd_dev_v2_object_prefix(rbd_dev);
3807 if (ret < 0)
3808 goto out_err;
3810 /* Get and check the features for the image */
3812 ret = rbd_dev_v2_features(rbd_dev);
3813 if (ret < 0)
3814 goto out_err;
3816 /* If the image supports layering, get the parent info */
3818 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3819 ret = rbd_dev_v2_parent_info(rbd_dev);
3820 if (ret < 0)
3821 goto out_err;
3824 /* crypto and compression type aren't (yet) supported for v2 images */
3826 rbd_dev->header.crypt_type = 0;
3827 rbd_dev->header.comp_type = 0;
3829 /* Get the snapshot context, plus the header version */
3831 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3832 if (ret)
3833 goto out_err;
3834 rbd_dev->header.obj_version = ver;
3836 rbd_dev->image_format = 2;
3838 dout("discovered version 2 image, header name is %s\n",
3839 rbd_dev->header_name);
3841 return 0;
3842 out_err:
3843 rbd_dev->parent_overlap = 0;
3844 rbd_spec_put(rbd_dev->parent_spec);
3845 rbd_dev->parent_spec = NULL;
3846 kfree(rbd_dev->header_name);
3847 rbd_dev->header_name = NULL;
3848 kfree(rbd_dev->header.object_prefix);
3849 rbd_dev->header.object_prefix = NULL;
3851 return ret;
3854 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3856 int ret;
3858 /* no need to lock here, as rbd_dev is not registered yet */
3859 ret = rbd_dev_snaps_update(rbd_dev);
3860 if (ret)
3861 return ret;
3863 ret = rbd_dev_probe_update_spec(rbd_dev);
3864 if (ret)
3865 goto err_out_snaps;
3867 ret = rbd_dev_set_mapping(rbd_dev);
3868 if (ret)
3869 goto err_out_snaps;
3871 /* generate unique id: find highest unique id, add one */
3872 rbd_dev_id_get(rbd_dev);
3874 /* Fill in the device name, now that we have its id. */
3875 BUILD_BUG_ON(DEV_NAME_LEN
3876 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3877 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3879 /* Get our block major device number. */
3881 ret = register_blkdev(0, rbd_dev->name);
3882 if (ret < 0)
3883 goto err_out_id;
3884 rbd_dev->major = ret;
3886 /* Set up the blkdev mapping. */
3888 ret = rbd_init_disk(rbd_dev);
3889 if (ret)
3890 goto err_out_blkdev;
3892 ret = rbd_bus_add_dev(rbd_dev);
3893 if (ret)
3894 goto err_out_disk;
3897 * At this point cleanup in the event of an error is the job
3898 * of the sysfs code (initiated by rbd_bus_del_dev()).
3900 down_write(&rbd_dev->header_rwsem);
3901 ret = rbd_dev_snaps_register(rbd_dev);
3902 up_write(&rbd_dev->header_rwsem);
3903 if (ret)
3904 goto err_out_bus;
3906 ret = rbd_req_sync_watch(rbd_dev, 1);
3907 if (ret)
3908 goto err_out_bus;
3910 /* Everything's ready. Announce the disk to the world. */
3912 add_disk(rbd_dev->disk);
3914 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3915 (unsigned long long) rbd_dev->mapping.size);
3917 return ret;
3918 err_out_bus:
3919 /* this will also clean up rest of rbd_dev stuff */
3921 rbd_bus_del_dev(rbd_dev);
3923 return ret;
3924 err_out_disk:
3925 rbd_free_disk(rbd_dev);
3926 err_out_blkdev:
3927 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3928 err_out_id:
3929 rbd_dev_id_put(rbd_dev);
3930 err_out_snaps:
3931 rbd_remove_all_snaps(rbd_dev);
3933 return ret;
3937 * Probe for the existence of the header object for the given rbd
3938 * device. For format 2 images this includes determining the image
3939 * id.
3941 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3943 int ret;
3946 * Get the id from the image id object. If it's not a
3947 * format 2 image, we'll get ENOENT back, and we'll assume
3948 * it's a format 1 image.
3950 ret = rbd_dev_image_id(rbd_dev);
3951 if (ret)
3952 ret = rbd_dev_v1_probe(rbd_dev);
3953 else
3954 ret = rbd_dev_v2_probe(rbd_dev);
3955 if (ret) {
3956 dout("probe failed, returning %d\n", ret);
3958 return ret;
3961 ret = rbd_dev_probe_finish(rbd_dev);
3962 if (ret)
3963 rbd_header_free(&rbd_dev->header);
3965 return ret;
3968 static ssize_t rbd_add(struct bus_type *bus,
3969 const char *buf,
3970 size_t count)
3972 struct rbd_device *rbd_dev = NULL;
3973 struct ceph_options *ceph_opts = NULL;
3974 struct rbd_options *rbd_opts = NULL;
3975 struct rbd_spec *spec = NULL;
3976 struct rbd_client *rbdc;
3977 struct ceph_osd_client *osdc;
3978 int rc = -ENOMEM;
3980 if (!try_module_get(THIS_MODULE))
3981 return -ENODEV;
3983 /* parse add command */
3984 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3985 if (rc < 0)
3986 goto err_out_module;
3988 rbdc = rbd_get_client(ceph_opts);
3989 if (IS_ERR(rbdc)) {
3990 rc = PTR_ERR(rbdc);
3991 goto err_out_args;
3993 ceph_opts = NULL; /* rbd_dev client now owns this */
3995 /* pick the pool */
3996 osdc = &rbdc->client->osdc;
3997 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3998 if (rc < 0)
3999 goto err_out_client;
4000 spec->pool_id = (u64) rc;
4002 /* The ceph file layout needs to fit pool id in 32 bits */
4004 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4005 rc = -EIO;
4006 goto err_out_client;
4009 rbd_dev = rbd_dev_create(rbdc, spec);
4010 if (!rbd_dev)
4011 goto err_out_client;
4012 rbdc = NULL; /* rbd_dev now owns this */
4013 spec = NULL; /* rbd_dev now owns this */
4015 rbd_dev->mapping.read_only = rbd_opts->read_only;
4016 kfree(rbd_opts);
4017 rbd_opts = NULL; /* done with this */
4019 rc = rbd_dev_probe(rbd_dev);
4020 if (rc < 0)
4021 goto err_out_rbd_dev;
4023 return count;
4024 err_out_rbd_dev:
4025 rbd_dev_destroy(rbd_dev);
4026 err_out_client:
4027 rbd_put_client(rbdc);
4028 err_out_args:
4029 if (ceph_opts)
4030 ceph_destroy_options(ceph_opts);
4031 kfree(rbd_opts);
4032 rbd_spec_put(spec);
4033 err_out_module:
4034 module_put(THIS_MODULE);
4036 dout("Error adding device %s\n", buf);
4038 return (ssize_t) rc;
4041 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4043 struct list_head *tmp;
4044 struct rbd_device *rbd_dev;
4046 spin_lock(&rbd_dev_list_lock);
4047 list_for_each(tmp, &rbd_dev_list) {
4048 rbd_dev = list_entry(tmp, struct rbd_device, node);
4049 if (rbd_dev->dev_id == dev_id) {
4050 spin_unlock(&rbd_dev_list_lock);
4051 return rbd_dev;
4054 spin_unlock(&rbd_dev_list_lock);
4055 return NULL;
4058 static void rbd_dev_release(struct device *dev)
4060 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4062 if (rbd_dev->watch_request) {
4063 struct ceph_client *client = rbd_dev->rbd_client->client;
4065 ceph_osdc_unregister_linger_request(&client->osdc,
4066 rbd_dev->watch_request);
4068 if (rbd_dev->watch_event)
4069 rbd_req_sync_watch(rbd_dev, 0);
4071 /* clean up and free blkdev */
4072 rbd_free_disk(rbd_dev);
4073 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4075 /* release allocated disk header fields */
4076 rbd_header_free(&rbd_dev->header);
4078 /* done with the id, and with the rbd_dev */
4079 rbd_dev_id_put(rbd_dev);
4080 rbd_assert(rbd_dev->rbd_client != NULL);
4081 rbd_dev_destroy(rbd_dev);
4083 /* release module ref */
4084 module_put(THIS_MODULE);
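/*
 * Tear down a mapping created through /sys/bus/rbd/add by writing
 * its device id to the "remove" control file, e.g. (illustrative):
 *
 *	$ echo 0 > /sys/bus/rbd/remove
 */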
4087 static ssize_t rbd_remove(struct bus_type *bus,
4088 const char *buf,
4089 size_t count)
4091 struct rbd_device *rbd_dev = NULL;
4092 int target_id, rc;
4093 unsigned long ul;
4094 int ret = count;
4096 rc = strict_strtoul(buf, 10, &ul);
4097 if (rc)
4098 return rc;
4100 /* convert to int; abort if we lost anything in the conversion */
4101 target_id = (int) ul;
4102 if (target_id != ul)
4103 return -EINVAL;
4105 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4107 rbd_dev = __rbd_get_dev(target_id);
4108 if (!rbd_dev) {
4109 ret = -ENOENT;
4110 goto done;
4113 if (rbd_dev->open_count) {
4114 ret = -EBUSY;
4115 goto done;
4118 rbd_remove_all_snaps(rbd_dev);
4119 rbd_bus_del_dev(rbd_dev);
4121 done:
4122 mutex_unlock(&ctl_mutex);
4124 return ret;
4128 * create control files in sysfs
4129 * /sys/bus/rbd/...
4131 static int rbd_sysfs_init(void)
4133 int ret;
4135 ret = device_register(&rbd_root_dev);
4136 if (ret < 0)
4137 return ret;
4139 ret = bus_register(&rbd_bus_type);
4140 if (ret < 0)
4141 device_unregister(&rbd_root_dev);
4143 return ret;
4146 static void rbd_sysfs_cleanup(void)
4148 bus_unregister(&rbd_bus_type);
4149 device_unregister(&rbd_root_dev);
4152 int __init rbd_init(void)
4154 int rc;
4156 rc = rbd_sysfs_init();
4157 if (rc)
4158 return rc;
4159 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4160 return 0;
4163 void __exit rbd_exit(void)
4165 rbd_sysfs_cleanup();
4168 module_init(rbd_init);
4169 module_exit(rbd_exit);
4171 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4172 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4173 MODULE_DESCRIPTION("rados block device");
4175 /* following authorship retained from original osdblk.c */
4176 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4178 MODULE_LICENSE("GPL");