2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 * block device image metadata (in-memory version)
77 struct rbd_image_header {
83 struct ceph_snap_context *snapc;
84 size_t snap_names_len;
99 * an instance of the client. multiple devices may share an rbd client.
102 struct ceph_client *client;
103 struct rbd_options *rbd_opts;
105 struct list_head node;
109 * a request completion status
111 struct rbd_req_status {
118 * a collection of requests
120 struct rbd_req_coll {
124 struct rbd_req_status status[0];
128 * a single io request
131 struct request *rq; /* blk layer request */
132 struct bio *bio; /* cloned bio */
133 struct page **pages; /* list of used pages */
136 struct rbd_req_coll *coll;
143 struct list_head node;
151 int id; /* blkdev unique id */
153 int major; /* blkdev assigned major */
154 struct gendisk *disk; /* blkdev's gendisk and rq */
155 struct request_queue *q;
157 struct rbd_client *rbd_client;
159 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
161 spinlock_t lock; /* queue lock */
163 struct rbd_image_header header;
165 size_t image_name_len;
170 struct ceph_osd_event *watch_event;
171 struct ceph_osd_request *watch_request;
173 /* protects updating the header */
174 struct rw_semaphore header_rwsem;
176 u64 snap_id; /* current snapshot id */
179 struct list_head node;
181 /* list of snapshots */
182 struct list_head snaps;
188 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
190 static LIST_HEAD(rbd_dev_list); /* devices */
191 static DEFINE_SPINLOCK(rbd_dev_list_lock);
193 static LIST_HEAD(rbd_client_list); /* clients */
194 static DEFINE_SPINLOCK(rbd_client_list_lock);
196 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
197 static void rbd_dev_release(struct device *dev);
198 static ssize_t rbd_snap_add(struct device *dev,
199 struct device_attribute *attr,
202 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
203 struct rbd_snap *snap);
205 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
207 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210 static struct bus_attribute rbd_bus_attrs[] = {
211 __ATTR(add, S_IWUSR, NULL, rbd_add),
212 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
216 static struct bus_type rbd_bus_type = {
218 .bus_attrs = rbd_bus_attrs,
221 static void rbd_root_dev_release(struct device *dev)
225 static struct device rbd_root_dev = {
227 .release = rbd_root_dev_release,
231 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
233 return get_device(&rbd_dev->dev);
236 static void rbd_put_dev(struct rbd_device *rbd_dev)
238 put_device(&rbd_dev->dev);
241 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
243 static int rbd_open(struct block_device *bdev, fmode_t mode)
245 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
247 rbd_get_dev(rbd_dev);
249 set_device_ro(bdev, rbd_dev->read_only);
251 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
257 static int rbd_release(struct gendisk *disk, fmode_t mode)
259 struct rbd_device *rbd_dev = disk->private_data;
261 rbd_put_dev(rbd_dev);
266 static const struct block_device_operations rbd_bd_ops = {
267 .owner = THIS_MODULE,
269 .release = rbd_release,
273 * Initialize an rbd client instance.
276 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
277 struct rbd_options *rbd_opts)
279 struct rbd_client *rbdc;
282 dout("rbd_client_create\n");
283 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
287 kref_init(&rbdc->kref);
288 INIT_LIST_HEAD(&rbdc->node);
290 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
293 if (IS_ERR(rbdc->client))
295 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
297 ret = ceph_open_session(rbdc->client);
301 rbdc->rbd_opts = rbd_opts;
303 spin_lock(&rbd_client_list_lock);
304 list_add_tail(&rbdc->node, &rbd_client_list);
305 spin_unlock(&rbd_client_list_lock);
307 mutex_unlock(&ctl_mutex);
309 dout("rbd_client_create created %p\n", rbdc);
313 ceph_destroy_client(rbdc->client);
315 mutex_unlock(&ctl_mutex);
319 ceph_destroy_options(ceph_opts);
324 * Find a ceph client with specific addr and configuration.
326 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
328 struct rbd_client *client_node;
330 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
333 list_for_each_entry(client_node, &rbd_client_list, node)
334 if (!ceph_compare_options(ceph_opts, client_node->client))
347 /* string args above */
350 static match_table_t rbd_opts_tokens = {
351 {Opt_notify_timeout, "notify_timeout=%d"},
353 /* string args above */
357 static int parse_rbd_opts_token(char *c, void *private)
359 struct rbd_options *rbd_opts = private;
360 substring_t argstr[MAX_OPT_ARGS];
361 int token, intval, ret;
363 token = match_token(c, rbd_opts_tokens, argstr);
367 if (token < Opt_last_int) {
368 ret = match_int(&argstr[0], &intval);
370 pr_err("bad mount option arg (not int) "
374 dout("got int token %d val %d\n", token, intval);
375 } else if (token > Opt_last_int && token < Opt_last_string) {
376 dout("got string token %d val %s\n", token,
379 dout("got token %d\n", token);
383 case Opt_notify_timeout:
384 rbd_opts->notify_timeout = intval;
393 * Get a ceph client with specific addr and configuration, if one does
394 * not exist create it.
396 static struct rbd_client *rbd_get_client(const char *mon_addr,
400 struct rbd_client *rbdc;
401 struct ceph_options *ceph_opts;
402 struct rbd_options *rbd_opts;
404 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
406 return ERR_PTR(-ENOMEM);
408 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
410 ceph_opts = ceph_parse_options(options, mon_addr,
411 mon_addr + mon_addr_len,
412 parse_rbd_opts_token, rbd_opts);
413 if (IS_ERR(ceph_opts)) {
415 return ERR_CAST(ceph_opts);
418 spin_lock(&rbd_client_list_lock);
419 rbdc = __rbd_client_find(ceph_opts);
421 /* using an existing client */
422 kref_get(&rbdc->kref);
423 spin_unlock(&rbd_client_list_lock);
425 ceph_destroy_options(ceph_opts);
430 spin_unlock(&rbd_client_list_lock);
432 rbdc = rbd_client_create(ceph_opts, rbd_opts);
441 * Destroy ceph client
443 * Caller must hold rbd_client_list_lock.
445 static void rbd_client_release(struct kref *kref)
447 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
449 dout("rbd_release_client %p\n", rbdc);
450 spin_lock(&rbd_client_list_lock);
451 list_del(&rbdc->node);
452 spin_unlock(&rbd_client_list_lock);
454 ceph_destroy_client(rbdc->client);
455 kfree(rbdc->rbd_opts);
460 * Drop reference to ceph client node. If it's not referenced anymore, release
463 static void rbd_put_client(struct rbd_device *rbd_dev)
465 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
466 rbd_dev->rbd_client = NULL;
470 * Destroy requests collection
472 static void rbd_coll_release(struct kref *kref)
474 struct rbd_req_coll *coll =
475 container_of(kref, struct rbd_req_coll, kref);
477 dout("rbd_coll_release %p\n", coll);
482 * Create a new header structure, translate header format from the on-disk
485 static int rbd_header_from_disk(struct rbd_image_header *header,
486 struct rbd_image_header_ondisk *ondisk,
492 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
495 snap_count = le32_to_cpu(ondisk->snap_count);
496 if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
499 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
500 snap_count * sizeof(u64),
505 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
507 header->snap_names = kmalloc(header->snap_names_len,
509 if (!header->snap_names)
511 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
513 if (!header->snap_sizes)
516 header->snap_names = NULL;
517 header->snap_sizes = NULL;
520 header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
522 if (!header->object_prefix)
525 memcpy(header->object_prefix, ondisk->block_name,
526 sizeof(ondisk->block_name));
527 header->object_prefix[sizeof (ondisk->block_name)] = '\0';
529 header->image_size = le64_to_cpu(ondisk->image_size);
530 header->obj_order = ondisk->options.order;
531 header->crypt_type = ondisk->options.crypt_type;
532 header->comp_type = ondisk->options.comp_type;
534 atomic_set(&header->snapc->nref, 1);
535 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
536 header->snapc->num_snaps = snap_count;
537 header->total_snaps = snap_count;
539 if (snap_count && allocated_snaps == snap_count) {
540 for (i = 0; i < snap_count; i++) {
541 header->snapc->snaps[i] =
542 le64_to_cpu(ondisk->snaps[i].id);
543 header->snap_sizes[i] =
544 le64_to_cpu(ondisk->snaps[i].image_size);
547 /* copy snapshot names */
548 memcpy(header->snap_names, &ondisk->snaps[i],
549 header->snap_names_len);
555 kfree(header->snap_sizes);
557 kfree(header->snap_names);
559 kfree(header->snapc);
563 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
567 char *p = header->snap_names;
569 for (i = 0; i < header->total_snaps; i++) {
570 if (!strcmp(snap_name, p)) {
572 /* Found it. Pass back its id and/or size */
575 *seq = header->snapc->snaps[i];
577 *size = header->snap_sizes[i];
580 p += strlen(p) + 1; /* Skip ahead to the next name */
585 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
587 struct rbd_image_header *header = &rbd_dev->header;
588 struct ceph_snap_context *snapc = header->snapc;
591 down_write(&rbd_dev->header_rwsem);
593 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
594 sizeof (RBD_SNAP_HEAD_NAME))) {
595 if (header->total_snaps)
596 snapc->seq = header->snap_seq;
599 rbd_dev->snap_id = CEPH_NOSNAP;
600 rbd_dev->read_only = 0;
602 *size = header->image_size;
604 ret = snap_by_name(header, rbd_dev->snap_name,
608 rbd_dev->snap_id = snapc->seq;
609 rbd_dev->read_only = 1;
614 up_write(&rbd_dev->header_rwsem);
618 static void rbd_header_free(struct rbd_image_header *header)
620 kfree(header->object_prefix);
621 kfree(header->snap_sizes);
622 kfree(header->snap_names);
623 kfree(header->snapc);
627 * get the actual striped segment name, offset and length
629 static u64 rbd_get_segment(struct rbd_image_header *header,
630 const char *object_prefix,
632 char *seg_name, u64 *segofs)
634 u64 seg = ofs >> header->obj_order;
637 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
638 "%s.%012llx", object_prefix, seg);
640 ofs = ofs & ((1 << header->obj_order) - 1);
641 len = min_t(u64, len, (1 << header->obj_order) - ofs);
649 static int rbd_get_num_segments(struct rbd_image_header *header,
652 u64 start_seg = ofs >> header->obj_order;
653 u64 end_seg = (ofs + len - 1) >> header->obj_order;
654 return end_seg - start_seg + 1;
658 * returns the size of an object in the image
660 static u64 rbd_obj_bytes(struct rbd_image_header *header)
662 return 1 << header->obj_order;
669 static void bio_chain_put(struct bio *chain)
675 chain = chain->bi_next;
681 * zeros a bio chain, starting at specific offset
683 static void zero_bio_chain(struct bio *chain, int start_ofs)
692 bio_for_each_segment(bv, chain, i) {
693 if (pos + bv->bv_len > start_ofs) {
694 int remainder = max(start_ofs - pos, 0);
695 buf = bvec_kmap_irq(bv, &flags);
696 memset(buf + remainder, 0,
697 bv->bv_len - remainder);
698 bvec_kunmap_irq(buf, &flags);
703 chain = chain->bi_next;
708 * bio_chain_clone - clone a chain of bios up to a certain length.
709 * might return a bio_pair that will need to be released.
711 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
712 struct bio_pair **bp,
713 int len, gfp_t gfpmask)
715 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
719 bio_pair_release(*bp);
723 while (old_chain && (total < len)) {
724 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
728 if (total + old_chain->bi_size > len) {
732 * this split can only happen with a single paged bio,
733 * split_bio will BUG_ON if this is not the case
735 dout("bio_chain_clone split! total=%d remaining=%d"
737 (int)total, (int)len-total,
738 (int)old_chain->bi_size);
740 /* split the bio. We'll release it either in the next
741 call, or it will have to be released outside */
742 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
746 __bio_clone(tmp, &bp->bio1);
750 __bio_clone(tmp, old_chain);
751 *next = old_chain->bi_next;
755 gfpmask &= ~__GFP_WAIT;
759 new_chain = tail = tmp;
764 old_chain = old_chain->bi_next;
766 total += tmp->bi_size;
772 tail->bi_next = NULL;
779 dout("bio_chain_clone with err\n");
780 bio_chain_put(new_chain);
785 * helpers for osd request op vectors.
787 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
792 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
796 (*ops)[0].op = opcode;
798 * op extent offset and length will be set later on
799 * in calc_raw_layout()
801 (*ops)[0].payload_len = payload_len;
805 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
810 static void rbd_coll_end_req_index(struct request *rq,
811 struct rbd_req_coll *coll,
815 struct request_queue *q;
818 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
819 coll, index, ret, len);
825 blk_end_request(rq, ret, len);
831 spin_lock_irq(q->queue_lock);
832 coll->status[index].done = 1;
833 coll->status[index].rc = ret;
834 coll->status[index].bytes = len;
835 max = min = coll->num_done;
836 while (max < coll->total && coll->status[max].done)
839 for (i = min; i<max; i++) {
840 __blk_end_request(rq, coll->status[i].rc,
841 coll->status[i].bytes);
843 kref_put(&coll->kref, rbd_coll_release);
845 spin_unlock_irq(q->queue_lock);
848 static void rbd_coll_end_req(struct rbd_request *req,
851 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
855 * Send ceph osd request
857 static int rbd_do_request(struct request *rq,
858 struct rbd_device *rbd_dev,
859 struct ceph_snap_context *snapc,
861 const char *object_name, u64 ofs, u64 len,
866 struct ceph_osd_req_op *ops,
868 struct rbd_req_coll *coll,
870 void (*rbd_cb)(struct ceph_osd_request *req,
871 struct ceph_msg *msg),
872 struct ceph_osd_request **linger_req,
875 struct ceph_osd_request *req;
876 struct ceph_file_layout *layout;
879 struct timespec mtime = CURRENT_TIME;
880 struct rbd_request *req_data;
881 struct ceph_osd_request_head *reqhead;
882 struct ceph_osd_client *osdc;
884 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
887 rbd_coll_end_req_index(rq, coll, coll_index,
893 req_data->coll = coll;
894 req_data->coll_index = coll_index;
897 dout("rbd_do_request object_name=%s ofs=%lld len=%lld\n",
898 object_name, len, ofs);
900 down_read(&rbd_dev->header_rwsem);
902 osdc = &rbd_dev->rbd_client->client->osdc;
903 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
904 false, GFP_NOIO, pages, bio);
906 up_read(&rbd_dev->header_rwsem);
911 req->r_callback = rbd_cb;
915 req_data->pages = pages;
918 req->r_priv = req_data;
920 reqhead = req->r_request->front.iov_base;
921 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
923 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
924 req->r_oid_len = strlen(req->r_oid);
926 layout = &req->r_file_layout;
927 memset(layout, 0, sizeof(*layout));
928 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
929 layout->fl_stripe_count = cpu_to_le32(1);
930 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
932 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
935 ceph_osdc_build_request(req, ofs, &len,
939 req->r_oid, req->r_oid_len);
940 up_read(&rbd_dev->header_rwsem);
943 ceph_osdc_set_request_linger(osdc, req);
947 ret = ceph_osdc_start_request(osdc, req, false);
952 ret = ceph_osdc_wait_request(osdc, req);
954 *ver = le64_to_cpu(req->r_reassert_version.version);
955 dout("reassert_ver=%lld\n",
956 le64_to_cpu(req->r_reassert_version.version));
957 ceph_osdc_put_request(req);
962 bio_chain_put(req_data->bio);
963 ceph_osdc_put_request(req);
965 rbd_coll_end_req(req_data, ret, len);
971 * Ceph osd op callback
973 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
975 struct rbd_request *req_data = req->r_priv;
976 struct ceph_osd_reply_head *replyhead;
977 struct ceph_osd_op *op;
983 replyhead = msg->front.iov_base;
984 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
985 op = (void *)(replyhead + 1);
986 rc = le32_to_cpu(replyhead->result);
987 bytes = le64_to_cpu(op->extent.length);
988 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
990 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
992 if (rc == -ENOENT && read_op) {
993 zero_bio_chain(req_data->bio, 0);
995 } else if (rc == 0 && read_op && bytes < req_data->len) {
996 zero_bio_chain(req_data->bio, bytes);
997 bytes = req_data->len;
1000 rbd_coll_end_req(req_data, rc, bytes);
1003 bio_chain_put(req_data->bio);
1005 ceph_osdc_put_request(req);
/*
 * Minimal completion callback: just drop the osd request reference.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1015 * Do a synchronous ceph osd operation
1017 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1018 struct ceph_snap_context *snapc,
1022 struct ceph_osd_req_op *orig_ops,
1024 const char *object_name,
1027 struct ceph_osd_request **linger_req,
1031 struct page **pages;
1033 struct ceph_osd_req_op *ops = orig_ops;
1036 num_pages = calc_pages_for(ofs , len);
1037 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1039 return PTR_ERR(pages);
1042 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1043 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1047 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1048 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1054 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1055 object_name, ofs, len, NULL,
1066 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1067 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1071 rbd_destroy_ops(ops);
1073 ceph_release_page_vector(pages, num_pages);
1078 * Do an asynchronous ceph osd operation
1080 static int rbd_do_op(struct request *rq,
1081 struct rbd_device *rbd_dev,
1082 struct ceph_snap_context *snapc,
1084 int opcode, int flags, int num_reply,
1087 struct rbd_req_coll *coll,
1094 struct ceph_osd_req_op *ops;
1097 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1101 seg_len = rbd_get_segment(&rbd_dev->header,
1102 rbd_dev->header.object_prefix,
1104 seg_name, &seg_ofs);
1106 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1108 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1112 /* we've taken care of segment sizes earlier when we
1113 cloned the bios. We should never have a segment
1114 truncated at this point */
1115 BUG_ON(seg_len < len);
1117 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1118 seg_name, seg_ofs, seg_len,
1125 rbd_req_cb, 0, NULL);
1127 rbd_destroy_ops(ops);
1134 * Request async osd write
1136 static int rbd_req_write(struct request *rq,
1137 struct rbd_device *rbd_dev,
1138 struct ceph_snap_context *snapc,
1141 struct rbd_req_coll *coll,
1144 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1146 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1148 ofs, len, bio, coll, coll_index);
1152 * Request async osd read
1154 static int rbd_req_read(struct request *rq,
1155 struct rbd_device *rbd_dev,
1159 struct rbd_req_coll *coll,
1162 return rbd_do_op(rq, rbd_dev, NULL,
1167 ofs, len, bio, coll, coll_index);
1171 * Request sync osd read
1173 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1174 struct ceph_snap_context *snapc,
1176 const char *object_name,
1181 return rbd_req_sync_op(rbd_dev, NULL,
1186 1, object_name, ofs, len, buf, NULL, ver);
1190 * Request sync osd watch
1192 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1195 const char *object_name)
1197 struct ceph_osd_req_op *ops;
1200 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1204 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1205 ops[0].watch.cookie = notify_id;
1206 ops[0].watch.flag = 0;
1208 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1209 object_name, 0, 0, NULL,
1215 rbd_simple_req_cb, 0, NULL);
1217 rbd_destroy_ops(ops);
1221 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1223 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1229 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n",
1230 rbd_dev->header_name, notify_id, (int) opcode);
1231 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1232 rc = __rbd_refresh_header(rbd_dev);
1233 mutex_unlock(&ctl_mutex);
1235 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1236 " update snaps: %d\n", rbd_dev->major, rc);
1238 rbd_req_sync_notify_ack(rbd_dev, ver, notify_id, rbd_dev->header_name);
1242 * Request sync osd watch
1244 static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
1245 const char *object_name,
1248 struct ceph_osd_req_op *ops;
1249 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1251 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1255 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1256 (void *)rbd_dev, &rbd_dev->watch_event);
1260 ops[0].watch.ver = cpu_to_le64(ver);
1261 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1262 ops[0].watch.flag = 1;
1264 ret = rbd_req_sync_op(rbd_dev, NULL,
1267 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1269 1, object_name, 0, 0, NULL,
1270 &rbd_dev->watch_request, NULL);
1275 rbd_destroy_ops(ops);
1279 ceph_osdc_cancel_event(rbd_dev->watch_event);
1280 rbd_dev->watch_event = NULL;
1282 rbd_destroy_ops(ops);
1287 * Request sync osd unwatch
1289 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
1290 const char *object_name)
1292 struct ceph_osd_req_op *ops;
1294 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1298 ops[0].watch.ver = 0;
1299 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1300 ops[0].watch.flag = 0;
1302 ret = rbd_req_sync_op(rbd_dev, NULL,
1305 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1307 1, object_name, 0, 0, NULL, NULL, NULL);
1309 rbd_destroy_ops(ops);
1310 ceph_osdc_cancel_event(rbd_dev->watch_event);
1311 rbd_dev->watch_event = NULL;
1315 struct rbd_notify_info {
1316 struct rbd_device *rbd_dev;
1319 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1321 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1325 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
1326 rbd_dev->header_name,
1327 notify_id, (int)opcode);
1331 * Request sync osd notify
1333 static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
1334 const char *object_name)
1336 struct ceph_osd_req_op *ops;
1337 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1338 struct ceph_osd_event *event;
1339 struct rbd_notify_info info;
1340 int payload_len = sizeof(u32) + sizeof(u32);
1343 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1347 info.rbd_dev = rbd_dev;
1349 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1350 (void *)&info, &event);
1354 ops[0].watch.ver = 1;
1355 ops[0].watch.flag = 1;
1356 ops[0].watch.cookie = event->cookie;
1357 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1358 ops[0].watch.timeout = 12;
1360 ret = rbd_req_sync_op(rbd_dev, NULL,
1363 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1365 1, object_name, 0, 0, NULL, NULL, NULL);
1369 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1370 dout("ceph_osdc_wait_event returned %d\n", ret);
1371 rbd_destroy_ops(ops);
1375 ceph_osdc_cancel_event(event);
1377 rbd_destroy_ops(ops);
1382 * Request sync osd read
1384 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1385 const char *object_name,
1386 const char *class_name,
1387 const char *method_name,
1392 struct ceph_osd_req_op *ops;
1393 int class_name_len = strlen(class_name);
1394 int method_name_len = strlen(method_name);
1395 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1396 class_name_len + method_name_len + len);
1400 ops[0].cls.class_name = class_name;
1401 ops[0].cls.class_len = (__u8) class_name_len;
1402 ops[0].cls.method_name = method_name;
1403 ops[0].cls.method_len = (__u8) method_name_len;
1404 ops[0].cls.argc = 0;
1405 ops[0].cls.indata = data;
1406 ops[0].cls.indata_len = len;
1408 ret = rbd_req_sync_op(rbd_dev, NULL,
1411 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1413 1, object_name, 0, 0, NULL, NULL, ver);
1415 rbd_destroy_ops(ops);
1417 dout("cls_exec returned %d\n", ret);
1421 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1423 struct rbd_req_coll *coll =
1424 kzalloc(sizeof(struct rbd_req_coll) +
1425 sizeof(struct rbd_req_status) * num_reqs,
1430 coll->total = num_reqs;
1431 kref_init(&coll->kref);
1436 * block device queue callback
1438 static void rbd_rq_fn(struct request_queue *q)
1440 struct rbd_device *rbd_dev = q->queuedata;
1442 struct bio_pair *bp = NULL;
1444 while ((rq = blk_fetch_request(q))) {
1446 struct bio *rq_bio, *next_bio = NULL;
1448 int size, op_size = 0;
1450 int num_segs, cur_seg = 0;
1451 struct rbd_req_coll *coll;
1453 /* peek at request from block layer */
1457 dout("fetched request\n");
1459 /* filter out block requests we don't understand */
1460 if ((rq->cmd_type != REQ_TYPE_FS)) {
1461 __blk_end_request_all(rq, 0);
1465 /* deduce our operation (read, write) */
1466 do_write = (rq_data_dir(rq) == WRITE);
1468 size = blk_rq_bytes(rq);
1469 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1471 if (do_write && rbd_dev->read_only) {
1472 __blk_end_request_all(rq, -EROFS);
1476 spin_unlock_irq(q->queue_lock);
1478 dout("%s 0x%x bytes at 0x%llx\n",
1479 do_write ? "write" : "read",
1480 size, blk_rq_pos(rq) * SECTOR_SIZE);
1482 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1483 coll = rbd_alloc_coll(num_segs);
1485 spin_lock_irq(q->queue_lock);
1486 __blk_end_request_all(rq, -ENOMEM);
1491 /* a bio clone to be passed down to OSD req */
1492 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1493 op_size = rbd_get_segment(&rbd_dev->header,
1494 rbd_dev->header.object_prefix,
1497 kref_get(&coll->kref);
1498 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1499 op_size, GFP_ATOMIC);
1501 rbd_coll_end_req_index(rq, coll, cur_seg,
1507 /* init OSD command: write or read */
1509 rbd_req_write(rq, rbd_dev,
1510 rbd_dev->header.snapc,
1515 rbd_req_read(rq, rbd_dev,
1528 kref_put(&coll->kref, rbd_coll_release);
1531 bio_pair_release(bp);
1532 spin_lock_irq(q->queue_lock);
1537 * a queue callback. Makes sure that we don't create a bio that spans across
1538 * multiple osd objects. One exception would be with a single page bios,
1539 * which we handle later at bio_chain_clone
1541 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1542 struct bio_vec *bvec)
1544 struct rbd_device *rbd_dev = q->queuedata;
1545 unsigned int chunk_sectors;
1547 unsigned int bio_sectors;
1550 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1551 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1552 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1554 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1555 + bio_sectors)) << SECTOR_SHIFT;
1557 max = 0; /* bio_add cannot handle a negative return */
1558 if (max <= bvec->bv_len && bio_sectors == 0)
1559 return bvec->bv_len;
1563 static void rbd_free_disk(struct rbd_device *rbd_dev)
1565 struct gendisk *disk = rbd_dev->disk;
1570 rbd_header_free(&rbd_dev->header);
1572 if (disk->flags & GENHD_FL_UP)
1575 blk_cleanup_queue(disk->queue);
1580 * reload the ondisk the header
1582 static int rbd_read_header(struct rbd_device *rbd_dev,
1583 struct rbd_image_header *header)
1586 struct rbd_image_header_ondisk *dh;
1592 * First reads the fixed-size header to determine the number
1593 * of snapshots, then re-reads it, along with all snapshot
1594 * records as well as their stored names.
1598 dh = kmalloc(len, GFP_KERNEL);
1602 rc = rbd_req_sync_read(rbd_dev,
1604 rbd_dev->header_name,
1610 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1613 pr_warning("unrecognized header format"
1615 rbd_dev->image_name);
1619 if (snap_count == header->total_snaps)
1622 snap_count = header->total_snaps;
1623 len = sizeof (*dh) +
1624 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1625 header->snap_names_len;
1627 rbd_header_free(header);
1630 header->obj_version = ver;
1640 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1641 const char *snap_name,
1644 int name_len = strlen(snap_name);
1649 struct ceph_mon_client *monc;
1651 /* we should create a snapshot only if we're pointing at the head */
1652 if (rbd_dev->snap_id != CEPH_NOSNAP)
1655 monc = &rbd_dev->rbd_client->client->monc;
1656 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1657 dout("created snapid=%lld\n", new_snapid);
1661 data = kmalloc(name_len + 16, gfp_flags);
1666 e = data + name_len + 16;
1668 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1669 ceph_encode_64_safe(&p, e, new_snapid, bad);
1671 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1673 data, p - data, &ver);
1680 down_write(&rbd_dev->header_rwsem);
1681 rbd_dev->header.snapc->seq = new_snapid;
1682 up_write(&rbd_dev->header_rwsem);
1689 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1691 struct rbd_snap *snap;
1693 while (!list_empty(&rbd_dev->snaps)) {
1694 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1695 __rbd_remove_snap_dev(rbd_dev, snap);
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the on-disk image header and swaps the in-core snapshot
 * metadata (snap context, names, sizes) for the freshly read copy,
 * all under header_rwsem.  If we were previously pointing at the head
 * snapshot, the sequence is re-pointed at the new newest snapshot;
 * otherwise the old sequence value is preserved.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev)
struct rbd_image_header h;
ret = rbd_read_header(rbd_dev, &h);
/* Image may have been resized; propagate to the gendisk. */
set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
down_write(&rbd_dev->header_rwsem);
/* Remember the current sequence so it can be restored below. */
snap_seq = rbd_dev->header.snapc->seq;
if (rbd_dev->header.total_snaps &&
rbd_dev->header.snapc->snaps[0] == snap_seq)
/* pointing at the head, will need to follow that
/* rbd_dev->header.object_prefix shouldn't change */
/* Release the old copies before adopting the new header's data. */
kfree(rbd_dev->header.snap_sizes);
kfree(rbd_dev->header.snap_names);
kfree(rbd_dev->header.snapc);
rbd_dev->header.total_snaps = h.total_snaps;
rbd_dev->header.snapc = h.snapc;
rbd_dev->header.snap_names = h.snap_names;
rbd_dev->header.snap_names_len = h.snap_names_len;
rbd_dev->header.snap_sizes = h.snap_sizes;
/* Free the extra copy of the object prefix */
WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
kfree(h.object_prefix);
/* Follow the head: seq tracks the newest snapshot. */
rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
rbd_dev->header.snapc->seq = snap_seq;
/* Reconcile the snapshot device list with the new header. */
ret = __rbd_init_snaps_header(rbd_dev);
up_write(&rbd_dev->header_rwsem);
/*
 * Set up the Linux block device for a mapped rbd image: read the image
 * header from the OSDs, build the snapshot list, allocate the gendisk
 * and request queue, size all I/O limits to the rbd object size, and
 * announce the disk.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
struct gendisk *disk;
struct request_queue *q;
/* contact OSD, request size info about the object being mapped */
rc = rbd_read_header(rbd_dev, &rbd_dev->header);
/* no need to lock here, as rbd_dev is not registered yet */
rc = __rbd_init_snaps_header(rbd_dev);
/* Resolve the mapped snapshot (or head) and its size. */
rc = rbd_header_set_snap(rbd_dev, &total_size);
/* create gendisk info */
disk = alloc_disk(RBD_MINORS_PER_MAJOR);
snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
disk->major = rbd_dev->major;
disk->first_minor = 0;
disk->fops = &rbd_bd_ops;
disk->private_data = rbd_dev;
q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
/* We use the default size, but let's be explicit about it. */
blk_queue_physical_block_size(q, SECTOR_SIZE);
/* set io sizes to object size */
segment_size = rbd_obj_bytes(&rbd_dev->header);
blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
blk_queue_max_segment_size(q, segment_size);
blk_queue_io_min(q, segment_size);
blk_queue_io_opt(q, segment_size);
/* Keep bios from straddling object boundaries. */
blk_queue_merge_bvec(q, rbd_merge_bvec);
q->queuedata = rbd_dev;
rbd_dev->disk = disk;
/* finally, announce the disk to the world */
set_capacity(disk, total_size / SECTOR_SIZE);
pr_info("%s: added with size 0x%llx\n",
disk->disk_name, (unsigned long long)total_size);
/* Map a struct device embedded in an rbd_device back to its owner. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
return container_of(dev, struct rbd_device, dev);
/* sysfs "size": mapped image size in bytes. */
static ssize_t rbd_size_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
/* sysfs "major": block device major number assigned at add time. */
static ssize_t rbd_major_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id": ceph client instance id, as "client<NNN>". */
static ssize_t rbd_client_id_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "client%lld\n",
ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool": name of the rados pool the image lives in. */
static ssize_t rbd_pool_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id": numeric id of the rados pool. */
static ssize_t rbd_pool_id_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name": rbd image name within the pool. */
static ssize_t rbd_name_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "current_snap": mapped snapshot name ("-" for the head). */
static ssize_t rbd_snap_show(struct device *dev,
struct device_attribute *attr,
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%s\n", rbd_dev->snap_name);
/*
 * sysfs "refresh" store: re-read the image header from the cluster
 * under ctl_mutex so size/snapshot changes become visible.
 */
static ssize_t rbd_image_refresh(struct device *dev,
struct device_attribute *attr,
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
rc = __rbd_refresh_header(rbd_dev);
mutex_unlock(&ctl_mutex);
/*
 * Per-device sysfs attributes and the device_type that groups them.
 * All attributes are read-only except "refresh" and "create_snap",
 * which are write-only control files.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
static struct attribute *rbd_attrs[] = {
&dev_attr_size.attr,
&dev_attr_major.attr,
&dev_attr_client_id.attr,
&dev_attr_pool.attr,
&dev_attr_pool_id.attr,
&dev_attr_name.attr,
&dev_attr_current_snap.attr,
&dev_attr_refresh.attr,
&dev_attr_create_snap.attr,
static struct attribute_group rbd_attr_group = {
static const struct attribute_group *rbd_attr_groups[] = {
static void rbd_sysfs_dev_release(struct device *dev)
static struct device_type rbd_device_type = {
.groups = rbd_attr_groups,
.release = rbd_sysfs_dev_release,
/* sysfs "snap_size" (per-snapshot device): snapshot size in bytes. */
static ssize_t rbd_snap_size_show(struct device *dev,
struct device_attribute *attr,
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs "snap_id" (per-snapshot device): cluster snapshot id. */
static ssize_t rbd_snap_id_show(struct device *dev,
struct device_attribute *attr,
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/*
 * Per-snapshot sysfs attributes and device_type.  The release hook
 * frees the rbd_snap when its device refcount drops to zero.
 */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static struct attribute *rbd_snap_attrs[] = {
&dev_attr_snap_size.attr,
&dev_attr_snap_id.attr,
static struct attribute_group rbd_snap_attr_group = {
.attrs = rbd_snap_attrs,
static void rbd_snap_dev_release(struct device *dev)
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
static const struct attribute_group *rbd_snap_attr_groups[] = {
&rbd_snap_attr_group,
static struct device_type rbd_snap_device_type = {
.groups = rbd_snap_attr_groups,
.release = rbd_snap_dev_release,
/*
 * Unlink a snapshot from the device's snaps list and unregister its
 * sysfs device; the device release hook frees the rbd_snap itself.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
struct rbd_snap *snap)
list_del(&snap->node);
device_unregister(&snap->dev);
/*
 * Register one snapshot as a child sysfs device named "snap_<name>"
 * under the given parent (the rbd device).
 */
static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
struct rbd_snap *snap,
struct device *parent)
struct device *dev = &snap->dev;
dev->type = &rbd_snap_device_type;
dev->parent = parent;
dev->release = rbd_snap_dev_release;
dev_set_name(dev, "snap_%s", snap->name);
ret = device_register(dev);
/*
 * Allocate an rbd_snap for header snapshot index i, duplicating its
 * name and copying size/id from the header, and register it in sysfs
 * if the parent rbd device is already registered.
 *
 * NOTE(review): allocation failure checks for kzalloc/kstrdup are not
 * visible in this excerpt -- confirm they exist in the full source.
 */
static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
int i, const char *name,
struct rbd_snap **snapp)
struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
snap->name = kstrdup(name, GFP_KERNEL);
snap->size = rbd_dev->header.snap_sizes[i];
snap->id = rbd_dev->header.snapc->snaps[i];
if (device_is_registered(&rbd_dev->dev)) {
ret = rbd_register_snap_dev(rbd_dev, snap,
/*
 * search for the previous snap in a null delimited string list
 *
 * Steps "name" back to the start of the preceding NUL-terminated
 * string; the guard below refuses to walk before "start" (there must
 * be at least one terminator plus one character to step over).
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
if (name < start + 2)
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name.
 *
 * Walks the existing snaps list (oldest first) in lockstep with the
 * header's snap context (newest first, hence the reverse iteration),
 * removing snapshots that vanished, keeping matches, and creating
 * devices for snapshots that are new since the last refresh.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
const char *name, *first_name;
int i = rbd_dev->header.total_snaps;
struct rbd_snap *snap, *old_snap = NULL;
struct list_head *p, *n;
/* Names are packed newest-first; start past the end and walk back. */
first_name = rbd_dev->header.snap_names;
name = first_name + rbd_dev->header.snap_names_len;
list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
old_snap = list_entry(p, struct rbd_snap, node);
cur_id = rbd_dev->header.snapc->snaps[i - 1];
if (!i || old_snap->id < cur_id) {
/* old_snap->id was skipped, thus was removed */
__rbd_remove_snap_dev(rbd_dev, old_snap);
if (old_snap->id == cur_id) {
/* we have this snapshot already */
name = rbd_prev_snap_name(name, first_name);
i--, name = rbd_prev_snap_name(name, first_name)) {
cur_id = rbd_dev->header.snapc->snaps[i];
/* snapshot removal? handle it above */
if (cur_id >= old_snap->id)
/* a new snapshot */
ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
/* note that we add it backward so using n and not p */
list_add(&snap->node, n);
/* we're done going over the old snap list, just add what's left */
for (; i > 0; i--) {
name = rbd_prev_snap_name(name, first_name);
ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
list_add(&snap->node, &rbd_dev->snaps);
/*
 * Register the rbd device on the rbd bus (named by its numeric id)
 * and register a child device for each already-known snapshot.
 * Serialized against other control operations by ctl_mutex.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
struct rbd_snap *snap;
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dev = &rbd_dev->dev;
dev->bus = &rbd_bus_type;
dev->type = &rbd_device_type;
dev->parent = &rbd_root_dev;
dev->release = rbd_dev_release;
dev_set_name(dev, "%d", rbd_dev->id);
ret = device_register(dev);
list_for_each_entry(snap, &rbd_dev->snaps, node) {
ret = rbd_register_snap_dev(rbd_dev, snap,
mutex_unlock(&ctl_mutex);
/* Unregister the rbd device; cleanup continues in rbd_dev_release(). */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
device_unregister(&rbd_dev->dev);
/*
 * Register a watch on the image header object so we are notified of
 * header changes.  If the OSD rejects our header version (-ERANGE),
 * refresh the header under ctl_mutex and retry the watch.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
rbd_dev->header.obj_version);
if (ret == -ERANGE) {
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
rc = __rbd_refresh_header(rbd_dev);
mutex_unlock(&ctl_mutex);
} while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1. */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list. The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
rbd_dev->id = atomic64_inc_return(&rbd_id_max);
spin_lock(&rbd_dev_list_lock);
list_add_tail(&rbd_dev->node, &rbd_dev_list);
spin_unlock(&rbd_dev_list_lock);
2213 * Remove an rbd_dev from the global list, and record that its
2214 * identifier is no longer in use.
2216 static void rbd_id_put(struct rbd_device *rbd_dev)
2218 struct list_head *tmp;
2219 int rbd_id = rbd_dev->id;
2224 spin_lock(&rbd_dev_list_lock);
2225 list_del_init(&rbd_dev->node);
2228 * If the id being "put" is not the current maximum, there
2229 * is nothing special we need to do.
2231 if (rbd_id != atomic64_read(&rbd_id_max)) {
2232 spin_unlock(&rbd_dev_list_lock);
2237 * We need to update the current maximum id. Search the
2238 * list to find out what it is. We're more likely to find
2239 * the maximum at the end, so search the list backward.
2242 list_for_each_prev(tmp, &rbd_dev_list) {
2243 struct rbd_device *rbd_dev;
2245 rbd_dev = list_entry(tmp, struct rbd_device, node);
2246 if (rbd_id > max_id)
2249 spin_unlock(&rbd_dev_list_lock);
2252 * The max id could have been updated by rbd_id_get(), in
2253 * which case it now accurately reflects the new maximum.
2254 * Be careful not to overwrite the maximum value in that
2257 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	/*
	 * Advance past the token unconditionally and return its length;
	 * the excerpt was missing this tail, leaving the function with
	 * no return value (undefined behavior) and callers stuck on the
	 * same token forever.
	 */
	*buf += len;

	return len;
}
/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer. The copy is guaranteed to be terminated with '\0'. Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available. If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 *
 * NOTE(review): the kmalloc NULL check is not visible in this
 * excerpt -- confirm it exists before the memcpy in the full source.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
len = next_token(buf);
dup = kmalloc(len + 1, GFP_KERNEL);
memcpy(dup, *buf, len);
*(dup + len) = '\0';
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * the add command.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 *
 * On failure, everything allocated here is freed and the pool_name
 * pointer is reset so the caller's cleanup path can tell nothing
 * needs releasing.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
const char **mon_addrs,
size_t *mon_addrs_size,
size_t options_size)
/* The first four tokens are required */
len = next_token(&buf);
/* Monitor addresses are returned in place (not duplicated). */
*mon_addrs_size = len + 1;
len = copy_token(&buf, options, options_size);
if (!len || len >= options_size)
rbd_dev->pool_name = dup_token(&buf, NULL);
if (!rbd_dev->pool_name)
rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
if (!rbd_dev->image_name)
/* Create the name of the header object */
rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
+ sizeof (RBD_SUFFIX),
if (!rbd_dev->header_name)
sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
/*
 * The snapshot name is optional. If none is is supplied,
 * we use the default value.
 */
rbd_dev->snap_name = dup_token(&buf, &len);
if (!rbd_dev->snap_name)
/* Replace the empty name with the default */
kfree(rbd_dev->snap_name);
= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
if (!rbd_dev->snap_name)
memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
sizeof (RBD_SNAP_HEAD_NAME));
/* Error path: release everything allocated above. */
kfree(rbd_dev->header_name);
kfree(rbd_dev->image_name);
kfree(rbd_dev->pool_name);
rbd_dev->pool_name = NULL;
2425 static ssize_t rbd_add(struct bus_type *bus,
2430 struct rbd_device *rbd_dev = NULL;
2431 const char *mon_addrs = NULL;
2432 size_t mon_addrs_size = 0;
2433 struct ceph_osd_client *osdc;
2436 if (!try_module_get(THIS_MODULE))
2439 options = kmalloc(count, GFP_KERNEL);
2442 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2446 /* static rbd_device initialization */
2447 spin_lock_init(&rbd_dev->lock);
2448 INIT_LIST_HEAD(&rbd_dev->node);
2449 INIT_LIST_HEAD(&rbd_dev->snaps);
2450 init_rwsem(&rbd_dev->header_rwsem);
2452 init_rwsem(&rbd_dev->header_rwsem);
2454 /* generate unique id: find highest unique id, add one */
2455 rbd_id_get(rbd_dev);
2457 /* Fill in the device name, now that we have its id. */
2458 BUILD_BUG_ON(DEV_NAME_LEN
2459 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2460 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2462 /* parse add command */
2463 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2468 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2470 if (IS_ERR(rbd_dev->rbd_client)) {
2471 rc = PTR_ERR(rbd_dev->rbd_client);
2476 osdc = &rbd_dev->rbd_client->client->osdc;
2477 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2479 goto err_out_client;
2480 rbd_dev->pool_id = rc;
2482 /* register our block device */
2483 rc = register_blkdev(0, rbd_dev->name);
2485 goto err_out_client;
2486 rbd_dev->major = rc;
2488 rc = rbd_bus_add_dev(rbd_dev);
2490 goto err_out_blkdev;
2493 * At this point cleanup in the event of an error is the job
2494 * of the sysfs code (initiated by rbd_bus_del_dev()).
2496 * Set up and announce blkdev mapping.
2498 rc = rbd_init_disk(rbd_dev);
2502 rc = rbd_init_watch_dev(rbd_dev);
2509 /* this will also clean up rest of rbd_dev stuff */
2511 rbd_bus_del_dev(rbd_dev);
2516 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2518 rbd_put_client(rbd_dev);
2520 if (rbd_dev->pool_name) {
2521 kfree(rbd_dev->snap_name);
2522 kfree(rbd_dev->header_name);
2523 kfree(rbd_dev->image_name);
2524 kfree(rbd_dev->pool_name);
2526 rbd_id_put(rbd_dev);
2531 dout("Error adding device %s\n", buf);
2532 module_put(THIS_MODULE);
2534 return (ssize_t) rc;
/*
 * Look up a device by id in the global list.  Returns the device (or
 * NULL) after dropping rbd_dev_list_lock; the caller relies on other
 * serialization (ctl_mutex) to keep the result valid.
 */
static struct rbd_device *__rbd_get_dev(unsigned long id)
struct list_head *tmp;
struct rbd_device *rbd_dev;
spin_lock(&rbd_dev_list_lock);
list_for_each(tmp, &rbd_dev_list) {
rbd_dev = list_entry(tmp, struct rbd_device, node);
if (rbd_dev->id == id) {
spin_unlock(&rbd_dev_list_lock);
spin_unlock(&rbd_dev_list_lock);
/*
 * Device release hook, invoked when the rbd device's sysfs refcount
 * hits zero (after rbd_bus_del_dev()).  Tears down the header watch,
 * drops the ceph client, frees the disk and blkdev registration, then
 * releases the id and all name strings, and finally drops the module
 * reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
if (rbd_dev->watch_request) {
struct ceph_client *client = rbd_dev->rbd_client->client;
ceph_osdc_unregister_linger_request(&client->osdc,
rbd_dev->watch_request);
if (rbd_dev->watch_event)
rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);
rbd_put_client(rbd_dev);
/* clean up and free blkdev */
rbd_free_disk(rbd_dev);
unregister_blkdev(rbd_dev->major, rbd_dev->name);
/* done with the id, and with the rbd_dev */
kfree(rbd_dev->snap_name);
kfree(rbd_dev->header_name);
kfree(rbd_dev->pool_name);
kfree(rbd_dev->image_name);
rbd_id_put(rbd_dev);
/* release module ref */
module_put(THIS_MODULE);
/*
 * sysfs bus "remove" store: parse the target device id, look it up,
 * and unregister it (snapshots first) under ctl_mutex.
 */
static ssize_t rbd_remove(struct bus_type *bus,
struct rbd_device *rbd_dev = NULL;
rc = strict_strtoul(buf, 10, &ul);
/* convert to int; abort if we lost anything in the conversion */
target_id = (int) ul;
if (target_id != ul)
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
rbd_dev = __rbd_get_dev(target_id);
__rbd_remove_all_snaps(rbd_dev);
rbd_bus_del_dev(rbd_dev);
mutex_unlock(&ctl_mutex);
/*
 * sysfs "create_snap" store: create a snapshot named by the written
 * string, refresh the header, then notify watchers (best effort).
 */
static ssize_t rbd_snap_add(struct device *dev,
struct device_attribute *attr,
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
char *name = kmalloc(count + 1, GFP_KERNEL);
/*
 * NOTE(review): snprintf with size "count" copies at most count-1
 * bytes, silently dropping the final character of buf -- presumably
 * intended to strip the trailing newline, but it also truncates
 * names written without one.  Confirm.
 */
snprintf(name, count, "%s", buf);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
ret = rbd_header_add_snap(rbd_dev,
ret = __rbd_refresh_header(rbd_dev);
/* shouldn't hold ctl_mutex when notifying.. notify might
trigger a watch callback that would need to get that mutex */
mutex_unlock(&ctl_mutex);
/* make a best effort, don't error if failed */
rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);
mutex_unlock(&ctl_mutex);
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 *
 * Registers the rbd root device and bus; on bus registration failure
 * the root device is unwound.
 */
static int rbd_sysfs_init(void)
ret = device_register(&rbd_root_dev);
ret = bus_register(&rbd_bus_type);
device_unregister(&rbd_root_dev);
/* Undo rbd_sysfs_init(): drop the bus, then the root device. */
static void rbd_sysfs_cleanup(void)
bus_unregister(&rbd_bus_type);
device_unregister(&rbd_root_dev);
/* Module entry point: set up sysfs and announce the driver. */
int __init rbd_init(void)
rc = rbd_sysfs_init();
pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit point: remove the sysfs bus and root device. */
void __exit rbd_exit(void)
rbd_sysfs_cleanup();
2701 module_init(rbd_init);
2702 module_exit(rbd_exit);
2704 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2705 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2706 MODULE_DESCRIPTION("rados block device");
2708 /* following authorship retained from original osdblk.c */
2709 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2711 MODULE_LICENSE("GPL");