
xprtrdma: Add "open" memreg op
net/sunrpc/xprtrdma/verbs.c [uclinux-h8/linux.git]
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <linux/prefetch.h>
53 #include <linux/sunrpc/addr.h>
54 #include <asm/bitops.h>
55
56 #include "xprt_rdma.h"
57
58 /*
59  * Globals/Macros
60  */
61
62 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
63 # define RPCDBG_FACILITY        RPCDBG_TRANS
64 #endif
65
66 /*
67  * internal functions
68  */
69
70 /*
71  * handle replies in tasklet context, using a single, global list
72  * rdma tasklet function -- just turn around and call the func
73  * for all replies on the list
74  */
75
76 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
77 static LIST_HEAD(rpcrdma_tasklets_g);
78
79 static void
80 rpcrdma_run_tasklet(unsigned long data)
81 {
82         struct rpcrdma_rep *rep;
83         void (*func)(struct rpcrdma_rep *);
84         unsigned long flags;
85
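        /* The tasklet argument is not used; replies are taken from the
         * global rpcrdma_tasklets_g list instead.
         */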
86         data = data;
87         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
88         while (!list_empty(&rpcrdma_tasklets_g)) {
89                 rep = list_entry(rpcrdma_tasklets_g.next,
90                                  struct rpcrdma_rep, rr_list);
91                 list_del(&rep->rr_list);
92                 func = rep->rr_func;
93                 rep->rr_func = NULL;
94                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
95
96                 if (func)
97                         func(rep);
98                 else
99                         rpcrdma_recv_buffer_put(rep);
100
101                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
102         }
103         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
104 }
105
106 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
107
108 static const char * const async_event[] = {
109         "CQ error",
110         "QP fatal error",
111         "QP request error",
112         "QP access error",
113         "communication established",
114         "send queue drained",
115         "path migration successful",
116         "path mig error",
117         "device fatal error",
118         "port active",
119         "port error",
120         "LID change",
121         "P_key change",
122         "SM change",
123         "SRQ error",
124         "SRQ limit reached",
125         "last WQE reached",
126         "client reregister",
127         "GID change",
128 };
129
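/* Map an ib_event_type to a human-readable string, with a bounds check
 * so unrecognized event codes do not index past the table.
 */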
130 #define ASYNC_MSG(status)                                       \
131         ((status) < ARRAY_SIZE(async_event) ?                   \
132                 async_event[(status)] : "unknown async error")
133
134 static void
135 rpcrdma_schedule_tasklet(struct list_head *sched_list)
136 {
137         unsigned long flags;
138
139         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
140         list_splice_tail(sched_list, &rpcrdma_tasklets_g);
141         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
142         tasklet_schedule(&rpcrdma_tasklet_g);
143 }
144
145 static void
146 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
147 {
148         struct rpcrdma_ep *ep = context;
149
150         pr_err("RPC:       %s: %s on device %s ep %p\n",
151                __func__, ASYNC_MSG(event->event),
152                 event->device->name, context);
153         if (ep->rep_connected == 1) {
154                 ep->rep_connected = -EIO;
155                 rpcrdma_conn_func(ep);
156                 wake_up_all(&ep->rep_connect_wait);
157         }
158 }
159
160 static void
161 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
162 {
163         struct rpcrdma_ep *ep = context;
164
165         pr_err("RPC:       %s: %s on device %s ep %p\n",
166                __func__, ASYNC_MSG(event->event),
167                 event->device->name, context);
168         if (ep->rep_connected == 1) {
169                 ep->rep_connected = -EIO;
170                 rpcrdma_conn_func(ep);
171                 wake_up_all(&ep->rep_connect_wait);
172         }
173 }
174
175 static const char * const wc_status[] = {
176         "success",
177         "local length error",
178         "local QP operation error",
179         "local EE context operation error",
180         "local protection error",
181         "WR flushed",
182         "memory management operation error",
183         "bad response error",
184         "local access error",
185         "remote invalid request error",
186         "remote access error",
187         "remote operation error",
188         "transport retry counter exceeded",
189         "RNR retry counter exceeded",
190         "local RDD violation error",
191         "remote invalid RD request",
192         "operation aborted",
193         "invalid EE context number",
194         "invalid EE context state",
195         "fatal error",
196         "response timeout error",
197         "general error",
198 };
199
200 #define COMPLETION_MSG(status)                                  \
201         ((status) < ARRAY_SIZE(wc_status) ?                     \
202                 wc_status[(status)] : "unexpected completion error")
203
204 static void
205 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
206 {
207         if (likely(wc->status == IB_WC_SUCCESS))
208                 return;
209
210         /* WARNING: Only wr_id and status are reliable at this point */
211         if (wc->wr_id == 0ULL) {
212                 if (wc->status != IB_WC_WR_FLUSH_ERR)
213                         pr_err("RPC:       %s: SEND: %s\n",
214                                __func__, COMPLETION_MSG(wc->status));
215         } else {
216                 struct rpcrdma_mw *r;
217
218                 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
219                 r->r.frmr.fr_state = FRMR_IS_STALE;
220                 pr_err("RPC:       %s: frmr %p (stale): %s\n",
221                        __func__, r, COMPLETION_MSG(wc->status));
222         }
223 }
224
225 static int
226 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
227 {
228         struct ib_wc *wcs;
229         int budget, count, rc;
230
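        /* Poll in batches of RPCRDMA_POLLSIZE completions, but cap the
         * total work done in one upcall at roughly RPCRDMA_WC_BUDGET
         * completions so a busy CQ cannot monopolize this context.
         */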
231         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
232         do {
233                 wcs = ep->rep_send_wcs;
234
235                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
236                 if (rc <= 0)
237                         return rc;
238
239                 count = rc;
240                 while (count-- > 0)
241                         rpcrdma_sendcq_process_wc(wcs++);
242         } while (rc == RPCRDMA_POLLSIZE && --budget);
243         return 0;
244 }
245
246 /*
247  * Handle send, fast_reg_mr, and local_inv completions.
248  *
249  * Send events are typically suppressed and thus do not result
250  * in an upcall. Occasionally one is signaled, however. This
251  * prevents the provider's completion queue from wrapping and
252  * losing a completion.
253  */
254 static void
255 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
256 {
257         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
258         int rc;
259
260         rc = rpcrdma_sendcq_poll(cq, ep);
261         if (rc) {
262                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
263                         __func__, rc);
264                 return;
265         }
266
267         rc = ib_req_notify_cq(cq,
268                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
269         if (rc == 0)
270                 return;
271         if (rc < 0) {
272                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
273                         __func__, rc);
274                 return;
275         }
276
277         rpcrdma_sendcq_poll(cq, ep);
278 }
279
280 static void
281 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
282 {
283         struct rpcrdma_rep *rep =
284                         (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
285
286         /* WARNING: Only wr_id and status are reliable at this point */
287         if (wc->status != IB_WC_SUCCESS)
288                 goto out_fail;
289
290         /* status == SUCCESS means all fields in wc are trustworthy */
291         if (wc->opcode != IB_WC_RECV)
292                 return;
293
294         dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
295                 __func__, rep, wc->byte_len);
296
297         rep->rr_len = wc->byte_len;
298         ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
299                                    rdmab_addr(rep->rr_rdmabuf),
300                                    rep->rr_len, DMA_FROM_DEVICE);
301         prefetch(rdmab_to_msg(rep->rr_rdmabuf));
302
303 out_schedule:
304         list_add_tail(&rep->rr_list, sched_list);
305         return;
306 out_fail:
307         if (wc->status != IB_WC_WR_FLUSH_ERR)
308                 pr_err("RPC:       %s: rep %p: %s\n",
309                        __func__, rep, COMPLETION_MSG(wc->status));
310         rep->rr_len = ~0U;
311         goto out_schedule;
312 }
313
314 static int
315 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
316 {
317         struct list_head sched_list;
318         struct ib_wc *wcs;
319         int budget, count, rc;
320
321         INIT_LIST_HEAD(&sched_list);
322         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
323         do {
324                 wcs = ep->rep_recv_wcs;
325
326                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
327                 if (rc <= 0)
328                         goto out_schedule;
329
330                 count = rc;
331                 while (count-- > 0)
332                         rpcrdma_recvcq_process_wc(wcs++, &sched_list);
333         } while (rc == RPCRDMA_POLLSIZE && --budget);
334         rc = 0;
335
336 out_schedule:
337         rpcrdma_schedule_tasklet(&sched_list);
338         return rc;
339 }
340
341 /*
342  * Handle receive completions.
343  *
344  * It is reentrant, but processes one event at a time to preserve
345  * the ordering of receives that server credit accounting depends on.
346  *
347  * It is the responsibility of the scheduled tasklet to return
348  * recv buffers to the pool. NOTE: this affects synchronization of
349  * connection shutdown. That is, the structures required for
350  * the completion of the reply handler must remain intact until
351  * all memory has been reclaimed.
352  */
353 static void
354 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
355 {
356         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
357         int rc;
358
359         rc = rpcrdma_recvcq_poll(cq, ep);
360         if (rc) {
361                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
362                         __func__, rc);
363                 return;
364         }
365
366         rc = ib_req_notify_cq(cq,
367                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
368         if (rc == 0)
369                 return;
370         if (rc < 0) {
371                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
372                         __func__, rc);
373                 return;
374         }
375
376         rpcrdma_recvcq_poll(cq, ep);
377 }
378
379 static void
380 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
381 {
382         struct ib_wc wc;
383         LIST_HEAD(sched_list);
384
385         while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
386                 rpcrdma_recvcq_process_wc(&wc, &sched_list);
387         if (!list_empty(&sched_list))
388                 rpcrdma_schedule_tasklet(&sched_list);
389         while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
390                 rpcrdma_sendcq_process_wc(&wc);
391 }
392
393 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
394 static const char * const conn[] = {
395         "address resolved",
396         "address error",
397         "route resolved",
398         "route error",
399         "connect request",
400         "connect response",
401         "connect error",
402         "unreachable",
403         "rejected",
404         "established",
405         "disconnected",
406         "device removal",
407         "multicast join",
408         "multicast error",
409         "address change",
410         "timewait exit",
411 };
412
413 #define CONNECTION_MSG(status)                                          \
414         ((status) < ARRAY_SIZE(conn) ?                                  \
415                 conn[(status)] : "unrecognized connection error")
416 #endif
417
418 static int
419 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
420 {
421         struct rpcrdma_xprt *xprt = id->context;
422         struct rpcrdma_ia *ia = &xprt->rx_ia;
423         struct rpcrdma_ep *ep = &xprt->rx_ep;
424 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
425         struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
426 #endif
427         struct ib_qp_attr *attr = &ia->ri_qp_attr;
428         struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
429         int connstate = 0;
430
431         switch (event->event) {
432         case RDMA_CM_EVENT_ADDR_RESOLVED:
433         case RDMA_CM_EVENT_ROUTE_RESOLVED:
434                 ia->ri_async_rc = 0;
435                 complete(&ia->ri_done);
436                 break;
437         case RDMA_CM_EVENT_ADDR_ERROR:
438                 ia->ri_async_rc = -EHOSTUNREACH;
439                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
440                         __func__, ep);
441                 complete(&ia->ri_done);
442                 break;
443         case RDMA_CM_EVENT_ROUTE_ERROR:
444                 ia->ri_async_rc = -ENETUNREACH;
445                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
446                         __func__, ep);
447                 complete(&ia->ri_done);
448                 break;
449         case RDMA_CM_EVENT_ESTABLISHED:
450                 connstate = 1;
451                 ib_query_qp(ia->ri_id->qp, attr,
452                             IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
453                             iattr);
454                 dprintk("RPC:       %s: %d responder resources"
455                         " (%d initiator)\n",
456                         __func__, attr->max_dest_rd_atomic,
457                         attr->max_rd_atomic);
458                 goto connected;
459         case RDMA_CM_EVENT_CONNECT_ERROR:
460                 connstate = -ENOTCONN;
461                 goto connected;
462         case RDMA_CM_EVENT_UNREACHABLE:
463                 connstate = -ENETDOWN;
464                 goto connected;
465         case RDMA_CM_EVENT_REJECTED:
466                 connstate = -ECONNREFUSED;
467                 goto connected;
468         case RDMA_CM_EVENT_DISCONNECTED:
469                 connstate = -ECONNABORTED;
470                 goto connected;
471         case RDMA_CM_EVENT_DEVICE_REMOVAL:
472                 connstate = -ENODEV;
473 connected:
474                 dprintk("RPC:       %s: %sconnected\n",
475                                         __func__, connstate > 0 ? "" : "dis");
476                 ep->rep_connected = connstate;
477                 rpcrdma_conn_func(ep);
478                 wake_up_all(&ep->rep_connect_wait);
479                 /*FALLTHROUGH*/
480         default:
481                 dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
482                         __func__, sap, rpc_get_port(sap), ep,
483                         CONNECTION_MSG(event->event));
484                 break;
485         }
486
487 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
488         if (connstate == 1) {
489                 int ird = attr->max_dest_rd_atomic;
490                 int tird = ep->rep_remote_cma.responder_resources;
491
492                 pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
493                         sap, rpc_get_port(sap),
494                         ia->ri_id->device->name,
495                         ia->ri_ops->ro_displayname,
496                         xprt->rx_buf.rb_max_requests,
497                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
498         } else if (connstate < 0) {
499                 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
500                         sap, rpc_get_port(sap), connstate);
501         }
502 #endif
503
504         return 0;
505 }
506
507 static struct rdma_cm_id *
508 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
509                         struct rpcrdma_ia *ia, struct sockaddr *addr)
510 {
511         struct rdma_cm_id *id;
512         int rc;
513
514         init_completion(&ia->ri_done);
515
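        /* Address and route resolution are asynchronous: ri_async_rc is
         * preset to -ETIMEDOUT and overwritten by rpcrdma_conn_upcall()
         * when the CM reports a result; each wait below is bounded by
         * RDMA_RESOLVE_TIMEOUT.
         */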
516         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
517         if (IS_ERR(id)) {
518                 rc = PTR_ERR(id);
519                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
520                         __func__, rc);
521                 return id;
522         }
523
524         ia->ri_async_rc = -ETIMEDOUT;
525         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
526         if (rc) {
527                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
528                         __func__, rc);
529                 goto out;
530         }
531         wait_for_completion_interruptible_timeout(&ia->ri_done,
532                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
533         rc = ia->ri_async_rc;
534         if (rc)
535                 goto out;
536
537         ia->ri_async_rc = -ETIMEDOUT;
538         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
539         if (rc) {
540                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
541                         __func__, rc);
542                 goto out;
543         }
544         wait_for_completion_interruptible_timeout(&ia->ri_done,
545                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
546         rc = ia->ri_async_rc;
547         if (rc)
548                 goto out;
549
550         return id;
551
552 out:
553         rdma_destroy_id(id);
554         return ERR_PTR(rc);
555 }
556
557 /*
558  * Drain any cq, prior to teardown.
559  */
560 static void
561 rpcrdma_clean_cq(struct ib_cq *cq)
562 {
563         struct ib_wc wc;
564         int count = 0;
565
566         while (1 == ib_poll_cq(cq, 1, &wc))
567                 ++count;
568
569         if (count)
570                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
571                         __func__, count, wc.opcode);
572 }
573
574 /*
575  * Exported functions.
576  */
577
578 /*
579  * Open and initialize an Interface Adapter.
580  *  o initializes fields of struct rpcrdma_ia, including
581  *    interface and provider attributes and protection zone.
582  */
583 int
584 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
585 {
586         int rc, mem_priv;
587         struct rpcrdma_ia *ia = &xprt->rx_ia;
588         struct ib_device_attr *devattr = &ia->ri_devattr;
589
590         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
591         if (IS_ERR(ia->ri_id)) {
592                 rc = PTR_ERR(ia->ri_id);
593                 goto out1;
594         }
595
596         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
597         if (IS_ERR(ia->ri_pd)) {
598                 rc = PTR_ERR(ia->ri_pd);
599                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
600                         __func__, rc);
601                 goto out2;
602         }
603
604         rc = ib_query_device(ia->ri_id->device, devattr);
605         if (rc) {
606                 dprintk("RPC:       %s: ib_query_device failed %d\n",
607                         __func__, rc);
608                 goto out3;
609         }
610
611         if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
612                 ia->ri_have_dma_lkey = 1;
613                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
614         }
615
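        /* Pick a registration mode the HCA can actually support,
         * stepping down from FRMR to FMR to ALLPHYSICAL as needed.
         */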
616         if (memreg == RPCRDMA_FRMR) {
617                 /* Requires both frmr reg and local dma lkey */
618                 if (((devattr->device_cap_flags &
619                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
620                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
621                       (devattr->max_fast_reg_page_list_len == 0)) {
622                         dprintk("RPC:       %s: FRMR registration "
623                                 "not supported by HCA\n", __func__);
624                         memreg = RPCRDMA_MTHCAFMR;
625                 }
626         }
627         if (memreg == RPCRDMA_MTHCAFMR) {
628                 if (!ia->ri_id->device->alloc_fmr) {
629                         dprintk("RPC:       %s: MTHCAFMR registration "
630                                 "not supported by HCA\n", __func__);
631                         memreg = RPCRDMA_ALLPHYSICAL;
632                 }
633         }
634
635         /*
636          * Optionally obtain an underlying physical identity mapping in
637          * order to do a memory window-based bind. This base registration
638          * is protected from remote access - that is enabled only by binding
639          * for the specific bytes targeted during each RPC operation, and
640          * revoked after the corresponding completion similar to a storage
641          * adapter.
642          */
643         switch (memreg) {
644         case RPCRDMA_FRMR:
645                 ia->ri_ops = &rpcrdma_frwr_memreg_ops;
646                 break;
647         case RPCRDMA_ALLPHYSICAL:
648                 ia->ri_ops = &rpcrdma_physical_memreg_ops;
649                 mem_priv = IB_ACCESS_LOCAL_WRITE |
650                                 IB_ACCESS_REMOTE_WRITE |
651                                 IB_ACCESS_REMOTE_READ;
652                 goto register_setup;
653         case RPCRDMA_MTHCAFMR:
654                 ia->ri_ops = &rpcrdma_fmr_memreg_ops;
655                 if (ia->ri_have_dma_lkey)
656                         break;
657                 mem_priv = IB_ACCESS_LOCAL_WRITE;
658         register_setup:
659                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
660                 if (IS_ERR(ia->ri_bind_mem)) {
661                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
662                                 "phys register failed with %lX\n",
663                                 __func__, PTR_ERR(ia->ri_bind_mem));
664                         rc = -ENOMEM;
665                         goto out3;
666                 }
667                 break;
668         default:
669                 printk(KERN_ERR "RPC: Unsupported memory "
670                                 "registration mode: %d\n", memreg);
671                 rc = -ENOMEM;
672                 goto out3;
673         }
674         dprintk("RPC:       %s: memory registration strategy is '%s'\n",
675                 __func__, ia->ri_ops->ro_displayname);
676
677         /* Else will do memory reg/dereg for each chunk */
678         ia->ri_memreg_strategy = memreg;
679
680         rwlock_init(&ia->ri_qplock);
681         return 0;
682
683 out3:
684         ib_dealloc_pd(ia->ri_pd);
685         ia->ri_pd = NULL;
686 out2:
687         rdma_destroy_id(ia->ri_id);
688         ia->ri_id = NULL;
689 out1:
690         return rc;
691 }
692
693 /*
694  * Clean up/close an IA.
695  *   o if event handles and PD have been initialized, free them.
696  *   o close the IA
697  */
698 void
699 rpcrdma_ia_close(struct rpcrdma_ia *ia)
700 {
701         int rc;
702
703         dprintk("RPC:       %s: entering\n", __func__);
704         if (ia->ri_bind_mem != NULL) {
705                 rc = ib_dereg_mr(ia->ri_bind_mem);
706                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
707                         __func__, rc);
708         }
709         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
710                 if (ia->ri_id->qp)
711                         rdma_destroy_qp(ia->ri_id);
712                 rdma_destroy_id(ia->ri_id);
713                 ia->ri_id = NULL;
714         }
715         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
716                 rc = ib_dealloc_pd(ia->ri_pd);
717                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
718                         __func__, rc);
719         }
720 }
721
722 /*
723  * Create unconnected endpoint.
724  */
725 int
726 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
727                                 struct rpcrdma_create_data_internal *cdata)
728 {
729         struct ib_device_attr *devattr = &ia->ri_devattr;
730         struct ib_cq *sendcq, *recvcq;
731         int rc, err;
732
733         /* check provider's send/recv wr limits */
734         if (cdata->max_requests > devattr->max_qp_wr)
735                 cdata->max_requests = devattr->max_qp_wr;
736
737         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
738         ep->rep_attr.qp_context = ep;
739         ep->rep_attr.srq = NULL;
740         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
741         rc = ia->ri_ops->ro_open(ia, ep, cdata);
742         if (rc)
743                 return rc;
744         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
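        /* Each SEND uses one SGE for the RPC/RDMA header and one for the
         * RPC message; two more cover the pad and trailing data when
         * inline padding is in use (see rpcrdma_ep_post).
         */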
745         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
746         ep->rep_attr.cap.max_recv_sge = 1;
747         ep->rep_attr.cap.max_inline_data = 0;
748         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
749         ep->rep_attr.qp_type = IB_QPT_RC;
750         ep->rep_attr.port_num = ~0;
751
752         if (cdata->padding) {
753                 ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
754                                                       GFP_KERNEL);
755                 if (IS_ERR(ep->rep_padbuf))
756                         return PTR_ERR(ep->rep_padbuf);
757         } else
758                 ep->rep_padbuf = NULL;
759
760         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
761                 "iovs: send %d recv %d\n",
762                 __func__,
763                 ep->rep_attr.cap.max_send_wr,
764                 ep->rep_attr.cap.max_recv_wr,
765                 ep->rep_attr.cap.max_send_sge,
766                 ep->rep_attr.cap.max_recv_sge);
767
768         /* set trigger for requesting send completion */
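        /* For example, if max_send_wr is 128 the trigger becomes
         * min(63, RPCRDMA_MAX_UNSIGNALED_SENDS): roughly one SEND in
         * that many is posted with IB_SEND_SIGNALED (see DECR_CQCOUNT
         * in rpcrdma_ep_post), which keeps the provider's send CQ from
         * wrapping.
         */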
769         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
770         if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
771                 ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
772         else if (ep->rep_cqinit <= 2)
773                 ep->rep_cqinit = 0;
774         INIT_CQCOUNT(ep);
775         init_waitqueue_head(&ep->rep_connect_wait);
776         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
777
778         sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
779                                   rpcrdma_cq_async_error_upcall, ep,
780                                   ep->rep_attr.cap.max_send_wr + 1, 0);
781         if (IS_ERR(sendcq)) {
782                 rc = PTR_ERR(sendcq);
783                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
784                         __func__, rc);
785                 goto out1;
786         }
787
788         rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
789         if (rc) {
790                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
791                         __func__, rc);
792                 goto out2;
793         }
794
795         recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
796                                   rpcrdma_cq_async_error_upcall, ep,
797                                   ep->rep_attr.cap.max_recv_wr + 1, 0);
798         if (IS_ERR(recvcq)) {
799                 rc = PTR_ERR(recvcq);
800                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
801                         __func__, rc);
802                 goto out2;
803         }
804
805         rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
806         if (rc) {
807                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
808                         __func__, rc);
809                 ib_destroy_cq(recvcq);
810                 goto out2;
811         }
812
813         ep->rep_attr.send_cq = sendcq;
814         ep->rep_attr.recv_cq = recvcq;
815
816         /* Initialize cma parameters */
817
818         /* RPC/RDMA does not use private data */
819         ep->rep_remote_cma.private_data = NULL;
820         ep->rep_remote_cma.private_data_len = 0;
821
822         /* Client offers RDMA Read but does not initiate */
823         ep->rep_remote_cma.initiator_depth = 0;
824         if (devattr->max_qp_rd_atom > 32)       /* arbitrary but <= 255 */
825                 ep->rep_remote_cma.responder_resources = 32;
826         else
827                 ep->rep_remote_cma.responder_resources =
828                                                 devattr->max_qp_rd_atom;
829
830         ep->rep_remote_cma.retry_count = 7;
831         ep->rep_remote_cma.flow_control = 0;
832         ep->rep_remote_cma.rnr_retry_count = 0;
833
834         return 0;
835
836 out2:
837         err = ib_destroy_cq(sendcq);
838         if (err)
839                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
840                         __func__, err);
841 out1:
842         rpcrdma_free_regbuf(ia, ep->rep_padbuf);
843         return rc;
844 }
845
846 /*
847  * rpcrdma_ep_destroy
848  *
849  * Disconnect and destroy endpoint. After this, the only
850  * valid operations on the ep are to free it (if dynamically
851  * allocated) or re-create it.
852  */
853 void
854 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
855 {
856         int rc;
857
858         dprintk("RPC:       %s: entering, connected is %d\n",
859                 __func__, ep->rep_connected);
860
861         cancel_delayed_work_sync(&ep->rep_connect_worker);
862
863         if (ia->ri_id->qp) {
864                 rpcrdma_ep_disconnect(ep, ia);
865                 rdma_destroy_qp(ia->ri_id);
866                 ia->ri_id->qp = NULL;
867         }
868
869         rpcrdma_free_regbuf(ia, ep->rep_padbuf);
870
871         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
872         rc = ib_destroy_cq(ep->rep_attr.recv_cq);
873         if (rc)
874                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
875                         __func__, rc);
876
877         rpcrdma_clean_cq(ep->rep_attr.send_cq);
878         rc = ib_destroy_cq(ep->rep_attr.send_cq);
879         if (rc)
880                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
881                         __func__, rc);
882 }
883
884 /*
885  * Connect unconnected endpoint.
886  */
887 int
888 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
889 {
890         struct rdma_cm_id *id, *old;
891         int rc = 0;
892         int retry_count = 0;
893
894         if (ep->rep_connected != 0) {
895                 struct rpcrdma_xprt *xprt;
896 retry:
897                 dprintk("RPC:       %s: reconnecting...\n", __func__);
898
899                 rpcrdma_ep_disconnect(ep, ia);
900                 rpcrdma_flush_cqs(ep);
901
902                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
903                 ia->ri_ops->ro_reset(xprt);
904
905                 id = rpcrdma_create_id(xprt, ia,
906                                 (struct sockaddr *)&xprt->rx_data.addr);
907                 if (IS_ERR(id)) {
908                         rc = -EHOSTUNREACH;
909                         goto out;
910                 }
911                 /* TEMP TEMP TEMP - fail if new device:
912                  * Deregister/remarshal *all* requests!
913                  * Close and recreate adapter, pd, etc!
914                  * Re-determine all attributes still sane!
915                  * More stuff I haven't thought of!
916                  * Rrrgh!
917                  */
918                 if (ia->ri_id->device != id->device) {
919                         printk("RPC:       %s: can't reconnect on "
920                                 "different device!\n", __func__);
921                         rdma_destroy_id(id);
922                         rc = -ENETUNREACH;
923                         goto out;
924                 }
925                 /* END TEMP */
926                 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
927                 if (rc) {
928                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
929                                 __func__, rc);
930                         rdma_destroy_id(id);
931                         rc = -ENETUNREACH;
932                         goto out;
933                 }
934
935                 write_lock(&ia->ri_qplock);
936                 old = ia->ri_id;
937                 ia->ri_id = id;
938                 write_unlock(&ia->ri_qplock);
939
940                 rdma_destroy_qp(old);
941                 rdma_destroy_id(old);
942         } else {
943                 dprintk("RPC:       %s: connecting...\n", __func__);
944                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
945                 if (rc) {
946                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
947                                 __func__, rc);
948                         /* do not update ep->rep_connected */
949                         return -ENETUNREACH;
950                 }
951         }
952
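        /* rpcrdma_conn_upcall() sets rep_connected to 1 on success or to
         * a negative errno on failure, then wakes rep_connect_wait.
         */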
953         ep->rep_connected = 0;
954
955         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
956         if (rc) {
957                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
958                                 __func__, rc);
959                 goto out;
960         }
961
962         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
963
964         /*
965          * Check state. A non-peer reject indicates no listener
966          * (ECONNREFUSED), which may be a transient state. All
 967          * other errors indicate a transport condition that has
 968          * already received best-effort handling.
969          */
970         if (ep->rep_connected == -ECONNREFUSED &&
971             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
972                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
973                 goto retry;
974         }
975         if (ep->rep_connected <= 0) {
976                 /* Sometimes, the only way to reliably connect to remote
977                  * CMs is to use same nonzero values for ORD and IRD. */
978                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
979                     (ep->rep_remote_cma.responder_resources == 0 ||
980                      ep->rep_remote_cma.initiator_depth !=
981                                 ep->rep_remote_cma.responder_resources)) {
982                         if (ep->rep_remote_cma.responder_resources == 0)
983                                 ep->rep_remote_cma.responder_resources = 1;
984                         ep->rep_remote_cma.initiator_depth =
985                                 ep->rep_remote_cma.responder_resources;
986                         goto retry;
987                 }
988                 rc = ep->rep_connected;
989         } else {
990                 dprintk("RPC:       %s: connected\n", __func__);
991         }
992
993 out:
994         if (rc)
995                 ep->rep_connected = rc;
996         return rc;
997 }
998
999 /*
1000  * rpcrdma_ep_disconnect
1001  *
1002  * This is separate from destroy to facilitate the ability
1003  * to reconnect without recreating the endpoint.
1004  *
1005  * This call is not reentrant, and must not be made in parallel
1006  * on the same endpoint.
1007  */
1008 void
1009 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
1010 {
1011         int rc;
1012
1013         rpcrdma_flush_cqs(ep);
1014         rc = rdma_disconnect(ia->ri_id);
1015         if (!rc) {
1016                 /* returns without wait if not connected */
1017                 wait_event_interruptible(ep->rep_connect_wait,
1018                                                         ep->rep_connected != 1);
1019                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
1020                         (ep->rep_connected == 1) ? "still " : "dis");
1021         } else {
1022                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
1023                 ep->rep_connected = rc;
1024         }
1025 }
1026
1027 static struct rpcrdma_req *
1028 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1029 {
1030         struct rpcrdma_req *req;
1031
1032         req = kzalloc(sizeof(*req), GFP_KERNEL);
1033         if (req == NULL)
1034                 return ERR_PTR(-ENOMEM);
1035
1036         req->rl_buffer = &r_xprt->rx_buf;
1037         return req;
1038 }
1039
1040 static struct rpcrdma_rep *
1041 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1042 {
1043         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1044         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1045         struct rpcrdma_rep *rep;
1046         int rc;
1047
1048         rc = -ENOMEM;
1049         rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1050         if (rep == NULL)
1051                 goto out;
1052
1053         rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
1054                                                GFP_KERNEL);
1055         if (IS_ERR(rep->rr_rdmabuf)) {
1056                 rc = PTR_ERR(rep->rr_rdmabuf);
1057                 goto out_free;
1058         }
1059
1060         rep->rr_buffer = &r_xprt->rx_buf;
1061         return rep;
1062
1063 out_free:
1064         kfree(rep);
1065 out:
1066         return ERR_PTR(rc);
1067 }
1068
1069 int
1070 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1071 {
1072         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1073         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1074         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1075         char *p;
1076         size_t len;
1077         int i, rc;
1078
1079         buf->rb_max_requests = cdata->max_requests;
1080         spin_lock_init(&buf->rb_lock);
1081
1082         /* Need to allocate:
1083          *   1.  arrays for send and recv pointers
1084          *   2.  arrays of struct rpcrdma_req to fill in pointers
1085          *   3.  array of struct rpcrdma_rep for replies
1086          * Send/recv buffers in req/rep need to be registered
1087          */
1088         len = buf->rb_max_requests *
1089                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1090
1091         p = kzalloc(len, GFP_KERNEL);
1092         if (p == NULL) {
1093                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1094                         __func__, len);
1095                 rc = -ENOMEM;
1096                 goto out;
1097         }
1098         buf->rb_pool = p;       /* for freeing it later */
1099
1100         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1101         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1102         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1103         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1104
1105         rc = ia->ri_ops->ro_init(r_xprt);
1106         if (rc)
1107                 goto out;
1108
1109         for (i = 0; i < buf->rb_max_requests; i++) {
1110                 struct rpcrdma_req *req;
1111                 struct rpcrdma_rep *rep;
1112
1113                 req = rpcrdma_create_req(r_xprt);
1114                 if (IS_ERR(req)) {
1115                         dprintk("RPC:       %s: request buffer %d alloc"
1116                                 " failed\n", __func__, i);
1117                         rc = PTR_ERR(req);
1118                         goto out;
1119                 }
1120                 buf->rb_send_bufs[i] = req;
1121
1122                 rep = rpcrdma_create_rep(r_xprt);
1123                 if (IS_ERR(rep)) {
1124                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1125                                 __func__, i);
1126                         rc = PTR_ERR(rep);
1127                         goto out;
1128                 }
1129                 buf->rb_recv_bufs[i] = rep;
1130         }
1131
1132         return 0;
1133 out:
1134         rpcrdma_buffer_destroy(buf);
1135         return rc;
1136 }
1137
1138 static void
1139 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1140 {
1141         if (!rep)
1142                 return;
1143
1144         rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1145         kfree(rep);
1146 }
1147
1148 static void
1149 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1150 {
1151         if (!req)
1152                 return;
1153
1154         rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1155         rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1156         kfree(req);
1157 }
1158
1159 void
1160 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1161 {
1162         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1163         int i;
1164
1165         /* clean up in reverse order from create
1166          *   1.  recv mr memory (mr free, then kfree)
1167          *   2.  send mr memory (mr free, then kfree)
1168          *   3.  MWs
1169          */
1170         dprintk("RPC:       %s: entering\n", __func__);
1171
1172         for (i = 0; i < buf->rb_max_requests; i++) {
1173                 if (buf->rb_recv_bufs)
1174                         rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1175                 if (buf->rb_send_bufs)
1176                         rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1177         }
1178
1179         ia->ri_ops->ro_destroy(buf);
1180
1181         kfree(buf->rb_pool);
1182 }
1183
1184 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1185  * some req segments uninitialized.
1186  */
1187 static void
1188 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1189 {
1190         if (*mw) {
1191                 list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1192                 *mw = NULL;
1193         }
1194 }
1195
1196 /* Cycle mw's back in reverse order, and "spin" them.
1197  * This delays and scrambles reuse as much as possible.
1198  */
1199 static void
1200 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1201 {
1202         struct rpcrdma_mr_seg *seg = req->rl_segments;
1203         struct rpcrdma_mr_seg *seg1 = seg;
1204         int i;
1205
1206         for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1207                 rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
1208         rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
1209 }
1210
1211 static void
1212 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1213 {
1214         buf->rb_send_bufs[--buf->rb_send_index] = req;
1215         req->rl_niovs = 0;
1216         if (req->rl_reply) {
1217                 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1218                 req->rl_reply->rr_func = NULL;
1219                 req->rl_reply = NULL;
1220         }
1221 }
1222
1223 /* rpcrdma_unmap_one() was already done during deregistration.
1224  * Redo only the ib_post_send().
1225  */
1226 static void
1227 rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1228 {
1229         struct rpcrdma_xprt *r_xprt =
1230                                 container_of(ia, struct rpcrdma_xprt, rx_ia);
1231         struct ib_send_wr invalidate_wr, *bad_wr;
1232         int rc;
1233
1234         dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
1235
1236         /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1237         r->r.frmr.fr_state = FRMR_IS_INVALID;
1238
1239         memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1240         invalidate_wr.wr_id = (unsigned long)(void *)r;
1241         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1242         invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1243         DECR_CQCOUNT(&r_xprt->rx_ep);
1244
1245         dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
1246                 __func__, r, r->r.frmr.fr_mr->rkey);
1247
1248         read_lock(&ia->ri_qplock);
1249         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1250         read_unlock(&ia->ri_qplock);
1251         if (rc) {
1252                 /* Force rpcrdma_buffer_get() to retry */
1253                 r->r.frmr.fr_state = FRMR_IS_STALE;
1254                 dprintk("RPC:       %s: ib_post_send failed, %i\n",
1255                         __func__, rc);
1256         }
1257 }
1258
1259 static void
1260 rpcrdma_retry_flushed_linv(struct list_head *stale,
1261                            struct rpcrdma_buffer *buf)
1262 {
1263         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1264         struct list_head *pos;
1265         struct rpcrdma_mw *r;
1266         unsigned long flags;
1267
1268         list_for_each(pos, stale) {
1269                 r = list_entry(pos, struct rpcrdma_mw, mw_list);
1270                 rpcrdma_retry_local_inv(r, ia);
1271         }
1272
1273         spin_lock_irqsave(&buf->rb_lock, flags);
1274         list_splice_tail(stale, &buf->rb_mws);
1275         spin_unlock_irqrestore(&buf->rb_lock, flags);
1276 }
1277
1278 static struct rpcrdma_req *
1279 rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1280                          struct list_head *stale)
1281 {
1282         struct rpcrdma_mw *r;
1283         int i;
1284
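        /* Fill rl_segments from the end of the array; any stale FRMRs
         * found on rb_mws are set aside on the stale list for recovery.
         */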
1285         i = RPCRDMA_MAX_SEGS - 1;
1286         while (!list_empty(&buf->rb_mws)) {
1287                 r = list_entry(buf->rb_mws.next,
1288                                struct rpcrdma_mw, mw_list);
1289                 list_del(&r->mw_list);
1290                 if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1291                         list_add(&r->mw_list, stale);
1292                         continue;
1293                 }
1294                 req->rl_segments[i].rl_mw = r;
1295                 if (unlikely(i-- == 0))
1296                         return req;     /* Success */
1297         }
1298
1299         /* Not enough entries on rb_mws for this req */
1300         rpcrdma_buffer_put_sendbuf(req, buf);
1301         rpcrdma_buffer_put_mrs(req, buf);
1302         return NULL;
1303 }
1304
1305 static struct rpcrdma_req *
1306 rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1307 {
1308         struct rpcrdma_mw *r;
1309         int i;
1310
1311         i = RPCRDMA_MAX_SEGS - 1;
1312         while (!list_empty(&buf->rb_mws)) {
1313                 r = list_entry(buf->rb_mws.next,
1314                                struct rpcrdma_mw, mw_list);
1315                 list_del(&r->mw_list);
1316                 req->rl_segments[i].rl_mw = r;
1317                 if (unlikely(i-- == 0))
1318                         return req;     /* Success */
1319         }
1320
1321         /* Not enough entries on rb_mws for this req */
1322         rpcrdma_buffer_put_sendbuf(req, buf);
1323         rpcrdma_buffer_put_mrs(req, buf);
1324         return NULL;
1325 }
1326
1327 /*
1328  * Get a set of request/reply buffers.
1329  *
1330  * Reply buffer (if needed) is attached to send buffer upon return.
1331  * Rule:
1332  *    rb_send_index and rb_recv_index MUST always be pointing to the
1333  *    *next* available buffer (non-NULL). They are incremented after
1334  *    removing buffers, and decremented *before* returning them.
1335  */
1336 struct rpcrdma_req *
1337 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1338 {
1339         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1340         struct list_head stale;
1341         struct rpcrdma_req *req;
1342         unsigned long flags;
1343
1344         spin_lock_irqsave(&buffers->rb_lock, flags);
1345         if (buffers->rb_send_index == buffers->rb_max_requests) {
1346                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1347                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1348                 return ((struct rpcrdma_req *)NULL);
1349         }
1350
1351         req = buffers->rb_send_bufs[buffers->rb_send_index];
1352         if (buffers->rb_send_index < buffers->rb_recv_index) {
1353                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1354                         __func__,
1355                         buffers->rb_recv_index - buffers->rb_send_index);
1356                 req->rl_reply = NULL;
1357         } else {
1358                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1359                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1360         }
1361         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1362
1363         INIT_LIST_HEAD(&stale);
1364         switch (ia->ri_memreg_strategy) {
1365         case RPCRDMA_FRMR:
1366                 req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1367                 break;
1368         case RPCRDMA_MTHCAFMR:
1369                 req = rpcrdma_buffer_get_fmrs(req, buffers);
1370                 break;
1371         default:
1372                 break;
1373         }
1374         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1375         if (!list_empty(&stale))
1376                 rpcrdma_retry_flushed_linv(&stale, buffers);
1377         return req;
1378 }
1379
1380 /*
1381  * Put request/reply buffers back into pool.
1382  * Pre-decrement counter/array index.
1383  */
1384 void
1385 rpcrdma_buffer_put(struct rpcrdma_req *req)
1386 {
1387         struct rpcrdma_buffer *buffers = req->rl_buffer;
1388         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1389         unsigned long flags;
1390
1391         spin_lock_irqsave(&buffers->rb_lock, flags);
1392         rpcrdma_buffer_put_sendbuf(req, buffers);
1393         switch (ia->ri_memreg_strategy) {
1394         case RPCRDMA_FRMR:
1395         case RPCRDMA_MTHCAFMR:
1396                 rpcrdma_buffer_put_mrs(req, buffers);
1397                 break;
1398         default:
1399                 break;
1400         }
1401         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1402 }
1403
1404 /*
1405  * Recover reply buffers from pool.
1406  * This happens when recovering from error conditions.
1407  * Post-increment counter/array index.
1408  */
1409 void
1410 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1411 {
1412         struct rpcrdma_buffer *buffers = req->rl_buffer;
1413         unsigned long flags;
1414
1415         spin_lock_irqsave(&buffers->rb_lock, flags);
1416         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1417                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1418                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1419         }
1420         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1421 }
1422
1423 /*
1424  * Put reply buffers back into pool when not attached to
1425  * request. This happens in error conditions.
1426  */
1427 void
1428 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1429 {
1430         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1431         unsigned long flags;
1432
1433         rep->rr_func = NULL;
1434         spin_lock_irqsave(&buffers->rb_lock, flags);
1435         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1436         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1437 }
1438
1439 /*
1440  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1441  */
1442
1443 static int
1444 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1445                                 struct ib_mr **mrp, struct ib_sge *iov)
1446 {
1447         struct ib_phys_buf ipb;
1448         struct ib_mr *mr;
1449         int rc;
1450
1451         /*
1452          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1453          */
1454         iov->addr = ib_dma_map_single(ia->ri_id->device,
1455                         va, len, DMA_BIDIRECTIONAL);
1456         if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1457                 return -ENOMEM;
1458
1459         iov->length = len;
1460
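        /* Prefer the device's local DMA lkey, then the DMA MR set up at
         * IA open time; fall back to registering this buffer on its own
         * only when neither is available.
         */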
1461         if (ia->ri_have_dma_lkey) {
1462                 *mrp = NULL;
1463                 iov->lkey = ia->ri_dma_lkey;
1464                 return 0;
1465         } else if (ia->ri_bind_mem != NULL) {
1466                 *mrp = NULL;
1467                 iov->lkey = ia->ri_bind_mem->lkey;
1468                 return 0;
1469         }
1470
1471         ipb.addr = iov->addr;
1472         ipb.size = iov->length;
1473         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1474                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1475
1476         dprintk("RPC:       %s: phys convert: 0x%llx "
1477                         "registered 0x%llx length %d\n",
1478                         __func__, (unsigned long long)ipb.addr,
1479                         (unsigned long long)iov->addr, len);
1480
1481         if (IS_ERR(mr)) {
1482                 *mrp = NULL;
1483                 rc = PTR_ERR(mr);
1484                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1485         } else {
1486                 *mrp = mr;
1487                 iov->lkey = mr->lkey;
1488                 rc = 0;
1489         }
1490
1491         return rc;
1492 }
1493
1494 static int
1495 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1496                                 struct ib_mr *mr, struct ib_sge *iov)
1497 {
1498         int rc;
1499
1500         ib_dma_unmap_single(ia->ri_id->device,
1501                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1502
1503         if (NULL == mr)
1504                 return 0;
1505
1506         rc = ib_dereg_mr(mr);
1507         if (rc)
1508                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1509         return rc;
1510 }
1511
1512 /**
1513  * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1514  * @ia: controlling rpcrdma_ia
1515  * @size: size of buffer to be allocated, in bytes
1516  * @flags: GFP flags
1517  *
1518  * Returns pointer to private header of an area of internally
1519  * registered memory, or an ERR_PTR. The registered buffer follows
1520  * the end of the private header.
1521  *
1522  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1523  * receiving the payload of RDMA RECV operations. regbufs are not
1524  * used for RDMA READ/WRITE operations, thus are registered only for
1525  * LOCAL access.
1526  */
1527 struct rpcrdma_regbuf *
1528 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1529 {
1530         struct rpcrdma_regbuf *rb;
1531         int rc;
1532
1533         rc = -ENOMEM;
1534         rb = kmalloc(sizeof(*rb) + size, flags);
1535         if (rb == NULL)
1536                 goto out;
1537
1538         rb->rg_size = size;
1539         rb->rg_owner = NULL;
1540         rc = rpcrdma_register_internal(ia, rb->rg_base, size,
1541                                        &rb->rg_mr, &rb->rg_iov);
1542         if (rc)
1543                 goto out_free;
1544
1545         return rb;
1546
1547 out_free:
1548         kfree(rb);
1549 out:
1550         return ERR_PTR(rc);
1551 }
1552
1553 /**
1554  * rpcrdma_free_regbuf - deregister and free registered buffer
1555  * @ia: controlling rpcrdma_ia
1556  * @rb: regbuf to be deregistered and freed
1557  */
1558 void
1559 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1560 {
1561         if (rb) {
1562                 rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
1563                 kfree(rb);
1564         }
1565 }
1566
1567 /*
1568  * Wrappers for chunk registration, shared by read/write chunk code.
1569  */
1570
1571 void
1572 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, bool writing)
1573 {
1574         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1575         seg->mr_dmalen = seg->mr_len;
1576         if (seg->mr_page)
1577                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1578                                 seg->mr_page, offset_in_page(seg->mr_offset),
1579                                 seg->mr_dmalen, seg->mr_dir);
1580         else
1581                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1582                                 seg->mr_offset,
1583                                 seg->mr_dmalen, seg->mr_dir);
1584         if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1585                 dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1586                         __func__,
1587                         (unsigned long long)seg->mr_dma,
1588                         seg->mr_offset, seg->mr_dmalen);
1589         }
1590 }
1591
1592 void
1593 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1594 {
1595         if (seg->mr_page)
1596                 ib_dma_unmap_page(ia->ri_id->device,
1597                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1598         else
1599                 ib_dma_unmap_single(ia->ri_id->device,
1600                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1601 }
1602
1603 /*
1604  * Prepost any receive buffer, then post send.
1605  *
1606  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1607  */
1608 int
1609 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1610                 struct rpcrdma_ep *ep,
1611                 struct rpcrdma_req *req)
1612 {
1613         struct ib_send_wr send_wr, *send_wr_fail;
1614         struct rpcrdma_rep *rep = req->rl_reply;
1615         int rc;
1616
1617         if (rep) {
1618                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1619                 if (rc)
1620                         goto out;
1621                 req->rl_reply = NULL;
1622         }
1623
1624         send_wr.next = NULL;
1625         send_wr.wr_id = 0ULL;   /* no send cookie */
1626         send_wr.sg_list = req->rl_send_iov;
1627         send_wr.num_sge = req->rl_niovs;
1628         send_wr.opcode = IB_WR_SEND;
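        /* Make the RDMA header (iov[0]), the RPC message (iov[1]), and,
         * when present, the trailing iov[3] visible to the device;
         * iov[2] is the constant zero pad and needs no sync.
         */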
1629         if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
1630                 ib_dma_sync_single_for_device(ia->ri_id->device,
1631                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1632                         DMA_TO_DEVICE);
1633         ib_dma_sync_single_for_device(ia->ri_id->device,
1634                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1635                 DMA_TO_DEVICE);
1636         ib_dma_sync_single_for_device(ia->ri_id->device,
1637                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1638                 DMA_TO_DEVICE);
1639
1640         if (DECR_CQCOUNT(ep) > 0)
1641                 send_wr.send_flags = 0;
1642         else { /* Provider must take a send completion every now and then */
1643                 INIT_CQCOUNT(ep);
1644                 send_wr.send_flags = IB_SEND_SIGNALED;
1645         }
1646
1647         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1648         if (rc)
1649                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1650                         rc);
1651 out:
1652         return rc;
1653 }
1654
1655 /*
1656  * (Re)post a receive buffer.
1657  */
1658 int
1659 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1660                      struct rpcrdma_ep *ep,
1661                      struct rpcrdma_rep *rep)
1662 {
1663         struct ib_recv_wr recv_wr, *recv_wr_fail;
1664         int rc;
1665
1666         recv_wr.next = NULL;
1667         recv_wr.wr_id = (u64) (unsigned long) rep;
1668         recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1669         recv_wr.num_sge = 1;
1670
1671         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1672                                    rdmab_addr(rep->rr_rdmabuf),
1673                                    rdmab_length(rep->rr_rdmabuf),
1674                                    DMA_BIDIRECTIONAL);
1675
1676         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1677
1678         if (rc)
1679                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1680                         rc);
1681         return rc;
1682 }
1683
1684 /* How many chunk list items fit within our inline buffers?
1685  */
1686 unsigned int
1687 rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
1688 {
1689         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1690         int bytes, segments;
1691
1692         bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
1693         bytes -= RPCRDMA_HDRLEN_MIN;
1694         if (bytes < sizeof(struct rpcrdma_segment) * 2) {
1695                 pr_warn("RPC:       %s: inline threshold too small\n",
1696                         __func__);
1697                 return 0;
1698         }
1699
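        /* Round down to a power of two.  Illustration (assuming a
         * 16-byte struct rpcrdma_segment and a 28-byte RPCRDMA_HDRLEN_MIN):
         * 1024-byte inline thresholds leave 996 bytes, enough for 62
         * segments, which rounds down to 32.
         */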
1700         segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
1701         dprintk("RPC:       %s: max chunk list size = %d segments\n",
1702                 __func__, segments);
1703         return segments;
1704 }