2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have these defined elsewhere */
57 #define U8_MAX ((u8) (~0U))
58 #define U16_MAX ((u16) (~0U))
59 #define U32_MAX ((u32) (~0U))
60 #define U64_MAX ((u64) (~0ULL))
62 #define RBD_DRV_NAME "rbd"
63 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
65 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
67 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
68 #define RBD_MAX_SNAP_NAME_LEN \
69 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
71 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
73 #define RBD_SNAP_HEAD_NAME "-"
75 /* This allows a single page to hold an image name sent by OSD */
76 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
77 #define RBD_IMAGE_ID_LEN_MAX 64
79 #define RBD_OBJ_PREFIX_LEN_MAX 64
83 #define RBD_FEATURE_LAYERING 1
85 /* Features supported by this (client software) implementation. */
87 #define RBD_FEATURES_ALL (0)
90 * An RBD device name will be "rbd#", where the "rbd" comes from
91 * RBD_DRV_NAME above, and # is a unique integer identifier.
92 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
93 * enough to hold all possible device names.
95 #define DEV_NAME_LEN 32
96 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
99 * block device image metadata (in-memory version)
101 struct rbd_image_header
{
102 /* These four fields never change for a given rbd image */
109 /* The remaining fields need to be updated occasionally */
111 struct ceph_snap_context
*snapc
;
119 * An rbd image specification.
121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122 * identify an image. Each rbd_dev structure includes a pointer to
123 * an rbd_spec structure that encapsulates this identity.
125 * Each of the id's in an rbd_spec has an associated name. For a
126 * user-mapped image, the names are supplied and the id's associated
127 * with them are looked up. For a layered image, a parent image is
128 * defined by the tuple, and the names are looked up.
130 * An rbd_dev structure contains a parent_spec pointer which is
131 * non-null if the image it represents is a child in a layered
132 * image. This pointer will refer to the rbd_spec structure used
133 * by the parent rbd_dev for its own identity (i.e., the structure
134 * is shared between the parent and child).
136 * Since these structures are populated once, during the discovery
137 * phase of image construction, they are effectively immutable so
138 * we make no effort to synchronize access to them.
140 * Note that code herein does not assume the image name is known (it
141 * could be a null pointer).
157 * an instance of the client. multiple devices may share an rbd client.
160 struct ceph_client
*client
;
162 struct list_head node
;
165 struct rbd_img_request
;
166 typedef void (*rbd_img_callback_t
)(struct rbd_img_request
*);
168 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
170 struct rbd_obj_request
;
171 typedef void (*rbd_obj_callback_t
)(struct rbd_obj_request
*);
173 enum obj_request_type
{ OBJ_REQUEST_BIO
, OBJ_REQUEST_PAGES
};
175 struct rbd_obj_request
{
176 const char *object_name
;
177 u64 offset
; /* object start byte */
178 u64 length
; /* bytes from offset */
180 struct rbd_img_request
*img_request
;
181 struct list_head links
; /* img_request->obj_requests */
182 u32 which
; /* posn image request list */
184 enum obj_request_type type
;
186 struct bio
*bio_list
;
193 struct ceph_osd_request
*osd_req
;
195 u64 xferred
; /* bytes transferred */
200 rbd_obj_callback_t callback
;
201 struct completion completion
;
206 struct rbd_img_request
{
208 struct rbd_device
*rbd_dev
;
209 u64 offset
; /* starting image byte offset */
210 u64 length
; /* byte count from offset */
211 bool write_request
; /* false for read */
213 struct ceph_snap_context
*snapc
; /* for writes */
214 u64 snap_id
; /* for reads */
216 spinlock_t completion_lock
;/* protects next_completion */
218 rbd_img_callback_t callback
;
220 u32 obj_request_count
;
221 struct list_head obj_requests
; /* rbd_obj_request structs */
226 #define for_each_obj_request(ireq, oreq) \
227 list_for_each_entry(oreq, &ireq->obj_requests, links)
228 #define for_each_obj_request_from(ireq, oreq) \
229 list_for_each_entry_from(oreq, &ireq->obj_requests, links)
230 #define for_each_obj_request_safe(ireq, oreq, n) \
231 list_for_each_entry_safe_reverse(oreq, n, &ireq->obj_requests, links)
237 struct list_head node
;
252 int dev_id
; /* blkdev unique id */
254 int major
; /* blkdev assigned major */
255 struct gendisk
*disk
; /* blkdev's gendisk and rq */
257 u32 image_format
; /* Either 1 or 2 */
258 struct rbd_client
*rbd_client
;
260 char name
[DEV_NAME_LEN
]; /* blkdev name, e.g. rbd3 */
262 spinlock_t lock
; /* queue lock */
264 struct rbd_image_header header
;
266 struct rbd_spec
*spec
;
270 struct ceph_file_layout layout
;
272 struct ceph_osd_event
*watch_event
;
273 struct ceph_osd_request
*watch_request
;
275 struct rbd_spec
*parent_spec
;
278 /* protects updating the header */
279 struct rw_semaphore header_rwsem
;
281 struct rbd_mapping mapping
;
283 struct list_head node
;
285 /* list of snapshots */
286 struct list_head snaps
;
290 unsigned long open_count
;
293 static DEFINE_MUTEX(ctl_mutex
); /* Serialize open/close/setup/teardown */
295 static LIST_HEAD(rbd_dev_list
); /* devices */
296 static DEFINE_SPINLOCK(rbd_dev_list_lock
);
298 static LIST_HEAD(rbd_client_list
); /* clients */
299 static DEFINE_SPINLOCK(rbd_client_list_lock
);
301 static int rbd_dev_snaps_update(struct rbd_device
*rbd_dev
);
302 static int rbd_dev_snaps_register(struct rbd_device
*rbd_dev
);
304 static void rbd_dev_release(struct device
*dev
);
305 static void rbd_remove_snap_dev(struct rbd_snap
*snap
);
307 static ssize_t
rbd_add(struct bus_type
*bus
, const char *buf
,
309 static ssize_t
rbd_remove(struct bus_type
*bus
, const char *buf
,
312 static struct bus_attribute rbd_bus_attrs
[] = {
313 __ATTR(add
, S_IWUSR
, NULL
, rbd_add
),
314 __ATTR(remove
, S_IWUSR
, NULL
, rbd_remove
),
318 static struct bus_type rbd_bus_type
= {
320 .bus_attrs
= rbd_bus_attrs
,
323 static void rbd_root_dev_release(struct device
*dev
)
327 static struct device rbd_root_dev
= {
329 .release
= rbd_root_dev_release
,
332 static __printf(2, 3)
333 void rbd_warn(struct rbd_device
*rbd_dev
, const char *fmt
, ...)
335 struct va_format vaf
;
343 printk(KERN_WARNING
"%s: %pV\n", RBD_DRV_NAME
, &vaf
);
344 else if (rbd_dev
->disk
)
345 printk(KERN_WARNING
"%s: %s: %pV\n",
346 RBD_DRV_NAME
, rbd_dev
->disk
->disk_name
, &vaf
);
347 else if (rbd_dev
->spec
&& rbd_dev
->spec
->image_name
)
348 printk(KERN_WARNING
"%s: image %s: %pV\n",
349 RBD_DRV_NAME
, rbd_dev
->spec
->image_name
, &vaf
);
350 else if (rbd_dev
->spec
&& rbd_dev
->spec
->image_id
)
351 printk(KERN_WARNING
"%s: id %s: %pV\n",
352 RBD_DRV_NAME
, rbd_dev
->spec
->image_id
, &vaf
);
354 printk(KERN_WARNING
"%s: rbd_dev %p: %pV\n",
355 RBD_DRV_NAME
, rbd_dev
, &vaf
);
360 #define rbd_assert(expr) \
361 if (unlikely(!(expr))) { \
362 printk(KERN_ERR "\nAssertion failure in %s() " \
364 "\trbd_assert(%s);\n\n", \
365 __func__, __LINE__, #expr); \
368 #else /* !RBD_DEBUG */
369 # define rbd_assert(expr) ((void) 0)
370 #endif /* !RBD_DEBUG */
372 static int rbd_dev_refresh(struct rbd_device
*rbd_dev
, u64
*hver
);
373 static int rbd_dev_v2_refresh(struct rbd_device
*rbd_dev
, u64
*hver
);
375 static int rbd_open(struct block_device
*bdev
, fmode_t mode
)
377 struct rbd_device
*rbd_dev
= bdev
->bd_disk
->private_data
;
379 if ((mode
& FMODE_WRITE
) && rbd_dev
->mapping
.read_only
)
382 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
383 (void) get_device(&rbd_dev
->dev
);
384 set_device_ro(bdev
, rbd_dev
->mapping
.read_only
);
385 rbd_dev
->open_count
++;
386 mutex_unlock(&ctl_mutex
);
391 static int rbd_release(struct gendisk
*disk
, fmode_t mode
)
393 struct rbd_device
*rbd_dev
= disk
->private_data
;
395 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
396 rbd_assert(rbd_dev
->open_count
> 0);
397 rbd_dev
->open_count
--;
398 put_device(&rbd_dev
->dev
);
399 mutex_unlock(&ctl_mutex
);
404 static const struct block_device_operations rbd_bd_ops
= {
405 .owner
= THIS_MODULE
,
407 .release
= rbd_release
,
411 * Initialize an rbd client instance.
414 static struct rbd_client
*rbd_client_create(struct ceph_options
*ceph_opts
)
416 struct rbd_client
*rbdc
;
419 dout("rbd_client_create\n");
420 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
424 kref_init(&rbdc
->kref
);
425 INIT_LIST_HEAD(&rbdc
->node
);
427 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
429 rbdc
->client
= ceph_create_client(ceph_opts
, rbdc
, 0, 0);
430 if (IS_ERR(rbdc
->client
))
432 ceph_opts
= NULL
; /* Now rbdc->client is responsible for ceph_opts */
434 ret
= ceph_open_session(rbdc
->client
);
438 spin_lock(&rbd_client_list_lock
);
439 list_add_tail(&rbdc
->node
, &rbd_client_list
);
440 spin_unlock(&rbd_client_list_lock
);
442 mutex_unlock(&ctl_mutex
);
444 dout("rbd_client_create created %p\n", rbdc
);
448 ceph_destroy_client(rbdc
->client
);
450 mutex_unlock(&ctl_mutex
);
454 ceph_destroy_options(ceph_opts
);
459 * Find a ceph client with specific addr and configuration. If
460 * found, bump its reference count.
462 static struct rbd_client
*rbd_client_find(struct ceph_options
*ceph_opts
)
464 struct rbd_client
*client_node
;
467 if (ceph_opts
->flags
& CEPH_OPT_NOSHARE
)
470 spin_lock(&rbd_client_list_lock
);
471 list_for_each_entry(client_node
, &rbd_client_list
, node
) {
472 if (!ceph_compare_options(ceph_opts
, client_node
->client
)) {
473 kref_get(&client_node
->kref
);
478 spin_unlock(&rbd_client_list_lock
);
480 return found
? client_node
: NULL
;
490 /* string args above */
493 /* Boolean args above */
497 static match_table_t rbd_opts_tokens
= {
499 /* string args above */
500 {Opt_read_only
, "read_only"},
501 {Opt_read_only
, "ro"}, /* Alternate spelling */
502 {Opt_read_write
, "read_write"},
503 {Opt_read_write
, "rw"}, /* Alternate spelling */
504 /* Boolean args above */
512 #define RBD_READ_ONLY_DEFAULT false
514 static int parse_rbd_opts_token(char *c
, void *private)
516 struct rbd_options
*rbd_opts
= private;
517 substring_t argstr
[MAX_OPT_ARGS
];
518 int token
, intval
, ret
;
520 token
= match_token(c
, rbd_opts_tokens
, argstr
);
524 if (token
< Opt_last_int
) {
525 ret
= match_int(&argstr
[0], &intval
);
527 pr_err("bad mount option arg (not int) "
531 dout("got int token %d val %d\n", token
, intval
);
532 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
533 dout("got string token %d val %s\n", token
,
535 } else if (token
> Opt_last_string
&& token
< Opt_last_bool
) {
536 dout("got Boolean token %d\n", token
);
538 dout("got token %d\n", token
);
543 rbd_opts
->read_only
= true;
546 rbd_opts
->read_only
= false;
556 * Get a ceph client with specific addr and configuration, if one does
557 * not exist create it.
559 static struct rbd_client
*rbd_get_client(struct ceph_options
*ceph_opts
)
561 struct rbd_client
*rbdc
;
563 rbdc
= rbd_client_find(ceph_opts
);
564 if (rbdc
) /* using an existing client */
565 ceph_destroy_options(ceph_opts
);
567 rbdc
= rbd_client_create(ceph_opts
);
573 * Destroy ceph client
575 * Caller must hold rbd_client_list_lock.
577 static void rbd_client_release(struct kref
*kref
)
579 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
581 dout("rbd_release_client %p\n", rbdc
);
582 spin_lock(&rbd_client_list_lock
);
583 list_del(&rbdc
->node
);
584 spin_unlock(&rbd_client_list_lock
);
586 ceph_destroy_client(rbdc
->client
);
591 * Drop reference to ceph client node. If it's not referenced anymore, release
594 static void rbd_put_client(struct rbd_client
*rbdc
)
597 kref_put(&rbdc
->kref
, rbd_client_release
);
600 static bool rbd_image_format_valid(u32 image_format
)
602 return image_format
== 1 || image_format
== 2;
605 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk
*ondisk
)
610 /* The header has to start with the magic rbd header text */
611 if (memcmp(&ondisk
->text
, RBD_HEADER_TEXT
, sizeof (RBD_HEADER_TEXT
)))
614 /* The bio layer requires at least sector-sized I/O */
616 if (ondisk
->options
.order
< SECTOR_SHIFT
)
619 /* If we use u64 in a few spots we may be able to loosen this */
621 if (ondisk
->options
.order
> 8 * sizeof (int) - 1)
625 * The size of a snapshot header has to fit in a size_t, and
626 * that limits the number of snapshots.
628 snap_count
= le32_to_cpu(ondisk
->snap_count
);
629 size
= SIZE_MAX
- sizeof (struct ceph_snap_context
);
630 if (snap_count
> size
/ sizeof (__le64
))
634 * Not only that, but the size of the entire the snapshot
635 * header must also be representable in a size_t.
637 size
-= snap_count
* sizeof (__le64
);
638 if ((u64
) size
< le64_to_cpu(ondisk
->snap_names_len
))
645 * Create a new header structure, translate header format from the on-disk
648 static int rbd_header_from_disk(struct rbd_image_header
*header
,
649 struct rbd_image_header_ondisk
*ondisk
)
656 memset(header
, 0, sizeof (*header
));
658 snap_count
= le32_to_cpu(ondisk
->snap_count
);
660 len
= strnlen(ondisk
->object_prefix
, sizeof (ondisk
->object_prefix
));
661 header
->object_prefix
= kmalloc(len
+ 1, GFP_KERNEL
);
662 if (!header
->object_prefix
)
664 memcpy(header
->object_prefix
, ondisk
->object_prefix
, len
);
665 header
->object_prefix
[len
] = '\0';
668 u64 snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
670 /* Save a copy of the snapshot names */
672 if (snap_names_len
> (u64
) SIZE_MAX
)
674 header
->snap_names
= kmalloc(snap_names_len
, GFP_KERNEL
);
675 if (!header
->snap_names
)
678 * Note that rbd_dev_v1_header_read() guarantees
679 * the ondisk buffer we're working with has
680 * snap_names_len bytes beyond the end of the
681 * snapshot id array, this memcpy() is safe.
683 memcpy(header
->snap_names
, &ondisk
->snaps
[snap_count
],
686 /* Record each snapshot's size */
688 size
= snap_count
* sizeof (*header
->snap_sizes
);
689 header
->snap_sizes
= kmalloc(size
, GFP_KERNEL
);
690 if (!header
->snap_sizes
)
692 for (i
= 0; i
< snap_count
; i
++)
693 header
->snap_sizes
[i
] =
694 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
696 WARN_ON(ondisk
->snap_names_len
);
697 header
->snap_names
= NULL
;
698 header
->snap_sizes
= NULL
;
701 header
->features
= 0; /* No features support in v1 images */
702 header
->obj_order
= ondisk
->options
.order
;
703 header
->crypt_type
= ondisk
->options
.crypt_type
;
704 header
->comp_type
= ondisk
->options
.comp_type
;
706 /* Allocate and fill in the snapshot context */
708 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
709 size
= sizeof (struct ceph_snap_context
);
710 size
+= snap_count
* sizeof (header
->snapc
->snaps
[0]);
711 header
->snapc
= kzalloc(size
, GFP_KERNEL
);
715 atomic_set(&header
->snapc
->nref
, 1);
716 header
->snapc
->seq
= le64_to_cpu(ondisk
->snap_seq
);
717 header
->snapc
->num_snaps
= snap_count
;
718 for (i
= 0; i
< snap_count
; i
++)
719 header
->snapc
->snaps
[i
] =
720 le64_to_cpu(ondisk
->snaps
[i
].id
);
725 kfree(header
->snap_sizes
);
726 header
->snap_sizes
= NULL
;
727 kfree(header
->snap_names
);
728 header
->snap_names
= NULL
;
729 kfree(header
->object_prefix
);
730 header
->object_prefix
= NULL
;
735 static const char *rbd_snap_name(struct rbd_device
*rbd_dev
, u64 snap_id
)
737 struct rbd_snap
*snap
;
739 if (snap_id
== CEPH_NOSNAP
)
740 return RBD_SNAP_HEAD_NAME
;
742 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
)
743 if (snap_id
== snap
->id
)
749 static int snap_by_name(struct rbd_device
*rbd_dev
, const char *snap_name
)
752 struct rbd_snap
*snap
;
754 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
755 if (!strcmp(snap_name
, snap
->name
)) {
756 rbd_dev
->spec
->snap_id
= snap
->id
;
757 rbd_dev
->mapping
.size
= snap
->size
;
758 rbd_dev
->mapping
.features
= snap
->features
;
767 static int rbd_dev_set_mapping(struct rbd_device
*rbd_dev
)
771 if (!memcmp(rbd_dev
->spec
->snap_name
, RBD_SNAP_HEAD_NAME
,
772 sizeof (RBD_SNAP_HEAD_NAME
))) {
773 rbd_dev
->spec
->snap_id
= CEPH_NOSNAP
;
774 rbd_dev
->mapping
.size
= rbd_dev
->header
.image_size
;
775 rbd_dev
->mapping
.features
= rbd_dev
->header
.features
;
778 ret
= snap_by_name(rbd_dev
, rbd_dev
->spec
->snap_name
);
781 rbd_dev
->mapping
.read_only
= true;
783 atomic_set(&rbd_dev
->exists
, 1);
788 static void rbd_header_free(struct rbd_image_header
*header
)
790 kfree(header
->object_prefix
);
791 header
->object_prefix
= NULL
;
792 kfree(header
->snap_sizes
);
793 header
->snap_sizes
= NULL
;
794 kfree(header
->snap_names
);
795 header
->snap_names
= NULL
;
796 ceph_put_snap_context(header
->snapc
);
797 header
->snapc
= NULL
;
800 static const char *rbd_segment_name(struct rbd_device
*rbd_dev
, u64 offset
)
806 name
= kmalloc(MAX_OBJ_NAME_SIZE
+ 1, GFP_NOIO
);
809 segment
= offset
>> rbd_dev
->header
.obj_order
;
810 ret
= snprintf(name
, MAX_OBJ_NAME_SIZE
+ 1, "%s.%012llx",
811 rbd_dev
->header
.object_prefix
, segment
);
812 if (ret
< 0 || ret
> MAX_OBJ_NAME_SIZE
) {
813 pr_err("error formatting segment name for #%llu (%d)\n",
822 static u64
rbd_segment_offset(struct rbd_device
*rbd_dev
, u64 offset
)
824 u64 segment_size
= (u64
) 1 << rbd_dev
->header
.obj_order
;
826 return offset
& (segment_size
- 1);
829 static u64
rbd_segment_length(struct rbd_device
*rbd_dev
,
830 u64 offset
, u64 length
)
832 u64 segment_size
= (u64
) 1 << rbd_dev
->header
.obj_order
;
834 offset
&= segment_size
- 1;
836 rbd_assert(length
<= U64_MAX
- offset
);
837 if (offset
+ length
> segment_size
)
838 length
= segment_size
- offset
;
844 * returns the size of an object in the image
846 static u64
rbd_obj_bytes(struct rbd_image_header
*header
)
848 return 1 << header
->obj_order
;
855 static void bio_chain_put(struct bio
*chain
)
861 chain
= chain
->bi_next
;
867 * zeros a bio chain, starting at specific offset
869 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
878 bio_for_each_segment(bv
, chain
, i
) {
879 if (pos
+ bv
->bv_len
> start_ofs
) {
880 int remainder
= max(start_ofs
- pos
, 0);
881 buf
= bvec_kmap_irq(bv
, &flags
);
882 memset(buf
+ remainder
, 0,
883 bv
->bv_len
- remainder
);
884 bvec_kunmap_irq(buf
, &flags
);
889 chain
= chain
->bi_next
;
894 * Clone a portion of a bio, starting at the given byte offset
895 * and continuing for the number of bytes indicated.
897 static struct bio
*bio_clone_range(struct bio
*bio_src
,
906 unsigned short end_idx
;
910 /* Handle the easy case for the caller */
912 if (!offset
&& len
== bio_src
->bi_size
)
913 return bio_clone(bio_src
, gfpmask
);
915 if (WARN_ON_ONCE(!len
))
917 if (WARN_ON_ONCE(len
> bio_src
->bi_size
))
919 if (WARN_ON_ONCE(offset
> bio_src
->bi_size
- len
))
922 /* Find first affected segment... */
925 __bio_for_each_segment(bv
, bio_src
, idx
, 0) {
926 if (resid
< bv
->bv_len
)
932 /* ...and the last affected segment */
935 __bio_for_each_segment(bv
, bio_src
, end_idx
, idx
) {
936 if (resid
<= bv
->bv_len
)
940 vcnt
= end_idx
- idx
+ 1;
942 /* Build the clone */
944 bio
= bio_alloc(gfpmask
, (unsigned int) vcnt
);
946 return NULL
; /* ENOMEM */
948 bio
->bi_bdev
= bio_src
->bi_bdev
;
949 bio
->bi_sector
= bio_src
->bi_sector
+ (offset
>> SECTOR_SHIFT
);
950 bio
->bi_rw
= bio_src
->bi_rw
;
951 bio
->bi_flags
|= 1 << BIO_CLONED
;
954 * Copy over our part of the bio_vec, then update the first
955 * and last (or only) entries.
957 memcpy(&bio
->bi_io_vec
[0], &bio_src
->bi_io_vec
[idx
],
958 vcnt
* sizeof (struct bio_vec
));
959 bio
->bi_io_vec
[0].bv_offset
+= voff
;
961 bio
->bi_io_vec
[0].bv_len
-= voff
;
962 bio
->bi_io_vec
[vcnt
- 1].bv_len
= resid
;
964 bio
->bi_io_vec
[0].bv_len
= len
;
975 * Clone a portion of a bio chain, starting at the given byte offset
976 * into the first bio in the source chain and continuing for the
977 * number of bytes indicated. The result is another bio chain of
978 * exactly the given length, or a null pointer on error.
980 * The bio_src and offset parameters are both in-out. On entry they
981 * refer to the first source bio and the offset into that bio where
982 * the start of data to be cloned is located.
984 * On return, bio_src is updated to refer to the bio in the source
985 * chain that contains first un-cloned byte, and *offset will
986 * contain the offset of that byte within that bio.
988 static struct bio
*bio_chain_clone_range(struct bio
**bio_src
,
989 unsigned int *offset
,
993 struct bio
*bi
= *bio_src
;
994 unsigned int off
= *offset
;
995 struct bio
*chain
= NULL
;
998 /* Build up a chain of clone bios up to the limit */
1000 if (!bi
|| off
>= bi
->bi_size
|| !len
)
1001 return NULL
; /* Nothing to clone */
1005 unsigned int bi_size
;
1009 rbd_warn(NULL
, "bio_chain exhausted with %u left", len
);
1010 goto out_err
; /* EINVAL; ran out of bio's */
1012 bi_size
= min_t(unsigned int, bi
->bi_size
- off
, len
);
1013 bio
= bio_clone_range(bi
, off
, bi_size
, gfpmask
);
1015 goto out_err
; /* ENOMEM */
1018 end
= &bio
->bi_next
;
1021 if (off
== bi
->bi_size
) {
1032 bio_chain_put(chain
);
1037 static void rbd_obj_request_get(struct rbd_obj_request
*obj_request
)
1039 kref_get(&obj_request
->kref
);
1042 static void rbd_obj_request_destroy(struct kref
*kref
);
1043 static void rbd_obj_request_put(struct rbd_obj_request
*obj_request
)
1045 rbd_assert(obj_request
!= NULL
);
1046 kref_put(&obj_request
->kref
, rbd_obj_request_destroy
);
1049 static void rbd_img_request_get(struct rbd_img_request
*img_request
)
1051 kref_get(&img_request
->kref
);
1054 static void rbd_img_request_destroy(struct kref
*kref
);
1055 static void rbd_img_request_put(struct rbd_img_request
*img_request
)
1057 rbd_assert(img_request
!= NULL
);
1058 kref_put(&img_request
->kref
, rbd_img_request_destroy
);
1061 static inline void rbd_img_obj_request_add(struct rbd_img_request
*img_request
,
1062 struct rbd_obj_request
*obj_request
)
1064 rbd_obj_request_get(obj_request
);
1065 obj_request
->img_request
= img_request
;
1066 list_add_tail(&obj_request
->links
, &img_request
->obj_requests
);
1067 obj_request
->which
= img_request
->obj_request_count
++;
1068 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1071 static inline void rbd_img_obj_request_del(struct rbd_img_request
*img_request
,
1072 struct rbd_obj_request
*obj_request
)
1074 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1075 obj_request
->which
= BAD_WHICH
;
1076 list_del(&obj_request
->links
);
1077 rbd_assert(obj_request
->img_request
== img_request
);
1078 obj_request
->callback
= NULL
;
1079 obj_request
->img_request
= NULL
;
1080 rbd_obj_request_put(obj_request
);
1083 static bool obj_request_type_valid(enum obj_request_type type
)
1086 case OBJ_REQUEST_BIO
:
1087 case OBJ_REQUEST_PAGES
:
1094 struct ceph_osd_req_op
*rbd_osd_req_op_create(u16 opcode
, ...)
1096 struct ceph_osd_req_op
*op
;
1100 op
= kzalloc(sizeof (*op
), GFP_NOIO
);
1104 va_start(args
, opcode
);
1106 case CEPH_OSD_OP_READ
:
1107 case CEPH_OSD_OP_WRITE
:
1108 /* rbd_osd_req_op_create(READ, offset, length) */
1109 /* rbd_osd_req_op_create(WRITE, offset, length) */
1110 op
->extent
.offset
= va_arg(args
, u64
);
1111 op
->extent
.length
= va_arg(args
, u64
);
1112 if (opcode
== CEPH_OSD_OP_WRITE
)
1113 op
->payload_len
= op
->extent
.length
;
1115 case CEPH_OSD_OP_CALL
:
1116 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1117 op
->cls
.class_name
= va_arg(args
, char *);
1118 size
= strlen(op
->cls
.class_name
);
1119 rbd_assert(size
<= (size_t) U8_MAX
);
1120 op
->cls
.class_len
= size
;
1121 op
->payload_len
= size
;
1123 op
->cls
.method_name
= va_arg(args
, char *);
1124 size
= strlen(op
->cls
.method_name
);
1125 rbd_assert(size
<= (size_t) U8_MAX
);
1126 op
->cls
.method_len
= size
;
1127 op
->payload_len
+= size
;
1130 op
->cls
.indata
= va_arg(args
, void *);
1131 size
= va_arg(args
, size_t);
1132 rbd_assert(size
<= (size_t) U32_MAX
);
1133 op
->cls
.indata_len
= (u32
) size
;
1134 op
->payload_len
+= size
;
1136 case CEPH_OSD_OP_NOTIFY_ACK
:
1137 case CEPH_OSD_OP_WATCH
:
1138 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1139 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1140 op
->watch
.cookie
= va_arg(args
, u64
);
1141 op
->watch
.ver
= va_arg(args
, u64
);
1142 op
->watch
.ver
= cpu_to_le64(op
->watch
.ver
);
1143 if (opcode
== CEPH_OSD_OP_WATCH
&& va_arg(args
, int))
1144 op
->watch
.flag
= (u8
) 1;
1147 rbd_warn(NULL
, "unsupported opcode %hu\n", opcode
);
1157 static void rbd_osd_req_op_destroy(struct ceph_osd_req_op
*op
)
1163 * Send ceph osd request
1165 static int rbd_do_request(struct request
*rq
,
1166 struct rbd_device
*rbd_dev
,
1167 struct ceph_snap_context
*snapc
,
1169 const char *object_name
, u64 ofs
, u64 len
,
1171 struct page
**pages
,
1174 struct ceph_osd_req_op
*op
,
1175 void (*rbd_cb
)(struct ceph_osd_request
*,
1179 struct ceph_osd_client
*osdc
;
1180 struct ceph_osd_request
*osd_req
;
1181 struct timespec mtime
= CURRENT_TIME
;
1184 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n",
1185 object_name
, (unsigned long long) ofs
,
1186 (unsigned long long) len
);
1188 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1189 osd_req
= ceph_osdc_alloc_request(osdc
, snapc
, 1, false, GFP_NOIO
);
1193 osd_req
->r_flags
= flags
;
1194 osd_req
->r_pages
= pages
;
1196 osd_req
->r_bio
= bio
;
1197 bio_get(osd_req
->r_bio
);
1200 osd_req
->r_callback
= rbd_cb
;
1201 osd_req
->r_priv
= NULL
;
1203 strncpy(osd_req
->r_oid
, object_name
, sizeof(osd_req
->r_oid
));
1204 osd_req
->r_oid_len
= strlen(osd_req
->r_oid
);
1206 osd_req
->r_file_layout
= rbd_dev
->layout
; /* struct */
1207 osd_req
->r_num_pages
= calc_pages_for(ofs
, len
);
1208 osd_req
->r_page_alignment
= ofs
& ~PAGE_MASK
;
1210 ceph_osdc_build_request(osd_req
, ofs
, len
, 1, op
,
1211 snapc
, snapid
, &mtime
);
1213 if (op
->op
== CEPH_OSD_OP_WATCH
&& op
->watch
.flag
) {
1214 ceph_osdc_set_request_linger(osdc
, osd_req
);
1215 rbd_dev
->watch_request
= osd_req
;
1218 ret
= ceph_osdc_start_request(osdc
, osd_req
, false);
1225 ret
= ceph_osdc_wait_request(osdc
, osd_req
);
1226 version
= le64_to_cpu(osd_req
->r_reassert_version
.version
);
1229 dout("reassert_ver=%llu\n", (unsigned long long) version
);
1230 ceph_osdc_put_request(osd_req
);
1236 bio_chain_put(osd_req
->r_bio
);
1237 ceph_osdc_put_request(osd_req
);
1242 static void rbd_simple_req_cb(struct ceph_osd_request
*osd_req
,
1243 struct ceph_msg
*msg
)
1245 ceph_osdc_put_request(osd_req
);
1249 * Do a synchronous ceph osd operation
1251 static int rbd_req_sync_op(struct rbd_device
*rbd_dev
,
1253 struct ceph_osd_req_op
*op
,
1254 const char *object_name
,
1255 u64 ofs
, u64 inbound_size
,
1260 struct page
**pages
;
1263 rbd_assert(op
!= NULL
);
1265 num_pages
= calc_pages_for(ofs
, inbound_size
);
1266 pages
= ceph_alloc_page_vector(num_pages
, GFP_KERNEL
);
1268 return PTR_ERR(pages
);
1270 ret
= rbd_do_request(NULL
, rbd_dev
, NULL
, CEPH_NOSNAP
,
1271 object_name
, ofs
, inbound_size
, NULL
,
1280 if ((flags
& CEPH_OSD_FLAG_READ
) && inbound
)
1281 ret
= ceph_copy_from_page_vector(pages
, inbound
, ofs
, ret
);
1284 ceph_release_page_vector(pages
, num_pages
);
1288 static int rbd_obj_request_submit(struct ceph_osd_client
*osdc
,
1289 struct rbd_obj_request
*obj_request
)
1291 return ceph_osdc_start_request(osdc
, obj_request
->osd_req
, false);
1294 static void rbd_img_request_complete(struct rbd_img_request
*img_request
)
1296 if (img_request
->callback
)
1297 img_request
->callback(img_request
);
1299 rbd_img_request_put(img_request
);
1302 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1304 static int rbd_obj_request_wait(struct rbd_obj_request
*obj_request
)
1306 return wait_for_completion_interruptible(&obj_request
->completion
);
1309 static void rbd_obj_request_complete(struct rbd_obj_request
*obj_request
)
1311 if (obj_request
->callback
)
1312 obj_request
->callback(obj_request
);
1314 complete_all(&obj_request
->completion
);
1318 * Synchronously read a range from an object into a provided buffer
1320 static int rbd_req_sync_read(struct rbd_device
*rbd_dev
,
1321 const char *object_name
,
1326 struct ceph_osd_req_op
*op
;
1329 op
= rbd_osd_req_op_create(CEPH_OSD_OP_READ
, ofs
, len
);
1333 ret
= rbd_req_sync_op(rbd_dev
, CEPH_OSD_FLAG_READ
,
1334 op
, object_name
, ofs
, len
, buf
, ver
);
1335 rbd_osd_req_op_destroy(op
);
1341 * Request sync osd watch
1343 static int rbd_req_sync_notify_ack(struct rbd_device
*rbd_dev
,
1347 struct ceph_osd_req_op
*op
;
1350 op
= rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK
, notify_id
, ver
);
1354 ret
= rbd_do_request(NULL
, rbd_dev
, NULL
, CEPH_NOSNAP
,
1355 rbd_dev
->header_name
, 0, 0, NULL
,
1359 rbd_simple_req_cb
, NULL
);
1361 rbd_osd_req_op_destroy(op
);
1366 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1368 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1375 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1376 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1377 (unsigned int) opcode
);
1378 rc
= rbd_dev_refresh(rbd_dev
, &hver
);
1380 rbd_warn(rbd_dev
, "got notification but failed to "
1381 " update snaps: %d\n", rc
);
1383 rbd_req_sync_notify_ack(rbd_dev
, hver
, notify_id
);
1387 * Request sync osd watch/unwatch. The value of "start" determines
1388 * whether a watch request is being initiated or torn down.
1390 static int rbd_req_sync_watch(struct rbd_device
*rbd_dev
, int start
)
1392 struct ceph_osd_req_op
*op
;
1395 rbd_assert(start
^ !!rbd_dev
->watch_event
);
1396 rbd_assert(start
^ !!rbd_dev
->watch_request
);
1399 struct ceph_osd_client
*osdc
;
1401 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1402 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, 0, rbd_dev
,
1403 &rbd_dev
->watch_event
);
1408 op
= rbd_osd_req_op_create(CEPH_OSD_OP_WATCH
,
1409 rbd_dev
->watch_event
->cookie
,
1410 rbd_dev
->header
.obj_version
, start
);
1412 ret
= rbd_req_sync_op(rbd_dev
,
1413 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1414 op
, rbd_dev
->header_name
,
1417 /* Cancel the event if we're tearing down, or on error */
1419 if (!start
|| !op
|| ret
< 0) {
1420 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1421 rbd_dev
->watch_event
= NULL
;
1423 rbd_osd_req_op_destroy(op
);
1429 * Synchronous osd object method call
1431 static int rbd_req_sync_exec(struct rbd_device
*rbd_dev
,
1432 const char *object_name
,
1433 const char *class_name
,
1434 const char *method_name
,
1435 const char *outbound
,
1436 size_t outbound_size
,
1438 size_t inbound_size
,
1441 struct ceph_osd_req_op
*op
;
1445 * Any input parameters required by the method we're calling
1446 * will be sent along with the class and method names as
1447 * part of the message payload. That data and its size are
1448 * supplied via the indata and indata_len fields (named from
1449 * the perspective of the server side) in the OSD request
1452 op
= rbd_osd_req_op_create(CEPH_OSD_OP_CALL
, class_name
,
1453 method_name
, outbound
, outbound_size
);
1457 ret
= rbd_req_sync_op(rbd_dev
, CEPH_OSD_FLAG_READ
, op
,
1458 object_name
, 0, inbound_size
, inbound
,
1461 rbd_osd_req_op_destroy(op
);
1463 dout("cls_exec returned %d\n", ret
);
1467 static void rbd_osd_read_callback(struct rbd_obj_request
*obj_request
,
1468 struct ceph_osd_op
*op
)
1473 * We support a 64-bit length, but ultimately it has to be
1474 * passed to blk_end_request(), which takes an unsigned int.
1476 xferred
= le64_to_cpu(op
->extent
.length
);
1477 rbd_assert(xferred
< (u64
) UINT_MAX
);
1478 if (obj_request
->result
== (s32
) -ENOENT
) {
1479 zero_bio_chain(obj_request
->bio_list
, 0);
1480 obj_request
->result
= 0;
1481 } else if (xferred
< obj_request
->length
&& !obj_request
->result
) {
1482 zero_bio_chain(obj_request
->bio_list
, xferred
);
1483 xferred
= obj_request
->length
;
1485 obj_request
->xferred
= xferred
;
1486 atomic_set(&obj_request
->done
, 1);
1489 static void rbd_osd_write_callback(struct rbd_obj_request
*obj_request
,
1490 struct ceph_osd_op
*op
)
1492 obj_request
->xferred
= le64_to_cpu(op
->extent
.length
);
1493 atomic_set(&obj_request
->done
, 1);
1496 static void rbd_osd_req_callback(struct ceph_osd_request
*osd_req
,
1497 struct ceph_msg
*msg
)
1499 struct rbd_obj_request
*obj_request
= osd_req
->r_priv
;
1500 struct ceph_osd_reply_head
*reply_head
;
1501 struct ceph_osd_op
*op
;
1505 rbd_assert(osd_req
== obj_request
->osd_req
);
1506 rbd_assert(!!obj_request
->img_request
^
1507 (obj_request
->which
== BAD_WHICH
));
1509 obj_request
->xferred
= le32_to_cpu(msg
->hdr
.data_len
);
1510 reply_head
= msg
->front
.iov_base
;
1511 obj_request
->result
= (s32
) le32_to_cpu(reply_head
->result
);
1512 obj_request
->version
= le64_to_cpu(osd_req
->r_reassert_version
.version
);
1514 num_ops
= le32_to_cpu(reply_head
->num_ops
);
1515 WARN_ON(num_ops
!= 1); /* For now */
1517 op
= &reply_head
->ops
[0];
1518 opcode
= le16_to_cpu(op
->op
);
1520 case CEPH_OSD_OP_READ
:
1521 rbd_osd_read_callback(obj_request
, op
);
1523 case CEPH_OSD_OP_WRITE
:
1524 rbd_osd_write_callback(obj_request
, op
);
1527 rbd_warn(NULL
, "%s: unsupported op %hu\n",
1528 obj_request
->object_name
, (unsigned short) opcode
);
1532 if (atomic_read(&obj_request
->done
))
1533 rbd_obj_request_complete(obj_request
);
1536 static struct ceph_osd_request
*rbd_osd_req_create(
1537 struct rbd_device
*rbd_dev
,
1539 struct rbd_obj_request
*obj_request
,
1540 struct ceph_osd_req_op
*op
)
1542 struct rbd_img_request
*img_request
= obj_request
->img_request
;
1543 struct ceph_snap_context
*snapc
= NULL
;
1544 struct ceph_osd_client
*osdc
;
1545 struct ceph_osd_request
*osd_req
;
1546 struct timespec now
;
1547 struct timespec
*mtime
;
1548 u64 snap_id
= CEPH_NOSNAP
;
1549 u64 offset
= obj_request
->offset
;
1550 u64 length
= obj_request
->length
;
1553 rbd_assert(img_request
->write_request
== write_request
);
1554 if (img_request
->write_request
)
1555 snapc
= img_request
->snapc
;
1557 snap_id
= img_request
->snap_id
;
1560 /* Allocate and initialize the request, for the single op */
1562 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1563 osd_req
= ceph_osdc_alloc_request(osdc
, snapc
, 1, false, GFP_ATOMIC
);
1565 return NULL
; /* ENOMEM */
1567 rbd_assert(obj_request_type_valid(obj_request
->type
));
1568 switch (obj_request
->type
) {
1569 case OBJ_REQUEST_BIO
:
1570 rbd_assert(obj_request
->bio_list
!= NULL
);
1571 osd_req
->r_bio
= obj_request
->bio_list
;
1572 bio_get(osd_req
->r_bio
);
1573 /* osd client requires "num pages" even for bio */
1574 osd_req
->r_num_pages
= calc_pages_for(offset
, length
);
1576 case OBJ_REQUEST_PAGES
:
1577 osd_req
->r_pages
= obj_request
->pages
;
1578 osd_req
->r_num_pages
= obj_request
->page_count
;
1579 osd_req
->r_page_alignment
= offset
& ~PAGE_MASK
;
1583 if (write_request
) {
1584 osd_req
->r_flags
= CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
;
1588 osd_req
->r_flags
= CEPH_OSD_FLAG_READ
;
1589 mtime
= NULL
; /* not needed for reads */
1590 offset
= 0; /* These are not used... */
1591 length
= 0; /* ...for osd read requests */
1594 osd_req
->r_callback
= rbd_osd_req_callback
;
1595 osd_req
->r_priv
= obj_request
;
1597 osd_req
->r_oid_len
= strlen(obj_request
->object_name
);
1598 rbd_assert(osd_req
->r_oid_len
< sizeof (osd_req
->r_oid
));
1599 memcpy(osd_req
->r_oid
, obj_request
->object_name
, osd_req
->r_oid_len
);
1601 osd_req
->r_file_layout
= rbd_dev
->layout
; /* struct */
1603 /* osd_req will get its own reference to snapc (if non-null) */
1605 ceph_osdc_build_request(osd_req
, offset
, length
, 1, op
,
1606 snapc
, snap_id
, mtime
);
/* Drop the reference held on an osd request (frees it when last). */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1616 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1618 static struct rbd_obj_request
*rbd_obj_request_create(const char *object_name
,
1619 u64 offset
, u64 length
,
1620 enum obj_request_type type
)
1622 struct rbd_obj_request
*obj_request
;
1626 rbd_assert(obj_request_type_valid(type
));
1628 size
= strlen(object_name
) + 1;
1629 obj_request
= kzalloc(sizeof (*obj_request
) + size
, GFP_KERNEL
);
1633 name
= (char *)(obj_request
+ 1);
1634 obj_request
->object_name
= memcpy(name
, object_name
, size
);
1635 obj_request
->offset
= offset
;
1636 obj_request
->length
= length
;
1637 obj_request
->which
= BAD_WHICH
;
1638 obj_request
->type
= type
;
1639 INIT_LIST_HEAD(&obj_request
->links
);
1640 atomic_set(&obj_request
->done
, 0);
1641 init_completion(&obj_request
->completion
);
1642 kref_init(&obj_request
->kref
);
1647 static void rbd_obj_request_destroy(struct kref
*kref
)
1649 struct rbd_obj_request
*obj_request
;
1651 obj_request
= container_of(kref
, struct rbd_obj_request
, kref
);
1653 rbd_assert(obj_request
->img_request
== NULL
);
1654 rbd_assert(obj_request
->which
== BAD_WHICH
);
1656 if (obj_request
->osd_req
)
1657 rbd_osd_req_destroy(obj_request
->osd_req
);
1659 rbd_assert(obj_request_type_valid(obj_request
->type
));
1660 switch (obj_request
->type
) {
1661 case OBJ_REQUEST_BIO
:
1662 if (obj_request
->bio_list
)
1663 bio_chain_put(obj_request
->bio_list
);
1665 case OBJ_REQUEST_PAGES
:
1666 if (obj_request
->pages
)
1667 ceph_release_page_vector(obj_request
->pages
,
1668 obj_request
->page_count
);
1676 * Caller is responsible for filling in the list of object requests
1677 * that comprises the image request, and the Linux request pointer
1678 * (if there is one).
1680 struct rbd_img_request
*rbd_img_request_create(struct rbd_device
*rbd_dev
,
1681 u64 offset
, u64 length
,
1684 struct rbd_img_request
*img_request
;
1685 struct ceph_snap_context
*snapc
= NULL
;
1687 img_request
= kmalloc(sizeof (*img_request
), GFP_ATOMIC
);
1691 if (write_request
) {
1692 down_read(&rbd_dev
->header_rwsem
);
1693 snapc
= ceph_get_snap_context(rbd_dev
->header
.snapc
);
1694 up_read(&rbd_dev
->header_rwsem
);
1695 if (WARN_ON(!snapc
)) {
1697 return NULL
; /* Shouldn't happen */
1701 img_request
->rq
= NULL
;
1702 img_request
->rbd_dev
= rbd_dev
;
1703 img_request
->offset
= offset
;
1704 img_request
->length
= length
;
1705 img_request
->write_request
= write_request
;
1707 img_request
->snapc
= snapc
;
1709 img_request
->snap_id
= rbd_dev
->spec
->snap_id
;
1710 spin_lock_init(&img_request
->completion_lock
);
1711 img_request
->next_completion
= 0;
1712 img_request
->callback
= NULL
;
1713 img_request
->obj_request_count
= 0;
1714 INIT_LIST_HEAD(&img_request
->obj_requests
);
1715 kref_init(&img_request
->kref
);
1717 rbd_img_request_get(img_request
); /* Avoid a warning */
1718 rbd_img_request_put(img_request
); /* TEMPORARY */
1723 static void rbd_img_request_destroy(struct kref
*kref
)
1725 struct rbd_img_request
*img_request
;
1726 struct rbd_obj_request
*obj_request
;
1727 struct rbd_obj_request
*next_obj_request
;
1729 img_request
= container_of(kref
, struct rbd_img_request
, kref
);
1731 for_each_obj_request_safe(img_request
, obj_request
, next_obj_request
)
1732 rbd_img_obj_request_del(img_request
, obj_request
);
1734 if (img_request
->write_request
)
1735 ceph_put_snap_context(img_request
->snapc
);
1740 static int rbd_img_request_fill_bio(struct rbd_img_request
*img_request
,
1741 struct bio
*bio_list
)
1743 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
1744 struct rbd_obj_request
*obj_request
= NULL
;
1745 struct rbd_obj_request
*next_obj_request
;
1746 unsigned int bio_offset
;
1751 opcode
= img_request
->write_request
? CEPH_OSD_OP_WRITE
1754 image_offset
= img_request
->offset
;
1755 rbd_assert(image_offset
== bio_list
->bi_sector
<< SECTOR_SHIFT
);
1756 resid
= img_request
->length
;
1758 const char *object_name
;
1759 unsigned int clone_size
;
1760 struct ceph_osd_req_op
*op
;
1764 object_name
= rbd_segment_name(rbd_dev
, image_offset
);
1767 offset
= rbd_segment_offset(rbd_dev
, image_offset
);
1768 length
= rbd_segment_length(rbd_dev
, image_offset
, resid
);
1769 obj_request
= rbd_obj_request_create(object_name
,
1772 kfree(object_name
); /* object request has its own copy */
1776 rbd_assert(length
<= (u64
) UINT_MAX
);
1777 clone_size
= (unsigned int) length
;
1778 obj_request
->bio_list
= bio_chain_clone_range(&bio_list
,
1779 &bio_offset
, clone_size
,
1781 if (!obj_request
->bio_list
)
1785 * Build up the op to use in building the osd
1786 * request. Note that the contents of the op are
1787 * copied by rbd_osd_req_create().
1789 op
= rbd_osd_req_op_create(opcode
, offset
, length
);
1792 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
,
1793 img_request
->write_request
,
1795 rbd_osd_req_op_destroy(op
);
1796 if (!obj_request
->osd_req
)
1798 /* status and version are initially zero-filled */
1800 rbd_img_obj_request_add(img_request
, obj_request
);
1802 image_offset
+= length
;
1809 rbd_obj_request_put(obj_request
);
1811 for_each_obj_request_safe(img_request
, obj_request
, next_obj_request
)
1812 rbd_obj_request_put(obj_request
);
1817 static void rbd_img_obj_callback(struct rbd_obj_request
*obj_request
)
1819 struct rbd_img_request
*img_request
;
1820 u32 which
= obj_request
->which
;
1823 img_request
= obj_request
->img_request
;
1824 rbd_assert(img_request
!= NULL
);
1825 rbd_assert(img_request
->rq
!= NULL
);
1826 rbd_assert(which
!= BAD_WHICH
);
1827 rbd_assert(which
< img_request
->obj_request_count
);
1828 rbd_assert(which
>= img_request
->next_completion
);
1830 spin_lock_irq(&img_request
->completion_lock
);
1831 if (which
!= img_request
->next_completion
)
1834 for_each_obj_request_from(img_request
, obj_request
) {
1835 unsigned int xferred
;
1839 rbd_assert(which
< img_request
->obj_request_count
);
1841 if (!atomic_read(&obj_request
->done
))
1844 rbd_assert(obj_request
->xferred
<= (u64
) UINT_MAX
);
1845 xferred
= (unsigned int) obj_request
->xferred
;
1846 result
= (int) obj_request
->result
;
1848 rbd_warn(NULL
, "obj_request %s result %d xferred %u\n",
1849 img_request
->write_request
? "write" : "read",
1852 more
= blk_end_request(img_request
->rq
, result
, xferred
);
1855 rbd_assert(more
^ (which
== img_request
->obj_request_count
));
1856 img_request
->next_completion
= which
;
1858 spin_unlock_irq(&img_request
->completion_lock
);
1861 rbd_img_request_complete(img_request
);
1864 static int rbd_img_request_submit(struct rbd_img_request
*img_request
)
1866 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
1867 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1868 struct rbd_obj_request
*obj_request
;
1870 for_each_obj_request(img_request
, obj_request
) {
1873 obj_request
->callback
= rbd_img_obj_callback
;
1874 ret
= rbd_obj_request_submit(osdc
, obj_request
);
1878 * The image request has its own reference to each
1879 * of its object requests, so we can safely drop the
1882 rbd_obj_request_put(obj_request
);
1888 static void rbd_request_fn(struct request_queue
*q
)
1890 struct rbd_device
*rbd_dev
= q
->queuedata
;
1891 bool read_only
= rbd_dev
->mapping
.read_only
;
1895 while ((rq
= blk_fetch_request(q
))) {
1896 bool write_request
= rq_data_dir(rq
) == WRITE
;
1897 struct rbd_img_request
*img_request
;
1901 /* Ignore any non-FS requests that filter through. */
1903 if (rq
->cmd_type
!= REQ_TYPE_FS
) {
1904 __blk_end_request_all(rq
, 0);
1908 spin_unlock_irq(q
->queue_lock
);
1910 /* Disallow writes to a read-only device */
1912 if (write_request
) {
1916 rbd_assert(rbd_dev
->spec
->snap_id
== CEPH_NOSNAP
);
1919 /* Quit early if the snapshot has disappeared */
1921 if (!atomic_read(&rbd_dev
->exists
)) {
1922 dout("request for non-existent snapshot");
1923 rbd_assert(rbd_dev
->spec
->snap_id
!= CEPH_NOSNAP
);
1928 offset
= (u64
) blk_rq_pos(rq
) << SECTOR_SHIFT
;
1929 length
= (u64
) blk_rq_bytes(rq
);
1932 if (WARN_ON(offset
&& length
> U64_MAX
- offset
+ 1))
1933 goto end_request
; /* Shouldn't happen */
1936 img_request
= rbd_img_request_create(rbd_dev
, offset
, length
,
1941 img_request
->rq
= rq
;
1943 result
= rbd_img_request_fill_bio(img_request
, rq
->bio
);
1945 result
= rbd_img_request_submit(img_request
);
1947 rbd_img_request_put(img_request
);
1949 spin_lock_irq(q
->queue_lock
);
1951 rbd_warn(rbd_dev
, "obj_request %s result %d\n",
1952 write_request
? "write" : "read", result
);
1953 __blk_end_request_all(rq
, result
);
1959 * a queue callback. Makes sure that we don't create a bio that spans across
1960 * multiple osd objects. One exception would be with a single page bios,
1961 * which we handle later at bio_chain_clone_range()
1963 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
1964 struct bio_vec
*bvec
)
1966 struct rbd_device
*rbd_dev
= q
->queuedata
;
1967 sector_t sector_offset
;
1968 sector_t sectors_per_obj
;
1969 sector_t obj_sector_offset
;
1973 * Find how far into its rbd object the partition-relative
1974 * bio start sector is to offset relative to the enclosing
1977 sector_offset
= get_start_sect(bmd
->bi_bdev
) + bmd
->bi_sector
;
1978 sectors_per_obj
= 1 << (rbd_dev
->header
.obj_order
- SECTOR_SHIFT
);
1979 obj_sector_offset
= sector_offset
& (sectors_per_obj
- 1);
1982 * Compute the number of bytes from that offset to the end
1983 * of the object. Account for what's already used by the bio.
1985 ret
= (int) (sectors_per_obj
- obj_sector_offset
) << SECTOR_SHIFT
;
1986 if (ret
> bmd
->bi_size
)
1987 ret
-= bmd
->bi_size
;
1992 * Don't send back more than was asked for. And if the bio
1993 * was empty, let the whole thing through because: "Note
1994 * that a block device *must* allow a single page to be
1995 * added to an empty bio."
1997 rbd_assert(bvec
->bv_len
<= PAGE_SIZE
);
1998 if (ret
> (int) bvec
->bv_len
|| !bmd
->bi_size
)
1999 ret
= (int) bvec
->bv_len
;
2004 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
2006 struct gendisk
*disk
= rbd_dev
->disk
;
2011 if (disk
->flags
& GENHD_FL_UP
)
2014 blk_cleanup_queue(disk
->queue
);
2018 static int rbd_obj_read_sync(struct rbd_device
*rbd_dev
,
2019 const char *object_name
,
2020 u64 offset
, u64 length
,
2021 char *buf
, u64
*version
)
2024 struct ceph_osd_req_op
*op
;
2025 struct rbd_obj_request
*obj_request
;
2026 struct ceph_osd_client
*osdc
;
2027 struct page
**pages
= NULL
;
2031 page_count
= (u32
) calc_pages_for(offset
, length
);
2032 pages
= ceph_alloc_page_vector(page_count
, GFP_KERNEL
);
2034 ret
= PTR_ERR(pages
);
2037 obj_request
= rbd_obj_request_create(object_name
, offset
, length
,
2042 obj_request
->pages
= pages
;
2043 obj_request
->page_count
= page_count
;
2045 op
= rbd_osd_req_op_create(CEPH_OSD_OP_READ
, offset
, length
);
2048 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, false,
2050 rbd_osd_req_op_destroy(op
);
2051 if (!obj_request
->osd_req
)
2054 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2055 ret
= rbd_obj_request_submit(osdc
, obj_request
);
2058 ret
= rbd_obj_request_wait(obj_request
);
2062 ret
= obj_request
->result
;
2065 ret
= ceph_copy_from_page_vector(pages
, buf
, 0, obj_request
->xferred
);
2067 *version
= obj_request
->version
;
2070 rbd_obj_request_put(obj_request
);
2072 ceph_release_page_vector(pages
, page_count
);
2078 * Read the complete header for the given rbd device.
2080 * Returns a pointer to a dynamically-allocated buffer containing
2081 * the complete and validated header. Caller can pass the address
2082 * of a variable that will be filled in with the version of the
2083 * header object at the time it was read.
2085 * Returns a pointer-coded errno if a failure occurs.
2087 static struct rbd_image_header_ondisk
*
2088 rbd_dev_v1_header_read(struct rbd_device
*rbd_dev
, u64
*version
)
2090 struct rbd_image_header_ondisk
*ondisk
= NULL
;
2097 * The complete header will include an array of its 64-bit
2098 * snapshot ids, followed by the names of those snapshots as
2099 * a contiguous block of NUL-terminated strings. Note that
2100 * the number of snapshots could change by the time we read
2101 * it in, in which case we re-read it.
2108 size
= sizeof (*ondisk
);
2109 size
+= snap_count
* sizeof (struct rbd_image_snap_ondisk
);
2111 ondisk
= kmalloc(size
, GFP_KERNEL
);
2113 return ERR_PTR(-ENOMEM
);
2115 (void) rbd_req_sync_read
; /* avoid a warning */
2116 ret
= rbd_obj_read_sync(rbd_dev
, rbd_dev
->header_name
,
2118 (char *) ondisk
, version
);
2122 if (WARN_ON((size_t) ret
< size
)) {
2124 rbd_warn(rbd_dev
, "short header read (want %zd got %d)",
2128 if (!rbd_dev_ondisk_valid(ondisk
)) {
2130 rbd_warn(rbd_dev
, "invalid header");
2134 names_size
= le64_to_cpu(ondisk
->snap_names_len
);
2135 want_count
= snap_count
;
2136 snap_count
= le32_to_cpu(ondisk
->snap_count
);
2137 } while (snap_count
!= want_count
);
2144 return ERR_PTR(ret
);
2148 * reload the ondisk the header
2150 static int rbd_read_header(struct rbd_device
*rbd_dev
,
2151 struct rbd_image_header
*header
)
2153 struct rbd_image_header_ondisk
*ondisk
;
2157 ondisk
= rbd_dev_v1_header_read(rbd_dev
, &ver
);
2159 return PTR_ERR(ondisk
);
2160 ret
= rbd_header_from_disk(header
, ondisk
);
2162 header
->obj_version
= ver
;
2168 static void rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
2170 struct rbd_snap
*snap
;
2171 struct rbd_snap
*next
;
2173 list_for_each_entry_safe(snap
, next
, &rbd_dev
->snaps
, node
)
2174 rbd_remove_snap_dev(snap
);
2177 static void rbd_update_mapping_size(struct rbd_device
*rbd_dev
)
2181 if (rbd_dev
->spec
->snap_id
!= CEPH_NOSNAP
)
2184 size
= (sector_t
) rbd_dev
->header
.image_size
/ SECTOR_SIZE
;
2185 dout("setting size to %llu sectors", (unsigned long long) size
);
2186 rbd_dev
->mapping
.size
= (u64
) size
;
2187 set_capacity(rbd_dev
->disk
, size
);
2191 * only read the first part of the ondisk header, without the snaps info
2193 static int rbd_dev_v1_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
2196 struct rbd_image_header h
;
2198 ret
= rbd_read_header(rbd_dev
, &h
);
2202 down_write(&rbd_dev
->header_rwsem
);
2204 /* Update image size, and check for resize of mapped image */
2205 rbd_dev
->header
.image_size
= h
.image_size
;
2206 rbd_update_mapping_size(rbd_dev
);
2208 /* rbd_dev->header.object_prefix shouldn't change */
2209 kfree(rbd_dev
->header
.snap_sizes
);
2210 kfree(rbd_dev
->header
.snap_names
);
2211 /* osd requests may still refer to snapc */
2212 ceph_put_snap_context(rbd_dev
->header
.snapc
);
2215 *hver
= h
.obj_version
;
2216 rbd_dev
->header
.obj_version
= h
.obj_version
;
2217 rbd_dev
->header
.image_size
= h
.image_size
;
2218 rbd_dev
->header
.snapc
= h
.snapc
;
2219 rbd_dev
->header
.snap_names
= h
.snap_names
;
2220 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
2221 /* Free the extra copy of the object prefix */
2222 WARN_ON(strcmp(rbd_dev
->header
.object_prefix
, h
.object_prefix
));
2223 kfree(h
.object_prefix
);
2225 ret
= rbd_dev_snaps_update(rbd_dev
);
2227 ret
= rbd_dev_snaps_register(rbd_dev
);
2229 up_write(&rbd_dev
->header_rwsem
);
2234 static int rbd_dev_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
2238 rbd_assert(rbd_image_format_valid(rbd_dev
->image_format
));
2239 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2240 if (rbd_dev
->image_format
== 1)
2241 ret
= rbd_dev_v1_refresh(rbd_dev
, hver
);
2243 ret
= rbd_dev_v2_refresh(rbd_dev
, hver
);
2244 mutex_unlock(&ctl_mutex
);
2249 static int rbd_init_disk(struct rbd_device
*rbd_dev
)
2251 struct gendisk
*disk
;
2252 struct request_queue
*q
;
2255 /* create gendisk info */
2256 disk
= alloc_disk(RBD_MINORS_PER_MAJOR
);
2260 snprintf(disk
->disk_name
, sizeof(disk
->disk_name
), RBD_DRV_NAME
"%d",
2262 disk
->major
= rbd_dev
->major
;
2263 disk
->first_minor
= 0;
2264 disk
->fops
= &rbd_bd_ops
;
2265 disk
->private_data
= rbd_dev
;
2267 q
= blk_init_queue(rbd_request_fn
, &rbd_dev
->lock
);
2271 /* We use the default size, but let's be explicit about it. */
2272 blk_queue_physical_block_size(q
, SECTOR_SIZE
);
2274 /* set io sizes to object size */
2275 segment_size
= rbd_obj_bytes(&rbd_dev
->header
);
2276 blk_queue_max_hw_sectors(q
, segment_size
/ SECTOR_SIZE
);
2277 blk_queue_max_segment_size(q
, segment_size
);
2278 blk_queue_io_min(q
, segment_size
);
2279 blk_queue_io_opt(q
, segment_size
);
2281 blk_queue_merge_bvec(q
, rbd_merge_bvec
);
2284 q
->queuedata
= rbd_dev
;
2286 rbd_dev
->disk
= disk
;
2288 set_capacity(rbd_dev
->disk
, rbd_dev
->mapping
.size
/ SECTOR_SIZE
);
2301 static struct rbd_device
*dev_to_rbd_dev(struct device
*dev
)
2303 return container_of(dev
, struct rbd_device
, dev
);
2306 static ssize_t
rbd_size_show(struct device
*dev
,
2307 struct device_attribute
*attr
, char *buf
)
2309 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2312 down_read(&rbd_dev
->header_rwsem
);
2313 size
= get_capacity(rbd_dev
->disk
);
2314 up_read(&rbd_dev
->header_rwsem
);
2316 return sprintf(buf
, "%llu\n", (unsigned long long) size
* SECTOR_SIZE
);
2320 * Note this shows the features for whatever's mapped, which is not
2321 * necessarily the base image.
2323 static ssize_t
rbd_features_show(struct device
*dev
,
2324 struct device_attribute
*attr
, char *buf
)
2326 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2328 return sprintf(buf
, "0x%016llx\n",
2329 (unsigned long long) rbd_dev
->mapping
.features
);
2332 static ssize_t
rbd_major_show(struct device
*dev
,
2333 struct device_attribute
*attr
, char *buf
)
2335 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2337 return sprintf(buf
, "%d\n", rbd_dev
->major
);
2340 static ssize_t
rbd_client_id_show(struct device
*dev
,
2341 struct device_attribute
*attr
, char *buf
)
2343 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2345 return sprintf(buf
, "client%lld\n",
2346 ceph_client_id(rbd_dev
->rbd_client
->client
));
2349 static ssize_t
rbd_pool_show(struct device
*dev
,
2350 struct device_attribute
*attr
, char *buf
)
2352 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2354 return sprintf(buf
, "%s\n", rbd_dev
->spec
->pool_name
);
2357 static ssize_t
rbd_pool_id_show(struct device
*dev
,
2358 struct device_attribute
*attr
, char *buf
)
2360 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2362 return sprintf(buf
, "%llu\n",
2363 (unsigned long long) rbd_dev
->spec
->pool_id
);
2366 static ssize_t
rbd_name_show(struct device
*dev
,
2367 struct device_attribute
*attr
, char *buf
)
2369 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2371 if (rbd_dev
->spec
->image_name
)
2372 return sprintf(buf
, "%s\n", rbd_dev
->spec
->image_name
);
2374 return sprintf(buf
, "(unknown)\n");
2377 static ssize_t
rbd_image_id_show(struct device
*dev
,
2378 struct device_attribute
*attr
, char *buf
)
2380 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2382 return sprintf(buf
, "%s\n", rbd_dev
->spec
->image_id
);
2386 * Shows the name of the currently-mapped snapshot (or
2387 * RBD_SNAP_HEAD_NAME for the base image).
2389 static ssize_t
rbd_snap_show(struct device
*dev
,
2390 struct device_attribute
*attr
,
2393 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2395 return sprintf(buf
, "%s\n", rbd_dev
->spec
->snap_name
);
2399 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2400 * for the parent image. If there is no parent, simply shows
2401 * "(no parent image)".
2403 static ssize_t
rbd_parent_show(struct device
*dev
,
2404 struct device_attribute
*attr
,
2407 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2408 struct rbd_spec
*spec
= rbd_dev
->parent_spec
;
2413 return sprintf(buf
, "(no parent image)\n");
2415 count
= sprintf(bufp
, "pool_id %llu\npool_name %s\n",
2416 (unsigned long long) spec
->pool_id
, spec
->pool_name
);
2421 count
= sprintf(bufp
, "image_id %s\nimage_name %s\n", spec
->image_id
,
2422 spec
->image_name
? spec
->image_name
: "(unknown)");
2427 count
= sprintf(bufp
, "snap_id %llu\nsnap_name %s\n",
2428 (unsigned long long) spec
->snap_id
, spec
->snap_name
);
2433 count
= sprintf(bufp
, "overlap %llu\n", rbd_dev
->parent_overlap
);
2438 return (ssize_t
) (bufp
- buf
);
2441 static ssize_t
rbd_image_refresh(struct device
*dev
,
2442 struct device_attribute
*attr
,
2446 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2449 ret
= rbd_dev_refresh(rbd_dev
, NULL
);
2451 return ret
< 0 ? ret
: size
;
2454 static DEVICE_ATTR(size
, S_IRUGO
, rbd_size_show
, NULL
);
2455 static DEVICE_ATTR(features
, S_IRUGO
, rbd_features_show
, NULL
);
2456 static DEVICE_ATTR(major
, S_IRUGO
, rbd_major_show
, NULL
);
2457 static DEVICE_ATTR(client_id
, S_IRUGO
, rbd_client_id_show
, NULL
);
2458 static DEVICE_ATTR(pool
, S_IRUGO
, rbd_pool_show
, NULL
);
2459 static DEVICE_ATTR(pool_id
, S_IRUGO
, rbd_pool_id_show
, NULL
);
2460 static DEVICE_ATTR(name
, S_IRUGO
, rbd_name_show
, NULL
);
2461 static DEVICE_ATTR(image_id
, S_IRUGO
, rbd_image_id_show
, NULL
);
2462 static DEVICE_ATTR(refresh
, S_IWUSR
, NULL
, rbd_image_refresh
);
2463 static DEVICE_ATTR(current_snap
, S_IRUGO
, rbd_snap_show
, NULL
);
2464 static DEVICE_ATTR(parent
, S_IRUGO
, rbd_parent_show
, NULL
);
2466 static struct attribute
*rbd_attrs
[] = {
2467 &dev_attr_size
.attr
,
2468 &dev_attr_features
.attr
,
2469 &dev_attr_major
.attr
,
2470 &dev_attr_client_id
.attr
,
2471 &dev_attr_pool
.attr
,
2472 &dev_attr_pool_id
.attr
,
2473 &dev_attr_name
.attr
,
2474 &dev_attr_image_id
.attr
,
2475 &dev_attr_current_snap
.attr
,
2476 &dev_attr_parent
.attr
,
2477 &dev_attr_refresh
.attr
,
2481 static struct attribute_group rbd_attr_group
= {
2485 static const struct attribute_group
*rbd_attr_groups
[] = {
2490 static void rbd_sysfs_dev_release(struct device
*dev
)
2494 static struct device_type rbd_device_type
= {
2496 .groups
= rbd_attr_groups
,
2497 .release
= rbd_sysfs_dev_release
,
2505 static ssize_t
rbd_snap_size_show(struct device
*dev
,
2506 struct device_attribute
*attr
,
2509 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2511 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->size
);
2514 static ssize_t
rbd_snap_id_show(struct device
*dev
,
2515 struct device_attribute
*attr
,
2518 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2520 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->id
);
2523 static ssize_t
rbd_snap_features_show(struct device
*dev
,
2524 struct device_attribute
*attr
,
2527 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2529 return sprintf(buf
, "0x%016llx\n",
2530 (unsigned long long) snap
->features
);
2533 static DEVICE_ATTR(snap_size
, S_IRUGO
, rbd_snap_size_show
, NULL
);
2534 static DEVICE_ATTR(snap_id
, S_IRUGO
, rbd_snap_id_show
, NULL
);
2535 static DEVICE_ATTR(snap_features
, S_IRUGO
, rbd_snap_features_show
, NULL
);
2537 static struct attribute
*rbd_snap_attrs
[] = {
2538 &dev_attr_snap_size
.attr
,
2539 &dev_attr_snap_id
.attr
,
2540 &dev_attr_snap_features
.attr
,
2544 static struct attribute_group rbd_snap_attr_group
= {
2545 .attrs
= rbd_snap_attrs
,
2548 static void rbd_snap_dev_release(struct device
*dev
)
2550 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2555 static const struct attribute_group
*rbd_snap_attr_groups
[] = {
2556 &rbd_snap_attr_group
,
2560 static struct device_type rbd_snap_device_type
= {
2561 .groups
= rbd_snap_attr_groups
,
2562 .release
= rbd_snap_dev_release
,
2565 static struct rbd_spec
*rbd_spec_get(struct rbd_spec
*spec
)
2567 kref_get(&spec
->kref
);
2572 static void rbd_spec_free(struct kref
*kref
);
2573 static void rbd_spec_put(struct rbd_spec
*spec
)
2576 kref_put(&spec
->kref
, rbd_spec_free
);
2579 static struct rbd_spec
*rbd_spec_alloc(void)
2581 struct rbd_spec
*spec
;
2583 spec
= kzalloc(sizeof (*spec
), GFP_KERNEL
);
2586 kref_init(&spec
->kref
);
2588 rbd_spec_put(rbd_spec_get(spec
)); /* TEMPORARY */
2593 static void rbd_spec_free(struct kref
*kref
)
2595 struct rbd_spec
*spec
= container_of(kref
, struct rbd_spec
, kref
);
2597 kfree(spec
->pool_name
);
2598 kfree(spec
->image_id
);
2599 kfree(spec
->image_name
);
2600 kfree(spec
->snap_name
);
2604 struct rbd_device
*rbd_dev_create(struct rbd_client
*rbdc
,
2605 struct rbd_spec
*spec
)
2607 struct rbd_device
*rbd_dev
;
2609 rbd_dev
= kzalloc(sizeof (*rbd_dev
), GFP_KERNEL
);
2613 spin_lock_init(&rbd_dev
->lock
);
2614 atomic_set(&rbd_dev
->exists
, 0);
2615 INIT_LIST_HEAD(&rbd_dev
->node
);
2616 INIT_LIST_HEAD(&rbd_dev
->snaps
);
2617 init_rwsem(&rbd_dev
->header_rwsem
);
2619 rbd_dev
->spec
= spec
;
2620 rbd_dev
->rbd_client
= rbdc
;
2622 /* Initialize the layout used for all rbd requests */
2624 rbd_dev
->layout
.fl_stripe_unit
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
2625 rbd_dev
->layout
.fl_stripe_count
= cpu_to_le32(1);
2626 rbd_dev
->layout
.fl_object_size
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
2627 rbd_dev
->layout
.fl_pg_pool
= cpu_to_le32((u32
) spec
->pool_id
);
2632 static void rbd_dev_destroy(struct rbd_device
*rbd_dev
)
2634 rbd_spec_put(rbd_dev
->parent_spec
);
2635 kfree(rbd_dev
->header_name
);
2636 rbd_put_client(rbd_dev
->rbd_client
);
2637 rbd_spec_put(rbd_dev
->spec
);
2641 static bool rbd_snap_registered(struct rbd_snap
*snap
)
2643 bool ret
= snap
->dev
.type
== &rbd_snap_device_type
;
2644 bool reg
= device_is_registered(&snap
->dev
);
2646 rbd_assert(!ret
^ reg
);
2651 static void rbd_remove_snap_dev(struct rbd_snap
*snap
)
2653 list_del(&snap
->node
);
2654 if (device_is_registered(&snap
->dev
))
2655 device_unregister(&snap
->dev
);
2658 static int rbd_register_snap_dev(struct rbd_snap
*snap
,
2659 struct device
*parent
)
2661 struct device
*dev
= &snap
->dev
;
2664 dev
->type
= &rbd_snap_device_type
;
2665 dev
->parent
= parent
;
2666 dev
->release
= rbd_snap_dev_release
;
2667 dev_set_name(dev
, "%s%s", RBD_SNAP_DEV_NAME_PREFIX
, snap
->name
);
2668 dout("%s: registering device for snapshot %s\n", __func__
, snap
->name
);
2670 ret
= device_register(dev
);
2675 static struct rbd_snap
*__rbd_add_snap_dev(struct rbd_device
*rbd_dev
,
2676 const char *snap_name
,
2677 u64 snap_id
, u64 snap_size
,
2680 struct rbd_snap
*snap
;
2683 snap
= kzalloc(sizeof (*snap
), GFP_KERNEL
);
2685 return ERR_PTR(-ENOMEM
);
2688 snap
->name
= kstrdup(snap_name
, GFP_KERNEL
);
2693 snap
->size
= snap_size
;
2694 snap
->features
= snap_features
;
2702 return ERR_PTR(ret
);
2705 static char *rbd_dev_v1_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
2706 u64
*snap_size
, u64
*snap_features
)
2710 rbd_assert(which
< rbd_dev
->header
.snapc
->num_snaps
);
2712 *snap_size
= rbd_dev
->header
.snap_sizes
[which
];
2713 *snap_features
= 0; /* No features for v1 */
2715 /* Skip over names until we find the one we are looking for */
2717 snap_name
= rbd_dev
->header
.snap_names
;
2719 snap_name
+= strlen(snap_name
) + 1;
2725 * Get the size and object order for an image snapshot, or if
2726 * snap_id is CEPH_NOSNAP, gets this information for the base
2729 static int _rbd_dev_v2_snap_size(struct rbd_device
*rbd_dev
, u64 snap_id
,
2730 u8
*order
, u64
*snap_size
)
2732 __le64 snapid
= cpu_to_le64(snap_id
);
2737 } __attribute__ ((packed
)) size_buf
= { 0 };
2739 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2741 (char *) &snapid
, sizeof (snapid
),
2742 (char *) &size_buf
, sizeof (size_buf
), NULL
);
2743 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2747 *order
= size_buf
.order
;
2748 *snap_size
= le64_to_cpu(size_buf
.size
);
2750 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2751 (unsigned long long) snap_id
, (unsigned int) *order
,
2752 (unsigned long long) *snap_size
);
2757 static int rbd_dev_v2_image_size(struct rbd_device
*rbd_dev
)
2759 return _rbd_dev_v2_snap_size(rbd_dev
, CEPH_NOSNAP
,
2760 &rbd_dev
->header
.obj_order
,
2761 &rbd_dev
->header
.image_size
);
2764 static int rbd_dev_v2_object_prefix(struct rbd_device
*rbd_dev
)
2770 reply_buf
= kzalloc(RBD_OBJ_PREFIX_LEN_MAX
, GFP_KERNEL
);
2774 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2775 "rbd", "get_object_prefix",
2777 reply_buf
, RBD_OBJ_PREFIX_LEN_MAX
, NULL
);
2778 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2781 ret
= 0; /* rbd_req_sync_exec() can return positive */
2784 rbd_dev
->header
.object_prefix
= ceph_extract_encoded_string(&p
,
2785 p
+ RBD_OBJ_PREFIX_LEN_MAX
,
2788 if (IS_ERR(rbd_dev
->header
.object_prefix
)) {
2789 ret
= PTR_ERR(rbd_dev
->header
.object_prefix
);
2790 rbd_dev
->header
.object_prefix
= NULL
;
2792 dout(" object_prefix = %s\n", rbd_dev
->header
.object_prefix
);
2801 static int _rbd_dev_v2_snap_features(struct rbd_device
*rbd_dev
, u64 snap_id
,
2804 __le64 snapid
= cpu_to_le64(snap_id
);
2808 } features_buf
= { 0 };
2812 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2813 "rbd", "get_features",
2814 (char *) &snapid
, sizeof (snapid
),
2815 (char *) &features_buf
, sizeof (features_buf
),
2817 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2821 incompat
= le64_to_cpu(features_buf
.incompat
);
2822 if (incompat
& ~RBD_FEATURES_ALL
)
2825 *snap_features
= le64_to_cpu(features_buf
.features
);
2827 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2828 (unsigned long long) snap_id
,
2829 (unsigned long long) *snap_features
,
2830 (unsigned long long) le64_to_cpu(features_buf
.incompat
));
2835 static int rbd_dev_v2_features(struct rbd_device
*rbd_dev
)
2837 return _rbd_dev_v2_snap_features(rbd_dev
, CEPH_NOSNAP
,
2838 &rbd_dev
->header
.features
);
2841 static int rbd_dev_v2_parent_info(struct rbd_device
*rbd_dev
)
2843 struct rbd_spec
*parent_spec
;
2845 void *reply_buf
= NULL
;
2853 parent_spec
= rbd_spec_alloc();
2857 size
= sizeof (__le64
) + /* pool_id */
2858 sizeof (__le32
) + RBD_IMAGE_ID_LEN_MAX
+ /* image_id */
2859 sizeof (__le64
) + /* snap_id */
2860 sizeof (__le64
); /* overlap */
2861 reply_buf
= kmalloc(size
, GFP_KERNEL
);
2867 snapid
= cpu_to_le64(CEPH_NOSNAP
);
2868 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2869 "rbd", "get_parent",
2870 (char *) &snapid
, sizeof (snapid
),
2871 (char *) reply_buf
, size
, NULL
);
2872 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2878 end
= (char *) reply_buf
+ size
;
2879 ceph_decode_64_safe(&p
, end
, parent_spec
->pool_id
, out_err
);
2880 if (parent_spec
->pool_id
== CEPH_NOPOOL
)
2881 goto out
; /* No parent? No problem. */
2883 /* The ceph file layout needs to fit pool id in 32 bits */
2886 if (WARN_ON(parent_spec
->pool_id
> (u64
) U32_MAX
))
2889 image_id
= ceph_extract_encoded_string(&p
, end
, NULL
, GFP_KERNEL
);
2890 if (IS_ERR(image_id
)) {
2891 ret
= PTR_ERR(image_id
);
2894 parent_spec
->image_id
= image_id
;
2895 ceph_decode_64_safe(&p
, end
, parent_spec
->snap_id
, out_err
);
2896 ceph_decode_64_safe(&p
, end
, overlap
, out_err
);
2898 rbd_dev
->parent_overlap
= overlap
;
2899 rbd_dev
->parent_spec
= parent_spec
;
2900 parent_spec
= NULL
; /* rbd_dev now owns this */
2905 rbd_spec_put(parent_spec
);
2910 static char *rbd_dev_image_name(struct rbd_device
*rbd_dev
)
2912 size_t image_id_size
;
2917 void *reply_buf
= NULL
;
2919 char *image_name
= NULL
;
2922 rbd_assert(!rbd_dev
->spec
->image_name
);
2924 len
= strlen(rbd_dev
->spec
->image_id
);
2925 image_id_size
= sizeof (__le32
) + len
;
2926 image_id
= kmalloc(image_id_size
, GFP_KERNEL
);
2931 end
= (char *) image_id
+ image_id_size
;
2932 ceph_encode_string(&p
, end
, rbd_dev
->spec
->image_id
, (u32
) len
);
2934 size
= sizeof (__le32
) + RBD_IMAGE_NAME_LEN_MAX
;
2935 reply_buf
= kmalloc(size
, GFP_KERNEL
);
2939 ret
= rbd_req_sync_exec(rbd_dev
, RBD_DIRECTORY
,
2940 "rbd", "dir_get_name",
2941 image_id
, image_id_size
,
2942 (char *) reply_buf
, size
, NULL
);
2946 end
= (char *) reply_buf
+ size
;
2947 image_name
= ceph_extract_encoded_string(&p
, end
, &len
, GFP_KERNEL
);
2948 if (IS_ERR(image_name
))
2951 dout("%s: name is %s len is %zd\n", __func__
, image_name
, len
);
2960 * When a parent image gets probed, we only have the pool, image,
2961 * and snapshot ids but not the names of any of them. This call
2962 * is made later to fill in those names. It has to be done after
2963 * rbd_dev_snaps_update() has completed because some of the
2964 * information (in particular, snapshot name) is not available
2967 static int rbd_dev_probe_update_spec(struct rbd_device
*rbd_dev
)
2969 struct ceph_osd_client
*osdc
;
2971 void *reply_buf
= NULL
;
2974 if (rbd_dev
->spec
->pool_name
)
2975 return 0; /* Already have the names */
2977 /* Look up the pool name */
2979 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2980 name
= ceph_pg_pool_name_by_id(osdc
->osdmap
, rbd_dev
->spec
->pool_id
);
2982 rbd_warn(rbd_dev
, "there is no pool with id %llu",
2983 rbd_dev
->spec
->pool_id
); /* Really a BUG() */
2987 rbd_dev
->spec
->pool_name
= kstrdup(name
, GFP_KERNEL
);
2988 if (!rbd_dev
->spec
->pool_name
)
2991 /* Fetch the image name; tolerate failure here */
2993 name
= rbd_dev_image_name(rbd_dev
);
2995 rbd_dev
->spec
->image_name
= (char *) name
;
2997 rbd_warn(rbd_dev
, "unable to get image name");
2999 /* Look up the snapshot name. */
3001 name
= rbd_snap_name(rbd_dev
, rbd_dev
->spec
->snap_id
);
3003 rbd_warn(rbd_dev
, "no snapshot with id %llu",
3004 rbd_dev
->spec
->snap_id
); /* Really a BUG() */
3008 rbd_dev
->spec
->snap_name
= kstrdup(name
, GFP_KERNEL
);
3009 if(!rbd_dev
->spec
->snap_name
)
3015 kfree(rbd_dev
->spec
->pool_name
);
3016 rbd_dev
->spec
->pool_name
= NULL
;
3021 static int rbd_dev_v2_snap_context(struct rbd_device
*rbd_dev
, u64
*ver
)
3030 struct ceph_snap_context
*snapc
;
3034 * We'll need room for the seq value (maximum snapshot id),
3035 * snapshot count, and array of that many snapshot ids.
3036 * For now we have a fixed upper limit on the number we're
3037 * prepared to receive.
3039 size
= sizeof (__le64
) + sizeof (__le32
) +
3040 RBD_MAX_SNAP_COUNT
* sizeof (__le64
);
3041 reply_buf
= kzalloc(size
, GFP_KERNEL
);
3045 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
3046 "rbd", "get_snapcontext",
3048 reply_buf
, size
, ver
);
3049 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
3055 end
= (char *) reply_buf
+ size
;
3056 ceph_decode_64_safe(&p
, end
, seq
, out
);
3057 ceph_decode_32_safe(&p
, end
, snap_count
, out
);
3060 * Make sure the reported number of snapshot ids wouldn't go
3061 * beyond the end of our buffer. But before checking that,
3062 * make sure the computed size of the snapshot context we
3063 * allocate is representable in a size_t.
3065 if (snap_count
> (SIZE_MAX
- sizeof (struct ceph_snap_context
))
3070 if (!ceph_has_room(&p
, end
, snap_count
* sizeof (__le64
)))
3073 size
= sizeof (struct ceph_snap_context
) +
3074 snap_count
* sizeof (snapc
->snaps
[0]);
3075 snapc
= kmalloc(size
, GFP_KERNEL
);
3081 atomic_set(&snapc
->nref
, 1);
3083 snapc
->num_snaps
= snap_count
;
3084 for (i
= 0; i
< snap_count
; i
++)
3085 snapc
->snaps
[i
] = ceph_decode_64(&p
);
3087 rbd_dev
->header
.snapc
= snapc
;
3089 dout(" snap context seq = %llu, snap_count = %u\n",
3090 (unsigned long long) seq
, (unsigned int) snap_count
);
3098 static char *rbd_dev_v2_snap_name(struct rbd_device
*rbd_dev
, u32 which
)
3108 size
= sizeof (__le32
) + RBD_MAX_SNAP_NAME_LEN
;
3109 reply_buf
= kmalloc(size
, GFP_KERNEL
);
3111 return ERR_PTR(-ENOMEM
);
3113 snap_id
= cpu_to_le64(rbd_dev
->header
.snapc
->snaps
[which
]);
3114 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
3115 "rbd", "get_snapshot_name",
3116 (char *) &snap_id
, sizeof (snap_id
),
3117 reply_buf
, size
, NULL
);
3118 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
3123 end
= (char *) reply_buf
+ size
;
3124 snap_name
= ceph_extract_encoded_string(&p
, end
, NULL
, GFP_KERNEL
);
3125 if (IS_ERR(snap_name
)) {
3126 ret
= PTR_ERR(snap_name
);
3129 dout(" snap_id 0x%016llx snap_name = %s\n",
3130 (unsigned long long) le64_to_cpu(snap_id
), snap_name
);
3138 return ERR_PTR(ret
);
3141 static char *rbd_dev_v2_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
3142 u64
*snap_size
, u64
*snap_features
)
3148 snap_id
= rbd_dev
->header
.snapc
->snaps
[which
];
3149 ret
= _rbd_dev_v2_snap_size(rbd_dev
, snap_id
, &order
, snap_size
);
3151 return ERR_PTR(ret
);
3152 ret
= _rbd_dev_v2_snap_features(rbd_dev
, snap_id
, snap_features
);
3154 return ERR_PTR(ret
);
3156 return rbd_dev_v2_snap_name(rbd_dev
, which
);
3159 static char *rbd_dev_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
3160 u64
*snap_size
, u64
*snap_features
)
3162 if (rbd_dev
->image_format
== 1)
3163 return rbd_dev_v1_snap_info(rbd_dev
, which
,
3164 snap_size
, snap_features
);
3165 if (rbd_dev
->image_format
== 2)
3166 return rbd_dev_v2_snap_info(rbd_dev
, which
,
3167 snap_size
, snap_features
);
3168 return ERR_PTR(-EINVAL
);
3171 static int rbd_dev_v2_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
3176 down_write(&rbd_dev
->header_rwsem
);
3178 /* Grab old order first, to see if it changes */
3180 obj_order
= rbd_dev
->header
.obj_order
,
3181 ret
= rbd_dev_v2_image_size(rbd_dev
);
3184 if (rbd_dev
->header
.obj_order
!= obj_order
) {
3188 rbd_update_mapping_size(rbd_dev
);
3190 ret
= rbd_dev_v2_snap_context(rbd_dev
, hver
);
3191 dout("rbd_dev_v2_snap_context returned %d\n", ret
);
3194 ret
= rbd_dev_snaps_update(rbd_dev
);
3195 dout("rbd_dev_snaps_update returned %d\n", ret
);
3198 ret
= rbd_dev_snaps_register(rbd_dev
);
3199 dout("rbd_dev_snaps_register returned %d\n", ret
);
3201 up_write(&rbd_dev
->header_rwsem
);
3207 * Scan the rbd device's current snapshot list and compare it to the
3208 * newly-received snapshot context. Remove any existing snapshots
3209 * not present in the new snapshot context. Add a new snapshot for
3210 * any snaphots in the snapshot context not in the current list.
3211 * And verify there are no changes to snapshots we already know
3214 * Assumes the snapshots in the snapshot context are sorted by
3215 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3216 * are also maintained in that order.)
3218 static int rbd_dev_snaps_update(struct rbd_device
*rbd_dev
)
3220 struct ceph_snap_context
*snapc
= rbd_dev
->header
.snapc
;
3221 const u32 snap_count
= snapc
->num_snaps
;
3222 struct list_head
*head
= &rbd_dev
->snaps
;
3223 struct list_head
*links
= head
->next
;
3226 dout("%s: snap count is %u\n", __func__
, (unsigned int) snap_count
);
3227 while (index
< snap_count
|| links
!= head
) {
3229 struct rbd_snap
*snap
;
3232 u64 snap_features
= 0;
3234 snap_id
= index
< snap_count
? snapc
->snaps
[index
]
3236 snap
= links
!= head
? list_entry(links
, struct rbd_snap
, node
)
3238 rbd_assert(!snap
|| snap
->id
!= CEPH_NOSNAP
);
3240 if (snap_id
== CEPH_NOSNAP
|| (snap
&& snap
->id
> snap_id
)) {
3241 struct list_head
*next
= links
->next
;
3243 /* Existing snapshot not in the new snap context */
3245 if (rbd_dev
->spec
->snap_id
== snap
->id
)
3246 atomic_set(&rbd_dev
->exists
, 0);
3247 rbd_remove_snap_dev(snap
);
3248 dout("%ssnap id %llu has been removed\n",
3249 rbd_dev
->spec
->snap_id
== snap
->id
?
3251 (unsigned long long) snap
->id
);
3253 /* Done with this list entry; advance */
3259 snap_name
= rbd_dev_snap_info(rbd_dev
, index
,
3260 &snap_size
, &snap_features
);
3261 if (IS_ERR(snap_name
))
3262 return PTR_ERR(snap_name
);
3264 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count
,
3265 (unsigned long long) snap_id
);
3266 if (!snap
|| (snap_id
!= CEPH_NOSNAP
&& snap
->id
< snap_id
)) {
3267 struct rbd_snap
*new_snap
;
3269 /* We haven't seen this snapshot before */
3271 new_snap
= __rbd_add_snap_dev(rbd_dev
, snap_name
,
3272 snap_id
, snap_size
, snap_features
);
3273 if (IS_ERR(new_snap
)) {
3274 int err
= PTR_ERR(new_snap
);
3276 dout(" failed to add dev, error %d\n", err
);
3281 /* New goes before existing, or at end of list */
3283 dout(" added dev%s\n", snap
? "" : " at end\n");
3285 list_add_tail(&new_snap
->node
, &snap
->node
);
3287 list_add_tail(&new_snap
->node
, head
);
3289 /* Already have this one */
3291 dout(" already present\n");
3293 rbd_assert(snap
->size
== snap_size
);
3294 rbd_assert(!strcmp(snap
->name
, snap_name
));
3295 rbd_assert(snap
->features
== snap_features
);
3297 /* Done with this list entry; advance */
3299 links
= links
->next
;
3302 /* Advance to the next entry in the snapshot context */
3306 dout("%s: done\n", __func__
);
3312 * Scan the list of snapshots and register the devices for any that
3313 * have not already been registered.
3315 static int rbd_dev_snaps_register(struct rbd_device
*rbd_dev
)
3317 struct rbd_snap
*snap
;
3320 dout("%s called\n", __func__
);
3321 if (WARN_ON(!device_is_registered(&rbd_dev
->dev
)))
3324 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
3325 if (!rbd_snap_registered(snap
)) {
3326 ret
= rbd_register_snap_dev(snap
, &rbd_dev
->dev
);
3331 dout("%s: returning %d\n", __func__
, ret
);
3336 static int rbd_bus_add_dev(struct rbd_device
*rbd_dev
)
3341 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
3343 dev
= &rbd_dev
->dev
;
3344 dev
->bus
= &rbd_bus_type
;
3345 dev
->type
= &rbd_device_type
;
3346 dev
->parent
= &rbd_root_dev
;
3347 dev
->release
= rbd_dev_release
;
3348 dev_set_name(dev
, "%d", rbd_dev
->dev_id
);
3349 ret
= device_register(dev
);
3351 mutex_unlock(&ctl_mutex
);
3356 static void rbd_bus_del_dev(struct rbd_device
*rbd_dev
)
3358 device_unregister(&rbd_dev
->dev
);
3361 static atomic64_t rbd_dev_id_max
= ATOMIC64_INIT(0);
3364 * Get a unique rbd identifier for the given new rbd_dev, and add
3365 * the rbd_dev to the global list. The minimum rbd id is 1.
3367 static void rbd_dev_id_get(struct rbd_device
*rbd_dev
)
3369 rbd_dev
->dev_id
= atomic64_inc_return(&rbd_dev_id_max
);
3371 spin_lock(&rbd_dev_list_lock
);
3372 list_add_tail(&rbd_dev
->node
, &rbd_dev_list
);
3373 spin_unlock(&rbd_dev_list_lock
);
3374 dout("rbd_dev %p given dev id %llu\n", rbd_dev
,
3375 (unsigned long long) rbd_dev
->dev_id
);
3379 * Remove an rbd_dev from the global list, and record that its
3380 * identifier is no longer in use.
3382 static void rbd_dev_id_put(struct rbd_device
*rbd_dev
)
3384 struct list_head
*tmp
;
3385 int rbd_id
= rbd_dev
->dev_id
;
3388 rbd_assert(rbd_id
> 0);
3390 dout("rbd_dev %p released dev id %llu\n", rbd_dev
,
3391 (unsigned long long) rbd_dev
->dev_id
);
3392 spin_lock(&rbd_dev_list_lock
);
3393 list_del_init(&rbd_dev
->node
);
3396 * If the id being "put" is not the current maximum, there
3397 * is nothing special we need to do.
3399 if (rbd_id
!= atomic64_read(&rbd_dev_id_max
)) {
3400 spin_unlock(&rbd_dev_list_lock
);
3405 * We need to update the current maximum id. Search the
3406 * list to find out what it is. We're more likely to find
3407 * the maximum at the end, so search the list backward.
3410 list_for_each_prev(tmp
, &rbd_dev_list
) {
3411 struct rbd_device
*rbd_dev
;
3413 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
3414 if (rbd_dev
->dev_id
> max_id
)
3415 max_id
= rbd_dev
->dev_id
;
3417 spin_unlock(&rbd_dev_list_lock
);
3420 * The max id could have been updated by rbd_dev_id_get(), in
3421 * which case it now accurately reflects the new maximum.
3422 * Be careful not to overwrite the maximum value in that
3425 atomic64_cmpxchg(&rbd_dev_id_max
, rbd_id
, max_id
);
3426 dout(" max dev id has been reset\n");
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}
3479 * Finds the next token in *buf, dynamically allocates a buffer big
3480 * enough to hold a copy of it, and copies the token into the new
3481 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3482 * that a duplicate buffer is created even for a zero-length token.
3484 * Returns a pointer to the newly-allocated duplicate, or a null
3485 * pointer if memory for the duplicate was not available. If
3486 * the lenp argument is a non-null pointer, the length of the token
3487 * (not including the '\0') is returned in *lenp.
3489 * If successful, the *buf pointer will be updated to point beyond
3490 * the end of the found token.
3492 * Note: uses GFP_KERNEL for allocation.
3494 static inline char *dup_token(const char **buf
, size_t *lenp
)
3499 len
= next_token(buf
);
3500 dup
= kmemdup(*buf
, len
+ 1, GFP_KERNEL
);
3503 *(dup
+ len
) = '\0';
3513 * Parse the options provided for an "rbd add" (i.e., rbd image
3514 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3515 * and the data written is passed here via a NUL-terminated buffer.
3516 * Returns 0 if successful or an error code otherwise.
3518 * The information extracted from these options is recorded in
3519 * the other parameters which return dynamically-allocated
3522 * The address of a pointer that will refer to a ceph options
3523 * structure. Caller must release the returned pointer using
3524 * ceph_destroy_options() when it is no longer needed.
3526 * Address of an rbd options pointer. Fully initialized by
3527 * this function; caller must release with kfree().
3529 * Address of an rbd image specification pointer. Fully
3530 * initialized by this function based on parsed options.
3531 * Caller must release with rbd_spec_put().
3533 * The options passed take this form:
3534 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3537 * A comma-separated list of one or more monitor addresses.
3538 * A monitor address is an ip address, optionally followed
3539 * by a port number (separated by a colon).
3540 * I.e.: ip1[:port1][,ip2[:port2]...]
3542 * A comma-separated list of ceph and/or rbd options.
3544 * The name of the rados pool containing the rbd image.
3546 * The name of the image in that pool to map.
3548 * An optional snapshot id. If provided, the mapping will
3549 * present data from the image at the time that snapshot was
3550 * created. The image head is used if no snapshot id is
3551 * provided. Snapshot mappings are always read-only.
3553 static int rbd_add_parse_args(const char *buf
,
3554 struct ceph_options
**ceph_opts
,
3555 struct rbd_options
**opts
,
3556 struct rbd_spec
**rbd_spec
)
3560 const char *mon_addrs
;
3561 size_t mon_addrs_size
;
3562 struct rbd_spec
*spec
= NULL
;
3563 struct rbd_options
*rbd_opts
= NULL
;
3564 struct ceph_options
*copts
;
3567 /* The first four tokens are required */
3569 len
= next_token(&buf
);
3571 rbd_warn(NULL
, "no monitor address(es) provided");
3575 mon_addrs_size
= len
+ 1;
3579 options
= dup_token(&buf
, NULL
);
3583 rbd_warn(NULL
, "no options provided");
3587 spec
= rbd_spec_alloc();
3591 spec
->pool_name
= dup_token(&buf
, NULL
);
3592 if (!spec
->pool_name
)
3594 if (!*spec
->pool_name
) {
3595 rbd_warn(NULL
, "no pool name provided");
3599 spec
->image_name
= dup_token(&buf
, NULL
);
3600 if (!spec
->image_name
)
3602 if (!*spec
->image_name
) {
3603 rbd_warn(NULL
, "no image name provided");
3608 * Snapshot name is optional; default is to use "-"
3609 * (indicating the head/no snapshot).
3611 len
= next_token(&buf
);
3613 buf
= RBD_SNAP_HEAD_NAME
; /* No snapshot supplied */
3614 len
= sizeof (RBD_SNAP_HEAD_NAME
) - 1;
3615 } else if (len
> RBD_MAX_SNAP_NAME_LEN
) {
3616 ret
= -ENAMETOOLONG
;
3619 spec
->snap_name
= kmemdup(buf
, len
+ 1, GFP_KERNEL
);
3620 if (!spec
->snap_name
)
3622 *(spec
->snap_name
+ len
) = '\0';
3624 /* Initialize all rbd options to the defaults */
3626 rbd_opts
= kzalloc(sizeof (*rbd_opts
), GFP_KERNEL
);
3630 rbd_opts
->read_only
= RBD_READ_ONLY_DEFAULT
;
3632 copts
= ceph_parse_options(options
, mon_addrs
,
3633 mon_addrs
+ mon_addrs_size
- 1,
3634 parse_rbd_opts_token
, rbd_opts
);
3635 if (IS_ERR(copts
)) {
3636 ret
= PTR_ERR(copts
);
3657 * An rbd format 2 image has a unique identifier, distinct from the
3658 * name given to it by the user. Internally, that identifier is
3659 * what's used to specify the names of objects related to the image.
3661 * A special "rbd id" object is used to map an rbd image name to its
3662 * id. If that object doesn't exist, then there is no v2 rbd image
3663 * with the supplied name.
3665 * This function will record the given rbd_dev's image_id field if
3666 * it can be determined, and in that case will return 0. If any
3667 * errors occur a negative errno will be returned and the rbd_dev's
3668 * image_id field will be unchanged (and should be NULL).
3670 static int rbd_dev_image_id(struct rbd_device
*rbd_dev
)
3679 * When probing a parent image, the image id is already
3680 * known (and the image name likely is not). There's no
3681 * need to fetch the image id again in this case.
3683 if (rbd_dev
->spec
->image_id
)
3687 * First, see if the format 2 image id file exists, and if
3688 * so, get the image's persistent id from it.
3690 size
= sizeof (RBD_ID_PREFIX
) + strlen(rbd_dev
->spec
->image_name
);
3691 object_name
= kmalloc(size
, GFP_NOIO
);
3694 sprintf(object_name
, "%s%s", RBD_ID_PREFIX
, rbd_dev
->spec
->image_name
);
3695 dout("rbd id object name is %s\n", object_name
);
3697 /* Response will be an encoded string, which includes a length */
3699 size
= sizeof (__le32
) + RBD_IMAGE_ID_LEN_MAX
;
3700 response
= kzalloc(size
, GFP_NOIO
);
3706 ret
= rbd_req_sync_exec(rbd_dev
, object_name
,
3709 response
, RBD_IMAGE_ID_LEN_MAX
, NULL
);
3710 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
3713 ret
= 0; /* rbd_req_sync_exec() can return positive */
3716 rbd_dev
->spec
->image_id
= ceph_extract_encoded_string(&p
,
3717 p
+ RBD_IMAGE_ID_LEN_MAX
,
3719 if (IS_ERR(rbd_dev
->spec
->image_id
)) {
3720 ret
= PTR_ERR(rbd_dev
->spec
->image_id
);
3721 rbd_dev
->spec
->image_id
= NULL
;
3723 dout("image_id is %s\n", rbd_dev
->spec
->image_id
);
3732 static int rbd_dev_v1_probe(struct rbd_device
*rbd_dev
)
3737 /* Version 1 images have no id; empty string is used */
3739 rbd_dev
->spec
->image_id
= kstrdup("", GFP_KERNEL
);
3740 if (!rbd_dev
->spec
->image_id
)
3743 /* Record the header object name for this rbd image. */
3745 size
= strlen(rbd_dev
->spec
->image_name
) + sizeof (RBD_SUFFIX
);
3746 rbd_dev
->header_name
= kmalloc(size
, GFP_KERNEL
);
3747 if (!rbd_dev
->header_name
) {
3751 sprintf(rbd_dev
->header_name
, "%s%s",
3752 rbd_dev
->spec
->image_name
, RBD_SUFFIX
);
3754 /* Populate rbd image metadata */
3756 ret
= rbd_read_header(rbd_dev
, &rbd_dev
->header
);
3760 /* Version 1 images have no parent (no layering) */
3762 rbd_dev
->parent_spec
= NULL
;
3763 rbd_dev
->parent_overlap
= 0;
3765 rbd_dev
->image_format
= 1;
3767 dout("discovered version 1 image, header name is %s\n",
3768 rbd_dev
->header_name
);
3773 kfree(rbd_dev
->header_name
);
3774 rbd_dev
->header_name
= NULL
;
3775 kfree(rbd_dev
->spec
->image_id
);
3776 rbd_dev
->spec
->image_id
= NULL
;
3781 static int rbd_dev_v2_probe(struct rbd_device
*rbd_dev
)
3788 * Image id was filled in by the caller. Record the header
3789 * object name for this rbd image.
3791 size
= sizeof (RBD_HEADER_PREFIX
) + strlen(rbd_dev
->spec
->image_id
);
3792 rbd_dev
->header_name
= kmalloc(size
, GFP_KERNEL
);
3793 if (!rbd_dev
->header_name
)
3795 sprintf(rbd_dev
->header_name
, "%s%s",
3796 RBD_HEADER_PREFIX
, rbd_dev
->spec
->image_id
);
3798 /* Get the size and object order for the image */
3800 ret
= rbd_dev_v2_image_size(rbd_dev
);
3804 /* Get the object prefix (a.k.a. block_name) for the image */
3806 ret
= rbd_dev_v2_object_prefix(rbd_dev
);
3810 /* Get the and check features for the image */
3812 ret
= rbd_dev_v2_features(rbd_dev
);
3816 /* If the image supports layering, get the parent info */
3818 if (rbd_dev
->header
.features
& RBD_FEATURE_LAYERING
) {
3819 ret
= rbd_dev_v2_parent_info(rbd_dev
);
3824 /* crypto and compression type aren't (yet) supported for v2 images */
3826 rbd_dev
->header
.crypt_type
= 0;
3827 rbd_dev
->header
.comp_type
= 0;
3829 /* Get the snapshot context, plus the header version */
3831 ret
= rbd_dev_v2_snap_context(rbd_dev
, &ver
);
3834 rbd_dev
->header
.obj_version
= ver
;
3836 rbd_dev
->image_format
= 2;
3838 dout("discovered version 2 image, header name is %s\n",
3839 rbd_dev
->header_name
);
3843 rbd_dev
->parent_overlap
= 0;
3844 rbd_spec_put(rbd_dev
->parent_spec
);
3845 rbd_dev
->parent_spec
= NULL
;
3846 kfree(rbd_dev
->header_name
);
3847 rbd_dev
->header_name
= NULL
;
3848 kfree(rbd_dev
->header
.object_prefix
);
3849 rbd_dev
->header
.object_prefix
= NULL
;
3854 static int rbd_dev_probe_finish(struct rbd_device
*rbd_dev
)
3858 /* no need to lock here, as rbd_dev is not registered yet */
3859 ret
= rbd_dev_snaps_update(rbd_dev
);
3863 ret
= rbd_dev_probe_update_spec(rbd_dev
);
3867 ret
= rbd_dev_set_mapping(rbd_dev
);
3871 /* generate unique id: find highest unique id, add one */
3872 rbd_dev_id_get(rbd_dev
);
3874 /* Fill in the device name, now that we have its id. */
3875 BUILD_BUG_ON(DEV_NAME_LEN
3876 < sizeof (RBD_DRV_NAME
) + MAX_INT_FORMAT_WIDTH
);
3877 sprintf(rbd_dev
->name
, "%s%d", RBD_DRV_NAME
, rbd_dev
->dev_id
);
3879 /* Get our block major device number. */
3881 ret
= register_blkdev(0, rbd_dev
->name
);
3884 rbd_dev
->major
= ret
;
3886 /* Set up the blkdev mapping. */
3888 ret
= rbd_init_disk(rbd_dev
);
3890 goto err_out_blkdev
;
3892 ret
= rbd_bus_add_dev(rbd_dev
);
3897 * At this point cleanup in the event of an error is the job
3898 * of the sysfs code (initiated by rbd_bus_del_dev()).
3900 down_write(&rbd_dev
->header_rwsem
);
3901 ret
= rbd_dev_snaps_register(rbd_dev
);
3902 up_write(&rbd_dev
->header_rwsem
);
3906 ret
= rbd_req_sync_watch(rbd_dev
, 1);
3910 /* Everything's ready. Announce the disk to the world. */
3912 add_disk(rbd_dev
->disk
);
3914 pr_info("%s: added with size 0x%llx\n", rbd_dev
->disk
->disk_name
,
3915 (unsigned long long) rbd_dev
->mapping
.size
);
3919 /* this will also clean up rest of rbd_dev stuff */
3921 rbd_bus_del_dev(rbd_dev
);
3925 rbd_free_disk(rbd_dev
);
3927 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
3929 rbd_dev_id_put(rbd_dev
);
3931 rbd_remove_all_snaps(rbd_dev
);
3937 * Probe for the existence of the header object for the given rbd
3938 * device. For format 2 images this includes determining the image
3941 static int rbd_dev_probe(struct rbd_device
*rbd_dev
)
3946 * Get the id from the image id object. If it's not a
3947 * format 2 image, we'll get ENOENT back, and we'll assume
3948 * it's a format 1 image.
3950 ret
= rbd_dev_image_id(rbd_dev
);
3952 ret
= rbd_dev_v1_probe(rbd_dev
);
3954 ret
= rbd_dev_v2_probe(rbd_dev
);
3956 dout("probe failed, returning %d\n", ret
);
3961 ret
= rbd_dev_probe_finish(rbd_dev
);
3963 rbd_header_free(&rbd_dev
->header
);
3968 static ssize_t
rbd_add(struct bus_type
*bus
,
3972 struct rbd_device
*rbd_dev
= NULL
;
3973 struct ceph_options
*ceph_opts
= NULL
;
3974 struct rbd_options
*rbd_opts
= NULL
;
3975 struct rbd_spec
*spec
= NULL
;
3976 struct rbd_client
*rbdc
;
3977 struct ceph_osd_client
*osdc
;
3980 if (!try_module_get(THIS_MODULE
))
3983 /* parse add command */
3984 rc
= rbd_add_parse_args(buf
, &ceph_opts
, &rbd_opts
, &spec
);
3986 goto err_out_module
;
3988 rbdc
= rbd_get_client(ceph_opts
);
3993 ceph_opts
= NULL
; /* rbd_dev client now owns this */
3996 osdc
= &rbdc
->client
->osdc
;
3997 rc
= ceph_pg_poolid_by_name(osdc
->osdmap
, spec
->pool_name
);
3999 goto err_out_client
;
4000 spec
->pool_id
= (u64
) rc
;
4002 /* The ceph file layout needs to fit pool id in 32 bits */
4004 if (WARN_ON(spec
->pool_id
> (u64
) U32_MAX
)) {
4006 goto err_out_client
;
4009 rbd_dev
= rbd_dev_create(rbdc
, spec
);
4011 goto err_out_client
;
4012 rbdc
= NULL
; /* rbd_dev now owns this */
4013 spec
= NULL
; /* rbd_dev now owns this */
4015 rbd_dev
->mapping
.read_only
= rbd_opts
->read_only
;
4017 rbd_opts
= NULL
; /* done with this */
4019 rc
= rbd_dev_probe(rbd_dev
);
4021 goto err_out_rbd_dev
;
4025 rbd_dev_destroy(rbd_dev
);
4027 rbd_put_client(rbdc
);
4030 ceph_destroy_options(ceph_opts
);
4034 module_put(THIS_MODULE
);
4036 dout("Error adding device %s\n", buf
);
4038 return (ssize_t
) rc
;
4041 static struct rbd_device
*__rbd_get_dev(unsigned long dev_id
)
4043 struct list_head
*tmp
;
4044 struct rbd_device
*rbd_dev
;
4046 spin_lock(&rbd_dev_list_lock
);
4047 list_for_each(tmp
, &rbd_dev_list
) {
4048 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
4049 if (rbd_dev
->dev_id
== dev_id
) {
4050 spin_unlock(&rbd_dev_list_lock
);
4054 spin_unlock(&rbd_dev_list_lock
);
4058 static void rbd_dev_release(struct device
*dev
)
4060 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
4062 if (rbd_dev
->watch_request
) {
4063 struct ceph_client
*client
= rbd_dev
->rbd_client
->client
;
4065 ceph_osdc_unregister_linger_request(&client
->osdc
,
4066 rbd_dev
->watch_request
);
4068 if (rbd_dev
->watch_event
)
4069 rbd_req_sync_watch(rbd_dev
, 0);
4071 /* clean up and free blkdev */
4072 rbd_free_disk(rbd_dev
);
4073 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
4075 /* release allocated disk header fields */
4076 rbd_header_free(&rbd_dev
->header
);
4078 /* done with the id, and with the rbd_dev */
4079 rbd_dev_id_put(rbd_dev
);
4080 rbd_assert(rbd_dev
->rbd_client
!= NULL
);
4081 rbd_dev_destroy(rbd_dev
);
4083 /* release module ref */
4084 module_put(THIS_MODULE
);
4087 static ssize_t
rbd_remove(struct bus_type
*bus
,
4091 struct rbd_device
*rbd_dev
= NULL
;
4096 rc
= strict_strtoul(buf
, 10, &ul
);
4100 /* convert to int; abort if we lost anything in the conversion */
4101 target_id
= (int) ul
;
4102 if (target_id
!= ul
)
4105 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
4107 rbd_dev
= __rbd_get_dev(target_id
);
4113 if (rbd_dev
->open_count
) {
4118 rbd_remove_all_snaps(rbd_dev
);
4119 rbd_bus_del_dev(rbd_dev
);
4122 mutex_unlock(&ctl_mutex
);
4128 * create control files in sysfs
4131 static int rbd_sysfs_init(void)
4135 ret
= device_register(&rbd_root_dev
);
4139 ret
= bus_register(&rbd_bus_type
);
4141 device_unregister(&rbd_root_dev
);
4146 static void rbd_sysfs_cleanup(void)
4148 bus_unregister(&rbd_bus_type
);
4149 device_unregister(&rbd_root_dev
);
4152 int __init
rbd_init(void)
4156 rc
= rbd_sysfs_init();
4159 pr_info("loaded " RBD_DRV_NAME_LONG
"\n");
4163 void __exit
rbd_exit(void)
4165 rbd_sysfs_cleanup();
4168 module_init(rbd_init
);
4169 module_exit(rbd_exit
);
4171 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4172 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4173 MODULE_DESCRIPTION("rados block device");
4175 /* following authorship retained from original osdblk.c */
4176 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4178 MODULE_LICENSE("GPL");