OSDN Git Service

7c44b0ef10d7937561767242bb5c2af23b363546
[tomoyo/tomoyo-test1.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/kthread.h>
60 #include <linux/blkdev.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
63 #include <net/sock.h>
64 #include <net/af_unix.h>
65 #include <net/scm.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72 #include <linux/highmem.h>
73 #include <linux/namei.h>
74 #include <linux/fsnotify.h>
75 #include <linux/fadvise.h>
76
77 #define CREATE_TRACE_POINTS
78 #include <trace/events/io_uring.h>
79
80 #include <uapi/linux/io_uring.h>
81
82 #include "internal.h"
83 #include "io-wq.h"
84
85 #define IORING_MAX_ENTRIES      32768
86 #define IORING_MAX_CQ_ENTRIES   (2 * IORING_MAX_ENTRIES)
87
88 /*
89  * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
90  */
91 #define IORING_FILE_TABLE_SHIFT 9
92 #define IORING_MAX_FILES_TABLE  (1U << IORING_FILE_TABLE_SHIFT)
93 #define IORING_FILE_TABLE_MASK  (IORING_MAX_FILES_TABLE - 1)
94 #define IORING_MAX_FIXED_FILES  (64 * IORING_MAX_FILES_TABLE)
95
96 struct io_uring {
97         u32 head ____cacheline_aligned_in_smp;
98         u32 tail ____cacheline_aligned_in_smp;
99 };
100
101 /*
102  * This data is shared with the application through the mmap at offsets
103  * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
104  *
105  * The offsets to the member fields are published through struct
106  * io_sqring_offsets when calling io_uring_setup.
107  */
108 struct io_rings {
109         /*
110          * Head and tail offsets into the ring; the offsets need to be
111          * masked to get valid indices.
112          *
113          * The kernel controls head of the sq ring and the tail of the cq ring,
114          * and the application controls tail of the sq ring and the head of the
115          * cq ring.
116          */
117         struct io_uring         sq, cq;
118         /*
119          * Bitmasks to apply to head and tail offsets (constant, equals
120          * ring_entries - 1)
121          */
122         u32                     sq_ring_mask, cq_ring_mask;
123         /* Ring sizes (constant, power of 2) */
124         u32                     sq_ring_entries, cq_ring_entries;
125         /*
126          * Number of invalid entries dropped by the kernel due to
127          * invalid index stored in array
128          *
129          * Written by the kernel, shouldn't be modified by the
130          * application (i.e. get number of "new events" by comparing to
131          * cached value).
132          *
133          * After a new SQ head value was read by the application this
134          * counter includes all submissions that were dropped reaching
135          * the new SQ head (and possibly more).
136          */
137         u32                     sq_dropped;
138         /*
139          * Runtime flags
140          *
141          * Written by the kernel, shouldn't be modified by the
142          * application.
143          *
144          * The application needs a full memory barrier before checking
145          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
146          */
147         u32                     sq_flags;
148         /*
149          * Number of completion events lost because the queue was full;
150          * this should be avoided by the application by making sure
151          * there are not more requests pending than there is space in
152          * the completion queue.
153          *
154          * Written by the kernel, shouldn't be modified by the
155          * application (i.e. get number of "new events" by comparing to
156          * cached value).
157          *
158          * As completion events come in out of order this counter is not
159          * ordered with any other data.
160          */
161         u32                     cq_overflow;
162         /*
163          * Ring buffer of completion events.
164          *
165          * The kernel writes completion events fresh every time they are
166          * produced, so the application is allowed to modify pending
167          * entries.
168          */
169         struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
170 };
171
172 struct io_mapped_ubuf {
173         u64             ubuf;
174         size_t          len;
175         struct          bio_vec *bvec;
176         unsigned int    nr_bvecs;
177 };
178
179 struct fixed_file_table {
180         struct file             **files;
181 };
182
183 enum {
184         FFD_F_ATOMIC,
185 };
186
187 struct fixed_file_data {
188         struct fixed_file_table         *table;
189         struct io_ring_ctx              *ctx;
190
191         struct percpu_ref               refs;
192         struct llist_head               put_llist;
193         unsigned long                   state;
194         struct work_struct              ref_work;
195         struct completion               done;
196 };
197
198 struct io_ring_ctx {
199         struct {
200                 struct percpu_ref       refs;
201         } ____cacheline_aligned_in_smp;
202
203         struct {
204                 unsigned int            flags;
205                 bool                    compat;
206                 bool                    account_mem;
207                 bool                    cq_overflow_flushed;
208                 bool                    drain_next;
209
210                 /*
211                  * Ring buffer of indices into array of io_uring_sqe, which is
212                  * mmapped by the application using the IORING_OFF_SQES offset.
213                  *
214                  * This indirection could e.g. be used to assign fixed
215                  * io_uring_sqe entries to operations and only submit them to
216                  * the queue when needed.
217                  *
218                  * The kernel modifies neither the indices array nor the entries
219                  * array.
220                  */
221                 u32                     *sq_array;
222                 unsigned                cached_sq_head;
223                 unsigned                sq_entries;
224                 unsigned                sq_mask;
225                 unsigned                sq_thread_idle;
226                 unsigned                cached_sq_dropped;
227                 atomic_t                cached_cq_overflow;
228                 unsigned long           sq_check_overflow;
229
230                 struct list_head        defer_list;
231                 struct list_head        timeout_list;
232                 struct list_head        cq_overflow_list;
233
234                 wait_queue_head_t       inflight_wait;
235                 struct io_uring_sqe     *sq_sqes;
236         } ____cacheline_aligned_in_smp;
237
238         struct io_rings *rings;
239
240         /* IO offload */
241         struct io_wq            *io_wq;
242         struct task_struct      *sqo_thread;    /* if using sq thread polling */
243         struct mm_struct        *sqo_mm;
244         wait_queue_head_t       sqo_wait;
245
246         /*
247          * If used, fixed file set. Writers must ensure that ->refs is dead,
248          * readers must ensure that ->refs is alive as long as the file* is
249          * used. Only updated through io_uring_register(2).
250          */
251         struct fixed_file_data  *file_data;
252         unsigned                nr_user_files;
253
254         /* if used, fixed mapped user buffers */
255         unsigned                nr_user_bufs;
256         struct io_mapped_ubuf   *user_bufs;
257
258         struct user_struct      *user;
259
260         const struct cred       *creds;
261
262         /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
263         struct completion       *completions;
264
265         /* if all else fails... */
266         struct io_kiocb         *fallback_req;
267
268 #if defined(CONFIG_UNIX)
269         struct socket           *ring_sock;
270 #endif
271
272         struct {
273                 unsigned                cached_cq_tail;
274                 unsigned                cq_entries;
275                 unsigned                cq_mask;
276                 atomic_t                cq_timeouts;
277                 unsigned long           cq_check_overflow;
278                 struct wait_queue_head  cq_wait;
279                 struct fasync_struct    *cq_fasync;
280                 struct eventfd_ctx      *cq_ev_fd;
281         } ____cacheline_aligned_in_smp;
282
283         struct {
284                 struct mutex            uring_lock;
285                 wait_queue_head_t       wait;
286         } ____cacheline_aligned_in_smp;
287
288         struct {
289                 spinlock_t              completion_lock;
290                 struct llist_head       poll_llist;
291
292                 /*
293                  * ->poll_list is protected by the ctx->uring_lock for
294                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
295                  * For SQPOLL, only the single threaded io_sq_thread() will
296                  * manipulate the list, hence no extra locking is needed there.
297                  */
298                 struct list_head        poll_list;
299                 struct hlist_head       *cancel_hash;
300                 unsigned                cancel_hash_bits;
301                 bool                    poll_multi_file;
302
303                 spinlock_t              inflight_lock;
304                 struct list_head        inflight_list;
305         } ____cacheline_aligned_in_smp;
306 };
307
308 /*
309  * First field must be the file pointer in all the
310  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
311  */
312 struct io_poll_iocb {
313         struct file                     *file;
314         union {
315                 struct wait_queue_head  *head;
316                 u64                     addr;
317         };
318         __poll_t                        events;
319         bool                            done;
320         bool                            canceled;
321         struct wait_queue_entry         wait;
322 };
323
324 struct io_close {
325         struct file                     *file;
326         struct file                     *put_file;
327         int                             fd;
328 };
329
330 struct io_timeout_data {
331         struct io_kiocb                 *req;
332         struct hrtimer                  timer;
333         struct timespec64               ts;
334         enum hrtimer_mode               mode;
335         u32                             seq_offset;
336 };
337
338 struct io_accept {
339         struct file                     *file;
340         struct sockaddr __user          *addr;
341         int __user                      *addr_len;
342         int                             flags;
343 };
344
345 struct io_sync {
346         struct file                     *file;
347         loff_t                          len;
348         loff_t                          off;
349         int                             flags;
350         int                             mode;
351 };
352
353 struct io_cancel {
354         struct file                     *file;
355         u64                             addr;
356 };
357
358 struct io_timeout {
359         struct file                     *file;
360         u64                             addr;
361         int                             flags;
362         unsigned                        count;
363 };
364
365 struct io_rw {
366         /* NOTE: kiocb has the file as the first member, so don't do it here */
367         struct kiocb                    kiocb;
368         u64                             addr;
369         u64                             len;
370 };
371
372 struct io_connect {
373         struct file                     *file;
374         struct sockaddr __user          *addr;
375         int                             addr_len;
376 };
377
378 struct io_sr_msg {
379         struct file                     *file;
380         struct user_msghdr __user       *msg;
381         int                             msg_flags;
382 };
383
384 struct io_open {
385         struct file                     *file;
386         int                             dfd;
387         union {
388                 umode_t                 mode;
389                 unsigned                mask;
390         };
391         const char __user               *fname;
392         struct filename                 *filename;
393         struct statx __user             *buffer;
394         int                             flags;
395 };
396
397 struct io_files_update {
398         struct file                     *file;
399         u64                             arg;
400         u32                             nr_args;
401         u32                             offset;
402 };
403
404 struct io_fadvise {
405         struct file                     *file;
406         u64                             offset;
407         u32                             len;
408         u32                             advice;
409 };
410
411 struct io_madvise {
412         struct file                     *file;
413         u64                             addr;
414         u32                             len;
415         u32                             advice;
416 };
417
418 struct io_async_connect {
419         struct sockaddr_storage         address;
420 };
421
422 struct io_async_msghdr {
423         struct iovec                    fast_iov[UIO_FASTIOV];
424         struct iovec                    *iov;
425         struct sockaddr __user          *uaddr;
426         struct msghdr                   msg;
427 };
428
429 struct io_async_rw {
430         struct iovec                    fast_iov[UIO_FASTIOV];
431         struct iovec                    *iov;
432         ssize_t                         nr_segs;
433         ssize_t                         size;
434 };
435
436 struct io_async_open {
437         struct filename                 *filename;
438 };
439
440 struct io_async_ctx {
441         union {
442                 struct io_async_rw      rw;
443                 struct io_async_msghdr  msg;
444                 struct io_async_connect connect;
445                 struct io_timeout_data  timeout;
446                 struct io_async_open    open;
447         };
448 };
449
450 /*
451  * NOTE! Each of the iocb union members has the file pointer
452  * as the first entry in their struct definition. So you can
453  * access the file pointer through any of the sub-structs,
454  * or directly as just 'ki_filp' in this struct.
455  */
456 struct io_kiocb {
457         union {
458                 struct file             *file;
459                 struct io_rw            rw;
460                 struct io_poll_iocb     poll;
461                 struct io_accept        accept;
462                 struct io_sync          sync;
463                 struct io_cancel        cancel;
464                 struct io_timeout       timeout;
465                 struct io_connect       connect;
466                 struct io_sr_msg        sr_msg;
467                 struct io_open          open;
468                 struct io_close         close;
469                 struct io_files_update  files_update;
470                 struct io_fadvise       fadvise;
471                 struct io_madvise       madvise;
472         };
473
474         struct io_async_ctx             *io;
475         union {
476                 /*
477                  * ring_file is only used in the submission path, and
478                  * llist_node is only used for poll deferred completions
479                  */
480                 struct file             *ring_file;
481                 struct llist_node       llist_node;
482         };
483         int                             ring_fd;
484         bool                            has_user;
485         bool                            in_async;
486         bool                            needs_fixed_file;
487         u8                              opcode;
488
489         struct io_ring_ctx      *ctx;
490         union {
491                 struct list_head        list;
492                 struct hlist_node       hash_node;
493         };
494         struct list_head        link_list;
495         unsigned int            flags;
496         refcount_t              refs;
497 #define REQ_F_NOWAIT            1       /* must not punt to workers */
498 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
499 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
500 #define REQ_F_LINK_NEXT         8       /* already grabbed next link */
501 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
502 #define REQ_F_IO_DRAINED        32      /* drain done */
503 #define REQ_F_LINK              64      /* linked sqes */
504 #define REQ_F_LINK_TIMEOUT      128     /* has linked timeout */
505 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
506 #define REQ_F_DRAIN_LINK        512     /* link should be fully drained */
507 #define REQ_F_TIMEOUT           1024    /* timeout request */
508 #define REQ_F_ISREG             2048    /* regular file */
509 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
510 #define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
511 #define REQ_F_INFLIGHT          16384   /* on inflight list */
512 #define REQ_F_COMP_LOCKED       32768   /* completion under lock */
513 #define REQ_F_HARDLINK          65536   /* doesn't sever on completion < 0 */
514 #define REQ_F_FORCE_ASYNC       131072  /* IOSQE_ASYNC */
515 #define REQ_F_CUR_POS           262144  /* read/write uses file position */
516         u64                     user_data;
517         u32                     result;
518         u32                     sequence;
519
520         struct list_head        inflight_entry;
521
522         struct io_wq_work       work;
523 };
524
525 #define IO_PLUG_THRESHOLD               2
526 #define IO_IOPOLL_BATCH                 8
527
528 struct io_submit_state {
529         struct blk_plug         plug;
530
531         /*
532          * io_kiocb alloc cache
533          */
534         void                    *reqs[IO_IOPOLL_BATCH];
535         unsigned                int free_reqs;
536         unsigned                int cur_req;
537
538         /*
539          * File reference cache
540          */
541         struct file             *file;
542         unsigned int            fd;
543         unsigned int            has_refs;
544         unsigned int            used_refs;
545         unsigned int            ios_left;
546 };
547
548 struct io_op_def {
549         /* needs req->io allocated for deferral/async */
550         unsigned                async_ctx : 1;
551         /* needs current->mm setup, does mm access */
552         unsigned                needs_mm : 1;
553         /* needs req->file assigned */
554         unsigned                needs_file : 1;
555         /* needs req->file assigned IFF fd is >= 0 */
556         unsigned                fd_non_neg : 1;
557         /* hash wq insertion if file is a regular file */
558         unsigned                hash_reg_file : 1;
559         /* unbound wq insertion if file is a non-regular file */
560         unsigned                unbound_nonreg_file : 1;
561 };
562
563 static const struct io_op_def io_op_defs[] = {
564         {
565                 /* IORING_OP_NOP */
566         },
567         {
568                 /* IORING_OP_READV */
569                 .async_ctx              = 1,
570                 .needs_mm               = 1,
571                 .needs_file             = 1,
572                 .unbound_nonreg_file    = 1,
573         },
574         {
575                 /* IORING_OP_WRITEV */
576                 .async_ctx              = 1,
577                 .needs_mm               = 1,
578                 .needs_file             = 1,
579                 .hash_reg_file          = 1,
580                 .unbound_nonreg_file    = 1,
581         },
582         {
583                 /* IORING_OP_FSYNC */
584                 .needs_file             = 1,
585         },
586         {
587                 /* IORING_OP_READ_FIXED */
588                 .needs_file             = 1,
589                 .unbound_nonreg_file    = 1,
590         },
591         {
592                 /* IORING_OP_WRITE_FIXED */
593                 .needs_file             = 1,
594                 .hash_reg_file          = 1,
595                 .unbound_nonreg_file    = 1,
596         },
597         {
598                 /* IORING_OP_POLL_ADD */
599                 .needs_file             = 1,
600                 .unbound_nonreg_file    = 1,
601         },
602         {
603                 /* IORING_OP_POLL_REMOVE */
604         },
605         {
606                 /* IORING_OP_SYNC_FILE_RANGE */
607                 .needs_file             = 1,
608         },
609         {
610                 /* IORING_OP_SENDMSG */
611                 .async_ctx              = 1,
612                 .needs_mm               = 1,
613                 .needs_file             = 1,
614                 .unbound_nonreg_file    = 1,
615         },
616         {
617                 /* IORING_OP_RECVMSG */
618                 .async_ctx              = 1,
619                 .needs_mm               = 1,
620                 .needs_file             = 1,
621                 .unbound_nonreg_file    = 1,
622         },
623         {
624                 /* IORING_OP_TIMEOUT */
625                 .async_ctx              = 1,
626                 .needs_mm               = 1,
627         },
628         {
629                 /* IORING_OP_TIMEOUT_REMOVE */
630         },
631         {
632                 /* IORING_OP_ACCEPT */
633                 .needs_mm               = 1,
634                 .needs_file             = 1,
635                 .unbound_nonreg_file    = 1,
636         },
637         {
638                 /* IORING_OP_ASYNC_CANCEL */
639         },
640         {
641                 /* IORING_OP_LINK_TIMEOUT */
642                 .async_ctx              = 1,
643                 .needs_mm               = 1,
644         },
645         {
646                 /* IORING_OP_CONNECT */
647                 .async_ctx              = 1,
648                 .needs_mm               = 1,
649                 .needs_file             = 1,
650                 .unbound_nonreg_file    = 1,
651         },
652         {
653                 /* IORING_OP_FALLOCATE */
654                 .needs_file             = 1,
655         },
656         {
657                 /* IORING_OP_OPENAT */
658                 .needs_file             = 1,
659                 .fd_non_neg             = 1,
660         },
661         {
662                 /* IORING_OP_CLOSE */
663                 .needs_file             = 1,
664         },
665         {
666                 /* IORING_OP_FILES_UPDATE */
667                 .needs_mm               = 1,
668         },
669         {
670                 /* IORING_OP_STATX */
671                 .needs_mm               = 1,
672                 .needs_file             = 1,
673                 .fd_non_neg             = 1,
674         },
675         {
676                 /* IORING_OP_READ */
677                 .needs_mm               = 1,
678                 .needs_file             = 1,
679                 .unbound_nonreg_file    = 1,
680         },
681         {
682                 /* IORING_OP_WRITE */
683                 .needs_mm               = 1,
684                 .needs_file             = 1,
685                 .unbound_nonreg_file    = 1,
686         },
687         {
688                 /* IORING_OP_FADVISE */
689                 .needs_file             = 1,
690         },
691         {
692                 /* IORING_OP_MADVISE */
693                 .needs_mm               = 1,
694         },
695 };
696
697 static void io_wq_submit_work(struct io_wq_work **workptr);
698 static void io_cqring_fill_event(struct io_kiocb *req, long res);
699 static void io_put_req(struct io_kiocb *req);
700 static void __io_double_put_req(struct io_kiocb *req);
701 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
702 static void io_queue_linked_timeout(struct io_kiocb *req);
703 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
704                                  struct io_uring_files_update *ip,
705                                  unsigned nr_args);
706
707 static struct kmem_cache *req_cachep;
708
709 static const struct file_operations io_uring_fops;
710
711 struct sock *io_uring_get_socket(struct file *file)
712 {
713 #if defined(CONFIG_UNIX)
714         if (file->f_op == &io_uring_fops) {
715                 struct io_ring_ctx *ctx = file->private_data;
716
717                 return ctx->ring_sock->sk;
718         }
719 #endif
720         return NULL;
721 }
722 EXPORT_SYMBOL(io_uring_get_socket);
723
724 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
725 {
726         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
727
728         complete(&ctx->completions[0]);
729 }
730
731 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
732 {
733         struct io_ring_ctx *ctx;
734         int hash_bits;
735
736         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
737         if (!ctx)
738                 return NULL;
739
740         ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL);
741         if (!ctx->fallback_req)
742                 goto err;
743
744         ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL);
745         if (!ctx->completions)
746                 goto err;
747
748         /*
749          * Use 5 bits less than the max cq entries, that should give us around
750          * 32 entries per hash list if totally full and uniformly spread.
751          */
752         hash_bits = ilog2(p->cq_entries);
753         hash_bits -= 5;
754         if (hash_bits <= 0)
755                 hash_bits = 1;
756         ctx->cancel_hash_bits = hash_bits;
757         ctx->cancel_hash = kmalloc((1U << hash_bits) * sizeof(struct hlist_head),
758                                         GFP_KERNEL);
759         if (!ctx->cancel_hash)
760                 goto err;
761         __hash_init(ctx->cancel_hash, 1U << hash_bits);
762
763         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
764                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
765                 goto err;
766
767         ctx->flags = p->flags;
768         init_waitqueue_head(&ctx->cq_wait);
769         INIT_LIST_HEAD(&ctx->cq_overflow_list);
770         init_completion(&ctx->completions[0]);
771         init_completion(&ctx->completions[1]);
772         mutex_init(&ctx->uring_lock);
773         init_waitqueue_head(&ctx->wait);
774         spin_lock_init(&ctx->completion_lock);
775         init_llist_head(&ctx->poll_llist);
776         INIT_LIST_HEAD(&ctx->poll_list);
777         INIT_LIST_HEAD(&ctx->defer_list);
778         INIT_LIST_HEAD(&ctx->timeout_list);
779         init_waitqueue_head(&ctx->inflight_wait);
780         spin_lock_init(&ctx->inflight_lock);
781         INIT_LIST_HEAD(&ctx->inflight_list);
782         return ctx;
783 err:
784         if (ctx->fallback_req)
785                 kmem_cache_free(req_cachep, ctx->fallback_req);
786         kfree(ctx->completions);
787         kfree(ctx->cancel_hash);
788         kfree(ctx);
789         return NULL;
790 }
791
792 static inline bool __req_need_defer(struct io_kiocb *req)
793 {
794         struct io_ring_ctx *ctx = req->ctx;
795
796         return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
797                                         + atomic_read(&ctx->cached_cq_overflow);
798 }
799
800 static inline bool req_need_defer(struct io_kiocb *req)
801 {
802         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) == REQ_F_IO_DRAIN)
803                 return __req_need_defer(req);
804
805         return false;
806 }
807
808 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
809 {
810         struct io_kiocb *req;
811
812         req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
813         if (req && !req_need_defer(req)) {
814                 list_del_init(&req->list);
815                 return req;
816         }
817
818         return NULL;
819 }
820
821 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
822 {
823         struct io_kiocb *req;
824
825         req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
826         if (req) {
827                 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
828                         return NULL;
829                 if (!__req_need_defer(req)) {
830                         list_del_init(&req->list);
831                         return req;
832                 }
833         }
834
835         return NULL;
836 }
837
838 static void __io_commit_cqring(struct io_ring_ctx *ctx)
839 {
840         struct io_rings *rings = ctx->rings;
841
842         if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
843                 /* order cqe stores with ring update */
844                 smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
845
846                 if (wq_has_sleeper(&ctx->cq_wait)) {
847                         wake_up_interruptible(&ctx->cq_wait);
848                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
849                 }
850         }
851 }
852
853 static inline bool io_prep_async_work(struct io_kiocb *req,
854                                       struct io_kiocb **link)
855 {
856         const struct io_op_def *def = &io_op_defs[req->opcode];
857         bool do_hashed = false;
858
859         if (req->flags & REQ_F_ISREG) {
860                 if (def->hash_reg_file)
861                         do_hashed = true;
862         } else {
863                 if (def->unbound_nonreg_file)
864                         req->work.flags |= IO_WQ_WORK_UNBOUND;
865         }
866         if (def->needs_mm)
867                 req->work.flags |= IO_WQ_WORK_NEEDS_USER;
868
869         *link = io_prep_linked_timeout(req);
870         return do_hashed;
871 }
872
873 static inline void io_queue_async_work(struct io_kiocb *req)
874 {
875         struct io_ring_ctx *ctx = req->ctx;
876         struct io_kiocb *link;
877         bool do_hashed;
878
879         do_hashed = io_prep_async_work(req, &link);
880
881         trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
882                                         req->flags);
883         if (!do_hashed) {
884                 io_wq_enqueue(ctx->io_wq, &req->work);
885         } else {
886                 io_wq_enqueue_hashed(ctx->io_wq, &req->work,
887                                         file_inode(req->file));
888         }
889
890         if (link)
891                 io_queue_linked_timeout(link);
892 }
893
894 static void io_kill_timeout(struct io_kiocb *req)
895 {
896         int ret;
897
898         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
899         if (ret != -1) {
900                 atomic_inc(&req->ctx->cq_timeouts);
901                 list_del_init(&req->list);
902                 io_cqring_fill_event(req, 0);
903                 io_put_req(req);
904         }
905 }
906
907 static void io_kill_timeouts(struct io_ring_ctx *ctx)
908 {
909         struct io_kiocb *req, *tmp;
910
911         spin_lock_irq(&ctx->completion_lock);
912         list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
913                 io_kill_timeout(req);
914         spin_unlock_irq(&ctx->completion_lock);
915 }
916
917 static void io_commit_cqring(struct io_ring_ctx *ctx)
918 {
919         struct io_kiocb *req;
920
921         while ((req = io_get_timeout_req(ctx)) != NULL)
922                 io_kill_timeout(req);
923
924         __io_commit_cqring(ctx);
925
926         while ((req = io_get_deferred_req(ctx)) != NULL) {
927                 req->flags |= REQ_F_IO_DRAINED;
928                 io_queue_async_work(req);
929         }
930 }
931
932 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
933 {
934         struct io_rings *rings = ctx->rings;
935         unsigned tail;
936
937         tail = ctx->cached_cq_tail;
938         /*
939          * writes to the cq entry need to come after reading head; the
940          * control dependency is enough as we're using WRITE_ONCE to
941          * fill the cq entry
942          */
943         if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
944                 return NULL;
945
946         ctx->cached_cq_tail++;
947         return &rings->cqes[tail & ctx->cq_mask];
948 }
949
950 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
951 {
952         if (waitqueue_active(&ctx->wait))
953                 wake_up(&ctx->wait);
954         if (waitqueue_active(&ctx->sqo_wait))
955                 wake_up(&ctx->sqo_wait);
956         if (ctx->cq_ev_fd)
957                 eventfd_signal(ctx->cq_ev_fd, 1);
958 }
959
960 /* Returns true if there are no backlogged entries after the flush */
961 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
962 {
963         struct io_rings *rings = ctx->rings;
964         struct io_uring_cqe *cqe;
965         struct io_kiocb *req;
966         unsigned long flags;
967         LIST_HEAD(list);
968
969         if (!force) {
970                 if (list_empty_careful(&ctx->cq_overflow_list))
971                         return true;
972                 if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
973                     rings->cq_ring_entries))
974                         return false;
975         }
976
977         spin_lock_irqsave(&ctx->completion_lock, flags);
978
979         /* if force is set, the ring is going away. always drop after that */
980         if (force)
981                 ctx->cq_overflow_flushed = true;
982
983         cqe = NULL;
984         while (!list_empty(&ctx->cq_overflow_list)) {
985                 cqe = io_get_cqring(ctx);
986                 if (!cqe && !force)
987                         break;
988
989                 req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
990                                                 list);
991                 list_move(&req->list, &list);
992                 if (cqe) {
993                         WRITE_ONCE(cqe->user_data, req->user_data);
994                         WRITE_ONCE(cqe->res, req->result);
995                         WRITE_ONCE(cqe->flags, 0);
996                 } else {
997                         WRITE_ONCE(ctx->rings->cq_overflow,
998                                 atomic_inc_return(&ctx->cached_cq_overflow));
999                 }
1000         }
1001
1002         io_commit_cqring(ctx);
1003         if (cqe) {
1004                 clear_bit(0, &ctx->sq_check_overflow);
1005                 clear_bit(0, &ctx->cq_check_overflow);
1006         }
1007         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1008         io_cqring_ev_posted(ctx);
1009
1010         while (!list_empty(&list)) {
1011                 req = list_first_entry(&list, struct io_kiocb, list);
1012                 list_del(&req->list);
1013                 io_put_req(req);
1014         }
1015
1016         return cqe != NULL;
1017 }
1018
1019 static void io_cqring_fill_event(struct io_kiocb *req, long res)
1020 {
1021         struct io_ring_ctx *ctx = req->ctx;
1022         struct io_uring_cqe *cqe;
1023
1024         trace_io_uring_complete(ctx, req->user_data, res);
1025
1026         /*
1027          * If we can't get a cq entry, userspace overflowed the
1028          * submission (by quite a lot). Increment the overflow count in
1029          * the ring.
1030          */
1031         cqe = io_get_cqring(ctx);
1032         if (likely(cqe)) {
1033                 WRITE_ONCE(cqe->user_data, req->user_data);
1034                 WRITE_ONCE(cqe->res, res);
1035                 WRITE_ONCE(cqe->flags, 0);
1036         } else if (ctx->cq_overflow_flushed) {
1037                 WRITE_ONCE(ctx->rings->cq_overflow,
1038                                 atomic_inc_return(&ctx->cached_cq_overflow));
1039         } else {
1040                 if (list_empty(&ctx->cq_overflow_list)) {
1041                         set_bit(0, &ctx->sq_check_overflow);
1042                         set_bit(0, &ctx->cq_check_overflow);
1043                 }
1044                 refcount_inc(&req->refs);
1045                 req->result = res;
1046                 list_add_tail(&req->list, &ctx->cq_overflow_list);
1047         }
1048 }
1049
1050 static void io_cqring_add_event(struct io_kiocb *req, long res)
1051 {
1052         struct io_ring_ctx *ctx = req->ctx;
1053         unsigned long flags;
1054
1055         spin_lock_irqsave(&ctx->completion_lock, flags);
1056         io_cqring_fill_event(req, res);
1057         io_commit_cqring(ctx);
1058         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1059
1060         io_cqring_ev_posted(ctx);
1061 }
1062
1063 static inline bool io_is_fallback_req(struct io_kiocb *req)
1064 {
1065         return req == (struct io_kiocb *)
1066                         ((unsigned long) req->ctx->fallback_req & ~1UL);
1067 }
1068
1069 static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx)
1070 {
1071         struct io_kiocb *req;
1072
1073         req = ctx->fallback_req;
1074         if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req))
1075                 return req;
1076
1077         return NULL;
1078 }
1079
1080 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
1081                                    struct io_submit_state *state)
1082 {
1083         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
1084         struct io_kiocb *req;
1085
1086         if (!state) {
1087                 req = kmem_cache_alloc(req_cachep, gfp);
1088                 if (unlikely(!req))
1089                         goto fallback;
1090         } else if (!state->free_reqs) {
1091                 size_t sz;
1092                 int ret;
1093
1094                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
1095                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
1096
1097                 /*
1098                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
1099                  * retry single alloc to be on the safe side.
1100                  */
1101                 if (unlikely(ret <= 0)) {
1102                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
1103                         if (!state->reqs[0])
1104                                 goto fallback;
1105                         ret = 1;
1106                 }
1107                 state->free_reqs = ret - 1;
1108                 state->cur_req = 1;
1109                 req = state->reqs[0];
1110         } else {
1111                 req = state->reqs[state->cur_req];
1112                 state->free_reqs--;
1113                 state->cur_req++;
1114         }
1115
1116 got_it:
1117         req->io = NULL;
1118         req->ring_file = NULL;
1119         req->file = NULL;
1120         req->ctx = ctx;
1121         req->flags = 0;
1122         /* one is dropped after submission, the other at completion */
1123         refcount_set(&req->refs, 2);
1124         req->result = 0;
1125         INIT_IO_WORK(&req->work, io_wq_submit_work);
1126         return req;
1127 fallback:
1128         req = io_get_fallback_req(ctx);
1129         if (req)
1130                 goto got_it;
1131         percpu_ref_put(&ctx->refs);
1132         return NULL;
1133 }
1134
1135 static void __io_req_do_free(struct io_kiocb *req)
1136 {
1137         if (likely(!io_is_fallback_req(req)))
1138                 kmem_cache_free(req_cachep, req);
1139         else
1140                 clear_bit_unlock(0, (unsigned long *) req->ctx->fallback_req);
1141 }
1142
1143 static void __io_req_aux_free(struct io_kiocb *req)
1144 {
1145         struct io_ring_ctx *ctx = req->ctx;
1146
1147         if (req->io)
1148                 kfree(req->io);
1149         if (req->file) {
1150                 if (req->flags & REQ_F_FIXED_FILE)
1151                         percpu_ref_put(&ctx->file_data->refs);
1152                 else
1153                         fput(req->file);
1154         }
1155 }
1156
1157 static void __io_free_req(struct io_kiocb *req)
1158 {
1159         __io_req_aux_free(req);
1160
1161         if (req->flags & REQ_F_INFLIGHT) {
1162                 struct io_ring_ctx *ctx = req->ctx;
1163                 unsigned long flags;
1164
1165                 spin_lock_irqsave(&ctx->inflight_lock, flags);
1166                 list_del(&req->inflight_entry);
1167                 if (waitqueue_active(&ctx->inflight_wait))
1168                         wake_up(&ctx->inflight_wait);
1169                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
1170         }
1171
1172         percpu_ref_put(&req->ctx->refs);
1173         __io_req_do_free(req);
1174 }
1175
1176 struct req_batch {
1177         void *reqs[IO_IOPOLL_BATCH];
1178         int to_free;
1179         int need_iter;
1180 };
1181
1182 static void io_free_req_many(struct io_ring_ctx *ctx, struct req_batch *rb)
1183 {
1184         if (!rb->to_free)
1185                 return;
1186         if (rb->need_iter) {
1187                 int i, inflight = 0;
1188                 unsigned long flags;
1189
1190                 for (i = 0; i < rb->to_free; i++) {
1191                         struct io_kiocb *req = rb->reqs[i];
1192
1193                         if (req->flags & REQ_F_FIXED_FILE)
1194                                 req->file = NULL;
1195                         if (req->flags & REQ_F_INFLIGHT)
1196                                 inflight++;
1197                         else
1198                                 rb->reqs[i] = NULL;
1199                         __io_req_aux_free(req);
1200                 }
1201                 if (!inflight)
1202                         goto do_free;
1203
1204                 spin_lock_irqsave(&ctx->inflight_lock, flags);
1205                 for (i = 0; i < rb->to_free; i++) {
1206                         struct io_kiocb *req = rb->reqs[i];
1207
1208                         if (req) {
1209                                 list_del(&req->inflight_entry);
1210                                 if (!--inflight)
1211                                         break;
1212                         }
1213                 }
1214                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
1215
1216                 if (waitqueue_active(&ctx->inflight_wait))
1217                         wake_up(&ctx->inflight_wait);
1218         }
1219 do_free:
1220         kmem_cache_free_bulk(req_cachep, rb->to_free, rb->reqs);
1221         percpu_ref_put_many(&ctx->refs, rb->to_free);
1222         percpu_ref_put_many(&ctx->file_data->refs, rb->to_free);
1223         rb->to_free = rb->need_iter = 0;
1224 }
1225
1226 static bool io_link_cancel_timeout(struct io_kiocb *req)
1227 {
1228         struct io_ring_ctx *ctx = req->ctx;
1229         int ret;
1230
1231         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
1232         if (ret != -1) {
1233                 io_cqring_fill_event(req, -ECANCELED);
1234                 io_commit_cqring(ctx);
1235                 req->flags &= ~REQ_F_LINK;
1236                 io_put_req(req);
1237                 return true;
1238         }
1239
1240         return false;
1241 }
1242
1243 static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1244 {
1245         struct io_ring_ctx *ctx = req->ctx;
1246         bool wake_ev = false;
1247
1248         /* Already got next link */
1249         if (req->flags & REQ_F_LINK_NEXT)
1250                 return;
1251
1252         /*
1253          * The list should never be empty when we are called here. But could
1254          * potentially happen if the chain is messed up, check to be on the
1255          * safe side.
1256          */
1257         while (!list_empty(&req->link_list)) {
1258                 struct io_kiocb *nxt = list_first_entry(&req->link_list,
1259                                                 struct io_kiocb, link_list);
1260
1261                 if (unlikely((req->flags & REQ_F_LINK_TIMEOUT) &&
1262                              (nxt->flags & REQ_F_TIMEOUT))) {
1263                         list_del_init(&nxt->link_list);
1264                         wake_ev |= io_link_cancel_timeout(nxt);
1265                         req->flags &= ~REQ_F_LINK_TIMEOUT;
1266                         continue;
1267                 }
1268
1269                 list_del_init(&req->link_list);
1270                 if (!list_empty(&nxt->link_list))
1271                         nxt->flags |= REQ_F_LINK;
1272                 *nxtptr = nxt;
1273                 break;
1274         }
1275
1276         req->flags |= REQ_F_LINK_NEXT;
1277         if (wake_ev)
1278                 io_cqring_ev_posted(ctx);
1279 }
1280
1281 /*
1282  * Called if REQ_F_LINK is set, and we fail the head request
1283  */
1284 static void io_fail_links(struct io_kiocb *req)
1285 {
1286         struct io_ring_ctx *ctx = req->ctx;
1287         unsigned long flags;
1288
1289         spin_lock_irqsave(&ctx->completion_lock, flags);
1290
1291         while (!list_empty(&req->link_list)) {
1292                 struct io_kiocb *link = list_first_entry(&req->link_list,
1293                                                 struct io_kiocb, link_list);
1294
1295                 list_del_init(&link->link_list);
1296                 trace_io_uring_fail_link(req, link);
1297
1298                 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
1299                     link->opcode == IORING_OP_LINK_TIMEOUT) {
1300                         io_link_cancel_timeout(link);
1301                 } else {
1302                         io_cqring_fill_event(link, -ECANCELED);
1303                         __io_double_put_req(link);
1304                 }
1305                 req->flags &= ~REQ_F_LINK_TIMEOUT;
1306         }
1307
1308         io_commit_cqring(ctx);
1309         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1310         io_cqring_ev_posted(ctx);
1311 }
1312
1313 static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
1314 {
1315         if (likely(!(req->flags & REQ_F_LINK)))
1316                 return;
1317
1318         /*
1319          * If LINK is set, we have dependent requests in this chain. If we
1320          * didn't fail this request, queue the first one up, moving any other
1321          * dependencies to the next request. In case of failure, fail the rest
1322          * of the chain.
1323          */
1324         if (req->flags & REQ_F_FAIL_LINK) {
1325                 io_fail_links(req);
1326         } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
1327                         REQ_F_LINK_TIMEOUT) {
1328                 struct io_ring_ctx *ctx = req->ctx;
1329                 unsigned long flags;
1330
1331                 /*
1332                  * If this is a timeout link, we could be racing with the
1333                  * timeout timer. Grab the completion lock for this case to
1334                  * protect against that.
1335                  */
1336                 spin_lock_irqsave(&ctx->completion_lock, flags);
1337                 io_req_link_next(req, nxt);
1338                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1339         } else {
1340                 io_req_link_next(req, nxt);
1341         }
1342 }
1343
1344 static void io_free_req(struct io_kiocb *req)
1345 {
1346         struct io_kiocb *nxt = NULL;
1347
1348         io_req_find_next(req, &nxt);
1349         __io_free_req(req);
1350
1351         if (nxt)
1352                 io_queue_async_work(nxt);
1353 }
1354
1355 /*
1356  * Drop reference to request, return next in chain (if there is one) if this
1357  * was the last reference to this request.
1358  */
1359 __attribute__((nonnull))
1360 static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1361 {
1362         io_req_find_next(req, nxtptr);
1363
1364         if (refcount_dec_and_test(&req->refs))
1365                 __io_free_req(req);
1366 }
1367
1368 static void io_put_req(struct io_kiocb *req)
1369 {
1370         if (refcount_dec_and_test(&req->refs))
1371                 io_free_req(req);
1372 }
1373
1374 /*
1375  * Must only be used if we don't need to care about links, usually from
1376  * within the completion handling itself.
1377  */
1378 static void __io_double_put_req(struct io_kiocb *req)
1379 {
1380         /* drop both submit and complete references */
1381         if (refcount_sub_and_test(2, &req->refs))
1382                 __io_free_req(req);
1383 }
1384
1385 static void io_double_put_req(struct io_kiocb *req)
1386 {
1387         /* drop both submit and complete references */
1388         if (refcount_sub_and_test(2, &req->refs))
1389                 io_free_req(req);
1390 }
1391
1392 static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
1393 {
1394         struct io_rings *rings = ctx->rings;
1395
1396         if (test_bit(0, &ctx->cq_check_overflow)) {
1397                 /*
1398                  * noflush == true is from the waitqueue handler, just ensure
1399                  * we wake up the task, and the next invocation will flush the
1400                  * entries. We cannot safely to it from here.
1401                  */
1402                 if (noflush && !list_empty(&ctx->cq_overflow_list))
1403                         return -1U;
1404
1405                 io_cqring_overflow_flush(ctx, false);
1406         }
1407
1408         /* See comment at the top of this file */
1409         smp_rmb();
1410         return ctx->cached_cq_tail - READ_ONCE(rings->cq.head);
1411 }
1412
1413 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1414 {
1415         struct io_rings *rings = ctx->rings;
1416
1417         /* make sure SQ entry isn't read before tail */
1418         return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
1419 }
1420
1421 static inline bool io_req_multi_free(struct req_batch *rb, struct io_kiocb *req)
1422 {
1423         if ((req->flags & REQ_F_LINK) || io_is_fallback_req(req))
1424                 return false;
1425
1426         if (!(req->flags & REQ_F_FIXED_FILE) || req->io)
1427                 rb->need_iter++;
1428
1429         rb->reqs[rb->to_free++] = req;
1430         if (unlikely(rb->to_free == ARRAY_SIZE(rb->reqs)))
1431                 io_free_req_many(req->ctx, rb);
1432         return true;
1433 }
1434
1435 /*
1436  * Find and free completed poll iocbs
1437  */
1438 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
1439                                struct list_head *done)
1440 {
1441         struct req_batch rb;
1442         struct io_kiocb *req;
1443
1444         rb.to_free = rb.need_iter = 0;
1445         while (!list_empty(done)) {
1446                 req = list_first_entry(done, struct io_kiocb, list);
1447                 list_del(&req->list);
1448
1449                 io_cqring_fill_event(req, req->result);
1450                 (*nr_events)++;
1451
1452                 if (refcount_dec_and_test(&req->refs) &&
1453                     !io_req_multi_free(&rb, req))
1454                         io_free_req(req);
1455         }
1456
1457         io_commit_cqring(ctx);
1458         io_free_req_many(ctx, &rb);
1459 }
1460
1461 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
1462                         long min)
1463 {
1464         struct io_kiocb *req, *tmp;
1465         LIST_HEAD(done);
1466         bool spin;
1467         int ret;
1468
1469         /*
1470          * Only spin for completions if we don't have multiple devices hanging
1471          * off our complete list, and we're under the requested amount.
1472          */
1473         spin = !ctx->poll_multi_file && *nr_events < min;
1474
1475         ret = 0;
1476         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
1477                 struct kiocb *kiocb = &req->rw.kiocb;
1478
1479                 /*
1480                  * Move completed entries to our local list. If we find a
1481                  * request that requires polling, break out and complete
1482                  * the done list first, if we have entries there.
1483                  */
1484                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
1485                         list_move_tail(&req->list, &done);
1486                         continue;
1487                 }
1488                 if (!list_empty(&done))
1489                         break;
1490
1491                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
1492                 if (ret < 0)
1493                         break;
1494
1495                 if (ret && spin)
1496                         spin = false;
1497                 ret = 0;
1498         }
1499
1500         if (!list_empty(&done))
1501                 io_iopoll_complete(ctx, nr_events, &done);
1502
1503         return ret;
1504 }
1505
1506 /*
1507  * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
1508  * non-spinning poll check - we'll still enter the driver poll loop, but only
1509  * as a non-spinning completion check.
1510  */
1511 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
1512                                 long min)
1513 {
1514         while (!list_empty(&ctx->poll_list) && !need_resched()) {
1515                 int ret;
1516
1517                 ret = io_do_iopoll(ctx, nr_events, min);
1518                 if (ret < 0)
1519                         return ret;
1520                 if (!min || *nr_events >= min)
1521                         return 0;
1522         }
1523
1524         return 1;
1525 }
1526
1527 /*
1528  * We can't just wait for polled events to come to us, we have to actively
1529  * find and complete them.
1530  */
1531 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
1532 {
1533         if (!(ctx->flags & IORING_SETUP_IOPOLL))
1534                 return;
1535
1536         mutex_lock(&ctx->uring_lock);
1537         while (!list_empty(&ctx->poll_list)) {
1538                 unsigned int nr_events = 0;
1539
1540                 io_iopoll_getevents(ctx, &nr_events, 1);
1541
1542                 /*
1543                  * Ensure we allow local-to-the-cpu processing to take place,
1544                  * in this case we need to ensure that we reap all events.
1545                  */
1546                 cond_resched();
1547         }
1548         mutex_unlock(&ctx->uring_lock);
1549 }
1550
1551 static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1552                             long min)
1553 {
1554         int iters = 0, ret = 0;
1555
1556         do {
1557                 int tmin = 0;
1558
1559                 /*
1560                  * Don't enter poll loop if we already have events pending.
1561                  * If we do, we can potentially be spinning for commands that
1562                  * already triggered a CQE (eg in error).
1563                  */
1564                 if (io_cqring_events(ctx, false))
1565                         break;
1566
1567                 /*
1568                  * If a submit got punted to a workqueue, we can have the
1569                  * application entering polling for a command before it gets
1570                  * issued. That app will hold the uring_lock for the duration
1571                  * of the poll right here, so we need to take a breather every
1572                  * now and then to ensure that the issue has a chance to add
1573                  * the poll to the issued list. Otherwise we can spin here
1574                  * forever, while the workqueue is stuck trying to acquire the
1575                  * very same mutex.
1576                  */
1577                 if (!(++iters & 7)) {
1578                         mutex_unlock(&ctx->uring_lock);
1579                         mutex_lock(&ctx->uring_lock);
1580                 }
1581
1582                 if (*nr_events < min)
1583                         tmin = min - *nr_events;
1584
1585                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
1586                 if (ret <= 0)
1587                         break;
1588                 ret = 0;
1589         } while (min && !*nr_events && !need_resched());
1590
1591         return ret;
1592 }
1593
1594 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1595                            long min)
1596 {
1597         int ret;
1598
1599         /*
1600          * We disallow the app entering submit/complete with polling, but we
1601          * still need to lock the ring to prevent racing with polled issue
1602          * that got punted to a workqueue.
1603          */
1604         mutex_lock(&ctx->uring_lock);
1605         ret = __io_iopoll_check(ctx, nr_events, min);
1606         mutex_unlock(&ctx->uring_lock);
1607         return ret;
1608 }
1609
1610 static void kiocb_end_write(struct io_kiocb *req)
1611 {
1612         /*
1613          * Tell lockdep we inherited freeze protection from submission
1614          * thread.
1615          */
1616         if (req->flags & REQ_F_ISREG) {
1617                 struct inode *inode = file_inode(req->file);
1618
1619                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
1620         }
1621         file_end_write(req->file);
1622 }
1623
1624 static inline void req_set_fail_links(struct io_kiocb *req)
1625 {
1626         if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
1627                 req->flags |= REQ_F_FAIL_LINK;
1628 }
1629
1630 static void io_complete_rw_common(struct kiocb *kiocb, long res)
1631 {
1632         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1633
1634         if (kiocb->ki_flags & IOCB_WRITE)
1635                 kiocb_end_write(req);
1636
1637         if (res != req->result)
1638                 req_set_fail_links(req);
1639         io_cqring_add_event(req, res);
1640 }
1641
1642 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
1643 {
1644         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1645
1646         io_complete_rw_common(kiocb, res);
1647         io_put_req(req);
1648 }
1649
1650 static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
1651 {
1652         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1653         struct io_kiocb *nxt = NULL;
1654
1655         io_complete_rw_common(kiocb, res);
1656         io_put_req_find_next(req, &nxt);
1657
1658         return nxt;
1659 }
1660
1661 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1662 {
1663         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1664
1665         if (kiocb->ki_flags & IOCB_WRITE)
1666                 kiocb_end_write(req);
1667
1668         if (res != req->result)
1669                 req_set_fail_links(req);
1670         req->result = res;
1671         if (res != -EAGAIN)
1672                 req->flags |= REQ_F_IOPOLL_COMPLETED;
1673 }
1674
1675 /*
1676  * After the iocb has been issued, it's safe to be found on the poll list.
1677  * Adding the kiocb to the list AFTER submission ensures that we don't
1678  * find it from a io_iopoll_getevents() thread before the issuer is done
1679  * accessing the kiocb cookie.
1680  */
1681 static void io_iopoll_req_issued(struct io_kiocb *req)
1682 {
1683         struct io_ring_ctx *ctx = req->ctx;
1684
1685         /*
1686          * Track whether we have multiple files in our lists. This will impact
1687          * how we do polling eventually, not spinning if we're on potentially
1688          * different devices.
1689          */
1690         if (list_empty(&ctx->poll_list)) {
1691                 ctx->poll_multi_file = false;
1692         } else if (!ctx->poll_multi_file) {
1693                 struct io_kiocb *list_req;
1694
1695                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1696                                                 list);
1697                 if (list_req->file != req->file)
1698                         ctx->poll_multi_file = true;
1699         }
1700
1701         /*
1702          * For fast devices, IO may have already completed. If it has, add
1703          * it to the front so we find it first.
1704          */
1705         if (req->flags & REQ_F_IOPOLL_COMPLETED)
1706                 list_add(&req->list, &ctx->poll_list);
1707         else
1708                 list_add_tail(&req->list, &ctx->poll_list);
1709 }
1710
1711 static void io_file_put(struct io_submit_state *state)
1712 {
1713         if (state->file) {
1714                 int diff = state->has_refs - state->used_refs;
1715
1716                 if (diff)
1717                         fput_many(state->file, diff);
1718                 state->file = NULL;
1719         }
1720 }
1721
1722 /*
1723  * Get as many references to a file as we have IOs left in this submission,
1724  * assuming most submissions are for one file, or at least that each file
1725  * has more than one submission.
1726  */
1727 static struct file *io_file_get(struct io_submit_state *state, int fd)
1728 {
1729         if (!state)
1730                 return fget(fd);
1731
1732         if (state->file) {
1733                 if (state->fd == fd) {
1734                         state->used_refs++;
1735                         state->ios_left--;
1736                         return state->file;
1737                 }
1738                 io_file_put(state);
1739         }
1740         state->file = fget_many(fd, state->ios_left);
1741         if (!state->file)
1742                 return NULL;
1743
1744         state->fd = fd;
1745         state->has_refs = state->ios_left;
1746         state->used_refs = 1;
1747         state->ios_left--;
1748         return state->file;
1749 }
1750
1751 /*
1752  * If we tracked the file through the SCM inflight mechanism, we could support
1753  * any file. For now, just ensure that anything potentially problematic is done
1754  * inline.
1755  */
1756 static bool io_file_supports_async(struct file *file)
1757 {
1758         umode_t mode = file_inode(file)->i_mode;
1759
1760         if (S_ISBLK(mode) || S_ISCHR(mode) || S_ISSOCK(mode))
1761                 return true;
1762         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1763                 return true;
1764
1765         return false;
1766 }
1767
1768 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1769                       bool force_nonblock)
1770 {
1771         struct io_ring_ctx *ctx = req->ctx;
1772         struct kiocb *kiocb = &req->rw.kiocb;
1773         unsigned ioprio;
1774         int ret;
1775
1776         if (!req->file)
1777                 return -EBADF;
1778
1779         if (S_ISREG(file_inode(req->file)->i_mode))
1780                 req->flags |= REQ_F_ISREG;
1781
1782         kiocb->ki_pos = READ_ONCE(sqe->off);
1783         if (kiocb->ki_pos == -1 && !(req->file->f_mode & FMODE_STREAM)) {
1784                 req->flags |= REQ_F_CUR_POS;
1785                 kiocb->ki_pos = req->file->f_pos;
1786         }
1787         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1788         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1789
1790         ioprio = READ_ONCE(sqe->ioprio);
1791         if (ioprio) {
1792                 ret = ioprio_check_cap(ioprio);
1793                 if (ret)
1794                         return ret;
1795
1796                 kiocb->ki_ioprio = ioprio;
1797         } else
1798                 kiocb->ki_ioprio = get_current_ioprio();
1799
1800         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1801         if (unlikely(ret))
1802                 return ret;
1803
1804         /* don't allow async punt if RWF_NOWAIT was requested */
1805         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1806             (req->file->f_flags & O_NONBLOCK))
1807                 req->flags |= REQ_F_NOWAIT;
1808
1809         if (force_nonblock)
1810                 kiocb->ki_flags |= IOCB_NOWAIT;
1811
1812         if (ctx->flags & IORING_SETUP_IOPOLL) {
1813                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1814                     !kiocb->ki_filp->f_op->iopoll)
1815                         return -EOPNOTSUPP;
1816
1817                 kiocb->ki_flags |= IOCB_HIPRI;
1818                 kiocb->ki_complete = io_complete_rw_iopoll;
1819                 req->result = 0;
1820         } else {
1821                 if (kiocb->ki_flags & IOCB_HIPRI)
1822                         return -EINVAL;
1823                 kiocb->ki_complete = io_complete_rw;
1824         }
1825
1826         req->rw.addr = READ_ONCE(sqe->addr);
1827         req->rw.len = READ_ONCE(sqe->len);
1828         /* we own ->private, reuse it for the buffer index */
1829         req->rw.kiocb.private = (void *) (unsigned long)
1830                                         READ_ONCE(sqe->buf_index);
1831         return 0;
1832 }
1833
1834 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1835 {
1836         switch (ret) {
1837         case -EIOCBQUEUED:
1838                 break;
1839         case -ERESTARTSYS:
1840         case -ERESTARTNOINTR:
1841         case -ERESTARTNOHAND:
1842         case -ERESTART_RESTARTBLOCK:
1843                 /*
1844                  * We can't just restart the syscall, since previously
1845                  * submitted sqes may already be in progress. Just fail this
1846                  * IO with EINTR.
1847                  */
1848                 ret = -EINTR;
1849                 /* fall through */
1850         default:
1851                 kiocb->ki_complete(kiocb, ret, 0);
1852         }
1853 }
1854
1855 static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
1856                        bool in_async)
1857 {
1858         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1859
1860         if (req->flags & REQ_F_CUR_POS)
1861                 req->file->f_pos = kiocb->ki_pos;
1862         if (in_async && ret >= 0 && kiocb->ki_complete == io_complete_rw)
1863                 *nxt = __io_complete_rw(kiocb, ret);
1864         else
1865                 io_rw_done(kiocb, ret);
1866 }
1867
1868 static ssize_t io_import_fixed(struct io_kiocb *req, int rw,
1869                                struct iov_iter *iter)
1870 {
1871         struct io_ring_ctx *ctx = req->ctx;
1872         size_t len = req->rw.len;
1873         struct io_mapped_ubuf *imu;
1874         unsigned index, buf_index;
1875         size_t offset;
1876         u64 buf_addr;
1877
1878         /* attempt to use fixed buffers without having provided iovecs */
1879         if (unlikely(!ctx->user_bufs))
1880                 return -EFAULT;
1881
1882         buf_index = (unsigned long) req->rw.kiocb.private;
1883         if (unlikely(buf_index >= ctx->nr_user_bufs))
1884                 return -EFAULT;
1885
1886         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1887         imu = &ctx->user_bufs[index];
1888         buf_addr = req->rw.addr;
1889
1890         /* overflow */
1891         if (buf_addr + len < buf_addr)
1892                 return -EFAULT;
1893         /* not inside the mapped region */
1894         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1895                 return -EFAULT;
1896
1897         /*
1898          * May not be a start of buffer, set size appropriately
1899          * and advance us to the beginning.
1900          */
1901         offset = buf_addr - imu->ubuf;
1902         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1903
1904         if (offset) {
1905                 /*
1906                  * Don't use iov_iter_advance() here, as it's really slow for
1907                  * using the latter parts of a big fixed buffer - it iterates
1908                  * over each segment manually. We can cheat a bit here, because
1909                  * we know that:
1910                  *
1911                  * 1) it's a BVEC iter, we set it up
1912                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1913                  *    first and last bvec
1914                  *
1915                  * So just find our index, and adjust the iterator afterwards.
1916                  * If the offset is within the first bvec (or the whole first
1917                  * bvec, just use iov_iter_advance(). This makes it easier
1918                  * since we can just skip the first segment, which may not
1919                  * be PAGE_SIZE aligned.
1920                  */
1921                 const struct bio_vec *bvec = imu->bvec;
1922
1923                 if (offset <= bvec->bv_len) {
1924                         iov_iter_advance(iter, offset);
1925                 } else {
1926                         unsigned long seg_skip;
1927
1928                         /* skip first vec */
1929                         offset -= bvec->bv_len;
1930                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1931
1932                         iter->bvec = bvec + seg_skip;
1933                         iter->nr_segs -= seg_skip;
1934                         iter->count -= bvec->bv_len + offset;
1935                         iter->iov_offset = offset & ~PAGE_MASK;
1936                 }
1937         }
1938
1939         return len;
1940 }
1941
1942 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
1943                                struct iovec **iovec, struct iov_iter *iter)
1944 {
1945         void __user *buf = u64_to_user_ptr(req->rw.addr);
1946         size_t sqe_len = req->rw.len;
1947         u8 opcode;
1948
1949         opcode = req->opcode;
1950         if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
1951                 *iovec = NULL;
1952                 return io_import_fixed(req, rw, iter);
1953         }
1954
1955         /* buffer index only valid with fixed read/write */
1956         if (req->rw.kiocb.private)
1957                 return -EINVAL;
1958
1959         if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
1960                 ssize_t ret;
1961                 ret = import_single_range(rw, buf, sqe_len, *iovec, iter);
1962                 *iovec = NULL;
1963                 return ret;
1964         }
1965
1966         if (req->io) {
1967                 struct io_async_rw *iorw = &req->io->rw;
1968
1969                 *iovec = iorw->iov;
1970                 iov_iter_init(iter, rw, *iovec, iorw->nr_segs, iorw->size);
1971                 if (iorw->iov == iorw->fast_iov)
1972                         *iovec = NULL;
1973                 return iorw->size;
1974         }
1975
1976         if (!req->has_user)
1977                 return -EFAULT;
1978
1979 #ifdef CONFIG_COMPAT
1980         if (req->ctx->compat)
1981                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1982                                                 iovec, iter);
1983 #endif
1984
1985         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1986 }
1987
1988 /*
1989  * For files that don't have ->read_iter() and ->write_iter(), handle them
1990  * by looping over ->read() or ->write() manually.
1991  */
1992 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
1993                            struct iov_iter *iter)
1994 {
1995         ssize_t ret = 0;
1996
1997         /*
1998          * Don't support polled IO through this interface, and we can't
1999          * support non-blocking either. For the latter, this just causes
2000          * the kiocb to be handled from an async context.
2001          */
2002         if (kiocb->ki_flags & IOCB_HIPRI)
2003                 return -EOPNOTSUPP;
2004         if (kiocb->ki_flags & IOCB_NOWAIT)
2005                 return -EAGAIN;
2006
2007         while (iov_iter_count(iter)) {
2008                 struct iovec iovec;
2009                 ssize_t nr;
2010
2011                 if (!iov_iter_is_bvec(iter)) {
2012                         iovec = iov_iter_iovec(iter);
2013                 } else {
2014                         /* fixed buffers import bvec */
2015                         iovec.iov_base = kmap(iter->bvec->bv_page)
2016                                                 + iter->iov_offset;
2017                         iovec.iov_len = min(iter->count,
2018                                         iter->bvec->bv_len - iter->iov_offset);
2019                 }
2020
2021                 if (rw == READ) {
2022                         nr = file->f_op->read(file, iovec.iov_base,
2023                                               iovec.iov_len, &kiocb->ki_pos);
2024                 } else {
2025                         nr = file->f_op->write(file, iovec.iov_base,
2026                                                iovec.iov_len, &kiocb->ki_pos);
2027                 }
2028
2029                 if (iov_iter_is_bvec(iter))
2030                         kunmap(iter->bvec->bv_page);
2031
2032                 if (nr < 0) {
2033                         if (!ret)
2034                                 ret = nr;
2035                         break;
2036                 }
2037                 ret += nr;
2038                 if (nr != iovec.iov_len)
2039                         break;
2040                 iov_iter_advance(iter, nr);
2041         }
2042
2043         return ret;
2044 }
2045
2046 static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size,
2047                           struct iovec *iovec, struct iovec *fast_iov,
2048                           struct iov_iter *iter)
2049 {
2050         req->io->rw.nr_segs = iter->nr_segs;
2051         req->io->rw.size = io_size;
2052         req->io->rw.iov = iovec;
2053         if (!req->io->rw.iov) {
2054                 req->io->rw.iov = req->io->rw.fast_iov;
2055                 memcpy(req->io->rw.iov, fast_iov,
2056                         sizeof(struct iovec) * iter->nr_segs);
2057         }
2058 }
2059
2060 static int io_alloc_async_ctx(struct io_kiocb *req)
2061 {
2062         if (!io_op_defs[req->opcode].async_ctx)
2063                 return 0;
2064         req->io = kmalloc(sizeof(*req->io), GFP_KERNEL);
2065         return req->io == NULL;
2066 }
2067
2068 static void io_rw_async(struct io_wq_work **workptr)
2069 {
2070         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2071         struct iovec *iov = NULL;
2072
2073         if (req->io->rw.iov != req->io->rw.fast_iov)
2074                 iov = req->io->rw.iov;
2075         io_wq_submit_work(workptr);
2076         kfree(iov);
2077 }
2078
2079 static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
2080                              struct iovec *iovec, struct iovec *fast_iov,
2081                              struct iov_iter *iter)
2082 {
2083         if (req->opcode == IORING_OP_READ_FIXED ||
2084             req->opcode == IORING_OP_WRITE_FIXED)
2085                 return 0;
2086         if (!req->io && io_alloc_async_ctx(req))
2087                 return -ENOMEM;
2088
2089         io_req_map_rw(req, io_size, iovec, fast_iov, iter);
2090         req->work.func = io_rw_async;
2091         return 0;
2092 }
2093
2094 static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2095                         bool force_nonblock)
2096 {
2097         struct io_async_ctx *io;
2098         struct iov_iter iter;
2099         ssize_t ret;
2100
2101         ret = io_prep_rw(req, sqe, force_nonblock);
2102         if (ret)
2103                 return ret;
2104
2105         if (unlikely(!(req->file->f_mode & FMODE_READ)))
2106                 return -EBADF;
2107
2108         if (!req->io)
2109                 return 0;
2110
2111         io = req->io;
2112         io->rw.iov = io->rw.fast_iov;
2113         req->io = NULL;
2114         ret = io_import_iovec(READ, req, &io->rw.iov, &iter);
2115         req->io = io;
2116         if (ret < 0)
2117                 return ret;
2118
2119         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
2120         return 0;
2121 }
2122
2123 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
2124                    bool force_nonblock)
2125 {
2126         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2127         struct kiocb *kiocb = &req->rw.kiocb;
2128         struct iov_iter iter;
2129         size_t iov_count;
2130         ssize_t io_size, ret;
2131
2132         ret = io_import_iovec(READ, req, &iovec, &iter);
2133         if (ret < 0)
2134                 return ret;
2135
2136         /* Ensure we clear previously set non-block flag */
2137         if (!force_nonblock)
2138                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
2139
2140         req->result = 0;
2141         io_size = ret;
2142         if (req->flags & REQ_F_LINK)
2143                 req->result = io_size;
2144
2145         /*
2146          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
2147          * we know to async punt it even if it was opened O_NONBLOCK
2148          */
2149         if (force_nonblock && !io_file_supports_async(req->file)) {
2150                 req->flags |= REQ_F_MUST_PUNT;
2151                 goto copy_iov;
2152         }
2153
2154         iov_count = iov_iter_count(&iter);
2155         ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count);
2156         if (!ret) {
2157                 ssize_t ret2;
2158
2159                 if (req->file->f_op->read_iter)
2160                         ret2 = call_read_iter(req->file, kiocb, &iter);
2161                 else
2162                         ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
2163
2164                 /* Catch -EAGAIN return for forced non-blocking submission */
2165                 if (!force_nonblock || ret2 != -EAGAIN) {
2166                         kiocb_done(kiocb, ret2, nxt, req->in_async);
2167                 } else {
2168 copy_iov:
2169                         ret = io_setup_async_rw(req, io_size, iovec,
2170                                                 inline_vecs, &iter);
2171                         if (ret)
2172                                 goto out_free;
2173                         return -EAGAIN;
2174                 }
2175         }
2176 out_free:
2177         if (!io_wq_current_is_worker())
2178                 kfree(iovec);
2179         return ret;
2180 }
2181
2182 static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2183                          bool force_nonblock)
2184 {
2185         struct io_async_ctx *io;
2186         struct iov_iter iter;
2187         ssize_t ret;
2188
2189         ret = io_prep_rw(req, sqe, force_nonblock);
2190         if (ret)
2191                 return ret;
2192
2193         if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
2194                 return -EBADF;
2195
2196         if (!req->io)
2197                 return 0;
2198
2199         io = req->io;
2200         io->rw.iov = io->rw.fast_iov;
2201         req->io = NULL;
2202         ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter);
2203         req->io = io;
2204         if (ret < 0)
2205                 return ret;
2206
2207         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
2208         return 0;
2209 }
2210
2211 static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
2212                     bool force_nonblock)
2213 {
2214         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2215         struct kiocb *kiocb = &req->rw.kiocb;
2216         struct iov_iter iter;
2217         size_t iov_count;
2218         ssize_t ret, io_size;
2219
2220         ret = io_import_iovec(WRITE, req, &iovec, &iter);
2221         if (ret < 0)
2222                 return ret;
2223
2224         /* Ensure we clear previously set non-block flag */
2225         if (!force_nonblock)
2226                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
2227
2228         req->result = 0;
2229         io_size = ret;
2230         if (req->flags & REQ_F_LINK)
2231                 req->result = io_size;
2232
2233         /*
2234          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
2235          * we know to async punt it even if it was opened O_NONBLOCK
2236          */
2237         if (force_nonblock && !io_file_supports_async(req->file)) {
2238                 req->flags |= REQ_F_MUST_PUNT;
2239                 goto copy_iov;
2240         }
2241
2242         /* file path doesn't support NOWAIT for non-direct_IO */
2243         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
2244             (req->flags & REQ_F_ISREG))
2245                 goto copy_iov;
2246
2247         iov_count = iov_iter_count(&iter);
2248         ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count);
2249         if (!ret) {
2250                 ssize_t ret2;
2251
2252                 /*
2253                  * Open-code file_start_write here to grab freeze protection,
2254                  * which will be released by another thread in
2255                  * io_complete_rw().  Fool lockdep by telling it the lock got
2256                  * released so that it doesn't complain about the held lock when
2257                  * we return to userspace.
2258                  */
2259                 if (req->flags & REQ_F_ISREG) {
2260                         __sb_start_write(file_inode(req->file)->i_sb,
2261                                                 SB_FREEZE_WRITE, true);
2262                         __sb_writers_release(file_inode(req->file)->i_sb,
2263                                                 SB_FREEZE_WRITE);
2264                 }
2265                 kiocb->ki_flags |= IOCB_WRITE;
2266
2267                 if (req->file->f_op->write_iter)
2268                         ret2 = call_write_iter(req->file, kiocb, &iter);
2269                 else
2270                         ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter);
2271                 if (!force_nonblock || ret2 != -EAGAIN) {
2272                         kiocb_done(kiocb, ret2, nxt, req->in_async);
2273                 } else {
2274 copy_iov:
2275                         ret = io_setup_async_rw(req, io_size, iovec,
2276                                                 inline_vecs, &iter);
2277                         if (ret)
2278                                 goto out_free;
2279                         return -EAGAIN;
2280                 }
2281         }
2282 out_free:
2283         if (!io_wq_current_is_worker())
2284                 kfree(iovec);
2285         return ret;
2286 }
2287
2288 /*
2289  * IORING_OP_NOP just posts a completion event, nothing else.
2290  */
2291 static int io_nop(struct io_kiocb *req)
2292 {
2293         struct io_ring_ctx *ctx = req->ctx;
2294
2295         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2296                 return -EINVAL;
2297
2298         io_cqring_add_event(req, 0);
2299         io_put_req(req);
2300         return 0;
2301 }
2302
2303 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2304 {
2305         struct io_ring_ctx *ctx = req->ctx;
2306
2307         if (!req->file)
2308                 return -EBADF;
2309
2310         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2311                 return -EINVAL;
2312         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2313                 return -EINVAL;
2314
2315         req->sync.flags = READ_ONCE(sqe->fsync_flags);
2316         if (unlikely(req->sync.flags & ~IORING_FSYNC_DATASYNC))
2317                 return -EINVAL;
2318
2319         req->sync.off = READ_ONCE(sqe->off);
2320         req->sync.len = READ_ONCE(sqe->len);
2321         return 0;
2322 }
2323
2324 static bool io_req_cancelled(struct io_kiocb *req)
2325 {
2326         if (req->work.flags & IO_WQ_WORK_CANCEL) {
2327                 req_set_fail_links(req);
2328                 io_cqring_add_event(req, -ECANCELED);
2329                 io_put_req(req);
2330                 return true;
2331         }
2332
2333         return false;
2334 }
2335
2336 static void io_link_work_cb(struct io_wq_work **workptr)
2337 {
2338         struct io_wq_work *work = *workptr;
2339         struct io_kiocb *link = work->data;
2340
2341         io_queue_linked_timeout(link);
2342         work->func = io_wq_submit_work;
2343 }
2344
2345 static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt)
2346 {
2347         struct io_kiocb *link;
2348
2349         io_prep_async_work(nxt, &link);
2350         *workptr = &nxt->work;
2351         if (link) {
2352                 nxt->work.flags |= IO_WQ_WORK_CB;
2353                 nxt->work.func = io_link_work_cb;
2354                 nxt->work.data = link;
2355         }
2356 }
2357
2358 static void io_fsync_finish(struct io_wq_work **workptr)
2359 {
2360         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2361         loff_t end = req->sync.off + req->sync.len;
2362         struct io_kiocb *nxt = NULL;
2363         int ret;
2364
2365         if (io_req_cancelled(req))
2366                 return;
2367
2368         ret = vfs_fsync_range(req->file, req->sync.off,
2369                                 end > 0 ? end : LLONG_MAX,
2370                                 req->sync.flags & IORING_FSYNC_DATASYNC);
2371         if (ret < 0)
2372                 req_set_fail_links(req);
2373         io_cqring_add_event(req, ret);
2374         io_put_req_find_next(req, &nxt);
2375         if (nxt)
2376                 io_wq_assign_next(workptr, nxt);
2377 }
2378
2379 static int io_fsync(struct io_kiocb *req, struct io_kiocb **nxt,
2380                     bool force_nonblock)
2381 {
2382         struct io_wq_work *work, *old_work;
2383
2384         /* fsync always requires a blocking context */
2385         if (force_nonblock) {
2386                 io_put_req(req);
2387                 req->work.func = io_fsync_finish;
2388                 return -EAGAIN;
2389         }
2390
2391         work = old_work = &req->work;
2392         io_fsync_finish(&work);
2393         if (work && work != old_work)
2394                 *nxt = container_of(work, struct io_kiocb, work);
2395         return 0;
2396 }
2397
2398 static void io_fallocate_finish(struct io_wq_work **workptr)
2399 {
2400         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2401         struct io_kiocb *nxt = NULL;
2402         int ret;
2403
2404         ret = vfs_fallocate(req->file, req->sync.mode, req->sync.off,
2405                                 req->sync.len);
2406         if (ret < 0)
2407                 req_set_fail_links(req);
2408         io_cqring_add_event(req, ret);
2409         io_put_req_find_next(req, &nxt);
2410         if (nxt)
2411                 io_wq_assign_next(workptr, nxt);
2412 }
2413
2414 static int io_fallocate_prep(struct io_kiocb *req,
2415                              const struct io_uring_sqe *sqe)
2416 {
2417         if (sqe->ioprio || sqe->buf_index || sqe->rw_flags)
2418                 return -EINVAL;
2419
2420         req->sync.off = READ_ONCE(sqe->off);
2421         req->sync.len = READ_ONCE(sqe->addr);
2422         req->sync.mode = READ_ONCE(sqe->len);
2423         return 0;
2424 }
2425
2426 static int io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt,
2427                         bool force_nonblock)
2428 {
2429         struct io_wq_work *work, *old_work;
2430
2431         /* fallocate always requiring blocking context */
2432         if (force_nonblock) {
2433                 io_put_req(req);
2434                 req->work.func = io_fallocate_finish;
2435                 return -EAGAIN;
2436         }
2437
2438         work = old_work = &req->work;
2439         io_fallocate_finish(&work);
2440         if (work && work != old_work)
2441                 *nxt = container_of(work, struct io_kiocb, work);
2442
2443         return 0;
2444 }
2445
2446 static int io_openat_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2447 {
2448         int ret;
2449
2450         if (sqe->ioprio || sqe->buf_index)
2451                 return -EINVAL;
2452
2453         req->open.dfd = READ_ONCE(sqe->fd);
2454         req->open.mode = READ_ONCE(sqe->len);
2455         req->open.fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2456         req->open.flags = READ_ONCE(sqe->open_flags);
2457
2458         req->open.filename = getname(req->open.fname);
2459         if (IS_ERR(req->open.filename)) {
2460                 ret = PTR_ERR(req->open.filename);
2461                 req->open.filename = NULL;
2462                 return ret;
2463         }
2464
2465         return 0;
2466 }
2467
2468 static int io_openat(struct io_kiocb *req, struct io_kiocb **nxt,
2469                      bool force_nonblock)
2470 {
2471         struct open_flags op;
2472         struct open_how how;
2473         struct file *file;
2474         int ret;
2475
2476         if (force_nonblock) {
2477                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2478                 return -EAGAIN;
2479         }
2480
2481         how = build_open_how(req->open.flags, req->open.mode);
2482         ret = build_open_flags(&how, &op);
2483         if (ret)
2484                 goto err;
2485
2486         ret = get_unused_fd_flags(how.flags);
2487         if (ret < 0)
2488                 goto err;
2489
2490         file = do_filp_open(req->open.dfd, req->open.filename, &op);
2491         if (IS_ERR(file)) {
2492                 put_unused_fd(ret);
2493                 ret = PTR_ERR(file);
2494         } else {
2495                 fsnotify_open(file);
2496                 fd_install(ret, file);
2497         }
2498 err:
2499         putname(req->open.filename);
2500         if (ret < 0)
2501                 req_set_fail_links(req);
2502         io_cqring_add_event(req, ret);
2503         io_put_req_find_next(req, nxt);
2504         return 0;
2505 }
2506
2507 static int io_madvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2508 {
2509 #if defined(CONFIG_ADVISE_SYSCALLS) && defined(CONFIG_MMU)
2510         if (sqe->ioprio || sqe->buf_index || sqe->off)
2511                 return -EINVAL;
2512
2513         req->madvise.addr = READ_ONCE(sqe->addr);
2514         req->madvise.len = READ_ONCE(sqe->len);
2515         req->madvise.advice = READ_ONCE(sqe->fadvise_advice);
2516         return 0;
2517 #else
2518         return -EOPNOTSUPP;
2519 #endif
2520 }
2521
2522 static int io_madvise(struct io_kiocb *req, struct io_kiocb **nxt,
2523                       bool force_nonblock)
2524 {
2525 #if defined(CONFIG_ADVISE_SYSCALLS) && defined(CONFIG_MMU)
2526         struct io_madvise *ma = &req->madvise;
2527         int ret;
2528
2529         if (force_nonblock)
2530                 return -EAGAIN;
2531
2532         ret = do_madvise(ma->addr, ma->len, ma->advice);
2533         if (ret < 0)
2534                 req_set_fail_links(req);
2535         io_cqring_add_event(req, ret);
2536         io_put_req_find_next(req, nxt);
2537         return 0;
2538 #else
2539         return -EOPNOTSUPP;
2540 #endif
2541 }
2542
2543 static int io_fadvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2544 {
2545         if (sqe->ioprio || sqe->buf_index || sqe->addr)
2546                 return -EINVAL;
2547
2548         req->fadvise.offset = READ_ONCE(sqe->off);
2549         req->fadvise.len = READ_ONCE(sqe->len);
2550         req->fadvise.advice = READ_ONCE(sqe->fadvise_advice);
2551         return 0;
2552 }
2553
2554 static int io_fadvise(struct io_kiocb *req, struct io_kiocb **nxt,
2555                       bool force_nonblock)
2556 {
2557         struct io_fadvise *fa = &req->fadvise;
2558         int ret;
2559
2560         /* DONTNEED may block, others _should_ not */
2561         if (fa->advice == POSIX_FADV_DONTNEED && force_nonblock)
2562                 return -EAGAIN;
2563
2564         ret = vfs_fadvise(req->file, fa->offset, fa->len, fa->advice);
2565         if (ret < 0)
2566                 req_set_fail_links(req);
2567         io_cqring_add_event(req, ret);
2568         io_put_req_find_next(req, nxt);
2569         return 0;
2570 }
2571
2572 static int io_statx_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2573 {
2574         unsigned lookup_flags;
2575         int ret;
2576
2577         if (sqe->ioprio || sqe->buf_index)
2578                 return -EINVAL;
2579
2580         req->open.dfd = READ_ONCE(sqe->fd);
2581         req->open.mask = READ_ONCE(sqe->len);
2582         req->open.fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2583         req->open.buffer = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2584         req->open.flags = READ_ONCE(sqe->statx_flags);
2585
2586         if (vfs_stat_set_lookup_flags(&lookup_flags, req->open.flags))
2587                 return -EINVAL;
2588
2589         req->open.filename = getname_flags(req->open.fname, lookup_flags, NULL);
2590         if (IS_ERR(req->open.filename)) {
2591                 ret = PTR_ERR(req->open.filename);
2592                 req->open.filename = NULL;
2593                 return ret;
2594         }
2595
2596         return 0;
2597 }
2598
2599 static int io_statx(struct io_kiocb *req, struct io_kiocb **nxt,
2600                     bool force_nonblock)
2601 {
2602         struct io_open *ctx = &req->open;
2603         unsigned lookup_flags;
2604         struct path path;
2605         struct kstat stat;
2606         int ret;
2607
2608         if (force_nonblock)
2609                 return -EAGAIN;
2610
2611         if (vfs_stat_set_lookup_flags(&lookup_flags, ctx->flags))
2612                 return -EINVAL;
2613
2614 retry:
2615         /* filename_lookup() drops it, keep a reference */
2616         ctx->filename->refcnt++;
2617
2618         ret = filename_lookup(ctx->dfd, ctx->filename, lookup_flags, &path,
2619                                 NULL);
2620         if (ret)
2621                 goto err;
2622
2623         ret = vfs_getattr(&path, &stat, ctx->mask, ctx->flags);
2624         path_put(&path);
2625         if (retry_estale(ret, lookup_flags)) {
2626                 lookup_flags |= LOOKUP_REVAL;
2627                 goto retry;
2628         }
2629         if (!ret)
2630                 ret = cp_statx(&stat, ctx->buffer);
2631 err:
2632         putname(ctx->filename);
2633         if (ret < 0)
2634                 req_set_fail_links(req);
2635         io_cqring_add_event(req, ret);
2636         io_put_req_find_next(req, nxt);
2637         return 0;
2638 }
2639
2640 static int io_close_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2641 {
2642         /*
2643          * If we queue this for async, it must not be cancellable. That would
2644          * leave the 'file' in an undeterminate state.
2645          */
2646         req->work.flags |= IO_WQ_WORK_NO_CANCEL;
2647
2648         if (sqe->ioprio || sqe->off || sqe->addr || sqe->len ||
2649             sqe->rw_flags || sqe->buf_index)
2650                 return -EINVAL;
2651         if (sqe->flags & IOSQE_FIXED_FILE)
2652                 return -EINVAL;
2653
2654         req->close.fd = READ_ONCE(sqe->fd);
2655         if (req->file->f_op == &io_uring_fops ||
2656             req->close.fd == req->ring_fd)
2657                 return -EBADF;
2658
2659         return 0;
2660 }
2661
2662 static void io_close_finish(struct io_wq_work **workptr)
2663 {
2664         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2665         struct io_kiocb *nxt = NULL;
2666
2667         /* Invoked with files, we need to do the close */
2668         if (req->work.files) {
2669                 int ret;
2670
2671                 ret = filp_close(req->close.put_file, req->work.files);
2672                 if (ret < 0) {
2673                         req_set_fail_links(req);
2674                 }
2675                 io_cqring_add_event(req, ret);
2676         }
2677
2678         fput(req->close.put_file);
2679
2680         /* we bypassed the re-issue, drop the submission reference */
2681         io_put_req(req);
2682         io_put_req_find_next(req, &nxt);
2683         if (nxt)
2684                 io_wq_assign_next(workptr, nxt);
2685 }
2686
2687 static int io_close(struct io_kiocb *req, struct io_kiocb **nxt,
2688                     bool force_nonblock)
2689 {
2690         int ret;
2691
2692         req->close.put_file = NULL;
2693         ret = __close_fd_get_file(req->close.fd, &req->close.put_file);
2694         if (ret < 0)
2695                 return ret;
2696
2697         /* if the file has a flush method, be safe and punt to async */
2698         if (req->close.put_file->f_op->flush && !io_wq_current_is_worker()) {
2699                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2700                 goto eagain;
2701         }
2702
2703         /*
2704          * No ->flush(), safely close from here and just punt the
2705          * fput() to async context.
2706          */
2707         ret = filp_close(req->close.put_file, current->files);
2708
2709         if (ret < 0)
2710                 req_set_fail_links(req);
2711         io_cqring_add_event(req, ret);
2712
2713         if (io_wq_current_is_worker()) {
2714                 struct io_wq_work *old_work, *work;
2715
2716                 old_work = work = &req->work;
2717                 io_close_finish(&work);
2718                 if (work && work != old_work)
2719                         *nxt = container_of(work, struct io_kiocb, work);
2720                 return 0;
2721         }
2722
2723 eagain:
2724         req->work.func = io_close_finish;
2725         return -EAGAIN;
2726 }
2727
2728 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2729 {
2730         struct io_ring_ctx *ctx = req->ctx;
2731
2732         if (!req->file)
2733                 return -EBADF;
2734
2735         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2736                 return -EINVAL;
2737         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2738                 return -EINVAL;
2739
2740         req->sync.off = READ_ONCE(sqe->off);
2741         req->sync.len = READ_ONCE(sqe->len);
2742         req->sync.flags = READ_ONCE(sqe->sync_range_flags);
2743         return 0;
2744 }
2745
2746 static void io_sync_file_range_finish(struct io_wq_work **workptr)
2747 {
2748         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2749         struct io_kiocb *nxt = NULL;
2750         int ret;
2751
2752         if (io_req_cancelled(req))
2753                 return;
2754
2755         ret = sync_file_range(req->file, req->sync.off, req->sync.len,
2756                                 req->sync.flags);
2757         if (ret < 0)
2758                 req_set_fail_links(req);
2759         io_cqring_add_event(req, ret);
2760         io_put_req_find_next(req, &nxt);
2761         if (nxt)
2762                 io_wq_assign_next(workptr, nxt);
2763 }
2764
2765 static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt,
2766                               bool force_nonblock)
2767 {
2768         struct io_wq_work *work, *old_work;
2769
2770         /* sync_file_range always requires a blocking context */
2771         if (force_nonblock) {
2772                 io_put_req(req);
2773                 req->work.func = io_sync_file_range_finish;
2774                 return -EAGAIN;
2775         }
2776
2777         work = old_work = &req->work;
2778         io_sync_file_range_finish(&work);
2779         if (work && work != old_work)
2780                 *nxt = container_of(work, struct io_kiocb, work);
2781         return 0;
2782 }
2783
2784 #if defined(CONFIG_NET)
2785 static void io_sendrecv_async(struct io_wq_work **workptr)
2786 {
2787         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2788         struct iovec *iov = NULL;
2789
2790         if (req->io->rw.iov != req->io->rw.fast_iov)
2791                 iov = req->io->msg.iov;
2792         io_wq_submit_work(workptr);
2793         kfree(iov);
2794 }
2795 #endif
2796
2797 static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2798 {
2799 #if defined(CONFIG_NET)
2800         struct io_sr_msg *sr = &req->sr_msg;
2801         struct io_async_ctx *io = req->io;
2802
2803         sr->msg_flags = READ_ONCE(sqe->msg_flags);
2804         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2805
2806         if (!io)
2807                 return 0;
2808
2809         io->msg.iov = io->msg.fast_iov;
2810         return sendmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2811                                         &io->msg.iov);
2812 #else
2813         return -EOPNOTSUPP;
2814 #endif
2815 }
2816
2817 static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2818                       bool force_nonblock)
2819 {
2820 #if defined(CONFIG_NET)
2821         struct io_async_msghdr *kmsg = NULL;
2822         struct socket *sock;
2823         int ret;
2824
2825         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2826                 return -EINVAL;
2827
2828         sock = sock_from_file(req->file, &ret);
2829         if (sock) {
2830                 struct io_async_ctx io;
2831                 struct sockaddr_storage addr;
2832                 unsigned flags;
2833
2834                 if (req->io) {
2835                         kmsg = &req->io->msg;
2836                         kmsg->msg.msg_name = &addr;
2837                         /* if iov is set, it's allocated already */
2838                         if (!kmsg->iov)
2839                                 kmsg->iov = kmsg->fast_iov;
2840                         kmsg->msg.msg_iter.iov = kmsg->iov;
2841                 } else {
2842                         struct io_sr_msg *sr = &req->sr_msg;
2843
2844                         kmsg = &io.msg;
2845                         kmsg->msg.msg_name = &addr;
2846
2847                         io.msg.iov = io.msg.fast_iov;
2848                         ret = sendmsg_copy_msghdr(&io.msg.msg, sr->msg,
2849                                         sr->msg_flags, &io.msg.iov);
2850                         if (ret)
2851                                 return ret;
2852                 }
2853
2854                 flags = req->sr_msg.msg_flags;
2855                 if (flags & MSG_DONTWAIT)
2856                         req->flags |= REQ_F_NOWAIT;
2857                 else if (force_nonblock)
2858                         flags |= MSG_DONTWAIT;
2859
2860                 ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags);
2861                 if (force_nonblock && ret == -EAGAIN) {
2862                         if (req->io)
2863                                 return -EAGAIN;
2864                         if (io_alloc_async_ctx(req))
2865                                 return -ENOMEM;
2866                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2867                         req->work.func = io_sendrecv_async;
2868                         return -EAGAIN;
2869                 }
2870                 if (ret == -ERESTARTSYS)
2871                         ret = -EINTR;
2872         }
2873
2874         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
2875                 kfree(kmsg->iov);
2876         io_cqring_add_event(req, ret);
2877         if (ret < 0)
2878                 req_set_fail_links(req);
2879         io_put_req_find_next(req, nxt);
2880         return 0;
2881 #else
2882         return -EOPNOTSUPP;
2883 #endif
2884 }
2885
2886 static int io_recvmsg_prep(struct io_kiocb *req,
2887                            const struct io_uring_sqe *sqe)
2888 {
2889 #if defined(CONFIG_NET)
2890         struct io_sr_msg *sr = &req->sr_msg;
2891         struct io_async_ctx *io = req->io;
2892
2893         sr->msg_flags = READ_ONCE(sqe->msg_flags);
2894         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2895
2896         if (!io)
2897                 return 0;
2898
2899         io->msg.iov = io->msg.fast_iov;
2900         return recvmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2901                                         &io->msg.uaddr, &io->msg.iov);
2902 #else
2903         return -EOPNOTSUPP;
2904 #endif
2905 }
2906
2907 static int io_recvmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2908                       bool force_nonblock)
2909 {
2910 #if defined(CONFIG_NET)
2911         struct io_async_msghdr *kmsg = NULL;
2912         struct socket *sock;
2913         int ret;
2914
2915         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2916                 return -EINVAL;
2917
2918         sock = sock_from_file(req->file, &ret);
2919         if (sock) {
2920                 struct io_async_ctx io;
2921                 struct sockaddr_storage addr;
2922                 unsigned flags;
2923
2924                 if (req->io) {
2925                         kmsg = &req->io->msg;
2926                         kmsg->msg.msg_name = &addr;
2927                         /* if iov is set, it's allocated already */
2928                         if (!kmsg->iov)
2929                                 kmsg->iov = kmsg->fast_iov;
2930                         kmsg->msg.msg_iter.iov = kmsg->iov;
2931                 } else {
2932                         struct io_sr_msg *sr = &req->sr_msg;
2933
2934                         kmsg = &io.msg;
2935                         kmsg->msg.msg_name = &addr;
2936
2937                         io.msg.iov = io.msg.fast_iov;
2938                         ret = recvmsg_copy_msghdr(&io.msg.msg, sr->msg,
2939                                         sr->msg_flags, &io.msg.uaddr,
2940                                         &io.msg.iov);
2941                         if (ret)
2942                                 return ret;
2943                 }
2944
2945                 flags = req->sr_msg.msg_flags;
2946                 if (flags & MSG_DONTWAIT)
2947                         req->flags |= REQ_F_NOWAIT;
2948                 else if (force_nonblock)
2949                         flags |= MSG_DONTWAIT;
2950
2951                 ret = __sys_recvmsg_sock(sock, &kmsg->msg, req->sr_msg.msg,
2952                                                 kmsg->uaddr, flags);
2953                 if (force_nonblock && ret == -EAGAIN) {
2954                         if (req->io)
2955                                 return -EAGAIN;
2956                         if (io_alloc_async_ctx(req))
2957                                 return -ENOMEM;
2958                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2959                         req->work.func = io_sendrecv_async;
2960                         return -EAGAIN;
2961                 }
2962                 if (ret == -ERESTARTSYS)
2963                         ret = -EINTR;
2964         }
2965
2966         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
2967                 kfree(kmsg->iov);
2968         io_cqring_add_event(req, ret);
2969         if (ret < 0)
2970                 req_set_fail_links(req);
2971         io_put_req_find_next(req, nxt);
2972         return 0;
2973 #else
2974         return -EOPNOTSUPP;
2975 #endif
2976 }
2977
2978 static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2979 {
2980 #if defined(CONFIG_NET)
2981         struct io_accept *accept = &req->accept;
2982
2983         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2984                 return -EINVAL;
2985         if (sqe->ioprio || sqe->len || sqe->buf_index)
2986                 return -EINVAL;
2987
2988         accept->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
2989         accept->addr_len = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2990         accept->flags = READ_ONCE(sqe->accept_flags);
2991         return 0;
2992 #else
2993         return -EOPNOTSUPP;
2994 #endif
2995 }
2996
2997 #if defined(CONFIG_NET)
2998 static int __io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
2999                        bool force_nonblock)
3000 {
3001         struct io_accept *accept = &req->accept;
3002         unsigned file_flags;
3003         int ret;
3004
3005         file_flags = force_nonblock ? O_NONBLOCK : 0;
3006         ret = __sys_accept4_file(req->file, file_flags, accept->addr,
3007                                         accept->addr_len, accept->flags);
3008         if (ret == -EAGAIN && force_nonblock)
3009                 return -EAGAIN;
3010         if (ret == -ERESTARTSYS)
3011                 ret = -EINTR;
3012         if (ret < 0)
3013                 req_set_fail_links(req);
3014         io_cqring_add_event(req, ret);
3015         io_put_req_find_next(req, nxt);
3016         return 0;
3017 }
3018
3019 static void io_accept_finish(struct io_wq_work **workptr)
3020 {
3021         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
3022         struct io_kiocb *nxt = NULL;
3023
3024         if (io_req_cancelled(req))
3025                 return;
3026         __io_accept(req, &nxt, false);
3027         if (nxt)
3028                 io_wq_assign_next(workptr, nxt);
3029 }
3030 #endif
3031
3032 static int io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
3033                      bool force_nonblock)
3034 {
3035 #if defined(CONFIG_NET)
3036         int ret;
3037
3038         ret = __io_accept(req, nxt, force_nonblock);
3039         if (ret == -EAGAIN && force_nonblock) {
3040                 req->work.func = io_accept_finish;
3041                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
3042                 io_put_req(req);
3043                 return -EAGAIN;
3044         }
3045         return 0;
3046 #else
3047         return -EOPNOTSUPP;
3048 #endif
3049 }
3050
3051 static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3052 {
3053 #if defined(CONFIG_NET)
3054         struct io_connect *conn = &req->connect;
3055         struct io_async_ctx *io = req->io;
3056
3057         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
3058                 return -EINVAL;
3059         if (sqe->ioprio || sqe->len || sqe->buf_index || sqe->rw_flags)
3060                 return -EINVAL;
3061
3062         conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
3063         conn->addr_len =  READ_ONCE(sqe->addr2);
3064
3065         if (!io)
3066                 return 0;
3067
3068         return move_addr_to_kernel(conn->addr, conn->addr_len,
3069                                         &io->connect.address);
3070 #else
3071         return -EOPNOTSUPP;
3072 #endif
3073 }
3074
3075 static int io_connect(struct io_kiocb *req, struct io_kiocb **nxt,
3076                       bool force_nonblock)
3077 {
3078 #if defined(CONFIG_NET)
3079         struct io_async_ctx __io, *io;
3080         unsigned file_flags;
3081         int ret;
3082
3083         if (req->io) {
3084                 io = req->io;
3085         } else {
3086                 ret = move_addr_to_kernel(req->connect.addr,
3087                                                 req->connect.addr_len,
3088                                                 &__io.connect.address);
3089                 if (ret)
3090                         goto out;
3091                 io = &__io;
3092         }
3093
3094         file_flags = force_nonblock ? O_NONBLOCK : 0;
3095
3096         ret = __sys_connect_file(req->file, &io->connect.address,
3097                                         req->connect.addr_len, file_flags);
3098         if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) {
3099                 if (req->io)
3100                         return -EAGAIN;
3101                 if (io_alloc_async_ctx(req)) {
3102                         ret = -ENOMEM;
3103                         goto out;
3104                 }
3105                 memcpy(&req->io->connect, &__io.connect, sizeof(__io.connect));
3106                 return -EAGAIN;
3107         }
3108         if (ret == -ERESTARTSYS)
3109                 ret = -EINTR;
3110 out:
3111         if (ret < 0)
3112                 req_set_fail_links(req);
3113         io_cqring_add_event(req, ret);
3114         io_put_req_find_next(req, nxt);
3115         return 0;
3116 #else
3117         return -EOPNOTSUPP;
3118 #endif
3119 }
3120
3121 static void io_poll_remove_one(struct io_kiocb *req)
3122 {
3123         struct io_poll_iocb *poll = &req->poll;
3124
3125         spin_lock(&poll->head->lock);
3126         WRITE_ONCE(poll->canceled, true);
3127         if (!list_empty(&poll->wait.entry)) {
3128                 list_del_init(&poll->wait.entry);
3129                 io_queue_async_work(req);
3130         }
3131         spin_unlock(&poll->head->lock);
3132         hash_del(&req->hash_node);
3133 }
3134
3135 static void io_poll_remove_all(struct io_ring_ctx *ctx)
3136 {
3137         struct hlist_node *tmp;
3138         struct io_kiocb *req;
3139         int i;
3140
3141         spin_lock_irq(&ctx->completion_lock);
3142         for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
3143                 struct hlist_head *list;
3144
3145                 list = &ctx->cancel_hash[i];
3146                 hlist_for_each_entry_safe(req, tmp, list, hash_node)
3147                         io_poll_remove_one(req);
3148         }
3149         spin_unlock_irq(&ctx->completion_lock);
3150 }
3151
3152 static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
3153 {
3154         struct hlist_head *list;
3155         struct io_kiocb *req;
3156
3157         list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)];
3158         hlist_for_each_entry(req, list, hash_node) {
3159                 if (sqe_addr == req->user_data) {
3160                         io_poll_remove_one(req);
3161                         return 0;
3162                 }
3163         }
3164
3165         return -ENOENT;
3166 }
3167
3168 static int io_poll_remove_prep(struct io_kiocb *req,
3169                                const struct io_uring_sqe *sqe)
3170 {
3171         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3172                 return -EINVAL;
3173         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
3174             sqe->poll_events)
3175                 return -EINVAL;
3176
3177         req->poll.addr = READ_ONCE(sqe->addr);
3178         return 0;
3179 }
3180
3181 /*
3182  * Find a running poll command that matches one specified in sqe->addr,
3183  * and remove it if found.
3184  */
3185 static int io_poll_remove(struct io_kiocb *req)
3186 {
3187         struct io_ring_ctx *ctx = req->ctx;
3188         u64 addr;
3189         int ret;
3190
3191         addr = req->poll.addr;
3192         spin_lock_irq(&ctx->completion_lock);
3193         ret = io_poll_cancel(ctx, addr);
3194         spin_unlock_irq(&ctx->completion_lock);
3195
3196         io_cqring_add_event(req, ret);
3197         if (ret < 0)
3198                 req_set_fail_links(req);
3199         io_put_req(req);
3200         return 0;
3201 }
3202
3203 static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
3204 {
3205         struct io_ring_ctx *ctx = req->ctx;
3206
3207         req->poll.done = true;
3208         if (error)
3209                 io_cqring_fill_event(req, error);
3210         else
3211                 io_cqring_fill_event(req, mangle_poll(mask));
3212         io_commit_cqring(ctx);
3213 }
3214
3215 static void io_poll_complete_work(struct io_wq_work **workptr)
3216 {
3217         struct io_wq_work *work = *workptr;
3218         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3219         struct io_poll_iocb *poll = &req->poll;
3220         struct poll_table_struct pt = { ._key = poll->events };
3221         struct io_ring_ctx *ctx = req->ctx;
3222         struct io_kiocb *nxt = NULL;
3223         __poll_t mask = 0;
3224         int ret = 0;
3225
3226         if (work->flags & IO_WQ_WORK_CANCEL) {
3227                 WRITE_ONCE(poll->canceled, true);
3228                 ret = -ECANCELED;
3229         } else if (READ_ONCE(poll->canceled)) {
3230                 ret = -ECANCELED;
3231         }
3232
3233         if (ret != -ECANCELED)
3234                 mask = vfs_poll(poll->file, &pt) & poll->events;
3235
3236         /*
3237          * Note that ->ki_cancel callers also delete iocb from active_reqs after
3238          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
3239          * synchronize with them.  In the cancellation case the list_del_init
3240          * itself is not actually needed, but harmless so we keep it in to
3241          * avoid further branches in the fast path.
3242          */
3243         spin_lock_irq(&ctx->completion_lock);
3244         if (!mask && ret != -ECANCELED) {
3245                 add_wait_queue(poll->head, &poll->wait);
3246                 spin_unlock_irq(&ctx->completion_lock);
3247                 return;
3248         }
3249         hash_del(&req->hash_node);
3250         io_poll_complete(req, mask, ret);
3251         spin_unlock_irq(&ctx->completion_lock);
3252
3253         io_cqring_ev_posted(ctx);
3254
3255         if (ret < 0)
3256                 req_set_fail_links(req);
3257         io_put_req_find_next(req, &nxt);
3258         if (nxt)
3259                 io_wq_assign_next(workptr, nxt);
3260 }
3261
3262 static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes)
3263 {
3264         struct io_kiocb *req, *tmp;
3265         struct req_batch rb;
3266
3267         rb.to_free = rb.need_iter = 0;
3268         spin_lock_irq(&ctx->completion_lock);
3269         llist_for_each_entry_safe(req, tmp, nodes, llist_node) {
3270                 hash_del(&req->hash_node);
3271                 io_poll_complete(req, req->result, 0);
3272
3273                 if (refcount_dec_and_test(&req->refs) &&
3274                     !io_req_multi_free(&rb, req)) {
3275                         req->flags |= REQ_F_COMP_LOCKED;
3276                         io_free_req(req);
3277                 }
3278         }
3279         spin_unlock_irq(&ctx->completion_lock);
3280
3281         io_cqring_ev_posted(ctx);
3282         io_free_req_many(ctx, &rb);
3283 }
3284
3285 static void io_poll_flush(struct io_wq_work **workptr)
3286 {
3287         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
3288         struct llist_node *nodes;
3289
3290         nodes = llist_del_all(&req->ctx->poll_llist);
3291         if (nodes)
3292                 __io_poll_flush(req->ctx, nodes);
3293 }
3294
3295 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
3296                         void *key)
3297 {
3298         struct io_poll_iocb *poll = wait->private;
3299         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
3300         struct io_ring_ctx *ctx = req->ctx;
3301         __poll_t mask = key_to_poll(key);
3302
3303         /* for instances that support it check for an event match first: */
3304         if (mask && !(mask & poll->events))
3305                 return 0;
3306
3307         list_del_init(&poll->wait.entry);
3308
3309         /*
3310          * Run completion inline if we can. We're using trylock here because
3311          * we are violating the completion_lock -> poll wq lock ordering.
3312          * If we have a link timeout we're going to need the completion_lock
3313          * for finalizing the request, mark us as having grabbed that already.
3314          */
3315         if (mask) {
3316                 unsigned long flags;
3317
3318                 if (llist_empty(&ctx->poll_llist) &&
3319                     spin_trylock_irqsave(&ctx->completion_lock, flags)) {
3320                         hash_del(&req->hash_node);
3321                         io_poll_complete(req, mask, 0);
3322                         req->flags |= REQ_F_COMP_LOCKED;
3323                         io_put_req(req);
3324                         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3325
3326                         io_cqring_ev_posted(ctx);
3327                         req = NULL;
3328                 } else {
3329                         req->result = mask;
3330                         req->llist_node.next = NULL;
3331                         /* if the list wasn't empty, we're done */
3332                         if (!llist_add(&req->llist_node, &ctx->poll_llist))
3333                                 req = NULL;
3334                         else
3335                                 req->work.func = io_poll_flush;
3336                 }
3337         }
3338         if (req)
3339                 io_queue_async_work(req);
3340
3341         return 1;
3342 }
3343
3344 struct io_poll_table {
3345         struct poll_table_struct pt;
3346         struct io_kiocb *req;
3347         int error;
3348 };
3349
3350 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
3351                                struct poll_table_struct *p)
3352 {
3353         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
3354
3355         if (unlikely(pt->req->poll.head)) {
3356                 pt->error = -EINVAL;
3357                 return;
3358         }
3359
3360         pt->error = 0;
3361         pt->req->poll.head = head;
3362         add_wait_queue(head, &pt->req->poll.wait);
3363 }
3364
3365 static void io_poll_req_insert(struct io_kiocb *req)
3366 {
3367         struct io_ring_ctx *ctx = req->ctx;
3368         struct hlist_head *list;
3369
3370         list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)];
3371         hlist_add_head(&req->hash_node, list);
3372 }
3373
3374 static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3375 {
3376         struct io_poll_iocb *poll = &req->poll;
3377         u16 events;
3378
3379         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3380                 return -EINVAL;
3381         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
3382                 return -EINVAL;
3383         if (!poll->file)
3384                 return -EBADF;
3385
3386         events = READ_ONCE(sqe->poll_events);
3387         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
3388         return 0;
3389 }
3390
3391 static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt)
3392 {
3393         struct io_poll_iocb *poll = &req->poll;
3394         struct io_ring_ctx *ctx = req->ctx;
3395         struct io_poll_table ipt;
3396         bool cancel = false;
3397         __poll_t mask;
3398
3399         INIT_IO_WORK(&req->work, io_poll_complete_work);
3400         INIT_HLIST_NODE(&req->hash_node);
3401
3402         poll->head = NULL;
3403         poll->done = false;
3404         poll->canceled = false;
3405
3406         ipt.pt._qproc = io_poll_queue_proc;
3407         ipt.pt._key = poll->events;
3408         ipt.req = req;
3409         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
3410
3411         /* initialized the list so that we can do list_empty checks */
3412         INIT_LIST_HEAD(&poll->wait.entry);
3413         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
3414         poll->wait.private = poll;
3415
3416         INIT_LIST_HEAD(&req->list);
3417
3418         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
3419
3420         spin_lock_irq(&ctx->completion_lock);
3421         if (likely(poll->head)) {
3422                 spin_lock(&poll->head->lock);
3423                 if (unlikely(list_empty(&poll->wait.entry))) {
3424                         if (ipt.error)
3425                                 cancel = true;
3426                         ipt.error = 0;
3427                         mask = 0;
3428                 }
3429                 if (mask || ipt.error)
3430                         list_del_init(&poll->wait.entry);
3431                 else if (cancel)
3432                         WRITE_ONCE(poll->canceled, true);
3433                 else if (!poll->done) /* actually waiting for an event */
3434                         io_poll_req_insert(req);
3435                 spin_unlock(&poll->head->lock);
3436         }
3437         if (mask) { /* no async, we'd stolen it */
3438                 ipt.error = 0;
3439                 io_poll_complete(req, mask, 0);
3440         }
3441         spin_unlock_irq(&ctx->completion_lock);
3442
3443         if (mask) {
3444                 io_cqring_ev_posted(ctx);
3445                 io_put_req_find_next(req, nxt);
3446         }
3447         return ipt.error;
3448 }
3449
3450 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
3451 {
3452         struct io_timeout_data *data = container_of(timer,
3453                                                 struct io_timeout_data, timer);
3454         struct io_kiocb *req = data->req;
3455         struct io_ring_ctx *ctx = req->ctx;
3456         unsigned long flags;
3457
3458         atomic_inc(&ctx->cq_timeouts);
3459
3460         spin_lock_irqsave(&ctx->completion_lock, flags);
3461         /*
3462          * We could be racing with timeout deletion. If the list is empty,
3463          * then timeout lookup already found it and will be handling it.
3464          */
3465         if (!list_empty(&req->list)) {
3466                 struct io_kiocb *prev;
3467
3468                 /*
3469                  * Adjust the reqs sequence before the current one because it
3470                  * will consume a slot in the cq_ring and the cq_tail
3471                  * pointer will be increased, otherwise other timeout reqs may
3472                  * return in advance without waiting for enough wait_nr.
3473                  */
3474                 prev = req;
3475                 list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
3476                         prev->sequence++;
3477                 list_del_init(&req->list);
3478         }
3479
3480         io_cqring_fill_event(req, -ETIME);
3481         io_commit_cqring(ctx);
3482         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3483
3484         io_cqring_ev_posted(ctx);
3485         req_set_fail_links(req);
3486         io_put_req(req);
3487         return HRTIMER_NORESTART;
3488 }
3489
3490 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
3491 {
3492         struct io_kiocb *req;
3493         int ret = -ENOENT;
3494
3495         list_for_each_entry(req, &ctx->timeout_list, list) {
3496                 if (user_data == req->user_data) {
3497                         list_del_init(&req->list);
3498                         ret = 0;
3499                         break;
3500                 }
3501         }
3502
3503         if (ret == -ENOENT)
3504                 return ret;
3505
3506         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
3507         if (ret == -1)
3508                 return -EALREADY;
3509
3510         req_set_fail_links(req);
3511         io_cqring_fill_event(req, -ECANCELED);
3512         io_put_req(req);
3513         return 0;
3514 }
3515
3516 static int io_timeout_remove_prep(struct io_kiocb *req,
3517                                   const struct io_uring_sqe *sqe)
3518 {
3519         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3520                 return -EINVAL;
3521         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)
3522                 return -EINVAL;
3523
3524         req->timeout.addr = READ_ONCE(sqe->addr);
3525         req->timeout.flags = READ_ONCE(sqe->timeout_flags);
3526         if (req->timeout.flags)
3527                 return -EINVAL;
3528
3529         return 0;
3530 }
3531
3532 /*
3533  * Remove or update an existing timeout command
3534  */
3535 static int io_timeout_remove(struct io_kiocb *req)
3536 {
3537         struct io_ring_ctx *ctx = req->ctx;
3538         int ret;
3539
3540         spin_lock_irq(&ctx->completion_lock);
3541         ret = io_timeout_cancel(ctx, req->timeout.addr);
3542
3543         io_cqring_fill_event(req, ret);
3544         io_commit_cqring(ctx);
3545         spin_unlock_irq(&ctx->completion_lock);
3546         io_cqring_ev_posted(ctx);
3547         if (ret < 0)
3548                 req_set_fail_links(req);
3549         io_put_req(req);
3550         return 0;
3551 }
3552
3553 static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
3554                            bool is_timeout_link)
3555 {
3556         struct io_timeout_data *data;
3557         unsigned flags;
3558
3559         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3560                 return -EINVAL;
3561         if (sqe->ioprio || sqe->buf_index || sqe->len != 1)
3562                 return -EINVAL;
3563         if (sqe->off && is_timeout_link)
3564                 return -EINVAL;
3565         flags = READ_ONCE(sqe->timeout_flags);
3566         if (flags & ~IORING_TIMEOUT_ABS)
3567                 return -EINVAL;
3568
3569         req->timeout.count = READ_ONCE(sqe->off);
3570
3571         if (!req->io && io_alloc_async_ctx(req))
3572                 return -ENOMEM;
3573
3574         data = &req->io->timeout;
3575         data->req = req;
3576         req->flags |= REQ_F_TIMEOUT;
3577
3578         if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))
3579                 return -EFAULT;
3580
3581         if (flags & IORING_TIMEOUT_ABS)
3582                 data->mode = HRTIMER_MODE_ABS;
3583         else
3584                 data->mode = HRTIMER_MODE_REL;
3585
3586         hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
3587         return 0;
3588 }
3589
3590 static int io_timeout(struct io_kiocb *req)
3591 {
3592         unsigned count;
3593         struct io_ring_ctx *ctx = req->ctx;
3594         struct io_timeout_data *data;
3595         struct list_head *entry;
3596         unsigned span = 0;
3597
3598         data = &req->io->timeout;
3599
3600         /*
3601          * sqe->off holds how many events that need to occur for this
3602          * timeout event to be satisfied. If it isn't set, then this is
3603          * a pure timeout request, sequence isn't used.
3604          */
3605         count = req->timeout.count;
3606         if (!count) {
3607                 req->flags |= REQ_F_TIMEOUT_NOSEQ;
3608                 spin_lock_irq(&ctx->completion_lock);
3609                 entry = ctx->timeout_list.prev;
3610                 goto add;
3611         }
3612
3613         req->sequence = ctx->cached_sq_head + count - 1;
3614         data->seq_offset = count;
3615
3616         /*
3617          * Insertion sort, ensuring the first entry in the list is always
3618          * the one we need first.
3619          */
3620         spin_lock_irq(&ctx->completion_lock);
3621         list_for_each_prev(entry, &ctx->timeout_list) {
3622                 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
3623                 unsigned nxt_sq_head;
3624                 long long tmp, tmp_nxt;
3625                 u32 nxt_offset = nxt->io->timeout.seq_offset;
3626
3627                 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
3628                         continue;
3629
3630                 /*
3631                  * Since cached_sq_head + count - 1 can overflow, use type long
3632                  * long to store it.
3633                  */
3634                 tmp = (long long)ctx->cached_sq_head + count - 1;
3635                 nxt_sq_head = nxt->sequence - nxt_offset + 1;
3636                 tmp_nxt = (long long)nxt_sq_head + nxt_offset - 1;
3637
3638                 /*
3639                  * cached_sq_head may overflow, and it will never overflow twice
3640                  * once there is some timeout req still be valid.
3641                  */
3642                 if (ctx->cached_sq_head < nxt_sq_head)
3643                         tmp += UINT_MAX;
3644
3645                 if (tmp > tmp_nxt)
3646                         break;
3647
3648                 /*
3649                  * Sequence of reqs after the insert one and itself should
3650                  * be adjusted because each timeout req consumes a slot.
3651                  */
3652                 span++;
3653                 nxt->sequence++;
3654         }
3655         req->sequence -= span;
3656 add:
3657         list_add(&req->list, entry);
3658         data->timer.function = io_timeout_fn;
3659         hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
3660         spin_unlock_irq(&ctx->completion_lock);
3661         return 0;
3662 }
3663
3664 static bool io_cancel_cb(struct io_wq_work *work, void *data)
3665 {
3666         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3667
3668         return req->user_data == (unsigned long) data;
3669 }
3670
3671 static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
3672 {
3673         enum io_wq_cancel cancel_ret;
3674         int ret = 0;
3675
3676         cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
3677         switch (cancel_ret) {
3678         case IO_WQ_CANCEL_OK:
3679                 ret = 0;
3680                 break;
3681         case IO_WQ_CANCEL_RUNNING:
3682                 ret = -EALREADY;
3683                 break;
3684         case IO_WQ_CANCEL_NOTFOUND:
3685                 ret = -ENOENT;
3686                 break;
3687         }
3688
3689         return ret;
3690 }
3691
3692 static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
3693                                      struct io_kiocb *req, __u64 sqe_addr,
3694                                      struct io_kiocb **nxt, int success_ret)
3695 {
3696         unsigned long flags;
3697         int ret;
3698
3699         ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
3700         if (ret != -ENOENT) {
3701                 spin_lock_irqsave(&ctx->completion_lock, flags);
3702                 goto done;
3703         }
3704
3705         spin_lock_irqsave(&ctx->completion_lock, flags);
3706         ret = io_timeout_cancel(ctx, sqe_addr);
3707         if (ret != -ENOENT)
3708                 goto done;
3709         ret = io_poll_cancel(ctx, sqe_addr);
3710 done:
3711         if (!ret)
3712                 ret = success_ret;
3713         io_cqring_fill_event(req, ret);
3714         io_commit_cqring(ctx);
3715         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3716         io_cqring_ev_posted(ctx);
3717
3718         if (ret < 0)
3719                 req_set_fail_links(req);
3720         io_put_req_find_next(req, nxt);
3721 }
3722
3723 static int io_async_cancel_prep(struct io_kiocb *req,
3724                                 const struct io_uring_sqe *sqe)
3725 {
3726         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3727                 return -EINVAL;
3728         if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||
3729             sqe->cancel_flags)
3730                 return -EINVAL;
3731
3732         req->cancel.addr = READ_ONCE(sqe->addr);
3733         return 0;
3734 }
3735
3736 static int io_async_cancel(struct io_kiocb *req, struct io_kiocb **nxt)
3737 {
3738         struct io_ring_ctx *ctx = req->ctx;
3739
3740         io_async_find_and_cancel(ctx, req, req->cancel.addr, nxt, 0);
3741         return 0;
3742 }
3743
3744 static int io_files_update_prep(struct io_kiocb *req,
3745                                 const struct io_uring_sqe *sqe)
3746 {
3747         if (sqe->flags || sqe->ioprio || sqe->rw_flags)
3748                 return -EINVAL;
3749
3750         req->files_update.offset = READ_ONCE(sqe->off);
3751         req->files_update.nr_args = READ_ONCE(sqe->len);
3752         if (!req->files_update.nr_args)
3753                 return -EINVAL;
3754         req->files_update.arg = READ_ONCE(sqe->addr);
3755         return 0;
3756 }
3757
3758 static int io_files_update(struct io_kiocb *req, bool force_nonblock)
3759 {
3760         struct io_ring_ctx *ctx = req->ctx;
3761         struct io_uring_files_update up;
3762         int ret;
3763
3764         if (force_nonblock) {
3765                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
3766                 return -EAGAIN;
3767         }
3768
3769         up.offset = req->files_update.offset;
3770         up.fds = req->files_update.arg;
3771
3772         mutex_lock(&ctx->uring_lock);
3773         ret = __io_sqe_files_update(ctx, &up, req->files_update.nr_args);
3774         mutex_unlock(&ctx->uring_lock);
3775
3776         if (ret < 0)
3777                 req_set_fail_links(req);
3778         io_cqring_add_event(req, ret);
3779         io_put_req(req);
3780         return 0;
3781 }
3782
3783 static int io_req_defer_prep(struct io_kiocb *req,
3784                              const struct io_uring_sqe *sqe)
3785 {
3786         ssize_t ret = 0;
3787
3788         switch (req->opcode) {
3789         case IORING_OP_NOP:
3790                 break;
3791         case IORING_OP_READV:
3792         case IORING_OP_READ_FIXED:
3793         case IORING_OP_READ:
3794                 ret = io_read_prep(req, sqe, true);
3795                 break;
3796         case IORING_OP_WRITEV:
3797         case IORING_OP_WRITE_FIXED:
3798         case IORING_OP_WRITE:
3799                 ret = io_write_prep(req, sqe, true);
3800                 break;
3801         case IORING_OP_POLL_ADD:
3802                 ret = io_poll_add_prep(req, sqe);
3803                 break;
3804         case IORING_OP_POLL_REMOVE:
3805                 ret = io_poll_remove_prep(req, sqe);
3806                 break;
3807         case IORING_OP_FSYNC:
3808                 ret = io_prep_fsync(req, sqe);
3809                 break;
3810         case IORING_OP_SYNC_FILE_RANGE:
3811                 ret = io_prep_sfr(req, sqe);
3812                 break;
3813         case IORING_OP_SENDMSG:
3814                 ret = io_sendmsg_prep(req, sqe);
3815                 break;
3816         case IORING_OP_RECVMSG:
3817                 ret = io_recvmsg_prep(req, sqe);
3818                 break;
3819         case IORING_OP_CONNECT:
3820                 ret = io_connect_prep(req, sqe);
3821                 break;
3822         case IORING_OP_TIMEOUT:
3823                 ret = io_timeout_prep(req, sqe, false);
3824                 break;
3825         case IORING_OP_TIMEOUT_REMOVE:
3826                 ret = io_timeout_remove_prep(req, sqe);
3827                 break;
3828         case IORING_OP_ASYNC_CANCEL:
3829                 ret = io_async_cancel_prep(req, sqe);
3830                 break;
3831         case IORING_OP_LINK_TIMEOUT:
3832                 ret = io_timeout_prep(req, sqe, true);
3833                 break;
3834         case IORING_OP_ACCEPT:
3835                 ret = io_accept_prep(req, sqe);
3836                 break;
3837         case IORING_OP_FALLOCATE:
3838                 ret = io_fallocate_prep(req, sqe);
3839                 break;
3840         case IORING_OP_OPENAT:
3841                 ret = io_openat_prep(req, sqe);
3842                 break;
3843         case IORING_OP_CLOSE:
3844                 ret = io_close_prep(req, sqe);
3845                 break;
3846         case IORING_OP_FILES_UPDATE:
3847                 ret = io_files_update_prep(req, sqe);
3848                 break;
3849         case IORING_OP_STATX:
3850                 ret = io_statx_prep(req, sqe);
3851                 break;
3852         case IORING_OP_FADVISE:
3853                 ret = io_fadvise_prep(req, sqe);
3854                 break;
3855         case IORING_OP_MADVISE:
3856                 ret = io_madvise_prep(req, sqe);
3857                 break;
3858         default:
3859                 printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n",
3860                                 req->opcode);
3861                 ret = -EINVAL;
3862                 break;
3863         }
3864
3865         return ret;
3866 }
3867
3868 static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3869 {
3870         struct io_ring_ctx *ctx = req->ctx;
3871         int ret;
3872
3873         /* Still need defer if there is pending req in defer list. */
3874         if (!req_need_defer(req) && list_empty(&ctx->defer_list))
3875                 return 0;
3876
3877         if (!req->io && io_alloc_async_ctx(req))
3878                 return -EAGAIN;
3879
3880         ret = io_req_defer_prep(req, sqe);
3881         if (ret < 0)
3882                 return ret;
3883
3884         spin_lock_irq(&ctx->completion_lock);
3885         if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
3886                 spin_unlock_irq(&ctx->completion_lock);
3887                 return 0;
3888         }
3889
3890         trace_io_uring_defer(ctx, req, req->user_data);
3891         list_add_tail(&req->list, &ctx->defer_list);
3892         spin_unlock_irq(&ctx->completion_lock);
3893         return -EIOCBQUEUED;
3894 }
3895
3896 static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
3897                         struct io_kiocb **nxt, bool force_nonblock)
3898 {
3899         struct io_ring_ctx *ctx = req->ctx;
3900         int ret;
3901
3902         switch (req->opcode) {
3903         case IORING_OP_NOP:
3904                 ret = io_nop(req);
3905                 break;
3906         case IORING_OP_READV:
3907         case IORING_OP_READ_FIXED:
3908         case IORING_OP_READ:
3909                 if (sqe) {
3910                         ret = io_read_prep(req, sqe, force_nonblock);
3911                         if (ret < 0)
3912                                 break;
3913                 }
3914                 ret = io_read(req, nxt, force_nonblock);
3915                 break;
3916         case IORING_OP_WRITEV:
3917         case IORING_OP_WRITE_FIXED:
3918         case IORING_OP_WRITE:
3919                 if (sqe) {
3920                         ret = io_write_prep(req, sqe, force_nonblock);
3921                         if (ret < 0)
3922                                 break;
3923                 }
3924                 ret = io_write(req, nxt, force_nonblock);
3925                 break;
3926         case IORING_OP_FSYNC:
3927                 if (sqe) {
3928                         ret = io_prep_fsync(req, sqe);
3929                         if (ret < 0)
3930                                 break;
3931                 }
3932                 ret = io_fsync(req, nxt, force_nonblock);
3933                 break;
3934         case IORING_OP_POLL_ADD:
3935                 if (sqe) {
3936                         ret = io_poll_add_prep(req, sqe);
3937                         if (ret)
3938                                 break;
3939                 }
3940                 ret = io_poll_add(req, nxt);
3941                 break;
3942         case IORING_OP_POLL_REMOVE:
3943                 if (sqe) {
3944                         ret = io_poll_remove_prep(req, sqe);
3945                         if (ret < 0)
3946                                 break;
3947                 }
3948                 ret = io_poll_remove(req);
3949                 break;
3950         case IORING_OP_SYNC_FILE_RANGE:
3951                 if (sqe) {
3952                         ret = io_prep_sfr(req, sqe);
3953                         if (ret < 0)
3954                                 break;
3955                 }
3956                 ret = io_sync_file_range(req, nxt, force_nonblock);
3957                 break;
3958         case IORING_OP_SENDMSG:
3959                 if (sqe) {
3960                         ret = io_sendmsg_prep(req, sqe);
3961                         if (ret < 0)
3962                                 break;
3963                 }
3964                 ret = io_sendmsg(req, nxt, force_nonblock);
3965                 break;
3966         case IORING_OP_RECVMSG:
3967                 if (sqe) {
3968                         ret = io_recvmsg_prep(req, sqe);
3969                         if (ret)
3970                                 break;
3971                 }
3972                 ret = io_recvmsg(req, nxt, force_nonblock);
3973                 break;
3974         case IORING_OP_TIMEOUT:
3975                 if (sqe) {
3976                         ret = io_timeout_prep(req, sqe, false);
3977                         if (ret)
3978                                 break;
3979                 }
3980                 ret = io_timeout(req);
3981                 break;
3982         case IORING_OP_TIMEOUT_REMOVE:
3983                 if (sqe) {
3984                         ret = io_timeout_remove_prep(req, sqe);
3985                         if (ret)
3986                                 break;
3987                 }
3988                 ret = io_timeout_remove(req);
3989                 break;
3990         case IORING_OP_ACCEPT:
3991                 if (sqe) {
3992                         ret = io_accept_prep(req, sqe);
3993                         if (ret)
3994                                 break;
3995                 }
3996                 ret = io_accept(req, nxt, force_nonblock);
3997                 break;
3998         case IORING_OP_CONNECT:
3999                 if (sqe) {
4000                         ret = io_connect_prep(req, sqe);
4001                         if (ret)
4002                                 break;
4003                 }
4004                 ret = io_connect(req, nxt, force_nonblock);
4005                 break;
4006         case IORING_OP_ASYNC_CANCEL:
4007                 if (sqe) {
4008                         ret = io_async_cancel_prep(req, sqe);
4009                         if (ret)
4010                                 break;
4011                 }
4012                 ret = io_async_cancel(req, nxt);
4013                 break;
4014         case IORING_OP_FALLOCATE:
4015                 if (sqe) {
4016                         ret = io_fallocate_prep(req, sqe);
4017                         if (ret)
4018                                 break;
4019                 }
4020                 ret = io_fallocate(req, nxt, force_nonblock);
4021                 break;
4022         case IORING_OP_OPENAT:
4023                 if (sqe) {
4024                         ret = io_openat_prep(req, sqe);
4025                         if (ret)
4026                                 break;
4027                 }
4028                 ret = io_openat(req, nxt, force_nonblock);
4029                 break;
4030         case IORING_OP_CLOSE:
4031                 if (sqe) {
4032                         ret = io_close_prep(req, sqe);
4033                         if (ret)
4034                                 break;
4035                 }
4036                 ret = io_close(req, nxt, force_nonblock);
4037                 break;
4038         case IORING_OP_FILES_UPDATE:
4039                 if (sqe) {
4040                         ret = io_files_update_prep(req, sqe);
4041                         if (ret)
4042                                 break;
4043                 }
4044                 ret = io_files_update(req, force_nonblock);
4045                 break;
4046         case IORING_OP_STATX:
4047                 if (sqe) {
4048                         ret = io_statx_prep(req, sqe);
4049                         if (ret)
4050                                 break;
4051                 }
4052                 ret = io_statx(req, nxt, force_nonblock);
4053                 break;
4054         case IORING_OP_FADVISE:
4055                 if (sqe) {
4056                         ret = io_fadvise_prep(req, sqe);
4057                         if (ret)
4058                                 break;
4059                 }
4060                 ret = io_fadvise(req, nxt, force_nonblock);
4061                 break;
4062         case IORING_OP_MADVISE:
4063                 if (sqe) {
4064                         ret = io_madvise_prep(req, sqe);
4065                         if (ret)
4066                                 break;
4067                 }
4068                 ret = io_madvise(req, nxt, force_nonblock);
4069                 break;
4070         default:
4071                 ret = -EINVAL;
4072                 break;
4073         }
4074
4075         if (ret)
4076                 return ret;
4077
4078         if (ctx->flags & IORING_SETUP_IOPOLL) {
4079                 const bool in_async = io_wq_current_is_worker();
4080
4081                 if (req->result == -EAGAIN)
4082                         return -EAGAIN;
4083
4084                 /* workqueue context doesn't hold uring_lock, grab it now */
4085                 if (in_async)
4086                         mutex_lock(&ctx->uring_lock);
4087
4088                 io_iopoll_req_issued(req);
4089
4090                 if (in_async)
4091                         mutex_unlock(&ctx->uring_lock);
4092         }
4093
4094         return 0;
4095 }
4096
4097 static void io_wq_submit_work(struct io_wq_work **workptr)
4098 {
4099         struct io_wq_work *work = *workptr;
4100         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4101         struct io_kiocb *nxt = NULL;
4102         int ret = 0;
4103
4104         /* if NO_CANCEL is set, we must still run the work */
4105         if ((work->flags & (IO_WQ_WORK_CANCEL|IO_WQ_WORK_NO_CANCEL)) ==
4106                                 IO_WQ_WORK_CANCEL) {
4107                 ret = -ECANCELED;
4108         }
4109
4110         if (!ret) {
4111                 req->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
4112                 req->in_async = true;
4113                 do {
4114                         ret = io_issue_sqe(req, NULL, &nxt, false);
4115                         /*
4116                          * We can get EAGAIN for polled IO even though we're
4117                          * forcing a sync submission from here, since we can't
4118                          * wait for request slots on the block side.
4119                          */
4120                         if (ret != -EAGAIN)
4121                                 break;
4122                         cond_resched();
4123                 } while (1);
4124         }
4125
4126         /* drop submission reference */
4127         io_put_req(req);
4128
4129         if (ret) {
4130                 req_set_fail_links(req);
4131                 io_cqring_add_event(req, ret);
4132                 io_put_req(req);
4133         }
4134
4135         /* if a dependent link is ready, pass it back */
4136         if (!ret && nxt)
4137                 io_wq_assign_next(workptr, nxt);
4138 }
4139
4140 static int io_req_needs_file(struct io_kiocb *req, int fd)
4141 {
4142         if (!io_op_defs[req->opcode].needs_file)
4143                 return 0;
4144         if (fd == -1 && io_op_defs[req->opcode].fd_non_neg)
4145                 return 0;
4146         return 1;
4147 }
4148
4149 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
4150                                               int index)
4151 {
4152         struct fixed_file_table *table;
4153
4154         table = &ctx->file_data->table[index >> IORING_FILE_TABLE_SHIFT];
4155         return table->files[index & IORING_FILE_TABLE_MASK];;
4156 }
4157
4158 static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req,
4159                            const struct io_uring_sqe *sqe)
4160 {
4161         struct io_ring_ctx *ctx = req->ctx;
4162         unsigned flags;
4163         int fd;
4164
4165         flags = READ_ONCE(sqe->flags);
4166         fd = READ_ONCE(sqe->fd);
4167
4168         if (flags & IOSQE_IO_DRAIN)
4169                 req->flags |= REQ_F_IO_DRAIN;
4170
4171         if (!io_req_needs_file(req, fd))
4172                 return 0;
4173
4174         if (flags & IOSQE_FIXED_FILE) {
4175                 if (unlikely(!ctx->file_data ||
4176                     (unsigned) fd >= ctx->nr_user_files))
4177                         return -EBADF;
4178                 fd = array_index_nospec(fd, ctx->nr_user_files);
4179                 req->file = io_file_from_index(ctx, fd);
4180                 if (!req->file)
4181                         return -EBADF;
4182                 req->flags |= REQ_F_FIXED_FILE;
4183                 percpu_ref_get(&ctx->file_data->refs);
4184         } else {
4185                 if (req->needs_fixed_file)
4186                         return -EBADF;
4187                 trace_io_uring_file_get(ctx, fd);
4188                 req->file = io_file_get(state, fd);
4189                 if (unlikely(!req->file))
4190                         return -EBADF;
4191         }
4192
4193         return 0;
4194 }
4195
4196 static int io_grab_files(struct io_kiocb *req)
4197 {
4198         int ret = -EBADF;
4199         struct io_ring_ctx *ctx = req->ctx;
4200
4201         if (!req->ring_file)
4202                 return -EBADF;
4203
4204         rcu_read_lock();
4205         spin_lock_irq(&ctx->inflight_lock);
4206         /*
4207          * We use the f_ops->flush() handler to ensure that we can flush
4208          * out work accessing these files if the fd is closed. Check if
4209          * the fd has changed since we started down this path, and disallow
4210          * this operation if it has.
4211          */
4212         if (fcheck(req->ring_fd) == req->ring_file) {
4213                 list_add(&req->inflight_entry, &ctx->inflight_list);
4214                 req->flags |= REQ_F_INFLIGHT;
4215                 req->work.files = current->files;
4216                 ret = 0;
4217         }
4218         spin_unlock_irq(&ctx->inflight_lock);
4219         rcu_read_unlock();
4220
4221         return ret;
4222 }
4223
4224 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
4225 {
4226         struct io_timeout_data *data = container_of(timer,
4227                                                 struct io_timeout_data, timer);
4228         struct io_kiocb *req = data->req;
4229         struct io_ring_ctx *ctx = req->ctx;
4230         struct io_kiocb *prev = NULL;
4231         unsigned long flags;
4232
4233         spin_lock_irqsave(&ctx->completion_lock, flags);
4234
4235         /*
4236          * We don't expect the list to be empty, that will only happen if we
4237          * race with the completion of the linked work.
4238          */
4239         if (!list_empty(&req->link_list)) {
4240                 prev = list_entry(req->link_list.prev, struct io_kiocb,
4241                                   link_list);
4242                 if (refcount_inc_not_zero(&prev->refs)) {
4243                         list_del_init(&req->link_list);
4244                         prev->flags &= ~REQ_F_LINK_TIMEOUT;
4245                 } else
4246                         prev = NULL;
4247         }
4248
4249         spin_unlock_irqrestore(&ctx->completion_lock, flags);
4250
4251         if (prev) {
4252                 req_set_fail_links(prev);
4253                 io_async_find_and_cancel(ctx, req, prev->user_data, NULL,
4254                                                 -ETIME);
4255                 io_put_req(prev);
4256         } else {
4257                 io_cqring_add_event(req, -ETIME);
4258                 io_put_req(req);
4259         }
4260         return HRTIMER_NORESTART;
4261 }
4262
4263 static void io_queue_linked_timeout(struct io_kiocb *req)
4264 {
4265         struct io_ring_ctx *ctx = req->ctx;
4266
4267         /*
4268          * If the list is now empty, then our linked request finished before
4269          * we got a chance to setup the timer
4270          */
4271         spin_lock_irq(&ctx->completion_lock);
4272         if (!list_empty(&req->link_list)) {
4273                 struct io_timeout_data *data = &req->io->timeout;
4274
4275                 data->timer.function = io_link_timeout_fn;
4276                 hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
4277                                 data->mode);
4278         }
4279         spin_unlock_irq(&ctx->completion_lock);
4280
4281         /* drop submission reference */
4282         io_put_req(req);
4283 }
4284
4285 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
4286 {
4287         struct io_kiocb *nxt;
4288
4289         if (!(req->flags & REQ_F_LINK))
4290                 return NULL;
4291
4292         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb,
4293                                         link_list);
4294         if (!nxt || nxt->opcode != IORING_OP_LINK_TIMEOUT)
4295                 return NULL;
4296
4297         req->flags |= REQ_F_LINK_TIMEOUT;
4298         return nxt;
4299 }
4300
4301 static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4302 {
4303         struct io_kiocb *linked_timeout;
4304         struct io_kiocb *nxt = NULL;
4305         int ret;
4306
4307 again:
4308         linked_timeout = io_prep_linked_timeout(req);
4309
4310         ret = io_issue_sqe(req, sqe, &nxt, true);
4311
4312         /*
4313          * We async punt it if the file wasn't marked NOWAIT, or if the file
4314          * doesn't support non-blocking read/write attempts
4315          */
4316         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
4317             (req->flags & REQ_F_MUST_PUNT))) {
4318                 if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
4319                         ret = io_grab_files(req);
4320                         if (ret)
4321                                 goto err;
4322                 }
4323
4324                 /*
4325                  * Queued up for async execution, worker will release
4326                  * submit reference when the iocb is actually submitted.
4327                  */
4328                 io_queue_async_work(req);
4329                 goto done_req;
4330         }
4331
4332 err:
4333         /* drop submission reference */
4334         io_put_req(req);
4335
4336         if (linked_timeout) {
4337                 if (!ret)
4338                         io_queue_linked_timeout(linked_timeout);
4339                 else
4340                         io_put_req(linked_timeout);
4341         }
4342
4343         /* and drop final reference, if we failed */
4344         if (ret) {
4345                 io_cqring_add_event(req, ret);
4346                 req_set_fail_links(req);
4347                 io_put_req(req);
4348         }
4349 done_req:
4350         if (nxt) {
4351                 req = nxt;
4352                 nxt = NULL;
4353                 goto again;
4354         }
4355 }
4356
4357 static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4358 {
4359         int ret;
4360
4361         if (unlikely(req->ctx->drain_next)) {
4362                 req->flags |= REQ_F_IO_DRAIN;
4363                 req->ctx->drain_next = false;
4364         }
4365         req->ctx->drain_next = (req->flags & REQ_F_DRAIN_LINK);
4366
4367         ret = io_req_defer(req, sqe);
4368         if (ret) {
4369                 if (ret != -EIOCBQUEUED) {
4370                         io_cqring_add_event(req, ret);
4371                         req_set_fail_links(req);
4372                         io_double_put_req(req);
4373                 }
4374         } else if ((req->flags & REQ_F_FORCE_ASYNC) &&
4375                    !io_wq_current_is_worker()) {
4376                 /*
4377                  * Never try inline submit of IOSQE_ASYNC is set, go straight
4378                  * to async execution.
4379                  */
4380                 req->work.flags |= IO_WQ_WORK_CONCURRENT;
4381                 io_queue_async_work(req);
4382         } else {
4383                 __io_queue_sqe(req, sqe);
4384         }
4385 }
4386
4387 static inline void io_queue_link_head(struct io_kiocb *req)
4388 {
4389         if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
4390                 io_cqring_add_event(req, -ECANCELED);
4391                 io_double_put_req(req);
4392         } else
4393                 io_queue_sqe(req, NULL);
4394 }
4395
4396 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
4397                                 IOSQE_IO_HARDLINK | IOSQE_ASYNC)
4398
4399 static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
4400                           struct io_submit_state *state, struct io_kiocb **link)
4401 {
4402         struct io_ring_ctx *ctx = req->ctx;
4403         unsigned int sqe_flags;
4404         int ret;
4405
4406         sqe_flags = READ_ONCE(sqe->flags);
4407
4408         /* enforce forwards compatibility on users */
4409         if (unlikely(sqe_flags & ~SQE_VALID_FLAGS)) {
4410                 ret = -EINVAL;
4411                 goto err_req;
4412         }
4413         if (sqe_flags & IOSQE_ASYNC)
4414                 req->flags |= REQ_F_FORCE_ASYNC;
4415
4416         ret = io_req_set_file(state, req, sqe);
4417         if (unlikely(ret)) {
4418 err_req:
4419                 io_cqring_add_event(req, ret);
4420                 io_double_put_req(req);
4421                 return false;
4422         }
4423
4424         /*
4425          * If we already have a head request, queue this one for async
4426          * submittal once the head completes. If we don't have a head but
4427          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
4428          * submitted sync once the chain is complete. If none of those
4429          * conditions are true (normal request), then just queue it.
4430          */
4431         if (*link) {
4432                 struct io_kiocb *head = *link;
4433
4434                 if (sqe_flags & IOSQE_IO_DRAIN)
4435                         head->flags |= REQ_F_DRAIN_LINK | REQ_F_IO_DRAIN;
4436
4437                 if (sqe_flags & IOSQE_IO_HARDLINK)
4438                         req->flags |= REQ_F_HARDLINK;
4439
4440                 if (io_alloc_async_ctx(req)) {
4441                         ret = -EAGAIN;
4442                         goto err_req;
4443                 }
4444
4445                 ret = io_req_defer_prep(req, sqe);
4446                 if (ret) {
4447                         /* fail even hard links since we don't submit */
4448                         head->flags |= REQ_F_FAIL_LINK;
4449                         goto err_req;
4450                 }
4451                 trace_io_uring_link(ctx, req, head);
4452                 list_add_tail(&req->link_list, &head->link_list);
4453
4454                 /* last request of a link, enqueue the link */
4455                 if (!(sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK))) {
4456                         io_queue_link_head(head);
4457                         *link = NULL;
4458                 }
4459         } else if (sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) {
4460                 req->flags |= REQ_F_LINK;
4461                 if (sqe_flags & IOSQE_IO_HARDLINK)
4462                         req->flags |= REQ_F_HARDLINK;
4463
4464                 INIT_LIST_HEAD(&req->link_list);
4465                 ret = io_req_defer_prep(req, sqe);
4466                 if (ret)
4467                         req->flags |= REQ_F_FAIL_LINK;
4468                 *link = req;
4469         } else {
4470                 io_queue_sqe(req, sqe);
4471         }
4472
4473         return true;
4474 }
4475
4476 /*
4477  * Batched submission is done, ensure local IO is flushed out.
4478  */
4479 static void io_submit_state_end(struct io_submit_state *state)
4480 {
4481         blk_finish_plug(&state->plug);
4482         io_file_put(state);
4483         if (state->free_reqs)
4484                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
4485                                         &state->reqs[state->cur_req]);
4486 }
4487
4488 /*
4489  * Start submission side cache.
4490  */
4491 static void io_submit_state_start(struct io_submit_state *state,
4492                                   unsigned int max_ios)
4493 {
4494         blk_start_plug(&state->plug);
4495         state->free_reqs = 0;
4496         state->file = NULL;
4497         state->ios_left = max_ios;
4498 }
4499
4500 static void io_commit_sqring(struct io_ring_ctx *ctx)
4501 {
4502         struct io_rings *rings = ctx->rings;
4503
4504         if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
4505                 /*
4506                  * Ensure any loads from the SQEs are done at this point,
4507                  * since once we write the new head, the application could
4508                  * write new data to them.
4509                  */
4510                 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
4511         }
4512 }
4513
4514 /*
4515  * Fetch an sqe, if one is available. Note that sqe_ptr will point to memory
4516  * that is mapped by userspace. This means that care needs to be taken to
4517  * ensure that reads are stable, as we cannot rely on userspace always
4518  * being a good citizen. If members of the sqe are validated and then later
4519  * used, it's important that those reads are done through READ_ONCE() to
4520  * prevent a re-load down the line.
4521  */
4522 static bool io_get_sqring(struct io_ring_ctx *ctx, struct io_kiocb *req,
4523                           const struct io_uring_sqe **sqe_ptr)
4524 {
4525         struct io_rings *rings = ctx->rings;
4526         u32 *sq_array = ctx->sq_array;
4527         unsigned head;
4528
4529         /*
4530          * The cached sq head (or cq tail) serves two purposes:
4531          *
4532          * 1) allows us to batch the cost of updating the user visible
4533          *    head updates.
4534          * 2) allows the kernel side to track the head on its own, even
4535          *    though the application is the one updating it.
4536          */
4537         head = ctx->cached_sq_head;
4538         /* make sure SQ entry isn't read before tail */
4539         if (unlikely(head == smp_load_acquire(&rings->sq.tail)))
4540                 return false;
4541
4542         head = READ_ONCE(sq_array[head & ctx->sq_mask]);
4543         if (likely(head < ctx->sq_entries)) {
4544                 /*
4545                  * All io need record the previous position, if LINK vs DARIN,
4546                  * it can be used to mark the position of the first IO in the
4547                  * link list.
4548                  */
4549                 req->sequence = ctx->cached_sq_head;
4550                 *sqe_ptr = &ctx->sq_sqes[head];
4551                 req->opcode = READ_ONCE((*sqe_ptr)->opcode);
4552                 req->user_data = READ_ONCE((*sqe_ptr)->user_data);
4553                 ctx->cached_sq_head++;
4554                 return true;
4555         }
4556
4557         /* drop invalid entries */
4558         ctx->cached_sq_head++;
4559         ctx->cached_sq_dropped++;
4560         WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
4561         return false;
4562 }
4563
4564 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
4565                           struct file *ring_file, int ring_fd,
4566                           struct mm_struct **mm, bool async)
4567 {
4568         struct io_submit_state state, *statep = NULL;
4569         struct io_kiocb *link = NULL;
4570         int i, submitted = 0;
4571         bool mm_fault = false;
4572
4573         /* if we have a backlog and couldn't flush it all, return BUSY */
4574         if (test_bit(0, &ctx->sq_check_overflow)) {
4575                 if (!list_empty(&ctx->cq_overflow_list) &&
4576                     !io_cqring_overflow_flush(ctx, false))
4577                         return -EBUSY;
4578         }
4579
4580         if (!percpu_ref_tryget_many(&ctx->refs, nr))
4581                 return -EAGAIN;
4582
4583         if (nr > IO_PLUG_THRESHOLD) {
4584                 io_submit_state_start(&state, nr);
4585                 statep = &state;
4586         }
4587
4588         for (i = 0; i < nr; i++) {
4589                 const struct io_uring_sqe *sqe;
4590                 struct io_kiocb *req;
4591
4592                 req = io_get_req(ctx, statep);
4593                 if (unlikely(!req)) {
4594                         if (!submitted)
4595                                 submitted = -EAGAIN;
4596                         break;
4597                 }
4598                 if (!io_get_sqring(ctx, req, &sqe)) {
4599                         __io_req_do_free(req);
4600                         break;
4601                 }
4602
4603                 /* will complete beyond this point, count as submitted */
4604                 submitted++;
4605
4606                 if (unlikely(req->opcode >= IORING_OP_LAST)) {
4607                         io_cqring_add_event(req, -EINVAL);
4608                         io_double_put_req(req);
4609                         break;
4610                 }
4611
4612                 if (io_op_defs[req->opcode].needs_mm && !*mm) {
4613                         mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
4614                         if (!mm_fault) {
4615                                 use_mm(ctx->sqo_mm);
4616                                 *mm = ctx->sqo_mm;
4617                         }
4618                 }
4619
4620                 req->ring_file = ring_file;
4621                 req->ring_fd = ring_fd;
4622                 req->has_user = *mm != NULL;
4623                 req->in_async = async;
4624                 req->needs_fixed_file = async;
4625                 trace_io_uring_submit_sqe(ctx, req->user_data, true, async);
4626                 if (!io_submit_sqe(req, sqe, statep, &link))
4627                         break;
4628         }
4629
4630         if (submitted != nr)
4631                 percpu_ref_put_many(&ctx->refs, nr - submitted);
4632         if (link)
4633                 io_queue_link_head(link);
4634         if (statep)
4635                 io_submit_state_end(&state);
4636
4637          /* Commit SQ ring head once we've consumed and submitted all SQEs */
4638         io_commit_sqring(ctx);
4639
4640         return submitted;
4641 }
4642
4643 static int io_sq_thread(void *data)
4644 {
4645         struct io_ring_ctx *ctx = data;
4646         struct mm_struct *cur_mm = NULL;
4647         const struct cred *old_cred;
4648         mm_segment_t old_fs;
4649         DEFINE_WAIT(wait);
4650         unsigned inflight;
4651         unsigned long timeout;
4652         int ret;
4653
4654         complete(&ctx->completions[1]);
4655
4656         old_fs = get_fs();
4657         set_fs(USER_DS);
4658         old_cred = override_creds(ctx->creds);
4659
4660         ret = timeout = inflight = 0;
4661         while (!kthread_should_park()) {
4662                 unsigned int to_submit;
4663
4664                 if (inflight) {
4665                         unsigned nr_events = 0;
4666
4667                         if (ctx->flags & IORING_SETUP_IOPOLL) {
4668                                 /*
4669                                  * inflight is the count of the maximum possible
4670                                  * entries we submitted, but it can be smaller
4671                                  * if we dropped some of them. If we don't have
4672                                  * poll entries available, then we know that we
4673                                  * have nothing left to poll for. Reset the
4674                                  * inflight count to zero in that case.
4675                                  */
4676                                 mutex_lock(&ctx->uring_lock);
4677                                 if (!list_empty(&ctx->poll_list))
4678                                         __io_iopoll_check(ctx, &nr_events, 0);
4679                                 else
4680                                         inflight = 0;
4681                                 mutex_unlock(&ctx->uring_lock);
4682                         } else {
4683                                 /*
4684                                  * Normal IO, just pretend everything completed.
4685                                  * We don't have to poll completions for that.
4686                                  */
4687                                 nr_events = inflight;
4688                         }
4689
4690                         inflight -= nr_events;
4691                         if (!inflight)
4692                                 timeout = jiffies + ctx->sq_thread_idle;
4693                 }
4694
4695                 to_submit = io_sqring_entries(ctx);
4696
4697                 /*
4698                  * If submit got -EBUSY, flag us as needing the application
4699                  * to enter the kernel to reap and flush events.
4700                  */
4701                 if (!to_submit || ret == -EBUSY) {
4702                         /*
4703                          * We're polling. If we're within the defined idle
4704                          * period, then let us spin without work before going
4705                          * to sleep. The exception is if we got EBUSY doing
4706                          * more IO, we should wait for the application to
4707                          * reap events and wake us up.
4708                          */
4709                         if (inflight ||
4710                             (!time_after(jiffies, timeout) && ret != -EBUSY)) {
4711                                 cond_resched();
4712                                 continue;
4713                         }
4714
4715                         /*
4716                          * Drop cur_mm before scheduling, we can't hold it for
4717                          * long periods (or over schedule()). Do this before
4718                          * adding ourselves to the waitqueue, as the unuse/drop
4719                          * may sleep.
4720                          */
4721                         if (cur_mm) {
4722                                 unuse_mm(cur_mm);
4723                                 mmput(cur_mm);
4724                                 cur_mm = NULL;
4725                         }
4726
4727                         prepare_to_wait(&ctx->sqo_wait, &wait,
4728                                                 TASK_INTERRUPTIBLE);
4729
4730                         /* Tell userspace we may need a wakeup call */
4731                         ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
4732                         /* make sure to read SQ tail after writing flags */
4733                         smp_mb();
4734
4735                         to_submit = io_sqring_entries(ctx);
4736                         if (!to_submit || ret == -EBUSY) {
4737                                 if (kthread_should_park()) {
4738                                         finish_wait(&ctx->sqo_wait, &wait);
4739                                         break;
4740                                 }
4741                                 if (signal_pending(current))
4742                                         flush_signals(current);
4743                                 schedule();
4744                                 finish_wait(&ctx->sqo_wait, &wait);
4745
4746                                 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4747                                 continue;
4748                         }
4749                         finish_wait(&ctx->sqo_wait, &wait);
4750
4751                         ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4752                 }
4753
4754                 to_submit = min(to_submit, ctx->sq_entries);
4755                 mutex_lock(&ctx->uring_lock);
4756                 ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
4757                 mutex_unlock(&ctx->uring_lock);
4758                 if (ret > 0)
4759                         inflight += ret;
4760         }
4761
4762         set_fs(old_fs);
4763         if (cur_mm) {
4764                 unuse_mm(cur_mm);
4765                 mmput(cur_mm);
4766         }
4767         revert_creds(old_cred);
4768
4769         kthread_parkme();
4770
4771         return 0;
4772 }
4773
4774 struct io_wait_queue {
4775         struct wait_queue_entry wq;
4776         struct io_ring_ctx *ctx;
4777         unsigned to_wait;
4778         unsigned nr_timeouts;
4779 };
4780
4781 static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
4782 {
4783         struct io_ring_ctx *ctx = iowq->ctx;
4784
4785         /*
4786          * Wake up if we have enough events, or if a timeout occurred since we
4787          * started waiting. For timeouts, we always want to return to userspace,
4788          * regardless of event count.
4789          */
4790         return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
4791                         atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
4792 }
4793
4794 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
4795                             int wake_flags, void *key)
4796 {
4797         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
4798                                                         wq);
4799
4800         /* use noflush == true, as we can't safely rely on locking context */
4801         if (!io_should_wake(iowq, true))
4802                 return -1;
4803
4804         return autoremove_wake_function(curr, mode, wake_flags, key);
4805 }
4806
4807 /*
4808  * Wait until events become available, if we don't already have some. The
4809  * application must reap them itself, as they reside on the shared cq ring.
4810  */
4811 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
4812                           const sigset_t __user *sig, size_t sigsz)
4813 {
4814         struct io_wait_queue iowq = {
4815                 .wq = {
4816                         .private        = current,
4817                         .func           = io_wake_function,
4818                         .entry          = LIST_HEAD_INIT(iowq.wq.entry),
4819                 },
4820                 .ctx            = ctx,
4821                 .to_wait        = min_events,
4822         };
4823         struct io_rings *rings = ctx->rings;
4824         int ret = 0;
4825
4826         if (io_cqring_events(ctx, false) >= min_events)
4827                 return 0;
4828
4829         if (sig) {
4830 #ifdef CONFIG_COMPAT
4831                 if (in_compat_syscall())
4832                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
4833                                                       sigsz);
4834                 else
4835 #endif
4836                         ret = set_user_sigmask(sig, sigsz);
4837
4838                 if (ret)
4839                         return ret;
4840         }
4841
4842         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
4843         trace_io_uring_cqring_wait(ctx, min_events);
4844         do {
4845                 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
4846                                                 TASK_INTERRUPTIBLE);
4847                 if (io_should_wake(&iowq, false))
4848                         break;
4849                 schedule();
4850                 if (signal_pending(current)) {
4851                         ret = -EINTR;
4852                         break;
4853                 }
4854         } while (1);
4855         finish_wait(&ctx->wait, &iowq.wq);
4856
4857         restore_saved_sigmask_unless(ret == -EINTR);
4858
4859         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
4860 }
4861
4862 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
4863 {
4864 #if defined(CONFIG_UNIX)
4865         if (ctx->ring_sock) {
4866                 struct sock *sock = ctx->ring_sock->sk;
4867                 struct sk_buff *skb;
4868
4869                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
4870                         kfree_skb(skb);
4871         }
4872 #else
4873         int i;
4874
4875         for (i = 0; i < ctx->nr_user_files; i++) {
4876                 struct file *file;
4877
4878                 file = io_file_from_index(ctx, i);
4879                 if (file)
4880                         fput(file);
4881         }
4882 #endif
4883 }
4884
4885 static void io_file_ref_kill(struct percpu_ref *ref)
4886 {
4887         struct fixed_file_data *data;
4888
4889         data = container_of(ref, struct fixed_file_data, refs);
4890         complete(&data->done);
4891 }
4892
4893 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
4894 {
4895         struct fixed_file_data *data = ctx->file_data;
4896         unsigned nr_tables, i;
4897
4898         if (!data)
4899                 return -ENXIO;
4900
4901         /* protect against inflight atomic switch, which drops the ref */
4902         flush_work(&data->ref_work);
4903         percpu_ref_get(&data->refs);
4904         percpu_ref_kill_and_confirm(&data->refs, io_file_ref_kill);
4905         wait_for_completion(&data->done);
4906         percpu_ref_put(&data->refs);
4907         percpu_ref_exit(&data->refs);
4908
4909         __io_sqe_files_unregister(ctx);
4910         nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
4911         for (i = 0; i < nr_tables; i++)
4912                 kfree(data->table[i].files);
4913         kfree(data->table);
4914         kfree(data);
4915         ctx->file_data = NULL;
4916         ctx->nr_user_files = 0;
4917         return 0;
4918 }
4919
4920 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
4921 {
4922         if (ctx->sqo_thread) {
4923                 wait_for_completion(&ctx->completions[1]);
4924                 /*
4925                  * The park is a bit of a work-around, without it we get
4926                  * warning spews on shutdown with SQPOLL set and affinity
4927                  * set to a single CPU.
4928                  */
4929                 kthread_park(ctx->sqo_thread);
4930                 kthread_stop(ctx->sqo_thread);
4931                 ctx->sqo_thread = NULL;
4932         }
4933 }
4934
4935 static void io_finish_async(struct io_ring_ctx *ctx)
4936 {
4937         io_sq_thread_stop(ctx);
4938
4939         if (ctx->io_wq) {
4940                 io_wq_destroy(ctx->io_wq);
4941                 ctx->io_wq = NULL;
4942         }
4943 }
4944
4945 #if defined(CONFIG_UNIX)
4946 /*
4947  * Ensure the UNIX gc is aware of our file set, so we are certain that
4948  * the io_uring can be safely unregistered on process exit, even if we have
4949  * loops in the file referencing.
4950  */
4951 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
4952 {
4953         struct sock *sk = ctx->ring_sock->sk;
4954         struct scm_fp_list *fpl;
4955         struct sk_buff *skb;
4956         int i, nr_files;
4957
4958         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
4959                 unsigned long inflight = ctx->user->unix_inflight + nr;
4960
4961                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
4962                         return -EMFILE;
4963         }
4964
4965         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
4966         if (!fpl)
4967                 return -ENOMEM;
4968
4969         skb = alloc_skb(0, GFP_KERNEL);
4970         if (!skb) {
4971                 kfree(fpl);
4972                 return -ENOMEM;
4973         }
4974
4975         skb->sk = sk;
4976
4977         nr_files = 0;
4978         fpl->user = get_uid(ctx->user);
4979         for (i = 0; i < nr; i++) {
4980                 struct file *file = io_file_from_index(ctx, i + offset);
4981
4982                 if (!file)
4983                         continue;
4984                 fpl->fp[nr_files] = get_file(file);
4985                 unix_inflight(fpl->user, fpl->fp[nr_files]);
4986                 nr_files++;
4987         }
4988
4989         if (nr_files) {
4990                 fpl->max = SCM_MAX_FD;
4991                 fpl->count = nr_files;
4992                 UNIXCB(skb).fp = fpl;
4993                 skb->destructor = unix_destruct_scm;
4994                 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
4995                 skb_queue_head(&sk->sk_receive_queue, skb);
4996
4997                 for (i = 0; i < nr_files; i++)
4998                         fput(fpl->fp[i]);
4999         } else {
5000                 kfree_skb(skb);
5001                 kfree(fpl);
5002         }
5003
5004         return 0;
5005 }
5006
5007 /*
5008  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
5009  * causes regular reference counting to break down. We rely on the UNIX
5010  * garbage collection to take care of this problem for us.
5011  */
5012 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
5013 {
5014         unsigned left, total;
5015         int ret = 0;
5016
5017         total = 0;
5018         left = ctx->nr_user_files;
5019         while (left) {
5020                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
5021
5022                 ret = __io_sqe_files_scm(ctx, this_files, total);
5023                 if (ret)
5024                         break;
5025                 left -= this_files;
5026                 total += this_files;
5027         }
5028
5029         if (!ret)
5030                 return 0;
5031
5032         while (total < ctx->nr_user_files) {
5033                 struct file *file = io_file_from_index(ctx, total);
5034
5035                 if (file)
5036                         fput(file);
5037                 total++;
5038         }
5039
5040         return ret;
5041 }
5042 #else
5043 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
5044 {
5045         return 0;
5046 }
5047 #endif
5048
5049 static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
5050                                     unsigned nr_files)
5051 {
5052         int i;
5053
5054         for (i = 0; i < nr_tables; i++) {
5055                 struct fixed_file_table *table = &ctx->file_data->table[i];
5056                 unsigned this_files;
5057
5058                 this_files = min(nr_files, IORING_MAX_FILES_TABLE);
5059                 table->files = kcalloc(this_files, sizeof(struct file *),
5060                                         GFP_KERNEL);
5061                 if (!table->files)
5062                         break;
5063                 nr_files -= this_files;
5064         }
5065
5066         if (i == nr_tables)
5067                 return 0;
5068
5069         for (i = 0; i < nr_tables; i++) {
5070                 struct fixed_file_table *table = &ctx->file_data->table[i];
5071                 kfree(table->files);
5072         }
5073         return 1;
5074 }
5075
5076 static void io_ring_file_put(struct io_ring_ctx *ctx, struct file *file)
5077 {
5078 #if defined(CONFIG_UNIX)
5079         struct sock *sock = ctx->ring_sock->sk;
5080         struct sk_buff_head list, *head = &sock->sk_receive_queue;
5081         struct sk_buff *skb;
5082         int i;
5083
5084         __skb_queue_head_init(&list);
5085
5086         /*
5087          * Find the skb that holds this file in its SCM_RIGHTS. When found,
5088          * remove this entry and rearrange the file array.
5089          */
5090         skb = skb_dequeue(head);
5091         while (skb) {
5092                 struct scm_fp_list *fp;
5093
5094                 fp = UNIXCB(skb).fp;
5095                 for (i = 0; i < fp->count; i++) {
5096                         int left;
5097
5098                         if (fp->fp[i] != file)
5099                                 continue;
5100
5101                         unix_notinflight(fp->user, fp->fp[i]);
5102                         left = fp->count - 1 - i;
5103                         if (left) {
5104                                 memmove(&fp->fp[i], &fp->fp[i + 1],
5105                                                 left * sizeof(struct file *));
5106                         }
5107                         fp->count--;
5108                         if (!fp->count) {
5109                                 kfree_skb(skb);
5110                                 skb = NULL;
5111                         } else {
5112                                 __skb_queue_tail(&list, skb);
5113                         }
5114                         fput(file);
5115                         file = NULL;
5116                         break;
5117                 }
5118
5119                 if (!file)
5120                         break;
5121
5122                 __skb_queue_tail(&list, skb);
5123
5124                 skb = skb_dequeue(head);
5125         }
5126
5127         if (skb_peek(&list)) {
5128                 spin_lock_irq(&head->lock);
5129                 while ((skb = __skb_dequeue(&list)) != NULL)
5130                         __skb_queue_tail(head, skb);
5131                 spin_unlock_irq(&head->lock);
5132         }
5133 #else
5134         fput(file);
5135 #endif
5136 }
5137
5138 struct io_file_put {
5139         struct llist_node llist;
5140         struct file *file;
5141         struct completion *done;
5142 };
5143
5144 static void io_ring_file_ref_switch(struct work_struct *work)
5145 {
5146         struct io_file_put *pfile, *tmp;
5147         struct fixed_file_data *data;
5148         struct llist_node *node;
5149
5150         data = container_of(work, struct fixed_file_data, ref_work);
5151
5152         while ((node = llist_del_all(&data->put_llist)) != NULL) {
5153                 llist_for_each_entry_safe(pfile, tmp, node, llist) {
5154                         io_ring_file_put(data->ctx, pfile->file);
5155                         if (pfile->done)
5156                                 complete(pfile->done);
5157                         else
5158                                 kfree(pfile);
5159                 }
5160         }
5161
5162         percpu_ref_get(&data->refs);
5163         percpu_ref_switch_to_percpu(&data->refs);
5164 }
5165
5166 static void io_file_data_ref_zero(struct percpu_ref *ref)
5167 {
5168         struct fixed_file_data *data;
5169
5170         data = container_of(ref, struct fixed_file_data, refs);
5171
5172         /* we can't safely switch from inside this context, punt to wq */
5173         queue_work(system_wq, &data->ref_work);
5174 }
5175
5176 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
5177                                  unsigned nr_args)
5178 {
5179         __s32 __user *fds = (__s32 __user *) arg;
5180         unsigned nr_tables;
5181         struct file *file;
5182         int fd, ret = 0;
5183         unsigned i;
5184
5185         if (ctx->file_data)
5186                 return -EBUSY;
5187         if (!nr_args)
5188                 return -EINVAL;
5189         if (nr_args > IORING_MAX_FIXED_FILES)
5190                 return -EMFILE;
5191
5192         ctx->file_data = kzalloc(sizeof(*ctx->file_data), GFP_KERNEL);
5193         if (!ctx->file_data)
5194                 return -ENOMEM;
5195         ctx->file_data->ctx = ctx;
5196         init_completion(&ctx->file_data->done);
5197
5198         nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
5199         ctx->file_data->table = kcalloc(nr_tables,
5200                                         sizeof(struct fixed_file_table),
5201                                         GFP_KERNEL);
5202         if (!ctx->file_data->table) {
5203                 kfree(ctx->file_data);
5204                 ctx->file_data = NULL;
5205                 return -ENOMEM;
5206         }
5207
5208         if (percpu_ref_init(&ctx->file_data->refs, io_file_data_ref_zero,
5209                                 PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
5210                 kfree(ctx->file_data->table);
5211                 kfree(ctx->file_data);
5212                 ctx->file_data = NULL;
5213                 return -ENOMEM;
5214         }
5215         ctx->file_data->put_llist.first = NULL;
5216         INIT_WORK(&ctx->file_data->ref_work, io_ring_file_ref_switch);
5217
5218         if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
5219                 percpu_ref_exit(&ctx->file_data->refs);
5220                 kfree(ctx->file_data->table);
5221                 kfree(ctx->file_data);
5222                 ctx->file_data = NULL;
5223                 return -ENOMEM;
5224         }
5225
5226         for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
5227                 struct fixed_file_table *table;
5228                 unsigned index;
5229
5230                 ret = -EFAULT;
5231                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
5232                         break;
5233                 /* allow sparse sets */
5234                 if (fd == -1) {
5235                         ret = 0;
5236                         continue;
5237                 }
5238
5239                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
5240                 index = i & IORING_FILE_TABLE_MASK;
5241                 file = fget(fd);
5242
5243                 ret = -EBADF;
5244                 if (!file)
5245                         break;
5246
5247                 /*
5248                  * Don't allow io_uring instances to be registered. If UNIX
5249                  * isn't enabled, then this causes a reference cycle and this
5250                  * instance can never get freed. If UNIX is enabled we'll
5251                  * handle it just fine, but there's still no point in allowing
5252                  * a ring fd as it doesn't support regular read/write anyway.
5253                  */
5254                 if (file->f_op == &io_uring_fops) {
5255                         fput(file);
5256                         break;
5257                 }
5258                 ret = 0;
5259                 table->files[index] = file;
5260         }
5261
5262         if (ret) {
5263                 for (i = 0; i < ctx->nr_user_files; i++) {
5264                         file = io_file_from_index(ctx, i);
5265                         if (file)
5266                                 fput(file);
5267                 }
5268                 for (i = 0; i < nr_tables; i++)
5269                         kfree(ctx->file_data->table[i].files);
5270
5271                 kfree(ctx->file_data->table);
5272                 kfree(ctx->file_data);
5273                 ctx->file_data = NULL;
5274                 ctx->nr_user_files = 0;
5275                 return ret;
5276         }
5277
5278         ret = io_sqe_files_scm(ctx);
5279         if (ret)
5280                 io_sqe_files_unregister(ctx);
5281
5282         return ret;
5283 }
5284
5285 static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
5286                                 int index)
5287 {
5288 #if defined(CONFIG_UNIX)
5289         struct sock *sock = ctx->ring_sock->sk;
5290         struct sk_buff_head *head = &sock->sk_receive_queue;
5291         struct sk_buff *skb;
5292
5293         /*
5294          * See if we can merge this file into an existing skb SCM_RIGHTS
5295          * file set. If there's no room, fall back to allocating a new skb
5296          * and filling it in.
5297          */
5298         spin_lock_irq(&head->lock);
5299         skb = skb_peek(head);
5300         if (skb) {
5301                 struct scm_fp_list *fpl = UNIXCB(skb).fp;
5302
5303                 if (fpl->count < SCM_MAX_FD) {
5304                         __skb_unlink(skb, head);
5305                         spin_unlock_irq(&head->lock);
5306                         fpl->fp[fpl->count] = get_file(file);
5307                         unix_inflight(fpl->user, fpl->fp[fpl->count]);
5308                         fpl->count++;
5309                         spin_lock_irq(&head->lock);
5310                         __skb_queue_head(head, skb);
5311                 } else {
5312                         skb = NULL;
5313                 }
5314         }
5315         spin_unlock_irq(&head->lock);
5316
5317         if (skb) {
5318                 fput(file);
5319                 return 0;
5320         }
5321
5322         return __io_sqe_files_scm(ctx, 1, index);
5323 #else
5324         return 0;
5325 #endif
5326 }
5327
5328 static void io_atomic_switch(struct percpu_ref *ref)
5329 {
5330         struct fixed_file_data *data;
5331
5332         data = container_of(ref, struct fixed_file_data, refs);
5333         clear_bit(FFD_F_ATOMIC, &data->state);
5334 }
5335
5336 static bool io_queue_file_removal(struct fixed_file_data *data,
5337                                   struct file *file)
5338 {
5339         struct io_file_put *pfile, pfile_stack;
5340         DECLARE_COMPLETION_ONSTACK(done);
5341
5342         /*
5343          * If we fail allocating the struct we need for doing async reomval
5344          * of this file, just punt to sync and wait for it.
5345          */
5346         pfile = kzalloc(sizeof(*pfile), GFP_KERNEL);
5347         if (!pfile) {
5348                 pfile = &pfile_stack;
5349                 pfile->done = &done;
5350         }
5351
5352         pfile->file = file;
5353         llist_add(&pfile->llist, &data->put_llist);
5354
5355         if (pfile == &pfile_stack) {
5356                 if (!test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
5357                         percpu_ref_put(&data->refs);
5358                         percpu_ref_switch_to_atomic(&data->refs,
5359                                                         io_atomic_switch);
5360                 }
5361                 wait_for_completion(&done);
5362                 flush_work(&data->ref_work);
5363                 return false;
5364         }
5365
5366         return true;
5367 }
5368
5369 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
5370                                  struct io_uring_files_update *up,
5371                                  unsigned nr_args)
5372 {
5373         struct fixed_file_data *data = ctx->file_data;
5374         bool ref_switch = false;
5375         struct file *file;
5376         __s32 __user *fds;
5377         int fd, i, err;
5378         __u32 done;
5379
5380         if (check_add_overflow(up->offset, nr_args, &done))
5381                 return -EOVERFLOW;
5382         if (done > ctx->nr_user_files)
5383                 return -EINVAL;
5384
5385         done = 0;
5386         fds = u64_to_user_ptr(up->fds);
5387         while (nr_args) {
5388                 struct fixed_file_table *table;
5389                 unsigned index;
5390
5391                 err = 0;
5392                 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
5393                         err = -EFAULT;
5394                         break;
5395                 }
5396                 i = array_index_nospec(up->offset, ctx->nr_user_files);
5397                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
5398                 index = i & IORING_FILE_TABLE_MASK;
5399                 if (table->files[index]) {
5400                         file = io_file_from_index(ctx, index);
5401                         table->files[index] = NULL;
5402                         if (io_queue_file_removal(data, file))
5403                                 ref_switch = true;
5404                 }
5405                 if (fd != -1) {
5406                         file = fget(fd);
5407                         if (!file) {
5408                                 err = -EBADF;
5409                                 break;
5410                         }
5411                         /*
5412                          * Don't allow io_uring instances to be registered. If
5413                          * UNIX isn't enabled, then this causes a reference
5414                          * cycle and this instance can never get freed. If UNIX
5415                          * is enabled we'll handle it just fine, but there's
5416                          * still no point in allowing a ring fd as it doesn't
5417                          * support regular read/write anyway.
5418                          */
5419                         if (file->f_op == &io_uring_fops) {
5420                                 fput(file);
5421                                 err = -EBADF;
5422                                 break;
5423                         }
5424                         table->files[index] = file;
5425                         err = io_sqe_file_register(ctx, file, i);
5426                         if (err)
5427                                 break;
5428                 }
5429                 nr_args--;
5430                 done++;
5431                 up->offset++;
5432         }
5433
5434         if (ref_switch && !test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
5435                 percpu_ref_put(&data->refs);
5436                 percpu_ref_switch_to_atomic(&data->refs, io_atomic_switch);
5437         }
5438
5439         return done ? done : err;
5440 }
5441 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
5442                                unsigned nr_args)
5443 {
5444         struct io_uring_files_update up;
5445
5446         if (!ctx->file_data)
5447                 return -ENXIO;
5448         if (!nr_args)
5449                 return -EINVAL;
5450         if (copy_from_user(&up, arg, sizeof(up)))
5451                 return -EFAULT;
5452         if (up.resv)
5453                 return -EINVAL;
5454
5455         return __io_sqe_files_update(ctx, &up, nr_args);
5456 }
5457
5458 static void io_put_work(struct io_wq_work *work)
5459 {
5460         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5461
5462         io_put_req(req);
5463 }
5464
5465 static void io_get_work(struct io_wq_work *work)
5466 {
5467         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5468
5469         refcount_inc(&req->refs);
5470 }
5471
5472 static int io_sq_offload_start(struct io_ring_ctx *ctx,
5473                                struct io_uring_params *p)
5474 {
5475         struct io_wq_data data;
5476         unsigned concurrency;
5477         int ret;
5478
5479         init_waitqueue_head(&ctx->sqo_wait);
5480         mmgrab(current->mm);
5481         ctx->sqo_mm = current->mm;
5482
5483         if (ctx->flags & IORING_SETUP_SQPOLL) {
5484                 ret = -EPERM;
5485                 if (!capable(CAP_SYS_ADMIN))
5486                         goto err;
5487
5488                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
5489                 if (!ctx->sq_thread_idle)
5490                         ctx->sq_thread_idle = HZ;
5491
5492                 if (p->flags & IORING_SETUP_SQ_AFF) {
5493                         int cpu = p->sq_thread_cpu;
5494
5495                         ret = -EINVAL;
5496                         if (cpu >= nr_cpu_ids)
5497                                 goto err;
5498                         if (!cpu_online(cpu))
5499                                 goto err;
5500
5501                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
5502                                                         ctx, cpu,
5503                                                         "io_uring-sq");
5504                 } else {
5505                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
5506                                                         "io_uring-sq");
5507                 }
5508                 if (IS_ERR(ctx->sqo_thread)) {
5509                         ret = PTR_ERR(ctx->sqo_thread);
5510                         ctx->sqo_thread = NULL;
5511                         goto err;
5512                 }
5513                 wake_up_process(ctx->sqo_thread);
5514         } else if (p->flags & IORING_SETUP_SQ_AFF) {
5515                 /* Can't have SQ_AFF without SQPOLL */
5516                 ret = -EINVAL;
5517                 goto err;
5518         }
5519
5520         data.mm = ctx->sqo_mm;
5521         data.user = ctx->user;
5522         data.creds = ctx->creds;
5523         data.get_work = io_get_work;
5524         data.put_work = io_put_work;
5525
5526         /* Do QD, or 4 * CPUS, whatever is smallest */
5527         concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
5528         ctx->io_wq = io_wq_create(concurrency, &data);
5529         if (IS_ERR(ctx->io_wq)) {
5530                 ret = PTR_ERR(ctx->io_wq);
5531                 ctx->io_wq = NULL;
5532                 goto err;
5533         }
5534
5535         return 0;
5536 err:
5537         io_finish_async(ctx);
5538         mmdrop(ctx->sqo_mm);
5539         ctx->sqo_mm = NULL;
5540         return ret;
5541 }
5542
5543 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
5544 {
5545         atomic_long_sub(nr_pages, &user->locked_vm);
5546 }
5547
5548 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
5549 {
5550         unsigned long page_limit, cur_pages, new_pages;
5551
5552         /* Don't allow more pages than we can safely lock */
5553         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
5554
5555         do {
5556                 cur_pages = atomic_long_read(&user->locked_vm);
5557                 new_pages = cur_pages + nr_pages;
5558                 if (new_pages > page_limit)
5559                         return -ENOMEM;
5560         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
5561                                         new_pages) != cur_pages);
5562
5563         return 0;
5564 }
5565
5566 static void io_mem_free(void *ptr)
5567 {
5568         struct page *page;
5569
5570         if (!ptr)
5571                 return;
5572
5573         page = virt_to_head_page(ptr);
5574         if (put_page_testzero(page))
5575                 free_compound_page(page);
5576 }
5577
5578 static void *io_mem_alloc(size_t size)
5579 {
5580         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
5581                                 __GFP_NORETRY;
5582
5583         return (void *) __get_free_pages(gfp_flags, get_order(size));
5584 }
5585
5586 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
5587                                 size_t *sq_offset)
5588 {
5589         struct io_rings *rings;
5590         size_t off, sq_array_size;
5591
5592         off = struct_size(rings, cqes, cq_entries);
5593         if (off == SIZE_MAX)
5594                 return SIZE_MAX;
5595
5596 #ifdef CONFIG_SMP
5597         off = ALIGN(off, SMP_CACHE_BYTES);
5598         if (off == 0)
5599                 return SIZE_MAX;
5600 #endif
5601
5602         sq_array_size = array_size(sizeof(u32), sq_entries);
5603         if (sq_array_size == SIZE_MAX)
5604                 return SIZE_MAX;
5605
5606         if (check_add_overflow(off, sq_array_size, &off))
5607                 return SIZE_MAX;
5608
5609         if (sq_offset)
5610                 *sq_offset = off;
5611
5612         return off;
5613 }
5614
5615 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
5616 {
5617         size_t pages;
5618
5619         pages = (size_t)1 << get_order(
5620                 rings_size(sq_entries, cq_entries, NULL));
5621         pages += (size_t)1 << get_order(
5622                 array_size(sizeof(struct io_uring_sqe), sq_entries));
5623
5624         return pages;
5625 }
5626
5627 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
5628 {
5629         int i, j;
5630
5631         if (!ctx->user_bufs)
5632                 return -ENXIO;
5633
5634         for (i = 0; i < ctx->nr_user_bufs; i++) {
5635                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5636
5637                 for (j = 0; j < imu->nr_bvecs; j++)
5638                         put_user_page(imu->bvec[j].bv_page);
5639
5640                 if (ctx->account_mem)
5641                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
5642                 kvfree(imu->bvec);
5643                 imu->nr_bvecs = 0;
5644         }
5645
5646         kfree(ctx->user_bufs);
5647         ctx->user_bufs = NULL;
5648         ctx->nr_user_bufs = 0;
5649         return 0;
5650 }
5651
5652 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
5653                        void __user *arg, unsigned index)
5654 {
5655         struct iovec __user *src;
5656
5657 #ifdef CONFIG_COMPAT
5658         if (ctx->compat) {
5659                 struct compat_iovec __user *ciovs;
5660                 struct compat_iovec ciov;
5661
5662                 ciovs = (struct compat_iovec __user *) arg;
5663                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
5664                         return -EFAULT;
5665
5666                 dst->iov_base = u64_to_user_ptr((u64)ciov.iov_base);
5667                 dst->iov_len = ciov.iov_len;
5668                 return 0;
5669         }
5670 #endif
5671         src = (struct iovec __user *) arg;
5672         if (copy_from_user(dst, &src[index], sizeof(*dst)))
5673                 return -EFAULT;
5674         return 0;
5675 }
5676
5677 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
5678                                   unsigned nr_args)
5679 {
5680         struct vm_area_struct **vmas = NULL;
5681         struct page **pages = NULL;
5682         int i, j, got_pages = 0;
5683         int ret = -EINVAL;
5684
5685         if (ctx->user_bufs)
5686                 return -EBUSY;
5687         if (!nr_args || nr_args > UIO_MAXIOV)
5688                 return -EINVAL;
5689
5690         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
5691                                         GFP_KERNEL);
5692         if (!ctx->user_bufs)
5693                 return -ENOMEM;
5694
5695         for (i = 0; i < nr_args; i++) {
5696                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5697                 unsigned long off, start, end, ubuf;
5698                 int pret, nr_pages;
5699                 struct iovec iov;
5700                 size_t size;
5701
5702                 ret = io_copy_iov(ctx, &iov, arg, i);
5703                 if (ret)
5704                         goto err;
5705
5706                 /*
5707                  * Don't impose further limits on the size and buffer
5708                  * constraints here, we'll -EINVAL later when IO is
5709                  * submitted if they are wrong.
5710                  */
5711                 ret = -EFAULT;
5712                 if (!iov.iov_base || !iov.iov_len)
5713                         goto err;
5714
5715                 /* arbitrary limit, but we need something */
5716                 if (iov.iov_len > SZ_1G)
5717                         goto err;
5718
5719                 ubuf = (unsigned long) iov.iov_base;
5720                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
5721                 start = ubuf >> PAGE_SHIFT;
5722                 nr_pages = end - start;
5723
5724                 if (ctx->account_mem) {
5725                         ret = io_account_mem(ctx->user, nr_pages);
5726                         if (ret)
5727                                 goto err;
5728                 }
5729
5730                 ret = 0;
5731                 if (!pages || nr_pages > got_pages) {
5732                         kfree(vmas);
5733                         kfree(pages);
5734                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
5735                                                 GFP_KERNEL);
5736                         vmas = kvmalloc_array(nr_pages,
5737                                         sizeof(struct vm_area_struct *),
5738                                         GFP_KERNEL);
5739                         if (!pages || !vmas) {
5740                                 ret = -ENOMEM;
5741                                 if (ctx->account_mem)
5742                                         io_unaccount_mem(ctx->user, nr_pages);
5743                                 goto err;
5744                         }
5745                         got_pages = nr_pages;
5746                 }
5747
5748                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
5749                                                 GFP_KERNEL);
5750                 ret = -ENOMEM;
5751                 if (!imu->bvec) {
5752                         if (ctx->account_mem)
5753                                 io_unaccount_mem(ctx->user, nr_pages);
5754                         goto err;
5755                 }
5756
5757                 ret = 0;
5758                 down_read(&current->mm->mmap_sem);
5759                 pret = get_user_pages(ubuf, nr_pages,
5760                                       FOLL_WRITE | FOLL_LONGTERM,
5761                                       pages, vmas);
5762                 if (pret == nr_pages) {
5763                         /* don't support file backed memory */
5764                         for (j = 0; j < nr_pages; j++) {
5765                                 struct vm_area_struct *vma = vmas[j];
5766
5767                                 if (vma->vm_file &&
5768                                     !is_file_hugepages(vma->vm_file)) {
5769                                         ret = -EOPNOTSUPP;
5770                                         break;
5771                                 }
5772                         }
5773                 } else {
5774                         ret = pret < 0 ? pret : -EFAULT;
5775                 }
5776                 up_read(&current->mm->mmap_sem);
5777                 if (ret) {
5778                         /*
5779                          * if we did partial map, or found file backed vmas,
5780                          * release any pages we did get
5781                          */
5782                         if (pret > 0)
5783                                 put_user_pages(pages, pret);
5784                         if (ctx->account_mem)
5785                                 io_unaccount_mem(ctx->user, nr_pages);
5786                         kvfree(imu->bvec);
5787                         goto err;
5788                 }
5789
5790                 off = ubuf & ~PAGE_MASK;
5791                 size = iov.iov_len;
5792                 for (j = 0; j < nr_pages; j++) {
5793                         size_t vec_len;
5794
5795                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
5796                         imu->bvec[j].bv_page = pages[j];
5797                         imu->bvec[j].bv_len = vec_len;
5798                         imu->bvec[j].bv_offset = off;
5799                         off = 0;
5800                         size -= vec_len;
5801                 }
5802                 /* store original address for later verification */
5803                 imu->ubuf = ubuf;
5804                 imu->len = iov.iov_len;
5805                 imu->nr_bvecs = nr_pages;
5806
5807                 ctx->nr_user_bufs++;
5808         }
5809         kvfree(pages);
5810         kvfree(vmas);
5811         return 0;
5812 err:
5813         kvfree(pages);
5814         kvfree(vmas);
5815         io_sqe_buffer_unregister(ctx);
5816         return ret;
5817 }
5818
5819 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
5820 {
5821         __s32 __user *fds = arg;
5822         int fd;
5823
5824         if (ctx->cq_ev_fd)
5825                 return -EBUSY;
5826
5827         if (copy_from_user(&fd, fds, sizeof(*fds)))
5828                 return -EFAULT;
5829
5830         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
5831         if (IS_ERR(ctx->cq_ev_fd)) {
5832                 int ret = PTR_ERR(ctx->cq_ev_fd);
5833                 ctx->cq_ev_fd = NULL;
5834                 return ret;
5835         }
5836
5837         return 0;
5838 }
5839
5840 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
5841 {
5842         if (ctx->cq_ev_fd) {
5843                 eventfd_ctx_put(ctx->cq_ev_fd);
5844                 ctx->cq_ev_fd = NULL;
5845                 return 0;
5846         }
5847
5848         return -ENXIO;
5849 }
5850
5851 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
5852 {
5853         io_finish_async(ctx);
5854         if (ctx->sqo_mm)
5855                 mmdrop(ctx->sqo_mm);
5856
5857         io_iopoll_reap_events(ctx);
5858         io_sqe_buffer_unregister(ctx);
5859         io_sqe_files_unregister(ctx);
5860         io_eventfd_unregister(ctx);
5861
5862 #if defined(CONFIG_UNIX)
5863         if (ctx->ring_sock) {
5864                 ctx->ring_sock->file = NULL; /* so that iput() is called */
5865                 sock_release(ctx->ring_sock);
5866         }
5867 #endif
5868
5869         io_mem_free(ctx->rings);
5870         io_mem_free(ctx->sq_sqes);
5871
5872         percpu_ref_exit(&ctx->refs);
5873         if (ctx->account_mem)
5874                 io_unaccount_mem(ctx->user,
5875                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
5876         free_uid(ctx->user);
5877         put_cred(ctx->creds);
5878         kfree(ctx->completions);
5879         kfree(ctx->cancel_hash);
5880         kmem_cache_free(req_cachep, ctx->fallback_req);
5881         kfree(ctx);
5882 }
5883
5884 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
5885 {
5886         struct io_ring_ctx *ctx = file->private_data;
5887         __poll_t mask = 0;
5888
5889         poll_wait(file, &ctx->cq_wait, wait);
5890         /*
5891          * synchronizes with barrier from wq_has_sleeper call in
5892          * io_commit_cqring
5893          */
5894         smp_rmb();
5895         if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
5896             ctx->rings->sq_ring_entries)
5897                 mask |= EPOLLOUT | EPOLLWRNORM;
5898         if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
5899                 mask |= EPOLLIN | EPOLLRDNORM;
5900
5901         return mask;
5902 }
5903
5904 static int io_uring_fasync(int fd, struct file *file, int on)
5905 {
5906         struct io_ring_ctx *ctx = file->private_data;
5907
5908         return fasync_helper(fd, file, on, &ctx->cq_fasync);
5909 }
5910
5911 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
5912 {
5913         mutex_lock(&ctx->uring_lock);
5914         percpu_ref_kill(&ctx->refs);
5915         mutex_unlock(&ctx->uring_lock);
5916
5917         io_kill_timeouts(ctx);
5918         io_poll_remove_all(ctx);
5919
5920         if (ctx->io_wq)
5921                 io_wq_cancel_all(ctx->io_wq);
5922
5923         io_iopoll_reap_events(ctx);
5924         /* if we failed setting up the ctx, we might not have any rings */
5925         if (ctx->rings)
5926                 io_cqring_overflow_flush(ctx, true);
5927         wait_for_completion(&ctx->completions[0]);
5928         io_ring_ctx_free(ctx);
5929 }
5930
5931 static int io_uring_release(struct inode *inode, struct file *file)
5932 {
5933         struct io_ring_ctx *ctx = file->private_data;
5934
5935         file->private_data = NULL;
5936         io_ring_ctx_wait_and_kill(ctx);
5937         return 0;
5938 }
5939
5940 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
5941                                   struct files_struct *files)
5942 {
5943         struct io_kiocb *req;
5944         DEFINE_WAIT(wait);
5945
5946         while (!list_empty_careful(&ctx->inflight_list)) {
5947                 struct io_kiocb *cancel_req = NULL;
5948
5949                 spin_lock_irq(&ctx->inflight_lock);
5950                 list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
5951                         if (req->work.files != files)
5952                                 continue;
5953                         /* req is being completed, ignore */
5954                         if (!refcount_inc_not_zero(&req->refs))
5955                                 continue;
5956                         cancel_req = req;
5957                         break;
5958                 }
5959                 if (cancel_req)
5960                         prepare_to_wait(&ctx->inflight_wait, &wait,
5961                                                 TASK_UNINTERRUPTIBLE);
5962                 spin_unlock_irq(&ctx->inflight_lock);
5963
5964                 /* We need to keep going until we don't find a matching req */
5965                 if (!cancel_req)
5966                         break;
5967
5968                 io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
5969                 io_put_req(cancel_req);
5970                 schedule();
5971         }
5972         finish_wait(&ctx->inflight_wait, &wait);
5973 }
5974
5975 static int io_uring_flush(struct file *file, void *data)
5976 {
5977         struct io_ring_ctx *ctx = file->private_data;
5978
5979         io_uring_cancel_files(ctx, data);
5980         if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
5981                 io_cqring_overflow_flush(ctx, true);
5982                 io_wq_cancel_all(ctx->io_wq);
5983         }
5984         return 0;
5985 }
5986
5987 static void *io_uring_validate_mmap_request(struct file *file,
5988                                             loff_t pgoff, size_t sz)
5989 {
5990         struct io_ring_ctx *ctx = file->private_data;
5991         loff_t offset = pgoff << PAGE_SHIFT;
5992         struct page *page;
5993         void *ptr;
5994
5995         switch (offset) {
5996         case IORING_OFF_SQ_RING:
5997         case IORING_OFF_CQ_RING:
5998                 ptr = ctx->rings;
5999                 break;
6000         case IORING_OFF_SQES:
6001                 ptr = ctx->sq_sqes;
6002                 break;
6003         default:
6004                 return ERR_PTR(-EINVAL);
6005         }
6006
6007         page = virt_to_head_page(ptr);
6008         if (sz > page_size(page))
6009                 return ERR_PTR(-EINVAL);
6010
6011         return ptr;
6012 }
6013
6014 #ifdef CONFIG_MMU
6015
6016 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
6017 {
6018         size_t sz = vma->vm_end - vma->vm_start;
6019         unsigned long pfn;
6020         void *ptr;
6021
6022         ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
6023         if (IS_ERR(ptr))
6024                 return PTR_ERR(ptr);
6025
6026         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
6027         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
6028 }
6029
6030 #else /* !CONFIG_MMU */
6031
6032 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
6033 {
6034         return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
6035 }
6036
6037 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
6038 {
6039         return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
6040 }
6041
6042 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
6043         unsigned long addr, unsigned long len,
6044         unsigned long pgoff, unsigned long flags)
6045 {
6046         void *ptr;
6047
6048         ptr = io_uring_validate_mmap_request(file, pgoff, len);
6049         if (IS_ERR(ptr))
6050                 return PTR_ERR(ptr);
6051
6052         return (unsigned long) ptr;
6053 }
6054
6055 #endif /* !CONFIG_MMU */
6056
6057 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
6058                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
6059                 size_t, sigsz)
6060 {
6061         struct io_ring_ctx *ctx;
6062         long ret = -EBADF;
6063         int submitted = 0;
6064         struct fd f;
6065
6066         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
6067                 return -EINVAL;
6068
6069         f = fdget(fd);
6070         if (!f.file)
6071                 return -EBADF;
6072
6073         ret = -EOPNOTSUPP;
6074         if (f.file->f_op != &io_uring_fops)
6075                 goto out_fput;
6076
6077         ret = -ENXIO;
6078         ctx = f.file->private_data;
6079         if (!percpu_ref_tryget(&ctx->refs))
6080                 goto out_fput;
6081
6082         /*
6083          * For SQ polling, the thread will do all submissions and completions.
6084          * Just return the requested submit count, and wake the thread if
6085          * we were asked to.
6086          */
6087         ret = 0;
6088         if (ctx->flags & IORING_SETUP_SQPOLL) {
6089                 if (!list_empty_careful(&ctx->cq_overflow_list))
6090                         io_cqring_overflow_flush(ctx, false);
6091                 if (flags & IORING_ENTER_SQ_WAKEUP)
6092                         wake_up(&ctx->sqo_wait);
6093                 submitted = to_submit;
6094         } else if (to_submit) {
6095                 struct mm_struct *cur_mm;
6096
6097                 if (current->mm != ctx->sqo_mm ||
6098                     current_cred() != ctx->creds) {
6099                         ret = -EPERM;
6100                         goto out;
6101                 }
6102
6103                 to_submit = min(to_submit, ctx->sq_entries);
6104                 mutex_lock(&ctx->uring_lock);
6105                 /* already have mm, so io_submit_sqes() won't try to grab it */
6106                 cur_mm = ctx->sqo_mm;
6107                 submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
6108                                            &cur_mm, false);
6109                 mutex_unlock(&ctx->uring_lock);
6110
6111                 if (submitted != to_submit)
6112                         goto out;
6113         }
6114         if (flags & IORING_ENTER_GETEVENTS) {
6115                 unsigned nr_events = 0;
6116
6117                 min_complete = min(min_complete, ctx->cq_entries);
6118
6119                 if (ctx->flags & IORING_SETUP_IOPOLL) {
6120                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
6121                 } else {
6122                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
6123                 }
6124         }
6125
6126 out:
6127         percpu_ref_put(&ctx->refs);
6128 out_fput:
6129         fdput(f);
6130         return submitted ? submitted : ret;
6131 }
6132
6133 static const struct file_operations io_uring_fops = {
6134         .release        = io_uring_release,
6135         .flush          = io_uring_flush,
6136         .mmap           = io_uring_mmap,
6137 #ifndef CONFIG_MMU
6138         .get_unmapped_area = io_uring_nommu_get_unmapped_area,
6139         .mmap_capabilities = io_uring_nommu_mmap_capabilities,
6140 #endif
6141         .poll           = io_uring_poll,
6142         .fasync         = io_uring_fasync,
6143 };
6144
6145 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
6146                                   struct io_uring_params *p)
6147 {
6148         struct io_rings *rings;
6149         size_t size, sq_array_offset;
6150
6151         size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
6152         if (size == SIZE_MAX)
6153                 return -EOVERFLOW;
6154
6155         rings = io_mem_alloc(size);
6156         if (!rings)
6157                 return -ENOMEM;
6158
6159         ctx->rings = rings;
6160         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
6161         rings->sq_ring_mask = p->sq_entries - 1;
6162         rings->cq_ring_mask = p->cq_entries - 1;
6163         rings->sq_ring_entries = p->sq_entries;
6164         rings->cq_ring_entries = p->cq_entries;
6165         ctx->sq_mask = rings->sq_ring_mask;
6166         ctx->cq_mask = rings->cq_ring_mask;
6167         ctx->sq_entries = rings->sq_ring_entries;
6168         ctx->cq_entries = rings->cq_ring_entries;
6169
6170         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
6171         if (size == SIZE_MAX) {
6172                 io_mem_free(ctx->rings);
6173                 ctx->rings = NULL;
6174                 return -EOVERFLOW;
6175         }
6176
6177         ctx->sq_sqes = io_mem_alloc(size);
6178         if (!ctx->sq_sqes) {
6179                 io_mem_free(ctx->rings);
6180                 ctx->rings = NULL;
6181                 return -ENOMEM;
6182         }
6183
6184         return 0;
6185 }
6186
6187 /*
6188  * Allocate an anonymous fd, this is what constitutes the application
6189  * visible backing of an io_uring instance. The application mmaps this
6190  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
6191  * we have to tie this fd to a socket for file garbage collection purposes.
6192  */
6193 static int io_uring_get_fd(struct io_ring_ctx *ctx)
6194 {
6195         struct file *file;
6196         int ret;
6197
6198 #if defined(CONFIG_UNIX)
6199         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
6200                                 &ctx->ring_sock);
6201         if (ret)
6202                 return ret;
6203 #endif
6204
6205         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
6206         if (ret < 0)
6207                 goto err;
6208
6209         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
6210                                         O_RDWR | O_CLOEXEC);
6211         if (IS_ERR(file)) {
6212                 put_unused_fd(ret);
6213                 ret = PTR_ERR(file);
6214                 goto err;
6215         }
6216
6217 #if defined(CONFIG_UNIX)
6218         ctx->ring_sock->file = file;
6219 #endif
6220         fd_install(ret, file);
6221         return ret;
6222 err:
6223 #if defined(CONFIG_UNIX)
6224         sock_release(ctx->ring_sock);
6225         ctx->ring_sock = NULL;
6226 #endif
6227         return ret;
6228 }
6229
6230 static int io_uring_create(unsigned entries, struct io_uring_params *p)
6231 {
6232         struct user_struct *user = NULL;
6233         struct io_ring_ctx *ctx;
6234         bool account_mem;
6235         int ret;
6236
6237         if (!entries)
6238                 return -EINVAL;
6239         if (entries > IORING_MAX_ENTRIES) {
6240                 if (!(p->flags & IORING_SETUP_CLAMP))
6241                         return -EINVAL;
6242                 entries = IORING_MAX_ENTRIES;
6243         }
6244
6245         /*
6246          * Use twice as many entries for the CQ ring. It's possible for the
6247          * application to drive a higher depth than the size of the SQ ring,
6248          * since the sqes are only used at submission time. This allows for
6249          * some flexibility in overcommitting a bit. If the application has
6250          * set IORING_SETUP_CQSIZE, it will have passed in the desired number
6251          * of CQ ring entries manually.
6252          */
6253         p->sq_entries = roundup_pow_of_two(entries);
6254         if (p->flags & IORING_SETUP_CQSIZE) {
6255                 /*
6256                  * If IORING_SETUP_CQSIZE is set, we do the same roundup
6257                  * to a power-of-two, if it isn't already. We do NOT impose
6258                  * any cq vs sq ring sizing.
6259                  */
6260                 if (p->cq_entries < p->sq_entries)
6261                         return -EINVAL;
6262                 if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
6263                         if (!(p->flags & IORING_SETUP_CLAMP))
6264                                 return -EINVAL;
6265                         p->cq_entries = IORING_MAX_CQ_ENTRIES;
6266                 }
6267                 p->cq_entries = roundup_pow_of_two(p->cq_entries);
6268         } else {
6269                 p->cq_entries = 2 * p->sq_entries;
6270         }
6271
6272         user = get_uid(current_user());
6273         account_mem = !capable(CAP_IPC_LOCK);
6274
6275         if (account_mem) {
6276                 ret = io_account_mem(user,
6277                                 ring_pages(p->sq_entries, p->cq_entries));
6278                 if (ret) {
6279                         free_uid(user);
6280                         return ret;
6281                 }
6282         }
6283
6284         ctx = io_ring_ctx_alloc(p);
6285         if (!ctx) {
6286                 if (account_mem)
6287                         io_unaccount_mem(user, ring_pages(p->sq_entries,
6288                                                                 p->cq_entries));
6289                 free_uid(user);
6290                 return -ENOMEM;
6291         }
6292         ctx->compat = in_compat_syscall();
6293         ctx->account_mem = account_mem;
6294         ctx->user = user;
6295         ctx->creds = get_current_cred();
6296
6297         ret = io_allocate_scq_urings(ctx, p);
6298         if (ret)
6299                 goto err;
6300
6301         ret = io_sq_offload_start(ctx, p);
6302         if (ret)
6303                 goto err;
6304
6305         memset(&p->sq_off, 0, sizeof(p->sq_off));
6306         p->sq_off.head = offsetof(struct io_rings, sq.head);
6307         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
6308         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
6309         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
6310         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
6311         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
6312         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
6313
6314         memset(&p->cq_off, 0, sizeof(p->cq_off));
6315         p->cq_off.head = offsetof(struct io_rings, cq.head);
6316         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
6317         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
6318         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
6319         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
6320         p->cq_off.cqes = offsetof(struct io_rings, cqes);
6321
6322         /*
6323          * Install ring fd as the very last thing, so we don't risk someone
6324          * having closed it before we finish setup
6325          */
6326         ret = io_uring_get_fd(ctx);
6327         if (ret < 0)
6328                 goto err;
6329
6330         p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
6331                         IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS;
6332         trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
6333         return ret;
6334 err:
6335         io_ring_ctx_wait_and_kill(ctx);
6336         return ret;
6337 }
6338
6339 /*
6340  * Sets up an aio uring context, and returns the fd. Applications asks for a
6341  * ring size, we return the actual sq/cq ring sizes (among other things) in the
6342  * params structure passed in.
6343  */
6344 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
6345 {
6346         struct io_uring_params p;
6347         long ret;
6348         int i;
6349
6350         if (copy_from_user(&p, params, sizeof(p)))
6351                 return -EFAULT;
6352         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
6353                 if (p.resv[i])
6354                         return -EINVAL;
6355         }
6356
6357         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
6358                         IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
6359                         IORING_SETUP_CLAMP))
6360                 return -EINVAL;
6361
6362         ret = io_uring_create(entries, &p);
6363         if (ret < 0)
6364                 return ret;
6365
6366         if (copy_to_user(params, &p, sizeof(p)))
6367                 return -EFAULT;
6368
6369         return ret;
6370 }
6371
6372 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
6373                 struct io_uring_params __user *, params)
6374 {
6375         return io_uring_setup(entries, params);
6376 }
6377
6378 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
6379                                void __user *arg, unsigned nr_args)
6380         __releases(ctx->uring_lock)
6381         __acquires(ctx->uring_lock)
6382 {
6383         int ret;
6384
6385         /*
6386          * We're inside the ring mutex, if the ref is already dying, then
6387          * someone else killed the ctx or is already going through
6388          * io_uring_register().
6389          */
6390         if (percpu_ref_is_dying(&ctx->refs))
6391                 return -ENXIO;
6392
6393         if (opcode != IORING_UNREGISTER_FILES &&
6394             opcode != IORING_REGISTER_FILES_UPDATE) {
6395                 percpu_ref_kill(&ctx->refs);
6396
6397                 /*
6398                  * Drop uring mutex before waiting for references to exit. If
6399                  * another thread is currently inside io_uring_enter() it might
6400                  * need to grab the uring_lock to make progress. If we hold it
6401                  * here across the drain wait, then we can deadlock. It's safe
6402                  * to drop the mutex here, since no new references will come in
6403                  * after we've killed the percpu ref.
6404                  */
6405                 mutex_unlock(&ctx->uring_lock);
6406                 wait_for_completion(&ctx->completions[0]);
6407                 mutex_lock(&ctx->uring_lock);
6408         }
6409
6410         switch (opcode) {
6411         case IORING_REGISTER_BUFFERS:
6412                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
6413                 break;
6414         case IORING_UNREGISTER_BUFFERS:
6415                 ret = -EINVAL;
6416                 if (arg || nr_args)
6417                         break;
6418                 ret = io_sqe_buffer_unregister(ctx);
6419                 break;
6420         case IORING_REGISTER_FILES:
6421                 ret = io_sqe_files_register(ctx, arg, nr_args);
6422                 break;
6423         case IORING_UNREGISTER_FILES:
6424                 ret = -EINVAL;
6425                 if (arg || nr_args)
6426                         break;
6427                 ret = io_sqe_files_unregister(ctx);
6428                 break;
6429         case IORING_REGISTER_FILES_UPDATE:
6430                 ret = io_sqe_files_update(ctx, arg, nr_args);
6431                 break;
6432         case IORING_REGISTER_EVENTFD:
6433                 ret = -EINVAL;
6434                 if (nr_args != 1)
6435                         break;
6436                 ret = io_eventfd_register(ctx, arg);
6437                 break;
6438         case IORING_UNREGISTER_EVENTFD:
6439                 ret = -EINVAL;
6440                 if (arg || nr_args)
6441                         break;
6442                 ret = io_eventfd_unregister(ctx);
6443                 break;
6444         default:
6445                 ret = -EINVAL;
6446                 break;
6447         }
6448
6449
6450         if (opcode != IORING_UNREGISTER_FILES &&
6451             opcode != IORING_REGISTER_FILES_UPDATE) {
6452                 /* bring the ctx back to life */
6453                 reinit_completion(&ctx->completions[0]);
6454                 percpu_ref_reinit(&ctx->refs);
6455         }
6456         return ret;
6457 }
6458
6459 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
6460                 void __user *, arg, unsigned int, nr_args)
6461 {
6462         struct io_ring_ctx *ctx;
6463         long ret = -EBADF;
6464         struct fd f;
6465
6466         f = fdget(fd);
6467         if (!f.file)
6468                 return -EBADF;
6469
6470         ret = -EOPNOTSUPP;
6471         if (f.file->f_op != &io_uring_fops)
6472                 goto out_fput;
6473
6474         ctx = f.file->private_data;
6475
6476         mutex_lock(&ctx->uring_lock);
6477         ret = __io_uring_register(ctx, opcode, arg, nr_args);
6478         mutex_unlock(&ctx->uring_lock);
6479         trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
6480                                                         ctx->cq_ev_fd != NULL, ret);
6481 out_fput:
6482         fdput(f);
6483         return ret;
6484 }
6485
6486 static int __init io_uring_init(void)
6487 {
6488         BUILD_BUG_ON(ARRAY_SIZE(io_op_defs) != IORING_OP_LAST);
6489         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
6490         return 0;
6491 };
6492 __initcall(io_uring_init);