drivers/block/rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/striper.h>
36 #include <linux/ceph/decode.h>
37 #include <linux/fs_parser.h>
38 #include <linux/bsearch.h>
39
40 #include <linux/kernel.h>
41 #include <linux/device.h>
42 #include <linux/module.h>
43 #include <linux/blk-mq.h>
44 #include <linux/fs.h>
45 #include <linux/blkdev.h>
46 #include <linux/slab.h>
47 #include <linux/idr.h>
48 #include <linux/workqueue.h>
49
50 #include "rbd_types.h"
51
52 #define RBD_DEBUG       /* Activate rbd_assert() calls */
53
54 /*
55  * Increment the given counter and return its updated value.
56  * If the counter is already 0, it will not be incremented.
57  * If the counter is already at its maximum value, -EINVAL is
58  * returned without updating it.
59  */
60 static int atomic_inc_return_safe(atomic_t *v)
61 {
62         unsigned int counter;
63
64         counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
65         if (counter <= (unsigned int)INT_MAX)
66                 return (int)counter;
67
68         atomic_dec(v);
69
70         return -EINVAL;
71 }
72
73 /* Decrement the counter.  Return the resulting value, or -EINVAL */
74 static int atomic_dec_return_safe(atomic_t *v)
75 {
76         int counter;
77
78         counter = atomic_dec_return(v);
79         if (counter >= 0)
80                 return counter;
81
82         atomic_inc(v);
83
84         return -EINVAL;
85 }
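/*
 * Usage sketch (illustrative only): these saturating helpers guard
 * reference counts that must never wrap, such as rbd_dev->parent_ref
 * further below:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
 *		... parent reference taken, parent data may be used ...
 *	if (atomic_dec_return_safe(&rbd_dev->parent_ref) < 0)
 *		... would have wrapped; the caller warns instead ...
 */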
86
87 #define RBD_DRV_NAME "rbd"
88
89 #define RBD_MINORS_PER_MAJOR            256
90 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
91
92 #define RBD_MAX_PARENT_CHAIN_LEN        16
93
94 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
95 #define RBD_MAX_SNAP_NAME_LEN   \
96                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
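/*
 * For reference: with NAME_MAX == 255 and sizeof(RBD_SNAP_DEV_NAME_PREFIX)
 * == 6 ("snap_" plus its NUL), this evaluates to 255 - 5 = 250, leaving
 * room for the "snap_" prefix in front of a snapshot name.
 */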
97
98 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
99
100 #define RBD_SNAP_HEAD_NAME      "-"
101
102 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
103
104 /* This allows a single page to hold an image name sent by OSD */
105 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
106 #define RBD_IMAGE_ID_LEN_MAX    64
107
108 #define RBD_OBJ_PREFIX_LEN_MAX  64
109
110 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
111 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
112
113 /* Feature bits */
114
115 #define RBD_FEATURE_LAYERING            (1ULL<<0)
116 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
117 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
118 #define RBD_FEATURE_OBJECT_MAP          (1ULL<<3)
119 #define RBD_FEATURE_FAST_DIFF           (1ULL<<4)
120 #define RBD_FEATURE_DEEP_FLATTEN        (1ULL<<5)
121 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
122 #define RBD_FEATURE_OPERATIONS          (1ULL<<8)
123
124 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
125                                  RBD_FEATURE_STRIPINGV2 |       \
126                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
127                                  RBD_FEATURE_OBJECT_MAP |       \
128                                  RBD_FEATURE_FAST_DIFF |        \
129                                  RBD_FEATURE_DEEP_FLATTEN |     \
130                                  RBD_FEATURE_DATA_POOL |        \
131                                  RBD_FEATURE_OPERATIONS)
132
133 /* Features supported by this (client software) implementation. */
134
135 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
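/*
 * For reference, the mask above currently works out to 0x1bf (bits 0-5, 7
 * and 8; bit 6 is not used here).  A rough sketch of how an image carrying
 * feature bits outside this mask might be refused (illustrative only; the
 * driver's own check is not shown in this excerpt):
 *
 *	u64 unsup = features & ~RBD_FEATURES_SUPPORTED;
 *
 *	if (unsup)
 *		rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
 *			 unsup);
 */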
136
137 /*
138  * An RBD device name will be "rbd#", where the "rbd" comes from
139  * RBD_DRV_NAME above, and # is a unique integer identifier.
140  */
141 #define DEV_NAME_LEN            32
142
143 /*
144  * block device image metadata (in-memory version)
145  */
146 struct rbd_image_header {
147         /* These six fields never change for a given rbd image */
148         char *object_prefix;
149         __u8 obj_order;
150         u64 stripe_unit;
151         u64 stripe_count;
152         s64 data_pool_id;
153         u64 features;           /* Might be changeable someday? */
154
155         /* The remaining fields need to be updated occasionally */
156         u64 image_size;
157         struct ceph_snap_context *snapc;
158         char *snap_names;       /* format 1 only */
159         u64 *snap_sizes;        /* format 1 only */
160 };
161
162 /*
163  * An rbd image specification.
164  *
165  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
166  * identify an image.  Each rbd_dev structure includes a pointer to
167  * an rbd_spec structure that encapsulates this identity.
168  *
169  * Each of the ids in an rbd_spec has an associated name.  For a
170  * user-mapped image, the names are supplied and the ids associated
171  * with them are looked up.  For a layered image, a parent image is
172  * defined by the tuple, and the names are looked up.
173  *
174  * An rbd_dev structure contains a parent_spec pointer which is
175  * non-null if the image it represents is a child in a layered
176  * image.  This pointer will refer to the rbd_spec structure used
177  * by the parent rbd_dev for its own identity (i.e., the structure
178  * is shared between the parent and child).
179  *
180  * Since these structures are populated once, during the discovery
181  * phase of image construction, they are effectively immutable so
182  * we make no effort to synchronize access to them.
183  *
184  * Note that code herein does not assume the image name is known (it
185  * could be a null pointer).
186  */
187 struct rbd_spec {
188         u64             pool_id;
189         const char      *pool_name;
190         const char      *pool_ns;       /* NULL if default, never "" */
191
192         const char      *image_id;
193         const char      *image_name;
194
195         u64             snap_id;
196         const char      *snap_name;
197
198         struct kref     kref;
199 };
200
201 /*
202  * An instance of the client.  Multiple devices may share an rbd client.
203  */
204 struct rbd_client {
205         struct ceph_client      *client;
206         struct kref             kref;
207         struct list_head        node;
208 };
209
210 struct pending_result {
211         int                     result;         /* first nonzero result */
212         int                     num_pending;
213 };
214
215 struct rbd_img_request;
216
217 enum obj_request_type {
218         OBJ_REQUEST_NODATA = 1,
219         OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
220         OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
221         OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
222 };
223
224 enum obj_operation_type {
225         OBJ_OP_READ = 1,
226         OBJ_OP_WRITE,
227         OBJ_OP_DISCARD,
228         OBJ_OP_ZEROOUT,
229 };
230
231 #define RBD_OBJ_FLAG_DELETION                   (1U << 0)
232 #define RBD_OBJ_FLAG_COPYUP_ENABLED             (1U << 1)
233 #define RBD_OBJ_FLAG_COPYUP_ZEROS               (1U << 2)
234 #define RBD_OBJ_FLAG_MAY_EXIST                  (1U << 3)
235 #define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT       (1U << 4)
236
237 enum rbd_obj_read_state {
238         RBD_OBJ_READ_START = 1,
239         RBD_OBJ_READ_OBJECT,
240         RBD_OBJ_READ_PARENT,
241 };
242
243 /*
244  * Writes go through the following state machine to deal with
245  * layering:
246  *
247  *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
248  *            .                 |                                    .
249  *            .                 v                                    .
250  *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
251  *            .                 |                    .               .
252  *            .                 v                    v (deep-copyup  .
253  *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
254  * flattened) v                 |                    .               .
255  *            .                 v                    .               .
256  *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
257  *                              |                        not needed) v
258  *                              v                                    .
259  *                            done . . . . . . . . . . . . . . . . . .
260  *                              ^
261  *                              |
262  *                     RBD_OBJ_WRITE_FLAT
263  *
264  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
265  * assert_exists guard is needed or not (in some cases it's not needed
266  * even if there is a parent).
267  */
268 enum rbd_obj_write_state {
269         RBD_OBJ_WRITE_START = 1,
270         RBD_OBJ_WRITE_PRE_OBJECT_MAP,
271         RBD_OBJ_WRITE_OBJECT,
272         __RBD_OBJ_WRITE_COPYUP,
273         RBD_OBJ_WRITE_COPYUP,
274         RBD_OBJ_WRITE_POST_OBJECT_MAP,
275 };
276
277 enum rbd_obj_copyup_state {
278         RBD_OBJ_COPYUP_START = 1,
279         RBD_OBJ_COPYUP_READ_PARENT,
280         __RBD_OBJ_COPYUP_OBJECT_MAPS,
281         RBD_OBJ_COPYUP_OBJECT_MAPS,
282         __RBD_OBJ_COPYUP_WRITE_OBJECT,
283         RBD_OBJ_COPYUP_WRITE_OBJECT,
284 };
285
286 struct rbd_obj_request {
287         struct ceph_object_extent ex;
288         unsigned int            flags;  /* RBD_OBJ_FLAG_* */
289         union {
290                 enum rbd_obj_read_state  read_state;    /* for reads */
291                 enum rbd_obj_write_state write_state;   /* for writes */
292         };
293
294         struct rbd_img_request  *img_request;
295         struct ceph_file_extent *img_extents;
296         u32                     num_img_extents;
297
298         union {
299                 struct ceph_bio_iter    bio_pos;
300                 struct {
301                         struct ceph_bvec_iter   bvec_pos;
302                         u32                     bvec_count;
303                         u32                     bvec_idx;
304                 };
305         };
306
307         enum rbd_obj_copyup_state copyup_state;
308         struct bio_vec          *copyup_bvecs;
309         u32                     copyup_bvec_count;
310
311         struct list_head        osd_reqs;       /* w/ r_private_item */
312
313         struct mutex            state_mutex;
314         struct pending_result   pending;
315         struct kref             kref;
316 };
317
318 enum img_req_flags {
319         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
320         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
321 };
322
323 enum rbd_img_state {
324         RBD_IMG_START = 1,
325         RBD_IMG_EXCLUSIVE_LOCK,
326         __RBD_IMG_OBJECT_REQUESTS,
327         RBD_IMG_OBJECT_REQUESTS,
328 };
329
330 struct rbd_img_request {
331         struct rbd_device       *rbd_dev;
332         enum obj_operation_type op_type;
333         enum obj_request_type   data_type;
334         unsigned long           flags;
335         enum rbd_img_state      state;
336         union {
337                 u64                     snap_id;        /* for reads */
338                 struct ceph_snap_context *snapc;        /* for writes */
339         };
340         struct rbd_obj_request  *obj_request;   /* obj req initiator */
341
342         struct list_head        lock_item;
343         struct list_head        object_extents; /* obj_req.ex structs */
344
345         struct mutex            state_mutex;
346         struct pending_result   pending;
347         struct work_struct      work;
348         int                     work_result;
349 };
350
351 #define for_each_obj_request(ireq, oreq) \
352         list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
353 #define for_each_obj_request_safe(ireq, oreq, n) \
354         list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
355
356 enum rbd_watch_state {
357         RBD_WATCH_STATE_UNREGISTERED,
358         RBD_WATCH_STATE_REGISTERED,
359         RBD_WATCH_STATE_ERROR,
360 };
361
362 enum rbd_lock_state {
363         RBD_LOCK_STATE_UNLOCKED,
364         RBD_LOCK_STATE_LOCKED,
365         RBD_LOCK_STATE_RELEASING,
366 };
367
368 /* WatchNotify::ClientId */
369 struct rbd_client_id {
370         u64 gid;
371         u64 handle;
372 };
373
374 struct rbd_mapping {
375         u64                     size;
376 };
377
378 /*
379  * a single device
380  */
381 struct rbd_device {
382         int                     dev_id;         /* blkdev unique id */
383
384         int                     major;          /* blkdev assigned major */
385         int                     minor;
386         struct gendisk          *disk;          /* blkdev's gendisk and rq */
387
388         u32                     image_format;   /* Either 1 or 2 */
389         struct rbd_client       *rbd_client;
390
391         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
392
393         spinlock_t              lock;           /* queue, flags, open_count */
394
395         struct rbd_image_header header;
396         unsigned long           flags;          /* possibly lock protected */
397         struct rbd_spec         *spec;
398         struct rbd_options      *opts;
399         char                    *config_info;   /* add{,_single_major} string */
400
401         struct ceph_object_id   header_oid;
402         struct ceph_object_locator header_oloc;
403
404         struct ceph_file_layout layout;         /* used for all rbd requests */
405
406         struct mutex            watch_mutex;
407         enum rbd_watch_state    watch_state;
408         struct ceph_osd_linger_request *watch_handle;
409         u64                     watch_cookie;
410         struct delayed_work     watch_dwork;
411
412         struct rw_semaphore     lock_rwsem;
413         enum rbd_lock_state     lock_state;
414         char                    lock_cookie[32];
415         struct rbd_client_id    owner_cid;
416         struct work_struct      acquired_lock_work;
417         struct work_struct      released_lock_work;
418         struct delayed_work     lock_dwork;
419         struct work_struct      unlock_work;
420         spinlock_t              lock_lists_lock;
421         struct list_head        acquiring_list;
422         struct list_head        running_list;
423         struct completion       acquire_wait;
424         int                     acquire_err;
425         struct completion       releasing_wait;
426
427         spinlock_t              object_map_lock;
428         u8                      *object_map;
429         u64                     object_map_size;        /* in objects */
430         u64                     object_map_flags;
431
432         struct workqueue_struct *task_wq;
433
434         struct rbd_spec         *parent_spec;
435         u64                     parent_overlap;
436         atomic_t                parent_ref;
437         struct rbd_device       *parent;
438
439         /* Block layer tags. */
440         struct blk_mq_tag_set   tag_set;
441
442         /* protects updating the header */
443         struct rw_semaphore     header_rwsem;
444
445         struct rbd_mapping      mapping;
446
447         struct list_head        node;
448
449         /* sysfs related */
450         struct device           dev;
451         unsigned long           open_count;     /* protected by lock */
452 };
453
454 /*
455  * Flag bits for rbd_dev->flags:
456  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
457  *   by rbd_dev->lock
458  */
459 enum rbd_dev_flags {
460         RBD_DEV_FLAG_EXISTS,    /* rbd_dev_device_setup() ran */
461         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
462         RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
463 };
464
465 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
466
467 static LIST_HEAD(rbd_dev_list);    /* devices */
468 static DEFINE_SPINLOCK(rbd_dev_list_lock);
469
470 static LIST_HEAD(rbd_client_list);              /* clients */
471 static DEFINE_SPINLOCK(rbd_client_list_lock);
472
473 /* Slab caches for frequently-allocated structures */
474
475 static struct kmem_cache        *rbd_img_request_cache;
476 static struct kmem_cache        *rbd_obj_request_cache;
477
478 static int rbd_major;
479 static DEFINE_IDA(rbd_dev_id_ida);
480
481 static struct workqueue_struct *rbd_wq;
482
483 static struct ceph_snap_context rbd_empty_snapc = {
484         .nref = REFCOUNT_INIT(1),
485 };
486
487 /*
488  * single-major requires version >= 0.75 of the userspace rbd utility.
489  */
490 static bool single_major = true;
491 module_param(single_major, bool, 0444);
492 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
493
494 static ssize_t add_store(const struct bus_type *bus, const char *buf, size_t count);
495 static ssize_t remove_store(const struct bus_type *bus, const char *buf,
496                             size_t count);
497 static ssize_t add_single_major_store(const struct bus_type *bus, const char *buf,
498                                       size_t count);
499 static ssize_t remove_single_major_store(const struct bus_type *bus, const char *buf,
500                                          size_t count);
501 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
502
503 static int rbd_dev_id_to_minor(int dev_id)
504 {
505         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
506 }
507
508 static int minor_to_rbd_dev_id(int minor)
509 {
510         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
511 }
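/*
 * For reference: with RBD_SINGLE_MAJOR_PART_SHIFT == 4, each mapping owns a
 * block of 16 minors.  For example, dev_id 3 starts at minor 3 << 4 == 48,
 * and any minor in 48..63 (the whole device plus its partitions) maps back
 * to dev_id 3, e.g. 53 >> 4 == 3.
 */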
512
513 static bool rbd_is_ro(struct rbd_device *rbd_dev)
514 {
515         return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
516 }
517
518 static bool rbd_is_snap(struct rbd_device *rbd_dev)
519 {
520         return rbd_dev->spec->snap_id != CEPH_NOSNAP;
521 }
522
523 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
524 {
525         lockdep_assert_held(&rbd_dev->lock_rwsem);
526
527         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
528                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
529 }
530
531 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
532 {
533         bool is_lock_owner;
534
535         down_read(&rbd_dev->lock_rwsem);
536         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
537         up_read(&rbd_dev->lock_rwsem);
538         return is_lock_owner;
539 }
540
541 static ssize_t supported_features_show(const struct bus_type *bus, char *buf)
542 {
543         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
544 }
545
546 static BUS_ATTR_WO(add);
547 static BUS_ATTR_WO(remove);
548 static BUS_ATTR_WO(add_single_major);
549 static BUS_ATTR_WO(remove_single_major);
550 static BUS_ATTR_RO(supported_features);
551
552 static struct attribute *rbd_bus_attrs[] = {
553         &bus_attr_add.attr,
554         &bus_attr_remove.attr,
555         &bus_attr_add_single_major.attr,
556         &bus_attr_remove_single_major.attr,
557         &bus_attr_supported_features.attr,
558         NULL,
559 };
560
561 static umode_t rbd_bus_is_visible(struct kobject *kobj,
562                                   struct attribute *attr, int index)
563 {
564         if (!single_major &&
565             (attr == &bus_attr_add_single_major.attr ||
566              attr == &bus_attr_remove_single_major.attr))
567                 return 0;
568
569         return attr->mode;
570 }
571
572 static const struct attribute_group rbd_bus_group = {
573         .attrs = rbd_bus_attrs,
574         .is_visible = rbd_bus_is_visible,
575 };
576 __ATTRIBUTE_GROUPS(rbd_bus);
577
578 static struct bus_type rbd_bus_type = {
579         .name           = "rbd",
580         .bus_groups     = rbd_bus_groups,
581 };
582
583 static void rbd_root_dev_release(struct device *dev)
584 {
585 }
586
587 static struct device rbd_root_dev = {
588         .init_name =    "rbd",
589         .release =      rbd_root_dev_release,
590 };
591
592 static __printf(2, 3)
593 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
594 {
595         struct va_format vaf;
596         va_list args;
597
598         va_start(args, fmt);
599         vaf.fmt = fmt;
600         vaf.va = &args;
601
602         if (!rbd_dev)
603                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
604         else if (rbd_dev->disk)
605                 printk(KERN_WARNING "%s: %s: %pV\n",
606                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
607         else if (rbd_dev->spec && rbd_dev->spec->image_name)
608                 printk(KERN_WARNING "%s: image %s: %pV\n",
609                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
610         else if (rbd_dev->spec && rbd_dev->spec->image_id)
611                 printk(KERN_WARNING "%s: id %s: %pV\n",
612                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
613         else    /* punt */
614                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
615                         RBD_DRV_NAME, rbd_dev, &vaf);
616         va_end(args);
617 }
618
619 #ifdef RBD_DEBUG
620 #define rbd_assert(expr)                                                \
621                 if (unlikely(!(expr))) {                                \
622                         printk(KERN_ERR "\nAssertion failure in %s() "  \
623                                                 "at line %d:\n\n"       \
624                                         "\trbd_assert(%s);\n\n",        \
625                                         __func__, __LINE__, #expr);     \
626                         BUG();                                          \
627                 }
628 #else /* !RBD_DEBUG */
629 #  define rbd_assert(expr)      ((void) 0)
630 #endif /* !RBD_DEBUG */
631
632 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
633
634 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
635 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
636 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
637 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
638 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
639                                         u64 snap_id);
640 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
641                                 u8 *order, u64 *snap_size);
642 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
643
644 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
645 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
646
647 /*
648  * Return true if nothing else is pending.
649  */
650 static bool pending_result_dec(struct pending_result *pending, int *result)
651 {
652         rbd_assert(pending->num_pending > 0);
653
654         if (*result && !pending->result)
655                 pending->result = *result;
656         if (--pending->num_pending)
657                 return false;
658
659         *result = pending->result;
660         return true;
661 }
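/*
 * Usage sketch (assumed, based on the state machines below): a parent
 * request sets pending->num_pending before fanning out its child
 * operations; each completion then does, under the relevant state mutex:
 *
 *	if (pending_result_dec(&req->pending, &result))
 *		advance_state_machine(req, result);
 *
 * Only the final completion returns true and advances the state machine,
 * carrying the first nonzero result seen; advance_state_machine() is a
 * made-up name for whichever rbd_obj/rbd_img state handler applies.
 */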
662
663 static int rbd_open(struct gendisk *disk, blk_mode_t mode)
664 {
665         struct rbd_device *rbd_dev = disk->private_data;
666         bool removing = false;
667
668         spin_lock_irq(&rbd_dev->lock);
669         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
670                 removing = true;
671         else
672                 rbd_dev->open_count++;
673         spin_unlock_irq(&rbd_dev->lock);
674         if (removing)
675                 return -ENOENT;
676
677         (void) get_device(&rbd_dev->dev);
678
679         return 0;
680 }
681
682 static void rbd_release(struct gendisk *disk)
683 {
684         struct rbd_device *rbd_dev = disk->private_data;
685         unsigned long open_count_before;
686
687         spin_lock_irq(&rbd_dev->lock);
688         open_count_before = rbd_dev->open_count--;
689         spin_unlock_irq(&rbd_dev->lock);
690         rbd_assert(open_count_before > 0);
691
692         put_device(&rbd_dev->dev);
693 }
694
695 static const struct block_device_operations rbd_bd_ops = {
696         .owner                  = THIS_MODULE,
697         .open                   = rbd_open,
698         .release                = rbd_release,
699 };
700
701 /*
702  * Initialize an rbd client instance.  Success or not, this function
703  * consumes ceph_opts.  Caller holds client_mutex.
704  */
705 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
706 {
707         struct rbd_client *rbdc;
708         int ret = -ENOMEM;
709
710         dout("%s:\n", __func__);
711         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
712         if (!rbdc)
713                 goto out_opt;
714
715         kref_init(&rbdc->kref);
716         INIT_LIST_HEAD(&rbdc->node);
717
718         rbdc->client = ceph_create_client(ceph_opts, rbdc);
719         if (IS_ERR(rbdc->client))
720                 goto out_rbdc;
721         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
722
723         ret = ceph_open_session(rbdc->client);
724         if (ret < 0)
725                 goto out_client;
726
727         spin_lock(&rbd_client_list_lock);
728         list_add_tail(&rbdc->node, &rbd_client_list);
729         spin_unlock(&rbd_client_list_lock);
730
731         dout("%s: rbdc %p\n", __func__, rbdc);
732
733         return rbdc;
734 out_client:
735         ceph_destroy_client(rbdc->client);
736 out_rbdc:
737         kfree(rbdc);
738 out_opt:
739         if (ceph_opts)
740                 ceph_destroy_options(ceph_opts);
741         dout("%s: error %d\n", __func__, ret);
742
743         return ERR_PTR(ret);
744 }
745
746 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
747 {
748         kref_get(&rbdc->kref);
749
750         return rbdc;
751 }
752
753 /*
754  * Find a ceph client with specific addr and configuration.  If
755  * found, bump its reference count.
756  */
757 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
758 {
759         struct rbd_client *rbdc = NULL, *iter;
760
761         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
762                 return NULL;
763
764         spin_lock(&rbd_client_list_lock);
765         list_for_each_entry(iter, &rbd_client_list, node) {
766                 if (!ceph_compare_options(ceph_opts, iter->client)) {
767                         __rbd_get_client(iter);
768
769                         rbdc = iter;
770                         break;
771                 }
772         }
773         spin_unlock(&rbd_client_list_lock);
774
775         return rbdc;
776 }
777
778 /*
779  * (Per device) rbd map options
780  */
781 enum {
782         Opt_queue_depth,
783         Opt_alloc_size,
784         Opt_lock_timeout,
785         /* int args above */
786         Opt_pool_ns,
787         Opt_compression_hint,
788         /* string args above */
789         Opt_read_only,
790         Opt_read_write,
791         Opt_lock_on_read,
792         Opt_exclusive,
793         Opt_notrim,
794 };
795
796 enum {
797         Opt_compression_hint_none,
798         Opt_compression_hint_compressible,
799         Opt_compression_hint_incompressible,
800 };
801
802 static const struct constant_table rbd_param_compression_hint[] = {
803         {"none",                Opt_compression_hint_none},
804         {"compressible",        Opt_compression_hint_compressible},
805         {"incompressible",      Opt_compression_hint_incompressible},
806         {}
807 };
808
809 static const struct fs_parameter_spec rbd_parameters[] = {
810         fsparam_u32     ("alloc_size",                  Opt_alloc_size),
811         fsparam_enum    ("compression_hint",            Opt_compression_hint,
812                          rbd_param_compression_hint),
813         fsparam_flag    ("exclusive",                   Opt_exclusive),
814         fsparam_flag    ("lock_on_read",                Opt_lock_on_read),
815         fsparam_u32     ("lock_timeout",                Opt_lock_timeout),
816         fsparam_flag    ("notrim",                      Opt_notrim),
817         fsparam_string  ("_pool_ns",                    Opt_pool_ns),
818         fsparam_u32     ("queue_depth",                 Opt_queue_depth),
819         fsparam_flag    ("read_only",                   Opt_read_only),
820         fsparam_flag    ("read_write",                  Opt_read_write),
821         fsparam_flag    ("ro",                          Opt_read_only),
822         fsparam_flag    ("rw",                          Opt_read_write),
823         {}
824 };
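/*
 * The options above are parsed from a per-mapping, comma-separated option
 * string.  A purely illustrative example (spelling per the table above;
 * how the string reaches the kernel is not shown in this excerpt):
 *
 *	queue_depth=128,alloc_size=65536,lock_on_read,compression_hint=compressible
 */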
825
826 struct rbd_options {
827         int     queue_depth;
828         int     alloc_size;
829         unsigned long   lock_timeout;
830         bool    read_only;
831         bool    lock_on_read;
832         bool    exclusive;
833         bool    trim;
834
835         u32 alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
836 };
837
838 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_DEFAULT_RQ
839 #define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
840 #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
841 #define RBD_READ_ONLY_DEFAULT   false
842 #define RBD_LOCK_ON_READ_DEFAULT false
843 #define RBD_EXCLUSIVE_DEFAULT   false
844 #define RBD_TRIM_DEFAULT        true
845
846 struct rbd_parse_opts_ctx {
847         struct rbd_spec         *spec;
848         struct ceph_options     *copts;
849         struct rbd_options      *opts;
850 };
851
852 static char *obj_op_name(enum obj_operation_type op_type)
853 {
854         switch (op_type) {
855         case OBJ_OP_READ:
856                 return "read";
857         case OBJ_OP_WRITE:
858                 return "write";
859         case OBJ_OP_DISCARD:
860                 return "discard";
861         case OBJ_OP_ZEROOUT:
862                 return "zeroout";
863         default:
864                 return "???";
865         }
866 }
867
868 /*
869  * Destroy ceph client
870  *
871  * Takes rbd_client_list_lock internally, so the caller must not hold it.
872  */
873 static void rbd_client_release(struct kref *kref)
874 {
875         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
876
877         dout("%s: rbdc %p\n", __func__, rbdc);
878         spin_lock(&rbd_client_list_lock);
879         list_del(&rbdc->node);
880         spin_unlock(&rbd_client_list_lock);
881
882         ceph_destroy_client(rbdc->client);
883         kfree(rbdc);
884 }
885
886 /*
887  * Drop reference to ceph client node. If it's not referenced anymore, release
888  * it.
889  */
890 static void rbd_put_client(struct rbd_client *rbdc)
891 {
892         if (rbdc)
893                 kref_put(&rbdc->kref, rbd_client_release);
894 }
895
896 /*
897  * Get a ceph client with specific addr and configuration; if one does
898  * not exist, create it.  Either way, ceph_opts is consumed by this
899  * function.
900  */
901 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
902 {
903         struct rbd_client *rbdc;
904         int ret;
905
906         mutex_lock(&client_mutex);
907         rbdc = rbd_client_find(ceph_opts);
908         if (rbdc) {
909                 ceph_destroy_options(ceph_opts);
910
911                 /*
912                  * Using an existing client.  Make sure ->pg_pools is up to
913                  * date before we look up the pool id in do_rbd_add().
914                  */
915                 ret = ceph_wait_for_latest_osdmap(rbdc->client,
916                                         rbdc->client->options->mount_timeout);
917                 if (ret) {
918                         rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
919                         rbd_put_client(rbdc);
920                         rbdc = ERR_PTR(ret);
921                 }
922         } else {
923                 rbdc = rbd_client_create(ceph_opts);
924         }
925         mutex_unlock(&client_mutex);
926
927         return rbdc;
928 }
929
930 static bool rbd_image_format_valid(u32 image_format)
931 {
932         return image_format == 1 || image_format == 2;
933 }
934
935 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
936 {
937         size_t size;
938         u32 snap_count;
939
940         /* The header has to start with the magic rbd header text */
941         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
942                 return false;
943
944         /* The bio layer requires at least sector-sized I/O */
945
946         if (ondisk->options.order < SECTOR_SHIFT)
947                 return false;
948
949         /* If we use u64 in a few spots we may be able to loosen this */
950
951         if (ondisk->options.order > 8 * sizeof (int) - 1)
952                 return false;
953
954         /*
955          * The size of a snapshot header has to fit in a size_t, and
956          * that limits the number of snapshots.
957          */
958         snap_count = le32_to_cpu(ondisk->snap_count);
959         size = SIZE_MAX - sizeof (struct ceph_snap_context);
960         if (snap_count > size / sizeof (__le64))
961                 return false;
962
963         /*
964          * Not only that, but the size of the entire snapshot
965          * header must also be representable in a size_t.
966          */
967         size -= snap_count * sizeof (__le64);
968         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
969                 return false;
970
971         return true;
972 }
973
974 /*
975  * returns the size of an object in the image
976  */
977 static u32 rbd_obj_bytes(struct rbd_image_header *header)
978 {
979         return 1U << header->obj_order;
980 }
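/*
 * For reference: obj_order is a power-of-two exponent, so the common
 * default order of 22 yields 1U << 22 == 4 MiB objects, while order 25
 * would give 32 MiB objects.
 */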
981
982 static void rbd_init_layout(struct rbd_device *rbd_dev)
983 {
984         if (rbd_dev->header.stripe_unit == 0 ||
985             rbd_dev->header.stripe_count == 0) {
986                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
987                 rbd_dev->header.stripe_count = 1;
988         }
989
990         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
991         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
992         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
993         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
994                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
995         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
996 }
997
998 /*
999  * Fill an rbd image header with information from the given format 1
1000  * on-disk header.
1001  */
1002 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1003                                  struct rbd_image_header_ondisk *ondisk)
1004 {
1005         struct rbd_image_header *header = &rbd_dev->header;
1006         bool first_time = header->object_prefix == NULL;
1007         struct ceph_snap_context *snapc;
1008         char *object_prefix = NULL;
1009         char *snap_names = NULL;
1010         u64 *snap_sizes = NULL;
1011         u32 snap_count;
1012         int ret = -ENOMEM;
1013         u32 i;
1014
1015         /* Allocate this now to avoid having to handle failure below */
1016
1017         if (first_time) {
1018                 object_prefix = kstrndup(ondisk->object_prefix,
1019                                          sizeof(ondisk->object_prefix),
1020                                          GFP_KERNEL);
1021                 if (!object_prefix)
1022                         return -ENOMEM;
1023         }
1024
1025         /* Allocate the snapshot context and fill it in */
1026
1027         snap_count = le32_to_cpu(ondisk->snap_count);
1028         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1029         if (!snapc)
1030                 goto out_err;
1031         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1032         if (snap_count) {
1033                 struct rbd_image_snap_ondisk *snaps;
1034                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1035
1036                 /* We'll keep a copy of the snapshot names... */
1037
1038                 if (snap_names_len > (u64)SIZE_MAX)
1039                         goto out_2big;
1040                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1041                 if (!snap_names)
1042                         goto out_err;
1043
1044                 /* ...as well as the array of their sizes. */
1045                 snap_sizes = kmalloc_array(snap_count,
1046                                            sizeof(*header->snap_sizes),
1047                                            GFP_KERNEL);
1048                 if (!snap_sizes)
1049                         goto out_err;
1050
1051                 /*
1052                  * Copy the names, and fill in each snapshot's id
1053                  * and size.
1054                  *
1055                  * Note that rbd_dev_v1_header_info() guarantees the
1056                  * ondisk buffer we're working with has
1057                  * snap_names_len bytes beyond the end of the
1058                  * snapshot id array, so this memcpy() is safe.
1059                  */
1060                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1061                 snaps = ondisk->snaps;
1062                 for (i = 0; i < snap_count; i++) {
1063                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1064                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1065                 }
1066         }
1067
1068         /* We won't fail any more; fill in the header */
1069
1070         if (first_time) {
1071                 header->object_prefix = object_prefix;
1072                 header->obj_order = ondisk->options.order;
1073                 rbd_init_layout(rbd_dev);
1074         } else {
1075                 ceph_put_snap_context(header->snapc);
1076                 kfree(header->snap_names);
1077                 kfree(header->snap_sizes);
1078         }
1079
1080         /* The remaining fields always get updated (when we refresh) */
1081
1082         header->image_size = le64_to_cpu(ondisk->image_size);
1083         header->snapc = snapc;
1084         header->snap_names = snap_names;
1085         header->snap_sizes = snap_sizes;
1086
1087         return 0;
1088 out_2big:
1089         ret = -EIO;
1090 out_err:
1091         kfree(snap_sizes);
1092         kfree(snap_names);
1093         ceph_put_snap_context(snapc);
1094         kfree(object_prefix);
1095
1096         return ret;
1097 }
1098
1099 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1100 {
1101         const char *snap_name;
1102
1103         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1104
1105         /* Skip over names until we find the one we are looking for */
1106
1107         snap_name = rbd_dev->header.snap_names;
1108         while (which--)
1109                 snap_name += strlen(snap_name) + 1;
1110
1111         return kstrdup(snap_name, GFP_KERNEL);
1112 }
1113
1114 /*
1115  * Snapshot id comparison function for use with qsort()/bsearch().
1116  * Note that result is for snapshots in *descending* order.
1117  */
1118 static int snapid_compare_reverse(const void *s1, const void *s2)
1119 {
1120         u64 snap_id1 = *(u64 *)s1;
1121         u64 snap_id2 = *(u64 *)s2;
1122
1123         if (snap_id1 < snap_id2)
1124                 return 1;
1125         return snap_id1 == snap_id2 ? 0 : -1;
1126 }
1127
1128 /*
1129  * Search a snapshot context to see if the given snapshot id is
1130  * present.
1131  *
1132  * Returns the position of the snapshot id in the array if it's found,
1133  * or BAD_SNAP_INDEX otherwise.
1134  *
1135  * Note: The snapshot array is kept sorted (by the osd) in
1136  * reverse order, highest snapshot id first.
1137  */
1138 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1139 {
1140         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1141         u64 *found;
1142
1143         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1144                                 sizeof (snap_id), snapid_compare_reverse);
1145
1146         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1147 }
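/*
 * Worked example: for a descending snapshot array { 80, 52, 17 }, looking
 * up snap_id 52 makes snapid_compare_reverse() return 1 whenever the
 * probed entry is larger than 52 and -1 whenever it is smaller, steering
 * bsearch() to index 1.  An id that is not present (say 60) yields
 * BAD_SNAP_INDEX.
 */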
1148
1149 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1150                                         u64 snap_id)
1151 {
1152         u32 which;
1153         const char *snap_name;
1154
1155         which = rbd_dev_snap_index(rbd_dev, snap_id);
1156         if (which == BAD_SNAP_INDEX)
1157                 return ERR_PTR(-ENOENT);
1158
1159         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1160         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1161 }
1162
1163 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1164 {
1165         if (snap_id == CEPH_NOSNAP)
1166                 return RBD_SNAP_HEAD_NAME;
1167
1168         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1169         if (rbd_dev->image_format == 1)
1170                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1171
1172         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1173 }
1174
1175 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1176                                 u64 *snap_size)
1177 {
1178         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1179         if (snap_id == CEPH_NOSNAP) {
1180                 *snap_size = rbd_dev->header.image_size;
1181         } else if (rbd_dev->image_format == 1) {
1182                 u32 which;
1183
1184                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1185                 if (which == BAD_SNAP_INDEX)
1186                         return -ENOENT;
1187
1188                 *snap_size = rbd_dev->header.snap_sizes[which];
1189         } else {
1190                 u64 size = 0;
1191                 int ret;
1192
1193                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1194                 if (ret)
1195                         return ret;
1196
1197                 *snap_size = size;
1198         }
1199         return 0;
1200 }
1201
1202 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1203 {
1204         u64 snap_id = rbd_dev->spec->snap_id;
1205         u64 size = 0;
1206         int ret;
1207
1208         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1209         if (ret)
1210                 return ret;
1211
1212         rbd_dev->mapping.size = size;
1213         return 0;
1214 }
1215
1216 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1217 {
1218         rbd_dev->mapping.size = 0;
1219 }
1220
1221 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1222 {
1223         struct ceph_bio_iter it = *bio_pos;
1224
1225         ceph_bio_iter_advance(&it, off);
1226         ceph_bio_iter_advance_step(&it, bytes, ({
1227                 memzero_bvec(&bv);
1228         }));
1229 }
1230
1231 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1232 {
1233         struct ceph_bvec_iter it = *bvec_pos;
1234
1235         ceph_bvec_iter_advance(&it, off);
1236         ceph_bvec_iter_advance_step(&it, bytes, ({
1237                 memzero_bvec(&bv);
1238         }));
1239 }
1240
1241 /*
1242  * Zero a range in @obj_req data buffer defined by a bio (list) or
1243  * (private) bio_vec array.
1244  *
1245  * @off is relative to the start of the data buffer.
1246  */
1247 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1248                                u32 bytes)
1249 {
1250         dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1251
1252         switch (obj_req->img_request->data_type) {
1253         case OBJ_REQUEST_BIO:
1254                 zero_bios(&obj_req->bio_pos, off, bytes);
1255                 break;
1256         case OBJ_REQUEST_BVECS:
1257         case OBJ_REQUEST_OWN_BVECS:
1258                 zero_bvecs(&obj_req->bvec_pos, off, bytes);
1259                 break;
1260         default:
1261                 BUG();
1262         }
1263 }
1264
1265 static void rbd_obj_request_destroy(struct kref *kref);
1266 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1267 {
1268         rbd_assert(obj_request != NULL);
1269         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1270                 kref_read(&obj_request->kref));
1271         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1272 }
1273
1274 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1275                                         struct rbd_obj_request *obj_request)
1276 {
1277         rbd_assert(obj_request->img_request == NULL);
1278
1279         /* Image request now owns object's original reference */
1280         obj_request->img_request = img_request;
1281         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1282 }
1283
1284 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1285                                         struct rbd_obj_request *obj_request)
1286 {
1287         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1288         list_del(&obj_request->ex.oe_item);
1289         rbd_assert(obj_request->img_request == img_request);
1290         rbd_obj_request_put(obj_request);
1291 }
1292
1293 static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1294 {
1295         struct rbd_obj_request *obj_req = osd_req->r_priv;
1296
1297         dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1298              __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1299              obj_req->ex.oe_off, obj_req->ex.oe_len);
1300         ceph_osdc_start_request(osd_req->r_osdc, osd_req);
1301 }
1302
1303 /*
1304  * The default/initial value for all image request flags is 0.  Each
1305  * is conditionally set to 1 at image request initialization time
1306  * and currently never changes thereafter.
1307  */
1308 static void img_request_layered_set(struct rbd_img_request *img_request)
1309 {
1310         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1311 }
1312
1313 static bool img_request_layered_test(struct rbd_img_request *img_request)
1314 {
1315         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1316 }
1317
1318 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1319 {
1320         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1321
1322         return !obj_req->ex.oe_off &&
1323                obj_req->ex.oe_len == rbd_dev->layout.object_size;
1324 }
1325
1326 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1327 {
1328         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1329
1330         return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1331                                         rbd_dev->layout.object_size;
1332 }
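/*
 * For example, assuming the usual 4 MiB object size: a request covering
 * oe_off 0, oe_len 0x400000 is "entire"; one covering oe_off 0x300000,
 * oe_len 0x100000 ends exactly at the object boundary and is therefore a
 * "tail" but not "entire".
 */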
1333
1334 /*
1335  * Must be called after rbd_obj_calc_img_extents().
1336  */
1337 static void rbd_obj_set_copyup_enabled(struct rbd_obj_request *obj_req)
1338 {
1339         rbd_assert(obj_req->img_request->snapc);
1340
1341         if (obj_req->img_request->op_type == OBJ_OP_DISCARD) {
1342                 dout("%s %p objno %llu discard\n", __func__, obj_req,
1343                      obj_req->ex.oe_objno);
1344                 return;
1345         }
1346
1347         if (!obj_req->num_img_extents) {
1348                 dout("%s %p objno %llu not overlapping\n", __func__, obj_req,
1349                      obj_req->ex.oe_objno);
1350                 return;
1351         }
1352
1353         if (rbd_obj_is_entire(obj_req) &&
1354             !obj_req->img_request->snapc->num_snaps) {
1355                 dout("%s %p objno %llu entire\n", __func__, obj_req,
1356                      obj_req->ex.oe_objno);
1357                 return;
1358         }
1359
1360         obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
1361 }
1362
1363 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1364 {
1365         return ceph_file_extents_bytes(obj_req->img_extents,
1366                                        obj_req->num_img_extents);
1367 }
1368
1369 static bool rbd_img_is_write(struct rbd_img_request *img_req)
1370 {
1371         switch (img_req->op_type) {
1372         case OBJ_OP_READ:
1373                 return false;
1374         case OBJ_OP_WRITE:
1375         case OBJ_OP_DISCARD:
1376         case OBJ_OP_ZEROOUT:
1377                 return true;
1378         default:
1379                 BUG();
1380         }
1381 }
1382
1383 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1384 {
1385         struct rbd_obj_request *obj_req = osd_req->r_priv;
1386         int result;
1387
1388         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1389              osd_req->r_result, obj_req);
1390
1391         /*
1392          * Writes aren't allowed to return a data payload.  In some
1393          * guarded write cases (e.g. stat + zero on an empty object)
1394          * a stat response makes it through, but we don't care.
1395          */
1396         if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1397                 result = 0;
1398         else
1399                 result = osd_req->r_result;
1400
1401         rbd_obj_handle_request(obj_req, result);
1402 }
1403
1404 static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1405 {
1406         struct rbd_obj_request *obj_request = osd_req->r_priv;
1407         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1408         struct ceph_options *opt = rbd_dev->rbd_client->client->options;
1409
1410         osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
1411         osd_req->r_snapid = obj_request->img_request->snap_id;
1412 }
1413
1414 static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1415 {
1416         struct rbd_obj_request *obj_request = osd_req->r_priv;
1417
1418         osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1419         ktime_get_real_ts64(&osd_req->r_mtime);
1420         osd_req->r_data_offset = obj_request->ex.oe_off;
1421 }
1422
1423 static struct ceph_osd_request *
1424 __rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1425                           struct ceph_snap_context *snapc, int num_ops)
1426 {
1427         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1428         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1429         struct ceph_osd_request *req;
1430         const char *name_format = rbd_dev->image_format == 1 ?
1431                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1432         int ret;
1433
1434         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1435         if (!req)
1436                 return ERR_PTR(-ENOMEM);
1437
1438         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1439         req->r_callback = rbd_osd_req_callback;
1440         req->r_priv = obj_req;
1441
1442         /*
1443          * Data objects may be stored in a separate pool, but always in
1444          * the same namespace as the header object uses in its own pool.
1445          */
1446         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1447         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1448
1449         ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1450                                rbd_dev->header.object_prefix,
1451                                obj_req->ex.oe_objno);
1452         if (ret)
1453                 return ERR_PTR(ret);
1454
1455         return req;
1456 }
1457
1458 static struct ceph_osd_request *
1459 rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1460 {
1461         rbd_assert(obj_req->img_request->snapc);
1462         return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1463                                          num_ops);
1464 }
1465
1466 static struct rbd_obj_request *rbd_obj_request_create(void)
1467 {
1468         struct rbd_obj_request *obj_request;
1469
1470         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1471         if (!obj_request)
1472                 return NULL;
1473
1474         ceph_object_extent_init(&obj_request->ex);
1475         INIT_LIST_HEAD(&obj_request->osd_reqs);
1476         mutex_init(&obj_request->state_mutex);
1477         kref_init(&obj_request->kref);
1478
1479         dout("%s %p\n", __func__, obj_request);
1480         return obj_request;
1481 }
1482
1483 static void rbd_obj_request_destroy(struct kref *kref)
1484 {
1485         struct rbd_obj_request *obj_request;
1486         struct ceph_osd_request *osd_req;
1487         u32 i;
1488
1489         obj_request = container_of(kref, struct rbd_obj_request, kref);
1490
1491         dout("%s: obj %p\n", __func__, obj_request);
1492
1493         while (!list_empty(&obj_request->osd_reqs)) {
1494                 osd_req = list_first_entry(&obj_request->osd_reqs,
1495                                     struct ceph_osd_request, r_private_item);
1496                 list_del_init(&osd_req->r_private_item);
1497                 ceph_osdc_put_request(osd_req);
1498         }
1499
1500         switch (obj_request->img_request->data_type) {
1501         case OBJ_REQUEST_NODATA:
1502         case OBJ_REQUEST_BIO:
1503         case OBJ_REQUEST_BVECS:
1504                 break;          /* Nothing to do */
1505         case OBJ_REQUEST_OWN_BVECS:
1506                 kfree(obj_request->bvec_pos.bvecs);
1507                 break;
1508         default:
1509                 BUG();
1510         }
1511
1512         kfree(obj_request->img_extents);
1513         if (obj_request->copyup_bvecs) {
1514                 for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1515                         if (obj_request->copyup_bvecs[i].bv_page)
1516                                 __free_page(obj_request->copyup_bvecs[i].bv_page);
1517                 }
1518                 kfree(obj_request->copyup_bvecs);
1519         }
1520
1521         kmem_cache_free(rbd_obj_request_cache, obj_request);
1522 }
1523
1524 /* It's OK to call this for a device with no parent */
1525
1526 static void rbd_spec_put(struct rbd_spec *spec);
1527 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1528 {
1529         rbd_dev_remove_parent(rbd_dev);
1530         rbd_spec_put(rbd_dev->parent_spec);
1531         rbd_dev->parent_spec = NULL;
1532         rbd_dev->parent_overlap = 0;
1533 }
1534
1535 /*
1536  * Parent image reference counting is used to determine when an
1537  * image's parent fields can be safely torn down--after there are no
1538  * more in-flight requests to the parent image.  When the last
1539  * reference is dropped, cleaning them up is safe.
1540  */
1541 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1542 {
1543         int counter;
1544
1545         if (!rbd_dev->parent_spec)
1546                 return;
1547
1548         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1549         if (counter > 0)
1550                 return;
1551
1552         /* Last reference; clean up parent data structures */
1553
1554         if (!counter)
1555                 rbd_dev_unparent(rbd_dev);
1556         else
1557                 rbd_warn(rbd_dev, "parent reference underflow");
1558 }
1559
1560 /*
1561  * If an image has a non-zero parent overlap, get a reference to its
1562  * parent.
1563  *
1564  * Returns true if the rbd device has a parent with a non-zero
1565  * overlap and a reference for it was successfully taken, or
1566  * false otherwise.
1567  */
1568 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1569 {
1570         int counter = 0;
1571
1572         if (!rbd_dev->parent_spec)
1573                 return false;
1574
1575         if (rbd_dev->parent_overlap)
1576                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1577
1578         if (counter < 0)
1579                 rbd_warn(rbd_dev, "parent reference overflow");
1580
1581         return counter > 0;
1582 }
1583
1584 static void rbd_img_request_init(struct rbd_img_request *img_request,
1585                                  struct rbd_device *rbd_dev,
1586                                  enum obj_operation_type op_type)
1587 {
1588         memset(img_request, 0, sizeof(*img_request));
1589
1590         img_request->rbd_dev = rbd_dev;
1591         img_request->op_type = op_type;
1592
1593         INIT_LIST_HEAD(&img_request->lock_item);
1594         INIT_LIST_HEAD(&img_request->object_extents);
1595         mutex_init(&img_request->state_mutex);
1596 }
1597
1598 /*
1599  * Only snap_id is captured here, for reads.  For writes, the snapshot
1600  * context is captured in rbd_img_object_requests(), once the exclusive
1601  * lock is guaranteed to be held.
1602  */
1603 static void rbd_img_capture_header(struct rbd_img_request *img_req)
1604 {
1605         struct rbd_device *rbd_dev = img_req->rbd_dev;
1606
1607         lockdep_assert_held(&rbd_dev->header_rwsem);
1608
1609         if (!rbd_img_is_write(img_req))
1610                 img_req->snap_id = rbd_dev->spec->snap_id;
1611
1612         if (rbd_dev_parent_get(rbd_dev))
1613                 img_request_layered_set(img_req);
1614 }
1615
1616 static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1617 {
1618         struct rbd_obj_request *obj_request;
1619         struct rbd_obj_request *next_obj_request;
1620
1621         dout("%s: img %p\n", __func__, img_request);
1622
1623         WARN_ON(!list_empty(&img_request->lock_item));
1624         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1625                 rbd_img_obj_request_del(img_request, obj_request);
1626
1627         if (img_request_layered_test(img_request))
1628                 rbd_dev_parent_put(img_request->rbd_dev);
1629
1630         if (rbd_img_is_write(img_request))
1631                 ceph_put_snap_context(img_request->snapc);
1632
1633         if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1634                 kmem_cache_free(rbd_img_request_cache, img_request);
1635 }
1636
1637 #define BITS_PER_OBJ    2
1638 #define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
1639 #define OBJ_MASK        ((1 << BITS_PER_OBJ) - 1)
1640
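/*
 * Each object gets BITS_PER_OBJ (2) bits in the in-memory object map,
 * packed most-significant bits first within each byte.  Worked example
 * (illustrative): objno 5 gives index = 5 / 4 = 1, off = 5 % 4 = 1 and
 * shift = (4 - 1 - 1) * 2 = 4, i.e. its state sits in bits 5:4 of
 * object_map[1].
 */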
1641 static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1642                                    u64 *index, u8 *shift)
1643 {
1644         u32 off;
1645
1646         rbd_assert(objno < rbd_dev->object_map_size);
1647         *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1648         *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1649 }
1650
1651 static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1652 {
1653         u64 index;
1654         u8 shift;
1655
1656         lockdep_assert_held(&rbd_dev->object_map_lock);
1657         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1658         return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1659 }
1660
1661 static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1662 {
1663         u64 index;
1664         u8 shift;
1665         u8 *p;
1666
1667         lockdep_assert_held(&rbd_dev->object_map_lock);
1668         rbd_assert(!(val & ~OBJ_MASK));
1669
1670         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1671         p = &rbd_dev->object_map[index];
1672         *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1673 }
1674
1675 static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1676 {
1677         u8 state;
1678
1679         spin_lock(&rbd_dev->object_map_lock);
1680         state = __rbd_object_map_get(rbd_dev, objno);
1681         spin_unlock(&rbd_dev->object_map_lock);
1682         return state;
1683 }
1684
1685 static bool use_object_map(struct rbd_device *rbd_dev)
1686 {
1687         /*
1688          * An image mapped read-only can't use the object map -- it isn't
1689          * loaded because the header lock isn't acquired.  Someone else can
1690          * write to the image and update the object map behind our back.
1691          *
1692          * A snapshot can't be written to, so using the object map is always
1693          * safe.
1694          */
1695         if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1696                 return false;
1697
1698         return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1699                 !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1700 }
1701
1702 static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1703 {
1704         u8 state;
1705
1706         /* fall back to default logic if object map is disabled or invalid */
1707         if (!use_object_map(rbd_dev))
1708                 return true;
1709
1710         state = rbd_object_map_get(rbd_dev, objno);
1711         return state != OBJECT_NONEXISTENT;
1712 }
1713
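/*
 * Object map OIDs are derived from the image id: the HEAD object map is
 * "<RBD_OBJECT_MAP_PREFIX><image_id>" and a snapshot's object map is
 * "<RBD_OBJECT_MAP_PREFIX><image_id>.<snap_id>" with snap_id rendered as
 * 16 hex digits (e.g. a hypothetical snap_id 0x1234 yields the suffix
 * ".0000000000001234").
 */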
1714 static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1715                                 struct ceph_object_id *oid)
1716 {
1717         if (snap_id == CEPH_NOSNAP)
1718                 ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1719                                 rbd_dev->spec->image_id);
1720         else
1721                 ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1722                                 rbd_dev->spec->image_id, snap_id);
1723 }
1724
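/*
 * Grab the exclusive cls lock on the HEAD object map object.  If it is
 * already held (for instance by a previous mapping that never unlocked),
 * look up the current lockers, break the stale lock once and retry;
 * -EEXIST from ceph_cls_lock() means we already own the lock and is
 * treated as success.
 */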
1725 static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1726 {
1727         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1728         CEPH_DEFINE_OID_ONSTACK(oid);
1729         u8 lock_type;
1730         char *lock_tag;
1731         struct ceph_locker *lockers;
1732         u32 num_lockers;
1733         bool broke_lock = false;
1734         int ret;
1735
1736         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1737
1738 again:
1739         ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1740                             CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1741         if (ret != -EBUSY || broke_lock) {
1742                 if (ret == -EEXIST)
1743                         ret = 0; /* already locked by myself */
1744                 if (ret)
1745                         rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1746                 return ret;
1747         }
1748
1749         ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1750                                  RBD_LOCK_NAME, &lock_type, &lock_tag,
1751                                  &lockers, &num_lockers);
1752         if (ret) {
1753                 if (ret == -ENOENT)
1754                         goto again;
1755
1756                 rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1757                 return ret;
1758         }
1759
1760         kfree(lock_tag);
1761         if (num_lockers == 0)
1762                 goto again;
1763
1764         rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1765                  ENTITY_NAME(lockers[0].id.name));
1766
1767         ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1768                                   RBD_LOCK_NAME, lockers[0].id.cookie,
1769                                   &lockers[0].id.name);
1770         ceph_free_lockers(lockers, num_lockers);
1771         if (ret) {
1772                 if (ret == -ENOENT)
1773                         goto again;
1774
1775                 rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1776                 return ret;
1777         }
1778
1779         broke_lock = true;
1780         goto again;
1781 }
1782
1783 static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1784 {
1785         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1786         CEPH_DEFINE_OID_ONSTACK(oid);
1787         int ret;
1788
1789         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1790
1791         ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1792                               "");
1793         if (ret && ret != -ENOENT)
1794                 rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1795 }
1796
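/*
 * An object_map_load reply begins with a BitVector header: a le32 byte
 * count for the header, a versioned (v1) struct and the le64 number of
 * objects.  Only the object count is needed here; the rest of the header
 * is skipped by jumping straight to header_end.
 */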
1797 static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1798 {
1799         u8 struct_v;
1800         u32 struct_len;
1801         u32 header_len;
1802         void *header_end;
1803         int ret;
1804
1805         ceph_decode_32_safe(p, end, header_len, e_inval);
1806         header_end = *p + header_len;
1807
1808         ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1809                                   &struct_len);
1810         if (ret)
1811                 return ret;
1812
1813         ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1814
1815         *p = header_end;
1816         return 0;
1817
1818 e_inval:
1819         return -EINVAL;
1820 }
1821
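/*
 * Fetch the object map for the mapped snapshot (or HEAD) with the
 * object_map_load class method.  The map uses two bits per object, so
 * e.g. a 1 TiB image with 4 MiB objects has 262144 objects and a 64 KiB
 * map; num_pages is padded by one page, presumably to leave room for
 * the BitVector header that precedes the map data in the reply.
 */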
1822 static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1823 {
1824         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1825         CEPH_DEFINE_OID_ONSTACK(oid);
1826         struct page **pages;
1827         void *p, *end;
1828         size_t reply_len;
1829         u64 num_objects;
1830         u64 object_map_bytes;
1831         u64 object_map_size;
1832         int num_pages;
1833         int ret;
1834
1835         rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1836
1837         num_objects = ceph_get_num_objects(&rbd_dev->layout,
1838                                            rbd_dev->mapping.size);
1839         object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1840                                             BITS_PER_BYTE);
1841         num_pages = calc_pages_for(0, object_map_bytes) + 1;
1842         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1843         if (IS_ERR(pages))
1844                 return PTR_ERR(pages);
1845
1846         reply_len = num_pages * PAGE_SIZE;
1847         rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1848         ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1849                              "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1850                              NULL, 0, pages, &reply_len);
1851         if (ret)
1852                 goto out;
1853
1854         p = page_address(pages[0]);
1855         end = p + min(reply_len, (size_t)PAGE_SIZE);
1856         ret = decode_object_map_header(&p, end, &object_map_size);
1857         if (ret)
1858                 goto out;
1859
1860         if (object_map_size != num_objects) {
1861                 rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1862                          object_map_size, num_objects);
1863                 ret = -EINVAL;
1864                 goto out;
1865         }
1866
1867         if (offset_in_page(p) + object_map_bytes > reply_len) {
1868                 ret = -EINVAL;
1869                 goto out;
1870         }
1871
1872         rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1873         if (!rbd_dev->object_map) {
1874                 ret = -ENOMEM;
1875                 goto out;
1876         }
1877
1878         rbd_dev->object_map_size = object_map_size;
1879         ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1880                                    offset_in_page(p), object_map_bytes);
1881
1882 out:
1883         ceph_release_page_vector(pages, num_pages);
1884         return ret;
1885 }
1886
1887 static void rbd_object_map_free(struct rbd_device *rbd_dev)
1888 {
1889         kvfree(rbd_dev->object_map);
1890         rbd_dev->object_map = NULL;
1891         rbd_dev->object_map_size = 0;
1892 }
1893
1894 static int rbd_object_map_load(struct rbd_device *rbd_dev)
1895 {
1896         int ret;
1897
1898         ret = __rbd_object_map_load(rbd_dev);
1899         if (ret)
1900                 return ret;
1901
1902         ret = rbd_dev_v2_get_flags(rbd_dev);
1903         if (ret) {
1904                 rbd_object_map_free(rbd_dev);
1905                 return ret;
1906         }
1907
1908         if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1909                 rbd_warn(rbd_dev, "object map is invalid");
1910
1911         return 0;
1912 }
1913
1914 static int rbd_object_map_open(struct rbd_device *rbd_dev)
1915 {
1916         int ret;
1917
1918         ret = rbd_object_map_lock(rbd_dev);
1919         if (ret)
1920                 return ret;
1921
1922         ret = rbd_object_map_load(rbd_dev);
1923         if (ret) {
1924                 rbd_object_map_unlock(rbd_dev);
1925                 return ret;
1926         }
1927
1928         return 0;
1929 }
1930
1931 static void rbd_object_map_close(struct rbd_device *rbd_dev)
1932 {
1933         rbd_object_map_free(rbd_dev);
1934         rbd_object_map_unlock(rbd_dev);
1935 }
1936
1937 /*
1938  * This function needs snap_id (or more precisely just something to
1939  * distinguish between HEAD and snapshot object maps), new_state and
1940  * current_state that were passed to rbd_object_map_update().  To avoid
1941  * allocating and stashing a context we piggyback on the OSD request:
1942  * a HEAD update has two ops (one of them assert_locked), which is how
1943  * HEAD and snapshot updates are told apart here, and new_state and
1944  * current_state are recovered by decoding our own object_map_update
1945  * op, encoded in rbd_cls_object_map_update().
1946  */
1947 static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
1948                                         struct ceph_osd_request *osd_req)
1949 {
1950         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1951         struct ceph_osd_data *osd_data;
1952         u64 objno;
1953         u8 state, new_state, current_state;
1954         bool has_current_state;
1955         void *p;
1956
1957         if (osd_req->r_result)
1958                 return osd_req->r_result;
1959
1960         /*
1961          * Nothing to do for a snapshot object map.
1962          */
1963         if (osd_req->r_num_ops == 1)
1964                 return 0;
1965
1966         /*
1967          * Update in-memory HEAD object map.
1968          */
1969         rbd_assert(osd_req->r_num_ops == 2);
1970         osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
1971         rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
1972
1973         p = page_address(osd_data->pages[0]);
1974         objno = ceph_decode_64(&p);
1975         rbd_assert(objno == obj_req->ex.oe_objno);
1976         rbd_assert(ceph_decode_64(&p) == objno + 1);
1977         new_state = ceph_decode_8(&p);
1978         has_current_state = ceph_decode_8(&p);
1979         if (has_current_state)
1980                 current_state = ceph_decode_8(&p);
1981
1982         spin_lock(&rbd_dev->object_map_lock);
1983         state = __rbd_object_map_get(rbd_dev, objno);
1984         if (!has_current_state || current_state == state ||
1985             (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
1986                 __rbd_object_map_set(rbd_dev, objno, new_state);
1987         spin_unlock(&rbd_dev->object_map_lock);
1988
1989         return 0;
1990 }
1991
1992 static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
1993 {
1994         struct rbd_obj_request *obj_req = osd_req->r_priv;
1995         int result;
1996
1997         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1998              osd_req->r_result, obj_req);
1999
2000         result = rbd_object_map_update_finish(obj_req, osd_req);
2001         rbd_obj_handle_request(obj_req, result);
2002 }
2003
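/*
 * Decide whether an object map update is worth sending: it is skipped
 * if the object already has the requested state, if a nonexistent
 * object would be marked OBJECT_PENDING, or if an object that isn't
 * OBJECT_PENDING would be marked nonexistent.
 */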
2004 static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
2005 {
2006         u8 state = rbd_object_map_get(rbd_dev, objno);
2007
2008         if (state == new_state ||
2009             (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2010             (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2011                 return false;
2012
2013         return true;
2014 }
2015
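/*
 * Encode the object_map_update call payload into a single page: the
 * half-open object range [objno, objno + 1), the new state, and an
 * optional current-state guard (a boolean followed by the value).  The
 * same layout is decoded back in rbd_object_map_update_finish() to
 * update the in-memory map once the OSD completes the call.
 */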
2016 static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2017                                      int which, u64 objno, u8 new_state,
2018                                      const u8 *current_state)
2019 {
2020         struct page **pages;
2021         void *p, *start;
2022         int ret;
2023
2024         ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2025         if (ret)
2026                 return ret;
2027
2028         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2029         if (IS_ERR(pages))
2030                 return PTR_ERR(pages);
2031
2032         p = start = page_address(pages[0]);
2033         ceph_encode_64(&p, objno);
2034         ceph_encode_64(&p, objno + 1);
2035         ceph_encode_8(&p, new_state);
2036         if (current_state) {
2037                 ceph_encode_8(&p, 1);
2038                 ceph_encode_8(&p, *current_state);
2039         } else {
2040                 ceph_encode_8(&p, 0);
2041         }
2042
2043         osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2044                                           false, true);
2045         return 0;
2046 }
2047
2048 /*
2049  * Return:
2050  *   0 - object map update sent
2051  *   1 - object map update isn't needed
2052  *  <0 - error
2053  */
2054 static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2055                                  u8 new_state, const u8 *current_state)
2056 {
2057         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2058         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2059         struct ceph_osd_request *req;
2060         int num_ops = 1;
2061         int which = 0;
2062         int ret;
2063
2064         if (snap_id == CEPH_NOSNAP) {
2065                 if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2066                         return 1;
2067
2068                 num_ops++; /* assert_locked */
2069         }
2070
2071         req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2072         if (!req)
2073                 return -ENOMEM;
2074
2075         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2076         req->r_callback = rbd_object_map_callback;
2077         req->r_priv = obj_req;
2078
2079         rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2080         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2081         req->r_flags = CEPH_OSD_FLAG_WRITE;
2082         ktime_get_real_ts64(&req->r_mtime);
2083
2084         if (snap_id == CEPH_NOSNAP) {
2085                 /*
2086                  * Protect against possible race conditions during lock
2087                  * ownership transitions.
2088                  */
2089                 ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2090                                              CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2091                 if (ret)
2092                         return ret;
2093         }
2094
2095         ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2096                                         new_state, current_state);
2097         if (ret)
2098                 return ret;
2099
2100         ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2101         if (ret)
2102                 return ret;
2103
2104         ceph_osdc_start_request(osdc, req);
2105         return 0;
2106 }
2107
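/*
 * Illustration: with an 8M parent overlap and (sorted) image extents
 * 6M~1M, 7.5M~1M and 9M~1M, prune_extents() drops the last extent
 * entirely and trims the second one to 7.5M~0.5M -- only data below
 * the overlap can be served by the parent image.
 */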
2108 static void prune_extents(struct ceph_file_extent *img_extents,
2109                           u32 *num_img_extents, u64 overlap)
2110 {
2111         u32 cnt = *num_img_extents;
2112
2113         /* drop extents completely beyond the overlap */
2114         while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2115                 cnt--;
2116
2117         if (cnt) {
2118                 struct ceph_file_extent *ex = &img_extents[cnt - 1];
2119
2120                 /* trim final overlapping extent */
2121                 if (ex->fe_off + ex->fe_len > overlap)
2122                         ex->fe_len = overlap - ex->fe_off;
2123         }
2124
2125         *num_img_extents = cnt;
2126 }
2127
2128 /*
2129  * Determine the byte range(s) covered by either just the object extent
2130  * or the entire object in the parent image.
2131  */
2132 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2133                                     bool entire)
2134 {
2135         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2136         int ret;
2137
2138         if (!rbd_dev->parent_overlap)
2139                 return 0;
2140
2141         ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2142                                   entire ? 0 : obj_req->ex.oe_off,
2143                                   entire ? rbd_dev->layout.object_size :
2144                                                         obj_req->ex.oe_len,
2145                                   &obj_req->img_extents,
2146                                   &obj_req->num_img_extents);
2147         if (ret)
2148                 return ret;
2149
2150         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2151                       rbd_dev->parent_overlap);
2152         return 0;
2153 }
2154
2155 static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2156 {
2157         struct rbd_obj_request *obj_req = osd_req->r_priv;
2158
2159         switch (obj_req->img_request->data_type) {
2160         case OBJ_REQUEST_BIO:
2161                 osd_req_op_extent_osd_data_bio(osd_req, which,
2162                                                &obj_req->bio_pos,
2163                                                obj_req->ex.oe_len);
2164                 break;
2165         case OBJ_REQUEST_BVECS:
2166         case OBJ_REQUEST_OWN_BVECS:
2167                 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2168                                                         obj_req->ex.oe_len);
2169                 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2170                 osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2171                                                     &obj_req->bvec_pos);
2172                 break;
2173         default:
2174                 BUG();
2175         }
2176 }
2177
2178 static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2179 {
2180         struct page **pages;
2181
2182         /*
2183          * The response data for a STAT call consists of:
2184          *     le64 length;
2185          *     struct {
2186          *         le32 tv_sec;
2187          *         le32 tv_nsec;
2188          *     } mtime;
2189          */
2190         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2191         if (IS_ERR(pages))
2192                 return PTR_ERR(pages);
2193
2194         osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2195         osd_req_op_raw_data_in_pages(osd_req, which, pages,
2196                                      8 + sizeof(struct ceph_timespec),
2197                                      0, false, true);
2198         return 0;
2199 }
2200
2201 static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2202                                 u32 bytes)
2203 {
2204         struct rbd_obj_request *obj_req = osd_req->r_priv;
2205         int ret;
2206
2207         ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2208         if (ret)
2209                 return ret;
2210
2211         osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2212                                           obj_req->copyup_bvec_count, bytes);
2213         return 0;
2214 }
2215
2216 static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2217 {
2218         obj_req->read_state = RBD_OBJ_READ_START;
2219         return 0;
2220 }
2221
2222 static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2223                                       int which)
2224 {
2225         struct rbd_obj_request *obj_req = osd_req->r_priv;
2226         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2227         u16 opcode;
2228
2229         if (!use_object_map(rbd_dev) ||
2230             !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2231                 osd_req_op_alloc_hint_init(osd_req, which++,
2232                                            rbd_dev->layout.object_size,
2233                                            rbd_dev->layout.object_size,
2234                                            rbd_dev->opts->alloc_hint_flags);
2235         }
2236
2237         if (rbd_obj_is_entire(obj_req))
2238                 opcode = CEPH_OSD_OP_WRITEFULL;
2239         else
2240                 opcode = CEPH_OSD_OP_WRITE;
2241
2242         osd_req_op_extent_init(osd_req, which, opcode,
2243                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2244         rbd_osd_setup_data(osd_req, which);
2245 }
2246
2247 static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2248 {
2249         int ret;
2250
2251         /* reverse map the entire object onto the parent */
2252         ret = rbd_obj_calc_img_extents(obj_req, true);
2253         if (ret)
2254                 return ret;
2255
2256         obj_req->write_state = RBD_OBJ_WRITE_START;
2257         return 0;
2258 }
2259
2260 static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2261 {
2262         return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2263                                           CEPH_OSD_OP_ZERO;
2264 }
2265
2266 static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2267                                         int which)
2268 {
2269         struct rbd_obj_request *obj_req = osd_req->r_priv;
2270
2271         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2272                 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2273                 osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2274         } else {
2275                 osd_req_op_extent_init(osd_req, which,
2276                                        truncate_or_zero_opcode(obj_req),
2277                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2278                                        0, 0);
2279         }
2280 }
2281
2282 static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2283 {
2284         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2285         u64 off, next_off;
2286         int ret;
2287
2288         /*
2289          * Align the range to alloc_size boundary and punt on discards
2290          * that are too small to free up any space.
2291          *
2292          * alloc_size == object_size && is_tail() is a special case for
2293          * filestore with filestore_punch_hole = false, needed to allow
2294          * truncate (in addition to delete).
2295          */
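        /*
         * For example, with a 64K alloc_size a discard of 4096~262144
         * is shrunk to 65536~196608 by the rounding below, while a
         * discard of 10000~100000 rounds away to nothing and is
         * dropped (return 1).
         */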
2296         if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2297             !rbd_obj_is_tail(obj_req)) {
2298                 off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2299                 next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2300                                       rbd_dev->opts->alloc_size);
2301                 if (off >= next_off)
2302                         return 1;
2303
2304                 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2305                      obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2306                      off, next_off - off);
2307                 obj_req->ex.oe_off = off;
2308                 obj_req->ex.oe_len = next_off - off;
2309         }
2310
2311         /* reverse map the entire object onto the parent */
2312         ret = rbd_obj_calc_img_extents(obj_req, true);
2313         if (ret)
2314                 return ret;
2315
2316         obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2317         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2318                 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2319
2320         obj_req->write_state = RBD_OBJ_WRITE_START;
2321         return 0;
2322 }
2323
2324 static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2325                                         int which)
2326 {
2327         struct rbd_obj_request *obj_req = osd_req->r_priv;
2328         u16 opcode;
2329
2330         if (rbd_obj_is_entire(obj_req)) {
2331                 if (obj_req->num_img_extents) {
2332                         if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2333                                 osd_req_op_init(osd_req, which++,
2334                                                 CEPH_OSD_OP_CREATE, 0);
2335                         opcode = CEPH_OSD_OP_TRUNCATE;
2336                 } else {
2337                         rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2338                         osd_req_op_init(osd_req, which++,
2339                                         CEPH_OSD_OP_DELETE, 0);
2340                         opcode = 0;
2341                 }
2342         } else {
2343                 opcode = truncate_or_zero_opcode(obj_req);
2344         }
2345
2346         if (opcode)
2347                 osd_req_op_extent_init(osd_req, which, opcode,
2348                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2349                                        0, 0);
2350 }
2351
2352 static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2353 {
2354         int ret;
2355
2356         /* reverse map the entire object onto the parent */
2357         ret = rbd_obj_calc_img_extents(obj_req, true);
2358         if (ret)
2359                 return ret;
2360
2361         if (!obj_req->num_img_extents) {
2362                 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2363                 if (rbd_obj_is_entire(obj_req))
2364                         obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2365         }
2366
2367         obj_req->write_state = RBD_OBJ_WRITE_START;
2368         return 0;
2369 }
2370
2371 static int count_write_ops(struct rbd_obj_request *obj_req)
2372 {
2373         struct rbd_img_request *img_req = obj_req->img_request;
2374
2375         switch (img_req->op_type) {
2376         case OBJ_OP_WRITE:
2377                 if (!use_object_map(img_req->rbd_dev) ||
2378                     !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2379                         return 2; /* setallochint + write/writefull */
2380
2381                 return 1; /* write/writefull */
2382         case OBJ_OP_DISCARD:
2383                 return 1; /* delete/truncate/zero */
2384         case OBJ_OP_ZEROOUT:
2385                 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2386                     !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2387                         return 2; /* create + truncate */
2388
2389                 return 1; /* delete/truncate/zero */
2390         default:
2391                 BUG();
2392         }
2393 }
2394
2395 static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2396                                     int which)
2397 {
2398         struct rbd_obj_request *obj_req = osd_req->r_priv;
2399
2400         switch (obj_req->img_request->op_type) {
2401         case OBJ_OP_WRITE:
2402                 __rbd_osd_setup_write_ops(osd_req, which);
2403                 break;
2404         case OBJ_OP_DISCARD:
2405                 __rbd_osd_setup_discard_ops(osd_req, which);
2406                 break;
2407         case OBJ_OP_ZEROOUT:
2408                 __rbd_osd_setup_zeroout_ops(osd_req, which);
2409                 break;
2410         default:
2411                 BUG();
2412         }
2413 }
2414
2415 /*
2416  * Prune the list of object requests (adjust offset and/or length, drop
2417  * redundant requests).  Prepare object request state machines and image
2418  * request state machine for execution.
2419  */
2420 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2421 {
2422         struct rbd_obj_request *obj_req, *next_obj_req;
2423         int ret;
2424
2425         for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2426                 switch (img_req->op_type) {
2427                 case OBJ_OP_READ:
2428                         ret = rbd_obj_init_read(obj_req);
2429                         break;
2430                 case OBJ_OP_WRITE:
2431                         ret = rbd_obj_init_write(obj_req);
2432                         break;
2433                 case OBJ_OP_DISCARD:
2434                         ret = rbd_obj_init_discard(obj_req);
2435                         break;
2436                 case OBJ_OP_ZEROOUT:
2437                         ret = rbd_obj_init_zeroout(obj_req);
2438                         break;
2439                 default:
2440                         BUG();
2441                 }
2442                 if (ret < 0)
2443                         return ret;
2444                 if (ret > 0) {
2445                         rbd_img_obj_request_del(img_req, obj_req);
2446                         continue;
2447                 }
2448         }
2449
2450         img_req->state = RBD_IMG_START;
2451         return 0;
2452 }
2453
2454 union rbd_img_fill_iter {
2455         struct ceph_bio_iter    bio_iter;
2456         struct ceph_bvec_iter   bvec_iter;
2457 };
2458
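/*
 * rbd_img_fill_ctx describes how object requests attach to the caller's
 * data: for plain layouts only ->set_pos_fn is used and each object
 * request simply records its starting position in the provided bio or
 * bio_vec array; for fancy (striped) layouts ->count_fn sizes a private
 * bio_vec array per object request and ->copy_fn fills it, since one
 * object's data may be scattered across the provided buffer.
 */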
2459 struct rbd_img_fill_ctx {
2460         enum obj_request_type   pos_type;
2461         union rbd_img_fill_iter *pos;
2462         union rbd_img_fill_iter iter;
2463         ceph_object_extent_fn_t set_pos_fn;
2464         ceph_object_extent_fn_t count_fn;
2465         ceph_object_extent_fn_t copy_fn;
2466 };
2467
2468 static struct ceph_object_extent *alloc_object_extent(void *arg)
2469 {
2470         struct rbd_img_request *img_req = arg;
2471         struct rbd_obj_request *obj_req;
2472
2473         obj_req = rbd_obj_request_create();
2474         if (!obj_req)
2475                 return NULL;
2476
2477         rbd_img_obj_request_add(img_req, obj_req);
2478         return &obj_req->ex;
2479 }
2480
2481 /*
2482  * While su != os && sc == 1 is technically not fancy (it's the same
2483  * layout as su == os && sc == 1), we can't use the nocopy path for it
2484  * because ->set_pos_fn() should be called only once per object.
2485  * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2486  * treat su != os && sc == 1 as fancy.
2487  */
2488 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2489 {
2490         return l->stripe_unit != l->object_size;
2491 }
2492
2493 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2494                                        struct ceph_file_extent *img_extents,
2495                                        u32 num_img_extents,
2496                                        struct rbd_img_fill_ctx *fctx)
2497 {
2498         u32 i;
2499         int ret;
2500
2501         img_req->data_type = fctx->pos_type;
2502
2503         /*
2504          * Create object requests and set each object request's starting
2505          * position in the provided bio (list) or bio_vec array.
2506          */
2507         fctx->iter = *fctx->pos;
2508         for (i = 0; i < num_img_extents; i++) {
2509                 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2510                                            img_extents[i].fe_off,
2511                                            img_extents[i].fe_len,
2512                                            &img_req->object_extents,
2513                                            alloc_object_extent, img_req,
2514                                            fctx->set_pos_fn, &fctx->iter);
2515                 if (ret)
2516                         return ret;
2517         }
2518
2519         return __rbd_img_fill_request(img_req);
2520 }
2521
2522 /*
2523  * Map a list of image extents to a list of object extents, create the
2524  * corresponding object requests (normally each to a different object,
2525  * but not always) and add them to @img_req.  For each object request,
2526  * set up its data descriptor to point to the corresponding chunk(s) of
2527  * @fctx->pos data buffer.
2528  *
2529  * Because ceph_file_to_extents() will merge adjacent object extents
2530  * together, each object request's data descriptor may point to multiple
2531  * different chunks of @fctx->pos data buffer.
2532  *
2533  * @fctx->pos data buffer is assumed to be large enough.
2534  */
2535 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2536                                 struct ceph_file_extent *img_extents,
2537                                 u32 num_img_extents,
2538                                 struct rbd_img_fill_ctx *fctx)
2539 {
2540         struct rbd_device *rbd_dev = img_req->rbd_dev;
2541         struct rbd_obj_request *obj_req;
2542         u32 i;
2543         int ret;
2544
2545         if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2546             !rbd_layout_is_fancy(&rbd_dev->layout))
2547                 return rbd_img_fill_request_nocopy(img_req, img_extents,
2548                                                    num_img_extents, fctx);
2549
2550         img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2551
2552         /*
2553          * Create object requests and determine ->bvec_count for each object
2554          * request.  Note that ->bvec_count sum over all object requests may
2555          * be greater than the number of bio_vecs in the provided bio (list)
2556          * or bio_vec array because when mapped, those bio_vecs can straddle
2557          * stripe unit boundaries.
2558          */
2559         fctx->iter = *fctx->pos;
2560         for (i = 0; i < num_img_extents; i++) {
2561                 ret = ceph_file_to_extents(&rbd_dev->layout,
2562                                            img_extents[i].fe_off,
2563                                            img_extents[i].fe_len,
2564                                            &img_req->object_extents,
2565                                            alloc_object_extent, img_req,
2566                                            fctx->count_fn, &fctx->iter);
2567                 if (ret)
2568                         return ret;
2569         }
2570
2571         for_each_obj_request(img_req, obj_req) {
2572                 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2573                                               sizeof(*obj_req->bvec_pos.bvecs),
2574                                               GFP_NOIO);
2575                 if (!obj_req->bvec_pos.bvecs)
2576                         return -ENOMEM;
2577         }
2578
2579         /*
2580          * Fill in each object request's private bio_vec array, splitting and
2581          * rearranging the provided bio_vecs in stripe unit chunks as needed.
2582          */
2583         fctx->iter = *fctx->pos;
2584         for (i = 0; i < num_img_extents; i++) {
2585                 ret = ceph_iterate_extents(&rbd_dev->layout,
2586                                            img_extents[i].fe_off,
2587                                            img_extents[i].fe_len,
2588                                            &img_req->object_extents,
2589                                            fctx->copy_fn, &fctx->iter);
2590                 if (ret)
2591                         return ret;
2592         }
2593
2594         return __rbd_img_fill_request(img_req);
2595 }
2596
2597 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2598                                u64 off, u64 len)
2599 {
2600         struct ceph_file_extent ex = { off, len };
2601         union rbd_img_fill_iter dummy = {};
2602         struct rbd_img_fill_ctx fctx = {
2603                 .pos_type = OBJ_REQUEST_NODATA,
2604                 .pos = &dummy,
2605         };
2606
2607         return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2608 }
2609
2610 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2611 {
2612         struct rbd_obj_request *obj_req =
2613             container_of(ex, struct rbd_obj_request, ex);
2614         struct ceph_bio_iter *it = arg;
2615
2616         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2617         obj_req->bio_pos = *it;
2618         ceph_bio_iter_advance(it, bytes);
2619 }
2620
2621 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2622 {
2623         struct rbd_obj_request *obj_req =
2624             container_of(ex, struct rbd_obj_request, ex);
2625         struct ceph_bio_iter *it = arg;
2626
2627         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2628         ceph_bio_iter_advance_step(it, bytes, ({
2629                 obj_req->bvec_count++;
2630         }));
2631
2632 }
2633
2634 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2635 {
2636         struct rbd_obj_request *obj_req =
2637             container_of(ex, struct rbd_obj_request, ex);
2638         struct ceph_bio_iter *it = arg;
2639
2640         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2641         ceph_bio_iter_advance_step(it, bytes, ({
2642                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2643                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2644         }));
2645 }
2646
2647 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2648                                    struct ceph_file_extent *img_extents,
2649                                    u32 num_img_extents,
2650                                    struct ceph_bio_iter *bio_pos)
2651 {
2652         struct rbd_img_fill_ctx fctx = {
2653                 .pos_type = OBJ_REQUEST_BIO,
2654                 .pos = (union rbd_img_fill_iter *)bio_pos,
2655                 .set_pos_fn = set_bio_pos,
2656                 .count_fn = count_bio_bvecs,
2657                 .copy_fn = copy_bio_bvecs,
2658         };
2659
2660         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2661                                     &fctx);
2662 }
2663
2664 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2665                                  u64 off, u64 len, struct bio *bio)
2666 {
2667         struct ceph_file_extent ex = { off, len };
2668         struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2669
2670         return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2671 }
2672
2673 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2674 {
2675         struct rbd_obj_request *obj_req =
2676             container_of(ex, struct rbd_obj_request, ex);
2677         struct ceph_bvec_iter *it = arg;
2678
2679         obj_req->bvec_pos = *it;
2680         ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2681         ceph_bvec_iter_advance(it, bytes);
2682 }
2683
2684 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2685 {
2686         struct rbd_obj_request *obj_req =
2687             container_of(ex, struct rbd_obj_request, ex);
2688         struct ceph_bvec_iter *it = arg;
2689
2690         ceph_bvec_iter_advance_step(it, bytes, ({
2691                 obj_req->bvec_count++;
2692         }));
2693 }
2694
2695 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2696 {
2697         struct rbd_obj_request *obj_req =
2698             container_of(ex, struct rbd_obj_request, ex);
2699         struct ceph_bvec_iter *it = arg;
2700
2701         ceph_bvec_iter_advance_step(it, bytes, ({
2702                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2703                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2704         }));
2705 }
2706
2707 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2708                                      struct ceph_file_extent *img_extents,
2709                                      u32 num_img_extents,
2710                                      struct ceph_bvec_iter *bvec_pos)
2711 {
2712         struct rbd_img_fill_ctx fctx = {
2713                 .pos_type = OBJ_REQUEST_BVECS,
2714                 .pos = (union rbd_img_fill_iter *)bvec_pos,
2715                 .set_pos_fn = set_bvec_pos,
2716                 .count_fn = count_bvecs,
2717                 .copy_fn = copy_bvecs,
2718         };
2719
2720         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2721                                     &fctx);
2722 }
2723
2724 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2725                                    struct ceph_file_extent *img_extents,
2726                                    u32 num_img_extents,
2727                                    struct bio_vec *bvecs)
2728 {
2729         struct ceph_bvec_iter it = {
2730                 .bvecs = bvecs,
2731                 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2732                                                              num_img_extents) },
2733         };
2734
2735         return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2736                                          &it);
2737 }
2738
2739 static void rbd_img_handle_request_work(struct work_struct *work)
2740 {
2741         struct rbd_img_request *img_req =
2742             container_of(work, struct rbd_img_request, work);
2743
2744         rbd_img_handle_request(img_req, img_req->work_result);
2745 }
2746
2747 static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2748 {
2749         INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2750         img_req->work_result = result;
2751         queue_work(rbd_wq, &img_req->work);
2752 }
2753
2754 static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2755 {
2756         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2757
2758         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2759                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2760                 return true;
2761         }
2762
2763         dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2764              obj_req->ex.oe_objno);
2765         return false;
2766 }
2767
2768 static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2769 {
2770         struct ceph_osd_request *osd_req;
2771         int ret;
2772
2773         osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2774         if (IS_ERR(osd_req))
2775                 return PTR_ERR(osd_req);
2776
2777         osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2778                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2779         rbd_osd_setup_data(osd_req, 0);
2780         rbd_osd_format_read(osd_req);
2781
2782         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2783         if (ret)
2784                 return ret;
2785
2786         rbd_osd_submit(osd_req);
2787         return 0;
2788 }
2789
2790 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2791 {
2792         struct rbd_img_request *img_req = obj_req->img_request;
2793         struct rbd_device *parent = img_req->rbd_dev->parent;
2794         struct rbd_img_request *child_img_req;
2795         int ret;
2796
2797         child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2798         if (!child_img_req)
2799                 return -ENOMEM;
2800
2801         rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
2802         __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2803         child_img_req->obj_request = obj_req;
2804
2805         down_read(&parent->header_rwsem);
2806         rbd_img_capture_header(child_img_req);
2807         up_read(&parent->header_rwsem);
2808
2809         dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2810              obj_req);
2811
2812         if (!rbd_img_is_write(img_req)) {
2813                 switch (img_req->data_type) {
2814                 case OBJ_REQUEST_BIO:
2815                         ret = __rbd_img_fill_from_bio(child_img_req,
2816                                                       obj_req->img_extents,
2817                                                       obj_req->num_img_extents,
2818                                                       &obj_req->bio_pos);
2819                         break;
2820                 case OBJ_REQUEST_BVECS:
2821                 case OBJ_REQUEST_OWN_BVECS:
2822                         ret = __rbd_img_fill_from_bvecs(child_img_req,
2823                                                       obj_req->img_extents,
2824                                                       obj_req->num_img_extents,
2825                                                       &obj_req->bvec_pos);
2826                         break;
2827                 default:
2828                         BUG();
2829                 }
2830         } else {
2831                 ret = rbd_img_fill_from_bvecs(child_img_req,
2832                                               obj_req->img_extents,
2833                                               obj_req->num_img_extents,
2834                                               obj_req->copyup_bvecs);
2835         }
2836         if (ret) {
2837                 rbd_img_request_destroy(child_img_req);
2838                 return ret;
2839         }
2840
2841         /* queue to the workqueue to avoid recursing down the parent chain */
2842         rbd_img_schedule(child_img_req, 0);
2843         return 0;
2844 }
2845
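/*
 * Object read state machine:
 *
 *   RBD_OBJ_READ_START -> RBD_OBJ_READ_OBJECT -> [RBD_OBJ_READ_PARENT]
 *
 * If the object map says the object can't exist, the OSD read is
 * skipped and -ENOENT is injected.  -ENOENT at the OBJECT step falls
 * through to the parent image when there is an overlap; otherwise the
 * missing or short range is zero-filled.
 */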
2846 static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2847 {
2848         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2849         int ret;
2850
2851 again:
2852         switch (obj_req->read_state) {
2853         case RBD_OBJ_READ_START:
2854                 rbd_assert(!*result);
2855
2856                 if (!rbd_obj_may_exist(obj_req)) {
2857                         *result = -ENOENT;
2858                         obj_req->read_state = RBD_OBJ_READ_OBJECT;
2859                         goto again;
2860                 }
2861
2862                 ret = rbd_obj_read_object(obj_req);
2863                 if (ret) {
2864                         *result = ret;
2865                         return true;
2866                 }
2867                 obj_req->read_state = RBD_OBJ_READ_OBJECT;
2868                 return false;
2869         case RBD_OBJ_READ_OBJECT:
2870                 if (*result == -ENOENT && rbd_dev->parent_overlap) {
2871                         /* reverse map this object extent onto the parent */
2872                         ret = rbd_obj_calc_img_extents(obj_req, false);
2873                         if (ret) {
2874                                 *result = ret;
2875                                 return true;
2876                         }
2877                         if (obj_req->num_img_extents) {
2878                                 ret = rbd_obj_read_from_parent(obj_req);
2879                                 if (ret) {
2880                                         *result = ret;
2881                                         return true;
2882                                 }
2883                                 obj_req->read_state = RBD_OBJ_READ_PARENT;
2884                                 return false;
2885                         }
2886                 }
2887
2888                 /*
2889                  * -ENOENT means a hole in the image -- zero-fill the entire
2890                  * length of the request.  A short read also implies zero-fill
2891                  * to the end of the request.
2892                  */
2893                 if (*result == -ENOENT) {
2894                         rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2895                         *result = 0;
2896                 } else if (*result >= 0) {
2897                         if (*result < obj_req->ex.oe_len)
2898                                 rbd_obj_zero_range(obj_req, *result,
2899                                                 obj_req->ex.oe_len - *result);
2900                         else
2901                                 rbd_assert(*result == obj_req->ex.oe_len);
2902                         *result = 0;
2903                 }
2904                 return true;
2905         case RBD_OBJ_READ_PARENT:
2906                 /*
2907                  * The parent image is read only up to the overlap -- zero-fill
2908                  * from the overlap to the end of the request.
2909                  */
2910                 if (!*result) {
2911                         u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
2912
2913                         if (obj_overlap < obj_req->ex.oe_len)
2914                                 rbd_obj_zero_range(obj_req, obj_overlap,
2915                                             obj_req->ex.oe_len - obj_overlap);
2916                 }
2917                 return true;
2918         default:
2919                 BUG();
2920         }
2921 }
2922
2923 static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2924 {
2925         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2926
2927         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2928                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2929
2930         if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2931             (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2932                 dout("%s %p noop for nonexistent\n", __func__, obj_req);
2933                 return true;
2934         }
2935
2936         return false;
2937 }
2938
2939 /*
2940  * Return:
2941  *   0 - object map update sent
2942  *   1 - object map update isn't needed
2943  *  <0 - error
2944  */
2945 static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
2946 {
2947         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2948         u8 new_state;
2949
2950         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
2951                 return 1;
2952
2953         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
2954                 new_state = OBJECT_PENDING;
2955         else
2956                 new_state = OBJECT_EXISTS;
2957
2958         return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
2959 }
2960
2961 static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
2962 {
2963         struct ceph_osd_request *osd_req;
2964         int num_ops = count_write_ops(obj_req);
2965         int which = 0;
2966         int ret;
2967
2968         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
2969                 num_ops++; /* stat */
2970
2971         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
2972         if (IS_ERR(osd_req))
2973                 return PTR_ERR(osd_req);
2974
2975         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
2976                 ret = rbd_osd_setup_stat(osd_req, which++);
2977                 if (ret)
2978                         return ret;
2979         }
2980
2981         rbd_osd_setup_write_ops(osd_req, which);
2982         rbd_osd_format_write(osd_req);
2983
2984         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2985         if (ret)
2986                 return ret;
2987
2988         rbd_osd_submit(osd_req);
2989         return 0;
2990 }
2991
2992 /*
2993  * copyup_bvecs pages are never highmem pages
2994  */
2995 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2996 {
2997         struct ceph_bvec_iter it = {
2998                 .bvecs = bvecs,
2999                 .iter = { .bi_size = bytes },
3000         };
3001
3002         ceph_bvec_iter_advance_step(&it, bytes, ({
3003                 if (memchr_inv(bvec_virt(&bv), 0, bv.bv_len))
3004                         return false;
3005         }));
3006         return true;
3007 }
3008
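/*
 * Passed as "bytes" to the copyup helpers below to request that only
 * the original modification ops be sent, without a copyup op.
 */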
3009 #define MODS_ONLY       U32_MAX
3010
3011 static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3012                                       u32 bytes)
3013 {
3014         struct ceph_osd_request *osd_req;
3015         int ret;
3016
3017         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3018         rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3019
3020         osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3021         if (IS_ERR(osd_req))
3022                 return PTR_ERR(osd_req);
3023
3024         ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3025         if (ret)
3026                 return ret;
3027
3028         rbd_osd_format_write(osd_req);
3029
3030         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3031         if (ret)
3032                 return ret;
3033
3034         rbd_osd_submit(osd_req);
3035         return 0;
3036 }
3037
3038 static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3039                                         u32 bytes)
3040 {
3041         struct ceph_osd_request *osd_req;
3042         int num_ops = count_write_ops(obj_req);
3043         int which = 0;
3044         int ret;
3045
3046         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3047
3048         if (bytes != MODS_ONLY)
3049                 num_ops++; /* copyup */
3050
3051         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3052         if (IS_ERR(osd_req))
3053                 return PTR_ERR(osd_req);
3054
3055         if (bytes != MODS_ONLY) {
3056                 ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3057                 if (ret)
3058                         return ret;
3059         }
3060
3061         rbd_osd_setup_write_ops(osd_req, which);
3062         rbd_osd_format_write(osd_req);
3063
3064         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3065         if (ret)
3066                 return ret;
3067
3068         rbd_osd_submit(osd_req);
3069         return 0;
3070 }
3071
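/*
 * Allocate copyup_bvecs: enough whole pages to hold obj_overlap bytes
 * of parent data.
 */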
3072 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3073 {
3074         u32 i;
3075
3076         rbd_assert(!obj_req->copyup_bvecs);
3077         obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3078         obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3079                                         sizeof(*obj_req->copyup_bvecs),
3080                                         GFP_NOIO);
3081         if (!obj_req->copyup_bvecs)
3082                 return -ENOMEM;
3083
3084         for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3085                 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3086                 struct page *page = alloc_page(GFP_NOIO);
3087
3088                 if (!page)
3089                         return -ENOMEM;
3090
3091                 bvec_set_page(&obj_req->copyup_bvecs[i], page, len, 0);
3092                 obj_overlap -= len;
3093         }
3094
3095         rbd_assert(!obj_overlap);
3096         return 0;
3097 }
3098
3099 /*
3100  * The target object doesn't exist.  Read the data for the entire
3101  * target object up to the overlap point (if any) from the parent,
3102  * so we can use it for a copyup.
3103  */
3104 static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3105 {
3106         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3107         int ret;
3108
3109         rbd_assert(obj_req->num_img_extents);
3110         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3111                       rbd_dev->parent_overlap);
3112         if (!obj_req->num_img_extents) {
3113                 /*
3114                  * The overlap has become 0 (most likely because the
3115                  * image has been flattened).  Re-submit the original write
3116                  * request -- pass MODS_ONLY since the copyup isn't needed
3117                  * anymore.
3118                  */
3119                 return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3120         }
3121
3122         ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3123         if (ret)
3124                 return ret;
3125
3126         return rbd_obj_read_from_parent(obj_req);
3127 }
3128
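/*
 * Queue object map updates for every snapshot in the write snapshot
 * context, marking the object as existing in each of them (with the
 * fast-diff feature, all but one snapshot are marked
 * OBJECT_EXISTS_CLEAN).  Nothing is queued if the object map isn't
 * enabled or the copyup data is all zeros (zero copyup data is
 * equivalent to the object not existing).
 */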
3129 static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3130 {
3131         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3132         struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3133         u8 new_state;
3134         u32 i;
3135         int ret;
3136
3137         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3138
3139         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3140                 return;
3141
3142         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3143                 return;
3144
3145         for (i = 0; i < snapc->num_snaps; i++) {
3146                 if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3147                     i + 1 < snapc->num_snaps)
3148                         new_state = OBJECT_EXISTS_CLEAN;
3149                 else
3150                         new_state = OBJECT_EXISTS;
3151
3152                 ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3153                                             new_state, NULL);
3154                 if (ret < 0) {
3155                         obj_req->pending.result = ret;
3156                         return;
3157                 }
3158
3159                 rbd_assert(!ret);
3160                 obj_req->pending.num_pending++;
3161         }
3162 }
3163
3164 static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3165 {
3166         u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3167         int ret;
3168
3169         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3170
3171         /*
3172          * Only send non-zero copyup data to save some I/O and network
3173          * bandwidth -- zero copyup data is equivalent to the object not
3174          * existing.
3175          */
3176         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3177                 bytes = 0;
3178
3179         if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3180                 /*
3181                  * Send a copyup request with an empty snapshot context to
3182                  * deep-copyup the object through all existing snapshots.
3183                  * A second request with the current snapshot context will be
3184                  * sent for the actual modification.
3185                  */
3186                 ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3187                 if (ret) {
3188                         obj_req->pending.result = ret;
3189                         return;
3190                 }
3191
3192                 obj_req->pending.num_pending++;
3193                 bytes = MODS_ONLY;
3194         }
3195
3196         ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3197         if (ret) {
3198                 obj_req->pending.result = ret;
3199                 return;
3200         }
3201
3202         obj_req->pending.num_pending++;
3203 }
3204
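/*
 * Advance the copyup state machine:
 *
 *   RBD_OBJ_COPYUP_START - read the parent data (or, if the parent
 *       overlap is gone, just resend the modification ops)
 *   RBD_OBJ_COPYUP_READ_PARENT - check for all-zero data and queue
 *       object map updates for the snapshots
 *   RBD_OBJ_COPYUP_OBJECT_MAPS - send the copyup and modification
 *       requests
 *   RBD_OBJ_COPYUP_WRITE_OBJECT - done
 *
 * States prefixed with __ wait for the pending requests to complete.
 * Returns true when copyup processing is finished, with the outcome
 * in *result.
 */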
3205 static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3206 {
3207         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3208         int ret;
3209
3210 again:
3211         switch (obj_req->copyup_state) {
3212         case RBD_OBJ_COPYUP_START:
3213                 rbd_assert(!*result);
3214
3215                 ret = rbd_obj_copyup_read_parent(obj_req);
3216                 if (ret) {
3217                         *result = ret;
3218                         return true;
3219                 }
3220                 if (obj_req->num_img_extents)
3221                         obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3222                 else
3223                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3224                 return false;
3225         case RBD_OBJ_COPYUP_READ_PARENT:
3226                 if (*result)
3227                         return true;
3228
3229                 if (is_zero_bvecs(obj_req->copyup_bvecs,
3230                                   rbd_obj_img_extents_bytes(obj_req))) {
3231                         dout("%s %p detected zeros\n", __func__, obj_req);
3232                         obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3233                 }
3234
3235                 rbd_obj_copyup_object_maps(obj_req);
3236                 if (!obj_req->pending.num_pending) {
3237                         *result = obj_req->pending.result;
3238                         obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3239                         goto again;
3240                 }
3241                 obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3242                 return false;
3243         case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3244                 if (!pending_result_dec(&obj_req->pending, result))
3245                         return false;
3246                 fallthrough;
3247         case RBD_OBJ_COPYUP_OBJECT_MAPS:
3248                 if (*result) {
3249                         rbd_warn(rbd_dev, "snap object map update failed: %d",
3250                                  *result);
3251                         return true;
3252                 }
3253
3254                 rbd_obj_copyup_write_object(obj_req);
3255                 if (!obj_req->pending.num_pending) {
3256                         *result = obj_req->pending.result;
3257                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3258                         goto again;
3259                 }
3260                 obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3261                 return false;
3262         case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3263                 if (!pending_result_dec(&obj_req->pending, result))
3264                         return false;
3265                 fallthrough;
3266         case RBD_OBJ_COPYUP_WRITE_OBJECT:
3267                 return true;
3268         default:
3269                 BUG();
3270         }
3271 }
3272
3273 /*
3274  * Return:
3275  *   0 - object map update sent
3276  *   1 - object map update isn't needed
3277  *  <0 - error
3278  */
3279 static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3280 {
3281         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3282         u8 current_state = OBJECT_PENDING;
3283
3284         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3285                 return 1;
3286
3287         if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3288                 return 1;
3289
3290         return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3291                                      &current_state);
3292 }
3293
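/*
 * Advance the object write state machine:
 *
 *   RBD_OBJ_WRITE_START - handle no-ops, send the pre object map
 *       update if needed
 *   RBD_OBJ_WRITE_PRE_OBJECT_MAP - send the OSD write request
 *   RBD_OBJ_WRITE_OBJECT - on -ENOENT with copyup enabled, run the
 *       copyup state machine
 *   RBD_OBJ_WRITE_COPYUP - send the post object map update
 *       (deletions only)
 *   RBD_OBJ_WRITE_POST_OBJECT_MAP - done
 *
 * Returns true when the object request is complete, with the outcome
 * in *result.
 */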
3294 static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3295 {
3296         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3297         int ret;
3298
3299 again:
3300         switch (obj_req->write_state) {
3301         case RBD_OBJ_WRITE_START:
3302                 rbd_assert(!*result);
3303
3304                 rbd_obj_set_copyup_enabled(obj_req);
3305                 if (rbd_obj_write_is_noop(obj_req))
3306                         return true;
3307
3308                 ret = rbd_obj_write_pre_object_map(obj_req);
3309                 if (ret < 0) {
3310                         *result = ret;
3311                         return true;
3312                 }
3313                 obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3314                 if (ret > 0)
3315                         goto again;
3316                 return false;
3317         case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3318                 if (*result) {
3319                         rbd_warn(rbd_dev, "pre object map update failed: %d",
3320                                  *result);
3321                         return true;
3322                 }
3323                 ret = rbd_obj_write_object(obj_req);
3324                 if (ret) {
3325                         *result = ret;
3326                         return true;
3327                 }
3328                 obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3329                 return false;
3330         case RBD_OBJ_WRITE_OBJECT:
3331                 if (*result == -ENOENT) {
3332                         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3333                                 *result = 0;
3334                                 obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3335                                 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3336                                 goto again;
3337                         }
3338                         /*
3339                          * On a non-existent object, a delete returns
3340                          * -ENOENT while truncate/zero return 0.
3341                          */
3342                         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3343                                 *result = 0;
3344                 }
3345                 if (*result)
3346                         return true;
3347
3348                 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3349                 goto again;
3350         case __RBD_OBJ_WRITE_COPYUP:
3351                 if (!rbd_obj_advance_copyup(obj_req, result))
3352                         return false;
3353                 fallthrough;
3354         case RBD_OBJ_WRITE_COPYUP:
3355                 if (*result) {
3356                         rbd_warn(rbd_dev, "copyup failed: %d", *result);
3357                         return true;
3358                 }
3359                 ret = rbd_obj_write_post_object_map(obj_req);
3360                 if (ret < 0) {
3361                         *result = ret;
3362                         return true;
3363                 }
3364                 obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3365                 if (ret > 0)
3366                         goto again;
3367                 return false;
3368         case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3369                 if (*result)
3370                         rbd_warn(rbd_dev, "post object map update failed: %d",
3371                                  *result);
3372                 return true;
3373         default:
3374                 BUG();
3375         }
3376 }
3377
3378 /*
3379  * Return true if @obj_req is completed.
3380  */
3381 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3382                                      int *result)
3383 {
3384         struct rbd_img_request *img_req = obj_req->img_request;
3385         struct rbd_device *rbd_dev = img_req->rbd_dev;
3386         bool done;
3387
3388         mutex_lock(&obj_req->state_mutex);
3389         if (!rbd_img_is_write(img_req))
3390                 done = rbd_obj_advance_read(obj_req, result);
3391         else
3392                 done = rbd_obj_advance_write(obj_req, result);
3393         mutex_unlock(&obj_req->state_mutex);
3394
3395         if (done && *result) {
3396                 rbd_assert(*result < 0);
3397                 rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3398                          obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3399                          obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3400         }
3401         return done;
3402 }
3403
3404 /*
3405  * This is open-coded in rbd_img_handle_request() to avoid parent chain
3406  * recursion.
3407  */
3408 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3409 {
3410         if (__rbd_obj_handle_request(obj_req, &result))
3411                 rbd_img_handle_request(obj_req->img_request, result);
3412 }
3413
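/*
 * The exclusive lock is only relevant if the exclusive-lock feature
 * is enabled and the mapping is writable.  In that case it is needed
 * for writes, and also for reads if lock_on_read is set or the object
 * map feature is enabled.
 */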
3414 static bool need_exclusive_lock(struct rbd_img_request *img_req)
3415 {
3416         struct rbd_device *rbd_dev = img_req->rbd_dev;
3417
3418         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3419                 return false;
3420
3421         if (rbd_is_ro(rbd_dev))
3422                 return false;
3423
3424         rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3425         if (rbd_dev->opts->lock_on_read ||
3426             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3427                 return true;
3428
3429         return rbd_img_is_write(img_req);
3430 }
3431
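/*
 * Add the image request to running_list if we already own the
 * exclusive lock, or park it on acquiring_list until the lock is
 * acquired.  Returns true if the lock is held and the request may
 * proceed immediately.
 */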
3432 static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3433 {
3434         struct rbd_device *rbd_dev = img_req->rbd_dev;
3435         bool locked;
3436
3437         lockdep_assert_held(&rbd_dev->lock_rwsem);
3438         locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3439         spin_lock(&rbd_dev->lock_lists_lock);
3440         rbd_assert(list_empty(&img_req->lock_item));
3441         if (!locked)
3442                 list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3443         else
3444                 list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3445         spin_unlock(&rbd_dev->lock_lists_lock);
3446         return locked;
3447 }
3448
3449 static void rbd_lock_del_request(struct rbd_img_request *img_req)
3450 {
3451         struct rbd_device *rbd_dev = img_req->rbd_dev;
3452         bool need_wakeup;
3453
3454         lockdep_assert_held(&rbd_dev->lock_rwsem);
3455         spin_lock(&rbd_dev->lock_lists_lock);
3456         rbd_assert(!list_empty(&img_req->lock_item));
3457         list_del_init(&img_req->lock_item);
3458         need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3459                        list_empty(&rbd_dev->running_list));
3460         spin_unlock(&rbd_dev->lock_lists_lock);
3461         if (need_wakeup)
3462                 complete(&rbd_dev->releasing_wait);
3463 }
3464
3465 static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3466 {
3467         struct rbd_device *rbd_dev = img_req->rbd_dev;
3468
3469         if (!need_exclusive_lock(img_req))
3470                 return 1;
3471
3472         if (rbd_lock_add_request(img_req))
3473                 return 1;
3474
3475         if (rbd_dev->opts->exclusive) {
3476                 WARN_ON(1); /* lock got released? */
3477                 return -EROFS;
3478         }
3479
3480         /*
3481          * Note the use of mod_delayed_work() in rbd_acquire_lock()
3482          * and cancel_delayed_work() in wake_lock_waiters().
3483          */
3484         dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3485         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3486         return 0;
3487 }
3488
3489 static void rbd_img_object_requests(struct rbd_img_request *img_req)
3490 {
3491         struct rbd_device *rbd_dev = img_req->rbd_dev;
3492         struct rbd_obj_request *obj_req;
3493
3494         rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3495         rbd_assert(!need_exclusive_lock(img_req) ||
3496                    __rbd_is_lock_owner(rbd_dev));
3497
3498         if (rbd_img_is_write(img_req)) {
3499                 rbd_assert(!img_req->snapc);
3500                 down_read(&rbd_dev->header_rwsem);
3501                 img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
3502                 up_read(&rbd_dev->header_rwsem);
3503         }
3504
3505         for_each_obj_request(img_req, obj_req) {
3506                 int result = 0;
3507
3508                 if (__rbd_obj_handle_request(obj_req, &result)) {
3509                         if (result) {
3510                                 img_req->pending.result = result;
3511                                 return;
3512                         }
3513                 } else {
3514                         img_req->pending.num_pending++;
3515                 }
3516         }
3517 }
3518
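/*
 * Advance the image request state machine:
 *
 *   RBD_IMG_START - take the exclusive lock if one is needed
 *   RBD_IMG_EXCLUSIVE_LOCK - start all object requests
 *   RBD_IMG_OBJECT_REQUESTS - done once they have all completed
 *
 * Returns true when the image request is complete, with the outcome
 * in *result.
 */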
3519 static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3520 {
3521         int ret;
3522
3523 again:
3524         switch (img_req->state) {
3525         case RBD_IMG_START:
3526                 rbd_assert(!*result);
3527
3528                 ret = rbd_img_exclusive_lock(img_req);
3529                 if (ret < 0) {
3530                         *result = ret;
3531                         return true;
3532                 }
3533                 img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3534                 if (ret > 0)
3535                         goto again;
3536                 return false;
3537         case RBD_IMG_EXCLUSIVE_LOCK:
3538                 if (*result)
3539                         return true;
3540
3541                 rbd_img_object_requests(img_req);
3542                 if (!img_req->pending.num_pending) {
3543                         *result = img_req->pending.result;
3544                         img_req->state = RBD_IMG_OBJECT_REQUESTS;
3545                         goto again;
3546                 }
3547                 img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3548                 return false;
3549         case __RBD_IMG_OBJECT_REQUESTS:
3550                 if (!pending_result_dec(&img_req->pending, result))
3551                         return false;
3552                 fallthrough;
3553         case RBD_IMG_OBJECT_REQUESTS:
3554                 return true;
3555         default:
3556                 BUG();
3557         }
3558 }
3559
3560 /*
3561  * Return true if @img_req is completed.
3562  */
3563 static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3564                                      int *result)
3565 {
3566         struct rbd_device *rbd_dev = img_req->rbd_dev;
3567         bool done;
3568
3569         if (need_exclusive_lock(img_req)) {
3570                 down_read(&rbd_dev->lock_rwsem);
3571                 mutex_lock(&img_req->state_mutex);
3572                 done = rbd_img_advance(img_req, result);
3573                 if (done)
3574                         rbd_lock_del_request(img_req);
3575                 mutex_unlock(&img_req->state_mutex);
3576                 up_read(&rbd_dev->lock_rwsem);
3577         } else {
3578                 mutex_lock(&img_req->state_mutex);
3579                 done = rbd_img_advance(img_req, result);
3580                 mutex_unlock(&img_req->state_mutex);
3581         }
3582
3583         if (done && *result) {
3584                 rbd_assert(*result < 0);
3585                 rbd_warn(rbd_dev, "%s%s result %d",
3586                       test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3587                       obj_op_name(img_req->op_type), *result);
3588         }
3589         return done;
3590 }
3591
3592 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3593 {
3594 again:
3595         if (!__rbd_img_handle_request(img_req, &result))
3596                 return;
3597
3598         if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3599                 struct rbd_obj_request *obj_req = img_req->obj_request;
3600
3601                 rbd_img_request_destroy(img_req);
3602                 if (__rbd_obj_handle_request(obj_req, &result)) {
3603                         img_req = obj_req->img_request;
3604                         goto again;
3605                 }
3606         } else {
3607                 struct request *rq = blk_mq_rq_from_pdu(img_req);
3608
3609                 rbd_img_request_destroy(img_req);
3610                 blk_mq_end_request(rq, errno_to_blk_status(result));
3611         }
3612 }
3613
3614 static const struct rbd_client_id rbd_empty_cid;
3615
3616 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3617                           const struct rbd_client_id *rhs)
3618 {
3619         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3620 }
3621
3622 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3623 {
3624         struct rbd_client_id cid;
3625
3626         mutex_lock(&rbd_dev->watch_mutex);
3627         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3628         cid.handle = rbd_dev->watch_cookie;
3629         mutex_unlock(&rbd_dev->watch_mutex);
3630         return cid;
3631 }
3632
3633 /*
3634  * lock_rwsem must be held for write
3635  */
3636 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3637                               const struct rbd_client_id *cid)
3638 {
3639         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3640              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3641              cid->gid, cid->handle);
3642         rbd_dev->owner_cid = *cid; /* struct */
3643 }
3644
3645 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3646 {
3647         mutex_lock(&rbd_dev->watch_mutex);
3648         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3649         mutex_unlock(&rbd_dev->watch_mutex);
3650 }
3651
3652 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3653 {
3654         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3655
3656         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3657         strcpy(rbd_dev->lock_cookie, cookie);
3658         rbd_set_owner_cid(rbd_dev, &cid);
3659         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3660 }
3661
3662 /*
3663  * lock_rwsem must be held for write
3664  */
3665 static int rbd_lock(struct rbd_device *rbd_dev)
3666 {
3667         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3668         char cookie[32];
3669         int ret;
3670
3671         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3672                 rbd_dev->lock_cookie[0] != '\0');
3673
3674         format_lock_cookie(rbd_dev, cookie);
3675         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3676                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3677                             RBD_LOCK_TAG, "", 0);
3678         if (ret && ret != -EEXIST)
3679                 return ret;
3680
3681         __rbd_lock(rbd_dev, cookie);
3682         return 0;
3683 }
3684
3685 /*
3686  * lock_rwsem must be held for write
3687  */
3688 static void rbd_unlock(struct rbd_device *rbd_dev)
3689 {
3690         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3691         int ret;
3692
3693         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3694                 rbd_dev->lock_cookie[0] == '\0');
3695
3696         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3697                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
3698         if (ret && ret != -ENOENT)
3699                 rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3700
3701         /* treat errors as if the image had been unlocked */
3702         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3703         rbd_dev->lock_cookie[0] = '\0';
3704         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3705         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3706 }
3707
3708 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3709                                 enum rbd_notify_op notify_op,
3710                                 struct page ***preply_pages,
3711                                 size_t *preply_len)
3712 {
3713         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3714         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3715         char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3716         int buf_size = sizeof(buf);
3717         void *p = buf;
3718
3719         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3720
3721         /* encode *LockPayload NotifyMessage (op + ClientId) */
3722         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3723         ceph_encode_32(&p, notify_op);
3724         ceph_encode_64(&p, cid.gid);
3725         ceph_encode_64(&p, cid.handle);
3726
3727         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3728                                 &rbd_dev->header_oloc, buf, buf_size,
3729                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3730 }
3731
3732 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3733                                enum rbd_notify_op notify_op)
3734 {
3735         __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
3736 }
3737
3738 static void rbd_notify_acquired_lock(struct work_struct *work)
3739 {
3740         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3741                                                   acquired_lock_work);
3742
3743         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3744 }
3745
3746 static void rbd_notify_released_lock(struct work_struct *work)
3747 {
3748         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3749                                                   released_lock_work);
3750
3751         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3752 }
3753
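/*
 * Ask the current lock owner to release the lock: send a REQUEST_LOCK
 * notification and scan the acks for a non-empty ResponseMessage from
 * the owner.  Returns the owner's decoded response (typically 0 or
 * -EROFS), -ETIMEDOUT if no owner responded, or another negative
 * error code.
 */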
3754 static int rbd_request_lock(struct rbd_device *rbd_dev)
3755 {
3756         struct page **reply_pages;
3757         size_t reply_len;
3758         bool lock_owner_responded = false;
3759         int ret;
3760
3761         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3762
3763         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3764                                    &reply_pages, &reply_len);
3765         if (ret && ret != -ETIMEDOUT) {
3766                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3767                 goto out;
3768         }
3769
3770         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3771                 void *p = page_address(reply_pages[0]);
3772                 void *const end = p + reply_len;
3773                 u32 n;
3774
3775                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3776                 while (n--) {
3777                         u8 struct_v;
3778                         u32 len;
3779
3780                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3781                         p += 8 + 8; /* skip gid and cookie */
3782
3783                         ceph_decode_32_safe(&p, end, len, e_inval);
3784                         if (!len)
3785                                 continue;
3786
3787                         if (lock_owner_responded) {
3788                                 rbd_warn(rbd_dev,
3789                                          "duplicate lock owners detected");
3790                                 ret = -EIO;
3791                                 goto out;
3792                         }
3793
3794                         lock_owner_responded = true;
3795                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3796                                                   &struct_v, &len);
3797                         if (ret) {
3798                                 rbd_warn(rbd_dev,
3799                                          "failed to decode ResponseMessage: %d",
3800                                          ret);
3801                                 goto e_inval;
3802                         }
3803
3804                         ret = ceph_decode_32(&p);
3805                 }
3806         }
3807
3808         if (!lock_owner_responded) {
3809                 rbd_warn(rbd_dev, "no lock owners detected");
3810                 ret = -ETIMEDOUT;
3811         }
3812
3813 out:
3814         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3815         return ret;
3816
3817 e_inval:
3818         ret = -EINVAL;
3819         goto out;
3820 }
3821
3822 /*
3823  * Wake up the lock waiters: either image request state machine(s)
3824  * or rbd_add_acquire_lock() (i.e. "rbd map").
3825  */
3826 static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3827 {
3828         struct rbd_img_request *img_req;
3829
3830         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3831         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3832
3833         cancel_delayed_work(&rbd_dev->lock_dwork);
3834         if (!completion_done(&rbd_dev->acquire_wait)) {
3835                 rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3836                            list_empty(&rbd_dev->running_list));
3837                 rbd_dev->acquire_err = result;
3838                 complete_all(&rbd_dev->acquire_wait);
3839                 return;
3840         }
3841
3842         list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3843                 mutex_lock(&img_req->state_mutex);
3844                 rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3845                 rbd_img_schedule(img_req, result);
3846                 mutex_unlock(&img_req->state_mutex);
3847         }
3848
3849         list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3850 }
3851
3852 static bool locker_equal(const struct ceph_locker *lhs,
3853                          const struct ceph_locker *rhs)
3854 {
3855         return lhs->id.name.type == rhs->id.name.type &&
3856                lhs->id.name.num == rhs->id.name.num &&
3857                !strcmp(lhs->id.cookie, rhs->id.cookie) &&
3858                ceph_addr_equal_no_type(&lhs->info.addr, &rhs->info.addr);
3859 }
3860
3861 static void free_locker(struct ceph_locker *locker)
3862 {
3863         if (locker)
3864                 ceph_free_lockers(locker, 1);
3865 }
3866
3867 static struct ceph_locker *get_lock_owner_info(struct rbd_device *rbd_dev)
3868 {
3869         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3870         struct ceph_locker *lockers;
3871         u32 num_lockers;
3872         u8 lock_type;
3873         char *lock_tag;
3874         u64 handle;
3875         int ret;
3876
3877         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3878                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3879                                  &lock_type, &lock_tag, &lockers, &num_lockers);
3880         if (ret) {
3881                 rbd_warn(rbd_dev, "failed to get header lockers: %d", ret);
3882                 return ERR_PTR(ret);
3883         }
3884
3885         if (num_lockers == 0) {
3886                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3887                 lockers = NULL;
3888                 goto out;
3889         }
3890
3891         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3892                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3893                          lock_tag);
3894                 goto err_busy;
3895         }
3896
3897         if (lock_type != CEPH_CLS_LOCK_EXCLUSIVE) {
3898                 rbd_warn(rbd_dev, "incompatible lock type detected");
3899                 goto err_busy;
3900         }
3901
3902         WARN_ON(num_lockers != 1);
3903         ret = sscanf(lockers[0].id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu",
3904                      &handle);
3905         if (ret != 1) {
3906                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3907                          lockers[0].id.cookie);
3908                 goto err_busy;
3909         }
3910         if (ceph_addr_is_blank(&lockers[0].info.addr)) {
3911                 rbd_warn(rbd_dev, "locker has a blank address");
3912                 goto err_busy;
3913         }
3914
3915         dout("%s rbd_dev %p got locker %s%llu@%pISpc/%u handle %llu\n",
3916              __func__, rbd_dev, ENTITY_NAME(lockers[0].id.name),
3917              &lockers[0].info.addr.in_addr,
3918              le32_to_cpu(lockers[0].info.addr.nonce), handle);
3919
3920 out:
3921         kfree(lock_tag);
3922         return lockers;
3923
3924 err_busy:
3925         kfree(lock_tag);
3926         ceph_free_lockers(lockers, num_lockers);
3927         return ERR_PTR(-EBUSY);
3928 }
3929
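/*
 * Check whether the lock owner described by @locker still has a watch
 * established on the header object.  Returns 1 (and records the
 * owner's client id) if it does, 0 if it doesn't, or a negative error
 * code.
 */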
3930 static int find_watcher(struct rbd_device *rbd_dev,
3931                         const struct ceph_locker *locker)
3932 {
3933         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3934         struct ceph_watch_item *watchers;
3935         u32 num_watchers;
3936         u64 cookie;
3937         int i;
3938         int ret;
3939
3940         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3941                                       &rbd_dev->header_oloc, &watchers,
3942                                       &num_watchers);
3943         if (ret) {
3944                 rbd_warn(rbd_dev, "failed to get watchers: %d", ret);
3945                 return ret;
3946         }
3947
3948         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3949         for (i = 0; i < num_watchers; i++) {
3950                 /*
3951                  * Ignore addr->type while comparing.  This mimics
3952                  * entity_addr_t::get_legacy_str() + strcmp().
3953                  */
3954                 if (ceph_addr_equal_no_type(&watchers[i].addr,
3955                                             &locker->info.addr) &&
3956                     watchers[i].cookie == cookie) {
3957                         struct rbd_client_id cid = {
3958                                 .gid = le64_to_cpu(watchers[i].name.num),
3959                                 .handle = cookie,
3960                         };
3961
3962                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3963                              rbd_dev, cid.gid, cid.handle);
3964                         rbd_set_owner_cid(rbd_dev, &cid);
3965                         ret = 1;
3966                         goto out;
3967                 }
3968         }
3969
3970         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3971         ret = 0;
3972 out:
3973         kfree(watchers);
3974         return ret;
3975 }
3976
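/*
 * Try to acquire the exclusive lock.  If it is held by another
 * client, check whether that client still has a watch on the header
 * object; if it doesn't (and the locker hasn't changed in the
 * meantime), blocklist it and break its lock before retrying.
 * Returns 0 if the lock was acquired, 1 if it should be requested
 * from the owner, or a negative error code.
 */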
3977 /*
3978  * lock_rwsem must be held for write
3979  */
3980 static int rbd_try_lock(struct rbd_device *rbd_dev)
3981 {
3982         struct ceph_client *client = rbd_dev->rbd_client->client;
3983         struct ceph_locker *locker, *refreshed_locker;
3984         int ret;
3985
3986         for (;;) {
3987                 locker = refreshed_locker = NULL;
3988
3989                 ret = rbd_lock(rbd_dev);
3990                 if (!ret)
3991                         goto out;
3992                 if (ret != -EBUSY) {
3993                         rbd_warn(rbd_dev, "failed to lock header: %d", ret);
3994                         goto out;
3995                 }
3996
3997                 /* determine if the current lock holder is still alive */
3998                 locker = get_lock_owner_info(rbd_dev);
3999                 if (IS_ERR(locker)) {
4000                         ret = PTR_ERR(locker);
4001                         locker = NULL;
4002                         goto out;
4003                 }
4004                 if (!locker)
4005                         goto again;
4006
4007                 ret = find_watcher(rbd_dev, locker);
4008                 if (ret)
4009                         goto out; /* request lock or error */
4010
4011                 refreshed_locker = get_lock_owner_info(rbd_dev);
4012                 if (IS_ERR(refreshed_locker)) {
4013                         ret = PTR_ERR(refreshed_locker);
4014                         refreshed_locker = NULL;
4015                         goto out;
4016                 }
4017                 if (!refreshed_locker ||
4018                     !locker_equal(locker, refreshed_locker))
4019                         goto again;
4020
4021                 rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
4022                          ENTITY_NAME(locker->id.name));
4023
4024                 ret = ceph_monc_blocklist_add(&client->monc,
4025                                               &locker->info.addr);
4026                 if (ret) {
4027                         rbd_warn(rbd_dev, "failed to blocklist %s%llu: %d",
4028                                  ENTITY_NAME(locker->id.name), ret);
4029                         goto out;
4030                 }
4031
4032                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
4033                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
4034                                           locker->id.cookie, &locker->id.name);
4035                 if (ret && ret != -ENOENT) {
4036                         rbd_warn(rbd_dev, "failed to break header lock: %d",
4037                                  ret);
4038                         goto out;
4039                 }
4040
4041 again:
4042                 free_locker(refreshed_locker);
4043                 free_locker(locker);
4044         }
4045
4046 out:
4047         free_locker(refreshed_locker);
4048         free_locker(locker);
4049         return ret;
4050 }
4051
4052 static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
4053 {
4054         int ret;
4055
4056         ret = rbd_dev_refresh(rbd_dev);
4057         if (ret)
4058                 return ret;
4059
4060         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4061                 ret = rbd_object_map_open(rbd_dev);
4062                 if (ret)
4063                         return ret;
4064         }
4065
4066         return 0;
4067 }
4068
4069 /*
4070  * Return:
4071  *   0 - lock acquired
4072  *   1 - caller should call rbd_request_lock()
4073  *  <0 - error
4074  */
4075 static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4076 {
4077         int ret;
4078
4079         down_read(&rbd_dev->lock_rwsem);
4080         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4081              rbd_dev->lock_state);
4082         if (__rbd_is_lock_owner(rbd_dev)) {
4083                 up_read(&rbd_dev->lock_rwsem);
4084                 return 0;
4085         }
4086
4087         up_read(&rbd_dev->lock_rwsem);
4088         down_write(&rbd_dev->lock_rwsem);
4089         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4090              rbd_dev->lock_state);
4091         if (__rbd_is_lock_owner(rbd_dev)) {
4092                 up_write(&rbd_dev->lock_rwsem);
4093                 return 0;
4094         }
4095
4096         ret = rbd_try_lock(rbd_dev);
4097         if (ret < 0) {
4098                 rbd_warn(rbd_dev, "failed to acquire lock: %d", ret);
4099                 goto out;
4100         }
4101         if (ret > 0) {
4102                 up_write(&rbd_dev->lock_rwsem);
4103                 return ret;
4104         }
4105
4106         rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4107         rbd_assert(list_empty(&rbd_dev->running_list));
4108
4109         ret = rbd_post_acquire_action(rbd_dev);
4110         if (ret) {
4111                 rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4112                 /*
4113                  * Can't stay in RBD_LOCK_STATE_LOCKED because
4114                  * rbd_lock_add_request() would let the request through,
4115                  * assuming that e.g. object map is locked and loaded.
4116                  */
4117                 rbd_unlock(rbd_dev);
4118         }
4119
4120 out:
4121         wake_lock_waiters(rbd_dev, ret);
4122         up_write(&rbd_dev->lock_rwsem);
4123         return ret;
4124 }
4125
4126 static void rbd_acquire_lock(struct work_struct *work)
4127 {
4128         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4129                                             struct rbd_device, lock_dwork);
4130         int ret;
4131
4132         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4133 again:
4134         ret = rbd_try_acquire_lock(rbd_dev);
4135         if (ret <= 0) {
4136                 dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4137                 return;
4138         }
4139
4140         ret = rbd_request_lock(rbd_dev);
4141         if (ret == -ETIMEDOUT) {
4142                 goto again; /* treat this as a dead client */
4143         } else if (ret == -EROFS) {
4144                 rbd_warn(rbd_dev, "peer will not release lock");
4145                 down_write(&rbd_dev->lock_rwsem);
4146                 wake_lock_waiters(rbd_dev, ret);
4147                 up_write(&rbd_dev->lock_rwsem);
4148         } else if (ret < 0) {
4149                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4150                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4151                                  RBD_RETRY_DELAY);
4152         } else {
4153                 /*
4154                  * lock owner acked, but resend if we don't see them
4155                  * release the lock
4156                  */
4157                 dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4158                      rbd_dev);
4159                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4160                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4161         }
4162 }
4163
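/*
 * Switch to RBD_LOCK_STATE_RELEASING and wait for in-flight requests
 * on the running list to drain (lock_rwsem is dropped and re-taken
 * around the wait).  Returns true if the lock can now be released,
 * false if the lock wasn't held or the state changed while waiting.
 */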
4164 static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4165 {
4166         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4167         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4168
4169         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4170                 return false;
4171
4172         /*
4173          * Ensure that all in-flight IO is flushed.
4174          */
4175         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4176         rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4177         if (list_empty(&rbd_dev->running_list))
4178                 return true;
4179
4180         up_write(&rbd_dev->lock_rwsem);
4181         wait_for_completion(&rbd_dev->releasing_wait);
4182
4183         down_write(&rbd_dev->lock_rwsem);
4184         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4185                 return false;
4186
4187         rbd_assert(list_empty(&rbd_dev->running_list));
4188         return true;
4189 }
4190
4191 static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4192 {
4193         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4194                 rbd_object_map_close(rbd_dev);
4195 }
4196
4197 static void __rbd_release_lock(struct rbd_device *rbd_dev)
4198 {
4199         rbd_assert(list_empty(&rbd_dev->running_list));
4200
4201         rbd_pre_release_action(rbd_dev);
4202         rbd_unlock(rbd_dev);
4203 }
4204
4205 /*
4206  * lock_rwsem must be held for write
4207  */
4208 static void rbd_release_lock(struct rbd_device *rbd_dev)
4209 {
4210         if (!rbd_quiesce_lock(rbd_dev))
4211                 return;
4212
4213         __rbd_release_lock(rbd_dev);
4214
4215         /*
4216          * Give others a chance to grab the lock - otherwise we would
4217          * re-acquire it almost immediately if new IO came in while the
4218          * running list was draining.  We need to ack our own notifications,
4219          * so this lock_dwork will be requeued from rbd_handle_released_lock()
4220          * by way of maybe_kick_acquire().
4221          */
4222         cancel_delayed_work(&rbd_dev->lock_dwork);
4223 }
4224
4225 static void rbd_release_lock_work(struct work_struct *work)
4226 {
4227         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4228                                                   unlock_work);
4229
4230         down_write(&rbd_dev->lock_rwsem);
4231         rbd_release_lock(rbd_dev);
4232         up_write(&rbd_dev->lock_rwsem);
4233 }
4234
4235 static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4236 {
4237         bool have_requests;
4238
4239         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4240         if (__rbd_is_lock_owner(rbd_dev))
4241                 return;
4242
4243         spin_lock(&rbd_dev->lock_lists_lock);
4244         have_requests = !list_empty(&rbd_dev->acquiring_list);
4245         spin_unlock(&rbd_dev->lock_lists_lock);
4246         if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4247                 dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4248                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4249         }
4250 }
4251
4252 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4253                                      void **p)
4254 {
4255         struct rbd_client_id cid = { 0 };
4256
4257         if (struct_v >= 2) {
4258                 cid.gid = ceph_decode_64(p);
4259                 cid.handle = ceph_decode_64(p);
4260         }
4261
4262         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4263              cid.handle);
4264         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4265                 down_write(&rbd_dev->lock_rwsem);
4266                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4267                         dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4268                              __func__, rbd_dev, cid.gid, cid.handle);
4269                 } else {
4270                         rbd_set_owner_cid(rbd_dev, &cid);
4271                 }
4272                 downgrade_write(&rbd_dev->lock_rwsem);
4273         } else {
4274                 down_read(&rbd_dev->lock_rwsem);
4275         }
4276
4277         maybe_kick_acquire(rbd_dev);
4278         up_read(&rbd_dev->lock_rwsem);
4279 }
4280
4281 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4282                                      void **p)
4283 {
4284         struct rbd_client_id cid = { 0 };
4285
4286         if (struct_v >= 2) {
4287                 cid.gid = ceph_decode_64(p);
4288                 cid.handle = ceph_decode_64(p);
4289         }
4290
4291         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4292              cid.handle);
4293         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4294                 down_write(&rbd_dev->lock_rwsem);
4295                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4296                         dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
4297                              __func__, rbd_dev, cid.gid, cid.handle,
4298                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4299                 } else {
4300                         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4301                 }
4302                 downgrade_write(&rbd_dev->lock_rwsem);
4303         } else {
4304                 down_read(&rbd_dev->lock_rwsem);
4305         }
4306
4307         maybe_kick_acquire(rbd_dev);
4308         up_read(&rbd_dev->lock_rwsem);
4309 }
4310
4311 /*
4312  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4313  * ResponseMessage is needed.
4314  */
4315 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4316                                    void **p)
4317 {
4318         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4319         struct rbd_client_id cid = { 0 };
4320         int result = 1;
4321
4322         if (struct_v >= 2) {
4323                 cid.gid = ceph_decode_64(p);
4324                 cid.handle = ceph_decode_64(p);
4325         }
4326
4327         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4328              cid.handle);
4329         if (rbd_cid_equal(&cid, &my_cid))
4330                 return result;
4331
4332         down_read(&rbd_dev->lock_rwsem);
4333         if (__rbd_is_lock_owner(rbd_dev)) {
4334                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4335                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4336                         goto out_unlock;
4337
4338                 /*
4339                  * encode ResponseMessage(0) so the peer can detect
4340                  * a missing owner
4341                  */
4342                 result = 0;
4343
4344                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4345                         if (!rbd_dev->opts->exclusive) {
4346                                 dout("%s rbd_dev %p queueing unlock_work\n",
4347                                      __func__, rbd_dev);
4348                                 queue_work(rbd_dev->task_wq,
4349                                            &rbd_dev->unlock_work);
4350                         } else {
4351                                 /* refuse to release the lock */
4352                                 result = -EROFS;
4353                         }
4354                 }
4355         }
4356
4357 out_unlock:
4358         up_read(&rbd_dev->lock_rwsem);
4359         return result;
4360 }
4361
4362 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4363                                      u64 notify_id, u64 cookie, s32 *result)
4364 {
4365         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4366         char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4367         int buf_size = sizeof(buf);
4368         int ret;
4369
4370         if (result) {
4371                 void *p = buf;
4372
4373                 /* encode ResponseMessage */
4374                 ceph_start_encoding(&p, 1, 1,
4375                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
4376                 ceph_encode_32(&p, *result);
4377         } else {
4378                 buf_size = 0;
4379         }
4380
4381         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4382                                    &rbd_dev->header_oloc, notify_id, cookie,
4383                                    buf, buf_size);
4384         if (ret)
4385                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4386 }
4387
4388 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4389                                    u64 cookie)
4390 {
4391         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4392         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4393 }
4394
4395 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4396                                           u64 notify_id, u64 cookie, s32 result)
4397 {
4398         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4399         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4400 }
4401
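/*
 * Watch callback: decode the NotifyMessage (an empty payload is a
 * legacy header-update notification), dispatch on the notify op and
 * acknowledge the notification.
 */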
4402 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4403                          u64 notifier_id, void *data, size_t data_len)
4404 {
4405         struct rbd_device *rbd_dev = arg;
4406         void *p = data;
4407         void *const end = p + data_len;
4408         u8 struct_v = 0;
4409         u32 len;
4410         u32 notify_op;
4411         int ret;
4412
4413         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4414              __func__, rbd_dev, cookie, notify_id, data_len);
4415         if (data_len) {
4416                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4417                                           &struct_v, &len);
4418                 if (ret) {
4419                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4420                                  ret);
4421                         return;
4422                 }
4423
4424                 notify_op = ceph_decode_32(&p);
4425         } else {
4426                 /* legacy notification for header updates */
4427                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4428                 len = 0;
4429         }
4430
4431         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4432         switch (notify_op) {
4433         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4434                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4435                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4436                 break;
4437         case RBD_NOTIFY_OP_RELEASED_LOCK:
4438                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
4439                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4440                 break;
4441         case RBD_NOTIFY_OP_REQUEST_LOCK:
4442                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4443                 if (ret <= 0)
4444                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4445                                                       cookie, ret);
4446                 else
4447                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4448                 break;
4449         case RBD_NOTIFY_OP_HEADER_UPDATE:
4450                 ret = rbd_dev_refresh(rbd_dev);
4451                 if (ret)
4452                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
4453
4454                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4455                 break;
4456         default:
4457                 if (rbd_is_lock_owner(rbd_dev))
4458                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4459                                                       cookie, -EOPNOTSUPP);
4460                 else
4461                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4462                 break;
4463         }
4464 }
4465
4466 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4467
4468 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4469 {
4470         struct rbd_device *rbd_dev = arg;
4471
4472         rbd_warn(rbd_dev, "encountered watch error: %d", err);
4473
4474         down_write(&rbd_dev->lock_rwsem);
4475         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4476         up_write(&rbd_dev->lock_rwsem);
4477
4478         mutex_lock(&rbd_dev->watch_mutex);
4479         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4480                 __rbd_unregister_watch(rbd_dev);
4481                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4482
4483                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4484         }
4485         mutex_unlock(&rbd_dev->watch_mutex);
4486 }
4487
4488 /*
4489  * watch_mutex must be locked
4490  */
4491 static int __rbd_register_watch(struct rbd_device *rbd_dev)
4492 {
4493         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4494         struct ceph_osd_linger_request *handle;
4495
4496         rbd_assert(!rbd_dev->watch_handle);
4497         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4498
4499         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4500                                  &rbd_dev->header_oloc, rbd_watch_cb,
4501                                  rbd_watch_errcb, rbd_dev);
4502         if (IS_ERR(handle))
4503                 return PTR_ERR(handle);
4504
4505         rbd_dev->watch_handle = handle;
4506         return 0;
4507 }
4508
4509 /*
4510  * watch_mutex must be locked
4511  */
4512 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4513 {
4514         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4515         int ret;
4516
4517         rbd_assert(rbd_dev->watch_handle);
4518         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4519
4520         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4521         if (ret)
4522                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4523
4524         rbd_dev->watch_handle = NULL;
4525 }
4526
4527 static int rbd_register_watch(struct rbd_device *rbd_dev)
4528 {
4529         int ret;
4530
4531         mutex_lock(&rbd_dev->watch_mutex);
4532         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4533         ret = __rbd_register_watch(rbd_dev);
4534         if (ret)
4535                 goto out;
4536
4537         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4538         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4539
4540 out:
4541         mutex_unlock(&rbd_dev->watch_mutex);
4542         return ret;
4543 }
4544
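/* Synchronously cancel all exclusive-lock related work items. */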
4545 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4546 {
4547         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4548
4549         cancel_work_sync(&rbd_dev->acquired_lock_work);
4550         cancel_work_sync(&rbd_dev->released_lock_work);
4551         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4552         cancel_work_sync(&rbd_dev->unlock_work);
4553 }
4554
4555 /*
4556  * header_rwsem must not be held to avoid a deadlock with
4557  * rbd_dev_refresh() when flushing notifies.
4558  */
4559 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4560 {
4561         cancel_tasks_sync(rbd_dev);
4562
4563         mutex_lock(&rbd_dev->watch_mutex);
4564         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4565                 __rbd_unregister_watch(rbd_dev);
4566         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4567         mutex_unlock(&rbd_dev->watch_mutex);
4568
4569         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4570         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4571 }
4572
4573 /*
4574  * lock_rwsem must be held for write
4575  */
4576 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4577 {
4578         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4579         char cookie[32];
4580         int ret;
4581
4582         if (!rbd_quiesce_lock(rbd_dev))
4583                 return;
4584
4585         format_lock_cookie(rbd_dev, cookie);
4586         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4587                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
4588                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4589                                   RBD_LOCK_TAG, cookie);
4590         if (ret) {
4591                 if (ret != -EOPNOTSUPP)
4592                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4593                                  ret);
4594
4595                 /*
4596                  * Lock cookie cannot be updated on older OSDs, so do
4597                  * a manual release and queue an acquire.
4598                  */
4599                 __rbd_release_lock(rbd_dev);
4600                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4601         } else {
4602                 __rbd_lock(rbd_dev, cookie);
4603                 wake_lock_waiters(rbd_dev, 0);
4604         }
4605 }
4606
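/*
 * Delayed work that re-registers the watch after rbd_watch_errcb().
 * Transient errors are retried with a delay; if the client was
 * blocklisted or the header object is gone, any lock waiters are
 * failed instead.  On success, reacquire the exclusive lock if we
 * held it and refresh the header.
 */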
4607 static void rbd_reregister_watch(struct work_struct *work)
4608 {
4609         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4610                                             struct rbd_device, watch_dwork);
4611         int ret;
4612
4613         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4614
4615         mutex_lock(&rbd_dev->watch_mutex);
4616         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4617                 mutex_unlock(&rbd_dev->watch_mutex);
4618                 return;
4619         }
4620
4621         ret = __rbd_register_watch(rbd_dev);
4622         if (ret) {
4623                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4624                 if (ret != -EBLOCKLISTED && ret != -ENOENT) {
4625                         queue_delayed_work(rbd_dev->task_wq,
4626                                            &rbd_dev->watch_dwork,
4627                                            RBD_RETRY_DELAY);
4628                         mutex_unlock(&rbd_dev->watch_mutex);
4629                         return;
4630                 }
4631
4632                 mutex_unlock(&rbd_dev->watch_mutex);
4633                 down_write(&rbd_dev->lock_rwsem);
4634                 wake_lock_waiters(rbd_dev, ret);
4635                 up_write(&rbd_dev->lock_rwsem);
4636                 return;
4637         }
4638
4639         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4640         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4641         mutex_unlock(&rbd_dev->watch_mutex);
4642
4643         down_write(&rbd_dev->lock_rwsem);
4644         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4645                 rbd_reacquire_lock(rbd_dev);
4646         up_write(&rbd_dev->lock_rwsem);
4647
4648         ret = rbd_dev_refresh(rbd_dev);
4649         if (ret)
4650                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4651 }
4652
4653 /*
4654  * Synchronous osd object method call.  Returns the number of bytes
4655  * returned in the inbound buffer, or a negative error code.
4656  */
4657 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4658                              struct ceph_object_id *oid,
4659                              struct ceph_object_locator *oloc,
4660                              const char *method_name,
4661                              const void *outbound,
4662                              size_t outbound_size,
4663                              void *inbound,
4664                              size_t inbound_size)
4665 {
4666         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4667         struct page *req_page = NULL;
4668         struct page *reply_page;
4669         int ret;
4670
4671         /*
4672          * Method calls are ultimately read operations.  The result
4673          * should be placed into the inbound buffer provided.  They
4674          * also supply outbound data--parameters for the object
4675          * method.  Currently if this is present it will be a
4676          * snapshot id.
4677          */
4678         if (outbound) {
4679                 if (outbound_size > PAGE_SIZE)
4680                         return -E2BIG;
4681
4682                 req_page = alloc_page(GFP_KERNEL);
4683                 if (!req_page)
4684                         return -ENOMEM;
4685
4686                 memcpy(page_address(req_page), outbound, outbound_size);
4687         }
4688
4689         reply_page = alloc_page(GFP_KERNEL);
4690         if (!reply_page) {
4691                 if (req_page)
4692                         __free_page(req_page);
4693                 return -ENOMEM;
4694         }
4695
4696         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4697                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
4698                              &reply_page, &inbound_size);
4699         if (!ret) {
4700                 memcpy(inbound, page_address(reply_page), inbound_size);
4701                 ret = inbound_size;
4702         }
4703
4704         if (req_page)
4705                 __free_page(req_page);
4706         __free_page(reply_page);
4707         return ret;
4708 }
4709
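/*
 * Worker for a queued block layer request: runs off rbd_wq, checks
 * the request against the mapped size, sets up the image request
 * (no-data for discard/zeroout, from the bio otherwise) and kicks
 * off its processing.
 */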
4710 static void rbd_queue_workfn(struct work_struct *work)
4711 {
4712         struct rbd_img_request *img_request =
4713             container_of(work, struct rbd_img_request, work);
4714         struct rbd_device *rbd_dev = img_request->rbd_dev;
4715         enum obj_operation_type op_type = img_request->op_type;
4716         struct request *rq = blk_mq_rq_from_pdu(img_request);
4717         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4718         u64 length = blk_rq_bytes(rq);
4719         u64 mapping_size;
4720         int result;
4721
4722         /* Ignore/skip any zero-length requests */
4723         if (!length) {
4724                 dout("%s: zero-length request\n", __func__);
4725                 result = 0;
4726                 goto err_img_request;
4727         }
4728
4729         blk_mq_start_request(rq);
4730
4731         down_read(&rbd_dev->header_rwsem);
4732         mapping_size = rbd_dev->mapping.size;
4733         rbd_img_capture_header(img_request);
4734         up_read(&rbd_dev->header_rwsem);
4735
4736         if (offset + length > mapping_size) {
4737                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4738                          length, mapping_size);
4739                 result = -EIO;
4740                 goto err_img_request;
4741         }
4742
4743         dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4744              img_request, obj_op_name(op_type), offset, length);
4745
4746         if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4747                 result = rbd_img_fill_nodata(img_request, offset, length);
4748         else
4749                 result = rbd_img_fill_from_bio(img_request, offset, length,
4750                                                rq->bio);
4751         if (result)
4752                 goto err_img_request;
4753
4754         rbd_img_handle_request(img_request, 0);
4755         return;
4756
4757 err_img_request:
4758         rbd_img_request_destroy(img_request);
4759         if (result)
4760                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4761                          obj_op_name(op_type), length, offset, result);
4762         blk_mq_end_request(rq, errno_to_blk_status(result));
4763 }
4764
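/*
 * blk-mq ->queue_rq() handler: map the block layer operation onto an
 * rbd operation type, reject writes to read-only mappings and hand
 * the request off to rbd_wq for rbd_queue_workfn().
 */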
4765 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4766                 const struct blk_mq_queue_data *bd)
4767 {
4768         struct rbd_device *rbd_dev = hctx->queue->queuedata;
4769         struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
4770         enum obj_operation_type op_type;
4771
4772         switch (req_op(bd->rq)) {
4773         case REQ_OP_DISCARD:
4774                 op_type = OBJ_OP_DISCARD;
4775                 break;
4776         case REQ_OP_WRITE_ZEROES:
4777                 op_type = OBJ_OP_ZEROOUT;
4778                 break;
4779         case REQ_OP_WRITE:
4780                 op_type = OBJ_OP_WRITE;
4781                 break;
4782         case REQ_OP_READ:
4783                 op_type = OBJ_OP_READ;
4784                 break;
4785         default:
4786                 rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
4787                 return BLK_STS_IOERR;
4788         }
4789
4790         rbd_img_request_init(img_req, rbd_dev, op_type);
4791
4792         if (rbd_img_is_write(img_req)) {
4793                 if (rbd_is_ro(rbd_dev)) {
4794                         rbd_warn(rbd_dev, "%s on read-only mapping",
4795                                  obj_op_name(img_req->op_type));
4796                         return BLK_STS_IOERR;
4797                 }
4798                 rbd_assert(!rbd_is_snap(rbd_dev));
4799         }
4800
4801         INIT_WORK(&img_req->work, rbd_queue_workfn);
4802         queue_work(rbd_wq, &img_req->work);
4803         return BLK_STS_OK;
4804 }
4805
4806 static void rbd_free_disk(struct rbd_device *rbd_dev)
4807 {
4808         put_disk(rbd_dev->disk);
4809         blk_mq_free_tag_set(&rbd_dev->tag_set);
4810         rbd_dev->disk = NULL;
4811 }
4812
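/*
 * Synchronously read up to @buf_len bytes from the start of the
 * given object into @buf.  Returns the number of bytes read, or a
 * negative error code.
 */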
4813 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4814                              struct ceph_object_id *oid,
4815                              struct ceph_object_locator *oloc,
4816                              void *buf, int buf_len)
4818 {
4819         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4820         struct ceph_osd_request *req;
4821         struct page **pages;
4822         int num_pages = calc_pages_for(0, buf_len);
4823         int ret;
4824
4825         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4826         if (!req)
4827                 return -ENOMEM;
4828
4829         ceph_oid_copy(&req->r_base_oid, oid);
4830         ceph_oloc_copy(&req->r_base_oloc, oloc);
4831         req->r_flags = CEPH_OSD_FLAG_READ;
4832
4833         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4834         if (IS_ERR(pages)) {
4835                 ret = PTR_ERR(pages);
4836                 goto out_req;
4837         }
4838
4839         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4840         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4841                                          true);
4842
4843         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4844         if (ret)
4845                 goto out_req;
4846
4847         ceph_osdc_start_request(osdc, req);
4848         ret = ceph_osdc_wait_request(osdc, req);
4849         if (ret >= 0)
4850                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4851
4852 out_req:
4853         ceph_osdc_put_request(req);
4854         return ret;
4855 }
4856
4857 /*
4858  * Read the complete header for the given rbd device.  On successful
4859  * return, the rbd_dev->header field will contain up-to-date
4860  * information about the image.
4861  */
4862 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4863 {
4864         struct rbd_image_header_ondisk *ondisk = NULL;
4865         u32 snap_count = 0;
4866         u64 names_size = 0;
4867         u32 want_count;
4868         int ret;
4869
4870         /*
4871          * The complete header will include an array of its 64-bit
4872          * snapshot ids, followed by the names of those snapshots as
4873          * a contiguous block of NUL-terminated strings.  Note that
4874          * the number of snapshots could change by the time we read
4875          * it in, in which case we re-read it.
4876          */
4877         do {
4878                 size_t size;
4879
4880                 kfree(ondisk);
4881
4882                 size = sizeof (*ondisk);
4883                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4884                 size += names_size;
4885                 ondisk = kmalloc(size, GFP_KERNEL);
4886                 if (!ondisk)
4887                         return -ENOMEM;
4888
4889                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4890                                         &rbd_dev->header_oloc, ondisk, size);
4891                 if (ret < 0)
4892                         goto out;
4893                 if ((size_t)ret < size) {
4894                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4895                                 size, ret);
4896                         ret = -ENXIO;
4897                         goto out;
4898                 }
4899                 if (!rbd_dev_ondisk_valid(ondisk)) {
4900                         ret = -ENXIO;
4901                         rbd_warn(rbd_dev, "invalid header");
4902                         goto out;
4903                 }
4904
4905                 names_size = le64_to_cpu(ondisk->snap_names_len);
4906                 want_count = snap_count;
4907                 snap_count = le32_to_cpu(ondisk->snap_count);
4908         } while (snap_count != want_count);
4909
4910         ret = rbd_header_from_disk(rbd_dev, ondisk);
4911 out:
4912         kfree(ondisk);
4913
4914         return ret;
4915 }
4916
4917 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4918 {
4919         sector_t size;
4920
4921         /*
4922          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4923          * try to update its size.  If REMOVING is set, updating size
4924          * is just useless work since the device can't be opened.
4925          */
4926         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4927             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4928                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4929                 dout("setting size to %llu sectors", (unsigned long long)size);
4930                 set_capacity_and_notify(rbd_dev->disk, size);
4931         }
4932 }
4933
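/*
 * Re-read the image header (and parent information for layered
 * images) and, if the image size changed, update the block device
 * capacity.
 */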
4934 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4935 {
4936         u64 mapping_size;
4937         int ret;
4938
4939         down_write(&rbd_dev->header_rwsem);
4940         mapping_size = rbd_dev->mapping.size;
4941
4942         ret = rbd_dev_header_info(rbd_dev);
4943         if (ret)
4944                 goto out;
4945
4946         /*
4947          * If there is a parent, see if it has disappeared due to the
4948          * mapped image getting flattened.
4949          */
4950         if (rbd_dev->parent) {
4951                 ret = rbd_dev_v2_parent_info(rbd_dev);
4952                 if (ret)
4953                         goto out;
4954         }
4955
4956         rbd_assert(!rbd_is_snap(rbd_dev));
4957         rbd_dev->mapping.size = rbd_dev->header.image_size;
4958
4959 out:
4960         up_write(&rbd_dev->header_rwsem);
4961         if (!ret && mapping_size != rbd_dev->mapping.size)
4962                 rbd_dev_update_size(rbd_dev);
4963
4964         return ret;
4965 }
4966
4967 static const struct blk_mq_ops rbd_mq_ops = {
4968         .queue_rq       = rbd_queue_rq,
4969 };
4970
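/*
 * Set up the blk-mq tag set, allocate the gendisk and configure the
 * request queue limits for this mapping.
 */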
4971 static int rbd_init_disk(struct rbd_device *rbd_dev)
4972 {
4973         struct gendisk *disk;
4974         struct request_queue *q;
4975         unsigned int objset_bytes =
4976             rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
4977         int err;
4978
4979         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4980         rbd_dev->tag_set.ops = &rbd_mq_ops;
4981         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4982         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4983         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
4984         rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
4985         rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
4986
4987         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4988         if (err)
4989                 return err;
4990
4991         disk = blk_mq_alloc_disk(&rbd_dev->tag_set, rbd_dev);
4992         if (IS_ERR(disk)) {
4993                 err = PTR_ERR(disk);
4994                 goto out_tag_set;
4995         }
4996         q = disk->queue;
4997
4998         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4999                  rbd_dev->dev_id);
5000         disk->major = rbd_dev->major;
5001         disk->first_minor = rbd_dev->minor;
5002         if (single_major)
5003                 disk->minors = (1 << RBD_SINGLE_MAJOR_PART_SHIFT);
5004         else
5005                 disk->minors = RBD_MINORS_PER_MAJOR;
5006         disk->fops = &rbd_bd_ops;
5007         disk->private_data = rbd_dev;
5008
5009         blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
5010         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
5011
5012         blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
5013         q->limits.max_sectors = queue_max_hw_sectors(q);
5014         blk_queue_max_segments(q, USHRT_MAX);
5015         blk_queue_max_segment_size(q, UINT_MAX);
5016         blk_queue_io_min(q, rbd_dev->opts->alloc_size);
5017         blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
5018
5019         if (rbd_dev->opts->trim) {
5020                 q->limits.discard_granularity = rbd_dev->opts->alloc_size;
5021                 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
5022                 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
5023         }
5024
5025         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
5026                 blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);
5027
5028         rbd_dev->disk = disk;
5029
5030         return 0;
5031 out_tag_set:
5032         blk_mq_free_tag_set(&rbd_dev->tag_set);
5033         return err;
5034 }
5035
5036 /*
5037  * sysfs
5038  */
5039
5040 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
5041 {
5042         return container_of(dev, struct rbd_device, dev);
5043 }
5044
5045 static ssize_t rbd_size_show(struct device *dev,
5046                              struct device_attribute *attr, char *buf)
5047 {
5048         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5049
5050         return sprintf(buf, "%llu\n",
5051                 (unsigned long long)rbd_dev->mapping.size);
5052 }
5053
5054 static ssize_t rbd_features_show(struct device *dev,
5055                              struct device_attribute *attr, char *buf)
5056 {
5057         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5058
5059         return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
5060 }
5061
5062 static ssize_t rbd_major_show(struct device *dev,
5063                               struct device_attribute *attr, char *buf)
5064 {
5065         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5066
5067         if (rbd_dev->major)
5068                 return sprintf(buf, "%d\n", rbd_dev->major);
5069
5070         return sprintf(buf, "(none)\n");
5071 }
5072
5073 static ssize_t rbd_minor_show(struct device *dev,
5074                               struct device_attribute *attr, char *buf)
5075 {
5076         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5077
5078         return sprintf(buf, "%d\n", rbd_dev->minor);
5079 }
5080
5081 static ssize_t rbd_client_addr_show(struct device *dev,
5082                                     struct device_attribute *attr, char *buf)
5083 {
5084         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5085         struct ceph_entity_addr *client_addr =
5086             ceph_client_addr(rbd_dev->rbd_client->client);
5087
5088         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5089                        le32_to_cpu(client_addr->nonce));
5090 }
5091
5092 static ssize_t rbd_client_id_show(struct device *dev,
5093                                   struct device_attribute *attr, char *buf)
5094 {
5095         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5096
5097         return sprintf(buf, "client%lld\n",
5098                        ceph_client_gid(rbd_dev->rbd_client->client));
5099 }
5100
5101 static ssize_t rbd_cluster_fsid_show(struct device *dev,
5102                                      struct device_attribute *attr, char *buf)
5103 {
5104         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5105
5106         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5107 }
5108
5109 static ssize_t rbd_config_info_show(struct device *dev,
5110                                     struct device_attribute *attr, char *buf)
5111 {
5112         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5113
5114         if (!capable(CAP_SYS_ADMIN))
5115                 return -EPERM;
5116
5117         return sprintf(buf, "%s\n", rbd_dev->config_info);
5118 }
5119
5120 static ssize_t rbd_pool_show(struct device *dev,
5121                              struct device_attribute *attr, char *buf)
5122 {
5123         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5124
5125         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5126 }
5127
5128 static ssize_t rbd_pool_id_show(struct device *dev,
5129                              struct device_attribute *attr, char *buf)
5130 {
5131         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5132
5133         return sprintf(buf, "%llu\n",
5134                         (unsigned long long) rbd_dev->spec->pool_id);
5135 }
5136
5137 static ssize_t rbd_pool_ns_show(struct device *dev,
5138                                 struct device_attribute *attr, char *buf)
5139 {
5140         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5141
5142         return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5143 }
5144
5145 static ssize_t rbd_name_show(struct device *dev,
5146                              struct device_attribute *attr, char *buf)
5147 {
5148         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5149
5150         if (rbd_dev->spec->image_name)
5151                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5152
5153         return sprintf(buf, "(unknown)\n");
5154 }
5155
5156 static ssize_t rbd_image_id_show(struct device *dev,
5157                              struct device_attribute *attr, char *buf)
5158 {
5159         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5160
5161         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5162 }
5163
5164 /*
5165  * Shows the name of the currently-mapped snapshot (or
5166  * RBD_SNAP_HEAD_NAME for the base image).
5167  */
5168 static ssize_t rbd_snap_show(struct device *dev,
5169                              struct device_attribute *attr,
5170                              char *buf)
5171 {
5172         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5173
5174         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5175 }
5176
5177 static ssize_t rbd_snap_id_show(struct device *dev,
5178                                 struct device_attribute *attr, char *buf)
5179 {
5180         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5181
5182         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5183 }
5184
5185 /*
5186  * For a v2 image, shows the chain of parent images, separated by empty
5187  * lines.  For v1 images or if there is no parent, shows "(no parent
5188  * image)".
5189  */
5190 static ssize_t rbd_parent_show(struct device *dev,
5191                                struct device_attribute *attr,
5192                                char *buf)
5193 {
5194         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5195         ssize_t count = 0;
5196
5197         if (!rbd_dev->parent)
5198                 return sprintf(buf, "(no parent image)\n");
5199
5200         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5201                 struct rbd_spec *spec = rbd_dev->parent_spec;
5202
5203                 count += sprintf(&buf[count], "%s"
5204                             "pool_id %llu\npool_name %s\n"
5205                             "pool_ns %s\n"
5206                             "image_id %s\nimage_name %s\n"
5207                             "snap_id %llu\nsnap_name %s\n"
5208                             "overlap %llu\n",
5209                             !count ? "" : "\n", /* first? */
5210                             spec->pool_id, spec->pool_name,
5211                             spec->pool_ns ?: "",
5212                             spec->image_id, spec->image_name ?: "(unknown)",
5213                             spec->snap_id, spec->snap_name,
5214                             rbd_dev->parent_overlap);
5215         }
5216
5217         return count;
5218 }
5219
5220 static ssize_t rbd_image_refresh(struct device *dev,
5221                                  struct device_attribute *attr,
5222                                  const char *buf,
5223                                  size_t size)
5224 {
5225         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5226         int ret;
5227
5228         if (!capable(CAP_SYS_ADMIN))
5229                 return -EPERM;
5230
5231         ret = rbd_dev_refresh(rbd_dev);
5232         if (ret)
5233                 return ret;
5234
5235         return size;
5236 }
5237
5238 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5239 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5240 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5241 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5242 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5243 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5244 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5245 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5246 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5247 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5248 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5249 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5250 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5251 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5252 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5253 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5254 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5255
5256 static struct attribute *rbd_attrs[] = {
5257         &dev_attr_size.attr,
5258         &dev_attr_features.attr,
5259         &dev_attr_major.attr,
5260         &dev_attr_minor.attr,
5261         &dev_attr_client_addr.attr,
5262         &dev_attr_client_id.attr,
5263         &dev_attr_cluster_fsid.attr,
5264         &dev_attr_config_info.attr,
5265         &dev_attr_pool.attr,
5266         &dev_attr_pool_id.attr,
5267         &dev_attr_pool_ns.attr,
5268         &dev_attr_name.attr,
5269         &dev_attr_image_id.attr,
5270         &dev_attr_current_snap.attr,
5271         &dev_attr_snap_id.attr,
5272         &dev_attr_parent.attr,
5273         &dev_attr_refresh.attr,
5274         NULL
5275 };
5276
5277 static struct attribute_group rbd_attr_group = {
5278         .attrs = rbd_attrs,
5279 };
5280
5281 static const struct attribute_group *rbd_attr_groups[] = {
5282         &rbd_attr_group,
5283         NULL
5284 };
5285
5286 static void rbd_dev_release(struct device *dev);
5287
5288 static const struct device_type rbd_device_type = {
5289         .name           = "rbd",
5290         .groups         = rbd_attr_groups,
5291         .release        = rbd_dev_release,
5292 };
5293
5294 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5295 {
5296         kref_get(&spec->kref);
5297
5298         return spec;
5299 }
5300
5301 static void rbd_spec_free(struct kref *kref);
5302 static void rbd_spec_put(struct rbd_spec *spec)
5303 {
5304         if (spec)
5305                 kref_put(&spec->kref, rbd_spec_free);
5306 }
5307
5308 static struct rbd_spec *rbd_spec_alloc(void)
5309 {
5310         struct rbd_spec *spec;
5311
5312         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5313         if (!spec)
5314                 return NULL;
5315
5316         spec->pool_id = CEPH_NOPOOL;
5317         spec->snap_id = CEPH_NOSNAP;
5318         kref_init(&spec->kref);
5319
5320         return spec;
5321 }
5322
5323 static void rbd_spec_free(struct kref *kref)
5324 {
5325         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5326
5327         kfree(spec->pool_name);
5328         kfree(spec->pool_ns);
5329         kfree(spec->image_id);
5330         kfree(spec->image_name);
5331         kfree(spec->snap_name);
5332         kfree(spec);
5333 }
5334
5335 static void rbd_dev_free(struct rbd_device *rbd_dev)
5336 {
5337         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5338         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5339
5340         ceph_oid_destroy(&rbd_dev->header_oid);
5341         ceph_oloc_destroy(&rbd_dev->header_oloc);
5342         kfree(rbd_dev->config_info);
5343
5344         rbd_put_client(rbd_dev->rbd_client);
5345         rbd_spec_put(rbd_dev->spec);
5346         kfree(rbd_dev->opts);
5347         kfree(rbd_dev);
5348 }
5349
5350 static void rbd_dev_release(struct device *dev)
5351 {
5352         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5353         bool need_put = !!rbd_dev->opts;
5354
5355         if (need_put) {
5356                 destroy_workqueue(rbd_dev->task_wq);
5357                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5358         }
5359
5360         rbd_dev_free(rbd_dev);
5361
5362         /*
5363          * This is racy, but way better than doing the module_put() outside of
5364          * the release callback.  The race window is pretty small, so
5365          * doing something similar to dm (dm-builtin.c) is overkill.
5366          */
5367         if (need_put)
5368                 module_put(THIS_MODULE);
5369 }
5370
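/*
 * Allocate an rbd_device and initialize its locks, work items, lists
 * and embedded struct device.
 */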
5371 static struct rbd_device *__rbd_dev_create(struct rbd_spec *spec)
5372 {
5373         struct rbd_device *rbd_dev;
5374
5375         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5376         if (!rbd_dev)
5377                 return NULL;
5378
5379         spin_lock_init(&rbd_dev->lock);
5380         INIT_LIST_HEAD(&rbd_dev->node);
5381         init_rwsem(&rbd_dev->header_rwsem);
5382
5383         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5384         ceph_oid_init(&rbd_dev->header_oid);
5385         rbd_dev->header_oloc.pool = spec->pool_id;
5386         if (spec->pool_ns) {
5387                 WARN_ON(!*spec->pool_ns);
5388                 rbd_dev->header_oloc.pool_ns =
5389                     ceph_find_or_create_string(spec->pool_ns,
5390                                                strlen(spec->pool_ns));
5391         }
5392
5393         mutex_init(&rbd_dev->watch_mutex);
5394         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5395         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5396
5397         init_rwsem(&rbd_dev->lock_rwsem);
5398         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5399         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5400         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5401         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5402         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5403         spin_lock_init(&rbd_dev->lock_lists_lock);
5404         INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5405         INIT_LIST_HEAD(&rbd_dev->running_list);
5406         init_completion(&rbd_dev->acquire_wait);
5407         init_completion(&rbd_dev->releasing_wait);
5408
5409         spin_lock_init(&rbd_dev->object_map_lock);
5410
5411         rbd_dev->dev.bus = &rbd_bus_type;
5412         rbd_dev->dev.type = &rbd_device_type;
5413         rbd_dev->dev.parent = &rbd_root_dev;
5414         device_initialize(&rbd_dev->dev);
5415
5416         return rbd_dev;
5417 }
5418
5419 /*
5420  * Create a mapping rbd_dev.
5421  */
5422 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5423                                          struct rbd_spec *spec,
5424                                          struct rbd_options *opts)
5425 {
5426         struct rbd_device *rbd_dev;
5427
5428         rbd_dev = __rbd_dev_create(spec);
5429         if (!rbd_dev)
5430                 return NULL;
5431
5432         /* get an id and fill in device name */
5433         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5434                                          minor_to_rbd_dev_id(1 << MINORBITS),
5435                                          GFP_KERNEL);
5436         if (rbd_dev->dev_id < 0)
5437                 goto fail_rbd_dev;
5438
5439         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5440         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5441                                                    rbd_dev->name);
5442         if (!rbd_dev->task_wq)
5443                 goto fail_dev_id;
5444
5445         /* we have a ref from do_rbd_add() */
5446         __module_get(THIS_MODULE);
5447
5448         rbd_dev->rbd_client = rbdc;
5449         rbd_dev->spec = spec;
5450         rbd_dev->opts = opts;
5451
5452         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5453         return rbd_dev;
5454
5455 fail_dev_id:
5456         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5457 fail_rbd_dev:
5458         rbd_dev_free(rbd_dev);
5459         return NULL;
5460 }
5461
5462 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5463 {
5464         if (rbd_dev)
5465                 put_device(&rbd_dev->dev);
5466 }
5467
5468 /*
5469  * Get the size and object order for an image snapshot, or if
5470  * snap_id is CEPH_NOSNAP, get this information for the base
5471  * image.
5472  */
5473 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5474                                 u8 *order, u64 *snap_size)
5475 {
5476         __le64 snapid = cpu_to_le64(snap_id);
5477         int ret;
5478         struct {
5479                 u8 order;
5480                 __le64 size;
5481         } __attribute__ ((packed)) size_buf = { 0 };
5482
5483         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5484                                   &rbd_dev->header_oloc, "get_size",
5485                                   &snapid, sizeof(snapid),
5486                                   &size_buf, sizeof(size_buf));
5487         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5488         if (ret < 0)
5489                 return ret;
5490         if (ret < sizeof (size_buf))
5491                 return -ERANGE;
5492
5493         if (order) {
5494                 *order = size_buf.order;
5495                 dout("  order %u", (unsigned int)*order);
5496         }
5497         *snap_size = le64_to_cpu(size_buf.size);
5498
5499         dout("  snap_id 0x%016llx snap_size = %llu\n",
5500                 (unsigned long long)snap_id,
5501                 (unsigned long long)*snap_size);
5502
5503         return 0;
5504 }
5505
5506 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5507 {
5508         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5509                                         &rbd_dev->header.obj_order,
5510                                         &rbd_dev->header.image_size);
5511 }
5512
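/*
 * Fetch the object prefix (the common prefix of the image's data
 * object names) for a format 2 image.
 */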
5513 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5514 {
5515         size_t size;
5516         void *reply_buf;
5517         int ret;
5518         void *p;
5519
5520         /* Response will be an encoded string, which includes a length */
5521         size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5522         reply_buf = kzalloc(size, GFP_KERNEL);
5523         if (!reply_buf)
5524                 return -ENOMEM;
5525
5526         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5527                                   &rbd_dev->header_oloc, "get_object_prefix",
5528                                   NULL, 0, reply_buf, size);
5529         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5530         if (ret < 0)
5531                 goto out;
5532
5533         p = reply_buf;
5534         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5535                                                 p + ret, NULL, GFP_NOIO);
5536         ret = 0;
5537
5538         if (IS_ERR(rbd_dev->header.object_prefix)) {
5539                 ret = PTR_ERR(rbd_dev->header.object_prefix);
5540                 rbd_dev->header.object_prefix = NULL;
5541         } else {
5542                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5543         }
5544 out:
5545         kfree(reply_buf);
5546
5547         return ret;
5548 }
5549
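/*
 * Query the feature bits for the given snapshot (or for the base
 * image if snap_id is CEPH_NOSNAP).  Fails with -ENXIO if the image
 * uses incompatible features this driver does not support.
 */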
5550 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5551                                      bool read_only, u64 *snap_features)
5552 {
5553         struct {
5554                 __le64 snap_id;
5555                 u8 read_only;
5556         } features_in;
5557         struct {
5558                 __le64 features;
5559                 __le64 incompat;
5560         } __attribute__ ((packed)) features_buf = { 0 };
5561         u64 unsup;
5562         int ret;
5563
5564         features_in.snap_id = cpu_to_le64(snap_id);
5565         features_in.read_only = read_only;
5566
5567         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5568                                   &rbd_dev->header_oloc, "get_features",
5569                                   &features_in, sizeof(features_in),
5570                                   &features_buf, sizeof(features_buf));
5571         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5572         if (ret < 0)
5573                 return ret;
5574         if (ret < sizeof (features_buf))
5575                 return -ERANGE;
5576
5577         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5578         if (unsup) {
5579                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5580                          unsup);
5581                 return -ENXIO;
5582         }
5583
5584         *snap_features = le64_to_cpu(features_buf.features);
5585
5586         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5587                 (unsigned long long)snap_id,
5588                 (unsigned long long)*snap_features,
5589                 (unsigned long long)le64_to_cpu(features_buf.incompat));
5590
5591         return 0;
5592 }
5593
5594 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5595 {
5596         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5597                                          rbd_is_ro(rbd_dev),
5598                                          &rbd_dev->header.features);
5599 }
5600
5601 /*
5602  * These are generic image flags, but since they are used only for
5603  * object map, store them in rbd_dev->object_map_flags.
5604  *
5605  * For the same reason, this function is called only on object map
5606  * (re)load and not on header refresh.
5607  */
5608 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5609 {
5610         __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5611         __le64 flags;
5612         int ret;
5613
5614         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5615                                   &rbd_dev->header_oloc, "get_flags",
5616                                   &snapid, sizeof(snapid),
5617                                   &flags, sizeof(flags));
5618         if (ret < 0)
5619                 return ret;
5620         if (ret < sizeof(flags))
5621                 return -EBADMSG;
5622
5623         rbd_dev->object_map_flags = le64_to_cpu(flags);
5624         return 0;
5625 }
5626
5627 struct parent_image_info {
5628         u64             pool_id;
5629         const char      *pool_ns;
5630         const char      *image_id;
5631         u64             snap_id;
5632
5633         bool            has_overlap;
5634         u64             overlap;
5635 };
5636
5637 /*
5638  * The caller is responsible for @pii.
5639  */
5640 static int decode_parent_image_spec(void **p, void *end,
5641                                     struct parent_image_info *pii)
5642 {
5643         u8 struct_v;
5644         u32 struct_len;
5645         int ret;
5646
5647         ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5648                                   &struct_v, &struct_len);
5649         if (ret)
5650                 return ret;
5651
5652         ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5653         pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5654         if (IS_ERR(pii->pool_ns)) {
5655                 ret = PTR_ERR(pii->pool_ns);
5656                 pii->pool_ns = NULL;
5657                 return ret;
5658         }
5659         pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5660         if (IS_ERR(pii->image_id)) {
5661                 ret = PTR_ERR(pii->image_id);
5662                 pii->image_id = NULL;
5663                 return ret;
5664         }
5665         ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5666         return 0;
5667
5668 e_inval:
5669         return -EINVAL;
5670 }
5671
5672 static int __get_parent_info(struct rbd_device *rbd_dev,
5673                              struct page *req_page,
5674                              struct page *reply_page,
5675                              struct parent_image_info *pii)
5676 {
5677         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5678         size_t reply_len = PAGE_SIZE;
5679         void *p, *end;
5680         int ret;
5681
5682         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5683                              "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5684                              req_page, sizeof(u64), &reply_page, &reply_len);
5685         if (ret)
5686                 return ret == -EOPNOTSUPP ? 1 : ret;
5687
5688         p = page_address(reply_page);
5689         end = p + reply_len;
5690         ret = decode_parent_image_spec(&p, end, pii);
5691         if (ret)
5692                 return ret;
5693
5694         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5695                              "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5696                              req_page, sizeof(u64), &reply_page, &reply_len);
5697         if (ret)
5698                 return ret;
5699
5700         p = page_address(reply_page);
5701         end = p + reply_len;
5702         ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5703         if (pii->has_overlap)
5704                 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5705
5706         return 0;
5707
5708 e_inval:
5709         return -EINVAL;
5710 }
5711
5712 /*
5713  * The caller is responsible for @pii.
5714  */
5715 static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5716                                     struct page *req_page,
5717                                     struct page *reply_page,
5718                                     struct parent_image_info *pii)
5719 {
5720         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5721         size_t reply_len = PAGE_SIZE;
5722         void *p, *end;
5723         int ret;
5724
5725         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5726                              "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5727                              req_page, sizeof(u64), &reply_page, &reply_len);
5728         if (ret)
5729                 return ret;
5730
5731         p = page_address(reply_page);
5732         end = p + reply_len;
5733         ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5734         pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5735         if (IS_ERR(pii->image_id)) {
5736                 ret = PTR_ERR(pii->image_id);
5737                 pii->image_id = NULL;
5738                 return ret;
5739         }
5740         ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5741         pii->has_overlap = true;
5742         ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5743
5744         return 0;
5745
5746 e_inval:
5747         return -EINVAL;
5748 }
5749
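/*
 * Fetch parent image information, preferring the parent_get /
 * parent_overlap_get methods and falling back to the legacy
 * get_parent method on older OSDs.
 */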
5750 static int get_parent_info(struct rbd_device *rbd_dev,
5751                            struct parent_image_info *pii)
5752 {
5753         struct page *req_page, *reply_page;
5754         void *p;
5755         int ret;
5756
5757         req_page = alloc_page(GFP_KERNEL);
5758         if (!req_page)
5759                 return -ENOMEM;
5760
5761         reply_page = alloc_page(GFP_KERNEL);
5762         if (!reply_page) {
5763                 __free_page(req_page);
5764                 return -ENOMEM;
5765         }
5766
5767         p = page_address(req_page);
5768         ceph_encode_64(&p, rbd_dev->spec->snap_id);
5769         ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5770         if (ret > 0)
5771                 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5772                                                pii);
5773
5774         __free_page(req_page);
5775         __free_page(reply_page);
5776         return ret;
5777 }
5778
5779 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5780 {
5781         struct rbd_spec *parent_spec;
5782         struct parent_image_info pii = { 0 };
5783         int ret;
5784
5785         parent_spec = rbd_spec_alloc();
5786         if (!parent_spec)
5787                 return -ENOMEM;
5788
5789         ret = get_parent_info(rbd_dev, &pii);
5790         if (ret)
5791                 goto out_err;
5792
5793         dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5794              __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5795              pii.has_overlap, pii.overlap);
5796
5797         if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5798                 /*
5799                  * Either the parent never existed, or we have
5800                  * a record of it but the image got flattened so it no
5801                  * longer has a parent.  When the parent of a
5802                  * layered image disappears we immediately set the
5803                  * overlap to 0.  The effect of this is that all new
5804                  * requests will be treated as if the image had no
5805                  * parent.
5806                  *
5807                  * If !pii.has_overlap, the parent image spec is not
5808                  * applicable.  It's there to avoid duplication in each
5809                  * snapshot record.
5810                  */
5811                 if (rbd_dev->parent_overlap) {
5812                         rbd_dev->parent_overlap = 0;
5813                         rbd_dev_parent_put(rbd_dev);
5814                         pr_info("%s: clone image has been flattened\n",
5815                                 rbd_dev->disk->disk_name);
5816                 }
5817
5818                 goto out;       /* No parent?  No problem. */
5819         }
5820
5821         /* The ceph file layout needs to fit pool id in 32 bits */
5822
5823         ret = -EIO;
5824         if (pii.pool_id > (u64)U32_MAX) {
5825                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5826                         (unsigned long long)pii.pool_id, U32_MAX);
5827                 goto out_err;
5828         }
5829
5830         /*
5831          * The parent won't change (except when the clone is
5832          * flattened, which is already handled above).  So we only need
5833          * to record the parent spec if we have not already done so.
5834          */
5835         if (!rbd_dev->parent_spec) {
5836                 parent_spec->pool_id = pii.pool_id;
5837                 if (pii.pool_ns && *pii.pool_ns) {
5838                         parent_spec->pool_ns = pii.pool_ns;
5839                         pii.pool_ns = NULL;
5840                 }
5841                 parent_spec->image_id = pii.image_id;
5842                 pii.image_id = NULL;
5843                 parent_spec->snap_id = pii.snap_id;
5844
5845                 rbd_dev->parent_spec = parent_spec;
5846                 parent_spec = NULL;     /* rbd_dev now owns this */
5847         }
5848
5849         /*
5850          * We always update the parent overlap.  If it's zero we issue
5851          * a warning, as we will proceed as if there was no parent.
5852          */
5853         if (!pii.overlap) {
5854                 if (parent_spec) {
5855                         /* refresh, careful to warn just once */
5856                         if (rbd_dev->parent_overlap)
5857                                 rbd_warn(rbd_dev,
5858                                     "clone now standalone (overlap became 0)");
5859                 } else {
5860                         /* initial probe */
5861                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5862                 }
5863         }
5864         rbd_dev->parent_overlap = pii.overlap;
5865
5866 out:
5867         ret = 0;
5868 out_err:
5869         kfree(pii.pool_ns);
5870         kfree(pii.image_id);
5871         rbd_spec_put(parent_spec);
5872         return ret;
5873 }
5874
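/* Fetch the stripe unit and stripe count for a format 2 image. */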
5875 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5876 {
5877         struct {
5878                 __le64 stripe_unit;
5879                 __le64 stripe_count;
5880         } __attribute__ ((packed)) striping_info_buf = { 0 };
5881         size_t size = sizeof (striping_info_buf);
5882         void *p;
5883         int ret;
5884
5885         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5886                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
5887                                 NULL, 0, &striping_info_buf, size);
5888         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5889         if (ret < 0)
5890                 return ret;
5891         if (ret < size)
5892                 return -ERANGE;
5893
5894         p = &striping_info_buf;
5895         rbd_dev->header.stripe_unit = ceph_decode_64(&p);
5896         rbd_dev->header.stripe_count = ceph_decode_64(&p);
5897         return 0;
5898 }
5899
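/* Fetch the id of the image's separate data pool. */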
5900 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5901 {
5902         __le64 data_pool_id;
5903         int ret;
5904
5905         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5906                                   &rbd_dev->header_oloc, "get_data_pool",
5907                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
5908         if (ret < 0)
5909                 return ret;
5910         if (ret < sizeof(data_pool_id))
5911                 return -EBADMSG;
5912
5913         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5914         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5915         return 0;
5916 }
5917
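/*
 * Look up the image name for rbd_dev's image id in the pool's
 * RBD_DIRECTORY object.  Returns a newly allocated name, or NULL if
 * it could not be determined.
 */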
5918 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5919 {
5920         CEPH_DEFINE_OID_ONSTACK(oid);
5921         size_t image_id_size;
5922         char *image_id;
5923         void *p;
5924         void *end;
5925         size_t size;
5926         void *reply_buf = NULL;
5927         size_t len = 0;
5928         char *image_name = NULL;
5929         int ret;
5930
5931         rbd_assert(!rbd_dev->spec->image_name);
5932
5933         len = strlen(rbd_dev->spec->image_id);
5934         image_id_size = sizeof (__le32) + len;
5935         image_id = kmalloc(image_id_size, GFP_KERNEL);
5936         if (!image_id)
5937                 return NULL;
5938
5939         p = image_id;
5940         end = image_id + image_id_size;
5941         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5942
5943         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5944         reply_buf = kmalloc(size, GFP_KERNEL);
5945         if (!reply_buf)
5946                 goto out;
5947
5948         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5949         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5950                                   "dir_get_name", image_id, image_id_size,
5951                                   reply_buf, size);
5952         if (ret < 0)
5953                 goto out;
5954         p = reply_buf;
5955         end = reply_buf + ret;
5956
5957         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5958         if (IS_ERR(image_name))
5959                 image_name = NULL;
5960         else
5961                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5962 out:
5963         kfree(reply_buf);
5964         kfree(image_id);
5965
5966         return image_name;
5967 }
5968
5969 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5970 {
5971         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5972         const char *snap_name;
5973         u32 which = 0;
5974
5975         /* Skip over names until we find the one we are looking for */
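        /*
         * Descriptive note: header.snap_names is a single buffer of
         * '\0'-terminated names packed back to back, paired in order with
         * snapc->snaps[] (hence the strlen() + 1 stride below).
         */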
5976
5977         snap_name = rbd_dev->header.snap_names;
5978         while (which < snapc->num_snaps) {
5979                 if (!strcmp(name, snap_name))
5980                         return snapc->snaps[which];
5981                 snap_name += strlen(snap_name) + 1;
5982                 which++;
5983         }
5984         return CEPH_NOSNAP;
5985 }
5986
5987 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5988 {
5989         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5990         u32 which;
5991         bool found = false;
5992         u64 snap_id;
5993
5994         for (which = 0; !found && which < snapc->num_snaps; which++) {
5995                 const char *snap_name;
5996
5997                 snap_id = snapc->snaps[which];
5998                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5999                 if (IS_ERR(snap_name)) {
6000                         /* ignore no-longer existing snapshots */
6001                         if (PTR_ERR(snap_name) == -ENOENT)
6002                                 continue;
6003                         else
6004                                 break;
6005                 }
6006                 found = !strcmp(name, snap_name);
6007                 kfree(snap_name);
6008         }
6009         return found ? snap_id : CEPH_NOSNAP;
6010 }
6011
6012 /*
6013  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
6014  * no snapshot by that name is found, or if an error occurs.
6015  */
6016 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6017 {
6018         if (rbd_dev->image_format == 1)
6019                 return rbd_v1_snap_id_by_name(rbd_dev, name);
6020
6021         return rbd_v2_snap_id_by_name(rbd_dev, name);
6022 }
6023
6024 /*
6025  * An image being mapped will have everything but the snap id.
6026  */
6027 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
6028 {
6029         struct rbd_spec *spec = rbd_dev->spec;
6030
6031         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
6032         rbd_assert(spec->image_id && spec->image_name);
6033         rbd_assert(spec->snap_name);
6034
6035         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
6036                 u64 snap_id;
6037
6038                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
6039                 if (snap_id == CEPH_NOSNAP)
6040                         return -ENOENT;
6041
6042                 spec->snap_id = snap_id;
6043         } else {
6044                 spec->snap_id = CEPH_NOSNAP;
6045         }
6046
6047         return 0;
6048 }
6049
6050 /*
6051  * A parent image will have all ids but none of the names.
6052  *
6053  * All names in an rbd spec are dynamically allocated.  It's OK if we
6054  * can't figure out the name for an image id.
6055  */
6056 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6057 {
6058         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6059         struct rbd_spec *spec = rbd_dev->spec;
6060         const char *pool_name;
6061         const char *image_name;
6062         const char *snap_name;
6063         int ret;
6064
6065         rbd_assert(spec->pool_id != CEPH_NOPOOL);
6066         rbd_assert(spec->image_id);
6067         rbd_assert(spec->snap_id != CEPH_NOSNAP);
6068
6069         /* Get the pool name; we have to make our own copy of this */
6070
6071         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6072         if (!pool_name) {
6073                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6074                 return -EIO;
6075         }
6076         pool_name = kstrdup(pool_name, GFP_KERNEL);
6077         if (!pool_name)
6078                 return -ENOMEM;
6079
6080         /* Fetch the image name; tolerate failure here */
6081
6082         image_name = rbd_dev_image_name(rbd_dev);
6083         if (!image_name)
6084                 rbd_warn(rbd_dev, "unable to get image name");
6085
6086         /* Fetch the snapshot name */
6087
6088         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6089         if (IS_ERR(snap_name)) {
6090                 ret = PTR_ERR(snap_name);
6091                 goto out_err;
6092         }
6093
6094         spec->pool_name = pool_name;
6095         spec->image_name = image_name;
6096         spec->snap_name = snap_name;
6097
6098         return 0;
6099
6100 out_err:
6101         kfree(image_name);
6102         kfree(pool_name);
6103         return ret;
6104 }
6105
6106 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6107 {
6108         size_t size;
6109         int ret;
6110         void *reply_buf;
6111         void *p;
6112         void *end;
6113         u64 seq;
6114         u32 snap_count;
6115         struct ceph_snap_context *snapc;
6116         u32 i;
6117
6118         /*
6119          * We'll need room for the seq value (maximum snapshot id),
6120          * snapshot count, and array of that many snapshot ids.
6121          * For now we have a fixed upper limit on the number we're
6122          * prepared to receive.
6123          */
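        /*
         * Illustrative layout of the get_snapcontext reply, matching the
         * decode calls below:
         *
         *     __le64 seq;
         *     __le32 snap_count;
         *     __le64 snaps[snap_count];
         */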
6124         size = sizeof (__le64) + sizeof (__le32) +
6125                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
6126         reply_buf = kzalloc(size, GFP_KERNEL);
6127         if (!reply_buf)
6128                 return -ENOMEM;
6129
6130         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6131                                   &rbd_dev->header_oloc, "get_snapcontext",
6132                                   NULL, 0, reply_buf, size);
6133         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6134         if (ret < 0)
6135                 goto out;
6136
6137         p = reply_buf;
6138         end = reply_buf + ret;
6139         ret = -ERANGE;
6140         ceph_decode_64_safe(&p, end, seq, out);
6141         ceph_decode_32_safe(&p, end, snap_count, out);
6142
6143         /*
6144          * Make sure the reported number of snapshot ids wouldn't go
6145          * beyond the end of our buffer.  But before checking that,
6146          * make sure the computed size of the snapshot context we
6147          * allocate is representable in a size_t.
6148          */
6149         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6150                                  / sizeof (u64)) {
6151                 ret = -EINVAL;
6152                 goto out;
6153         }
6154         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6155                 goto out;
6156         ret = 0;
6157
6158         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6159         if (!snapc) {
6160                 ret = -ENOMEM;
6161                 goto out;
6162         }
6163         snapc->seq = seq;
6164         for (i = 0; i < snap_count; i++)
6165                 snapc->snaps[i] = ceph_decode_64(&p);
6166
6167         ceph_put_snap_context(rbd_dev->header.snapc);
6168         rbd_dev->header.snapc = snapc;
6169
6170         dout("  snap context seq = %llu, snap_count = %u\n",
6171                 (unsigned long long)seq, (unsigned int)snap_count);
6172 out:
6173         kfree(reply_buf);
6174
6175         return ret;
6176 }
6177
6178 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6179                                         u64 snap_id)
6180 {
6181         size_t size;
6182         void *reply_buf;
6183         __le64 snapid;
6184         int ret;
6185         void *p;
6186         void *end;
6187         char *snap_name;
6188
6189         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6190         reply_buf = kmalloc(size, GFP_KERNEL);
6191         if (!reply_buf)
6192                 return ERR_PTR(-ENOMEM);
6193
6194         snapid = cpu_to_le64(snap_id);
6195         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6196                                   &rbd_dev->header_oloc, "get_snapshot_name",
6197                                   &snapid, sizeof(snapid), reply_buf, size);
6198         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6199         if (ret < 0) {
6200                 snap_name = ERR_PTR(ret);
6201                 goto out;
6202         }
6203
6204         p = reply_buf;
6205         end = reply_buf + ret;
6206         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6207         if (IS_ERR(snap_name))
6208                 goto out;
6209
6210         dout("  snap_id 0x%016llx snap_name = %s\n",
6211                 (unsigned long long)snap_id, snap_name);
6212 out:
6213         kfree(reply_buf);
6214
6215         return snap_name;
6216 }
6217
6218 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6219 {
6220         bool first_time = rbd_dev->header.object_prefix == NULL;
6221         int ret;
6222
6223         ret = rbd_dev_v2_image_size(rbd_dev);
6224         if (ret)
6225                 return ret;
6226
6227         if (first_time) {
6228                 ret = rbd_dev_v2_header_onetime(rbd_dev);
6229                 if (ret)
6230                         return ret;
6231         }
6232
6233         ret = rbd_dev_v2_snap_context(rbd_dev);
6234         if (ret && first_time) {
6235                 kfree(rbd_dev->header.object_prefix);
6236                 rbd_dev->header.object_prefix = NULL;
6237         }
6238
6239         return ret;
6240 }
6241
6242 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6243 {
6244         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6245
6246         if (rbd_dev->image_format == 1)
6247                 return rbd_dev_v1_header_info(rbd_dev);
6248
6249         return rbd_dev_v2_header_info(rbd_dev);
6250 }
6251
6252 /*
6253  * Skips over white space at *buf, and updates *buf to point to the
6254  * first found non-space character (if any). Returns the length of
6255  * the token (string of non-white space characters) found.  Note
6256  * that *buf must be terminated with '\0'.
6257  */
6258 static inline size_t next_token(const char **buf)
6259 {
6260         /*
6261          * These are the characters that produce nonzero for
6262          * isspace() in the "C" and "POSIX" locales.
6263          */
6264         static const char spaces[] = " \f\n\r\t\v";
6265
6266         *buf += strspn(*buf, spaces);   /* Find start of token */
6267
6268         return strcspn(*buf, spaces);   /* Return token length */
6269 }
6270
6271 /*
6272  * Finds the next token in *buf, dynamically allocates a buffer big
6273  * enough to hold a copy of it, and copies the token into the new
6274  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6275  * that a duplicate buffer is created even for a zero-length token.
6276  *
6277  * Returns a pointer to the newly-allocated duplicate, or a null
6278  * pointer if memory for the duplicate was not available.  If
6279  * the lenp argument is a non-null pointer, the length of the token
6280  * (not including the '\0') is returned in *lenp.
6281  *
6282  * If successful, the *buf pointer will be updated to point beyond
6283  * the end of the found token.
6284  *
6285  * Note: uses GFP_KERNEL for allocation.
6286  */
6287 static inline char *dup_token(const char **buf, size_t *lenp)
6288 {
6289         char *dup;
6290         size_t len;
6291
6292         len = next_token(buf);
6293         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6294         if (!dup)
6295                 return NULL;
6296         *(dup + len) = '\0';
6297         *buf += len;
6298
6299         if (lenp)
6300                 *lenp = len;
6301
6302         return dup;
6303 }
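/*
 * Minimal illustrative example (hypothetical input): with *buf pointing at
 * "  rbd foo", dup_token() skips the leading spaces, returns a kmalloc'ed
 * NUL-terminated copy of "rbd", and leaves *buf pointing at " foo";
 * next_token() alone would just do the skip and return the token length 3.
 */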
6304
6305 static int rbd_parse_param(struct fs_parameter *param,
6306                             struct rbd_parse_opts_ctx *pctx)
6307 {
6308         struct rbd_options *opt = pctx->opts;
6309         struct fs_parse_result result;
6310         struct p_log log = {.prefix = "rbd"};
6311         int token, ret;
6312
6313         ret = ceph_parse_param(param, pctx->copts, NULL);
6314         if (ret != -ENOPARAM)
6315                 return ret;
6316
6317         token = __fs_parse(&log, rbd_parameters, param, &result);
6318         dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
6319         if (token < 0) {
6320                 if (token == -ENOPARAM)
6321                         return inval_plog(&log, "Unknown parameter '%s'",
6322                                           param->key);
6323                 return token;
6324         }
6325
6326         switch (token) {
6327         case Opt_queue_depth:
6328                 if (result.uint_32 < 1)
6329                         goto out_of_range;
6330                 opt->queue_depth = result.uint_32;
6331                 break;
6332         case Opt_alloc_size:
6333                 if (result.uint_32 < SECTOR_SIZE)
6334                         goto out_of_range;
6335                 if (!is_power_of_2(result.uint_32))
6336                         return inval_plog(&log, "alloc_size must be a power of 2");
6337                 opt->alloc_size = result.uint_32;
6338                 break;
6339         case Opt_lock_timeout:
6340                 /* 0 is "wait forever" (i.e. infinite timeout) */
6341                 if (result.uint_32 > INT_MAX / 1000)
6342                         goto out_of_range;
6343                 opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000);
6344                 break;
6345         case Opt_pool_ns:
6346                 kfree(pctx->spec->pool_ns);
6347                 pctx->spec->pool_ns = param->string;
6348                 param->string = NULL;
6349                 break;
6350         case Opt_compression_hint:
6351                 switch (result.uint_32) {
6352                 case Opt_compression_hint_none:
6353                         opt->alloc_hint_flags &=
6354                             ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
6355                               CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
6356                         break;
6357                 case Opt_compression_hint_compressible:
6358                         opt->alloc_hint_flags |=
6359                             CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6360                         opt->alloc_hint_flags &=
6361                             ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6362                         break;
6363                 case Opt_compression_hint_incompressible:
6364                         opt->alloc_hint_flags |=
6365                             CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6366                         opt->alloc_hint_flags &=
6367                             ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6368                         break;
6369                 default:
6370                         BUG();
6371                 }
6372                 break;
6373         case Opt_read_only:
6374                 opt->read_only = true;
6375                 break;
6376         case Opt_read_write:
6377                 opt->read_only = false;
6378                 break;
6379         case Opt_lock_on_read:
6380                 opt->lock_on_read = true;
6381                 break;
6382         case Opt_exclusive:
6383                 opt->exclusive = true;
6384                 break;
6385         case Opt_notrim:
6386                 opt->trim = false;
6387                 break;
6388         default:
6389                 BUG();
6390         }
6391
6392         return 0;
6393
6394 out_of_range:
6395         return inval_plog(&log, "%s out of range", param->key);
6396 }
6397
6398 /*
6399  * This duplicates most of generic_parse_monolithic(), untying it from
6400  * fs_context and skipping standard superblock and security options.
6401  */
6402 static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
6403 {
6404         char *key;
6405         int ret = 0;
6406
6407         dout("%s '%s'\n", __func__, options);
6408         while ((key = strsep(&options, ",")) != NULL) {
6409                 if (*key) {
6410                         struct fs_parameter param = {
6411                                 .key    = key,
6412                                 .type   = fs_value_is_flag,
6413                         };
6414                         char *value = strchr(key, '=');
6415                         size_t v_len = 0;
6416
6417                         if (value) {
6418                                 if (value == key)
6419                                         continue;
6420                                 *value++ = 0;
6421                                 v_len = strlen(value);
6422                                 param.string = kmemdup_nul(value, v_len,
6423                                                            GFP_KERNEL);
6424                                 if (!param.string)
6425                                         return -ENOMEM;
6426                                 param.type = fs_value_is_string;
6427                         }
6428                         param.size = v_len;
6429
6430                         ret = rbd_parse_param(&param, pctx);
6431                         kfree(param.string);
6432                         if (ret)
6433                                 break;
6434                 }
6435         }
6436
6437         return ret;
6438 }
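/*
 * An illustrative options string for the loop above (keys taken from the
 * rbd_parse_param() switch; the values are hypothetical):
 *
 *     "queue_depth=128,alloc_size=65536,lock_on_read,read_only"
 *
 * Each comma-separated key is handed to rbd_parse_param() as an
 * fs_parameter, with "key=value" split into the key and its string value.
 */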
6439
6440 /*
6441  * Parse the options provided for an "rbd add" (i.e., rbd image
6442  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6443  * and the data written is passed here via a NUL-terminated buffer.
6444  * Returns 0 if successful or an error code otherwise.
6445  *
6446  * The information extracted from these options is recorded in
6447  * the other parameters which return dynamically-allocated
6448  * structures:
6449  *  ceph_opts
6450  *      The address of a pointer that will refer to a ceph options
6451  *      structure.  Caller must release the returned pointer using
6452  *      ceph_destroy_options() when it is no longer needed.
6453  *  rbd_opts
6454  *      Address of an rbd options pointer.  Fully initialized by
6455  *      this function; caller must release with kfree().
6456  *  spec
6457  *      Address of an rbd image specification pointer.  Fully
6458  *      initialized by this function based on parsed options.
6459  *      Caller must release with rbd_spec_put().
6460  *
6461  * The options passed take this form:
6462  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
6463  * where:
6464  *  <mon_addrs>
6465  *      A comma-separated list of one or more monitor addresses.
6466  *      A monitor address is an ip address, optionally followed
6467  *      by a port number (separated by a colon).
6468  *        I.e.:  ip1[:port1][,ip2[:port2]...]
6469  *  <options>
6470  *      A comma-separated list of ceph and/or rbd options.
6471  *  <pool_name>
6472  *      The name of the rados pool containing the rbd image.
6473  *  <image_name>
6474  *      The name of the image in that pool to map.
6475  *  <snap_name>
6476  *      An optional snapshot name.  If provided, the mapping will
6477  *      present data from the image at the time that snapshot was
6478  *      created.  The image head is used if no snapshot name is
6479  *      provided.  Snapshot mappings are always read-only.
6480  */
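/*
 * A hypothetical example following the format above (monitor address,
 * options, pool, image, and an optional snapshot name):
 *
 *     $ echo "1.2.3.4:6789 name=admin,queue_depth=128 rbd foo snap1" \
 *           > /sys/bus/rbd/add
 */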
6481 static int rbd_add_parse_args(const char *buf,
6482                                 struct ceph_options **ceph_opts,
6483                                 struct rbd_options **opts,
6484                                 struct rbd_spec **rbd_spec)
6485 {
6486         size_t len;
6487         char *options;
6488         const char *mon_addrs;
6489         char *snap_name;
6490         size_t mon_addrs_size;
6491         struct rbd_parse_opts_ctx pctx = { 0 };
6492         int ret;
6493
6494         /* The first four tokens are required */
6495
6496         len = next_token(&buf);
6497         if (!len) {
6498                 rbd_warn(NULL, "no monitor address(es) provided");
6499                 return -EINVAL;
6500         }
6501         mon_addrs = buf;
6502         mon_addrs_size = len;
6503         buf += len;
6504
6505         ret = -EINVAL;
6506         options = dup_token(&buf, NULL);
6507         if (!options)
6508                 return -ENOMEM;
6509         if (!*options) {
6510                 rbd_warn(NULL, "no options provided");
6511                 goto out_err;
6512         }
6513
6514         pctx.spec = rbd_spec_alloc();
6515         if (!pctx.spec)
6516                 goto out_mem;
6517
6518         pctx.spec->pool_name = dup_token(&buf, NULL);
6519         if (!pctx.spec->pool_name)
6520                 goto out_mem;
6521         if (!*pctx.spec->pool_name) {
6522                 rbd_warn(NULL, "no pool name provided");
6523                 goto out_err;
6524         }
6525
6526         pctx.spec->image_name = dup_token(&buf, NULL);
6527         if (!pctx.spec->image_name)
6528                 goto out_mem;
6529         if (!*pctx.spec->image_name) {
6530                 rbd_warn(NULL, "no image name provided");
6531                 goto out_err;
6532         }
6533
6534         /*
6535          * Snapshot name is optional; default is to use "-"
6536          * (indicating the head/no snapshot).
6537          */
6538         len = next_token(&buf);
6539         if (!len) {
6540                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6541                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6542         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6543                 ret = -ENAMETOOLONG;
6544                 goto out_err;
6545         }
6546         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6547         if (!snap_name)
6548                 goto out_mem;
6549         *(snap_name + len) = '\0';
6550         pctx.spec->snap_name = snap_name;
6551
6552         pctx.copts = ceph_alloc_options();
6553         if (!pctx.copts)
6554                 goto out_mem;
6555
6556         /* Initialize all rbd options to the defaults */
6557
6558         pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6559         if (!pctx.opts)
6560                 goto out_mem;
6561
6562         pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6563         pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6564         pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6565         pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6566         pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6567         pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6568         pctx.opts->trim = RBD_TRIM_DEFAULT;
6569
6570         ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL,
6571                                  ',');
6572         if (ret)
6573                 goto out_err;
6574
6575         ret = rbd_parse_options(options, &pctx);
6576         if (ret)
6577                 goto out_err;
6578
6579         *ceph_opts = pctx.copts;
6580         *opts = pctx.opts;
6581         *rbd_spec = pctx.spec;
6582         kfree(options);
6583         return 0;
6584
6585 out_mem:
6586         ret = -ENOMEM;
6587 out_err:
6588         kfree(pctx.opts);
6589         ceph_destroy_options(pctx.copts);
6590         rbd_spec_put(pctx.spec);
6591         kfree(options);
6592         return ret;
6593 }
6594
6595 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6596 {
6597         down_write(&rbd_dev->lock_rwsem);
6598         if (__rbd_is_lock_owner(rbd_dev))
6599                 __rbd_release_lock(rbd_dev);
6600         up_write(&rbd_dev->lock_rwsem);
6601 }
6602
6603 /*
6604  * If the wait is interrupted, an error is returned even if the lock
6605  * was successfully acquired.  rbd_dev_image_unlock() will release it
6606  * if needed.
6607  */
6608 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6609 {
6610         long ret;
6611
6612         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6613                 if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6614                         return 0;
6615
6616                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6617                 return -EINVAL;
6618         }
6619
6620         if (rbd_is_ro(rbd_dev))
6621                 return 0;
6622
6623         rbd_assert(!rbd_is_lock_owner(rbd_dev));
6624         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6625         ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6626                             ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6627         if (ret > 0) {
6628                 ret = rbd_dev->acquire_err;
6629         } else {
6630                 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6631                 if (!ret)
6632                         ret = -ETIMEDOUT;
6633
6634                 rbd_warn(rbd_dev, "failed to acquire lock: %ld", ret);
6635         }
6636         if (ret)
6637                 return ret;
6638
6639         /*
6640          * The lock may have been released by now, unless automatic lock
6641          * transitions are disabled.
6642          */
6643         rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6644         return 0;
6645 }
6646
6647 /*
6648  * An rbd format 2 image has a unique identifier, distinct from the
6649  * name given to it by the user.  Internally, that identifier is
6650  * what's used to specify the names of objects related to the image.
6651  *
6652  * A special "rbd id" object is used to map an rbd image name to its
6653  * id.  If that object doesn't exist, then there is no v2 rbd image
6654  * with the supplied name.
6655  *
6656  * This function will record the given rbd_dev's image_id field if
6657  * it can be determined, and in that case will return 0.  If any
6658  * errors occur a negative errno will be returned and the rbd_dev's
6659  * image_id field will be unchanged (and should be NULL).
6660  */
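/*
 * For example (assuming the usual RBD_ID_PREFIX of "rbd_id."), an image
 * named "foo" would have its id stored in an object called "rbd_id.foo";
 * the oid is built that way just below with ceph_oid_aprintf().
 */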
6661 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6662 {
6663         int ret;
6664         size_t size;
6665         CEPH_DEFINE_OID_ONSTACK(oid);
6666         void *response;
6667         char *image_id;
6668
6669         /*
6670          * When probing a parent image, the image id is already
6671          * known (and the image name likely is not).  There's no
6672          * need to fetch the image id again in this case.  We
6673          * do still need to set the image format though.
6674          */
6675         if (rbd_dev->spec->image_id) {
6676                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6677
6678                 return 0;
6679         }
6680
6681         /*
6682          * First, see if the format 2 image id file exists, and if
6683          * so, get the image's persistent id from it.
6684          */
6685         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6686                                rbd_dev->spec->image_name);
6687         if (ret)
6688                 return ret;
6689
6690         dout("rbd id object name is %s\n", oid.name);
6691
6692         /* Response will be an encoded string, which includes a length */
6693         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6694         response = kzalloc(size, GFP_NOIO);
6695         if (!response) {
6696                 ret = -ENOMEM;
6697                 goto out;
6698         }
6699
6700         /* If it doesn't exist we'll assume it's a format 1 image */
6701
6702         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6703                                   "get_id", NULL, 0,
6704                                   response, size);
6705         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6706         if (ret == -ENOENT) {
6707                 image_id = kstrdup("", GFP_KERNEL);
6708                 ret = image_id ? 0 : -ENOMEM;
6709                 if (!ret)
6710                         rbd_dev->image_format = 1;
6711         } else if (ret >= 0) {
6712                 void *p = response;
6713
6714                 image_id = ceph_extract_encoded_string(&p, p + ret,
6715                                                 NULL, GFP_NOIO);
6716                 ret = PTR_ERR_OR_ZERO(image_id);
6717                 if (!ret)
6718                         rbd_dev->image_format = 2;
6719         }
6720
6721         if (!ret) {
6722                 rbd_dev->spec->image_id = image_id;
6723                 dout("image_id is %s\n", image_id);
6724         }
6725 out:
6726         kfree(response);
6727         ceph_oid_destroy(&oid);
6728         return ret;
6729 }
6730
6731 /*
6732  * Undo whatever state changes are made by a v1 or v2 header info
6733  * call.
6734  */
6735 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6736 {
6737         struct rbd_image_header *header;
6738
6739         rbd_dev_parent_put(rbd_dev);
6740         rbd_object_map_free(rbd_dev);
6741         rbd_dev_mapping_clear(rbd_dev);
6742
6743         /* Free dynamic fields from the header, then zero it out */
6744
6745         header = &rbd_dev->header;
6746         ceph_put_snap_context(header->snapc);
6747         kfree(header->snap_sizes);
6748         kfree(header->snap_names);
6749         kfree(header->object_prefix);
6750         memset(header, 0, sizeof (*header));
6751 }
6752
6753 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6754 {
6755         int ret;
6756
6757         ret = rbd_dev_v2_object_prefix(rbd_dev);
6758         if (ret)
6759                 goto out_err;
6760
6761         /*
6762          * Get and check the features for the image.  Currently the
6763          * features are assumed to never change.
6764          */
6765         ret = rbd_dev_v2_features(rbd_dev);
6766         if (ret)
6767                 goto out_err;
6768
6769         /* If the image supports fancy striping, get its parameters */
6770
6771         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6772                 ret = rbd_dev_v2_striping_info(rbd_dev);
6773                 if (ret < 0)
6774                         goto out_err;
6775         }
6776
6777         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6778                 ret = rbd_dev_v2_data_pool(rbd_dev);
6779                 if (ret)
6780                         goto out_err;
6781         }
6782
6783         rbd_init_layout(rbd_dev);
6784         return 0;
6785
6786 out_err:
6787         rbd_dev->header.features = 0;
6788         kfree(rbd_dev->header.object_prefix);
6789         rbd_dev->header.object_prefix = NULL;
6790         return ret;
6791 }
6792
6793 /*
6794  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6795  * rbd_dev_image_probe() recursion depth, which means it's also the
6796  * length of the already discovered part of the parent chain.
6797  */
6798 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6799 {
6800         struct rbd_device *parent = NULL;
6801         int ret;
6802
6803         if (!rbd_dev->parent_spec)
6804                 return 0;
6805
6806         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6807                 pr_info("parent chain is too long (%d)\n", depth);
6808                 ret = -EINVAL;
6809                 goto out_err;
6810         }
6811
6812         parent = __rbd_dev_create(rbd_dev->parent_spec);
6813         if (!parent) {
6814                 ret = -ENOMEM;
6815                 goto out_err;
6816         }
6817
6818         /*
6819          * Images related by parent/child relationships always share
6820          * rbd_client and spec/parent_spec, so bump their refcounts.
6821          */
6822         parent->rbd_client = __rbd_get_client(rbd_dev->rbd_client);
6823         parent->spec = rbd_spec_get(rbd_dev->parent_spec);
6824
6825         __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6826
6827         ret = rbd_dev_image_probe(parent, depth);
6828         if (ret < 0)
6829                 goto out_err;
6830
6831         rbd_dev->parent = parent;
6832         atomic_set(&rbd_dev->parent_ref, 1);
6833         return 0;
6834
6835 out_err:
6836         rbd_dev_unparent(rbd_dev);
6837         rbd_dev_destroy(parent);
6838         return ret;
6839 }
6840
6841 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6842 {
6843         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6844         rbd_free_disk(rbd_dev);
6845         if (!single_major)
6846                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6847 }
6848
6849 /*
6850  * rbd_dev->header_rwsem must be locked for write and will be unlocked
6851  * upon return.
6852  */
6853 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6854 {
6855         int ret;
6856
6857         /* Record our major and minor device numbers. */
6858
6859         if (!single_major) {
6860                 ret = register_blkdev(0, rbd_dev->name);
6861                 if (ret < 0)
6862                         goto err_out_unlock;
6863
6864                 rbd_dev->major = ret;
6865                 rbd_dev->minor = 0;
6866         } else {
6867                 rbd_dev->major = rbd_major;
6868                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6869         }
6870
6871         /* Set up the blkdev mapping. */
6872
6873         ret = rbd_init_disk(rbd_dev);
6874         if (ret)
6875                 goto err_out_blkdev;
6876
6877         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6878         set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6879
6880         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6881         if (ret)
6882                 goto err_out_disk;
6883
6884         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6885         up_write(&rbd_dev->header_rwsem);
6886         return 0;
6887
6888 err_out_disk:
6889         rbd_free_disk(rbd_dev);
6890 err_out_blkdev:
6891         if (!single_major)
6892                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6893 err_out_unlock:
6894         up_write(&rbd_dev->header_rwsem);
6895         return ret;
6896 }
6897
6898 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6899 {
6900         struct rbd_spec *spec = rbd_dev->spec;
6901         int ret;
6902
6903         /* Record the header object name for this rbd image. */
6904
6905         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6906         if (rbd_dev->image_format == 1)
6907                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6908                                        spec->image_name, RBD_SUFFIX);
6909         else
6910                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6911                                        RBD_HEADER_PREFIX, spec->image_id);
6912
6913         return ret;
6914 }
6915
6916 static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6917 {
6918         if (!is_snap) {
6919                 pr_info("image %s/%s%s%s does not exist\n",
6920                         rbd_dev->spec->pool_name,
6921                         rbd_dev->spec->pool_ns ?: "",
6922                         rbd_dev->spec->pool_ns ? "/" : "",
6923                         rbd_dev->spec->image_name);
6924         } else {
6925                 pr_info("snap %s/%s%s%s@%s does not exist\n",
6926                         rbd_dev->spec->pool_name,
6927                         rbd_dev->spec->pool_ns ?: "",
6928                         rbd_dev->spec->pool_ns ? "/" : "",
6929                         rbd_dev->spec->image_name,
6930                         rbd_dev->spec->snap_name);
6931         }
6932 }
6933
6934 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6935 {
6936         if (!rbd_is_ro(rbd_dev))
6937                 rbd_unregister_watch(rbd_dev);
6938
6939         rbd_dev_unprobe(rbd_dev);
6940         rbd_dev->image_format = 0;
6941         kfree(rbd_dev->spec->image_id);
6942         rbd_dev->spec->image_id = NULL;
6943 }
6944
6945 /*
6946  * Probe for the existence of the header object for the given rbd
6947  * device.  If this image is the one being mapped (i.e., not a
6948  * parent), initiate a watch on its header object before using that
6949  * object to get detailed information about the rbd image.
6950  *
6951  * On success, returns with header_rwsem held for write if called
6952  * with @depth == 0.
6953  */
6954 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6955 {
6956         bool need_watch = !rbd_is_ro(rbd_dev);
6957         int ret;
6958
6959         /*
6960          * Get the id from the image id object.  Unless there's an
6961          * error, rbd_dev->spec->image_id will be filled in with
6962          * a dynamically-allocated string, and rbd_dev->image_format
6963          * will be set to either 1 or 2.
6964          */
6965         ret = rbd_dev_image_id(rbd_dev);
6966         if (ret)
6967                 return ret;
6968
6969         ret = rbd_dev_header_name(rbd_dev);
6970         if (ret)
6971                 goto err_out_format;
6972
6973         if (need_watch) {
6974                 ret = rbd_register_watch(rbd_dev);
6975                 if (ret) {
6976                         if (ret == -ENOENT)
6977                                 rbd_print_dne(rbd_dev, false);
6978                         goto err_out_format;
6979                 }
6980         }
6981
6982         if (!depth)
6983                 down_write(&rbd_dev->header_rwsem);
6984
6985         ret = rbd_dev_header_info(rbd_dev);
6986         if (ret) {
6987                 if (ret == -ENOENT && !need_watch)
6988                         rbd_print_dne(rbd_dev, false);
6989                 goto err_out_probe;
6990         }
6991
6992         /*
6993          * If this image is the one being mapped, we have pool name and
6994          * id, image name and id, and snap name - need to fill snap id.
6995          * Otherwise this is a parent image, identified by pool, image
6996          * and snap ids - need to fill in names for those ids.
6997          */
6998         if (!depth)
6999                 ret = rbd_spec_fill_snap_id(rbd_dev);
7000         else
7001                 ret = rbd_spec_fill_names(rbd_dev);
7002         if (ret) {
7003                 if (ret == -ENOENT)
7004                         rbd_print_dne(rbd_dev, true);
7005                 goto err_out_probe;
7006         }
7007
7008         ret = rbd_dev_mapping_set(rbd_dev);
7009         if (ret)
7010                 goto err_out_probe;
7011
7012         if (rbd_is_snap(rbd_dev) &&
7013             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
7014                 ret = rbd_object_map_load(rbd_dev);
7015                 if (ret)
7016                         goto err_out_probe;
7017         }
7018
7019         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
7020                 ret = rbd_dev_v2_parent_info(rbd_dev);
7021                 if (ret)
7022                         goto err_out_probe;
7023         }
7024
7025         ret = rbd_dev_probe_parent(rbd_dev, depth);
7026         if (ret)
7027                 goto err_out_probe;
7028
7029         dout("discovered format %u image, header name is %s\n",
7030                 rbd_dev->image_format, rbd_dev->header_oid.name);
7031         return 0;
7032
7033 err_out_probe:
7034         if (!depth)
7035                 up_write(&rbd_dev->header_rwsem);
7036         if (need_watch)
7037                 rbd_unregister_watch(rbd_dev);
7038         rbd_dev_unprobe(rbd_dev);
7039 err_out_format:
7040         rbd_dev->image_format = 0;
7041         kfree(rbd_dev->spec->image_id);
7042         rbd_dev->spec->image_id = NULL;
7043         return ret;
7044 }
7045
7046 static ssize_t do_rbd_add(const char *buf, size_t count)
7047 {
7048         struct rbd_device *rbd_dev = NULL;
7049         struct ceph_options *ceph_opts = NULL;
7050         struct rbd_options *rbd_opts = NULL;
7051         struct rbd_spec *spec = NULL;
7052         struct rbd_client *rbdc;
7053         int rc;
7054
7055         if (!capable(CAP_SYS_ADMIN))
7056                 return -EPERM;
7057
7058         if (!try_module_get(THIS_MODULE))
7059                 return -ENODEV;
7060
7061         /* parse add command */
7062         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
7063         if (rc < 0)
7064                 goto out;
7065
7066         rbdc = rbd_get_client(ceph_opts);
7067         if (IS_ERR(rbdc)) {
7068                 rc = PTR_ERR(rbdc);
7069                 goto err_out_args;
7070         }
7071
7072         /* pick the pool */
7073         rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7074         if (rc < 0) {
7075                 if (rc == -ENOENT)
7076                         pr_info("pool %s does not exist\n", spec->pool_name);
7077                 goto err_out_client;
7078         }
7079         spec->pool_id = (u64)rc;
7080
7081         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7082         if (!rbd_dev) {
7083                 rc = -ENOMEM;
7084                 goto err_out_client;
7085         }
7086         rbdc = NULL;            /* rbd_dev now owns this */
7087         spec = NULL;            /* rbd_dev now owns this */
7088         rbd_opts = NULL;        /* rbd_dev now owns this */
7089
7090         /* if we are mapping a snapshot it will be a read-only mapping */
7091         if (rbd_dev->opts->read_only ||
7092             strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7093                 __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7094
7095         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7096         if (!rbd_dev->config_info) {
7097                 rc = -ENOMEM;
7098                 goto err_out_rbd_dev;
7099         }
7100
7101         rc = rbd_dev_image_probe(rbd_dev, 0);
7102         if (rc < 0)
7103                 goto err_out_rbd_dev;
7104
7105         if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7106                 rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7107                          rbd_dev->layout.object_size);
7108                 rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7109         }
7110
7111         rc = rbd_dev_device_setup(rbd_dev);
7112         if (rc)
7113                 goto err_out_image_probe;
7114
7115         rc = rbd_add_acquire_lock(rbd_dev);
7116         if (rc)
7117                 goto err_out_image_lock;
7118
7119         /* Everything's ready.  Announce the disk to the world. */
7120
7121         rc = device_add(&rbd_dev->dev);
7122         if (rc)
7123                 goto err_out_image_lock;
7124
7125         rc = device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
7126         if (rc)
7127                 goto err_out_cleanup_disk;
7128
7129         spin_lock(&rbd_dev_list_lock);
7130         list_add_tail(&rbd_dev->node, &rbd_dev_list);
7131         spin_unlock(&rbd_dev_list_lock);
7132
7133         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7134                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7135                 rbd_dev->header.features);
7136         rc = count;
7137 out:
7138         module_put(THIS_MODULE);
7139         return rc;
7140
7141 err_out_cleanup_disk:
7142         rbd_free_disk(rbd_dev);
7143 err_out_image_lock:
7144         rbd_dev_image_unlock(rbd_dev);
7145         rbd_dev_device_release(rbd_dev);
7146 err_out_image_probe:
7147         rbd_dev_image_release(rbd_dev);
7148 err_out_rbd_dev:
7149         rbd_dev_destroy(rbd_dev);
7150 err_out_client:
7151         rbd_put_client(rbdc);
7152 err_out_args:
7153         rbd_spec_put(spec);
7154         kfree(rbd_opts);
7155         goto out;
7156 }
7157
7158 static ssize_t add_store(const struct bus_type *bus, const char *buf, size_t count)
7159 {
7160         if (single_major)
7161                 return -EINVAL;
7162
7163         return do_rbd_add(buf, count);
7164 }
7165
7166 static ssize_t add_single_major_store(const struct bus_type *bus, const char *buf,
7167                                       size_t count)
7168 {
7169         return do_rbd_add(buf, count);
7170 }
7171
7172 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7173 {
7174         while (rbd_dev->parent) {
7175                 struct rbd_device *first = rbd_dev;
7176                 struct rbd_device *second = first->parent;
7177                 struct rbd_device *third;
7178
7179                 /*
7180                  * Follow to the parent with no grandparent and
7181                  * remove it.
7182                  */
7183                 while (second && (third = second->parent)) {
7184                         first = second;
7185                         second = third;
7186                 }
7187                 rbd_assert(second);
7188                 rbd_dev_image_release(second);
7189                 rbd_dev_destroy(second);
7190                 first->parent = NULL;
7191                 first->parent_overlap = 0;
7192
7193                 rbd_assert(first->parent_spec);
7194                 rbd_spec_put(first->parent_spec);
7195                 first->parent_spec = NULL;
7196         }
7197 }
7198
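/*
 * Handle a write to /sys/bus/rbd/remove (or remove_single_major).  Per the
 * sscanf() below, the expected input is a device id optionally followed by
 * the word "force", e.g. (hypothetical):
 *
 *     $ echo "0 force" > /sys/bus/rbd/remove
 */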
7199 static ssize_t do_rbd_remove(const char *buf, size_t count)
7200 {
7201         struct rbd_device *rbd_dev = NULL;
7202         int dev_id;
7203         char opt_buf[6];
7204         bool force = false;
7205         int ret;
7206
7207         if (!capable(CAP_SYS_ADMIN))
7208                 return -EPERM;
7209
7210         dev_id = -1;
7211         opt_buf[0] = '\0';
7212         sscanf(buf, "%d %5s", &dev_id, opt_buf);
7213         if (dev_id < 0) {
7214                 pr_err("dev_id out of range\n");
7215                 return -EINVAL;
7216         }
7217         if (opt_buf[0] != '\0') {
7218                 if (!strcmp(opt_buf, "force")) {
7219                         force = true;
7220                 } else {
7221                         pr_err("bad remove option at '%s'\n", opt_buf);
7222                         return -EINVAL;
7223                 }
7224         }
7225
7226         ret = -ENOENT;
7227         spin_lock(&rbd_dev_list_lock);
7228         list_for_each_entry(rbd_dev, &rbd_dev_list, node) {
7229                 if (rbd_dev->dev_id == dev_id) {
7230                         ret = 0;
7231                         break;
7232                 }
7233         }
7234         if (!ret) {
7235                 spin_lock_irq(&rbd_dev->lock);
7236                 if (rbd_dev->open_count && !force)
7237                         ret = -EBUSY;
7238                 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7239                                           &rbd_dev->flags))
7240                         ret = -EINPROGRESS;
7241                 spin_unlock_irq(&rbd_dev->lock);
7242         }
7243         spin_unlock(&rbd_dev_list_lock);
7244         if (ret)
7245                 return ret;
7246
7247         if (force) {
7248                 /*
7249                  * Prevent new IO from being queued and wait for existing
7250                  * IO to complete/fail.
7251                  */
7252                 blk_mq_freeze_queue(rbd_dev->disk->queue);
7253                 blk_mark_disk_dead(rbd_dev->disk);
7254         }
7255
7256         del_gendisk(rbd_dev->disk);
7257         spin_lock(&rbd_dev_list_lock);
7258         list_del_init(&rbd_dev->node);
7259         spin_unlock(&rbd_dev_list_lock);
7260         device_del(&rbd_dev->dev);
7261
7262         rbd_dev_image_unlock(rbd_dev);
7263         rbd_dev_device_release(rbd_dev);
7264         rbd_dev_image_release(rbd_dev);
7265         rbd_dev_destroy(rbd_dev);
7266         return count;
7267 }
7268
7269 static ssize_t remove_store(const struct bus_type *bus, const char *buf, size_t count)
7270 {
7271         if (single_major)
7272                 return -EINVAL;
7273
7274         return do_rbd_remove(buf, count);
7275 }
7276
7277 static ssize_t remove_single_major_store(const struct bus_type *bus, const char *buf,
7278                                          size_t count)
7279 {
7280         return do_rbd_remove(buf, count);
7281 }
7282
7283 /*
7284  * create control files in sysfs
7285  * /sys/bus/rbd/...
7286  */
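/*
 * Among the entries created are add and remove (and their single_major
 * variants), wired up via the *_store handlers above.
 */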
7287 static int __init rbd_sysfs_init(void)
7288 {
7289         int ret;
7290
7291         ret = device_register(&rbd_root_dev);
7292         if (ret < 0) {
7293                 put_device(&rbd_root_dev);
7294                 return ret;
7295         }
7296
7297         ret = bus_register(&rbd_bus_type);
7298         if (ret < 0)
7299                 device_unregister(&rbd_root_dev);
7300
7301         return ret;
7302 }
7303
7304 static void __exit rbd_sysfs_cleanup(void)
7305 {
7306         bus_unregister(&rbd_bus_type);
7307         device_unregister(&rbd_root_dev);
7308 }
7309
7310 static int __init rbd_slab_init(void)
7311 {
7312         rbd_assert(!rbd_img_request_cache);
7313         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7314         if (!rbd_img_request_cache)
7315                 return -ENOMEM;
7316
7317         rbd_assert(!rbd_obj_request_cache);
7318         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7319         if (!rbd_obj_request_cache)
7320                 goto out_err;
7321
7322         return 0;
7323
7324 out_err:
7325         kmem_cache_destroy(rbd_img_request_cache);
7326         rbd_img_request_cache = NULL;
7327         return -ENOMEM;
7328 }
7329
7330 static void rbd_slab_exit(void)
7331 {
7332         rbd_assert(rbd_obj_request_cache);
7333         kmem_cache_destroy(rbd_obj_request_cache);
7334         rbd_obj_request_cache = NULL;
7335
7336         rbd_assert(rbd_img_request_cache);
7337         kmem_cache_destroy(rbd_img_request_cache);
7338         rbd_img_request_cache = NULL;
7339 }
7340
7341 static int __init rbd_init(void)
7342 {
7343         int rc;
7344
7345         if (!libceph_compatible(NULL)) {
7346                 rbd_warn(NULL, "libceph incompatibility (quitting)");
7347                 return -EINVAL;
7348         }
7349
7350         rc = rbd_slab_init();
7351         if (rc)
7352                 return rc;
7353
7354         /*
7355          * The number of active work items is limited by the number of
7356          * rbd devices * queue depth, so leave @max_active at default.
7357          */
7358         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7359         if (!rbd_wq) {
7360                 rc = -ENOMEM;
7361                 goto err_out_slab;
7362         }
7363
7364         if (single_major) {
7365                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
7366                 if (rbd_major < 0) {
7367                         rc = rbd_major;
7368                         goto err_out_wq;
7369                 }
7370         }
7371
7372         rc = rbd_sysfs_init();
7373         if (rc)
7374                 goto err_out_blkdev;
7375
7376         if (single_major)
7377                 pr_info("loaded (major %d)\n", rbd_major);
7378         else
7379                 pr_info("loaded\n");
7380
7381         return 0;
7382
7383 err_out_blkdev:
7384         if (single_major)
7385                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7386 err_out_wq:
7387         destroy_workqueue(rbd_wq);
7388 err_out_slab:
7389         rbd_slab_exit();
7390         return rc;
7391 }
7392
7393 static void __exit rbd_exit(void)
7394 {
7395         ida_destroy(&rbd_dev_id_ida);
7396         rbd_sysfs_cleanup();
7397         if (single_major)
7398                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7399         destroy_workqueue(rbd_wq);
7400         rbd_slab_exit();
7401 }
7402
7403 module_init(rbd_init);
7404 module_exit(rbd_exit);
7405
7406 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7407 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7408 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7409 /* following authorship retained from original osdblk.c */
7410 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7411
7412 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7413 MODULE_LICENSE("GPL");