/* Source: drivers/block/rbd.c — historical Linux kernel tree snapshot
 * (web-scrape residue removed). */
1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
64 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN \
66 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
68 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN 1024
71 #define RBD_SNAP_HEAD_NAME "-"
73 /* This allows a single page to hold an image name sent by OSD */
74 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
75 #define RBD_IMAGE_ID_LEN_MAX 64
77 #define RBD_OBJ_PREFIX_LEN_MAX 64
79 /* Feature bits */
81 #define RBD_FEATURE_LAYERING 1
83 /* Features supported by this (client software) implementation. */
85 #define RBD_FEATURES_ALL (0)
88 * An RBD device name will be "rbd#", where the "rbd" comes from
89 * RBD_DRV_NAME above, and # is a unique integer identifier.
90 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
91 * enough to hold all possible device names.
93 #define DEV_NAME_LEN 32
94 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
96 #define RBD_READ_ONLY_DEFAULT false
99 * block device image metadata (in-memory version)
101 struct rbd_image_header {
102 /* These four fields never change for a given rbd image */
103 char *object_prefix;
104 u64 features;
105 __u8 obj_order;
106 __u8 crypt_type;
107 __u8 comp_type;
109 /* The remaining fields need to be updated occasionally */
110 u64 image_size;
111 struct ceph_snap_context *snapc;
112 char *snap_names;
113 u64 *snap_sizes;
115 u64 obj_version;
119 * An rbd image specification.
121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122 * identify an image.
124 struct rbd_spec {
125 u64 pool_id;
126 char *pool_name;
128 char *image_id;
129 size_t image_id_len;
130 char *image_name;
131 size_t image_name_len;
133 u64 snap_id;
134 char *snap_name;
136 struct kref kref;
139 struct rbd_options {
140 bool read_only;
144 * an instance of the client. multiple devices may share an rbd client.
146 struct rbd_client {
147 struct ceph_client *client;
148 struct kref kref;
149 struct list_head node;
153 * a request completion status
155 struct rbd_req_status {
156 int done;
157 int rc;
158 u64 bytes;
162 * a collection of requests
164 struct rbd_req_coll {
165 int total;
166 int num_done;
167 struct kref kref;
168 struct rbd_req_status status[0];
172 * a single io request
174 struct rbd_request {
175 struct request *rq; /* blk layer request */
176 struct bio *bio; /* cloned bio */
177 struct page **pages; /* list of used pages */
178 u64 len;
179 int coll_index;
180 struct rbd_req_coll *coll;
183 struct rbd_snap {
184 struct device dev;
185 const char *name;
186 u64 size;
187 struct list_head node;
188 u64 id;
189 u64 features;
192 struct rbd_mapping {
193 u64 size;
194 u64 features;
195 bool read_only;
/*
 * a single device — one mapped rbd image (or snapshot of an image)
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;	/* (possibly shared) ceph client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* cached image metadata */
	bool                    exists;		/* mapped image/snap still exists */
	struct rbd_spec		*spec;		/* pool/image/snap being mapped */

	char			*header_name;	/* name of the header object */

	/* watch on the header object for change notifications */
	struct ceph_osd_event   *watch_event;
	struct ceph_osd_request *watch_request;

	/* layering: parent image reference (format 2 images only) */
	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;	/* size/features/ro of mapping */

	struct list_head	node;		/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* presumably guarded by ctl_mutex
						 * (see rbd_open/rbd_release) */
};
241 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
243 static LIST_HEAD(rbd_dev_list); /* devices */
244 static DEFINE_SPINLOCK(rbd_dev_list_lock);
246 static LIST_HEAD(rbd_client_list); /* clients */
247 static DEFINE_SPINLOCK(rbd_client_list_lock);
249 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
250 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
252 static void rbd_dev_release(struct device *dev);
253 static void rbd_remove_snap_dev(struct rbd_snap *snap);
255 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
256 size_t count);
257 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
258 size_t count);
260 static struct bus_attribute rbd_bus_attrs[] = {
261 __ATTR(add, S_IWUSR, NULL, rbd_add),
262 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
263 __ATTR_NULL
266 static struct bus_type rbd_bus_type = {
267 .name = "rbd",
268 .bus_attrs = rbd_bus_attrs,
271 static void rbd_root_dev_release(struct device *dev)
275 static struct device rbd_root_dev = {
276 .init_name = "rbd",
277 .release = rbd_root_dev_release,
#ifdef RBD_DEBUG
/*
 * rbd_assert() -- BUG() when expr is false; compiled out entirely
 * unless RBD_DEBUG is defined.
 *
 * Wrapped in do { } while (0) so the macro expands to a single
 * statement and cannot capture a following "else" (the dangling-else
 * hazard of a bare "if" inside a macro; CERT PRE10-C).
 */
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
293 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
294 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
/*
 * Open the rbd block device.  Refuses a writable open of a read-only
 * mapping; otherwise pins the device (get_device) and bumps
 * open_count under ctl_mutex so concurrent teardown can tell the
 * device is busy.
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	rbd_dev->open_count++;
	mutex_unlock(&ctl_mutex);

	return 0;
}
/*
 * Release (close) the rbd block device: undo rbd_open() by dropping
 * the open count and the device reference, under ctl_mutex.
 */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbd_assert(rbd_dev->open_count > 0);
	rbd_dev->open_count--;
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}
/* Block-device entry points handed to the gendisk at setup time. */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
332 * Initialize an rbd client instance.
333 * We own *ceph_opts.
335 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
337 struct rbd_client *rbdc;
338 int ret = -ENOMEM;
340 dout("rbd_client_create\n");
341 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
342 if (!rbdc)
343 goto out_opt;
345 kref_init(&rbdc->kref);
346 INIT_LIST_HEAD(&rbdc->node);
348 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
350 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
351 if (IS_ERR(rbdc->client))
352 goto out_mutex;
353 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
355 ret = ceph_open_session(rbdc->client);
356 if (ret < 0)
357 goto out_err;
359 spin_lock(&rbd_client_list_lock);
360 list_add_tail(&rbdc->node, &rbd_client_list);
361 spin_unlock(&rbd_client_list_lock);
363 mutex_unlock(&ctl_mutex);
365 dout("rbd_client_create created %p\n", rbdc);
366 return rbdc;
368 out_err:
369 ceph_destroy_client(rbdc->client);
370 out_mutex:
371 mutex_unlock(&ctl_mutex);
372 kfree(rbdc);
373 out_opt:
374 if (ceph_opts)
375 ceph_destroy_options(ceph_opts);
376 return ERR_PTR(ret);
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.  Returns NULL when sharing is
 * disabled (CEPH_OPT_NOSHARE) or no existing client matches.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			/* Reference taken under the list lock, so the
			 * client can't be released before we return it */
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
405 * mount options
407 enum {
408 Opt_last_int,
409 /* int args above */
410 Opt_last_string,
411 /* string args above */
412 Opt_read_only,
413 Opt_read_write,
414 /* Boolean args above */
415 Opt_last_bool,
418 static match_table_t rbd_opts_tokens = {
419 /* int args above */
420 /* string args above */
421 {Opt_read_only, "read_only"},
422 {Opt_read_only, "ro"}, /* Alternate spelling */
423 {Opt_read_write, "read_write"},
424 {Opt_read_write, "rw"}, /* Alternate spelling */
425 /* Boolean args above */
426 {-1, NULL}
/*
 * Parse one rbd-specific mount option token.  "private" is the
 * struct rbd_options being filled in.  Returns 0 on success or
 * a negative errno for an unrecognized token / bad integer arg.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* Decode the argument according to the token's class */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* match_token() succeeded, so this is unreachable */
		rbd_assert(false);
		break;
	}
	return 0;
}
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc = rbd_client_find(ceph_opts);

	if (rbdc) {
		/* Sharing an existing client; its options copy wins */
		ceph_destroy_options(ceph_opts);
		return rbdc;
	}

	/* No shareable match; create one (takes over ceph_opts) */
	return rbd_client_create(ceph_opts);
}
/*
 * Destroy ceph client (kref release callback).
 *
 * Takes rbd_client_list_lock itself to unlink the client, so the
 * caller must NOT already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
506 * Drop reference to ceph client node. If it's not referenced anymore, release
507 * it.
509 static void rbd_put_client(struct rbd_client *rbdc)
511 if (rbdc)
512 kref_put(&rbdc->kref, rbd_client_release);
516 * Destroy requests collection
518 static void rbd_coll_release(struct kref *kref)
520 struct rbd_req_coll *coll =
521 container_of(kref, struct rbd_req_coll, kref);
523 dout("rbd_coll_release %p\n", coll);
524 kfree(coll);
527 static bool rbd_image_format_valid(u32 image_format)
529 return image_format == 1 || image_format == 2;
/*
 * Sanity-check an on-disk (format 1) image header.  Returns true only
 * if the magic text matches and the object order / snapshot count are
 * values this implementation can represent.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
572 * Create a new header structure, translate header format from the on-disk
573 * header.
575 static int rbd_header_from_disk(struct rbd_image_header *header,
576 struct rbd_image_header_ondisk *ondisk)
578 u32 snap_count;
579 size_t len;
580 size_t size;
581 u32 i;
583 memset(header, 0, sizeof (*header));
585 snap_count = le32_to_cpu(ondisk->snap_count);
587 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
588 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
589 if (!header->object_prefix)
590 return -ENOMEM;
591 memcpy(header->object_prefix, ondisk->object_prefix, len);
592 header->object_prefix[len] = '\0';
594 if (snap_count) {
595 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
597 /* Save a copy of the snapshot names */
599 if (snap_names_len > (u64) SIZE_MAX)
600 return -EIO;
601 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602 if (!header->snap_names)
603 goto out_err;
605 * Note that rbd_dev_v1_header_read() guarantees
606 * the ondisk buffer we're working with has
607 * snap_names_len bytes beyond the end of the
608 * snapshot id array, this memcpy() is safe.
610 memcpy(header->snap_names, &ondisk->snaps[snap_count],
611 snap_names_len);
613 /* Record each snapshot's size */
615 size = snap_count * sizeof (*header->snap_sizes);
616 header->snap_sizes = kmalloc(size, GFP_KERNEL);
617 if (!header->snap_sizes)
618 goto out_err;
619 for (i = 0; i < snap_count; i++)
620 header->snap_sizes[i] =
621 le64_to_cpu(ondisk->snaps[i].image_size);
622 } else {
623 WARN_ON(ondisk->snap_names_len);
624 header->snap_names = NULL;
625 header->snap_sizes = NULL;
628 header->features = 0; /* No features support in v1 images */
629 header->obj_order = ondisk->options.order;
630 header->crypt_type = ondisk->options.crypt_type;
631 header->comp_type = ondisk->options.comp_type;
633 /* Allocate and fill in the snapshot context */
635 header->image_size = le64_to_cpu(ondisk->image_size);
636 size = sizeof (struct ceph_snap_context);
637 size += snap_count * sizeof (header->snapc->snaps[0]);
638 header->snapc = kzalloc(size, GFP_KERNEL);
639 if (!header->snapc)
640 goto out_err;
642 atomic_set(&header->snapc->nref, 1);
643 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
644 header->snapc->num_snaps = snap_count;
645 for (i = 0; i < snap_count; i++)
646 header->snapc->snaps[i] =
647 le64_to_cpu(ondisk->snaps[i].id);
649 return 0;
651 out_err:
652 kfree(header->snap_sizes);
653 header->snap_sizes = NULL;
654 kfree(header->snap_names);
655 header->snap_names = NULL;
656 kfree(header->object_prefix);
657 header->object_prefix = NULL;
659 return -ENOMEM;
/*
 * Return the name of the snapshot with the given id, the special head
 * name when snap_id is CEPH_NOSNAP, or NULL if the id is unknown.
 * The returned pointer borrows the snap's storage -- don't free it.
 */
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	/* NOTE(review): assumes rbd_dev->snaps is stable here (caller
	 * holds the appropriate lock) -- confirm at call sites. */
	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}
/*
 * Look up a snapshot by name; on success record its id, size and
 * features in the device's spec/mapping and return 0.  Returns
 * -ENOENT when no snapshot of that name exists.
 */
static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}
/*
 * Initialize the mapping (snap_id/size/features) for the device from
 * either the image head or a named snapshot.  Snapshot mappings are
 * forced read-only.  Sets rbd_dev->exists on success.
 */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* Mapping the head: use the current image state */
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		/* Snapshots are immutable */
		rbd_dev->mapping.read_only = true;
	}
	rbd_dev->exists = true;
done:
	return ret;
}
/*
 * Release everything owned by an in-memory image header (counterpart
 * of rbd_header_from_disk()).  Each pointer is NULLed after release,
 * so a second call is harmless.
 */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}
/*
 * Build the object name for the segment containing image byte
 * "offset": "<object_prefix>.<segment number as 12 hex digits>".
 * Returns a kmalloc'd string the caller must kfree, or NULL on
 * allocation or formatting failure.
 */
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		/* Truncated or snprintf error: fail rather than risk
		 * addressing the wrong object */
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
749 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
751 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
753 return offset & (segment_size - 1);
/*
 * Number of bytes of the image extent [offset, offset+length) that
 * fall within offset's segment (i.e. length clamped at the segment
 * boundary).
 */
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
			      u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
/*
 * Number of segments (objects) spanned by the image extent
 * [ofs, ofs+len).  Returns 0 for an empty extent and -ERANGE if
 * ofs+len would overflow a u64.
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg;
	u64 end_seg;

	if (!len)
		return 0;
	if (len - 1 > U64_MAX - ofs)
		return -ERANGE;

	start_seg = ofs >> header->obj_order;
	end_seg = (ofs + len - 1) >> header->obj_order;

	return end_seg - start_seg + 1;
}
788 * returns the size of an object in the image
790 static u64 rbd_obj_bytes(struct rbd_image_header *header)
792 return 1 << header->obj_order;
796 * bio helpers
799 static void bio_chain_put(struct bio *chain)
801 struct bio *tmp;
803 while (chain) {
804 tmp = chain;
805 chain = chain->bi_next;
806 bio_put(tmp);
/*
 * zeros a bio chain, starting at specific offset.  Bytes before
 * start_ofs are left untouched; everything from start_ofs to the end
 * of the chain is cleared (used to zero-fill short reads).
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* Zero this segment's tail past start_ofs */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.  Returns the
 * clone, or NULL on bad arguments or allocation failure.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;	/* byte offset into the first copied bio_vec */

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		/* resid is the in-use length of the final bio_vec here */
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi)
			goto out_err;	/* EINVAL; ran out of bio's */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			/* Consumed this source bio; move to the next */
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
/*
 * helpers for osd request op vectors.
 *
 * Allocate a zeroed vector of num_ops ops plus one terminating entry,
 * with the first op's opcode and payload length filled in.  Returns
 * NULL on allocation failure; free with rbd_destroy_ops().
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
						 int opcode, u32 payload_len)
{
	struct ceph_osd_req_op *ops;

	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
	if (!ops)
		return NULL;

	ops[0].op = opcode;

	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	ops[0].payload_len = payload_len;

	return ops;
}
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
/*
 * Record completion status for one request of a collection and
 * complete, in order, any leading run of now-finished requests
 * against the block request rq.  Each completed slot drops one
 * collection reference.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		/* Lone request, not part of a collection */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* queue_lock serializes status[] updates with completion */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
1045 static void rbd_coll_end_req(struct rbd_request *req,
1046 int ret, u64 len)
1048 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
/*
 * Send ceph osd request.
 *
 * Builds an OSD request for [ofs, ofs+len) of object_name carrying
 * either a bio chain or a page vector.  If rbd_cb is NULL the call is
 * synchronous (waits for the reply and drops the request); otherwise
 * the callback owns completion.  linger_req, when non-NULL, receives
 * the request after marking it lingering (used for watch).  ver, when
 * non-NULL, receives the reassert version from a synchronous reply.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* Still report completion so the collection drains */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
		object_name, (unsigned long long) ofs,
		(unsigned long long) len, coll, coll_index);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	/* NOTE(review): the header snapid is always CEPH_NOSNAP here;
	 * the snapid argument only feeds the layout calculation below
	 * -- confirm this is intended. */
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	/* NOTE(review): strncpy() does not NUL-terminate r_oid when
	 * object_name fills it; names from rbd_segment_name() are
	 * bounded, but verify for other callers before the strlen(). */
	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				   req, ops);
	rbd_assert(ret == 0);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* Synchronous: wait for the reply ourselves */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
/*
 * Ceph osd op callback: completion handler for asynchronous requests
 * issued by rbd_do_op().  Translates OSD status into block-layer
 * completion, zero-filling missing-object and short reads.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* Read of a nonexistent object: a hole, all zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* Short read: zero the unread remainder */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
/*
 * Do a synchronous ceph osd operation.
 *
 * Allocates a page vector to carry the data, issues the request via
 * rbd_do_request() without a callback (so it waits), and for reads
 * copies the result into "inbound".  Returns the rbd_do_request()
 * result (bytes transferred on success) or a negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, inbound_size, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
/*
 * Do an asynchronous ceph osd operation: issue the read or write for
 * the single segment covering [ofs, ofs+len) of the image.  The bio
 * chain passed in must already be clipped to one segment (asserted
 * below); completion is reported through rbd_req_cb().
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;
	int opcode;
	int flags;
	u64 snapid;

	seg_name = rbd_segment_name(rbd_dev, ofs);
	if (!seg_name)
		return -ENOMEM;
	seg_len = rbd_segment_length(rbd_dev, ofs, len);
	seg_ofs = rbd_segment_offset(rbd_dev, ofs);

	if (rq_data_dir(rq) == WRITE) {
		opcode = CEPH_OSD_OP_WRITE;
		flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
		snapid = CEPH_NOSNAP;
		payload_len = seg_len;
	} else {
		opcode = CEPH_OSD_OP_READ;
		flags = CEPH_OSD_FLAG_READ;
		snapc = NULL;	/* reads carry no snapshot context */
		snapid = rbd_dev->spec->snap_id;
		payload_len = 0;
	}

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	rbd_assert(seg_len == len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
/*
 * Request sync osd read of [ofs, ofs+len) from object_name into buf.
 * Returns bytes read or a negative errno; *ver (optional) receives
 * the object version.
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			     u64 snapid,
			     const char *object_name,
			     u64 ofs, u64 len,
			     char *buf,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      snapid,
			      CEPH_OSD_FLAG_READ,
			      ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}
/*
 * Request sync osd watch
 *
 * Acknowledge a single watch notification (identified by @notify_id)
 * on the header object so the OSD can complete the notify.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	/* NOTE(review): cookie is not byte-swapped here, unlike ver just
	 * above — presumably notify_id already arrives in wire order from
	 * the osd client; confirm against rbd_watch_cb's caller. */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1375 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1377 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1378 u64 hver;
1379 int rc;
1381 if (!rbd_dev)
1382 return;
1384 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1385 rbd_dev->header_name, (unsigned long long) notify_id,
1386 (unsigned int) opcode);
1387 rc = rbd_dev_refresh(rbd_dev, &hver);
1388 if (rc)
1389 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1390 " update snaps: %d\n", rbd_dev->major, rc);
1392 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
/*
 * Request sync osd watch
 *
 * Establish a watch on the header object so we are notified when
 * another client changes it.  An osd event is created first; on any
 * later failure both the event and the op vector are torn down.
 * Returns 0 on success or a negative errno.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 => establish the watch (cf. unwatch) */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
/*
 * Request sync osd unwatch
 *
 * Remove the watch on the header object.  Note the watch event is
 * cancelled and cleared regardless of whether the unwatch request
 * itself succeeded.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 => remove the watch (cf. rbd_req_sync_watch) */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);

	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
/*
 * Synchronous osd object method call
 *
 * Invoke class method @class_name.@method_name on @object_name,
 * sending @outbound (may be NULL) as input and receiving up to
 * @inbound_size bytes of reply into @inbound.  Returns the result
 * of rbd_req_sync_op() — which can be positive (bytes returned) —
 * or a negative errno.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     int flags,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int payload_size;
	int ret;

	/*
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
	 */
	payload_size = class_name_len + method_name_len + outbound_size;
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	/* NOTE(review): the __u8 casts silently truncate class/method
	 * names longer than 255 bytes; all in-tree callers pass short
	 * literals ("rbd", "get_size", ...). */
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = outbound;
	ops[0].cls.indata_len = outbound_size;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      flags, ops,
			      object_name, 0, inbound_size, inbound,
			      NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1522 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1524 struct rbd_req_coll *coll =
1525 kzalloc(sizeof(struct rbd_req_coll) +
1526 sizeof(struct rbd_req_status) * num_reqs,
1527 GFP_ATOMIC);
1529 if (!coll)
1530 return NULL;
1531 coll->total = num_reqs;
1532 kref_init(&coll->kref);
1533 return coll;
/*
 * block device queue callback
 *
 * Called by the block layer with q->queue_lock held.  Each fetched
 * request is split along rbd object boundaries; one rbd_do_op() is
 * issued per segment and completions are gathered in an
 * rbd_req_coll.  The queue lock is dropped while doing the (possibly
 * sleeping) per-request work and re-taken before fetching the next
 * request.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		bool do_write;
		unsigned int size;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;
		unsigned int bio_offset;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* drop the queue lock across the blocking work below */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		if (!rbd_dev->exists) {
			/* mapped snapshot was deleted out from under us */
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* take a snapc ref so writes use a stable snap context */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		bio = rq->bio;

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		/* issue one osd request per object segment */
		bio_offset = 0;
		do {
			u64 limit = rbd_segment_length(rbd_dev, ofs, size);
			unsigned int chain_size;
			struct bio *bio_chain;

			BUG_ON(limit > (u64) UINT_MAX);
			chain_size = (unsigned int) limit;
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);

			kref_get(&coll->kref);

			/* Pass a cloned bio chain via an osd request */

			bio_chain = bio_chain_clone_range(&bio,
						&bio_offset, chain_size,
						GFP_ATOMIC);
			if (bio_chain)
				(void) rbd_do_op(rq, rbd_dev, snapc,
						ofs, chain_size,
						bio_chain, coll, cur_seg);
			else
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, chain_size);
			size -= chain_size;
			ofs += chain_size;

			cur_seg++;
		} while (size > 0);
		/* drop the allocation reference; ops hold their own */
		kref_put(&coll->kref, rbd_coll_release);

		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone_range()
 *
 * Returns the number of bytes of @bvec that may be appended to the
 * bio described by @bmd (possibly 0, possibly less than bv_len).
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
1692 static void rbd_free_disk(struct rbd_device *rbd_dev)
1694 struct gendisk *disk = rbd_dev->disk;
1696 if (!disk)
1697 return;
1699 if (disk->flags & GENHD_FL_UP)
1700 del_gendisk(disk);
1701 if (disk->queue)
1702 blk_cleanup_queue(disk->queue);
1703 put_disk(disk);
1707 * Read the complete header for the given rbd device.
1709 * Returns a pointer to a dynamically-allocated buffer containing
1710 * the complete and validated header. Caller can pass the address
1711 * of a variable that will be filled in with the version of the
1712 * header object at the time it was read.
1714 * Returns a pointer-coded errno if a failure occurs.
1716 static struct rbd_image_header_ondisk *
1717 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1719 struct rbd_image_header_ondisk *ondisk = NULL;
1720 u32 snap_count = 0;
1721 u64 names_size = 0;
1722 u32 want_count;
1723 int ret;
1726 * The complete header will include an array of its 64-bit
1727 * snapshot ids, followed by the names of those snapshots as
1728 * a contiguous block of NUL-terminated strings. Note that
1729 * the number of snapshots could change by the time we read
1730 * it in, in which case we re-read it.
1732 do {
1733 size_t size;
1735 kfree(ondisk);
1737 size = sizeof (*ondisk);
1738 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1739 size += names_size;
1740 ondisk = kmalloc(size, GFP_KERNEL);
1741 if (!ondisk)
1742 return ERR_PTR(-ENOMEM);
1744 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1745 rbd_dev->header_name,
1746 0, size,
1747 (char *) ondisk, version);
1749 if (ret < 0)
1750 goto out_err;
1751 if (WARN_ON((size_t) ret < size)) {
1752 ret = -ENXIO;
1753 pr_warning("short header read for image %s"
1754 " (want %zd got %d)\n",
1755 rbd_dev->spec->image_name, size, ret);
1756 goto out_err;
1758 if (!rbd_dev_ondisk_valid(ondisk)) {
1759 ret = -ENXIO;
1760 pr_warning("invalid header for image %s\n",
1761 rbd_dev->spec->image_name);
1762 goto out_err;
1765 names_size = le64_to_cpu(ondisk->snap_names_len);
1766 want_count = snap_count;
1767 snap_count = le32_to_cpu(ondisk->snap_count);
1768 } while (snap_count != want_count);
1770 return ondisk;
1772 out_err:
1773 kfree(ondisk);
1775 return ERR_PTR(ret);
1779 * reload the ondisk the header
1781 static int rbd_read_header(struct rbd_device *rbd_dev,
1782 struct rbd_image_header *header)
1784 struct rbd_image_header_ondisk *ondisk;
1785 u64 ver = 0;
1786 int ret;
1788 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1789 if (IS_ERR(ondisk))
1790 return PTR_ERR(ondisk);
1791 ret = rbd_header_from_disk(header, ondisk);
1792 if (ret >= 0)
1793 header->obj_version = ver;
1794 kfree(ondisk);
1796 return ret;
/*
 * Unregister and drop every snapshot device on @rbd_dev's list.
 * The _safe iterator is required because rbd_remove_snap_dev()
 * unlinks the node being visited.
 */
static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		rbd_remove_snap_dev(snap);
}
1808 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1810 sector_t size;
1812 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
1813 return;
1815 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1816 dout("setting size to %llu sectors", (unsigned long long) size);
1817 rbd_dev->mapping.size = (u64) size;
1818 set_capacity(rbd_dev->disk, size);
1822 * only read the first part of the ondisk header, without the snaps info
1824 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1826 int ret;
1827 struct rbd_image_header h;
1829 ret = rbd_read_header(rbd_dev, &h);
1830 if (ret < 0)
1831 return ret;
1833 down_write(&rbd_dev->header_rwsem);
1835 /* Update image size, and check for resize of mapped image */
1836 rbd_dev->header.image_size = h.image_size;
1837 rbd_update_mapping_size(rbd_dev);
1839 /* rbd_dev->header.object_prefix shouldn't change */
1840 kfree(rbd_dev->header.snap_sizes);
1841 kfree(rbd_dev->header.snap_names);
1842 /* osd requests may still refer to snapc */
1843 ceph_put_snap_context(rbd_dev->header.snapc);
1845 if (hver)
1846 *hver = h.obj_version;
1847 rbd_dev->header.obj_version = h.obj_version;
1848 rbd_dev->header.image_size = h.image_size;
1849 rbd_dev->header.snapc = h.snapc;
1850 rbd_dev->header.snap_names = h.snap_names;
1851 rbd_dev->header.snap_sizes = h.snap_sizes;
1852 /* Free the extra copy of the object prefix */
1853 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1854 kfree(h.object_prefix);
1856 ret = rbd_dev_snaps_update(rbd_dev);
1857 if (!ret)
1858 ret = rbd_dev_snaps_register(rbd_dev);
1860 up_write(&rbd_dev->header_rwsem);
1862 return ret;
1865 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1867 int ret;
1869 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1870 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1871 if (rbd_dev->image_format == 1)
1872 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1873 else
1874 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1875 mutex_unlock(&ctl_mutex);
1877 return ret;
/*
 * Allocate and configure the gendisk and request queue for
 * @rbd_dev.  The disk is not made visible here (no add_disk());
 * capacity is set from the current mapping size.  Returns 0 or
 * -ENOMEM.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* keep bios from spanning osd objects */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
/*
   sysfs
*/

/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1938 static ssize_t rbd_size_show(struct device *dev,
1939 struct device_attribute *attr, char *buf)
1941 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1942 sector_t size;
1944 down_read(&rbd_dev->header_rwsem);
1945 size = get_capacity(rbd_dev->disk);
1946 up_read(&rbd_dev->header_rwsem);
1948 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1952 * Note this shows the features for whatever's mapped, which is not
1953 * necessarily the base image.
1955 static ssize_t rbd_features_show(struct device *dev,
1956 struct device_attribute *attr, char *buf)
1958 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1960 return sprintf(buf, "0x%016llx\n",
1961 (unsigned long long) rbd_dev->mapping.features);
1964 static ssize_t rbd_major_show(struct device *dev,
1965 struct device_attribute *attr, char *buf)
1967 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1969 return sprintf(buf, "%d\n", rbd_dev->major);
1972 static ssize_t rbd_client_id_show(struct device *dev,
1973 struct device_attribute *attr, char *buf)
1975 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1977 return sprintf(buf, "client%lld\n",
1978 ceph_client_id(rbd_dev->rbd_client->client));
1981 static ssize_t rbd_pool_show(struct device *dev,
1982 struct device_attribute *attr, char *buf)
1984 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1986 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
1989 static ssize_t rbd_pool_id_show(struct device *dev,
1990 struct device_attribute *attr, char *buf)
1992 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1994 return sprintf(buf, "%llu\n",
1995 (unsigned long long) rbd_dev->spec->pool_id);
1998 static ssize_t rbd_name_show(struct device *dev,
1999 struct device_attribute *attr, char *buf)
2001 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2003 if (rbd_dev->spec->image_name)
2004 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2006 return sprintf(buf, "(unknown)\n");
2009 static ssize_t rbd_image_id_show(struct device *dev,
2010 struct device_attribute *attr, char *buf)
2012 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2014 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2018 * Shows the name of the currently-mapped snapshot (or
2019 * RBD_SNAP_HEAD_NAME for the base image).
2021 static ssize_t rbd_snap_show(struct device *dev,
2022 struct device_attribute *attr,
2023 char *buf)
2025 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2027 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)	/* defensive; sprintf shouldn't fail here */
		return count;
	bufp += count;

	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	/* total number of bytes written into buf */
	return (ssize_t) (bufp - buf);
}
2073 static ssize_t rbd_image_refresh(struct device *dev,
2074 struct device_attribute *attr,
2075 const char *buf,
2076 size_t size)
2078 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2079 int ret;
2081 ret = rbd_dev_refresh(rbd_dev, NULL);
2083 return ret < 0 ? ret : size;
/* Per-device sysfs attributes; all read-only except "refresh". */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/*
 * Intentionally empty release: the struct device is embedded in
 * struct rbd_device, whose storage is freed by rbd_dev_destroy().
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2134 sysfs - snapshots
2137 static ssize_t rbd_snap_size_show(struct device *dev,
2138 struct device_attribute *attr,
2139 char *buf)
2141 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2143 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2146 static ssize_t rbd_snap_id_show(struct device *dev,
2147 struct device_attribute *attr,
2148 char *buf)
2150 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2152 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2155 static ssize_t rbd_snap_features_show(struct device *dev,
2156 struct device_attribute *attr,
2157 char *buf)
2159 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2161 return sprintf(buf, "0x%016llx\n",
2162 (unsigned long long) snap->features);
/* Per-snapshot sysfs attributes (all read-only). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
/*
 * Device-model release callback for a snapshot device: frees the
 * rbd_snap and its name (allocated in __rbd_add_snap_dev()).
 */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* Device type for snapshot devices; release frees the rbd_snap. */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
/* Take another reference on @spec; returns @spec for convenience. */
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}
static void rbd_spec_free(struct kref *kref);
/* Drop a reference on @spec (NULL allowed); frees it on the last put. */
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}
2211 static struct rbd_spec *rbd_spec_alloc(void)
2213 struct rbd_spec *spec;
2215 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2216 if (!spec)
2217 return NULL;
2218 kref_init(&spec->kref);
2220 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2222 return spec;
/* kref release callback: frees an rbd_spec and all its name strings. */
static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}
2236 struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2237 struct rbd_spec *spec)
2239 struct rbd_device *rbd_dev;
2241 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2242 if (!rbd_dev)
2243 return NULL;
2245 spin_lock_init(&rbd_dev->lock);
2246 INIT_LIST_HEAD(&rbd_dev->node);
2247 INIT_LIST_HEAD(&rbd_dev->snaps);
2248 init_rwsem(&rbd_dev->header_rwsem);
2250 rbd_dev->spec = spec;
2251 rbd_dev->rbd_client = rbdc;
2253 return rbd_dev;
/*
 * Counterpart of rbd_dev_create(): drop the device's references to
 * its parent spec, client and mapping spec, then free it.
 */
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_spec_put(rbd_dev->parent_spec);
	kfree(rbd_dev->header_name);
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}
/*
 * Whether @snap's device has been registered in sysfs.  The device
 * type is assigned only at registration (rbd_register_snap_dev()),
 * and the assertion verifies the two indicators agree.
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
	bool ret = snap->dev.type == &rbd_snap_device_type;
	bool reg = device_is_registered(&snap->dev);

	rbd_assert(!ret ^ reg);

	return ret;
}
/*
 * Unlink @snap from its device's snapshot list and unregister its
 * sysfs device if it was ever registered (unregistration triggers
 * rbd_snap_dev_release(), which frees the snap).
 */
static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	if (device_is_registered(&snap->dev))
		device_unregister(&snap->dev);
}
2282 static int rbd_register_snap_dev(struct rbd_snap *snap,
2283 struct device *parent)
2285 struct device *dev = &snap->dev;
2286 int ret;
2288 dev->type = &rbd_snap_device_type;
2289 dev->parent = parent;
2290 dev->release = rbd_snap_dev_release;
2291 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2292 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2294 ret = device_register(dev);
2296 return ret;
2299 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2300 const char *snap_name,
2301 u64 snap_id, u64 snap_size,
2302 u64 snap_features)
2304 struct rbd_snap *snap;
2305 int ret;
2307 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2308 if (!snap)
2309 return ERR_PTR(-ENOMEM);
2311 ret = -ENOMEM;
2312 snap->name = kstrdup(snap_name, GFP_KERNEL);
2313 if (!snap->name)
2314 goto err;
2316 snap->id = snap_id;
2317 snap->size = snap_size;
2318 snap->features = snap_features;
2320 return snap;
2322 err:
2323 kfree(snap->name);
2324 kfree(snap);
2326 return ERR_PTR(ret);
/*
 * Return the name of snapshot @which (an index into the v1 snapshot
 * context), filling in its size and features.  The returned pointer
 * aims into header.snap_names — it is not a copy.
 */
static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	*snap_size = rbd_dev->header.snap_sizes[which];
	*snap_features = 0;	/* No features for v1 */

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return snap_name;
}
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 *
 * Calls the "rbd" class method "get_size" on the header object.
 * Returns 0 and fills *order / *snap_size, or a negative errno.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}
/* Fetch the base image's object order and size into the header. */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
/*
 * Fetch the v2 image's object prefix via the "get_object_prefix"
 * class method and store a freshly-allocated copy in the header.
 * Returns 0 or a negative errno.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;
	ret = 0;    /* rbd_req_sync_exec() can return positive */

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}

out:
	kfree(reply_buf);

	return ret;
}
/*
 * Fetch the feature bits for snapshot @snap_id (or the base image
 * when CEPH_NOSNAP) via the "get_features" class method.  Fails
 * with -ENXIO if the image requires incompatible features we don't
 * support.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_ALL)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}
/* Fetch the base image's feature bits into the header. */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
/*
 * Query the parent (layering) information for a v2 image via the
 * "get_parent" class method and record it in rbd_dev->parent_spec /
 * parent_overlap.  An image with no parent is not an error.
 * Returns 0 or a negative errno.
 */
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	char *image_id;
	u64 overlap;
	size_t len = 0;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +			/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
		sizeof (__le64) +			/* snap_id */
		sizeof (__le64);			/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				(char *) &snapid, sizeof (snapid),
				(char *) reply_buf, size,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	/* -ERANGE is the result if any of the _safe decodes fall short */
	ret = -ERANGE;
	p = reply_buf;
	end = (char *) reply_buf + size;
	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
	if (parent_spec->pool_id == CEPH_NOPOOL)
		goto out;	/* No parent?  No problem. */

	image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	parent_spec->image_id_len = len;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	rbd_dev->parent_overlap = overlap;
	rbd_dev->parent_spec = parent_spec;
	parent_spec = NULL;	/* rbd_dev now owns this */
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);	/* NULL-safe; no-op on success */

	return ret;
}
/*
 * Look up the image name for this device's image id via the rbd
 * directory object's "dir_get_name" class method.  Returns a
 * dynamically-allocated name string (caller frees) or NULL on any
 * failure — callers treat the name as best-effort.
 */
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	/* encode the image id as a length-prefixed ceph string */
	p = image_id;
	end = (char *) image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id,
				(u32) rbd_dev->spec->image_id_len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				(char *) reply_buf, size,
				CEPH_OSD_FLAG_READ, NULL);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = (char *) reply_buf + size;
	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;	/* errors are swallowed: best-effort */
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}
2584 * When a parent image gets probed, we only have the pool, image,
2585 * and snapshot ids but not the names of any of them. This call
2586 * is made later to fill in those names. It has to be done after
2587 * rbd_dev_snaps_update() has completed because some of the
2588 * information (in particular, snapshot name) is not available
2589 * until then.
2591 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2593 struct ceph_osd_client *osdc;
2594 const char *name;
2595 void *reply_buf = NULL;
2596 int ret;
2598 if (rbd_dev->spec->pool_name)
2599 return 0; /* Already have the names */
2601 /* Look up the pool name */
2603 osdc = &rbd_dev->rbd_client->client->osdc;
2604 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2605 if (!name)
2606 return -EIO; /* pool id too large (>= 2^31) */
2608 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2609 if (!rbd_dev->spec->pool_name)
2610 return -ENOMEM;
2612 /* Fetch the image name; tolerate failure here */
2614 name = rbd_dev_image_name(rbd_dev);
2615 if (name) {
2616 rbd_dev->spec->image_name_len = strlen(name);
2617 rbd_dev->spec->image_name = (char *) name;
2618 } else {
2619 pr_warning(RBD_DRV_NAME "%d "
2620 "unable to get image name for image id %s\n",
2621 rbd_dev->major, rbd_dev->spec->image_id);
2624 /* Look up the snapshot name. */
2626 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2627 if (!name) {
2628 ret = -EIO;
2629 goto out_err;
2631 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
2632 if(!rbd_dev->spec->snap_name)
2633 goto out_err;
2635 return 0;
2636 out_err:
2637 kfree(reply_buf);
2638 kfree(rbd_dev->spec->pool_name);
2639 rbd_dev->spec->pool_name = NULL;
2641 return ret;
2644 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2646 size_t size;
2647 int ret;
2648 void *reply_buf;
2649 void *p;
2650 void *end;
2651 u64 seq;
2652 u32 snap_count;
2653 struct ceph_snap_context *snapc;
2654 u32 i;
2657 * We'll need room for the seq value (maximum snapshot id),
2658 * snapshot count, and array of that many snapshot ids.
2659 * For now we have a fixed upper limit on the number we're
2660 * prepared to receive.
2662 size = sizeof (__le64) + sizeof (__le32) +
2663 RBD_MAX_SNAP_COUNT * sizeof (__le64);
2664 reply_buf = kzalloc(size, GFP_KERNEL);
2665 if (!reply_buf)
2666 return -ENOMEM;
2668 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2669 "rbd", "get_snapcontext",
2670 NULL, 0,
2671 reply_buf, size,
2672 CEPH_OSD_FLAG_READ, ver);
2673 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2674 if (ret < 0)
2675 goto out;
2677 ret = -ERANGE;
2678 p = reply_buf;
2679 end = (char *) reply_buf + size;
2680 ceph_decode_64_safe(&p, end, seq, out);
2681 ceph_decode_32_safe(&p, end, snap_count, out);
2684 * Make sure the reported number of snapshot ids wouldn't go
2685 * beyond the end of our buffer. But before checking that,
2686 * make sure the computed size of the snapshot context we
2687 * allocate is representable in a size_t.
2689 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2690 / sizeof (u64)) {
2691 ret = -EINVAL;
2692 goto out;
2694 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2695 goto out;
2697 size = sizeof (struct ceph_snap_context) +
2698 snap_count * sizeof (snapc->snaps[0]);
2699 snapc = kmalloc(size, GFP_KERNEL);
2700 if (!snapc) {
2701 ret = -ENOMEM;
2702 goto out;
2705 atomic_set(&snapc->nref, 1);
2706 snapc->seq = seq;
2707 snapc->num_snaps = snap_count;
2708 for (i = 0; i < snap_count; i++)
2709 snapc->snaps[i] = ceph_decode_64(&p);
2711 rbd_dev->header.snapc = snapc;
2713 dout(" snap context seq = %llu, snap_count = %u\n",
2714 (unsigned long long) seq, (unsigned int) snap_count);
2716 out:
2717 kfree(reply_buf);
2719 return 0;
/*
 * rbd_dev_v2_snap_name() - fetch the name of one snapshot.
 *
 * "which" indexes the current snapshot context.  Invokes the
 * "get_snapshot_name" class method on the header object.  Returns a
 * newly-allocated name (caller owns it) or an ERR_PTR on failure.
 */
2722 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2724 size_t size;
2725 void *reply_buf;
2726 __le64 snap_id;
2727 int ret;
2728 void *p;
2729 void *end;
2730 char *snap_name;
/* Reply holds a length-prefixed (le32) name */
2732 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2733 reply_buf = kmalloc(size, GFP_KERNEL);
2734 if (!reply_buf)
2735 return ERR_PTR(-ENOMEM);
2737 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2738 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2739 "rbd", "get_snapshot_name",
2740 (char *) &snap_id, sizeof (snap_id),
2741 reply_buf, size,
2742 CEPH_OSD_FLAG_READ, NULL);
2743 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret)
2744 if (ret < 0)
2745 goto out;
2747 p = reply_buf;
2748 end = (char *) reply_buf + size;
2749 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2750 if (IS_ERR(snap_name)) {
2751 ret = PTR_ERR(snap_name);
2752 goto out;
2753 } else {
2754 dout(" snap_id 0x%016llx snap_name = %s\n",
2755 (unsigned long long) le64_to_cpu(snap_id), snap_name);
2757 kfree(reply_buf);
2759 return snap_name;
2760 out:
2761 kfree(reply_buf);
2763 return ERR_PTR(ret);
/*
 * rbd_dev_v2_snap_info() - size, features and name of one v2 snapshot.
 * Fills *snap_size and *snap_features and returns the snapshot name
 * (or an ERR_PTR on the first failure).
 */
2766 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2767 u64 *snap_size, u64 *snap_features)
/*
 * NOTE(review): snap_id is declared __le64 but assigned a host-order
 * value here and passed straight to the helpers — looks like a
 * plain u64 was intended; confirm against the helpers' prototypes.
 */
2769 __le64 snap_id;
2770 u8 order;
2771 int ret;
2773 snap_id = rbd_dev->header.snapc->snaps[which];
2774 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2775 if (ret)
2776 return ERR_PTR(ret);
2777 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2778 if (ret)
2779 return ERR_PTR(ret);
2781 return rbd_dev_v2_snap_name(rbd_dev, which);
/* Dispatch on image format (1 or 2); -EINVAL for anything else */
2784 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2785 u64 *snap_size, u64 *snap_features)
2787 if (rbd_dev->image_format == 1)
2788 return rbd_dev_v1_snap_info(rbd_dev, which,
2789 snap_size, snap_features);
2790 if (rbd_dev->image_format == 2)
2791 return rbd_dev_v2_snap_info(rbd_dev, which,
2792 snap_size, snap_features);
2793 return ERR_PTR(-EINVAL);
2796 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2798 int ret;
2799 __u8 obj_order;
2801 down_write(&rbd_dev->header_rwsem);
2803 /* Grab old order first, to see if it changes */
2805 obj_order = rbd_dev->header.obj_order,
2806 ret = rbd_dev_v2_image_size(rbd_dev);
2807 if (ret)
2808 goto out;
2809 if (rbd_dev->header.obj_order != obj_order) {
2810 ret = -EIO;
2811 goto out;
2813 rbd_update_mapping_size(rbd_dev);
2815 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2816 dout("rbd_dev_v2_snap_context returned %d\n", ret);
2817 if (ret)
2818 goto out;
2819 ret = rbd_dev_snaps_update(rbd_dev);
2820 dout("rbd_dev_snaps_update returned %d\n", ret);
2821 if (ret)
2822 goto out;
2823 ret = rbd_dev_snaps_register(rbd_dev);
2824 dout("rbd_dev_snaps_register returned %d\n", ret);
2825 out:
2826 up_write(&rbd_dev->header_rwsem);
2828 return ret;
2832 * Scan the rbd device's current snapshot list and compare it to the
2833 * newly-received snapshot context. Remove any existing snapshots
2834 * not present in the new snapshot context. Add a new snapshot for
2835 * any snaphots in the snapshot context not in the current list.
2836 * And verify there are no changes to snapshots we already know
2837 * about.
2839 * Assumes the snapshots in the snapshot context are sorted by
2840 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2841 * are also maintained in that order.)
2843 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2845 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2846 const u32 snap_count = snapc->num_snaps;
2847 struct list_head *head = &rbd_dev->snaps;
2848 struct list_head *links = head->next;
2849 u32 index = 0;
2851 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2852 while (index < snap_count || links != head) {
2853 u64 snap_id;
2854 struct rbd_snap *snap;
2855 char *snap_name;
2856 u64 snap_size = 0;
2857 u64 snap_features = 0;
2859 snap_id = index < snap_count ? snapc->snaps[index]
2860 : CEPH_NOSNAP;
2861 snap = links != head ? list_entry(links, struct rbd_snap, node)
2862 : NULL;
2863 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2865 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2866 struct list_head *next = links->next;
2868 /* Existing snapshot not in the new snap context */
2870 if (rbd_dev->spec->snap_id == snap->id)
2871 rbd_dev->exists = false;
2872 rbd_remove_snap_dev(snap);
2873 dout("%ssnap id %llu has been removed\n",
2874 rbd_dev->spec->snap_id == snap->id ?
2875 "mapped " : "",
2876 (unsigned long long) snap->id);
2878 /* Done with this list entry; advance */
2880 links = next;
2881 continue;
2884 snap_name = rbd_dev_snap_info(rbd_dev, index,
2885 &snap_size, &snap_features);
2886 if (IS_ERR(snap_name))
2887 return PTR_ERR(snap_name);
2889 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2890 (unsigned long long) snap_id);
2891 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2892 struct rbd_snap *new_snap;
2894 /* We haven't seen this snapshot before */
2896 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2897 snap_id, snap_size, snap_features);
2898 if (IS_ERR(new_snap)) {
2899 int err = PTR_ERR(new_snap);
2901 dout(" failed to add dev, error %d\n", err);
2903 return err;
2906 /* New goes before existing, or at end of list */
2908 dout(" added dev%s\n", snap ? "" : " at end\n");
2909 if (snap)
2910 list_add_tail(&new_snap->node, &snap->node);
2911 else
2912 list_add_tail(&new_snap->node, head);
2913 } else {
2914 /* Already have this one */
2916 dout(" already present\n");
2918 rbd_assert(snap->size == snap_size);
2919 rbd_assert(!strcmp(snap->name, snap_name));
2920 rbd_assert(snap->features == snap_features);
2922 /* Done with this list entry; advance */
2924 links = links->next;
2927 /* Advance to the next entry in the snapshot context */
2929 index++;
2931 dout("%s: done\n", __func__);
2933 return 0;
2937 * Scan the list of snapshots and register the devices for any that
2938 * have not already been registered.
2940 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2942 struct rbd_snap *snap;
2943 int ret = 0;
2945 dout("%s called\n", __func__);
/* The parent rbd device must itself already be registered */
2946 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2947 return -EIO;
2949 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2950 if (!rbd_snap_registered(snap)) {
2951 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
/* Stop at the first failure; earlier registrations are kept */
2952 if (ret < 0)
2953 break;
2956 dout("%s: returning %d\n", __func__, ret);
2958 return ret;
/*
 * Register the rbd device in sysfs under the rbd bus.  Setup of the
 * embedded struct device is serialized under ctl_mutex.
 */
2961 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2963 struct device *dev;
2964 int ret;
2966 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2968 dev = &rbd_dev->dev;
2969 dev->bus = &rbd_bus_type;
2970 dev->type = &rbd_device_type;
2971 dev->parent = &rbd_root_dev;
/* rbd_dev_release() runs when the last device reference is dropped */
2972 dev->release = rbd_dev_release;
2973 dev_set_name(dev, "%d", rbd_dev->dev_id);
2974 ret = device_register(dev);
2976 mutex_unlock(&ctl_mutex);
2978 return ret;
/* Remove the sysfs presence; eventually triggers rbd_dev_release() */
2981 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2983 device_unregister(&rbd_dev->dev);
/*
 * Establish a watch on the header object.  On -ERANGE the header is
 * refreshed and the watch retried (presumably -ERANGE signals a
 * stale header version — confirm against rbd_req_sync_watch()).
 */
2986 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2988 int ret, rc;
2990 do {
2991 ret = rbd_req_sync_watch(rbd_dev);
2992 if (ret == -ERANGE) {
2993 rc = rbd_dev_refresh(rbd_dev, NULL);
2994 if (rc < 0)
2995 return rc;
2997 } while (ret == -ERANGE);
2999 return ret;
/* Source of rbd device ids; holds the highest id currently in use */
3002 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3005 * Get a unique rbd identifier for the given new rbd_dev, and add
3006 * the rbd_dev to the global list. The minimum rbd id is 1.
3008 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3010 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3012 spin_lock(&rbd_dev_list_lock);
3013 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3014 spin_unlock(&rbd_dev_list_lock);
3015 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3016 (unsigned long long) rbd_dev->dev_id);
3020 * Remove an rbd_dev from the global list, and record that its
3021 * identifier is no longer in use.
3023 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3025 struct list_head *tmp;
/* NOTE(review): ids are allocated 64-bit but truncated to int here */
3026 int rbd_id = rbd_dev->dev_id;
3027 int max_id;
3029 rbd_assert(rbd_id > 0);
3031 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3032 (unsigned long long) rbd_dev->dev_id);
3033 spin_lock(&rbd_dev_list_lock);
3034 list_del_init(&rbd_dev->node);
3037 * If the id being "put" is not the current maximum, there
3038 * is nothing special we need to do.
3040 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3041 spin_unlock(&rbd_dev_list_lock);
3042 return;
3046 * We need to update the current maximum id. Search the
3047 * list to find out what it is. We're more likely to find
3048 * the maximum at the end, so search the list backward.
3050 max_id = 0;
3051 list_for_each_prev(tmp, &rbd_dev_list) {
/* Shadows the function parameter intentionally (scan cursor) */
3052 struct rbd_device *rbd_dev;
3054 rbd_dev = list_entry(tmp, struct rbd_device, node);
3055 if (rbd_dev->dev_id > max_id)
3056 max_id = rbd_dev->dev_id;
3058 spin_unlock(&rbd_dev_list_lock);
3061 * The max id could have been updated by rbd_dev_id_get(), in
3062 * which case it now accurately reflects the new maximum.
3063 * Be careful not to overwrite the maximum value in that
3064 * case.
3066 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
/* NOTE(review): logged even when the cmpxchg did not swap — confirm OK */
3067 dout(" max dev id has been reset\n");
/*
 * Advance *buf past any leading white space and return the length of
 * the token (maximal run of non-space characters) that follows.  The
 * string at *buf must be '\0'-terminated; a return value of 0 means
 * no token remains.
 */
static inline size_t next_token(const char **buf)
{
	/* The characters isspace() matches in the "C"/"POSIX" locales */
	static const char spaces[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, spaces);

	*buf = start;			/* Point at first non-space byte */

	return strcspn(start, spaces);	/* Length of the token found */
}
/*
 * Find the next token in *buf and, when the supplied buffer can hold
 * it, copy it into "token" with a terminating '\0'.  The string at
 * *buf must be '\0'-terminated on entry.
 *
 * Returns the token's length (excluding the '\0'): 0 when no token
 * was found, and >= token_size when the token would not fit (in
 * which case nothing is copied).
 *
 * *buf is advanced past the token in every case — even when the
 * token was too large to copy.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
3120 * Finds the next token in *buf, dynamically allocates a buffer big
3121 * enough to hold a copy of it, and copies the token into the new
3122 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3123 * that a duplicate buffer is created even for a zero-length token.
3125 * Returns a pointer to the newly-allocated duplicate, or a null
3126 * pointer if memory for the duplicate was not available. If
3127 * the lenp argument is a non-null pointer, the length of the token
3128 * (not including the '\0') is returned in *lenp.
3130 * If successful, the *buf pointer will be updated to point beyond
3131 * the end of the found token.
3133 * Note: uses GFP_KERNEL for allocation.
3135 static inline char *dup_token(const char **buf, size_t *lenp)
3137 char *dup;
3138 size_t len;
3140 len = next_token(buf);
3141 dup = kmalloc(len + 1, GFP_KERNEL);
3142 if (!dup)
3143 return NULL;
/* Copy token bytes and NUL-terminate; caller owns (kfree's) the result */
3145 memcpy(dup, *buf, len);
3146 *(dup + len) = '\0';
3147 *buf += len;
3149 if (lenp)
3150 *lenp = len;
3152 return dup;
3156 * Parse the options provided for an "rbd add" (i.e., rbd image
3157 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3158 * and the data written is passed here via a NUL-terminated buffer.
3159 * Returns 0 if successful or an error code otherwise.
3161 * The information extracted from these options is recorded in
3162 * the other parameters which return dynamically-allocated
3163 * structures:
3164 * ceph_opts
3165 * The address of a pointer that will refer to a ceph options
3166 * structure. Caller must release the returned pointer using
3167 * ceph_destroy_options() when it is no longer needed.
3168 * rbd_opts
3169 * Address of an rbd options pointer. Fully initialized by
3170 * this function; caller must release with kfree().
3171 * spec
3172 * Address of an rbd image specification pointer. Fully
3173 * initialized by this function based on parsed options.
3174 * Caller must release with rbd_spec_put().
3176 * The options passed take this form:
3177 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3178 * where:
3179 * <mon_addrs>
3180 * A comma-separated list of one or more monitor addresses.
3181 * A monitor address is an ip address, optionally followed
3182 * by a port number (separated by a colon).
3183 * I.e.: ip1[:port1][,ip2[:port2]...]
3184 * <options>
3185 * A comma-separated list of ceph and/or rbd options.
3186 * <pool_name>
3187 * The name of the rados pool containing the rbd image.
3188 * <image_name>
3189 * The name of the image in that pool to map.
3190 * <snap_id>
3191 * An optional snapshot id. If provided, the mapping will
3192 * present data from the image at the time that snapshot was
3193 * created. The image head is used if no snapshot id is
3194 * provided. Snapshot mappings are always read-only.
3196 static int rbd_add_parse_args(const char *buf,
3197 struct ceph_options **ceph_opts,
3198 struct rbd_options **opts,
3199 struct rbd_spec **rbd_spec)
3201 size_t len;
3202 char *options;
3203 const char *mon_addrs;
3204 size_t mon_addrs_size;
3205 struct rbd_spec *spec = NULL;
3206 struct rbd_options *rbd_opts = NULL;
3207 struct ceph_options *copts;
3208 int ret;
3210 /* The first four tokens are required */
3212 len = next_token(&buf);
3213 if (!len)
3214 return -EINVAL; /* Missing monitor address(es) */
/* Monitor list is not duplicated; only its bounds are recorded */
3215 mon_addrs = buf;
3216 mon_addrs_size = len + 1;
3217 buf += len;
3219 ret = -EINVAL;
3220 options = dup_token(&buf, NULL);
3221 if (!options)
3222 return -ENOMEM;
3223 if (!*options)
3224 goto out_err; /* Missing options */
3226 spec = rbd_spec_alloc();
3227 if (!spec)
3228 goto out_mem;
3230 spec->pool_name = dup_token(&buf, NULL);
3231 if (!spec->pool_name)
3232 goto out_mem;
3233 if (!*spec->pool_name)
3234 goto out_err; /* Missing pool name */
3236 spec->image_name = dup_token(&buf, &spec->image_name_len);
3237 if (!spec->image_name)
3238 goto out_mem;
3239 if (!*spec->image_name)
3240 goto out_err; /* Missing image name */
3243 * Snapshot name is optional; default is to use "-"
3244 * (indicating the head/no snapshot).
3246 len = next_token(&buf);
3247 if (!len) {
3248 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3249 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3250 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3251 ret = -ENAMETOOLONG;
3252 goto out_err;
3254 spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
3255 if (!spec->snap_name)
3256 goto out_mem;
3257 memcpy(spec->snap_name, buf, len);
3258 *(spec->snap_name + len) = '\0';
3260 /* Initialize all rbd options to the defaults */
3262 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3263 if (!rbd_opts)
3264 goto out_mem;
3266 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3268 copts = ceph_parse_options(options, mon_addrs,
3269 mon_addrs + mon_addrs_size - 1,
3270 parse_rbd_opts_token, rbd_opts);
3271 if (IS_ERR(copts)) {
3272 ret = PTR_ERR(copts);
3273 goto out_err;
3275 kfree(options);
/* Success: hand ownership of all three results to the caller */
3277 *ceph_opts = copts;
3278 *opts = rbd_opts;
3279 *rbd_spec = spec;
3281 return 0;
3282 out_mem:
3283 ret = -ENOMEM;
3284 out_err:
/* On error every partial result is released; outputs stay untouched */
3285 kfree(rbd_opts);
3286 rbd_spec_put(spec);
3287 kfree(options);
3289 return ret;
3293 * An rbd format 2 image has a unique identifier, distinct from the
3294 * name given to it by the user. Internally, that identifier is
3295 * what's used to specify the names of objects related to the image.
3297 * A special "rbd id" object is used to map an rbd image name to its
3298 * id. If that object doesn't exist, then there is no v2 rbd image
3299 * with the supplied name.
3301 * This function will record the given rbd_dev's image_id field if
3302 * it can be determined, and in that case will return 0. If any
3303 * errors occur a negative errno will be returned and the rbd_dev's
3304 * image_id field will be unchanged (and should be NULL).
3306 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3308 int ret;
3309 size_t size;
3310 char *object_name;
3311 void *response;
3312 void *p;
3315 * When probing a parent image, the image id is already
3316 * known (and the image name likely is not). There's no
3317 * need to fetch the image id again in this case.
3319 if (rbd_dev->spec->image_id)
3320 return 0;
3323 * First, see if the format 2 image id file exists, and if
3324 * so, get the image's persistent id from it.
3326 size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
3327 object_name = kmalloc(size, GFP_NOIO);
3328 if (!object_name)
3329 return -ENOMEM;
3330 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3331 dout("rbd id object name is %s\n", object_name);
3333 /* Response will be an encoded string, which includes a length */
3335 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3336 response = kzalloc(size, GFP_NOIO);
3337 if (!response) {
3338 ret = -ENOMEM;
3339 goto out;
/*
 * NOTE(review): "size" bytes were allocated above, but only
 * RBD_IMAGE_ID_LEN_MAX is offered to the call below — confirm
 * the smaller bound is intended (length prefix headroom?).
 */
3342 ret = rbd_req_sync_exec(rbd_dev, object_name,
3343 "rbd", "get_id",
3344 NULL, 0,
3345 response, RBD_IMAGE_ID_LEN_MAX,
3346 CEPH_OSD_FLAG_READ, NULL);
3347 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3348 if (ret < 0)
3349 goto out;
3350 ret = 0; /* rbd_req_sync_exec() can return positive */
3352 p = response;
3353 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3354 p + RBD_IMAGE_ID_LEN_MAX,
3355 &rbd_dev->spec->image_id_len,
3356 GFP_NOIO);
3357 if (IS_ERR(rbd_dev->spec->image_id)) {
3358 ret = PTR_ERR(rbd_dev->spec->image_id);
/* Preserve the "image_id unchanged on error" contract */
3359 rbd_dev->spec->image_id = NULL;
3360 } else {
3361 dout("image_id is %s\n", rbd_dev->spec->image_id);
3363 out:
3364 kfree(response);
3365 kfree(object_name);
3367 return ret;
/*
 * Probe a format 1 ("old format") image: record its header object
 * name and read the on-disk header.  v1 images have no id and no
 * layering support.  On failure all partial setup is undone.
 */
3370 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3372 int ret;
3373 size_t size;
3375 /* Version 1 images have no id; empty string is used */
3377 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3378 if (!rbd_dev->spec->image_id)
3379 return -ENOMEM;
3380 rbd_dev->spec->image_id_len = 0;
3382 /* Record the header object name for this rbd image. */
3384 size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
3385 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3386 if (!rbd_dev->header_name) {
3387 ret = -ENOMEM;
3388 goto out_err;
3390 sprintf(rbd_dev->header_name, "%s%s",
3391 rbd_dev->spec->image_name, RBD_SUFFIX);
3393 /* Populate rbd image metadata */
3395 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3396 if (ret < 0)
3397 goto out_err;
3399 /* Version 1 images have no parent (no layering) */
3401 rbd_dev->parent_spec = NULL;
3402 rbd_dev->parent_overlap = 0;
3404 rbd_dev->image_format = 1;
3406 dout("discovered version 1 image, header name is %s\n",
3407 rbd_dev->header_name);
3409 return 0;
3411 out_err:
/* Undo partial setup so a later probe starts clean */
3412 kfree(rbd_dev->header_name);
3413 rbd_dev->header_name = NULL;
3414 kfree(rbd_dev->spec->image_id);
3415 rbd_dev->spec->image_id = NULL;
3417 return ret;
/*
 * Probe a format 2 image: record the header object name (derived
 * from the image id), then fetch size/order, object prefix,
 * features, parent info (when layered) and the snapshot context.
 * On failure all partial setup is undone.
 */
3420 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3422 size_t size;
3423 int ret;
3424 u64 ver = 0;
3427 * Image id was filled in by the caller. Record the header
3428 * object name for this rbd image.
3430 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
3431 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3432 if (!rbd_dev->header_name)
3433 return -ENOMEM;
3434 sprintf(rbd_dev->header_name, "%s%s",
3435 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3437 /* Get the size and object order for the image */
3439 ret = rbd_dev_v2_image_size(rbd_dev);
3440 if (ret < 0)
3441 goto out_err;
3443 /* Get the object prefix (a.k.a. block_name) for the image */
3445 ret = rbd_dev_v2_object_prefix(rbd_dev);
3446 if (ret < 0)
3447 goto out_err;
3449 /* Get the and check features for the image */
3451 ret = rbd_dev_v2_features(rbd_dev);
3452 if (ret < 0)
3453 goto out_err;
3455 /* If the image supports layering, get the parent info */
3457 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3458 ret = rbd_dev_v2_parent_info(rbd_dev);
3459 if (ret < 0)
3460 goto out_err;
3463 /* crypto and compression type aren't (yet) supported for v2 images */
3465 rbd_dev->header.crypt_type = 0;
3466 rbd_dev->header.comp_type = 0;
3468 /* Get the snapshot context, plus the header version */
3470 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3471 if (ret)
3472 goto out_err;
3473 rbd_dev->header.obj_version = ver;
3475 rbd_dev->image_format = 2;
3477 dout("discovered version 2 image, header name is %s\n",
3478 rbd_dev->header_name);
3480 return 0;
3481 out_err:
/* Release everything acquired above, in reverse order */
3482 rbd_dev->parent_overlap = 0;
3483 rbd_spec_put(rbd_dev->parent_spec);
3484 rbd_dev->parent_spec = NULL;
3485 kfree(rbd_dev->header_name);
3486 rbd_dev->header_name = NULL;
3487 kfree(rbd_dev->header.object_prefix);
3488 rbd_dev->header.object_prefix = NULL;
3490 return ret;
/*
 * Complete device setup after the image header has been probed:
 * build the snapshot list, resolve names, map the requested
 * snapshot, allocate an id and block major, create the disk and
 * sysfs entries, register snapshots, arm the header watch, and
 * announce the disk.
 */
3493 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3495 int ret;
3497 /* no need to lock here, as rbd_dev is not registered yet */
3498 ret = rbd_dev_snaps_update(rbd_dev);
3499 if (ret)
3500 return ret;
3502 ret = rbd_dev_probe_update_spec(rbd_dev);
3503 if (ret)
3504 goto err_out_snaps;
3506 ret = rbd_dev_set_mapping(rbd_dev);
3507 if (ret)
3508 goto err_out_snaps;
3510 /* generate unique id: find highest unique id, add one */
3511 rbd_dev_id_get(rbd_dev);
3513 /* Fill in the device name, now that we have its id. */
3514 BUILD_BUG_ON(DEV_NAME_LEN
3515 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3516 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3518 /* Get our block major device number. */
/* register_blkdev(0, ...) dynamically allocates a major number */
3520 ret = register_blkdev(0, rbd_dev->name);
3521 if (ret < 0)
3522 goto err_out_id;
3523 rbd_dev->major = ret;
3525 /* Set up the blkdev mapping. */
3527 ret = rbd_init_disk(rbd_dev);
3528 if (ret)
3529 goto err_out_blkdev;
3531 ret = rbd_bus_add_dev(rbd_dev);
3532 if (ret)
3533 goto err_out_disk;
3536 * At this point cleanup in the event of an error is the job
3537 * of the sysfs code (initiated by rbd_bus_del_dev()).
3539 down_write(&rbd_dev->header_rwsem);
3540 ret = rbd_dev_snaps_register(rbd_dev);
3541 up_write(&rbd_dev->header_rwsem);
3542 if (ret)
3543 goto err_out_bus;
3545 ret = rbd_init_watch_dev(rbd_dev);
3546 if (ret)
3547 goto err_out_bus;
3549 /* Everything's ready. Announce the disk to the world. */
3551 add_disk(rbd_dev->disk);
3553 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3554 (unsigned long long) rbd_dev->mapping.size);
3556 return ret;
3557 err_out_bus:
3558 /* this will also clean up rest of rbd_dev stuff */
3560 rbd_bus_del_dev(rbd_dev);
3562 return ret;
3563 err_out_disk:
3564 rbd_free_disk(rbd_dev);
3565 err_out_blkdev:
3566 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3567 err_out_id:
3568 rbd_dev_id_put(rbd_dev);
3569 err_out_snaps:
3570 rbd_remove_all_snaps(rbd_dev);
3572 return ret;
3576 * Probe for the existence of the header object for the given rbd
3577 * device. For format 2 images this includes determining the image
3578 * id.
3580 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3582 int ret;
3585 * Get the id from the image id object. If it's not a
3586 * format 2 image, we'll get ENOENT back, and we'll assume
3587 * it's a format 1 image.
3589 ret = rbd_dev_image_id(rbd_dev);
3590 if (ret)
3591 ret = rbd_dev_v1_probe(rbd_dev);
3592 else
3593 ret = rbd_dev_v2_probe(rbd_dev);
3594 if (ret) {
3595 dout("probe failed, returning %d\n", ret);
3597 return ret;
3600 ret = rbd_dev_probe_finish(rbd_dev);
/* Drop header data populated by the probe if the finish step failed */
3601 if (ret)
3602 rbd_header_free(&rbd_dev->header);
3604 return ret;
/*
 * Handler for writes to /sys/bus/rbd/add: parse the user-supplied
 * mapping description, connect to the cluster, create the rbd_dev
 * and probe/activate the image.  Returns count on success or a
 * negative errno.
 */
3607 static ssize_t rbd_add(struct bus_type *bus,
3608 const char *buf,
3609 size_t count)
3611 struct rbd_device *rbd_dev = NULL;
3612 struct ceph_options *ceph_opts = NULL;
3613 struct rbd_options *rbd_opts = NULL;
3614 struct rbd_spec *spec = NULL;
3615 struct rbd_client *rbdc;
3616 struct ceph_osd_client *osdc;
3617 int rc = -ENOMEM;
/*
 * The module reference is held for the life of the mapping and
 * dropped in rbd_dev_release() (or on the error path below).
 */
3619 if (!try_module_get(THIS_MODULE))
3620 return -ENODEV;
3622 /* parse add command */
3623 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3624 if (rc < 0)
3625 goto err_out_module;
3627 rbdc = rbd_get_client(ceph_opts);
3628 if (IS_ERR(rbdc)) {
3629 rc = PTR_ERR(rbdc);
3630 goto err_out_args;
3632 ceph_opts = NULL; /* rbd_dev client now owns this */
3634 /* pick the pool */
3635 osdc = &rbdc->client->osdc;
3636 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3637 if (rc < 0)
3638 goto err_out_client;
3639 spec->pool_id = (u64) rc;
3641 rbd_dev = rbd_dev_create(rbdc, spec);
/*
 * NOTE(review): if rbd_dev_create() fails, "rc" still holds the
 * non-negative pool id, so a positive value would be returned —
 * looks like it should be set to -ENOMEM here; confirm.
 */
3642 if (!rbd_dev)
3643 goto err_out_client;
3644 rbdc = NULL; /* rbd_dev now owns this */
3645 spec = NULL; /* rbd_dev now owns this */
3647 rbd_dev->mapping.read_only = rbd_opts->read_only;
3648 kfree(rbd_opts);
3649 rbd_opts = NULL; /* done with this */
3651 rc = rbd_dev_probe(rbd_dev);
3652 if (rc < 0)
3653 goto err_out_rbd_dev;
3655 return count;
3656 err_out_rbd_dev:
3657 rbd_dev_destroy(rbd_dev);
3658 err_out_client:
3659 rbd_put_client(rbdc);
3660 err_out_args:
3661 if (ceph_opts)
3662 ceph_destroy_options(ceph_opts);
3663 kfree(rbd_opts);
3664 rbd_spec_put(spec);
3665 err_out_module:
3666 module_put(THIS_MODULE);
3668 dout("Error adding device %s\n", buf);
3670 return (ssize_t) rc;
/*
 * Find an rbd device by id in the global device list; returns NULL
 * if no device with that id exists.  Takes rbd_dev_list_lock only
 * for the scan — the returned pointer is not reference-counted.
 */
3673 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3675 struct list_head *tmp;
3676 struct rbd_device *rbd_dev;
3678 spin_lock(&rbd_dev_list_lock);
3679 list_for_each(tmp, &rbd_dev_list) {
3680 rbd_dev = list_entry(tmp, struct rbd_device, node);
3681 if (rbd_dev->dev_id == dev_id) {
3682 spin_unlock(&rbd_dev_list_lock);
3683 return rbd_dev;
3686 spin_unlock(&rbd_dev_list_lock);
3687 return NULL;
/*
 * Device-model release callback: runs when the last reference to the
 * rbd device is dropped (after rbd_bus_del_dev()).  Tears down the
 * header watch, the disk, the header data, the device id and the
 * rbd_dev itself, then drops the module reference taken in rbd_add().
 */
3690 static void rbd_dev_release(struct device *dev)
3692 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3694 if (rbd_dev->watch_request) {
3695 struct ceph_client *client = rbd_dev->rbd_client->client;
3697 ceph_osdc_unregister_linger_request(&client->osdc,
3698 rbd_dev->watch_request);
3700 if (rbd_dev->watch_event)
3701 rbd_req_sync_unwatch(rbd_dev);
3704 /* clean up and free blkdev */
3705 rbd_free_disk(rbd_dev);
3706 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3708 /* release allocated disk header fields */
3709 rbd_header_free(&rbd_dev->header);
3711 /* done with the id, and with the rbd_dev */
3712 rbd_dev_id_put(rbd_dev);
3713 rbd_assert(rbd_dev->rbd_client != NULL);
3714 rbd_dev_destroy(rbd_dev);
3716 /* release module ref */
3717 module_put(THIS_MODULE);
/*
 * Handler for writes to /sys/bus/rbd/remove: parse a device id and
 * unregister the corresponding device unless it is still open.
 * Serialized under ctl_mutex.
 */
3720 static ssize_t rbd_remove(struct bus_type *bus,
3721 const char *buf,
3722 size_t count)
3724 struct rbd_device *rbd_dev = NULL;
3725 int target_id, rc;
3726 unsigned long ul;
3727 int ret = count;
3729 rc = strict_strtoul(buf, 10, &ul);
3730 if (rc)
3731 return rc;
3733 /* convert to int; abort if we lost anything in the conversion */
3734 target_id = (int) ul;
3735 if (target_id != ul)
3736 return -EINVAL;
3738 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3740 rbd_dev = __rbd_get_dev(target_id);
3741 if (!rbd_dev) {
3742 ret = -ENOENT;
3743 goto done;
/* Refuse to remove a mapping that still has openers */
3746 if (rbd_dev->open_count) {
3747 ret = -EBUSY;
3748 goto done;
3751 rbd_remove_all_snaps(rbd_dev);
3752 rbd_bus_del_dev(rbd_dev);
3754 done:
3755 mutex_unlock(&ctl_mutex);
3757 return ret;
3761 * create control files in sysfs
3762 * /sys/bus/rbd/...
3764 static int rbd_sysfs_init(void)
3766 int ret;
3768 ret = device_register(&rbd_root_dev);
3769 if (ret < 0)
3770 return ret;
/* Roll back the root device if bus registration fails */
3772 ret = bus_register(&rbd_bus_type);
3773 if (ret < 0)
3774 device_unregister(&rbd_root_dev);
3776 return ret;
/* Inverse of rbd_sysfs_init(): unregister bus, then root device */
3779 static void rbd_sysfs_cleanup(void)
3781 bus_unregister(&rbd_bus_type);
3782 device_unregister(&rbd_root_dev);
/* Module init: create the /sys/bus/rbd control interface */
3785 int __init rbd_init(void)
3787 int rc;
3789 rc = rbd_sysfs_init();
3790 if (rc)
3791 return rc;
3792 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3793 return 0;
/* Module exit: remove the /sys/bus/rbd control interface */
3796 void __exit rbd_exit(void)
3798 rbd_sysfs_cleanup();
3801 module_init(rbd_init);
3802 module_exit(rbd_exit);
3804 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3805 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3806 MODULE_DESCRIPTION("rados block device");
3808 /* following authorship retained from original osdblk.c */
3809 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3811 MODULE_LICENSE("GPL");