From 3da691bf436690c4bb943d5d16e5934937625578 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 29 Jan 2018 14:04:08 +0100 Subject: [PATCH] rbd: new request handling code The notable changes are: - instead of explicitly stat'ing the object to see if it exists before issuing the write, send the write optimistically along with the stat in a single OSD request - zero copyup optimization - all object requests are associated with an image request and have a valid ->img_request pointer; there are no standalone (!IMG_DATA) object requests anymore - code is structured as a state machine (vs a bunch of callbacks with implicit state) Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 678 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 601 insertions(+), 77 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index bff3e138543f..1bffad122dc2 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -235,11 +235,37 @@ enum obj_req_flags { OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */ }; +/* + * Writes go through the following state machine to deal with + * layering: + * + * need copyup + * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP + * | ^ | + * v \------------------------------/ + * done + * ^ + * | + * RBD_OBJ_WRITE_FLAT + * + * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether + * there is a parent or not. + */ +enum rbd_obj_write_state { + RBD_OBJ_WRITE_FLAT = 1, + RBD_OBJ_WRITE_GUARD, + RBD_OBJ_WRITE_COPYUP, +}; + struct rbd_obj_request { u64 object_no; u64 offset; /* object start byte */ u64 length; /* bytes from offset */ unsigned long flags; + union { + bool tried_parent; /* for reads */ + enum rbd_obj_write_state write_state; /* for writes */ + }; /* * An object request associated with an image will have its @@ -1283,6 +1309,27 @@ static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes) } /* + * Zero a range in @obj_req data buffer defined by a bio (list) or + * bio_vec array. + * + * @off is relative to the start of the data buffer. + */ +static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off, + u32 bytes) +{ + switch (obj_req->type) { + case OBJ_REQUEST_BIO: + zero_bios(&obj_req->bio_pos, off, bytes); + break; + case OBJ_REQUEST_BVECS: + zero_bvecs(&obj_req->bvec_pos, off, bytes); + break; + default: + rbd_assert(0); + } +} + +/* * The default/initial value for all object request flags is 0. For * each flag, once its value is set to 1 it is never reset to 0 * again. @@ -1567,6 +1614,35 @@ rbd_img_request_op_type(struct rbd_img_request *img_request) return OBJ_OP_READ; } +static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + + return !obj_req->offset && + obj_req->length == rbd_dev->layout.object_size; +} + +static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + + return obj_req->offset + obj_req->length == + rbd_dev->layout.object_size; +} + +static bool rbd_img_is_write(struct rbd_img_request *img_req) +{ + switch (rbd_img_request_op_type(img_req)) { + case OBJ_OP_READ: + return false; + case OBJ_OP_WRITE: + case OBJ_OP_DISCARD: + return true; + default: + rbd_assert(0); + } +} + static void rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) { @@ -1697,63 +1773,28 @@ static void rbd_osd_call_callback(struct rbd_obj_request *obj_request) obj_request_done_set(obj_request); } +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req); + static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) { - struct rbd_obj_request *obj_request = osd_req->r_priv; - u16 opcode; - - dout("%s: osd_req %p\n", __func__, osd_req); - rbd_assert(osd_req == obj_request->osd_req); - if (obj_request_img_data_test(obj_request)) { - rbd_assert(obj_request->img_request); - rbd_assert(obj_request->which != BAD_WHICH); - } else { - rbd_assert(obj_request->which == BAD_WHICH); - } + struct rbd_obj_request *obj_req = osd_req->r_priv; - if (osd_req->r_result < 0) - obj_request->result = osd_req->r_result; + dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, + osd_req->r_result, obj_req); + rbd_assert(osd_req == obj_req->osd_req); - /* - * We support a 64-bit length, but ultimately it has to be - * passed to the block layer, which just supports a 32-bit - * length field. - */ - obj_request->xferred = osd_req->r_ops[0].outdata_len; - rbd_assert(obj_request->xferred < (u64)UINT_MAX); - - opcode = osd_req->r_ops[0].op; - switch (opcode) { - case CEPH_OSD_OP_READ: - rbd_osd_read_callback(obj_request); - break; - case CEPH_OSD_OP_SETALLOCHINT: - rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE || - osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL); - /* fall through */ - case CEPH_OSD_OP_WRITE: - case CEPH_OSD_OP_WRITEFULL: - rbd_osd_write_callback(obj_request); - break; - case CEPH_OSD_OP_STAT: - rbd_osd_stat_callback(obj_request); - break; - case CEPH_OSD_OP_DELETE: - case CEPH_OSD_OP_TRUNCATE: - case CEPH_OSD_OP_ZERO: - rbd_osd_discard_callback(obj_request); - break; - case CEPH_OSD_OP_CALL: - rbd_osd_call_callback(obj_request); - break; - default: - rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d", - obj_request->object_no, opcode); - break; - } + obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0; + if (!obj_req->result && !rbd_img_is_write(obj_req->img_request)) + obj_req->xferred = osd_req->r_result; + else + /* + * Writes aren't allowed to return a data payload. In some + * guarded write cases (e.g. stat + zero on an empty object) + * a stat response makes it through, but we don't care. + */ + obj_req->xferred = 0; - if (obj_request_done_test(obj_request)) - rbd_obj_request_complete(obj_request); + rbd_obj_handle_request(obj_req); } static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) @@ -1806,12 +1847,6 @@ err_req: return NULL; } -/* - * Create an osd request. A read request has one osd op (read). - * A write request has either one (watch) or two (hint+write) osd ops. - * (All rbd data writes are prefixed with an allocation hint op, but - * technically osd watch is a write request, hence this distinction.) - */ static struct ceph_osd_request *rbd_osd_req_create( struct rbd_device *rbd_dev, enum obj_operation_type op_type, @@ -1831,8 +1866,6 @@ static struct ceph_osd_request *rbd_osd_req_create( snapc = img_request->snapc; } - rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2)); - return __rbd_osd_req_create(rbd_dev, snapc, num_ops, (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ? CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request); @@ -2251,6 +2284,211 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request, rbd_osd_req_format_read(obj_request); } +static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) +{ + switch (obj_req->type) { + case OBJ_REQUEST_BIO: + osd_req_op_extent_osd_data_bio(obj_req->osd_req, which, + &obj_req->bio_pos, + obj_req->length); + break; + case OBJ_REQUEST_BVECS: + rbd_assert(obj_req->bvec_pos.iter.bi_size == + obj_req->length); + osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which, + &obj_req->bvec_pos); + break; + default: + rbd_assert(0); + } +} + +static int rbd_obj_setup_read(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + + obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, obj_req); + if (!obj_req->osd_req) + return -ENOMEM; + + osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ, + obj_req->offset, obj_req->length, 0, 0); + rbd_osd_req_setup_data(obj_req, 0); + + rbd_osd_req_format_read(obj_req); + return 0; +} + +static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, + unsigned int which) +{ + struct page **pages; + + /* + * The response data for a STAT call consists of: + * le64 length; + * struct { + * le32 tv_sec; + * le32 tv_nsec; + * } mtime; + */ + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0); + osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages, + 8 + sizeof(struct ceph_timespec), + 0, false, true); + return 0; +} + +static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req, + unsigned int which) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u16 opcode; + + osd_req_op_alloc_hint_init(obj_req->osd_req, which++, + rbd_dev->layout.object_size, + rbd_dev->layout.object_size); + + if (rbd_obj_is_entire(obj_req)) + opcode = CEPH_OSD_OP_WRITEFULL; + else + opcode = CEPH_OSD_OP_WRITE; + + osd_req_op_extent_init(obj_req->osd_req, which, opcode, + obj_req->offset, obj_req->length, 0, 0); + rbd_osd_req_setup_data(obj_req, which++); + + rbd_assert(which == obj_req->osd_req->r_num_ops); + rbd_osd_req_format_write(obj_req); +} + +static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + unsigned int num_osd_ops, which = 0; + int ret; + + if (obj_request_overlaps_parent(obj_req)) { + obj_req->write_state = RBD_OBJ_WRITE_GUARD; + num_osd_ops = 3; /* stat + setallochint + write/writefull */ + } else { + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + num_osd_ops = 2; /* setallochint + write/writefull */ + } + + obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, + num_osd_ops, obj_req); + if (!obj_req->osd_req) + return -ENOMEM; + + if (obj_request_overlaps_parent(obj_req)) { + ret = __rbd_obj_setup_stat(obj_req, which++); + if (ret) + return ret; + } + + __rbd_obj_setup_write(obj_req, which); + return 0; +} + +static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req, + unsigned int which) +{ + u16 opcode; + + if (rbd_obj_is_entire(obj_req)) { + if (obj_request_overlaps_parent(obj_req)) { + opcode = CEPH_OSD_OP_TRUNCATE; + } else { + osd_req_op_init(obj_req->osd_req, which++, + CEPH_OSD_OP_DELETE, 0); + opcode = 0; + } + } else if (rbd_obj_is_tail(obj_req)) { + opcode = CEPH_OSD_OP_TRUNCATE; + } else { + opcode = CEPH_OSD_OP_ZERO; + } + + if (opcode) + osd_req_op_extent_init(obj_req->osd_req, which++, opcode, + obj_req->offset, obj_req->length, + 0, 0); + + rbd_assert(which == obj_req->osd_req->r_num_ops); + rbd_osd_req_format_write(obj_req); +} + +static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + unsigned int num_osd_ops, which = 0; + int ret; + + if (rbd_obj_is_entire(obj_req)) { + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + num_osd_ops = 1; /* truncate/delete */ + } else { + if (obj_request_overlaps_parent(obj_req)) { + obj_req->write_state = RBD_OBJ_WRITE_GUARD; + num_osd_ops = 2; /* stat + truncate/zero */ + } else { + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + num_osd_ops = 1; /* truncate/zero */ + } + } + + obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_DISCARD, + num_osd_ops, obj_req); + if (!obj_req->osd_req) + return -ENOMEM; + + if (!rbd_obj_is_entire(obj_req) && + obj_request_overlaps_parent(obj_req)) { + ret = __rbd_obj_setup_stat(obj_req, which++); + if (ret) + return ret; + } + + __rbd_obj_setup_discard(obj_req, which); + return 0; +} + +/* + * For each object request in @img_req, allocate an OSD request, add + * individual OSD ops and prepare them for submission. The number of + * OSD ops depends on op_type and the overlap point (if any). + */ +static int __rbd_img_fill_request(struct rbd_img_request *img_req) +{ + struct rbd_obj_request *obj_req; + int ret; + + for_each_obj_request(img_req, obj_req) { + switch (rbd_img_request_op_type(img_req)) { + case OBJ_OP_READ: + ret = rbd_obj_setup_read(obj_req); + break; + case OBJ_OP_WRITE: + ret = rbd_obj_setup_write(obj_req); + break; + case OBJ_OP_DISCARD: + ret = rbd_obj_setup_discard(obj_req); + break; + default: + rbd_assert(0); + } + if (ret) + return ret; + } + + return 0; +} + /* * Split up an image request into one or more object requests, each * to a different object. The "type" parameter indicates whether @@ -2268,7 +2506,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, struct rbd_obj_request *next_obj_request; struct ceph_bio_iter bio_it; struct ceph_bvec_iter bvec_it; - enum obj_operation_type op_type; u64 img_offset; u64 resid; @@ -2278,7 +2515,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, img_offset = img_request->offset; resid = img_request->length; rbd_assert(resid > 0); - op_type = rbd_img_request_op_type(img_request); if (type == OBJ_REQUEST_BIO) { bio_it = *(struct ceph_bio_iter *)data_desc; @@ -2289,7 +2525,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, } while (resid) { - struct ceph_osd_request *osd_req; u64 object_no = img_offset >> rbd_dev->header.obj_order; u64 offset = rbd_segment_offset(rbd_dev, img_offset); u64 length = rbd_segment_length(rbd_dev, img_offset, resid); @@ -2317,23 +2552,14 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, ceph_bvec_iter_advance(&bvec_it, length); } - osd_req = rbd_osd_req_create(rbd_dev, op_type, - (op_type == OBJ_OP_WRITE) ? 2 : 1, - obj_request); - if (!osd_req) - goto out_unwind; - - obj_request->osd_req = osd_req; obj_request->callback = rbd_img_obj_callback; obj_request->img_offset = img_offset; - rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); - img_offset += length; resid -= length; } - return 0; + return __rbd_img_fill_request(img_request); out_unwind: for_each_obj_request_safe(img_request, obj_request, next_obj_request) @@ -2712,16 +2938,171 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request) rbd_img_request_get(img_request); for_each_obj_request_safe(img_request, obj_request, next_obj_request) { - ret = rbd_img_obj_request_submit(obj_request); - if (ret) - goto out_put_ireq; + rbd_obj_request_submit(obj_request); } -out_put_ireq: rbd_img_request_put(img_request); return ret; } +static void rbd_img_end_child_request(struct rbd_img_request *img_req); + +static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req, + u64 img_offset, u32 bytes) +{ + struct rbd_img_request *img_req = obj_req->img_request; + struct rbd_img_request *child_img_req; + int ret; + + child_img_req = rbd_parent_request_create(obj_req, img_offset, bytes); + if (!child_img_req) + return -ENOMEM; + + child_img_req->callback = rbd_img_end_child_request; + + if (!rbd_img_is_write(img_req)) { + switch (obj_req->type) { + case OBJ_REQUEST_BIO: + ret = rbd_img_request_fill(child_img_req, + OBJ_REQUEST_BIO, + &obj_req->bio_pos); + break; + case OBJ_REQUEST_BVECS: + ret = rbd_img_request_fill(child_img_req, + OBJ_REQUEST_BVECS, + &obj_req->bvec_pos); + break; + default: + rbd_assert(0); + } + } else { + struct ceph_bvec_iter it = { + .bvecs = obj_req->copyup_bvecs, + .iter = { .bi_size = bytes }, + }; + + ret = rbd_img_request_fill(child_img_req, OBJ_REQUEST_BVECS, + &it); + } + if (ret) { + rbd_img_request_put(child_img_req); + return ret; + } + + rbd_img_request_submit(child_img_req); + return 0; +} + +static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; + + if (obj_req->result == -ENOENT && + obj_req->img_offset < rbd_dev->parent_overlap && + !obj_req->tried_parent) { + u64 obj_overlap = min(obj_req->length, + rbd_dev->parent_overlap - obj_req->img_offset); + + obj_req->tried_parent = true; + ret = rbd_obj_read_from_parent(obj_req, obj_req->img_offset, + obj_overlap); + if (ret) { + obj_req->result = ret; + return true; + } + return false; + } + + /* + * -ENOENT means a hole in the image -- zero-fill the entire + * length of the request. A short read also implies zero-fill + * to the end of the request. In both cases we update xferred + * count to indicate the whole request was satisfied. + */ + if (obj_req->result == -ENOENT || + (!obj_req->result && obj_req->xferred < obj_req->length)) { + rbd_assert(!obj_req->xferred || !obj_req->result); + rbd_obj_zero_range(obj_req, obj_req->xferred, + obj_req->length - obj_req->xferred); + obj_req->result = 0; + obj_req->xferred = obj_req->length; + } + + return true; +} + +/* + * copyup_bvecs pages are never highmem pages + */ +static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) +{ + struct ceph_bvec_iter it = { + .bvecs = bvecs, + .iter = { .bi_size = bytes }, + }; + + ceph_bvec_iter_advance_step(&it, bytes, ({ + if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0, + bv.bv_len)) + return false; + })); + return true; +} + +static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + unsigned int num_osd_ops = obj_req->osd_req->r_num_ops; + + dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); + rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); + rbd_osd_req_destroy(obj_req->osd_req); + + /* + * Create a copyup request with the same number of OSD ops as + * the original request. The original request was stat + op(s), + * the new copyup request will be copyup + the same op(s). + */ + obj_req->osd_req = rbd_osd_req_create(rbd_dev, + rbd_img_request_op_type(obj_req->img_request), + num_osd_ops, obj_req); + if (!obj_req->osd_req) + return -ENOMEM; + + /* + * Only send non-zero copyup data to save some I/O and network + * bandwidth -- zero copyup data is equivalent to the object not + * existing. + */ + if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) { + dout("%s obj_req %p detected zeroes\n", __func__, obj_req); + bytes = 0; + } + + osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd", + "copyup"); + osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, + obj_req->copyup_bvecs, bytes); + + switch (rbd_img_request_op_type(obj_req->img_request)) { + case OBJ_OP_WRITE: + __rbd_obj_setup_write(obj_req, 1); + break; + case OBJ_OP_DISCARD: + rbd_assert(!rbd_obj_is_entire(obj_req)); + __rbd_obj_setup_discard(obj_req, 1); + break; + default: + rbd_assert(0); + } + + rbd_obj_request_submit(obj_req); + /* FIXME: in lieu of rbd_img_obj_callback() */ + rbd_img_request_put(obj_req->img_request); + return 0; +} + static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) { u32 i; @@ -2850,6 +3231,149 @@ out_err: obj_request_done_set(obj_request); } +static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u64 img_offset; + u64 obj_overlap; + int ret; + + if (!obj_request_overlaps_parent(obj_req)) { + /* + * The overlap has become 0 (most likely because the + * image has been flattened). Use rbd_obj_issue_copyup() + * to re-submit the original write request -- the copyup + * operation itself will be a no-op, since someone must + * have populated the child object while we weren't + * looking. Move to WRITE_FLAT state as we'll be done + * with the operation once the null copyup completes. + */ + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + return rbd_obj_issue_copyup(obj_req, 0); + } + + /* + * Determine the byte range covered by the object in the + * child image to which the original request was to be sent. + */ + img_offset = obj_req->img_offset - obj_req->offset; + obj_overlap = rbd_dev->layout.object_size; + + /* + * There is no defined parent data beyond the parent + * overlap, so limit what we read at that boundary if + * necessary. + */ + if (img_offset + obj_overlap > rbd_dev->parent_overlap) { + rbd_assert(img_offset < rbd_dev->parent_overlap); + obj_overlap = rbd_dev->parent_overlap - img_offset; + } + + ret = setup_copyup_bvecs(obj_req, obj_overlap); + if (ret) + return ret; + + obj_req->write_state = RBD_OBJ_WRITE_COPYUP; + return rbd_obj_read_from_parent(obj_req, img_offset, obj_overlap); +} + +static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req) +{ + int ret; + +again: + switch (obj_req->write_state) { + case RBD_OBJ_WRITE_GUARD: + rbd_assert(!obj_req->xferred); + if (obj_req->result == -ENOENT) { + /* + * The target object doesn't exist. Read the data for + * the entire target object up to the overlap point (if + * any) from the parent, so we can use it for a copyup. + */ + ret = rbd_obj_handle_write_guard(obj_req); + if (ret) { + obj_req->result = ret; + return true; + } + return false; + } + /* fall through */ + case RBD_OBJ_WRITE_FLAT: + if (!obj_req->result) + /* + * There is no such thing as a successful short + * write -- indicate the whole request was satisfied. + */ + obj_req->xferred = obj_req->length; + return true; + case RBD_OBJ_WRITE_COPYUP: + obj_req->write_state = RBD_OBJ_WRITE_GUARD; + if (obj_req->result) + goto again; + + rbd_assert(obj_req->xferred); + ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred); + if (ret) { + obj_req->result = ret; + return true; + } + return false; + default: + rbd_assert(0); + } +} + +/* + * Returns true if @obj_req is completed, or false otherwise. + */ +static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req) +{ + switch (rbd_img_request_op_type(obj_req->img_request)) { + case OBJ_OP_READ: + return rbd_obj_handle_read(obj_req); + case OBJ_OP_WRITE: + return rbd_obj_handle_write(obj_req); + case OBJ_OP_DISCARD: + if (rbd_obj_handle_write(obj_req)) { + /* + * Hide -ENOENT from delete/truncate/zero -- discarding + * a non-existent object is not a problem. + */ + if (obj_req->result == -ENOENT) { + obj_req->result = 0; + obj_req->xferred = obj_req->length; + } + return true; + } + return false; + default: + rbd_assert(0); + } +} + +static void rbd_img_end_child_request(struct rbd_img_request *img_req) +{ + struct rbd_obj_request *obj_req = img_req->obj_request; + + rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags)); + + obj_req->result = img_req->result; + obj_req->xferred = img_req->xferred; + rbd_img_request_put(img_req); + + rbd_obj_handle_request(obj_req); +} + +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req) +{ + if (!__rbd_obj_handle_request(obj_req)) + return; + + obj_request_done_set(obj_req); + rbd_obj_request_complete(obj_req); +} + static const struct rbd_client_id rbd_empty_cid; static bool rbd_cid_equal(const struct rbd_client_id *lhs, -- 2.11.0