OSDN Git Service

SUNRPC: Simplify dealing with aborted partially transmitted messages
authorTrond Myklebust <trond.myklebust@hammerspace.com>
Fri, 31 Aug 2018 14:00:02 +0000 (10:00 -0400)
committerTrond Myklebust <trond.myklebust@hammerspace.com>
Sun, 30 Sep 2018 19:35:14 +0000 (15:35 -0400)
If the previous message was only partially transmitted, we need to close
the socket in order to avoid corruption of the message stream. To do so,
we currently hijack the unlocking of the socket in order to schedule
the close.
Now that we track the message offset in the socket state, we can move
that kind of checking out of the socket lock code, which is needed to
allow messages to remain queued after dropping the socket lock.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
net/sunrpc/xprtsock.c

index 629cc45..3fbcceb 100644 (file)
@@ -492,6 +492,16 @@ static int xs_nospace(struct rpc_task *task)
 }
 
 /*
+ * Determine if the previous message in the stream was aborted before it
+ * could complete transmission.
+ */
+static bool
+xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
+{
+       return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
+}
+
+/*
  * Construct a stream transport record marker in @buf.
  */
 static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
@@ -522,6 +532,12 @@ static int xs_local_send_request(struct rpc_task *task)
        int status;
        int sent = 0;
 
+       /* Close the stream if the previous transmission was incomplete */
+       if (xs_send_request_was_aborted(transport, req)) {
+               xs_close(xprt);
+               return -ENOTCONN;
+       }
+
        xs_encode_stream_record_marker(&req->rq_snd_buf);
 
        xs_pktdump("packet data:",
@@ -665,6 +681,13 @@ static int xs_tcp_send_request(struct rpc_task *task)
        int status;
        int sent;
 
+       /* Close the stream if the previous transmission was incomplete */
+       if (xs_send_request_was_aborted(transport, req)) {
+               if (transport->sock != NULL)
+                       kernel_sock_shutdown(transport->sock, SHUT_RDWR);
+               return -ENOTCONN;
+       }
+
        xs_encode_stream_record_marker(&req->rq_snd_buf);
 
        xs_pktdump("packet data:",
@@ -755,30 +778,6 @@ static int xs_tcp_send_request(struct rpc_task *task)
        return status;
 }
 
-/**
- * xs_tcp_release_xprt - clean up after a tcp transmission
- * @xprt: transport
- * @task: rpc task
- *
- * This cleans up if an error causes us to abort the transmission of a request.
- * In this case, the socket may need to be reset in order to avoid confusing
- * the server.
- */
-static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
-{
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-
-       if (task != xprt->snd_task)
-               return;
-       if (task == NULL)
-               goto out_release;
-       if (transport->xmit.offset == 0 || !xprt_connected(xprt))
-               goto out_release;
-       set_bit(XPRT_CLOSE_WAIT, &xprt->state);
-out_release:
-       xprt_release_xprt(xprt, task);
-}
-
 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 {
        transport->old_data_ready = sk->sk_data_ready;
@@ -2764,7 +2763,7 @@ static void bc_destroy(struct rpc_xprt *xprt)
 
 static const struct rpc_xprt_ops xs_local_ops = {
        .reserve_xprt           = xprt_reserve_xprt,
-       .release_xprt           = xs_tcp_release_xprt,
+       .release_xprt           = xprt_release_xprt,
        .alloc_slot             = xprt_alloc_slot,
        .free_slot              = xprt_free_slot,
        .rpcbind                = xs_local_rpcbind,
@@ -2806,7 +2805,7 @@ static const struct rpc_xprt_ops xs_udp_ops = {
 
 static const struct rpc_xprt_ops xs_tcp_ops = {
        .reserve_xprt           = xprt_reserve_xprt,
-       .release_xprt           = xs_tcp_release_xprt,
+       .release_xprt           = xprt_release_xprt,
        .alloc_slot             = xprt_lock_and_alloc_slot,
        .free_slot              = xprt_free_slot,
        .rpcbind                = rpcb_getport_async,