tg3: Fix loopback tests
[linux-2.6/linux-acpi-2.6/ibm-acpi-2.6.git] / drivers / block / rbd.c
blobe1e38b11f48ae3b60f15a27f0242b4b9c100dde5
1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
35 #include <linux/kernel.h>
36 #include <linux/device.h>
37 #include <linux/module.h>
38 #include <linux/fs.h>
39 #include <linux/blkdev.h>
41 #include "rbd_types.h"
43 #define DRV_NAME "rbd"
44 #define DRV_NAME_LONG "rbd (rados block device)"
46 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
48 #define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX))
49 #define RBD_MAX_POOL_NAME_LEN 64
50 #define RBD_MAX_SNAP_NAME_LEN 32
51 #define RBD_MAX_OPT_LEN 1024
53 #define RBD_SNAP_HEAD_NAME "-"
55 #define DEV_NAME_LEN 32
58 * block device image metadata (in-memory version)
60 struct rbd_image_header {
61 u64 image_size;
62 char block_name[32];
63 __u8 obj_order;
64 __u8 crypt_type;
65 __u8 comp_type;
66 struct rw_semaphore snap_rwsem;
67 struct ceph_snap_context *snapc;
68 size_t snap_names_len;
69 u64 snap_seq;
70 u32 total_snaps;
72 char *snap_names;
73 u64 *snap_sizes;
77 * an instance of the client. multiple devices may share a client.
79 struct rbd_client {
80 struct ceph_client *client;
81 struct kref kref;
82 struct list_head node;
86 * a single io request
88 struct rbd_request {
89 struct request *rq; /* blk layer request */
90 struct bio *bio; /* cloned bio */
91 struct page **pages; /* list of used pages */
92 u64 len;
95 struct rbd_snap {
96 struct device dev;
97 const char *name;
98 size_t size;
99 struct list_head node;
100 u64 id;
104 * a single device
106 struct rbd_device {
107 int id; /* blkdev unique id */
109 int major; /* blkdev assigned major */
110 struct gendisk *disk; /* blkdev's gendisk and rq */
111 struct request_queue *q;
113 struct ceph_client *client;
114 struct rbd_client *rbd_client;
116 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
118 spinlock_t lock; /* queue lock */
120 struct rbd_image_header header;
121 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
122 int obj_len;
123 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
124 char pool_name[RBD_MAX_POOL_NAME_LEN];
125 int poolid;
127 char snap_name[RBD_MAX_SNAP_NAME_LEN];
128 u32 cur_snap; /* index+1 of current snapshot within snap context
129 0 - for the head */
130 int read_only;
132 struct list_head node;
134 /* list of snapshots */
135 struct list_head snaps;
137 /* sysfs related */
138 struct device dev;
141 static struct bus_type rbd_bus_type = {
142 .name = "rbd",
/*
 * Statically initialised so the lock is valid from module load onward,
 * whether or not any init path also runs spin_lock_init() on it.
 */
static DEFINE_SPINLOCK(node_lock);	/* protects client get/put */

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);    /* devices */
static LIST_HEAD(rbd_client_list);      /* clients */
151 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
152 static void rbd_dev_release(struct device *dev);
153 static ssize_t rbd_snap_rollback(struct device *dev,
154 struct device_attribute *attr,
155 const char *buf,
156 size_t size);
157 static ssize_t rbd_snap_add(struct device *dev,
158 struct device_attribute *attr,
159 const char *buf,
160 size_t count);
161 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
162 struct rbd_snap *snap);;
165 static struct rbd_device *dev_to_rbd(struct device *dev)
167 return container_of(dev, struct rbd_device, dev);
170 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
172 return get_device(&rbd_dev->dev);
175 static void rbd_put_dev(struct rbd_device *rbd_dev)
177 put_device(&rbd_dev->dev);
180 static int rbd_open(struct block_device *bdev, fmode_t mode)
182 struct gendisk *disk = bdev->bd_disk;
183 struct rbd_device *rbd_dev = disk->private_data;
185 rbd_get_dev(rbd_dev);
187 set_device_ro(bdev, rbd_dev->read_only);
189 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
190 return -EROFS;
192 return 0;
195 static int rbd_release(struct gendisk *disk, fmode_t mode)
197 struct rbd_device *rbd_dev = disk->private_data;
199 rbd_put_dev(rbd_dev);
201 return 0;
204 static const struct block_device_operations rbd_bd_ops = {
205 .owner = THIS_MODULE,
206 .open = rbd_open,
207 .release = rbd_release,
211 * Initialize an rbd client instance.
212 * We own *opt.
214 static struct rbd_client *rbd_client_create(struct ceph_options *opt)
216 struct rbd_client *rbdc;
217 int ret = -ENOMEM;
219 dout("rbd_client_create\n");
220 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
221 if (!rbdc)
222 goto out_opt;
224 kref_init(&rbdc->kref);
225 INIT_LIST_HEAD(&rbdc->node);
227 rbdc->client = ceph_create_client(opt, rbdc);
228 if (IS_ERR(rbdc->client))
229 goto out_rbdc;
230 opt = NULL; /* Now rbdc->client is responsible for opt */
232 ret = ceph_open_session(rbdc->client);
233 if (ret < 0)
234 goto out_err;
236 spin_lock(&node_lock);
237 list_add_tail(&rbdc->node, &rbd_client_list);
238 spin_unlock(&node_lock);
240 dout("rbd_client_create created %p\n", rbdc);
241 return rbdc;
243 out_err:
244 ceph_destroy_client(rbdc->client);
245 out_rbdc:
246 kfree(rbdc);
247 out_opt:
248 if (opt)
249 ceph_destroy_options(opt);
250 return ERR_PTR(ret);
254 * Find a ceph client with specific addr and configuration.
256 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
258 struct rbd_client *client_node;
260 if (opt->flags & CEPH_OPT_NOSHARE)
261 return NULL;
263 list_for_each_entry(client_node, &rbd_client_list, node)
264 if (ceph_compare_options(opt, client_node->client) == 0)
265 return client_node;
266 return NULL;
270 * Get a ceph client with specific addr and configuration, if one does
271 * not exist create it.
273 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
274 char *options)
276 struct rbd_client *rbdc;
277 struct ceph_options *opt;
278 int ret;
280 ret = ceph_parse_options(&opt, options, mon_addr,
281 mon_addr + strlen(mon_addr), NULL, NULL);
282 if (ret < 0)
283 return ret;
285 spin_lock(&node_lock);
286 rbdc = __rbd_client_find(opt);
287 if (rbdc) {
288 ceph_destroy_options(opt);
290 /* using an existing client */
291 kref_get(&rbdc->kref);
292 rbd_dev->rbd_client = rbdc;
293 rbd_dev->client = rbdc->client;
294 spin_unlock(&node_lock);
295 return 0;
297 spin_unlock(&node_lock);
299 rbdc = rbd_client_create(opt);
300 if (IS_ERR(rbdc))
301 return PTR_ERR(rbdc);
303 rbd_dev->rbd_client = rbdc;
304 rbd_dev->client = rbdc->client;
305 return 0;
309 * Destroy ceph client
311 static void rbd_client_release(struct kref *kref)
313 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
315 dout("rbd_release_client %p\n", rbdc);
316 spin_lock(&node_lock);
317 list_del(&rbdc->node);
318 spin_unlock(&node_lock);
320 ceph_destroy_client(rbdc->client);
321 kfree(rbdc);
325 * Drop reference to ceph client node. If it's not referenced anymore, release
326 * it.
328 static void rbd_put_client(struct rbd_device *rbd_dev)
330 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
331 rbd_dev->rbd_client = NULL;
332 rbd_dev->client = NULL;
337 * Create a new header structure, translate header format from the on-disk
338 * header.
340 static int rbd_header_from_disk(struct rbd_image_header *header,
341 struct rbd_image_header_ondisk *ondisk,
342 int allocated_snaps,
343 gfp_t gfp_flags)
345 int i;
346 u32 snap_count = le32_to_cpu(ondisk->snap_count);
347 int ret = -ENOMEM;
349 init_rwsem(&header->snap_rwsem);
350 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
351 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
352 snap_count *
353 sizeof(struct rbd_image_snap_ondisk),
354 gfp_flags);
355 if (!header->snapc)
356 return -ENOMEM;
357 if (snap_count) {
358 header->snap_names = kmalloc(header->snap_names_len,
359 GFP_KERNEL);
360 if (!header->snap_names)
361 goto err_snapc;
362 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
363 GFP_KERNEL);
364 if (!header->snap_sizes)
365 goto err_names;
366 } else {
367 header->snap_names = NULL;
368 header->snap_sizes = NULL;
370 memcpy(header->block_name, ondisk->block_name,
371 sizeof(ondisk->block_name));
373 header->image_size = le64_to_cpu(ondisk->image_size);
374 header->obj_order = ondisk->options.order;
375 header->crypt_type = ondisk->options.crypt_type;
376 header->comp_type = ondisk->options.comp_type;
378 atomic_set(&header->snapc->nref, 1);
379 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
380 header->snapc->num_snaps = snap_count;
381 header->total_snaps = snap_count;
383 if (snap_count &&
384 allocated_snaps == snap_count) {
385 for (i = 0; i < snap_count; i++) {
386 header->snapc->snaps[i] =
387 le64_to_cpu(ondisk->snaps[i].id);
388 header->snap_sizes[i] =
389 le64_to_cpu(ondisk->snaps[i].image_size);
392 /* copy snapshot names */
393 memcpy(header->snap_names, &ondisk->snaps[i],
394 header->snap_names_len);
397 return 0;
399 err_names:
400 kfree(header->snap_names);
401 err_snapc:
402 kfree(header->snapc);
403 return ret;
406 static int snap_index(struct rbd_image_header *header, int snap_num)
408 return header->total_snaps - snap_num;
411 static u64 cur_snap_id(struct rbd_device *rbd_dev)
413 struct rbd_image_header *header = &rbd_dev->header;
415 if (!rbd_dev->cur_snap)
416 return 0;
418 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
421 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
422 u64 *seq, u64 *size)
424 int i;
425 char *p = header->snap_names;
427 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
428 if (strcmp(snap_name, p) == 0)
429 break;
431 if (i == header->total_snaps)
432 return -ENOENT;
433 if (seq)
434 *seq = header->snapc->snaps[i];
436 if (size)
437 *size = header->snap_sizes[i];
439 return i;
442 static int rbd_header_set_snap(struct rbd_device *dev,
443 const char *snap_name,
444 u64 *size)
446 struct rbd_image_header *header = &dev->header;
447 struct ceph_snap_context *snapc = header->snapc;
448 int ret = -ENOENT;
450 down_write(&header->snap_rwsem);
452 if (!snap_name ||
453 !*snap_name ||
454 strcmp(snap_name, "-") == 0 ||
455 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
456 if (header->total_snaps)
457 snapc->seq = header->snap_seq;
458 else
459 snapc->seq = 0;
460 dev->cur_snap = 0;
461 dev->read_only = 0;
462 if (size)
463 *size = header->image_size;
464 } else {
465 ret = snap_by_name(header, snap_name, &snapc->seq, size);
466 if (ret < 0)
467 goto done;
469 dev->cur_snap = header->total_snaps - ret;
470 dev->read_only = 1;
473 ret = 0;
474 done:
475 up_write(&header->snap_rwsem);
476 return ret;
479 static void rbd_header_free(struct rbd_image_header *header)
481 kfree(header->snapc);
482 kfree(header->snap_names);
483 kfree(header->snap_sizes);
487 * get the actual striped segment name, offset and length
489 static u64 rbd_get_segment(struct rbd_image_header *header,
490 const char *block_name,
491 u64 ofs, u64 len,
492 char *seg_name, u64 *segofs)
494 u64 seg = ofs >> header->obj_order;
496 if (seg_name)
497 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
498 "%s.%012llx", block_name, seg);
500 ofs = ofs & ((1 << header->obj_order) - 1);
501 len = min_t(u64, len, (1 << header->obj_order) - ofs);
503 if (segofs)
504 *segofs = ofs;
506 return len;
510 * bio helpers
513 static void bio_chain_put(struct bio *chain)
515 struct bio *tmp;
517 while (chain) {
518 tmp = chain;
519 chain = chain->bi_next;
520 bio_put(tmp);
525 * zeros a bio chain, starting at specific offset
527 static void zero_bio_chain(struct bio *chain, int start_ofs)
529 struct bio_vec *bv;
530 unsigned long flags;
531 void *buf;
532 int i;
533 int pos = 0;
535 while (chain) {
536 bio_for_each_segment(bv, chain, i) {
537 if (pos + bv->bv_len > start_ofs) {
538 int remainder = max(start_ofs - pos, 0);
539 buf = bvec_kmap_irq(bv, &flags);
540 memset(buf + remainder, 0,
541 bv->bv_len - remainder);
542 bvec_kunmap_irq(buf, &flags);
544 pos += bv->bv_len;
547 chain = chain->bi_next;
552 * bio_chain_clone - clone a chain of bios up to a certain length.
553 * might return a bio_pair that will need to be released.
555 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
556 struct bio_pair **bp,
557 int len, gfp_t gfpmask)
559 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
560 int total = 0;
562 if (*bp) {
563 bio_pair_release(*bp);
564 *bp = NULL;
567 while (old_chain && (total < len)) {
568 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
569 if (!tmp)
570 goto err_out;
572 if (total + old_chain->bi_size > len) {
573 struct bio_pair *bp;
576 * this split can only happen with a single paged bio,
577 * split_bio will BUG_ON if this is not the case
579 dout("bio_chain_clone split! total=%d remaining=%d"
580 "bi_size=%d\n",
581 (int)total, (int)len-total,
582 (int)old_chain->bi_size);
584 /* split the bio. We'll release it either in the next
585 call, or it will have to be released outside */
586 bp = bio_split(old_chain, (len - total) / 512ULL);
587 if (!bp)
588 goto err_out;
590 __bio_clone(tmp, &bp->bio1);
592 *next = &bp->bio2;
593 } else {
594 __bio_clone(tmp, old_chain);
595 *next = old_chain->bi_next;
598 tmp->bi_bdev = NULL;
599 gfpmask &= ~__GFP_WAIT;
600 tmp->bi_next = NULL;
602 if (!new_chain) {
603 new_chain = tail = tmp;
604 } else {
605 tail->bi_next = tmp;
606 tail = tmp;
608 old_chain = old_chain->bi_next;
610 total += tmp->bi_size;
613 BUG_ON(total < len);
615 if (tail)
616 tail->bi_next = NULL;
618 *old = old_chain;
620 return new_chain;
622 err_out:
623 dout("bio_chain_clone with err\n");
624 bio_chain_put(new_chain);
625 return NULL;
629 * helpers for osd request op vectors.
631 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
632 int num_ops,
633 int opcode,
634 u32 payload_len)
636 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
637 GFP_NOIO);
638 if (!*ops)
639 return -ENOMEM;
640 (*ops)[0].op = opcode;
642 * op extent offset and length will be set later on
643 * in calc_raw_layout()
645 (*ops)[0].payload_len = payload_len;
646 return 0;
/* Free an op vector obtained from rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
655 * Send ceph osd request
657 static int rbd_do_request(struct request *rq,
658 struct rbd_device *dev,
659 struct ceph_snap_context *snapc,
660 u64 snapid,
661 const char *obj, u64 ofs, u64 len,
662 struct bio *bio,
663 struct page **pages,
664 int num_pages,
665 int flags,
666 struct ceph_osd_req_op *ops,
667 int num_reply,
668 void (*rbd_cb)(struct ceph_osd_request *req,
669 struct ceph_msg *msg))
671 struct ceph_osd_request *req;
672 struct ceph_file_layout *layout;
673 int ret;
674 u64 bno;
675 struct timespec mtime = CURRENT_TIME;
676 struct rbd_request *req_data;
677 struct ceph_osd_request_head *reqhead;
678 struct rbd_image_header *header = &dev->header;
680 ret = -ENOMEM;
681 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
682 if (!req_data)
683 goto done;
685 dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
687 down_read(&header->snap_rwsem);
689 req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
690 snapc,
691 ops,
692 false,
693 GFP_NOIO, pages, bio);
694 if (IS_ERR(req)) {
695 up_read(&header->snap_rwsem);
696 ret = PTR_ERR(req);
697 goto done_pages;
700 req->r_callback = rbd_cb;
702 req_data->rq = rq;
703 req_data->bio = bio;
704 req_data->pages = pages;
705 req_data->len = len;
707 req->r_priv = req_data;
709 reqhead = req->r_request->front.iov_base;
710 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
712 strncpy(req->r_oid, obj, sizeof(req->r_oid));
713 req->r_oid_len = strlen(req->r_oid);
715 layout = &req->r_file_layout;
716 memset(layout, 0, sizeof(*layout));
717 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
718 layout->fl_stripe_count = cpu_to_le32(1);
719 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
720 layout->fl_pg_preferred = cpu_to_le32(-1);
721 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
722 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
723 ofs, &len, &bno, req, ops);
725 ceph_osdc_build_request(req, ofs, &len,
726 ops,
727 snapc,
728 &mtime,
729 req->r_oid, req->r_oid_len);
730 up_read(&header->snap_rwsem);
732 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
733 if (ret < 0)
734 goto done_err;
736 if (!rbd_cb) {
737 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
738 ceph_osdc_put_request(req);
740 return ret;
742 done_err:
743 bio_chain_put(req_data->bio);
744 ceph_osdc_put_request(req);
745 done_pages:
746 kfree(req_data);
747 done:
748 if (rq)
749 blk_end_request(rq, ret, len);
750 return ret;
754 * Ceph osd op callback
756 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
758 struct rbd_request *req_data = req->r_priv;
759 struct ceph_osd_reply_head *replyhead;
760 struct ceph_osd_op *op;
761 __s32 rc;
762 u64 bytes;
763 int read_op;
765 /* parse reply */
766 replyhead = msg->front.iov_base;
767 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
768 op = (void *)(replyhead + 1);
769 rc = le32_to_cpu(replyhead->result);
770 bytes = le64_to_cpu(op->extent.length);
771 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
773 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
775 if (rc == -ENOENT && read_op) {
776 zero_bio_chain(req_data->bio, 0);
777 rc = 0;
778 } else if (rc == 0 && read_op && bytes < req_data->len) {
779 zero_bio_chain(req_data->bio, bytes);
780 bytes = req_data->len;
783 blk_end_request(req_data->rq, rc, bytes);
785 if (req_data->bio)
786 bio_chain_put(req_data->bio);
788 ceph_osdc_put_request(req);
789 kfree(req_data);
793 * Do a synchronous ceph osd operation
795 static int rbd_req_sync_op(struct rbd_device *dev,
796 struct ceph_snap_context *snapc,
797 u64 snapid,
798 int opcode,
799 int flags,
800 struct ceph_osd_req_op *orig_ops,
801 int num_reply,
802 const char *obj,
803 u64 ofs, u64 len,
804 char *buf)
806 int ret;
807 struct page **pages;
808 int num_pages;
809 struct ceph_osd_req_op *ops = orig_ops;
810 u32 payload_len;
812 num_pages = calc_pages_for(ofs , len);
813 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
814 if (IS_ERR(pages))
815 return PTR_ERR(pages);
817 if (!orig_ops) {
818 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
819 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
820 if (ret < 0)
821 goto done;
823 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
824 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
825 if (ret < 0)
826 goto done_ops;
830 ret = rbd_do_request(NULL, dev, snapc, snapid,
831 obj, ofs, len, NULL,
832 pages, num_pages,
833 flags,
834 ops,
836 NULL);
837 if (ret < 0)
838 goto done_ops;
840 if ((flags & CEPH_OSD_FLAG_READ) && buf)
841 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
843 done_ops:
844 if (!orig_ops)
845 rbd_destroy_ops(ops);
846 done:
847 ceph_release_page_vector(pages, num_pages);
848 return ret;
852 * Do an asynchronous ceph osd operation
854 static int rbd_do_op(struct request *rq,
855 struct rbd_device *rbd_dev ,
856 struct ceph_snap_context *snapc,
857 u64 snapid,
858 int opcode, int flags, int num_reply,
859 u64 ofs, u64 len,
860 struct bio *bio)
862 char *seg_name;
863 u64 seg_ofs;
864 u64 seg_len;
865 int ret;
866 struct ceph_osd_req_op *ops;
867 u32 payload_len;
869 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
870 if (!seg_name)
871 return -ENOMEM;
873 seg_len = rbd_get_segment(&rbd_dev->header,
874 rbd_dev->header.block_name,
875 ofs, len,
876 seg_name, &seg_ofs);
878 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
880 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
881 if (ret < 0)
882 goto done;
884 /* we've taken care of segment sizes earlier when we
885 cloned the bios. We should never have a segment
886 truncated at this point */
887 BUG_ON(seg_len < len);
889 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
890 seg_name, seg_ofs, seg_len,
891 bio,
892 NULL, 0,
893 flags,
894 ops,
895 num_reply,
896 rbd_req_cb);
897 done:
898 kfree(seg_name);
899 return ret;
903 * Request async osd write
905 static int rbd_req_write(struct request *rq,
906 struct rbd_device *rbd_dev,
907 struct ceph_snap_context *snapc,
908 u64 ofs, u64 len,
909 struct bio *bio)
911 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
912 CEPH_OSD_OP_WRITE,
913 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
915 ofs, len, bio);
919 * Request async osd read
921 static int rbd_req_read(struct request *rq,
922 struct rbd_device *rbd_dev,
923 u64 snapid,
924 u64 ofs, u64 len,
925 struct bio *bio)
927 return rbd_do_op(rq, rbd_dev, NULL,
928 (snapid ? snapid : CEPH_NOSNAP),
929 CEPH_OSD_OP_READ,
930 CEPH_OSD_FLAG_READ,
932 ofs, len, bio);
936 * Request sync osd read
938 static int rbd_req_sync_read(struct rbd_device *dev,
939 struct ceph_snap_context *snapc,
940 u64 snapid,
941 const char *obj,
942 u64 ofs, u64 len,
943 char *buf)
945 return rbd_req_sync_op(dev, NULL,
946 (snapid ? snapid : CEPH_NOSNAP),
947 CEPH_OSD_OP_READ,
948 CEPH_OSD_FLAG_READ,
949 NULL,
950 1, obj, ofs, len, buf);
954 * Request sync osd read
956 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
957 u64 snapid,
958 const char *obj)
960 struct ceph_osd_req_op *ops;
961 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
962 if (ret < 0)
963 return ret;
965 ops[0].snap.snapid = snapid;
967 ret = rbd_req_sync_op(dev, NULL,
968 CEPH_NOSNAP,
970 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
971 ops,
972 1, obj, 0, 0, NULL);
974 rbd_destroy_ops(ops);
976 if (ret < 0)
977 return ret;
979 return ret;
983 * Request sync osd read
985 static int rbd_req_sync_exec(struct rbd_device *dev,
986 const char *obj,
987 const char *cls,
988 const char *method,
989 const char *data,
990 int len)
992 struct ceph_osd_req_op *ops;
993 int cls_len = strlen(cls);
994 int method_len = strlen(method);
995 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
996 cls_len + method_len + len);
997 if (ret < 0)
998 return ret;
1000 ops[0].cls.class_name = cls;
1001 ops[0].cls.class_len = (__u8)cls_len;
1002 ops[0].cls.method_name = method;
1003 ops[0].cls.method_len = (__u8)method_len;
1004 ops[0].cls.argc = 0;
1005 ops[0].cls.indata = data;
1006 ops[0].cls.indata_len = len;
1008 ret = rbd_req_sync_op(dev, NULL,
1009 CEPH_NOSNAP,
1011 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1012 ops,
1013 1, obj, 0, 0, NULL);
1015 rbd_destroy_ops(ops);
1017 dout("cls_exec returned %d\n", ret);
1018 return ret;
1022 * block device queue callback
1024 static void rbd_rq_fn(struct request_queue *q)
1026 struct rbd_device *rbd_dev = q->queuedata;
1027 struct request *rq;
1028 struct bio_pair *bp = NULL;
1030 rq = blk_fetch_request(q);
1032 while (1) {
1033 struct bio *bio;
1034 struct bio *rq_bio, *next_bio = NULL;
1035 bool do_write;
1036 int size, op_size = 0;
1037 u64 ofs;
1039 /* peek at request from block layer */
1040 if (!rq)
1041 break;
1043 dout("fetched request\n");
1045 /* filter out block requests we don't understand */
1046 if ((rq->cmd_type != REQ_TYPE_FS)) {
1047 __blk_end_request_all(rq, 0);
1048 goto next;
1051 /* deduce our operation (read, write) */
1052 do_write = (rq_data_dir(rq) == WRITE);
1054 size = blk_rq_bytes(rq);
1055 ofs = blk_rq_pos(rq) * 512ULL;
1056 rq_bio = rq->bio;
1057 if (do_write && rbd_dev->read_only) {
1058 __blk_end_request_all(rq, -EROFS);
1059 goto next;
1062 spin_unlock_irq(q->queue_lock);
1064 dout("%s 0x%x bytes at 0x%llx\n",
1065 do_write ? "write" : "read",
1066 size, blk_rq_pos(rq) * 512ULL);
1068 do {
1069 /* a bio clone to be passed down to OSD req */
1070 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1071 op_size = rbd_get_segment(&rbd_dev->header,
1072 rbd_dev->header.block_name,
1073 ofs, size,
1074 NULL, NULL);
1075 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1076 op_size, GFP_ATOMIC);
1077 if (!bio) {
1078 spin_lock_irq(q->queue_lock);
1079 __blk_end_request_all(rq, -ENOMEM);
1080 goto next;
1083 /* init OSD command: write or read */
1084 if (do_write)
1085 rbd_req_write(rq, rbd_dev,
1086 rbd_dev->header.snapc,
1087 ofs,
1088 op_size, bio);
1089 else
1090 rbd_req_read(rq, rbd_dev,
1091 cur_snap_id(rbd_dev),
1092 ofs,
1093 op_size, bio);
1095 size -= op_size;
1096 ofs += op_size;
1098 rq_bio = next_bio;
1099 } while (size > 0);
1101 if (bp)
1102 bio_pair_release(bp);
1104 spin_lock_irq(q->queue_lock);
1105 next:
1106 rq = blk_fetch_request(q);
1111 * a queue callback. Makes sure that we don't create a bio that spans across
1112 * multiple osd objects. One exception would be with a single page bios,
1113 * which we handle later at bio_chain_clone
1115 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1116 struct bio_vec *bvec)
1118 struct rbd_device *rbd_dev = q->queuedata;
1119 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1120 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1121 unsigned int bio_sectors = bmd->bi_size >> 9;
1122 int max;
1124 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1125 + bio_sectors)) << 9;
1126 if (max < 0)
1127 max = 0; /* bio_add cannot handle a negative return */
1128 if (max <= bvec->bv_len && bio_sectors == 0)
1129 return bvec->bv_len;
1130 return max;
1133 static void rbd_free_disk(struct rbd_device *rbd_dev)
1135 struct gendisk *disk = rbd_dev->disk;
1137 if (!disk)
1138 return;
1140 rbd_header_free(&rbd_dev->header);
1142 if (disk->flags & GENHD_FL_UP)
1143 del_gendisk(disk);
1144 if (disk->queue)
1145 blk_cleanup_queue(disk->queue);
1146 put_disk(disk);
1150 * reload the ondisk the header
1152 static int rbd_read_header(struct rbd_device *rbd_dev,
1153 struct rbd_image_header *header)
1155 ssize_t rc;
1156 struct rbd_image_header_ondisk *dh;
1157 int snap_count = 0;
1158 u64 snap_names_len = 0;
1160 while (1) {
1161 int len = sizeof(*dh) +
1162 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1163 snap_names_len;
1165 rc = -ENOMEM;
1166 dh = kmalloc(len, GFP_KERNEL);
1167 if (!dh)
1168 return -ENOMEM;
1170 rc = rbd_req_sync_read(rbd_dev,
1171 NULL, CEPH_NOSNAP,
1172 rbd_dev->obj_md_name,
1173 0, len,
1174 (char *)dh);
1175 if (rc < 0)
1176 goto out_dh;
1178 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1179 if (rc < 0)
1180 goto out_dh;
1182 if (snap_count != header->total_snaps) {
1183 snap_count = header->total_snaps;
1184 snap_names_len = header->snap_names_len;
1185 rbd_header_free(header);
1186 kfree(dh);
1187 continue;
1189 break;
1192 out_dh:
1193 kfree(dh);
1194 return rc;
1198 * create a snapshot
1200 static int rbd_header_add_snap(struct rbd_device *dev,
1201 const char *snap_name,
1202 gfp_t gfp_flags)
1204 int name_len = strlen(snap_name);
1205 u64 new_snapid;
1206 int ret;
1207 void *data, *data_start, *data_end;
1209 /* we should create a snapshot only if we're pointing at the head */
1210 if (dev->cur_snap)
1211 return -EINVAL;
1213 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1214 &new_snapid);
1215 dout("created snapid=%lld\n", new_snapid);
1216 if (ret < 0)
1217 return ret;
1219 data = kmalloc(name_len + 16, gfp_flags);
1220 if (!data)
1221 return -ENOMEM;
1223 data_start = data;
1224 data_end = data + name_len + 16;
1226 ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1227 ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1229 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1230 data_start, data - data_start);
1232 kfree(data_start);
1234 if (ret < 0)
1235 return ret;
1237 dev->header.snapc->seq = new_snapid;
1239 return 0;
1240 bad:
1241 return -ERANGE;
1244 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1246 struct rbd_snap *snap;
1248 while (!list_empty(&rbd_dev->snaps)) {
1249 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1250 __rbd_remove_snap_dev(rbd_dev, snap);
1255 * only read the first part of the ondisk header, without the snaps info
1257 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1259 int ret;
1260 struct rbd_image_header h;
1261 u64 snap_seq;
1263 ret = rbd_read_header(rbd_dev, &h);
1264 if (ret < 0)
1265 return ret;
1267 down_write(&rbd_dev->header.snap_rwsem);
1269 snap_seq = rbd_dev->header.snapc->seq;
1271 kfree(rbd_dev->header.snapc);
1272 kfree(rbd_dev->header.snap_names);
1273 kfree(rbd_dev->header.snap_sizes);
1275 rbd_dev->header.total_snaps = h.total_snaps;
1276 rbd_dev->header.snapc = h.snapc;
1277 rbd_dev->header.snap_names = h.snap_names;
1278 rbd_dev->header.snap_names_len = h.snap_names_len;
1279 rbd_dev->header.snap_sizes = h.snap_sizes;
1280 rbd_dev->header.snapc->seq = snap_seq;
1282 ret = __rbd_init_snaps_header(rbd_dev);
1284 up_write(&rbd_dev->header.snap_rwsem);
1286 return ret;
1289 static int rbd_init_disk(struct rbd_device *rbd_dev)
1291 struct gendisk *disk;
1292 struct request_queue *q;
1293 int rc;
1294 u64 total_size = 0;
1296 /* contact OSD, request size info about the object being mapped */
1297 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1298 if (rc)
1299 return rc;
1301 /* no need to lock here, as rbd_dev is not registered yet */
1302 rc = __rbd_init_snaps_header(rbd_dev);
1303 if (rc)
1304 return rc;
1306 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1307 if (rc)
1308 return rc;
1310 /* create gendisk info */
1311 rc = -ENOMEM;
1312 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1313 if (!disk)
1314 goto out;
1316 sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1317 disk->major = rbd_dev->major;
1318 disk->first_minor = 0;
1319 disk->fops = &rbd_bd_ops;
1320 disk->private_data = rbd_dev;
1322 /* init rq */
1323 rc = -ENOMEM;
1324 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1325 if (!q)
1326 goto out_disk;
1327 blk_queue_merge_bvec(q, rbd_merge_bvec);
1328 disk->queue = q;
1330 q->queuedata = rbd_dev;
1332 rbd_dev->disk = disk;
1333 rbd_dev->q = q;
1335 /* finally, announce the disk to the world */
1336 set_capacity(disk, total_size / 512ULL);
1337 add_disk(disk);
1339 pr_info("%s: added with size 0x%llx\n",
1340 disk->disk_name, (unsigned long long)total_size);
1341 return 0;
1343 out_disk:
1344 put_disk(disk);
1345 out:
1346 return rc;
1350 sysfs
1353 static ssize_t rbd_size_show(struct device *dev,
1354 struct device_attribute *attr, char *buf)
1356 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1358 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1361 static ssize_t rbd_major_show(struct device *dev,
1362 struct device_attribute *attr, char *buf)
1364 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1366 return sprintf(buf, "%d\n", rbd_dev->major);
1369 static ssize_t rbd_client_id_show(struct device *dev,
1370 struct device_attribute *attr, char *buf)
1372 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1374 return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1377 static ssize_t rbd_pool_show(struct device *dev,
1378 struct device_attribute *attr, char *buf)
1380 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1382 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1385 static ssize_t rbd_name_show(struct device *dev,
1386 struct device_attribute *attr, char *buf)
1388 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1390 return sprintf(buf, "%s\n", rbd_dev->obj);
1393 static ssize_t rbd_snap_show(struct device *dev,
1394 struct device_attribute *attr,
1395 char *buf)
1397 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1399 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1402 static ssize_t rbd_image_refresh(struct device *dev,
1403 struct device_attribute *attr,
1404 const char *buf,
1405 size_t size)
1407 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1408 int rc;
1409 int ret = size;
1411 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1413 rc = __rbd_update_snaps(rbd_dev);
1414 if (rc < 0)
1415 ret = rc;
1417 mutex_unlock(&ctl_mutex);
1418 return ret;
/*
 * Per-device sysfs attributes.  The S_IRUGO entries only have a show
 * handler; the S_IWUSR entries only have a store handler.
 */
1421 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1422 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1423 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1424 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1425 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1426 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1427 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1428 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1429 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
/* NULL-terminated list of the per-device attributes declared above. */
1431 static struct attribute *rbd_attrs[] = {
1432 &dev_attr_size.attr,
1433 &dev_attr_major.attr,
1434 &dev_attr_client_id.attr,
1435 &dev_attr_pool.attr,
1436 &dev_attr_name.attr,
1437 &dev_attr_current_snap.attr,
1438 &dev_attr_refresh.attr,
1439 &dev_attr_create_snap.attr,
1440 &dev_attr_rollback_snap.attr,
1441 NULL
/* Attribute group wrapping rbd_attrs. */
1444 static struct attribute_group rbd_attr_group = {
1445 .attrs = rbd_attrs,
/* NULL-terminated group list referenced by rbd_device_type.groups. */
1448 static const struct attribute_group *rbd_attr_groups[] = {
1449 &rbd_attr_group,
1450 NULL
/*
 * Release callback installed in rbd_device_type.release.
 * NOTE(review): the body is not visible in this view; it appears to be
 * empty -- confirm against the full file.
 */
1453 static void rbd_sysfs_dev_release(struct device *dev)
/* Device type assigned to each rbd device in rbd_bus_add_dev(). */
1457 static struct device_type rbd_device_type = {
1458 .name = "rbd",
1459 .groups = rbd_attr_groups,
1460 .release = rbd_sysfs_dev_release,
1465 sysfs - snapshots
1468 static ssize_t rbd_snap_size_show(struct device *dev,
1469 struct device_attribute *attr,
1470 char *buf)
1472 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1474 return sprintf(buf, "%lld\n", (long long)snap->size);
1477 static ssize_t rbd_snap_id_show(struct device *dev,
1478 struct device_attribute *attr,
1479 char *buf)
1481 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1483 return sprintf(buf, "%lld\n", (long long)snap->id);
/* Per-snapshot sysfs attributes; both are read-only (S_IRUGO, show only). */
1486 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1487 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
/* NULL-terminated list of the per-snapshot attributes declared above. */
1489 static struct attribute *rbd_snap_attrs[] = {
1490 &dev_attr_snap_size.attr,
1491 &dev_attr_snap_id.attr,
1492 NULL,
/* Attribute group wrapping rbd_snap_attrs. */
1495 static struct attribute_group rbd_snap_attr_group = {
1496 .attrs = rbd_snap_attrs,
1499 static void rbd_snap_dev_release(struct device *dev)
1501 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1502 kfree(snap->name);
1503 kfree(snap);
/* NULL-terminated group list referenced by rbd_snap_device_type.groups. */
1506 static const struct attribute_group *rbd_snap_attr_groups[] = {
1507 &rbd_snap_attr_group,
1508 NULL
/* Device type assigned to each snapshot device in rbd_register_snap_dev(). */
1511 static struct device_type rbd_snap_device_type = {
1512 .groups = rbd_snap_attr_groups,
1513 .release = rbd_snap_dev_release,
1516 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1517 struct rbd_snap *snap)
1519 list_del(&snap->node);
1520 device_unregister(&snap->dev);
1523 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1524 struct rbd_snap *snap,
1525 struct device *parent)
1527 struct device *dev = &snap->dev;
1528 int ret;
1530 dev->type = &rbd_snap_device_type;
1531 dev->parent = parent;
1532 dev->release = rbd_snap_dev_release;
1533 dev_set_name(dev, "snap_%s", snap->name);
1534 ret = device_register(dev);
1536 return ret;
1539 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1540 int i, const char *name,
1541 struct rbd_snap **snapp)
1543 int ret;
1544 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1545 if (!snap)
1546 return -ENOMEM;
1547 snap->name = kstrdup(name, GFP_KERNEL);
1548 snap->size = rbd_dev->header.snap_sizes[i];
1549 snap->id = rbd_dev->header.snapc->snaps[i];
1550 if (device_is_registered(&rbd_dev->dev)) {
1551 ret = rbd_register_snap_dev(rbd_dev, snap,
1552 &rbd_dev->dev);
1553 if (ret < 0)
1554 goto err;
1556 *snapp = snap;
1557 return 0;
1558 err:
1559 kfree(snap->name);
1560 kfree(snap);
1561 return ret;
/*
 * Search backwards for the previous name in a NUL-delimited string list.
 *
 * @name points just past the terminator of the entry that follows the one
 * wanted (or at the end of the list); @start is the beginning of the list.
 * Returns a pointer to the previous entry, or NULL when fewer than two
 * bytes lie between @start and @name.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cur;

	if (name < start + 2)
		return NULL;

	/* step over the preceding terminator, then scan back to the next one */
	for (cur = name - 2; *cur != '\0'; cur--) {
		if (cur == start)
			return start;
	}

	return cur + 1;
}
/*
 * Compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in reverse order (from newest to oldest) and we need to go from
 * oldest to newest so that we don't get a duplicate snap name while
 * processing (e.g., a snapshot was removed and a new one was created
 * with the same name).
 */
/*
 * Reconcile rbd_dev->snaps with the snapshot ids and names held in
 * rbd_dev->header.  The existing list is walked backwards while the index
 * i counts down from header.total_snaps and name walks backwards through
 * header.snap_names.
 *
 * Returns 0 on success, -EINVAL if the name list runs out early, or the
 * error from __rbd_add_snap_dev().
 */
1589 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
1591 const char *name, *first_name;
1592 int i = rbd_dev->header.total_snaps;
1593 struct rbd_snap *snap, *old_snap = NULL;
1594 int ret;
1595 struct list_head *p, *n;
/* name starts just past the end of the names buffer */
1597 first_name = rbd_dev->header.snap_names;
1598 name = first_name + rbd_dev->header.snap_names_len;
1600 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
1601 u64 cur_id;
1603 old_snap = list_entry(p, struct rbd_snap, node);
/* cur_id is only assigned when i != 0; the !i test below guards its use */
1605 if (i)
1606 cur_id = rbd_dev->header.snapc->snaps[i - 1];
1608 if (!i || old_snap->id < cur_id) {
1609 /* old_snap->id was skipped, thus was removed */
1610 __rbd_remove_snap_dev(rbd_dev, old_snap);
1611 continue;
1613 if (old_snap->id == cur_id) {
1614 /* we have this snapshot already */
1615 i--;
1616 name = rbd_prev_snap_name(name, first_name);
1617 continue;
1619 for (; i > 0;
1620 i--, name = rbd_prev_snap_name(name, first_name)) {
1621 if (!name) {
1622 WARN_ON(1);
1623 return -EINVAL;
/*
 * NOTE(review): this reads snaps[i] while every other access in this
 * function uses index i - 1; with i == total_snaps this looks like it
 * may index one past the last snapshot id -- confirm.
 */
1625 cur_id = rbd_dev->header.snapc->snaps[i];
1626 /* snapshot removal? handle it above */
1627 if (cur_id >= old_snap->id)
1628 break;
1629 /* a new snapshot */
1630 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1631 if (ret < 0)
1632 return ret;
1634 /* note that we add it backward so using n and not p */
1635 list_add(&snap->node, n);
1636 p = &snap->node;
1639 /* we're done going over the old snap list, just add what's left */
1640 for (; i > 0; i--) {
1641 name = rbd_prev_snap_name(name, first_name);
1642 if (!name) {
1643 WARN_ON(1);
1644 return -EINVAL;
1646 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1647 if (ret < 0)
1648 return ret;
/* list_add() inserts at the head of rbd_dev->snaps */
1649 list_add(&snap->node, &rbd_dev->snaps);
1652 return 0;
/*
 * Release callback installed in rbd_root_dev.release.
 * NOTE(review): the body is not visible in this view; it appears to be
 * empty -- confirm against the full file.
 */
1656 static void rbd_root_dev_release(struct device *dev)
/* Root device named "rbd"; rbd_bus_add_dev() uses it as each device's parent. */
1660 static struct device rbd_root_dev = {
1661 .init_name = "rbd",
1662 .release = rbd_root_dev_release,
1665 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
1667 int ret = -ENOMEM;
1668 struct device *dev;
1669 struct rbd_snap *snap;
1671 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1672 dev = &rbd_dev->dev;
1674 dev->bus = &rbd_bus_type;
1675 dev->type = &rbd_device_type;
1676 dev->parent = &rbd_root_dev;
1677 dev->release = rbd_dev_release;
1678 dev_set_name(dev, "%d", rbd_dev->id);
1679 ret = device_register(dev);
1680 if (ret < 0)
1681 goto done_free;
1683 list_for_each_entry(snap, &rbd_dev->snaps, node) {
1684 ret = rbd_register_snap_dev(rbd_dev, snap,
1685 &rbd_dev->dev);
1686 if (ret < 0)
1687 break;
1690 mutex_unlock(&ctl_mutex);
1691 return 0;
1692 done_free:
1693 mutex_unlock(&ctl_mutex);
1694 return ret;
1697 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
1699 device_unregister(&rbd_dev->dev);
1702 static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
1704 struct ceph_osd_client *osdc;
1705 struct rbd_device *rbd_dev;
1706 ssize_t rc = -ENOMEM;
1707 int irc, new_id = 0;
1708 struct list_head *tmp;
1709 char *mon_dev_name;
1710 char *options;
1712 if (!try_module_get(THIS_MODULE))
1713 return -ENODEV;
1715 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1716 if (!mon_dev_name)
1717 goto err_out_mod;
1719 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1720 if (!options)
1721 goto err_mon_dev;
1723 /* new rbd_device object */
1724 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1725 if (!rbd_dev)
1726 goto err_out_opt;
1728 /* static rbd_device initialization */
1729 spin_lock_init(&rbd_dev->lock);
1730 INIT_LIST_HEAD(&rbd_dev->node);
1731 INIT_LIST_HEAD(&rbd_dev->snaps);
1733 /* generate unique id: find highest unique id, add one */
1734 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1736 list_for_each(tmp, &rbd_dev_list) {
1737 struct rbd_device *rbd_dev;
1739 rbd_dev = list_entry(tmp, struct rbd_device, node);
1740 if (rbd_dev->id >= new_id)
1741 new_id = rbd_dev->id + 1;
1744 rbd_dev->id = new_id;
1746 /* add to global list */
1747 list_add_tail(&rbd_dev->node, &rbd_dev_list);
1749 /* parse add command */
1750 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1751 "%" __stringify(RBD_MAX_OPT_LEN) "s "
1752 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1753 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1754 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1755 mon_dev_name, options, rbd_dev->pool_name,
1756 rbd_dev->obj, rbd_dev->snap_name) < 4) {
1757 rc = -EINVAL;
1758 goto err_out_slot;
1761 if (rbd_dev->snap_name[0] == 0)
1762 rbd_dev->snap_name[0] = '-';
1764 rbd_dev->obj_len = strlen(rbd_dev->obj);
1765 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1766 rbd_dev->obj, RBD_SUFFIX);
1768 /* initialize rest of new object */
1769 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1770 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1771 if (rc < 0)
1772 goto err_out_slot;
1774 mutex_unlock(&ctl_mutex);
1776 /* pick the pool */
1777 osdc = &rbd_dev->client->osdc;
1778 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1779 if (rc < 0)
1780 goto err_out_client;
1781 rbd_dev->poolid = rc;
1783 /* register our block device */
1784 irc = register_blkdev(0, rbd_dev->name);
1785 if (irc < 0) {
1786 rc = irc;
1787 goto err_out_client;
1789 rbd_dev->major = irc;
1791 rc = rbd_bus_add_dev(rbd_dev);
1792 if (rc)
1793 goto err_out_blkdev;
1795 /* set up and announce blkdev mapping */
1796 rc = rbd_init_disk(rbd_dev);
1797 if (rc)
1798 goto err_out_bus;
1800 return count;
1802 err_out_bus:
1803 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1804 list_del_init(&rbd_dev->node);
1805 mutex_unlock(&ctl_mutex);
1807 /* this will also clean up rest of rbd_dev stuff */
1809 rbd_bus_del_dev(rbd_dev);
1810 kfree(options);
1811 kfree(mon_dev_name);
1812 return rc;
1814 err_out_blkdev:
1815 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1816 err_out_client:
1817 rbd_put_client(rbd_dev);
1818 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1819 err_out_slot:
1820 list_del_init(&rbd_dev->node);
1821 mutex_unlock(&ctl_mutex);
1823 kfree(rbd_dev);
1824 err_out_opt:
1825 kfree(options);
1826 err_mon_dev:
1827 kfree(mon_dev_name);
1828 err_out_mod:
1829 dout("Error adding device %s\n", buf);
1830 module_put(THIS_MODULE);
1831 return rc;
1834 static struct rbd_device *__rbd_get_dev(unsigned long id)
1836 struct list_head *tmp;
1837 struct rbd_device *rbd_dev;
1839 list_for_each(tmp, &rbd_dev_list) {
1840 rbd_dev = list_entry(tmp, struct rbd_device, node);
1841 if (rbd_dev->id == id)
1842 return rbd_dev;
1844 return NULL;
1847 static void rbd_dev_release(struct device *dev)
1849 struct rbd_device *rbd_dev =
1850 container_of(dev, struct rbd_device, dev);
1852 rbd_put_client(rbd_dev);
1854 /* clean up and free blkdev */
1855 rbd_free_disk(rbd_dev);
1856 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1857 kfree(rbd_dev);
1859 /* release module ref */
1860 module_put(THIS_MODULE);
1863 static ssize_t rbd_remove(struct bus_type *bus,
1864 const char *buf,
1865 size_t count)
1867 struct rbd_device *rbd_dev = NULL;
1868 int target_id, rc;
1869 unsigned long ul;
1870 int ret = count;
1872 rc = strict_strtoul(buf, 10, &ul);
1873 if (rc)
1874 return rc;
1876 /* convert to int; abort if we lost anything in the conversion */
1877 target_id = (int) ul;
1878 if (target_id != ul)
1879 return -EINVAL;
1881 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1883 rbd_dev = __rbd_get_dev(target_id);
1884 if (!rbd_dev) {
1885 ret = -ENOENT;
1886 goto done;
1889 list_del_init(&rbd_dev->node);
1891 __rbd_remove_all_snaps(rbd_dev);
1892 rbd_bus_del_dev(rbd_dev);
1894 done:
1895 mutex_unlock(&ctl_mutex);
1896 return ret;
1899 static ssize_t rbd_snap_add(struct device *dev,
1900 struct device_attribute *attr,
1901 const char *buf,
1902 size_t count)
1904 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1905 int ret;
1906 char *name = kmalloc(count + 1, GFP_KERNEL);
1907 if (!name)
1908 return -ENOMEM;
1910 snprintf(name, count, "%s", buf);
1912 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1914 ret = rbd_header_add_snap(rbd_dev,
1915 name, GFP_KERNEL);
1916 if (ret < 0)
1917 goto done_unlock;
1919 ret = __rbd_update_snaps(rbd_dev);
1920 if (ret < 0)
1921 goto done_unlock;
1923 ret = count;
1924 done_unlock:
1925 mutex_unlock(&ctl_mutex);
1926 kfree(name);
1927 return ret;
1930 static ssize_t rbd_snap_rollback(struct device *dev,
1931 struct device_attribute *attr,
1932 const char *buf,
1933 size_t count)
1935 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1936 int ret;
1937 u64 snapid;
1938 u64 cur_ofs;
1939 char *seg_name = NULL;
1940 char *snap_name = kmalloc(count + 1, GFP_KERNEL);
1941 ret = -ENOMEM;
1942 if (!snap_name)
1943 return ret;
1945 /* parse snaps add command */
1946 snprintf(snap_name, count, "%s", buf);
1947 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1948 if (!seg_name)
1949 goto done;
1951 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1953 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1954 if (ret < 0)
1955 goto done_unlock;
1957 dout("snapid=%lld\n", snapid);
1959 cur_ofs = 0;
1960 while (cur_ofs < rbd_dev->header.image_size) {
1961 cur_ofs += rbd_get_segment(&rbd_dev->header,
1962 rbd_dev->obj,
1963 cur_ofs, (u64)-1,
1964 seg_name, NULL);
1965 dout("seg_name=%s\n", seg_name);
1967 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1968 if (ret < 0)
1969 pr_warning("could not roll back obj %s err=%d\n",
1970 seg_name, ret);
1973 ret = __rbd_update_snaps(rbd_dev);
1974 if (ret < 0)
1975 goto done_unlock;
1977 ret = count;
1979 done_unlock:
1980 mutex_unlock(&ctl_mutex);
1981 done:
1982 kfree(seg_name);
1983 kfree(snap_name);
1985 return ret;
/*
 * Bus-level control files: "add" and "remove" are write-only (S_IWUSR)
 * and handled by rbd_add() and rbd_remove().
 */
1988 static struct bus_attribute rbd_bus_attrs[] = {
1989 __ATTR(add, S_IWUSR, NULL, rbd_add),
1990 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
1991 __ATTR_NULL
1995 * create control files in sysfs
1996 * /sys/bus/rbd/...
1998 static int rbd_sysfs_init(void)
2000 int ret;
2002 rbd_bus_type.bus_attrs = rbd_bus_attrs;
2004 ret = bus_register(&rbd_bus_type);
2005 if (ret < 0)
2006 return ret;
2008 ret = device_register(&rbd_root_dev);
2010 return ret;
2013 static void rbd_sysfs_cleanup(void)
2015 device_unregister(&rbd_root_dev);
2016 bus_unregister(&rbd_bus_type);
2019 int __init rbd_init(void)
2021 int rc;
2023 rc = rbd_sysfs_init();
2024 if (rc)
2025 return rc;
2026 spin_lock_init(&node_lock);
2027 pr_info("loaded " DRV_NAME_LONG "\n");
2028 return 0;
2031 void __exit rbd_exit(void)
2033 rbd_sysfs_cleanup();
/* Module entry/exit hooks and metadata. */
2036 module_init(rbd_init);
2037 module_exit(rbd_exit);
2039 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2040 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2041 MODULE_DESCRIPTION("rados block device");
2043 /* following authorship retained from original osdblk.c */
2044 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2046 MODULE_LICENSE("GPL");