[sagit-ice-cold/kernel_xiaomi_msm8998.git] kernel/workqueue.c
index c579dba..3fb2d45 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -68,6 +68,7 @@ enum {
         * attach_mutex to avoid changing binding state while
         * worker_attach_to_pool() is in progress.
         */
+       POOL_MANAGER_ACTIVE     = 1 << 0,       /* being managed */
        POOL_DISASSOCIATED      = 1 << 2,       /* cpu can't serve workers */
 
        /* worker flags */
@@ -163,7 +164,6 @@ struct worker_pool {
                                                /* L: hash of busy workers */
 
        /* see manage_workers() for details on the two manager mutexes */
-       struct mutex            manager_arb;    /* manager arbitration */
        struct worker           *manager;       /* L: purely informational */
        struct mutex            attach_mutex;   /* attach/detach exclusion */
        struct list_head        workers;        /* A: attached workers */
@@ -295,6 +295,7 @@ static struct workqueue_attrs *wq_update_unbound_numa_attrs_buf;
 
 static DEFINE_MUTEX(wq_pool_mutex);    /* protects pools and workqueues list */
 static DEFINE_SPINLOCK(wq_mayday_lock);        /* protects wq->maydays list */
+static DECLARE_WAIT_QUEUE_HEAD(wq_manager_wait); /* wait for manager to go away */
 
 static LIST_HEAD(workqueues);          /* PR: list of all workqueues */
 static bool workqueue_freezing;                /* PL: have wqs started freezing? */
@@ -568,6 +569,16 @@ static struct pool_workqueue *unbound_pwq_by_node(struct workqueue_struct *wq,
                                                  int node)
 {
        assert_rcu_or_wq_mutex_or_pool_mutex(wq);
+
+       /*
+        * XXX: @node can be NUMA_NO_NODE if CPU goes offline while a
+        * delayed item is pending.  The plan is to keep CPU -> NODE
+        * mapping valid and stable across CPU on/offlines.  Once that
+        * happens, this workaround can be removed.
+        */
+       if (unlikely(node == NUMA_NO_NODE))
+               return wq->dfl_pwq;
+
        return rcu_dereference_raw(wq->numa_pwq_tbl[node]);
 }
 
@@ -639,6 +650,35 @@ static void set_work_pool_and_clear_pending(struct work_struct *work,
         */
        smp_wmb();
        set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT, 0);
+       /*
+        * The following mb guarantees that previous clear of a PENDING bit
+        * will not be reordered with any speculative LOADS or STORES from
+        * work->current_func, which is executed afterwards.  This possible
+        * reordering can lead to a missed execution on an attempt to queue
+        * the same @work.  E.g. consider this case:
+        *
+        *   CPU#0                         CPU#1
+        *   ----------------------------  --------------------------------
+        *
+        * 1  STORE event_indicated
+        * 2  queue_work_on() {
+        * 3    test_and_set_bit(PENDING)
+        * 4 }                             set_..._and_clear_pending() {
+        * 5                                 set_work_data() # clear bit
+        * 6                                 smp_mb()
+        * 7                               work->current_func() {
+        * 8                                  LOAD event_indicated
+        *                                 }
+        *
+        * Without an explicit full barrier, the speculative LOAD on line 8
+        * can be executed before CPU#0 does the STORE on line 1.  If that
+        * happens, CPU#0 observes that the PENDING bit is still set and a
+        * new execution of @work is not queued, in the hope that CPU#1
+        * will eventually finish the queued @work.  Meanwhile CPU#1 does
+        * not see that event_indicated is set, because the speculative
+        * LOAD was executed before the actual STORE.
+        */
+       smp_mb();
 }
 
 static void clear_work_data(struct work_struct *work)
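The race in the table above is easiest to see from the caller's side.  A
minimal sketch of the pattern being protected (event_indicated, my_work and
handle_event() are illustrative names, not part of this patch; assume
INIT_WORK(&my_work, my_work_fn) ran at init time):

	static int event_indicated;
	static struct work_struct my_work;

	static void my_work_fn(struct work_struct *work)
	{
		/* line 8 of the table: this LOAD may be speculated early */
		if (READ_ONCE(event_indicated))
			handle_event();
	}

	static void signal_event(void)
	{
		WRITE_ONCE(event_indicated, 1);		/* line 1: STORE */
		if (!queue_work(system_wq, &my_work)) {
			/*
			 * PENDING was still set (line 3 of the table), so we
			 * rely on the in-flight my_work_fn() to observe the
			 * STORE above.  The smp_mb() added here is what makes
			 * that reliance safe.
			 */
		}
	}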
@@ -769,7 +809,7 @@ static bool need_to_create_worker(struct worker_pool *pool)
 /* Do we have too many workers and should some go away? */
 static bool too_many_workers(struct worker_pool *pool)
 {
-       bool managing = mutex_is_locked(&pool->manager_arb);
+       bool managing = pool->flags & POOL_MANAGER_ACTIVE;
        int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
        int nr_busy = pool->nr_workers - nr_idle;
 
@@ -1439,6 +1479,7 @@ static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
        struct timer_list *timer = &dwork->timer;
        struct work_struct *work = &dwork->work;
 
+       WARN_ON_ONCE(!wq);
        WARN_ON_ONCE(timer->function != delayed_work_timer_fn ||
                     timer->data != (unsigned long)dwork);
        WARN_ON_ONCE(timer_pending(timer));
@@ -1458,13 +1499,13 @@ static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
        timer_stats_timer_set_start_info(&dwork->timer);
 
        dwork->wq = wq;
-       /* timer isn't guaranteed to run in this cpu, record earlier */
-       if (cpu == WORK_CPU_UNBOUND)
-               cpu = raw_smp_processor_id();
        dwork->cpu = cpu;
        timer->expires = jiffies + delay;
 
-       add_timer_on(timer, cpu);
+       if (unlikely(cpu != WORK_CPU_UNBOUND))
+               add_timer_on(timer, cpu);
+       else
+               add_timer(timer);
 }
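For reference, the two branches map onto the public entry points.  The old
code pinned even WORK_CPU_UNBOUND timers to the queueing CPU via
raw_smp_processor_id() + add_timer_on(); now such timers migrate like any
other.  Illustrative usage (dwork stands in for a real struct delayed_work):

	/* cpu == WORK_CPU_UNBOUND: armed with add_timer(), may fire anywhere */
	queue_delayed_work(system_wq, &dwork, HZ);

	/* explicit CPU: still pinned with add_timer_on(timer, 3) */
	queue_delayed_work_on(3, system_wq, &dwork, HZ);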
 
 /**
@@ -1913,24 +1954,17 @@ static bool manage_workers(struct worker *worker)
 {
        struct worker_pool *pool = worker->pool;
 
-       /*
-        * Anyone who successfully grabs manager_arb wins the arbitration
-        * and becomes the manager.  mutex_trylock() on pool->manager_arb
-        * failure while holding pool->lock reliably indicates that someone
-        * else is managing the pool and the worker which failed trylock
-        * can proceed to executing work items.  This means that anyone
-        * grabbing manager_arb is responsible for actually performing
-        * manager duties.  If manager_arb is grabbed and released without
-        * actual management, the pool may stall indefinitely.
-        */
-       if (!mutex_trylock(&pool->manager_arb))
+       if (pool->flags & POOL_MANAGER_ACTIVE)
                return false;
+
+       pool->flags |= POOL_MANAGER_ACTIVE;
        pool->manager = worker;
 
        maybe_create_worker(pool);
 
        pool->manager = NULL;
-       mutex_unlock(&pool->manager_arb);
+       pool->flags &= ~POOL_MANAGER_ACTIVE;
+       wake_up(&wq_manager_wait);
        return true;
 }
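The unlocked-looking flag test-and-set is safe because manage_workers() is
only ever called with pool->lock held, so the POOL_MANAGER_ACTIVE transitions
are fully serialized.  For context, a condensed sketch of the caller in
worker_thread() (paraphrased from memory, not part of this diff):

	spin_lock_irq(&pool->lock);
recheck:
	/* out of idle workers?  become the manager and create some */
	if (unlikely(!may_start_working(pool)) && manage_workers(worker))
		goto recheck;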
 
@@ -2274,8 +2308,14 @@ repeat:
                         */
                        if (need_to_create_worker(pool)) {
                                spin_lock(&wq_mayday_lock);
-                               get_pwq(pwq);
-                               list_move_tail(&pwq->mayday_node, &wq->maydays);
+                               /*
+                                * Queue iff we aren't racing destruction
+                                * and somebody else hasn't queued it already.
+                                */
+                               if (wq->rescuer && list_empty(&pwq->mayday_node)) {
+                                       get_pwq(pwq);
+                                       list_add_tail(&pwq->mayday_node, &wq->maydays);
+                               }
                                spin_unlock(&wq_mayday_lock);
                        }
                }
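The list_empty(&pwq->mayday_node) test is a reliable membership check because
the rescuer dequeues entries with list_del_init(), which re-points the node at
itself.  The underlying idiom from include/linux/list.h (demo_maydays is an
illustrative list, not from this file):

	static LIST_HEAD(demo_maydays);
	struct list_head node;

	INIT_LIST_HEAD(&node);			/* self-linked: list_empty() is true */
	list_add_tail(&node, &demo_maydays);	/* linked in: list_empty() is false */
	list_del_init(&node);			/* self-linked again: safe to re-queue */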
@@ -3080,7 +3120,6 @@ static int init_worker_pool(struct worker_pool *pool)
        setup_timer(&pool->mayday_timer, pool_mayday_timeout,
                    (unsigned long)pool);
 
-       mutex_init(&pool->manager_arb);
        mutex_init(&pool->attach_mutex);
        INIT_LIST_HEAD(&pool->workers);
 
@@ -3150,13 +3189,15 @@ static void put_unbound_pool(struct worker_pool *pool)
        hash_del(&pool->hash_node);
 
        /*
-        * Become the manager and destroy all workers.  Grabbing
-        * manager_arb prevents @pool's workers from blocking on
-        * attach_mutex.
+        * Become the manager and destroy all workers.  This prevents
+        * @pool's workers from blocking on attach_mutex.  We're the last
+        * manager and @pool gets freed with the flag set.
         */
-       mutex_lock(&pool->manager_arb);
-
        spin_lock_irq(&pool->lock);
+       wait_event_lock_irq(wq_manager_wait,
+                           !(pool->flags & POOL_MANAGER_ACTIVE), pool->lock);
+       pool->flags |= POOL_MANAGER_ACTIVE;
+
        while ((worker = first_idle_worker(pool)))
                destroy_worker(worker);
        WARN_ON(pool->nr_workers || pool->nr_idle);
@@ -3170,8 +3211,6 @@ static void put_unbound_pool(struct worker_pool *pool)
        if (pool->detach_completion)
                wait_for_completion(pool->detach_completion);
 
-       mutex_unlock(&pool->manager_arb);
-
        /* shut down the timers */
        del_timer_sync(&pool->idle_timer);
        del_timer_sync(&pool->mayday_timer);
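wait_event_lock_irq() (include/linux/wait.h) releases the given spinlock with
spin_unlock_irq() before sleeping and re-takes it before re-checking the
condition, so setting POOL_MANAGER_ACTIVE immediately after it returns leaves
no window for another manager to sneak in.  The generic shape of the
primitive (my_lock, my_waitq and cond are illustrative):

	spin_lock_irq(&my_lock);
	/* returns with my_lock held and cond true, atomically w.r.t. my_lock */
	wait_event_lock_irq(my_waitq, cond, my_lock);
	/* act on the now-guaranteed state ... */
	spin_unlock_irq(&my_lock);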
@@ -3608,8 +3647,12 @@ static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
                return -EINVAL;
 
        /* creating multiple pwqs breaks ordering guarantee */
-       if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
-               return -EINVAL;
+       if (!list_empty(&wq->pwqs)) {
+               if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
+                       return -EINVAL;
+
+               wq->flags &= ~__WQ_ORDERED;
+       }
 
        ctx = apply_wqattrs_prepare(wq, attrs);
 
@@ -3795,6 +3838,16 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
        struct workqueue_struct *wq;
        struct pool_workqueue *pwq;
 
+       /*
+        * Unbound && max_active == 1 used to imply ordered, which is no
+        * longer the case on NUMA machines due to per-node pools.  While
+        * alloc_ordered_workqueue() is the right way to create an ordered
+        * workqueue, keep the previous behavior to avoid subtle breakages
+        * on NUMA.
+        */
+       if ((flags & WQ_UNBOUND) && max_active == 1)
+               flags |= __WQ_ORDERED;
+
        /* see the comment above the definition of WQ_POWER_EFFICIENT */
        if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
                flags |= WQ_UNBOUND;
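Combined with the apply_workqueue_attrs_locked() and
workqueue_set_max_active() hunks, the resulting split is: ordering requested
via alloc_ordered_workqueue() (which, after this change, sets
__WQ_ORDERED_EXPLICIT in addition to __WQ_ORDERED) stays immutable, while the
implicit ordering above may be dropped later.  An illustrative comparison:

	struct workqueue_struct *wq1, *wq2;

	wq1 = alloc_ordered_workqueue("wq1", 0);	/* __WQ_ORDERED | __WQ_ORDERED_EXPLICIT */
	workqueue_set_max_active(wq1, 4);		/* WARNs and is ignored */

	wq2 = alloc_workqueue("wq2", WQ_UNBOUND, 1);	/* implicit __WQ_ORDERED only */
	workqueue_set_max_active(wq2, 4);		/* allowed; clears __WQ_ORDERED */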
@@ -3902,9 +3955,29 @@ void destroy_workqueue(struct workqueue_struct *wq)
        struct pool_workqueue *pwq;
        int node;
 
+       /*
+        * Remove it from sysfs first so that sanity check failure doesn't
+        * lead to sysfs name conflicts.
+        */
+       workqueue_sysfs_unregister(wq);
+
        /* drain it before proceeding with destruction */
        drain_workqueue(wq);
 
+       /* kill the rescuer; if sanity checks fail below, the wq is left w/o one */
+       if (wq->rescuer) {
+               struct worker *rescuer = wq->rescuer;
+
+               /* this prevents new queueing */
+               spin_lock_irq(&wq_mayday_lock);
+               wq->rescuer = NULL;
+               spin_unlock_irq(&wq_mayday_lock);
+
+               /* rescuer will empty maydays list before exiting */
+               kthread_stop(rescuer->task);
+               kfree(rescuer);
+       }
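Clearing wq->rescuer under wq_mayday_lock before kthread_stop() cannot strand
a mayday request: rescuer_thread() samples kthread_should_stop() before each
sleep but only acts on it after draining wq->maydays.  Its exit path,
condensed from memory (not part of this diff):

	should_stop = kthread_should_stop();

	/* ... process every entry on wq->maydays first ... */

	if (should_stop) {
		__set_current_state(TASK_RUNNING);
		return 0;
	}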
+
        /* sanity checks */
        mutex_lock(&wq->mutex);
        for_each_pwq(pwq, wq) {
@@ -3934,11 +4007,6 @@ void destroy_workqueue(struct workqueue_struct *wq)
        list_del_rcu(&wq->list);
        mutex_unlock(&wq_pool_mutex);
 
-       workqueue_sysfs_unregister(wq);
-
-       if (wq->rescuer)
-               kthread_stop(wq->rescuer->task);
-
        if (!(wq->flags & WQ_UNBOUND)) {
                /*
                 * The base ref is never dropped on per-cpu pwqs.  Directly
@@ -3983,13 +4051,14 @@ void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
        struct pool_workqueue *pwq;
 
        /* disallow meddling with max_active for ordered workqueues */
-       if (WARN_ON(wq->flags & __WQ_ORDERED))
+       if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
                return;
 
        max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
 
        mutex_lock(&wq->mutex);
 
+       wq->flags &= ~__WQ_ORDERED;
        wq->saved_max_active = max_active;
 
        for_each_pwq(pwq, wq)
@@ -4000,6 +4069,22 @@ void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
 EXPORT_SYMBOL_GPL(workqueue_set_max_active);
 
 /**
+ * current_work - retrieve %current task's work struct
+ *
+ * Determine if %current task is a workqueue worker and what it's working on.
+ * Useful to find out the context that the %current task is running in.
+ *
+ * Return: work struct if %current task is a workqueue worker, %NULL otherwise.
+ */
+struct work_struct *current_work(void)
+{
+       struct worker *worker = current_wq_worker();
+
+       return worker ? worker->current_work : NULL;
+}
+EXPORT_SYMBOL(current_work);
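A typical use is avoiding a self-flush deadlock, where a work item would
otherwise wait on itself (hypothetical sketch; struct mydev and reset_work
are illustrative):

	static void mydev_request_reset(struct mydev *dev)
	{
		/* flush_work() on the work item we are running would deadlock */
		if (current_work() == &dev->reset_work)
			return;
		flush_work(&dev->reset_work);
	}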
+
+/**
  * current_is_workqueue_rescuer - is %current workqueue rescuer?
  * current_is_workqueue_rescuer - is %current workqueue rescuer?
  *
  * Determine whether %current is a workqueue rescuer.  Can be used from
@@ -4198,7 +4283,8 @@ static void show_pwq(struct pool_workqueue *pwq)
        pr_info("  pwq %d:", pool->id);
        pr_cont_pool_info(pool);
 
-       pr_cont(" active=%d/%d%s\n", pwq->nr_active, pwq->max_active,
+       pr_cont(" active=%d/%d refcnt=%d%s\n",
+               pwq->nr_active, pwq->max_active, pwq->refcnt,
                !list_empty(&pwq->mayday_node) ? " MAYDAY" : "");
 
        hash_for_each(pool->busy_hash, bkt, worker, hentry) {
@@ -4418,6 +4504,17 @@ static void rebind_workers(struct worker_pool *pool)
                                                  pool->attrs->cpumask) < 0);
 
        spin_lock_irq(&pool->lock);
+
+       /*
+        * XXX: CPU hotplug notifiers are weird and can call DOWN_FAILED
+        * w/o preceding DOWN_PREPARE.  Work around it.  CPU hotplug is
+        * being reworked and this can go away in time.
+        */
+       if (!(pool->flags & POOL_DISASSOCIATED)) {
+               spin_unlock_irq(&pool->lock);
+               return;
+       }
+
        pool->flags &= ~POOL_DISASSOCIATED;
 
        for_each_pool_worker(worker, pool) {
@@ -5104,7 +5201,7 @@ int workqueue_sysfs_register(struct workqueue_struct *wq)
         * attributes breaks ordering guarantee.  Disallow exposing ordered
         * workqueues.
         */
-       if (WARN_ON(wq->flags & __WQ_ORDERED))
+       if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
                return -EINVAL;
 
        wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
@@ -5124,7 +5221,7 @@ int workqueue_sysfs_register(struct workqueue_struct *wq)
 
        ret = device_register(&wq_dev->dev);
        if (ret) {
-               kfree(wq_dev);
+               put_device(&wq_dev->dev);
                wq->wq_dev = NULL;
                return ret;
        }
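This follows the driver-core rule documented above device_register() in
drivers/base/core.c: once registration has been attempted, the embedded
kobject owns the object, and the error path must drop that reference with
put_device() so the release callback (wq_device_release() in this file) does
the kfree(); a direct kfree() bypasses the kobject teardown.  The general
shape (the foo names are illustrative):

	static void foo_release(struct device *dev)
	{
		kfree(container_of(dev, struct foo, dev));
	}

	static int foo_add(struct foo *foo)
	{
		int ret;

		foo->dev.release = foo_release;
		ret = device_register(&foo->dev);
		if (ret) {
			put_device(&foo->dev);	/* frees via foo_release() */
			return ret;
		}
		return 0;
	}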