OSDN Git Service

io_uring: fix infinite wait in khread_park() on io_finish_async()
[tomoyo/tomoyo-test1.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/workqueue.h>
60 #include <linux/kthread.h>
61 #include <linux/blkdev.h>
62 #include <linux/bvec.h>
63 #include <linux/net.h>
64 #include <net/sock.h>
65 #include <net/af_unix.h>
66 #include <net/scm.h>
67 #include <linux/anon_inodes.h>
68 #include <linux/sched/mm.h>
69 #include <linux/uaccess.h>
70 #include <linux/nospec.h>
71 #include <linux/sizes.h>
72 #include <linux/hugetlb.h>
73
74 #include <uapi/linux/io_uring.h>
75
76 #include "internal.h"
77
78 #define IORING_MAX_ENTRIES      4096
79 #define IORING_MAX_FIXED_FILES  1024
80
81 struct io_uring {
82         u32 head ____cacheline_aligned_in_smp;
83         u32 tail ____cacheline_aligned_in_smp;
84 };
85
86 /*
87  * This data is shared with the application through the mmap at offset
88  * IORING_OFF_SQ_RING.
89  *
90  * The offsets to the member fields are published through struct
91  * io_sqring_offsets when calling io_uring_setup.
92  */
93 struct io_sq_ring {
94         /*
95          * Head and tail offsets into the ring; the offsets need to be
96          * masked to get valid indices.
97          *
98          * The kernel controls head and the application controls tail.
99          */
100         struct io_uring         r;
101         /*
102          * Bitmask to apply to head and tail offsets (constant, equals
103          * ring_entries - 1)
104          */
105         u32                     ring_mask;
106         /* Ring size (constant, power of 2) */
107         u32                     ring_entries;
108         /*
109          * Number of invalid entries dropped by the kernel due to
110          * invalid index stored in array
111          *
112          * Written by the kernel, shouldn't be modified by the
113          * application (i.e. get number of "new events" by comparing to
114          * cached value).
115          *
116          * After a new SQ head value was read by the application this
117          * counter includes all submissions that were dropped reaching
118          * the new SQ head (and possibly more).
119          */
120         u32                     dropped;
121         /*
122          * Runtime flags
123          *
124          * Written by the kernel, shouldn't be modified by the
125          * application.
126          *
127          * The application needs a full memory barrier before checking
128          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
129          */
130         u32                     flags;
131         /*
132          * Ring buffer of indices into array of io_uring_sqe, which is
133          * mmapped by the application using the IORING_OFF_SQES offset.
134          *
135          * This indirection could e.g. be used to assign fixed
136          * io_uring_sqe entries to operations and only submit them to
137          * the queue when needed.
138          *
139          * The kernel modifies neither the indices array nor the entries
140          * array.
141          */
142         u32                     array[];
143 };
144
145 /*
146  * This data is shared with the application through the mmap at offset
147  * IORING_OFF_CQ_RING.
148  *
149  * The offsets to the member fields are published through struct
150  * io_cqring_offsets when calling io_uring_setup.
151  */
152 struct io_cq_ring {
153         /*
154          * Head and tail offsets into the ring; the offsets need to be
155          * masked to get valid indices.
156          *
157          * The application controls head and the kernel tail.
158          */
159         struct io_uring         r;
160         /*
161          * Bitmask to apply to head and tail offsets (constant, equals
162          * ring_entries - 1)
163          */
164         u32                     ring_mask;
165         /* Ring size (constant, power of 2) */
166         u32                     ring_entries;
167         /*
168          * Number of completion events lost because the queue was full;
169          * this should be avoided by the application by making sure
170          * there are not more requests pending thatn there is space in
171          * the completion queue.
172          *
173          * Written by the kernel, shouldn't be modified by the
174          * application (i.e. get number of "new events" by comparing to
175          * cached value).
176          *
177          * As completion events come in out of order this counter is not
178          * ordered with any other data.
179          */
180         u32                     overflow;
181         /*
182          * Ring buffer of completion events.
183          *
184          * The kernel writes completion events fresh every time they are
185          * produced, so the application is allowed to modify pending
186          * entries.
187          */
188         struct io_uring_cqe     cqes[];
189 };
190
191 struct io_mapped_ubuf {
192         u64             ubuf;
193         size_t          len;
194         struct          bio_vec *bvec;
195         unsigned int    nr_bvecs;
196 };
197
198 struct async_list {
199         spinlock_t              lock;
200         atomic_t                cnt;
201         struct list_head        list;
202
203         struct file             *file;
204         off_t                   io_end;
205         size_t                  io_pages;
206 };
207
208 struct io_ring_ctx {
209         struct {
210                 struct percpu_ref       refs;
211         } ____cacheline_aligned_in_smp;
212
213         struct {
214                 unsigned int            flags;
215                 bool                    compat;
216                 bool                    account_mem;
217
218                 /* SQ ring */
219                 struct io_sq_ring       *sq_ring;
220                 unsigned                cached_sq_head;
221                 unsigned                sq_entries;
222                 unsigned                sq_mask;
223                 unsigned                sq_thread_idle;
224                 struct io_uring_sqe     *sq_sqes;
225
226                 struct list_head        defer_list;
227         } ____cacheline_aligned_in_smp;
228
229         /* IO offload */
230         struct workqueue_struct *sqo_wq;
231         struct task_struct      *sqo_thread;    /* if using sq thread polling */
232         struct mm_struct        *sqo_mm;
233         wait_queue_head_t       sqo_wait;
234
235         struct {
236                 /* CQ ring */
237                 struct io_cq_ring       *cq_ring;
238                 unsigned                cached_cq_tail;
239                 unsigned                cq_entries;
240                 unsigned                cq_mask;
241                 struct wait_queue_head  cq_wait;
242                 struct fasync_struct    *cq_fasync;
243                 struct eventfd_ctx      *cq_ev_fd;
244         } ____cacheline_aligned_in_smp;
245
246         /*
247          * If used, fixed file set. Writers must ensure that ->refs is dead,
248          * readers must ensure that ->refs is alive as long as the file* is
249          * used. Only updated through io_uring_register(2).
250          */
251         struct file             **user_files;
252         unsigned                nr_user_files;
253
254         /* if used, fixed mapped user buffers */
255         unsigned                nr_user_bufs;
256         struct io_mapped_ubuf   *user_bufs;
257
258         struct user_struct      *user;
259
260         struct completion       ctx_done;
261
262         struct {
263                 struct mutex            uring_lock;
264                 wait_queue_head_t       wait;
265         } ____cacheline_aligned_in_smp;
266
267         struct {
268                 spinlock_t              completion_lock;
269                 bool                    poll_multi_file;
270                 /*
271                  * ->poll_list is protected by the ctx->uring_lock for
272                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
273                  * For SQPOLL, only the single threaded io_sq_thread() will
274                  * manipulate the list, hence no extra locking is needed there.
275                  */
276                 struct list_head        poll_list;
277                 struct list_head        cancel_list;
278         } ____cacheline_aligned_in_smp;
279
280         struct async_list       pending_async[2];
281
282 #if defined(CONFIG_UNIX)
283         struct socket           *ring_sock;
284 #endif
285 };
286
287 struct sqe_submit {
288         const struct io_uring_sqe       *sqe;
289         unsigned short                  index;
290         bool                            has_user;
291         bool                            needs_lock;
292         bool                            needs_fixed_file;
293 };
294
295 /*
296  * First field must be the file pointer in all the
297  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
298  */
299 struct io_poll_iocb {
300         struct file                     *file;
301         struct wait_queue_head          *head;
302         __poll_t                        events;
303         bool                            done;
304         bool                            canceled;
305         struct wait_queue_entry         wait;
306 };
307
308 /*
309  * NOTE! Each of the iocb union members has the file pointer
310  * as the first entry in their struct definition. So you can
311  * access the file pointer through any of the sub-structs,
312  * or directly as just 'ki_filp' in this struct.
313  */
314 struct io_kiocb {
315         union {
316                 struct file             *file;
317                 struct kiocb            rw;
318                 struct io_poll_iocb     poll;
319         };
320
321         struct sqe_submit       submit;
322
323         struct io_ring_ctx      *ctx;
324         struct list_head        list;
325         unsigned int            flags;
326         refcount_t              refs;
327 #define REQ_F_NOWAIT            1       /* must not punt to workers */
328 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
329 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
330 #define REQ_F_SEQ_PREV          8       /* sequential with previous */
331 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
332 #define REQ_F_IO_DRAINED        32      /* drain done */
333         u64                     user_data;
334         u32                     error;  /* iopoll result from callback */
335         u32                     sequence;
336
337         struct work_struct      work;
338 };
339
340 #define IO_PLUG_THRESHOLD               2
341 #define IO_IOPOLL_BATCH                 8
342
343 struct io_submit_state {
344         struct blk_plug         plug;
345
346         /*
347          * io_kiocb alloc cache
348          */
349         void                    *reqs[IO_IOPOLL_BATCH];
350         unsigned                int free_reqs;
351         unsigned                int cur_req;
352
353         /*
354          * File reference cache
355          */
356         struct file             *file;
357         unsigned int            fd;
358         unsigned int            has_refs;
359         unsigned int            used_refs;
360         unsigned int            ios_left;
361 };
362
363 static void io_sq_wq_submit_work(struct work_struct *work);
364
365 static struct kmem_cache *req_cachep;
366
367 static const struct file_operations io_uring_fops;
368
369 struct sock *io_uring_get_socket(struct file *file)
370 {
371 #if defined(CONFIG_UNIX)
372         if (file->f_op == &io_uring_fops) {
373                 struct io_ring_ctx *ctx = file->private_data;
374
375                 return ctx->ring_sock->sk;
376         }
377 #endif
378         return NULL;
379 }
380 EXPORT_SYMBOL(io_uring_get_socket);
381
382 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
383 {
384         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
385
386         complete(&ctx->ctx_done);
387 }
388
389 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
390 {
391         struct io_ring_ctx *ctx;
392         int i;
393
394         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
395         if (!ctx)
396                 return NULL;
397
398         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
399                 kfree(ctx);
400                 return NULL;
401         }
402
403         ctx->flags = p->flags;
404         init_waitqueue_head(&ctx->cq_wait);
405         init_completion(&ctx->ctx_done);
406         mutex_init(&ctx->uring_lock);
407         init_waitqueue_head(&ctx->wait);
408         for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
409                 spin_lock_init(&ctx->pending_async[i].lock);
410                 INIT_LIST_HEAD(&ctx->pending_async[i].list);
411                 atomic_set(&ctx->pending_async[i].cnt, 0);
412         }
413         spin_lock_init(&ctx->completion_lock);
414         INIT_LIST_HEAD(&ctx->poll_list);
415         INIT_LIST_HEAD(&ctx->cancel_list);
416         INIT_LIST_HEAD(&ctx->defer_list);
417         return ctx;
418 }
419
420 static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
421                                      struct io_kiocb *req)
422 {
423         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
424                 return false;
425
426         return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped;
427 }
428
429 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
430 {
431         struct io_kiocb *req;
432
433         if (list_empty(&ctx->defer_list))
434                 return NULL;
435
436         req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
437         if (!io_sequence_defer(ctx, req)) {
438                 list_del_init(&req->list);
439                 return req;
440         }
441
442         return NULL;
443 }
444
445 static void __io_commit_cqring(struct io_ring_ctx *ctx)
446 {
447         struct io_cq_ring *ring = ctx->cq_ring;
448
449         if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
450                 /* order cqe stores with ring update */
451                 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
452
453                 if (wq_has_sleeper(&ctx->cq_wait)) {
454                         wake_up_interruptible(&ctx->cq_wait);
455                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
456                 }
457         }
458 }
459
460 static void io_commit_cqring(struct io_ring_ctx *ctx)
461 {
462         struct io_kiocb *req;
463
464         __io_commit_cqring(ctx);
465
466         while ((req = io_get_deferred_req(ctx)) != NULL) {
467                 req->flags |= REQ_F_IO_DRAINED;
468                 queue_work(ctx->sqo_wq, &req->work);
469         }
470 }
471
472 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
473 {
474         struct io_cq_ring *ring = ctx->cq_ring;
475         unsigned tail;
476
477         tail = ctx->cached_cq_tail;
478         /*
479          * writes to the cq entry need to come after reading head; the
480          * control dependency is enough as we're using WRITE_ONCE to
481          * fill the cq entry
482          */
483         if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
484                 return NULL;
485
486         ctx->cached_cq_tail++;
487         return &ring->cqes[tail & ctx->cq_mask];
488 }
489
490 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
491                                  long res)
492 {
493         struct io_uring_cqe *cqe;
494
495         /*
496          * If we can't get a cq entry, userspace overflowed the
497          * submission (by quite a lot). Increment the overflow count in
498          * the ring.
499          */
500         cqe = io_get_cqring(ctx);
501         if (cqe) {
502                 WRITE_ONCE(cqe->user_data, ki_user_data);
503                 WRITE_ONCE(cqe->res, res);
504                 WRITE_ONCE(cqe->flags, 0);
505         } else {
506                 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
507
508                 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
509         }
510 }
511
512 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
513 {
514         if (waitqueue_active(&ctx->wait))
515                 wake_up(&ctx->wait);
516         if (waitqueue_active(&ctx->sqo_wait))
517                 wake_up(&ctx->sqo_wait);
518         if (ctx->cq_ev_fd)
519                 eventfd_signal(ctx->cq_ev_fd, 1);
520 }
521
522 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
523                                 long res)
524 {
525         unsigned long flags;
526
527         spin_lock_irqsave(&ctx->completion_lock, flags);
528         io_cqring_fill_event(ctx, user_data, res);
529         io_commit_cqring(ctx);
530         spin_unlock_irqrestore(&ctx->completion_lock, flags);
531
532         io_cqring_ev_posted(ctx);
533 }
534
535 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
536 {
537         percpu_ref_put_many(&ctx->refs, refs);
538
539         if (waitqueue_active(&ctx->wait))
540                 wake_up(&ctx->wait);
541 }
542
543 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
544                                    struct io_submit_state *state)
545 {
546         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
547         struct io_kiocb *req;
548
549         if (!percpu_ref_tryget(&ctx->refs))
550                 return NULL;
551
552         if (!state) {
553                 req = kmem_cache_alloc(req_cachep, gfp);
554                 if (unlikely(!req))
555                         goto out;
556         } else if (!state->free_reqs) {
557                 size_t sz;
558                 int ret;
559
560                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
561                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
562
563                 /*
564                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
565                  * retry single alloc to be on the safe side.
566                  */
567                 if (unlikely(ret <= 0)) {
568                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
569                         if (!state->reqs[0])
570                                 goto out;
571                         ret = 1;
572                 }
573                 state->free_reqs = ret - 1;
574                 state->cur_req = 1;
575                 req = state->reqs[0];
576         } else {
577                 req = state->reqs[state->cur_req];
578                 state->free_reqs--;
579                 state->cur_req++;
580         }
581
582         req->ctx = ctx;
583         req->flags = 0;
584         /* one is dropped after submission, the other at completion */
585         refcount_set(&req->refs, 2);
586         return req;
587 out:
588         io_ring_drop_ctx_refs(ctx, 1);
589         return NULL;
590 }
591
592 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
593 {
594         if (*nr) {
595                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
596                 io_ring_drop_ctx_refs(ctx, *nr);
597                 *nr = 0;
598         }
599 }
600
601 static void io_free_req(struct io_kiocb *req)
602 {
603         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
604                 fput(req->file);
605         io_ring_drop_ctx_refs(req->ctx, 1);
606         kmem_cache_free(req_cachep, req);
607 }
608
609 static void io_put_req(struct io_kiocb *req)
610 {
611         if (refcount_dec_and_test(&req->refs))
612                 io_free_req(req);
613 }
614
615 /*
616  * Find and free completed poll iocbs
617  */
618 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
619                                struct list_head *done)
620 {
621         void *reqs[IO_IOPOLL_BATCH];
622         struct io_kiocb *req;
623         int to_free;
624
625         to_free = 0;
626         while (!list_empty(done)) {
627                 req = list_first_entry(done, struct io_kiocb, list);
628                 list_del(&req->list);
629
630                 io_cqring_fill_event(ctx, req->user_data, req->error);
631                 (*nr_events)++;
632
633                 if (refcount_dec_and_test(&req->refs)) {
634                         /* If we're not using fixed files, we have to pair the
635                          * completion part with the file put. Use regular
636                          * completions for those, only batch free for fixed
637                          * file.
638                          */
639                         if (req->flags & REQ_F_FIXED_FILE) {
640                                 reqs[to_free++] = req;
641                                 if (to_free == ARRAY_SIZE(reqs))
642                                         io_free_req_many(ctx, reqs, &to_free);
643                         } else {
644                                 io_free_req(req);
645                         }
646                 }
647         }
648
649         io_commit_cqring(ctx);
650         io_free_req_many(ctx, reqs, &to_free);
651 }
652
653 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
654                         long min)
655 {
656         struct io_kiocb *req, *tmp;
657         LIST_HEAD(done);
658         bool spin;
659         int ret;
660
661         /*
662          * Only spin for completions if we don't have multiple devices hanging
663          * off our complete list, and we're under the requested amount.
664          */
665         spin = !ctx->poll_multi_file && *nr_events < min;
666
667         ret = 0;
668         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
669                 struct kiocb *kiocb = &req->rw;
670
671                 /*
672                  * Move completed entries to our local list. If we find a
673                  * request that requires polling, break out and complete
674                  * the done list first, if we have entries there.
675                  */
676                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
677                         list_move_tail(&req->list, &done);
678                         continue;
679                 }
680                 if (!list_empty(&done))
681                         break;
682
683                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
684                 if (ret < 0)
685                         break;
686
687                 if (ret && spin)
688                         spin = false;
689                 ret = 0;
690         }
691
692         if (!list_empty(&done))
693                 io_iopoll_complete(ctx, nr_events, &done);
694
695         return ret;
696 }
697
698 /*
699  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
700  * non-spinning poll check - we'll still enter the driver poll loop, but only
701  * as a non-spinning completion check.
702  */
703 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
704                                 long min)
705 {
706         while (!list_empty(&ctx->poll_list)) {
707                 int ret;
708
709                 ret = io_do_iopoll(ctx, nr_events, min);
710                 if (ret < 0)
711                         return ret;
712                 if (!min || *nr_events >= min)
713                         return 0;
714         }
715
716         return 1;
717 }
718
719 /*
720  * We can't just wait for polled events to come to us, we have to actively
721  * find and complete them.
722  */
723 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
724 {
725         if (!(ctx->flags & IORING_SETUP_IOPOLL))
726                 return;
727
728         mutex_lock(&ctx->uring_lock);
729         while (!list_empty(&ctx->poll_list)) {
730                 unsigned int nr_events = 0;
731
732                 io_iopoll_getevents(ctx, &nr_events, 1);
733         }
734         mutex_unlock(&ctx->uring_lock);
735 }
736
737 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
738                            long min)
739 {
740         int ret = 0;
741
742         do {
743                 int tmin = 0;
744
745                 if (*nr_events < min)
746                         tmin = min - *nr_events;
747
748                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
749                 if (ret <= 0)
750                         break;
751                 ret = 0;
752         } while (min && !*nr_events && !need_resched());
753
754         return ret;
755 }
756
757 static void kiocb_end_write(struct kiocb *kiocb)
758 {
759         if (kiocb->ki_flags & IOCB_WRITE) {
760                 struct inode *inode = file_inode(kiocb->ki_filp);
761
762                 /*
763                  * Tell lockdep we inherited freeze protection from submission
764                  * thread.
765                  */
766                 if (S_ISREG(inode->i_mode))
767                         __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
768                 file_end_write(kiocb->ki_filp);
769         }
770 }
771
772 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
773 {
774         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
775
776         kiocb_end_write(kiocb);
777
778         io_cqring_add_event(req->ctx, req->user_data, res);
779         io_put_req(req);
780 }
781
782 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
783 {
784         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
785
786         kiocb_end_write(kiocb);
787
788         req->error = res;
789         if (res != -EAGAIN)
790                 req->flags |= REQ_F_IOPOLL_COMPLETED;
791 }
792
793 /*
794  * After the iocb has been issued, it's safe to be found on the poll list.
795  * Adding the kiocb to the list AFTER submission ensures that we don't
796  * find it from a io_iopoll_getevents() thread before the issuer is done
797  * accessing the kiocb cookie.
798  */
799 static void io_iopoll_req_issued(struct io_kiocb *req)
800 {
801         struct io_ring_ctx *ctx = req->ctx;
802
803         /*
804          * Track whether we have multiple files in our lists. This will impact
805          * how we do polling eventually, not spinning if we're on potentially
806          * different devices.
807          */
808         if (list_empty(&ctx->poll_list)) {
809                 ctx->poll_multi_file = false;
810         } else if (!ctx->poll_multi_file) {
811                 struct io_kiocb *list_req;
812
813                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
814                                                 list);
815                 if (list_req->rw.ki_filp != req->rw.ki_filp)
816                         ctx->poll_multi_file = true;
817         }
818
819         /*
820          * For fast devices, IO may have already completed. If it has, add
821          * it to the front so we find it first.
822          */
823         if (req->flags & REQ_F_IOPOLL_COMPLETED)
824                 list_add(&req->list, &ctx->poll_list);
825         else
826                 list_add_tail(&req->list, &ctx->poll_list);
827 }
828
829 static void io_file_put(struct io_submit_state *state)
830 {
831         if (state->file) {
832                 int diff = state->has_refs - state->used_refs;
833
834                 if (diff)
835                         fput_many(state->file, diff);
836                 state->file = NULL;
837         }
838 }
839
840 /*
841  * Get as many references to a file as we have IOs left in this submission,
842  * assuming most submissions are for one file, or at least that each file
843  * has more than one submission.
844  */
845 static struct file *io_file_get(struct io_submit_state *state, int fd)
846 {
847         if (!state)
848                 return fget(fd);
849
850         if (state->file) {
851                 if (state->fd == fd) {
852                         state->used_refs++;
853                         state->ios_left--;
854                         return state->file;
855                 }
856                 io_file_put(state);
857         }
858         state->file = fget_many(fd, state->ios_left);
859         if (!state->file)
860                 return NULL;
861
862         state->fd = fd;
863         state->has_refs = state->ios_left;
864         state->used_refs = 1;
865         state->ios_left--;
866         return state->file;
867 }
868
869 /*
870  * If we tracked the file through the SCM inflight mechanism, we could support
871  * any file. For now, just ensure that anything potentially problematic is done
872  * inline.
873  */
874 static bool io_file_supports_async(struct file *file)
875 {
876         umode_t mode = file_inode(file)->i_mode;
877
878         if (S_ISBLK(mode) || S_ISCHR(mode))
879                 return true;
880         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
881                 return true;
882
883         return false;
884 }
885
886 static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
887                       bool force_nonblock)
888 {
889         const struct io_uring_sqe *sqe = s->sqe;
890         struct io_ring_ctx *ctx = req->ctx;
891         struct kiocb *kiocb = &req->rw;
892         unsigned ioprio;
893         int ret;
894
895         if (!req->file)
896                 return -EBADF;
897
898         if (force_nonblock && !io_file_supports_async(req->file))
899                 force_nonblock = false;
900
901         kiocb->ki_pos = READ_ONCE(sqe->off);
902         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
903         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
904
905         ioprio = READ_ONCE(sqe->ioprio);
906         if (ioprio) {
907                 ret = ioprio_check_cap(ioprio);
908                 if (ret)
909                         return ret;
910
911                 kiocb->ki_ioprio = ioprio;
912         } else
913                 kiocb->ki_ioprio = get_current_ioprio();
914
915         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
916         if (unlikely(ret))
917                 return ret;
918
919         /* don't allow async punt if RWF_NOWAIT was requested */
920         if (kiocb->ki_flags & IOCB_NOWAIT)
921                 req->flags |= REQ_F_NOWAIT;
922
923         if (force_nonblock)
924                 kiocb->ki_flags |= IOCB_NOWAIT;
925
926         if (ctx->flags & IORING_SETUP_IOPOLL) {
927                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
928                     !kiocb->ki_filp->f_op->iopoll)
929                         return -EOPNOTSUPP;
930
931                 req->error = 0;
932                 kiocb->ki_flags |= IOCB_HIPRI;
933                 kiocb->ki_complete = io_complete_rw_iopoll;
934         } else {
935                 if (kiocb->ki_flags & IOCB_HIPRI)
936                         return -EINVAL;
937                 kiocb->ki_complete = io_complete_rw;
938         }
939         return 0;
940 }
941
942 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
943 {
944         switch (ret) {
945         case -EIOCBQUEUED:
946                 break;
947         case -ERESTARTSYS:
948         case -ERESTARTNOINTR:
949         case -ERESTARTNOHAND:
950         case -ERESTART_RESTARTBLOCK:
951                 /*
952                  * We can't just restart the syscall, since previously
953                  * submitted sqes may already be in progress. Just fail this
954                  * IO with EINTR.
955                  */
956                 ret = -EINTR;
957                 /* fall through */
958         default:
959                 kiocb->ki_complete(kiocb, ret, 0);
960         }
961 }
962
963 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
964                            const struct io_uring_sqe *sqe,
965                            struct iov_iter *iter)
966 {
967         size_t len = READ_ONCE(sqe->len);
968         struct io_mapped_ubuf *imu;
969         unsigned index, buf_index;
970         size_t offset;
971         u64 buf_addr;
972
973         /* attempt to use fixed buffers without having provided iovecs */
974         if (unlikely(!ctx->user_bufs))
975                 return -EFAULT;
976
977         buf_index = READ_ONCE(sqe->buf_index);
978         if (unlikely(buf_index >= ctx->nr_user_bufs))
979                 return -EFAULT;
980
981         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
982         imu = &ctx->user_bufs[index];
983         buf_addr = READ_ONCE(sqe->addr);
984
985         /* overflow */
986         if (buf_addr + len < buf_addr)
987                 return -EFAULT;
988         /* not inside the mapped region */
989         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
990                 return -EFAULT;
991
992         /*
993          * May not be a start of buffer, set size appropriately
994          * and advance us to the beginning.
995          */
996         offset = buf_addr - imu->ubuf;
997         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
998         if (offset)
999                 iov_iter_advance(iter, offset);
1000
1001         /* don't drop a reference to these pages */
1002         iter->type |= ITER_BVEC_FLAG_NO_REF;
1003         return 0;
1004 }
1005
1006 static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
1007                            const struct sqe_submit *s, struct iovec **iovec,
1008                            struct iov_iter *iter)
1009 {
1010         const struct io_uring_sqe *sqe = s->sqe;
1011         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1012         size_t sqe_len = READ_ONCE(sqe->len);
1013         u8 opcode;
1014
1015         /*
1016          * We're reading ->opcode for the second time, but the first read
1017          * doesn't care whether it's _FIXED or not, so it doesn't matter
1018          * whether ->opcode changes concurrently. The first read does care
1019          * about whether it is a READ or a WRITE, so we don't trust this read
1020          * for that purpose and instead let the caller pass in the read/write
1021          * flag.
1022          */
1023         opcode = READ_ONCE(sqe->opcode);
1024         if (opcode == IORING_OP_READ_FIXED ||
1025             opcode == IORING_OP_WRITE_FIXED) {
1026                 int ret = io_import_fixed(ctx, rw, sqe, iter);
1027                 *iovec = NULL;
1028                 return ret;
1029         }
1030
1031         if (!s->has_user)
1032                 return -EFAULT;
1033
1034 #ifdef CONFIG_COMPAT
1035         if (ctx->compat)
1036                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1037                                                 iovec, iter);
1038 #endif
1039
1040         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1041 }
1042
1043 /*
1044  * Make a note of the last file/offset/direction we punted to async
1045  * context. We'll use this information to see if we can piggy back a
1046  * sequential request onto the previous one, if it's still hasn't been
1047  * completed by the async worker.
1048  */
1049 static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1050 {
1051         struct async_list *async_list = &req->ctx->pending_async[rw];
1052         struct kiocb *kiocb = &req->rw;
1053         struct file *filp = kiocb->ki_filp;
1054         off_t io_end = kiocb->ki_pos + len;
1055
1056         if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
1057                 unsigned long max_pages;
1058
1059                 /* Use 8x RA size as a decent limiter for both reads/writes */
1060                 max_pages = filp->f_ra.ra_pages;
1061                 if (!max_pages)
1062                         max_pages = VM_READAHEAD_PAGES;
1063                 max_pages *= 8;
1064
1065                 /* If max pages are exceeded, reset the state */
1066                 len >>= PAGE_SHIFT;
1067                 if (async_list->io_pages + len <= max_pages) {
1068                         req->flags |= REQ_F_SEQ_PREV;
1069                         async_list->io_pages += len;
1070                 } else {
1071                         io_end = 0;
1072                         async_list->io_pages = 0;
1073                 }
1074         }
1075
1076         /* New file? Reset state. */
1077         if (async_list->file != filp) {
1078                 async_list->io_pages = 0;
1079                 async_list->file = filp;
1080         }
1081         async_list->io_end = io_end;
1082 }
1083
1084 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1085                    bool force_nonblock)
1086 {
1087         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1088         struct kiocb *kiocb = &req->rw;
1089         struct iov_iter iter;
1090         struct file *file;
1091         size_t iov_count;
1092         int ret;
1093
1094         ret = io_prep_rw(req, s, force_nonblock);
1095         if (ret)
1096                 return ret;
1097         file = kiocb->ki_filp;
1098
1099         if (unlikely(!(file->f_mode & FMODE_READ)))
1100                 return -EBADF;
1101         if (unlikely(!file->f_op->read_iter))
1102                 return -EINVAL;
1103
1104         ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
1105         if (ret)
1106                 return ret;
1107
1108         iov_count = iov_iter_count(&iter);
1109         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1110         if (!ret) {
1111                 ssize_t ret2;
1112
1113                 /* Catch -EAGAIN return for forced non-blocking submission */
1114                 ret2 = call_read_iter(file, kiocb, &iter);
1115                 if (!force_nonblock || ret2 != -EAGAIN) {
1116                         io_rw_done(kiocb, ret2);
1117                 } else {
1118                         /*
1119                          * If ->needs_lock is true, we're already in async
1120                          * context.
1121                          */
1122                         if (!s->needs_lock)
1123                                 io_async_list_note(READ, req, iov_count);
1124                         ret = -EAGAIN;
1125                 }
1126         }
1127         kfree(iovec);
1128         return ret;
1129 }
1130
1131 static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1132                     bool force_nonblock)
1133 {
1134         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1135         struct kiocb *kiocb = &req->rw;
1136         struct iov_iter iter;
1137         struct file *file;
1138         size_t iov_count;
1139         int ret;
1140
1141         ret = io_prep_rw(req, s, force_nonblock);
1142         if (ret)
1143                 return ret;
1144
1145         file = kiocb->ki_filp;
1146         if (unlikely(!(file->f_mode & FMODE_WRITE)))
1147                 return -EBADF;
1148         if (unlikely(!file->f_op->write_iter))
1149                 return -EINVAL;
1150
1151         ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1152         if (ret)
1153                 return ret;
1154
1155         iov_count = iov_iter_count(&iter);
1156
1157         ret = -EAGAIN;
1158         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1159                 /* If ->needs_lock is true, we're already in async context. */
1160                 if (!s->needs_lock)
1161                         io_async_list_note(WRITE, req, iov_count);
1162                 goto out_free;
1163         }
1164
1165         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1166         if (!ret) {
1167                 ssize_t ret2;
1168
1169                 /*
1170                  * Open-code file_start_write here to grab freeze protection,
1171                  * which will be released by another thread in
1172                  * io_complete_rw().  Fool lockdep by telling it the lock got
1173                  * released so that it doesn't complain about the held lock when
1174                  * we return to userspace.
1175                  */
1176                 if (S_ISREG(file_inode(file)->i_mode)) {
1177                         __sb_start_write(file_inode(file)->i_sb,
1178                                                 SB_FREEZE_WRITE, true);
1179                         __sb_writers_release(file_inode(file)->i_sb,
1180                                                 SB_FREEZE_WRITE);
1181                 }
1182                 kiocb->ki_flags |= IOCB_WRITE;
1183
1184                 ret2 = call_write_iter(file, kiocb, &iter);
1185                 if (!force_nonblock || ret2 != -EAGAIN) {
1186                         io_rw_done(kiocb, ret2);
1187                 } else {
1188                         /*
1189                          * If ->needs_lock is true, we're already in async
1190                          * context.
1191                          */
1192                         if (!s->needs_lock)
1193                                 io_async_list_note(WRITE, req, iov_count);
1194                         ret = -EAGAIN;
1195                 }
1196         }
1197 out_free:
1198         kfree(iovec);
1199         return ret;
1200 }
1201
1202 /*
1203  * IORING_OP_NOP just posts a completion event, nothing else.
1204  */
1205 static int io_nop(struct io_kiocb *req, u64 user_data)
1206 {
1207         struct io_ring_ctx *ctx = req->ctx;
1208         long err = 0;
1209
1210         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1211                 return -EINVAL;
1212
1213         io_cqring_add_event(ctx, user_data, err);
1214         io_put_req(req);
1215         return 0;
1216 }
1217
1218 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1219 {
1220         struct io_ring_ctx *ctx = req->ctx;
1221
1222         if (!req->file)
1223                 return -EBADF;
1224
1225         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1226                 return -EINVAL;
1227         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1228                 return -EINVAL;
1229
1230         return 0;
1231 }
1232
1233 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1234                     bool force_nonblock)
1235 {
1236         loff_t sqe_off = READ_ONCE(sqe->off);
1237         loff_t sqe_len = READ_ONCE(sqe->len);
1238         loff_t end = sqe_off + sqe_len;
1239         unsigned fsync_flags;
1240         int ret;
1241
1242         fsync_flags = READ_ONCE(sqe->fsync_flags);
1243         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1244                 return -EINVAL;
1245
1246         ret = io_prep_fsync(req, sqe);
1247         if (ret)
1248                 return ret;
1249
1250         /* fsync always requires a blocking context */
1251         if (force_nonblock)
1252                 return -EAGAIN;
1253
1254         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1255                                 end > 0 ? end : LLONG_MAX,
1256                                 fsync_flags & IORING_FSYNC_DATASYNC);
1257
1258         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1259         io_put_req(req);
1260         return 0;
1261 }
1262
1263 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1264 {
1265         struct io_ring_ctx *ctx = req->ctx;
1266         int ret = 0;
1267
1268         if (!req->file)
1269                 return -EBADF;
1270
1271         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1272                 return -EINVAL;
1273         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1274                 return -EINVAL;
1275
1276         return ret;
1277 }
1278
1279 static int io_sync_file_range(struct io_kiocb *req,
1280                               const struct io_uring_sqe *sqe,
1281                               bool force_nonblock)
1282 {
1283         loff_t sqe_off;
1284         loff_t sqe_len;
1285         unsigned flags;
1286         int ret;
1287
1288         ret = io_prep_sfr(req, sqe);
1289         if (ret)
1290                 return ret;
1291
1292         /* sync_file_range always requires a blocking context */
1293         if (force_nonblock)
1294                 return -EAGAIN;
1295
1296         sqe_off = READ_ONCE(sqe->off);
1297         sqe_len = READ_ONCE(sqe->len);
1298         flags = READ_ONCE(sqe->sync_range_flags);
1299
1300         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1301
1302         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1303         io_put_req(req);
1304         return 0;
1305 }
1306
1307 static void io_poll_remove_one(struct io_kiocb *req)
1308 {
1309         struct io_poll_iocb *poll = &req->poll;
1310
1311         spin_lock(&poll->head->lock);
1312         WRITE_ONCE(poll->canceled, true);
1313         if (!list_empty(&poll->wait.entry)) {
1314                 list_del_init(&poll->wait.entry);
1315                 queue_work(req->ctx->sqo_wq, &req->work);
1316         }
1317         spin_unlock(&poll->head->lock);
1318
1319         list_del_init(&req->list);
1320 }
1321
1322 static void io_poll_remove_all(struct io_ring_ctx *ctx)
1323 {
1324         struct io_kiocb *req;
1325
1326         spin_lock_irq(&ctx->completion_lock);
1327         while (!list_empty(&ctx->cancel_list)) {
1328                 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1329                 io_poll_remove_one(req);
1330         }
1331         spin_unlock_irq(&ctx->completion_lock);
1332 }
1333
1334 /*
1335  * Find a running poll command that matches one specified in sqe->addr,
1336  * and remove it if found.
1337  */
1338 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1339 {
1340         struct io_ring_ctx *ctx = req->ctx;
1341         struct io_kiocb *poll_req, *next;
1342         int ret = -ENOENT;
1343
1344         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1345                 return -EINVAL;
1346         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1347             sqe->poll_events)
1348                 return -EINVAL;
1349
1350         spin_lock_irq(&ctx->completion_lock);
1351         list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1352                 if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1353                         io_poll_remove_one(poll_req);
1354                         ret = 0;
1355                         break;
1356                 }
1357         }
1358         spin_unlock_irq(&ctx->completion_lock);
1359
1360         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1361         io_put_req(req);
1362         return 0;
1363 }
1364
1365 static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1366                              __poll_t mask)
1367 {
1368         req->poll.done = true;
1369         io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
1370         io_commit_cqring(ctx);
1371 }
1372
1373 static void io_poll_complete_work(struct work_struct *work)
1374 {
1375         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1376         struct io_poll_iocb *poll = &req->poll;
1377         struct poll_table_struct pt = { ._key = poll->events };
1378         struct io_ring_ctx *ctx = req->ctx;
1379         __poll_t mask = 0;
1380
1381         if (!READ_ONCE(poll->canceled))
1382                 mask = vfs_poll(poll->file, &pt) & poll->events;
1383
1384         /*
1385          * Note that ->ki_cancel callers also delete iocb from active_reqs after
1386          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1387          * synchronize with them.  In the cancellation case the list_del_init
1388          * itself is not actually needed, but harmless so we keep it in to
1389          * avoid further branches in the fast path.
1390          */
1391         spin_lock_irq(&ctx->completion_lock);
1392         if (!mask && !READ_ONCE(poll->canceled)) {
1393                 add_wait_queue(poll->head, &poll->wait);
1394                 spin_unlock_irq(&ctx->completion_lock);
1395                 return;
1396         }
1397         list_del_init(&req->list);
1398         io_poll_complete(ctx, req, mask);
1399         spin_unlock_irq(&ctx->completion_lock);
1400
1401         io_cqring_ev_posted(ctx);
1402         io_put_req(req);
1403 }
1404
1405 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1406                         void *key)
1407 {
1408         struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1409                                                         wait);
1410         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1411         struct io_ring_ctx *ctx = req->ctx;
1412         __poll_t mask = key_to_poll(key);
1413         unsigned long flags;
1414
1415         /* for instances that support it check for an event match first: */
1416         if (mask && !(mask & poll->events))
1417                 return 0;
1418
1419         list_del_init(&poll->wait.entry);
1420
1421         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1422                 list_del(&req->list);
1423                 io_poll_complete(ctx, req, mask);
1424                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1425
1426                 io_cqring_ev_posted(ctx);
1427                 io_put_req(req);
1428         } else {
1429                 queue_work(ctx->sqo_wq, &req->work);
1430         }
1431
1432         return 1;
1433 }
1434
1435 struct io_poll_table {
1436         struct poll_table_struct pt;
1437         struct io_kiocb *req;
1438         int error;
1439 };
1440
1441 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1442                                struct poll_table_struct *p)
1443 {
1444         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1445
1446         if (unlikely(pt->req->poll.head)) {
1447                 pt->error = -EINVAL;
1448                 return;
1449         }
1450
1451         pt->error = 0;
1452         pt->req->poll.head = head;
1453         add_wait_queue(head, &pt->req->poll.wait);
1454 }
1455
1456 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1457 {
1458         struct io_poll_iocb *poll = &req->poll;
1459         struct io_ring_ctx *ctx = req->ctx;
1460         struct io_poll_table ipt;
1461         bool cancel = false;
1462         __poll_t mask;
1463         u16 events;
1464
1465         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1466                 return -EINVAL;
1467         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1468                 return -EINVAL;
1469         if (!poll->file)
1470                 return -EBADF;
1471
1472         INIT_WORK(&req->work, io_poll_complete_work);
1473         events = READ_ONCE(sqe->poll_events);
1474         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1475
1476         poll->head = NULL;
1477         poll->done = false;
1478         poll->canceled = false;
1479
1480         ipt.pt._qproc = io_poll_queue_proc;
1481         ipt.pt._key = poll->events;
1482         ipt.req = req;
1483         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1484
1485         /* initialized the list so that we can do list_empty checks */
1486         INIT_LIST_HEAD(&poll->wait.entry);
1487         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1488
1489         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1490
1491         spin_lock_irq(&ctx->completion_lock);
1492         if (likely(poll->head)) {
1493                 spin_lock(&poll->head->lock);
1494                 if (unlikely(list_empty(&poll->wait.entry))) {
1495                         if (ipt.error)
1496                                 cancel = true;
1497                         ipt.error = 0;
1498                         mask = 0;
1499                 }
1500                 if (mask || ipt.error)
1501                         list_del_init(&poll->wait.entry);
1502                 else if (cancel)
1503                         WRITE_ONCE(poll->canceled, true);
1504                 else if (!poll->done) /* actually waiting for an event */
1505                         list_add_tail(&req->list, &ctx->cancel_list);
1506                 spin_unlock(&poll->head->lock);
1507         }
1508         if (mask) { /* no async, we'd stolen it */
1509                 ipt.error = 0;
1510                 io_poll_complete(ctx, req, mask);
1511         }
1512         spin_unlock_irq(&ctx->completion_lock);
1513
1514         if (mask) {
1515                 io_cqring_ev_posted(ctx);
1516                 io_put_req(req);
1517         }
1518         return ipt.error;
1519 }
1520
1521 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
1522                         const struct io_uring_sqe *sqe)
1523 {
1524         struct io_uring_sqe *sqe_copy;
1525
1526         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
1527                 return 0;
1528
1529         sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1530         if (!sqe_copy)
1531                 return -EAGAIN;
1532
1533         spin_lock_irq(&ctx->completion_lock);
1534         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
1535                 spin_unlock_irq(&ctx->completion_lock);
1536                 kfree(sqe_copy);
1537                 return 0;
1538         }
1539
1540         memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
1541         req->submit.sqe = sqe_copy;
1542
1543         INIT_WORK(&req->work, io_sq_wq_submit_work);
1544         list_add_tail(&req->list, &ctx->defer_list);
1545         spin_unlock_irq(&ctx->completion_lock);
1546         return -EIOCBQUEUED;
1547 }
1548
1549 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1550                            const struct sqe_submit *s, bool force_nonblock)
1551 {
1552         int ret, opcode;
1553
1554         if (unlikely(s->index >= ctx->sq_entries))
1555                 return -EINVAL;
1556         req->user_data = READ_ONCE(s->sqe->user_data);
1557
1558         opcode = READ_ONCE(s->sqe->opcode);
1559         switch (opcode) {
1560         case IORING_OP_NOP:
1561                 ret = io_nop(req, req->user_data);
1562                 break;
1563         case IORING_OP_READV:
1564                 if (unlikely(s->sqe->buf_index))
1565                         return -EINVAL;
1566                 ret = io_read(req, s, force_nonblock);
1567                 break;
1568         case IORING_OP_WRITEV:
1569                 if (unlikely(s->sqe->buf_index))
1570                         return -EINVAL;
1571                 ret = io_write(req, s, force_nonblock);
1572                 break;
1573         case IORING_OP_READ_FIXED:
1574                 ret = io_read(req, s, force_nonblock);
1575                 break;
1576         case IORING_OP_WRITE_FIXED:
1577                 ret = io_write(req, s, force_nonblock);
1578                 break;
1579         case IORING_OP_FSYNC:
1580                 ret = io_fsync(req, s->sqe, force_nonblock);
1581                 break;
1582         case IORING_OP_POLL_ADD:
1583                 ret = io_poll_add(req, s->sqe);
1584                 break;
1585         case IORING_OP_POLL_REMOVE:
1586                 ret = io_poll_remove(req, s->sqe);
1587                 break;
1588         case IORING_OP_SYNC_FILE_RANGE:
1589                 ret = io_sync_file_range(req, s->sqe, force_nonblock);
1590                 break;
1591         default:
1592                 ret = -EINVAL;
1593                 break;
1594         }
1595
1596         if (ret)
1597                 return ret;
1598
1599         if (ctx->flags & IORING_SETUP_IOPOLL) {
1600                 if (req->error == -EAGAIN)
1601                         return -EAGAIN;
1602
1603                 /* workqueue context doesn't hold uring_lock, grab it now */
1604                 if (s->needs_lock)
1605                         mutex_lock(&ctx->uring_lock);
1606                 io_iopoll_req_issued(req);
1607                 if (s->needs_lock)
1608                         mutex_unlock(&ctx->uring_lock);
1609         }
1610
1611         return 0;
1612 }
1613
1614 static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1615                                                  const struct io_uring_sqe *sqe)
1616 {
1617         switch (sqe->opcode) {
1618         case IORING_OP_READV:
1619         case IORING_OP_READ_FIXED:
1620                 return &ctx->pending_async[READ];
1621         case IORING_OP_WRITEV:
1622         case IORING_OP_WRITE_FIXED:
1623                 return &ctx->pending_async[WRITE];
1624         default:
1625                 return NULL;
1626         }
1627 }
1628
1629 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1630 {
1631         u8 opcode = READ_ONCE(sqe->opcode);
1632
1633         return !(opcode == IORING_OP_READ_FIXED ||
1634                  opcode == IORING_OP_WRITE_FIXED);
1635 }
1636
1637 static void io_sq_wq_submit_work(struct work_struct *work)
1638 {
1639         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1640         struct io_ring_ctx *ctx = req->ctx;
1641         struct mm_struct *cur_mm = NULL;
1642         struct async_list *async_list;
1643         LIST_HEAD(req_list);
1644         mm_segment_t old_fs;
1645         int ret;
1646
1647         async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1648 restart:
1649         do {
1650                 struct sqe_submit *s = &req->submit;
1651                 const struct io_uring_sqe *sqe = s->sqe;
1652
1653                 /* Ensure we clear previously set non-block flag */
1654                 req->rw.ki_flags &= ~IOCB_NOWAIT;
1655
1656                 ret = 0;
1657                 if (io_sqe_needs_user(sqe) && !cur_mm) {
1658                         if (!mmget_not_zero(ctx->sqo_mm)) {
1659                                 ret = -EFAULT;
1660                         } else {
1661                                 cur_mm = ctx->sqo_mm;
1662                                 use_mm(cur_mm);
1663                                 old_fs = get_fs();
1664                                 set_fs(USER_DS);
1665                         }
1666                 }
1667
1668                 if (!ret) {
1669                         s->has_user = cur_mm != NULL;
1670                         s->needs_lock = true;
1671                         do {
1672                                 ret = __io_submit_sqe(ctx, req, s, false);
1673                                 /*
1674                                  * We can get EAGAIN for polled IO even though
1675                                  * we're forcing a sync submission from here,
1676                                  * since we can't wait for request slots on the
1677                                  * block side.
1678                                  */
1679                                 if (ret != -EAGAIN)
1680                                         break;
1681                                 cond_resched();
1682                         } while (1);
1683                 }
1684
1685                 /* drop submission reference */
1686                 io_put_req(req);
1687
1688                 if (ret) {
1689                         io_cqring_add_event(ctx, sqe->user_data, ret);
1690                         io_put_req(req);
1691                 }
1692
1693                 /* async context always use a copy of the sqe */
1694                 kfree(sqe);
1695
1696                 if (!async_list)
1697                         break;
1698                 if (!list_empty(&req_list)) {
1699                         req = list_first_entry(&req_list, struct io_kiocb,
1700                                                 list);
1701                         list_del(&req->list);
1702                         continue;
1703                 }
1704                 if (list_empty(&async_list->list))
1705                         break;
1706
1707                 req = NULL;
1708                 spin_lock(&async_list->lock);
1709                 if (list_empty(&async_list->list)) {
1710                         spin_unlock(&async_list->lock);
1711                         break;
1712                 }
1713                 list_splice_init(&async_list->list, &req_list);
1714                 spin_unlock(&async_list->lock);
1715
1716                 req = list_first_entry(&req_list, struct io_kiocb, list);
1717                 list_del(&req->list);
1718         } while (req);
1719
1720         /*
1721          * Rare case of racing with a submitter. If we find the count has
1722          * dropped to zero AND we have pending work items, then restart
1723          * the processing. This is a tiny race window.
1724          */
1725         if (async_list) {
1726                 ret = atomic_dec_return(&async_list->cnt);
1727                 while (!ret && !list_empty(&async_list->list)) {
1728                         spin_lock(&async_list->lock);
1729                         atomic_inc(&async_list->cnt);
1730                         list_splice_init(&async_list->list, &req_list);
1731                         spin_unlock(&async_list->lock);
1732
1733                         if (!list_empty(&req_list)) {
1734                                 req = list_first_entry(&req_list,
1735                                                         struct io_kiocb, list);
1736                                 list_del(&req->list);
1737                                 goto restart;
1738                         }
1739                         ret = atomic_dec_return(&async_list->cnt);
1740                 }
1741         }
1742
1743         if (cur_mm) {
1744                 set_fs(old_fs);
1745                 unuse_mm(cur_mm);
1746                 mmput(cur_mm);
1747         }
1748 }
1749
1750 /*
1751  * See if we can piggy back onto previously submitted work, that is still
1752  * running. We currently only allow this if the new request is sequential
1753  * to the previous one we punted.
1754  */
1755 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
1756 {
1757         bool ret = false;
1758
1759         if (!list)
1760                 return false;
1761         if (!(req->flags & REQ_F_SEQ_PREV))
1762                 return false;
1763         if (!atomic_read(&list->cnt))
1764                 return false;
1765
1766         ret = true;
1767         spin_lock(&list->lock);
1768         list_add_tail(&req->list, &list->list);
1769         if (!atomic_read(&list->cnt)) {
1770                 list_del_init(&req->list);
1771                 ret = false;
1772         }
1773         spin_unlock(&list->lock);
1774         return ret;
1775 }
1776
1777 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
1778 {
1779         int op = READ_ONCE(sqe->opcode);
1780
1781         switch (op) {
1782         case IORING_OP_NOP:
1783         case IORING_OP_POLL_REMOVE:
1784                 return false;
1785         default:
1786                 return true;
1787         }
1788 }
1789
1790 static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
1791                            struct io_submit_state *state, struct io_kiocb *req)
1792 {
1793         unsigned flags;
1794         int fd;
1795
1796         flags = READ_ONCE(s->sqe->flags);
1797         fd = READ_ONCE(s->sqe->fd);
1798
1799         if (flags & IOSQE_IO_DRAIN) {
1800                 req->flags |= REQ_F_IO_DRAIN;
1801                 req->sequence = ctx->cached_sq_head - 1;
1802         }
1803
1804         if (!io_op_needs_file(s->sqe)) {
1805                 req->file = NULL;
1806                 return 0;
1807         }
1808
1809         if (flags & IOSQE_FIXED_FILE) {
1810                 if (unlikely(!ctx->user_files ||
1811                     (unsigned) fd >= ctx->nr_user_files))
1812                         return -EBADF;
1813                 req->file = ctx->user_files[fd];
1814                 req->flags |= REQ_F_FIXED_FILE;
1815         } else {
1816                 if (s->needs_fixed_file)
1817                         return -EBADF;
1818                 req->file = io_file_get(state, fd);
1819                 if (unlikely(!req->file))
1820                         return -EBADF;
1821         }
1822
1823         return 0;
1824 }
1825
1826 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1827                          struct io_submit_state *state)
1828 {
1829         struct io_kiocb *req;
1830         int ret;
1831
1832         /* enforce forwards compatibility on users */
1833         if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)))
1834                 return -EINVAL;
1835
1836         req = io_get_req(ctx, state);
1837         if (unlikely(!req))
1838                 return -EAGAIN;
1839
1840         ret = io_req_set_file(ctx, s, state, req);
1841         if (unlikely(ret))
1842                 goto out;
1843
1844         ret = io_req_defer(ctx, req, s->sqe);
1845         if (ret) {
1846                 if (ret == -EIOCBQUEUED)
1847                         ret = 0;
1848                 return ret;
1849         }
1850
1851         ret = __io_submit_sqe(ctx, req, s, true);
1852         if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
1853                 struct io_uring_sqe *sqe_copy;
1854
1855                 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1856                 if (sqe_copy) {
1857                         struct async_list *list;
1858
1859                         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1860                         s->sqe = sqe_copy;
1861
1862                         memcpy(&req->submit, s, sizeof(*s));
1863                         list = io_async_list_from_sqe(ctx, s->sqe);
1864                         if (!io_add_to_prev_work(list, req)) {
1865                                 if (list)
1866                                         atomic_inc(&list->cnt);
1867                                 INIT_WORK(&req->work, io_sq_wq_submit_work);
1868                                 queue_work(ctx->sqo_wq, &req->work);
1869                         }
1870
1871                         /*
1872                          * Queued up for async execution, worker will release
1873                          * submit reference when the iocb is actually
1874                          * submitted.
1875                          */
1876                         return 0;
1877                 }
1878         }
1879
1880 out:
1881         /* drop submission reference */
1882         io_put_req(req);
1883
1884         /* and drop final reference, if we failed */
1885         if (ret)
1886                 io_put_req(req);
1887
1888         return ret;
1889 }
1890
1891 /*
1892  * Batched submission is done, ensure local IO is flushed out.
1893  */
1894 static void io_submit_state_end(struct io_submit_state *state)
1895 {
1896         blk_finish_plug(&state->plug);
1897         io_file_put(state);
1898         if (state->free_reqs)
1899                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
1900                                         &state->reqs[state->cur_req]);
1901 }
1902
1903 /*
1904  * Start submission side cache.
1905  */
1906 static void io_submit_state_start(struct io_submit_state *state,
1907                                   struct io_ring_ctx *ctx, unsigned max_ios)
1908 {
1909         blk_start_plug(&state->plug);
1910         state->free_reqs = 0;
1911         state->file = NULL;
1912         state->ios_left = max_ios;
1913 }
1914
1915 static void io_commit_sqring(struct io_ring_ctx *ctx)
1916 {
1917         struct io_sq_ring *ring = ctx->sq_ring;
1918
1919         if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1920                 /*
1921                  * Ensure any loads from the SQEs are done at this point,
1922                  * since once we write the new head, the application could
1923                  * write new data to them.
1924                  */
1925                 smp_store_release(&ring->r.head, ctx->cached_sq_head);
1926         }
1927 }
1928
1929 /*
1930  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1931  * that is mapped by userspace. This means that care needs to be taken to
1932  * ensure that reads are stable, as we cannot rely on userspace always
1933  * being a good citizen. If members of the sqe are validated and then later
1934  * used, it's important that those reads are done through READ_ONCE() to
1935  * prevent a re-load down the line.
1936  */
1937 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1938 {
1939         struct io_sq_ring *ring = ctx->sq_ring;
1940         unsigned head;
1941
1942         /*
1943          * The cached sq head (or cq tail) serves two purposes:
1944          *
1945          * 1) allows us to batch the cost of updating the user visible
1946          *    head updates.
1947          * 2) allows the kernel side to track the head on its own, even
1948          *    though the application is the one updating it.
1949          */
1950         head = ctx->cached_sq_head;
1951         /* make sure SQ entry isn't read before tail */
1952         if (head == smp_load_acquire(&ring->r.tail))
1953                 return false;
1954
1955         head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1956         if (head < ctx->sq_entries) {
1957                 s->index = head;
1958                 s->sqe = &ctx->sq_sqes[head];
1959                 ctx->cached_sq_head++;
1960                 return true;
1961         }
1962
1963         /* drop invalid entries */
1964         ctx->cached_sq_head++;
1965         ring->dropped++;
1966         return false;
1967 }
1968
1969 static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1970                           unsigned int nr, bool has_user, bool mm_fault)
1971 {
1972         struct io_submit_state state, *statep = NULL;
1973         int ret, i, submitted = 0;
1974
1975         if (nr > IO_PLUG_THRESHOLD) {
1976                 io_submit_state_start(&state, ctx, nr);
1977                 statep = &state;
1978         }
1979
1980         for (i = 0; i < nr; i++) {
1981                 if (unlikely(mm_fault)) {
1982                         ret = -EFAULT;
1983                 } else {
1984                         sqes[i].has_user = has_user;
1985                         sqes[i].needs_lock = true;
1986                         sqes[i].needs_fixed_file = true;
1987                         ret = io_submit_sqe(ctx, &sqes[i], statep);
1988                 }
1989                 if (!ret) {
1990                         submitted++;
1991                         continue;
1992                 }
1993
1994                 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret);
1995         }
1996
1997         if (statep)
1998                 io_submit_state_end(&state);
1999
2000         return submitted;
2001 }
2002
2003 static int io_sq_thread(void *data)
2004 {
2005         struct sqe_submit sqes[IO_IOPOLL_BATCH];
2006         struct io_ring_ctx *ctx = data;
2007         struct mm_struct *cur_mm = NULL;
2008         mm_segment_t old_fs;
2009         DEFINE_WAIT(wait);
2010         unsigned inflight;
2011         unsigned long timeout;
2012
2013         old_fs = get_fs();
2014         set_fs(USER_DS);
2015
2016         timeout = inflight = 0;
2017         while (!kthread_should_park()) {
2018                 bool all_fixed, mm_fault = false;
2019                 int i;
2020
2021                 if (inflight) {
2022                         unsigned nr_events = 0;
2023
2024                         if (ctx->flags & IORING_SETUP_IOPOLL) {
2025                                 /*
2026                                  * We disallow the app entering submit/complete
2027                                  * with polling, but we still need to lock the
2028                                  * ring to prevent racing with polled issue
2029                                  * that got punted to a workqueue.
2030                                  */
2031                                 mutex_lock(&ctx->uring_lock);
2032                                 io_iopoll_check(ctx, &nr_events, 0);
2033                                 mutex_unlock(&ctx->uring_lock);
2034                         } else {
2035                                 /*
2036                                  * Normal IO, just pretend everything completed.
2037                                  * We don't have to poll completions for that.
2038                                  */
2039                                 nr_events = inflight;
2040                         }
2041
2042                         inflight -= nr_events;
2043                         if (!inflight)
2044                                 timeout = jiffies + ctx->sq_thread_idle;
2045                 }
2046
2047                 if (!io_get_sqring(ctx, &sqes[0])) {
2048                         /*
2049                          * We're polling. If we're within the defined idle
2050                          * period, then let us spin without work before going
2051                          * to sleep.
2052                          */
2053                         if (inflight || !time_after(jiffies, timeout)) {
2054                                 cpu_relax();
2055                                 continue;
2056                         }
2057
2058                         /*
2059                          * Drop cur_mm before scheduling, we can't hold it for
2060                          * long periods (or over schedule()). Do this before
2061                          * adding ourselves to the waitqueue, as the unuse/drop
2062                          * may sleep.
2063                          */
2064                         if (cur_mm) {
2065                                 unuse_mm(cur_mm);
2066                                 mmput(cur_mm);
2067                                 cur_mm = NULL;
2068                         }
2069
2070                         prepare_to_wait(&ctx->sqo_wait, &wait,
2071                                                 TASK_INTERRUPTIBLE);
2072
2073                         /* Tell userspace we may need a wakeup call */
2074                         ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
2075                         /* make sure to read SQ tail after writing flags */
2076                         smp_mb();
2077
2078                         if (!io_get_sqring(ctx, &sqes[0])) {
2079                                 if (kthread_should_park()) {
2080                                         finish_wait(&ctx->sqo_wait, &wait);
2081                                         break;
2082                                 }
2083                                 if (signal_pending(current))
2084                                         flush_signals(current);
2085                                 schedule();
2086                                 finish_wait(&ctx->sqo_wait, &wait);
2087
2088                                 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2089                                 continue;
2090                         }
2091                         finish_wait(&ctx->sqo_wait, &wait);
2092
2093                         ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2094                 }
2095
2096                 i = 0;
2097                 all_fixed = true;
2098                 do {
2099                         if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
2100                                 all_fixed = false;
2101
2102                         i++;
2103                         if (i == ARRAY_SIZE(sqes))
2104                                 break;
2105                 } while (io_get_sqring(ctx, &sqes[i]));
2106
2107                 /* Unless all new commands are FIXED regions, grab mm */
2108                 if (!all_fixed && !cur_mm) {
2109                         mm_fault = !mmget_not_zero(ctx->sqo_mm);
2110                         if (!mm_fault) {
2111                                 use_mm(ctx->sqo_mm);
2112                                 cur_mm = ctx->sqo_mm;
2113                         }
2114                 }
2115
2116                 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
2117                                                 mm_fault);
2118
2119                 /* Commit SQ ring head once we've consumed all SQEs */
2120                 io_commit_sqring(ctx);
2121         }
2122
2123         set_fs(old_fs);
2124         if (cur_mm) {
2125                 unuse_mm(cur_mm);
2126                 mmput(cur_mm);
2127         }
2128
2129         kthread_parkme();
2130
2131         return 0;
2132 }
2133
2134 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2135 {
2136         struct io_submit_state state, *statep = NULL;
2137         int i, submit = 0;
2138
2139         if (to_submit > IO_PLUG_THRESHOLD) {
2140                 io_submit_state_start(&state, ctx, to_submit);
2141                 statep = &state;
2142         }
2143
2144         for (i = 0; i < to_submit; i++) {
2145                 struct sqe_submit s;
2146                 int ret;
2147
2148                 if (!io_get_sqring(ctx, &s))
2149                         break;
2150
2151                 s.has_user = true;
2152                 s.needs_lock = false;
2153                 s.needs_fixed_file = false;
2154                 submit++;
2155
2156                 ret = io_submit_sqe(ctx, &s, statep);
2157                 if (ret)
2158                         io_cqring_add_event(ctx, s.sqe->user_data, ret);
2159         }
2160         io_commit_sqring(ctx);
2161
2162         if (statep)
2163                 io_submit_state_end(statep);
2164
2165         return submit;
2166 }
2167
2168 static unsigned io_cqring_events(struct io_cq_ring *ring)
2169 {
2170         return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
2171 }
2172
2173 /*
2174  * Wait until events become available, if we don't already have some. The
2175  * application must reap them itself, as they reside on the shared cq ring.
2176  */
2177 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2178                           const sigset_t __user *sig, size_t sigsz)
2179 {
2180         struct io_cq_ring *ring = ctx->cq_ring;
2181         sigset_t ksigmask, sigsaved;
2182         DEFINE_WAIT(wait);
2183         int ret;
2184
2185         /* See comment at the top of this file */
2186         smp_rmb();
2187         if (io_cqring_events(ring) >= min_events)
2188                 return 0;
2189
2190         if (sig) {
2191 #ifdef CONFIG_COMPAT
2192                 if (in_compat_syscall())
2193                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2194                                                       &ksigmask, &sigsaved, sigsz);
2195                 else
2196 #endif
2197                         ret = set_user_sigmask(sig, &ksigmask,
2198                                                &sigsaved, sigsz);
2199
2200                 if (ret)
2201                         return ret;
2202         }
2203
2204         do {
2205                 prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
2206
2207                 ret = 0;
2208                 /* See comment at the top of this file */
2209                 smp_rmb();
2210                 if (io_cqring_events(ring) >= min_events)
2211                         break;
2212
2213                 schedule();
2214
2215                 ret = -EINTR;
2216                 if (signal_pending(current))
2217                         break;
2218         } while (1);
2219
2220         finish_wait(&ctx->wait, &wait);
2221
2222         if (sig)
2223                 restore_user_sigmask(sig, &sigsaved);
2224
2225         return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
2226 }
2227
2228 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2229 {
2230 #if defined(CONFIG_UNIX)
2231         if (ctx->ring_sock) {
2232                 struct sock *sock = ctx->ring_sock->sk;
2233                 struct sk_buff *skb;
2234
2235                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2236                         kfree_skb(skb);
2237         }
2238 #else
2239         int i;
2240
2241         for (i = 0; i < ctx->nr_user_files; i++)
2242                 fput(ctx->user_files[i]);
2243 #endif
2244 }
2245
2246 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2247 {
2248         if (!ctx->user_files)
2249                 return -ENXIO;
2250
2251         __io_sqe_files_unregister(ctx);
2252         kfree(ctx->user_files);
2253         ctx->user_files = NULL;
2254         ctx->nr_user_files = 0;
2255         return 0;
2256 }
2257
2258 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2259 {
2260         if (ctx->sqo_thread) {
2261                 /*
2262                  * The park is a bit of a work-around, without it we get
2263                  * warning spews on shutdown with SQPOLL set and affinity
2264                  * set to a single CPU.
2265                  */
2266                 kthread_park(ctx->sqo_thread);
2267                 kthread_stop(ctx->sqo_thread);
2268                 ctx->sqo_thread = NULL;
2269         }
2270 }
2271
2272 static void io_finish_async(struct io_ring_ctx *ctx)
2273 {
2274         io_sq_thread_stop(ctx);
2275
2276         if (ctx->sqo_wq) {
2277                 destroy_workqueue(ctx->sqo_wq);
2278                 ctx->sqo_wq = NULL;
2279         }
2280 }
2281
2282 #if defined(CONFIG_UNIX)
2283 static void io_destruct_skb(struct sk_buff *skb)
2284 {
2285         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2286
2287         io_finish_async(ctx);
2288         unix_destruct_scm(skb);
2289 }
2290
2291 /*
2292  * Ensure the UNIX gc is aware of our file set, so we are certain that
2293  * the io_uring can be safely unregistered on process exit, even if we have
2294  * loops in the file referencing.
2295  */
2296 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2297 {
2298         struct sock *sk = ctx->ring_sock->sk;
2299         struct scm_fp_list *fpl;
2300         struct sk_buff *skb;
2301         int i;
2302
2303         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2304                 unsigned long inflight = ctx->user->unix_inflight + nr;
2305
2306                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2307                         return -EMFILE;
2308         }
2309
2310         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2311         if (!fpl)
2312                 return -ENOMEM;
2313
2314         skb = alloc_skb(0, GFP_KERNEL);
2315         if (!skb) {
2316                 kfree(fpl);
2317                 return -ENOMEM;
2318         }
2319
2320         skb->sk = sk;
2321         skb->destructor = io_destruct_skb;
2322
2323         fpl->user = get_uid(ctx->user);
2324         for (i = 0; i < nr; i++) {
2325                 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2326                 unix_inflight(fpl->user, fpl->fp[i]);
2327         }
2328
2329         fpl->max = fpl->count = nr;
2330         UNIXCB(skb).fp = fpl;
2331         refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2332         skb_queue_head(&sk->sk_receive_queue, skb);
2333
2334         for (i = 0; i < nr; i++)
2335                 fput(fpl->fp[i]);
2336
2337         return 0;
2338 }
2339
2340 /*
2341  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2342  * causes regular reference counting to break down. We rely on the UNIX
2343  * garbage collection to take care of this problem for us.
2344  */
2345 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2346 {
2347         unsigned left, total;
2348         int ret = 0;
2349
2350         total = 0;
2351         left = ctx->nr_user_files;
2352         while (left) {
2353                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2354
2355                 ret = __io_sqe_files_scm(ctx, this_files, total);
2356                 if (ret)
2357                         break;
2358                 left -= this_files;
2359                 total += this_files;
2360         }
2361
2362         if (!ret)
2363                 return 0;
2364
2365         while (total < ctx->nr_user_files) {
2366                 fput(ctx->user_files[total]);
2367                 total++;
2368         }
2369
2370         return ret;
2371 }
2372 #else
2373 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2374 {
2375         return 0;
2376 }
2377 #endif
2378
2379 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2380                                  unsigned nr_args)
2381 {
2382         __s32 __user *fds = (__s32 __user *) arg;
2383         int fd, ret = 0;
2384         unsigned i;
2385
2386         if (ctx->user_files)
2387                 return -EBUSY;
2388         if (!nr_args)
2389                 return -EINVAL;
2390         if (nr_args > IORING_MAX_FIXED_FILES)
2391                 return -EMFILE;
2392
2393         ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2394         if (!ctx->user_files)
2395                 return -ENOMEM;
2396
2397         for (i = 0; i < nr_args; i++) {
2398                 ret = -EFAULT;
2399                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2400                         break;
2401
2402                 ctx->user_files[i] = fget(fd);
2403
2404                 ret = -EBADF;
2405                 if (!ctx->user_files[i])
2406                         break;
2407                 /*
2408                  * Don't allow io_uring instances to be registered. If UNIX
2409                  * isn't enabled, then this causes a reference cycle and this
2410                  * instance can never get freed. If UNIX is enabled we'll
2411                  * handle it just fine, but there's still no point in allowing
2412                  * a ring fd as it doesn't support regular read/write anyway.
2413                  */
2414                 if (ctx->user_files[i]->f_op == &io_uring_fops) {
2415                         fput(ctx->user_files[i]);
2416                         break;
2417                 }
2418                 ctx->nr_user_files++;
2419                 ret = 0;
2420         }
2421
2422         if (ret) {
2423                 for (i = 0; i < ctx->nr_user_files; i++)
2424                         fput(ctx->user_files[i]);
2425
2426                 kfree(ctx->user_files);
2427                 ctx->user_files = NULL;
2428                 ctx->nr_user_files = 0;
2429                 return ret;
2430         }
2431
2432         ret = io_sqe_files_scm(ctx);
2433         if (ret)
2434                 io_sqe_files_unregister(ctx);
2435
2436         return ret;
2437 }
2438
2439 static int io_sq_offload_start(struct io_ring_ctx *ctx,
2440                                struct io_uring_params *p)
2441 {
2442         int ret;
2443
2444         init_waitqueue_head(&ctx->sqo_wait);
2445         mmgrab(current->mm);
2446         ctx->sqo_mm = current->mm;
2447
2448         if (ctx->flags & IORING_SETUP_SQPOLL) {
2449                 ret = -EPERM;
2450                 if (!capable(CAP_SYS_ADMIN))
2451                         goto err;
2452
2453                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2454                 if (!ctx->sq_thread_idle)
2455                         ctx->sq_thread_idle = HZ;
2456
2457                 if (p->flags & IORING_SETUP_SQ_AFF) {
2458                         int cpu = p->sq_thread_cpu;
2459
2460                         ret = -EINVAL;
2461                         if (cpu >= nr_cpu_ids)
2462                                 goto err;
2463                         if (!cpu_online(cpu))
2464                                 goto err;
2465
2466                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2467                                                         ctx, cpu,
2468                                                         "io_uring-sq");
2469                 } else {
2470                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2471                                                         "io_uring-sq");
2472                 }
2473                 if (IS_ERR(ctx->sqo_thread)) {
2474                         ret = PTR_ERR(ctx->sqo_thread);
2475                         ctx->sqo_thread = NULL;
2476                         goto err;
2477                 }
2478                 wake_up_process(ctx->sqo_thread);
2479         } else if (p->flags & IORING_SETUP_SQ_AFF) {
2480                 /* Can't have SQ_AFF without SQPOLL */
2481                 ret = -EINVAL;
2482                 goto err;
2483         }
2484
2485         /* Do QD, or 2 * CPUS, whatever is smallest */
2486         ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2487                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2488         if (!ctx->sqo_wq) {
2489                 ret = -ENOMEM;
2490                 goto err;
2491         }
2492
2493         return 0;
2494 err:
2495         io_sq_thread_stop(ctx);
2496         mmdrop(ctx->sqo_mm);
2497         ctx->sqo_mm = NULL;
2498         return ret;
2499 }
2500
2501 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2502 {
2503         atomic_long_sub(nr_pages, &user->locked_vm);
2504 }
2505
2506 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2507 {
2508         unsigned long page_limit, cur_pages, new_pages;
2509
2510         /* Don't allow more pages than we can safely lock */
2511         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2512
2513         do {
2514                 cur_pages = atomic_long_read(&user->locked_vm);
2515                 new_pages = cur_pages + nr_pages;
2516                 if (new_pages > page_limit)
2517                         return -ENOMEM;
2518         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2519                                         new_pages) != cur_pages);
2520
2521         return 0;
2522 }
2523
2524 static void io_mem_free(void *ptr)
2525 {
2526         struct page *page;
2527
2528         if (!ptr)
2529                 return;
2530
2531         page = virt_to_head_page(ptr);
2532         if (put_page_testzero(page))
2533                 free_compound_page(page);
2534 }
2535
2536 static void *io_mem_alloc(size_t size)
2537 {
2538         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2539                                 __GFP_NORETRY;
2540
2541         return (void *) __get_free_pages(gfp_flags, get_order(size));
2542 }
2543
2544 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2545 {
2546         struct io_sq_ring *sq_ring;
2547         struct io_cq_ring *cq_ring;
2548         size_t bytes;
2549
2550         bytes = struct_size(sq_ring, array, sq_entries);
2551         bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2552         bytes += struct_size(cq_ring, cqes, cq_entries);
2553
2554         return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2555 }
2556
2557 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2558 {
2559         int i, j;
2560
2561         if (!ctx->user_bufs)
2562                 return -ENXIO;
2563
2564         for (i = 0; i < ctx->nr_user_bufs; i++) {
2565                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2566
2567                 for (j = 0; j < imu->nr_bvecs; j++)
2568                         put_page(imu->bvec[j].bv_page);
2569
2570                 if (ctx->account_mem)
2571                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
2572                 kvfree(imu->bvec);
2573                 imu->nr_bvecs = 0;
2574         }
2575
2576         kfree(ctx->user_bufs);
2577         ctx->user_bufs = NULL;
2578         ctx->nr_user_bufs = 0;
2579         return 0;
2580 }
2581
2582 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2583                        void __user *arg, unsigned index)
2584 {
2585         struct iovec __user *src;
2586
2587 #ifdef CONFIG_COMPAT
2588         if (ctx->compat) {
2589                 struct compat_iovec __user *ciovs;
2590                 struct compat_iovec ciov;
2591
2592                 ciovs = (struct compat_iovec __user *) arg;
2593                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2594                         return -EFAULT;
2595
2596                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2597                 dst->iov_len = ciov.iov_len;
2598                 return 0;
2599         }
2600 #endif
2601         src = (struct iovec __user *) arg;
2602         if (copy_from_user(dst, &src[index], sizeof(*dst)))
2603                 return -EFAULT;
2604         return 0;
2605 }
2606
2607 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2608                                   unsigned nr_args)
2609 {
2610         struct vm_area_struct **vmas = NULL;
2611         struct page **pages = NULL;
2612         int i, j, got_pages = 0;
2613         int ret = -EINVAL;
2614
2615         if (ctx->user_bufs)
2616                 return -EBUSY;
2617         if (!nr_args || nr_args > UIO_MAXIOV)
2618                 return -EINVAL;
2619
2620         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2621                                         GFP_KERNEL);
2622         if (!ctx->user_bufs)
2623                 return -ENOMEM;
2624
2625         for (i = 0; i < nr_args; i++) {
2626                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2627                 unsigned long off, start, end, ubuf;
2628                 int pret, nr_pages;
2629                 struct iovec iov;
2630                 size_t size;
2631
2632                 ret = io_copy_iov(ctx, &iov, arg, i);
2633                 if (ret)
2634                         break;
2635
2636                 /*
2637                  * Don't impose further limits on the size and buffer
2638                  * constraints here, we'll -EINVAL later when IO is
2639                  * submitted if they are wrong.
2640                  */
2641                 ret = -EFAULT;
2642                 if (!iov.iov_base || !iov.iov_len)
2643                         goto err;
2644
2645                 /* arbitrary limit, but we need something */
2646                 if (iov.iov_len > SZ_1G)
2647                         goto err;
2648
2649                 ubuf = (unsigned long) iov.iov_base;
2650                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
2651                 start = ubuf >> PAGE_SHIFT;
2652                 nr_pages = end - start;
2653
2654                 if (ctx->account_mem) {
2655                         ret = io_account_mem(ctx->user, nr_pages);
2656                         if (ret)
2657                                 goto err;
2658                 }
2659
2660                 ret = 0;
2661                 if (!pages || nr_pages > got_pages) {
2662                         kfree(vmas);
2663                         kfree(pages);
2664                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
2665                                                 GFP_KERNEL);
2666                         vmas = kvmalloc_array(nr_pages,
2667                                         sizeof(struct vm_area_struct *),
2668                                         GFP_KERNEL);
2669                         if (!pages || !vmas) {
2670                                 ret = -ENOMEM;
2671                                 if (ctx->account_mem)
2672                                         io_unaccount_mem(ctx->user, nr_pages);
2673                                 goto err;
2674                         }
2675                         got_pages = nr_pages;
2676                 }
2677
2678                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
2679                                                 GFP_KERNEL);
2680                 ret = -ENOMEM;
2681                 if (!imu->bvec) {
2682                         if (ctx->account_mem)
2683                                 io_unaccount_mem(ctx->user, nr_pages);
2684                         goto err;
2685                 }
2686
2687                 ret = 0;
2688                 down_read(&current->mm->mmap_sem);
2689                 pret = get_user_pages_longterm(ubuf, nr_pages, FOLL_WRITE,
2690                                                 pages, vmas);
2691                 if (pret == nr_pages) {
2692                         /* don't support file backed memory */
2693                         for (j = 0; j < nr_pages; j++) {
2694                                 struct vm_area_struct *vma = vmas[j];
2695
2696                                 if (vma->vm_file &&
2697                                     !is_file_hugepages(vma->vm_file)) {
2698                                         ret = -EOPNOTSUPP;
2699                                         break;
2700                                 }
2701                         }
2702                 } else {
2703                         ret = pret < 0 ? pret : -EFAULT;
2704                 }
2705                 up_read(&current->mm->mmap_sem);
2706                 if (ret) {
2707                         /*
2708                          * if we did partial map, or found file backed vmas,
2709                          * release any pages we did get
2710                          */
2711                         if (pret > 0) {
2712                                 for (j = 0; j < pret; j++)
2713                                         put_page(pages[j]);
2714                         }
2715                         if (ctx->account_mem)
2716                                 io_unaccount_mem(ctx->user, nr_pages);
2717                         kvfree(imu->bvec);
2718                         goto err;
2719                 }
2720
2721                 off = ubuf & ~PAGE_MASK;
2722                 size = iov.iov_len;
2723                 for (j = 0; j < nr_pages; j++) {
2724                         size_t vec_len;
2725
2726                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
2727                         imu->bvec[j].bv_page = pages[j];
2728                         imu->bvec[j].bv_len = vec_len;
2729                         imu->bvec[j].bv_offset = off;
2730                         off = 0;
2731                         size -= vec_len;
2732                 }
2733                 /* store original address for later verification */
2734                 imu->ubuf = ubuf;
2735                 imu->len = iov.iov_len;
2736                 imu->nr_bvecs = nr_pages;
2737
2738                 ctx->nr_user_bufs++;
2739         }
2740         kvfree(pages);
2741         kvfree(vmas);
2742         return 0;
2743 err:
2744         kvfree(pages);
2745         kvfree(vmas);
2746         io_sqe_buffer_unregister(ctx);
2747         return ret;
2748 }
2749
2750 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
2751 {
2752         __s32 __user *fds = arg;
2753         int fd;
2754
2755         if (ctx->cq_ev_fd)
2756                 return -EBUSY;
2757
2758         if (copy_from_user(&fd, fds, sizeof(*fds)))
2759                 return -EFAULT;
2760
2761         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
2762         if (IS_ERR(ctx->cq_ev_fd)) {
2763                 int ret = PTR_ERR(ctx->cq_ev_fd);
2764                 ctx->cq_ev_fd = NULL;
2765                 return ret;
2766         }
2767
2768         return 0;
2769 }
2770
2771 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
2772 {
2773         if (ctx->cq_ev_fd) {
2774                 eventfd_ctx_put(ctx->cq_ev_fd);
2775                 ctx->cq_ev_fd = NULL;
2776                 return 0;
2777         }
2778
2779         return -ENXIO;
2780 }
2781
2782 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2783 {
2784         io_finish_async(ctx);
2785         if (ctx->sqo_mm)
2786                 mmdrop(ctx->sqo_mm);
2787
2788         io_iopoll_reap_events(ctx);
2789         io_sqe_buffer_unregister(ctx);
2790         io_sqe_files_unregister(ctx);
2791         io_eventfd_unregister(ctx);
2792
2793 #if defined(CONFIG_UNIX)
2794         if (ctx->ring_sock)
2795                 sock_release(ctx->ring_sock);
2796 #endif
2797
2798         io_mem_free(ctx->sq_ring);
2799         io_mem_free(ctx->sq_sqes);
2800         io_mem_free(ctx->cq_ring);
2801
2802         percpu_ref_exit(&ctx->refs);
2803         if (ctx->account_mem)
2804                 io_unaccount_mem(ctx->user,
2805                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
2806         free_uid(ctx->user);
2807         kfree(ctx);
2808 }
2809
2810 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2811 {
2812         struct io_ring_ctx *ctx = file->private_data;
2813         __poll_t mask = 0;
2814
2815         poll_wait(file, &ctx->cq_wait, wait);
2816         /*
2817          * synchronizes with barrier from wq_has_sleeper call in
2818          * io_commit_cqring
2819          */
2820         smp_rmb();
2821         if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
2822             ctx->sq_ring->ring_entries)
2823                 mask |= EPOLLOUT | EPOLLWRNORM;
2824         if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2825                 mask |= EPOLLIN | EPOLLRDNORM;
2826
2827         return mask;
2828 }
2829
2830 static int io_uring_fasync(int fd, struct file *file, int on)
2831 {
2832         struct io_ring_ctx *ctx = file->private_data;
2833
2834         return fasync_helper(fd, file, on, &ctx->cq_fasync);
2835 }
2836
2837 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2838 {
2839         mutex_lock(&ctx->uring_lock);
2840         percpu_ref_kill(&ctx->refs);
2841         mutex_unlock(&ctx->uring_lock);
2842
2843         io_poll_remove_all(ctx);
2844         io_iopoll_reap_events(ctx);
2845         wait_for_completion(&ctx->ctx_done);
2846         io_ring_ctx_free(ctx);
2847 }
2848
2849 static int io_uring_release(struct inode *inode, struct file *file)
2850 {
2851         struct io_ring_ctx *ctx = file->private_data;
2852
2853         file->private_data = NULL;
2854         io_ring_ctx_wait_and_kill(ctx);
2855         return 0;
2856 }
2857
2858 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2859 {
2860         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2861         unsigned long sz = vma->vm_end - vma->vm_start;
2862         struct io_ring_ctx *ctx = file->private_data;
2863         unsigned long pfn;
2864         struct page *page;
2865         void *ptr;
2866
2867         switch (offset) {
2868         case IORING_OFF_SQ_RING:
2869                 ptr = ctx->sq_ring;
2870                 break;
2871         case IORING_OFF_SQES:
2872                 ptr = ctx->sq_sqes;
2873                 break;
2874         case IORING_OFF_CQ_RING:
2875                 ptr = ctx->cq_ring;
2876                 break;
2877         default:
2878                 return -EINVAL;
2879         }
2880
2881         page = virt_to_head_page(ptr);
2882         if (sz > (PAGE_SIZE << compound_order(page)))
2883                 return -EINVAL;
2884
2885         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2886         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2887 }
2888
2889 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2890                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
2891                 size_t, sigsz)
2892 {
2893         struct io_ring_ctx *ctx;
2894         long ret = -EBADF;
2895         int submitted = 0;
2896         struct fd f;
2897
2898         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2899                 return -EINVAL;
2900
2901         f = fdget(fd);
2902         if (!f.file)
2903                 return -EBADF;
2904
2905         ret = -EOPNOTSUPP;
2906         if (f.file->f_op != &io_uring_fops)
2907                 goto out_fput;
2908
2909         ret = -ENXIO;
2910         ctx = f.file->private_data;
2911         if (!percpu_ref_tryget(&ctx->refs))
2912                 goto out_fput;
2913
2914         /*
2915          * For SQ polling, the thread will do all submissions and completions.
2916          * Just return the requested submit count, and wake the thread if
2917          * we were asked to.
2918          */
2919         if (ctx->flags & IORING_SETUP_SQPOLL) {
2920                 if (flags & IORING_ENTER_SQ_WAKEUP)
2921                         wake_up(&ctx->sqo_wait);
2922                 submitted = to_submit;
2923                 goto out_ctx;
2924         }
2925
2926         ret = 0;
2927         if (to_submit) {
2928                 to_submit = min(to_submit, ctx->sq_entries);
2929
2930                 mutex_lock(&ctx->uring_lock);
2931                 submitted = io_ring_submit(ctx, to_submit);
2932                 mutex_unlock(&ctx->uring_lock);
2933         }
2934         if (flags & IORING_ENTER_GETEVENTS) {
2935                 unsigned nr_events = 0;
2936
2937                 min_complete = min(min_complete, ctx->cq_entries);
2938
2939                 if (ctx->flags & IORING_SETUP_IOPOLL) {
2940                         mutex_lock(&ctx->uring_lock);
2941                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
2942                         mutex_unlock(&ctx->uring_lock);
2943                 } else {
2944                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2945                 }
2946         }
2947
2948 out_ctx:
2949         io_ring_drop_ctx_refs(ctx, 1);
2950 out_fput:
2951         fdput(f);
2952         return submitted ? submitted : ret;
2953 }
2954
2955 static const struct file_operations io_uring_fops = {
2956         .release        = io_uring_release,
2957         .mmap           = io_uring_mmap,
2958         .poll           = io_uring_poll,
2959         .fasync         = io_uring_fasync,
2960 };
2961
2962 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2963                                   struct io_uring_params *p)
2964 {
2965         struct io_sq_ring *sq_ring;
2966         struct io_cq_ring *cq_ring;
2967         size_t size;
2968
2969         sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2970         if (!sq_ring)
2971                 return -ENOMEM;
2972
2973         ctx->sq_ring = sq_ring;
2974         sq_ring->ring_mask = p->sq_entries - 1;
2975         sq_ring->ring_entries = p->sq_entries;
2976         ctx->sq_mask = sq_ring->ring_mask;
2977         ctx->sq_entries = sq_ring->ring_entries;
2978
2979         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2980         if (size == SIZE_MAX)
2981                 return -EOVERFLOW;
2982
2983         ctx->sq_sqes = io_mem_alloc(size);
2984         if (!ctx->sq_sqes)
2985                 return -ENOMEM;
2986
2987         cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2988         if (!cq_ring)
2989                 return -ENOMEM;
2990
2991         ctx->cq_ring = cq_ring;
2992         cq_ring->ring_mask = p->cq_entries - 1;
2993         cq_ring->ring_entries = p->cq_entries;
2994         ctx->cq_mask = cq_ring->ring_mask;
2995         ctx->cq_entries = cq_ring->ring_entries;
2996         return 0;
2997 }
2998
2999 /*
3000  * Allocate an anonymous fd, this is what constitutes the application
3001  * visible backing of an io_uring instance. The application mmaps this
3002  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
3003  * we have to tie this fd to a socket for file garbage collection purposes.
3004  */
3005 static int io_uring_get_fd(struct io_ring_ctx *ctx)
3006 {
3007         struct file *file;
3008         int ret;
3009
3010 #if defined(CONFIG_UNIX)
3011         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
3012                                 &ctx->ring_sock);
3013         if (ret)
3014                 return ret;
3015 #endif
3016
3017         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3018         if (ret < 0)
3019                 goto err;
3020
3021         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
3022                                         O_RDWR | O_CLOEXEC);
3023         if (IS_ERR(file)) {
3024                 put_unused_fd(ret);
3025                 ret = PTR_ERR(file);
3026                 goto err;
3027         }
3028
3029 #if defined(CONFIG_UNIX)
3030         ctx->ring_sock->file = file;
3031         ctx->ring_sock->sk->sk_user_data = ctx;
3032 #endif
3033         fd_install(ret, file);
3034         return ret;
3035 err:
3036 #if defined(CONFIG_UNIX)
3037         sock_release(ctx->ring_sock);
3038         ctx->ring_sock = NULL;
3039 #endif
3040         return ret;
3041 }
3042
3043 static int io_uring_create(unsigned entries, struct io_uring_params *p)
3044 {
3045         struct user_struct *user = NULL;
3046         struct io_ring_ctx *ctx;
3047         bool account_mem;
3048         int ret;
3049
3050         if (!entries || entries > IORING_MAX_ENTRIES)
3051                 return -EINVAL;
3052
3053         /*
3054          * Use twice as many entries for the CQ ring. It's possible for the
3055          * application to drive a higher depth than the size of the SQ ring,
3056          * since the sqes are only used at submission time. This allows for
3057          * some flexibility in overcommitting a bit.
3058          */
3059         p->sq_entries = roundup_pow_of_two(entries);
3060         p->cq_entries = 2 * p->sq_entries;
3061
3062         user = get_uid(current_user());
3063         account_mem = !capable(CAP_IPC_LOCK);
3064
3065         if (account_mem) {
3066                 ret = io_account_mem(user,
3067                                 ring_pages(p->sq_entries, p->cq_entries));
3068                 if (ret) {
3069                         free_uid(user);
3070                         return ret;
3071                 }
3072         }
3073
3074         ctx = io_ring_ctx_alloc(p);
3075         if (!ctx) {
3076                 if (account_mem)
3077                         io_unaccount_mem(user, ring_pages(p->sq_entries,
3078                                                                 p->cq_entries));
3079                 free_uid(user);
3080                 return -ENOMEM;
3081         }
3082         ctx->compat = in_compat_syscall();
3083         ctx->account_mem = account_mem;
3084         ctx->user = user;
3085
3086         ret = io_allocate_scq_urings(ctx, p);
3087         if (ret)
3088                 goto err;
3089
3090         ret = io_sq_offload_start(ctx, p);
3091         if (ret)
3092                 goto err;
3093
3094         ret = io_uring_get_fd(ctx);
3095         if (ret < 0)
3096                 goto err;
3097
3098         memset(&p->sq_off, 0, sizeof(p->sq_off));
3099         p->sq_off.head = offsetof(struct io_sq_ring, r.head);
3100         p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
3101         p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
3102         p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
3103         p->sq_off.flags = offsetof(struct io_sq_ring, flags);
3104         p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
3105         p->sq_off.array = offsetof(struct io_sq_ring, array);
3106
3107         memset(&p->cq_off, 0, sizeof(p->cq_off));
3108         p->cq_off.head = offsetof(struct io_cq_ring, r.head);
3109         p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
3110         p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
3111         p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
3112         p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
3113         p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
3114         return ret;
3115 err:
3116         io_ring_ctx_wait_and_kill(ctx);
3117         return ret;
3118 }
3119
3120 /*
3121  * Sets up an aio uring context, and returns the fd. Applications asks for a
3122  * ring size, we return the actual sq/cq ring sizes (among other things) in the
3123  * params structure passed in.
3124  */
3125 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3126 {
3127         struct io_uring_params p;
3128         long ret;
3129         int i;
3130
3131         if (copy_from_user(&p, params, sizeof(p)))
3132                 return -EFAULT;
3133         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
3134                 if (p.resv[i])
3135                         return -EINVAL;
3136         }
3137
3138         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
3139                         IORING_SETUP_SQ_AFF))
3140                 return -EINVAL;
3141
3142         ret = io_uring_create(entries, &p);
3143         if (ret < 0)
3144                 return ret;
3145
3146         if (copy_to_user(params, &p, sizeof(p)))
3147                 return -EFAULT;
3148
3149         return ret;
3150 }
3151
3152 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
3153                 struct io_uring_params __user *, params)
3154 {
3155         return io_uring_setup(entries, params);
3156 }
3157
3158 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3159                                void __user *arg, unsigned nr_args)
3160         __releases(ctx->uring_lock)
3161         __acquires(ctx->uring_lock)
3162 {
3163         int ret;
3164
3165         /*
3166          * We're inside the ring mutex, if the ref is already dying, then
3167          * someone else killed the ctx or is already going through
3168          * io_uring_register().
3169          */
3170         if (percpu_ref_is_dying(&ctx->refs))
3171                 return -ENXIO;
3172
3173         percpu_ref_kill(&ctx->refs);
3174
3175         /*
3176          * Drop uring mutex before waiting for references to exit. If another
3177          * thread is currently inside io_uring_enter() it might need to grab
3178          * the uring_lock to make progress. If we hold it here across the drain
3179          * wait, then we can deadlock. It's safe to drop the mutex here, since
3180          * no new references will come in after we've killed the percpu ref.
3181          */
3182         mutex_unlock(&ctx->uring_lock);
3183         wait_for_completion(&ctx->ctx_done);
3184         mutex_lock(&ctx->uring_lock);
3185
3186         switch (opcode) {
3187         case IORING_REGISTER_BUFFERS:
3188                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
3189                 break;
3190         case IORING_UNREGISTER_BUFFERS:
3191                 ret = -EINVAL;
3192                 if (arg || nr_args)
3193                         break;
3194                 ret = io_sqe_buffer_unregister(ctx);
3195                 break;
3196         case IORING_REGISTER_FILES:
3197                 ret = io_sqe_files_register(ctx, arg, nr_args);
3198                 break;
3199         case IORING_UNREGISTER_FILES:
3200                 ret = -EINVAL;
3201                 if (arg || nr_args)
3202                         break;
3203                 ret = io_sqe_files_unregister(ctx);
3204                 break;
3205         case IORING_REGISTER_EVENTFD:
3206                 ret = -EINVAL;
3207                 if (nr_args != 1)
3208                         break;
3209                 ret = io_eventfd_register(ctx, arg);
3210                 break;
3211         case IORING_UNREGISTER_EVENTFD:
3212                 ret = -EINVAL;
3213                 if (arg || nr_args)
3214                         break;
3215                 ret = io_eventfd_unregister(ctx);
3216                 break;
3217         default:
3218                 ret = -EINVAL;
3219                 break;
3220         }
3221
3222         /* bring the ctx back to life */
3223         reinit_completion(&ctx->ctx_done);
3224         percpu_ref_reinit(&ctx->refs);
3225         return ret;
3226 }
3227
3228 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3229                 void __user *, arg, unsigned int, nr_args)
3230 {
3231         struct io_ring_ctx *ctx;
3232         long ret = -EBADF;
3233         struct fd f;
3234
3235         f = fdget(fd);
3236         if (!f.file)
3237                 return -EBADF;
3238
3239         ret = -EOPNOTSUPP;
3240         if (f.file->f_op != &io_uring_fops)
3241                 goto out_fput;
3242
3243         ctx = f.file->private_data;
3244
3245         mutex_lock(&ctx->uring_lock);
3246         ret = __io_uring_register(ctx, opcode, arg, nr_args);
3247         mutex_unlock(&ctx->uring_lock);
3248 out_fput:
3249         fdput(f);
3250         return ret;
3251 }
3252
3253 static int __init io_uring_init(void)
3254 {
3255         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
3256         return 0;
3257 };
3258 __initcall(io_uring_init);