OSDN Git Service

io_uring: improve poll completion performance
author: Jens Axboe <axboe@kernel.dk>
Thu, 19 Dec 2019 19:06:02 +0000 (12:06 -0700)
committer: Jens Axboe <axboe@kernel.dk>
Tue, 21 Jan 2020 00:03:59 +0000 (17:03 -0700)
For busy IORING_OP_POLL_ADD workloads, we can have enough contention
on the completion lock that we fail the inline completion path quite
often as we fail the trylock on that lock. Add a list for deferred
completions that we can use in that case. This helps reduce the number
of async offloads we have to do: if we get multiple completions in
a row, we piggyback on the poll_llist instead of having to queue
our own offload.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io_uring.c

index dfa99da..c54a8bd 100644 (file)
@@ -286,7 +286,8 @@ struct io_ring_ctx {
 
        struct {
                spinlock_t              completion_lock;
-               bool                    poll_multi_file;
+               struct llist_head       poll_llist;
+
                /*
                 * ->poll_list is protected by the ctx->uring_lock for
                 * io_uring instances that don't use IORING_SETUP_SQPOLL.
@@ -296,6 +297,7 @@ struct io_ring_ctx {
                struct list_head        poll_list;
                struct hlist_head       *cancel_hash;
                unsigned                cancel_hash_bits;
+               bool                    poll_multi_file;
 
                spinlock_t              inflight_lock;
                struct list_head        inflight_list;
@@ -453,7 +455,14 @@ struct io_kiocb {
        };
 
        struct io_async_ctx             *io;
-       struct file                     *ring_file;
+       union {
+               /*
+                * ring_file is only used in the submission path, and
+                * llist_node is only used for poll deferred completions
+                */
+               struct file             *ring_file;
+               struct llist_node       llist_node;
+       };
        int                             ring_fd;
        bool                            has_user;
        bool                            in_async;
@@ -725,6 +734,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        mutex_init(&ctx->uring_lock);
        init_waitqueue_head(&ctx->wait);
        spin_lock_init(&ctx->completion_lock);
+       init_llist_head(&ctx->poll_llist);
        INIT_LIST_HEAD(&ctx->poll_list);
        INIT_LIST_HEAD(&ctx->defer_list);
        INIT_LIST_HEAD(&ctx->timeout_list);
@@ -1320,6 +1330,20 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
        return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
 }
 
+static inline bool io_req_multi_free(struct io_kiocb *req)
+{
+       /*
+        * Return true if @req may go on a batch-free array. Without fixed
+        * files the completion must be paired with the file put, so only
+        * fixed file, non-linked, non-fallback requests with no ->io qualify.
+        */
+       if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) == REQ_F_FIXED_FILE)
+           && !io_is_fallback_req(req) && !req->io)
+               return true;
+
+       return false;
+}
+
 /*
  * Find and free completed poll iocbs
  */
@@ -1339,14 +1363,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
                (*nr_events)++;
 
                if (refcount_dec_and_test(&req->refs)) {
-                       /* If we're not using fixed files, we have to pair the
-                        * completion part with the file put. Use regular
-                        * completions for those, only batch free for fixed
-                        * file and non-linked commands.
-                        */
-                       if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
-                           REQ_F_FIXED_FILE) && !io_is_fallback_req(req) &&
-                           !req->io) {
+                       if (io_req_multi_free(req)) {
                                reqs[to_free++] = req;
                                if (to_free == ARRAY_SIZE(reqs))
                                        io_free_req_many(ctx, reqs, &to_free);
@@ -3081,6 +3098,44 @@ static void io_poll_complete_work(struct io_wq_work **workptr)
                io_wq_assign_next(workptr, nxt);
 }
 
+static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes)
+{ /* complete every deferred poll request chained on @nodes, then free them */
+       void *reqs[IO_IOPOLL_BATCH]; /* requests collected for io_free_req_many() */
+       struct io_kiocb *req, *tmp;
+       int to_free = 0; /* number of valid entries in reqs[] */
+
+       spin_lock_irq(&ctx->completion_lock); /* one lock hold covers the whole list */
+       llist_for_each_entry_safe(req, tmp, nodes, llist_node) {
+               hash_del(&req->hash_node);
+               io_poll_complete(req, req->result, 0); /* ->result is the mask stored by io_poll_wake() */
+
+               if (refcount_dec_and_test(&req->refs)) {
+                       if (io_req_multi_free(req)) {
+                               reqs[to_free++] = req;
+                               if (to_free == ARRAY_SIZE(reqs)) /* array is full, hand it off now */
+                                       io_free_req_many(ctx, reqs, &to_free);
+                       } else {
+                               req->flags |= REQ_F_COMP_LOCKED; /* completion_lock is already held here */
+                               io_free_req(req);
+                       }
+               }
+       }
+       spin_unlock_irq(&ctx->completion_lock);
+
+       io_cqring_ev_posted(ctx); /* called once for the whole batch, after dropping the lock */
+       io_free_req_many(ctx, reqs, &to_free); /* hand off whatever is still collected in reqs[] */
+}
+
+static void io_poll_flush(struct io_wq_work **workptr)
+{ /* work function set by io_poll_wake(): drains ctx->poll_llist */
+       struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
+       struct llist_node *nodes;
+
+       nodes = llist_del_all(&req->ctx->poll_llist); /* take the whole list in one call */
+       if (nodes) /* skip the flush when llist_del_all() returned NULL */
+               __io_poll_flush(req->ctx, nodes);
+}
+
 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
                        void *key)
 {
@@ -3088,7 +3143,6 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
        struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
        struct io_ring_ctx *ctx = req->ctx;
        __poll_t mask = key_to_poll(key);
-       unsigned long flags;
 
        /* for instances that support it check for an event match first: */
        if (mask && !(mask & poll->events))
@@ -3102,17 +3156,31 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
         * If we have a link timeout we're going to need the completion_lock
         * for finalizing the request, mark us as having grabbed that already.
         */
-       if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
-               hash_del(&req->hash_node);
-               io_poll_complete(req, mask, 0);
-               req->flags |= REQ_F_COMP_LOCKED;
-               io_put_req(req);
-               spin_unlock_irqrestore(&ctx->completion_lock, flags);
+       if (mask) {
+               unsigned long flags;
 
-               io_cqring_ev_posted(ctx);
-       } else {
-               io_queue_async_work(req);
+               if (llist_empty(&ctx->poll_llist) &&
+                   spin_trylock_irqsave(&ctx->completion_lock, flags)) {
+                       hash_del(&req->hash_node);
+                       io_poll_complete(req, mask, 0);
+                       req->flags |= REQ_F_COMP_LOCKED;
+                       io_put_req(req);
+                       spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
+                       io_cqring_ev_posted(ctx);
+                       req = NULL;
+               } else {
+                       req->result = mask;
+                       req->llist_node.next = NULL;
+                       /* if the list wasn't empty, we're done */
+                       if (!llist_add(&req->llist_node, &ctx->poll_llist))
+                               req = NULL;
+                       else
+                               req->work.func = io_poll_flush;
+               }
        }
+       if (req)
+               io_queue_async_work(req);
 
        return 1;
 }