From 28b240877bbcdc8add61be227f429b536edd4653 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 11 Dec 2014 14:52:26 +0100 Subject: [PATCH] linux-aio: queue requests that cannot be submitted Keep a queue of requests that were not submitted; pass them to the kernel when a completion is reported, unless the queue is plugged. The array of iocbs is rebuilt every time from scratch. This avoids keeping the iocbs array and list synchronized. Signed-off-by: Paolo Bonzini Reviewed-by: Kevin Wolf Message-id: 1418305950-30924-2-git-send-email-pbonzini@redhat.com Signed-off-by: Stefan Hajnoczi --- block/linux-aio.c | 75 ++++++++++++++++++++++++------------------------------- 1 file changed, 33 insertions(+), 42 deletions(-) diff --git a/block/linux-aio.c b/block/linux-aio.c index d92513b0f9..b6fbfd8c13 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -35,14 +35,13 @@ struct qemu_laiocb { size_t nbytes; QEMUIOVector *qiov; bool is_read; - QLIST_ENTRY(qemu_laiocb) node; + QSIMPLEQ_ENTRY(qemu_laiocb) next; }; typedef struct { - struct iocb *iocbs[MAX_QUEUED_IO]; int plugged; - unsigned int size; unsigned int idx; + QSIMPLEQ_HEAD(, qemu_laiocb) pending; } LaioQueue; struct qemu_laio_state { @@ -59,6 +58,8 @@ struct qemu_laio_state { int event_max; }; +static int ioq_submit(struct qemu_laio_state *s); + static inline ssize_t io_event_ret(struct io_event *ev) { return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res); @@ -135,6 +136,10 @@ static void qemu_laio_completion_bh(void *opaque) qemu_laio_process_completion(s, laiocb); } + + if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { + ioq_submit(s); + } } static void qemu_laio_completion_cb(EventNotifier *e) @@ -172,52 +177,40 @@ static const AIOCBInfo laio_aiocb_info = { static void ioq_init(LaioQueue *io_q) { - io_q->size = MAX_QUEUED_IO; - io_q->idx = 0; + QSIMPLEQ_INIT(&io_q->pending); io_q->plugged = 0; + io_q->idx = 0; } static int ioq_submit(struct qemu_laio_state *s) { - int ret, i = 0; - int len = s->io_q.idx; - - do { - ret = io_submit(s->ctx, len, s->io_q.iocbs); - } while (i++ < 3 && ret == -EAGAIN); + int ret, i; + int len = 0; + struct qemu_laiocb *aiocb; + struct iocb *iocbs[MAX_QUEUED_IO]; - /* empty io queue */ - s->io_q.idx = 0; + QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) { + iocbs[len++] = &aiocb->iocb; + if (len == MAX_QUEUED_IO) { + break; + } + } + ret = io_submit(s->ctx, len, iocbs); + if (ret == -EAGAIN) { + ret = 0; + } if (ret < 0) { - i = 0; - } else { - i = ret; + abort(); } - for (; i < len; i++) { - struct qemu_laiocb *laiocb = - container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb); - - laiocb->ret = (ret < 0) ? ret : -EIO; - qemu_laio_process_completion(s, laiocb); + for (i = 0; i < ret; i++) { + s->io_q.idx--; + QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next); } return ret; } -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) -{ - unsigned int idx = s->io_q.idx; - - s->io_q.iocbs[idx++] = iocb; - s->io_q.idx = idx; - - /* submit immediately if queue is full */ - if (idx == s->io_q.size) { - ioq_submit(s); - } -} - void laio_io_plug(BlockDriverState *bs, void *aio_ctx) { struct qemu_laio_state *s = aio_ctx; @@ -236,7 +229,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug) return 0; } - if (s->io_q.idx > 0) { + if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) { ret = ioq_submit(s); } @@ -276,12 +269,10 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd, } io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e)); - if (!s->io_q.plugged) { - if (io_submit(s->ctx, 1, &iocbs) < 0) { - goto out_free_aiocb; - } - } else { - ioq_enqueue(s, iocbs); + QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next); + s->io_q.idx++; + if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) { + ioq_submit(s); } return &laiocb->common; -- 2.11.0