OSDN Git Service

SUNRPC: Refactor xprt_transmit() to remove wait for reply code
authorTrond Myklebust <trond.myklebust@hammerspace.com>
Thu, 23 Aug 2018 04:03:43 +0000 (00:03 -0400)
committerTrond Myklebust <trond.myklebust@hammerspace.com>
Sun, 30 Sep 2018 19:35:14 +0000 (15:35 -0400)
Allow the caller in clnt.c to call into the code to wait for a reply
after calling xprt_transmit(). Again, the reason is that the backchannel
code does not need this functionality.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
include/linux/sunrpc/xprt.h
net/sunrpc/clnt.c
net/sunrpc/xprt.c

index 0250294..4fa2af0 100644 (file)
@@ -335,6 +335,7 @@ void                        xprt_free_slot(struct rpc_xprt *xprt,
 void                   xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
 bool                   xprt_prepare_transmit(struct rpc_task *task);
 void                   xprt_request_enqueue_receive(struct rpc_task *task);
+void                   xprt_request_wait_receive(struct rpc_task *task);
 void                   xprt_transmit(struct rpc_task *task);
 void                   xprt_end_transmit(struct rpc_task *task);
 int                    xprt_adjust_timeout(struct rpc_rqst *req);
index 4149662..775d6e8 100644 (file)
@@ -1975,15 +1975,6 @@ call_transmit(struct rpc_task *task)
                return;
        if (is_retrans)
                task->tk_client->cl_stats->rpcretrans++;
-       /*
-        * On success, ensure that we call xprt_end_transmit() before sleeping
-        * in order to allow access to the socket to other RPC requests.
-        */
-       call_transmit_status(task);
-       if (rpc_reply_expected(task))
-               return;
-       task->tk_action = rpc_exit_task;
-       rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
 }
 
 /*
@@ -2000,6 +1991,7 @@ call_transmit_status(struct rpc_task *task)
         */
        if (task->tk_status == 0) {
                xprt_end_transmit(task);
+               xprt_request_wait_receive(task);
                return;
        }
 
index 2ae0a4c..a6a33c1 100644 (file)
@@ -654,6 +654,22 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
 }
 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
 
+static unsigned int
+xprt_connect_cookie(struct rpc_xprt *xprt)
+{
+       return READ_ONCE(xprt->connect_cookie);
+}
+
+static bool
+xprt_request_retransmit_after_disconnect(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+
+       return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
+               !xprt_connected(xprt);
+}
+
 /**
  * xprt_conditional_disconnect - force a transport to disconnect
  * @xprt: transport to disconnect
@@ -1010,6 +1026,39 @@ static void xprt_timer(struct rpc_task *task)
 }
 
 /**
+ * xprt_request_wait_receive - wait for the reply to an RPC request
+ * @task: RPC task about to send a request
+ *
+ */
+void xprt_request_wait_receive(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+
+       if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
+               return;
+       /*
+        * Sleep on the pending queue if we're expecting a reply.
+        * The spinlock ensures atomicity between the test of
+        * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
+        */
+       spin_lock(&xprt->queue_lock);
+       if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
+               xprt->ops->set_retrans_timeout(task);
+               rpc_sleep_on(&xprt->pending, task, xprt_timer);
+               /*
+                * Send an extra queue wakeup call if the
+                * connection was dropped in case the call to
+                * rpc_sleep_on() raced.
+                */
+               if (xprt_request_retransmit_after_disconnect(task))
+                       rpc_wake_up_queued_task_set_status(&xprt->pending,
+                                       task, -ENOTCONN);
+       }
+       spin_unlock(&xprt->queue_lock);
+}
+
+/**
  * xprt_prepare_transmit - reserve the transport before sending a request
  * @task: RPC task about to send a request
  *
@@ -1028,9 +1077,8 @@ bool xprt_prepare_transmit(struct rpc_task *task)
                        task->tk_status = req->rq_reply_bytes_recvd;
                        goto out_unlock;
                }
-               if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
-                   && xprt_connected(xprt)
-                   && req->rq_connect_cookie == xprt->connect_cookie) {
+               if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
+                   !xprt_request_retransmit_after_disconnect(task)) {
                        xprt->ops->set_retrans_timeout(task);
                        rpc_sleep_on(&xprt->pending, task, xprt_timer);
                        goto out_unlock;
@@ -1091,8 +1139,6 @@ void xprt_transmit(struct rpc_task *task)
        task->tk_flags |= RPC_TASK_SENT;
        spin_lock_bh(&xprt->transport_lock);
 
-       xprt->ops->set_retrans_timeout(task);
-
        xprt->stat.sends++;
        xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
        xprt->stat.bklog_u += xprt->backlog.qlen;
@@ -1101,22 +1147,6 @@ void xprt_transmit(struct rpc_task *task)
        spin_unlock_bh(&xprt->transport_lock);
 
        req->rq_connect_cookie = connect_cookie;
-       if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
-               /*
-                * Sleep on the pending queue if we're expecting a reply.
-                * The spinlock ensures atomicity between the test of
-                * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
-                */
-               spin_lock(&xprt->queue_lock);
-               if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
-                       rpc_sleep_on(&xprt->pending, task, xprt_timer);
-                       /* Wake up immediately if the connection was dropped */
-                       if (!xprt_connected(xprt))
-                               rpc_wake_up_queued_task_set_status(&xprt->pending,
-                                               task, -ENOTCONN);
-               }
-               spin_unlock(&xprt->queue_lock);
-       }
 }
 
 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
@@ -1321,7 +1351,7 @@ xprt_request_init(struct rpc_task *task)
        req->rq_xprt    = xprt;
        req->rq_buffer  = NULL;
        req->rq_xid     = xprt_alloc_xid(xprt);
-       req->rq_connect_cookie = xprt->connect_cookie - 1;
+       req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
        req->rq_bytes_sent = 0;
        req->rq_snd_buf.len = 0;
        req->rq_snd_buf.buflen = 0;