OSDN Git Service

ocfs2: fix occurring deadlock by changing ocfs2_wq from global to local
authorjiangyiwen <jiangyiwen@huawei.com>
Fri, 25 Mar 2016 21:21:32 +0000 (14:21 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Fri, 25 Mar 2016 23:37:42 +0000 (16:37 -0700)
This patch fixes a deadlock, as follows:

  Node 1                Node 2                  Node 3
1)volume a and b are    only mount vol a        only mount vol b
  mounted

2)                      start to mount b        start to mount a

3)                      check hb of Node 3      check hb of Node 2
                        in vol a, qs_holds++    in vol b, qs_holds++

4) -------------------- all nodes' network down --------------------

5)                      progress of mount b     the same situation as
                        failed, and then call   Node 2
                        ocfs2_dismount_volume.
                        but the process is hung,
                        since there is a work
                        in ocfs2_wq cannot beo
                        completed. This work is
                        about vol a, because
                        ocfs2_wq is global wq.
                        BTW, this work which is
                        scheduled in ocfs2_wq is
                        ocfs2_orphan_scan_work,
                        and the context in this work
                        needs to take inode lock
                        of orphan_dir, because
                        lockres owner are Node 1 and
                        all nodes' nework has been down
                        at the same time, so it can't
                        get the inode lock.

6)                      Why can't this node be fenced
                        when network disconnected?
                        Because the process of
                        mount is hung what caused qs_holds
                        is not equal 0.

Because all works in the ocfs2_wq are relative to the super block.

The solution is to change the ocfs2_wq from global to local.  In other
words, move it into struct ocfs2_super.

Signed-off-by: Yiwen Jiang <jiangyiwen@huawei.com>
Reviewed-by: Joseph Qi <joseph.qi@huawei.com>
Cc: Xue jiufei <xuejiufei@huawei.com>
Cc: Mark Fasheh <mfasheh@suse.de>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Cc: Junxiao Bi <junxiao.bi@oracle.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
fs/ocfs2/alloc.c
fs/ocfs2/journal.c
fs/ocfs2/localalloc.c
fs/ocfs2/ocfs2.h
fs/ocfs2/quota_global.c
fs/ocfs2/super.c
fs/ocfs2/super.h

index d002579..1748cea 100644 (file)
@@ -6079,7 +6079,7 @@ void ocfs2_schedule_truncate_log_flush(struct ocfs2_super *osb,
                if (cancel)
                        cancel_delayed_work(&osb->osb_truncate_log_wq);
 
-               queue_delayed_work(ocfs2_wq, &osb->osb_truncate_log_wq,
+               queue_delayed_work(osb->ocfs2_wq, &osb->osb_truncate_log_wq,
                                   OCFS2_TRUNCATE_LOG_FLUSH_INTERVAL);
        }
 }
@@ -6253,7 +6253,7 @@ void ocfs2_truncate_log_shutdown(struct ocfs2_super *osb)
 
        if (tl_inode) {
                cancel_delayed_work(&osb->osb_truncate_log_wq);
-               flush_workqueue(ocfs2_wq);
+               flush_workqueue(osb->ocfs2_wq);
 
                status = ocfs2_flush_truncate_log(osb);
                if (status < 0)
index 61b833b..e607419 100644 (file)
@@ -231,7 +231,7 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
        /* At this point, we know that no more recovery threads can be
         * launched, so wait for any recovery completion work to
         * complete. */
-       flush_workqueue(ocfs2_wq);
+       flush_workqueue(osb->ocfs2_wq);
 
        /*
         * Now that recovery is shut down, and the osb is about to be
@@ -1326,7 +1326,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
 
        spin_lock(&journal->j_lock);
        list_add_tail(&item->lri_list, &journal->j_la_cleanups);
-       queue_work(ocfs2_wq, &journal->j_recovery_work);
+       queue_work(journal->j_osb->ocfs2_wq, &journal->j_recovery_work);
        spin_unlock(&journal->j_lock);
 }
 
@@ -1968,7 +1968,7 @@ static void ocfs2_orphan_scan_work(struct work_struct *work)
        mutex_lock(&os->os_lock);
        ocfs2_queue_orphan_scan(osb);
        if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE)
-               queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work,
+               queue_delayed_work(osb->ocfs2_wq, &os->os_orphan_scan_work,
                                      ocfs2_orphan_scan_timeout());
        mutex_unlock(&os->os_lock);
 }
@@ -2008,7 +2008,7 @@ void ocfs2_orphan_scan_start(struct ocfs2_super *osb)
                atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
        else {
                atomic_set(&os->os_state, ORPHAN_SCAN_ACTIVE);
-               queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work,
+               queue_delayed_work(osb->ocfs2_wq, &os->os_orphan_scan_work,
                                   ocfs2_orphan_scan_timeout());
        }
 }
index 7d62c43..fe0d1f9 100644 (file)
@@ -386,7 +386,7 @@ void ocfs2_shutdown_local_alloc(struct ocfs2_super *osb)
        struct ocfs2_dinode *alloc = NULL;
 
        cancel_delayed_work(&osb->la_enable_wq);
-       flush_workqueue(ocfs2_wq);
+       flush_workqueue(osb->ocfs2_wq);
 
        if (osb->local_alloc_state == OCFS2_LA_UNUSED)
                goto out;
@@ -1085,7 +1085,7 @@ static int ocfs2_recalc_la_window(struct ocfs2_super *osb,
                } else {
                        osb->local_alloc_state = OCFS2_LA_DISABLED;
                }
-               queue_delayed_work(ocfs2_wq, &osb->la_enable_wq,
+               queue_delayed_work(osb->ocfs2_wq, &osb->la_enable_wq,
                                   OCFS2_LA_ENABLE_INTERVAL);
                goto out_unlock;
        }
index 7a01262..6cf6538 100644 (file)
@@ -464,6 +464,14 @@ struct ocfs2_super
        struct ocfs2_refcount_tree *osb_ref_tree_lru;
 
        struct mutex system_file_mutex;
+
+       /*
+        * OCFS2 needs to schedule several different types of work which
+        * require cluster locking, disk I/O, recovery waits, etc. Since these
+        * types of work tend to be heavy we avoid using the kernel events
+        * workqueue and schedule on our own.
+        */
+       struct workqueue_struct *ocfs2_wq;
 };
 
 #define OCFS2_SB(sb)       ((struct ocfs2_super *)(sb)->s_fs_info)
index 91bc674..3892f3c 100644 (file)
@@ -726,7 +726,7 @@ static int ocfs2_release_dquot(struct dquot *dquot)
                dqgrab(dquot);
                /* First entry on list -> queue work */
                if (llist_add(&OCFS2_DQUOT(dquot)->list, &osb->dquot_drop_list))
-                       queue_work(ocfs2_wq, &osb->dquot_drop_work);
+                       queue_work(osb->ocfs2_wq, &osb->dquot_drop_work);
                goto out;
        }
        status = ocfs2_lock_global_qf(oinfo, 1);
index e2c4e38..7db631e 100644 (file)
@@ -80,12 +80,6 @@ static struct kmem_cache *ocfs2_inode_cachep;
 struct kmem_cache *ocfs2_dquot_cachep;
 struct kmem_cache *ocfs2_qf_chunk_cachep;
 
-/* OCFS2 needs to schedule several different types of work which
- * require cluster locking, disk I/O, recovery waits, etc. Since these
- * types of work tend to be heavy we avoid using the kernel events
- * workqueue and schedule on our own. */
-struct workqueue_struct *ocfs2_wq = NULL;
-
 static struct dentry *ocfs2_debugfs_root;
 
 MODULE_AUTHOR("Oracle");
@@ -1613,33 +1607,25 @@ static int __init ocfs2_init(void)
        if (status < 0)
                goto out2;
 
-       ocfs2_wq = create_singlethread_workqueue("ocfs2_wq");
-       if (!ocfs2_wq) {
-               status = -ENOMEM;
-               goto out3;
-       }
-
        ocfs2_debugfs_root = debugfs_create_dir("ocfs2", NULL);
        if (!ocfs2_debugfs_root) {
                status = -ENOMEM;
                mlog(ML_ERROR, "Unable to create ocfs2 debugfs root.\n");
-               goto out4;
+               goto out3;
        }
 
        ocfs2_set_locking_protocol();
 
        status = register_quota_format(&ocfs2_quota_format);
        if (status < 0)
-               goto out4;
+               goto out3;
        status = register_filesystem(&ocfs2_fs_type);
        if (!status)
                return 0;
 
        unregister_quota_format(&ocfs2_quota_format);
-out4:
-       destroy_workqueue(ocfs2_wq);
-       debugfs_remove(ocfs2_debugfs_root);
 out3:
+       debugfs_remove(ocfs2_debugfs_root);
        ocfs2_free_mem_caches();
 out2:
        exit_ocfs2_uptodate_cache();
@@ -1650,11 +1636,6 @@ out1:
 
 static void __exit ocfs2_exit(void)
 {
-       if (ocfs2_wq) {
-               flush_workqueue(ocfs2_wq);
-               destroy_workqueue(ocfs2_wq);
-       }
-
        unregister_quota_format(&ocfs2_quota_format);
 
        debugfs_remove(ocfs2_debugfs_root);
@@ -2349,6 +2330,12 @@ static int ocfs2_initialize_super(struct super_block *sb,
        }
        cleancache_init_shared_fs(sb);
 
+       osb->ocfs2_wq = create_singlethread_workqueue("ocfs2_wq");
+       if (!osb->ocfs2_wq) {
+               status = -ENOMEM;
+               mlog_errno(status);
+       }
+
 bail:
        return status;
 }
@@ -2536,6 +2523,12 @@ static void ocfs2_delete_osb(struct ocfs2_super *osb)
 {
        /* This function assumes that the caller has the main osb resource */
 
+       /* ocfs2_initializer_super have already created this workqueue */
+       if (osb->ocfs2_wq) {
+               flush_workqueue(osb->ocfs2_wq);
+               destroy_workqueue(osb->ocfs2_wq);
+       }
+
        ocfs2_free_slot_info(osb);
 
        kfree(osb->osb_orphan_wipes);
index b477d0b..b023e4f 100644 (file)
@@ -26,8 +26,6 @@
 #ifndef OCFS2_SUPER_H
 #define OCFS2_SUPER_H
 
-extern struct workqueue_struct *ocfs2_wq;
-
 int ocfs2_publish_get_mount_state(struct ocfs2_super *osb,
                                  int node_num);