OSDN Git Service

sbitmap: fix possible io hung due to lost wakeup
authorYu Kuai <yukuai3@huawei.com>
Wed, 3 Aug 2022 12:15:04 +0000 (20:15 +0800)
committerJens Axboe <axboe@kernel.dk>
Tue, 23 Aug 2022 13:37:21 +0000 (07:37 -0600)
There are two problems can lead to lost wakeup:

1) invalid wakeup on the wrong waitqueue:

For example, 2 * wake_batch tags are put, while only wake_batch threads
are woken:

__sbq_wake_up
 atomic_cmpxchg -> reset wait_cnt
__sbq_wake_up -> decrease wait_cnt
...
__sbq_wake_up -> wait_cnt is decreased to 0 again
 atomic_cmpxchg
 sbq_index_atomic_inc -> increase wake_index
 wake_up_nr -> wake up and waitqueue might be empty
 sbq_index_atomic_inc -> increase again, one waitqueue is skipped
 wake_up_nr -> invalid wake up because old wakequeue might be empty

To fix the problem, increasing 'wake_index' before resetting 'wait_cnt'.

2) 'wait_cnt' can be decreased while waitqueue is empty

As pointed out by Jan Kara, following race is possible:

CPU1 CPU2
__sbq_wake_up  __sbq_wake_up
 sbq_wake_ptr()  sbq_wake_ptr() -> the same
 wait_cnt = atomic_dec_return()
 /* decreased to 0 */
 sbq_index_atomic_inc()
 /* move to next waitqueue */
 atomic_set()
 /* reset wait_cnt */
 wake_up_nr()
 /* wake up on the old waitqueue */
 wait_cnt = atomic_dec_return()
 /*
  * decrease wait_cnt in the old
  * waitqueue, while it can be
  * empty.
  */

Fix the problem by waking up before updating 'wake_index' and
'wait_cnt'.

With this patch, noted that 'wait_cnt' is still decreased in the old
empty waitqueue, however, the wakeup is redirected to a active waitqueue,
and the extra decrement on the old empty waitqueue is not handled.

Fixes: 88459642cba4 ("blk-mq: abstract tag allocation out into sbitmap library")
Signed-off-by: Yu Kuai <yukuai3@huawei.com>
Reviewed-by: Jan Kara <jack@suse.cz>
Link: https://lore.kernel.org/r/20220803121504.212071-1-yukuai1@huaweicloud.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
lib/sbitmap.c

index 29eb048..1f31147 100644 (file)
@@ -611,32 +611,43 @@ static bool __sbq_wake_up(struct sbitmap_queue *sbq)
                return false;
 
        wait_cnt = atomic_dec_return(&ws->wait_cnt);
-       if (wait_cnt <= 0) {
-               int ret;
+       /*
+        * For concurrent callers of this, callers should call this function
+        * again to wakeup a new batch on a different 'ws'.
+        */
+       if (wait_cnt < 0 || !waitqueue_active(&ws->wait))
+               return true;
 
-               wake_batch = READ_ONCE(sbq->wake_batch);
+       if (wait_cnt > 0)
+               return false;
 
-               /*
-                * Pairs with the memory barrier in sbitmap_queue_resize() to
-                * ensure that we see the batch size update before the wait
-                * count is reset.
-                */
-               smp_mb__before_atomic();
+       wake_batch = READ_ONCE(sbq->wake_batch);
 
-               /*
-                * For concurrent callers of this, the one that failed the
-                * atomic_cmpxhcg() race should call this function again
-                * to wakeup a new batch on a different 'ws'.
-                */
-               ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch);
-               if (ret == wait_cnt) {
-                       sbq_index_atomic_inc(&sbq->wake_index);
-                       wake_up_nr(&ws->wait, wake_batch);
-                       return false;
-               }
+       /*
+        * Wake up first in case that concurrent callers decrease wait_cnt
+        * while waitqueue is empty.
+        */
+       wake_up_nr(&ws->wait, wake_batch);
 
-               return true;
-       }
+       /*
+        * Pairs with the memory barrier in sbitmap_queue_resize() to
+        * ensure that we see the batch size update before the wait
+        * count is reset.
+        *
+        * Also pairs with the implicit barrier between decrementing wait_cnt
+        * and checking for waitqueue_active() to make sure waitqueue_active()
+        * sees result of the wakeup if atomic_dec_return() has seen the result
+        * of atomic_set().
+        */
+       smp_mb__before_atomic();
+
+       /*
+        * Increase wake_index before updating wait_cnt, otherwise concurrent
+        * callers can see valid wait_cnt in old waitqueue, which can cause
+        * invalid wakeup on the old waitqueue.
+        */
+       sbq_index_atomic_inc(&sbq->wake_index);
+       atomic_set(&ws->wait_cnt, wake_batch);
 
        return false;
 }