OSDN Git Service

epoll: use rwlock in order to reduce ep_poll_callback() contention
[tomoyo/tomoyo-test1.git] / fs / eventpoll.c
index 53c3d46..4a0e98d 100644 (file)
  *
  * 1) epmutex (mutex)
  * 2) ep->mtx (mutex)
- * 3) ep->wq.lock (spinlock)
+ * 3) ep->lock (rwlock)
  *
  * The acquire order is the one listed above, from 1 to 3.
- * We need a spinlock (ep->wq.lock) because we manipulate objects
+ * We need a rwlock (ep->lock) because we manipulate objects
  * from inside the poll callback, that might be triggered from
  * a wake_up() that in turn might be called from IRQ context.
  * So we can't sleep inside the poll callback and hence we need
@@ -85,7 +85,7 @@
  * of epoll file descriptors, we use the current recursion depth as
  * the lockdep subkey.
  * It is possible to drop the "ep->mtx" and to use the global
- * mutex "epmutex" (together with "ep->wq.lock") to have it working,
+ * mutex "epmutex" (together with "ep->lock") to have it working,
  * but having "ep->mtx" will make the interface more scalable.
  * Events that require holding "epmutex" are very rare, while for
  * normal operations the epoll private "ep->mtx" will guarantee
@@ -182,8 +182,6 @@ struct epitem {
  * This structure is stored inside the "private_data" member of the file
  * structure and represents the main data structure for the eventpoll
  * interface.
- *
- * Access to it is protected by the lock inside wq.
  */
 struct eventpoll {
        /*
@@ -203,13 +201,16 @@ struct eventpoll {
        /* List of ready file descriptors */
        struct list_head rdllist;
 
+       /* Lock which protects rdllist and ovflist */
+       rwlock_t lock;
+
        /* RB tree root used to store monitored fd structs */
        struct rb_root_cached rbr;
 
        /*
         * This is a single linked list that chains all the "struct epitem" that
         * happened while transferring ready events to userspace w/out
-        * holding ->wq.lock.
+        * holding ->lock.
         */
        struct epitem *ovflist;
 
@@ -697,17 +698,17 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
         * because we want the "sproc" callback to be able to do it
         * in a lockless way.
         */
-       spin_lock_irq(&ep->wq.lock);
+       write_lock_irq(&ep->lock);
        list_splice_init(&ep->rdllist, &txlist);
        WRITE_ONCE(ep->ovflist, NULL);
-       spin_unlock_irq(&ep->wq.lock);
+       write_unlock_irq(&ep->lock);
 
        /*
         * Now call the callback function.
         */
        res = (*sproc)(ep, &txlist, priv);
 
-       spin_lock_irq(&ep->wq.lock);
+       write_lock_irq(&ep->lock);
        /*
         * During the time we spent inside the "sproc" callback, some
         * other events might have been queued by the poll callback.
@@ -749,11 +750,11 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
                 * the ->poll() wait list (delayed after we release the lock).
                 */
                if (waitqueue_active(&ep->wq))
-                       wake_up_locked(&ep->wq);
+                       wake_up(&ep->wq);
                if (waitqueue_active(&ep->poll_wait))
                        pwake++;
        }
-       spin_unlock_irq(&ep->wq.lock);
+       write_unlock_irq(&ep->lock);
 
        if (!ep_locked)
                mutex_unlock(&ep->mtx);
@@ -793,10 +794,10 @@ static int ep_remove(struct eventpoll *ep, struct epitem *epi)
 
        rb_erase_cached(&epi->rbn, &ep->rbr);
 
-       spin_lock_irq(&ep->wq.lock);
+       write_lock_irq(&ep->lock);
        if (ep_is_linked(epi))
                list_del_init(&epi->rdllink);
-       spin_unlock_irq(&ep->wq.lock);
+       write_unlock_irq(&ep->lock);
 
        wakeup_source_unregister(ep_wakeup_source(epi));
        /*
@@ -846,7 +847,7 @@ static void ep_free(struct eventpoll *ep)
         * Walks through the whole tree by freeing each "struct epitem". At this
         * point we are sure no poll callbacks will be lingering around, and also by
         * holding "epmutex" we can be sure that no file cleanup code will hit
-        * us during this operation. So we can avoid the lock on "ep->wq.lock".
+        * us during this operation. So we can avoid the lock on "ep->lock".
         * We do not need to lock ep->mtx, either, we only do it to prevent
         * a lockdep warning.
         */
@@ -1027,6 +1028,7 @@ static int ep_alloc(struct eventpoll **pep)
                goto free_uid;
 
        mutex_init(&ep->mtx);
+       rwlock_init(&ep->lock);
        init_waitqueue_head(&ep->wq);
        init_waitqueue_head(&ep->poll_wait);
        INIT_LIST_HEAD(&ep->rdllist);
@@ -1116,21 +1118,107 @@ struct file *get_epoll_tfile_raw_ptr(struct file *file, int tfd,
 }
 #endif /* CONFIG_CHECKPOINT_RESTORE */
 
+/**
+ * Adds a new entry to the tail of the list in a lockless way, i.e.
+ * multiple CPUs are allowed to call this function concurrently.
+ *
+ * Beware: it is necessary to prevent any other modifications of the
+ *         existing list until all changes are completed, in other words
+ *         concurrent list_add_tail_lockless() calls should be protected
+ *         with a read lock, where write lock acts as a barrier which
+ *         makes sure all list_add_tail_lockless() calls are fully
+ *         completed.
+ *
+ *        Also an element can be locklessly added to the list only in one
+ *        direction i.e. either to the tail either to the head, otherwise
+ *        concurrent access will corrupt the list.
+ *
+ * Returns %false if element has been already added to the list, %true
+ * otherwise.
+ */
+static inline bool list_add_tail_lockless(struct list_head *new,
+                                         struct list_head *head)
+{
+       struct list_head *prev;
+
+       /*
+        * This is simple 'new->next = head' operation, but cmpxchg()
+        * is used in order to detect that same element has been just
+        * added to the list from another CPU: the winner observes
+        * new->next == new.
+        */
+       if (cmpxchg(&new->next, new, head) != new)
+               return false;
+
+       /*
+        * Initially ->next of a new element must be updated with the head
+        * (we are inserting to the tail) and only then pointers are atomically
+        * exchanged.  XCHG guarantees memory ordering, thus ->next should be
+        * updated before pointers are actually swapped and pointers are
+        * swapped before prev->next is updated.
+        */
+
+       prev = xchg(&head->prev, new);
+
+       /*
+        * It is safe to modify prev->next and new->prev, because a new element
+        * is added only to the tail and new->next is updated before XCHG.
+        */
+
+       prev->next = new;
+       new->prev = prev;
+
+       return true;
+}
+
+/**
+ * Chains a new epi entry to the tail of the ep->ovflist in a lockless way,
+ * i.e. multiple CPUs are allowed to call this function concurrently.
+ *
+ * Returns %false if epi element has been already chained, %true otherwise.
+ */
+static inline bool chain_epi_lockless(struct epitem *epi)
+{
+       struct eventpoll *ep = epi->ep;
+
+       /* Check that the same epi has not been just chained from another CPU */
+       if (cmpxchg(&epi->next, EP_UNACTIVE_PTR, NULL) != EP_UNACTIVE_PTR)
+               return false;
+
+       /* Atomically exchange tail */
+       epi->next = xchg(&ep->ovflist, epi);
+
+       return true;
+}
+
 /*
  * This is the callback that is passed to the wait queue wakeup
  * mechanism. It is called by the stored file descriptors when they
  * have events to report.
+ *
+ * This callback takes a read lock in order not to content with concurrent
+ * events from another file descriptors, thus all modifications to ->rdllist
+ * or ->ovflist are lockless.  Read lock is paired with the write lock from
+ * ep_scan_ready_list(), which stops all list modifications and guarantees
+ * that lists state is seen correctly.
+ *
+ * Another thing worth to mention is that ep_poll_callback() can be called
+ * concurrently for the same @epi from different CPUs if poll table was inited
+ * with several wait queues entries.  Plural wakeup from different CPUs of a
+ * single wait queue is serialized by wq.lock, but the case when multiple wait
+ * queues are used should be detected accordingly.  This is detected using
+ * cmpxchg() operation.
  */
 static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
 {
        int pwake = 0;
-       unsigned long flags;
        struct epitem *epi = ep_item_from_wait(wait);
        struct eventpoll *ep = epi->ep;
        __poll_t pollflags = key_to_poll(key);
+       unsigned long flags;
        int ewake = 0;
 
-       spin_lock_irqsave(&ep->wq.lock, flags);
+       read_lock_irqsave(&ep->lock, flags);
 
        ep_set_busy_poll_napi_id(epi);
 
@@ -1159,17 +1247,15 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
         * chained in ep->ovflist and requeued later on.
         */
        if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
-               if (epi->next == EP_UNACTIVE_PTR) {
-                       epi->next = READ_ONCE(ep->ovflist);
-                       WRITE_ONCE(ep->ovflist, epi);
+               if (epi->next == EP_UNACTIVE_PTR &&
+                   chain_epi_lockless(epi))
                        ep_pm_stay_awake_rcu(epi);
-               }
                goto out_unlock;
        }
 
        /* If this file is already in the ready list we exit soon */
-       if (!ep_is_linked(epi)) {
-               list_add_tail(&epi->rdllink, &ep->rdllist);
+       if (!ep_is_linked(epi) &&
+           list_add_tail_lockless(&epi->rdllink, &ep->rdllist)) {
                ep_pm_stay_awake_rcu(epi);
        }
 
@@ -1194,13 +1280,13 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
                                break;
                        }
                }
-               wake_up_locked(&ep->wq);
+               wake_up(&ep->wq);
        }
        if (waitqueue_active(&ep->poll_wait))
                pwake++;
 
 out_unlock:
-       spin_unlock_irqrestore(&ep->wq.lock, flags);
+       read_unlock_irqrestore(&ep->lock, flags);
 
        /* We have to call this outside the lock */
        if (pwake)
@@ -1485,7 +1571,7 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
                goto error_remove_epi;
 
        /* We have to drop the new item inside our item list to keep track of it */
-       spin_lock_irq(&ep->wq.lock);
+       write_lock_irq(&ep->lock);
 
        /* record NAPI ID of new item if present */
        ep_set_busy_poll_napi_id(epi);
@@ -1497,12 +1583,12 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
 
                /* Notify waiting tasks that events are available */
                if (waitqueue_active(&ep->wq))
-                       wake_up_locked(&ep->wq);
+                       wake_up(&ep->wq);
                if (waitqueue_active(&ep->poll_wait))
                        pwake++;
        }
 
-       spin_unlock_irq(&ep->wq.lock);
+       write_unlock_irq(&ep->lock);
 
        atomic_long_inc(&ep->user->epoll_watches);
 
@@ -1528,10 +1614,10 @@ error_unregister:
         * list, since that is used/cleaned only inside a section bound by "mtx".
         * And ep_insert() is called with "mtx" held.
         */
-       spin_lock_irq(&ep->wq.lock);
+       write_lock_irq(&ep->lock);
        if (ep_is_linked(epi))
                list_del_init(&epi->rdllink);
-       spin_unlock_irq(&ep->wq.lock);
+       write_unlock_irq(&ep->lock);
 
        wakeup_source_unregister(ep_wakeup_source(epi));
 
@@ -1575,9 +1661,9 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi,
         * 1) Flush epi changes above to other CPUs.  This ensures
         *    we do not miss events from ep_poll_callback if an
         *    event occurs immediately after we call f_op->poll().
-        *    We need this because we did not take ep->wq.lock while
+        *    We need this because we did not take ep->lock while
         *    changing epi above (but ep_poll_callback does take
-        *    ep->wq.lock).
+        *    ep->lock).
         *
         * 2) We also need to ensure we do not miss _past_ events
         *    when calling f_op->poll().  This barrier also
@@ -1596,18 +1682,18 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi,
         * list, push it inside.
         */
        if (ep_item_poll(epi, &pt, 1)) {
-               spin_lock_irq(&ep->wq.lock);
+               write_lock_irq(&ep->lock);
                if (!ep_is_linked(epi)) {
                        list_add_tail(&epi->rdllink, &ep->rdllist);
                        ep_pm_stay_awake(epi);
 
                        /* Notify waiting tasks that events are available */
                        if (waitqueue_active(&ep->wq))
-                               wake_up_locked(&ep->wq);
+                               wake_up(&ep->wq);
                        if (waitqueue_active(&ep->poll_wait))
                                pwake++;
                }
-               spin_unlock_irq(&ep->wq.lock);
+               write_unlock_irq(&ep->lock);
        }
 
        /* We have to call this outside the lock */
@@ -1768,9 +1854,9 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                 */
                timed_out = 1;
 
-               spin_lock_irq(&ep->wq.lock);
+               write_lock_irq(&ep->lock);
                eavail = ep_events_available(ep);
-               spin_unlock_irq(&ep->wq.lock);
+               write_unlock_irq(&ep->lock);
 
                goto send_events;
        }