OSDN Git Service

io_uring: punt short reads to async context
[tomoyo/tomoyo-test1.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/workqueue.h>
60 #include <linux/kthread.h>
61 #include <linux/blkdev.h>
62 #include <linux/bvec.h>
63 #include <linux/net.h>
64 #include <net/sock.h>
65 #include <net/af_unix.h>
66 #include <net/scm.h>
67 #include <linux/anon_inodes.h>
68 #include <linux/sched/mm.h>
69 #include <linux/uaccess.h>
70 #include <linux/nospec.h>
71 #include <linux/sizes.h>
72 #include <linux/hugetlb.h>
73
74 #include <uapi/linux/io_uring.h>
75
76 #include "internal.h"
77
78 #define IORING_MAX_ENTRIES      4096
79 #define IORING_MAX_FIXED_FILES  1024
80
81 struct io_uring {
82         u32 head ____cacheline_aligned_in_smp;
83         u32 tail ____cacheline_aligned_in_smp;
84 };
85
86 /*
87  * This data is shared with the application through the mmap at offset
88  * IORING_OFF_SQ_RING.
89  *
90  * The offsets to the member fields are published through struct
91  * io_sqring_offsets when calling io_uring_setup.
92  */
93 struct io_sq_ring {
94         /*
95          * Head and tail offsets into the ring; the offsets need to be
96          * masked to get valid indices.
97          *
98          * The kernel controls head and the application controls tail.
99          */
100         struct io_uring         r;
101         /*
102          * Bitmask to apply to head and tail offsets (constant, equals
103          * ring_entries - 1)
104          */
105         u32                     ring_mask;
106         /* Ring size (constant, power of 2) */
107         u32                     ring_entries;
108         /*
109          * Number of invalid entries dropped by the kernel due to
110          * invalid index stored in array
111          *
112          * Written by the kernel, shouldn't be modified by the
113          * application (i.e. get number of "new events" by comparing to
114          * cached value).
115          *
116          * After a new SQ head value was read by the application this
117          * counter includes all submissions that were dropped reaching
118          * the new SQ head (and possibly more).
119          */
120         u32                     dropped;
121         /*
122          * Runtime flags
123          *
124          * Written by the kernel, shouldn't be modified by the
125          * application.
126          *
127          * The application needs a full memory barrier before checking
128          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
129          */
130         u32                     flags;
131         /*
132          * Ring buffer of indices into array of io_uring_sqe, which is
133          * mmapped by the application using the IORING_OFF_SQES offset.
134          *
135          * This indirection could e.g. be used to assign fixed
136          * io_uring_sqe entries to operations and only submit them to
137          * the queue when needed.
138          *
139          * The kernel modifies neither the indices array nor the entries
140          * array.
141          */
142         u32                     array[];
143 };
144
145 /*
146  * This data is shared with the application through the mmap at offset
147  * IORING_OFF_CQ_RING.
148  *
149  * The offsets to the member fields are published through struct
150  * io_cqring_offsets when calling io_uring_setup.
151  */
152 struct io_cq_ring {
153         /*
154          * Head and tail offsets into the ring; the offsets need to be
155          * masked to get valid indices.
156          *
157          * The application controls head and the kernel tail.
158          */
159         struct io_uring         r;
160         /*
161          * Bitmask to apply to head and tail offsets (constant, equals
162          * ring_entries - 1)
163          */
164         u32                     ring_mask;
165         /* Ring size (constant, power of 2) */
166         u32                     ring_entries;
167         /*
168          * Number of completion events lost because the queue was full;
169          * this should be avoided by the application by making sure
170          * there are not more requests pending thatn there is space in
171          * the completion queue.
172          *
173          * Written by the kernel, shouldn't be modified by the
174          * application (i.e. get number of "new events" by comparing to
175          * cached value).
176          *
177          * As completion events come in out of order this counter is not
178          * ordered with any other data.
179          */
180         u32                     overflow;
181         /*
182          * Ring buffer of completion events.
183          *
184          * The kernel writes completion events fresh every time they are
185          * produced, so the application is allowed to modify pending
186          * entries.
187          */
188         struct io_uring_cqe     cqes[];
189 };
190
191 struct io_mapped_ubuf {
192         u64             ubuf;
193         size_t          len;
194         struct          bio_vec *bvec;
195         unsigned int    nr_bvecs;
196 };
197
198 struct async_list {
199         spinlock_t              lock;
200         atomic_t                cnt;
201         struct list_head        list;
202
203         struct file             *file;
204         off_t                   io_end;
205         size_t                  io_pages;
206 };
207
208 struct io_ring_ctx {
209         struct {
210                 struct percpu_ref       refs;
211         } ____cacheline_aligned_in_smp;
212
213         struct {
214                 unsigned int            flags;
215                 bool                    compat;
216                 bool                    account_mem;
217
218                 /* SQ ring */
219                 struct io_sq_ring       *sq_ring;
220                 unsigned                cached_sq_head;
221                 unsigned                sq_entries;
222                 unsigned                sq_mask;
223                 unsigned                sq_thread_idle;
224                 struct io_uring_sqe     *sq_sqes;
225
226                 struct list_head        defer_list;
227         } ____cacheline_aligned_in_smp;
228
229         /* IO offload */
230         struct workqueue_struct *sqo_wq;
231         struct task_struct      *sqo_thread;    /* if using sq thread polling */
232         struct mm_struct        *sqo_mm;
233         wait_queue_head_t       sqo_wait;
234
235         struct {
236                 /* CQ ring */
237                 struct io_cq_ring       *cq_ring;
238                 unsigned                cached_cq_tail;
239                 unsigned                cq_entries;
240                 unsigned                cq_mask;
241                 struct wait_queue_head  cq_wait;
242                 struct fasync_struct    *cq_fasync;
243                 struct eventfd_ctx      *cq_ev_fd;
244         } ____cacheline_aligned_in_smp;
245
246         /*
247          * If used, fixed file set. Writers must ensure that ->refs is dead,
248          * readers must ensure that ->refs is alive as long as the file* is
249          * used. Only updated through io_uring_register(2).
250          */
251         struct file             **user_files;
252         unsigned                nr_user_files;
253
254         /* if used, fixed mapped user buffers */
255         unsigned                nr_user_bufs;
256         struct io_mapped_ubuf   *user_bufs;
257
258         struct user_struct      *user;
259
260         struct completion       ctx_done;
261
262         struct {
263                 struct mutex            uring_lock;
264                 wait_queue_head_t       wait;
265         } ____cacheline_aligned_in_smp;
266
267         struct {
268                 spinlock_t              completion_lock;
269                 bool                    poll_multi_file;
270                 /*
271                  * ->poll_list is protected by the ctx->uring_lock for
272                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
273                  * For SQPOLL, only the single threaded io_sq_thread() will
274                  * manipulate the list, hence no extra locking is needed there.
275                  */
276                 struct list_head        poll_list;
277                 struct list_head        cancel_list;
278         } ____cacheline_aligned_in_smp;
279
280         struct async_list       pending_async[2];
281
282 #if defined(CONFIG_UNIX)
283         struct socket           *ring_sock;
284 #endif
285 };
286
287 struct sqe_submit {
288         const struct io_uring_sqe       *sqe;
289         unsigned short                  index;
290         bool                            has_user;
291         bool                            needs_lock;
292         bool                            needs_fixed_file;
293 };
294
295 /*
296  * First field must be the file pointer in all the
297  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
298  */
299 struct io_poll_iocb {
300         struct file                     *file;
301         struct wait_queue_head          *head;
302         __poll_t                        events;
303         bool                            done;
304         bool                            canceled;
305         struct wait_queue_entry         wait;
306 };
307
308 /*
309  * NOTE! Each of the iocb union members has the file pointer
310  * as the first entry in their struct definition. So you can
311  * access the file pointer through any of the sub-structs,
312  * or directly as just 'ki_filp' in this struct.
313  */
314 struct io_kiocb {
315         union {
316                 struct file             *file;
317                 struct kiocb            rw;
318                 struct io_poll_iocb     poll;
319         };
320
321         struct sqe_submit       submit;
322
323         struct io_ring_ctx      *ctx;
324         struct list_head        list;
325         unsigned int            flags;
326         refcount_t              refs;
327 #define REQ_F_NOWAIT            1       /* must not punt to workers */
328 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
329 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
330 #define REQ_F_SEQ_PREV          8       /* sequential with previous */
331 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
332 #define REQ_F_IO_DRAINED        32      /* drain done */
333         u64                     user_data;
334         u32                     error;  /* iopoll result from callback */
335         u32                     sequence;
336
337         struct work_struct      work;
338 };
339
340 #define IO_PLUG_THRESHOLD               2
341 #define IO_IOPOLL_BATCH                 8
342
343 struct io_submit_state {
344         struct blk_plug         plug;
345
346         /*
347          * io_kiocb alloc cache
348          */
349         void                    *reqs[IO_IOPOLL_BATCH];
350         unsigned                int free_reqs;
351         unsigned                int cur_req;
352
353         /*
354          * File reference cache
355          */
356         struct file             *file;
357         unsigned int            fd;
358         unsigned int            has_refs;
359         unsigned int            used_refs;
360         unsigned int            ios_left;
361 };
362
363 static void io_sq_wq_submit_work(struct work_struct *work);
364
365 static struct kmem_cache *req_cachep;
366
367 static const struct file_operations io_uring_fops;
368
369 struct sock *io_uring_get_socket(struct file *file)
370 {
371 #if defined(CONFIG_UNIX)
372         if (file->f_op == &io_uring_fops) {
373                 struct io_ring_ctx *ctx = file->private_data;
374
375                 return ctx->ring_sock->sk;
376         }
377 #endif
378         return NULL;
379 }
380 EXPORT_SYMBOL(io_uring_get_socket);
381
382 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
383 {
384         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
385
386         complete(&ctx->ctx_done);
387 }
388
389 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
390 {
391         struct io_ring_ctx *ctx;
392         int i;
393
394         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
395         if (!ctx)
396                 return NULL;
397
398         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
399                 kfree(ctx);
400                 return NULL;
401         }
402
403         ctx->flags = p->flags;
404         init_waitqueue_head(&ctx->cq_wait);
405         init_completion(&ctx->ctx_done);
406         mutex_init(&ctx->uring_lock);
407         init_waitqueue_head(&ctx->wait);
408         for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
409                 spin_lock_init(&ctx->pending_async[i].lock);
410                 INIT_LIST_HEAD(&ctx->pending_async[i].list);
411                 atomic_set(&ctx->pending_async[i].cnt, 0);
412         }
413         spin_lock_init(&ctx->completion_lock);
414         INIT_LIST_HEAD(&ctx->poll_list);
415         INIT_LIST_HEAD(&ctx->cancel_list);
416         INIT_LIST_HEAD(&ctx->defer_list);
417         return ctx;
418 }
419
420 static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
421                                      struct io_kiocb *req)
422 {
423         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
424                 return false;
425
426         return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped;
427 }
428
429 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
430 {
431         struct io_kiocb *req;
432
433         if (list_empty(&ctx->defer_list))
434                 return NULL;
435
436         req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
437         if (!io_sequence_defer(ctx, req)) {
438                 list_del_init(&req->list);
439                 return req;
440         }
441
442         return NULL;
443 }
444
445 static void __io_commit_cqring(struct io_ring_ctx *ctx)
446 {
447         struct io_cq_ring *ring = ctx->cq_ring;
448
449         if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
450                 /* order cqe stores with ring update */
451                 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
452
453                 if (wq_has_sleeper(&ctx->cq_wait)) {
454                         wake_up_interruptible(&ctx->cq_wait);
455                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
456                 }
457         }
458 }
459
460 static void io_commit_cqring(struct io_ring_ctx *ctx)
461 {
462         struct io_kiocb *req;
463
464         __io_commit_cqring(ctx);
465
466         while ((req = io_get_deferred_req(ctx)) != NULL) {
467                 req->flags |= REQ_F_IO_DRAINED;
468                 queue_work(ctx->sqo_wq, &req->work);
469         }
470 }
471
472 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
473 {
474         struct io_cq_ring *ring = ctx->cq_ring;
475         unsigned tail;
476
477         tail = ctx->cached_cq_tail;
478         /*
479          * writes to the cq entry need to come after reading head; the
480          * control dependency is enough as we're using WRITE_ONCE to
481          * fill the cq entry
482          */
483         if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
484                 return NULL;
485
486         ctx->cached_cq_tail++;
487         return &ring->cqes[tail & ctx->cq_mask];
488 }
489
490 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
491                                  long res)
492 {
493         struct io_uring_cqe *cqe;
494
495         /*
496          * If we can't get a cq entry, userspace overflowed the
497          * submission (by quite a lot). Increment the overflow count in
498          * the ring.
499          */
500         cqe = io_get_cqring(ctx);
501         if (cqe) {
502                 WRITE_ONCE(cqe->user_data, ki_user_data);
503                 WRITE_ONCE(cqe->res, res);
504                 WRITE_ONCE(cqe->flags, 0);
505         } else {
506                 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
507
508                 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
509         }
510 }
511
512 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
513 {
514         if (waitqueue_active(&ctx->wait))
515                 wake_up(&ctx->wait);
516         if (waitqueue_active(&ctx->sqo_wait))
517                 wake_up(&ctx->sqo_wait);
518         if (ctx->cq_ev_fd)
519                 eventfd_signal(ctx->cq_ev_fd, 1);
520 }
521
522 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
523                                 long res)
524 {
525         unsigned long flags;
526
527         spin_lock_irqsave(&ctx->completion_lock, flags);
528         io_cqring_fill_event(ctx, user_data, res);
529         io_commit_cqring(ctx);
530         spin_unlock_irqrestore(&ctx->completion_lock, flags);
531
532         io_cqring_ev_posted(ctx);
533 }
534
535 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
536 {
537         percpu_ref_put_many(&ctx->refs, refs);
538
539         if (waitqueue_active(&ctx->wait))
540                 wake_up(&ctx->wait);
541 }
542
543 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
544                                    struct io_submit_state *state)
545 {
546         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
547         struct io_kiocb *req;
548
549         if (!percpu_ref_tryget(&ctx->refs))
550                 return NULL;
551
552         if (!state) {
553                 req = kmem_cache_alloc(req_cachep, gfp);
554                 if (unlikely(!req))
555                         goto out;
556         } else if (!state->free_reqs) {
557                 size_t sz;
558                 int ret;
559
560                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
561                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
562
563                 /*
564                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
565                  * retry single alloc to be on the safe side.
566                  */
567                 if (unlikely(ret <= 0)) {
568                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
569                         if (!state->reqs[0])
570                                 goto out;
571                         ret = 1;
572                 }
573                 state->free_reqs = ret - 1;
574                 state->cur_req = 1;
575                 req = state->reqs[0];
576         } else {
577                 req = state->reqs[state->cur_req];
578                 state->free_reqs--;
579                 state->cur_req++;
580         }
581
582         req->ctx = ctx;
583         req->flags = 0;
584         /* one is dropped after submission, the other at completion */
585         refcount_set(&req->refs, 2);
586         return req;
587 out:
588         io_ring_drop_ctx_refs(ctx, 1);
589         return NULL;
590 }
591
592 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
593 {
594         if (*nr) {
595                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
596                 io_ring_drop_ctx_refs(ctx, *nr);
597                 *nr = 0;
598         }
599 }
600
601 static void io_free_req(struct io_kiocb *req)
602 {
603         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
604                 fput(req->file);
605         io_ring_drop_ctx_refs(req->ctx, 1);
606         kmem_cache_free(req_cachep, req);
607 }
608
609 static void io_put_req(struct io_kiocb *req)
610 {
611         if (refcount_dec_and_test(&req->refs))
612                 io_free_req(req);
613 }
614
615 /*
616  * Find and free completed poll iocbs
617  */
618 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
619                                struct list_head *done)
620 {
621         void *reqs[IO_IOPOLL_BATCH];
622         struct io_kiocb *req;
623         int to_free;
624
625         to_free = 0;
626         while (!list_empty(done)) {
627                 req = list_first_entry(done, struct io_kiocb, list);
628                 list_del(&req->list);
629
630                 io_cqring_fill_event(ctx, req->user_data, req->error);
631                 (*nr_events)++;
632
633                 if (refcount_dec_and_test(&req->refs)) {
634                         /* If we're not using fixed files, we have to pair the
635                          * completion part with the file put. Use regular
636                          * completions for those, only batch free for fixed
637                          * file.
638                          */
639                         if (req->flags & REQ_F_FIXED_FILE) {
640                                 reqs[to_free++] = req;
641                                 if (to_free == ARRAY_SIZE(reqs))
642                                         io_free_req_many(ctx, reqs, &to_free);
643                         } else {
644                                 io_free_req(req);
645                         }
646                 }
647         }
648
649         io_commit_cqring(ctx);
650         io_free_req_many(ctx, reqs, &to_free);
651 }
652
653 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
654                         long min)
655 {
656         struct io_kiocb *req, *tmp;
657         LIST_HEAD(done);
658         bool spin;
659         int ret;
660
661         /*
662          * Only spin for completions if we don't have multiple devices hanging
663          * off our complete list, and we're under the requested amount.
664          */
665         spin = !ctx->poll_multi_file && *nr_events < min;
666
667         ret = 0;
668         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
669                 struct kiocb *kiocb = &req->rw;
670
671                 /*
672                  * Move completed entries to our local list. If we find a
673                  * request that requires polling, break out and complete
674                  * the done list first, if we have entries there.
675                  */
676                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
677                         list_move_tail(&req->list, &done);
678                         continue;
679                 }
680                 if (!list_empty(&done))
681                         break;
682
683                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
684                 if (ret < 0)
685                         break;
686
687                 if (ret && spin)
688                         spin = false;
689                 ret = 0;
690         }
691
692         if (!list_empty(&done))
693                 io_iopoll_complete(ctx, nr_events, &done);
694
695         return ret;
696 }
697
698 /*
699  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
700  * non-spinning poll check - we'll still enter the driver poll loop, but only
701  * as a non-spinning completion check.
702  */
703 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
704                                 long min)
705 {
706         while (!list_empty(&ctx->poll_list)) {
707                 int ret;
708
709                 ret = io_do_iopoll(ctx, nr_events, min);
710                 if (ret < 0)
711                         return ret;
712                 if (!min || *nr_events >= min)
713                         return 0;
714         }
715
716         return 1;
717 }
718
719 /*
720  * We can't just wait for polled events to come to us, we have to actively
721  * find and complete them.
722  */
723 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
724 {
725         if (!(ctx->flags & IORING_SETUP_IOPOLL))
726                 return;
727
728         mutex_lock(&ctx->uring_lock);
729         while (!list_empty(&ctx->poll_list)) {
730                 unsigned int nr_events = 0;
731
732                 io_iopoll_getevents(ctx, &nr_events, 1);
733         }
734         mutex_unlock(&ctx->uring_lock);
735 }
736
737 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
738                            long min)
739 {
740         int ret = 0;
741
742         do {
743                 int tmin = 0;
744
745                 if (*nr_events < min)
746                         tmin = min - *nr_events;
747
748                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
749                 if (ret <= 0)
750                         break;
751                 ret = 0;
752         } while (min && !*nr_events && !need_resched());
753
754         return ret;
755 }
756
757 static void kiocb_end_write(struct kiocb *kiocb)
758 {
759         if (kiocb->ki_flags & IOCB_WRITE) {
760                 struct inode *inode = file_inode(kiocb->ki_filp);
761
762                 /*
763                  * Tell lockdep we inherited freeze protection from submission
764                  * thread.
765                  */
766                 if (S_ISREG(inode->i_mode))
767                         __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
768                 file_end_write(kiocb->ki_filp);
769         }
770 }
771
772 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
773 {
774         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
775
776         kiocb_end_write(kiocb);
777
778         io_cqring_add_event(req->ctx, req->user_data, res);
779         io_put_req(req);
780 }
781
782 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
783 {
784         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
785
786         kiocb_end_write(kiocb);
787
788         req->error = res;
789         if (res != -EAGAIN)
790                 req->flags |= REQ_F_IOPOLL_COMPLETED;
791 }
792
793 /*
794  * After the iocb has been issued, it's safe to be found on the poll list.
795  * Adding the kiocb to the list AFTER submission ensures that we don't
796  * find it from a io_iopoll_getevents() thread before the issuer is done
797  * accessing the kiocb cookie.
798  */
799 static void io_iopoll_req_issued(struct io_kiocb *req)
800 {
801         struct io_ring_ctx *ctx = req->ctx;
802
803         /*
804          * Track whether we have multiple files in our lists. This will impact
805          * how we do polling eventually, not spinning if we're on potentially
806          * different devices.
807          */
808         if (list_empty(&ctx->poll_list)) {
809                 ctx->poll_multi_file = false;
810         } else if (!ctx->poll_multi_file) {
811                 struct io_kiocb *list_req;
812
813                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
814                                                 list);
815                 if (list_req->rw.ki_filp != req->rw.ki_filp)
816                         ctx->poll_multi_file = true;
817         }
818
819         /*
820          * For fast devices, IO may have already completed. If it has, add
821          * it to the front so we find it first.
822          */
823         if (req->flags & REQ_F_IOPOLL_COMPLETED)
824                 list_add(&req->list, &ctx->poll_list);
825         else
826                 list_add_tail(&req->list, &ctx->poll_list);
827 }
828
829 static void io_file_put(struct io_submit_state *state)
830 {
831         if (state->file) {
832                 int diff = state->has_refs - state->used_refs;
833
834                 if (diff)
835                         fput_many(state->file, diff);
836                 state->file = NULL;
837         }
838 }
839
840 /*
841  * Get as many references to a file as we have IOs left in this submission,
842  * assuming most submissions are for one file, or at least that each file
843  * has more than one submission.
844  */
845 static struct file *io_file_get(struct io_submit_state *state, int fd)
846 {
847         if (!state)
848                 return fget(fd);
849
850         if (state->file) {
851                 if (state->fd == fd) {
852                         state->used_refs++;
853                         state->ios_left--;
854                         return state->file;
855                 }
856                 io_file_put(state);
857         }
858         state->file = fget_many(fd, state->ios_left);
859         if (!state->file)
860                 return NULL;
861
862         state->fd = fd;
863         state->has_refs = state->ios_left;
864         state->used_refs = 1;
865         state->ios_left--;
866         return state->file;
867 }
868
869 /*
870  * If we tracked the file through the SCM inflight mechanism, we could support
871  * any file. For now, just ensure that anything potentially problematic is done
872  * inline.
873  */
874 static bool io_file_supports_async(struct file *file)
875 {
876         umode_t mode = file_inode(file)->i_mode;
877
878         if (S_ISBLK(mode) || S_ISCHR(mode))
879                 return true;
880         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
881                 return true;
882
883         return false;
884 }
885
886 static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
887                       bool force_nonblock)
888 {
889         const struct io_uring_sqe *sqe = s->sqe;
890         struct io_ring_ctx *ctx = req->ctx;
891         struct kiocb *kiocb = &req->rw;
892         unsigned ioprio;
893         int ret;
894
895         if (!req->file)
896                 return -EBADF;
897
898         if (force_nonblock && !io_file_supports_async(req->file))
899                 force_nonblock = false;
900
901         kiocb->ki_pos = READ_ONCE(sqe->off);
902         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
903         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
904
905         ioprio = READ_ONCE(sqe->ioprio);
906         if (ioprio) {
907                 ret = ioprio_check_cap(ioprio);
908                 if (ret)
909                         return ret;
910
911                 kiocb->ki_ioprio = ioprio;
912         } else
913                 kiocb->ki_ioprio = get_current_ioprio();
914
915         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
916         if (unlikely(ret))
917                 return ret;
918
919         /* don't allow async punt if RWF_NOWAIT was requested */
920         if (kiocb->ki_flags & IOCB_NOWAIT)
921                 req->flags |= REQ_F_NOWAIT;
922
923         if (force_nonblock)
924                 kiocb->ki_flags |= IOCB_NOWAIT;
925
926         if (ctx->flags & IORING_SETUP_IOPOLL) {
927                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
928                     !kiocb->ki_filp->f_op->iopoll)
929                         return -EOPNOTSUPP;
930
931                 req->error = 0;
932                 kiocb->ki_flags |= IOCB_HIPRI;
933                 kiocb->ki_complete = io_complete_rw_iopoll;
934         } else {
935                 if (kiocb->ki_flags & IOCB_HIPRI)
936                         return -EINVAL;
937                 kiocb->ki_complete = io_complete_rw;
938         }
939         return 0;
940 }
941
942 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
943 {
944         switch (ret) {
945         case -EIOCBQUEUED:
946                 break;
947         case -ERESTARTSYS:
948         case -ERESTARTNOINTR:
949         case -ERESTARTNOHAND:
950         case -ERESTART_RESTARTBLOCK:
951                 /*
952                  * We can't just restart the syscall, since previously
953                  * submitted sqes may already be in progress. Just fail this
954                  * IO with EINTR.
955                  */
956                 ret = -EINTR;
957                 /* fall through */
958         default:
959                 kiocb->ki_complete(kiocb, ret, 0);
960         }
961 }
962
963 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
964                            const struct io_uring_sqe *sqe,
965                            struct iov_iter *iter)
966 {
967         size_t len = READ_ONCE(sqe->len);
968         struct io_mapped_ubuf *imu;
969         unsigned index, buf_index;
970         size_t offset;
971         u64 buf_addr;
972
973         /* attempt to use fixed buffers without having provided iovecs */
974         if (unlikely(!ctx->user_bufs))
975                 return -EFAULT;
976
977         buf_index = READ_ONCE(sqe->buf_index);
978         if (unlikely(buf_index >= ctx->nr_user_bufs))
979                 return -EFAULT;
980
981         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
982         imu = &ctx->user_bufs[index];
983         buf_addr = READ_ONCE(sqe->addr);
984
985         /* overflow */
986         if (buf_addr + len < buf_addr)
987                 return -EFAULT;
988         /* not inside the mapped region */
989         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
990                 return -EFAULT;
991
992         /*
993          * May not be a start of buffer, set size appropriately
994          * and advance us to the beginning.
995          */
996         offset = buf_addr - imu->ubuf;
997         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
998         if (offset)
999                 iov_iter_advance(iter, offset);
1000
1001         /* don't drop a reference to these pages */
1002         iter->type |= ITER_BVEC_FLAG_NO_REF;
1003         return 0;
1004 }
1005
1006 static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
1007                                const struct sqe_submit *s, struct iovec **iovec,
1008                                struct iov_iter *iter)
1009 {
1010         const struct io_uring_sqe *sqe = s->sqe;
1011         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1012         size_t sqe_len = READ_ONCE(sqe->len);
1013         u8 opcode;
1014
1015         /*
1016          * We're reading ->opcode for the second time, but the first read
1017          * doesn't care whether it's _FIXED or not, so it doesn't matter
1018          * whether ->opcode changes concurrently. The first read does care
1019          * about whether it is a READ or a WRITE, so we don't trust this read
1020          * for that purpose and instead let the caller pass in the read/write
1021          * flag.
1022          */
1023         opcode = READ_ONCE(sqe->opcode);
1024         if (opcode == IORING_OP_READ_FIXED ||
1025             opcode == IORING_OP_WRITE_FIXED) {
1026                 ssize_t ret = io_import_fixed(ctx, rw, sqe, iter);
1027                 *iovec = NULL;
1028                 return ret;
1029         }
1030
1031         if (!s->has_user)
1032                 return -EFAULT;
1033
1034 #ifdef CONFIG_COMPAT
1035         if (ctx->compat)
1036                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1037                                                 iovec, iter);
1038 #endif
1039
1040         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1041 }
1042
1043 /*
1044  * Make a note of the last file/offset/direction we punted to async
1045  * context. We'll use this information to see if we can piggy back a
1046  * sequential request onto the previous one, if it's still hasn't been
1047  * completed by the async worker.
1048  */
1049 static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1050 {
1051         struct async_list *async_list = &req->ctx->pending_async[rw];
1052         struct kiocb *kiocb = &req->rw;
1053         struct file *filp = kiocb->ki_filp;
1054         off_t io_end = kiocb->ki_pos + len;
1055
1056         if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
1057                 unsigned long max_pages;
1058
1059                 /* Use 8x RA size as a decent limiter for both reads/writes */
1060                 max_pages = filp->f_ra.ra_pages;
1061                 if (!max_pages)
1062                         max_pages = VM_READAHEAD_PAGES;
1063                 max_pages *= 8;
1064
1065                 /* If max pages are exceeded, reset the state */
1066                 len >>= PAGE_SHIFT;
1067                 if (async_list->io_pages + len <= max_pages) {
1068                         req->flags |= REQ_F_SEQ_PREV;
1069                         async_list->io_pages += len;
1070                 } else {
1071                         io_end = 0;
1072                         async_list->io_pages = 0;
1073                 }
1074         }
1075
1076         /* New file? Reset state. */
1077         if (async_list->file != filp) {
1078                 async_list->io_pages = 0;
1079                 async_list->file = filp;
1080         }
1081         async_list->io_end = io_end;
1082 }
1083
1084 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1085                    bool force_nonblock)
1086 {
1087         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1088         struct kiocb *kiocb = &req->rw;
1089         struct iov_iter iter;
1090         struct file *file;
1091         size_t iov_count;
1092         ssize_t read_size, ret;
1093
1094         ret = io_prep_rw(req, s, force_nonblock);
1095         if (ret)
1096                 return ret;
1097         file = kiocb->ki_filp;
1098
1099         if (unlikely(!(file->f_mode & FMODE_READ)))
1100                 return -EBADF;
1101         if (unlikely(!file->f_op->read_iter))
1102                 return -EINVAL;
1103
1104         ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
1105         if (ret < 0)
1106                 return ret;
1107
1108         read_size = ret;
1109         iov_count = iov_iter_count(&iter);
1110         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1111         if (!ret) {
1112                 ssize_t ret2;
1113
1114                 ret2 = call_read_iter(file, kiocb, &iter);
1115                 /*
1116                  * In case of a short read, punt to async. This can happen
1117                  * if we have data partially cached. Alternatively we can
1118                  * return the short read, in which case the application will
1119                  * need to issue another SQE and wait for it. That SQE will
1120                  * need async punt anyway, so it's more efficient to do it
1121                  * here.
1122                  */
1123                 if (force_nonblock && ret2 > 0 && ret2 < read_size)
1124                         ret2 = -EAGAIN;
1125                 /* Catch -EAGAIN return for forced non-blocking submission */
1126                 if (!force_nonblock || ret2 != -EAGAIN) {
1127                         io_rw_done(kiocb, ret2);
1128                 } else {
1129                         /*
1130                          * If ->needs_lock is true, we're already in async
1131                          * context.
1132                          */
1133                         if (!s->needs_lock)
1134                                 io_async_list_note(READ, req, iov_count);
1135                         ret = -EAGAIN;
1136                 }
1137         }
1138         kfree(iovec);
1139         return ret;
1140 }
1141
1142 static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1143                     bool force_nonblock)
1144 {
1145         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1146         struct kiocb *kiocb = &req->rw;
1147         struct iov_iter iter;
1148         struct file *file;
1149         size_t iov_count;
1150         ssize_t ret;
1151
1152         ret = io_prep_rw(req, s, force_nonblock);
1153         if (ret)
1154                 return ret;
1155
1156         file = kiocb->ki_filp;
1157         if (unlikely(!(file->f_mode & FMODE_WRITE)))
1158                 return -EBADF;
1159         if (unlikely(!file->f_op->write_iter))
1160                 return -EINVAL;
1161
1162         ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1163         if (ret < 0)
1164                 return ret;
1165
1166         iov_count = iov_iter_count(&iter);
1167
1168         ret = -EAGAIN;
1169         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1170                 /* If ->needs_lock is true, we're already in async context. */
1171                 if (!s->needs_lock)
1172                         io_async_list_note(WRITE, req, iov_count);
1173                 goto out_free;
1174         }
1175
1176         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1177         if (!ret) {
1178                 ssize_t ret2;
1179
1180                 /*
1181                  * Open-code file_start_write here to grab freeze protection,
1182                  * which will be released by another thread in
1183                  * io_complete_rw().  Fool lockdep by telling it the lock got
1184                  * released so that it doesn't complain about the held lock when
1185                  * we return to userspace.
1186                  */
1187                 if (S_ISREG(file_inode(file)->i_mode)) {
1188                         __sb_start_write(file_inode(file)->i_sb,
1189                                                 SB_FREEZE_WRITE, true);
1190                         __sb_writers_release(file_inode(file)->i_sb,
1191                                                 SB_FREEZE_WRITE);
1192                 }
1193                 kiocb->ki_flags |= IOCB_WRITE;
1194
1195                 ret2 = call_write_iter(file, kiocb, &iter);
1196                 if (!force_nonblock || ret2 != -EAGAIN) {
1197                         io_rw_done(kiocb, ret2);
1198                 } else {
1199                         /*
1200                          * If ->needs_lock is true, we're already in async
1201                          * context.
1202                          */
1203                         if (!s->needs_lock)
1204                                 io_async_list_note(WRITE, req, iov_count);
1205                         ret = -EAGAIN;
1206                 }
1207         }
1208 out_free:
1209         kfree(iovec);
1210         return ret;
1211 }
1212
1213 /*
1214  * IORING_OP_NOP just posts a completion event, nothing else.
1215  */
1216 static int io_nop(struct io_kiocb *req, u64 user_data)
1217 {
1218         struct io_ring_ctx *ctx = req->ctx;
1219         long err = 0;
1220
1221         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1222                 return -EINVAL;
1223
1224         io_cqring_add_event(ctx, user_data, err);
1225         io_put_req(req);
1226         return 0;
1227 }
1228
1229 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1230 {
1231         struct io_ring_ctx *ctx = req->ctx;
1232
1233         if (!req->file)
1234                 return -EBADF;
1235
1236         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1237                 return -EINVAL;
1238         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1239                 return -EINVAL;
1240
1241         return 0;
1242 }
1243
1244 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1245                     bool force_nonblock)
1246 {
1247         loff_t sqe_off = READ_ONCE(sqe->off);
1248         loff_t sqe_len = READ_ONCE(sqe->len);
1249         loff_t end = sqe_off + sqe_len;
1250         unsigned fsync_flags;
1251         int ret;
1252
1253         fsync_flags = READ_ONCE(sqe->fsync_flags);
1254         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1255                 return -EINVAL;
1256
1257         ret = io_prep_fsync(req, sqe);
1258         if (ret)
1259                 return ret;
1260
1261         /* fsync always requires a blocking context */
1262         if (force_nonblock)
1263                 return -EAGAIN;
1264
1265         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1266                                 end > 0 ? end : LLONG_MAX,
1267                                 fsync_flags & IORING_FSYNC_DATASYNC);
1268
1269         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1270         io_put_req(req);
1271         return 0;
1272 }
1273
1274 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1275 {
1276         struct io_ring_ctx *ctx = req->ctx;
1277         int ret = 0;
1278
1279         if (!req->file)
1280                 return -EBADF;
1281
1282         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1283                 return -EINVAL;
1284         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1285                 return -EINVAL;
1286
1287         return ret;
1288 }
1289
1290 static int io_sync_file_range(struct io_kiocb *req,
1291                               const struct io_uring_sqe *sqe,
1292                               bool force_nonblock)
1293 {
1294         loff_t sqe_off;
1295         loff_t sqe_len;
1296         unsigned flags;
1297         int ret;
1298
1299         ret = io_prep_sfr(req, sqe);
1300         if (ret)
1301                 return ret;
1302
1303         /* sync_file_range always requires a blocking context */
1304         if (force_nonblock)
1305                 return -EAGAIN;
1306
1307         sqe_off = READ_ONCE(sqe->off);
1308         sqe_len = READ_ONCE(sqe->len);
1309         flags = READ_ONCE(sqe->sync_range_flags);
1310
1311         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1312
1313         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1314         io_put_req(req);
1315         return 0;
1316 }
1317
1318 static void io_poll_remove_one(struct io_kiocb *req)
1319 {
1320         struct io_poll_iocb *poll = &req->poll;
1321
1322         spin_lock(&poll->head->lock);
1323         WRITE_ONCE(poll->canceled, true);
1324         if (!list_empty(&poll->wait.entry)) {
1325                 list_del_init(&poll->wait.entry);
1326                 queue_work(req->ctx->sqo_wq, &req->work);
1327         }
1328         spin_unlock(&poll->head->lock);
1329
1330         list_del_init(&req->list);
1331 }
1332
1333 static void io_poll_remove_all(struct io_ring_ctx *ctx)
1334 {
1335         struct io_kiocb *req;
1336
1337         spin_lock_irq(&ctx->completion_lock);
1338         while (!list_empty(&ctx->cancel_list)) {
1339                 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1340                 io_poll_remove_one(req);
1341         }
1342         spin_unlock_irq(&ctx->completion_lock);
1343 }
1344
1345 /*
1346  * Find a running poll command that matches one specified in sqe->addr,
1347  * and remove it if found.
1348  */
1349 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1350 {
1351         struct io_ring_ctx *ctx = req->ctx;
1352         struct io_kiocb *poll_req, *next;
1353         int ret = -ENOENT;
1354
1355         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1356                 return -EINVAL;
1357         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1358             sqe->poll_events)
1359                 return -EINVAL;
1360
1361         spin_lock_irq(&ctx->completion_lock);
1362         list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1363                 if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1364                         io_poll_remove_one(poll_req);
1365                         ret = 0;
1366                         break;
1367                 }
1368         }
1369         spin_unlock_irq(&ctx->completion_lock);
1370
1371         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1372         io_put_req(req);
1373         return 0;
1374 }
1375
1376 static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1377                              __poll_t mask)
1378 {
1379         req->poll.done = true;
1380         io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
1381         io_commit_cqring(ctx);
1382 }
1383
1384 static void io_poll_complete_work(struct work_struct *work)
1385 {
1386         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1387         struct io_poll_iocb *poll = &req->poll;
1388         struct poll_table_struct pt = { ._key = poll->events };
1389         struct io_ring_ctx *ctx = req->ctx;
1390         __poll_t mask = 0;
1391
1392         if (!READ_ONCE(poll->canceled))
1393                 mask = vfs_poll(poll->file, &pt) & poll->events;
1394
1395         /*
1396          * Note that ->ki_cancel callers also delete iocb from active_reqs after
1397          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1398          * synchronize with them.  In the cancellation case the list_del_init
1399          * itself is not actually needed, but harmless so we keep it in to
1400          * avoid further branches in the fast path.
1401          */
1402         spin_lock_irq(&ctx->completion_lock);
1403         if (!mask && !READ_ONCE(poll->canceled)) {
1404                 add_wait_queue(poll->head, &poll->wait);
1405                 spin_unlock_irq(&ctx->completion_lock);
1406                 return;
1407         }
1408         list_del_init(&req->list);
1409         io_poll_complete(ctx, req, mask);
1410         spin_unlock_irq(&ctx->completion_lock);
1411
1412         io_cqring_ev_posted(ctx);
1413         io_put_req(req);
1414 }
1415
1416 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1417                         void *key)
1418 {
1419         struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1420                                                         wait);
1421         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1422         struct io_ring_ctx *ctx = req->ctx;
1423         __poll_t mask = key_to_poll(key);
1424         unsigned long flags;
1425
1426         /* for instances that support it check for an event match first: */
1427         if (mask && !(mask & poll->events))
1428                 return 0;
1429
1430         list_del_init(&poll->wait.entry);
1431
1432         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1433                 list_del(&req->list);
1434                 io_poll_complete(ctx, req, mask);
1435                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1436
1437                 io_cqring_ev_posted(ctx);
1438                 io_put_req(req);
1439         } else {
1440                 queue_work(ctx->sqo_wq, &req->work);
1441         }
1442
1443         return 1;
1444 }
1445
1446 struct io_poll_table {
1447         struct poll_table_struct pt;
1448         struct io_kiocb *req;
1449         int error;
1450 };
1451
1452 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1453                                struct poll_table_struct *p)
1454 {
1455         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1456
1457         if (unlikely(pt->req->poll.head)) {
1458                 pt->error = -EINVAL;
1459                 return;
1460         }
1461
1462         pt->error = 0;
1463         pt->req->poll.head = head;
1464         add_wait_queue(head, &pt->req->poll.wait);
1465 }
1466
1467 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1468 {
1469         struct io_poll_iocb *poll = &req->poll;
1470         struct io_ring_ctx *ctx = req->ctx;
1471         struct io_poll_table ipt;
1472         bool cancel = false;
1473         __poll_t mask;
1474         u16 events;
1475
1476         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1477                 return -EINVAL;
1478         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1479                 return -EINVAL;
1480         if (!poll->file)
1481                 return -EBADF;
1482
1483         INIT_WORK(&req->work, io_poll_complete_work);
1484         events = READ_ONCE(sqe->poll_events);
1485         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1486
1487         poll->head = NULL;
1488         poll->done = false;
1489         poll->canceled = false;
1490
1491         ipt.pt._qproc = io_poll_queue_proc;
1492         ipt.pt._key = poll->events;
1493         ipt.req = req;
1494         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1495
1496         /* initialized the list so that we can do list_empty checks */
1497         INIT_LIST_HEAD(&poll->wait.entry);
1498         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1499
1500         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1501
1502         spin_lock_irq(&ctx->completion_lock);
1503         if (likely(poll->head)) {
1504                 spin_lock(&poll->head->lock);
1505                 if (unlikely(list_empty(&poll->wait.entry))) {
1506                         if (ipt.error)
1507                                 cancel = true;
1508                         ipt.error = 0;
1509                         mask = 0;
1510                 }
1511                 if (mask || ipt.error)
1512                         list_del_init(&poll->wait.entry);
1513                 else if (cancel)
1514                         WRITE_ONCE(poll->canceled, true);
1515                 else if (!poll->done) /* actually waiting for an event */
1516                         list_add_tail(&req->list, &ctx->cancel_list);
1517                 spin_unlock(&poll->head->lock);
1518         }
1519         if (mask) { /* no async, we'd stolen it */
1520                 ipt.error = 0;
1521                 io_poll_complete(ctx, req, mask);
1522         }
1523         spin_unlock_irq(&ctx->completion_lock);
1524
1525         if (mask) {
1526                 io_cqring_ev_posted(ctx);
1527                 io_put_req(req);
1528         }
1529         return ipt.error;
1530 }
1531
1532 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
1533                         const struct io_uring_sqe *sqe)
1534 {
1535         struct io_uring_sqe *sqe_copy;
1536
1537         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
1538                 return 0;
1539
1540         sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1541         if (!sqe_copy)
1542                 return -EAGAIN;
1543
1544         spin_lock_irq(&ctx->completion_lock);
1545         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
1546                 spin_unlock_irq(&ctx->completion_lock);
1547                 kfree(sqe_copy);
1548                 return 0;
1549         }
1550
1551         memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
1552         req->submit.sqe = sqe_copy;
1553
1554         INIT_WORK(&req->work, io_sq_wq_submit_work);
1555         list_add_tail(&req->list, &ctx->defer_list);
1556         spin_unlock_irq(&ctx->completion_lock);
1557         return -EIOCBQUEUED;
1558 }
1559
1560 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1561                            const struct sqe_submit *s, bool force_nonblock)
1562 {
1563         int ret, opcode;
1564
1565         if (unlikely(s->index >= ctx->sq_entries))
1566                 return -EINVAL;
1567         req->user_data = READ_ONCE(s->sqe->user_data);
1568
1569         opcode = READ_ONCE(s->sqe->opcode);
1570         switch (opcode) {
1571         case IORING_OP_NOP:
1572                 ret = io_nop(req, req->user_data);
1573                 break;
1574         case IORING_OP_READV:
1575                 if (unlikely(s->sqe->buf_index))
1576                         return -EINVAL;
1577                 ret = io_read(req, s, force_nonblock);
1578                 break;
1579         case IORING_OP_WRITEV:
1580                 if (unlikely(s->sqe->buf_index))
1581                         return -EINVAL;
1582                 ret = io_write(req, s, force_nonblock);
1583                 break;
1584         case IORING_OP_READ_FIXED:
1585                 ret = io_read(req, s, force_nonblock);
1586                 break;
1587         case IORING_OP_WRITE_FIXED:
1588                 ret = io_write(req, s, force_nonblock);
1589                 break;
1590         case IORING_OP_FSYNC:
1591                 ret = io_fsync(req, s->sqe, force_nonblock);
1592                 break;
1593         case IORING_OP_POLL_ADD:
1594                 ret = io_poll_add(req, s->sqe);
1595                 break;
1596         case IORING_OP_POLL_REMOVE:
1597                 ret = io_poll_remove(req, s->sqe);
1598                 break;
1599         case IORING_OP_SYNC_FILE_RANGE:
1600                 ret = io_sync_file_range(req, s->sqe, force_nonblock);
1601                 break;
1602         default:
1603                 ret = -EINVAL;
1604                 break;
1605         }
1606
1607         if (ret)
1608                 return ret;
1609
1610         if (ctx->flags & IORING_SETUP_IOPOLL) {
1611                 if (req->error == -EAGAIN)
1612                         return -EAGAIN;
1613
1614                 /* workqueue context doesn't hold uring_lock, grab it now */
1615                 if (s->needs_lock)
1616                         mutex_lock(&ctx->uring_lock);
1617                 io_iopoll_req_issued(req);
1618                 if (s->needs_lock)
1619                         mutex_unlock(&ctx->uring_lock);
1620         }
1621
1622         return 0;
1623 }
1624
1625 static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1626                                                  const struct io_uring_sqe *sqe)
1627 {
1628         switch (sqe->opcode) {
1629         case IORING_OP_READV:
1630         case IORING_OP_READ_FIXED:
1631                 return &ctx->pending_async[READ];
1632         case IORING_OP_WRITEV:
1633         case IORING_OP_WRITE_FIXED:
1634                 return &ctx->pending_async[WRITE];
1635         default:
1636                 return NULL;
1637         }
1638 }
1639
1640 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1641 {
1642         u8 opcode = READ_ONCE(sqe->opcode);
1643
1644         return !(opcode == IORING_OP_READ_FIXED ||
1645                  opcode == IORING_OP_WRITE_FIXED);
1646 }
1647
1648 static void io_sq_wq_submit_work(struct work_struct *work)
1649 {
1650         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1651         struct io_ring_ctx *ctx = req->ctx;
1652         struct mm_struct *cur_mm = NULL;
1653         struct async_list *async_list;
1654         LIST_HEAD(req_list);
1655         mm_segment_t old_fs;
1656         int ret;
1657
1658         async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1659 restart:
1660         do {
1661                 struct sqe_submit *s = &req->submit;
1662                 const struct io_uring_sqe *sqe = s->sqe;
1663
1664                 /* Ensure we clear previously set non-block flag */
1665                 req->rw.ki_flags &= ~IOCB_NOWAIT;
1666
1667                 ret = 0;
1668                 if (io_sqe_needs_user(sqe) && !cur_mm) {
1669                         if (!mmget_not_zero(ctx->sqo_mm)) {
1670                                 ret = -EFAULT;
1671                         } else {
1672                                 cur_mm = ctx->sqo_mm;
1673                                 use_mm(cur_mm);
1674                                 old_fs = get_fs();
1675                                 set_fs(USER_DS);
1676                         }
1677                 }
1678
1679                 if (!ret) {
1680                         s->has_user = cur_mm != NULL;
1681                         s->needs_lock = true;
1682                         do {
1683                                 ret = __io_submit_sqe(ctx, req, s, false);
1684                                 /*
1685                                  * We can get EAGAIN for polled IO even though
1686                                  * we're forcing a sync submission from here,
1687                                  * since we can't wait for request slots on the
1688                                  * block side.
1689                                  */
1690                                 if (ret != -EAGAIN)
1691                                         break;
1692                                 cond_resched();
1693                         } while (1);
1694                 }
1695
1696                 /* drop submission reference */
1697                 io_put_req(req);
1698
1699                 if (ret) {
1700                         io_cqring_add_event(ctx, sqe->user_data, ret);
1701                         io_put_req(req);
1702                 }
1703
1704                 /* async context always use a copy of the sqe */
1705                 kfree(sqe);
1706
1707                 if (!async_list)
1708                         break;
1709                 if (!list_empty(&req_list)) {
1710                         req = list_first_entry(&req_list, struct io_kiocb,
1711                                                 list);
1712                         list_del(&req->list);
1713                         continue;
1714                 }
1715                 if (list_empty(&async_list->list))
1716                         break;
1717
1718                 req = NULL;
1719                 spin_lock(&async_list->lock);
1720                 if (list_empty(&async_list->list)) {
1721                         spin_unlock(&async_list->lock);
1722                         break;
1723                 }
1724                 list_splice_init(&async_list->list, &req_list);
1725                 spin_unlock(&async_list->lock);
1726
1727                 req = list_first_entry(&req_list, struct io_kiocb, list);
1728                 list_del(&req->list);
1729         } while (req);
1730
1731         /*
1732          * Rare case of racing with a submitter. If we find the count has
1733          * dropped to zero AND we have pending work items, then restart
1734          * the processing. This is a tiny race window.
1735          */
1736         if (async_list) {
1737                 ret = atomic_dec_return(&async_list->cnt);
1738                 while (!ret && !list_empty(&async_list->list)) {
1739                         spin_lock(&async_list->lock);
1740                         atomic_inc(&async_list->cnt);
1741                         list_splice_init(&async_list->list, &req_list);
1742                         spin_unlock(&async_list->lock);
1743
1744                         if (!list_empty(&req_list)) {
1745                                 req = list_first_entry(&req_list,
1746                                                         struct io_kiocb, list);
1747                                 list_del(&req->list);
1748                                 goto restart;
1749                         }
1750                         ret = atomic_dec_return(&async_list->cnt);
1751                 }
1752         }
1753
1754         if (cur_mm) {
1755                 set_fs(old_fs);
1756                 unuse_mm(cur_mm);
1757                 mmput(cur_mm);
1758         }
1759 }
1760
1761 /*
1762  * See if we can piggy back onto previously submitted work, that is still
1763  * running. We currently only allow this if the new request is sequential
1764  * to the previous one we punted.
1765  */
1766 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
1767 {
1768         bool ret = false;
1769
1770         if (!list)
1771                 return false;
1772         if (!(req->flags & REQ_F_SEQ_PREV))
1773                 return false;
1774         if (!atomic_read(&list->cnt))
1775                 return false;
1776
1777         ret = true;
1778         spin_lock(&list->lock);
1779         list_add_tail(&req->list, &list->list);
1780         if (!atomic_read(&list->cnt)) {
1781                 list_del_init(&req->list);
1782                 ret = false;
1783         }
1784         spin_unlock(&list->lock);
1785         return ret;
1786 }
1787
1788 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
1789 {
1790         int op = READ_ONCE(sqe->opcode);
1791
1792         switch (op) {
1793         case IORING_OP_NOP:
1794         case IORING_OP_POLL_REMOVE:
1795                 return false;
1796         default:
1797                 return true;
1798         }
1799 }
1800
1801 static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
1802                            struct io_submit_state *state, struct io_kiocb *req)
1803 {
1804         unsigned flags;
1805         int fd;
1806
1807         flags = READ_ONCE(s->sqe->flags);
1808         fd = READ_ONCE(s->sqe->fd);
1809
1810         if (flags & IOSQE_IO_DRAIN) {
1811                 req->flags |= REQ_F_IO_DRAIN;
1812                 req->sequence = ctx->cached_sq_head - 1;
1813         }
1814
1815         if (!io_op_needs_file(s->sqe)) {
1816                 req->file = NULL;
1817                 return 0;
1818         }
1819
1820         if (flags & IOSQE_FIXED_FILE) {
1821                 if (unlikely(!ctx->user_files ||
1822                     (unsigned) fd >= ctx->nr_user_files))
1823                         return -EBADF;
1824                 req->file = ctx->user_files[fd];
1825                 req->flags |= REQ_F_FIXED_FILE;
1826         } else {
1827                 if (s->needs_fixed_file)
1828                         return -EBADF;
1829                 req->file = io_file_get(state, fd);
1830                 if (unlikely(!req->file))
1831                         return -EBADF;
1832         }
1833
1834         return 0;
1835 }
1836
1837 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1838                          struct io_submit_state *state)
1839 {
1840         struct io_kiocb *req;
1841         int ret;
1842
1843         /* enforce forwards compatibility on users */
1844         if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)))
1845                 return -EINVAL;
1846
1847         req = io_get_req(ctx, state);
1848         if (unlikely(!req))
1849                 return -EAGAIN;
1850
1851         ret = io_req_set_file(ctx, s, state, req);
1852         if (unlikely(ret))
1853                 goto out;
1854
1855         ret = io_req_defer(ctx, req, s->sqe);
1856         if (ret) {
1857                 if (ret == -EIOCBQUEUED)
1858                         ret = 0;
1859                 return ret;
1860         }
1861
1862         ret = __io_submit_sqe(ctx, req, s, true);
1863         if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
1864                 struct io_uring_sqe *sqe_copy;
1865
1866                 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1867                 if (sqe_copy) {
1868                         struct async_list *list;
1869
1870                         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1871                         s->sqe = sqe_copy;
1872
1873                         memcpy(&req->submit, s, sizeof(*s));
1874                         list = io_async_list_from_sqe(ctx, s->sqe);
1875                         if (!io_add_to_prev_work(list, req)) {
1876                                 if (list)
1877                                         atomic_inc(&list->cnt);
1878                                 INIT_WORK(&req->work, io_sq_wq_submit_work);
1879                                 queue_work(ctx->sqo_wq, &req->work);
1880                         }
1881
1882                         /*
1883                          * Queued up for async execution, worker will release
1884                          * submit reference when the iocb is actually
1885                          * submitted.
1886                          */
1887                         return 0;
1888                 }
1889         }
1890
1891 out:
1892         /* drop submission reference */
1893         io_put_req(req);
1894
1895         /* and drop final reference, if we failed */
1896         if (ret)
1897                 io_put_req(req);
1898
1899         return ret;
1900 }
1901
1902 /*
1903  * Batched submission is done, ensure local IO is flushed out.
1904  */
1905 static void io_submit_state_end(struct io_submit_state *state)
1906 {
1907         blk_finish_plug(&state->plug);
1908         io_file_put(state);
1909         if (state->free_reqs)
1910                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
1911                                         &state->reqs[state->cur_req]);
1912 }
1913
1914 /*
1915  * Start submission side cache.
1916  */
1917 static void io_submit_state_start(struct io_submit_state *state,
1918                                   struct io_ring_ctx *ctx, unsigned max_ios)
1919 {
1920         blk_start_plug(&state->plug);
1921         state->free_reqs = 0;
1922         state->file = NULL;
1923         state->ios_left = max_ios;
1924 }
1925
1926 static void io_commit_sqring(struct io_ring_ctx *ctx)
1927 {
1928         struct io_sq_ring *ring = ctx->sq_ring;
1929
1930         if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1931                 /*
1932                  * Ensure any loads from the SQEs are done at this point,
1933                  * since once we write the new head, the application could
1934                  * write new data to them.
1935                  */
1936                 smp_store_release(&ring->r.head, ctx->cached_sq_head);
1937         }
1938 }
1939
1940 /*
1941  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1942  * that is mapped by userspace. This means that care needs to be taken to
1943  * ensure that reads are stable, as we cannot rely on userspace always
1944  * being a good citizen. If members of the sqe are validated and then later
1945  * used, it's important that those reads are done through READ_ONCE() to
1946  * prevent a re-load down the line.
1947  */
1948 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1949 {
1950         struct io_sq_ring *ring = ctx->sq_ring;
1951         unsigned head;
1952
1953         /*
1954          * The cached sq head (or cq tail) serves two purposes:
1955          *
1956          * 1) allows us to batch the cost of updating the user visible
1957          *    head updates.
1958          * 2) allows the kernel side to track the head on its own, even
1959          *    though the application is the one updating it.
1960          */
1961         head = ctx->cached_sq_head;
1962         /* make sure SQ entry isn't read before tail */
1963         if (head == smp_load_acquire(&ring->r.tail))
1964                 return false;
1965
1966         head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1967         if (head < ctx->sq_entries) {
1968                 s->index = head;
1969                 s->sqe = &ctx->sq_sqes[head];
1970                 ctx->cached_sq_head++;
1971                 return true;
1972         }
1973
1974         /* drop invalid entries */
1975         ctx->cached_sq_head++;
1976         ring->dropped++;
1977         return false;
1978 }
1979
1980 static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1981                           unsigned int nr, bool has_user, bool mm_fault)
1982 {
1983         struct io_submit_state state, *statep = NULL;
1984         int ret, i, submitted = 0;
1985
1986         if (nr > IO_PLUG_THRESHOLD) {
1987                 io_submit_state_start(&state, ctx, nr);
1988                 statep = &state;
1989         }
1990
1991         for (i = 0; i < nr; i++) {
1992                 if (unlikely(mm_fault)) {
1993                         ret = -EFAULT;
1994                 } else {
1995                         sqes[i].has_user = has_user;
1996                         sqes[i].needs_lock = true;
1997                         sqes[i].needs_fixed_file = true;
1998                         ret = io_submit_sqe(ctx, &sqes[i], statep);
1999                 }
2000                 if (!ret) {
2001                         submitted++;
2002                         continue;
2003                 }
2004
2005                 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret);
2006         }
2007
2008         if (statep)
2009                 io_submit_state_end(&state);
2010
2011         return submitted;
2012 }
2013
2014 static int io_sq_thread(void *data)
2015 {
2016         struct sqe_submit sqes[IO_IOPOLL_BATCH];
2017         struct io_ring_ctx *ctx = data;
2018         struct mm_struct *cur_mm = NULL;
2019         mm_segment_t old_fs;
2020         DEFINE_WAIT(wait);
2021         unsigned inflight;
2022         unsigned long timeout;
2023
2024         old_fs = get_fs();
2025         set_fs(USER_DS);
2026
2027         timeout = inflight = 0;
2028         while (!kthread_should_park()) {
2029                 bool all_fixed, mm_fault = false;
2030                 int i;
2031
2032                 if (inflight) {
2033                         unsigned nr_events = 0;
2034
2035                         if (ctx->flags & IORING_SETUP_IOPOLL) {
2036                                 /*
2037                                  * We disallow the app entering submit/complete
2038                                  * with polling, but we still need to lock the
2039                                  * ring to prevent racing with polled issue
2040                                  * that got punted to a workqueue.
2041                                  */
2042                                 mutex_lock(&ctx->uring_lock);
2043                                 io_iopoll_check(ctx, &nr_events, 0);
2044                                 mutex_unlock(&ctx->uring_lock);
2045                         } else {
2046                                 /*
2047                                  * Normal IO, just pretend everything completed.
2048                                  * We don't have to poll completions for that.
2049                                  */
2050                                 nr_events = inflight;
2051                         }
2052
2053                         inflight -= nr_events;
2054                         if (!inflight)
2055                                 timeout = jiffies + ctx->sq_thread_idle;
2056                 }
2057
2058                 if (!io_get_sqring(ctx, &sqes[0])) {
2059                         /*
2060                          * We're polling. If we're within the defined idle
2061                          * period, then let us spin without work before going
2062                          * to sleep.
2063                          */
2064                         if (inflight || !time_after(jiffies, timeout)) {
2065                                 cpu_relax();
2066                                 continue;
2067                         }
2068
2069                         /*
2070                          * Drop cur_mm before scheduling, we can't hold it for
2071                          * long periods (or over schedule()). Do this before
2072                          * adding ourselves to the waitqueue, as the unuse/drop
2073                          * may sleep.
2074                          */
2075                         if (cur_mm) {
2076                                 unuse_mm(cur_mm);
2077                                 mmput(cur_mm);
2078                                 cur_mm = NULL;
2079                         }
2080
2081                         prepare_to_wait(&ctx->sqo_wait, &wait,
2082                                                 TASK_INTERRUPTIBLE);
2083
2084                         /* Tell userspace we may need a wakeup call */
2085                         ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
2086                         /* make sure to read SQ tail after writing flags */
2087                         smp_mb();
2088
2089                         if (!io_get_sqring(ctx, &sqes[0])) {
2090                                 if (kthread_should_park()) {
2091                                         finish_wait(&ctx->sqo_wait, &wait);
2092                                         break;
2093                                 }
2094                                 if (signal_pending(current))
2095                                         flush_signals(current);
2096                                 schedule();
2097                                 finish_wait(&ctx->sqo_wait, &wait);
2098
2099                                 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2100                                 continue;
2101                         }
2102                         finish_wait(&ctx->sqo_wait, &wait);
2103
2104                         ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2105                 }
2106
2107                 i = 0;
2108                 all_fixed = true;
2109                 do {
2110                         if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
2111                                 all_fixed = false;
2112
2113                         i++;
2114                         if (i == ARRAY_SIZE(sqes))
2115                                 break;
2116                 } while (io_get_sqring(ctx, &sqes[i]));
2117
2118                 /* Unless all new commands are FIXED regions, grab mm */
2119                 if (!all_fixed && !cur_mm) {
2120                         mm_fault = !mmget_not_zero(ctx->sqo_mm);
2121                         if (!mm_fault) {
2122                                 use_mm(ctx->sqo_mm);
2123                                 cur_mm = ctx->sqo_mm;
2124                         }
2125                 }
2126
2127                 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
2128                                                 mm_fault);
2129
2130                 /* Commit SQ ring head once we've consumed all SQEs */
2131                 io_commit_sqring(ctx);
2132         }
2133
2134         set_fs(old_fs);
2135         if (cur_mm) {
2136                 unuse_mm(cur_mm);
2137                 mmput(cur_mm);
2138         }
2139
2140         kthread_parkme();
2141
2142         return 0;
2143 }
2144
2145 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2146 {
2147         struct io_submit_state state, *statep = NULL;
2148         int i, submit = 0;
2149
2150         if (to_submit > IO_PLUG_THRESHOLD) {
2151                 io_submit_state_start(&state, ctx, to_submit);
2152                 statep = &state;
2153         }
2154
2155         for (i = 0; i < to_submit; i++) {
2156                 struct sqe_submit s;
2157                 int ret;
2158
2159                 if (!io_get_sqring(ctx, &s))
2160                         break;
2161
2162                 s.has_user = true;
2163                 s.needs_lock = false;
2164                 s.needs_fixed_file = false;
2165                 submit++;
2166
2167                 ret = io_submit_sqe(ctx, &s, statep);
2168                 if (ret)
2169                         io_cqring_add_event(ctx, s.sqe->user_data, ret);
2170         }
2171         io_commit_sqring(ctx);
2172
2173         if (statep)
2174                 io_submit_state_end(statep);
2175
2176         return submit;
2177 }
2178
2179 static unsigned io_cqring_events(struct io_cq_ring *ring)
2180 {
2181         /* See comment at the top of this file */
2182         smp_rmb();
2183         return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
2184 }
2185
2186 /*
2187  * Wait until events become available, if we don't already have some. The
2188  * application must reap them itself, as they reside on the shared cq ring.
2189  */
2190 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2191                           const sigset_t __user *sig, size_t sigsz)
2192 {
2193         struct io_cq_ring *ring = ctx->cq_ring;
2194         sigset_t ksigmask, sigsaved;
2195         int ret;
2196
2197         if (io_cqring_events(ring) >= min_events)
2198                 return 0;
2199
2200         if (sig) {
2201 #ifdef CONFIG_COMPAT
2202                 if (in_compat_syscall())
2203                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2204                                                       &ksigmask, &sigsaved, sigsz);
2205                 else
2206 #endif
2207                         ret = set_user_sigmask(sig, &ksigmask,
2208                                                &sigsaved, sigsz);
2209
2210                 if (ret)
2211                         return ret;
2212         }
2213
2214         ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events);
2215         if (ret == -ERESTARTSYS)
2216                 ret = -EINTR;
2217
2218         if (sig)
2219                 restore_user_sigmask(sig, &sigsaved);
2220
2221         return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
2222 }
2223
2224 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2225 {
2226 #if defined(CONFIG_UNIX)
2227         if (ctx->ring_sock) {
2228                 struct sock *sock = ctx->ring_sock->sk;
2229                 struct sk_buff *skb;
2230
2231                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2232                         kfree_skb(skb);
2233         }
2234 #else
2235         int i;
2236
2237         for (i = 0; i < ctx->nr_user_files; i++)
2238                 fput(ctx->user_files[i]);
2239 #endif
2240 }
2241
2242 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2243 {
2244         if (!ctx->user_files)
2245                 return -ENXIO;
2246
2247         __io_sqe_files_unregister(ctx);
2248         kfree(ctx->user_files);
2249         ctx->user_files = NULL;
2250         ctx->nr_user_files = 0;
2251         return 0;
2252 }
2253
2254 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2255 {
2256         if (ctx->sqo_thread) {
2257                 /*
2258                  * The park is a bit of a work-around, without it we get
2259                  * warning spews on shutdown with SQPOLL set and affinity
2260                  * set to a single CPU.
2261                  */
2262                 kthread_park(ctx->sqo_thread);
2263                 kthread_stop(ctx->sqo_thread);
2264                 ctx->sqo_thread = NULL;
2265         }
2266 }
2267
2268 static void io_finish_async(struct io_ring_ctx *ctx)
2269 {
2270         io_sq_thread_stop(ctx);
2271
2272         if (ctx->sqo_wq) {
2273                 destroy_workqueue(ctx->sqo_wq);
2274                 ctx->sqo_wq = NULL;
2275         }
2276 }
2277
2278 #if defined(CONFIG_UNIX)
2279 static void io_destruct_skb(struct sk_buff *skb)
2280 {
2281         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2282
2283         io_finish_async(ctx);
2284         unix_destruct_scm(skb);
2285 }
2286
2287 /*
2288  * Ensure the UNIX gc is aware of our file set, so we are certain that
2289  * the io_uring can be safely unregistered on process exit, even if we have
2290  * loops in the file referencing.
2291  */
2292 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2293 {
2294         struct sock *sk = ctx->ring_sock->sk;
2295         struct scm_fp_list *fpl;
2296         struct sk_buff *skb;
2297         int i;
2298
2299         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2300                 unsigned long inflight = ctx->user->unix_inflight + nr;
2301
2302                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2303                         return -EMFILE;
2304         }
2305
2306         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2307         if (!fpl)
2308                 return -ENOMEM;
2309
2310         skb = alloc_skb(0, GFP_KERNEL);
2311         if (!skb) {
2312                 kfree(fpl);
2313                 return -ENOMEM;
2314         }
2315
2316         skb->sk = sk;
2317         skb->destructor = io_destruct_skb;
2318
2319         fpl->user = get_uid(ctx->user);
2320         for (i = 0; i < nr; i++) {
2321                 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2322                 unix_inflight(fpl->user, fpl->fp[i]);
2323         }
2324
2325         fpl->max = fpl->count = nr;
2326         UNIXCB(skb).fp = fpl;
2327         refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2328         skb_queue_head(&sk->sk_receive_queue, skb);
2329
2330         for (i = 0; i < nr; i++)
2331                 fput(fpl->fp[i]);
2332
2333         return 0;
2334 }
2335
2336 /*
2337  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2338  * causes regular reference counting to break down. We rely on the UNIX
2339  * garbage collection to take care of this problem for us.
2340  */
2341 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2342 {
2343         unsigned left, total;
2344         int ret = 0;
2345
2346         total = 0;
2347         left = ctx->nr_user_files;
2348         while (left) {
2349                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2350
2351                 ret = __io_sqe_files_scm(ctx, this_files, total);
2352                 if (ret)
2353                         break;
2354                 left -= this_files;
2355                 total += this_files;
2356         }
2357
2358         if (!ret)
2359                 return 0;
2360
2361         while (total < ctx->nr_user_files) {
2362                 fput(ctx->user_files[total]);
2363                 total++;
2364         }
2365
2366         return ret;
2367 }
2368 #else
2369 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2370 {
2371         return 0;
2372 }
2373 #endif
2374
2375 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2376                                  unsigned nr_args)
2377 {
2378         __s32 __user *fds = (__s32 __user *) arg;
2379         int fd, ret = 0;
2380         unsigned i;
2381
2382         if (ctx->user_files)
2383                 return -EBUSY;
2384         if (!nr_args)
2385                 return -EINVAL;
2386         if (nr_args > IORING_MAX_FIXED_FILES)
2387                 return -EMFILE;
2388
2389         ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2390         if (!ctx->user_files)
2391                 return -ENOMEM;
2392
2393         for (i = 0; i < nr_args; i++) {
2394                 ret = -EFAULT;
2395                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2396                         break;
2397
2398                 ctx->user_files[i] = fget(fd);
2399
2400                 ret = -EBADF;
2401                 if (!ctx->user_files[i])
2402                         break;
2403                 /*
2404                  * Don't allow io_uring instances to be registered. If UNIX
2405                  * isn't enabled, then this causes a reference cycle and this
2406                  * instance can never get freed. If UNIX is enabled we'll
2407                  * handle it just fine, but there's still no point in allowing
2408                  * a ring fd as it doesn't support regular read/write anyway.
2409                  */
2410                 if (ctx->user_files[i]->f_op == &io_uring_fops) {
2411                         fput(ctx->user_files[i]);
2412                         break;
2413                 }
2414                 ctx->nr_user_files++;
2415                 ret = 0;
2416         }
2417
2418         if (ret) {
2419                 for (i = 0; i < ctx->nr_user_files; i++)
2420                         fput(ctx->user_files[i]);
2421
2422                 kfree(ctx->user_files);
2423                 ctx->user_files = NULL;
2424                 ctx->nr_user_files = 0;
2425                 return ret;
2426         }
2427
2428         ret = io_sqe_files_scm(ctx);
2429         if (ret)
2430                 io_sqe_files_unregister(ctx);
2431
2432         return ret;
2433 }
2434
2435 static int io_sq_offload_start(struct io_ring_ctx *ctx,
2436                                struct io_uring_params *p)
2437 {
2438         int ret;
2439
2440         init_waitqueue_head(&ctx->sqo_wait);
2441         mmgrab(current->mm);
2442         ctx->sqo_mm = current->mm;
2443
2444         if (ctx->flags & IORING_SETUP_SQPOLL) {
2445                 ret = -EPERM;
2446                 if (!capable(CAP_SYS_ADMIN))
2447                         goto err;
2448
2449                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2450                 if (!ctx->sq_thread_idle)
2451                         ctx->sq_thread_idle = HZ;
2452
2453                 if (p->flags & IORING_SETUP_SQ_AFF) {
2454                         int cpu = p->sq_thread_cpu;
2455
2456                         ret = -EINVAL;
2457                         if (cpu >= nr_cpu_ids)
2458                                 goto err;
2459                         if (!cpu_online(cpu))
2460                                 goto err;
2461
2462                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2463                                                         ctx, cpu,
2464                                                         "io_uring-sq");
2465                 } else {
2466                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2467                                                         "io_uring-sq");
2468                 }
2469                 if (IS_ERR(ctx->sqo_thread)) {
2470                         ret = PTR_ERR(ctx->sqo_thread);
2471                         ctx->sqo_thread = NULL;
2472                         goto err;
2473                 }
2474                 wake_up_process(ctx->sqo_thread);
2475         } else if (p->flags & IORING_SETUP_SQ_AFF) {
2476                 /* Can't have SQ_AFF without SQPOLL */
2477                 ret = -EINVAL;
2478                 goto err;
2479         }
2480
2481         /* Do QD, or 2 * CPUS, whatever is smallest */
2482         ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2483                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2484         if (!ctx->sqo_wq) {
2485                 ret = -ENOMEM;
2486                 goto err;
2487         }
2488
2489         return 0;
2490 err:
2491         io_sq_thread_stop(ctx);
2492         mmdrop(ctx->sqo_mm);
2493         ctx->sqo_mm = NULL;
2494         return ret;
2495 }
2496
2497 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2498 {
2499         atomic_long_sub(nr_pages, &user->locked_vm);
2500 }
2501
2502 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2503 {
2504         unsigned long page_limit, cur_pages, new_pages;
2505
2506         /* Don't allow more pages than we can safely lock */
2507         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2508
2509         do {
2510                 cur_pages = atomic_long_read(&user->locked_vm);
2511                 new_pages = cur_pages + nr_pages;
2512                 if (new_pages > page_limit)
2513                         return -ENOMEM;
2514         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2515                                         new_pages) != cur_pages);
2516
2517         return 0;
2518 }
2519
2520 static void io_mem_free(void *ptr)
2521 {
2522         struct page *page;
2523
2524         if (!ptr)
2525                 return;
2526
2527         page = virt_to_head_page(ptr);
2528         if (put_page_testzero(page))
2529                 free_compound_page(page);
2530 }
2531
2532 static void *io_mem_alloc(size_t size)
2533 {
2534         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2535                                 __GFP_NORETRY;
2536
2537         return (void *) __get_free_pages(gfp_flags, get_order(size));
2538 }
2539
2540 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2541 {
2542         struct io_sq_ring *sq_ring;
2543         struct io_cq_ring *cq_ring;
2544         size_t bytes;
2545
2546         bytes = struct_size(sq_ring, array, sq_entries);
2547         bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2548         bytes += struct_size(cq_ring, cqes, cq_entries);
2549
2550         return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2551 }
2552
2553 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2554 {
2555         int i, j;
2556
2557         if (!ctx->user_bufs)
2558                 return -ENXIO;
2559
2560         for (i = 0; i < ctx->nr_user_bufs; i++) {
2561                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2562
2563                 for (j = 0; j < imu->nr_bvecs; j++)
2564                         put_page(imu->bvec[j].bv_page);
2565
2566                 if (ctx->account_mem)
2567                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
2568                 kvfree(imu->bvec);
2569                 imu->nr_bvecs = 0;
2570         }
2571
2572         kfree(ctx->user_bufs);
2573         ctx->user_bufs = NULL;
2574         ctx->nr_user_bufs = 0;
2575         return 0;
2576 }
2577
2578 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2579                        void __user *arg, unsigned index)
2580 {
2581         struct iovec __user *src;
2582
2583 #ifdef CONFIG_COMPAT
2584         if (ctx->compat) {
2585                 struct compat_iovec __user *ciovs;
2586                 struct compat_iovec ciov;
2587
2588                 ciovs = (struct compat_iovec __user *) arg;
2589                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2590                         return -EFAULT;
2591
2592                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2593                 dst->iov_len = ciov.iov_len;
2594                 return 0;
2595         }
2596 #endif
2597         src = (struct iovec __user *) arg;
2598         if (copy_from_user(dst, &src[index], sizeof(*dst)))
2599                 return -EFAULT;
2600         return 0;
2601 }
2602
2603 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2604                                   unsigned nr_args)
2605 {
2606         struct vm_area_struct **vmas = NULL;
2607         struct page **pages = NULL;
2608         int i, j, got_pages = 0;
2609         int ret = -EINVAL;
2610
2611         if (ctx->user_bufs)
2612                 return -EBUSY;
2613         if (!nr_args || nr_args > UIO_MAXIOV)
2614                 return -EINVAL;
2615
2616         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2617                                         GFP_KERNEL);
2618         if (!ctx->user_bufs)
2619                 return -ENOMEM;
2620
2621         for (i = 0; i < nr_args; i++) {
2622                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2623                 unsigned long off, start, end, ubuf;
2624                 int pret, nr_pages;
2625                 struct iovec iov;
2626                 size_t size;
2627
2628                 ret = io_copy_iov(ctx, &iov, arg, i);
2629                 if (ret)
2630                         goto err;
2631
2632                 /*
2633                  * Don't impose further limits on the size and buffer
2634                  * constraints here, we'll -EINVAL later when IO is
2635                  * submitted if they are wrong.
2636                  */
2637                 ret = -EFAULT;
2638                 if (!iov.iov_base || !iov.iov_len)
2639                         goto err;
2640
2641                 /* arbitrary limit, but we need something */
2642                 if (iov.iov_len > SZ_1G)
2643                         goto err;
2644
2645                 ubuf = (unsigned long) iov.iov_base;
2646                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
2647                 start = ubuf >> PAGE_SHIFT;
2648                 nr_pages = end - start;
2649
2650                 if (ctx->account_mem) {
2651                         ret = io_account_mem(ctx->user, nr_pages);
2652                         if (ret)
2653                                 goto err;
2654                 }
2655
2656                 ret = 0;
2657                 if (!pages || nr_pages > got_pages) {
2658                         kfree(vmas);
2659                         kfree(pages);
2660                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
2661                                                 GFP_KERNEL);
2662                         vmas = kvmalloc_array(nr_pages,
2663                                         sizeof(struct vm_area_struct *),
2664                                         GFP_KERNEL);
2665                         if (!pages || !vmas) {
2666                                 ret = -ENOMEM;
2667                                 if (ctx->account_mem)
2668                                         io_unaccount_mem(ctx->user, nr_pages);
2669                                 goto err;
2670                         }
2671                         got_pages = nr_pages;
2672                 }
2673
2674                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
2675                                                 GFP_KERNEL);
2676                 ret = -ENOMEM;
2677                 if (!imu->bvec) {
2678                         if (ctx->account_mem)
2679                                 io_unaccount_mem(ctx->user, nr_pages);
2680                         goto err;
2681                 }
2682
2683                 ret = 0;
2684                 down_read(&current->mm->mmap_sem);
2685                 pret = get_user_pages(ubuf, nr_pages,
2686                                       FOLL_WRITE | FOLL_LONGTERM,
2687                                       pages, vmas);
2688                 if (pret == nr_pages) {
2689                         /* don't support file backed memory */
2690                         for (j = 0; j < nr_pages; j++) {
2691                                 struct vm_area_struct *vma = vmas[j];
2692
2693                                 if (vma->vm_file &&
2694                                     !is_file_hugepages(vma->vm_file)) {
2695                                         ret = -EOPNOTSUPP;
2696                                         break;
2697                                 }
2698                         }
2699                 } else {
2700                         ret = pret < 0 ? pret : -EFAULT;
2701                 }
2702                 up_read(&current->mm->mmap_sem);
2703                 if (ret) {
2704                         /*
2705                          * if we did partial map, or found file backed vmas,
2706                          * release any pages we did get
2707                          */
2708                         if (pret > 0) {
2709                                 for (j = 0; j < pret; j++)
2710                                         put_page(pages[j]);
2711                         }
2712                         if (ctx->account_mem)
2713                                 io_unaccount_mem(ctx->user, nr_pages);
2714                         kvfree(imu->bvec);
2715                         goto err;
2716                 }
2717
2718                 off = ubuf & ~PAGE_MASK;
2719                 size = iov.iov_len;
2720                 for (j = 0; j < nr_pages; j++) {
2721                         size_t vec_len;
2722
2723                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
2724                         imu->bvec[j].bv_page = pages[j];
2725                         imu->bvec[j].bv_len = vec_len;
2726                         imu->bvec[j].bv_offset = off;
2727                         off = 0;
2728                         size -= vec_len;
2729                 }
2730                 /* store original address for later verification */
2731                 imu->ubuf = ubuf;
2732                 imu->len = iov.iov_len;
2733                 imu->nr_bvecs = nr_pages;
2734
2735                 ctx->nr_user_bufs++;
2736         }
2737         kvfree(pages);
2738         kvfree(vmas);
2739         return 0;
2740 err:
2741         kvfree(pages);
2742         kvfree(vmas);
2743         io_sqe_buffer_unregister(ctx);
2744         return ret;
2745 }
2746
2747 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
2748 {
2749         __s32 __user *fds = arg;
2750         int fd;
2751
2752         if (ctx->cq_ev_fd)
2753                 return -EBUSY;
2754
2755         if (copy_from_user(&fd, fds, sizeof(*fds)))
2756                 return -EFAULT;
2757
2758         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
2759         if (IS_ERR(ctx->cq_ev_fd)) {
2760                 int ret = PTR_ERR(ctx->cq_ev_fd);
2761                 ctx->cq_ev_fd = NULL;
2762                 return ret;
2763         }
2764
2765         return 0;
2766 }
2767
2768 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
2769 {
2770         if (ctx->cq_ev_fd) {
2771                 eventfd_ctx_put(ctx->cq_ev_fd);
2772                 ctx->cq_ev_fd = NULL;
2773                 return 0;
2774         }
2775
2776         return -ENXIO;
2777 }
2778
2779 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2780 {
2781         io_finish_async(ctx);
2782         if (ctx->sqo_mm)
2783                 mmdrop(ctx->sqo_mm);
2784
2785         io_iopoll_reap_events(ctx);
2786         io_sqe_buffer_unregister(ctx);
2787         io_sqe_files_unregister(ctx);
2788         io_eventfd_unregister(ctx);
2789
2790 #if defined(CONFIG_UNIX)
2791         if (ctx->ring_sock)
2792                 sock_release(ctx->ring_sock);
2793 #endif
2794
2795         io_mem_free(ctx->sq_ring);
2796         io_mem_free(ctx->sq_sqes);
2797         io_mem_free(ctx->cq_ring);
2798
2799         percpu_ref_exit(&ctx->refs);
2800         if (ctx->account_mem)
2801                 io_unaccount_mem(ctx->user,
2802                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
2803         free_uid(ctx->user);
2804         kfree(ctx);
2805 }
2806
2807 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2808 {
2809         struct io_ring_ctx *ctx = file->private_data;
2810         __poll_t mask = 0;
2811
2812         poll_wait(file, &ctx->cq_wait, wait);
2813         /*
2814          * synchronizes with barrier from wq_has_sleeper call in
2815          * io_commit_cqring
2816          */
2817         smp_rmb();
2818         if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
2819             ctx->sq_ring->ring_entries)
2820                 mask |= EPOLLOUT | EPOLLWRNORM;
2821         if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2822                 mask |= EPOLLIN | EPOLLRDNORM;
2823
2824         return mask;
2825 }
2826
2827 static int io_uring_fasync(int fd, struct file *file, int on)
2828 {
2829         struct io_ring_ctx *ctx = file->private_data;
2830
2831         return fasync_helper(fd, file, on, &ctx->cq_fasync);
2832 }
2833
2834 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2835 {
2836         mutex_lock(&ctx->uring_lock);
2837         percpu_ref_kill(&ctx->refs);
2838         mutex_unlock(&ctx->uring_lock);
2839
2840         io_poll_remove_all(ctx);
2841         io_iopoll_reap_events(ctx);
2842         wait_for_completion(&ctx->ctx_done);
2843         io_ring_ctx_free(ctx);
2844 }
2845
2846 static int io_uring_release(struct inode *inode, struct file *file)
2847 {
2848         struct io_ring_ctx *ctx = file->private_data;
2849
2850         file->private_data = NULL;
2851         io_ring_ctx_wait_and_kill(ctx);
2852         return 0;
2853 }
2854
2855 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2856 {
2857         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2858         unsigned long sz = vma->vm_end - vma->vm_start;
2859         struct io_ring_ctx *ctx = file->private_data;
2860         unsigned long pfn;
2861         struct page *page;
2862         void *ptr;
2863
2864         switch (offset) {
2865         case IORING_OFF_SQ_RING:
2866                 ptr = ctx->sq_ring;
2867                 break;
2868         case IORING_OFF_SQES:
2869                 ptr = ctx->sq_sqes;
2870                 break;
2871         case IORING_OFF_CQ_RING:
2872                 ptr = ctx->cq_ring;
2873                 break;
2874         default:
2875                 return -EINVAL;
2876         }
2877
2878         page = virt_to_head_page(ptr);
2879         if (sz > (PAGE_SIZE << compound_order(page)))
2880                 return -EINVAL;
2881
2882         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2883         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2884 }
2885
2886 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2887                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
2888                 size_t, sigsz)
2889 {
2890         struct io_ring_ctx *ctx;
2891         long ret = -EBADF;
2892         int submitted = 0;
2893         struct fd f;
2894
2895         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2896                 return -EINVAL;
2897
2898         f = fdget(fd);
2899         if (!f.file)
2900                 return -EBADF;
2901
2902         ret = -EOPNOTSUPP;
2903         if (f.file->f_op != &io_uring_fops)
2904                 goto out_fput;
2905
2906         ret = -ENXIO;
2907         ctx = f.file->private_data;
2908         if (!percpu_ref_tryget(&ctx->refs))
2909                 goto out_fput;
2910
2911         /*
2912          * For SQ polling, the thread will do all submissions and completions.
2913          * Just return the requested submit count, and wake the thread if
2914          * we were asked to.
2915          */
2916         if (ctx->flags & IORING_SETUP_SQPOLL) {
2917                 if (flags & IORING_ENTER_SQ_WAKEUP)
2918                         wake_up(&ctx->sqo_wait);
2919                 submitted = to_submit;
2920                 goto out_ctx;
2921         }
2922
2923         ret = 0;
2924         if (to_submit) {
2925                 to_submit = min(to_submit, ctx->sq_entries);
2926
2927                 mutex_lock(&ctx->uring_lock);
2928                 submitted = io_ring_submit(ctx, to_submit);
2929                 mutex_unlock(&ctx->uring_lock);
2930         }
2931         if (flags & IORING_ENTER_GETEVENTS) {
2932                 unsigned nr_events = 0;
2933
2934                 min_complete = min(min_complete, ctx->cq_entries);
2935
2936                 if (ctx->flags & IORING_SETUP_IOPOLL) {
2937                         mutex_lock(&ctx->uring_lock);
2938                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
2939                         mutex_unlock(&ctx->uring_lock);
2940                 } else {
2941                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2942                 }
2943         }
2944
2945 out_ctx:
2946         io_ring_drop_ctx_refs(ctx, 1);
2947 out_fput:
2948         fdput(f);
2949         return submitted ? submitted : ret;
2950 }
2951
2952 static const struct file_operations io_uring_fops = {
2953         .release        = io_uring_release,
2954         .mmap           = io_uring_mmap,
2955         .poll           = io_uring_poll,
2956         .fasync         = io_uring_fasync,
2957 };
2958
2959 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2960                                   struct io_uring_params *p)
2961 {
2962         struct io_sq_ring *sq_ring;
2963         struct io_cq_ring *cq_ring;
2964         size_t size;
2965
2966         sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2967         if (!sq_ring)
2968                 return -ENOMEM;
2969
2970         ctx->sq_ring = sq_ring;
2971         sq_ring->ring_mask = p->sq_entries - 1;
2972         sq_ring->ring_entries = p->sq_entries;
2973         ctx->sq_mask = sq_ring->ring_mask;
2974         ctx->sq_entries = sq_ring->ring_entries;
2975
2976         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2977         if (size == SIZE_MAX)
2978                 return -EOVERFLOW;
2979
2980         ctx->sq_sqes = io_mem_alloc(size);
2981         if (!ctx->sq_sqes)
2982                 return -ENOMEM;
2983
2984         cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2985         if (!cq_ring)
2986                 return -ENOMEM;
2987
2988         ctx->cq_ring = cq_ring;
2989         cq_ring->ring_mask = p->cq_entries - 1;
2990         cq_ring->ring_entries = p->cq_entries;
2991         ctx->cq_mask = cq_ring->ring_mask;
2992         ctx->cq_entries = cq_ring->ring_entries;
2993         return 0;
2994 }
2995
2996 /*
2997  * Allocate an anonymous fd, this is what constitutes the application
2998  * visible backing of an io_uring instance. The application mmaps this
2999  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
3000  * we have to tie this fd to a socket for file garbage collection purposes.
3001  */
3002 static int io_uring_get_fd(struct io_ring_ctx *ctx)
3003 {
3004         struct file *file;
3005         int ret;
3006
3007 #if defined(CONFIG_UNIX)
3008         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
3009                                 &ctx->ring_sock);
3010         if (ret)
3011                 return ret;
3012 #endif
3013
3014         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3015         if (ret < 0)
3016                 goto err;
3017
3018         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
3019                                         O_RDWR | O_CLOEXEC);
3020         if (IS_ERR(file)) {
3021                 put_unused_fd(ret);
3022                 ret = PTR_ERR(file);
3023                 goto err;
3024         }
3025
3026 #if defined(CONFIG_UNIX)
3027         ctx->ring_sock->file = file;
3028         ctx->ring_sock->sk->sk_user_data = ctx;
3029 #endif
3030         fd_install(ret, file);
3031         return ret;
3032 err:
3033 #if defined(CONFIG_UNIX)
3034         sock_release(ctx->ring_sock);
3035         ctx->ring_sock = NULL;
3036 #endif
3037         return ret;
3038 }
3039
3040 static int io_uring_create(unsigned entries, struct io_uring_params *p)
3041 {
3042         struct user_struct *user = NULL;
3043         struct io_ring_ctx *ctx;
3044         bool account_mem;
3045         int ret;
3046
3047         if (!entries || entries > IORING_MAX_ENTRIES)
3048                 return -EINVAL;
3049
3050         /*
3051          * Use twice as many entries for the CQ ring. It's possible for the
3052          * application to drive a higher depth than the size of the SQ ring,
3053          * since the sqes are only used at submission time. This allows for
3054          * some flexibility in overcommitting a bit.
3055          */
3056         p->sq_entries = roundup_pow_of_two(entries);
3057         p->cq_entries = 2 * p->sq_entries;
3058
3059         user = get_uid(current_user());
3060         account_mem = !capable(CAP_IPC_LOCK);
3061
3062         if (account_mem) {
3063                 ret = io_account_mem(user,
3064                                 ring_pages(p->sq_entries, p->cq_entries));
3065                 if (ret) {
3066                         free_uid(user);
3067                         return ret;
3068                 }
3069         }
3070
3071         ctx = io_ring_ctx_alloc(p);
3072         if (!ctx) {
3073                 if (account_mem)
3074                         io_unaccount_mem(user, ring_pages(p->sq_entries,
3075                                                                 p->cq_entries));
3076                 free_uid(user);
3077                 return -ENOMEM;
3078         }
3079         ctx->compat = in_compat_syscall();
3080         ctx->account_mem = account_mem;
3081         ctx->user = user;
3082
3083         ret = io_allocate_scq_urings(ctx, p);
3084         if (ret)
3085                 goto err;
3086
3087         ret = io_sq_offload_start(ctx, p);
3088         if (ret)
3089                 goto err;
3090
3091         ret = io_uring_get_fd(ctx);
3092         if (ret < 0)
3093                 goto err;
3094
3095         memset(&p->sq_off, 0, sizeof(p->sq_off));
3096         p->sq_off.head = offsetof(struct io_sq_ring, r.head);
3097         p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
3098         p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
3099         p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
3100         p->sq_off.flags = offsetof(struct io_sq_ring, flags);
3101         p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
3102         p->sq_off.array = offsetof(struct io_sq_ring, array);
3103
3104         memset(&p->cq_off, 0, sizeof(p->cq_off));
3105         p->cq_off.head = offsetof(struct io_cq_ring, r.head);
3106         p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
3107         p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
3108         p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
3109         p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
3110         p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
3111         return ret;
3112 err:
3113         io_ring_ctx_wait_and_kill(ctx);
3114         return ret;
3115 }
3116
3117 /*
3118  * Sets up an aio uring context, and returns the fd. Applications asks for a
3119  * ring size, we return the actual sq/cq ring sizes (among other things) in the
3120  * params structure passed in.
3121  */
3122 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3123 {
3124         struct io_uring_params p;
3125         long ret;
3126         int i;
3127
3128         if (copy_from_user(&p, params, sizeof(p)))
3129                 return -EFAULT;
3130         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
3131                 if (p.resv[i])
3132                         return -EINVAL;
3133         }
3134
3135         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
3136                         IORING_SETUP_SQ_AFF))
3137                 return -EINVAL;
3138
3139         ret = io_uring_create(entries, &p);
3140         if (ret < 0)
3141                 return ret;
3142
3143         if (copy_to_user(params, &p, sizeof(p)))
3144                 return -EFAULT;
3145
3146         return ret;
3147 }
3148
3149 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
3150                 struct io_uring_params __user *, params)
3151 {
3152         return io_uring_setup(entries, params);
3153 }
3154
3155 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3156                                void __user *arg, unsigned nr_args)
3157         __releases(ctx->uring_lock)
3158         __acquires(ctx->uring_lock)
3159 {
3160         int ret;
3161
3162         /*
3163          * We're inside the ring mutex, if the ref is already dying, then
3164          * someone else killed the ctx or is already going through
3165          * io_uring_register().
3166          */
3167         if (percpu_ref_is_dying(&ctx->refs))
3168                 return -ENXIO;
3169
3170         percpu_ref_kill(&ctx->refs);
3171
3172         /*
3173          * Drop uring mutex before waiting for references to exit. If another
3174          * thread is currently inside io_uring_enter() it might need to grab
3175          * the uring_lock to make progress. If we hold it here across the drain
3176          * wait, then we can deadlock. It's safe to drop the mutex here, since
3177          * no new references will come in after we've killed the percpu ref.
3178          */
3179         mutex_unlock(&ctx->uring_lock);
3180         wait_for_completion(&ctx->ctx_done);
3181         mutex_lock(&ctx->uring_lock);
3182
3183         switch (opcode) {
3184         case IORING_REGISTER_BUFFERS:
3185                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
3186                 break;
3187         case IORING_UNREGISTER_BUFFERS:
3188                 ret = -EINVAL;
3189                 if (arg || nr_args)
3190                         break;
3191                 ret = io_sqe_buffer_unregister(ctx);
3192                 break;
3193         case IORING_REGISTER_FILES:
3194                 ret = io_sqe_files_register(ctx, arg, nr_args);
3195                 break;
3196         case IORING_UNREGISTER_FILES:
3197                 ret = -EINVAL;
3198                 if (arg || nr_args)
3199                         break;
3200                 ret = io_sqe_files_unregister(ctx);
3201                 break;
3202         case IORING_REGISTER_EVENTFD:
3203                 ret = -EINVAL;
3204                 if (nr_args != 1)
3205                         break;
3206                 ret = io_eventfd_register(ctx, arg);
3207                 break;
3208         case IORING_UNREGISTER_EVENTFD:
3209                 ret = -EINVAL;
3210                 if (arg || nr_args)
3211                         break;
3212                 ret = io_eventfd_unregister(ctx);
3213                 break;
3214         default:
3215                 ret = -EINVAL;
3216                 break;
3217         }
3218
3219         /* bring the ctx back to life */
3220         reinit_completion(&ctx->ctx_done);
3221         percpu_ref_reinit(&ctx->refs);
3222         return ret;
3223 }
3224
3225 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3226                 void __user *, arg, unsigned int, nr_args)
3227 {
3228         struct io_ring_ctx *ctx;
3229         long ret = -EBADF;
3230         struct fd f;
3231
3232         f = fdget(fd);
3233         if (!f.file)
3234                 return -EBADF;
3235
3236         ret = -EOPNOTSUPP;
3237         if (f.file->f_op != &io_uring_fops)
3238                 goto out_fput;
3239
3240         ctx = f.file->private_data;
3241
3242         mutex_lock(&ctx->uring_lock);
3243         ret = __io_uring_register(ctx, opcode, arg, nr_args);
3244         mutex_unlock(&ctx->uring_lock);
3245 out_fput:
3246         fdput(f);
3247         return ret;
3248 }
3249
3250 static int __init io_uring_init(void)
3251 {
3252         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
3253         return 0;
3254 };
3255 __initcall(io_uring_init);