OSDN Git Service

Merge tag 'for-linus-20190502' of git://git.kernel.dk/linux-block
authorLinus Torvalds <torvalds@linux-foundation.org>
Thu, 2 May 2019 16:55:04 +0000 (09:55 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Thu, 2 May 2019 16:55:04 +0000 (09:55 -0700)
Pull io_uring fixes from Jens Axboe:
 "This is mostly io_uring fixes/tweaks. Most of these were actually done
  in time for the last -rc, but I wanted to ensure that everything
  tested out great before including them. The code delta looks larger
  than it really is, as it's mostly just comment additions/changes.

  Outside of the comment additions/changes, this is mostly removal of
  unnecessary barriers. In all, this pull request contains:

   - Tweak to how we handle errors at submission time. We now post a
     completion event if the error occurs on behalf of an sqe, instead
     of returning it through the system call. If the error happens
     outside of a specific sqe, we return the error through the system
     call. This makes it nicer to use and makes the "normal" use case
     behave the same as the offload cases. (me)

   - Fix for a missing req reference drop from async context (me)

   - If an sqe is submitted with RWF_NOWAIT, don't punt it to async
     context. Return -EAGAIN directly, instead of using it as a hint to
     do async punt. (Stefan)

   - Fix notes on barriers (Stefan)

   - Remove unnecessary barriers (Stefan)

   - Fix potential double free of memory in setup error (Mark)

   - Further improve sq poll CPU validation (Mark)

   - Fix page allocation warning and leak on buffer registration error
     (Mark)

   - Fix iov_iter_type() for new no-ref flag (Ming)

   - Fix a case where dio doesn't honor bio no-page-ref (Ming)"

* tag 'for-linus-20190502' of git://git.kernel.dk/linux-block:
  io_uring: avoid page allocation warnings
  iov_iter: fix iov_iter_type
  block: fix handling for BIO_NO_PAGE_REF
  io_uring: drop req submit reference always in async punt
  io_uring: free allocated io_memory once
  io_uring: fix SQPOLL cpu validation
  io_uring: have submission side sqe errors post a cqe
  io_uring: remove unnecessary barrier after unsetting IORING_SQ_NEED_WAKEUP
  io_uring: remove unnecessary barrier after incrementing dropped counter
  io_uring: remove unnecessary barrier before reading SQ tail
  io_uring: remove unnecessary barrier after updating SQ head
  io_uring: remove unnecessary barrier before reading cq head
  io_uring: remove unnecessary barrier before wq_has_sleeper
  io_uring: fix notes on barriers
  io_uring: fix handling SQEs requesting NOWAIT

fs/block_dev.c
fs/io_uring.c
include/linux/uio.h

index 24615c7..bb28e2e 100644 (file)
@@ -264,7 +264,8 @@ __blkdev_direct_IO_simple(struct kiocb *iocb, struct iov_iter *iter,
        bio_for_each_segment_all(bvec, &bio, i, iter_all) {
                if (should_dirty && !PageCompound(bvec->bv_page))
                        set_page_dirty_lock(bvec->bv_page);
-               put_page(bvec->bv_page);
+               if (!bio_flagged(&bio, BIO_NO_PAGE_REF))
+                       put_page(bvec->bv_page);
        }
 
        if (unlikely(bio.bi_status))
index 0e9fb2c..84efb89 100644 (file)
@@ -4,15 +4,28 @@
  * supporting fast/efficient IO.
  *
  * A note on the read/write ordering memory barriers that are matched between
- * the application and kernel side. When the application reads the CQ ring
- * tail, it must use an appropriate smp_rmb() to order with the smp_wmb()
- * the kernel uses after writing the tail. Failure to do so could cause a
- * delay in when the application notices that completion events available.
- * This isn't a fatal condition. Likewise, the application must use an
- * appropriate smp_wmb() both before writing the SQ tail, and after writing
- * the SQ tail. The first one orders the sqe writes with the tail write, and
- * the latter is paired with the smp_rmb() the kernel will issue before
- * reading the SQ tail on submission.
+ * the application and kernel side.
+ *
+ * After the application reads the CQ ring tail, it must use an
+ * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
+ * before writing the tail (using smp_load_acquire to read the tail will
+ * do). It also needs a smp_mb() before updating CQ head (ordering the
+ * entry load(s) with the head store), pairing with an implicit barrier
+ * through a control-dependency in io_get_cqring (smp_store_release to
+ * store head will do). Failure to do so could lead to reading invalid
+ * CQ entries.
+ *
+ * Likewise, the application must use an appropriate smp_wmb() before
+ * writing the SQ tail (ordering SQ entry stores with the tail store),
+ * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
+ * to store the tail will do). And it needs a barrier ordering the SQ
+ * head load before writing new SQ entries (smp_load_acquire to read
+ * head will do).
+ *
+ * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
+ * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
+ * updating the SQ tail; a full memory barrier smp_mb() is needed
+ * between.
  *
  * Also see the examples in the liburing library:
  *
@@ -70,20 +83,108 @@ struct io_uring {
        u32 tail ____cacheline_aligned_in_smp;
 };
 
+/*
+ * This data is shared with the application through the mmap at offset
+ * IORING_OFF_SQ_RING.
+ *
+ * The offsets to the member fields are published through struct
+ * io_sqring_offsets when calling io_uring_setup.
+ */
 struct io_sq_ring {
+       /*
+        * Head and tail offsets into the ring; the offsets need to be
+        * masked to get valid indices.
+        *
+        * The kernel controls head and the application controls tail.
+        */
        struct io_uring         r;
+       /*
+        * Bitmask to apply to head and tail offsets (constant, equals
+        * ring_entries - 1)
+        */
        u32                     ring_mask;
+       /* Ring size (constant, power of 2) */
        u32                     ring_entries;
+       /*
+        * Number of invalid entries dropped by the kernel due to
+        * invalid index stored in array
+        *
+        * Written by the kernel, shouldn't be modified by the
+        * application (i.e. get number of "new events" by comparing to
+        * cached value).
+        *
+        * After a new SQ head value was read by the application this
+        * counter includes all submissions that were dropped reaching
+        * the new SQ head (and possibly more).
+        */
        u32                     dropped;
+       /*
+        * Runtime flags
+        *
+        * Written by the kernel, shouldn't be modified by the
+        * application.
+        *
+        * The application needs a full memory barrier before checking
+        * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
+        */
        u32                     flags;
+       /*
+        * Ring buffer of indices into array of io_uring_sqe, which is
+        * mmapped by the application using the IORING_OFF_SQES offset.
+        *
+        * This indirection could e.g. be used to assign fixed
+        * io_uring_sqe entries to operations and only submit them to
+        * the queue when needed.
+        *
+        * The kernel modifies neither the indices array nor the entries
+        * array.
+        */
        u32                     array[];
 };
 
+/*
+ * This data is shared with the application through the mmap at offset
+ * IORING_OFF_CQ_RING.
+ *
+ * The offsets to the member fields are published through struct
+ * io_cqring_offsets when calling io_uring_setup.
+ */
 struct io_cq_ring {
+       /*
+        * Head and tail offsets into the ring; the offsets need to be
+        * masked to get valid indices.
+        *
+        * The application controls head and the kernel tail.
+        */
        struct io_uring         r;
+       /*
+        * Bitmask to apply to head and tail offsets (constant, equals
+        * ring_entries - 1)
+        */
        u32                     ring_mask;
+       /* Ring size (constant, power of 2) */
        u32                     ring_entries;
+       /*
+        * Number of completion events lost because the queue was full;
+        * this should be avoided by the application by making sure
+        * there are not more requests pending thatn there is space in
+        * the completion queue.
+        *
+        * Written by the kernel, shouldn't be modified by the
+        * application (i.e. get number of "new events" by comparing to
+        * cached value).
+        *
+        * As completion events come in out of order this counter is not
+        * ordered with any other data.
+        */
        u32                     overflow;
+       /*
+        * Ring buffer of completion events.
+        *
+        * The kernel writes completion events fresh every time they are
+        * produced, so the application is allowed to modify pending
+        * entries.
+        */
        struct io_uring_cqe     cqes[];
 };
 
@@ -221,7 +322,7 @@ struct io_kiocb {
        struct list_head        list;
        unsigned int            flags;
        refcount_t              refs;
-#define REQ_F_FORCE_NONBLOCK   1       /* inline submission attempt */
+#define REQ_F_NOWAIT           1       /* must not punt to workers */
 #define REQ_F_IOPOLL_COMPLETED 2       /* polled IO has completed */
 #define REQ_F_FIXED_FILE       4       /* ctx owns file */
 #define REQ_F_SEQ_PREV         8       /* sequential with previous */
@@ -317,12 +418,6 @@ static void io_commit_cqring(struct io_ring_ctx *ctx)
                /* order cqe stores with ring update */
                smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
 
-               /*
-                * Write sider barrier of tail update, app has read side. See
-                * comment at the top of this file.
-                */
-               smp_wmb();
-
                if (wq_has_sleeper(&ctx->cq_wait)) {
                        wake_up_interruptible(&ctx->cq_wait);
                        kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
@@ -336,8 +431,11 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
        unsigned tail;
 
        tail = ctx->cached_cq_tail;
-       /* See comment at the top of the file */
-       smp_rmb();
+       /*
+        * writes to the cq entry need to come after reading head; the
+        * control dependency is enough as we're using WRITE_ONCE to
+        * fill the cq entry
+        */
        if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
                return NULL;
 
@@ -774,10 +872,14 @@ static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
        ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
        if (unlikely(ret))
                return ret;
-       if (force_nonblock) {
+
+       /* don't allow async punt if RWF_NOWAIT was requested */
+       if (kiocb->ki_flags & IOCB_NOWAIT)
+               req->flags |= REQ_F_NOWAIT;
+
+       if (force_nonblock)
                kiocb->ki_flags |= IOCB_NOWAIT;
-               req->flags |= REQ_F_FORCE_NONBLOCK;
-       }
+
        if (ctx->flags & IORING_SETUP_IOPOLL) {
                if (!(kiocb->ki_flags & IOCB_DIRECT) ||
                    !kiocb->ki_filp->f_op->iopoll)
@@ -1436,8 +1538,7 @@ restart:
                struct sqe_submit *s = &req->submit;
                const struct io_uring_sqe *sqe = s->sqe;
 
-               /* Ensure we clear previously set forced non-block flag */
-               req->flags &= ~REQ_F_FORCE_NONBLOCK;
+               /* Ensure we clear previously set non-block flag */
                req->rw.ki_flags &= ~IOCB_NOWAIT;
 
                ret = 0;
@@ -1467,10 +1568,11 @@ restart:
                                        break;
                                cond_resched();
                        } while (1);
-
-                       /* drop submission reference */
-                       io_put_req(req);
                }
+
+               /* drop submission reference */
+               io_put_req(req);
+
                if (ret) {
                        io_cqring_add_event(ctx, sqe->user_data, ret, 0);
                        io_put_req(req);
@@ -1623,7 +1725,7 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
                goto out;
 
        ret = __io_submit_sqe(ctx, req, s, true);
-       if (ret == -EAGAIN) {
+       if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
                struct io_uring_sqe *sqe_copy;
 
                sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
@@ -1697,24 +1799,10 @@ static void io_commit_sqring(struct io_ring_ctx *ctx)
                 * write new data to them.
                 */
                smp_store_release(&ring->r.head, ctx->cached_sq_head);
-
-               /*
-                * write side barrier of head update, app has read side. See
-                * comment at the top of this file
-                */
-               smp_wmb();
        }
 }
 
 /*
- * Undo last io_get_sqring()
- */
-static void io_drop_sqring(struct io_ring_ctx *ctx)
-{
-       ctx->cached_sq_head--;
-}
-
-/*
  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
  * that is mapped by userspace. This means that care needs to be taken to
  * ensure that reads are stable, as we cannot rely on userspace always
@@ -1736,8 +1824,6 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
         *    though the application is the one updating it.
         */
        head = ctx->cached_sq_head;
-       /* See comment at the top of this file */
-       smp_rmb();
        /* make sure SQ entry isn't read before tail */
        if (head == smp_load_acquire(&ring->r.tail))
                return false;
@@ -1753,8 +1839,6 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
        /* drop invalid entries */
        ctx->cached_sq_head++;
        ring->dropped++;
-       /* See comment at the top of this file */
-       smp_wmb();
        return false;
 }
 
@@ -1878,13 +1962,11 @@ static int io_sq_thread(void *data)
                                finish_wait(&ctx->sqo_wait, &wait);
 
                                ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
-                               smp_wmb();
                                continue;
                        }
                        finish_wait(&ctx->sqo_wait, &wait);
 
                        ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
-                       smp_wmb();
                }
 
                i = 0;
@@ -1929,7 +2011,7 @@ static int io_sq_thread(void *data)
 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 {
        struct io_submit_state state, *statep = NULL;
-       int i, ret = 0, submit = 0;
+       int i, submit = 0;
 
        if (to_submit > IO_PLUG_THRESHOLD) {
                io_submit_state_start(&state, ctx, to_submit);
@@ -1938,6 +2020,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 
        for (i = 0; i < to_submit; i++) {
                struct sqe_submit s;
+               int ret;
 
                if (!io_get_sqring(ctx, &s))
                        break;
@@ -1945,21 +2028,18 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
                s.has_user = true;
                s.needs_lock = false;
                s.needs_fixed_file = false;
+               submit++;
 
                ret = io_submit_sqe(ctx, &s, statep);
-               if (ret) {
-                       io_drop_sqring(ctx);
-                       break;
-               }
-
-               submit++;
+               if (ret)
+                       io_cqring_add_event(ctx, s.sqe->user_data, ret, 0);
        }
        io_commit_sqring(ctx);
 
        if (statep)
                io_submit_state_end(statep);
 
-       return submit ? submit : ret;
+       return submit;
 }
 
 static unsigned io_cqring_events(struct io_cq_ring *ring)
@@ -2240,10 +2320,6 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
        mmgrab(current->mm);
        ctx->sqo_mm = current->mm;
 
-       ret = -EINVAL;
-       if (!cpu_possible(p->sq_thread_cpu))
-               goto err;
-
        if (ctx->flags & IORING_SETUP_SQPOLL) {
                ret = -EPERM;
                if (!capable(CAP_SYS_ADMIN))
@@ -2254,11 +2330,11 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
                        ctx->sq_thread_idle = HZ;
 
                if (p->flags & IORING_SETUP_SQ_AFF) {
-                       int cpu;
+                       int cpu = array_index_nospec(p->sq_thread_cpu,
+                                                       nr_cpu_ids);
 
-                       cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
                        ret = -EINVAL;
-                       if (!cpu_possible(p->sq_thread_cpu))
+                       if (!cpu_possible(cpu))
                                goto err;
 
                        ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
@@ -2321,8 +2397,12 @@ static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
 
 static void io_mem_free(void *ptr)
 {
-       struct page *page = virt_to_head_page(ptr);
+       struct page *page;
+
+       if (!ptr)
+               return;
 
+       page = virt_to_head_page(ptr);
        if (put_page_testzero(page))
                free_compound_page(page);
 }
@@ -2363,7 +2443,7 @@ static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
 
                if (ctx->account_mem)
                        io_unaccount_mem(ctx->user, imu->nr_bvecs);
-               kfree(imu->bvec);
+               kvfree(imu->bvec);
                imu->nr_bvecs = 0;
        }
 
@@ -2455,9 +2535,9 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
                if (!pages || nr_pages > got_pages) {
                        kfree(vmas);
                        kfree(pages);
-                       pages = kmalloc_array(nr_pages, sizeof(struct page *),
+                       pages = kvmalloc_array(nr_pages, sizeof(struct page *),
                                                GFP_KERNEL);
-                       vmas = kmalloc_array(nr_pages,
+                       vmas = kvmalloc_array(nr_pages,
                                        sizeof(struct vm_area_struct *),
                                        GFP_KERNEL);
                        if (!pages || !vmas) {
@@ -2469,7 +2549,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
                        got_pages = nr_pages;
                }
 
-               imu->bvec = kmalloc_array(nr_pages, sizeof(struct bio_vec),
+               imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
                                                GFP_KERNEL);
                ret = -ENOMEM;
                if (!imu->bvec) {
@@ -2508,6 +2588,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
                        }
                        if (ctx->account_mem)
                                io_unaccount_mem(ctx->user, nr_pages);
+                       kvfree(imu->bvec);
                        goto err;
                }
 
@@ -2530,12 +2611,12 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
 
                ctx->nr_user_bufs++;
        }
-       kfree(pages);
-       kfree(vmas);
+       kvfree(pages);
+       kvfree(vmas);
        return 0;
 err:
-       kfree(pages);
-       kfree(vmas);
+       kvfree(pages);
+       kvfree(vmas);
        io_sqe_buffer_unregister(ctx);
        return ret;
 }
@@ -2573,7 +2654,10 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)
        __poll_t mask = 0;
 
        poll_wait(file, &ctx->cq_wait, wait);
-       /* See comment at the top of this file */
+       /*
+        * synchronizes with barrier from wq_has_sleeper call in
+        * io_commit_cqring
+        */
        smp_rmb();
        if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
            ctx->sq_ring->ring_entries)
@@ -2687,24 +2771,12 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
                mutex_lock(&ctx->uring_lock);
                submitted = io_ring_submit(ctx, to_submit);
                mutex_unlock(&ctx->uring_lock);
-
-               if (submitted < 0)
-                       goto out_ctx;
        }
        if (flags & IORING_ENTER_GETEVENTS) {
                unsigned nr_events = 0;
 
                min_complete = min(min_complete, ctx->cq_entries);
 
-               /*
-                * The application could have included the 'to_submit' count
-                * in how many events it wanted to wait for. If we failed to
-                * submit the desired count, we may need to adjust the number
-                * of events to poll/wait for.
-                */
-               if (submitted < to_submit)
-                       min_complete = min_t(unsigned, submitted, min_complete);
-
                if (ctx->flags & IORING_SETUP_IOPOLL) {
                        mutex_lock(&ctx->uring_lock);
                        ret = io_iopoll_check(ctx, &nr_events, min_complete);
@@ -2750,17 +2822,12 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
                return -EOVERFLOW;
 
        ctx->sq_sqes = io_mem_alloc(size);
-       if (!ctx->sq_sqes) {
-               io_mem_free(ctx->sq_ring);
+       if (!ctx->sq_sqes)
                return -ENOMEM;
-       }
 
        cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
-       if (!cq_ring) {
-               io_mem_free(ctx->sq_ring);
-               io_mem_free(ctx->sq_sqes);
+       if (!cq_ring)
                return -ENOMEM;
-       }
 
        ctx->cq_ring = cq_ring;
        cq_ring->ring_mask = p->cq_entries - 1;
index f184af1..2d0131a 100644 (file)
@@ -60,7 +60,7 @@ struct iov_iter {
 
 static inline enum iter_type iov_iter_type(const struct iov_iter *i)
 {
-       return i->type & ~(READ | WRITE);
+       return i->type & ~(READ | WRITE | ITER_BVEC_FLAG_NO_REF);
 }
 
 static inline bool iter_is_iovec(const struct iov_iter *i)