/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */
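/*
 * Illustrative usage sketch (not part of the original file): based on
 * how rbd_add() below parses its input, mapping an image through sysfs
 * looks roughly like
 *
 *	# echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *		> /sys/bus/rbd/add
 *
 * i.e. "<mon_addr> <options> <pool_name> <obj_name> [<snap_name>]".
 * The snapshot name is optional (the sscanf() below requires at least
 * four fields) and defaults to "-", meaning the image head; the file
 * named above is the authoritative reference for this interface.
 */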
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>

#include <linux/blkdev.h>

#include "rbd_types.h"

#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN	64
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

#define DEV_NAME_LEN		32
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	struct rw_semaphore snap_rwsem;
	struct ceph_snap_context *snapc;
	size_t snap_names_len;
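/*
 * Illustrative note (not in the original source): the snapshot
 * metadata of this header is kept in parallel structures --
 * snapc->snaps[] holds the snapshot ids, snap_sizes[] the per-snapshot
 * image sizes, and snap_names is a single buffer of NUL-terminated
 * names laid out back to back, e.g. "snap2\0snap1\0".  That is why
 * snap_by_name() below advances with p += strlen(p) + 1 and why
 * snap_names_len records the total length of the buffer.
 */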
/*
 * an instance of the client. multiple devices may share a client.
 */
	struct ceph_client	*client;
	struct list_head	node;

	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */

	struct list_head	node;

	int			id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct ceph_client	*client;
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char			pool_name[RBD_MAX_POOL_NAME_LEN];

	char			snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32			cur_snap;	/* index+1 of current snapshot
						   within snap context */

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;
static struct bus_type rbd_bus_type = {

static spinlock_t node_lock;		/* protects client get/put */

static DEFINE_MUTEX(ctl_mutex);		/* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);		/* devices */
static LIST_HEAD(rbd_client_list);	/* clients */

static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_rollback(struct device *dev,
				 struct device_attribute *attr,
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap);
static struct rbd_device *dev_to_rbd(struct device *dev)
	return container_of(dev, struct rbd_device, dev);

static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
	return get_device(&rbd_dev->dev);

static void rbd_put_dev(struct rbd_device *rbd_dev)
	put_device(&rbd_dev->dev);

static int rbd_open(struct block_device *bdev, fmode_t mode)
	struct gendisk *disk = bdev->bd_disk;
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_get_dev(rbd_dev);

	set_device_ro(bdev, rbd_dev->read_only);

	if ((mode & FMODE_WRITE) && rbd_dev->read_only)

static int rbd_release(struct gendisk *disk, fmode_t mode)
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.release		= rbd_release,
/*
 * Initialize an rbd client instance.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *opt)
	struct rbd_client *rbdc;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(opt, rbdc);
	if (IS_ERR(rbdc->client))
	opt = NULL; /* Now rbdc->client is responsible for opt */

	ret = ceph_open_session(rbdc->client);

	spin_lock(&node_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&node_lock);

	dout("rbd_client_create created %p\n", rbdc);

	ceph_destroy_client(rbdc->client);

	ceph_destroy_options(opt);
/*
 * Find a ceph client with specific addr and configuration.
 */
static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
	struct rbd_client *client_node;

	if (opt->flags & CEPH_OPT_NOSHARE)

	list_for_each_entry(client_node, &rbd_client_list, node)
		if (ceph_compare_options(opt, client_node->client) == 0)
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
	struct rbd_client *rbdc;
	struct ceph_options *opt;

	ret = ceph_parse_options(&opt, options, mon_addr,
				 mon_addr + strlen(mon_addr), NULL, NULL);

	spin_lock(&node_lock);
	rbdc = __rbd_client_find(opt);
		ceph_destroy_options(opt);

		/* using an existing client */
		kref_get(&rbdc->kref);
		rbd_dev->rbd_client = rbdc;
		rbd_dev->client = rbdc->client;
		spin_unlock(&node_lock);
	spin_unlock(&node_lock);

	rbdc = rbd_client_create(opt);
		return PTR_ERR(rbdc);

	rbd_dev->rbd_client = rbdc;
	rbd_dev->client = rbdc->client;
/*
 * Destroy ceph client
 */
static void rbd_client_release(struct kref *kref)
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&node_lock);
	list_del(&rbdc->node);
	spin_unlock(&node_lock);

	ceph_destroy_client(rbdc->client);

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
	rbd_dev->client = NULL;
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				struct rbd_image_header_ondisk *ondisk,
	u32 snap_count = le32_to_cpu(ondisk->snap_count);

	init_rwsem(&header->snap_rwsem);
	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				sizeof(struct rbd_image_snap_ondisk),
	header->snap_names = kmalloc(header->snap_names_len,
	if (!header->snap_names)
	header->snap_sizes = kmalloc(snap_count * sizeof(u64),
	if (!header->snap_sizes)
		header->snap_names = NULL;
		header->snap_sizes = NULL;

	memcpy(header->block_name, ondisk->block_name,
	       sizeof(ondisk->block_name));

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	    allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);

		/* copy snapshot names */
		memcpy(header->snap_names, &ondisk->snaps[i],
			header->snap_names_len);

	kfree(header->snap_names);
	kfree(header->snapc);
static int snap_index(struct rbd_image_header *header, int snap_num)
	return header->total_snaps - snap_num;

static u64 cur_snap_id(struct rbd_device *rbd_dev)
	struct rbd_image_header *header = &rbd_dev->header;

	if (!rbd_dev->cur_snap)

	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
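/*
 * Illustrative example (not in the original source): cur_snap holds
 * "index + 1 of the current snapshot within the snap context", with 0
 * reserved for the head.  rbd_header_set_snap() below stores
 * cur_snap = total_snaps - ret, where ret is the index returned by
 * snap_by_name(); snap_index() undoes that.  With total_snaps == 3 and
 * the mapped snapshot found at names index 1, cur_snap becomes 2, and
 * cur_snap_id() later returns snapc->snaps[3 - 2] == snapc->snaps[1].
 */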
static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
	char *p = header->snap_names;

	for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
		if (strcmp(snap_name, p) == 0)

	if (i == header->total_snaps)

		*seq = header->snapc->snaps[i];

		*size = header->snap_sizes[i];
static int rbd_header_set_snap(struct rbd_device *dev,
			       const char *snap_name,
	struct rbd_image_header *header = &dev->header;
	struct ceph_snap_context *snapc = header->snapc;

	down_write(&header->snap_rwsem);

	    strcmp(snap_name, "-") == 0 ||
	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
		if (header->total_snaps)
			snapc->seq = header->snap_seq;
		*size = header->image_size;
		ret = snap_by_name(header, snap_name, &snapc->seq, size);
		dev->cur_snap = header->total_snaps - ret;

	up_write(&header->snap_rwsem);

static void rbd_header_free(struct rbd_image_header *header)
	kfree(header->snapc);
	kfree(header->snap_names);
	kfree(header->snap_sizes);
/*
 * get the actual striped segment name, offset and length
 */
static u64 rbd_get_segment(struct rbd_image_header *header,
			   const char *block_name,
			   char *seg_name, u64 *segofs)
	u64 seg = ofs >> header->obj_order;

	snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
		 "%s.%012llx", block_name, seg);

	ofs = ofs & ((1 << header->obj_order) - 1);
	len = min_t(u64, len, (1 << header->obj_order) - ofs);
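/*
 * Illustrative example (not in the original source), assuming the
 * common 4 MB objects (obj_order == 22): a request at byte offset
 * 0x1400000 (20 MB) maps to segment 0x1400000 >> 22 == 5, so the
 * object name becomes "<block_name>.000000000005"; the in-object
 * offset is 0x1400000 & 0x3fffff == 0, and a 6 MB request would be
 * clamped here to the 4 MB left in that object, with the remainder
 * handled by the caller as a further segment.
 */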
static void bio_chain_put(struct bio *chain)
		chain = chain->bi_next;

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);

		chain = chain->bi_next;
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;

		bio_pair_release(*bp);

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);

		if (total + old_chain->bi_size > len) {
			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / 512ULL);

			__bio_clone(tmp, &bp->bio1);
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;

		gfpmask &= ~__GFP_WAIT;

		new_chain = tail = tmp;

		old_chain = old_chain->bi_next;

		total += tmp->bi_size;

	tail->bi_next = NULL;

	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
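/*
 * Illustrative example (not in the original source): if the caller
 * asks for len == 8192 bytes and the chain starts with a 4096-byte bio
 * followed by an 8192-byte bio, the first bio is cloned whole (total
 * becomes 4096) and the second one crosses the limit, so it is split
 * at (8192 - 4096) / 512 == 8 sectors; the first half joins the new
 * chain, while the remainder stays behind in the returned bio_pair,
 * which the caller must release, as the comment above notes.
 */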
/*
 * helpers for osd request op vectors.
 */
static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
	(*ops)[0].op = opcode;
	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	(*ops)[0].payload_len = payload_len;

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *dev,
			  struct ceph_snap_context *snapc,
			  const char *obj, u64 ofs, u64 len,
			  struct ceph_osd_req_op *ops,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg))
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct rbd_image_header *header = &dev->header;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);

	dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);

	down_read(&header->snap_rwsem);

	req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
				      GFP_NOIO, pages, bio);
		up_read(&header->snap_rwsem);

	req->r_callback = rbd_cb;

	req_data->pages = pages;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, obj, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_preferred = cpu_to_le32(-1);
	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
	ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
			     ofs, &len, &bno, req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				req->r_oid, req->r_oid_len);
	up_read(&header->snap_rwsem);

	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);

		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
		ceph_osdc_put_request(req);

	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);

	blk_end_request(rq, ret, len);
/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;

	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;

	blk_end_request(req_data->rq, rc, bytes);

	bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
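/*
 * Illustrative note (not in the original source): reads may come back
 * short or with -ENOENT, for example when the backing RADOS object has
 * not been written yet.  A 4096-byte read that returns only 512 bytes
 * is padded out above by zeroing the tail of the bio chain and
 * reporting the full 4096 bytes to the block layer, so unwritten space
 * simply reads back as zeroes.
 */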
/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   struct ceph_osd_req_op *orig_ops,
	struct ceph_osd_req_op *ops = orig_ops;

	num_pages = calc_pages_for(ofs, len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
		return PTR_ERR(pages);

		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);

	ret = rbd_do_request(NULL, dev, snapc, snapid,

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

	rbd_destroy_ops(ops);

	ceph_release_page_vector(pages, num_pages);
/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     int opcode, int flags, int num_reply,
	struct ceph_osd_req_op *ops;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
/*
 * Request async osd write
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,

/*
 * Request async osd read
 */
static int rbd_req_read(struct request *rq,
			struct rbd_device *rbd_dev,
	return rbd_do_op(rq, rbd_dev, NULL,
			 (snapid ? snapid : CEPH_NOSNAP),

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *dev,
			     struct ceph_snap_context *snapc,
	return rbd_req_sync_op(dev, NULL,
			       (snapid ? snapid : CEPH_NOSNAP),
			       1, obj, ofs, len, buf);
/*
 * Request sync osd rollback of an object to a snapshot
 */
static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
	struct ceph_osd_req_op *ops;
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);

	ops[0].snap.snapid = snapid;

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,

	rbd_destroy_ops(ops);
/*
 * Request sync osd call of a rados class method
 */
static int rbd_req_sync_exec(struct rbd_device *dev,
	struct ceph_osd_req_op *ops;
	int cls_len = strlen(cls);
	int method_len = strlen(method);
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    cls_len + method_len + len);

	ops[0].cls.class_name = cls;
	ops[0].cls.class_len = (__u8)cls_len;
	ops[0].cls.method_name = method;
	ops[0].cls.method_len = (__u8)method_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      1, obj, 0, 0, NULL);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
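/*
 * Illustrative note (not in the original source): this is how the
 * driver reaches server-side rbd logic.  rbd_header_add_snap() below,
 * for instance, ends up doing
 *
 *	rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
 *			  data_start, data - data_start);
 *
 * i.e. a CEPH_OSD_OP_CALL against the image's header object that
 * invokes the "snap_add" method of the "rbd" object class with the
 * encoded snapshot name and id as input data.
 */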
/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
	struct rbd_device *rbd_dev = q->queuedata;
	struct bio_pair *bp = NULL;

	rq = blk_fetch_request(q);
		struct bio *rq_bio, *next_bio = NULL;
		int size, op_size = 0;

		/* peek at request from block layer */

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * 512ULL;

		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);

		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * 512ULL);

			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.block_name,
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
				spin_lock_irq(q->queue_lock);
				__blk_end_request_all(rq, -ENOMEM);

			/* init OSD command: write or read */
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
				rbd_req_read(rq, rbd_dev,
					     cur_snap_id(rbd_dev),

			bio_pair_release(bp);

		spin_lock_irq(q->queue_lock);

		rq = blk_fetch_request(q);
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	unsigned int bio_sectors = bmd->bi_size >> 9;

	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				+ bio_sectors)) << 9;
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
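/*
 * Illustrative example (not in the original source), again assuming
 * 4 MB objects (obj_order == 22): chunk_sectors is 1 << 13 == 8192.
 * If a bio currently ends at sector 8190 within its object, only
 * (8192 - 8190) << 9 == 1024 bytes remain before the object boundary,
 * so a 4096-byte bio_vec is refused here and the block layer starts a
 * new bio.  An empty bio (bio_sectors == 0) always accepts its first
 * bio_vec; such single-page bios that straddle a boundary are split
 * later in bio_chain_clone(), as the comment above says.
 */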
static void rbd_free_disk(struct rbd_device *rbd_dev)
	struct gendisk *disk = rbd_dev->disk;

	rbd_header_free(&rbd_dev->header);

	if (disk->flags & GENHD_FL_UP)

	blk_cleanup_queue(disk->queue);
/*
 * reload the ondisk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
	struct rbd_image_header_ondisk *dh;
	u64 snap_names_len = 0;

		int len = sizeof(*dh) +
			  snap_count * sizeof(struct rbd_image_snap_ondisk) +

		dh = kmalloc(len, GFP_KERNEL);

		rc = rbd_req_sync_read(rbd_dev,
				       rbd_dev->obj_md_name,

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);

		if (snap_count != header->total_snaps) {
			snap_count = header->total_snaps;
			snap_names_len = header->snap_names_len;
			rbd_header_free(header);
static int rbd_header_add_snap(struct rbd_device *dev,
			       const char *snap_name,
	int name_len = strlen(snap_name);
	void *data, *data_start, *data_end;

	/* we should create a snapshot only if we're pointing at the head */

	ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
	dout("created snapid=%lld\n", new_snapid);

	data = kmalloc(name_len + 16, gfp_flags);

	data_end = data + name_len + 16;

	ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
	ceph_encode_64_safe(&data, data_end, new_snapid, bad);

	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
				data_start, data - data_start);

	dev->header.snapc->seq = new_snapid;
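/*
 * Illustrative note (not in the original source): the payload handed
 * to "snap_add" is the snapshot name encoded as a length-prefixed
 * string followed by the new 64-bit snapshot id, which is why the
 * buffer is sized name_len + 16 (room for the length prefix, the name
 * bytes and the u64, with some slack).  The exact wire layout is an
 * assumption based on the ceph_encode_*_safe() helpers; the
 * authoritative definition lives in the server-side "rbd" class.
 */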
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
	struct rbd_snap *snap;

	while (!list_empty(&rbd_dev->snaps)) {
		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
		__rbd_remove_snap_dev(rbd_dev, snap);
/*
 * only read the first part of the ondisk header, without the snaps info
 */
static int __rbd_update_snaps(struct rbd_device *rbd_dev)
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);

	down_write(&rbd_dev->header.snap_rwsem);

	snap_seq = rbd_dev->header.snapc->seq;

	kfree(rbd_dev->header.snapc);
	kfree(rbd_dev->header.snap_names);
	kfree(rbd_dev->header.snap_sizes);

	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	rbd_dev->header.snapc->seq = snap_seq;

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header.snap_rwsem);
static int rbd_init_disk(struct rbd_device *rbd_dev)
	struct gendisk *disk;
	struct request_queue *q;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);

	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);

	sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);

	blk_queue_merge_bvec(q, rbd_merge_bvec);

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / 512ULL);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->obj);

static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rc = __rbd_update_snaps(rbd_dev);

	mutex_unlock(&ctl_mutex);
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	&dev_attr_rollback_snap.attr,

static struct attribute_group rbd_attr_group = {

static const struct attribute_group *rbd_attr_groups[] = {

static void rbd_sysfs_dev_release(struct device *dev)

static struct device_type rbd_device_type = {
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%lld\n", (long long)snap->size);

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%lld\n", (long long)snap->id);

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,

static void rbd_snap_dev_release(struct device *dev)
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
	list_del(&snap->node);
	device_unregister(&snap->dev);

static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
				 struct rbd_snap *snap,
				 struct device *parent)
	struct device *dev = &snap->dev;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
			      int i, const char *name,
			      struct rbd_snap **snapp)
	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);

	snap->name = kstrdup(name, GFP_KERNEL);
	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
/*
 * search for the previous snap in a null delimited string list
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
	if (name < start + 2)

/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name).
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	struct list_head *p, *n;

	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		old_snap = list_entry(p, struct rbd_snap, node);

		cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/* old_snap->id was skipped, thus was removed */
			__rbd_remove_snap_dev(rbd_dev, old_snap);
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			name = rbd_prev_snap_name(name, first_name);
		     i--, name = rbd_prev_snap_name(name, first_name)) {

			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);

	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);

		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);

		list_add(&snap->node, &rbd_dev->snaps);
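/*
 * Illustrative note (not in the original source): the header stores
 * snapshots newest first, in both the id array and the name buffer, so
 * this function walks both from the end -- i counts down from
 * total_snaps while rbd_prev_snap_name() steps backwards through the
 * NUL-delimited names -- visiting snapshots oldest to newest.  Existing
 * entries are kept, ids that disappeared from the header have their
 * snap_<name> device unregistered, and new ids get one added in place.
 */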
static void rbd_root_dev_release(struct device *dev)

static struct device rbd_root_dev = {
	.release	= rbd_root_dev_release,

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,

	mutex_unlock(&ctl_mutex);

	mutex_unlock(&ctl_mutex);

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
	device_unregister(&rbd_dev->dev);
static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	ssize_t rc = -ENOMEM;
	int irc, new_id = 0;
	struct list_head *tmp;

	if (!try_module_get(THIS_MODULE))

	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);

	options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);

	/* new rbd_device object */
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);

	/* generate unique id: find highest unique id, add one */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	list_for_each(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->id >= new_id)
			new_id = rbd_dev->id + 1;

	rbd_dev->id = new_id;

	/* add to global list */
	list_add_tail(&rbd_dev->node, &rbd_dev_list);

	/* parse add command */
	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
		   mon_dev_name, options, rbd_dev->pool_name,
		   rbd_dev->obj, rbd_dev->snap_name) < 4) {

	if (rbd_dev->snap_name[0] == 0)
		rbd_dev->snap_name[0] = '-';

	rbd_dev->obj_len = strlen(rbd_dev->obj);
	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
		 rbd_dev->obj, RBD_SUFFIX);

	/* initialize rest of new object */
	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
	rc = rbd_get_client(rbd_dev, mon_dev_name, options);

	mutex_unlock(&ctl_mutex);

	osdc = &rbd_dev->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
		goto err_out_client;
	rbd_dev->poolid = rc;

	/* register our block device */
	irc = register_blkdev(0, rbd_dev->name);
		goto err_out_client;

	rbd_dev->major = irc;

	rc = rbd_bus_add_dev(rbd_dev);
		goto err_out_blkdev;

	/* set up and announce blkdev mapping */
	rc = rbd_init_disk(rbd_dev);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	list_del_init(&rbd_dev->node);
	mutex_unlock(&ctl_mutex);

	/* this will also clean up rest of rbd_dev stuff */
	rbd_bus_del_dev(rbd_dev);

	kfree(mon_dev_name);

	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	rbd_put_client(rbd_dev);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	list_del_init(&rbd_dev->node);
	mutex_unlock(&ctl_mutex);

	kfree(mon_dev_name);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);
static struct rbd_device *__rbd_get_dev(unsigned long id)
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->id == id)

static void rbd_dev_release(struct device *dev)
	struct rbd_device *rbd_dev =
			container_of(dev, struct rbd_device, dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release module ref */
	module_put(THIS_MODULE);
static ssize_t rbd_remove(struct bus_type *bus,
	struct rbd_device *rbd_dev = NULL;

	rc = strict_strtoul(buf, 10, &ul);

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);

	list_del_init(&rbd_dev->node);

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

	mutex_unlock(&ctl_mutex);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	char *name = kmalloc(count + 1, GFP_KERNEL);

	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,

	ret = __rbd_update_snaps(rbd_dev);

	mutex_unlock(&ctl_mutex);
static ssize_t rbd_snap_rollback(struct device *dev,
				 struct device_attribute *attr,
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	char *seg_name = NULL;
	char *snap_name = kmalloc(count + 1, GFP_KERNEL);

	/* parse snap rollback command */
	snprintf(snap_name, count, "%s", buf);

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);

	dout("snapid=%lld\n", snapid);

	while (cur_ofs < rbd_dev->header.image_size) {
		cur_ofs += rbd_get_segment(&rbd_dev->header,
		dout("seg_name=%s\n", seg_name);

		ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
			pr_warning("could not roll back obj %s err=%d\n",

	ret = __rbd_update_snaps(rbd_dev);

	mutex_unlock(&ctl_mutex);
[] = {
1989 __ATTR(add
, S_IWUSR
, NULL
, rbd_add
),
1990 __ATTR(remove
, S_IWUSR
, NULL
, rbd_remove
),
1995 * create control files in sysfs
1998 static int rbd_sysfs_init(void)
2002 rbd_bus_type
.bus_attrs
= rbd_bus_attrs
;
2004 ret
= bus_register(&rbd_bus_type
);
2008 ret
= device_register(&rbd_root_dev
);
2013 static void rbd_sysfs_cleanup(void)
2015 device_unregister(&rbd_root_dev
);
2016 bus_unregister(&rbd_bus_type
);
2019 int __init
rbd_init(void)
2023 rc
= rbd_sysfs_init();
2026 spin_lock_init(&node_lock
);
2027 pr_info("loaded " DRV_NAME_LONG
"\n");
2031 void __exit
rbd_exit(void)
2033 rbd_sysfs_cleanup();
2036 module_init(rbd_init
);
2037 module_exit(rbd_exit
);
2039 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2040 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2041 MODULE_DESCRIPTION("rados block device");
2043 /* following authorship retained from original osdblk.c */
2044 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2046 MODULE_LICENSE("GPL");