block: rbd: fixed may leaks
[linux-2.6.git] / drivers / block / rbd.c
blob83b15dd24853ca39949e21e9264afc0ec92b632d
1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 Instructions for use
25 --------------------
27 1) Map a Linux block device to an existing rbd image.
29 Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
31 $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add
33 The snapshot name can be "-" or omitted to map the image read/write.
35 2) List all active blkdev<->object mappings.
37 In this example, we have performed step #1 twice, creating two blkdevs,
38 mapped to two separate rados objects in the rados rbd pool
40 $ cat /sys/class/rbd/list
41 #id major client_name pool name snap KB
42 0 254 client4143 rbd foo - 1024000
44 The columns, in order, are:
45 - blkdev unique id
46 - blkdev assigned major
47 - rados client id
48 - rados pool name
49 - rados block device name
50 - mapped snapshot ("-" if none)
51 - device size in KB
54 3) Create a snapshot.
56 Usage: <blkdev id> <snapname>
58 $ echo "0 mysnap" > /sys/class/rbd/snap_create
61 4) Listing a snapshot.
63 $ cat /sys/class/rbd/snaps_list
64 #id snap KB
65 0 - 1024000 (*)
66 0 foo 1024000
68 The columns, in order, are:
69 - blkdev unique id
70 - snapshot name, '-' means none (active read/write version)
71 - size of device at time of snapshot
72 - the (*) indicates this is the active version
74 5) Rollback to snapshot.
76 Usage: <blkdev id> <snapname>
78 $ echo "0 mysnap" > /sys/class/rbd/snap_rollback
81 6) Mapping an image using snapshot.
83 A snapshot mapping is read-only. It is requested by passing
84 snap=<snapname> in the options when adding a device.
86 $ echo "192.168.0.1 name=admin,snap=mysnap rbd foo" > /sys/class/rbd/add
89 7) Remove an active blkdev<->rbd image mapping.
91 In this example, we remove the mapping with blkdev unique id 1.
93 $ echo 1 > /sys/class/rbd/remove
96 NOTE: The actual creation and deletion of rados objects is outside the scope
97 of this driver.
101 #include <linux/ceph/libceph.h>
102 #include <linux/ceph/osd_client.h>
103 #include <linux/ceph/mon_client.h>
104 #include <linux/ceph/decode.h>
106 #include <linux/kernel.h>
107 #include <linux/device.h>
108 #include <linux/module.h>
109 #include <linux/fs.h>
110 #include <linux/blkdev.h>
112 #include "rbd_types.h"
114 #define DRV_NAME "rbd"
115 #define DRV_NAME_LONG "rbd (rados block device)"
117 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
119 #define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX))
120 #define RBD_MAX_POOL_NAME_LEN 64
121 #define RBD_MAX_SNAP_NAME_LEN 32
122 #define RBD_MAX_OPT_LEN 1024
124 #define RBD_SNAP_HEAD_NAME "-"
126 #define DEV_NAME_LEN 32
129 * block device image metadata (in-memory version)
131 struct rbd_image_header {
132 u64 image_size;
133 char block_name[32];
134 __u8 obj_order;
135 __u8 crypt_type;
136 __u8 comp_type;
137 struct rw_semaphore snap_rwsem;
138 struct ceph_snap_context *snapc;
139 size_t snap_names_len;
140 u64 snap_seq;
141 u32 total_snaps;
143 char *snap_names;
144 u64 *snap_sizes;
148 * an instance of the client. multiple devices may share a client.
150 struct rbd_client {
151 struct ceph_client *client;
152 struct kref kref;
153 struct list_head node;
157 * a single io request
159 struct rbd_request {
160 struct request *rq; /* blk layer request */
161 struct bio *bio; /* cloned bio */
162 struct page **pages; /* list of used pages */
163 u64 len;
167 * a single device
169 struct rbd_device {
170 int id; /* blkdev unique id */
172 int major; /* blkdev assigned major */
173 struct gendisk *disk; /* blkdev's gendisk and rq */
174 struct request_queue *q;
176 struct ceph_client *client;
177 struct rbd_client *rbd_client;
179 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
181 spinlock_t lock; /* queue lock */
183 struct rbd_image_header header;
184 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
185 int obj_len;
186 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
187 char pool_name[RBD_MAX_POOL_NAME_LEN];
188 int poolid;
190 char snap_name[RBD_MAX_SNAP_NAME_LEN];
191 u32 cur_snap; /* index+1 of current snapshot within snap context
192 0 - for the head */
193 int read_only;
195 struct list_head node;
198 static spinlock_t node_lock; /* protects client get/put */
200 static struct class *class_rbd; /* /sys/class/rbd */
201 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
202 static LIST_HEAD(rbd_dev_list); /* devices */
203 static LIST_HEAD(rbd_client_list); /* clients */
206 static int rbd_open(struct block_device *bdev, fmode_t mode)
208 struct gendisk *disk = bdev->bd_disk;
209 struct rbd_device *rbd_dev = disk->private_data;
211 set_device_ro(bdev, rbd_dev->read_only);
213 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
214 return -EROFS;
216 return 0;
219 static const struct block_device_operations rbd_bd_ops = {
220 .owner = THIS_MODULE,
221 .open = rbd_open,
225 * Initialize an rbd client instance.
226 * We own *opt.
228 static struct rbd_client *rbd_client_create(struct ceph_options *opt)
230 struct rbd_client *rbdc;
231 int ret = -ENOMEM;
233 dout("rbd_client_create\n");
234 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
235 if (!rbdc)
236 goto out_opt;
238 kref_init(&rbdc->kref);
239 INIT_LIST_HEAD(&rbdc->node);
241 rbdc->client = ceph_create_client(opt, rbdc);
242 if (IS_ERR(rbdc->client))
243 goto out_rbdc;
244 opt = NULL; /* Now rbdc->client is responsible for opt */
246 ret = ceph_open_session(rbdc->client);
247 if (ret < 0)
248 goto out_err;
250 spin_lock(&node_lock);
251 list_add_tail(&rbdc->node, &rbd_client_list);
252 spin_unlock(&node_lock);
254 dout("rbd_client_create created %p\n", rbdc);
255 return rbdc;
257 out_err:
258 ceph_destroy_client(rbdc->client);
259 out_rbdc:
260 kfree(rbdc);
261 out_opt:
262 if (opt)
263 ceph_destroy_options(opt);
264 return ERR_PTR(ret);
268 * Find a ceph client with specific addr and configuration.
270 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
272 struct rbd_client *client_node;
274 if (opt->flags & CEPH_OPT_NOSHARE)
275 return NULL;
277 list_for_each_entry(client_node, &rbd_client_list, node)
278 if (ceph_compare_options(opt, client_node->client) == 0)
279 return client_node;
280 return NULL;
284 * Get a ceph client with specific addr and configuration, if one does
285 * not exist create it.
287 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
288 char *options)
290 struct rbd_client *rbdc;
291 struct ceph_options *opt;
292 int ret;
294 ret = ceph_parse_options(&opt, options, mon_addr,
295 mon_addr + strlen(mon_addr), NULL, NULL);
296 if (ret < 0)
297 return ret;
299 spin_lock(&node_lock);
300 rbdc = __rbd_client_find(opt);
301 if (rbdc) {
302 ceph_destroy_options(opt);
304 /* using an existing client */
305 kref_get(&rbdc->kref);
306 rbd_dev->rbd_client = rbdc;
307 rbd_dev->client = rbdc->client;
308 spin_unlock(&node_lock);
309 return 0;
311 spin_unlock(&node_lock);
313 rbdc = rbd_client_create(opt);
314 if (IS_ERR(rbdc))
315 return PTR_ERR(rbdc);
317 rbd_dev->rbd_client = rbdc;
318 rbd_dev->client = rbdc->client;
319 return 0;
323 * Destroy ceph client
325 static void rbd_client_release(struct kref *kref)
327 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
329 dout("rbd_release_client %p\n", rbdc);
330 spin_lock(&node_lock);
331 list_del(&rbdc->node);
332 spin_unlock(&node_lock);
334 ceph_destroy_client(rbdc->client);
335 kfree(rbdc);
339 * Drop reference to ceph client node. If it's not referenced anymore, release
340 * it.
342 static void rbd_put_client(struct rbd_device *rbd_dev)
344 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
345 rbd_dev->rbd_client = NULL;
346 rbd_dev->client = NULL;
351 * Create a new header structure, translate header format from the on-disk
352 * header.
354 static int rbd_header_from_disk(struct rbd_image_header *header,
355 struct rbd_image_header_ondisk *ondisk,
356 int allocated_snaps,
357 gfp_t gfp_flags)
359 int i;
360 u32 snap_count = le32_to_cpu(ondisk->snap_count);
361 int ret = -ENOMEM;
363 init_rwsem(&header->snap_rwsem);
365 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
366 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
367 snap_count *
368 sizeof(struct rbd_image_snap_ondisk),
369 gfp_flags);
370 if (!header->snapc)
371 return -ENOMEM;
372 if (snap_count) {
373 header->snap_names = kmalloc(header->snap_names_len,
374 GFP_KERNEL);
375 if (!header->snap_names)
376 goto err_snapc;
377 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
378 GFP_KERNEL);
379 if (!header->snap_sizes)
380 goto err_names;
381 } else {
382 header->snap_names = NULL;
383 header->snap_sizes = NULL;
385 memcpy(header->block_name, ondisk->block_name,
386 sizeof(ondisk->block_name));
388 header->image_size = le64_to_cpu(ondisk->image_size);
389 header->obj_order = ondisk->options.order;
390 header->crypt_type = ondisk->options.crypt_type;
391 header->comp_type = ondisk->options.comp_type;
393 atomic_set(&header->snapc->nref, 1);
394 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
395 header->snapc->num_snaps = snap_count;
396 header->total_snaps = snap_count;
398 if (snap_count &&
399 allocated_snaps == snap_count) {
400 for (i = 0; i < snap_count; i++) {
401 header->snapc->snaps[i] =
402 le64_to_cpu(ondisk->snaps[i].id);
403 header->snap_sizes[i] =
404 le64_to_cpu(ondisk->snaps[i].image_size);
407 /* copy snapshot names */
408 memcpy(header->snap_names, &ondisk->snaps[i],
409 header->snap_names_len);
412 return 0;
414 err_names:
415 kfree(header->snap_names);
416 err_snapc:
417 kfree(header->snapc);
418 return ret;
421 static int snap_index(struct rbd_image_header *header, int snap_num)
423 return header->total_snaps - snap_num;
426 static u64 cur_snap_id(struct rbd_device *rbd_dev)
428 struct rbd_image_header *header = &rbd_dev->header;
430 if (!rbd_dev->cur_snap)
431 return 0;
433 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
436 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
437 u64 *seq, u64 *size)
439 int i;
440 char *p = header->snap_names;
442 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
443 if (strcmp(snap_name, p) == 0)
444 break;
446 if (i == header->total_snaps)
447 return -ENOENT;
448 if (seq)
449 *seq = header->snapc->snaps[i];
451 if (size)
452 *size = header->snap_sizes[i];
454 return i;
457 static int rbd_header_set_snap(struct rbd_device *dev,
458 const char *snap_name,
459 u64 *size)
461 struct rbd_image_header *header = &dev->header;
462 struct ceph_snap_context *snapc = header->snapc;
463 int ret = -ENOENT;
465 down_write(&header->snap_rwsem);
467 if (!snap_name ||
468 !*snap_name ||
469 strcmp(snap_name, "-") == 0 ||
470 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
471 if (header->total_snaps)
472 snapc->seq = header->snap_seq;
473 else
474 snapc->seq = 0;
475 dev->cur_snap = 0;
476 dev->read_only = 0;
477 if (size)
478 *size = header->image_size;
479 } else {
480 ret = snap_by_name(header, snap_name, &snapc->seq, size);
481 if (ret < 0)
482 goto done;
484 dev->cur_snap = header->total_snaps - ret;
485 dev->read_only = 1;
488 ret = 0;
489 done:
490 up_write(&header->snap_rwsem);
491 return ret;
494 static void rbd_header_free(struct rbd_image_header *header)
496 kfree(header->snapc);
497 kfree(header->snap_names);
498 kfree(header->snap_sizes);
502 * get the actual striped segment name, offset and length
504 static u64 rbd_get_segment(struct rbd_image_header *header,
505 const char *block_name,
506 u64 ofs, u64 len,
507 char *seg_name, u64 *segofs)
509 u64 seg = ofs >> header->obj_order;
511 if (seg_name)
512 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
513 "%s.%012llx", block_name, seg);
515 ofs = ofs & ((1 << header->obj_order) - 1);
516 len = min_t(u64, len, (1 << header->obj_order) - ofs);
518 if (segofs)
519 *segofs = ofs;
521 return len;
525 * bio helpers
528 static void bio_chain_put(struct bio *chain)
530 struct bio *tmp;
532 while (chain) {
533 tmp = chain;
534 chain = chain->bi_next;
535 bio_put(tmp);
540 * zeros a bio chain, starting at specific offset
542 static void zero_bio_chain(struct bio *chain, int start_ofs)
544 struct bio_vec *bv;
545 unsigned long flags;
546 void *buf;
547 int i;
548 int pos = 0;
550 while (chain) {
551 bio_for_each_segment(bv, chain, i) {
552 if (pos + bv->bv_len > start_ofs) {
553 int remainder = max(start_ofs - pos, 0);
554 buf = bvec_kmap_irq(bv, &flags);
555 memset(buf + remainder, 0,
556 bv->bv_len - remainder);
557 bvec_kunmap_irq(bv, &flags);
559 pos += bv->bv_len;
562 chain = chain->bi_next;
567 * bio_chain_clone - clone a chain of bios up to a certain length.
568 * might return a bio_pair that will need to be released.
570 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
571 struct bio_pair **bp,
572 int len, gfp_t gfpmask)
574 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
575 int total = 0;
577 if (*bp) {
578 bio_pair_release(*bp);
579 *bp = NULL;
582 while (old_chain && (total < len)) {
583 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
584 if (!tmp)
585 goto err_out;
587 if (total + old_chain->bi_size > len) {
588 struct bio_pair *bp;
591 * this split can only happen with a single paged bio,
592 * split_bio will BUG_ON if this is not the case
594 dout("bio_chain_clone split! total=%d remaining=%d"
595 "bi_size=%d\n",
596 (int)total, (int)len-total,
597 (int)old_chain->bi_size);
599 /* split the bio. We'll release it either in the next
600 call, or it will have to be released outside */
601 bp = bio_split(old_chain, (len - total) / 512ULL);
602 if (!bp)
603 goto err_out;
605 __bio_clone(tmp, &bp->bio1);
607 *next = &bp->bio2;
608 } else {
609 __bio_clone(tmp, old_chain);
610 *next = old_chain->bi_next;
613 tmp->bi_bdev = NULL;
614 gfpmask &= ~__GFP_WAIT;
615 tmp->bi_next = NULL;
617 if (!new_chain) {
618 new_chain = tail = tmp;
619 } else {
620 tail->bi_next = tmp;
621 tail = tmp;
623 old_chain = old_chain->bi_next;
625 total += tmp->bi_size;
628 BUG_ON(total < len);
630 if (tail)
631 tail->bi_next = NULL;
633 *old = old_chain;
635 return new_chain;
637 err_out:
638 dout("bio_chain_clone with err\n");
639 bio_chain_put(new_chain);
640 return NULL;
644 * helpers for osd request op vectors.
646 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
647 int num_ops,
648 int opcode,
649 u32 payload_len)
651 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
652 GFP_NOIO);
653 if (!*ops)
654 return -ENOMEM;
655 (*ops)[0].op = opcode;
657 * op extent offset and length will be set later on
658 * in calc_raw_layout()
660 (*ops)[0].payload_len = payload_len;
661 return 0;
/* Free an op vector obtained from rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
670 * Send ceph osd request
672 static int rbd_do_request(struct request *rq,
673 struct rbd_device *dev,
674 struct ceph_snap_context *snapc,
675 u64 snapid,
676 const char *obj, u64 ofs, u64 len,
677 struct bio *bio,
678 struct page **pages,
679 int num_pages,
680 int flags,
681 struct ceph_osd_req_op *ops,
682 int num_reply,
683 void (*rbd_cb)(struct ceph_osd_request *req,
684 struct ceph_msg *msg))
686 struct ceph_osd_request *req;
687 struct ceph_file_layout *layout;
688 int ret;
689 u64 bno;
690 struct timespec mtime = CURRENT_TIME;
691 struct rbd_request *req_data;
692 struct ceph_osd_request_head *reqhead;
693 struct rbd_image_header *header = &dev->header;
695 ret = -ENOMEM;
696 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
697 if (!req_data)
698 goto done;
700 dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
702 down_read(&header->snap_rwsem);
704 req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
705 snapc,
706 ops,
707 false,
708 GFP_NOIO, pages, bio);
709 if (IS_ERR(req)) {
710 up_read(&header->snap_rwsem);
711 ret = PTR_ERR(req);
712 goto done_pages;
715 req->r_callback = rbd_cb;
717 req_data->rq = rq;
718 req_data->bio = bio;
719 req_data->pages = pages;
720 req_data->len = len;
722 req->r_priv = req_data;
724 reqhead = req->r_request->front.iov_base;
725 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
727 strncpy(req->r_oid, obj, sizeof(req->r_oid));
728 req->r_oid_len = strlen(req->r_oid);
730 layout = &req->r_file_layout;
731 memset(layout, 0, sizeof(*layout));
732 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
733 layout->fl_stripe_count = cpu_to_le32(1);
734 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
735 layout->fl_pg_preferred = cpu_to_le32(-1);
736 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
737 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
738 ofs, &len, &bno, req, ops);
740 ceph_osdc_build_request(req, ofs, &len,
741 ops,
742 snapc,
743 &mtime,
744 req->r_oid, req->r_oid_len);
745 up_read(&header->snap_rwsem);
747 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
748 if (ret < 0)
749 goto done_err;
751 if (!rbd_cb) {
752 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
753 ceph_osdc_put_request(req);
755 return ret;
757 done_err:
758 bio_chain_put(req_data->bio);
759 ceph_osdc_put_request(req);
760 done_pages:
761 kfree(req_data);
762 done:
763 if (rq)
764 blk_end_request(rq, ret, len);
765 return ret;
769 * Ceph osd op callback
771 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
773 struct rbd_request *req_data = req->r_priv;
774 struct ceph_osd_reply_head *replyhead;
775 struct ceph_osd_op *op;
776 __s32 rc;
777 u64 bytes;
778 int read_op;
780 /* parse reply */
781 replyhead = msg->front.iov_base;
782 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
783 op = (void *)(replyhead + 1);
784 rc = le32_to_cpu(replyhead->result);
785 bytes = le64_to_cpu(op->extent.length);
786 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
788 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
790 if (rc == -ENOENT && read_op) {
791 zero_bio_chain(req_data->bio, 0);
792 rc = 0;
793 } else if (rc == 0 && read_op && bytes < req_data->len) {
794 zero_bio_chain(req_data->bio, bytes);
795 bytes = req_data->len;
798 blk_end_request(req_data->rq, rc, bytes);
800 if (req_data->bio)
801 bio_chain_put(req_data->bio);
803 ceph_osdc_put_request(req);
804 kfree(req_data);
808 * Do a synchronous ceph osd operation
810 static int rbd_req_sync_op(struct rbd_device *dev,
811 struct ceph_snap_context *snapc,
812 u64 snapid,
813 int opcode,
814 int flags,
815 struct ceph_osd_req_op *orig_ops,
816 int num_reply,
817 const char *obj,
818 u64 ofs, u64 len,
819 char *buf)
821 int ret;
822 struct page **pages;
823 int num_pages;
824 struct ceph_osd_req_op *ops = orig_ops;
825 u32 payload_len;
827 num_pages = calc_pages_for(ofs , len);
828 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
829 if (!pages)
830 return -ENOMEM;
832 if (!orig_ops) {
833 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
834 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
835 if (ret < 0)
836 goto done;
838 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
839 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
840 if (ret < 0)
841 goto done_ops;
845 ret = rbd_do_request(NULL, dev, snapc, snapid,
846 obj, ofs, len, NULL,
847 pages, num_pages,
848 flags,
849 ops,
851 NULL);
852 if (ret < 0)
853 goto done_ops;
855 if ((flags & CEPH_OSD_FLAG_READ) && buf)
856 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
858 done_ops:
859 if (!orig_ops)
860 rbd_destroy_ops(ops);
861 done:
862 ceph_release_page_vector(pages, num_pages);
863 return ret;
867 * Do an asynchronous ceph osd operation
869 static int rbd_do_op(struct request *rq,
870 struct rbd_device *rbd_dev ,
871 struct ceph_snap_context *snapc,
872 u64 snapid,
873 int opcode, int flags, int num_reply,
874 u64 ofs, u64 len,
875 struct bio *bio)
877 char *seg_name;
878 u64 seg_ofs;
879 u64 seg_len;
880 int ret;
881 struct ceph_osd_req_op *ops;
882 u32 payload_len;
884 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
885 if (!seg_name)
886 return -ENOMEM;
888 seg_len = rbd_get_segment(&rbd_dev->header,
889 rbd_dev->header.block_name,
890 ofs, len,
891 seg_name, &seg_ofs);
892 if ((s64)seg_len < 0) {
893 ret = seg_len;
894 goto done;
897 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
899 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
900 if (ret < 0)
901 goto done;
903 /* we've taken care of segment sizes earlier when we
904 cloned the bios. We should never have a segment
905 truncated at this point */
906 BUG_ON(seg_len < len);
908 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
909 seg_name, seg_ofs, seg_len,
910 bio,
911 NULL, 0,
912 flags,
913 ops,
914 num_reply,
915 rbd_req_cb);
916 done:
917 kfree(seg_name);
918 return ret;
922 * Request async osd write
924 static int rbd_req_write(struct request *rq,
925 struct rbd_device *rbd_dev,
926 struct ceph_snap_context *snapc,
927 u64 ofs, u64 len,
928 struct bio *bio)
930 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
931 CEPH_OSD_OP_WRITE,
932 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
934 ofs, len, bio);
938 * Request async osd read
940 static int rbd_req_read(struct request *rq,
941 struct rbd_device *rbd_dev,
942 u64 snapid,
943 u64 ofs, u64 len,
944 struct bio *bio)
946 return rbd_do_op(rq, rbd_dev, NULL,
947 (snapid ? snapid : CEPH_NOSNAP),
948 CEPH_OSD_OP_READ,
949 CEPH_OSD_FLAG_READ,
951 ofs, len, bio);
955 * Request sync osd read
957 static int rbd_req_sync_read(struct rbd_device *dev,
958 struct ceph_snap_context *snapc,
959 u64 snapid,
960 const char *obj,
961 u64 ofs, u64 len,
962 char *buf)
964 return rbd_req_sync_op(dev, NULL,
965 (snapid ? snapid : CEPH_NOSNAP),
966 CEPH_OSD_OP_READ,
967 CEPH_OSD_FLAG_READ,
968 NULL,
969 1, obj, ofs, len, buf);
973 * Request sync osd read
975 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
976 u64 snapid,
977 const char *obj)
979 struct ceph_osd_req_op *ops;
980 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
981 if (ret < 0)
982 return ret;
984 ops[0].snap.snapid = snapid;
986 ret = rbd_req_sync_op(dev, NULL,
987 CEPH_NOSNAP,
989 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
990 ops,
991 1, obj, 0, 0, NULL);
993 rbd_destroy_ops(ops);
995 if (ret < 0)
996 return ret;
998 return ret;
1002 * Request sync osd read
1004 static int rbd_req_sync_exec(struct rbd_device *dev,
1005 const char *obj,
1006 const char *cls,
1007 const char *method,
1008 const char *data,
1009 int len)
1011 struct ceph_osd_req_op *ops;
1012 int cls_len = strlen(cls);
1013 int method_len = strlen(method);
1014 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1015 cls_len + method_len + len);
1016 if (ret < 0)
1017 return ret;
1019 ops[0].cls.class_name = cls;
1020 ops[0].cls.class_len = (__u8)cls_len;
1021 ops[0].cls.method_name = method;
1022 ops[0].cls.method_len = (__u8)method_len;
1023 ops[0].cls.argc = 0;
1024 ops[0].cls.indata = data;
1025 ops[0].cls.indata_len = len;
1027 ret = rbd_req_sync_op(dev, NULL,
1028 CEPH_NOSNAP,
1030 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1031 ops,
1032 1, obj, 0, 0, NULL);
1034 rbd_destroy_ops(ops);
1036 dout("cls_exec returned %d\n", ret);
1037 return ret;
1041 * block device queue callback
1043 static void rbd_rq_fn(struct request_queue *q)
1045 struct rbd_device *rbd_dev = q->queuedata;
1046 struct request *rq;
1047 struct bio_pair *bp = NULL;
1049 rq = blk_fetch_request(q);
1051 while (1) {
1052 struct bio *bio;
1053 struct bio *rq_bio, *next_bio = NULL;
1054 bool do_write;
1055 int size, op_size = 0;
1056 u64 ofs;
1058 /* peek at request from block layer */
1059 if (!rq)
1060 break;
1062 dout("fetched request\n");
1064 /* filter out block requests we don't understand */
1065 if ((rq->cmd_type != REQ_TYPE_FS)) {
1066 __blk_end_request_all(rq, 0);
1067 goto next;
1070 /* deduce our operation (read, write) */
1071 do_write = (rq_data_dir(rq) == WRITE);
1073 size = blk_rq_bytes(rq);
1074 ofs = blk_rq_pos(rq) * 512ULL;
1075 rq_bio = rq->bio;
1076 if (do_write && rbd_dev->read_only) {
1077 __blk_end_request_all(rq, -EROFS);
1078 goto next;
1081 spin_unlock_irq(q->queue_lock);
1083 dout("%s 0x%x bytes at 0x%llx\n",
1084 do_write ? "write" : "read",
1085 size, blk_rq_pos(rq) * 512ULL);
1087 do {
1088 /* a bio clone to be passed down to OSD req */
1089 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1090 op_size = rbd_get_segment(&rbd_dev->header,
1091 rbd_dev->header.block_name,
1092 ofs, size,
1093 NULL, NULL);
1094 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1095 op_size, GFP_ATOMIC);
1096 if (!bio) {
1097 spin_lock_irq(q->queue_lock);
1098 __blk_end_request_all(rq, -ENOMEM);
1099 goto next;
1102 /* init OSD command: write or read */
1103 if (do_write)
1104 rbd_req_write(rq, rbd_dev,
1105 rbd_dev->header.snapc,
1106 ofs,
1107 op_size, bio);
1108 else
1109 rbd_req_read(rq, rbd_dev,
1110 cur_snap_id(rbd_dev),
1111 ofs,
1112 op_size, bio);
1114 size -= op_size;
1115 ofs += op_size;
1117 rq_bio = next_bio;
1118 } while (size > 0);
1120 if (bp)
1121 bio_pair_release(bp);
1123 spin_lock_irq(q->queue_lock);
1124 next:
1125 rq = blk_fetch_request(q);
1130 * a queue callback. Makes sure that we don't create a bio that spans across
1131 * multiple osd objects. One exception would be with a single page bios,
1132 * which we handle later at bio_chain_clone
1134 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1135 struct bio_vec *bvec)
1137 struct rbd_device *rbd_dev = q->queuedata;
1138 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1139 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1140 unsigned int bio_sectors = bmd->bi_size >> 9;
1141 int max;
1143 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1144 + bio_sectors)) << 9;
1145 if (max < 0)
1146 max = 0; /* bio_add cannot handle a negative return */
1147 if (max <= bvec->bv_len && bio_sectors == 0)
1148 return bvec->bv_len;
1149 return max;
1152 static void rbd_free_disk(struct rbd_device *rbd_dev)
1154 struct gendisk *disk = rbd_dev->disk;
1156 if (!disk)
1157 return;
1159 rbd_header_free(&rbd_dev->header);
1161 if (disk->flags & GENHD_FL_UP)
1162 del_gendisk(disk);
1163 if (disk->queue)
1164 blk_cleanup_queue(disk->queue);
1165 put_disk(disk);
1169 * reload the ondisk the header
1171 static int rbd_read_header(struct rbd_device *rbd_dev,
1172 struct rbd_image_header *header)
1174 ssize_t rc;
1175 struct rbd_image_header_ondisk *dh;
1176 int snap_count = 0;
1177 u64 snap_names_len = 0;
1179 while (1) {
1180 int len = sizeof(*dh) +
1181 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1182 snap_names_len;
1184 rc = -ENOMEM;
1185 dh = kmalloc(len, GFP_KERNEL);
1186 if (!dh)
1187 return -ENOMEM;
1189 rc = rbd_req_sync_read(rbd_dev,
1190 NULL, CEPH_NOSNAP,
1191 rbd_dev->obj_md_name,
1192 0, len,
1193 (char *)dh);
1194 if (rc < 0)
1195 goto out_dh;
1197 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1198 if (rc < 0)
1199 goto out_dh;
1201 if (snap_count != header->total_snaps) {
1202 snap_count = header->total_snaps;
1203 snap_names_len = header->snap_names_len;
1204 rbd_header_free(header);
1205 kfree(dh);
1206 continue;
1208 break;
1211 out_dh:
1212 kfree(dh);
1213 return rc;
1217 * create a snapshot
1219 static int rbd_header_add_snap(struct rbd_device *dev,
1220 const char *snap_name,
1221 gfp_t gfp_flags)
1223 int name_len = strlen(snap_name);
1224 u64 new_snapid;
1225 int ret;
1226 void *data, *data_start, *data_end;
1228 /* we should create a snapshot only if we're pointing at the head */
1229 if (dev->cur_snap)
1230 return -EINVAL;
1232 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1233 &new_snapid);
1234 dout("created snapid=%lld\n", new_snapid);
1235 if (ret < 0)
1236 return ret;
1238 data = kmalloc(name_len + 16, gfp_flags);
1239 if (!data)
1240 return -ENOMEM;
1242 data_start = data;
1243 data_end = data + name_len + 16;
1245 ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1246 ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1248 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1249 data_start, data - data_start);
1251 kfree(data_start);
1253 if (ret < 0)
1254 return ret;
1256 dev->header.snapc->seq = new_snapid;
1258 return 0;
1259 bad:
1260 return -ERANGE;
1264 * only read the first part of the ondisk header, without the snaps info
1266 static int rbd_update_snaps(struct rbd_device *rbd_dev)
1268 int ret;
1269 struct rbd_image_header h;
1270 u64 snap_seq;
1272 ret = rbd_read_header(rbd_dev, &h);
1273 if (ret < 0)
1274 return ret;
1276 down_write(&rbd_dev->header.snap_rwsem);
1278 snap_seq = rbd_dev->header.snapc->seq;
1280 kfree(rbd_dev->header.snapc);
1281 kfree(rbd_dev->header.snap_names);
1282 kfree(rbd_dev->header.snap_sizes);
1284 rbd_dev->header.total_snaps = h.total_snaps;
1285 rbd_dev->header.snapc = h.snapc;
1286 rbd_dev->header.snap_names = h.snap_names;
1287 rbd_dev->header.snap_sizes = h.snap_sizes;
1288 rbd_dev->header.snapc->seq = snap_seq;
1290 up_write(&rbd_dev->header.snap_rwsem);
1292 return 0;
1295 static int rbd_init_disk(struct rbd_device *rbd_dev)
1297 struct gendisk *disk;
1298 struct request_queue *q;
1299 int rc;
1300 u64 total_size = 0;
1302 /* contact OSD, request size info about the object being mapped */
1303 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1304 if (rc)
1305 return rc;
1307 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1308 if (rc)
1309 return rc;
1311 /* create gendisk info */
1312 rc = -ENOMEM;
1313 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1314 if (!disk)
1315 goto out;
1317 sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1318 disk->major = rbd_dev->major;
1319 disk->first_minor = 0;
1320 disk->fops = &rbd_bd_ops;
1321 disk->private_data = rbd_dev;
1323 /* init rq */
1324 rc = -ENOMEM;
1325 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1326 if (!q)
1327 goto out_disk;
1328 blk_queue_merge_bvec(q, rbd_merge_bvec);
1329 disk->queue = q;
1331 q->queuedata = rbd_dev;
1333 rbd_dev->disk = disk;
1334 rbd_dev->q = q;
1336 /* finally, announce the disk to the world */
1337 set_capacity(disk, total_size / 512ULL);
1338 add_disk(disk);
1340 pr_info("%s: added with size 0x%llx\n",
1341 disk->disk_name, (unsigned long long)total_size);
1342 return 0;
1344 out_disk:
1345 put_disk(disk);
1346 out:
1347 return rc;
1350 /********************************************************************
1351 * /sys/class/rbd/
1352 * add map rados objects to blkdev
1353 * remove unmap rados objects
1354 * list show mappings
1355 *******************************************************************/
/* Class release callback: frees the class structure itself. */
static void class_rbd_release(struct class *cls)
{
	kfree(cls);
}
1362 static ssize_t class_rbd_list(struct class *c,
1363 struct class_attribute *attr,
1364 char *data)
1366 int n = 0;
1367 struct list_head *tmp;
1368 int max = PAGE_SIZE;
1370 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1372 n += snprintf(data, max,
1373 "#id\tmajor\tclient_name\tpool\tname\tsnap\tKB\n");
1375 list_for_each(tmp, &rbd_dev_list) {
1376 struct rbd_device *rbd_dev;
1378 rbd_dev = list_entry(tmp, struct rbd_device, node);
1379 n += snprintf(data+n, max-n,
1380 "%d\t%d\tclient%lld\t%s\t%s\t%s\t%lld\n",
1381 rbd_dev->id,
1382 rbd_dev->major,
1383 ceph_client_id(rbd_dev->client),
1384 rbd_dev->pool_name,
1385 rbd_dev->obj, rbd_dev->snap_name,
1386 rbd_dev->header.image_size >> 10);
1387 if (n == max)
1388 break;
1391 mutex_unlock(&ctl_mutex);
1392 return n;
1395 static ssize_t class_rbd_add(struct class *c,
1396 struct class_attribute *attr,
1397 const char *buf, size_t count)
1399 struct ceph_osd_client *osdc;
1400 struct rbd_device *rbd_dev;
1401 ssize_t rc = -ENOMEM;
1402 int irc, new_id = 0;
1403 struct list_head *tmp;
1404 char *mon_dev_name;
1405 char *options;
1407 if (!try_module_get(THIS_MODULE))
1408 return -ENODEV;
1410 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1411 if (!mon_dev_name)
1412 goto err_out_mod;
1414 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1415 if (!options)
1416 goto err_mon_dev;
1418 /* new rbd_device object */
1419 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1420 if (!rbd_dev)
1421 goto err_out_opt;
1423 /* static rbd_device initialization */
1424 spin_lock_init(&rbd_dev->lock);
1425 INIT_LIST_HEAD(&rbd_dev->node);
1427 /* generate unique id: find highest unique id, add one */
1428 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1430 list_for_each(tmp, &rbd_dev_list) {
1431 struct rbd_device *rbd_dev;
1433 rbd_dev = list_entry(tmp, struct rbd_device, node);
1434 if (rbd_dev->id >= new_id)
1435 new_id = rbd_dev->id + 1;
1438 rbd_dev->id = new_id;
1440 /* add to global list */
1441 list_add_tail(&rbd_dev->node, &rbd_dev_list);
1443 /* parse add command */
1444 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1445 "%" __stringify(RBD_MAX_OPT_LEN) "s "
1446 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1447 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1448 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1449 mon_dev_name, options, rbd_dev->pool_name,
1450 rbd_dev->obj, rbd_dev->snap_name) < 4) {
1451 rc = -EINVAL;
1452 goto err_out_slot;
1455 if (rbd_dev->snap_name[0] == 0)
1456 rbd_dev->snap_name[0] = '-';
1458 rbd_dev->obj_len = strlen(rbd_dev->obj);
1459 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1460 rbd_dev->obj, RBD_SUFFIX);
1462 /* initialize rest of new object */
1463 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1464 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1465 if (rc < 0)
1466 goto err_out_slot;
1468 mutex_unlock(&ctl_mutex);
1470 /* pick the pool */
1471 osdc = &rbd_dev->client->osdc;
1472 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1473 if (rc < 0)
1474 goto err_out_client;
1475 rbd_dev->poolid = rc;
1477 /* register our block device */
1478 irc = register_blkdev(0, rbd_dev->name);
1479 if (irc < 0) {
1480 rc = irc;
1481 goto err_out_client;
1483 rbd_dev->major = irc;
1485 /* set up and announce blkdev mapping */
1486 rc = rbd_init_disk(rbd_dev);
1487 if (rc)
1488 goto err_out_blkdev;
1490 return count;
1492 err_out_blkdev:
1493 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1494 err_out_client:
1495 rbd_put_client(rbd_dev);
1496 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1497 err_out_slot:
1498 list_del_init(&rbd_dev->node);
1499 mutex_unlock(&ctl_mutex);
1501 kfree(rbd_dev);
1502 err_out_opt:
1503 kfree(options);
1504 err_mon_dev:
1505 kfree(mon_dev_name);
1506 err_out_mod:
1507 dout("Error adding device %s\n", buf);
1508 module_put(THIS_MODULE);
1509 return rc;
1512 static struct rbd_device *__rbd_get_dev(unsigned long id)
1514 struct list_head *tmp;
1515 struct rbd_device *rbd_dev;
1517 list_for_each(tmp, &rbd_dev_list) {
1518 rbd_dev = list_entry(tmp, struct rbd_device, node);
1519 if (rbd_dev->id == id)
1520 return rbd_dev;
1522 return NULL;
1525 static ssize_t class_rbd_remove(struct class *c,
1526 struct class_attribute *attr,
1527 const char *buf,
1528 size_t count)
1530 struct rbd_device *rbd_dev = NULL;
1531 int target_id, rc;
1532 unsigned long ul;
1534 rc = strict_strtoul(buf, 10, &ul);
1535 if (rc)
1536 return rc;
1538 /* convert to int; abort if we lost anything in the conversion */
1539 target_id = (int) ul;
1540 if (target_id != ul)
1541 return -EINVAL;
1543 /* remove object from list immediately */
1544 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1546 rbd_dev = __rbd_get_dev(target_id);
1547 if (rbd_dev)
1548 list_del_init(&rbd_dev->node);
1550 mutex_unlock(&ctl_mutex);
1552 if (!rbd_dev)
1553 return -ENOENT;
1555 rbd_put_client(rbd_dev);
1557 /* clean up and free blkdev */
1558 rbd_free_disk(rbd_dev);
1559 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1560 kfree(rbd_dev);
1562 /* release module ref */
1563 module_put(THIS_MODULE);
1565 return count;
1568 static ssize_t class_rbd_snaps_list(struct class *c,
1569 struct class_attribute *attr,
1570 char *data)
1572 struct rbd_device *rbd_dev = NULL;
1573 struct list_head *tmp;
1574 struct rbd_image_header *header;
1575 int i, n = 0, max = PAGE_SIZE;
1576 int ret;
1578 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1580 n += snprintf(data, max, "#id\tsnap\tKB\n");
1582 list_for_each(tmp, &rbd_dev_list) {
1583 char *names, *p;
1584 struct ceph_snap_context *snapc;
1586 rbd_dev = list_entry(tmp, struct rbd_device, node);
1587 header = &rbd_dev->header;
1589 down_read(&header->snap_rwsem);
1591 names = header->snap_names;
1592 snapc = header->snapc;
1594 n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1595 rbd_dev->id, RBD_SNAP_HEAD_NAME,
1596 header->image_size >> 10,
1597 (!rbd_dev->cur_snap ? " (*)" : ""));
1598 if (n == max)
1599 break;
1601 p = names;
1602 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
1603 n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1604 rbd_dev->id, p, header->snap_sizes[i] >> 10,
1605 (rbd_dev->cur_snap &&
1606 (snap_index(header, i) == rbd_dev->cur_snap) ?
1607 " (*)" : ""));
1608 if (n == max)
1609 break;
1612 up_read(&header->snap_rwsem);
1616 ret = n;
1617 mutex_unlock(&ctl_mutex);
1618 return ret;
1621 static ssize_t class_rbd_snaps_refresh(struct class *c,
1622 struct class_attribute *attr,
1623 const char *buf,
1624 size_t count)
1626 struct rbd_device *rbd_dev = NULL;
1627 int target_id, rc;
1628 unsigned long ul;
1629 int ret = count;
1631 rc = strict_strtoul(buf, 10, &ul);
1632 if (rc)
1633 return rc;
1635 /* convert to int; abort if we lost anything in the conversion */
1636 target_id = (int) ul;
1637 if (target_id != ul)
1638 return -EINVAL;
1640 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1642 rbd_dev = __rbd_get_dev(target_id);
1643 if (!rbd_dev) {
1644 ret = -ENOENT;
1645 goto done;
1648 rc = rbd_update_snaps(rbd_dev);
1649 if (rc < 0)
1650 ret = rc;
1652 done:
1653 mutex_unlock(&ctl_mutex);
1654 return ret;
1657 static ssize_t class_rbd_snap_create(struct class *c,
1658 struct class_attribute *attr,
1659 const char *buf,
1660 size_t count)
1662 struct rbd_device *rbd_dev = NULL;
1663 int target_id, ret;
1664 char *name;
1666 name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL);
1667 if (!name)
1668 return -ENOMEM;
1670 /* parse snaps add command */
1671 if (sscanf(buf, "%d "
1672 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1673 &target_id,
1674 name) != 2) {
1675 ret = -EINVAL;
1676 goto done;
1679 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1681 rbd_dev = __rbd_get_dev(target_id);
1682 if (!rbd_dev) {
1683 ret = -ENOENT;
1684 goto done_unlock;
1687 ret = rbd_header_add_snap(rbd_dev,
1688 name, GFP_KERNEL);
1689 if (ret < 0)
1690 goto done_unlock;
1692 ret = rbd_update_snaps(rbd_dev);
1693 if (ret < 0)
1694 goto done_unlock;
1696 ret = count;
1697 done_unlock:
1698 mutex_unlock(&ctl_mutex);
1699 done:
1700 kfree(name);
1701 return ret;
1704 static ssize_t class_rbd_rollback(struct class *c,
1705 struct class_attribute *attr,
1706 const char *buf,
1707 size_t count)
1709 struct rbd_device *rbd_dev = NULL;
1710 int target_id, ret;
1711 u64 snapid;
1712 char snap_name[RBD_MAX_SNAP_NAME_LEN];
1713 u64 cur_ofs;
1714 char *seg_name;
1716 /* parse snaps add command */
1717 if (sscanf(buf, "%d "
1718 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1719 &target_id,
1720 snap_name) != 2) {
1721 return -EINVAL;
1724 ret = -ENOMEM;
1725 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1726 if (!seg_name)
1727 return ret;
1729 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1731 rbd_dev = __rbd_get_dev(target_id);
1732 if (!rbd_dev) {
1733 ret = -ENOENT;
1734 goto done_unlock;
1737 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1738 if (ret < 0)
1739 goto done_unlock;
1741 dout("snapid=%lld\n", snapid);
1743 cur_ofs = 0;
1744 while (cur_ofs < rbd_dev->header.image_size) {
1745 cur_ofs += rbd_get_segment(&rbd_dev->header,
1746 rbd_dev->obj,
1747 cur_ofs, (u64)-1,
1748 seg_name, NULL);
1749 dout("seg_name=%s\n", seg_name);
1751 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1752 if (ret < 0)
1753 pr_warning("could not roll back obj %s err=%d\n",
1754 seg_name, ret);
1757 ret = rbd_update_snaps(rbd_dev);
1758 if (ret < 0)
1759 goto done_unlock;
1761 ret = count;
1763 done_unlock:
1764 mutex_unlock(&ctl_mutex);
1765 kfree(seg_name);
1767 return ret;
1770 static struct class_attribute class_rbd_attrs[] = {
1771 __ATTR(add, 0200, NULL, class_rbd_add),
1772 __ATTR(remove, 0200, NULL, class_rbd_remove),
1773 __ATTR(list, 0444, class_rbd_list, NULL),
1774 __ATTR(snaps_refresh, 0200, NULL, class_rbd_snaps_refresh),
1775 __ATTR(snap_create, 0200, NULL, class_rbd_snap_create),
1776 __ATTR(snaps_list, 0444, class_rbd_snaps_list, NULL),
1777 __ATTR(snap_rollback, 0200, NULL, class_rbd_rollback),
1778 __ATTR_NULL
1782 * create control files in sysfs
1783 * /sys/class/rbd/...
1785 static int rbd_sysfs_init(void)
1787 int ret = -ENOMEM;
1789 class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
1790 if (!class_rbd)
1791 goto out;
1793 class_rbd->name = DRV_NAME;
1794 class_rbd->owner = THIS_MODULE;
1795 class_rbd->class_release = class_rbd_release;
1796 class_rbd->class_attrs = class_rbd_attrs;
1798 ret = class_register(class_rbd);
1799 if (ret)
1800 goto out_class;
1801 return 0;
1803 out_class:
1804 kfree(class_rbd);
1805 class_rbd = NULL;
1806 pr_err(DRV_NAME ": failed to create class rbd\n");
1807 out:
1808 return ret;
1811 static void rbd_sysfs_cleanup(void)
1813 if (class_rbd)
1814 class_destroy(class_rbd);
1815 class_rbd = NULL;
1818 int __init rbd_init(void)
1820 int rc;
1822 rc = rbd_sysfs_init();
1823 if (rc)
1824 return rc;
1825 spin_lock_init(&node_lock);
1826 pr_info("loaded " DRV_NAME_LONG "\n");
1827 return 0;
1830 void __exit rbd_exit(void)
1832 rbd_sysfs_cleanup();
1835 module_init(rbd_init);
1836 module_exit(rbd_exit);
1838 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
1839 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
1840 MODULE_DESCRIPTION("rados block device");
1842 /* following authorship retained from original osdblk.c */
1843 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
1845 MODULE_LICENSE("GPL");