OSDN Git Service

io_uring: fix failure to verify SQ_AFF cpu
[tomoyo/tomoyo-test1.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/workqueue.h>
60 #include <linux/kthread.h>
61 #include <linux/blkdev.h>
62 #include <linux/bvec.h>
63 #include <linux/net.h>
64 #include <net/sock.h>
65 #include <net/af_unix.h>
66 #include <net/scm.h>
67 #include <linux/anon_inodes.h>
68 #include <linux/sched/mm.h>
69 #include <linux/uaccess.h>
70 #include <linux/nospec.h>
71 #include <linux/sizes.h>
72 #include <linux/hugetlb.h>
73
74 #include <uapi/linux/io_uring.h>
75
76 #include "internal.h"
77
78 #define IORING_MAX_ENTRIES      4096
79 #define IORING_MAX_FIXED_FILES  1024
80
81 struct io_uring {
82         u32 head ____cacheline_aligned_in_smp;
83         u32 tail ____cacheline_aligned_in_smp;
84 };
85
86 /*
87  * This data is shared with the application through the mmap at offset
88  * IORING_OFF_SQ_RING.
89  *
90  * The offsets to the member fields are published through struct
91  * io_sqring_offsets when calling io_uring_setup.
92  */
93 struct io_sq_ring {
94         /*
95          * Head and tail offsets into the ring; the offsets need to be
96          * masked to get valid indices.
97          *
98          * The kernel controls head and the application controls tail.
99          */
100         struct io_uring         r;
101         /*
102          * Bitmask to apply to head and tail offsets (constant, equals
103          * ring_entries - 1)
104          */
105         u32                     ring_mask;
106         /* Ring size (constant, power of 2) */
107         u32                     ring_entries;
108         /*
109          * Number of invalid entries dropped by the kernel due to
110          * invalid index stored in array
111          *
112          * Written by the kernel, shouldn't be modified by the
113          * application (i.e. get number of "new events" by comparing to
114          * cached value).
115          *
116          * After a new SQ head value was read by the application this
117          * counter includes all submissions that were dropped reaching
118          * the new SQ head (and possibly more).
119          */
120         u32                     dropped;
121         /*
122          * Runtime flags
123          *
124          * Written by the kernel, shouldn't be modified by the
125          * application.
126          *
127          * The application needs a full memory barrier before checking
128          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
129          */
130         u32                     flags;
131         /*
132          * Ring buffer of indices into array of io_uring_sqe, which is
133          * mmapped by the application using the IORING_OFF_SQES offset.
134          *
135          * This indirection could e.g. be used to assign fixed
136          * io_uring_sqe entries to operations and only submit them to
137          * the queue when needed.
138          *
139          * The kernel modifies neither the indices array nor the entries
140          * array.
141          */
142         u32                     array[];
143 };
144
145 /*
146  * This data is shared with the application through the mmap at offset
147  * IORING_OFF_CQ_RING.
148  *
149  * The offsets to the member fields are published through struct
150  * io_cqring_offsets when calling io_uring_setup.
151  */
152 struct io_cq_ring {
153         /*
154          * Head and tail offsets into the ring; the offsets need to be
155          * masked to get valid indices.
156          *
157          * The application controls head and the kernel tail.
158          */
159         struct io_uring         r;
160         /*
161          * Bitmask to apply to head and tail offsets (constant, equals
162          * ring_entries - 1)
163          */
164         u32                     ring_mask;
165         /* Ring size (constant, power of 2) */
166         u32                     ring_entries;
167         /*
168          * Number of completion events lost because the queue was full;
169          * this should be avoided by the application by making sure
170          * there are not more requests pending thatn there is space in
171          * the completion queue.
172          *
173          * Written by the kernel, shouldn't be modified by the
174          * application (i.e. get number of "new events" by comparing to
175          * cached value).
176          *
177          * As completion events come in out of order this counter is not
178          * ordered with any other data.
179          */
180         u32                     overflow;
181         /*
182          * Ring buffer of completion events.
183          *
184          * The kernel writes completion events fresh every time they are
185          * produced, so the application is allowed to modify pending
186          * entries.
187          */
188         struct io_uring_cqe     cqes[];
189 };
190
191 struct io_mapped_ubuf {
192         u64             ubuf;
193         size_t          len;
194         struct          bio_vec *bvec;
195         unsigned int    nr_bvecs;
196 };
197
198 struct async_list {
199         spinlock_t              lock;
200         atomic_t                cnt;
201         struct list_head        list;
202
203         struct file             *file;
204         off_t                   io_end;
205         size_t                  io_pages;
206 };
207
208 struct io_ring_ctx {
209         struct {
210                 struct percpu_ref       refs;
211         } ____cacheline_aligned_in_smp;
212
213         struct {
214                 unsigned int            flags;
215                 bool                    compat;
216                 bool                    account_mem;
217
218                 /* SQ ring */
219                 struct io_sq_ring       *sq_ring;
220                 unsigned                cached_sq_head;
221                 unsigned                sq_entries;
222                 unsigned                sq_mask;
223                 unsigned                sq_thread_idle;
224                 struct io_uring_sqe     *sq_sqes;
225
226                 struct list_head        defer_list;
227         } ____cacheline_aligned_in_smp;
228
229         /* IO offload */
230         struct workqueue_struct *sqo_wq;
231         struct task_struct      *sqo_thread;    /* if using sq thread polling */
232         struct mm_struct        *sqo_mm;
233         wait_queue_head_t       sqo_wait;
234         unsigned                sqo_stop;
235
236         struct {
237                 /* CQ ring */
238                 struct io_cq_ring       *cq_ring;
239                 unsigned                cached_cq_tail;
240                 unsigned                cq_entries;
241                 unsigned                cq_mask;
242                 struct wait_queue_head  cq_wait;
243                 struct fasync_struct    *cq_fasync;
244                 struct eventfd_ctx      *cq_ev_fd;
245         } ____cacheline_aligned_in_smp;
246
247         /*
248          * If used, fixed file set. Writers must ensure that ->refs is dead,
249          * readers must ensure that ->refs is alive as long as the file* is
250          * used. Only updated through io_uring_register(2).
251          */
252         struct file             **user_files;
253         unsigned                nr_user_files;
254
255         /* if used, fixed mapped user buffers */
256         unsigned                nr_user_bufs;
257         struct io_mapped_ubuf   *user_bufs;
258
259         struct user_struct      *user;
260
261         struct completion       ctx_done;
262
263         struct {
264                 struct mutex            uring_lock;
265                 wait_queue_head_t       wait;
266         } ____cacheline_aligned_in_smp;
267
268         struct {
269                 spinlock_t              completion_lock;
270                 bool                    poll_multi_file;
271                 /*
272                  * ->poll_list is protected by the ctx->uring_lock for
273                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
274                  * For SQPOLL, only the single threaded io_sq_thread() will
275                  * manipulate the list, hence no extra locking is needed there.
276                  */
277                 struct list_head        poll_list;
278                 struct list_head        cancel_list;
279         } ____cacheline_aligned_in_smp;
280
281         struct async_list       pending_async[2];
282
283 #if defined(CONFIG_UNIX)
284         struct socket           *ring_sock;
285 #endif
286 };
287
288 struct sqe_submit {
289         const struct io_uring_sqe       *sqe;
290         unsigned short                  index;
291         bool                            has_user;
292         bool                            needs_lock;
293         bool                            needs_fixed_file;
294 };
295
296 /*
297  * First field must be the file pointer in all the
298  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
299  */
300 struct io_poll_iocb {
301         struct file                     *file;
302         struct wait_queue_head          *head;
303         __poll_t                        events;
304         bool                            done;
305         bool                            canceled;
306         struct wait_queue_entry         wait;
307 };
308
309 /*
310  * NOTE! Each of the iocb union members has the file pointer
311  * as the first entry in their struct definition. So you can
312  * access the file pointer through any of the sub-structs,
313  * or directly as just 'ki_filp' in this struct.
314  */
315 struct io_kiocb {
316         union {
317                 struct file             *file;
318                 struct kiocb            rw;
319                 struct io_poll_iocb     poll;
320         };
321
322         struct sqe_submit       submit;
323
324         struct io_ring_ctx      *ctx;
325         struct list_head        list;
326         unsigned int            flags;
327         refcount_t              refs;
328 #define REQ_F_NOWAIT            1       /* must not punt to workers */
329 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
330 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
331 #define REQ_F_SEQ_PREV          8       /* sequential with previous */
332 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
333 #define REQ_F_IO_DRAINED        32      /* drain done */
334         u64                     user_data;
335         u32                     error;  /* iopoll result from callback */
336         u32                     sequence;
337
338         struct work_struct      work;
339 };
340
341 #define IO_PLUG_THRESHOLD               2
342 #define IO_IOPOLL_BATCH                 8
343
344 struct io_submit_state {
345         struct blk_plug         plug;
346
347         /*
348          * io_kiocb alloc cache
349          */
350         void                    *reqs[IO_IOPOLL_BATCH];
351         unsigned                int free_reqs;
352         unsigned                int cur_req;
353
354         /*
355          * File reference cache
356          */
357         struct file             *file;
358         unsigned int            fd;
359         unsigned int            has_refs;
360         unsigned int            used_refs;
361         unsigned int            ios_left;
362 };
363
364 static void io_sq_wq_submit_work(struct work_struct *work);
365
366 static struct kmem_cache *req_cachep;
367
368 static const struct file_operations io_uring_fops;
369
370 struct sock *io_uring_get_socket(struct file *file)
371 {
372 #if defined(CONFIG_UNIX)
373         if (file->f_op == &io_uring_fops) {
374                 struct io_ring_ctx *ctx = file->private_data;
375
376                 return ctx->ring_sock->sk;
377         }
378 #endif
379         return NULL;
380 }
381 EXPORT_SYMBOL(io_uring_get_socket);
382
383 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
384 {
385         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
386
387         complete(&ctx->ctx_done);
388 }
389
390 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
391 {
392         struct io_ring_ctx *ctx;
393         int i;
394
395         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
396         if (!ctx)
397                 return NULL;
398
399         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
400                 kfree(ctx);
401                 return NULL;
402         }
403
404         ctx->flags = p->flags;
405         init_waitqueue_head(&ctx->cq_wait);
406         init_completion(&ctx->ctx_done);
407         mutex_init(&ctx->uring_lock);
408         init_waitqueue_head(&ctx->wait);
409         for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
410                 spin_lock_init(&ctx->pending_async[i].lock);
411                 INIT_LIST_HEAD(&ctx->pending_async[i].list);
412                 atomic_set(&ctx->pending_async[i].cnt, 0);
413         }
414         spin_lock_init(&ctx->completion_lock);
415         INIT_LIST_HEAD(&ctx->poll_list);
416         INIT_LIST_HEAD(&ctx->cancel_list);
417         INIT_LIST_HEAD(&ctx->defer_list);
418         return ctx;
419 }
420
421 static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
422                                      struct io_kiocb *req)
423 {
424         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
425                 return false;
426
427         return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped;
428 }
429
430 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
431 {
432         struct io_kiocb *req;
433
434         if (list_empty(&ctx->defer_list))
435                 return NULL;
436
437         req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
438         if (!io_sequence_defer(ctx, req)) {
439                 list_del_init(&req->list);
440                 return req;
441         }
442
443         return NULL;
444 }
445
446 static void __io_commit_cqring(struct io_ring_ctx *ctx)
447 {
448         struct io_cq_ring *ring = ctx->cq_ring;
449
450         if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
451                 /* order cqe stores with ring update */
452                 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
453
454                 if (wq_has_sleeper(&ctx->cq_wait)) {
455                         wake_up_interruptible(&ctx->cq_wait);
456                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
457                 }
458         }
459 }
460
461 static void io_commit_cqring(struct io_ring_ctx *ctx)
462 {
463         struct io_kiocb *req;
464
465         __io_commit_cqring(ctx);
466
467         while ((req = io_get_deferred_req(ctx)) != NULL) {
468                 req->flags |= REQ_F_IO_DRAINED;
469                 queue_work(ctx->sqo_wq, &req->work);
470         }
471 }
472
473 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
474 {
475         struct io_cq_ring *ring = ctx->cq_ring;
476         unsigned tail;
477
478         tail = ctx->cached_cq_tail;
479         /*
480          * writes to the cq entry need to come after reading head; the
481          * control dependency is enough as we're using WRITE_ONCE to
482          * fill the cq entry
483          */
484         if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
485                 return NULL;
486
487         ctx->cached_cq_tail++;
488         return &ring->cqes[tail & ctx->cq_mask];
489 }
490
491 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
492                                  long res, unsigned ev_flags)
493 {
494         struct io_uring_cqe *cqe;
495
496         /*
497          * If we can't get a cq entry, userspace overflowed the
498          * submission (by quite a lot). Increment the overflow count in
499          * the ring.
500          */
501         cqe = io_get_cqring(ctx);
502         if (cqe) {
503                 WRITE_ONCE(cqe->user_data, ki_user_data);
504                 WRITE_ONCE(cqe->res, res);
505                 WRITE_ONCE(cqe->flags, ev_flags);
506         } else {
507                 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
508
509                 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
510         }
511 }
512
513 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
514 {
515         if (waitqueue_active(&ctx->wait))
516                 wake_up(&ctx->wait);
517         if (waitqueue_active(&ctx->sqo_wait))
518                 wake_up(&ctx->sqo_wait);
519         if (ctx->cq_ev_fd)
520                 eventfd_signal(ctx->cq_ev_fd, 1);
521 }
522
523 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
524                                 long res, unsigned ev_flags)
525 {
526         unsigned long flags;
527
528         spin_lock_irqsave(&ctx->completion_lock, flags);
529         io_cqring_fill_event(ctx, user_data, res, ev_flags);
530         io_commit_cqring(ctx);
531         spin_unlock_irqrestore(&ctx->completion_lock, flags);
532
533         io_cqring_ev_posted(ctx);
534 }
535
536 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
537 {
538         percpu_ref_put_many(&ctx->refs, refs);
539
540         if (waitqueue_active(&ctx->wait))
541                 wake_up(&ctx->wait);
542 }
543
544 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
545                                    struct io_submit_state *state)
546 {
547         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
548         struct io_kiocb *req;
549
550         if (!percpu_ref_tryget(&ctx->refs))
551                 return NULL;
552
553         if (!state) {
554                 req = kmem_cache_alloc(req_cachep, gfp);
555                 if (unlikely(!req))
556                         goto out;
557         } else if (!state->free_reqs) {
558                 size_t sz;
559                 int ret;
560
561                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
562                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
563
564                 /*
565                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
566                  * retry single alloc to be on the safe side.
567                  */
568                 if (unlikely(ret <= 0)) {
569                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
570                         if (!state->reqs[0])
571                                 goto out;
572                         ret = 1;
573                 }
574                 state->free_reqs = ret - 1;
575                 state->cur_req = 1;
576                 req = state->reqs[0];
577         } else {
578                 req = state->reqs[state->cur_req];
579                 state->free_reqs--;
580                 state->cur_req++;
581         }
582
583         req->ctx = ctx;
584         req->flags = 0;
585         /* one is dropped after submission, the other at completion */
586         refcount_set(&req->refs, 2);
587         return req;
588 out:
589         io_ring_drop_ctx_refs(ctx, 1);
590         return NULL;
591 }
592
593 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
594 {
595         if (*nr) {
596                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
597                 io_ring_drop_ctx_refs(ctx, *nr);
598                 *nr = 0;
599         }
600 }
601
602 static void io_free_req(struct io_kiocb *req)
603 {
604         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
605                 fput(req->file);
606         io_ring_drop_ctx_refs(req->ctx, 1);
607         kmem_cache_free(req_cachep, req);
608 }
609
610 static void io_put_req(struct io_kiocb *req)
611 {
612         if (refcount_dec_and_test(&req->refs))
613                 io_free_req(req);
614 }
615
616 /*
617  * Find and free completed poll iocbs
618  */
619 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
620                                struct list_head *done)
621 {
622         void *reqs[IO_IOPOLL_BATCH];
623         struct io_kiocb *req;
624         int to_free;
625
626         to_free = 0;
627         while (!list_empty(done)) {
628                 req = list_first_entry(done, struct io_kiocb, list);
629                 list_del(&req->list);
630
631                 io_cqring_fill_event(ctx, req->user_data, req->error, 0);
632                 (*nr_events)++;
633
634                 if (refcount_dec_and_test(&req->refs)) {
635                         /* If we're not using fixed files, we have to pair the
636                          * completion part with the file put. Use regular
637                          * completions for those, only batch free for fixed
638                          * file.
639                          */
640                         if (req->flags & REQ_F_FIXED_FILE) {
641                                 reqs[to_free++] = req;
642                                 if (to_free == ARRAY_SIZE(reqs))
643                                         io_free_req_many(ctx, reqs, &to_free);
644                         } else {
645                                 io_free_req(req);
646                         }
647                 }
648         }
649
650         io_commit_cqring(ctx);
651         io_free_req_many(ctx, reqs, &to_free);
652 }
653
654 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
655                         long min)
656 {
657         struct io_kiocb *req, *tmp;
658         LIST_HEAD(done);
659         bool spin;
660         int ret;
661
662         /*
663          * Only spin for completions if we don't have multiple devices hanging
664          * off our complete list, and we're under the requested amount.
665          */
666         spin = !ctx->poll_multi_file && *nr_events < min;
667
668         ret = 0;
669         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
670                 struct kiocb *kiocb = &req->rw;
671
672                 /*
673                  * Move completed entries to our local list. If we find a
674                  * request that requires polling, break out and complete
675                  * the done list first, if we have entries there.
676                  */
677                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
678                         list_move_tail(&req->list, &done);
679                         continue;
680                 }
681                 if (!list_empty(&done))
682                         break;
683
684                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
685                 if (ret < 0)
686                         break;
687
688                 if (ret && spin)
689                         spin = false;
690                 ret = 0;
691         }
692
693         if (!list_empty(&done))
694                 io_iopoll_complete(ctx, nr_events, &done);
695
696         return ret;
697 }
698
699 /*
700  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
701  * non-spinning poll check - we'll still enter the driver poll loop, but only
702  * as a non-spinning completion check.
703  */
704 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
705                                 long min)
706 {
707         while (!list_empty(&ctx->poll_list)) {
708                 int ret;
709
710                 ret = io_do_iopoll(ctx, nr_events, min);
711                 if (ret < 0)
712                         return ret;
713                 if (!min || *nr_events >= min)
714                         return 0;
715         }
716
717         return 1;
718 }
719
720 /*
721  * We can't just wait for polled events to come to us, we have to actively
722  * find and complete them.
723  */
724 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
725 {
726         if (!(ctx->flags & IORING_SETUP_IOPOLL))
727                 return;
728
729         mutex_lock(&ctx->uring_lock);
730         while (!list_empty(&ctx->poll_list)) {
731                 unsigned int nr_events = 0;
732
733                 io_iopoll_getevents(ctx, &nr_events, 1);
734         }
735         mutex_unlock(&ctx->uring_lock);
736 }
737
738 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
739                            long min)
740 {
741         int ret = 0;
742
743         do {
744                 int tmin = 0;
745
746                 if (*nr_events < min)
747                         tmin = min - *nr_events;
748
749                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
750                 if (ret <= 0)
751                         break;
752                 ret = 0;
753         } while (min && !*nr_events && !need_resched());
754
755         return ret;
756 }
757
758 static void kiocb_end_write(struct kiocb *kiocb)
759 {
760         if (kiocb->ki_flags & IOCB_WRITE) {
761                 struct inode *inode = file_inode(kiocb->ki_filp);
762
763                 /*
764                  * Tell lockdep we inherited freeze protection from submission
765                  * thread.
766                  */
767                 if (S_ISREG(inode->i_mode))
768                         __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
769                 file_end_write(kiocb->ki_filp);
770         }
771 }
772
773 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
774 {
775         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
776
777         kiocb_end_write(kiocb);
778
779         io_cqring_add_event(req->ctx, req->user_data, res, 0);
780         io_put_req(req);
781 }
782
783 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
784 {
785         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
786
787         kiocb_end_write(kiocb);
788
789         req->error = res;
790         if (res != -EAGAIN)
791                 req->flags |= REQ_F_IOPOLL_COMPLETED;
792 }
793
794 /*
795  * After the iocb has been issued, it's safe to be found on the poll list.
796  * Adding the kiocb to the list AFTER submission ensures that we don't
797  * find it from a io_iopoll_getevents() thread before the issuer is done
798  * accessing the kiocb cookie.
799  */
800 static void io_iopoll_req_issued(struct io_kiocb *req)
801 {
802         struct io_ring_ctx *ctx = req->ctx;
803
804         /*
805          * Track whether we have multiple files in our lists. This will impact
806          * how we do polling eventually, not spinning if we're on potentially
807          * different devices.
808          */
809         if (list_empty(&ctx->poll_list)) {
810                 ctx->poll_multi_file = false;
811         } else if (!ctx->poll_multi_file) {
812                 struct io_kiocb *list_req;
813
814                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
815                                                 list);
816                 if (list_req->rw.ki_filp != req->rw.ki_filp)
817                         ctx->poll_multi_file = true;
818         }
819
820         /*
821          * For fast devices, IO may have already completed. If it has, add
822          * it to the front so we find it first.
823          */
824         if (req->flags & REQ_F_IOPOLL_COMPLETED)
825                 list_add(&req->list, &ctx->poll_list);
826         else
827                 list_add_tail(&req->list, &ctx->poll_list);
828 }
829
830 static void io_file_put(struct io_submit_state *state)
831 {
832         if (state->file) {
833                 int diff = state->has_refs - state->used_refs;
834
835                 if (diff)
836                         fput_many(state->file, diff);
837                 state->file = NULL;
838         }
839 }
840
841 /*
842  * Get as many references to a file as we have IOs left in this submission,
843  * assuming most submissions are for one file, or at least that each file
844  * has more than one submission.
845  */
846 static struct file *io_file_get(struct io_submit_state *state, int fd)
847 {
848         if (!state)
849                 return fget(fd);
850
851         if (state->file) {
852                 if (state->fd == fd) {
853                         state->used_refs++;
854                         state->ios_left--;
855                         return state->file;
856                 }
857                 io_file_put(state);
858         }
859         state->file = fget_many(fd, state->ios_left);
860         if (!state->file)
861                 return NULL;
862
863         state->fd = fd;
864         state->has_refs = state->ios_left;
865         state->used_refs = 1;
866         state->ios_left--;
867         return state->file;
868 }
869
870 /*
871  * If we tracked the file through the SCM inflight mechanism, we could support
872  * any file. For now, just ensure that anything potentially problematic is done
873  * inline.
874  */
875 static bool io_file_supports_async(struct file *file)
876 {
877         umode_t mode = file_inode(file)->i_mode;
878
879         if (S_ISBLK(mode) || S_ISCHR(mode))
880                 return true;
881         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
882                 return true;
883
884         return false;
885 }
886
887 static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
888                       bool force_nonblock)
889 {
890         const struct io_uring_sqe *sqe = s->sqe;
891         struct io_ring_ctx *ctx = req->ctx;
892         struct kiocb *kiocb = &req->rw;
893         unsigned ioprio;
894         int ret;
895
896         if (!req->file)
897                 return -EBADF;
898
899         if (force_nonblock && !io_file_supports_async(req->file))
900                 force_nonblock = false;
901
902         kiocb->ki_pos = READ_ONCE(sqe->off);
903         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
904         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
905
906         ioprio = READ_ONCE(sqe->ioprio);
907         if (ioprio) {
908                 ret = ioprio_check_cap(ioprio);
909                 if (ret)
910                         return ret;
911
912                 kiocb->ki_ioprio = ioprio;
913         } else
914                 kiocb->ki_ioprio = get_current_ioprio();
915
916         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
917         if (unlikely(ret))
918                 return ret;
919
920         /* don't allow async punt if RWF_NOWAIT was requested */
921         if (kiocb->ki_flags & IOCB_NOWAIT)
922                 req->flags |= REQ_F_NOWAIT;
923
924         if (force_nonblock)
925                 kiocb->ki_flags |= IOCB_NOWAIT;
926
927         if (ctx->flags & IORING_SETUP_IOPOLL) {
928                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
929                     !kiocb->ki_filp->f_op->iopoll)
930                         return -EOPNOTSUPP;
931
932                 req->error = 0;
933                 kiocb->ki_flags |= IOCB_HIPRI;
934                 kiocb->ki_complete = io_complete_rw_iopoll;
935         } else {
936                 if (kiocb->ki_flags & IOCB_HIPRI)
937                         return -EINVAL;
938                 kiocb->ki_complete = io_complete_rw;
939         }
940         return 0;
941 }
942
943 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
944 {
945         switch (ret) {
946         case -EIOCBQUEUED:
947                 break;
948         case -ERESTARTSYS:
949         case -ERESTARTNOINTR:
950         case -ERESTARTNOHAND:
951         case -ERESTART_RESTARTBLOCK:
952                 /*
953                  * We can't just restart the syscall, since previously
954                  * submitted sqes may already be in progress. Just fail this
955                  * IO with EINTR.
956                  */
957                 ret = -EINTR;
958                 /* fall through */
959         default:
960                 kiocb->ki_complete(kiocb, ret, 0);
961         }
962 }
963
964 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
965                            const struct io_uring_sqe *sqe,
966                            struct iov_iter *iter)
967 {
968         size_t len = READ_ONCE(sqe->len);
969         struct io_mapped_ubuf *imu;
970         unsigned index, buf_index;
971         size_t offset;
972         u64 buf_addr;
973
974         /* attempt to use fixed buffers without having provided iovecs */
975         if (unlikely(!ctx->user_bufs))
976                 return -EFAULT;
977
978         buf_index = READ_ONCE(sqe->buf_index);
979         if (unlikely(buf_index >= ctx->nr_user_bufs))
980                 return -EFAULT;
981
982         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
983         imu = &ctx->user_bufs[index];
984         buf_addr = READ_ONCE(sqe->addr);
985
986         /* overflow */
987         if (buf_addr + len < buf_addr)
988                 return -EFAULT;
989         /* not inside the mapped region */
990         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
991                 return -EFAULT;
992
993         /*
994          * May not be a start of buffer, set size appropriately
995          * and advance us to the beginning.
996          */
997         offset = buf_addr - imu->ubuf;
998         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
999         if (offset)
1000                 iov_iter_advance(iter, offset);
1001
1002         /* don't drop a reference to these pages */
1003         iter->type |= ITER_BVEC_FLAG_NO_REF;
1004         return 0;
1005 }
1006
1007 static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
1008                            const struct sqe_submit *s, struct iovec **iovec,
1009                            struct iov_iter *iter)
1010 {
1011         const struct io_uring_sqe *sqe = s->sqe;
1012         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1013         size_t sqe_len = READ_ONCE(sqe->len);
1014         u8 opcode;
1015
1016         /*
1017          * We're reading ->opcode for the second time, but the first read
1018          * doesn't care whether it's _FIXED or not, so it doesn't matter
1019          * whether ->opcode changes concurrently. The first read does care
1020          * about whether it is a READ or a WRITE, so we don't trust this read
1021          * for that purpose and instead let the caller pass in the read/write
1022          * flag.
1023          */
1024         opcode = READ_ONCE(sqe->opcode);
1025         if (opcode == IORING_OP_READ_FIXED ||
1026             opcode == IORING_OP_WRITE_FIXED) {
1027                 int ret = io_import_fixed(ctx, rw, sqe, iter);
1028                 *iovec = NULL;
1029                 return ret;
1030         }
1031
1032         if (!s->has_user)
1033                 return -EFAULT;
1034
1035 #ifdef CONFIG_COMPAT
1036         if (ctx->compat)
1037                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1038                                                 iovec, iter);
1039 #endif
1040
1041         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1042 }
1043
1044 /*
1045  * Make a note of the last file/offset/direction we punted to async
1046  * context. We'll use this information to see if we can piggy back a
1047  * sequential request onto the previous one, if it's still hasn't been
1048  * completed by the async worker.
1049  */
1050 static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1051 {
1052         struct async_list *async_list = &req->ctx->pending_async[rw];
1053         struct kiocb *kiocb = &req->rw;
1054         struct file *filp = kiocb->ki_filp;
1055         off_t io_end = kiocb->ki_pos + len;
1056
1057         if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
1058                 unsigned long max_pages;
1059
1060                 /* Use 8x RA size as a decent limiter for both reads/writes */
1061                 max_pages = filp->f_ra.ra_pages;
1062                 if (!max_pages)
1063                         max_pages = VM_READAHEAD_PAGES;
1064                 max_pages *= 8;
1065
1066                 /* If max pages are exceeded, reset the state */
1067                 len >>= PAGE_SHIFT;
1068                 if (async_list->io_pages + len <= max_pages) {
1069                         req->flags |= REQ_F_SEQ_PREV;
1070                         async_list->io_pages += len;
1071                 } else {
1072                         io_end = 0;
1073                         async_list->io_pages = 0;
1074                 }
1075         }
1076
1077         /* New file? Reset state. */
1078         if (async_list->file != filp) {
1079                 async_list->io_pages = 0;
1080                 async_list->file = filp;
1081         }
1082         async_list->io_end = io_end;
1083 }
1084
1085 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1086                    bool force_nonblock)
1087 {
1088         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1089         struct kiocb *kiocb = &req->rw;
1090         struct iov_iter iter;
1091         struct file *file;
1092         size_t iov_count;
1093         int ret;
1094
1095         ret = io_prep_rw(req, s, force_nonblock);
1096         if (ret)
1097                 return ret;
1098         file = kiocb->ki_filp;
1099
1100         if (unlikely(!(file->f_mode & FMODE_READ)))
1101                 return -EBADF;
1102         if (unlikely(!file->f_op->read_iter))
1103                 return -EINVAL;
1104
1105         ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
1106         if (ret)
1107                 return ret;
1108
1109         iov_count = iov_iter_count(&iter);
1110         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1111         if (!ret) {
1112                 ssize_t ret2;
1113
1114                 /* Catch -EAGAIN return for forced non-blocking submission */
1115                 ret2 = call_read_iter(file, kiocb, &iter);
1116                 if (!force_nonblock || ret2 != -EAGAIN) {
1117                         io_rw_done(kiocb, ret2);
1118                 } else {
1119                         /*
1120                          * If ->needs_lock is true, we're already in async
1121                          * context.
1122                          */
1123                         if (!s->needs_lock)
1124                                 io_async_list_note(READ, req, iov_count);
1125                         ret = -EAGAIN;
1126                 }
1127         }
1128         kfree(iovec);
1129         return ret;
1130 }
1131
1132 static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1133                     bool force_nonblock)
1134 {
1135         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1136         struct kiocb *kiocb = &req->rw;
1137         struct iov_iter iter;
1138         struct file *file;
1139         size_t iov_count;
1140         int ret;
1141
1142         ret = io_prep_rw(req, s, force_nonblock);
1143         if (ret)
1144                 return ret;
1145
1146         file = kiocb->ki_filp;
1147         if (unlikely(!(file->f_mode & FMODE_WRITE)))
1148                 return -EBADF;
1149         if (unlikely(!file->f_op->write_iter))
1150                 return -EINVAL;
1151
1152         ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1153         if (ret)
1154                 return ret;
1155
1156         iov_count = iov_iter_count(&iter);
1157
1158         ret = -EAGAIN;
1159         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1160                 /* If ->needs_lock is true, we're already in async context. */
1161                 if (!s->needs_lock)
1162                         io_async_list_note(WRITE, req, iov_count);
1163                 goto out_free;
1164         }
1165
1166         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1167         if (!ret) {
1168                 ssize_t ret2;
1169
1170                 /*
1171                  * Open-code file_start_write here to grab freeze protection,
1172                  * which will be released by another thread in
1173                  * io_complete_rw().  Fool lockdep by telling it the lock got
1174                  * released so that it doesn't complain about the held lock when
1175                  * we return to userspace.
1176                  */
1177                 if (S_ISREG(file_inode(file)->i_mode)) {
1178                         __sb_start_write(file_inode(file)->i_sb,
1179                                                 SB_FREEZE_WRITE, true);
1180                         __sb_writers_release(file_inode(file)->i_sb,
1181                                                 SB_FREEZE_WRITE);
1182                 }
1183                 kiocb->ki_flags |= IOCB_WRITE;
1184
1185                 ret2 = call_write_iter(file, kiocb, &iter);
1186                 if (!force_nonblock || ret2 != -EAGAIN) {
1187                         io_rw_done(kiocb, ret2);
1188                 } else {
1189                         /*
1190                          * If ->needs_lock is true, we're already in async
1191                          * context.
1192                          */
1193                         if (!s->needs_lock)
1194                                 io_async_list_note(WRITE, req, iov_count);
1195                         ret = -EAGAIN;
1196                 }
1197         }
1198 out_free:
1199         kfree(iovec);
1200         return ret;
1201 }
1202
1203 /*
1204  * IORING_OP_NOP just posts a completion event, nothing else.
1205  */
1206 static int io_nop(struct io_kiocb *req, u64 user_data)
1207 {
1208         struct io_ring_ctx *ctx = req->ctx;
1209         long err = 0;
1210
1211         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1212                 return -EINVAL;
1213
1214         io_cqring_add_event(ctx, user_data, err, 0);
1215         io_put_req(req);
1216         return 0;
1217 }
1218
1219 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1220 {
1221         struct io_ring_ctx *ctx = req->ctx;
1222
1223         if (!req->file)
1224                 return -EBADF;
1225
1226         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1227                 return -EINVAL;
1228         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1229                 return -EINVAL;
1230
1231         return 0;
1232 }
1233
1234 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1235                     bool force_nonblock)
1236 {
1237         loff_t sqe_off = READ_ONCE(sqe->off);
1238         loff_t sqe_len = READ_ONCE(sqe->len);
1239         loff_t end = sqe_off + sqe_len;
1240         unsigned fsync_flags;
1241         int ret;
1242
1243         fsync_flags = READ_ONCE(sqe->fsync_flags);
1244         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1245                 return -EINVAL;
1246
1247         ret = io_prep_fsync(req, sqe);
1248         if (ret)
1249                 return ret;
1250
1251         /* fsync always requires a blocking context */
1252         if (force_nonblock)
1253                 return -EAGAIN;
1254
1255         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1256                                 end > 0 ? end : LLONG_MAX,
1257                                 fsync_flags & IORING_FSYNC_DATASYNC);
1258
1259         io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1260         io_put_req(req);
1261         return 0;
1262 }
1263
1264 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1265 {
1266         struct io_ring_ctx *ctx = req->ctx;
1267         int ret = 0;
1268
1269         if (!req->file)
1270                 return -EBADF;
1271
1272         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1273                 return -EINVAL;
1274         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1275                 return -EINVAL;
1276
1277         return ret;
1278 }
1279
1280 static int io_sync_file_range(struct io_kiocb *req,
1281                               const struct io_uring_sqe *sqe,
1282                               bool force_nonblock)
1283 {
1284         loff_t sqe_off;
1285         loff_t sqe_len;
1286         unsigned flags;
1287         int ret;
1288
1289         ret = io_prep_sfr(req, sqe);
1290         if (ret)
1291                 return ret;
1292
1293         /* sync_file_range always requires a blocking context */
1294         if (force_nonblock)
1295                 return -EAGAIN;
1296
1297         sqe_off = READ_ONCE(sqe->off);
1298         sqe_len = READ_ONCE(sqe->len);
1299         flags = READ_ONCE(sqe->sync_range_flags);
1300
1301         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1302
1303         io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1304         io_put_req(req);
1305         return 0;
1306 }
1307
1308 static void io_poll_remove_one(struct io_kiocb *req)
1309 {
1310         struct io_poll_iocb *poll = &req->poll;
1311
1312         spin_lock(&poll->head->lock);
1313         WRITE_ONCE(poll->canceled, true);
1314         if (!list_empty(&poll->wait.entry)) {
1315                 list_del_init(&poll->wait.entry);
1316                 queue_work(req->ctx->sqo_wq, &req->work);
1317         }
1318         spin_unlock(&poll->head->lock);
1319
1320         list_del_init(&req->list);
1321 }
1322
1323 static void io_poll_remove_all(struct io_ring_ctx *ctx)
1324 {
1325         struct io_kiocb *req;
1326
1327         spin_lock_irq(&ctx->completion_lock);
1328         while (!list_empty(&ctx->cancel_list)) {
1329                 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1330                 io_poll_remove_one(req);
1331         }
1332         spin_unlock_irq(&ctx->completion_lock);
1333 }
1334
1335 /*
1336  * Find a running poll command that matches one specified in sqe->addr,
1337  * and remove it if found.
1338  */
1339 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1340 {
1341         struct io_ring_ctx *ctx = req->ctx;
1342         struct io_kiocb *poll_req, *next;
1343         int ret = -ENOENT;
1344
1345         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1346                 return -EINVAL;
1347         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1348             sqe->poll_events)
1349                 return -EINVAL;
1350
1351         spin_lock_irq(&ctx->completion_lock);
1352         list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1353                 if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1354                         io_poll_remove_one(poll_req);
1355                         ret = 0;
1356                         break;
1357                 }
1358         }
1359         spin_unlock_irq(&ctx->completion_lock);
1360
1361         io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1362         io_put_req(req);
1363         return 0;
1364 }
1365
1366 static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1367                              __poll_t mask)
1368 {
1369         req->poll.done = true;
1370         io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask), 0);
1371         io_commit_cqring(ctx);
1372 }
1373
1374 static void io_poll_complete_work(struct work_struct *work)
1375 {
1376         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1377         struct io_poll_iocb *poll = &req->poll;
1378         struct poll_table_struct pt = { ._key = poll->events };
1379         struct io_ring_ctx *ctx = req->ctx;
1380         __poll_t mask = 0;
1381
1382         if (!READ_ONCE(poll->canceled))
1383                 mask = vfs_poll(poll->file, &pt) & poll->events;
1384
1385         /*
1386          * Note that ->ki_cancel callers also delete iocb from active_reqs after
1387          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1388          * synchronize with them.  In the cancellation case the list_del_init
1389          * itself is not actually needed, but harmless so we keep it in to
1390          * avoid further branches in the fast path.
1391          */
1392         spin_lock_irq(&ctx->completion_lock);
1393         if (!mask && !READ_ONCE(poll->canceled)) {
1394                 add_wait_queue(poll->head, &poll->wait);
1395                 spin_unlock_irq(&ctx->completion_lock);
1396                 return;
1397         }
1398         list_del_init(&req->list);
1399         io_poll_complete(ctx, req, mask);
1400         spin_unlock_irq(&ctx->completion_lock);
1401
1402         io_cqring_ev_posted(ctx);
1403         io_put_req(req);
1404 }
1405
1406 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1407                         void *key)
1408 {
1409         struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1410                                                         wait);
1411         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1412         struct io_ring_ctx *ctx = req->ctx;
1413         __poll_t mask = key_to_poll(key);
1414         unsigned long flags;
1415
1416         /* for instances that support it check for an event match first: */
1417         if (mask && !(mask & poll->events))
1418                 return 0;
1419
1420         list_del_init(&poll->wait.entry);
1421
1422         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1423                 list_del(&req->list);
1424                 io_poll_complete(ctx, req, mask);
1425                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1426
1427                 io_cqring_ev_posted(ctx);
1428                 io_put_req(req);
1429         } else {
1430                 queue_work(ctx->sqo_wq, &req->work);
1431         }
1432
1433         return 1;
1434 }
1435
1436 struct io_poll_table {
1437         struct poll_table_struct pt;
1438         struct io_kiocb *req;
1439         int error;
1440 };
1441
1442 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1443                                struct poll_table_struct *p)
1444 {
1445         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1446
1447         if (unlikely(pt->req->poll.head)) {
1448                 pt->error = -EINVAL;
1449                 return;
1450         }
1451
1452         pt->error = 0;
1453         pt->req->poll.head = head;
1454         add_wait_queue(head, &pt->req->poll.wait);
1455 }
1456
1457 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1458 {
1459         struct io_poll_iocb *poll = &req->poll;
1460         struct io_ring_ctx *ctx = req->ctx;
1461         struct io_poll_table ipt;
1462         bool cancel = false;
1463         __poll_t mask;
1464         u16 events;
1465
1466         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1467                 return -EINVAL;
1468         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1469                 return -EINVAL;
1470         if (!poll->file)
1471                 return -EBADF;
1472
1473         INIT_WORK(&req->work, io_poll_complete_work);
1474         events = READ_ONCE(sqe->poll_events);
1475         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1476
1477         poll->head = NULL;
1478         poll->done = false;
1479         poll->canceled = false;
1480
1481         ipt.pt._qproc = io_poll_queue_proc;
1482         ipt.pt._key = poll->events;
1483         ipt.req = req;
1484         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1485
1486         /* initialized the list so that we can do list_empty checks */
1487         INIT_LIST_HEAD(&poll->wait.entry);
1488         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1489
1490         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1491
1492         spin_lock_irq(&ctx->completion_lock);
1493         if (likely(poll->head)) {
1494                 spin_lock(&poll->head->lock);
1495                 if (unlikely(list_empty(&poll->wait.entry))) {
1496                         if (ipt.error)
1497                                 cancel = true;
1498                         ipt.error = 0;
1499                         mask = 0;
1500                 }
1501                 if (mask || ipt.error)
1502                         list_del_init(&poll->wait.entry);
1503                 else if (cancel)
1504                         WRITE_ONCE(poll->canceled, true);
1505                 else if (!poll->done) /* actually waiting for an event */
1506                         list_add_tail(&req->list, &ctx->cancel_list);
1507                 spin_unlock(&poll->head->lock);
1508         }
1509         if (mask) { /* no async, we'd stolen it */
1510                 ipt.error = 0;
1511                 io_poll_complete(ctx, req, mask);
1512         }
1513         spin_unlock_irq(&ctx->completion_lock);
1514
1515         if (mask) {
1516                 io_cqring_ev_posted(ctx);
1517                 io_put_req(req);
1518         }
1519         return ipt.error;
1520 }
1521
1522 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
1523                         const struct io_uring_sqe *sqe)
1524 {
1525         struct io_uring_sqe *sqe_copy;
1526
1527         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
1528                 return 0;
1529
1530         sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1531         if (!sqe_copy)
1532                 return -EAGAIN;
1533
1534         spin_lock_irq(&ctx->completion_lock);
1535         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
1536                 spin_unlock_irq(&ctx->completion_lock);
1537                 kfree(sqe_copy);
1538                 return 0;
1539         }
1540
1541         memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
1542         req->submit.sqe = sqe_copy;
1543
1544         INIT_WORK(&req->work, io_sq_wq_submit_work);
1545         list_add_tail(&req->list, &ctx->defer_list);
1546         spin_unlock_irq(&ctx->completion_lock);
1547         return -EIOCBQUEUED;
1548 }
1549
1550 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1551                            const struct sqe_submit *s, bool force_nonblock)
1552 {
1553         int ret, opcode;
1554
1555         if (unlikely(s->index >= ctx->sq_entries))
1556                 return -EINVAL;
1557         req->user_data = READ_ONCE(s->sqe->user_data);
1558
1559         opcode = READ_ONCE(s->sqe->opcode);
1560         switch (opcode) {
1561         case IORING_OP_NOP:
1562                 ret = io_nop(req, req->user_data);
1563                 break;
1564         case IORING_OP_READV:
1565                 if (unlikely(s->sqe->buf_index))
1566                         return -EINVAL;
1567                 ret = io_read(req, s, force_nonblock);
1568                 break;
1569         case IORING_OP_WRITEV:
1570                 if (unlikely(s->sqe->buf_index))
1571                         return -EINVAL;
1572                 ret = io_write(req, s, force_nonblock);
1573                 break;
1574         case IORING_OP_READ_FIXED:
1575                 ret = io_read(req, s, force_nonblock);
1576                 break;
1577         case IORING_OP_WRITE_FIXED:
1578                 ret = io_write(req, s, force_nonblock);
1579                 break;
1580         case IORING_OP_FSYNC:
1581                 ret = io_fsync(req, s->sqe, force_nonblock);
1582                 break;
1583         case IORING_OP_POLL_ADD:
1584                 ret = io_poll_add(req, s->sqe);
1585                 break;
1586         case IORING_OP_POLL_REMOVE:
1587                 ret = io_poll_remove(req, s->sqe);
1588                 break;
1589         case IORING_OP_SYNC_FILE_RANGE:
1590                 ret = io_sync_file_range(req, s->sqe, force_nonblock);
1591                 break;
1592         default:
1593                 ret = -EINVAL;
1594                 break;
1595         }
1596
1597         if (ret)
1598                 return ret;
1599
1600         if (ctx->flags & IORING_SETUP_IOPOLL) {
1601                 if (req->error == -EAGAIN)
1602                         return -EAGAIN;
1603
1604                 /* workqueue context doesn't hold uring_lock, grab it now */
1605                 if (s->needs_lock)
1606                         mutex_lock(&ctx->uring_lock);
1607                 io_iopoll_req_issued(req);
1608                 if (s->needs_lock)
1609                         mutex_unlock(&ctx->uring_lock);
1610         }
1611
1612         return 0;
1613 }
1614
1615 static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1616                                                  const struct io_uring_sqe *sqe)
1617 {
1618         switch (sqe->opcode) {
1619         case IORING_OP_READV:
1620         case IORING_OP_READ_FIXED:
1621                 return &ctx->pending_async[READ];
1622         case IORING_OP_WRITEV:
1623         case IORING_OP_WRITE_FIXED:
1624                 return &ctx->pending_async[WRITE];
1625         default:
1626                 return NULL;
1627         }
1628 }
1629
1630 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1631 {
1632         u8 opcode = READ_ONCE(sqe->opcode);
1633
1634         return !(opcode == IORING_OP_READ_FIXED ||
1635                  opcode == IORING_OP_WRITE_FIXED);
1636 }
1637
1638 static void io_sq_wq_submit_work(struct work_struct *work)
1639 {
1640         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1641         struct io_ring_ctx *ctx = req->ctx;
1642         struct mm_struct *cur_mm = NULL;
1643         struct async_list *async_list;
1644         LIST_HEAD(req_list);
1645         mm_segment_t old_fs;
1646         int ret;
1647
1648         async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1649 restart:
1650         do {
1651                 struct sqe_submit *s = &req->submit;
1652                 const struct io_uring_sqe *sqe = s->sqe;
1653
1654                 /* Ensure we clear previously set non-block flag */
1655                 req->rw.ki_flags &= ~IOCB_NOWAIT;
1656
1657                 ret = 0;
1658                 if (io_sqe_needs_user(sqe) && !cur_mm) {
1659                         if (!mmget_not_zero(ctx->sqo_mm)) {
1660                                 ret = -EFAULT;
1661                         } else {
1662                                 cur_mm = ctx->sqo_mm;
1663                                 use_mm(cur_mm);
1664                                 old_fs = get_fs();
1665                                 set_fs(USER_DS);
1666                         }
1667                 }
1668
1669                 if (!ret) {
1670                         s->has_user = cur_mm != NULL;
1671                         s->needs_lock = true;
1672                         do {
1673                                 ret = __io_submit_sqe(ctx, req, s, false);
1674                                 /*
1675                                  * We can get EAGAIN for polled IO even though
1676                                  * we're forcing a sync submission from here,
1677                                  * since we can't wait for request slots on the
1678                                  * block side.
1679                                  */
1680                                 if (ret != -EAGAIN)
1681                                         break;
1682                                 cond_resched();
1683                         } while (1);
1684                 }
1685
1686                 /* drop submission reference */
1687                 io_put_req(req);
1688
1689                 if (ret) {
1690                         io_cqring_add_event(ctx, sqe->user_data, ret, 0);
1691                         io_put_req(req);
1692                 }
1693
1694                 /* async context always use a copy of the sqe */
1695                 kfree(sqe);
1696
1697                 if (!async_list)
1698                         break;
1699                 if (!list_empty(&req_list)) {
1700                         req = list_first_entry(&req_list, struct io_kiocb,
1701                                                 list);
1702                         list_del(&req->list);
1703                         continue;
1704                 }
1705                 if (list_empty(&async_list->list))
1706                         break;
1707
1708                 req = NULL;
1709                 spin_lock(&async_list->lock);
1710                 if (list_empty(&async_list->list)) {
1711                         spin_unlock(&async_list->lock);
1712                         break;
1713                 }
1714                 list_splice_init(&async_list->list, &req_list);
1715                 spin_unlock(&async_list->lock);
1716
1717                 req = list_first_entry(&req_list, struct io_kiocb, list);
1718                 list_del(&req->list);
1719         } while (req);
1720
1721         /*
1722          * Rare case of racing with a submitter. If we find the count has
1723          * dropped to zero AND we have pending work items, then restart
1724          * the processing. This is a tiny race window.
1725          */
1726         if (async_list) {
1727                 ret = atomic_dec_return(&async_list->cnt);
1728                 while (!ret && !list_empty(&async_list->list)) {
1729                         spin_lock(&async_list->lock);
1730                         atomic_inc(&async_list->cnt);
1731                         list_splice_init(&async_list->list, &req_list);
1732                         spin_unlock(&async_list->lock);
1733
1734                         if (!list_empty(&req_list)) {
1735                                 req = list_first_entry(&req_list,
1736                                                         struct io_kiocb, list);
1737                                 list_del(&req->list);
1738                                 goto restart;
1739                         }
1740                         ret = atomic_dec_return(&async_list->cnt);
1741                 }
1742         }
1743
1744         if (cur_mm) {
1745                 set_fs(old_fs);
1746                 unuse_mm(cur_mm);
1747                 mmput(cur_mm);
1748         }
1749 }
1750
1751 /*
1752  * See if we can piggy back onto previously submitted work, that is still
1753  * running. We currently only allow this if the new request is sequential
1754  * to the previous one we punted.
1755  */
1756 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
1757 {
1758         bool ret = false;
1759
1760         if (!list)
1761                 return false;
1762         if (!(req->flags & REQ_F_SEQ_PREV))
1763                 return false;
1764         if (!atomic_read(&list->cnt))
1765                 return false;
1766
1767         ret = true;
1768         spin_lock(&list->lock);
1769         list_add_tail(&req->list, &list->list);
1770         if (!atomic_read(&list->cnt)) {
1771                 list_del_init(&req->list);
1772                 ret = false;
1773         }
1774         spin_unlock(&list->lock);
1775         return ret;
1776 }
1777
1778 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
1779 {
1780         int op = READ_ONCE(sqe->opcode);
1781
1782         switch (op) {
1783         case IORING_OP_NOP:
1784         case IORING_OP_POLL_REMOVE:
1785                 return false;
1786         default:
1787                 return true;
1788         }
1789 }
1790
1791 static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
1792                            struct io_submit_state *state, struct io_kiocb *req)
1793 {
1794         unsigned flags;
1795         int fd;
1796
1797         flags = READ_ONCE(s->sqe->flags);
1798         fd = READ_ONCE(s->sqe->fd);
1799
1800         if (flags & IOSQE_IO_DRAIN) {
1801                 req->flags |= REQ_F_IO_DRAIN;
1802                 req->sequence = ctx->cached_sq_head - 1;
1803         }
1804
1805         if (!io_op_needs_file(s->sqe)) {
1806                 req->file = NULL;
1807                 return 0;
1808         }
1809
1810         if (flags & IOSQE_FIXED_FILE) {
1811                 if (unlikely(!ctx->user_files ||
1812                     (unsigned) fd >= ctx->nr_user_files))
1813                         return -EBADF;
1814                 req->file = ctx->user_files[fd];
1815                 req->flags |= REQ_F_FIXED_FILE;
1816         } else {
1817                 if (s->needs_fixed_file)
1818                         return -EBADF;
1819                 req->file = io_file_get(state, fd);
1820                 if (unlikely(!req->file))
1821                         return -EBADF;
1822         }
1823
1824         return 0;
1825 }
1826
1827 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1828                          struct io_submit_state *state)
1829 {
1830         struct io_kiocb *req;
1831         int ret;
1832
1833         /* enforce forwards compatibility on users */
1834         if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)))
1835                 return -EINVAL;
1836
1837         req = io_get_req(ctx, state);
1838         if (unlikely(!req))
1839                 return -EAGAIN;
1840
1841         ret = io_req_set_file(ctx, s, state, req);
1842         if (unlikely(ret))
1843                 goto out;
1844
1845         ret = io_req_defer(ctx, req, s->sqe);
1846         if (ret) {
1847                 if (ret == -EIOCBQUEUED)
1848                         ret = 0;
1849                 return ret;
1850         }
1851
1852         ret = __io_submit_sqe(ctx, req, s, true);
1853         if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
1854                 struct io_uring_sqe *sqe_copy;
1855
1856                 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1857                 if (sqe_copy) {
1858                         struct async_list *list;
1859
1860                         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1861                         s->sqe = sqe_copy;
1862
1863                         memcpy(&req->submit, s, sizeof(*s));
1864                         list = io_async_list_from_sqe(ctx, s->sqe);
1865                         if (!io_add_to_prev_work(list, req)) {
1866                                 if (list)
1867                                         atomic_inc(&list->cnt);
1868                                 INIT_WORK(&req->work, io_sq_wq_submit_work);
1869                                 queue_work(ctx->sqo_wq, &req->work);
1870                         }
1871
1872                         /*
1873                          * Queued up for async execution, worker will release
1874                          * submit reference when the iocb is actually
1875                          * submitted.
1876                          */
1877                         return 0;
1878                 }
1879         }
1880
1881 out:
1882         /* drop submission reference */
1883         io_put_req(req);
1884
1885         /* and drop final reference, if we failed */
1886         if (ret)
1887                 io_put_req(req);
1888
1889         return ret;
1890 }
1891
1892 /*
1893  * Batched submission is done, ensure local IO is flushed out.
1894  */
1895 static void io_submit_state_end(struct io_submit_state *state)
1896 {
1897         blk_finish_plug(&state->plug);
1898         io_file_put(state);
1899         if (state->free_reqs)
1900                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
1901                                         &state->reqs[state->cur_req]);
1902 }
1903
1904 /*
1905  * Start submission side cache.
1906  */
1907 static void io_submit_state_start(struct io_submit_state *state,
1908                                   struct io_ring_ctx *ctx, unsigned max_ios)
1909 {
1910         blk_start_plug(&state->plug);
1911         state->free_reqs = 0;
1912         state->file = NULL;
1913         state->ios_left = max_ios;
1914 }
1915
1916 static void io_commit_sqring(struct io_ring_ctx *ctx)
1917 {
1918         struct io_sq_ring *ring = ctx->sq_ring;
1919
1920         if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1921                 /*
1922                  * Ensure any loads from the SQEs are done at this point,
1923                  * since once we write the new head, the application could
1924                  * write new data to them.
1925                  */
1926                 smp_store_release(&ring->r.head, ctx->cached_sq_head);
1927         }
1928 }
1929
1930 /*
1931  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1932  * that is mapped by userspace. This means that care needs to be taken to
1933  * ensure that reads are stable, as we cannot rely on userspace always
1934  * being a good citizen. If members of the sqe are validated and then later
1935  * used, it's important that those reads are done through READ_ONCE() to
1936  * prevent a re-load down the line.
1937  */
1938 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1939 {
1940         struct io_sq_ring *ring = ctx->sq_ring;
1941         unsigned head;
1942
1943         /*
1944          * The cached sq head (or cq tail) serves two purposes:
1945          *
1946          * 1) allows us to batch the cost of updating the user visible
1947          *    head updates.
1948          * 2) allows the kernel side to track the head on its own, even
1949          *    though the application is the one updating it.
1950          */
1951         head = ctx->cached_sq_head;
1952         /* make sure SQ entry isn't read before tail */
1953         if (head == smp_load_acquire(&ring->r.tail))
1954                 return false;
1955
1956         head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1957         if (head < ctx->sq_entries) {
1958                 s->index = head;
1959                 s->sqe = &ctx->sq_sqes[head];
1960                 ctx->cached_sq_head++;
1961                 return true;
1962         }
1963
1964         /* drop invalid entries */
1965         ctx->cached_sq_head++;
1966         ring->dropped++;
1967         return false;
1968 }
1969
1970 static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1971                           unsigned int nr, bool has_user, bool mm_fault)
1972 {
1973         struct io_submit_state state, *statep = NULL;
1974         int ret, i, submitted = 0;
1975
1976         if (nr > IO_PLUG_THRESHOLD) {
1977                 io_submit_state_start(&state, ctx, nr);
1978                 statep = &state;
1979         }
1980
1981         for (i = 0; i < nr; i++) {
1982                 if (unlikely(mm_fault)) {
1983                         ret = -EFAULT;
1984                 } else {
1985                         sqes[i].has_user = has_user;
1986                         sqes[i].needs_lock = true;
1987                         sqes[i].needs_fixed_file = true;
1988                         ret = io_submit_sqe(ctx, &sqes[i], statep);
1989                 }
1990                 if (!ret) {
1991                         submitted++;
1992                         continue;
1993                 }
1994
1995                 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
1996         }
1997
1998         if (statep)
1999                 io_submit_state_end(&state);
2000
2001         return submitted;
2002 }
2003
2004 static int io_sq_thread(void *data)
2005 {
2006         struct sqe_submit sqes[IO_IOPOLL_BATCH];
2007         struct io_ring_ctx *ctx = data;
2008         struct mm_struct *cur_mm = NULL;
2009         mm_segment_t old_fs;
2010         DEFINE_WAIT(wait);
2011         unsigned inflight;
2012         unsigned long timeout;
2013
2014         old_fs = get_fs();
2015         set_fs(USER_DS);
2016
2017         timeout = inflight = 0;
2018         while (!kthread_should_stop() && !ctx->sqo_stop) {
2019                 bool all_fixed, mm_fault = false;
2020                 int i;
2021
2022                 if (inflight) {
2023                         unsigned nr_events = 0;
2024
2025                         if (ctx->flags & IORING_SETUP_IOPOLL) {
2026                                 /*
2027                                  * We disallow the app entering submit/complete
2028                                  * with polling, but we still need to lock the
2029                                  * ring to prevent racing with polled issue
2030                                  * that got punted to a workqueue.
2031                                  */
2032                                 mutex_lock(&ctx->uring_lock);
2033                                 io_iopoll_check(ctx, &nr_events, 0);
2034                                 mutex_unlock(&ctx->uring_lock);
2035                         } else {
2036                                 /*
2037                                  * Normal IO, just pretend everything completed.
2038                                  * We don't have to poll completions for that.
2039                                  */
2040                                 nr_events = inflight;
2041                         }
2042
2043                         inflight -= nr_events;
2044                         if (!inflight)
2045                                 timeout = jiffies + ctx->sq_thread_idle;
2046                 }
2047
2048                 if (!io_get_sqring(ctx, &sqes[0])) {
2049                         /*
2050                          * We're polling. If we're within the defined idle
2051                          * period, then let us spin without work before going
2052                          * to sleep.
2053                          */
2054                         if (inflight || !time_after(jiffies, timeout)) {
2055                                 cpu_relax();
2056                                 continue;
2057                         }
2058
2059                         /*
2060                          * Drop cur_mm before scheduling, we can't hold it for
2061                          * long periods (or over schedule()). Do this before
2062                          * adding ourselves to the waitqueue, as the unuse/drop
2063                          * may sleep.
2064                          */
2065                         if (cur_mm) {
2066                                 unuse_mm(cur_mm);
2067                                 mmput(cur_mm);
2068                                 cur_mm = NULL;
2069                         }
2070
2071                         prepare_to_wait(&ctx->sqo_wait, &wait,
2072                                                 TASK_INTERRUPTIBLE);
2073
2074                         /* Tell userspace we may need a wakeup call */
2075                         ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
2076                         /* make sure to read SQ tail after writing flags */
2077                         smp_mb();
2078
2079                         if (!io_get_sqring(ctx, &sqes[0])) {
2080                                 if (kthread_should_stop()) {
2081                                         finish_wait(&ctx->sqo_wait, &wait);
2082                                         break;
2083                                 }
2084                                 if (signal_pending(current))
2085                                         flush_signals(current);
2086                                 schedule();
2087                                 finish_wait(&ctx->sqo_wait, &wait);
2088
2089                                 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2090                                 continue;
2091                         }
2092                         finish_wait(&ctx->sqo_wait, &wait);
2093
2094                         ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2095                 }
2096
2097                 i = 0;
2098                 all_fixed = true;
2099                 do {
2100                         if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
2101                                 all_fixed = false;
2102
2103                         i++;
2104                         if (i == ARRAY_SIZE(sqes))
2105                                 break;
2106                 } while (io_get_sqring(ctx, &sqes[i]));
2107
2108                 /* Unless all new commands are FIXED regions, grab mm */
2109                 if (!all_fixed && !cur_mm) {
2110                         mm_fault = !mmget_not_zero(ctx->sqo_mm);
2111                         if (!mm_fault) {
2112                                 use_mm(ctx->sqo_mm);
2113                                 cur_mm = ctx->sqo_mm;
2114                         }
2115                 }
2116
2117                 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
2118                                                 mm_fault);
2119
2120                 /* Commit SQ ring head once we've consumed all SQEs */
2121                 io_commit_sqring(ctx);
2122         }
2123
2124         set_fs(old_fs);
2125         if (cur_mm) {
2126                 unuse_mm(cur_mm);
2127                 mmput(cur_mm);
2128         }
2129
2130         if (kthread_should_park())
2131                 kthread_parkme();
2132
2133         return 0;
2134 }
2135
2136 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2137 {
2138         struct io_submit_state state, *statep = NULL;
2139         int i, submit = 0;
2140
2141         if (to_submit > IO_PLUG_THRESHOLD) {
2142                 io_submit_state_start(&state, ctx, to_submit);
2143                 statep = &state;
2144         }
2145
2146         for (i = 0; i < to_submit; i++) {
2147                 struct sqe_submit s;
2148                 int ret;
2149
2150                 if (!io_get_sqring(ctx, &s))
2151                         break;
2152
2153                 s.has_user = true;
2154                 s.needs_lock = false;
2155                 s.needs_fixed_file = false;
2156                 submit++;
2157
2158                 ret = io_submit_sqe(ctx, &s, statep);
2159                 if (ret)
2160                         io_cqring_add_event(ctx, s.sqe->user_data, ret, 0);
2161         }
2162         io_commit_sqring(ctx);
2163
2164         if (statep)
2165                 io_submit_state_end(statep);
2166
2167         return submit;
2168 }
2169
2170 static unsigned io_cqring_events(struct io_cq_ring *ring)
2171 {
2172         return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
2173 }
2174
2175 /*
2176  * Wait until events become available, if we don't already have some. The
2177  * application must reap them itself, as they reside on the shared cq ring.
2178  */
2179 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2180                           const sigset_t __user *sig, size_t sigsz)
2181 {
2182         struct io_cq_ring *ring = ctx->cq_ring;
2183         sigset_t ksigmask, sigsaved;
2184         DEFINE_WAIT(wait);
2185         int ret;
2186
2187         /* See comment at the top of this file */
2188         smp_rmb();
2189         if (io_cqring_events(ring) >= min_events)
2190                 return 0;
2191
2192         if (sig) {
2193 #ifdef CONFIG_COMPAT
2194                 if (in_compat_syscall())
2195                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2196                                                       &ksigmask, &sigsaved, sigsz);
2197                 else
2198 #endif
2199                         ret = set_user_sigmask(sig, &ksigmask,
2200                                                &sigsaved, sigsz);
2201
2202                 if (ret)
2203                         return ret;
2204         }
2205
2206         do {
2207                 prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
2208
2209                 ret = 0;
2210                 /* See comment at the top of this file */
2211                 smp_rmb();
2212                 if (io_cqring_events(ring) >= min_events)
2213                         break;
2214
2215                 schedule();
2216
2217                 ret = -EINTR;
2218                 if (signal_pending(current))
2219                         break;
2220         } while (1);
2221
2222         finish_wait(&ctx->wait, &wait);
2223
2224         if (sig)
2225                 restore_user_sigmask(sig, &sigsaved);
2226
2227         return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
2228 }
2229
2230 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2231 {
2232 #if defined(CONFIG_UNIX)
2233         if (ctx->ring_sock) {
2234                 struct sock *sock = ctx->ring_sock->sk;
2235                 struct sk_buff *skb;
2236
2237                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2238                         kfree_skb(skb);
2239         }
2240 #else
2241         int i;
2242
2243         for (i = 0; i < ctx->nr_user_files; i++)
2244                 fput(ctx->user_files[i]);
2245 #endif
2246 }
2247
2248 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2249 {
2250         if (!ctx->user_files)
2251                 return -ENXIO;
2252
2253         __io_sqe_files_unregister(ctx);
2254         kfree(ctx->user_files);
2255         ctx->user_files = NULL;
2256         ctx->nr_user_files = 0;
2257         return 0;
2258 }
2259
2260 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2261 {
2262         if (ctx->sqo_thread) {
2263                 ctx->sqo_stop = 1;
2264                 mb();
2265                 kthread_park(ctx->sqo_thread);
2266                 kthread_stop(ctx->sqo_thread);
2267                 ctx->sqo_thread = NULL;
2268         }
2269 }
2270
2271 static void io_finish_async(struct io_ring_ctx *ctx)
2272 {
2273         io_sq_thread_stop(ctx);
2274
2275         if (ctx->sqo_wq) {
2276                 destroy_workqueue(ctx->sqo_wq);
2277                 ctx->sqo_wq = NULL;
2278         }
2279 }
2280
2281 #if defined(CONFIG_UNIX)
2282 static void io_destruct_skb(struct sk_buff *skb)
2283 {
2284         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2285
2286         io_finish_async(ctx);
2287         unix_destruct_scm(skb);
2288 }
2289
2290 /*
2291  * Ensure the UNIX gc is aware of our file set, so we are certain that
2292  * the io_uring can be safely unregistered on process exit, even if we have
2293  * loops in the file referencing.
2294  */
2295 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2296 {
2297         struct sock *sk = ctx->ring_sock->sk;
2298         struct scm_fp_list *fpl;
2299         struct sk_buff *skb;
2300         int i;
2301
2302         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2303                 unsigned long inflight = ctx->user->unix_inflight + nr;
2304
2305                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2306                         return -EMFILE;
2307         }
2308
2309         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2310         if (!fpl)
2311                 return -ENOMEM;
2312
2313         skb = alloc_skb(0, GFP_KERNEL);
2314         if (!skb) {
2315                 kfree(fpl);
2316                 return -ENOMEM;
2317         }
2318
2319         skb->sk = sk;
2320         skb->destructor = io_destruct_skb;
2321
2322         fpl->user = get_uid(ctx->user);
2323         for (i = 0; i < nr; i++) {
2324                 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2325                 unix_inflight(fpl->user, fpl->fp[i]);
2326         }
2327
2328         fpl->max = fpl->count = nr;
2329         UNIXCB(skb).fp = fpl;
2330         refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2331         skb_queue_head(&sk->sk_receive_queue, skb);
2332
2333         for (i = 0; i < nr; i++)
2334                 fput(fpl->fp[i]);
2335
2336         return 0;
2337 }
2338
2339 /*
2340  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2341  * causes regular reference counting to break down. We rely on the UNIX
2342  * garbage collection to take care of this problem for us.
2343  */
2344 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2345 {
2346         unsigned left, total;
2347         int ret = 0;
2348
2349         total = 0;
2350         left = ctx->nr_user_files;
2351         while (left) {
2352                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2353
2354                 ret = __io_sqe_files_scm(ctx, this_files, total);
2355                 if (ret)
2356                         break;
2357                 left -= this_files;
2358                 total += this_files;
2359         }
2360
2361         if (!ret)
2362                 return 0;
2363
2364         while (total < ctx->nr_user_files) {
2365                 fput(ctx->user_files[total]);
2366                 total++;
2367         }
2368
2369         return ret;
2370 }
2371 #else
2372 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2373 {
2374         return 0;
2375 }
2376 #endif
2377
2378 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2379                                  unsigned nr_args)
2380 {
2381         __s32 __user *fds = (__s32 __user *) arg;
2382         int fd, ret = 0;
2383         unsigned i;
2384
2385         if (ctx->user_files)
2386                 return -EBUSY;
2387         if (!nr_args)
2388                 return -EINVAL;
2389         if (nr_args > IORING_MAX_FIXED_FILES)
2390                 return -EMFILE;
2391
2392         ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2393         if (!ctx->user_files)
2394                 return -ENOMEM;
2395
2396         for (i = 0; i < nr_args; i++) {
2397                 ret = -EFAULT;
2398                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2399                         break;
2400
2401                 ctx->user_files[i] = fget(fd);
2402
2403                 ret = -EBADF;
2404                 if (!ctx->user_files[i])
2405                         break;
2406                 /*
2407                  * Don't allow io_uring instances to be registered. If UNIX
2408                  * isn't enabled, then this causes a reference cycle and this
2409                  * instance can never get freed. If UNIX is enabled we'll
2410                  * handle it just fine, but there's still no point in allowing
2411                  * a ring fd as it doesn't support regular read/write anyway.
2412                  */
2413                 if (ctx->user_files[i]->f_op == &io_uring_fops) {
2414                         fput(ctx->user_files[i]);
2415                         break;
2416                 }
2417                 ctx->nr_user_files++;
2418                 ret = 0;
2419         }
2420
2421         if (ret) {
2422                 for (i = 0; i < ctx->nr_user_files; i++)
2423                         fput(ctx->user_files[i]);
2424
2425                 kfree(ctx->user_files);
2426                 ctx->user_files = NULL;
2427                 ctx->nr_user_files = 0;
2428                 return ret;
2429         }
2430
2431         ret = io_sqe_files_scm(ctx);
2432         if (ret)
2433                 io_sqe_files_unregister(ctx);
2434
2435         return ret;
2436 }
2437
2438 static int io_sq_offload_start(struct io_ring_ctx *ctx,
2439                                struct io_uring_params *p)
2440 {
2441         int ret;
2442
2443         init_waitqueue_head(&ctx->sqo_wait);
2444         mmgrab(current->mm);
2445         ctx->sqo_mm = current->mm;
2446
2447         if (ctx->flags & IORING_SETUP_SQPOLL) {
2448                 ret = -EPERM;
2449                 if (!capable(CAP_SYS_ADMIN))
2450                         goto err;
2451
2452                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2453                 if (!ctx->sq_thread_idle)
2454                         ctx->sq_thread_idle = HZ;
2455
2456                 if (p->flags & IORING_SETUP_SQ_AFF) {
2457                         int cpu = p->sq_thread_cpu;
2458
2459                         ret = -EINVAL;
2460                         if (cpu >= nr_cpu_ids)
2461                                 goto err;
2462                         if (!cpu_online(cpu))
2463                                 goto err;
2464
2465                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2466                                                         ctx, cpu,
2467                                                         "io_uring-sq");
2468                 } else {
2469                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2470                                                         "io_uring-sq");
2471                 }
2472                 if (IS_ERR(ctx->sqo_thread)) {
2473                         ret = PTR_ERR(ctx->sqo_thread);
2474                         ctx->sqo_thread = NULL;
2475                         goto err;
2476                 }
2477                 wake_up_process(ctx->sqo_thread);
2478         } else if (p->flags & IORING_SETUP_SQ_AFF) {
2479                 /* Can't have SQ_AFF without SQPOLL */
2480                 ret = -EINVAL;
2481                 goto err;
2482         }
2483
2484         /* Do QD, or 2 * CPUS, whatever is smallest */
2485         ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2486                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2487         if (!ctx->sqo_wq) {
2488                 ret = -ENOMEM;
2489                 goto err;
2490         }
2491
2492         return 0;
2493 err:
2494         io_sq_thread_stop(ctx);
2495         mmdrop(ctx->sqo_mm);
2496         ctx->sqo_mm = NULL;
2497         return ret;
2498 }
2499
2500 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2501 {
2502         atomic_long_sub(nr_pages, &user->locked_vm);
2503 }
2504
2505 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2506 {
2507         unsigned long page_limit, cur_pages, new_pages;
2508
2509         /* Don't allow more pages than we can safely lock */
2510         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2511
2512         do {
2513                 cur_pages = atomic_long_read(&user->locked_vm);
2514                 new_pages = cur_pages + nr_pages;
2515                 if (new_pages > page_limit)
2516                         return -ENOMEM;
2517         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2518                                         new_pages) != cur_pages);
2519
2520         return 0;
2521 }
2522
2523 static void io_mem_free(void *ptr)
2524 {
2525         struct page *page;
2526
2527         if (!ptr)
2528                 return;
2529
2530         page = virt_to_head_page(ptr);
2531         if (put_page_testzero(page))
2532                 free_compound_page(page);
2533 }
2534
2535 static void *io_mem_alloc(size_t size)
2536 {
2537         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2538                                 __GFP_NORETRY;
2539
2540         return (void *) __get_free_pages(gfp_flags, get_order(size));
2541 }
2542
2543 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2544 {
2545         struct io_sq_ring *sq_ring;
2546         struct io_cq_ring *cq_ring;
2547         size_t bytes;
2548
2549         bytes = struct_size(sq_ring, array, sq_entries);
2550         bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2551         bytes += struct_size(cq_ring, cqes, cq_entries);
2552
2553         return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2554 }
2555
2556 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2557 {
2558         int i, j;
2559
2560         if (!ctx->user_bufs)
2561                 return -ENXIO;
2562
2563         for (i = 0; i < ctx->nr_user_bufs; i++) {
2564                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2565
2566                 for (j = 0; j < imu->nr_bvecs; j++)
2567                         put_page(imu->bvec[j].bv_page);
2568
2569                 if (ctx->account_mem)
2570                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
2571                 kvfree(imu->bvec);
2572                 imu->nr_bvecs = 0;
2573         }
2574
2575         kfree(ctx->user_bufs);
2576         ctx->user_bufs = NULL;
2577         ctx->nr_user_bufs = 0;
2578         return 0;
2579 }
2580
2581 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2582                        void __user *arg, unsigned index)
2583 {
2584         struct iovec __user *src;
2585
2586 #ifdef CONFIG_COMPAT
2587         if (ctx->compat) {
2588                 struct compat_iovec __user *ciovs;
2589                 struct compat_iovec ciov;
2590
2591                 ciovs = (struct compat_iovec __user *) arg;
2592                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2593                         return -EFAULT;
2594
2595                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2596                 dst->iov_len = ciov.iov_len;
2597                 return 0;
2598         }
2599 #endif
2600         src = (struct iovec __user *) arg;
2601         if (copy_from_user(dst, &src[index], sizeof(*dst)))
2602                 return -EFAULT;
2603         return 0;
2604 }
2605
2606 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2607                                   unsigned nr_args)
2608 {
2609         struct vm_area_struct **vmas = NULL;
2610         struct page **pages = NULL;
2611         int i, j, got_pages = 0;
2612         int ret = -EINVAL;
2613
2614         if (ctx->user_bufs)
2615                 return -EBUSY;
2616         if (!nr_args || nr_args > UIO_MAXIOV)
2617                 return -EINVAL;
2618
2619         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2620                                         GFP_KERNEL);
2621         if (!ctx->user_bufs)
2622                 return -ENOMEM;
2623
2624         for (i = 0; i < nr_args; i++) {
2625                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2626                 unsigned long off, start, end, ubuf;
2627                 int pret, nr_pages;
2628                 struct iovec iov;
2629                 size_t size;
2630
2631                 ret = io_copy_iov(ctx, &iov, arg, i);
2632                 if (ret)
2633                         break;
2634
2635                 /*
2636                  * Don't impose further limits on the size and buffer
2637                  * constraints here, we'll -EINVAL later when IO is
2638                  * submitted if they are wrong.
2639                  */
2640                 ret = -EFAULT;
2641                 if (!iov.iov_base || !iov.iov_len)
2642                         goto err;
2643
2644                 /* arbitrary limit, but we need something */
2645                 if (iov.iov_len > SZ_1G)
2646                         goto err;
2647
2648                 ubuf = (unsigned long) iov.iov_base;
2649                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
2650                 start = ubuf >> PAGE_SHIFT;
2651                 nr_pages = end - start;
2652
2653                 if (ctx->account_mem) {
2654                         ret = io_account_mem(ctx->user, nr_pages);
2655                         if (ret)
2656                                 goto err;
2657                 }
2658
2659                 ret = 0;
2660                 if (!pages || nr_pages > got_pages) {
2661                         kfree(vmas);
2662                         kfree(pages);
2663                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
2664                                                 GFP_KERNEL);
2665                         vmas = kvmalloc_array(nr_pages,
2666                                         sizeof(struct vm_area_struct *),
2667                                         GFP_KERNEL);
2668                         if (!pages || !vmas) {
2669                                 ret = -ENOMEM;
2670                                 if (ctx->account_mem)
2671                                         io_unaccount_mem(ctx->user, nr_pages);
2672                                 goto err;
2673                         }
2674                         got_pages = nr_pages;
2675                 }
2676
2677                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
2678                                                 GFP_KERNEL);
2679                 ret = -ENOMEM;
2680                 if (!imu->bvec) {
2681                         if (ctx->account_mem)
2682                                 io_unaccount_mem(ctx->user, nr_pages);
2683                         goto err;
2684                 }
2685
2686                 ret = 0;
2687                 down_read(&current->mm->mmap_sem);
2688                 pret = get_user_pages_longterm(ubuf, nr_pages, FOLL_WRITE,
2689                                                 pages, vmas);
2690                 if (pret == nr_pages) {
2691                         /* don't support file backed memory */
2692                         for (j = 0; j < nr_pages; j++) {
2693                                 struct vm_area_struct *vma = vmas[j];
2694
2695                                 if (vma->vm_file &&
2696                                     !is_file_hugepages(vma->vm_file)) {
2697                                         ret = -EOPNOTSUPP;
2698                                         break;
2699                                 }
2700                         }
2701                 } else {
2702                         ret = pret < 0 ? pret : -EFAULT;
2703                 }
2704                 up_read(&current->mm->mmap_sem);
2705                 if (ret) {
2706                         /*
2707                          * if we did partial map, or found file backed vmas,
2708                          * release any pages we did get
2709                          */
2710                         if (pret > 0) {
2711                                 for (j = 0; j < pret; j++)
2712                                         put_page(pages[j]);
2713                         }
2714                         if (ctx->account_mem)
2715                                 io_unaccount_mem(ctx->user, nr_pages);
2716                         kvfree(imu->bvec);
2717                         goto err;
2718                 }
2719
2720                 off = ubuf & ~PAGE_MASK;
2721                 size = iov.iov_len;
2722                 for (j = 0; j < nr_pages; j++) {
2723                         size_t vec_len;
2724
2725                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
2726                         imu->bvec[j].bv_page = pages[j];
2727                         imu->bvec[j].bv_len = vec_len;
2728                         imu->bvec[j].bv_offset = off;
2729                         off = 0;
2730                         size -= vec_len;
2731                 }
2732                 /* store original address for later verification */
2733                 imu->ubuf = ubuf;
2734                 imu->len = iov.iov_len;
2735                 imu->nr_bvecs = nr_pages;
2736
2737                 ctx->nr_user_bufs++;
2738         }
2739         kvfree(pages);
2740         kvfree(vmas);
2741         return 0;
2742 err:
2743         kvfree(pages);
2744         kvfree(vmas);
2745         io_sqe_buffer_unregister(ctx);
2746         return ret;
2747 }
2748
2749 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
2750 {
2751         __s32 __user *fds = arg;
2752         int fd;
2753
2754         if (ctx->cq_ev_fd)
2755                 return -EBUSY;
2756
2757         if (copy_from_user(&fd, fds, sizeof(*fds)))
2758                 return -EFAULT;
2759
2760         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
2761         if (IS_ERR(ctx->cq_ev_fd)) {
2762                 int ret = PTR_ERR(ctx->cq_ev_fd);
2763                 ctx->cq_ev_fd = NULL;
2764                 return ret;
2765         }
2766
2767         return 0;
2768 }
2769
2770 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
2771 {
2772         if (ctx->cq_ev_fd) {
2773                 eventfd_ctx_put(ctx->cq_ev_fd);
2774                 ctx->cq_ev_fd = NULL;
2775                 return 0;
2776         }
2777
2778         return -ENXIO;
2779 }
2780
2781 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2782 {
2783         io_finish_async(ctx);
2784         if (ctx->sqo_mm)
2785                 mmdrop(ctx->sqo_mm);
2786
2787         io_iopoll_reap_events(ctx);
2788         io_sqe_buffer_unregister(ctx);
2789         io_sqe_files_unregister(ctx);
2790         io_eventfd_unregister(ctx);
2791
2792 #if defined(CONFIG_UNIX)
2793         if (ctx->ring_sock)
2794                 sock_release(ctx->ring_sock);
2795 #endif
2796
2797         io_mem_free(ctx->sq_ring);
2798         io_mem_free(ctx->sq_sqes);
2799         io_mem_free(ctx->cq_ring);
2800
2801         percpu_ref_exit(&ctx->refs);
2802         if (ctx->account_mem)
2803                 io_unaccount_mem(ctx->user,
2804                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
2805         free_uid(ctx->user);
2806         kfree(ctx);
2807 }
2808
2809 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2810 {
2811         struct io_ring_ctx *ctx = file->private_data;
2812         __poll_t mask = 0;
2813
2814         poll_wait(file, &ctx->cq_wait, wait);
2815         /*
2816          * synchronizes with barrier from wq_has_sleeper call in
2817          * io_commit_cqring
2818          */
2819         smp_rmb();
2820         if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
2821             ctx->sq_ring->ring_entries)
2822                 mask |= EPOLLOUT | EPOLLWRNORM;
2823         if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2824                 mask |= EPOLLIN | EPOLLRDNORM;
2825
2826         return mask;
2827 }
2828
2829 static int io_uring_fasync(int fd, struct file *file, int on)
2830 {
2831         struct io_ring_ctx *ctx = file->private_data;
2832
2833         return fasync_helper(fd, file, on, &ctx->cq_fasync);
2834 }
2835
2836 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2837 {
2838         mutex_lock(&ctx->uring_lock);
2839         percpu_ref_kill(&ctx->refs);
2840         mutex_unlock(&ctx->uring_lock);
2841
2842         io_poll_remove_all(ctx);
2843         io_iopoll_reap_events(ctx);
2844         wait_for_completion(&ctx->ctx_done);
2845         io_ring_ctx_free(ctx);
2846 }
2847
2848 static int io_uring_release(struct inode *inode, struct file *file)
2849 {
2850         struct io_ring_ctx *ctx = file->private_data;
2851
2852         file->private_data = NULL;
2853         io_ring_ctx_wait_and_kill(ctx);
2854         return 0;
2855 }
2856
2857 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2858 {
2859         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2860         unsigned long sz = vma->vm_end - vma->vm_start;
2861         struct io_ring_ctx *ctx = file->private_data;
2862         unsigned long pfn;
2863         struct page *page;
2864         void *ptr;
2865
2866         switch (offset) {
2867         case IORING_OFF_SQ_RING:
2868                 ptr = ctx->sq_ring;
2869                 break;
2870         case IORING_OFF_SQES:
2871                 ptr = ctx->sq_sqes;
2872                 break;
2873         case IORING_OFF_CQ_RING:
2874                 ptr = ctx->cq_ring;
2875                 break;
2876         default:
2877                 return -EINVAL;
2878         }
2879
2880         page = virt_to_head_page(ptr);
2881         if (sz > (PAGE_SIZE << compound_order(page)))
2882                 return -EINVAL;
2883
2884         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2885         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2886 }
2887
2888 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2889                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
2890                 size_t, sigsz)
2891 {
2892         struct io_ring_ctx *ctx;
2893         long ret = -EBADF;
2894         int submitted = 0;
2895         struct fd f;
2896
2897         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2898                 return -EINVAL;
2899
2900         f = fdget(fd);
2901         if (!f.file)
2902                 return -EBADF;
2903
2904         ret = -EOPNOTSUPP;
2905         if (f.file->f_op != &io_uring_fops)
2906                 goto out_fput;
2907
2908         ret = -ENXIO;
2909         ctx = f.file->private_data;
2910         if (!percpu_ref_tryget(&ctx->refs))
2911                 goto out_fput;
2912
2913         /*
2914          * For SQ polling, the thread will do all submissions and completions.
2915          * Just return the requested submit count, and wake the thread if
2916          * we were asked to.
2917          */
2918         if (ctx->flags & IORING_SETUP_SQPOLL) {
2919                 if (flags & IORING_ENTER_SQ_WAKEUP)
2920                         wake_up(&ctx->sqo_wait);
2921                 submitted = to_submit;
2922                 goto out_ctx;
2923         }
2924
2925         ret = 0;
2926         if (to_submit) {
2927                 to_submit = min(to_submit, ctx->sq_entries);
2928
2929                 mutex_lock(&ctx->uring_lock);
2930                 submitted = io_ring_submit(ctx, to_submit);
2931                 mutex_unlock(&ctx->uring_lock);
2932         }
2933         if (flags & IORING_ENTER_GETEVENTS) {
2934                 unsigned nr_events = 0;
2935
2936                 min_complete = min(min_complete, ctx->cq_entries);
2937
2938                 if (ctx->flags & IORING_SETUP_IOPOLL) {
2939                         mutex_lock(&ctx->uring_lock);
2940                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
2941                         mutex_unlock(&ctx->uring_lock);
2942                 } else {
2943                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2944                 }
2945         }
2946
2947 out_ctx:
2948         io_ring_drop_ctx_refs(ctx, 1);
2949 out_fput:
2950         fdput(f);
2951         return submitted ? submitted : ret;
2952 }
2953
2954 static const struct file_operations io_uring_fops = {
2955         .release        = io_uring_release,
2956         .mmap           = io_uring_mmap,
2957         .poll           = io_uring_poll,
2958         .fasync         = io_uring_fasync,
2959 };
2960
2961 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2962                                   struct io_uring_params *p)
2963 {
2964         struct io_sq_ring *sq_ring;
2965         struct io_cq_ring *cq_ring;
2966         size_t size;
2967
2968         sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2969         if (!sq_ring)
2970                 return -ENOMEM;
2971
2972         ctx->sq_ring = sq_ring;
2973         sq_ring->ring_mask = p->sq_entries - 1;
2974         sq_ring->ring_entries = p->sq_entries;
2975         ctx->sq_mask = sq_ring->ring_mask;
2976         ctx->sq_entries = sq_ring->ring_entries;
2977
2978         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2979         if (size == SIZE_MAX)
2980                 return -EOVERFLOW;
2981
2982         ctx->sq_sqes = io_mem_alloc(size);
2983         if (!ctx->sq_sqes)
2984                 return -ENOMEM;
2985
2986         cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2987         if (!cq_ring)
2988                 return -ENOMEM;
2989
2990         ctx->cq_ring = cq_ring;
2991         cq_ring->ring_mask = p->cq_entries - 1;
2992         cq_ring->ring_entries = p->cq_entries;
2993         ctx->cq_mask = cq_ring->ring_mask;
2994         ctx->cq_entries = cq_ring->ring_entries;
2995         return 0;
2996 }
2997
2998 /*
2999  * Allocate an anonymous fd, this is what constitutes the application
3000  * visible backing of an io_uring instance. The application mmaps this
3001  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
3002  * we have to tie this fd to a socket for file garbage collection purposes.
3003  */
3004 static int io_uring_get_fd(struct io_ring_ctx *ctx)
3005 {
3006         struct file *file;
3007         int ret;
3008
3009 #if defined(CONFIG_UNIX)
3010         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
3011                                 &ctx->ring_sock);
3012         if (ret)
3013                 return ret;
3014 #endif
3015
3016         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3017         if (ret < 0)
3018                 goto err;
3019
3020         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
3021                                         O_RDWR | O_CLOEXEC);
3022         if (IS_ERR(file)) {
3023                 put_unused_fd(ret);
3024                 ret = PTR_ERR(file);
3025                 goto err;
3026         }
3027
3028 #if defined(CONFIG_UNIX)
3029         ctx->ring_sock->file = file;
3030         ctx->ring_sock->sk->sk_user_data = ctx;
3031 #endif
3032         fd_install(ret, file);
3033         return ret;
3034 err:
3035 #if defined(CONFIG_UNIX)
3036         sock_release(ctx->ring_sock);
3037         ctx->ring_sock = NULL;
3038 #endif
3039         return ret;
3040 }
3041
3042 static int io_uring_create(unsigned entries, struct io_uring_params *p)
3043 {
3044         struct user_struct *user = NULL;
3045         struct io_ring_ctx *ctx;
3046         bool account_mem;
3047         int ret;
3048
3049         if (!entries || entries > IORING_MAX_ENTRIES)
3050                 return -EINVAL;
3051
3052         /*
3053          * Use twice as many entries for the CQ ring. It's possible for the
3054          * application to drive a higher depth than the size of the SQ ring,
3055          * since the sqes are only used at submission time. This allows for
3056          * some flexibility in overcommitting a bit.
3057          */
3058         p->sq_entries = roundup_pow_of_two(entries);
3059         p->cq_entries = 2 * p->sq_entries;
3060
3061         user = get_uid(current_user());
3062         account_mem = !capable(CAP_IPC_LOCK);
3063
3064         if (account_mem) {
3065                 ret = io_account_mem(user,
3066                                 ring_pages(p->sq_entries, p->cq_entries));
3067                 if (ret) {
3068                         free_uid(user);
3069                         return ret;
3070                 }
3071         }
3072
3073         ctx = io_ring_ctx_alloc(p);
3074         if (!ctx) {
3075                 if (account_mem)
3076                         io_unaccount_mem(user, ring_pages(p->sq_entries,
3077                                                                 p->cq_entries));
3078                 free_uid(user);
3079                 return -ENOMEM;
3080         }
3081         ctx->compat = in_compat_syscall();
3082         ctx->account_mem = account_mem;
3083         ctx->user = user;
3084
3085         ret = io_allocate_scq_urings(ctx, p);
3086         if (ret)
3087                 goto err;
3088
3089         ret = io_sq_offload_start(ctx, p);
3090         if (ret)
3091                 goto err;
3092
3093         ret = io_uring_get_fd(ctx);
3094         if (ret < 0)
3095                 goto err;
3096
3097         memset(&p->sq_off, 0, sizeof(p->sq_off));
3098         p->sq_off.head = offsetof(struct io_sq_ring, r.head);
3099         p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
3100         p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
3101         p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
3102         p->sq_off.flags = offsetof(struct io_sq_ring, flags);
3103         p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
3104         p->sq_off.array = offsetof(struct io_sq_ring, array);
3105
3106         memset(&p->cq_off, 0, sizeof(p->cq_off));
3107         p->cq_off.head = offsetof(struct io_cq_ring, r.head);
3108         p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
3109         p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
3110         p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
3111         p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
3112         p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
3113         return ret;
3114 err:
3115         io_ring_ctx_wait_and_kill(ctx);
3116         return ret;
3117 }
3118
3119 /*
3120  * Sets up an aio uring context, and returns the fd. Applications asks for a
3121  * ring size, we return the actual sq/cq ring sizes (among other things) in the
3122  * params structure passed in.
3123  */
3124 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3125 {
3126         struct io_uring_params p;
3127         long ret;
3128         int i;
3129
3130         if (copy_from_user(&p, params, sizeof(p)))
3131                 return -EFAULT;
3132         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
3133                 if (p.resv[i])
3134                         return -EINVAL;
3135         }
3136
3137         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
3138                         IORING_SETUP_SQ_AFF))
3139                 return -EINVAL;
3140
3141         ret = io_uring_create(entries, &p);
3142         if (ret < 0)
3143                 return ret;
3144
3145         if (copy_to_user(params, &p, sizeof(p)))
3146                 return -EFAULT;
3147
3148         return ret;
3149 }
3150
3151 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
3152                 struct io_uring_params __user *, params)
3153 {
3154         return io_uring_setup(entries, params);
3155 }
3156
3157 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3158                                void __user *arg, unsigned nr_args)
3159         __releases(ctx->uring_lock)
3160         __acquires(ctx->uring_lock)
3161 {
3162         int ret;
3163
3164         /*
3165          * We're inside the ring mutex, if the ref is already dying, then
3166          * someone else killed the ctx or is already going through
3167          * io_uring_register().
3168          */
3169         if (percpu_ref_is_dying(&ctx->refs))
3170                 return -ENXIO;
3171
3172         percpu_ref_kill(&ctx->refs);
3173
3174         /*
3175          * Drop uring mutex before waiting for references to exit. If another
3176          * thread is currently inside io_uring_enter() it might need to grab
3177          * the uring_lock to make progress. If we hold it here across the drain
3178          * wait, then we can deadlock. It's safe to drop the mutex here, since
3179          * no new references will come in after we've killed the percpu ref.
3180          */
3181         mutex_unlock(&ctx->uring_lock);
3182         wait_for_completion(&ctx->ctx_done);
3183         mutex_lock(&ctx->uring_lock);
3184
3185         switch (opcode) {
3186         case IORING_REGISTER_BUFFERS:
3187                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
3188                 break;
3189         case IORING_UNREGISTER_BUFFERS:
3190                 ret = -EINVAL;
3191                 if (arg || nr_args)
3192                         break;
3193                 ret = io_sqe_buffer_unregister(ctx);
3194                 break;
3195         case IORING_REGISTER_FILES:
3196                 ret = io_sqe_files_register(ctx, arg, nr_args);
3197                 break;
3198         case IORING_UNREGISTER_FILES:
3199                 ret = -EINVAL;
3200                 if (arg || nr_args)
3201                         break;
3202                 ret = io_sqe_files_unregister(ctx);
3203                 break;
3204         case IORING_REGISTER_EVENTFD:
3205                 ret = -EINVAL;
3206                 if (nr_args != 1)
3207                         break;
3208                 ret = io_eventfd_register(ctx, arg);
3209                 break;
3210         case IORING_UNREGISTER_EVENTFD:
3211                 ret = -EINVAL;
3212                 if (arg || nr_args)
3213                         break;
3214                 ret = io_eventfd_unregister(ctx);
3215                 break;
3216         default:
3217                 ret = -EINVAL;
3218                 break;
3219         }
3220
3221         /* bring the ctx back to life */
3222         reinit_completion(&ctx->ctx_done);
3223         percpu_ref_reinit(&ctx->refs);
3224         return ret;
3225 }
3226
3227 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3228                 void __user *, arg, unsigned int, nr_args)
3229 {
3230         struct io_ring_ctx *ctx;
3231         long ret = -EBADF;
3232         struct fd f;
3233
3234         f = fdget(fd);
3235         if (!f.file)
3236                 return -EBADF;
3237
3238         ret = -EOPNOTSUPP;
3239         if (f.file->f_op != &io_uring_fops)
3240                 goto out_fput;
3241
3242         ctx = f.file->private_data;
3243
3244         mutex_lock(&ctx->uring_lock);
3245         ret = __io_uring_register(ctx, opcode, arg, nr_args);
3246         mutex_unlock(&ctx->uring_lock);
3247 out_fput:
3248         fdput(f);
3249         return ret;
3250 }
3251
3252 static int __init io_uring_init(void)
3253 {
3254         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
3255         return 0;
3256 };
3257 __initcall(io_uring_init);