OSDN Git Service

Merge branch 'core-urgent-for-linus' of git://git.kernel.org/pub/scm/linux/kernel...
[tomoyo/tomoyo-test1.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/workqueue.h>
60 #include <linux/kthread.h>
61 #include <linux/blkdev.h>
62 #include <linux/bvec.h>
63 #include <linux/net.h>
64 #include <net/sock.h>
65 #include <net/af_unix.h>
66 #include <net/scm.h>
67 #include <linux/anon_inodes.h>
68 #include <linux/sched/mm.h>
69 #include <linux/uaccess.h>
70 #include <linux/nospec.h>
71 #include <linux/sizes.h>
72 #include <linux/hugetlb.h>
73
74 #include <uapi/linux/io_uring.h>
75
76 #include "internal.h"
77
78 #define IORING_MAX_ENTRIES      4096
79 #define IORING_MAX_FIXED_FILES  1024
80
81 struct io_uring {
82         u32 head ____cacheline_aligned_in_smp;
83         u32 tail ____cacheline_aligned_in_smp;
84 };
85
86 /*
87  * This data is shared with the application through the mmap at offset
88  * IORING_OFF_SQ_RING.
89  *
90  * The offsets to the member fields are published through struct
91  * io_sqring_offsets when calling io_uring_setup.
92  */
93 struct io_sq_ring {
94         /*
95          * Head and tail offsets into the ring; the offsets need to be
96          * masked to get valid indices.
97          *
98          * The kernel controls head and the application controls tail.
99          */
100         struct io_uring         r;
101         /*
102          * Bitmask to apply to head and tail offsets (constant, equals
103          * ring_entries - 1)
104          */
105         u32                     ring_mask;
106         /* Ring size (constant, power of 2) */
107         u32                     ring_entries;
108         /*
109          * Number of invalid entries dropped by the kernel due to
110          * invalid index stored in array
111          *
112          * Written by the kernel, shouldn't be modified by the
113          * application (i.e. get number of "new events" by comparing to
114          * cached value).
115          *
116          * After a new SQ head value was read by the application this
117          * counter includes all submissions that were dropped reaching
118          * the new SQ head (and possibly more).
119          */
120         u32                     dropped;
121         /*
122          * Runtime flags
123          *
124          * Written by the kernel, shouldn't be modified by the
125          * application.
126          *
127          * The application needs a full memory barrier before checking
128          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
129          */
130         u32                     flags;
131         /*
132          * Ring buffer of indices into array of io_uring_sqe, which is
133          * mmapped by the application using the IORING_OFF_SQES offset.
134          *
135          * This indirection could e.g. be used to assign fixed
136          * io_uring_sqe entries to operations and only submit them to
137          * the queue when needed.
138          *
139          * The kernel modifies neither the indices array nor the entries
140          * array.
141          */
142         u32                     array[];
143 };
144
145 /*
146  * This data is shared with the application through the mmap at offset
147  * IORING_OFF_CQ_RING.
148  *
149  * The offsets to the member fields are published through struct
150  * io_cqring_offsets when calling io_uring_setup.
151  */
152 struct io_cq_ring {
153         /*
154          * Head and tail offsets into the ring; the offsets need to be
155          * masked to get valid indices.
156          *
157          * The application controls head and the kernel tail.
158          */
159         struct io_uring         r;
160         /*
161          * Bitmask to apply to head and tail offsets (constant, equals
162          * ring_entries - 1)
163          */
164         u32                     ring_mask;
165         /* Ring size (constant, power of 2) */
166         u32                     ring_entries;
167         /*
168          * Number of completion events lost because the queue was full;
169          * this should be avoided by the application by making sure
170          * there are not more requests pending thatn there is space in
171          * the completion queue.
172          *
173          * Written by the kernel, shouldn't be modified by the
174          * application (i.e. get number of "new events" by comparing to
175          * cached value).
176          *
177          * As completion events come in out of order this counter is not
178          * ordered with any other data.
179          */
180         u32                     overflow;
181         /*
182          * Ring buffer of completion events.
183          *
184          * The kernel writes completion events fresh every time they are
185          * produced, so the application is allowed to modify pending
186          * entries.
187          */
188         struct io_uring_cqe     cqes[];
189 };
190
191 struct io_mapped_ubuf {
192         u64             ubuf;
193         size_t          len;
194         struct          bio_vec *bvec;
195         unsigned int    nr_bvecs;
196 };
197
198 struct async_list {
199         spinlock_t              lock;
200         atomic_t                cnt;
201         struct list_head        list;
202
203         struct file             *file;
204         off_t                   io_end;
205         size_t                  io_pages;
206 };
207
208 struct io_ring_ctx {
209         struct {
210                 struct percpu_ref       refs;
211         } ____cacheline_aligned_in_smp;
212
213         struct {
214                 unsigned int            flags;
215                 bool                    compat;
216                 bool                    account_mem;
217
218                 /* SQ ring */
219                 struct io_sq_ring       *sq_ring;
220                 unsigned                cached_sq_head;
221                 unsigned                sq_entries;
222                 unsigned                sq_mask;
223                 unsigned                sq_thread_idle;
224                 struct io_uring_sqe     *sq_sqes;
225
226                 struct list_head        defer_list;
227         } ____cacheline_aligned_in_smp;
228
229         /* IO offload */
230         struct workqueue_struct *sqo_wq;
231         struct task_struct      *sqo_thread;    /* if using sq thread polling */
232         struct mm_struct        *sqo_mm;
233         wait_queue_head_t       sqo_wait;
234
235         struct {
236                 /* CQ ring */
237                 struct io_cq_ring       *cq_ring;
238                 unsigned                cached_cq_tail;
239                 unsigned                cq_entries;
240                 unsigned                cq_mask;
241                 struct wait_queue_head  cq_wait;
242                 struct fasync_struct    *cq_fasync;
243                 struct eventfd_ctx      *cq_ev_fd;
244         } ____cacheline_aligned_in_smp;
245
246         /*
247          * If used, fixed file set. Writers must ensure that ->refs is dead,
248          * readers must ensure that ->refs is alive as long as the file* is
249          * used. Only updated through io_uring_register(2).
250          */
251         struct file             **user_files;
252         unsigned                nr_user_files;
253
254         /* if used, fixed mapped user buffers */
255         unsigned                nr_user_bufs;
256         struct io_mapped_ubuf   *user_bufs;
257
258         struct user_struct      *user;
259
260         struct completion       ctx_done;
261
262         struct {
263                 struct mutex            uring_lock;
264                 wait_queue_head_t       wait;
265         } ____cacheline_aligned_in_smp;
266
267         struct {
268                 spinlock_t              completion_lock;
269                 bool                    poll_multi_file;
270                 /*
271                  * ->poll_list is protected by the ctx->uring_lock for
272                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
273                  * For SQPOLL, only the single threaded io_sq_thread() will
274                  * manipulate the list, hence no extra locking is needed there.
275                  */
276                 struct list_head        poll_list;
277                 struct list_head        cancel_list;
278         } ____cacheline_aligned_in_smp;
279
280         struct async_list       pending_async[2];
281
282 #if defined(CONFIG_UNIX)
283         struct socket           *ring_sock;
284 #endif
285 };
286
287 struct sqe_submit {
288         const struct io_uring_sqe       *sqe;
289         unsigned short                  index;
290         bool                            has_user;
291         bool                            needs_lock;
292         bool                            needs_fixed_file;
293 };
294
295 /*
296  * First field must be the file pointer in all the
297  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
298  */
299 struct io_poll_iocb {
300         struct file                     *file;
301         struct wait_queue_head          *head;
302         __poll_t                        events;
303         bool                            done;
304         bool                            canceled;
305         struct wait_queue_entry         wait;
306 };
307
308 /*
309  * NOTE! Each of the iocb union members has the file pointer
310  * as the first entry in their struct definition. So you can
311  * access the file pointer through any of the sub-structs,
312  * or directly as just 'ki_filp' in this struct.
313  */
314 struct io_kiocb {
315         union {
316                 struct file             *file;
317                 struct kiocb            rw;
318                 struct io_poll_iocb     poll;
319         };
320
321         struct sqe_submit       submit;
322
323         struct io_ring_ctx      *ctx;
324         struct list_head        list;
325         unsigned int            flags;
326         refcount_t              refs;
327 #define REQ_F_NOWAIT            1       /* must not punt to workers */
328 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
329 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
330 #define REQ_F_SEQ_PREV          8       /* sequential with previous */
331 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
332 #define REQ_F_IO_DRAINED        32      /* drain done */
333         u64                     user_data;
334         u32                     error;  /* iopoll result from callback */
335         u32                     sequence;
336
337         struct work_struct      work;
338 };
339
340 #define IO_PLUG_THRESHOLD               2
341 #define IO_IOPOLL_BATCH                 8
342
343 struct io_submit_state {
344         struct blk_plug         plug;
345
346         /*
347          * io_kiocb alloc cache
348          */
349         void                    *reqs[IO_IOPOLL_BATCH];
350         unsigned                int free_reqs;
351         unsigned                int cur_req;
352
353         /*
354          * File reference cache
355          */
356         struct file             *file;
357         unsigned int            fd;
358         unsigned int            has_refs;
359         unsigned int            used_refs;
360         unsigned int            ios_left;
361 };
362
363 static void io_sq_wq_submit_work(struct work_struct *work);
364
365 static struct kmem_cache *req_cachep;
366
367 static const struct file_operations io_uring_fops;
368
369 struct sock *io_uring_get_socket(struct file *file)
370 {
371 #if defined(CONFIG_UNIX)
372         if (file->f_op == &io_uring_fops) {
373                 struct io_ring_ctx *ctx = file->private_data;
374
375                 return ctx->ring_sock->sk;
376         }
377 #endif
378         return NULL;
379 }
380 EXPORT_SYMBOL(io_uring_get_socket);
381
382 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
383 {
384         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
385
386         complete(&ctx->ctx_done);
387 }
388
389 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
390 {
391         struct io_ring_ctx *ctx;
392         int i;
393
394         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
395         if (!ctx)
396                 return NULL;
397
398         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
399                 kfree(ctx);
400                 return NULL;
401         }
402
403         ctx->flags = p->flags;
404         init_waitqueue_head(&ctx->cq_wait);
405         init_completion(&ctx->ctx_done);
406         mutex_init(&ctx->uring_lock);
407         init_waitqueue_head(&ctx->wait);
408         for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
409                 spin_lock_init(&ctx->pending_async[i].lock);
410                 INIT_LIST_HEAD(&ctx->pending_async[i].list);
411                 atomic_set(&ctx->pending_async[i].cnt, 0);
412         }
413         spin_lock_init(&ctx->completion_lock);
414         INIT_LIST_HEAD(&ctx->poll_list);
415         INIT_LIST_HEAD(&ctx->cancel_list);
416         INIT_LIST_HEAD(&ctx->defer_list);
417         return ctx;
418 }
419
420 static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
421                                      struct io_kiocb *req)
422 {
423         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
424                 return false;
425
426         return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped;
427 }
428
429 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
430 {
431         struct io_kiocb *req;
432
433         if (list_empty(&ctx->defer_list))
434                 return NULL;
435
436         req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
437         if (!io_sequence_defer(ctx, req)) {
438                 list_del_init(&req->list);
439                 return req;
440         }
441
442         return NULL;
443 }
444
445 static void __io_commit_cqring(struct io_ring_ctx *ctx)
446 {
447         struct io_cq_ring *ring = ctx->cq_ring;
448
449         if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
450                 /* order cqe stores with ring update */
451                 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
452
453                 if (wq_has_sleeper(&ctx->cq_wait)) {
454                         wake_up_interruptible(&ctx->cq_wait);
455                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
456                 }
457         }
458 }
459
460 static void io_commit_cqring(struct io_ring_ctx *ctx)
461 {
462         struct io_kiocb *req;
463
464         __io_commit_cqring(ctx);
465
466         while ((req = io_get_deferred_req(ctx)) != NULL) {
467                 req->flags |= REQ_F_IO_DRAINED;
468                 queue_work(ctx->sqo_wq, &req->work);
469         }
470 }
471
472 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
473 {
474         struct io_cq_ring *ring = ctx->cq_ring;
475         unsigned tail;
476
477         tail = ctx->cached_cq_tail;
478         /*
479          * writes to the cq entry need to come after reading head; the
480          * control dependency is enough as we're using WRITE_ONCE to
481          * fill the cq entry
482          */
483         if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
484                 return NULL;
485
486         ctx->cached_cq_tail++;
487         return &ring->cqes[tail & ctx->cq_mask];
488 }
489
490 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
491                                  long res)
492 {
493         struct io_uring_cqe *cqe;
494
495         /*
496          * If we can't get a cq entry, userspace overflowed the
497          * submission (by quite a lot). Increment the overflow count in
498          * the ring.
499          */
500         cqe = io_get_cqring(ctx);
501         if (cqe) {
502                 WRITE_ONCE(cqe->user_data, ki_user_data);
503                 WRITE_ONCE(cqe->res, res);
504                 WRITE_ONCE(cqe->flags, 0);
505         } else {
506                 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
507
508                 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
509         }
510 }
511
512 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
513 {
514         if (waitqueue_active(&ctx->wait))
515                 wake_up(&ctx->wait);
516         if (waitqueue_active(&ctx->sqo_wait))
517                 wake_up(&ctx->sqo_wait);
518         if (ctx->cq_ev_fd)
519                 eventfd_signal(ctx->cq_ev_fd, 1);
520 }
521
522 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
523                                 long res)
524 {
525         unsigned long flags;
526
527         spin_lock_irqsave(&ctx->completion_lock, flags);
528         io_cqring_fill_event(ctx, user_data, res);
529         io_commit_cqring(ctx);
530         spin_unlock_irqrestore(&ctx->completion_lock, flags);
531
532         io_cqring_ev_posted(ctx);
533 }
534
535 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
536 {
537         percpu_ref_put_many(&ctx->refs, refs);
538
539         if (waitqueue_active(&ctx->wait))
540                 wake_up(&ctx->wait);
541 }
542
543 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
544                                    struct io_submit_state *state)
545 {
546         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
547         struct io_kiocb *req;
548
549         if (!percpu_ref_tryget(&ctx->refs))
550                 return NULL;
551
552         if (!state) {
553                 req = kmem_cache_alloc(req_cachep, gfp);
554                 if (unlikely(!req))
555                         goto out;
556         } else if (!state->free_reqs) {
557                 size_t sz;
558                 int ret;
559
560                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
561                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
562
563                 /*
564                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
565                  * retry single alloc to be on the safe side.
566                  */
567                 if (unlikely(ret <= 0)) {
568                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
569                         if (!state->reqs[0])
570                                 goto out;
571                         ret = 1;
572                 }
573                 state->free_reqs = ret - 1;
574                 state->cur_req = 1;
575                 req = state->reqs[0];
576         } else {
577                 req = state->reqs[state->cur_req];
578                 state->free_reqs--;
579                 state->cur_req++;
580         }
581
582         req->ctx = ctx;
583         req->flags = 0;
584         /* one is dropped after submission, the other at completion */
585         refcount_set(&req->refs, 2);
586         return req;
587 out:
588         io_ring_drop_ctx_refs(ctx, 1);
589         return NULL;
590 }
591
592 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
593 {
594         if (*nr) {
595                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
596                 io_ring_drop_ctx_refs(ctx, *nr);
597                 *nr = 0;
598         }
599 }
600
601 static void io_free_req(struct io_kiocb *req)
602 {
603         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
604                 fput(req->file);
605         io_ring_drop_ctx_refs(req->ctx, 1);
606         kmem_cache_free(req_cachep, req);
607 }
608
609 static void io_put_req(struct io_kiocb *req)
610 {
611         if (refcount_dec_and_test(&req->refs))
612                 io_free_req(req);
613 }
614
615 /*
616  * Find and free completed poll iocbs
617  */
618 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
619                                struct list_head *done)
620 {
621         void *reqs[IO_IOPOLL_BATCH];
622         struct io_kiocb *req;
623         int to_free;
624
625         to_free = 0;
626         while (!list_empty(done)) {
627                 req = list_first_entry(done, struct io_kiocb, list);
628                 list_del(&req->list);
629
630                 io_cqring_fill_event(ctx, req->user_data, req->error);
631                 (*nr_events)++;
632
633                 if (refcount_dec_and_test(&req->refs)) {
634                         /* If we're not using fixed files, we have to pair the
635                          * completion part with the file put. Use regular
636                          * completions for those, only batch free for fixed
637                          * file.
638                          */
639                         if (req->flags & REQ_F_FIXED_FILE) {
640                                 reqs[to_free++] = req;
641                                 if (to_free == ARRAY_SIZE(reqs))
642                                         io_free_req_many(ctx, reqs, &to_free);
643                         } else {
644                                 io_free_req(req);
645                         }
646                 }
647         }
648
649         io_commit_cqring(ctx);
650         io_free_req_many(ctx, reqs, &to_free);
651 }
652
653 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
654                         long min)
655 {
656         struct io_kiocb *req, *tmp;
657         LIST_HEAD(done);
658         bool spin;
659         int ret;
660
661         /*
662          * Only spin for completions if we don't have multiple devices hanging
663          * off our complete list, and we're under the requested amount.
664          */
665         spin = !ctx->poll_multi_file && *nr_events < min;
666
667         ret = 0;
668         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
669                 struct kiocb *kiocb = &req->rw;
670
671                 /*
672                  * Move completed entries to our local list. If we find a
673                  * request that requires polling, break out and complete
674                  * the done list first, if we have entries there.
675                  */
676                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
677                         list_move_tail(&req->list, &done);
678                         continue;
679                 }
680                 if (!list_empty(&done))
681                         break;
682
683                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
684                 if (ret < 0)
685                         break;
686
687                 if (ret && spin)
688                         spin = false;
689                 ret = 0;
690         }
691
692         if (!list_empty(&done))
693                 io_iopoll_complete(ctx, nr_events, &done);
694
695         return ret;
696 }
697
698 /*
699  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
700  * non-spinning poll check - we'll still enter the driver poll loop, but only
701  * as a non-spinning completion check.
702  */
703 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
704                                 long min)
705 {
706         while (!list_empty(&ctx->poll_list)) {
707                 int ret;
708
709                 ret = io_do_iopoll(ctx, nr_events, min);
710                 if (ret < 0)
711                         return ret;
712                 if (!min || *nr_events >= min)
713                         return 0;
714         }
715
716         return 1;
717 }
718
719 /*
720  * We can't just wait for polled events to come to us, we have to actively
721  * find and complete them.
722  */
723 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
724 {
725         if (!(ctx->flags & IORING_SETUP_IOPOLL))
726                 return;
727
728         mutex_lock(&ctx->uring_lock);
729         while (!list_empty(&ctx->poll_list)) {
730                 unsigned int nr_events = 0;
731
732                 io_iopoll_getevents(ctx, &nr_events, 1);
733         }
734         mutex_unlock(&ctx->uring_lock);
735 }
736
737 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
738                            long min)
739 {
740         int ret = 0;
741
742         do {
743                 int tmin = 0;
744
745                 if (*nr_events < min)
746                         tmin = min - *nr_events;
747
748                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
749                 if (ret <= 0)
750                         break;
751                 ret = 0;
752         } while (min && !*nr_events && !need_resched());
753
754         return ret;
755 }
756
757 static void kiocb_end_write(struct kiocb *kiocb)
758 {
759         if (kiocb->ki_flags & IOCB_WRITE) {
760                 struct inode *inode = file_inode(kiocb->ki_filp);
761
762                 /*
763                  * Tell lockdep we inherited freeze protection from submission
764                  * thread.
765                  */
766                 if (S_ISREG(inode->i_mode))
767                         __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
768                 file_end_write(kiocb->ki_filp);
769         }
770 }
771
772 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
773 {
774         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
775
776         kiocb_end_write(kiocb);
777
778         io_cqring_add_event(req->ctx, req->user_data, res);
779         io_put_req(req);
780 }
781
782 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
783 {
784         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
785
786         kiocb_end_write(kiocb);
787
788         req->error = res;
789         if (res != -EAGAIN)
790                 req->flags |= REQ_F_IOPOLL_COMPLETED;
791 }
792
793 /*
794  * After the iocb has been issued, it's safe to be found on the poll list.
795  * Adding the kiocb to the list AFTER submission ensures that we don't
796  * find it from a io_iopoll_getevents() thread before the issuer is done
797  * accessing the kiocb cookie.
798  */
799 static void io_iopoll_req_issued(struct io_kiocb *req)
800 {
801         struct io_ring_ctx *ctx = req->ctx;
802
803         /*
804          * Track whether we have multiple files in our lists. This will impact
805          * how we do polling eventually, not spinning if we're on potentially
806          * different devices.
807          */
808         if (list_empty(&ctx->poll_list)) {
809                 ctx->poll_multi_file = false;
810         } else if (!ctx->poll_multi_file) {
811                 struct io_kiocb *list_req;
812
813                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
814                                                 list);
815                 if (list_req->rw.ki_filp != req->rw.ki_filp)
816                         ctx->poll_multi_file = true;
817         }
818
819         /*
820          * For fast devices, IO may have already completed. If it has, add
821          * it to the front so we find it first.
822          */
823         if (req->flags & REQ_F_IOPOLL_COMPLETED)
824                 list_add(&req->list, &ctx->poll_list);
825         else
826                 list_add_tail(&req->list, &ctx->poll_list);
827 }
828
829 static void io_file_put(struct io_submit_state *state)
830 {
831         if (state->file) {
832                 int diff = state->has_refs - state->used_refs;
833
834                 if (diff)
835                         fput_many(state->file, diff);
836                 state->file = NULL;
837         }
838 }
839
840 /*
841  * Get as many references to a file as we have IOs left in this submission,
842  * assuming most submissions are for one file, or at least that each file
843  * has more than one submission.
844  */
845 static struct file *io_file_get(struct io_submit_state *state, int fd)
846 {
847         if (!state)
848                 return fget(fd);
849
850         if (state->file) {
851                 if (state->fd == fd) {
852                         state->used_refs++;
853                         state->ios_left--;
854                         return state->file;
855                 }
856                 io_file_put(state);
857         }
858         state->file = fget_many(fd, state->ios_left);
859         if (!state->file)
860                 return NULL;
861
862         state->fd = fd;
863         state->has_refs = state->ios_left;
864         state->used_refs = 1;
865         state->ios_left--;
866         return state->file;
867 }
868
869 /*
870  * If we tracked the file through the SCM inflight mechanism, we could support
871  * any file. For now, just ensure that anything potentially problematic is done
872  * inline.
873  */
874 static bool io_file_supports_async(struct file *file)
875 {
876         umode_t mode = file_inode(file)->i_mode;
877
878         if (S_ISBLK(mode) || S_ISCHR(mode))
879                 return true;
880         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
881                 return true;
882
883         return false;
884 }
885
886 static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
887                       bool force_nonblock)
888 {
889         const struct io_uring_sqe *sqe = s->sqe;
890         struct io_ring_ctx *ctx = req->ctx;
891         struct kiocb *kiocb = &req->rw;
892         unsigned ioprio;
893         int ret;
894
895         if (!req->file)
896                 return -EBADF;
897
898         if (force_nonblock && !io_file_supports_async(req->file))
899                 force_nonblock = false;
900
901         kiocb->ki_pos = READ_ONCE(sqe->off);
902         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
903         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
904
905         ioprio = READ_ONCE(sqe->ioprio);
906         if (ioprio) {
907                 ret = ioprio_check_cap(ioprio);
908                 if (ret)
909                         return ret;
910
911                 kiocb->ki_ioprio = ioprio;
912         } else
913                 kiocb->ki_ioprio = get_current_ioprio();
914
915         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
916         if (unlikely(ret))
917                 return ret;
918
919         /* don't allow async punt if RWF_NOWAIT was requested */
920         if (kiocb->ki_flags & IOCB_NOWAIT)
921                 req->flags |= REQ_F_NOWAIT;
922
923         if (force_nonblock)
924                 kiocb->ki_flags |= IOCB_NOWAIT;
925
926         if (ctx->flags & IORING_SETUP_IOPOLL) {
927                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
928                     !kiocb->ki_filp->f_op->iopoll)
929                         return -EOPNOTSUPP;
930
931                 req->error = 0;
932                 kiocb->ki_flags |= IOCB_HIPRI;
933                 kiocb->ki_complete = io_complete_rw_iopoll;
934         } else {
935                 if (kiocb->ki_flags & IOCB_HIPRI)
936                         return -EINVAL;
937                 kiocb->ki_complete = io_complete_rw;
938         }
939         return 0;
940 }
941
942 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
943 {
944         switch (ret) {
945         case -EIOCBQUEUED:
946                 break;
947         case -ERESTARTSYS:
948         case -ERESTARTNOINTR:
949         case -ERESTARTNOHAND:
950         case -ERESTART_RESTARTBLOCK:
951                 /*
952                  * We can't just restart the syscall, since previously
953                  * submitted sqes may already be in progress. Just fail this
954                  * IO with EINTR.
955                  */
956                 ret = -EINTR;
957                 /* fall through */
958         default:
959                 kiocb->ki_complete(kiocb, ret, 0);
960         }
961 }
962
963 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
964                            const struct io_uring_sqe *sqe,
965                            struct iov_iter *iter)
966 {
967         size_t len = READ_ONCE(sqe->len);
968         struct io_mapped_ubuf *imu;
969         unsigned index, buf_index;
970         size_t offset;
971         u64 buf_addr;
972
973         /* attempt to use fixed buffers without having provided iovecs */
974         if (unlikely(!ctx->user_bufs))
975                 return -EFAULT;
976
977         buf_index = READ_ONCE(sqe->buf_index);
978         if (unlikely(buf_index >= ctx->nr_user_bufs))
979                 return -EFAULT;
980
981         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
982         imu = &ctx->user_bufs[index];
983         buf_addr = READ_ONCE(sqe->addr);
984
985         /* overflow */
986         if (buf_addr + len < buf_addr)
987                 return -EFAULT;
988         /* not inside the mapped region */
989         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
990                 return -EFAULT;
991
992         /*
993          * May not be a start of buffer, set size appropriately
994          * and advance us to the beginning.
995          */
996         offset = buf_addr - imu->ubuf;
997         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
998         if (offset)
999                 iov_iter_advance(iter, offset);
1000
1001         /* don't drop a reference to these pages */
1002         iter->type |= ITER_BVEC_FLAG_NO_REF;
1003         return 0;
1004 }
1005
1006 static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
1007                            const struct sqe_submit *s, struct iovec **iovec,
1008                            struct iov_iter *iter)
1009 {
1010         const struct io_uring_sqe *sqe = s->sqe;
1011         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1012         size_t sqe_len = READ_ONCE(sqe->len);
1013         u8 opcode;
1014
1015         /*
1016          * We're reading ->opcode for the second time, but the first read
1017          * doesn't care whether it's _FIXED or not, so it doesn't matter
1018          * whether ->opcode changes concurrently. The first read does care
1019          * about whether it is a READ or a WRITE, so we don't trust this read
1020          * for that purpose and instead let the caller pass in the read/write
1021          * flag.
1022          */
1023         opcode = READ_ONCE(sqe->opcode);
1024         if (opcode == IORING_OP_READ_FIXED ||
1025             opcode == IORING_OP_WRITE_FIXED) {
1026                 int ret = io_import_fixed(ctx, rw, sqe, iter);
1027                 *iovec = NULL;
1028                 return ret;
1029         }
1030
1031         if (!s->has_user)
1032                 return -EFAULT;
1033
1034 #ifdef CONFIG_COMPAT
1035         if (ctx->compat)
1036                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1037                                                 iovec, iter);
1038 #endif
1039
1040         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1041 }
1042
1043 /*
1044  * Make a note of the last file/offset/direction we punted to async
1045  * context. We'll use this information to see if we can piggy back a
1046  * sequential request onto the previous one, if it's still hasn't been
1047  * completed by the async worker.
1048  */
1049 static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1050 {
1051         struct async_list *async_list = &req->ctx->pending_async[rw];
1052         struct kiocb *kiocb = &req->rw;
1053         struct file *filp = kiocb->ki_filp;
1054         off_t io_end = kiocb->ki_pos + len;
1055
1056         if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
1057                 unsigned long max_pages;
1058
1059                 /* Use 8x RA size as a decent limiter for both reads/writes */
1060                 max_pages = filp->f_ra.ra_pages;
1061                 if (!max_pages)
1062                         max_pages = VM_READAHEAD_PAGES;
1063                 max_pages *= 8;
1064
1065                 /* If max pages are exceeded, reset the state */
1066                 len >>= PAGE_SHIFT;
1067                 if (async_list->io_pages + len <= max_pages) {
1068                         req->flags |= REQ_F_SEQ_PREV;
1069                         async_list->io_pages += len;
1070                 } else {
1071                         io_end = 0;
1072                         async_list->io_pages = 0;
1073                 }
1074         }
1075
1076         /* New file? Reset state. */
1077         if (async_list->file != filp) {
1078                 async_list->io_pages = 0;
1079                 async_list->file = filp;
1080         }
1081         async_list->io_end = io_end;
1082 }
1083
1084 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1085                    bool force_nonblock)
1086 {
1087         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1088         struct kiocb *kiocb = &req->rw;
1089         struct iov_iter iter;
1090         struct file *file;
1091         size_t iov_count;
1092         int ret;
1093
1094         ret = io_prep_rw(req, s, force_nonblock);
1095         if (ret)
1096                 return ret;
1097         file = kiocb->ki_filp;
1098
1099         if (unlikely(!(file->f_mode & FMODE_READ)))
1100                 return -EBADF;
1101         if (unlikely(!file->f_op->read_iter))
1102                 return -EINVAL;
1103
1104         ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
1105         if (ret)
1106                 return ret;
1107
1108         iov_count = iov_iter_count(&iter);
1109         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1110         if (!ret) {
1111                 ssize_t ret2;
1112
1113                 /* Catch -EAGAIN return for forced non-blocking submission */
1114                 ret2 = call_read_iter(file, kiocb, &iter);
1115                 if (!force_nonblock || ret2 != -EAGAIN) {
1116                         io_rw_done(kiocb, ret2);
1117                 } else {
1118                         /*
1119                          * If ->needs_lock is true, we're already in async
1120                          * context.
1121                          */
1122                         if (!s->needs_lock)
1123                                 io_async_list_note(READ, req, iov_count);
1124                         ret = -EAGAIN;
1125                 }
1126         }
1127         kfree(iovec);
1128         return ret;
1129 }
1130
1131 static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1132                     bool force_nonblock)
1133 {
1134         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1135         struct kiocb *kiocb = &req->rw;
1136         struct iov_iter iter;
1137         struct file *file;
1138         size_t iov_count;
1139         int ret;
1140
1141         ret = io_prep_rw(req, s, force_nonblock);
1142         if (ret)
1143                 return ret;
1144
1145         file = kiocb->ki_filp;
1146         if (unlikely(!(file->f_mode & FMODE_WRITE)))
1147                 return -EBADF;
1148         if (unlikely(!file->f_op->write_iter))
1149                 return -EINVAL;
1150
1151         ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1152         if (ret)
1153                 return ret;
1154
1155         iov_count = iov_iter_count(&iter);
1156
1157         ret = -EAGAIN;
1158         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1159                 /* If ->needs_lock is true, we're already in async context. */
1160                 if (!s->needs_lock)
1161                         io_async_list_note(WRITE, req, iov_count);
1162                 goto out_free;
1163         }
1164
1165         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1166         if (!ret) {
1167                 ssize_t ret2;
1168
1169                 /*
1170                  * Open-code file_start_write here to grab freeze protection,
1171                  * which will be released by another thread in
1172                  * io_complete_rw().  Fool lockdep by telling it the lock got
1173                  * released so that it doesn't complain about the held lock when
1174                  * we return to userspace.
1175                  */
1176                 if (S_ISREG(file_inode(file)->i_mode)) {
1177                         __sb_start_write(file_inode(file)->i_sb,
1178                                                 SB_FREEZE_WRITE, true);
1179                         __sb_writers_release(file_inode(file)->i_sb,
1180                                                 SB_FREEZE_WRITE);
1181                 }
1182                 kiocb->ki_flags |= IOCB_WRITE;
1183
1184                 ret2 = call_write_iter(file, kiocb, &iter);
1185                 if (!force_nonblock || ret2 != -EAGAIN) {
1186                         io_rw_done(kiocb, ret2);
1187                 } else {
1188                         /*
1189                          * If ->needs_lock is true, we're already in async
1190                          * context.
1191                          */
1192                         if (!s->needs_lock)
1193                                 io_async_list_note(WRITE, req, iov_count);
1194                         ret = -EAGAIN;
1195                 }
1196         }
1197 out_free:
1198         kfree(iovec);
1199         return ret;
1200 }
1201
1202 /*
1203  * IORING_OP_NOP just posts a completion event, nothing else.
1204  */
1205 static int io_nop(struct io_kiocb *req, u64 user_data)
1206 {
1207         struct io_ring_ctx *ctx = req->ctx;
1208         long err = 0;
1209
1210         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1211                 return -EINVAL;
1212
1213         io_cqring_add_event(ctx, user_data, err);
1214         io_put_req(req);
1215         return 0;
1216 }
1217
1218 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1219 {
1220         struct io_ring_ctx *ctx = req->ctx;
1221
1222         if (!req->file)
1223                 return -EBADF;
1224
1225         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1226                 return -EINVAL;
1227         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1228                 return -EINVAL;
1229
1230         return 0;
1231 }
1232
1233 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1234                     bool force_nonblock)
1235 {
1236         loff_t sqe_off = READ_ONCE(sqe->off);
1237         loff_t sqe_len = READ_ONCE(sqe->len);
1238         loff_t end = sqe_off + sqe_len;
1239         unsigned fsync_flags;
1240         int ret;
1241
1242         fsync_flags = READ_ONCE(sqe->fsync_flags);
1243         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1244                 return -EINVAL;
1245
1246         ret = io_prep_fsync(req, sqe);
1247         if (ret)
1248                 return ret;
1249
1250         /* fsync always requires a blocking context */
1251         if (force_nonblock)
1252                 return -EAGAIN;
1253
1254         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1255                                 end > 0 ? end : LLONG_MAX,
1256                                 fsync_flags & IORING_FSYNC_DATASYNC);
1257
1258         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1259         io_put_req(req);
1260         return 0;
1261 }
1262
1263 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1264 {
1265         struct io_ring_ctx *ctx = req->ctx;
1266         int ret = 0;
1267
1268         if (!req->file)
1269                 return -EBADF;
1270
1271         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1272                 return -EINVAL;
1273         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1274                 return -EINVAL;
1275
1276         return ret;
1277 }
1278
1279 static int io_sync_file_range(struct io_kiocb *req,
1280                               const struct io_uring_sqe *sqe,
1281                               bool force_nonblock)
1282 {
1283         loff_t sqe_off;
1284         loff_t sqe_len;
1285         unsigned flags;
1286         int ret;
1287
1288         ret = io_prep_sfr(req, sqe);
1289         if (ret)
1290                 return ret;
1291
1292         /* sync_file_range always requires a blocking context */
1293         if (force_nonblock)
1294                 return -EAGAIN;
1295
1296         sqe_off = READ_ONCE(sqe->off);
1297         sqe_len = READ_ONCE(sqe->len);
1298         flags = READ_ONCE(sqe->sync_range_flags);
1299
1300         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1301
1302         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1303         io_put_req(req);
1304         return 0;
1305 }
1306
1307 static void io_poll_remove_one(struct io_kiocb *req)
1308 {
1309         struct io_poll_iocb *poll = &req->poll;
1310
1311         spin_lock(&poll->head->lock);
1312         WRITE_ONCE(poll->canceled, true);
1313         if (!list_empty(&poll->wait.entry)) {
1314                 list_del_init(&poll->wait.entry);
1315                 queue_work(req->ctx->sqo_wq, &req->work);
1316         }
1317         spin_unlock(&poll->head->lock);
1318
1319         list_del_init(&req->list);
1320 }
1321
1322 static void io_poll_remove_all(struct io_ring_ctx *ctx)
1323 {
1324         struct io_kiocb *req;
1325
1326         spin_lock_irq(&ctx->completion_lock);
1327         while (!list_empty(&ctx->cancel_list)) {
1328                 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1329                 io_poll_remove_one(req);
1330         }
1331         spin_unlock_irq(&ctx->completion_lock);
1332 }
1333
1334 /*
1335  * Find a running poll command that matches one specified in sqe->addr,
1336  * and remove it if found.
1337  */
1338 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1339 {
1340         struct io_ring_ctx *ctx = req->ctx;
1341         struct io_kiocb *poll_req, *next;
1342         int ret = -ENOENT;
1343
1344         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1345                 return -EINVAL;
1346         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1347             sqe->poll_events)
1348                 return -EINVAL;
1349
1350         spin_lock_irq(&ctx->completion_lock);
1351         list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1352                 if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1353                         io_poll_remove_one(poll_req);
1354                         ret = 0;
1355                         break;
1356                 }
1357         }
1358         spin_unlock_irq(&ctx->completion_lock);
1359
1360         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1361         io_put_req(req);
1362         return 0;
1363 }
1364
1365 static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1366                              __poll_t mask)
1367 {
1368         req->poll.done = true;
1369         io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
1370         io_commit_cqring(ctx);
1371 }
1372
1373 static void io_poll_complete_work(struct work_struct *work)
1374 {
1375         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1376         struct io_poll_iocb *poll = &req->poll;
1377         struct poll_table_struct pt = { ._key = poll->events };
1378         struct io_ring_ctx *ctx = req->ctx;
1379         __poll_t mask = 0;
1380
1381         if (!READ_ONCE(poll->canceled))
1382                 mask = vfs_poll(poll->file, &pt) & poll->events;
1383
1384         /*
1385          * Note that ->ki_cancel callers also delete iocb from active_reqs after
1386          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1387          * synchronize with them.  In the cancellation case the list_del_init
1388          * itself is not actually needed, but harmless so we keep it in to
1389          * avoid further branches in the fast path.
1390          */
1391         spin_lock_irq(&ctx->completion_lock);
1392         if (!mask && !READ_ONCE(poll->canceled)) {
1393                 add_wait_queue(poll->head, &poll->wait);
1394                 spin_unlock_irq(&ctx->completion_lock);
1395                 return;
1396         }
1397         list_del_init(&req->list);
1398         io_poll_complete(ctx, req, mask);
1399         spin_unlock_irq(&ctx->completion_lock);
1400
1401         io_cqring_ev_posted(ctx);
1402         io_put_req(req);
1403 }
1404
1405 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1406                         void *key)
1407 {
1408         struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1409                                                         wait);
1410         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1411         struct io_ring_ctx *ctx = req->ctx;
1412         __poll_t mask = key_to_poll(key);
1413         unsigned long flags;
1414
1415         /* for instances that support it check for an event match first: */
1416         if (mask && !(mask & poll->events))
1417                 return 0;
1418
1419         list_del_init(&poll->wait.entry);
1420
1421         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1422                 list_del(&req->list);
1423                 io_poll_complete(ctx, req, mask);
1424                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1425
1426                 io_cqring_ev_posted(ctx);
1427                 io_put_req(req);
1428         } else {
1429                 queue_work(ctx->sqo_wq, &req->work);
1430         }
1431
1432         return 1;
1433 }
1434
1435 struct io_poll_table {
1436         struct poll_table_struct pt;
1437         struct io_kiocb *req;
1438         int error;
1439 };
1440
1441 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1442                                struct poll_table_struct *p)
1443 {
1444         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1445
1446         if (unlikely(pt->req->poll.head)) {
1447                 pt->error = -EINVAL;
1448                 return;
1449         }
1450
1451         pt->error = 0;
1452         pt->req->poll.head = head;
1453         add_wait_queue(head, &pt->req->poll.wait);
1454 }
1455
1456 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1457 {
1458         struct io_poll_iocb *poll = &req->poll;
1459         struct io_ring_ctx *ctx = req->ctx;
1460         struct io_poll_table ipt;
1461         bool cancel = false;
1462         __poll_t mask;
1463         u16 events;
1464
1465         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1466                 return -EINVAL;
1467         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1468                 return -EINVAL;
1469         if (!poll->file)
1470                 return -EBADF;
1471
1472         INIT_WORK(&req->work, io_poll_complete_work);
1473         events = READ_ONCE(sqe->poll_events);
1474         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1475
1476         poll->head = NULL;
1477         poll->done = false;
1478         poll->canceled = false;
1479
1480         ipt.pt._qproc = io_poll_queue_proc;
1481         ipt.pt._key = poll->events;
1482         ipt.req = req;
1483         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1484
1485         /* initialized the list so that we can do list_empty checks */
1486         INIT_LIST_HEAD(&poll->wait.entry);
1487         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1488
1489         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1490
1491         spin_lock_irq(&ctx->completion_lock);
1492         if (likely(poll->head)) {
1493                 spin_lock(&poll->head->lock);
1494                 if (unlikely(list_empty(&poll->wait.entry))) {
1495                         if (ipt.error)
1496                                 cancel = true;
1497                         ipt.error = 0;
1498                         mask = 0;
1499                 }
1500                 if (mask || ipt.error)
1501                         list_del_init(&poll->wait.entry);
1502                 else if (cancel)
1503                         WRITE_ONCE(poll->canceled, true);
1504                 else if (!poll->done) /* actually waiting for an event */
1505                         list_add_tail(&req->list, &ctx->cancel_list);
1506                 spin_unlock(&poll->head->lock);
1507         }
1508         if (mask) { /* no async, we'd stolen it */
1509                 ipt.error = 0;
1510                 io_poll_complete(ctx, req, mask);
1511         }
1512         spin_unlock_irq(&ctx->completion_lock);
1513
1514         if (mask) {
1515                 io_cqring_ev_posted(ctx);
1516                 io_put_req(req);
1517         }
1518         return ipt.error;
1519 }
1520
1521 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
1522                         const struct io_uring_sqe *sqe)
1523 {
1524         struct io_uring_sqe *sqe_copy;
1525
1526         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
1527                 return 0;
1528
1529         sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1530         if (!sqe_copy)
1531                 return -EAGAIN;
1532
1533         spin_lock_irq(&ctx->completion_lock);
1534         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
1535                 spin_unlock_irq(&ctx->completion_lock);
1536                 kfree(sqe_copy);
1537                 return 0;
1538         }
1539
1540         memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
1541         req->submit.sqe = sqe_copy;
1542
1543         INIT_WORK(&req->work, io_sq_wq_submit_work);
1544         list_add_tail(&req->list, &ctx->defer_list);
1545         spin_unlock_irq(&ctx->completion_lock);
1546         return -EIOCBQUEUED;
1547 }
1548
1549 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1550                            const struct sqe_submit *s, bool force_nonblock)
1551 {
1552         int ret, opcode;
1553
1554         if (unlikely(s->index >= ctx->sq_entries))
1555                 return -EINVAL;
1556         req->user_data = READ_ONCE(s->sqe->user_data);
1557
1558         opcode = READ_ONCE(s->sqe->opcode);
1559         switch (opcode) {
1560         case IORING_OP_NOP:
1561                 ret = io_nop(req, req->user_data);
1562                 break;
1563         case IORING_OP_READV:
1564                 if (unlikely(s->sqe->buf_index))
1565                         return -EINVAL;
1566                 ret = io_read(req, s, force_nonblock);
1567                 break;
1568         case IORING_OP_WRITEV:
1569                 if (unlikely(s->sqe->buf_index))
1570                         return -EINVAL;
1571                 ret = io_write(req, s, force_nonblock);
1572                 break;
1573         case IORING_OP_READ_FIXED:
1574                 ret = io_read(req, s, force_nonblock);
1575                 break;
1576         case IORING_OP_WRITE_FIXED:
1577                 ret = io_write(req, s, force_nonblock);
1578                 break;
1579         case IORING_OP_FSYNC:
1580                 ret = io_fsync(req, s->sqe, force_nonblock);
1581                 break;
1582         case IORING_OP_POLL_ADD:
1583                 ret = io_poll_add(req, s->sqe);
1584                 break;
1585         case IORING_OP_POLL_REMOVE:
1586                 ret = io_poll_remove(req, s->sqe);
1587                 break;
1588         case IORING_OP_SYNC_FILE_RANGE:
1589                 ret = io_sync_file_range(req, s->sqe, force_nonblock);
1590                 break;
1591         default:
1592                 ret = -EINVAL;
1593                 break;
1594         }
1595
1596         if (ret)
1597                 return ret;
1598
1599         if (ctx->flags & IORING_SETUP_IOPOLL) {
1600                 if (req->error == -EAGAIN)
1601                         return -EAGAIN;
1602
1603                 /* workqueue context doesn't hold uring_lock, grab it now */
1604                 if (s->needs_lock)
1605                         mutex_lock(&ctx->uring_lock);
1606                 io_iopoll_req_issued(req);
1607                 if (s->needs_lock)
1608                         mutex_unlock(&ctx->uring_lock);
1609         }
1610
1611         return 0;
1612 }
1613
1614 static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1615                                                  const struct io_uring_sqe *sqe)
1616 {
1617         switch (sqe->opcode) {
1618         case IORING_OP_READV:
1619         case IORING_OP_READ_FIXED:
1620                 return &ctx->pending_async[READ];
1621         case IORING_OP_WRITEV:
1622         case IORING_OP_WRITE_FIXED:
1623                 return &ctx->pending_async[WRITE];
1624         default:
1625                 return NULL;
1626         }
1627 }
1628
1629 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1630 {
1631         u8 opcode = READ_ONCE(sqe->opcode);
1632
1633         return !(opcode == IORING_OP_READ_FIXED ||
1634                  opcode == IORING_OP_WRITE_FIXED);
1635 }
1636
1637 static void io_sq_wq_submit_work(struct work_struct *work)
1638 {
1639         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1640         struct io_ring_ctx *ctx = req->ctx;
1641         struct mm_struct *cur_mm = NULL;
1642         struct async_list *async_list;
1643         LIST_HEAD(req_list);
1644         mm_segment_t old_fs;
1645         int ret;
1646
1647         async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1648 restart:
1649         do {
1650                 struct sqe_submit *s = &req->submit;
1651                 const struct io_uring_sqe *sqe = s->sqe;
1652
1653                 /* Ensure we clear previously set non-block flag */
1654                 req->rw.ki_flags &= ~IOCB_NOWAIT;
1655
1656                 ret = 0;
1657                 if (io_sqe_needs_user(sqe) && !cur_mm) {
1658                         if (!mmget_not_zero(ctx->sqo_mm)) {
1659                                 ret = -EFAULT;
1660                         } else {
1661                                 cur_mm = ctx->sqo_mm;
1662                                 use_mm(cur_mm);
1663                                 old_fs = get_fs();
1664                                 set_fs(USER_DS);
1665                         }
1666                 }
1667
1668                 if (!ret) {
1669                         s->has_user = cur_mm != NULL;
1670                         s->needs_lock = true;
1671                         do {
1672                                 ret = __io_submit_sqe(ctx, req, s, false);
1673                                 /*
1674                                  * We can get EAGAIN for polled IO even though
1675                                  * we're forcing a sync submission from here,
1676                                  * since we can't wait for request slots on the
1677                                  * block side.
1678                                  */
1679                                 if (ret != -EAGAIN)
1680                                         break;
1681                                 cond_resched();
1682                         } while (1);
1683                 }
1684
1685                 /* drop submission reference */
1686                 io_put_req(req);
1687
1688                 if (ret) {
1689                         io_cqring_add_event(ctx, sqe->user_data, ret);
1690                         io_put_req(req);
1691                 }
1692
1693                 /* async context always use a copy of the sqe */
1694                 kfree(sqe);
1695
1696                 if (!async_list)
1697                         break;
1698                 if (!list_empty(&req_list)) {
1699                         req = list_first_entry(&req_list, struct io_kiocb,
1700                                                 list);
1701                         list_del(&req->list);
1702                         continue;
1703                 }
1704                 if (list_empty(&async_list->list))
1705                         break;
1706
1707                 req = NULL;
1708                 spin_lock(&async_list->lock);
1709                 if (list_empty(&async_list->list)) {
1710                         spin_unlock(&async_list->lock);
1711                         break;
1712                 }
1713                 list_splice_init(&async_list->list, &req_list);
1714                 spin_unlock(&async_list->lock);
1715
1716                 req = list_first_entry(&req_list, struct io_kiocb, list);
1717                 list_del(&req->list);
1718         } while (req);
1719
1720         /*
1721          * Rare case of racing with a submitter. If we find the count has
1722          * dropped to zero AND we have pending work items, then restart
1723          * the processing. This is a tiny race window.
1724          */
1725         if (async_list) {
1726                 ret = atomic_dec_return(&async_list->cnt);
1727                 while (!ret && !list_empty(&async_list->list)) {
1728                         spin_lock(&async_list->lock);
1729                         atomic_inc(&async_list->cnt);
1730                         list_splice_init(&async_list->list, &req_list);
1731                         spin_unlock(&async_list->lock);
1732
1733                         if (!list_empty(&req_list)) {
1734                                 req = list_first_entry(&req_list,
1735                                                         struct io_kiocb, list);
1736                                 list_del(&req->list);
1737                                 goto restart;
1738                         }
1739                         ret = atomic_dec_return(&async_list->cnt);
1740                 }
1741         }
1742
1743         if (cur_mm) {
1744                 set_fs(old_fs);
1745                 unuse_mm(cur_mm);
1746                 mmput(cur_mm);
1747         }
1748 }
1749
1750 /*
1751  * See if we can piggy back onto previously submitted work, that is still
1752  * running. We currently only allow this if the new request is sequential
1753  * to the previous one we punted.
1754  */
1755 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
1756 {
1757         bool ret = false;
1758
1759         if (!list)
1760                 return false;
1761         if (!(req->flags & REQ_F_SEQ_PREV))
1762                 return false;
1763         if (!atomic_read(&list->cnt))
1764                 return false;
1765
1766         ret = true;
1767         spin_lock(&list->lock);
1768         list_add_tail(&req->list, &list->list);
1769         if (!atomic_read(&list->cnt)) {
1770                 list_del_init(&req->list);
1771                 ret = false;
1772         }
1773         spin_unlock(&list->lock);
1774         return ret;
1775 }
1776
1777 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
1778 {
1779         int op = READ_ONCE(sqe->opcode);
1780
1781         switch (op) {
1782         case IORING_OP_NOP:
1783         case IORING_OP_POLL_REMOVE:
1784                 return false;
1785         default:
1786                 return true;
1787         }
1788 }
1789
1790 static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
1791                            struct io_submit_state *state, struct io_kiocb *req)
1792 {
1793         unsigned flags;
1794         int fd;
1795
1796         flags = READ_ONCE(s->sqe->flags);
1797         fd = READ_ONCE(s->sqe->fd);
1798
1799         if (flags & IOSQE_IO_DRAIN) {
1800                 req->flags |= REQ_F_IO_DRAIN;
1801                 req->sequence = ctx->cached_sq_head - 1;
1802         }
1803
1804         if (!io_op_needs_file(s->sqe)) {
1805                 req->file = NULL;
1806                 return 0;
1807         }
1808
1809         if (flags & IOSQE_FIXED_FILE) {
1810                 if (unlikely(!ctx->user_files ||
1811                     (unsigned) fd >= ctx->nr_user_files))
1812                         return -EBADF;
1813                 req->file = ctx->user_files[fd];
1814                 req->flags |= REQ_F_FIXED_FILE;
1815         } else {
1816                 if (s->needs_fixed_file)
1817                         return -EBADF;
1818                 req->file = io_file_get(state, fd);
1819                 if (unlikely(!req->file))
1820                         return -EBADF;
1821         }
1822
1823         return 0;
1824 }
1825
1826 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1827                          struct io_submit_state *state)
1828 {
1829         struct io_kiocb *req;
1830         int ret;
1831
1832         /* enforce forwards compatibility on users */
1833         if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)))
1834                 return -EINVAL;
1835
1836         req = io_get_req(ctx, state);
1837         if (unlikely(!req))
1838                 return -EAGAIN;
1839
1840         ret = io_req_set_file(ctx, s, state, req);
1841         if (unlikely(ret))
1842                 goto out;
1843
1844         ret = io_req_defer(ctx, req, s->sqe);
1845         if (ret) {
1846                 if (ret == -EIOCBQUEUED)
1847                         ret = 0;
1848                 return ret;
1849         }
1850
1851         ret = __io_submit_sqe(ctx, req, s, true);
1852         if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
1853                 struct io_uring_sqe *sqe_copy;
1854
1855                 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1856                 if (sqe_copy) {
1857                         struct async_list *list;
1858
1859                         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1860                         s->sqe = sqe_copy;
1861
1862                         memcpy(&req->submit, s, sizeof(*s));
1863                         list = io_async_list_from_sqe(ctx, s->sqe);
1864                         if (!io_add_to_prev_work(list, req)) {
1865                                 if (list)
1866                                         atomic_inc(&list->cnt);
1867                                 INIT_WORK(&req->work, io_sq_wq_submit_work);
1868                                 queue_work(ctx->sqo_wq, &req->work);
1869                         }
1870
1871                         /*
1872                          * Queued up for async execution, worker will release
1873                          * submit reference when the iocb is actually
1874                          * submitted.
1875                          */
1876                         return 0;
1877                 }
1878         }
1879
1880 out:
1881         /* drop submission reference */
1882         io_put_req(req);
1883
1884         /* and drop final reference, if we failed */
1885         if (ret)
1886                 io_put_req(req);
1887
1888         return ret;
1889 }
1890
1891 /*
1892  * Batched submission is done, ensure local IO is flushed out.
1893  */
1894 static void io_submit_state_end(struct io_submit_state *state)
1895 {
1896         blk_finish_plug(&state->plug);
1897         io_file_put(state);
1898         if (state->free_reqs)
1899                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
1900                                         &state->reqs[state->cur_req]);
1901 }
1902
1903 /*
1904  * Start submission side cache.
1905  */
1906 static void io_submit_state_start(struct io_submit_state *state,
1907                                   struct io_ring_ctx *ctx, unsigned max_ios)
1908 {
1909         blk_start_plug(&state->plug);
1910         state->free_reqs = 0;
1911         state->file = NULL;
1912         state->ios_left = max_ios;
1913 }
1914
1915 static void io_commit_sqring(struct io_ring_ctx *ctx)
1916 {
1917         struct io_sq_ring *ring = ctx->sq_ring;
1918
1919         if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1920                 /*
1921                  * Ensure any loads from the SQEs are done at this point,
1922                  * since once we write the new head, the application could
1923                  * write new data to them.
1924                  */
1925                 smp_store_release(&ring->r.head, ctx->cached_sq_head);
1926         }
1927 }
1928
1929 /*
1930  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1931  * that is mapped by userspace. This means that care needs to be taken to
1932  * ensure that reads are stable, as we cannot rely on userspace always
1933  * being a good citizen. If members of the sqe are validated and then later
1934  * used, it's important that those reads are done through READ_ONCE() to
1935  * prevent a re-load down the line.
1936  */
1937 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1938 {
1939         struct io_sq_ring *ring = ctx->sq_ring;
1940         unsigned head;
1941
1942         /*
1943          * The cached sq head (or cq tail) serves two purposes:
1944          *
1945          * 1) allows us to batch the cost of updating the user visible
1946          *    head updates.
1947          * 2) allows the kernel side to track the head on its own, even
1948          *    though the application is the one updating it.
1949          */
1950         head = ctx->cached_sq_head;
1951         /* make sure SQ entry isn't read before tail */
1952         if (head == smp_load_acquire(&ring->r.tail))
1953                 return false;
1954
1955         head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1956         if (head < ctx->sq_entries) {
1957                 s->index = head;
1958                 s->sqe = &ctx->sq_sqes[head];
1959                 ctx->cached_sq_head++;
1960                 return true;
1961         }
1962
1963         /* drop invalid entries */
1964         ctx->cached_sq_head++;
1965         ring->dropped++;
1966         return false;
1967 }
1968
1969 static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1970                           unsigned int nr, bool has_user, bool mm_fault)
1971 {
1972         struct io_submit_state state, *statep = NULL;
1973         int ret, i, submitted = 0;
1974
1975         if (nr > IO_PLUG_THRESHOLD) {
1976                 io_submit_state_start(&state, ctx, nr);
1977                 statep = &state;
1978         }
1979
1980         for (i = 0; i < nr; i++) {
1981                 if (unlikely(mm_fault)) {
1982                         ret = -EFAULT;
1983                 } else {
1984                         sqes[i].has_user = has_user;
1985                         sqes[i].needs_lock = true;
1986                         sqes[i].needs_fixed_file = true;
1987                         ret = io_submit_sqe(ctx, &sqes[i], statep);
1988                 }
1989                 if (!ret) {
1990                         submitted++;
1991                         continue;
1992                 }
1993
1994                 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret);
1995         }
1996
1997         if (statep)
1998                 io_submit_state_end(&state);
1999
2000         return submitted;
2001 }
2002
2003 static int io_sq_thread(void *data)
2004 {
2005         struct sqe_submit sqes[IO_IOPOLL_BATCH];
2006         struct io_ring_ctx *ctx = data;
2007         struct mm_struct *cur_mm = NULL;
2008         mm_segment_t old_fs;
2009         DEFINE_WAIT(wait);
2010         unsigned inflight;
2011         unsigned long timeout;
2012
2013         old_fs = get_fs();
2014         set_fs(USER_DS);
2015
2016         timeout = inflight = 0;
2017         while (!kthread_should_park()) {
2018                 bool all_fixed, mm_fault = false;
2019                 int i;
2020
2021                 if (inflight) {
2022                         unsigned nr_events = 0;
2023
2024                         if (ctx->flags & IORING_SETUP_IOPOLL) {
2025                                 /*
2026                                  * We disallow the app entering submit/complete
2027                                  * with polling, but we still need to lock the
2028                                  * ring to prevent racing with polled issue
2029                                  * that got punted to a workqueue.
2030                                  */
2031                                 mutex_lock(&ctx->uring_lock);
2032                                 io_iopoll_check(ctx, &nr_events, 0);
2033                                 mutex_unlock(&ctx->uring_lock);
2034                         } else {
2035                                 /*
2036                                  * Normal IO, just pretend everything completed.
2037                                  * We don't have to poll completions for that.
2038                                  */
2039                                 nr_events = inflight;
2040                         }
2041
2042                         inflight -= nr_events;
2043                         if (!inflight)
2044                                 timeout = jiffies + ctx->sq_thread_idle;
2045                 }
2046
2047                 if (!io_get_sqring(ctx, &sqes[0])) {
2048                         /*
2049                          * We're polling. If we're within the defined idle
2050                          * period, then let us spin without work before going
2051                          * to sleep.
2052                          */
2053                         if (inflight || !time_after(jiffies, timeout)) {
2054                                 cpu_relax();
2055                                 continue;
2056                         }
2057
2058                         /*
2059                          * Drop cur_mm before scheduling, we can't hold it for
2060                          * long periods (or over schedule()). Do this before
2061                          * adding ourselves to the waitqueue, as the unuse/drop
2062                          * may sleep.
2063                          */
2064                         if (cur_mm) {
2065                                 unuse_mm(cur_mm);
2066                                 mmput(cur_mm);
2067                                 cur_mm = NULL;
2068                         }
2069
2070                         prepare_to_wait(&ctx->sqo_wait, &wait,
2071                                                 TASK_INTERRUPTIBLE);
2072
2073                         /* Tell userspace we may need a wakeup call */
2074                         ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
2075                         /* make sure to read SQ tail after writing flags */
2076                         smp_mb();
2077
2078                         if (!io_get_sqring(ctx, &sqes[0])) {
2079                                 if (kthread_should_park()) {
2080                                         finish_wait(&ctx->sqo_wait, &wait);
2081                                         break;
2082                                 }
2083                                 if (signal_pending(current))
2084                                         flush_signals(current);
2085                                 schedule();
2086                                 finish_wait(&ctx->sqo_wait, &wait);
2087
2088                                 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2089                                 continue;
2090                         }
2091                         finish_wait(&ctx->sqo_wait, &wait);
2092
2093                         ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2094                 }
2095
2096                 i = 0;
2097                 all_fixed = true;
2098                 do {
2099                         if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
2100                                 all_fixed = false;
2101
2102                         i++;
2103                         if (i == ARRAY_SIZE(sqes))
2104                                 break;
2105                 } while (io_get_sqring(ctx, &sqes[i]));
2106
2107                 /* Unless all new commands are FIXED regions, grab mm */
2108                 if (!all_fixed && !cur_mm) {
2109                         mm_fault = !mmget_not_zero(ctx->sqo_mm);
2110                         if (!mm_fault) {
2111                                 use_mm(ctx->sqo_mm);
2112                                 cur_mm = ctx->sqo_mm;
2113                         }
2114                 }
2115
2116                 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
2117                                                 mm_fault);
2118
2119                 /* Commit SQ ring head once we've consumed all SQEs */
2120                 io_commit_sqring(ctx);
2121         }
2122
2123         set_fs(old_fs);
2124         if (cur_mm) {
2125                 unuse_mm(cur_mm);
2126                 mmput(cur_mm);
2127         }
2128
2129         kthread_parkme();
2130
2131         return 0;
2132 }
2133
2134 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2135 {
2136         struct io_submit_state state, *statep = NULL;
2137         int i, submit = 0;
2138
2139         if (to_submit > IO_PLUG_THRESHOLD) {
2140                 io_submit_state_start(&state, ctx, to_submit);
2141                 statep = &state;
2142         }
2143
2144         for (i = 0; i < to_submit; i++) {
2145                 struct sqe_submit s;
2146                 int ret;
2147
2148                 if (!io_get_sqring(ctx, &s))
2149                         break;
2150
2151                 s.has_user = true;
2152                 s.needs_lock = false;
2153                 s.needs_fixed_file = false;
2154                 submit++;
2155
2156                 ret = io_submit_sqe(ctx, &s, statep);
2157                 if (ret)
2158                         io_cqring_add_event(ctx, s.sqe->user_data, ret);
2159         }
2160         io_commit_sqring(ctx);
2161
2162         if (statep)
2163                 io_submit_state_end(statep);
2164
2165         return submit;
2166 }
2167
2168 static unsigned io_cqring_events(struct io_cq_ring *ring)
2169 {
2170         /* See comment at the top of this file */
2171         smp_rmb();
2172         return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
2173 }
2174
2175 /*
2176  * Wait until events become available, if we don't already have some. The
2177  * application must reap them itself, as they reside on the shared cq ring.
2178  */
2179 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2180                           const sigset_t __user *sig, size_t sigsz)
2181 {
2182         struct io_cq_ring *ring = ctx->cq_ring;
2183         sigset_t ksigmask, sigsaved;
2184         int ret;
2185
2186         if (io_cqring_events(ring) >= min_events)
2187                 return 0;
2188
2189         if (sig) {
2190 #ifdef CONFIG_COMPAT
2191                 if (in_compat_syscall())
2192                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2193                                                       &ksigmask, &sigsaved, sigsz);
2194                 else
2195 #endif
2196                         ret = set_user_sigmask(sig, &ksigmask,
2197                                                &sigsaved, sigsz);
2198
2199                 if (ret)
2200                         return ret;
2201         }
2202
2203         ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events);
2204         if (ret == -ERESTARTSYS)
2205                 ret = -EINTR;
2206
2207         if (sig)
2208                 restore_user_sigmask(sig, &sigsaved);
2209
2210         return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
2211 }
2212
2213 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2214 {
2215 #if defined(CONFIG_UNIX)
2216         if (ctx->ring_sock) {
2217                 struct sock *sock = ctx->ring_sock->sk;
2218                 struct sk_buff *skb;
2219
2220                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2221                         kfree_skb(skb);
2222         }
2223 #else
2224         int i;
2225
2226         for (i = 0; i < ctx->nr_user_files; i++)
2227                 fput(ctx->user_files[i]);
2228 #endif
2229 }
2230
2231 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2232 {
2233         if (!ctx->user_files)
2234                 return -ENXIO;
2235
2236         __io_sqe_files_unregister(ctx);
2237         kfree(ctx->user_files);
2238         ctx->user_files = NULL;
2239         ctx->nr_user_files = 0;
2240         return 0;
2241 }
2242
2243 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2244 {
2245         if (ctx->sqo_thread) {
2246                 /*
2247                  * The park is a bit of a work-around, without it we get
2248                  * warning spews on shutdown with SQPOLL set and affinity
2249                  * set to a single CPU.
2250                  */
2251                 kthread_park(ctx->sqo_thread);
2252                 kthread_stop(ctx->sqo_thread);
2253                 ctx->sqo_thread = NULL;
2254         }
2255 }
2256
2257 static void io_finish_async(struct io_ring_ctx *ctx)
2258 {
2259         io_sq_thread_stop(ctx);
2260
2261         if (ctx->sqo_wq) {
2262                 destroy_workqueue(ctx->sqo_wq);
2263                 ctx->sqo_wq = NULL;
2264         }
2265 }
2266
2267 #if defined(CONFIG_UNIX)
2268 static void io_destruct_skb(struct sk_buff *skb)
2269 {
2270         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2271
2272         io_finish_async(ctx);
2273         unix_destruct_scm(skb);
2274 }
2275
2276 /*
2277  * Ensure the UNIX gc is aware of our file set, so we are certain that
2278  * the io_uring can be safely unregistered on process exit, even if we have
2279  * loops in the file referencing.
2280  */
2281 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2282 {
2283         struct sock *sk = ctx->ring_sock->sk;
2284         struct scm_fp_list *fpl;
2285         struct sk_buff *skb;
2286         int i;
2287
2288         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2289                 unsigned long inflight = ctx->user->unix_inflight + nr;
2290
2291                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2292                         return -EMFILE;
2293         }
2294
2295         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2296         if (!fpl)
2297                 return -ENOMEM;
2298
2299         skb = alloc_skb(0, GFP_KERNEL);
2300         if (!skb) {
2301                 kfree(fpl);
2302                 return -ENOMEM;
2303         }
2304
2305         skb->sk = sk;
2306         skb->destructor = io_destruct_skb;
2307
2308         fpl->user = get_uid(ctx->user);
2309         for (i = 0; i < nr; i++) {
2310                 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2311                 unix_inflight(fpl->user, fpl->fp[i]);
2312         }
2313
2314         fpl->max = fpl->count = nr;
2315         UNIXCB(skb).fp = fpl;
2316         refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2317         skb_queue_head(&sk->sk_receive_queue, skb);
2318
2319         for (i = 0; i < nr; i++)
2320                 fput(fpl->fp[i]);
2321
2322         return 0;
2323 }
2324
2325 /*
2326  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2327  * causes regular reference counting to break down. We rely on the UNIX
2328  * garbage collection to take care of this problem for us.
2329  */
2330 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2331 {
2332         unsigned left, total;
2333         int ret = 0;
2334
2335         total = 0;
2336         left = ctx->nr_user_files;
2337         while (left) {
2338                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2339
2340                 ret = __io_sqe_files_scm(ctx, this_files, total);
2341                 if (ret)
2342                         break;
2343                 left -= this_files;
2344                 total += this_files;
2345         }
2346
2347         if (!ret)
2348                 return 0;
2349
2350         while (total < ctx->nr_user_files) {
2351                 fput(ctx->user_files[total]);
2352                 total++;
2353         }
2354
2355         return ret;
2356 }
2357 #else
2358 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2359 {
2360         return 0;
2361 }
2362 #endif
2363
2364 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2365                                  unsigned nr_args)
2366 {
2367         __s32 __user *fds = (__s32 __user *) arg;
2368         int fd, ret = 0;
2369         unsigned i;
2370
2371         if (ctx->user_files)
2372                 return -EBUSY;
2373         if (!nr_args)
2374                 return -EINVAL;
2375         if (nr_args > IORING_MAX_FIXED_FILES)
2376                 return -EMFILE;
2377
2378         ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2379         if (!ctx->user_files)
2380                 return -ENOMEM;
2381
2382         for (i = 0; i < nr_args; i++) {
2383                 ret = -EFAULT;
2384                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2385                         break;
2386
2387                 ctx->user_files[i] = fget(fd);
2388
2389                 ret = -EBADF;
2390                 if (!ctx->user_files[i])
2391                         break;
2392                 /*
2393                  * Don't allow io_uring instances to be registered. If UNIX
2394                  * isn't enabled, then this causes a reference cycle and this
2395                  * instance can never get freed. If UNIX is enabled we'll
2396                  * handle it just fine, but there's still no point in allowing
2397                  * a ring fd as it doesn't support regular read/write anyway.
2398                  */
2399                 if (ctx->user_files[i]->f_op == &io_uring_fops) {
2400                         fput(ctx->user_files[i]);
2401                         break;
2402                 }
2403                 ctx->nr_user_files++;
2404                 ret = 0;
2405         }
2406
2407         if (ret) {
2408                 for (i = 0; i < ctx->nr_user_files; i++)
2409                         fput(ctx->user_files[i]);
2410
2411                 kfree(ctx->user_files);
2412                 ctx->user_files = NULL;
2413                 ctx->nr_user_files = 0;
2414                 return ret;
2415         }
2416
2417         ret = io_sqe_files_scm(ctx);
2418         if (ret)
2419                 io_sqe_files_unregister(ctx);
2420
2421         return ret;
2422 }
2423
2424 static int io_sq_offload_start(struct io_ring_ctx *ctx,
2425                                struct io_uring_params *p)
2426 {
2427         int ret;
2428
2429         init_waitqueue_head(&ctx->sqo_wait);
2430         mmgrab(current->mm);
2431         ctx->sqo_mm = current->mm;
2432
2433         if (ctx->flags & IORING_SETUP_SQPOLL) {
2434                 ret = -EPERM;
2435                 if (!capable(CAP_SYS_ADMIN))
2436                         goto err;
2437
2438                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2439                 if (!ctx->sq_thread_idle)
2440                         ctx->sq_thread_idle = HZ;
2441
2442                 if (p->flags & IORING_SETUP_SQ_AFF) {
2443                         int cpu = p->sq_thread_cpu;
2444
2445                         ret = -EINVAL;
2446                         if (cpu >= nr_cpu_ids)
2447                                 goto err;
2448                         if (!cpu_online(cpu))
2449                                 goto err;
2450
2451                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2452                                                         ctx, cpu,
2453                                                         "io_uring-sq");
2454                 } else {
2455                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2456                                                         "io_uring-sq");
2457                 }
2458                 if (IS_ERR(ctx->sqo_thread)) {
2459                         ret = PTR_ERR(ctx->sqo_thread);
2460                         ctx->sqo_thread = NULL;
2461                         goto err;
2462                 }
2463                 wake_up_process(ctx->sqo_thread);
2464         } else if (p->flags & IORING_SETUP_SQ_AFF) {
2465                 /* Can't have SQ_AFF without SQPOLL */
2466                 ret = -EINVAL;
2467                 goto err;
2468         }
2469
2470         /* Do QD, or 2 * CPUS, whatever is smallest */
2471         ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2472                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2473         if (!ctx->sqo_wq) {
2474                 ret = -ENOMEM;
2475                 goto err;
2476         }
2477
2478         return 0;
2479 err:
2480         io_sq_thread_stop(ctx);
2481         mmdrop(ctx->sqo_mm);
2482         ctx->sqo_mm = NULL;
2483         return ret;
2484 }
2485
2486 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2487 {
2488         atomic_long_sub(nr_pages, &user->locked_vm);
2489 }
2490
2491 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2492 {
2493         unsigned long page_limit, cur_pages, new_pages;
2494
2495         /* Don't allow more pages than we can safely lock */
2496         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2497
2498         do {
2499                 cur_pages = atomic_long_read(&user->locked_vm);
2500                 new_pages = cur_pages + nr_pages;
2501                 if (new_pages > page_limit)
2502                         return -ENOMEM;
2503         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2504                                         new_pages) != cur_pages);
2505
2506         return 0;
2507 }
2508
2509 static void io_mem_free(void *ptr)
2510 {
2511         struct page *page;
2512
2513         if (!ptr)
2514                 return;
2515
2516         page = virt_to_head_page(ptr);
2517         if (put_page_testzero(page))
2518                 free_compound_page(page);
2519 }
2520
2521 static void *io_mem_alloc(size_t size)
2522 {
2523         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2524                                 __GFP_NORETRY;
2525
2526         return (void *) __get_free_pages(gfp_flags, get_order(size));
2527 }
2528
2529 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2530 {
2531         struct io_sq_ring *sq_ring;
2532         struct io_cq_ring *cq_ring;
2533         size_t bytes;
2534
2535         bytes = struct_size(sq_ring, array, sq_entries);
2536         bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2537         bytes += struct_size(cq_ring, cqes, cq_entries);
2538
2539         return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2540 }
2541
2542 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2543 {
2544         int i, j;
2545
2546         if (!ctx->user_bufs)
2547                 return -ENXIO;
2548
2549         for (i = 0; i < ctx->nr_user_bufs; i++) {
2550                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2551
2552                 for (j = 0; j < imu->nr_bvecs; j++)
2553                         put_page(imu->bvec[j].bv_page);
2554
2555                 if (ctx->account_mem)
2556                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
2557                 kvfree(imu->bvec);
2558                 imu->nr_bvecs = 0;
2559         }
2560
2561         kfree(ctx->user_bufs);
2562         ctx->user_bufs = NULL;
2563         ctx->nr_user_bufs = 0;
2564         return 0;
2565 }
2566
2567 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2568                        void __user *arg, unsigned index)
2569 {
2570         struct iovec __user *src;
2571
2572 #ifdef CONFIG_COMPAT
2573         if (ctx->compat) {
2574                 struct compat_iovec __user *ciovs;
2575                 struct compat_iovec ciov;
2576
2577                 ciovs = (struct compat_iovec __user *) arg;
2578                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2579                         return -EFAULT;
2580
2581                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2582                 dst->iov_len = ciov.iov_len;
2583                 return 0;
2584         }
2585 #endif
2586         src = (struct iovec __user *) arg;
2587         if (copy_from_user(dst, &src[index], sizeof(*dst)))
2588                 return -EFAULT;
2589         return 0;
2590 }
2591
2592 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2593                                   unsigned nr_args)
2594 {
2595         struct vm_area_struct **vmas = NULL;
2596         struct page **pages = NULL;
2597         int i, j, got_pages = 0;
2598         int ret = -EINVAL;
2599
2600         if (ctx->user_bufs)
2601                 return -EBUSY;
2602         if (!nr_args || nr_args > UIO_MAXIOV)
2603                 return -EINVAL;
2604
2605         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2606                                         GFP_KERNEL);
2607         if (!ctx->user_bufs)
2608                 return -ENOMEM;
2609
2610         for (i = 0; i < nr_args; i++) {
2611                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2612                 unsigned long off, start, end, ubuf;
2613                 int pret, nr_pages;
2614                 struct iovec iov;
2615                 size_t size;
2616
2617                 ret = io_copy_iov(ctx, &iov, arg, i);
2618                 if (ret)
2619                         break;
2620
2621                 /*
2622                  * Don't impose further limits on the size and buffer
2623                  * constraints here, we'll -EINVAL later when IO is
2624                  * submitted if they are wrong.
2625                  */
2626                 ret = -EFAULT;
2627                 if (!iov.iov_base || !iov.iov_len)
2628                         goto err;
2629
2630                 /* arbitrary limit, but we need something */
2631                 if (iov.iov_len > SZ_1G)
2632                         goto err;
2633
2634                 ubuf = (unsigned long) iov.iov_base;
2635                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
2636                 start = ubuf >> PAGE_SHIFT;
2637                 nr_pages = end - start;
2638
2639                 if (ctx->account_mem) {
2640                         ret = io_account_mem(ctx->user, nr_pages);
2641                         if (ret)
2642                                 goto err;
2643                 }
2644
2645                 ret = 0;
2646                 if (!pages || nr_pages > got_pages) {
2647                         kfree(vmas);
2648                         kfree(pages);
2649                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
2650                                                 GFP_KERNEL);
2651                         vmas = kvmalloc_array(nr_pages,
2652                                         sizeof(struct vm_area_struct *),
2653                                         GFP_KERNEL);
2654                         if (!pages || !vmas) {
2655                                 ret = -ENOMEM;
2656                                 if (ctx->account_mem)
2657                                         io_unaccount_mem(ctx->user, nr_pages);
2658                                 goto err;
2659                         }
2660                         got_pages = nr_pages;
2661                 }
2662
2663                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
2664                                                 GFP_KERNEL);
2665                 ret = -ENOMEM;
2666                 if (!imu->bvec) {
2667                         if (ctx->account_mem)
2668                                 io_unaccount_mem(ctx->user, nr_pages);
2669                         goto err;
2670                 }
2671
2672                 ret = 0;
2673                 down_read(&current->mm->mmap_sem);
2674                 pret = get_user_pages(ubuf, nr_pages,
2675                                       FOLL_WRITE | FOLL_LONGTERM,
2676                                       pages, vmas);
2677                 if (pret == nr_pages) {
2678                         /* don't support file backed memory */
2679                         for (j = 0; j < nr_pages; j++) {
2680                                 struct vm_area_struct *vma = vmas[j];
2681
2682                                 if (vma->vm_file &&
2683                                     !is_file_hugepages(vma->vm_file)) {
2684                                         ret = -EOPNOTSUPP;
2685                                         break;
2686                                 }
2687                         }
2688                 } else {
2689                         ret = pret < 0 ? pret : -EFAULT;
2690                 }
2691                 up_read(&current->mm->mmap_sem);
2692                 if (ret) {
2693                         /*
2694                          * if we did partial map, or found file backed vmas,
2695                          * release any pages we did get
2696                          */
2697                         if (pret > 0) {
2698                                 for (j = 0; j < pret; j++)
2699                                         put_page(pages[j]);
2700                         }
2701                         if (ctx->account_mem)
2702                                 io_unaccount_mem(ctx->user, nr_pages);
2703                         kvfree(imu->bvec);
2704                         goto err;
2705                 }
2706
2707                 off = ubuf & ~PAGE_MASK;
2708                 size = iov.iov_len;
2709                 for (j = 0; j < nr_pages; j++) {
2710                         size_t vec_len;
2711
2712                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
2713                         imu->bvec[j].bv_page = pages[j];
2714                         imu->bvec[j].bv_len = vec_len;
2715                         imu->bvec[j].bv_offset = off;
2716                         off = 0;
2717                         size -= vec_len;
2718                 }
2719                 /* store original address for later verification */
2720                 imu->ubuf = ubuf;
2721                 imu->len = iov.iov_len;
2722                 imu->nr_bvecs = nr_pages;
2723
2724                 ctx->nr_user_bufs++;
2725         }
2726         kvfree(pages);
2727         kvfree(vmas);
2728         return 0;
2729 err:
2730         kvfree(pages);
2731         kvfree(vmas);
2732         io_sqe_buffer_unregister(ctx);
2733         return ret;
2734 }
2735
2736 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
2737 {
2738         __s32 __user *fds = arg;
2739         int fd;
2740
2741         if (ctx->cq_ev_fd)
2742                 return -EBUSY;
2743
2744         if (copy_from_user(&fd, fds, sizeof(*fds)))
2745                 return -EFAULT;
2746
2747         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
2748         if (IS_ERR(ctx->cq_ev_fd)) {
2749                 int ret = PTR_ERR(ctx->cq_ev_fd);
2750                 ctx->cq_ev_fd = NULL;
2751                 return ret;
2752         }
2753
2754         return 0;
2755 }
2756
2757 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
2758 {
2759         if (ctx->cq_ev_fd) {
2760                 eventfd_ctx_put(ctx->cq_ev_fd);
2761                 ctx->cq_ev_fd = NULL;
2762                 return 0;
2763         }
2764
2765         return -ENXIO;
2766 }
2767
2768 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2769 {
2770         io_finish_async(ctx);
2771         if (ctx->sqo_mm)
2772                 mmdrop(ctx->sqo_mm);
2773
2774         io_iopoll_reap_events(ctx);
2775         io_sqe_buffer_unregister(ctx);
2776         io_sqe_files_unregister(ctx);
2777         io_eventfd_unregister(ctx);
2778
2779 #if defined(CONFIG_UNIX)
2780         if (ctx->ring_sock)
2781                 sock_release(ctx->ring_sock);
2782 #endif
2783
2784         io_mem_free(ctx->sq_ring);
2785         io_mem_free(ctx->sq_sqes);
2786         io_mem_free(ctx->cq_ring);
2787
2788         percpu_ref_exit(&ctx->refs);
2789         if (ctx->account_mem)
2790                 io_unaccount_mem(ctx->user,
2791                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
2792         free_uid(ctx->user);
2793         kfree(ctx);
2794 }
2795
2796 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2797 {
2798         struct io_ring_ctx *ctx = file->private_data;
2799         __poll_t mask = 0;
2800
2801         poll_wait(file, &ctx->cq_wait, wait);
2802         /*
2803          * synchronizes with barrier from wq_has_sleeper call in
2804          * io_commit_cqring
2805          */
2806         smp_rmb();
2807         if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
2808             ctx->sq_ring->ring_entries)
2809                 mask |= EPOLLOUT | EPOLLWRNORM;
2810         if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2811                 mask |= EPOLLIN | EPOLLRDNORM;
2812
2813         return mask;
2814 }
2815
2816 static int io_uring_fasync(int fd, struct file *file, int on)
2817 {
2818         struct io_ring_ctx *ctx = file->private_data;
2819
2820         return fasync_helper(fd, file, on, &ctx->cq_fasync);
2821 }
2822
2823 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2824 {
2825         mutex_lock(&ctx->uring_lock);
2826         percpu_ref_kill(&ctx->refs);
2827         mutex_unlock(&ctx->uring_lock);
2828
2829         io_poll_remove_all(ctx);
2830         io_iopoll_reap_events(ctx);
2831         wait_for_completion(&ctx->ctx_done);
2832         io_ring_ctx_free(ctx);
2833 }
2834
2835 static int io_uring_release(struct inode *inode, struct file *file)
2836 {
2837         struct io_ring_ctx *ctx = file->private_data;
2838
2839         file->private_data = NULL;
2840         io_ring_ctx_wait_and_kill(ctx);
2841         return 0;
2842 }
2843
2844 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2845 {
2846         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2847         unsigned long sz = vma->vm_end - vma->vm_start;
2848         struct io_ring_ctx *ctx = file->private_data;
2849         unsigned long pfn;
2850         struct page *page;
2851         void *ptr;
2852
2853         switch (offset) {
2854         case IORING_OFF_SQ_RING:
2855                 ptr = ctx->sq_ring;
2856                 break;
2857         case IORING_OFF_SQES:
2858                 ptr = ctx->sq_sqes;
2859                 break;
2860         case IORING_OFF_CQ_RING:
2861                 ptr = ctx->cq_ring;
2862                 break;
2863         default:
2864                 return -EINVAL;
2865         }
2866
2867         page = virt_to_head_page(ptr);
2868         if (sz > (PAGE_SIZE << compound_order(page)))
2869                 return -EINVAL;
2870
2871         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2872         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2873 }
2874
2875 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2876                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
2877                 size_t, sigsz)
2878 {
2879         struct io_ring_ctx *ctx;
2880         long ret = -EBADF;
2881         int submitted = 0;
2882         struct fd f;
2883
2884         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2885                 return -EINVAL;
2886
2887         f = fdget(fd);
2888         if (!f.file)
2889                 return -EBADF;
2890
2891         ret = -EOPNOTSUPP;
2892         if (f.file->f_op != &io_uring_fops)
2893                 goto out_fput;
2894
2895         ret = -ENXIO;
2896         ctx = f.file->private_data;
2897         if (!percpu_ref_tryget(&ctx->refs))
2898                 goto out_fput;
2899
2900         /*
2901          * For SQ polling, the thread will do all submissions and completions.
2902          * Just return the requested submit count, and wake the thread if
2903          * we were asked to.
2904          */
2905         if (ctx->flags & IORING_SETUP_SQPOLL) {
2906                 if (flags & IORING_ENTER_SQ_WAKEUP)
2907                         wake_up(&ctx->sqo_wait);
2908                 submitted = to_submit;
2909                 goto out_ctx;
2910         }
2911
2912         ret = 0;
2913         if (to_submit) {
2914                 to_submit = min(to_submit, ctx->sq_entries);
2915
2916                 mutex_lock(&ctx->uring_lock);
2917                 submitted = io_ring_submit(ctx, to_submit);
2918                 mutex_unlock(&ctx->uring_lock);
2919         }
2920         if (flags & IORING_ENTER_GETEVENTS) {
2921                 unsigned nr_events = 0;
2922
2923                 min_complete = min(min_complete, ctx->cq_entries);
2924
2925                 if (ctx->flags & IORING_SETUP_IOPOLL) {
2926                         mutex_lock(&ctx->uring_lock);
2927                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
2928                         mutex_unlock(&ctx->uring_lock);
2929                 } else {
2930                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2931                 }
2932         }
2933
2934 out_ctx:
2935         io_ring_drop_ctx_refs(ctx, 1);
2936 out_fput:
2937         fdput(f);
2938         return submitted ? submitted : ret;
2939 }
2940
2941 static const struct file_operations io_uring_fops = {
2942         .release        = io_uring_release,
2943         .mmap           = io_uring_mmap,
2944         .poll           = io_uring_poll,
2945         .fasync         = io_uring_fasync,
2946 };
2947
2948 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2949                                   struct io_uring_params *p)
2950 {
2951         struct io_sq_ring *sq_ring;
2952         struct io_cq_ring *cq_ring;
2953         size_t size;
2954
2955         sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2956         if (!sq_ring)
2957                 return -ENOMEM;
2958
2959         ctx->sq_ring = sq_ring;
2960         sq_ring->ring_mask = p->sq_entries - 1;
2961         sq_ring->ring_entries = p->sq_entries;
2962         ctx->sq_mask = sq_ring->ring_mask;
2963         ctx->sq_entries = sq_ring->ring_entries;
2964
2965         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2966         if (size == SIZE_MAX)
2967                 return -EOVERFLOW;
2968
2969         ctx->sq_sqes = io_mem_alloc(size);
2970         if (!ctx->sq_sqes)
2971                 return -ENOMEM;
2972
2973         cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2974         if (!cq_ring)
2975                 return -ENOMEM;
2976
2977         ctx->cq_ring = cq_ring;
2978         cq_ring->ring_mask = p->cq_entries - 1;
2979         cq_ring->ring_entries = p->cq_entries;
2980         ctx->cq_mask = cq_ring->ring_mask;
2981         ctx->cq_entries = cq_ring->ring_entries;
2982         return 0;
2983 }
2984
2985 /*
2986  * Allocate an anonymous fd, this is what constitutes the application
2987  * visible backing of an io_uring instance. The application mmaps this
2988  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
2989  * we have to tie this fd to a socket for file garbage collection purposes.
2990  */
2991 static int io_uring_get_fd(struct io_ring_ctx *ctx)
2992 {
2993         struct file *file;
2994         int ret;
2995
2996 #if defined(CONFIG_UNIX)
2997         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
2998                                 &ctx->ring_sock);
2999         if (ret)
3000                 return ret;
3001 #endif
3002
3003         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3004         if (ret < 0)
3005                 goto err;
3006
3007         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
3008                                         O_RDWR | O_CLOEXEC);
3009         if (IS_ERR(file)) {
3010                 put_unused_fd(ret);
3011                 ret = PTR_ERR(file);
3012                 goto err;
3013         }
3014
3015 #if defined(CONFIG_UNIX)
3016         ctx->ring_sock->file = file;
3017         ctx->ring_sock->sk->sk_user_data = ctx;
3018 #endif
3019         fd_install(ret, file);
3020         return ret;
3021 err:
3022 #if defined(CONFIG_UNIX)
3023         sock_release(ctx->ring_sock);
3024         ctx->ring_sock = NULL;
3025 #endif
3026         return ret;
3027 }
3028
3029 static int io_uring_create(unsigned entries, struct io_uring_params *p)
3030 {
3031         struct user_struct *user = NULL;
3032         struct io_ring_ctx *ctx;
3033         bool account_mem;
3034         int ret;
3035
3036         if (!entries || entries > IORING_MAX_ENTRIES)
3037                 return -EINVAL;
3038
3039         /*
3040          * Use twice as many entries for the CQ ring. It's possible for the
3041          * application to drive a higher depth than the size of the SQ ring,
3042          * since the sqes are only used at submission time. This allows for
3043          * some flexibility in overcommitting a bit.
3044          */
3045         p->sq_entries = roundup_pow_of_two(entries);
3046         p->cq_entries = 2 * p->sq_entries;
3047
3048         user = get_uid(current_user());
3049         account_mem = !capable(CAP_IPC_LOCK);
3050
3051         if (account_mem) {
3052                 ret = io_account_mem(user,
3053                                 ring_pages(p->sq_entries, p->cq_entries));
3054                 if (ret) {
3055                         free_uid(user);
3056                         return ret;
3057                 }
3058         }
3059
3060         ctx = io_ring_ctx_alloc(p);
3061         if (!ctx) {
3062                 if (account_mem)
3063                         io_unaccount_mem(user, ring_pages(p->sq_entries,
3064                                                                 p->cq_entries));
3065                 free_uid(user);
3066                 return -ENOMEM;
3067         }
3068         ctx->compat = in_compat_syscall();
3069         ctx->account_mem = account_mem;
3070         ctx->user = user;
3071
3072         ret = io_allocate_scq_urings(ctx, p);
3073         if (ret)
3074                 goto err;
3075
3076         ret = io_sq_offload_start(ctx, p);
3077         if (ret)
3078                 goto err;
3079
3080         ret = io_uring_get_fd(ctx);
3081         if (ret < 0)
3082                 goto err;
3083
3084         memset(&p->sq_off, 0, sizeof(p->sq_off));
3085         p->sq_off.head = offsetof(struct io_sq_ring, r.head);
3086         p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
3087         p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
3088         p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
3089         p->sq_off.flags = offsetof(struct io_sq_ring, flags);
3090         p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
3091         p->sq_off.array = offsetof(struct io_sq_ring, array);
3092
3093         memset(&p->cq_off, 0, sizeof(p->cq_off));
3094         p->cq_off.head = offsetof(struct io_cq_ring, r.head);
3095         p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
3096         p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
3097         p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
3098         p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
3099         p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
3100         return ret;
3101 err:
3102         io_ring_ctx_wait_and_kill(ctx);
3103         return ret;
3104 }
3105
3106 /*
3107  * Sets up an aio uring context, and returns the fd. Applications asks for a
3108  * ring size, we return the actual sq/cq ring sizes (among other things) in the
3109  * params structure passed in.
3110  */
3111 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3112 {
3113         struct io_uring_params p;
3114         long ret;
3115         int i;
3116
3117         if (copy_from_user(&p, params, sizeof(p)))
3118                 return -EFAULT;
3119         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
3120                 if (p.resv[i])
3121                         return -EINVAL;
3122         }
3123
3124         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
3125                         IORING_SETUP_SQ_AFF))
3126                 return -EINVAL;
3127
3128         ret = io_uring_create(entries, &p);
3129         if (ret < 0)
3130                 return ret;
3131
3132         if (copy_to_user(params, &p, sizeof(p)))
3133                 return -EFAULT;
3134
3135         return ret;
3136 }
3137
3138 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
3139                 struct io_uring_params __user *, params)
3140 {
3141         return io_uring_setup(entries, params);
3142 }
3143
3144 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3145                                void __user *arg, unsigned nr_args)
3146         __releases(ctx->uring_lock)
3147         __acquires(ctx->uring_lock)
3148 {
3149         int ret;
3150
3151         /*
3152          * We're inside the ring mutex, if the ref is already dying, then
3153          * someone else killed the ctx or is already going through
3154          * io_uring_register().
3155          */
3156         if (percpu_ref_is_dying(&ctx->refs))
3157                 return -ENXIO;
3158
3159         percpu_ref_kill(&ctx->refs);
3160
3161         /*
3162          * Drop uring mutex before waiting for references to exit. If another
3163          * thread is currently inside io_uring_enter() it might need to grab
3164          * the uring_lock to make progress. If we hold it here across the drain
3165          * wait, then we can deadlock. It's safe to drop the mutex here, since
3166          * no new references will come in after we've killed the percpu ref.
3167          */
3168         mutex_unlock(&ctx->uring_lock);
3169         wait_for_completion(&ctx->ctx_done);
3170         mutex_lock(&ctx->uring_lock);
3171
3172         switch (opcode) {
3173         case IORING_REGISTER_BUFFERS:
3174                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
3175                 break;
3176         case IORING_UNREGISTER_BUFFERS:
3177                 ret = -EINVAL;
3178                 if (arg || nr_args)
3179                         break;
3180                 ret = io_sqe_buffer_unregister(ctx);
3181                 break;
3182         case IORING_REGISTER_FILES:
3183                 ret = io_sqe_files_register(ctx, arg, nr_args);
3184                 break;
3185         case IORING_UNREGISTER_FILES:
3186                 ret = -EINVAL;
3187                 if (arg || nr_args)
3188                         break;
3189                 ret = io_sqe_files_unregister(ctx);
3190                 break;
3191         case IORING_REGISTER_EVENTFD:
3192                 ret = -EINVAL;
3193                 if (nr_args != 1)
3194                         break;
3195                 ret = io_eventfd_register(ctx, arg);
3196                 break;
3197         case IORING_UNREGISTER_EVENTFD:
3198                 ret = -EINVAL;
3199                 if (arg || nr_args)
3200                         break;
3201                 ret = io_eventfd_unregister(ctx);
3202                 break;
3203         default:
3204                 ret = -EINVAL;
3205                 break;
3206         }
3207
3208         /* bring the ctx back to life */
3209         reinit_completion(&ctx->ctx_done);
3210         percpu_ref_reinit(&ctx->refs);
3211         return ret;
3212 }
3213
3214 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3215                 void __user *, arg, unsigned int, nr_args)
3216 {
3217         struct io_ring_ctx *ctx;
3218         long ret = -EBADF;
3219         struct fd f;
3220
3221         f = fdget(fd);
3222         if (!f.file)
3223                 return -EBADF;
3224
3225         ret = -EOPNOTSUPP;
3226         if (f.file->f_op != &io_uring_fops)
3227                 goto out_fput;
3228
3229         ctx = f.file->private_data;
3230
3231         mutex_lock(&ctx->uring_lock);
3232         ret = __io_uring_register(ctx, opcode, arg, nr_args);
3233         mutex_unlock(&ctx->uring_lock);
3234 out_fput:
3235         fdput(f);
3236         return ret;
3237 }
3238
3239 static int __init io_uring_init(void)
3240 {
3241         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
3242         return 0;
3243 };
3244 __initcall(io_uring_init);