OSDN Git Service

orangefs: hopefully saner op refcounting and locking
authorAl Viro <viro@zeniv.linux.org.uk>
Sat, 23 Jan 2016 00:47:47 +0000 (19:47 -0500)
committerMike Marshall <hubcap@omnibond.com>
Sat, 23 Jan 2016 18:03:12 +0000 (13:03 -0500)
* create with refcount 1
* make op_release() decrement and free if zero (i.e. old put_op()
  has become that).
* mark when submitter has given up waiting; from that point nobody
  else can move between the lists, change state, etc.
* have daemon read/write_iter grab a reference when picking op
  and *always* give it up in the end
* don't put into hash until we know it's been successfully passed to
  daemon

* move op->lock _lower_ than htab_in_progress_lock (and make sure
  to take it in purge_inprogress_ops())

Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
Signed-off-by: Mike Marshall <hubcap@omnibond.com>
fs/orangefs/devorangefs-req.c
fs/orangefs/file.c
fs/orangefs/orangefs-cache.c
fs/orangefs/orangefs-kernel.h
fs/orangefs/orangefs-mod.c
fs/orangefs/orangefs-sysfs.c
fs/orangefs/orangefs-utils.c
fs/orangefs/waitqueue.c

index 92573d9..b7a6aa4 100644 (file)
@@ -43,9 +43,7 @@ static void orangefs_devreq_add_op(struct orangefs_kernel_op_s *op)
 {
        int index = hash_func(op->tag, hash_table_size);
 
-       spin_lock(&htable_ops_in_progress_lock);
        list_add_tail(&op->list, &htable_ops_in_progress[index]);
-       spin_unlock(&htable_ops_in_progress_lock);
 }
 
 static struct orangefs_kernel_op_s *orangefs_devreq_remove_op(__u64 tag)
@@ -60,8 +58,9 @@ static struct orangefs_kernel_op_s *orangefs_devreq_remove_op(__u64 tag)
                                 next,
                                 &htable_ops_in_progress[index],
                                 list) {
-               if (op->tag == tag) {
-                       list_del(&op->list);
+               if (op->tag == tag && !op_state_purged(op)) {
+                       list_del_init(&op->list);
+                       get_op(op); /* increase ref count. */
                        spin_unlock(&htable_ops_in_progress_lock);
                        return op;
                }
@@ -127,12 +126,17 @@ static ssize_t orangefs_devreq_read(struct file *file,
                return -EINVAL;
        }
 
+restart:
        /* Get next op (if any) from top of list. */
        spin_lock(&orangefs_request_list_lock);
        list_for_each_entry_safe(op, temp, &orangefs_request_list, list) {
                __s32 fsid;
                /* This lock is held past the end of the loop when we break. */
                spin_lock(&op->lock);
+               if (unlikely(op_state_purged(op))) {
+                       spin_unlock(&op->lock);
+                       continue;
+               }
 
                fsid = fsid_of_op(op);
                if (fsid != ORANGEFS_FS_ID_NULL) {
@@ -197,16 +201,10 @@ static ssize_t orangefs_devreq_read(struct file *file,
                spin_unlock(&orangefs_request_list_lock);
                return -EAGAIN;
        }
-
-       /*
-        * Set the operation to be in progress and move it between lists since
-        * it has been sent to the client.
-        */
-       set_op_state_inprogress(cur_op);
-
-       list_del(&cur_op->list);
+       list_del_init(&cur_op->list);
+       get_op(op);
        spin_unlock(&orangefs_request_list_lock);
-       orangefs_devreq_add_op(cur_op);
+
        spin_unlock(&cur_op->lock);
 
        /* Push the upcall out. */
@@ -224,6 +222,25 @@ static ssize_t orangefs_devreq_read(struct file *file,
        if (ret != 0)
                goto error;
 
+       spin_lock(&htable_ops_in_progress_lock);
+       spin_lock(&cur_op->lock);
+       if (unlikely(op_state_given_up(cur_op))) {
+               spin_unlock(&cur_op->lock);
+               spin_unlock(&htable_ops_in_progress_lock);
+               op_release(cur_op);
+               goto restart;
+       }
+
+       /*
+        * Set the operation to be in progress and move it between lists since
+        * it has been sent to the client.
+        */
+       set_op_state_inprogress(cur_op);
+       orangefs_devreq_add_op(cur_op);
+       spin_unlock(&cur_op->lock);
+       spin_unlock(&htable_ops_in_progress_lock);
+       op_release(cur_op);
+
        /* The client only asks to read one size buffer. */
        return MAX_DEV_REQ_UPSIZE;
 error:
@@ -235,11 +252,13 @@ error:
        gossip_err("orangefs: Failed to copy data to user space\n");
        spin_lock(&orangefs_request_list_lock);
        spin_lock(&cur_op->lock);
-       set_op_state_waiting(cur_op);
-       orangefs_devreq_remove_op(cur_op->tag);
-       list_add(&cur_op->list, &orangefs_request_list);
+       if (likely(!op_state_given_up(cur_op))) {
+               set_op_state_waiting(cur_op);
+               list_add(&cur_op->list, &orangefs_request_list);
+       }
        spin_unlock(&cur_op->lock);
        spin_unlock(&orangefs_request_list_lock);
+       op_release(cur_op);
        return -EFAULT;
 }
 
@@ -278,15 +297,13 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
                           __func__,
                           total,
                           (unsigned int) MAX_DEV_REQ_DOWNSIZE);
-               ret = -EFAULT;
-               goto out;
+               return -EFAULT;
        }
      
        n = copy_from_iter(&head, head_size, iter);
        if (n < head_size) {
                gossip_err("%s: failed to copy head.\n", __func__);
-               ret = -EFAULT;
-               goto out;
+               return -EFAULT;
        }
 
        if (head.version < ORANGEFS_MINIMUM_USERSPACE_VERSION) {
@@ -295,31 +312,26 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
                           __func__,
                           head.version,
                           ORANGEFS_MINIMUM_USERSPACE_VERSION);
-               ret = -EPROTO;
-               goto out;
+               return -EPROTO;
        }
 
        if (head.magic != ORANGEFS_DEVREQ_MAGIC) {
                gossip_err("Error: Device magic number does not match.\n");
-               ret = -EPROTO;
-               goto out;
+               return -EPROTO;
        }
 
        op = orangefs_devreq_remove_op(head.tag);
        if (!op) {
                gossip_err("WARNING: No one's waiting for tag %llu\n",
                           llu(head.tag));
-               goto out;
+               return ret;
        }
 
-       get_op(op); /* increase ref count. */
-
        n = copy_from_iter(&op->downcall, downcall_size, iter);
        if (n != downcall_size) {
                gossip_err("%s: failed to copy downcall.\n", __func__);
-               put_op(op);
                ret = -EFAULT;
-               goto out;
+               goto Broken;
        }
 
        if (op->downcall.status)
@@ -339,9 +351,8 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
                           downcall_size,
                           op->downcall.trailer_size,
                           total);
-               put_op(op);
                ret = -EFAULT;
-               goto out;
+               goto Broken;
        }
 
        /* Only READDIR operations should have trailers. */
@@ -350,9 +361,8 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
                gossip_err("%s: %x operation with trailer.",
                           __func__,
                           op->downcall.type);
-               put_op(op);
                ret = -EFAULT;
-               goto out;
+               goto Broken;
        }
 
        /* READDIR operations should always have trailers. */
@@ -361,9 +371,8 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
                gossip_err("%s: %x operation with no trailer.",
                           __func__,
                           op->downcall.type);
-               put_op(op);
                ret = -EFAULT;
-               goto out;
+               goto Broken;
        }
 
        if (op->downcall.type != ORANGEFS_VFS_OP_READDIR)
@@ -374,9 +383,8 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
        if (op->downcall.trailer_buf == NULL) {
                gossip_err("%s: failed trailer vmalloc.\n",
                           __func__);
-               put_op(op);
                ret = -ENOMEM;
-               goto out;
+               goto Broken;
        }
        memset(op->downcall.trailer_buf, 0, op->downcall.trailer_size);
        n = copy_from_iter(op->downcall.trailer_buf,
@@ -385,9 +393,8 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
        if (n != op->downcall.trailer_size) {
                gossip_err("%s: failed to copy trailer.\n", __func__);
                vfree(op->downcall.trailer_buf);
-               put_op(op);
                ret = -EFAULT;
-               goto out;
+               goto Broken;
        }
 
 wakeup:
@@ -404,7 +411,6 @@ wakeup:
         * the buffers are done being used.
         */
        if (op->downcall.type == ORANGEFS_VFS_OP_FILE_IO) {
-               int timed_out = 0;
                DEFINE_WAIT(wait_entry);
 
                /*
@@ -412,7 +418,8 @@ wakeup:
                 * that this op is done
                 */
                spin_lock(&op->lock);
-               set_op_state_serviced(op);
+               if (!op_state_given_up(op))
+                       set_op_state_serviced(op);
                spin_unlock(&op->lock);
 
                while (1) {
@@ -435,7 +442,6 @@ wakeup:
                                        gossip_debug(GOSSIP_DEV_DEBUG,
                                                "%s: timed out.\n",
                                                __func__);
-                                       timed_out = 1;
                                        break;
                                }
                                continue;
@@ -450,15 +456,6 @@ wakeup:
                spin_lock(&op->lock);
                finish_wait(&op->io_completion_waitq, &wait_entry);
                spin_unlock(&op->lock);
-
-               /* NOTE: for I/O operations we handle releasing the op
-                * object except in the case of timeout.  the reason we
-                * can't free the op in timeout cases is that the op
-                * service logic in the vfs retries operations using
-                * the same op ptr, thus it can't be freed.
-                */
-               if (!timed_out)
-                       op_release(op);
        } else {
                /*
                 * tell the vfs op waiting on a waitqueue that
@@ -468,11 +465,22 @@ wakeup:
                 * notification
                 */
                spin_lock(&op->lock);
-               set_op_state_serviced(op);
+               if (!op_state_given_up(op))
+                       set_op_state_serviced(op);
                spin_unlock(&op->lock);
        }
 out:
+       op_release(op);
        return ret;
+
+Broken:
+       spin_lock(&op->lock);
+       if (!op_state_given_up(op)) {
+               op->downcall.status = ret;
+               set_op_state_serviced(op);
+       }
+       spin_unlock(&op->lock);
+       goto out;
 }
 
 /* Returns whether any FS are still pending remounted */
index 171013a..df3404b 100644 (file)
@@ -105,10 +105,9 @@ static ssize_t wait_for_direct_io(enum ORANGEFS_io_type type, struct inode *inod
        ssize_t ret;
 
        new_op = op_alloc(ORANGEFS_VFS_OP_FILE_IO);
-       if (!new_op) {
-               ret = -ENOMEM;
-               goto out;
-       }
+       if (!new_op)
+               return -ENOMEM;
+
        /* synchronous I/O */
        new_op->upcall.req.io.async_vfs_io = ORANGEFS_VFS_SYNC_IO;
        new_op->upcall.req.io.readahead_size = readahead_size;
@@ -234,12 +233,9 @@ populate_shared_memory:
 
        /*
         * tell the device file owner waiting on I/O that this read has
-        * completed and it can return now.  in this exact case, on
-        * wakeup the daemon will free the op, so we *cannot* touch it
-        * after this.
+        * completed and it can return now.
         */
        wake_up_daemon_for_return(new_op);
-       new_op = NULL;
 
 out:
        if (buffer_index >= 0) {
@@ -249,10 +245,7 @@ out:
                             __func__, handle, buffer_index);
                buffer_index = -1;
        }
-       if (new_op) {
-               op_release(new_op);
-               new_op = NULL;
-       }
+       op_release(new_op);
        return ret;
 }
 
index dd4335f..adc3ab0 100644 (file)
@@ -120,7 +120,7 @@ struct orangefs_kernel_op_s *op_alloc(__s32 type)
                init_waitqueue_head(&new_op->waitq);
 
                init_waitqueue_head(&new_op->io_completion_waitq);
-               atomic_set(&new_op->ref_count, 0);
+               atomic_set(&new_op->ref_count, 1);
 
                orangefs_op_initialize(new_op);
 
@@ -149,14 +149,13 @@ struct orangefs_kernel_op_s *op_alloc(__s32 type)
        return new_op;
 }
 
-void op_release(struct orangefs_kernel_op_s *orangefs_op)
+void __op_release(struct orangefs_kernel_op_s *orangefs_op)
 {
        if (orangefs_op) {
                gossip_debug(GOSSIP_CACHE_DEBUG,
                             "Releasing OP (%p: %llu)\n",
                             orangefs_op,
                             llu(orangefs_op->tag));
-               orangefs_op_initialize(orangefs_op);
                kmem_cache_free(op_cache, orangefs_op);
        } else {
                gossip_err("NULL pointer in op_release\n");
index 4219b2f..f96ec3d 100644 (file)
@@ -94,6 +94,7 @@ sizeof(__u64) + sizeof(struct orangefs_downcall_s))
  * serviced - op has matching downcall; ok
  * purged   - op has to start a timer since client-core
  *            exited uncleanly before servicing op
+ * given up - submitter has given up waiting for it
  */
 enum orangefs_vfs_op_states {
        OP_VFS_STATE_UNKNOWN = 0,
@@ -101,30 +102,9 @@ enum orangefs_vfs_op_states {
        OP_VFS_STATE_INPROGR = 2,
        OP_VFS_STATE_SERVICED = 4,
        OP_VFS_STATE_PURGED = 8,
+       OP_VFS_STATE_GIVEN_UP = 16,
 };
 
-#define get_op(op)                                     \
-       do {                                            \
-               atomic_inc(&(op)->ref_count);   \
-               gossip_debug(GOSSIP_DEV_DEBUG,  \
-                       "(get) Alloced OP (%p:%llu)\n", \
-                       op,                             \
-                       llu((op)->tag));                \
-       } while (0)
-
-#define put_op(op)                                                     \
-       do {                                                            \
-               if (atomic_sub_and_test(1, &(op)->ref_count) == 1) {  \
-                       gossip_debug(GOSSIP_DEV_DEBUG,          \
-                               "(put) Releasing OP (%p:%llu)\n",       \
-                               op,                                     \
-                               llu((op)->tag));                        \
-                       op_release(op);                                 \
-                       }                                               \
-       } while (0)
-
-#define op_wait(op) (atomic_read(&(op)->ref_count) <= 2 ? 0 : 1)
-
 /*
  * Defines for controlling whether I/O upcalls are for async or sync operations
  */
@@ -258,6 +238,25 @@ static inline void set_op_state_purged(struct orangefs_kernel_op_s *op)
 #define op_state_in_progress(op) ((op)->op_state & OP_VFS_STATE_INPROGR)
 #define op_state_serviced(op)    ((op)->op_state & OP_VFS_STATE_SERVICED)
 #define op_state_purged(op)      ((op)->op_state & OP_VFS_STATE_PURGED)
+#define op_state_given_up(op)    ((op)->op_state & OP_VFS_STATE_GIVEN_UP)
+
+static inline void get_op(struct orangefs_kernel_op_s *op)
+{
+       atomic_inc(&op->ref_count);
+       gossip_debug(GOSSIP_DEV_DEBUG,
+                       "(get) Alloced OP (%p:%llu)\n", op, llu(op->tag));
+}
+
+void __op_release(struct orangefs_kernel_op_s *op);
+
+static inline void op_release(struct orangefs_kernel_op_s *op)
+{
+       if (atomic_dec_and_test(&op->ref_count)) {
+               gossip_debug(GOSSIP_DEV_DEBUG,
+                       "(put) Releasing OP (%p:%llu)\n", op, llu((op)->tag));
+               __op_release(op);
+       }
+}
 
 /* per inode private orangefs info */
 struct orangefs_inode_s {
@@ -459,7 +458,6 @@ int op_cache_initialize(void);
 int op_cache_finalize(void);
 struct orangefs_kernel_op_s *op_alloc(__s32 type);
 char *get_opname_string(struct orangefs_kernel_op_s *new_op);
-void op_release(struct orangefs_kernel_op_s *op);
 
 int dev_req_cache_initialize(void);
 int dev_req_cache_finalize(void);
@@ -665,11 +663,9 @@ int service_operation(struct orangefs_kernel_op_s *op,
 do {                                                           \
        if (!op_state_serviced(new_op)) {                       \
                orangefs_cancel_op_in_progress(new_op->tag);    \
-               op_release(new_op);                             \
        } else {                                                \
                wake_up_daemon_for_return(new_op);              \
        }                                                       \
-       new_op = NULL;                                          \
        orangefs_bufmap_put(bufmap, buffer_index);                              \
        buffer_index = -1;                                      \
 } while (0)
index bd9fbfe..e07874e 100644 (file)
@@ -271,6 +271,7 @@ void purge_inprogress_ops(void)
                struct orangefs_kernel_op_s *op;
                struct orangefs_kernel_op_s *next;
 
+               spin_lock(&htable_ops_in_progress_lock);
                list_for_each_entry_safe(op,
                                         next,
                                         &htable_ops_in_progress[i],
@@ -284,6 +285,7 @@ void purge_inprogress_ops(void)
                        set_op_state_purged(op);
                        spin_unlock(&op->lock);
                }
+               spin_unlock(&htable_ops_in_progress_lock);
        }
 }
 
index 3d36038..83f4053 100644 (file)
@@ -773,10 +773,8 @@ static int sysfs_service_op_show(char *kobj_id, char *buf, void *attr)
                op_alloc_type = ORANGEFS_VFS_OP_PERF_COUNT;
 
        new_op = op_alloc(op_alloc_type);
-       if (!new_op) {
-               rc = -ENOMEM;
-               goto out;
-       }
+       if (!new_op)
+               return -ENOMEM;
 
        /* Can't do a service_operation if the client is not running... */
        rc = is_daemon_in_service();
@@ -931,11 +929,7 @@ out:
                }
        }
 
-       /*
-        * if we got ENOMEM, then op_alloc probably failed...
-        */
-       if (rc != -ENOMEM)
-               op_release(new_op);
+       op_release(new_op);
 
        return rc;
 
@@ -1039,10 +1033,8 @@ static int sysfs_service_op_store(char *kobj_id, const char *buf, void *attr)
                     kobj_id);
 
        new_op = op_alloc(ORANGEFS_VFS_OP_PARAM);
-       if (!new_op) {
-               rc = -ENOMEM;
-               goto out;
-       }
+       if (!new_op)
+               return -EINVAL; /* sic */
 
        /* Can't do a service_operation if the client is not running... */
        rc = is_daemon_in_service();
@@ -1269,15 +1261,9 @@ static int sysfs_service_op_store(char *kobj_id, const char *buf, void *attr)
        }
 
 out:
-       /*
-        * if we got ENOMEM, then op_alloc probably failed...
-        */
-       if (rc == -ENOMEM)
-               rc = 0;
-       else
-               op_release(new_op);
+       op_release(new_op);
 
-       if (rc == 0)
+       if (rc == -ENOMEM || rc == 0)
                rc = -EINVAL;
 
        return rc;
index f212332..a611778 100644 (file)
@@ -429,19 +429,15 @@ int orangefs_inode_setattr(struct inode *inode, struct iattr *iattr)
        ret = copy_attributes_from_inode(inode,
                       &new_op->upcall.req.setattr.attributes,
                       iattr);
-       if (ret < 0) {
-               op_release(new_op);
-               return ret;
-       }
-
-       ret = service_operation(new_op, __func__,
+       if (ret >= 0) {
+               ret = service_operation(new_op, __func__,
                                get_interruptible_flag(inode));
 
-       gossip_debug(GOSSIP_UTILS_DEBUG,
-                    "orangefs_inode_setattr: returning %d\n",
-                    ret);
+               gossip_debug(GOSSIP_UTILS_DEBUG,
+                            "orangefs_inode_setattr: returning %d\n",
+                            ret);
+       }
 
-       /* when request is serviced properly, free req op struct */
        op_release(new_op);
 
        /*
index a257891..2e9468f 100644 (file)
@@ -279,25 +279,6 @@ retry_servicing:
        return ret;
 }
 
-static inline void remove_op_from_request_list(struct orangefs_kernel_op_s *op)
-{
-       struct list_head *tmp = NULL;
-       struct list_head *tmp_safe = NULL;
-       struct orangefs_kernel_op_s *tmp_op = NULL;
-
-       spin_lock(&orangefs_request_list_lock);
-       list_for_each_safe(tmp, tmp_safe, &orangefs_request_list) {
-               tmp_op = list_entry(tmp,
-                                   struct orangefs_kernel_op_s,
-                                   list);
-               if (tmp_op && (tmp_op == op)) {
-                       list_del(&tmp_op->list);
-                       break;
-               }
-       }
-       spin_unlock(&orangefs_request_list_lock);
-}
-
 static void orangefs_clean_up_interrupted_operation(struct orangefs_kernel_op_s *op)
 {
        /*
@@ -334,6 +315,7 @@ static void orangefs_clean_up_interrupted_operation(struct orangefs_kernel_op_s
        }
 
        spin_lock(&op->lock);
+       op->op_state |= OP_VFS_STATE_GIVEN_UP;
 
        if (op_state_waiting(op)) {
                /*
@@ -341,7 +323,9 @@ static void orangefs_clean_up_interrupted_operation(struct orangefs_kernel_op_s
                 * list.
                 */
                spin_unlock(&op->lock);
-               remove_op_from_request_list(op);
+               spin_lock(&orangefs_request_list_lock);
+               list_del(&op->list);
+               spin_unlock(&orangefs_request_list_lock);
                gossip_debug(GOSSIP_WAIT_DEBUG,
                             "Interrupted: Removed op %p from request_list\n",
                             op);