net/sunrpc/xprtrdma/backchannel.c

// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

#undef RPCRDMA_BACKCHANNEL_DEBUG

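/* rpcrdma_bc_setup_reqs - Pre-allocate rpc_rqsts for the backchannel
 *
 * Each rpc_rqst gets a persistent send buffer and is parked on the
 * transport's bc_pa_list until a backward direction call arrives.
 */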
static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
                                 unsigned int count)
{
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
        unsigned int i;

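        /* Prepare twice @count rpc_rqsts; the comment in
         * xprt_rdma_bc_setup() explains why the count is doubled.
         */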
        for (i = 0; i < (count << 1); i++) {
                struct rpcrdma_regbuf *rb;
                size_t size;

                req = rpcrdma_create_req(r_xprt);
                if (IS_ERR(req))
                        return PTR_ERR(req);
                rqst = &req->rl_slot;

                rqst->rq_xprt = xprt;
                INIT_LIST_HEAD(&rqst->rq_bc_list);
                __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
                spin_lock(&xprt->bc_pa_lock);
                list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
                spin_unlock(&xprt->bc_pa_lock);

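                /* The send buffer for backchannel replies is sized
                 * to the inline threshold, and never maps more than
                 * one page.
                 */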
                size = r_xprt->rx_data.inline_rsize;
                rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
                if (IS_ERR(rb))
                        goto out_fail;
                req->rl_sendbuf = rb;
                xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base,
                             min_t(size_t, size, PAGE_SIZE));
        }
        return 0;

out_fail:
        rpcrdma_req_destroy(req);
        return -ENOMEM;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        int rc;

        /* The backchannel reply path returns each rpc_rqst to the
         * bc_pa_list _after_ the reply is sent. If the server is
         * faster than the client, it can send another backward
         * direction request before the rpc_rqst is returned to the
         * list. The client rejects the request in this case.
         *
         * Twice as many rpc_rqsts are prepared to ensure there is
         * always an rpc_rqst available as soon as a reply is sent.
         */
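        /* For example, reqs = 4 prepares 8 rpc_rqsts, so
         * RPCRDMA_BACKWARD_WRS must be at least 8 for this
         * setup to succeed.
         */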
        if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
                goto out_err;

        rc = rpcrdma_bc_setup_reqs(r_xprt, reqs);
        if (rc)
                goto out_free;

        r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
        trace_xprtrdma_cb_setup(r_xprt, reqs);
        return 0;

out_free:
        xprt_rdma_bc_destroy(xprt, reqs);

out_err:
        pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
        return -ENOMEM;
}

/**
 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
 * @xprt: transport
 *
 * Returns maximum size, in bytes, of a backchannel message
 */
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        size_t maxmsg;

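        /* A backchannel message must fit within both the inline
         * receive and inline send thresholds, and the transport
         * maps at most one page for it.
         */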
        maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
        maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
        return maxmsg - RPCRDMA_HDRLEN_MIN;
}

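/* rpcrdma_bc_marshal_reply - Construct the RPC/RDMA transport header
 * for a backchannel reply, then DMA map the header and rq_snd_buf
 * for sending.
 */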
static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        __be32 *p;

        rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
        xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
                        req->rl_rdmabuf->rg_base);

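        /* Space for the fixed portion of the RPC/RDMA header:
         * seven XDR words (RPCRDMA_HDRLEN_MIN, 28 octets) carrying
         * the XID, version, credit count, the rdma_msg procedure,
         * and three empty chunk lists.
         */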
        p = xdr_reserve_space(&req->rl_stream, 28);
        if (unlikely(!p))
                return -EIO;
        *p++ = rqst->rq_xid;
        *p++ = rpcrdma_version;
        *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
        *p++ = rdma_msg;
        *p++ = xdr_zero;
        *p++ = xdr_zero;
        *p = xdr_zero;

        if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
                                      &rqst->rq_snd_buf, rpcrdma_noch))
                return -EIO;

        trace_xprtrdma_cb_reply(rqst);
        return 0;
}

/**
 * xprt_rdma_bc_send_reply - marshal and send a backchannel reply
 * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * Returns:
 *      %0 if the RPC message has been sent
 *      %-ENOTCONN if the caller should reconnect and call again
 *      %-EIO if a permanent error occurred and the request was not
 *              sent. Do not try to send this message again.
 */
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        int rc;

        if (!xprt_connected(xprt))
                return -ENOTCONN;

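        /* A backchannel reply consumes a congestion window slot,
         * just as a forward direction call does.
         */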
        if (!xprt_request_get_cong(xprt, rqst))
                return -EBADSLT;

        rc = rpcrdma_bc_marshal_reply(rqst);
        if (rc < 0)
                goto failed_marshal;

        if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
                goto drop_connection;
        return 0;

failed_marshal:
        if (rc != -ENOTCONN)
                return rc;
drop_connection:
        xprt_rdma_close(xprt);
        return -ENOTCONN;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpc_rqst *rqst, *tmp;

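        /* Each rqst is unlinked from bc_pa_list under bc_pa_lock,
         * but the lock is released while the rqst is destroyed.
         */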
        spin_lock(&xprt->bc_pa_lock);
        list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                list_del(&rqst->rq_bc_pa_list);
                spin_unlock(&xprt->bc_pa_lock);

                rpcrdma_req_destroy(rpcr_to_rdmar(rqst));

                spin_lock(&xprt->bc_pa_lock);
        }
        spin_unlock(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpc_xprt *xprt = rqst->rq_xprt;

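        /* Return the rpcrdma_rep that rpcrdma_bc_receive_call()
         * hooked to this rqst, then park the rqst on bc_pa_list
         * for reuse.
         */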
        rpcrdma_recv_buffer_put(req->rl_reply);
        req->rl_reply = NULL;

        spin_lock(&xprt->bc_pa_lock);
        list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
        spin_unlock(&xprt->bc_pa_lock);
}

/**
 * rpcrdma_bc_receive_call - Handle a backward direction call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Operational assumptions:
 *    o Backchannel credits are ignored, just as the NFS server
 *      forechannel currently does
 *    o The ULP manages a replay cache (e.g., NFSv4.1 sessions).
 *      No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
                             struct rpcrdma_rep *rep)
{
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct svc_serv *bc_serv;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
        struct xdr_buf *buf;
        size_t size;
        __be32 *p;

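        /* Peek at the transport header: a zero-length
         * xdr_inline_decode() returns the current decode position
         * without advancing it, so @p points at the callback's XID.
         */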
        p = xdr_inline_decode(&rep->rr_stream, 0);
        size = xdr_stream_remaining(&rep->rr_stream);

#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: callback XID %08x, length=%zu\n",
                __func__, be32_to_cpup(p), size);
        pr_info("RPC:       %s: %*ph\n", __func__, (int)size, p);
#endif

        /* Grab a free bc rqst */
        spin_lock(&xprt->bc_pa_lock);
        if (list_empty(&xprt->bc_pa_list)) {
                spin_unlock(&xprt->bc_pa_lock);
                goto out_overflow;
        }
        rqst = list_first_entry(&xprt->bc_pa_list,
                                struct rpc_rqst, rq_bc_pa_list);
        list_del(&rqst->rq_bc_pa_list);
        spin_unlock(&xprt->bc_pa_lock);

        /* Prepare rqst */
        rqst->rq_reply_bytes_recvd = 0;
        rqst->rq_bytes_sent = 0;
        rqst->rq_xid = *p;

        rqst->rq_private_buf.len = size;

        buf = &rqst->rq_rcv_buf;
        memset(buf, 0, sizeof(*buf));
        buf->head[0].iov_base = p;
        buf->head[0].iov_len = size;
        buf->len = size;
        /* The receive buffer has to be hooked to the rpcrdma_req
         * so that it is not released while the req is pointing
         * to its buffer, and so that it can be reposted after
         * the Upper Layer is done decoding it.
         */
        req = rpcr_to_rdmar(rqst);
        req->rl_reply = rep;
        trace_xprtrdma_cb_call(rqst);

        /* Queue rqst for ULP's callback service */
        bc_serv = xprt->bc_serv;
        spin_lock(&bc_serv->sv_cb_lock);
        list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
        spin_unlock(&bc_serv->sv_cb_lock);

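        /* Kick the ULP's callback service, which dequeues the
         * rqst from sv_cb_list and processes the call.
         */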
        wake_up(&bc_serv->sv_cb_waitq);

        r_xprt->rx_stats.bcall_count++;
        return;

out_overflow:
        pr_warn("RPC/RDMA backchannel overflow\n");
        xprt_force_disconnect(xprt);
        /* This receive buffer gets reposted automatically
         * when the connection is re-established.
         */
        return;
}