OSDN Git Service

io_uring: remove two unnecessary function declarations
[tomoyo/tomoyo-test1.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/kthread.h>
60 #include <linux/blkdev.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
63 #include <net/sock.h>
64 #include <net/af_unix.h>
65 #include <net/scm.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72 #include <linux/highmem.h>
73 #include <linux/namei.h>
74 #include <linux/fsnotify.h>
75
76 #define CREATE_TRACE_POINTS
77 #include <trace/events/io_uring.h>
78
79 #include <uapi/linux/io_uring.h>
80
81 #include "internal.h"
82 #include "io-wq.h"
83
84 #define IORING_MAX_ENTRIES      32768
85 #define IORING_MAX_CQ_ENTRIES   (2 * IORING_MAX_ENTRIES)
86
87 /*
88  * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
89  */
90 #define IORING_FILE_TABLE_SHIFT 9
91 #define IORING_MAX_FILES_TABLE  (1U << IORING_FILE_TABLE_SHIFT)
92 #define IORING_FILE_TABLE_MASK  (IORING_MAX_FILES_TABLE - 1)
93 #define IORING_MAX_FIXED_FILES  (64 * IORING_MAX_FILES_TABLE)
94
95 struct io_uring {
96         u32 head ____cacheline_aligned_in_smp;
97         u32 tail ____cacheline_aligned_in_smp;
98 };
99
100 /*
101  * This data is shared with the application through the mmap at offsets
102  * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
103  *
104  * The offsets to the member fields are published through struct
105  * io_sqring_offsets when calling io_uring_setup.
106  */
107 struct io_rings {
108         /*
109          * Head and tail offsets into the ring; the offsets need to be
110          * masked to get valid indices.
111          *
112          * The kernel controls head of the sq ring and the tail of the cq ring,
113          * and the application controls tail of the sq ring and the head of the
114          * cq ring.
115          */
116         struct io_uring         sq, cq;
117         /*
118          * Bitmasks to apply to head and tail offsets (constant, equals
119          * ring_entries - 1)
120          */
121         u32                     sq_ring_mask, cq_ring_mask;
122         /* Ring sizes (constant, power of 2) */
123         u32                     sq_ring_entries, cq_ring_entries;
124         /*
125          * Number of invalid entries dropped by the kernel due to
126          * invalid index stored in array
127          *
128          * Written by the kernel, shouldn't be modified by the
129          * application (i.e. get number of "new events" by comparing to
130          * cached value).
131          *
132          * After a new SQ head value was read by the application this
133          * counter includes all submissions that were dropped reaching
134          * the new SQ head (and possibly more).
135          */
136         u32                     sq_dropped;
137         /*
138          * Runtime flags
139          *
140          * Written by the kernel, shouldn't be modified by the
141          * application.
142          *
143          * The application needs a full memory barrier before checking
144          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
145          */
146         u32                     sq_flags;
147         /*
148          * Number of completion events lost because the queue was full;
149          * this should be avoided by the application by making sure
150          * there are not more requests pending than there is space in
151          * the completion queue.
152          *
153          * Written by the kernel, shouldn't be modified by the
154          * application (i.e. get number of "new events" by comparing to
155          * cached value).
156          *
157          * As completion events come in out of order this counter is not
158          * ordered with any other data.
159          */
160         u32                     cq_overflow;
161         /*
162          * Ring buffer of completion events.
163          *
164          * The kernel writes completion events fresh every time they are
165          * produced, so the application is allowed to modify pending
166          * entries.
167          */
168         struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
169 };
170
171 struct io_mapped_ubuf {
172         u64             ubuf;
173         size_t          len;
174         struct          bio_vec *bvec;
175         unsigned int    nr_bvecs;
176 };
177
178 struct fixed_file_table {
179         struct file             **files;
180 };
181
182 enum {
183         FFD_F_ATOMIC,
184 };
185
186 struct fixed_file_data {
187         struct fixed_file_table         *table;
188         struct io_ring_ctx              *ctx;
189
190         struct percpu_ref               refs;
191         struct llist_head               put_llist;
192         unsigned long                   state;
193         struct work_struct              ref_work;
194         struct completion               done;
195 };
196
197 struct io_ring_ctx {
198         struct {
199                 struct percpu_ref       refs;
200         } ____cacheline_aligned_in_smp;
201
202         struct {
203                 unsigned int            flags;
204                 bool                    compat;
205                 bool                    account_mem;
206                 bool                    cq_overflow_flushed;
207                 bool                    drain_next;
208
209                 /*
210                  * Ring buffer of indices into array of io_uring_sqe, which is
211                  * mmapped by the application using the IORING_OFF_SQES offset.
212                  *
213                  * This indirection could e.g. be used to assign fixed
214                  * io_uring_sqe entries to operations and only submit them to
215                  * the queue when needed.
216                  *
217                  * The kernel modifies neither the indices array nor the entries
218                  * array.
219                  */
220                 u32                     *sq_array;
221                 unsigned                cached_sq_head;
222                 unsigned                sq_entries;
223                 unsigned                sq_mask;
224                 unsigned                sq_thread_idle;
225                 unsigned                cached_sq_dropped;
226                 atomic_t                cached_cq_overflow;
227                 struct io_uring_sqe     *sq_sqes;
228
229                 struct list_head        defer_list;
230                 struct list_head        timeout_list;
231                 struct list_head        cq_overflow_list;
232
233                 wait_queue_head_t       inflight_wait;
234         } ____cacheline_aligned_in_smp;
235
236         struct io_rings *rings;
237
238         /* IO offload */
239         struct io_wq            *io_wq;
240         struct task_struct      *sqo_thread;    /* if using sq thread polling */
241         struct mm_struct        *sqo_mm;
242         wait_queue_head_t       sqo_wait;
243
244         /*
245          * If used, fixed file set. Writers must ensure that ->refs is dead,
246          * readers must ensure that ->refs is alive as long as the file* is
247          * used. Only updated through io_uring_register(2).
248          */
249         struct fixed_file_data  *file_data;
250         unsigned                nr_user_files;
251
252         /* if used, fixed mapped user buffers */
253         unsigned                nr_user_bufs;
254         struct io_mapped_ubuf   *user_bufs;
255
256         struct user_struct      *user;
257
258         const struct cred       *creds;
259
260         /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
261         struct completion       *completions;
262
263         /* if all else fails... */
264         struct io_kiocb         *fallback_req;
265
266 #if defined(CONFIG_UNIX)
267         struct socket           *ring_sock;
268 #endif
269
270         struct {
271                 unsigned                cached_cq_tail;
272                 unsigned                cq_entries;
273                 unsigned                cq_mask;
274                 atomic_t                cq_timeouts;
275                 struct wait_queue_head  cq_wait;
276                 struct fasync_struct    *cq_fasync;
277                 struct eventfd_ctx      *cq_ev_fd;
278         } ____cacheline_aligned_in_smp;
279
280         struct {
281                 struct mutex            uring_lock;
282                 wait_queue_head_t       wait;
283         } ____cacheline_aligned_in_smp;
284
285         struct {
286                 spinlock_t              completion_lock;
287                 bool                    poll_multi_file;
288                 /*
289                  * ->poll_list is protected by the ctx->uring_lock for
290                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
291                  * For SQPOLL, only the single threaded io_sq_thread() will
292                  * manipulate the list, hence no extra locking is needed there.
293                  */
294                 struct list_head        poll_list;
295                 struct hlist_head       *cancel_hash;
296                 unsigned                cancel_hash_bits;
297
298                 spinlock_t              inflight_lock;
299                 struct list_head        inflight_list;
300         } ____cacheline_aligned_in_smp;
301 };
302
303 /*
304  * First field must be the file pointer in all the
305  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
306  */
307 struct io_poll_iocb {
308         struct file                     *file;
309         union {
310                 struct wait_queue_head  *head;
311                 u64                     addr;
312         };
313         __poll_t                        events;
314         bool                            done;
315         bool                            canceled;
316         struct wait_queue_entry         wait;
317 };
318
319 struct io_close {
320         struct file                     *file;
321         struct file                     *put_file;
322         int                             fd;
323 };
324
325 struct io_timeout_data {
326         struct io_kiocb                 *req;
327         struct hrtimer                  timer;
328         struct timespec64               ts;
329         enum hrtimer_mode               mode;
330         u32                             seq_offset;
331 };
332
333 struct io_accept {
334         struct file                     *file;
335         struct sockaddr __user          *addr;
336         int __user                      *addr_len;
337         int                             flags;
338 };
339
340 struct io_sync {
341         struct file                     *file;
342         loff_t                          len;
343         loff_t                          off;
344         int                             flags;
345         int                             mode;
346 };
347
348 struct io_cancel {
349         struct file                     *file;
350         u64                             addr;
351 };
352
353 struct io_timeout {
354         struct file                     *file;
355         u64                             addr;
356         int                             flags;
357         unsigned                        count;
358 };
359
360 struct io_rw {
361         /* NOTE: kiocb has the file as the first member, so don't do it here */
362         struct kiocb                    kiocb;
363         u64                             addr;
364         u64                             len;
365 };
366
367 struct io_connect {
368         struct file                     *file;
369         struct sockaddr __user          *addr;
370         int                             addr_len;
371 };
372
373 struct io_sr_msg {
374         struct file                     *file;
375         struct user_msghdr __user       *msg;
376         int                             msg_flags;
377 };
378
379 struct io_open {
380         struct file                     *file;
381         int                             dfd;
382         union {
383                 umode_t                 mode;
384                 unsigned                mask;
385         };
386         const char __user               *fname;
387         struct filename                 *filename;
388         struct statx __user             *buffer;
389         int                             flags;
390 };
391
392 struct io_files_update {
393         struct file                     *file;
394         u64                             arg;
395         u32                             nr_args;
396         u32                             offset;
397 };
398
399 struct io_async_connect {
400         struct sockaddr_storage         address;
401 };
402
403 struct io_async_msghdr {
404         struct iovec                    fast_iov[UIO_FASTIOV];
405         struct iovec                    *iov;
406         struct sockaddr __user          *uaddr;
407         struct msghdr                   msg;
408 };
409
410 struct io_async_rw {
411         struct iovec                    fast_iov[UIO_FASTIOV];
412         struct iovec                    *iov;
413         ssize_t                         nr_segs;
414         ssize_t                         size;
415 };
416
417 struct io_async_open {
418         struct filename                 *filename;
419 };
420
421 struct io_async_ctx {
422         union {
423                 struct io_async_rw      rw;
424                 struct io_async_msghdr  msg;
425                 struct io_async_connect connect;
426                 struct io_timeout_data  timeout;
427                 struct io_async_open    open;
428         };
429 };
430
431 /*
432  * NOTE! Each of the iocb union members has the file pointer
433  * as the first entry in their struct definition. So you can
434  * access the file pointer through any of the sub-structs,
435  * or directly as just 'ki_filp' in this struct.
436  */
437 struct io_kiocb {
438         union {
439                 struct file             *file;
440                 struct io_rw            rw;
441                 struct io_poll_iocb     poll;
442                 struct io_accept        accept;
443                 struct io_sync          sync;
444                 struct io_cancel        cancel;
445                 struct io_timeout       timeout;
446                 struct io_connect       connect;
447                 struct io_sr_msg        sr_msg;
448                 struct io_open          open;
449                 struct io_close         close;
450                 struct io_files_update  files_update;
451         };
452
453         struct io_async_ctx             *io;
454         struct file                     *ring_file;
455         int                             ring_fd;
456         bool                            has_user;
457         bool                            in_async;
458         bool                            needs_fixed_file;
459         u8                              opcode;
460
461         struct io_ring_ctx      *ctx;
462         union {
463                 struct list_head        list;
464                 struct hlist_node       hash_node;
465         };
466         struct list_head        link_list;
467         unsigned int            flags;
468         refcount_t              refs;
469 #define REQ_F_NOWAIT            1       /* must not punt to workers */
470 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
471 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
472 #define REQ_F_LINK_NEXT         8       /* already grabbed next link */
473 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
474 #define REQ_F_IO_DRAINED        32      /* drain done */
475 #define REQ_F_LINK              64      /* linked sqes */
476 #define REQ_F_LINK_TIMEOUT      128     /* has linked timeout */
477 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
478 #define REQ_F_DRAIN_LINK        512     /* link should be fully drained */
479 #define REQ_F_TIMEOUT           1024    /* timeout request */
480 #define REQ_F_ISREG             2048    /* regular file */
481 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
482 #define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
483 #define REQ_F_INFLIGHT          16384   /* on inflight list */
484 #define REQ_F_COMP_LOCKED       32768   /* completion under lock */
485 #define REQ_F_HARDLINK          65536   /* doesn't sever on completion < 0 */
486 #define REQ_F_FORCE_ASYNC       131072  /* IOSQE_ASYNC */
487         u64                     user_data;
488         u32                     result;
489         u32                     sequence;
490
491         struct list_head        inflight_entry;
492
493         struct io_wq_work       work;
494 };
495
496 #define IO_PLUG_THRESHOLD               2
497 #define IO_IOPOLL_BATCH                 8
498
499 struct io_submit_state {
500         struct blk_plug         plug;
501
502         /*
503          * io_kiocb alloc cache
504          */
505         void                    *reqs[IO_IOPOLL_BATCH];
506         unsigned                int free_reqs;
507         unsigned                int cur_req;
508
509         /*
510          * File reference cache
511          */
512         struct file             *file;
513         unsigned int            fd;
514         unsigned int            has_refs;
515         unsigned int            used_refs;
516         unsigned int            ios_left;
517 };
518
519 static void io_wq_submit_work(struct io_wq_work **workptr);
520 static void io_cqring_fill_event(struct io_kiocb *req, long res);
521 static void io_put_req(struct io_kiocb *req);
522 static void __io_double_put_req(struct io_kiocb *req);
523 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
524 static void io_queue_linked_timeout(struct io_kiocb *req);
525 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
526                                  struct io_uring_files_update *ip,
527                                  unsigned nr_args);
528
529 static struct kmem_cache *req_cachep;
530
531 static const struct file_operations io_uring_fops;
532
533 struct sock *io_uring_get_socket(struct file *file)
534 {
535 #if defined(CONFIG_UNIX)
536         if (file->f_op == &io_uring_fops) {
537                 struct io_ring_ctx *ctx = file->private_data;
538
539                 return ctx->ring_sock->sk;
540         }
541 #endif
542         return NULL;
543 }
544 EXPORT_SYMBOL(io_uring_get_socket);
545
546 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
547 {
548         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
549
550         complete(&ctx->completions[0]);
551 }
552
553 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
554 {
555         struct io_ring_ctx *ctx;
556         int hash_bits;
557
558         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
559         if (!ctx)
560                 return NULL;
561
562         ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL);
563         if (!ctx->fallback_req)
564                 goto err;
565
566         ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL);
567         if (!ctx->completions)
568                 goto err;
569
570         /*
571          * Use 5 bits less than the max cq entries, that should give us around
572          * 32 entries per hash list if totally full and uniformly spread.
573          */
574         hash_bits = ilog2(p->cq_entries);
575         hash_bits -= 5;
576         if (hash_bits <= 0)
577                 hash_bits = 1;
578         ctx->cancel_hash_bits = hash_bits;
579         ctx->cancel_hash = kmalloc((1U << hash_bits) * sizeof(struct hlist_head),
580                                         GFP_KERNEL);
581         if (!ctx->cancel_hash)
582                 goto err;
583         __hash_init(ctx->cancel_hash, 1U << hash_bits);
584
585         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
586                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
587                 goto err;
588
589         ctx->flags = p->flags;
590         init_waitqueue_head(&ctx->cq_wait);
591         INIT_LIST_HEAD(&ctx->cq_overflow_list);
592         init_completion(&ctx->completions[0]);
593         init_completion(&ctx->completions[1]);
594         mutex_init(&ctx->uring_lock);
595         init_waitqueue_head(&ctx->wait);
596         spin_lock_init(&ctx->completion_lock);
597         INIT_LIST_HEAD(&ctx->poll_list);
598         INIT_LIST_HEAD(&ctx->defer_list);
599         INIT_LIST_HEAD(&ctx->timeout_list);
600         init_waitqueue_head(&ctx->inflight_wait);
601         spin_lock_init(&ctx->inflight_lock);
602         INIT_LIST_HEAD(&ctx->inflight_list);
603         return ctx;
604 err:
605         if (ctx->fallback_req)
606                 kmem_cache_free(req_cachep, ctx->fallback_req);
607         kfree(ctx->completions);
608         kfree(ctx->cancel_hash);
609         kfree(ctx);
610         return NULL;
611 }
612
613 static inline bool __req_need_defer(struct io_kiocb *req)
614 {
615         struct io_ring_ctx *ctx = req->ctx;
616
617         return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
618                                         + atomic_read(&ctx->cached_cq_overflow);
619 }
620
621 static inline bool req_need_defer(struct io_kiocb *req)
622 {
623         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) == REQ_F_IO_DRAIN)
624                 return __req_need_defer(req);
625
626         return false;
627 }
628
629 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
630 {
631         struct io_kiocb *req;
632
633         req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
634         if (req && !req_need_defer(req)) {
635                 list_del_init(&req->list);
636                 return req;
637         }
638
639         return NULL;
640 }
641
642 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
643 {
644         struct io_kiocb *req;
645
646         req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
647         if (req) {
648                 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
649                         return NULL;
650                 if (!__req_need_defer(req)) {
651                         list_del_init(&req->list);
652                         return req;
653                 }
654         }
655
656         return NULL;
657 }
658
659 static void __io_commit_cqring(struct io_ring_ctx *ctx)
660 {
661         struct io_rings *rings = ctx->rings;
662
663         if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
664                 /* order cqe stores with ring update */
665                 smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
666
667                 if (wq_has_sleeper(&ctx->cq_wait)) {
668                         wake_up_interruptible(&ctx->cq_wait);
669                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
670                 }
671         }
672 }
673
674 static inline bool io_req_needs_user(struct io_kiocb *req)
675 {
676         return !(req->opcode == IORING_OP_READ_FIXED ||
677                  req->opcode == IORING_OP_WRITE_FIXED);
678 }
679
680 static inline bool io_prep_async_work(struct io_kiocb *req,
681                                       struct io_kiocb **link)
682 {
683         bool do_hashed = false;
684
685         switch (req->opcode) {
686         case IORING_OP_WRITEV:
687         case IORING_OP_WRITE_FIXED:
688                 /* only regular files should be hashed for writes */
689                 if (req->flags & REQ_F_ISREG)
690                         do_hashed = true;
691                 /* fall-through */
692         case IORING_OP_READV:
693         case IORING_OP_READ_FIXED:
694         case IORING_OP_SENDMSG:
695         case IORING_OP_RECVMSG:
696         case IORING_OP_ACCEPT:
697         case IORING_OP_POLL_ADD:
698         case IORING_OP_CONNECT:
699                 /*
700                  * We know REQ_F_ISREG is not set on some of these
701                  * opcodes, but this enables us to keep the check in
702                  * just one place.
703                  */
704                 if (!(req->flags & REQ_F_ISREG))
705                         req->work.flags |= IO_WQ_WORK_UNBOUND;
706                 break;
707         }
708         if (io_req_needs_user(req))
709                 req->work.flags |= IO_WQ_WORK_NEEDS_USER;
710
711         *link = io_prep_linked_timeout(req);
712         return do_hashed;
713 }
714
715 static inline void io_queue_async_work(struct io_kiocb *req)
716 {
717         struct io_ring_ctx *ctx = req->ctx;
718         struct io_kiocb *link;
719         bool do_hashed;
720
721         do_hashed = io_prep_async_work(req, &link);
722
723         trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
724                                         req->flags);
725         if (!do_hashed) {
726                 io_wq_enqueue(ctx->io_wq, &req->work);
727         } else {
728                 io_wq_enqueue_hashed(ctx->io_wq, &req->work,
729                                         file_inode(req->file));
730         }
731
732         if (link)
733                 io_queue_linked_timeout(link);
734 }
735
736 static void io_kill_timeout(struct io_kiocb *req)
737 {
738         int ret;
739
740         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
741         if (ret != -1) {
742                 atomic_inc(&req->ctx->cq_timeouts);
743                 list_del_init(&req->list);
744                 io_cqring_fill_event(req, 0);
745                 io_put_req(req);
746         }
747 }
748
749 static void io_kill_timeouts(struct io_ring_ctx *ctx)
750 {
751         struct io_kiocb *req, *tmp;
752
753         spin_lock_irq(&ctx->completion_lock);
754         list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
755                 io_kill_timeout(req);
756         spin_unlock_irq(&ctx->completion_lock);
757 }
758
759 static void io_commit_cqring(struct io_ring_ctx *ctx)
760 {
761         struct io_kiocb *req;
762
763         while ((req = io_get_timeout_req(ctx)) != NULL)
764                 io_kill_timeout(req);
765
766         __io_commit_cqring(ctx);
767
768         while ((req = io_get_deferred_req(ctx)) != NULL) {
769                 req->flags |= REQ_F_IO_DRAINED;
770                 io_queue_async_work(req);
771         }
772 }
773
774 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
775 {
776         struct io_rings *rings = ctx->rings;
777         unsigned tail;
778
779         tail = ctx->cached_cq_tail;
780         /*
781          * writes to the cq entry need to come after reading head; the
782          * control dependency is enough as we're using WRITE_ONCE to
783          * fill the cq entry
784          */
785         if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
786                 return NULL;
787
788         ctx->cached_cq_tail++;
789         return &rings->cqes[tail & ctx->cq_mask];
790 }
791
792 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
793 {
794         if (waitqueue_active(&ctx->wait))
795                 wake_up(&ctx->wait);
796         if (waitqueue_active(&ctx->sqo_wait))
797                 wake_up(&ctx->sqo_wait);
798         if (ctx->cq_ev_fd)
799                 eventfd_signal(ctx->cq_ev_fd, 1);
800 }
801
802 /* Returns true if there are no backlogged entries after the flush */
803 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
804 {
805         struct io_rings *rings = ctx->rings;
806         struct io_uring_cqe *cqe;
807         struct io_kiocb *req;
808         unsigned long flags;
809         LIST_HEAD(list);
810
811         if (!force) {
812                 if (list_empty_careful(&ctx->cq_overflow_list))
813                         return true;
814                 if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
815                     rings->cq_ring_entries))
816                         return false;
817         }
818
819         spin_lock_irqsave(&ctx->completion_lock, flags);
820
821         /* if force is set, the ring is going away. always drop after that */
822         if (force)
823                 ctx->cq_overflow_flushed = true;
824
825         cqe = NULL;
826         while (!list_empty(&ctx->cq_overflow_list)) {
827                 cqe = io_get_cqring(ctx);
828                 if (!cqe && !force)
829                         break;
830
831                 req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
832                                                 list);
833                 list_move(&req->list, &list);
834                 if (cqe) {
835                         WRITE_ONCE(cqe->user_data, req->user_data);
836                         WRITE_ONCE(cqe->res, req->result);
837                         WRITE_ONCE(cqe->flags, 0);
838                 } else {
839                         WRITE_ONCE(ctx->rings->cq_overflow,
840                                 atomic_inc_return(&ctx->cached_cq_overflow));
841                 }
842         }
843
844         io_commit_cqring(ctx);
845         spin_unlock_irqrestore(&ctx->completion_lock, flags);
846         io_cqring_ev_posted(ctx);
847
848         while (!list_empty(&list)) {
849                 req = list_first_entry(&list, struct io_kiocb, list);
850                 list_del(&req->list);
851                 io_put_req(req);
852         }
853
854         return cqe != NULL;
855 }
856
857 static void io_cqring_fill_event(struct io_kiocb *req, long res)
858 {
859         struct io_ring_ctx *ctx = req->ctx;
860         struct io_uring_cqe *cqe;
861
862         trace_io_uring_complete(ctx, req->user_data, res);
863
864         /*
865          * If we can't get a cq entry, userspace overflowed the
866          * submission (by quite a lot). Increment the overflow count in
867          * the ring.
868          */
869         cqe = io_get_cqring(ctx);
870         if (likely(cqe)) {
871                 WRITE_ONCE(cqe->user_data, req->user_data);
872                 WRITE_ONCE(cqe->res, res);
873                 WRITE_ONCE(cqe->flags, 0);
874         } else if (ctx->cq_overflow_flushed) {
875                 WRITE_ONCE(ctx->rings->cq_overflow,
876                                 atomic_inc_return(&ctx->cached_cq_overflow));
877         } else {
878                 refcount_inc(&req->refs);
879                 req->result = res;
880                 list_add_tail(&req->list, &ctx->cq_overflow_list);
881         }
882 }
883
884 static void io_cqring_add_event(struct io_kiocb *req, long res)
885 {
886         struct io_ring_ctx *ctx = req->ctx;
887         unsigned long flags;
888
889         spin_lock_irqsave(&ctx->completion_lock, flags);
890         io_cqring_fill_event(req, res);
891         io_commit_cqring(ctx);
892         spin_unlock_irqrestore(&ctx->completion_lock, flags);
893
894         io_cqring_ev_posted(ctx);
895 }
896
897 static inline bool io_is_fallback_req(struct io_kiocb *req)
898 {
899         return req == (struct io_kiocb *)
900                         ((unsigned long) req->ctx->fallback_req & ~1UL);
901 }
902
903 static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx)
904 {
905         struct io_kiocb *req;
906
907         req = ctx->fallback_req;
908         if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req))
909                 return req;
910
911         return NULL;
912 }
913
914 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
915                                    struct io_submit_state *state)
916 {
917         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
918         struct io_kiocb *req;
919
920         if (!percpu_ref_tryget(&ctx->refs))
921                 return NULL;
922
923         if (!state) {
924                 req = kmem_cache_alloc(req_cachep, gfp);
925                 if (unlikely(!req))
926                         goto fallback;
927         } else if (!state->free_reqs) {
928                 size_t sz;
929                 int ret;
930
931                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
932                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
933
934                 /*
935                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
936                  * retry single alloc to be on the safe side.
937                  */
938                 if (unlikely(ret <= 0)) {
939                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
940                         if (!state->reqs[0])
941                                 goto fallback;
942                         ret = 1;
943                 }
944                 state->free_reqs = ret - 1;
945                 state->cur_req = 1;
946                 req = state->reqs[0];
947         } else {
948                 req = state->reqs[state->cur_req];
949                 state->free_reqs--;
950                 state->cur_req++;
951         }
952
953 got_it:
954         req->io = NULL;
955         req->ring_file = NULL;
956         req->file = NULL;
957         req->ctx = ctx;
958         req->flags = 0;
959         /* one is dropped after submission, the other at completion */
960         refcount_set(&req->refs, 2);
961         req->result = 0;
962         INIT_IO_WORK(&req->work, io_wq_submit_work);
963         return req;
964 fallback:
965         req = io_get_fallback_req(ctx);
966         if (req)
967                 goto got_it;
968         percpu_ref_put(&ctx->refs);
969         return NULL;
970 }
971
972 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
973 {
974         if (*nr) {
975                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
976                 percpu_ref_put_many(&ctx->refs, *nr);
977                 percpu_ref_put_many(&ctx->file_data->refs, *nr);
978                 *nr = 0;
979         }
980 }
981
982 static void __io_free_req(struct io_kiocb *req)
983 {
984         struct io_ring_ctx *ctx = req->ctx;
985
986         if (req->io)
987                 kfree(req->io);
988         if (req->file) {
989                 if (req->flags & REQ_F_FIXED_FILE)
990                         percpu_ref_put(&ctx->file_data->refs);
991                 else
992                         fput(req->file);
993         }
994         if (req->flags & REQ_F_INFLIGHT) {
995                 unsigned long flags;
996
997                 spin_lock_irqsave(&ctx->inflight_lock, flags);
998                 list_del(&req->inflight_entry);
999                 if (waitqueue_active(&ctx->inflight_wait))
1000                         wake_up(&ctx->inflight_wait);
1001                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
1002         }
1003         percpu_ref_put(&ctx->refs);
1004         if (likely(!io_is_fallback_req(req)))
1005                 kmem_cache_free(req_cachep, req);
1006         else
1007                 clear_bit_unlock(0, (unsigned long *) ctx->fallback_req);
1008 }
1009
1010 static bool io_link_cancel_timeout(struct io_kiocb *req)
1011 {
1012         struct io_ring_ctx *ctx = req->ctx;
1013         int ret;
1014
1015         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
1016         if (ret != -1) {
1017                 io_cqring_fill_event(req, -ECANCELED);
1018                 io_commit_cqring(ctx);
1019                 req->flags &= ~REQ_F_LINK;
1020                 io_put_req(req);
1021                 return true;
1022         }
1023
1024         return false;
1025 }
1026
1027 static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1028 {
1029         struct io_ring_ctx *ctx = req->ctx;
1030         bool wake_ev = false;
1031
1032         /* Already got next link */
1033         if (req->flags & REQ_F_LINK_NEXT)
1034                 return;
1035
1036         /*
1037          * The list should never be empty when we are called here. But could
1038          * potentially happen if the chain is messed up, check to be on the
1039          * safe side.
1040          */
1041         while (!list_empty(&req->link_list)) {
1042                 struct io_kiocb *nxt = list_first_entry(&req->link_list,
1043                                                 struct io_kiocb, link_list);
1044
1045                 if (unlikely((req->flags & REQ_F_LINK_TIMEOUT) &&
1046                              (nxt->flags & REQ_F_TIMEOUT))) {
1047                         list_del_init(&nxt->link_list);
1048                         wake_ev |= io_link_cancel_timeout(nxt);
1049                         req->flags &= ~REQ_F_LINK_TIMEOUT;
1050                         continue;
1051                 }
1052
1053                 list_del_init(&req->link_list);
1054                 if (!list_empty(&nxt->link_list))
1055                         nxt->flags |= REQ_F_LINK;
1056                 *nxtptr = nxt;
1057                 break;
1058         }
1059
1060         req->flags |= REQ_F_LINK_NEXT;
1061         if (wake_ev)
1062                 io_cqring_ev_posted(ctx);
1063 }
1064
1065 /*
1066  * Called if REQ_F_LINK is set, and we fail the head request
1067  */
1068 static void io_fail_links(struct io_kiocb *req)
1069 {
1070         struct io_ring_ctx *ctx = req->ctx;
1071         unsigned long flags;
1072
1073         spin_lock_irqsave(&ctx->completion_lock, flags);
1074
1075         while (!list_empty(&req->link_list)) {
1076                 struct io_kiocb *link = list_first_entry(&req->link_list,
1077                                                 struct io_kiocb, link_list);
1078
1079                 list_del_init(&link->link_list);
1080                 trace_io_uring_fail_link(req, link);
1081
1082                 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
1083                     link->opcode == IORING_OP_LINK_TIMEOUT) {
1084                         io_link_cancel_timeout(link);
1085                 } else {
1086                         io_cqring_fill_event(link, -ECANCELED);
1087                         __io_double_put_req(link);
1088                 }
1089                 req->flags &= ~REQ_F_LINK_TIMEOUT;
1090         }
1091
1092         io_commit_cqring(ctx);
1093         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1094         io_cqring_ev_posted(ctx);
1095 }
1096
1097 static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
1098 {
1099         if (likely(!(req->flags & REQ_F_LINK)))
1100                 return;
1101
1102         /*
1103          * If LINK is set, we have dependent requests in this chain. If we
1104          * didn't fail this request, queue the first one up, moving any other
1105          * dependencies to the next request. In case of failure, fail the rest
1106          * of the chain.
1107          */
1108         if (req->flags & REQ_F_FAIL_LINK) {
1109                 io_fail_links(req);
1110         } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
1111                         REQ_F_LINK_TIMEOUT) {
1112                 struct io_ring_ctx *ctx = req->ctx;
1113                 unsigned long flags;
1114
1115                 /*
1116                  * If this is a timeout link, we could be racing with the
1117                  * timeout timer. Grab the completion lock for this case to
1118                  * protect against that.
1119                  */
1120                 spin_lock_irqsave(&ctx->completion_lock, flags);
1121                 io_req_link_next(req, nxt);
1122                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1123         } else {
1124                 io_req_link_next(req, nxt);
1125         }
1126 }
1127
1128 static void io_free_req(struct io_kiocb *req)
1129 {
1130         struct io_kiocb *nxt = NULL;
1131
1132         io_req_find_next(req, &nxt);
1133         __io_free_req(req);
1134
1135         if (nxt)
1136                 io_queue_async_work(nxt);
1137 }
1138
1139 /*
1140  * Drop reference to request, return next in chain (if there is one) if this
1141  * was the last reference to this request.
1142  */
1143 __attribute__((nonnull))
1144 static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1145 {
1146         io_req_find_next(req, nxtptr);
1147
1148         if (refcount_dec_and_test(&req->refs))
1149                 __io_free_req(req);
1150 }
1151
1152 static void io_put_req(struct io_kiocb *req)
1153 {
1154         if (refcount_dec_and_test(&req->refs))
1155                 io_free_req(req);
1156 }
1157
1158 /*
1159  * Must only be used if we don't need to care about links, usually from
1160  * within the completion handling itself.
1161  */
1162 static void __io_double_put_req(struct io_kiocb *req)
1163 {
1164         /* drop both submit and complete references */
1165         if (refcount_sub_and_test(2, &req->refs))
1166                 __io_free_req(req);
1167 }
1168
1169 static void io_double_put_req(struct io_kiocb *req)
1170 {
1171         /* drop both submit and complete references */
1172         if (refcount_sub_and_test(2, &req->refs))
1173                 io_free_req(req);
1174 }
1175
1176 static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
1177 {
1178         struct io_rings *rings = ctx->rings;
1179
1180         /*
1181          * noflush == true is from the waitqueue handler, just ensure we wake
1182          * up the task, and the next invocation will flush the entries. We
1183          * cannot safely to it from here.
1184          */
1185         if (noflush && !list_empty(&ctx->cq_overflow_list))
1186                 return -1U;
1187
1188         io_cqring_overflow_flush(ctx, false);
1189
1190         /* See comment at the top of this file */
1191         smp_rmb();
1192         return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
1193 }
1194
1195 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1196 {
1197         struct io_rings *rings = ctx->rings;
1198
1199         /* make sure SQ entry isn't read before tail */
1200         return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
1201 }
1202
1203 /*
1204  * Find and free completed poll iocbs
1205  */
1206 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
1207                                struct list_head *done)
1208 {
1209         void *reqs[IO_IOPOLL_BATCH];
1210         struct io_kiocb *req;
1211         int to_free;
1212
1213         to_free = 0;
1214         while (!list_empty(done)) {
1215                 req = list_first_entry(done, struct io_kiocb, list);
1216                 list_del(&req->list);
1217
1218                 io_cqring_fill_event(req, req->result);
1219                 (*nr_events)++;
1220
1221                 if (refcount_dec_and_test(&req->refs)) {
1222                         /* If we're not using fixed files, we have to pair the
1223                          * completion part with the file put. Use regular
1224                          * completions for those, only batch free for fixed
1225                          * file and non-linked commands.
1226                          */
1227                         if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
1228                             REQ_F_FIXED_FILE) && !io_is_fallback_req(req) &&
1229                             !req->io) {
1230                                 reqs[to_free++] = req;
1231                                 if (to_free == ARRAY_SIZE(reqs))
1232                                         io_free_req_many(ctx, reqs, &to_free);
1233                         } else {
1234                                 io_free_req(req);
1235                         }
1236                 }
1237         }
1238
1239         io_commit_cqring(ctx);
1240         io_free_req_many(ctx, reqs, &to_free);
1241 }
1242
1243 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
1244                         long min)
1245 {
1246         struct io_kiocb *req, *tmp;
1247         LIST_HEAD(done);
1248         bool spin;
1249         int ret;
1250
1251         /*
1252          * Only spin for completions if we don't have multiple devices hanging
1253          * off our complete list, and we're under the requested amount.
1254          */
1255         spin = !ctx->poll_multi_file && *nr_events < min;
1256
1257         ret = 0;
1258         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
1259                 struct kiocb *kiocb = &req->rw.kiocb;
1260
1261                 /*
1262                  * Move completed entries to our local list. If we find a
1263                  * request that requires polling, break out and complete
1264                  * the done list first, if we have entries there.
1265                  */
1266                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
1267                         list_move_tail(&req->list, &done);
1268                         continue;
1269                 }
1270                 if (!list_empty(&done))
1271                         break;
1272
1273                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
1274                 if (ret < 0)
1275                         break;
1276
1277                 if (ret && spin)
1278                         spin = false;
1279                 ret = 0;
1280         }
1281
1282         if (!list_empty(&done))
1283                 io_iopoll_complete(ctx, nr_events, &done);
1284
1285         return ret;
1286 }
1287
1288 /*
1289  * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
1290  * non-spinning poll check - we'll still enter the driver poll loop, but only
1291  * as a non-spinning completion check.
1292  */
1293 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
1294                                 long min)
1295 {
1296         while (!list_empty(&ctx->poll_list) && !need_resched()) {
1297                 int ret;
1298
1299                 ret = io_do_iopoll(ctx, nr_events, min);
1300                 if (ret < 0)
1301                         return ret;
1302                 if (!min || *nr_events >= min)
1303                         return 0;
1304         }
1305
1306         return 1;
1307 }
1308
1309 /*
1310  * We can't just wait for polled events to come to us, we have to actively
1311  * find and complete them.
1312  */
1313 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
1314 {
1315         if (!(ctx->flags & IORING_SETUP_IOPOLL))
1316                 return;
1317
1318         mutex_lock(&ctx->uring_lock);
1319         while (!list_empty(&ctx->poll_list)) {
1320                 unsigned int nr_events = 0;
1321
1322                 io_iopoll_getevents(ctx, &nr_events, 1);
1323
1324                 /*
1325                  * Ensure we allow local-to-the-cpu processing to take place,
1326                  * in this case we need to ensure that we reap all events.
1327                  */
1328                 cond_resched();
1329         }
1330         mutex_unlock(&ctx->uring_lock);
1331 }
1332
1333 static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1334                             long min)
1335 {
1336         int iters = 0, ret = 0;
1337
1338         do {
1339                 int tmin = 0;
1340
1341                 /*
1342                  * Don't enter poll loop if we already have events pending.
1343                  * If we do, we can potentially be spinning for commands that
1344                  * already triggered a CQE (eg in error).
1345                  */
1346                 if (io_cqring_events(ctx, false))
1347                         break;
1348
1349                 /*
1350                  * If a submit got punted to a workqueue, we can have the
1351                  * application entering polling for a command before it gets
1352                  * issued. That app will hold the uring_lock for the duration
1353                  * of the poll right here, so we need to take a breather every
1354                  * now and then to ensure that the issue has a chance to add
1355                  * the poll to the issued list. Otherwise we can spin here
1356                  * forever, while the workqueue is stuck trying to acquire the
1357                  * very same mutex.
1358                  */
1359                 if (!(++iters & 7)) {
1360                         mutex_unlock(&ctx->uring_lock);
1361                         mutex_lock(&ctx->uring_lock);
1362                 }
1363
1364                 if (*nr_events < min)
1365                         tmin = min - *nr_events;
1366
1367                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
1368                 if (ret <= 0)
1369                         break;
1370                 ret = 0;
1371         } while (min && !*nr_events && !need_resched());
1372
1373         return ret;
1374 }
1375
1376 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1377                            long min)
1378 {
1379         int ret;
1380
1381         /*
1382          * We disallow the app entering submit/complete with polling, but we
1383          * still need to lock the ring to prevent racing with polled issue
1384          * that got punted to a workqueue.
1385          */
1386         mutex_lock(&ctx->uring_lock);
1387         ret = __io_iopoll_check(ctx, nr_events, min);
1388         mutex_unlock(&ctx->uring_lock);
1389         return ret;
1390 }
1391
1392 static void kiocb_end_write(struct io_kiocb *req)
1393 {
1394         /*
1395          * Tell lockdep we inherited freeze protection from submission
1396          * thread.
1397          */
1398         if (req->flags & REQ_F_ISREG) {
1399                 struct inode *inode = file_inode(req->file);
1400
1401                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
1402         }
1403         file_end_write(req->file);
1404 }
1405
1406 static inline void req_set_fail_links(struct io_kiocb *req)
1407 {
1408         if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
1409                 req->flags |= REQ_F_FAIL_LINK;
1410 }
1411
1412 static void io_complete_rw_common(struct kiocb *kiocb, long res)
1413 {
1414         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1415
1416         if (kiocb->ki_flags & IOCB_WRITE)
1417                 kiocb_end_write(req);
1418
1419         if (res != req->result)
1420                 req_set_fail_links(req);
1421         io_cqring_add_event(req, res);
1422 }
1423
1424 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
1425 {
1426         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1427
1428         io_complete_rw_common(kiocb, res);
1429         io_put_req(req);
1430 }
1431
1432 static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
1433 {
1434         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1435         struct io_kiocb *nxt = NULL;
1436
1437         io_complete_rw_common(kiocb, res);
1438         io_put_req_find_next(req, &nxt);
1439
1440         return nxt;
1441 }
1442
1443 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1444 {
1445         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1446
1447         if (kiocb->ki_flags & IOCB_WRITE)
1448                 kiocb_end_write(req);
1449
1450         if (res != req->result)
1451                 req_set_fail_links(req);
1452         req->result = res;
1453         if (res != -EAGAIN)
1454                 req->flags |= REQ_F_IOPOLL_COMPLETED;
1455 }
1456
1457 /*
1458  * After the iocb has been issued, it's safe to be found on the poll list.
1459  * Adding the kiocb to the list AFTER submission ensures that we don't
1460  * find it from a io_iopoll_getevents() thread before the issuer is done
1461  * accessing the kiocb cookie.
1462  */
1463 static void io_iopoll_req_issued(struct io_kiocb *req)
1464 {
1465         struct io_ring_ctx *ctx = req->ctx;
1466
1467         /*
1468          * Track whether we have multiple files in our lists. This will impact
1469          * how we do polling eventually, not spinning if we're on potentially
1470          * different devices.
1471          */
1472         if (list_empty(&ctx->poll_list)) {
1473                 ctx->poll_multi_file = false;
1474         } else if (!ctx->poll_multi_file) {
1475                 struct io_kiocb *list_req;
1476
1477                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1478                                                 list);
1479                 if (list_req->file != req->file)
1480                         ctx->poll_multi_file = true;
1481         }
1482
1483         /*
1484          * For fast devices, IO may have already completed. If it has, add
1485          * it to the front so we find it first.
1486          */
1487         if (req->flags & REQ_F_IOPOLL_COMPLETED)
1488                 list_add(&req->list, &ctx->poll_list);
1489         else
1490                 list_add_tail(&req->list, &ctx->poll_list);
1491 }
1492
1493 static void io_file_put(struct io_submit_state *state)
1494 {
1495         if (state->file) {
1496                 int diff = state->has_refs - state->used_refs;
1497
1498                 if (diff)
1499                         fput_many(state->file, diff);
1500                 state->file = NULL;
1501         }
1502 }
1503
1504 /*
1505  * Get as many references to a file as we have IOs left in this submission,
1506  * assuming most submissions are for one file, or at least that each file
1507  * has more than one submission.
1508  */
1509 static struct file *io_file_get(struct io_submit_state *state, int fd)
1510 {
1511         if (!state)
1512                 return fget(fd);
1513
1514         if (state->file) {
1515                 if (state->fd == fd) {
1516                         state->used_refs++;
1517                         state->ios_left--;
1518                         return state->file;
1519                 }
1520                 io_file_put(state);
1521         }
1522         state->file = fget_many(fd, state->ios_left);
1523         if (!state->file)
1524                 return NULL;
1525
1526         state->fd = fd;
1527         state->has_refs = state->ios_left;
1528         state->used_refs = 1;
1529         state->ios_left--;
1530         return state->file;
1531 }
1532
1533 /*
1534  * If we tracked the file through the SCM inflight mechanism, we could support
1535  * any file. For now, just ensure that anything potentially problematic is done
1536  * inline.
1537  */
1538 static bool io_file_supports_async(struct file *file)
1539 {
1540         umode_t mode = file_inode(file)->i_mode;
1541
1542         if (S_ISBLK(mode) || S_ISCHR(mode) || S_ISSOCK(mode))
1543                 return true;
1544         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1545                 return true;
1546
1547         return false;
1548 }
1549
1550 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1551                       bool force_nonblock)
1552 {
1553         struct io_ring_ctx *ctx = req->ctx;
1554         struct kiocb *kiocb = &req->rw.kiocb;
1555         unsigned ioprio;
1556         int ret;
1557
1558         if (!req->file)
1559                 return -EBADF;
1560
1561         if (S_ISREG(file_inode(req->file)->i_mode))
1562                 req->flags |= REQ_F_ISREG;
1563
1564         kiocb->ki_pos = READ_ONCE(sqe->off);
1565         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1566         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1567
1568         ioprio = READ_ONCE(sqe->ioprio);
1569         if (ioprio) {
1570                 ret = ioprio_check_cap(ioprio);
1571                 if (ret)
1572                         return ret;
1573
1574                 kiocb->ki_ioprio = ioprio;
1575         } else
1576                 kiocb->ki_ioprio = get_current_ioprio();
1577
1578         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1579         if (unlikely(ret))
1580                 return ret;
1581
1582         /* don't allow async punt if RWF_NOWAIT was requested */
1583         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1584             (req->file->f_flags & O_NONBLOCK))
1585                 req->flags |= REQ_F_NOWAIT;
1586
1587         if (force_nonblock)
1588                 kiocb->ki_flags |= IOCB_NOWAIT;
1589
1590         if (ctx->flags & IORING_SETUP_IOPOLL) {
1591                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1592                     !kiocb->ki_filp->f_op->iopoll)
1593                         return -EOPNOTSUPP;
1594
1595                 kiocb->ki_flags |= IOCB_HIPRI;
1596                 kiocb->ki_complete = io_complete_rw_iopoll;
1597                 req->result = 0;
1598         } else {
1599                 if (kiocb->ki_flags & IOCB_HIPRI)
1600                         return -EINVAL;
1601                 kiocb->ki_complete = io_complete_rw;
1602         }
1603
1604         req->rw.addr = READ_ONCE(sqe->addr);
1605         req->rw.len = READ_ONCE(sqe->len);
1606         /* we own ->private, reuse it for the buffer index */
1607         req->rw.kiocb.private = (void *) (unsigned long)
1608                                         READ_ONCE(sqe->buf_index);
1609         return 0;
1610 }
1611
1612 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1613 {
1614         switch (ret) {
1615         case -EIOCBQUEUED:
1616                 break;
1617         case -ERESTARTSYS:
1618         case -ERESTARTNOINTR:
1619         case -ERESTARTNOHAND:
1620         case -ERESTART_RESTARTBLOCK:
1621                 /*
1622                  * We can't just restart the syscall, since previously
1623                  * submitted sqes may already be in progress. Just fail this
1624                  * IO with EINTR.
1625                  */
1626                 ret = -EINTR;
1627                 /* fall through */
1628         default:
1629                 kiocb->ki_complete(kiocb, ret, 0);
1630         }
1631 }
1632
1633 static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
1634                        bool in_async)
1635 {
1636         if (in_async && ret >= 0 && kiocb->ki_complete == io_complete_rw)
1637                 *nxt = __io_complete_rw(kiocb, ret);
1638         else
1639                 io_rw_done(kiocb, ret);
1640 }
1641
1642 static ssize_t io_import_fixed(struct io_kiocb *req, int rw,
1643                                struct iov_iter *iter)
1644 {
1645         struct io_ring_ctx *ctx = req->ctx;
1646         size_t len = req->rw.len;
1647         struct io_mapped_ubuf *imu;
1648         unsigned index, buf_index;
1649         size_t offset;
1650         u64 buf_addr;
1651
1652         /* attempt to use fixed buffers without having provided iovecs */
1653         if (unlikely(!ctx->user_bufs))
1654                 return -EFAULT;
1655
1656         buf_index = (unsigned long) req->rw.kiocb.private;
1657         if (unlikely(buf_index >= ctx->nr_user_bufs))
1658                 return -EFAULT;
1659
1660         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1661         imu = &ctx->user_bufs[index];
1662         buf_addr = req->rw.addr;
1663
1664         /* overflow */
1665         if (buf_addr + len < buf_addr)
1666                 return -EFAULT;
1667         /* not inside the mapped region */
1668         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1669                 return -EFAULT;
1670
1671         /*
1672          * May not be a start of buffer, set size appropriately
1673          * and advance us to the beginning.
1674          */
1675         offset = buf_addr - imu->ubuf;
1676         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1677
1678         if (offset) {
1679                 /*
1680                  * Don't use iov_iter_advance() here, as it's really slow for
1681                  * using the latter parts of a big fixed buffer - it iterates
1682                  * over each segment manually. We can cheat a bit here, because
1683                  * we know that:
1684                  *
1685                  * 1) it's a BVEC iter, we set it up
1686                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1687                  *    first and last bvec
1688                  *
1689                  * So just find our index, and adjust the iterator afterwards.
1690                  * If the offset is within the first bvec (or the whole first
1691                  * bvec, just use iov_iter_advance(). This makes it easier
1692                  * since we can just skip the first segment, which may not
1693                  * be PAGE_SIZE aligned.
1694                  */
1695                 const struct bio_vec *bvec = imu->bvec;
1696
1697                 if (offset <= bvec->bv_len) {
1698                         iov_iter_advance(iter, offset);
1699                 } else {
1700                         unsigned long seg_skip;
1701
1702                         /* skip first vec */
1703                         offset -= bvec->bv_len;
1704                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1705
1706                         iter->bvec = bvec + seg_skip;
1707                         iter->nr_segs -= seg_skip;
1708                         iter->count -= bvec->bv_len + offset;
1709                         iter->iov_offset = offset & ~PAGE_MASK;
1710                 }
1711         }
1712
1713         return len;
1714 }
1715
1716 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
1717                                struct iovec **iovec, struct iov_iter *iter)
1718 {
1719         void __user *buf = u64_to_user_ptr(req->rw.addr);
1720         size_t sqe_len = req->rw.len;
1721         u8 opcode;
1722
1723         opcode = req->opcode;
1724         if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
1725                 *iovec = NULL;
1726                 return io_import_fixed(req, rw, iter);
1727         }
1728
1729         /* buffer index only valid with fixed read/write */
1730         if (req->rw.kiocb.private)
1731                 return -EINVAL;
1732
1733         if (req->io) {
1734                 struct io_async_rw *iorw = &req->io->rw;
1735
1736                 *iovec = iorw->iov;
1737                 iov_iter_init(iter, rw, *iovec, iorw->nr_segs, iorw->size);
1738                 if (iorw->iov == iorw->fast_iov)
1739                         *iovec = NULL;
1740                 return iorw->size;
1741         }
1742
1743         if (!req->has_user)
1744                 return -EFAULT;
1745
1746 #ifdef CONFIG_COMPAT
1747         if (req->ctx->compat)
1748                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1749                                                 iovec, iter);
1750 #endif
1751
1752         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1753 }
1754
1755 /*
1756  * For files that don't have ->read_iter() and ->write_iter(), handle them
1757  * by looping over ->read() or ->write() manually.
1758  */
1759 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
1760                            struct iov_iter *iter)
1761 {
1762         ssize_t ret = 0;
1763
1764         /*
1765          * Don't support polled IO through this interface, and we can't
1766          * support non-blocking either. For the latter, this just causes
1767          * the kiocb to be handled from an async context.
1768          */
1769         if (kiocb->ki_flags & IOCB_HIPRI)
1770                 return -EOPNOTSUPP;
1771         if (kiocb->ki_flags & IOCB_NOWAIT)
1772                 return -EAGAIN;
1773
1774         while (iov_iter_count(iter)) {
1775                 struct iovec iovec;
1776                 ssize_t nr;
1777
1778                 if (!iov_iter_is_bvec(iter)) {
1779                         iovec = iov_iter_iovec(iter);
1780                 } else {
1781                         /* fixed buffers import bvec */
1782                         iovec.iov_base = kmap(iter->bvec->bv_page)
1783                                                 + iter->iov_offset;
1784                         iovec.iov_len = min(iter->count,
1785                                         iter->bvec->bv_len - iter->iov_offset);
1786                 }
1787
1788                 if (rw == READ) {
1789                         nr = file->f_op->read(file, iovec.iov_base,
1790                                               iovec.iov_len, &kiocb->ki_pos);
1791                 } else {
1792                         nr = file->f_op->write(file, iovec.iov_base,
1793                                                iovec.iov_len, &kiocb->ki_pos);
1794                 }
1795
1796                 if (iov_iter_is_bvec(iter))
1797                         kunmap(iter->bvec->bv_page);
1798
1799                 if (nr < 0) {
1800                         if (!ret)
1801                                 ret = nr;
1802                         break;
1803                 }
1804                 ret += nr;
1805                 if (nr != iovec.iov_len)
1806                         break;
1807                 iov_iter_advance(iter, nr);
1808         }
1809
1810         return ret;
1811 }
1812
1813 static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size,
1814                           struct iovec *iovec, struct iovec *fast_iov,
1815                           struct iov_iter *iter)
1816 {
1817         req->io->rw.nr_segs = iter->nr_segs;
1818         req->io->rw.size = io_size;
1819         req->io->rw.iov = iovec;
1820         if (!req->io->rw.iov) {
1821                 req->io->rw.iov = req->io->rw.fast_iov;
1822                 memcpy(req->io->rw.iov, fast_iov,
1823                         sizeof(struct iovec) * iter->nr_segs);
1824         }
1825 }
1826
1827 static int io_alloc_async_ctx(struct io_kiocb *req)
1828 {
1829         req->io = kmalloc(sizeof(*req->io), GFP_KERNEL);
1830         return req->io == NULL;
1831 }
1832
1833 static void io_rw_async(struct io_wq_work **workptr)
1834 {
1835         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
1836         struct iovec *iov = NULL;
1837
1838         if (req->io->rw.iov != req->io->rw.fast_iov)
1839                 iov = req->io->rw.iov;
1840         io_wq_submit_work(workptr);
1841         kfree(iov);
1842 }
1843
1844 static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
1845                              struct iovec *iovec, struct iovec *fast_iov,
1846                              struct iov_iter *iter)
1847 {
1848         if (req->opcode == IORING_OP_READ_FIXED ||
1849             req->opcode == IORING_OP_WRITE_FIXED)
1850                 return 0;
1851         if (!req->io && io_alloc_async_ctx(req))
1852                 return -ENOMEM;
1853
1854         io_req_map_rw(req, io_size, iovec, fast_iov, iter);
1855         req->work.func = io_rw_async;
1856         return 0;
1857 }
1858
1859 static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1860                         bool force_nonblock)
1861 {
1862         struct io_async_ctx *io;
1863         struct iov_iter iter;
1864         ssize_t ret;
1865
1866         ret = io_prep_rw(req, sqe, force_nonblock);
1867         if (ret)
1868                 return ret;
1869
1870         if (unlikely(!(req->file->f_mode & FMODE_READ)))
1871                 return -EBADF;
1872
1873         if (!req->io)
1874                 return 0;
1875
1876         io = req->io;
1877         io->rw.iov = io->rw.fast_iov;
1878         req->io = NULL;
1879         ret = io_import_iovec(READ, req, &io->rw.iov, &iter);
1880         req->io = io;
1881         if (ret < 0)
1882                 return ret;
1883
1884         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
1885         return 0;
1886 }
1887
1888 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
1889                    bool force_nonblock)
1890 {
1891         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1892         struct kiocb *kiocb = &req->rw.kiocb;
1893         struct iov_iter iter;
1894         size_t iov_count;
1895         ssize_t io_size, ret;
1896
1897         ret = io_import_iovec(READ, req, &iovec, &iter);
1898         if (ret < 0)
1899                 return ret;
1900
1901         /* Ensure we clear previously set non-block flag */
1902         if (!force_nonblock)
1903                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
1904
1905         req->result = 0;
1906         io_size = ret;
1907         if (req->flags & REQ_F_LINK)
1908                 req->result = io_size;
1909
1910         /*
1911          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1912          * we know to async punt it even if it was opened O_NONBLOCK
1913          */
1914         if (force_nonblock && !io_file_supports_async(req->file)) {
1915                 req->flags |= REQ_F_MUST_PUNT;
1916                 goto copy_iov;
1917         }
1918
1919         iov_count = iov_iter_count(&iter);
1920         ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count);
1921         if (!ret) {
1922                 ssize_t ret2;
1923
1924                 if (req->file->f_op->read_iter)
1925                         ret2 = call_read_iter(req->file, kiocb, &iter);
1926                 else
1927                         ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
1928
1929                 /* Catch -EAGAIN return for forced non-blocking submission */
1930                 if (!force_nonblock || ret2 != -EAGAIN) {
1931                         kiocb_done(kiocb, ret2, nxt, req->in_async);
1932                 } else {
1933 copy_iov:
1934                         ret = io_setup_async_rw(req, io_size, iovec,
1935                                                 inline_vecs, &iter);
1936                         if (ret)
1937                                 goto out_free;
1938                         return -EAGAIN;
1939                 }
1940         }
1941 out_free:
1942         if (!io_wq_current_is_worker())
1943                 kfree(iovec);
1944         return ret;
1945 }
1946
1947 static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1948                          bool force_nonblock)
1949 {
1950         struct io_async_ctx *io;
1951         struct iov_iter iter;
1952         ssize_t ret;
1953
1954         ret = io_prep_rw(req, sqe, force_nonblock);
1955         if (ret)
1956                 return ret;
1957
1958         if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
1959                 return -EBADF;
1960
1961         if (!req->io)
1962                 return 0;
1963
1964         io = req->io;
1965         io->rw.iov = io->rw.fast_iov;
1966         req->io = NULL;
1967         ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter);
1968         req->io = io;
1969         if (ret < 0)
1970                 return ret;
1971
1972         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
1973         return 0;
1974 }
1975
1976 static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
1977                     bool force_nonblock)
1978 {
1979         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1980         struct kiocb *kiocb = &req->rw.kiocb;
1981         struct iov_iter iter;
1982         size_t iov_count;
1983         ssize_t ret, io_size;
1984
1985         ret = io_import_iovec(WRITE, req, &iovec, &iter);
1986         if (ret < 0)
1987                 return ret;
1988
1989         /* Ensure we clear previously set non-block flag */
1990         if (!force_nonblock)
1991                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
1992
1993         req->result = 0;
1994         io_size = ret;
1995         if (req->flags & REQ_F_LINK)
1996                 req->result = io_size;
1997
1998         /*
1999          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
2000          * we know to async punt it even if it was opened O_NONBLOCK
2001          */
2002         if (force_nonblock && !io_file_supports_async(req->file)) {
2003                 req->flags |= REQ_F_MUST_PUNT;
2004                 goto copy_iov;
2005         }
2006
2007         /* file path doesn't support NOWAIT for non-direct_IO */
2008         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
2009             (req->flags & REQ_F_ISREG))
2010                 goto copy_iov;
2011
2012         iov_count = iov_iter_count(&iter);
2013         ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count);
2014         if (!ret) {
2015                 ssize_t ret2;
2016
2017                 /*
2018                  * Open-code file_start_write here to grab freeze protection,
2019                  * which will be released by another thread in
2020                  * io_complete_rw().  Fool lockdep by telling it the lock got
2021                  * released so that it doesn't complain about the held lock when
2022                  * we return to userspace.
2023                  */
2024                 if (req->flags & REQ_F_ISREG) {
2025                         __sb_start_write(file_inode(req->file)->i_sb,
2026                                                 SB_FREEZE_WRITE, true);
2027                         __sb_writers_release(file_inode(req->file)->i_sb,
2028                                                 SB_FREEZE_WRITE);
2029                 }
2030                 kiocb->ki_flags |= IOCB_WRITE;
2031
2032                 if (req->file->f_op->write_iter)
2033                         ret2 = call_write_iter(req->file, kiocb, &iter);
2034                 else
2035                         ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter);
2036                 if (!force_nonblock || ret2 != -EAGAIN) {
2037                         kiocb_done(kiocb, ret2, nxt, req->in_async);
2038                 } else {
2039 copy_iov:
2040                         ret = io_setup_async_rw(req, io_size, iovec,
2041                                                 inline_vecs, &iter);
2042                         if (ret)
2043                                 goto out_free;
2044                         return -EAGAIN;
2045                 }
2046         }
2047 out_free:
2048         if (!io_wq_current_is_worker())
2049                 kfree(iovec);
2050         return ret;
2051 }
2052
2053 /*
2054  * IORING_OP_NOP just posts a completion event, nothing else.
2055  */
2056 static int io_nop(struct io_kiocb *req)
2057 {
2058         struct io_ring_ctx *ctx = req->ctx;
2059
2060         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2061                 return -EINVAL;
2062
2063         io_cqring_add_event(req, 0);
2064         io_put_req(req);
2065         return 0;
2066 }
2067
2068 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2069 {
2070         struct io_ring_ctx *ctx = req->ctx;
2071
2072         if (!req->file)
2073                 return -EBADF;
2074
2075         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2076                 return -EINVAL;
2077         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2078                 return -EINVAL;
2079
2080         req->sync.flags = READ_ONCE(sqe->fsync_flags);
2081         if (unlikely(req->sync.flags & ~IORING_FSYNC_DATASYNC))
2082                 return -EINVAL;
2083
2084         req->sync.off = READ_ONCE(sqe->off);
2085         req->sync.len = READ_ONCE(sqe->len);
2086         return 0;
2087 }
2088
2089 static bool io_req_cancelled(struct io_kiocb *req)
2090 {
2091         if (req->work.flags & IO_WQ_WORK_CANCEL) {
2092                 req_set_fail_links(req);
2093                 io_cqring_add_event(req, -ECANCELED);
2094                 io_put_req(req);
2095                 return true;
2096         }
2097
2098         return false;
2099 }
2100
2101 static void io_link_work_cb(struct io_wq_work **workptr)
2102 {
2103         struct io_wq_work *work = *workptr;
2104         struct io_kiocb *link = work->data;
2105
2106         io_queue_linked_timeout(link);
2107         work->func = io_wq_submit_work;
2108 }
2109
2110 static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt)
2111 {
2112         struct io_kiocb *link;
2113
2114         io_prep_async_work(nxt, &link);
2115         *workptr = &nxt->work;
2116         if (link) {
2117                 nxt->work.flags |= IO_WQ_WORK_CB;
2118                 nxt->work.func = io_link_work_cb;
2119                 nxt->work.data = link;
2120         }
2121 }
2122
2123 static void io_fsync_finish(struct io_wq_work **workptr)
2124 {
2125         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2126         loff_t end = req->sync.off + req->sync.len;
2127         struct io_kiocb *nxt = NULL;
2128         int ret;
2129
2130         if (io_req_cancelled(req))
2131                 return;
2132
2133         ret = vfs_fsync_range(req->file, req->sync.off,
2134                                 end > 0 ? end : LLONG_MAX,
2135                                 req->sync.flags & IORING_FSYNC_DATASYNC);
2136         if (ret < 0)
2137                 req_set_fail_links(req);
2138         io_cqring_add_event(req, ret);
2139         io_put_req_find_next(req, &nxt);
2140         if (nxt)
2141                 io_wq_assign_next(workptr, nxt);
2142 }
2143
2144 static int io_fsync(struct io_kiocb *req, struct io_kiocb **nxt,
2145                     bool force_nonblock)
2146 {
2147         struct io_wq_work *work, *old_work;
2148
2149         /* fsync always requires a blocking context */
2150         if (force_nonblock) {
2151                 io_put_req(req);
2152                 req->work.func = io_fsync_finish;
2153                 return -EAGAIN;
2154         }
2155
2156         work = old_work = &req->work;
2157         io_fsync_finish(&work);
2158         if (work && work != old_work)
2159                 *nxt = container_of(work, struct io_kiocb, work);
2160         return 0;
2161 }
2162
2163 static void io_fallocate_finish(struct io_wq_work **workptr)
2164 {
2165         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2166         struct io_kiocb *nxt = NULL;
2167         int ret;
2168
2169         ret = vfs_fallocate(req->file, req->sync.mode, req->sync.off,
2170                                 req->sync.len);
2171         if (ret < 0)
2172                 req_set_fail_links(req);
2173         io_cqring_add_event(req, ret);
2174         io_put_req_find_next(req, &nxt);
2175         if (nxt)
2176                 io_wq_assign_next(workptr, nxt);
2177 }
2178
2179 static int io_fallocate_prep(struct io_kiocb *req,
2180                              const struct io_uring_sqe *sqe)
2181 {
2182         if (sqe->ioprio || sqe->buf_index || sqe->rw_flags)
2183                 return -EINVAL;
2184
2185         req->sync.off = READ_ONCE(sqe->off);
2186         req->sync.len = READ_ONCE(sqe->addr);
2187         req->sync.mode = READ_ONCE(sqe->len);
2188         return 0;
2189 }
2190
2191 static int io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt,
2192                         bool force_nonblock)
2193 {
2194         struct io_wq_work *work, *old_work;
2195
2196         /* fallocate always requiring blocking context */
2197         if (force_nonblock) {
2198                 io_put_req(req);
2199                 req->work.func = io_fallocate_finish;
2200                 return -EAGAIN;
2201         }
2202
2203         work = old_work = &req->work;
2204         io_fallocate_finish(&work);
2205         if (work && work != old_work)
2206                 *nxt = container_of(work, struct io_kiocb, work);
2207
2208         return 0;
2209 }
2210
2211 static int io_openat_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2212 {
2213         int ret;
2214
2215         if (sqe->ioprio || sqe->buf_index)
2216                 return -EINVAL;
2217
2218         req->open.dfd = READ_ONCE(sqe->fd);
2219         req->open.mode = READ_ONCE(sqe->len);
2220         req->open.fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2221         req->open.flags = READ_ONCE(sqe->open_flags);
2222
2223         req->open.filename = getname(req->open.fname);
2224         if (IS_ERR(req->open.filename)) {
2225                 ret = PTR_ERR(req->open.filename);
2226                 req->open.filename = NULL;
2227                 return ret;
2228         }
2229
2230         return 0;
2231 }
2232
2233 static int io_openat(struct io_kiocb *req, struct io_kiocb **nxt,
2234                      bool force_nonblock)
2235 {
2236         struct open_flags op;
2237         struct open_how how;
2238         struct file *file;
2239         int ret;
2240
2241         if (force_nonblock) {
2242                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2243                 return -EAGAIN;
2244         }
2245
2246         how = build_open_how(req->open.flags, req->open.mode);
2247         ret = build_open_flags(&how, &op);
2248         if (ret)
2249                 goto err;
2250
2251         ret = get_unused_fd_flags(how.flags);
2252         if (ret < 0)
2253                 goto err;
2254
2255         file = do_filp_open(req->open.dfd, req->open.filename, &op);
2256         if (IS_ERR(file)) {
2257                 put_unused_fd(ret);
2258                 ret = PTR_ERR(file);
2259         } else {
2260                 fsnotify_open(file);
2261                 fd_install(ret, file);
2262         }
2263 err:
2264         putname(req->open.filename);
2265         if (ret < 0)
2266                 req_set_fail_links(req);
2267         io_cqring_add_event(req, ret);
2268         io_put_req_find_next(req, nxt);
2269         return 0;
2270 }
2271
2272 static int io_statx_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2273 {
2274         unsigned lookup_flags;
2275         int ret;
2276
2277         if (sqe->ioprio || sqe->buf_index)
2278                 return -EINVAL;
2279
2280         req->open.dfd = READ_ONCE(sqe->fd);
2281         req->open.mask = READ_ONCE(sqe->len);
2282         req->open.fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2283         req->open.buffer = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2284         req->open.flags = READ_ONCE(sqe->statx_flags);
2285
2286         if (vfs_stat_set_lookup_flags(&lookup_flags, req->open.flags))
2287                 return -EINVAL;
2288
2289         req->open.filename = getname_flags(req->open.fname, lookup_flags, NULL);
2290         if (IS_ERR(req->open.filename)) {
2291                 ret = PTR_ERR(req->open.filename);
2292                 req->open.filename = NULL;
2293                 return ret;
2294         }
2295
2296         return 0;
2297 }
2298
2299 static int io_statx(struct io_kiocb *req, struct io_kiocb **nxt,
2300                     bool force_nonblock)
2301 {
2302         struct io_open *ctx = &req->open;
2303         unsigned lookup_flags;
2304         struct path path;
2305         struct kstat stat;
2306         int ret;
2307
2308         if (force_nonblock)
2309                 return -EAGAIN;
2310
2311         if (vfs_stat_set_lookup_flags(&lookup_flags, ctx->flags))
2312                 return -EINVAL;
2313
2314 retry:
2315         /* filename_lookup() drops it, keep a reference */
2316         ctx->filename->refcnt++;
2317
2318         ret = filename_lookup(ctx->dfd, ctx->filename, lookup_flags, &path,
2319                                 NULL);
2320         if (ret)
2321                 goto err;
2322
2323         ret = vfs_getattr(&path, &stat, ctx->mask, ctx->flags);
2324         path_put(&path);
2325         if (retry_estale(ret, lookup_flags)) {
2326                 lookup_flags |= LOOKUP_REVAL;
2327                 goto retry;
2328         }
2329         if (!ret)
2330                 ret = cp_statx(&stat, ctx->buffer);
2331 err:
2332         putname(ctx->filename);
2333         if (ret < 0)
2334                 req_set_fail_links(req);
2335         io_cqring_add_event(req, ret);
2336         io_put_req_find_next(req, nxt);
2337         return 0;
2338 }
2339
2340 static int io_close_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2341 {
2342         /*
2343          * If we queue this for async, it must not be cancellable. That would
2344          * leave the 'file' in an undeterminate state.
2345          */
2346         req->work.flags |= IO_WQ_WORK_NO_CANCEL;
2347
2348         if (sqe->ioprio || sqe->off || sqe->addr || sqe->len ||
2349             sqe->rw_flags || sqe->buf_index)
2350                 return -EINVAL;
2351         if (sqe->flags & IOSQE_FIXED_FILE)
2352                 return -EINVAL;
2353
2354         req->close.fd = READ_ONCE(sqe->fd);
2355         if (req->file->f_op == &io_uring_fops ||
2356             req->close.fd == req->ring_fd)
2357                 return -EBADF;
2358
2359         return 0;
2360 }
2361
2362 static void io_close_finish(struct io_wq_work **workptr)
2363 {
2364         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2365         struct io_kiocb *nxt = NULL;
2366
2367         /* Invoked with files, we need to do the close */
2368         if (req->work.files) {
2369                 int ret;
2370
2371                 ret = filp_close(req->close.put_file, req->work.files);
2372                 if (ret < 0) {
2373                         req_set_fail_links(req);
2374                 }
2375                 io_cqring_add_event(req, ret);
2376         }
2377
2378         fput(req->close.put_file);
2379
2380         /* we bypassed the re-issue, drop the submission reference */
2381         io_put_req(req);
2382         io_put_req_find_next(req, &nxt);
2383         if (nxt)
2384                 io_wq_assign_next(workptr, nxt);
2385 }
2386
2387 static int io_close(struct io_kiocb *req, struct io_kiocb **nxt,
2388                     bool force_nonblock)
2389 {
2390         int ret;
2391
2392         req->close.put_file = NULL;
2393         ret = __close_fd_get_file(req->close.fd, &req->close.put_file);
2394         if (ret < 0)
2395                 return ret;
2396
2397         /* if the file has a flush method, be safe and punt to async */
2398         if (req->close.put_file->f_op->flush && !io_wq_current_is_worker()) {
2399                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2400                 goto eagain;
2401         }
2402
2403         /*
2404          * No ->flush(), safely close from here and just punt the
2405          * fput() to async context.
2406          */
2407         ret = filp_close(req->close.put_file, current->files);
2408
2409         if (ret < 0)
2410                 req_set_fail_links(req);
2411         io_cqring_add_event(req, ret);
2412
2413         if (io_wq_current_is_worker()) {
2414                 struct io_wq_work *old_work, *work;
2415
2416                 old_work = work = &req->work;
2417                 io_close_finish(&work);
2418                 if (work && work != old_work)
2419                         *nxt = container_of(work, struct io_kiocb, work);
2420                 return 0;
2421         }
2422
2423 eagain:
2424         req->work.func = io_close_finish;
2425         return -EAGAIN;
2426 }
2427
2428 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2429 {
2430         struct io_ring_ctx *ctx = req->ctx;
2431
2432         if (!req->file)
2433                 return -EBADF;
2434
2435         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2436                 return -EINVAL;
2437         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2438                 return -EINVAL;
2439
2440         req->sync.off = READ_ONCE(sqe->off);
2441         req->sync.len = READ_ONCE(sqe->len);
2442         req->sync.flags = READ_ONCE(sqe->sync_range_flags);
2443         return 0;
2444 }
2445
2446 static void io_sync_file_range_finish(struct io_wq_work **workptr)
2447 {
2448         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2449         struct io_kiocb *nxt = NULL;
2450         int ret;
2451
2452         if (io_req_cancelled(req))
2453                 return;
2454
2455         ret = sync_file_range(req->file, req->sync.off, req->sync.len,
2456                                 req->sync.flags);
2457         if (ret < 0)
2458                 req_set_fail_links(req);
2459         io_cqring_add_event(req, ret);
2460         io_put_req_find_next(req, &nxt);
2461         if (nxt)
2462                 io_wq_assign_next(workptr, nxt);
2463 }
2464
2465 static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt,
2466                               bool force_nonblock)
2467 {
2468         struct io_wq_work *work, *old_work;
2469
2470         /* sync_file_range always requires a blocking context */
2471         if (force_nonblock) {
2472                 io_put_req(req);
2473                 req->work.func = io_sync_file_range_finish;
2474                 return -EAGAIN;
2475         }
2476
2477         work = old_work = &req->work;
2478         io_sync_file_range_finish(&work);
2479         if (work && work != old_work)
2480                 *nxt = container_of(work, struct io_kiocb, work);
2481         return 0;
2482 }
2483
2484 #if defined(CONFIG_NET)
2485 static void io_sendrecv_async(struct io_wq_work **workptr)
2486 {
2487         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2488         struct iovec *iov = NULL;
2489
2490         if (req->io->rw.iov != req->io->rw.fast_iov)
2491                 iov = req->io->msg.iov;
2492         io_wq_submit_work(workptr);
2493         kfree(iov);
2494 }
2495 #endif
2496
2497 static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2498 {
2499 #if defined(CONFIG_NET)
2500         struct io_sr_msg *sr = &req->sr_msg;
2501         struct io_async_ctx *io = req->io;
2502
2503         sr->msg_flags = READ_ONCE(sqe->msg_flags);
2504         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2505
2506         if (!io)
2507                 return 0;
2508
2509         io->msg.iov = io->msg.fast_iov;
2510         return sendmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2511                                         &io->msg.iov);
2512 #else
2513         return -EOPNOTSUPP;
2514 #endif
2515 }
2516
2517 static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2518                       bool force_nonblock)
2519 {
2520 #if defined(CONFIG_NET)
2521         struct io_async_msghdr *kmsg = NULL;
2522         struct socket *sock;
2523         int ret;
2524
2525         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2526                 return -EINVAL;
2527
2528         sock = sock_from_file(req->file, &ret);
2529         if (sock) {
2530                 struct io_async_ctx io;
2531                 struct sockaddr_storage addr;
2532                 unsigned flags;
2533
2534                 if (req->io) {
2535                         kmsg = &req->io->msg;
2536                         kmsg->msg.msg_name = &addr;
2537                         /* if iov is set, it's allocated already */
2538                         if (!kmsg->iov)
2539                                 kmsg->iov = kmsg->fast_iov;
2540                         kmsg->msg.msg_iter.iov = kmsg->iov;
2541                 } else {
2542                         struct io_sr_msg *sr = &req->sr_msg;
2543
2544                         kmsg = &io.msg;
2545                         kmsg->msg.msg_name = &addr;
2546
2547                         io.msg.iov = io.msg.fast_iov;
2548                         ret = sendmsg_copy_msghdr(&io.msg.msg, sr->msg,
2549                                         sr->msg_flags, &io.msg.iov);
2550                         if (ret)
2551                                 return ret;
2552                 }
2553
2554                 flags = req->sr_msg.msg_flags;
2555                 if (flags & MSG_DONTWAIT)
2556                         req->flags |= REQ_F_NOWAIT;
2557                 else if (force_nonblock)
2558                         flags |= MSG_DONTWAIT;
2559
2560                 ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags);
2561                 if (force_nonblock && ret == -EAGAIN) {
2562                         if (req->io)
2563                                 return -EAGAIN;
2564                         if (io_alloc_async_ctx(req))
2565                                 return -ENOMEM;
2566                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2567                         req->work.func = io_sendrecv_async;
2568                         return -EAGAIN;
2569                 }
2570                 if (ret == -ERESTARTSYS)
2571                         ret = -EINTR;
2572         }
2573
2574         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
2575                 kfree(kmsg->iov);
2576         io_cqring_add_event(req, ret);
2577         if (ret < 0)
2578                 req_set_fail_links(req);
2579         io_put_req_find_next(req, nxt);
2580         return 0;
2581 #else
2582         return -EOPNOTSUPP;
2583 #endif
2584 }
2585
2586 static int io_recvmsg_prep(struct io_kiocb *req,
2587                            const struct io_uring_sqe *sqe)
2588 {
2589 #if defined(CONFIG_NET)
2590         struct io_sr_msg *sr = &req->sr_msg;
2591         struct io_async_ctx *io = req->io;
2592
2593         sr->msg_flags = READ_ONCE(sqe->msg_flags);
2594         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2595
2596         if (!io)
2597                 return 0;
2598
2599         io->msg.iov = io->msg.fast_iov;
2600         return recvmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2601                                         &io->msg.uaddr, &io->msg.iov);
2602 #else
2603         return -EOPNOTSUPP;
2604 #endif
2605 }
2606
2607 static int io_recvmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2608                       bool force_nonblock)
2609 {
2610 #if defined(CONFIG_NET)
2611         struct io_async_msghdr *kmsg = NULL;
2612         struct socket *sock;
2613         int ret;
2614
2615         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2616                 return -EINVAL;
2617
2618         sock = sock_from_file(req->file, &ret);
2619         if (sock) {
2620                 struct io_async_ctx io;
2621                 struct sockaddr_storage addr;
2622                 unsigned flags;
2623
2624                 if (req->io) {
2625                         kmsg = &req->io->msg;
2626                         kmsg->msg.msg_name = &addr;
2627                         /* if iov is set, it's allocated already */
2628                         if (!kmsg->iov)
2629                                 kmsg->iov = kmsg->fast_iov;
2630                         kmsg->msg.msg_iter.iov = kmsg->iov;
2631                 } else {
2632                         struct io_sr_msg *sr = &req->sr_msg;
2633
2634                         kmsg = &io.msg;
2635                         kmsg->msg.msg_name = &addr;
2636
2637                         io.msg.iov = io.msg.fast_iov;
2638                         ret = recvmsg_copy_msghdr(&io.msg.msg, sr->msg,
2639                                         sr->msg_flags, &io.msg.uaddr,
2640                                         &io.msg.iov);
2641                         if (ret)
2642                                 return ret;
2643                 }
2644
2645                 flags = req->sr_msg.msg_flags;
2646                 if (flags & MSG_DONTWAIT)
2647                         req->flags |= REQ_F_NOWAIT;
2648                 else if (force_nonblock)
2649                         flags |= MSG_DONTWAIT;
2650
2651                 ret = __sys_recvmsg_sock(sock, &kmsg->msg, req->sr_msg.msg,
2652                                                 kmsg->uaddr, flags);
2653                 if (force_nonblock && ret == -EAGAIN) {
2654                         if (req->io)
2655                                 return -EAGAIN;
2656                         if (io_alloc_async_ctx(req))
2657                                 return -ENOMEM;
2658                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2659                         req->work.func = io_sendrecv_async;
2660                         return -EAGAIN;
2661                 }
2662                 if (ret == -ERESTARTSYS)
2663                         ret = -EINTR;
2664         }
2665
2666         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
2667                 kfree(kmsg->iov);
2668         io_cqring_add_event(req, ret);
2669         if (ret < 0)
2670                 req_set_fail_links(req);
2671         io_put_req_find_next(req, nxt);
2672         return 0;
2673 #else
2674         return -EOPNOTSUPP;
2675 #endif
2676 }
2677
2678 static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2679 {
2680 #if defined(CONFIG_NET)
2681         struct io_accept *accept = &req->accept;
2682
2683         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2684                 return -EINVAL;
2685         if (sqe->ioprio || sqe->len || sqe->buf_index)
2686                 return -EINVAL;
2687
2688         accept->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
2689         accept->addr_len = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2690         accept->flags = READ_ONCE(sqe->accept_flags);
2691         return 0;
2692 #else
2693         return -EOPNOTSUPP;
2694 #endif
2695 }
2696
2697 #if defined(CONFIG_NET)
2698 static int __io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
2699                        bool force_nonblock)
2700 {
2701         struct io_accept *accept = &req->accept;
2702         unsigned file_flags;
2703         int ret;
2704
2705         file_flags = force_nonblock ? O_NONBLOCK : 0;
2706         ret = __sys_accept4_file(req->file, file_flags, accept->addr,
2707                                         accept->addr_len, accept->flags);
2708         if (ret == -EAGAIN && force_nonblock)
2709                 return -EAGAIN;
2710         if (ret == -ERESTARTSYS)
2711                 ret = -EINTR;
2712         if (ret < 0)
2713                 req_set_fail_links(req);
2714         io_cqring_add_event(req, ret);
2715         io_put_req_find_next(req, nxt);
2716         return 0;
2717 }
2718
2719 static void io_accept_finish(struct io_wq_work **workptr)
2720 {
2721         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2722         struct io_kiocb *nxt = NULL;
2723
2724         if (io_req_cancelled(req))
2725                 return;
2726         __io_accept(req, &nxt, false);
2727         if (nxt)
2728                 io_wq_assign_next(workptr, nxt);
2729 }
2730 #endif
2731
2732 static int io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
2733                      bool force_nonblock)
2734 {
2735 #if defined(CONFIG_NET)
2736         int ret;
2737
2738         ret = __io_accept(req, nxt, force_nonblock);
2739         if (ret == -EAGAIN && force_nonblock) {
2740                 req->work.func = io_accept_finish;
2741                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2742                 io_put_req(req);
2743                 return -EAGAIN;
2744         }
2745         return 0;
2746 #else
2747         return -EOPNOTSUPP;
2748 #endif
2749 }
2750
2751 static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2752 {
2753 #if defined(CONFIG_NET)
2754         struct io_connect *conn = &req->connect;
2755         struct io_async_ctx *io = req->io;
2756
2757         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2758                 return -EINVAL;
2759         if (sqe->ioprio || sqe->len || sqe->buf_index || sqe->rw_flags)
2760                 return -EINVAL;
2761
2762         conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
2763         conn->addr_len =  READ_ONCE(sqe->addr2);
2764
2765         if (!io)
2766                 return 0;
2767
2768         return move_addr_to_kernel(conn->addr, conn->addr_len,
2769                                         &io->connect.address);
2770 #else
2771         return -EOPNOTSUPP;
2772 #endif
2773 }
2774
2775 static int io_connect(struct io_kiocb *req, struct io_kiocb **nxt,
2776                       bool force_nonblock)
2777 {
2778 #if defined(CONFIG_NET)
2779         struct io_async_ctx __io, *io;
2780         unsigned file_flags;
2781         int ret;
2782
2783         if (req->io) {
2784                 io = req->io;
2785         } else {
2786                 ret = move_addr_to_kernel(req->connect.addr,
2787                                                 req->connect.addr_len,
2788                                                 &__io.connect.address);
2789                 if (ret)
2790                         goto out;
2791                 io = &__io;
2792         }
2793
2794         file_flags = force_nonblock ? O_NONBLOCK : 0;
2795
2796         ret = __sys_connect_file(req->file, &io->connect.address,
2797                                         req->connect.addr_len, file_flags);
2798         if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) {
2799                 if (req->io)
2800                         return -EAGAIN;
2801                 if (io_alloc_async_ctx(req)) {
2802                         ret = -ENOMEM;
2803                         goto out;
2804                 }
2805                 memcpy(&req->io->connect, &__io.connect, sizeof(__io.connect));
2806                 return -EAGAIN;
2807         }
2808         if (ret == -ERESTARTSYS)
2809                 ret = -EINTR;
2810 out:
2811         if (ret < 0)
2812                 req_set_fail_links(req);
2813         io_cqring_add_event(req, ret);
2814         io_put_req_find_next(req, nxt);
2815         return 0;
2816 #else
2817         return -EOPNOTSUPP;
2818 #endif
2819 }
2820
2821 static void io_poll_remove_one(struct io_kiocb *req)
2822 {
2823         struct io_poll_iocb *poll = &req->poll;
2824
2825         spin_lock(&poll->head->lock);
2826         WRITE_ONCE(poll->canceled, true);
2827         if (!list_empty(&poll->wait.entry)) {
2828                 list_del_init(&poll->wait.entry);
2829                 io_queue_async_work(req);
2830         }
2831         spin_unlock(&poll->head->lock);
2832         hash_del(&req->hash_node);
2833 }
2834
2835 static void io_poll_remove_all(struct io_ring_ctx *ctx)
2836 {
2837         struct hlist_node *tmp;
2838         struct io_kiocb *req;
2839         int i;
2840
2841         spin_lock_irq(&ctx->completion_lock);
2842         for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
2843                 struct hlist_head *list;
2844
2845                 list = &ctx->cancel_hash[i];
2846                 hlist_for_each_entry_safe(req, tmp, list, hash_node)
2847                         io_poll_remove_one(req);
2848         }
2849         spin_unlock_irq(&ctx->completion_lock);
2850 }
2851
2852 static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
2853 {
2854         struct hlist_head *list;
2855         struct io_kiocb *req;
2856
2857         list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)];
2858         hlist_for_each_entry(req, list, hash_node) {
2859                 if (sqe_addr == req->user_data) {
2860                         io_poll_remove_one(req);
2861                         return 0;
2862                 }
2863         }
2864
2865         return -ENOENT;
2866 }
2867
2868 static int io_poll_remove_prep(struct io_kiocb *req,
2869                                const struct io_uring_sqe *sqe)
2870 {
2871         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2872                 return -EINVAL;
2873         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
2874             sqe->poll_events)
2875                 return -EINVAL;
2876
2877         req->poll.addr = READ_ONCE(sqe->addr);
2878         return 0;
2879 }
2880
2881 /*
2882  * Find a running poll command that matches one specified in sqe->addr,
2883  * and remove it if found.
2884  */
2885 static int io_poll_remove(struct io_kiocb *req)
2886 {
2887         struct io_ring_ctx *ctx = req->ctx;
2888         u64 addr;
2889         int ret;
2890
2891         addr = req->poll.addr;
2892         spin_lock_irq(&ctx->completion_lock);
2893         ret = io_poll_cancel(ctx, addr);
2894         spin_unlock_irq(&ctx->completion_lock);
2895
2896         io_cqring_add_event(req, ret);
2897         if (ret < 0)
2898                 req_set_fail_links(req);
2899         io_put_req(req);
2900         return 0;
2901 }
2902
2903 static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
2904 {
2905         struct io_ring_ctx *ctx = req->ctx;
2906
2907         req->poll.done = true;
2908         if (error)
2909                 io_cqring_fill_event(req, error);
2910         else
2911                 io_cqring_fill_event(req, mangle_poll(mask));
2912         io_commit_cqring(ctx);
2913 }
2914
2915 static void io_poll_complete_work(struct io_wq_work **workptr)
2916 {
2917         struct io_wq_work *work = *workptr;
2918         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2919         struct io_poll_iocb *poll = &req->poll;
2920         struct poll_table_struct pt = { ._key = poll->events };
2921         struct io_ring_ctx *ctx = req->ctx;
2922         struct io_kiocb *nxt = NULL;
2923         __poll_t mask = 0;
2924         int ret = 0;
2925
2926         if (work->flags & IO_WQ_WORK_CANCEL) {
2927                 WRITE_ONCE(poll->canceled, true);
2928                 ret = -ECANCELED;
2929         } else if (READ_ONCE(poll->canceled)) {
2930                 ret = -ECANCELED;
2931         }
2932
2933         if (ret != -ECANCELED)
2934                 mask = vfs_poll(poll->file, &pt) & poll->events;
2935
2936         /*
2937          * Note that ->ki_cancel callers also delete iocb from active_reqs after
2938          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
2939          * synchronize with them.  In the cancellation case the list_del_init
2940          * itself is not actually needed, but harmless so we keep it in to
2941          * avoid further branches in the fast path.
2942          */
2943         spin_lock_irq(&ctx->completion_lock);
2944         if (!mask && ret != -ECANCELED) {
2945                 add_wait_queue(poll->head, &poll->wait);
2946                 spin_unlock_irq(&ctx->completion_lock);
2947                 return;
2948         }
2949         hash_del(&req->hash_node);
2950         io_poll_complete(req, mask, ret);
2951         spin_unlock_irq(&ctx->completion_lock);
2952
2953         io_cqring_ev_posted(ctx);
2954
2955         if (ret < 0)
2956                 req_set_fail_links(req);
2957         io_put_req_find_next(req, &nxt);
2958         if (nxt)
2959                 io_wq_assign_next(workptr, nxt);
2960 }
2961
2962 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
2963                         void *key)
2964 {
2965         struct io_poll_iocb *poll = wait->private;
2966         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
2967         struct io_ring_ctx *ctx = req->ctx;
2968         __poll_t mask = key_to_poll(key);
2969         unsigned long flags;
2970
2971         /* for instances that support it check for an event match first: */
2972         if (mask && !(mask & poll->events))
2973                 return 0;
2974
2975         list_del_init(&poll->wait.entry);
2976
2977         /*
2978          * Run completion inline if we can. We're using trylock here because
2979          * we are violating the completion_lock -> poll wq lock ordering.
2980          * If we have a link timeout we're going to need the completion_lock
2981          * for finalizing the request, mark us as having grabbed that already.
2982          */
2983         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
2984                 hash_del(&req->hash_node);
2985                 io_poll_complete(req, mask, 0);
2986                 req->flags |= REQ_F_COMP_LOCKED;
2987                 io_put_req(req);
2988                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
2989
2990                 io_cqring_ev_posted(ctx);
2991         } else {
2992                 io_queue_async_work(req);
2993         }
2994
2995         return 1;
2996 }
2997
2998 struct io_poll_table {
2999         struct poll_table_struct pt;
3000         struct io_kiocb *req;
3001         int error;
3002 };
3003
3004 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
3005                                struct poll_table_struct *p)
3006 {
3007         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
3008
3009         if (unlikely(pt->req->poll.head)) {
3010                 pt->error = -EINVAL;
3011                 return;
3012         }
3013
3014         pt->error = 0;
3015         pt->req->poll.head = head;
3016         add_wait_queue(head, &pt->req->poll.wait);
3017 }
3018
3019 static void io_poll_req_insert(struct io_kiocb *req)
3020 {
3021         struct io_ring_ctx *ctx = req->ctx;
3022         struct hlist_head *list;
3023
3024         list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)];
3025         hlist_add_head(&req->hash_node, list);
3026 }
3027
3028 static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3029 {
3030         struct io_poll_iocb *poll = &req->poll;
3031         u16 events;
3032
3033         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3034                 return -EINVAL;
3035         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
3036                 return -EINVAL;
3037         if (!poll->file)
3038                 return -EBADF;
3039
3040         events = READ_ONCE(sqe->poll_events);
3041         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
3042         return 0;
3043 }
3044
3045 static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt)
3046 {
3047         struct io_poll_iocb *poll = &req->poll;
3048         struct io_ring_ctx *ctx = req->ctx;
3049         struct io_poll_table ipt;
3050         bool cancel = false;
3051         __poll_t mask;
3052
3053         INIT_IO_WORK(&req->work, io_poll_complete_work);
3054         INIT_HLIST_NODE(&req->hash_node);
3055
3056         poll->head = NULL;
3057         poll->done = false;
3058         poll->canceled = false;
3059
3060         ipt.pt._qproc = io_poll_queue_proc;
3061         ipt.pt._key = poll->events;
3062         ipt.req = req;
3063         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
3064
3065         /* initialized the list so that we can do list_empty checks */
3066         INIT_LIST_HEAD(&poll->wait.entry);
3067         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
3068         poll->wait.private = poll;
3069
3070         INIT_LIST_HEAD(&req->list);
3071
3072         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
3073
3074         spin_lock_irq(&ctx->completion_lock);
3075         if (likely(poll->head)) {
3076                 spin_lock(&poll->head->lock);
3077                 if (unlikely(list_empty(&poll->wait.entry))) {
3078                         if (ipt.error)
3079                                 cancel = true;
3080                         ipt.error = 0;
3081                         mask = 0;
3082                 }
3083                 if (mask || ipt.error)
3084                         list_del_init(&poll->wait.entry);
3085                 else if (cancel)
3086                         WRITE_ONCE(poll->canceled, true);
3087                 else if (!poll->done) /* actually waiting for an event */
3088                         io_poll_req_insert(req);
3089                 spin_unlock(&poll->head->lock);
3090         }
3091         if (mask) { /* no async, we'd stolen it */
3092                 ipt.error = 0;
3093                 io_poll_complete(req, mask, 0);
3094         }
3095         spin_unlock_irq(&ctx->completion_lock);
3096
3097         if (mask) {
3098                 io_cqring_ev_posted(ctx);
3099                 io_put_req_find_next(req, nxt);
3100         }
3101         return ipt.error;
3102 }
3103
3104 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
3105 {
3106         struct io_timeout_data *data = container_of(timer,
3107                                                 struct io_timeout_data, timer);
3108         struct io_kiocb *req = data->req;
3109         struct io_ring_ctx *ctx = req->ctx;
3110         unsigned long flags;
3111
3112         atomic_inc(&ctx->cq_timeouts);
3113
3114         spin_lock_irqsave(&ctx->completion_lock, flags);
3115         /*
3116          * We could be racing with timeout deletion. If the list is empty,
3117          * then timeout lookup already found it and will be handling it.
3118          */
3119         if (!list_empty(&req->list)) {
3120                 struct io_kiocb *prev;
3121
3122                 /*
3123                  * Adjust the reqs sequence before the current one because it
3124                  * will consume a slot in the cq_ring and the cq_tail
3125                  * pointer will be increased, otherwise other timeout reqs may
3126                  * return in advance without waiting for enough wait_nr.
3127                  */
3128                 prev = req;
3129                 list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
3130                         prev->sequence++;
3131                 list_del_init(&req->list);
3132         }
3133
3134         io_cqring_fill_event(req, -ETIME);
3135         io_commit_cqring(ctx);
3136         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3137
3138         io_cqring_ev_posted(ctx);
3139         req_set_fail_links(req);
3140         io_put_req(req);
3141         return HRTIMER_NORESTART;
3142 }
3143
3144 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
3145 {
3146         struct io_kiocb *req;
3147         int ret = -ENOENT;
3148
3149         list_for_each_entry(req, &ctx->timeout_list, list) {
3150                 if (user_data == req->user_data) {
3151                         list_del_init(&req->list);
3152                         ret = 0;
3153                         break;
3154                 }
3155         }
3156
3157         if (ret == -ENOENT)
3158                 return ret;
3159
3160         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
3161         if (ret == -1)
3162                 return -EALREADY;
3163
3164         req_set_fail_links(req);
3165         io_cqring_fill_event(req, -ECANCELED);
3166         io_put_req(req);
3167         return 0;
3168 }
3169
3170 static int io_timeout_remove_prep(struct io_kiocb *req,
3171                                   const struct io_uring_sqe *sqe)
3172 {
3173         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3174                 return -EINVAL;
3175         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)
3176                 return -EINVAL;
3177
3178         req->timeout.addr = READ_ONCE(sqe->addr);
3179         req->timeout.flags = READ_ONCE(sqe->timeout_flags);
3180         if (req->timeout.flags)
3181                 return -EINVAL;
3182
3183         return 0;
3184 }
3185
3186 /*
3187  * Remove or update an existing timeout command
3188  */
3189 static int io_timeout_remove(struct io_kiocb *req)
3190 {
3191         struct io_ring_ctx *ctx = req->ctx;
3192         int ret;
3193
3194         spin_lock_irq(&ctx->completion_lock);
3195         ret = io_timeout_cancel(ctx, req->timeout.addr);
3196
3197         io_cqring_fill_event(req, ret);
3198         io_commit_cqring(ctx);
3199         spin_unlock_irq(&ctx->completion_lock);
3200         io_cqring_ev_posted(ctx);
3201         if (ret < 0)
3202                 req_set_fail_links(req);
3203         io_put_req(req);
3204         return 0;
3205 }
3206
3207 static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
3208                            bool is_timeout_link)
3209 {
3210         struct io_timeout_data *data;
3211         unsigned flags;
3212
3213         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3214                 return -EINVAL;
3215         if (sqe->ioprio || sqe->buf_index || sqe->len != 1)
3216                 return -EINVAL;
3217         if (sqe->off && is_timeout_link)
3218                 return -EINVAL;
3219         flags = READ_ONCE(sqe->timeout_flags);
3220         if (flags & ~IORING_TIMEOUT_ABS)
3221                 return -EINVAL;
3222
3223         req->timeout.count = READ_ONCE(sqe->off);
3224
3225         if (!req->io && io_alloc_async_ctx(req))
3226                 return -ENOMEM;
3227
3228         data = &req->io->timeout;
3229         data->req = req;
3230         req->flags |= REQ_F_TIMEOUT;
3231
3232         if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))
3233                 return -EFAULT;
3234
3235         if (flags & IORING_TIMEOUT_ABS)
3236                 data->mode = HRTIMER_MODE_ABS;
3237         else
3238                 data->mode = HRTIMER_MODE_REL;
3239
3240         hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
3241         return 0;
3242 }
3243
3244 static int io_timeout(struct io_kiocb *req)
3245 {
3246         unsigned count;
3247         struct io_ring_ctx *ctx = req->ctx;
3248         struct io_timeout_data *data;
3249         struct list_head *entry;
3250         unsigned span = 0;
3251
3252         data = &req->io->timeout;
3253
3254         /*
3255          * sqe->off holds how many events that need to occur for this
3256          * timeout event to be satisfied. If it isn't set, then this is
3257          * a pure timeout request, sequence isn't used.
3258          */
3259         count = req->timeout.count;
3260         if (!count) {
3261                 req->flags |= REQ_F_TIMEOUT_NOSEQ;
3262                 spin_lock_irq(&ctx->completion_lock);
3263                 entry = ctx->timeout_list.prev;
3264                 goto add;
3265         }
3266
3267         req->sequence = ctx->cached_sq_head + count - 1;
3268         data->seq_offset = count;
3269
3270         /*
3271          * Insertion sort, ensuring the first entry in the list is always
3272          * the one we need first.
3273          */
3274         spin_lock_irq(&ctx->completion_lock);
3275         list_for_each_prev(entry, &ctx->timeout_list) {
3276                 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
3277                 unsigned nxt_sq_head;
3278                 long long tmp, tmp_nxt;
3279                 u32 nxt_offset = nxt->io->timeout.seq_offset;
3280
3281                 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
3282                         continue;
3283
3284                 /*
3285                  * Since cached_sq_head + count - 1 can overflow, use type long
3286                  * long to store it.
3287                  */
3288                 tmp = (long long)ctx->cached_sq_head + count - 1;
3289                 nxt_sq_head = nxt->sequence - nxt_offset + 1;
3290                 tmp_nxt = (long long)nxt_sq_head + nxt_offset - 1;
3291
3292                 /*
3293                  * cached_sq_head may overflow, and it will never overflow twice
3294                  * once there is some timeout req still be valid.
3295                  */
3296                 if (ctx->cached_sq_head < nxt_sq_head)
3297                         tmp += UINT_MAX;
3298
3299                 if (tmp > tmp_nxt)
3300                         break;
3301
3302                 /*
3303                  * Sequence of reqs after the insert one and itself should
3304                  * be adjusted because each timeout req consumes a slot.
3305                  */
3306                 span++;
3307                 nxt->sequence++;
3308         }
3309         req->sequence -= span;
3310 add:
3311         list_add(&req->list, entry);
3312         data->timer.function = io_timeout_fn;
3313         hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
3314         spin_unlock_irq(&ctx->completion_lock);
3315         return 0;
3316 }
3317
3318 static bool io_cancel_cb(struct io_wq_work *work, void *data)
3319 {
3320         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3321
3322         return req->user_data == (unsigned long) data;
3323 }
3324
3325 static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
3326 {
3327         enum io_wq_cancel cancel_ret;
3328         int ret = 0;
3329
3330         cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
3331         switch (cancel_ret) {
3332         case IO_WQ_CANCEL_OK:
3333                 ret = 0;
3334                 break;
3335         case IO_WQ_CANCEL_RUNNING:
3336                 ret = -EALREADY;
3337                 break;
3338         case IO_WQ_CANCEL_NOTFOUND:
3339                 ret = -ENOENT;
3340                 break;
3341         }
3342
3343         return ret;
3344 }
3345
3346 static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
3347                                      struct io_kiocb *req, __u64 sqe_addr,
3348                                      struct io_kiocb **nxt, int success_ret)
3349 {
3350         unsigned long flags;
3351         int ret;
3352
3353         ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
3354         if (ret != -ENOENT) {
3355                 spin_lock_irqsave(&ctx->completion_lock, flags);
3356                 goto done;
3357         }
3358
3359         spin_lock_irqsave(&ctx->completion_lock, flags);
3360         ret = io_timeout_cancel(ctx, sqe_addr);
3361         if (ret != -ENOENT)
3362                 goto done;
3363         ret = io_poll_cancel(ctx, sqe_addr);
3364 done:
3365         if (!ret)
3366                 ret = success_ret;
3367         io_cqring_fill_event(req, ret);
3368         io_commit_cqring(ctx);
3369         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3370         io_cqring_ev_posted(ctx);
3371
3372         if (ret < 0)
3373                 req_set_fail_links(req);
3374         io_put_req_find_next(req, nxt);
3375 }
3376
3377 static int io_async_cancel_prep(struct io_kiocb *req,
3378                                 const struct io_uring_sqe *sqe)
3379 {
3380         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3381                 return -EINVAL;
3382         if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||
3383             sqe->cancel_flags)
3384                 return -EINVAL;
3385
3386         req->cancel.addr = READ_ONCE(sqe->addr);
3387         return 0;
3388 }
3389
3390 static int io_async_cancel(struct io_kiocb *req, struct io_kiocb **nxt)
3391 {
3392         struct io_ring_ctx *ctx = req->ctx;
3393
3394         io_async_find_and_cancel(ctx, req, req->cancel.addr, nxt, 0);
3395         return 0;
3396 }
3397
3398 static int io_files_update_prep(struct io_kiocb *req,
3399                                 const struct io_uring_sqe *sqe)
3400 {
3401         if (sqe->flags || sqe->ioprio || sqe->rw_flags)
3402                 return -EINVAL;
3403
3404         req->files_update.offset = READ_ONCE(sqe->off);
3405         req->files_update.nr_args = READ_ONCE(sqe->len);
3406         if (!req->files_update.nr_args)
3407                 return -EINVAL;
3408         req->files_update.arg = READ_ONCE(sqe->addr);
3409         return 0;
3410 }
3411
3412 static int io_files_update(struct io_kiocb *req, bool force_nonblock)
3413 {
3414         struct io_ring_ctx *ctx = req->ctx;
3415         struct io_uring_files_update up;
3416         int ret;
3417
3418         if (force_nonblock) {
3419                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
3420                 return -EAGAIN;
3421         }
3422
3423         up.offset = req->files_update.offset;
3424         up.fds = req->files_update.arg;
3425
3426         mutex_lock(&ctx->uring_lock);
3427         ret = __io_sqe_files_update(ctx, &up, req->files_update.nr_args);
3428         mutex_unlock(&ctx->uring_lock);
3429
3430         if (ret < 0)
3431                 req_set_fail_links(req);
3432         io_cqring_add_event(req, ret);
3433         io_put_req(req);
3434         return 0;
3435 }
3436
3437 static int io_req_defer_prep(struct io_kiocb *req,
3438                              const struct io_uring_sqe *sqe)
3439 {
3440         ssize_t ret = 0;
3441
3442         switch (req->opcode) {
3443         case IORING_OP_NOP:
3444                 break;
3445         case IORING_OP_READV:
3446         case IORING_OP_READ_FIXED:
3447                 ret = io_read_prep(req, sqe, true);
3448                 break;
3449         case IORING_OP_WRITEV:
3450         case IORING_OP_WRITE_FIXED:
3451                 ret = io_write_prep(req, sqe, true);
3452                 break;
3453         case IORING_OP_POLL_ADD:
3454                 ret = io_poll_add_prep(req, sqe);
3455                 break;
3456         case IORING_OP_POLL_REMOVE:
3457                 ret = io_poll_remove_prep(req, sqe);
3458                 break;
3459         case IORING_OP_FSYNC:
3460                 ret = io_prep_fsync(req, sqe);
3461                 break;
3462         case IORING_OP_SYNC_FILE_RANGE:
3463                 ret = io_prep_sfr(req, sqe);
3464                 break;
3465         case IORING_OP_SENDMSG:
3466                 ret = io_sendmsg_prep(req, sqe);
3467                 break;
3468         case IORING_OP_RECVMSG:
3469                 ret = io_recvmsg_prep(req, sqe);
3470                 break;
3471         case IORING_OP_CONNECT:
3472                 ret = io_connect_prep(req, sqe);
3473                 break;
3474         case IORING_OP_TIMEOUT:
3475                 ret = io_timeout_prep(req, sqe, false);
3476                 break;
3477         case IORING_OP_TIMEOUT_REMOVE:
3478                 ret = io_timeout_remove_prep(req, sqe);
3479                 break;
3480         case IORING_OP_ASYNC_CANCEL:
3481                 ret = io_async_cancel_prep(req, sqe);
3482                 break;
3483         case IORING_OP_LINK_TIMEOUT:
3484                 ret = io_timeout_prep(req, sqe, true);
3485                 break;
3486         case IORING_OP_ACCEPT:
3487                 ret = io_accept_prep(req, sqe);
3488                 break;
3489         case IORING_OP_FALLOCATE:
3490                 ret = io_fallocate_prep(req, sqe);
3491                 break;
3492         case IORING_OP_OPENAT:
3493                 ret = io_openat_prep(req, sqe);
3494                 break;
3495         case IORING_OP_CLOSE:
3496                 ret = io_close_prep(req, sqe);
3497                 break;
3498         case IORING_OP_FILES_UPDATE:
3499                 ret = io_files_update_prep(req, sqe);
3500                 break;
3501         case IORING_OP_STATX:
3502                 ret = io_statx_prep(req, sqe);
3503                 break;
3504         default:
3505                 printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n",
3506                                 req->opcode);
3507                 ret = -EINVAL;
3508                 break;
3509         }
3510
3511         return ret;
3512 }
3513
3514 static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3515 {
3516         struct io_ring_ctx *ctx = req->ctx;
3517         int ret;
3518
3519         /* Still need defer if there is pending req in defer list. */
3520         if (!req_need_defer(req) && list_empty(&ctx->defer_list))
3521                 return 0;
3522
3523         if (!req->io && io_alloc_async_ctx(req))
3524                 return -EAGAIN;
3525
3526         ret = io_req_defer_prep(req, sqe);
3527         if (ret < 0)
3528                 return ret;
3529
3530         spin_lock_irq(&ctx->completion_lock);
3531         if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
3532                 spin_unlock_irq(&ctx->completion_lock);
3533                 return 0;
3534         }
3535
3536         trace_io_uring_defer(ctx, req, req->user_data);
3537         list_add_tail(&req->list, &ctx->defer_list);
3538         spin_unlock_irq(&ctx->completion_lock);
3539         return -EIOCBQUEUED;
3540 }
3541
3542 static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
3543                         struct io_kiocb **nxt, bool force_nonblock)
3544 {
3545         struct io_ring_ctx *ctx = req->ctx;
3546         int ret;
3547
3548         switch (req->opcode) {
3549         case IORING_OP_NOP:
3550                 ret = io_nop(req);
3551                 break;
3552         case IORING_OP_READV:
3553         case IORING_OP_READ_FIXED:
3554                 if (sqe) {
3555                         ret = io_read_prep(req, sqe, force_nonblock);
3556                         if (ret < 0)
3557                                 break;
3558                 }
3559                 ret = io_read(req, nxt, force_nonblock);
3560                 break;
3561         case IORING_OP_WRITEV:
3562         case IORING_OP_WRITE_FIXED:
3563                 if (sqe) {
3564                         ret = io_write_prep(req, sqe, force_nonblock);
3565                         if (ret < 0)
3566                                 break;
3567                 }
3568                 ret = io_write(req, nxt, force_nonblock);
3569                 break;
3570         case IORING_OP_FSYNC:
3571                 if (sqe) {
3572                         ret = io_prep_fsync(req, sqe);
3573                         if (ret < 0)
3574                                 break;
3575                 }
3576                 ret = io_fsync(req, nxt, force_nonblock);
3577                 break;
3578         case IORING_OP_POLL_ADD:
3579                 if (sqe) {
3580                         ret = io_poll_add_prep(req, sqe);
3581                         if (ret)
3582                                 break;
3583                 }
3584                 ret = io_poll_add(req, nxt);
3585                 break;
3586         case IORING_OP_POLL_REMOVE:
3587                 if (sqe) {
3588                         ret = io_poll_remove_prep(req, sqe);
3589                         if (ret < 0)
3590                                 break;
3591                 }
3592                 ret = io_poll_remove(req);
3593                 break;
3594         case IORING_OP_SYNC_FILE_RANGE:
3595                 if (sqe) {
3596                         ret = io_prep_sfr(req, sqe);
3597                         if (ret < 0)
3598                                 break;
3599                 }
3600                 ret = io_sync_file_range(req, nxt, force_nonblock);
3601                 break;
3602         case IORING_OP_SENDMSG:
3603                 if (sqe) {
3604                         ret = io_sendmsg_prep(req, sqe);
3605                         if (ret < 0)
3606                                 break;
3607                 }
3608                 ret = io_sendmsg(req, nxt, force_nonblock);
3609                 break;
3610         case IORING_OP_RECVMSG:
3611                 if (sqe) {
3612                         ret = io_recvmsg_prep(req, sqe);
3613                         if (ret)
3614                                 break;
3615                 }
3616                 ret = io_recvmsg(req, nxt, force_nonblock);
3617                 break;
3618         case IORING_OP_TIMEOUT:
3619                 if (sqe) {
3620                         ret = io_timeout_prep(req, sqe, false);
3621                         if (ret)
3622                                 break;
3623                 }
3624                 ret = io_timeout(req);
3625                 break;
3626         case IORING_OP_TIMEOUT_REMOVE:
3627                 if (sqe) {
3628                         ret = io_timeout_remove_prep(req, sqe);
3629                         if (ret)
3630                                 break;
3631                 }
3632                 ret = io_timeout_remove(req);
3633                 break;
3634         case IORING_OP_ACCEPT:
3635                 if (sqe) {
3636                         ret = io_accept_prep(req, sqe);
3637                         if (ret)
3638                                 break;
3639                 }
3640                 ret = io_accept(req, nxt, force_nonblock);
3641                 break;
3642         case IORING_OP_CONNECT:
3643                 if (sqe) {
3644                         ret = io_connect_prep(req, sqe);
3645                         if (ret)
3646                                 break;
3647                 }
3648                 ret = io_connect(req, nxt, force_nonblock);
3649                 break;
3650         case IORING_OP_ASYNC_CANCEL:
3651                 if (sqe) {
3652                         ret = io_async_cancel_prep(req, sqe);
3653                         if (ret)
3654                                 break;
3655                 }
3656                 ret = io_async_cancel(req, nxt);
3657                 break;
3658         case IORING_OP_FALLOCATE:
3659                 if (sqe) {
3660                         ret = io_fallocate_prep(req, sqe);
3661                         if (ret)
3662                                 break;
3663                 }
3664                 ret = io_fallocate(req, nxt, force_nonblock);
3665                 break;
3666         case IORING_OP_OPENAT:
3667                 if (sqe) {
3668                         ret = io_openat_prep(req, sqe);
3669                         if (ret)
3670                                 break;
3671                 }
3672                 ret = io_openat(req, nxt, force_nonblock);
3673                 break;
3674         case IORING_OP_CLOSE:
3675                 if (sqe) {
3676                         ret = io_close_prep(req, sqe);
3677                         if (ret)
3678                                 break;
3679                 }
3680                 ret = io_close(req, nxt, force_nonblock);
3681                 break;
3682         case IORING_OP_FILES_UPDATE:
3683                 if (sqe) {
3684                         ret = io_files_update_prep(req, sqe);
3685                         if (ret)
3686                                 break;
3687                 }
3688                 ret = io_files_update(req, force_nonblock);
3689                 break;
3690         case IORING_OP_STATX:
3691                 if (sqe) {
3692                         ret = io_statx_prep(req, sqe);
3693                         if (ret)
3694                                 break;
3695                 }
3696                 ret = io_statx(req, nxt, force_nonblock);
3697                 break;
3698         default:
3699                 ret = -EINVAL;
3700                 break;
3701         }
3702
3703         if (ret)
3704                 return ret;
3705
3706         if (ctx->flags & IORING_SETUP_IOPOLL) {
3707                 const bool in_async = io_wq_current_is_worker();
3708
3709                 if (req->result == -EAGAIN)
3710                         return -EAGAIN;
3711
3712                 /* workqueue context doesn't hold uring_lock, grab it now */
3713                 if (in_async)
3714                         mutex_lock(&ctx->uring_lock);
3715
3716                 io_iopoll_req_issued(req);
3717
3718                 if (in_async)
3719                         mutex_unlock(&ctx->uring_lock);
3720         }
3721
3722         return 0;
3723 }
3724
3725 static void io_wq_submit_work(struct io_wq_work **workptr)
3726 {
3727         struct io_wq_work *work = *workptr;
3728         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3729         struct io_kiocb *nxt = NULL;
3730         int ret = 0;
3731
3732         /* if NO_CANCEL is set, we must still run the work */
3733         if ((work->flags & (IO_WQ_WORK_CANCEL|IO_WQ_WORK_NO_CANCEL)) ==
3734                                 IO_WQ_WORK_CANCEL) {
3735                 ret = -ECANCELED;
3736         }
3737
3738         if (!ret) {
3739                 req->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
3740                 req->in_async = true;
3741                 do {
3742                         ret = io_issue_sqe(req, NULL, &nxt, false);
3743                         /*
3744                          * We can get EAGAIN for polled IO even though we're
3745                          * forcing a sync submission from here, since we can't
3746                          * wait for request slots on the block side.
3747                          */
3748                         if (ret != -EAGAIN)
3749                                 break;
3750                         cond_resched();
3751                 } while (1);
3752         }
3753
3754         /* drop submission reference */
3755         io_put_req(req);
3756
3757         if (ret) {
3758                 req_set_fail_links(req);
3759                 io_cqring_add_event(req, ret);
3760                 io_put_req(req);
3761         }
3762
3763         /* if a dependent link is ready, pass it back */
3764         if (!ret && nxt)
3765                 io_wq_assign_next(workptr, nxt);
3766 }
3767
3768 static bool io_req_op_valid(int op)
3769 {
3770         return op >= IORING_OP_NOP && op < IORING_OP_LAST;
3771 }
3772
3773 static int io_req_needs_file(struct io_kiocb *req, int fd)
3774 {
3775         switch (req->opcode) {
3776         case IORING_OP_NOP:
3777         case IORING_OP_POLL_REMOVE:
3778         case IORING_OP_TIMEOUT:
3779         case IORING_OP_TIMEOUT_REMOVE:
3780         case IORING_OP_ASYNC_CANCEL:
3781         case IORING_OP_LINK_TIMEOUT:
3782                 return 0;
3783         case IORING_OP_OPENAT:
3784         case IORING_OP_STATX:
3785                 return fd != -1;
3786         default:
3787                 if (io_req_op_valid(req->opcode))
3788                         return 1;
3789                 return -EINVAL;
3790         }
3791 }
3792
3793 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
3794                                               int index)
3795 {
3796         struct fixed_file_table *table;
3797
3798         table = &ctx->file_data->table[index >> IORING_FILE_TABLE_SHIFT];
3799         return table->files[index & IORING_FILE_TABLE_MASK];;
3800 }
3801
3802 static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req,
3803                            const struct io_uring_sqe *sqe)
3804 {
3805         struct io_ring_ctx *ctx = req->ctx;
3806         unsigned flags;
3807         int fd, ret;
3808
3809         flags = READ_ONCE(sqe->flags);
3810         fd = READ_ONCE(sqe->fd);
3811
3812         if (flags & IOSQE_IO_DRAIN)
3813                 req->flags |= REQ_F_IO_DRAIN;
3814
3815         ret = io_req_needs_file(req, fd);
3816         if (ret <= 0)
3817                 return ret;
3818
3819         if (flags & IOSQE_FIXED_FILE) {
3820                 if (unlikely(!ctx->file_data ||
3821                     (unsigned) fd >= ctx->nr_user_files))
3822                         return -EBADF;
3823                 fd = array_index_nospec(fd, ctx->nr_user_files);
3824                 req->file = io_file_from_index(ctx, fd);
3825                 if (!req->file)
3826                         return -EBADF;
3827                 req->flags |= REQ_F_FIXED_FILE;
3828                 percpu_ref_get(&ctx->file_data->refs);
3829         } else {
3830                 if (req->needs_fixed_file)
3831                         return -EBADF;
3832                 trace_io_uring_file_get(ctx, fd);
3833                 req->file = io_file_get(state, fd);
3834                 if (unlikely(!req->file))
3835                         return -EBADF;
3836         }
3837
3838         return 0;
3839 }
3840
3841 static int io_grab_files(struct io_kiocb *req)
3842 {
3843         int ret = -EBADF;
3844         struct io_ring_ctx *ctx = req->ctx;
3845
3846         if (!req->ring_file)
3847                 return -EBADF;
3848
3849         rcu_read_lock();
3850         spin_lock_irq(&ctx->inflight_lock);
3851         /*
3852          * We use the f_ops->flush() handler to ensure that we can flush
3853          * out work accessing these files if the fd is closed. Check if
3854          * the fd has changed since we started down this path, and disallow
3855          * this operation if it has.
3856          */
3857         if (fcheck(req->ring_fd) == req->ring_file) {
3858                 list_add(&req->inflight_entry, &ctx->inflight_list);
3859                 req->flags |= REQ_F_INFLIGHT;
3860                 req->work.files = current->files;
3861                 ret = 0;
3862         }
3863         spin_unlock_irq(&ctx->inflight_lock);
3864         rcu_read_unlock();
3865
3866         return ret;
3867 }
3868
3869 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
3870 {
3871         struct io_timeout_data *data = container_of(timer,
3872                                                 struct io_timeout_data, timer);
3873         struct io_kiocb *req = data->req;
3874         struct io_ring_ctx *ctx = req->ctx;
3875         struct io_kiocb *prev = NULL;
3876         unsigned long flags;
3877
3878         spin_lock_irqsave(&ctx->completion_lock, flags);
3879
3880         /*
3881          * We don't expect the list to be empty, that will only happen if we
3882          * race with the completion of the linked work.
3883          */
3884         if (!list_empty(&req->link_list)) {
3885                 prev = list_entry(req->link_list.prev, struct io_kiocb,
3886                                   link_list);
3887                 if (refcount_inc_not_zero(&prev->refs)) {
3888                         list_del_init(&req->link_list);
3889                         prev->flags &= ~REQ_F_LINK_TIMEOUT;
3890                 } else
3891                         prev = NULL;
3892         }
3893
3894         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3895
3896         if (prev) {
3897                 req_set_fail_links(prev);
3898                 io_async_find_and_cancel(ctx, req, prev->user_data, NULL,
3899                                                 -ETIME);
3900                 io_put_req(prev);
3901         } else {
3902                 io_cqring_add_event(req, -ETIME);
3903                 io_put_req(req);
3904         }
3905         return HRTIMER_NORESTART;
3906 }
3907
3908 static void io_queue_linked_timeout(struct io_kiocb *req)
3909 {
3910         struct io_ring_ctx *ctx = req->ctx;
3911
3912         /*
3913          * If the list is now empty, then our linked request finished before
3914          * we got a chance to setup the timer
3915          */
3916         spin_lock_irq(&ctx->completion_lock);
3917         if (!list_empty(&req->link_list)) {
3918                 struct io_timeout_data *data = &req->io->timeout;
3919
3920                 data->timer.function = io_link_timeout_fn;
3921                 hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
3922                                 data->mode);
3923         }
3924         spin_unlock_irq(&ctx->completion_lock);
3925
3926         /* drop submission reference */
3927         io_put_req(req);
3928 }
3929
3930 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
3931 {
3932         struct io_kiocb *nxt;
3933
3934         if (!(req->flags & REQ_F_LINK))
3935                 return NULL;
3936
3937         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb,
3938                                         link_list);
3939         if (!nxt || nxt->opcode != IORING_OP_LINK_TIMEOUT)
3940                 return NULL;
3941
3942         req->flags |= REQ_F_LINK_TIMEOUT;
3943         return nxt;
3944 }
3945
3946 static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3947 {
3948         struct io_kiocb *linked_timeout;
3949         struct io_kiocb *nxt = NULL;
3950         int ret;
3951
3952 again:
3953         linked_timeout = io_prep_linked_timeout(req);
3954
3955         ret = io_issue_sqe(req, sqe, &nxt, true);
3956
3957         /*
3958          * We async punt it if the file wasn't marked NOWAIT, or if the file
3959          * doesn't support non-blocking read/write attempts
3960          */
3961         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
3962             (req->flags & REQ_F_MUST_PUNT))) {
3963                 if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
3964                         ret = io_grab_files(req);
3965                         if (ret)
3966                                 goto err;
3967                 }
3968
3969                 /*
3970                  * Queued up for async execution, worker will release
3971                  * submit reference when the iocb is actually submitted.
3972                  */
3973                 io_queue_async_work(req);
3974                 goto done_req;
3975         }
3976
3977 err:
3978         /* drop submission reference */
3979         io_put_req(req);
3980
3981         if (linked_timeout) {
3982                 if (!ret)
3983                         io_queue_linked_timeout(linked_timeout);
3984                 else
3985                         io_put_req(linked_timeout);
3986         }
3987
3988         /* and drop final reference, if we failed */
3989         if (ret) {
3990                 io_cqring_add_event(req, ret);
3991                 req_set_fail_links(req);
3992                 io_put_req(req);
3993         }
3994 done_req:
3995         if (nxt) {
3996                 req = nxt;
3997                 nxt = NULL;
3998                 goto again;
3999         }
4000 }
4001
4002 static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4003 {
4004         int ret;
4005
4006         if (unlikely(req->ctx->drain_next)) {
4007                 req->flags |= REQ_F_IO_DRAIN;
4008                 req->ctx->drain_next = false;
4009         }
4010         req->ctx->drain_next = (req->flags & REQ_F_DRAIN_LINK);
4011
4012         ret = io_req_defer(req, sqe);
4013         if (ret) {
4014                 if (ret != -EIOCBQUEUED) {
4015                         io_cqring_add_event(req, ret);
4016                         req_set_fail_links(req);
4017                         io_double_put_req(req);
4018                 }
4019         } else if ((req->flags & REQ_F_FORCE_ASYNC) &&
4020                    !io_wq_current_is_worker()) {
4021                 /*
4022                  * Never try inline submit of IOSQE_ASYNC is set, go straight
4023                  * to async execution.
4024                  */
4025                 req->work.flags |= IO_WQ_WORK_CONCURRENT;
4026                 io_queue_async_work(req);
4027         } else {
4028                 __io_queue_sqe(req, sqe);
4029         }
4030 }
4031
4032 static inline void io_queue_link_head(struct io_kiocb *req)
4033 {
4034         if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
4035                 io_cqring_add_event(req, -ECANCELED);
4036                 io_double_put_req(req);
4037         } else
4038                 io_queue_sqe(req, NULL);
4039 }
4040
4041 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
4042                                 IOSQE_IO_HARDLINK | IOSQE_ASYNC)
4043
4044 static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
4045                           struct io_submit_state *state, struct io_kiocb **link)
4046 {
4047         struct io_ring_ctx *ctx = req->ctx;
4048         unsigned int sqe_flags;
4049         int ret;
4050
4051         sqe_flags = READ_ONCE(sqe->flags);
4052
4053         /* enforce forwards compatibility on users */
4054         if (unlikely(sqe_flags & ~SQE_VALID_FLAGS)) {
4055                 ret = -EINVAL;
4056                 goto err_req;
4057         }
4058         if (sqe_flags & IOSQE_ASYNC)
4059                 req->flags |= REQ_F_FORCE_ASYNC;
4060
4061         ret = io_req_set_file(state, req, sqe);
4062         if (unlikely(ret)) {
4063 err_req:
4064                 io_cqring_add_event(req, ret);
4065                 io_double_put_req(req);
4066                 return false;
4067         }
4068
4069         /*
4070          * If we already have a head request, queue this one for async
4071          * submittal once the head completes. If we don't have a head but
4072          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
4073          * submitted sync once the chain is complete. If none of those
4074          * conditions are true (normal request), then just queue it.
4075          */
4076         if (*link) {
4077                 struct io_kiocb *head = *link;
4078
4079                 if (sqe_flags & IOSQE_IO_DRAIN)
4080                         head->flags |= REQ_F_DRAIN_LINK | REQ_F_IO_DRAIN;
4081
4082                 if (sqe_flags & IOSQE_IO_HARDLINK)
4083                         req->flags |= REQ_F_HARDLINK;
4084
4085                 if (io_alloc_async_ctx(req)) {
4086                         ret = -EAGAIN;
4087                         goto err_req;
4088                 }
4089
4090                 ret = io_req_defer_prep(req, sqe);
4091                 if (ret) {
4092                         /* fail even hard links since we don't submit */
4093                         head->flags |= REQ_F_FAIL_LINK;
4094                         goto err_req;
4095                 }
4096                 trace_io_uring_link(ctx, req, head);
4097                 list_add_tail(&req->link_list, &head->link_list);
4098
4099                 /* last request of a link, enqueue the link */
4100                 if (!(sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK))) {
4101                         io_queue_link_head(head);
4102                         *link = NULL;
4103                 }
4104         } else if (sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) {
4105                 req->flags |= REQ_F_LINK;
4106                 if (sqe_flags & IOSQE_IO_HARDLINK)
4107                         req->flags |= REQ_F_HARDLINK;
4108
4109                 INIT_LIST_HEAD(&req->link_list);
4110                 ret = io_req_defer_prep(req, sqe);
4111                 if (ret)
4112                         req->flags |= REQ_F_FAIL_LINK;
4113                 *link = req;
4114         } else {
4115                 io_queue_sqe(req, sqe);
4116         }
4117
4118         return true;
4119 }
4120
4121 /*
4122  * Batched submission is done, ensure local IO is flushed out.
4123  */
4124 static void io_submit_state_end(struct io_submit_state *state)
4125 {
4126         blk_finish_plug(&state->plug);
4127         io_file_put(state);
4128         if (state->free_reqs)
4129                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
4130                                         &state->reqs[state->cur_req]);
4131 }
4132
4133 /*
4134  * Start submission side cache.
4135  */
4136 static void io_submit_state_start(struct io_submit_state *state,
4137                                   unsigned int max_ios)
4138 {
4139         blk_start_plug(&state->plug);
4140         state->free_reqs = 0;
4141         state->file = NULL;
4142         state->ios_left = max_ios;
4143 }
4144
4145 static void io_commit_sqring(struct io_ring_ctx *ctx)
4146 {
4147         struct io_rings *rings = ctx->rings;
4148
4149         if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
4150                 /*
4151                  * Ensure any loads from the SQEs are done at this point,
4152                  * since once we write the new head, the application could
4153                  * write new data to them.
4154                  */
4155                 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
4156         }
4157 }
4158
4159 /*
4160  * Fetch an sqe, if one is available. Note that sqe_ptr will point to memory
4161  * that is mapped by userspace. This means that care needs to be taken to
4162  * ensure that reads are stable, as we cannot rely on userspace always
4163  * being a good citizen. If members of the sqe are validated and then later
4164  * used, it's important that those reads are done through READ_ONCE() to
4165  * prevent a re-load down the line.
4166  */
4167 static bool io_get_sqring(struct io_ring_ctx *ctx, struct io_kiocb *req,
4168                           const struct io_uring_sqe **sqe_ptr)
4169 {
4170         struct io_rings *rings = ctx->rings;
4171         u32 *sq_array = ctx->sq_array;
4172         unsigned head;
4173
4174         /*
4175          * The cached sq head (or cq tail) serves two purposes:
4176          *
4177          * 1) allows us to batch the cost of updating the user visible
4178          *    head updates.
4179          * 2) allows the kernel side to track the head on its own, even
4180          *    though the application is the one updating it.
4181          */
4182         head = ctx->cached_sq_head;
4183         /* make sure SQ entry isn't read before tail */
4184         if (unlikely(head == smp_load_acquire(&rings->sq.tail)))
4185                 return false;
4186
4187         head = READ_ONCE(sq_array[head & ctx->sq_mask]);
4188         if (likely(head < ctx->sq_entries)) {
4189                 /*
4190                  * All io need record the previous position, if LINK vs DARIN,
4191                  * it can be used to mark the position of the first IO in the
4192                  * link list.
4193                  */
4194                 req->sequence = ctx->cached_sq_head;
4195                 *sqe_ptr = &ctx->sq_sqes[head];
4196                 req->opcode = READ_ONCE((*sqe_ptr)->opcode);
4197                 req->user_data = READ_ONCE((*sqe_ptr)->user_data);
4198                 ctx->cached_sq_head++;
4199                 return true;
4200         }
4201
4202         /* drop invalid entries */
4203         ctx->cached_sq_head++;
4204         ctx->cached_sq_dropped++;
4205         WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
4206         return false;
4207 }
4208
4209 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
4210                           struct file *ring_file, int ring_fd,
4211                           struct mm_struct **mm, bool async)
4212 {
4213         struct io_submit_state state, *statep = NULL;
4214         struct io_kiocb *link = NULL;
4215         int i, submitted = 0;
4216         bool mm_fault = false;
4217
4218         /* if we have a backlog and couldn't flush it all, return BUSY */
4219         if (!list_empty(&ctx->cq_overflow_list) &&
4220             !io_cqring_overflow_flush(ctx, false))
4221                 return -EBUSY;
4222
4223         if (nr > IO_PLUG_THRESHOLD) {
4224                 io_submit_state_start(&state, nr);
4225                 statep = &state;
4226         }
4227
4228         for (i = 0; i < nr; i++) {
4229                 const struct io_uring_sqe *sqe;
4230                 struct io_kiocb *req;
4231
4232                 req = io_get_req(ctx, statep);
4233                 if (unlikely(!req)) {
4234                         if (!submitted)
4235                                 submitted = -EAGAIN;
4236                         break;
4237                 }
4238                 if (!io_get_sqring(ctx, req, &sqe)) {
4239                         __io_free_req(req);
4240                         break;
4241                 }
4242
4243                 if (io_req_needs_user(req) && !*mm) {
4244                         mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
4245                         if (!mm_fault) {
4246                                 use_mm(ctx->sqo_mm);
4247                                 *mm = ctx->sqo_mm;
4248                         }
4249                 }
4250
4251                 submitted++;
4252                 req->ring_file = ring_file;
4253                 req->ring_fd = ring_fd;
4254                 req->has_user = *mm != NULL;
4255                 req->in_async = async;
4256                 req->needs_fixed_file = async;
4257                 trace_io_uring_submit_sqe(ctx, req->user_data, true, async);
4258                 if (!io_submit_sqe(req, sqe, statep, &link))
4259                         break;
4260         }
4261
4262         if (link)
4263                 io_queue_link_head(link);
4264         if (statep)
4265                 io_submit_state_end(&state);
4266
4267          /* Commit SQ ring head once we've consumed and submitted all SQEs */
4268         io_commit_sqring(ctx);
4269
4270         return submitted;
4271 }
4272
4273 static int io_sq_thread(void *data)
4274 {
4275         struct io_ring_ctx *ctx = data;
4276         struct mm_struct *cur_mm = NULL;
4277         const struct cred *old_cred;
4278         mm_segment_t old_fs;
4279         DEFINE_WAIT(wait);
4280         unsigned inflight;
4281         unsigned long timeout;
4282         int ret;
4283
4284         complete(&ctx->completions[1]);
4285
4286         old_fs = get_fs();
4287         set_fs(USER_DS);
4288         old_cred = override_creds(ctx->creds);
4289
4290         ret = timeout = inflight = 0;
4291         while (!kthread_should_park()) {
4292                 unsigned int to_submit;
4293
4294                 if (inflight) {
4295                         unsigned nr_events = 0;
4296
4297                         if (ctx->flags & IORING_SETUP_IOPOLL) {
4298                                 /*
4299                                  * inflight is the count of the maximum possible
4300                                  * entries we submitted, but it can be smaller
4301                                  * if we dropped some of them. If we don't have
4302                                  * poll entries available, then we know that we
4303                                  * have nothing left to poll for. Reset the
4304                                  * inflight count to zero in that case.
4305                                  */
4306                                 mutex_lock(&ctx->uring_lock);
4307                                 if (!list_empty(&ctx->poll_list))
4308                                         __io_iopoll_check(ctx, &nr_events, 0);
4309                                 else
4310                                         inflight = 0;
4311                                 mutex_unlock(&ctx->uring_lock);
4312                         } else {
4313                                 /*
4314                                  * Normal IO, just pretend everything completed.
4315                                  * We don't have to poll completions for that.
4316                                  */
4317                                 nr_events = inflight;
4318                         }
4319
4320                         inflight -= nr_events;
4321                         if (!inflight)
4322                                 timeout = jiffies + ctx->sq_thread_idle;
4323                 }
4324
4325                 to_submit = io_sqring_entries(ctx);
4326
4327                 /*
4328                  * If submit got -EBUSY, flag us as needing the application
4329                  * to enter the kernel to reap and flush events.
4330                  */
4331                 if (!to_submit || ret == -EBUSY) {
4332                         /*
4333                          * We're polling. If we're within the defined idle
4334                          * period, then let us spin without work before going
4335                          * to sleep. The exception is if we got EBUSY doing
4336                          * more IO, we should wait for the application to
4337                          * reap events and wake us up.
4338                          */
4339                         if (inflight ||
4340                             (!time_after(jiffies, timeout) && ret != -EBUSY)) {
4341                                 cond_resched();
4342                                 continue;
4343                         }
4344
4345                         /*
4346                          * Drop cur_mm before scheduling, we can't hold it for
4347                          * long periods (or over schedule()). Do this before
4348                          * adding ourselves to the waitqueue, as the unuse/drop
4349                          * may sleep.
4350                          */
4351                         if (cur_mm) {
4352                                 unuse_mm(cur_mm);
4353                                 mmput(cur_mm);
4354                                 cur_mm = NULL;
4355                         }
4356
4357                         prepare_to_wait(&ctx->sqo_wait, &wait,
4358                                                 TASK_INTERRUPTIBLE);
4359
4360                         /* Tell userspace we may need a wakeup call */
4361                         ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
4362                         /* make sure to read SQ tail after writing flags */
4363                         smp_mb();
4364
4365                         to_submit = io_sqring_entries(ctx);
4366                         if (!to_submit || ret == -EBUSY) {
4367                                 if (kthread_should_park()) {
4368                                         finish_wait(&ctx->sqo_wait, &wait);
4369                                         break;
4370                                 }
4371                                 if (signal_pending(current))
4372                                         flush_signals(current);
4373                                 schedule();
4374                                 finish_wait(&ctx->sqo_wait, &wait);
4375
4376                                 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4377                                 continue;
4378                         }
4379                         finish_wait(&ctx->sqo_wait, &wait);
4380
4381                         ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4382                 }
4383
4384                 to_submit = min(to_submit, ctx->sq_entries);
4385                 mutex_lock(&ctx->uring_lock);
4386                 ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
4387                 mutex_unlock(&ctx->uring_lock);
4388                 if (ret > 0)
4389                         inflight += ret;
4390         }
4391
4392         set_fs(old_fs);
4393         if (cur_mm) {
4394                 unuse_mm(cur_mm);
4395                 mmput(cur_mm);
4396         }
4397         revert_creds(old_cred);
4398
4399         kthread_parkme();
4400
4401         return 0;
4402 }
4403
4404 struct io_wait_queue {
4405         struct wait_queue_entry wq;
4406         struct io_ring_ctx *ctx;
4407         unsigned to_wait;
4408         unsigned nr_timeouts;
4409 };
4410
4411 static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
4412 {
4413         struct io_ring_ctx *ctx = iowq->ctx;
4414
4415         /*
4416          * Wake up if we have enough events, or if a timeout occurred since we
4417          * started waiting. For timeouts, we always want to return to userspace,
4418          * regardless of event count.
4419          */
4420         return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
4421                         atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
4422 }
4423
4424 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
4425                             int wake_flags, void *key)
4426 {
4427         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
4428                                                         wq);
4429
4430         /* use noflush == true, as we can't safely rely on locking context */
4431         if (!io_should_wake(iowq, true))
4432                 return -1;
4433
4434         return autoremove_wake_function(curr, mode, wake_flags, key);
4435 }
4436
4437 /*
4438  * Wait until events become available, if we don't already have some. The
4439  * application must reap them itself, as they reside on the shared cq ring.
4440  */
4441 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
4442                           const sigset_t __user *sig, size_t sigsz)
4443 {
4444         struct io_wait_queue iowq = {
4445                 .wq = {
4446                         .private        = current,
4447                         .func           = io_wake_function,
4448                         .entry          = LIST_HEAD_INIT(iowq.wq.entry),
4449                 },
4450                 .ctx            = ctx,
4451                 .to_wait        = min_events,
4452         };
4453         struct io_rings *rings = ctx->rings;
4454         int ret = 0;
4455
4456         if (io_cqring_events(ctx, false) >= min_events)
4457                 return 0;
4458
4459         if (sig) {
4460 #ifdef CONFIG_COMPAT
4461                 if (in_compat_syscall())
4462                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
4463                                                       sigsz);
4464                 else
4465 #endif
4466                         ret = set_user_sigmask(sig, sigsz);
4467
4468                 if (ret)
4469                         return ret;
4470         }
4471
4472         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
4473         trace_io_uring_cqring_wait(ctx, min_events);
4474         do {
4475                 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
4476                                                 TASK_INTERRUPTIBLE);
4477                 if (io_should_wake(&iowq, false))
4478                         break;
4479                 schedule();
4480                 if (signal_pending(current)) {
4481                         ret = -EINTR;
4482                         break;
4483                 }
4484         } while (1);
4485         finish_wait(&ctx->wait, &iowq.wq);
4486
4487         restore_saved_sigmask_unless(ret == -EINTR);
4488
4489         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
4490 }
4491
4492 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
4493 {
4494 #if defined(CONFIG_UNIX)
4495         if (ctx->ring_sock) {
4496                 struct sock *sock = ctx->ring_sock->sk;
4497                 struct sk_buff *skb;
4498
4499                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
4500                         kfree_skb(skb);
4501         }
4502 #else
4503         int i;
4504
4505         for (i = 0; i < ctx->nr_user_files; i++) {
4506                 struct file *file;
4507
4508                 file = io_file_from_index(ctx, i);
4509                 if (file)
4510                         fput(file);
4511         }
4512 #endif
4513 }
4514
4515 static void io_file_ref_kill(struct percpu_ref *ref)
4516 {
4517         struct fixed_file_data *data;
4518
4519         data = container_of(ref, struct fixed_file_data, refs);
4520         complete(&data->done);
4521 }
4522
4523 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
4524 {
4525         struct fixed_file_data *data = ctx->file_data;
4526         unsigned nr_tables, i;
4527
4528         if (!data)
4529                 return -ENXIO;
4530
4531         /* protect against inflight atomic switch, which drops the ref */
4532         flush_work(&data->ref_work);
4533         percpu_ref_get(&data->refs);
4534         percpu_ref_kill_and_confirm(&data->refs, io_file_ref_kill);
4535         wait_for_completion(&data->done);
4536         percpu_ref_put(&data->refs);
4537         percpu_ref_exit(&data->refs);
4538
4539         __io_sqe_files_unregister(ctx);
4540         nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
4541         for (i = 0; i < nr_tables; i++)
4542                 kfree(data->table[i].files);
4543         kfree(data->table);
4544         kfree(data);
4545         ctx->file_data = NULL;
4546         ctx->nr_user_files = 0;
4547         return 0;
4548 }
4549
4550 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
4551 {
4552         if (ctx->sqo_thread) {
4553                 wait_for_completion(&ctx->completions[1]);
4554                 /*
4555                  * The park is a bit of a work-around, without it we get
4556                  * warning spews on shutdown with SQPOLL set and affinity
4557                  * set to a single CPU.
4558                  */
4559                 kthread_park(ctx->sqo_thread);
4560                 kthread_stop(ctx->sqo_thread);
4561                 ctx->sqo_thread = NULL;
4562         }
4563 }
4564
4565 static void io_finish_async(struct io_ring_ctx *ctx)
4566 {
4567         io_sq_thread_stop(ctx);
4568
4569         if (ctx->io_wq) {
4570                 io_wq_destroy(ctx->io_wq);
4571                 ctx->io_wq = NULL;
4572         }
4573 }
4574
4575 #if defined(CONFIG_UNIX)
4576 /*
4577  * Ensure the UNIX gc is aware of our file set, so we are certain that
4578  * the io_uring can be safely unregistered on process exit, even if we have
4579  * loops in the file referencing.
4580  */
4581 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
4582 {
4583         struct sock *sk = ctx->ring_sock->sk;
4584         struct scm_fp_list *fpl;
4585         struct sk_buff *skb;
4586         int i, nr_files;
4587
4588         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
4589                 unsigned long inflight = ctx->user->unix_inflight + nr;
4590
4591                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
4592                         return -EMFILE;
4593         }
4594
4595         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
4596         if (!fpl)
4597                 return -ENOMEM;
4598
4599         skb = alloc_skb(0, GFP_KERNEL);
4600         if (!skb) {
4601                 kfree(fpl);
4602                 return -ENOMEM;
4603         }
4604
4605         skb->sk = sk;
4606
4607         nr_files = 0;
4608         fpl->user = get_uid(ctx->user);
4609         for (i = 0; i < nr; i++) {
4610                 struct file *file = io_file_from_index(ctx, i + offset);
4611
4612                 if (!file)
4613                         continue;
4614                 fpl->fp[nr_files] = get_file(file);
4615                 unix_inflight(fpl->user, fpl->fp[nr_files]);
4616                 nr_files++;
4617         }
4618
4619         if (nr_files) {
4620                 fpl->max = SCM_MAX_FD;
4621                 fpl->count = nr_files;
4622                 UNIXCB(skb).fp = fpl;
4623                 skb->destructor = unix_destruct_scm;
4624                 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
4625                 skb_queue_head(&sk->sk_receive_queue, skb);
4626
4627                 for (i = 0; i < nr_files; i++)
4628                         fput(fpl->fp[i]);
4629         } else {
4630                 kfree_skb(skb);
4631                 kfree(fpl);
4632         }
4633
4634         return 0;
4635 }
4636
4637 /*
4638  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
4639  * causes regular reference counting to break down. We rely on the UNIX
4640  * garbage collection to take care of this problem for us.
4641  */
4642 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
4643 {
4644         unsigned left, total;
4645         int ret = 0;
4646
4647         total = 0;
4648         left = ctx->nr_user_files;
4649         while (left) {
4650                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
4651
4652                 ret = __io_sqe_files_scm(ctx, this_files, total);
4653                 if (ret)
4654                         break;
4655                 left -= this_files;
4656                 total += this_files;
4657         }
4658
4659         if (!ret)
4660                 return 0;
4661
4662         while (total < ctx->nr_user_files) {
4663                 struct file *file = io_file_from_index(ctx, total);
4664
4665                 if (file)
4666                         fput(file);
4667                 total++;
4668         }
4669
4670         return ret;
4671 }
4672 #else
4673 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
4674 {
4675         return 0;
4676 }
4677 #endif
4678
4679 static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
4680                                     unsigned nr_files)
4681 {
4682         int i;
4683
4684         for (i = 0; i < nr_tables; i++) {
4685                 struct fixed_file_table *table = &ctx->file_data->table[i];
4686                 unsigned this_files;
4687
4688                 this_files = min(nr_files, IORING_MAX_FILES_TABLE);
4689                 table->files = kcalloc(this_files, sizeof(struct file *),
4690                                         GFP_KERNEL);
4691                 if (!table->files)
4692                         break;
4693                 nr_files -= this_files;
4694         }
4695
4696         if (i == nr_tables)
4697                 return 0;
4698
4699         for (i = 0; i < nr_tables; i++) {
4700                 struct fixed_file_table *table = &ctx->file_data->table[i];
4701                 kfree(table->files);
4702         }
4703         return 1;
4704 }
4705
4706 static void io_ring_file_put(struct io_ring_ctx *ctx, struct file *file)
4707 {
4708 #if defined(CONFIG_UNIX)
4709         struct sock *sock = ctx->ring_sock->sk;
4710         struct sk_buff_head list, *head = &sock->sk_receive_queue;
4711         struct sk_buff *skb;
4712         int i;
4713
4714         __skb_queue_head_init(&list);
4715
4716         /*
4717          * Find the skb that holds this file in its SCM_RIGHTS. When found,
4718          * remove this entry and rearrange the file array.
4719          */
4720         skb = skb_dequeue(head);
4721         while (skb) {
4722                 struct scm_fp_list *fp;
4723
4724                 fp = UNIXCB(skb).fp;
4725                 for (i = 0; i < fp->count; i++) {
4726                         int left;
4727
4728                         if (fp->fp[i] != file)
4729                                 continue;
4730
4731                         unix_notinflight(fp->user, fp->fp[i]);
4732                         left = fp->count - 1 - i;
4733                         if (left) {
4734                                 memmove(&fp->fp[i], &fp->fp[i + 1],
4735                                                 left * sizeof(struct file *));
4736                         }
4737                         fp->count--;
4738                         if (!fp->count) {
4739                                 kfree_skb(skb);
4740                                 skb = NULL;
4741                         } else {
4742                                 __skb_queue_tail(&list, skb);
4743                         }
4744                         fput(file);
4745                         file = NULL;
4746                         break;
4747                 }
4748
4749                 if (!file)
4750                         break;
4751
4752                 __skb_queue_tail(&list, skb);
4753
4754                 skb = skb_dequeue(head);
4755         }
4756
4757         if (skb_peek(&list)) {
4758                 spin_lock_irq(&head->lock);
4759                 while ((skb = __skb_dequeue(&list)) != NULL)
4760                         __skb_queue_tail(head, skb);
4761                 spin_unlock_irq(&head->lock);
4762         }
4763 #else
4764         fput(file);
4765 #endif
4766 }
4767
4768 struct io_file_put {
4769         struct llist_node llist;
4770         struct file *file;
4771         struct completion *done;
4772 };
4773
4774 static void io_ring_file_ref_switch(struct work_struct *work)
4775 {
4776         struct io_file_put *pfile, *tmp;
4777         struct fixed_file_data *data;
4778         struct llist_node *node;
4779
4780         data = container_of(work, struct fixed_file_data, ref_work);
4781
4782         while ((node = llist_del_all(&data->put_llist)) != NULL) {
4783                 llist_for_each_entry_safe(pfile, tmp, node, llist) {
4784                         io_ring_file_put(data->ctx, pfile->file);
4785                         if (pfile->done)
4786                                 complete(pfile->done);
4787                         else
4788                                 kfree(pfile);
4789                 }
4790         }
4791
4792         percpu_ref_get(&data->refs);
4793         percpu_ref_switch_to_percpu(&data->refs);
4794 }
4795
4796 static void io_file_data_ref_zero(struct percpu_ref *ref)
4797 {
4798         struct fixed_file_data *data;
4799
4800         data = container_of(ref, struct fixed_file_data, refs);
4801
4802         /* we can't safely switch from inside this context, punt to wq */
4803         queue_work(system_wq, &data->ref_work);
4804 }
4805
4806 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
4807                                  unsigned nr_args)
4808 {
4809         __s32 __user *fds = (__s32 __user *) arg;
4810         unsigned nr_tables;
4811         struct file *file;
4812         int fd, ret = 0;
4813         unsigned i;
4814
4815         if (ctx->file_data)
4816                 return -EBUSY;
4817         if (!nr_args)
4818                 return -EINVAL;
4819         if (nr_args > IORING_MAX_FIXED_FILES)
4820                 return -EMFILE;
4821
4822         ctx->file_data = kzalloc(sizeof(*ctx->file_data), GFP_KERNEL);
4823         if (!ctx->file_data)
4824                 return -ENOMEM;
4825         ctx->file_data->ctx = ctx;
4826         init_completion(&ctx->file_data->done);
4827
4828         nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
4829         ctx->file_data->table = kcalloc(nr_tables,
4830                                         sizeof(struct fixed_file_table),
4831                                         GFP_KERNEL);
4832         if (!ctx->file_data->table) {
4833                 kfree(ctx->file_data);
4834                 ctx->file_data = NULL;
4835                 return -ENOMEM;
4836         }
4837
4838         if (percpu_ref_init(&ctx->file_data->refs, io_file_data_ref_zero,
4839                                 PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
4840                 kfree(ctx->file_data->table);
4841                 kfree(ctx->file_data);
4842                 ctx->file_data = NULL;
4843                 return -ENOMEM;
4844         }
4845         ctx->file_data->put_llist.first = NULL;
4846         INIT_WORK(&ctx->file_data->ref_work, io_ring_file_ref_switch);
4847
4848         if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
4849                 percpu_ref_exit(&ctx->file_data->refs);
4850                 kfree(ctx->file_data->table);
4851                 kfree(ctx->file_data);
4852                 ctx->file_data = NULL;
4853                 return -ENOMEM;
4854         }
4855
4856         for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
4857                 struct fixed_file_table *table;
4858                 unsigned index;
4859
4860                 ret = -EFAULT;
4861                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
4862                         break;
4863                 /* allow sparse sets */
4864                 if (fd == -1) {
4865                         ret = 0;
4866                         continue;
4867                 }
4868
4869                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
4870                 index = i & IORING_FILE_TABLE_MASK;
4871                 file = fget(fd);
4872
4873                 ret = -EBADF;
4874                 if (!file)
4875                         break;
4876
4877                 /*
4878                  * Don't allow io_uring instances to be registered. If UNIX
4879                  * isn't enabled, then this causes a reference cycle and this
4880                  * instance can never get freed. If UNIX is enabled we'll
4881                  * handle it just fine, but there's still no point in allowing
4882                  * a ring fd as it doesn't support regular read/write anyway.
4883                  */
4884                 if (file->f_op == &io_uring_fops) {
4885                         fput(file);
4886                         break;
4887                 }
4888                 ret = 0;
4889                 table->files[index] = file;
4890         }
4891
4892         if (ret) {
4893                 for (i = 0; i < ctx->nr_user_files; i++) {
4894                         file = io_file_from_index(ctx, i);
4895                         if (file)
4896                                 fput(file);
4897                 }
4898                 for (i = 0; i < nr_tables; i++)
4899                         kfree(ctx->file_data->table[i].files);
4900
4901                 kfree(ctx->file_data->table);
4902                 kfree(ctx->file_data);
4903                 ctx->file_data = NULL;
4904                 ctx->nr_user_files = 0;
4905                 return ret;
4906         }
4907
4908         ret = io_sqe_files_scm(ctx);
4909         if (ret)
4910                 io_sqe_files_unregister(ctx);
4911
4912         return ret;
4913 }
4914
4915 static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
4916                                 int index)
4917 {
4918 #if defined(CONFIG_UNIX)
4919         struct sock *sock = ctx->ring_sock->sk;
4920         struct sk_buff_head *head = &sock->sk_receive_queue;
4921         struct sk_buff *skb;
4922
4923         /*
4924          * See if we can merge this file into an existing skb SCM_RIGHTS
4925          * file set. If there's no room, fall back to allocating a new skb
4926          * and filling it in.
4927          */
4928         spin_lock_irq(&head->lock);
4929         skb = skb_peek(head);
4930         if (skb) {
4931                 struct scm_fp_list *fpl = UNIXCB(skb).fp;
4932
4933                 if (fpl->count < SCM_MAX_FD) {
4934                         __skb_unlink(skb, head);
4935                         spin_unlock_irq(&head->lock);
4936                         fpl->fp[fpl->count] = get_file(file);
4937                         unix_inflight(fpl->user, fpl->fp[fpl->count]);
4938                         fpl->count++;
4939                         spin_lock_irq(&head->lock);
4940                         __skb_queue_head(head, skb);
4941                 } else {
4942                         skb = NULL;
4943                 }
4944         }
4945         spin_unlock_irq(&head->lock);
4946
4947         if (skb) {
4948                 fput(file);
4949                 return 0;
4950         }
4951
4952         return __io_sqe_files_scm(ctx, 1, index);
4953 #else
4954         return 0;
4955 #endif
4956 }
4957
4958 static void io_atomic_switch(struct percpu_ref *ref)
4959 {
4960         struct fixed_file_data *data;
4961
4962         data = container_of(ref, struct fixed_file_data, refs);
4963         clear_bit(FFD_F_ATOMIC, &data->state);
4964 }
4965
4966 static bool io_queue_file_removal(struct fixed_file_data *data,
4967                                   struct file *file)
4968 {
4969         struct io_file_put *pfile, pfile_stack;
4970         DECLARE_COMPLETION_ONSTACK(done);
4971
4972         /*
4973          * If we fail allocating the struct we need for doing async reomval
4974          * of this file, just punt to sync and wait for it.
4975          */
4976         pfile = kzalloc(sizeof(*pfile), GFP_KERNEL);
4977         if (!pfile) {
4978                 pfile = &pfile_stack;
4979                 pfile->done = &done;
4980         }
4981
4982         pfile->file = file;
4983         llist_add(&pfile->llist, &data->put_llist);
4984
4985         if (pfile == &pfile_stack) {
4986                 if (!test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
4987                         percpu_ref_put(&data->refs);
4988                         percpu_ref_switch_to_atomic(&data->refs,
4989                                                         io_atomic_switch);
4990                 }
4991                 wait_for_completion(&done);
4992                 flush_work(&data->ref_work);
4993                 return false;
4994         }
4995
4996         return true;
4997 }
4998
4999 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
5000                                  struct io_uring_files_update *up,
5001                                  unsigned nr_args)
5002 {
5003         struct fixed_file_data *data = ctx->file_data;
5004         bool ref_switch = false;
5005         struct file *file;
5006         __s32 __user *fds;
5007         int fd, i, err;
5008         __u32 done;
5009
5010         if (check_add_overflow(up->offset, nr_args, &done))
5011                 return -EOVERFLOW;
5012         if (done > ctx->nr_user_files)
5013                 return -EINVAL;
5014
5015         done = 0;
5016         fds = u64_to_user_ptr(up->fds);
5017         while (nr_args) {
5018                 struct fixed_file_table *table;
5019                 unsigned index;
5020
5021                 err = 0;
5022                 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
5023                         err = -EFAULT;
5024                         break;
5025                 }
5026                 i = array_index_nospec(up->offset, ctx->nr_user_files);
5027                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
5028                 index = i & IORING_FILE_TABLE_MASK;
5029                 if (table->files[index]) {
5030                         file = io_file_from_index(ctx, index);
5031                         table->files[index] = NULL;
5032                         if (io_queue_file_removal(data, file))
5033                                 ref_switch = true;
5034                 }
5035                 if (fd != -1) {
5036                         file = fget(fd);
5037                         if (!file) {
5038                                 err = -EBADF;
5039                                 break;
5040                         }
5041                         /*
5042                          * Don't allow io_uring instances to be registered. If
5043                          * UNIX isn't enabled, then this causes a reference
5044                          * cycle and this instance can never get freed. If UNIX
5045                          * is enabled we'll handle it just fine, but there's
5046                          * still no point in allowing a ring fd as it doesn't
5047                          * support regular read/write anyway.
5048                          */
5049                         if (file->f_op == &io_uring_fops) {
5050                                 fput(file);
5051                                 err = -EBADF;
5052                                 break;
5053                         }
5054                         table->files[index] = file;
5055                         err = io_sqe_file_register(ctx, file, i);
5056                         if (err)
5057                                 break;
5058                 }
5059                 nr_args--;
5060                 done++;
5061                 up->offset++;
5062         }
5063
5064         if (ref_switch && !test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
5065                 percpu_ref_put(&data->refs);
5066                 percpu_ref_switch_to_atomic(&data->refs, io_atomic_switch);
5067         }
5068
5069         return done ? done : err;
5070 }
5071 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
5072                                unsigned nr_args)
5073 {
5074         struct io_uring_files_update up;
5075
5076         if (!ctx->file_data)
5077                 return -ENXIO;
5078         if (!nr_args)
5079                 return -EINVAL;
5080         if (copy_from_user(&up, arg, sizeof(up)))
5081                 return -EFAULT;
5082         if (up.resv)
5083                 return -EINVAL;
5084
5085         return __io_sqe_files_update(ctx, &up, nr_args);
5086 }
5087
5088 static void io_put_work(struct io_wq_work *work)
5089 {
5090         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5091
5092         io_put_req(req);
5093 }
5094
5095 static void io_get_work(struct io_wq_work *work)
5096 {
5097         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5098
5099         refcount_inc(&req->refs);
5100 }
5101
5102 static int io_sq_offload_start(struct io_ring_ctx *ctx,
5103                                struct io_uring_params *p)
5104 {
5105         struct io_wq_data data;
5106         unsigned concurrency;
5107         int ret;
5108
5109         init_waitqueue_head(&ctx->sqo_wait);
5110         mmgrab(current->mm);
5111         ctx->sqo_mm = current->mm;
5112
5113         if (ctx->flags & IORING_SETUP_SQPOLL) {
5114                 ret = -EPERM;
5115                 if (!capable(CAP_SYS_ADMIN))
5116                         goto err;
5117
5118                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
5119                 if (!ctx->sq_thread_idle)
5120                         ctx->sq_thread_idle = HZ;
5121
5122                 if (p->flags & IORING_SETUP_SQ_AFF) {
5123                         int cpu = p->sq_thread_cpu;
5124
5125                         ret = -EINVAL;
5126                         if (cpu >= nr_cpu_ids)
5127                                 goto err;
5128                         if (!cpu_online(cpu))
5129                                 goto err;
5130
5131                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
5132                                                         ctx, cpu,
5133                                                         "io_uring-sq");
5134                 } else {
5135                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
5136                                                         "io_uring-sq");
5137                 }
5138                 if (IS_ERR(ctx->sqo_thread)) {
5139                         ret = PTR_ERR(ctx->sqo_thread);
5140                         ctx->sqo_thread = NULL;
5141                         goto err;
5142                 }
5143                 wake_up_process(ctx->sqo_thread);
5144         } else if (p->flags & IORING_SETUP_SQ_AFF) {
5145                 /* Can't have SQ_AFF without SQPOLL */
5146                 ret = -EINVAL;
5147                 goto err;
5148         }
5149
5150         data.mm = ctx->sqo_mm;
5151         data.user = ctx->user;
5152         data.creds = ctx->creds;
5153         data.get_work = io_get_work;
5154         data.put_work = io_put_work;
5155
5156         /* Do QD, or 4 * CPUS, whatever is smallest */
5157         concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
5158         ctx->io_wq = io_wq_create(concurrency, &data);
5159         if (IS_ERR(ctx->io_wq)) {
5160                 ret = PTR_ERR(ctx->io_wq);
5161                 ctx->io_wq = NULL;
5162                 goto err;
5163         }
5164
5165         return 0;
5166 err:
5167         io_finish_async(ctx);
5168         mmdrop(ctx->sqo_mm);
5169         ctx->sqo_mm = NULL;
5170         return ret;
5171 }
5172
5173 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
5174 {
5175         atomic_long_sub(nr_pages, &user->locked_vm);
5176 }
5177
5178 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
5179 {
5180         unsigned long page_limit, cur_pages, new_pages;
5181
5182         /* Don't allow more pages than we can safely lock */
5183         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
5184
5185         do {
5186                 cur_pages = atomic_long_read(&user->locked_vm);
5187                 new_pages = cur_pages + nr_pages;
5188                 if (new_pages > page_limit)
5189                         return -ENOMEM;
5190         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
5191                                         new_pages) != cur_pages);
5192
5193         return 0;
5194 }
5195
5196 static void io_mem_free(void *ptr)
5197 {
5198         struct page *page;
5199
5200         if (!ptr)
5201                 return;
5202
5203         page = virt_to_head_page(ptr);
5204         if (put_page_testzero(page))
5205                 free_compound_page(page);
5206 }
5207
5208 static void *io_mem_alloc(size_t size)
5209 {
5210         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
5211                                 __GFP_NORETRY;
5212
5213         return (void *) __get_free_pages(gfp_flags, get_order(size));
5214 }
5215
5216 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
5217                                 size_t *sq_offset)
5218 {
5219         struct io_rings *rings;
5220         size_t off, sq_array_size;
5221
5222         off = struct_size(rings, cqes, cq_entries);
5223         if (off == SIZE_MAX)
5224                 return SIZE_MAX;
5225
5226 #ifdef CONFIG_SMP
5227         off = ALIGN(off, SMP_CACHE_BYTES);
5228         if (off == 0)
5229                 return SIZE_MAX;
5230 #endif
5231
5232         sq_array_size = array_size(sizeof(u32), sq_entries);
5233         if (sq_array_size == SIZE_MAX)
5234                 return SIZE_MAX;
5235
5236         if (check_add_overflow(off, sq_array_size, &off))
5237                 return SIZE_MAX;
5238
5239         if (sq_offset)
5240                 *sq_offset = off;
5241
5242         return off;
5243 }
5244
5245 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
5246 {
5247         size_t pages;
5248
5249         pages = (size_t)1 << get_order(
5250                 rings_size(sq_entries, cq_entries, NULL));
5251         pages += (size_t)1 << get_order(
5252                 array_size(sizeof(struct io_uring_sqe), sq_entries));
5253
5254         return pages;
5255 }
5256
5257 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
5258 {
5259         int i, j;
5260
5261         if (!ctx->user_bufs)
5262                 return -ENXIO;
5263
5264         for (i = 0; i < ctx->nr_user_bufs; i++) {
5265                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5266
5267                 for (j = 0; j < imu->nr_bvecs; j++)
5268                         put_user_page(imu->bvec[j].bv_page);
5269
5270                 if (ctx->account_mem)
5271                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
5272                 kvfree(imu->bvec);
5273                 imu->nr_bvecs = 0;
5274         }
5275
5276         kfree(ctx->user_bufs);
5277         ctx->user_bufs = NULL;
5278         ctx->nr_user_bufs = 0;
5279         return 0;
5280 }
5281
5282 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
5283                        void __user *arg, unsigned index)
5284 {
5285         struct iovec __user *src;
5286
5287 #ifdef CONFIG_COMPAT
5288         if (ctx->compat) {
5289                 struct compat_iovec __user *ciovs;
5290                 struct compat_iovec ciov;
5291
5292                 ciovs = (struct compat_iovec __user *) arg;
5293                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
5294                         return -EFAULT;
5295
5296                 dst->iov_base = u64_to_user_ptr((u64)ciov.iov_base);
5297                 dst->iov_len = ciov.iov_len;
5298                 return 0;
5299         }
5300 #endif
5301         src = (struct iovec __user *) arg;
5302         if (copy_from_user(dst, &src[index], sizeof(*dst)))
5303                 return -EFAULT;
5304         return 0;
5305 }
5306
5307 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
5308                                   unsigned nr_args)
5309 {
5310         struct vm_area_struct **vmas = NULL;
5311         struct page **pages = NULL;
5312         int i, j, got_pages = 0;
5313         int ret = -EINVAL;
5314
5315         if (ctx->user_bufs)
5316                 return -EBUSY;
5317         if (!nr_args || nr_args > UIO_MAXIOV)
5318                 return -EINVAL;
5319
5320         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
5321                                         GFP_KERNEL);
5322         if (!ctx->user_bufs)
5323                 return -ENOMEM;
5324
5325         for (i = 0; i < nr_args; i++) {
5326                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5327                 unsigned long off, start, end, ubuf;
5328                 int pret, nr_pages;
5329                 struct iovec iov;
5330                 size_t size;
5331
5332                 ret = io_copy_iov(ctx, &iov, arg, i);
5333                 if (ret)
5334                         goto err;
5335
5336                 /*
5337                  * Don't impose further limits on the size and buffer
5338                  * constraints here, we'll -EINVAL later when IO is
5339                  * submitted if they are wrong.
5340                  */
5341                 ret = -EFAULT;
5342                 if (!iov.iov_base || !iov.iov_len)
5343                         goto err;
5344
5345                 /* arbitrary limit, but we need something */
5346                 if (iov.iov_len > SZ_1G)
5347                         goto err;
5348
5349                 ubuf = (unsigned long) iov.iov_base;
5350                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
5351                 start = ubuf >> PAGE_SHIFT;
5352                 nr_pages = end - start;
5353
5354                 if (ctx->account_mem) {
5355                         ret = io_account_mem(ctx->user, nr_pages);
5356                         if (ret)
5357                                 goto err;
5358                 }
5359
5360                 ret = 0;
5361                 if (!pages || nr_pages > got_pages) {
5362                         kfree(vmas);
5363                         kfree(pages);
5364                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
5365                                                 GFP_KERNEL);
5366                         vmas = kvmalloc_array(nr_pages,
5367                                         sizeof(struct vm_area_struct *),
5368                                         GFP_KERNEL);
5369                         if (!pages || !vmas) {
5370                                 ret = -ENOMEM;
5371                                 if (ctx->account_mem)
5372                                         io_unaccount_mem(ctx->user, nr_pages);
5373                                 goto err;
5374                         }
5375                         got_pages = nr_pages;
5376                 }
5377
5378                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
5379                                                 GFP_KERNEL);
5380                 ret = -ENOMEM;
5381                 if (!imu->bvec) {
5382                         if (ctx->account_mem)
5383                                 io_unaccount_mem(ctx->user, nr_pages);
5384                         goto err;
5385                 }
5386
5387                 ret = 0;
5388                 down_read(&current->mm->mmap_sem);
5389                 pret = get_user_pages(ubuf, nr_pages,
5390                                       FOLL_WRITE | FOLL_LONGTERM,
5391                                       pages, vmas);
5392                 if (pret == nr_pages) {
5393                         /* don't support file backed memory */
5394                         for (j = 0; j < nr_pages; j++) {
5395                                 struct vm_area_struct *vma = vmas[j];
5396
5397                                 if (vma->vm_file &&
5398                                     !is_file_hugepages(vma->vm_file)) {
5399                                         ret = -EOPNOTSUPP;
5400                                         break;
5401                                 }
5402                         }
5403                 } else {
5404                         ret = pret < 0 ? pret : -EFAULT;
5405                 }
5406                 up_read(&current->mm->mmap_sem);
5407                 if (ret) {
5408                         /*
5409                          * if we did partial map, or found file backed vmas,
5410                          * release any pages we did get
5411                          */
5412                         if (pret > 0)
5413                                 put_user_pages(pages, pret);
5414                         if (ctx->account_mem)
5415                                 io_unaccount_mem(ctx->user, nr_pages);
5416                         kvfree(imu->bvec);
5417                         goto err;
5418                 }
5419
5420                 off = ubuf & ~PAGE_MASK;
5421                 size = iov.iov_len;
5422                 for (j = 0; j < nr_pages; j++) {
5423                         size_t vec_len;
5424
5425                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
5426                         imu->bvec[j].bv_page = pages[j];
5427                         imu->bvec[j].bv_len = vec_len;
5428                         imu->bvec[j].bv_offset = off;
5429                         off = 0;
5430                         size -= vec_len;
5431                 }
5432                 /* store original address for later verification */
5433                 imu->ubuf = ubuf;
5434                 imu->len = iov.iov_len;
5435                 imu->nr_bvecs = nr_pages;
5436
5437                 ctx->nr_user_bufs++;
5438         }
5439         kvfree(pages);
5440         kvfree(vmas);
5441         return 0;
5442 err:
5443         kvfree(pages);
5444         kvfree(vmas);
5445         io_sqe_buffer_unregister(ctx);
5446         return ret;
5447 }
5448
5449 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
5450 {
5451         __s32 __user *fds = arg;
5452         int fd;
5453
5454         if (ctx->cq_ev_fd)
5455                 return -EBUSY;
5456
5457         if (copy_from_user(&fd, fds, sizeof(*fds)))
5458                 return -EFAULT;
5459
5460         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
5461         if (IS_ERR(ctx->cq_ev_fd)) {
5462                 int ret = PTR_ERR(ctx->cq_ev_fd);
5463                 ctx->cq_ev_fd = NULL;
5464                 return ret;
5465         }
5466
5467         return 0;
5468 }
5469
5470 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
5471 {
5472         if (ctx->cq_ev_fd) {
5473                 eventfd_ctx_put(ctx->cq_ev_fd);
5474                 ctx->cq_ev_fd = NULL;
5475                 return 0;
5476         }
5477
5478         return -ENXIO;
5479 }
5480
5481 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
5482 {
5483         io_finish_async(ctx);
5484         if (ctx->sqo_mm)
5485                 mmdrop(ctx->sqo_mm);
5486
5487         io_iopoll_reap_events(ctx);
5488         io_sqe_buffer_unregister(ctx);
5489         io_sqe_files_unregister(ctx);
5490         io_eventfd_unregister(ctx);
5491
5492 #if defined(CONFIG_UNIX)
5493         if (ctx->ring_sock) {
5494                 ctx->ring_sock->file = NULL; /* so that iput() is called */
5495                 sock_release(ctx->ring_sock);
5496         }
5497 #endif
5498
5499         io_mem_free(ctx->rings);
5500         io_mem_free(ctx->sq_sqes);
5501
5502         percpu_ref_exit(&ctx->refs);
5503         if (ctx->account_mem)
5504                 io_unaccount_mem(ctx->user,
5505                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
5506         free_uid(ctx->user);
5507         put_cred(ctx->creds);
5508         kfree(ctx->completions);
5509         kfree(ctx->cancel_hash);
5510         kmem_cache_free(req_cachep, ctx->fallback_req);
5511         kfree(ctx);
5512 }
5513
5514 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
5515 {
5516         struct io_ring_ctx *ctx = file->private_data;
5517         __poll_t mask = 0;
5518
5519         poll_wait(file, &ctx->cq_wait, wait);
5520         /*
5521          * synchronizes with barrier from wq_has_sleeper call in
5522          * io_commit_cqring
5523          */
5524         smp_rmb();
5525         if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
5526             ctx->rings->sq_ring_entries)
5527                 mask |= EPOLLOUT | EPOLLWRNORM;
5528         if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
5529                 mask |= EPOLLIN | EPOLLRDNORM;
5530
5531         return mask;
5532 }
5533
5534 static int io_uring_fasync(int fd, struct file *file, int on)
5535 {
5536         struct io_ring_ctx *ctx = file->private_data;
5537
5538         return fasync_helper(fd, file, on, &ctx->cq_fasync);
5539 }
5540
5541 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
5542 {
5543         mutex_lock(&ctx->uring_lock);
5544         percpu_ref_kill(&ctx->refs);
5545         mutex_unlock(&ctx->uring_lock);
5546
5547         io_kill_timeouts(ctx);
5548         io_poll_remove_all(ctx);
5549
5550         if (ctx->io_wq)
5551                 io_wq_cancel_all(ctx->io_wq);
5552
5553         io_iopoll_reap_events(ctx);
5554         /* if we failed setting up the ctx, we might not have any rings */
5555         if (ctx->rings)
5556                 io_cqring_overflow_flush(ctx, true);
5557         wait_for_completion(&ctx->completions[0]);
5558         io_ring_ctx_free(ctx);
5559 }
5560
5561 static int io_uring_release(struct inode *inode, struct file *file)
5562 {
5563         struct io_ring_ctx *ctx = file->private_data;
5564
5565         file->private_data = NULL;
5566         io_ring_ctx_wait_and_kill(ctx);
5567         return 0;
5568 }
5569
5570 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
5571                                   struct files_struct *files)
5572 {
5573         struct io_kiocb *req;
5574         DEFINE_WAIT(wait);
5575
5576         while (!list_empty_careful(&ctx->inflight_list)) {
5577                 struct io_kiocb *cancel_req = NULL;
5578
5579                 spin_lock_irq(&ctx->inflight_lock);
5580                 list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
5581                         if (req->work.files != files)
5582                                 continue;
5583                         /* req is being completed, ignore */
5584                         if (!refcount_inc_not_zero(&req->refs))
5585                                 continue;
5586                         cancel_req = req;
5587                         break;
5588                 }
5589                 if (cancel_req)
5590                         prepare_to_wait(&ctx->inflight_wait, &wait,
5591                                                 TASK_UNINTERRUPTIBLE);
5592                 spin_unlock_irq(&ctx->inflight_lock);
5593
5594                 /* We need to keep going until we don't find a matching req */
5595                 if (!cancel_req)
5596                         break;
5597
5598                 io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
5599                 io_put_req(cancel_req);
5600                 schedule();
5601         }
5602         finish_wait(&ctx->inflight_wait, &wait);
5603 }
5604
5605 static int io_uring_flush(struct file *file, void *data)
5606 {
5607         struct io_ring_ctx *ctx = file->private_data;
5608
5609         io_uring_cancel_files(ctx, data);
5610         if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
5611                 io_cqring_overflow_flush(ctx, true);
5612                 io_wq_cancel_all(ctx->io_wq);
5613         }
5614         return 0;
5615 }
5616
5617 static void *io_uring_validate_mmap_request(struct file *file,
5618                                             loff_t pgoff, size_t sz)
5619 {
5620         struct io_ring_ctx *ctx = file->private_data;
5621         loff_t offset = pgoff << PAGE_SHIFT;
5622         struct page *page;
5623         void *ptr;
5624
5625         switch (offset) {
5626         case IORING_OFF_SQ_RING:
5627         case IORING_OFF_CQ_RING:
5628                 ptr = ctx->rings;
5629                 break;
5630         case IORING_OFF_SQES:
5631                 ptr = ctx->sq_sqes;
5632                 break;
5633         default:
5634                 return ERR_PTR(-EINVAL);
5635         }
5636
5637         page = virt_to_head_page(ptr);
5638         if (sz > page_size(page))
5639                 return ERR_PTR(-EINVAL);
5640
5641         return ptr;
5642 }
5643
5644 #ifdef CONFIG_MMU
5645
5646 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
5647 {
5648         size_t sz = vma->vm_end - vma->vm_start;
5649         unsigned long pfn;
5650         void *ptr;
5651
5652         ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
5653         if (IS_ERR(ptr))
5654                 return PTR_ERR(ptr);
5655
5656         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
5657         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
5658 }
5659
5660 #else /* !CONFIG_MMU */
5661
5662 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
5663 {
5664         return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
5665 }
5666
5667 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
5668 {
5669         return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
5670 }
5671
5672 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
5673         unsigned long addr, unsigned long len,
5674         unsigned long pgoff, unsigned long flags)
5675 {
5676         void *ptr;
5677
5678         ptr = io_uring_validate_mmap_request(file, pgoff, len);
5679         if (IS_ERR(ptr))
5680                 return PTR_ERR(ptr);
5681
5682         return (unsigned long) ptr;
5683 }
5684
5685 #endif /* !CONFIG_MMU */
5686
5687 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
5688                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
5689                 size_t, sigsz)
5690 {
5691         struct io_ring_ctx *ctx;
5692         long ret = -EBADF;
5693         int submitted = 0;
5694         struct fd f;
5695
5696         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
5697                 return -EINVAL;
5698
5699         f = fdget(fd);
5700         if (!f.file)
5701                 return -EBADF;
5702
5703         ret = -EOPNOTSUPP;
5704         if (f.file->f_op != &io_uring_fops)
5705                 goto out_fput;
5706
5707         ret = -ENXIO;
5708         ctx = f.file->private_data;
5709         if (!percpu_ref_tryget(&ctx->refs))
5710                 goto out_fput;
5711
5712         /*
5713          * For SQ polling, the thread will do all submissions and completions.
5714          * Just return the requested submit count, and wake the thread if
5715          * we were asked to.
5716          */
5717         ret = 0;
5718         if (ctx->flags & IORING_SETUP_SQPOLL) {
5719                 if (!list_empty_careful(&ctx->cq_overflow_list))
5720                         io_cqring_overflow_flush(ctx, false);
5721                 if (flags & IORING_ENTER_SQ_WAKEUP)
5722                         wake_up(&ctx->sqo_wait);
5723                 submitted = to_submit;
5724         } else if (to_submit) {
5725                 struct mm_struct *cur_mm;
5726
5727                 if (current->mm != ctx->sqo_mm ||
5728                     current_cred() != ctx->creds) {
5729                         ret = -EPERM;
5730                         goto out;
5731                 }
5732
5733                 to_submit = min(to_submit, ctx->sq_entries);
5734                 mutex_lock(&ctx->uring_lock);
5735                 /* already have mm, so io_submit_sqes() won't try to grab it */
5736                 cur_mm = ctx->sqo_mm;
5737                 submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
5738                                            &cur_mm, false);
5739                 mutex_unlock(&ctx->uring_lock);
5740
5741                 if (submitted != to_submit)
5742                         goto out;
5743         }
5744         if (flags & IORING_ENTER_GETEVENTS) {
5745                 unsigned nr_events = 0;
5746
5747                 min_complete = min(min_complete, ctx->cq_entries);
5748
5749                 if (ctx->flags & IORING_SETUP_IOPOLL) {
5750                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
5751                 } else {
5752                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
5753                 }
5754         }
5755
5756 out:
5757         percpu_ref_put(&ctx->refs);
5758 out_fput:
5759         fdput(f);
5760         return submitted ? submitted : ret;
5761 }
5762
5763 static const struct file_operations io_uring_fops = {
5764         .release        = io_uring_release,
5765         .flush          = io_uring_flush,
5766         .mmap           = io_uring_mmap,
5767 #ifndef CONFIG_MMU
5768         .get_unmapped_area = io_uring_nommu_get_unmapped_area,
5769         .mmap_capabilities = io_uring_nommu_mmap_capabilities,
5770 #endif
5771         .poll           = io_uring_poll,
5772         .fasync         = io_uring_fasync,
5773 };
5774
5775 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
5776                                   struct io_uring_params *p)
5777 {
5778         struct io_rings *rings;
5779         size_t size, sq_array_offset;
5780
5781         size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
5782         if (size == SIZE_MAX)
5783                 return -EOVERFLOW;
5784
5785         rings = io_mem_alloc(size);
5786         if (!rings)
5787                 return -ENOMEM;
5788
5789         ctx->rings = rings;
5790         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
5791         rings->sq_ring_mask = p->sq_entries - 1;
5792         rings->cq_ring_mask = p->cq_entries - 1;
5793         rings->sq_ring_entries = p->sq_entries;
5794         rings->cq_ring_entries = p->cq_entries;
5795         ctx->sq_mask = rings->sq_ring_mask;
5796         ctx->cq_mask = rings->cq_ring_mask;
5797         ctx->sq_entries = rings->sq_ring_entries;
5798         ctx->cq_entries = rings->cq_ring_entries;
5799
5800         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
5801         if (size == SIZE_MAX) {
5802                 io_mem_free(ctx->rings);
5803                 ctx->rings = NULL;
5804                 return -EOVERFLOW;
5805         }
5806
5807         ctx->sq_sqes = io_mem_alloc(size);
5808         if (!ctx->sq_sqes) {
5809                 io_mem_free(ctx->rings);
5810                 ctx->rings = NULL;
5811                 return -ENOMEM;
5812         }
5813
5814         return 0;
5815 }
5816
5817 /*
5818  * Allocate an anonymous fd, this is what constitutes the application
5819  * visible backing of an io_uring instance. The application mmaps this
5820  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
5821  * we have to tie this fd to a socket for file garbage collection purposes.
5822  */
5823 static int io_uring_get_fd(struct io_ring_ctx *ctx)
5824 {
5825         struct file *file;
5826         int ret;
5827
5828 #if defined(CONFIG_UNIX)
5829         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
5830                                 &ctx->ring_sock);
5831         if (ret)
5832                 return ret;
5833 #endif
5834
5835         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
5836         if (ret < 0)
5837                 goto err;
5838
5839         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
5840                                         O_RDWR | O_CLOEXEC);
5841         if (IS_ERR(file)) {
5842                 put_unused_fd(ret);
5843                 ret = PTR_ERR(file);
5844                 goto err;
5845         }
5846
5847 #if defined(CONFIG_UNIX)
5848         ctx->ring_sock->file = file;
5849 #endif
5850         fd_install(ret, file);
5851         return ret;
5852 err:
5853 #if defined(CONFIG_UNIX)
5854         sock_release(ctx->ring_sock);
5855         ctx->ring_sock = NULL;
5856 #endif
5857         return ret;
5858 }
5859
5860 static int io_uring_create(unsigned entries, struct io_uring_params *p)
5861 {
5862         struct user_struct *user = NULL;
5863         struct io_ring_ctx *ctx;
5864         bool account_mem;
5865         int ret;
5866
5867         if (!entries || entries > IORING_MAX_ENTRIES)
5868                 return -EINVAL;
5869
5870         /*
5871          * Use twice as many entries for the CQ ring. It's possible for the
5872          * application to drive a higher depth than the size of the SQ ring,
5873          * since the sqes are only used at submission time. This allows for
5874          * some flexibility in overcommitting a bit. If the application has
5875          * set IORING_SETUP_CQSIZE, it will have passed in the desired number
5876          * of CQ ring entries manually.
5877          */
5878         p->sq_entries = roundup_pow_of_two(entries);
5879         if (p->flags & IORING_SETUP_CQSIZE) {
5880                 /*
5881                  * If IORING_SETUP_CQSIZE is set, we do the same roundup
5882                  * to a power-of-two, if it isn't already. We do NOT impose
5883                  * any cq vs sq ring sizing.
5884                  */
5885                 if (p->cq_entries < p->sq_entries || p->cq_entries > IORING_MAX_CQ_ENTRIES)
5886                         return -EINVAL;
5887                 p->cq_entries = roundup_pow_of_two(p->cq_entries);
5888         } else {
5889                 p->cq_entries = 2 * p->sq_entries;
5890         }
5891
5892         user = get_uid(current_user());
5893         account_mem = !capable(CAP_IPC_LOCK);
5894
5895         if (account_mem) {
5896                 ret = io_account_mem(user,
5897                                 ring_pages(p->sq_entries, p->cq_entries));
5898                 if (ret) {
5899                         free_uid(user);
5900                         return ret;
5901                 }
5902         }
5903
5904         ctx = io_ring_ctx_alloc(p);
5905         if (!ctx) {
5906                 if (account_mem)
5907                         io_unaccount_mem(user, ring_pages(p->sq_entries,
5908                                                                 p->cq_entries));
5909                 free_uid(user);
5910                 return -ENOMEM;
5911         }
5912         ctx->compat = in_compat_syscall();
5913         ctx->account_mem = account_mem;
5914         ctx->user = user;
5915         ctx->creds = get_current_cred();
5916
5917         ret = io_allocate_scq_urings(ctx, p);
5918         if (ret)
5919                 goto err;
5920
5921         ret = io_sq_offload_start(ctx, p);
5922         if (ret)
5923                 goto err;
5924
5925         memset(&p->sq_off, 0, sizeof(p->sq_off));
5926         p->sq_off.head = offsetof(struct io_rings, sq.head);
5927         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
5928         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
5929         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
5930         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
5931         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
5932         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
5933
5934         memset(&p->cq_off, 0, sizeof(p->cq_off));
5935         p->cq_off.head = offsetof(struct io_rings, cq.head);
5936         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
5937         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
5938         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
5939         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
5940         p->cq_off.cqes = offsetof(struct io_rings, cqes);
5941
5942         /*
5943          * Install ring fd as the very last thing, so we don't risk someone
5944          * having closed it before we finish setup
5945          */
5946         ret = io_uring_get_fd(ctx);
5947         if (ret < 0)
5948                 goto err;
5949
5950         p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
5951                         IORING_FEAT_SUBMIT_STABLE;
5952         trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
5953         return ret;
5954 err:
5955         io_ring_ctx_wait_and_kill(ctx);
5956         return ret;
5957 }
5958
5959 /*
5960  * Sets up an aio uring context, and returns the fd. Applications asks for a
5961  * ring size, we return the actual sq/cq ring sizes (among other things) in the
5962  * params structure passed in.
5963  */
5964 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
5965 {
5966         struct io_uring_params p;
5967         long ret;
5968         int i;
5969
5970         if (copy_from_user(&p, params, sizeof(p)))
5971                 return -EFAULT;
5972         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
5973                 if (p.resv[i])
5974                         return -EINVAL;
5975         }
5976
5977         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
5978                         IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
5979                 return -EINVAL;
5980
5981         ret = io_uring_create(entries, &p);
5982         if (ret < 0)
5983                 return ret;
5984
5985         if (copy_to_user(params, &p, sizeof(p)))
5986                 return -EFAULT;
5987
5988         return ret;
5989 }
5990
5991 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
5992                 struct io_uring_params __user *, params)
5993 {
5994         return io_uring_setup(entries, params);
5995 }
5996
5997 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
5998                                void __user *arg, unsigned nr_args)
5999         __releases(ctx->uring_lock)
6000         __acquires(ctx->uring_lock)
6001 {
6002         int ret;
6003
6004         /*
6005          * We're inside the ring mutex, if the ref is already dying, then
6006          * someone else killed the ctx or is already going through
6007          * io_uring_register().
6008          */
6009         if (percpu_ref_is_dying(&ctx->refs))
6010                 return -ENXIO;
6011
6012         if (opcode != IORING_UNREGISTER_FILES &&
6013             opcode != IORING_REGISTER_FILES_UPDATE) {
6014                 percpu_ref_kill(&ctx->refs);
6015
6016                 /*
6017                  * Drop uring mutex before waiting for references to exit. If
6018                  * another thread is currently inside io_uring_enter() it might
6019                  * need to grab the uring_lock to make progress. If we hold it
6020                  * here across the drain wait, then we can deadlock. It's safe
6021                  * to drop the mutex here, since no new references will come in
6022                  * after we've killed the percpu ref.
6023                  */
6024                 mutex_unlock(&ctx->uring_lock);
6025                 wait_for_completion(&ctx->completions[0]);
6026                 mutex_lock(&ctx->uring_lock);
6027         }
6028
6029         switch (opcode) {
6030         case IORING_REGISTER_BUFFERS:
6031                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
6032                 break;
6033         case IORING_UNREGISTER_BUFFERS:
6034                 ret = -EINVAL;
6035                 if (arg || nr_args)
6036                         break;
6037                 ret = io_sqe_buffer_unregister(ctx);
6038                 break;
6039         case IORING_REGISTER_FILES:
6040                 ret = io_sqe_files_register(ctx, arg, nr_args);
6041                 break;
6042         case IORING_UNREGISTER_FILES:
6043                 ret = -EINVAL;
6044                 if (arg || nr_args)
6045                         break;
6046                 ret = io_sqe_files_unregister(ctx);
6047                 break;
6048         case IORING_REGISTER_FILES_UPDATE:
6049                 ret = io_sqe_files_update(ctx, arg, nr_args);
6050                 break;
6051         case IORING_REGISTER_EVENTFD:
6052                 ret = -EINVAL;
6053                 if (nr_args != 1)
6054                         break;
6055                 ret = io_eventfd_register(ctx, arg);
6056                 break;
6057         case IORING_UNREGISTER_EVENTFD:
6058                 ret = -EINVAL;
6059                 if (arg || nr_args)
6060                         break;
6061                 ret = io_eventfd_unregister(ctx);
6062                 break;
6063         default:
6064                 ret = -EINVAL;
6065                 break;
6066         }
6067
6068
6069         if (opcode != IORING_UNREGISTER_FILES &&
6070             opcode != IORING_REGISTER_FILES_UPDATE) {
6071                 /* bring the ctx back to life */
6072                 reinit_completion(&ctx->completions[0]);
6073                 percpu_ref_reinit(&ctx->refs);
6074         }
6075         return ret;
6076 }
6077
6078 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
6079                 void __user *, arg, unsigned int, nr_args)
6080 {
6081         struct io_ring_ctx *ctx;
6082         long ret = -EBADF;
6083         struct fd f;
6084
6085         f = fdget(fd);
6086         if (!f.file)
6087                 return -EBADF;
6088
6089         ret = -EOPNOTSUPP;
6090         if (f.file->f_op != &io_uring_fops)
6091                 goto out_fput;
6092
6093         ctx = f.file->private_data;
6094
6095         mutex_lock(&ctx->uring_lock);
6096         ret = __io_uring_register(ctx, opcode, arg, nr_args);
6097         mutex_unlock(&ctx->uring_lock);
6098         trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
6099                                                         ctx->cq_ev_fd != NULL, ret);
6100 out_fput:
6101         fdput(f);
6102         return ret;
6103 }
6104
6105 static int __init io_uring_init(void)
6106 {
6107         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
6108         return 0;
6109 };
6110 __initcall(io_uring_init);