OSDN Git Service

srcu: Move to state-based grace-period sequencing
authorPaul E. McKenney <paulmck@linux.vnet.ibm.com>
Fri, 10 Mar 2017 23:31:55 +0000 (15:31 -0800)
committerPaul E. McKenney <paulmck@linux.vnet.ibm.com>
Tue, 18 Apr 2017 18:38:20 +0000 (11:38 -0700)
The current SRCU grace-period processing might never reach the last
portion of srcu_advance_batches().  This is OK given the current
implementation, as the first portion, up to the try_check_zero()
following the srcu_flip() is sufficient to drive grace periods forward.
However, it has the unfortunate side-effect of making it impossible to
determine when a given grace period has ended, and it will be necessary
to efficiently trace ends of grace periods in order to efficiently handle
per-CPU SRCU callback lists.

This commit therefore adds states to the SRCU grace-period processing,
so that the end of a given SRCU grace period is marked by the transition
to the SRCU_STATE_DONE state.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
include/linux/srcu.h
kernel/rcu/srcu.c

index a598cf3..f149a68 100644 (file)
@@ -48,7 +48,7 @@ struct srcu_struct {
        unsigned long completed;
        struct srcu_array __percpu *per_cpu_ref;
        spinlock_t queue_lock; /* protect ->batch_queue, ->running */
-       bool running;
+       int srcu_state;
        /* callbacks just queued */
        struct rcu_batch batch_queue;
        /* callbacks try to do the first check_zero */
@@ -62,6 +62,12 @@ struct srcu_struct {
 #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
 };
 
+/* Values for -> state variable. */
+#define SRCU_STATE_IDLE                0
+#define SRCU_STATE_SCAN1       1
+#define SRCU_STATE_SCAN2       2
+#define SRCU_STATE_DONE                3
+
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 
 int __init_srcu_struct(struct srcu_struct *sp, const char *name,
@@ -89,7 +95,7 @@ void process_srcu(struct work_struct *work);
                .completed = -300,                                      \
                .per_cpu_ref = &name##_srcu_array,                      \
                .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock),    \
-               .running = false,                                       \
+               .srcu_state = SRCU_STATE_IDLE,                          \
                .batch_queue = RCU_BATCH_INIT(name.batch_queue),        \
                .batch_check0 = RCU_BATCH_INIT(name.batch_check0),      \
                .batch_check1 = RCU_BATCH_INIT(name.batch_check1),      \
index 821ecda..70286f6 100644 (file)
@@ -111,7 +111,7 @@ static int init_srcu_struct_fields(struct srcu_struct *sp)
 {
        sp->completed = 0;
        spin_lock_init(&sp->queue_lock);
-       sp->running = false;
+       sp->srcu_state = SRCU_STATE_IDLE;
        rcu_batch_init(&sp->batch_queue);
        rcu_batch_init(&sp->batch_check0);
        rcu_batch_init(&sp->batch_check1);
@@ -270,7 +270,7 @@ void cleanup_srcu_struct(struct srcu_struct *sp)
        if (WARN_ON(!rcu_all_batches_empty(sp)))
                return; /* Leakage unless caller handles error. */
        flush_delayed_work(&sp->work);
-       if (WARN_ON(sp->running))
+       if (WARN_ON(READ_ONCE(sp->srcu_state) != SRCU_STATE_IDLE))
                return; /* Caller forgot to stop doing call_srcu()? */
        free_percpu(sp->per_cpu_ref);
        sp->per_cpu_ref = NULL;
@@ -391,8 +391,8 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
        spin_lock_irqsave(&sp->queue_lock, flags);
        smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
        rcu_batch_queue(&sp->batch_queue, head);
-       if (!sp->running) {
-               sp->running = true;
+       if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) {
+               WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1);
                queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
        }
        spin_unlock_irqrestore(&sp->queue_lock, flags);
@@ -424,9 +424,9 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
        head->func = wakeme_after_rcu;
        spin_lock_irq(&sp->queue_lock);
        smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
-       if (!sp->running) {
+       if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) {
                /* steal the processing owner */
-               sp->running = true;
+               WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1);
                rcu_batch_queue(&sp->batch_check0, head);
                spin_unlock_irq(&sp->queue_lock);
                /* give the processing owner to work_struct */
@@ -548,7 +548,9 @@ static void srcu_collect_new(struct srcu_struct *sp)
  */
 static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
 {
-       int idx = 1 ^ (sp->completed & 1);
+       int idx;
+
+       WARN_ON_ONCE(sp->srcu_state == SRCU_STATE_IDLE);
 
        /*
         * Because readers might be delayed for an extended period after
@@ -558,48 +560,56 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
         * invoking a callback.
         */
 
-       if (rcu_batch_empty(&sp->batch_check0) &&
-           rcu_batch_empty(&sp->batch_check1))
-               return; /* no callbacks need to be advanced */
-
-       if (!try_check_zero(sp, idx, trycount))
-               return; /* failed to advance, will try after SRCU_INTERVAL */
-
-       /*
-        * The callbacks in ->batch_check1 have already done with their
-        * first zero check and flip back when they were enqueued on
-        * ->batch_check0 in a previous invocation of srcu_advance_batches().
-        * (Presumably try_check_zero() returned false during that
-        * invocation, leaving the callbacks stranded on ->batch_check1.)
-        * They are therefore ready to invoke, so move them to ->batch_done.
-        */
-       rcu_batch_move(&sp->batch_done, &sp->batch_check1);
-
-       if (rcu_batch_empty(&sp->batch_check0))
-               return; /* no callbacks need to be advanced */
-       srcu_flip(sp);
-
-       /*
-        * The callbacks in ->batch_check0 just finished their
-        * first check zero and flip, so move them to ->batch_check1
-        * for future checking on the other idx.
-        */
-       rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
-
-       /*
-        * SRCU read-side critical sections are normally short, so check
-        * at least twice in quick succession after a flip.
-        */
-       trycount = trycount < 2 ? 2 : trycount;
-       if (!try_check_zero(sp, idx^1, trycount))
-               return; /* failed to advance, will try after SRCU_INTERVAL */
+       if (sp->srcu_state == SRCU_STATE_DONE)
+               WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1);
+
+       if (sp->srcu_state == SRCU_STATE_SCAN1) {
+               idx = 1 ^ (sp->completed & 1);
+               if (!try_check_zero(sp, idx, trycount))
+                       return; /* readers present, retry after SRCU_INTERVAL */
+
+               /*
+                * The callbacks in ->batch_check1 have already done
+                * with their first zero check and flip back when they were
+                * enqueued on ->batch_check0 in a previous invocation of
+                * srcu_advance_batches().  (Presumably try_check_zero()
+                * returned false during that invocation, leaving the
+                * callbacks stranded on ->batch_check1.) They are therefore
+                * ready to invoke, so move them to ->batch_done.
+                */
+               rcu_batch_move(&sp->batch_done, &sp->batch_check1);
+               srcu_flip(sp);
+
+               /*
+                * The callbacks in ->batch_check0 just finished their
+                * first check zero and flip, so move them to ->batch_check1
+                * for future checking on the other idx.
+                */
+               rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
+
+               WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN2);
+       }
 
-       /*
-        * The callbacks in ->batch_check1 have now waited for all
-        * pre-existing readers using both idx values.  They are therefore
-        * ready to invoke, so move them to ->batch_done.
-        */
-       rcu_batch_move(&sp->batch_done, &sp->batch_check1);
+       if (sp->srcu_state == SRCU_STATE_SCAN2) {
+
+               /*
+                * SRCU read-side critical sections are normally short,
+                * so check at least twice in quick succession after a flip.
+                */
+               idx = 1 ^ (sp->completed & 1);
+               trycount = trycount < 2 ? 2 : trycount;
+               if (!try_check_zero(sp, idx, trycount))
+                       return; /* readers present, retry after SRCU_INTERVAL */
+
+               /*
+                * The callbacks in ->batch_check1 have now waited for
+                * all pre-existing readers using both idx values.  They are
+                * therefore ready to invoke, so move them to ->batch_done.
+                */
+               rcu_batch_move(&sp->batch_done, &sp->batch_check1);
+
+               WRITE_ONCE(sp->srcu_state, SRCU_STATE_DONE);
+       }
 }
 
 /*
@@ -633,8 +643,9 @@ static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
 
        if (rcu_all_batches_empty(sp)) {
                spin_lock_irq(&sp->queue_lock);
-               if (rcu_all_batches_empty(sp)) {
-                       sp->running = false;
+               if (rcu_all_batches_empty(sp) &&
+                   READ_ONCE(sp->srcu_state) == SRCU_STATE_DONE) {
+                       WRITE_ONCE(sp->srcu_state, SRCU_STATE_IDLE);
                        pending = false;
                }
                spin_unlock_irq(&sp->queue_lock);