OSDN Git Service

io_uring/io-wq: forward submission ref to async
authorPavel Begunkov <asml.silence@gmail.com>
Wed, 4 Mar 2020 13:14:12 +0000 (16:14 +0300)
committerJens Axboe <axboe@kernel.dk>
Wed, 4 Mar 2020 18:39:07 +0000 (11:39 -0700)
First it changes io-wq interfaces. It replaces {get,put}_work() with
free_work(), which guaranteed to be called exactly once. It also enforces
free_work() callback to be non-NULL.

io_uring follows the changes and instead of putting a submission reference
in io_put_req_async_completion(), it will be done in io_free_work(). As
removes io_get_work() with corresponding refcount_inc(), the ref balance
is maintained.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io-wq.c
fs/io-wq.h
fs/io_uring.c

index 82e7601..eda36f9 100644 (file)
@@ -107,8 +107,7 @@ struct io_wq {
        struct io_wqe **wqes;
        unsigned long state;
 
-       get_work_fn *get_work;
-       put_work_fn *put_work;
+       free_work_fn *free_work;
 
        struct task_struct *manager;
        struct user_struct *user;
@@ -509,16 +508,11 @@ get_next:
                        if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
                                work->flags |= IO_WQ_WORK_CANCEL;
 
-                       if (wq->get_work)
-                               wq->get_work(work);
-
                        old_work = work;
                        work->func(&work);
                        work = (old_work == work) ? NULL : work;
                        io_assign_current_work(worker, work);
-
-                       if (wq->put_work)
-                               wq->put_work(old_work);
+                       wq->free_work(old_work);
 
                        if (hash != -1U) {
                                spin_lock_irq(&wqe->lock);
@@ -749,14 +743,17 @@ static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
        return true;
 }
 
-static void io_run_cancel(struct io_wq_work *work)
+static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 {
+       struct io_wq *wq = wqe->wq;
+
        do {
                struct io_wq_work *old_work = work;
 
                work->flags |= IO_WQ_WORK_CANCEL;
                work->func(&work);
                work = (work == old_work) ? NULL : work;
+               wq->free_work(old_work);
        } while (work);
 }
 
@@ -773,7 +770,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
         * It's close enough to not be an issue, fork() has the same delay.
         */
        if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
-               io_run_cancel(work);
+               io_run_cancel(work, wqe);
                return;
        }
 
@@ -912,7 +909,7 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
        spin_unlock_irqrestore(&wqe->lock, flags);
 
        if (found) {
-               io_run_cancel(work);
+               io_run_cancel(work, wqe);
                return IO_WQ_CANCEL_OK;
        }
 
@@ -987,7 +984,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
        spin_unlock_irqrestore(&wqe->lock, flags);
 
        if (found) {
-               io_run_cancel(work);
+               io_run_cancel(work, wqe);
                return IO_WQ_CANCEL_OK;
        }
 
@@ -1064,6 +1061,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
        int ret = -ENOMEM, node;
        struct io_wq *wq;
 
+       if (WARN_ON_ONCE(!data->free_work))
+               return ERR_PTR(-EINVAL);
+
        wq = kzalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
                return ERR_PTR(-ENOMEM);
@@ -1074,8 +1074,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
                return ERR_PTR(-ENOMEM);
        }
 
-       wq->get_work = data->get_work;
-       wq->put_work = data->put_work;
+       wq->free_work = data->free_work;
 
        /* caller must already hold a reference to this */
        wq->user = data->user;
@@ -1132,7 +1131,7 @@ err:
 
 bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
 {
-       if (data->get_work != wq->get_work || data->put_work != wq->put_work)
+       if (data->free_work != wq->free_work)
                return false;
 
        return refcount_inc_not_zero(&wq->use_refs);
index a0978d6..2117b9a 100644 (file)
@@ -81,14 +81,12 @@ struct io_wq_work {
                *(work) = (struct io_wq_work){ .func = _func }; \
        } while (0)                                             \
 
-typedef void (get_work_fn)(struct io_wq_work *);
-typedef void (put_work_fn)(struct io_wq_work *);
+typedef void (free_work_fn)(struct io_wq_work *);
 
 struct io_wq_data {
        struct user_struct *user;
 
-       get_work_fn *get_work;
-       put_work_fn *put_work;
+       free_work_fn *free_work;
 };
 
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data);
index 40ca9e6..0d6f4b3 100644 (file)
@@ -1558,8 +1558,8 @@ static void io_put_req(struct io_kiocb *req)
                io_free_req(req);
 }
 
-static void io_put_req_async_completion(struct io_kiocb *req,
-                                       struct io_wq_work **workptr)
+static void io_steal_work(struct io_kiocb *req,
+                         struct io_wq_work **workptr)
 {
        /*
         * It's in an io-wq worker, so there always should be at least
@@ -1569,7 +1569,6 @@ static void io_put_req_async_completion(struct io_kiocb *req,
         * It also means, that if the counter dropped to 1, then there is
         * no asynchronous users left, so it's safe to steal the next work.
         */
-       refcount_dec(&req->refs);
        if (refcount_read(&req->refs) == 1) {
                struct io_kiocb *nxt = NULL;
 
@@ -2578,7 +2577,7 @@ static bool io_req_cancelled(struct io_kiocb *req)
        if (req->work.flags & IO_WQ_WORK_CANCEL) {
                req_set_fail_links(req);
                io_cqring_add_event(req, -ECANCELED);
-               io_double_put_req(req);
+               io_put_req(req);
                return true;
        }
 
@@ -2606,7 +2605,7 @@ static void io_fsync_finish(struct io_wq_work **workptr)
        if (io_req_cancelled(req))
                return;
        __io_fsync(req);
-       io_put_req_async_completion(req, workptr);
+       io_steal_work(req, workptr);
 }
 
 static int io_fsync(struct io_kiocb *req, bool force_nonblock)
@@ -2639,7 +2638,7 @@ static void io_fallocate_finish(struct io_wq_work **workptr)
        if (io_req_cancelled(req))
                return;
        __io_fallocate(req);
-       io_put_req_async_completion(req, workptr);
+       io_steal_work(req, workptr);
 }
 
 static int io_fallocate_prep(struct io_kiocb *req,
@@ -3006,7 +3005,7 @@ static void io_close_finish(struct io_wq_work **workptr)
 
        /* not cancellable, don't do io_req_cancelled() */
        __io_close_finish(req);
-       io_put_req_async_completion(req, workptr);
+       io_steal_work(req, workptr);
 }
 
 static int io_close(struct io_kiocb *req, bool force_nonblock)
@@ -3452,7 +3451,7 @@ static void io_accept_finish(struct io_wq_work **workptr)
        if (io_req_cancelled(req))
                return;
        __io_accept(req, false);
-       io_put_req_async_completion(req, workptr);
+       io_steal_work(req, workptr);
 }
 #endif
 
@@ -4719,7 +4718,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr)
                io_put_req(req);
        }
 
-       io_put_req_async_completion(req, workptr);
+       io_steal_work(req, workptr);
 }
 
 static int io_req_needs_file(struct io_kiocb *req, int fd)
@@ -6105,21 +6104,14 @@ static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
        return __io_sqe_files_update(ctx, &up, nr_args);
 }
 
-static void io_put_work(struct io_wq_work *work)
+static void io_free_work(struct io_wq_work *work)
 {
        struct io_kiocb *req = container_of(work, struct io_kiocb, work);
 
-       /* Consider that io_put_req_async_completion() relies on this ref */
+       /* Consider that io_steal_work() relies on this ref */
        io_put_req(req);
 }
 
-static void io_get_work(struct io_wq_work *work)
-{
-       struct io_kiocb *req = container_of(work, struct io_kiocb, work);
-
-       refcount_inc(&req->refs);
-}
-
 static int io_init_wq_offload(struct io_ring_ctx *ctx,
                              struct io_uring_params *p)
 {
@@ -6130,8 +6122,7 @@ static int io_init_wq_offload(struct io_ring_ctx *ctx,
        int ret = 0;
 
        data.user = ctx->user;
-       data.get_work = io_get_work;
-       data.put_work = io_put_work;
+       data.free_work = io_free_work;
 
        if (!(p->flags & IORING_SETUP_ATTACH_WQ)) {
                /* Do QD, or 4 * CPUS, whatever is smallest */