xprtrdma: Replace rpcrdma_rep::rr_buffer with rr_rxprt
[uclinux-h8/linux.git] net/sunrpc/xprtrdma/verbs.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <linux/prefetch.h>
53 #include <linux/sunrpc/addr.h>
54 #include <asm/bitops.h>
55
56 #include "xprt_rdma.h"
57
58 /*
59  * Globals/Macros
60  */
61
62 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
63 # define RPCDBG_FACILITY        RPCDBG_TRANS
64 #endif
65
66 /*
67  * internal functions
68  */
69
70 /*
71  * Replies are handled in tasklet context, using a single global list.
72  * The RDMA tasklet function simply invokes each reply's saved callback
73  * (rr_func) for every reply on the list.
74  */
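/*
 * Hand-off sketch, based on the code below: completion handlers build a
 * local list of rpcrdma_rep structures and splice it onto
 * rpcrdma_tasklets_g (under rpcrdma_tk_lock_g) via
 * rpcrdma_schedule_tasklet(); rpcrdma_run_tasklet() then drains the
 * global list, dropping the lock around each rr_func callback.
 */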
75
76 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
77 static LIST_HEAD(rpcrdma_tasklets_g);
78
79 static void
80 rpcrdma_run_tasklet(unsigned long data)
81 {
82         struct rpcrdma_rep *rep;
83         void (*func)(struct rpcrdma_rep *);
84         unsigned long flags;
85
86         data = data;    /* unused parameter */
87         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
88         while (!list_empty(&rpcrdma_tasklets_g)) {
89                 rep = list_entry(rpcrdma_tasklets_g.next,
90                                  struct rpcrdma_rep, rr_list);
91                 list_del(&rep->rr_list);
92                 func = rep->rr_func;
93                 rep->rr_func = NULL;
94                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
95
96                 if (func)
97                         func(rep);
98                 else
99                         rpcrdma_recv_buffer_put(rep);
100
101                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
102         }
103         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
104 }
105
106 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
107
108 static const char * const async_event[] = {
109         "CQ error",
110         "QP fatal error",
111         "QP request error",
112         "QP access error",
113         "communication established",
114         "send queue drained",
115         "path migration successful",
116         "path mig error",
117         "device fatal error",
118         "port active",
119         "port error",
120         "LID change",
121         "P_key change",
122         "SM change",
123         "SRQ error",
124         "SRQ limit reached",
125         "last WQE reached",
126         "client reregister",
127         "GID change",
128 };
129
130 #define ASYNC_MSG(status)                                       \
131         ((status) < ARRAY_SIZE(async_event) ?                   \
132                 async_event[(status)] : "unknown async error")
133
134 static void
135 rpcrdma_schedule_tasklet(struct list_head *sched_list)
136 {
137         unsigned long flags;
138
139         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
140         list_splice_tail(sched_list, &rpcrdma_tasklets_g);
141         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
142         tasklet_schedule(&rpcrdma_tasklet_g);
143 }
144
145 static void
146 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
147 {
148         struct rpcrdma_ep *ep = context;
149
150         pr_err("RPC:       %s: %s on device %s ep %p\n",
151                __func__, ASYNC_MSG(event->event),
152                 event->device->name, context);
153         if (ep->rep_connected == 1) {
154                 ep->rep_connected = -EIO;
155                 rpcrdma_conn_func(ep);
156                 wake_up_all(&ep->rep_connect_wait);
157         }
158 }
159
160 static void
161 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
162 {
163         struct rpcrdma_ep *ep = context;
164
165         pr_err("RPC:       %s: %s on device %s ep %p\n",
166                __func__, ASYNC_MSG(event->event),
167                 event->device->name, context);
168         if (ep->rep_connected == 1) {
169                 ep->rep_connected = -EIO;
170                 rpcrdma_conn_func(ep);
171                 wake_up_all(&ep->rep_connect_wait);
172         }
173 }
174
175 static const char * const wc_status[] = {
176         "success",
177         "local length error",
178         "local QP operation error",
179         "local EE context operation error",
180         "local protection error",
181         "WR flushed",
182         "memory management operation error",
183         "bad response error",
184         "local access error",
185         "remote invalid request error",
186         "remote access error",
187         "remote operation error",
188         "transport retry counter exceeded",
189         "RNR retry counter exceeded",
190         "local RDD violation error",
191         "remote invalid RD request",
192         "operation aborted",
193         "invalid EE context number",
194         "invalid EE context state",
195         "fatal error",
196         "response timeout error",
197         "general error",
198 };
199
200 #define COMPLETION_MSG(status)                                  \
201         ((status) < ARRAY_SIZE(wc_status) ?                     \
202                 wc_status[(status)] : "unexpected completion error")
203
204 static void
205 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
206 {
207         /* WARNING: Only wr_id and status are reliable at this point */
208         if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
209                 if (wc->status != IB_WC_SUCCESS &&
210                     wc->status != IB_WC_WR_FLUSH_ERR)
211                         pr_err("RPC:       %s: SEND: %s\n",
212                                __func__, COMPLETION_MSG(wc->status));
213         } else {
214                 struct rpcrdma_mw *r;
215
216                 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
217                 r->mw_sendcompletion(wc);
218         }
219 }
220
221 static int
222 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
223 {
224         struct ib_wc *wcs;
225         int budget, count, rc;
226
227         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
228         do {
229                 wcs = ep->rep_send_wcs;
230
231                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
232                 if (rc <= 0)
233                         return rc;
234
235                 count = rc;
236                 while (count-- > 0)
237                         rpcrdma_sendcq_process_wc(wcs++);
238         } while (rc == RPCRDMA_POLLSIZE && --budget);
239         return 0;
240 }
241
242 /*
243  * Handle send, fast_reg_mr, and local_inv completions.
244  *
245  * Send events are typically suppressed and thus do not result
246  * in an upcall. Occasionally one is signaled, however. This
247  * prevents the provider's completion queue from wrapping and
248  * losing a completion.
249  */
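/*
 * The suppression is driven by the completion counter set up in
 * rpcrdma_ep_create(): rpcrdma_ep_post() posts sends unsignaled while
 * DECR_CQCOUNT() stays positive, then posts one IB_SEND_SIGNALED send
 * and resets the counter with INIT_CQCOUNT().
 */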
250 static void
251 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
252 {
253         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
254         int rc;
255
256         rc = rpcrdma_sendcq_poll(cq, ep);
257         if (rc) {
258                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
259                         __func__, rc);
260                 return;
261         }
262
263         rc = ib_req_notify_cq(cq,
264                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
265         if (rc == 0)
266                 return;
267         if (rc < 0) {
268                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
269                         __func__, rc);
270                 return;
271         }
272
273         rpcrdma_sendcq_poll(cq, ep);
274 }
275
276 static void
277 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
278 {
279         struct rpcrdma_rep *rep =
280                         (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
281         struct rpcrdma_ia *ia;
282
283         /* WARNING: Only wr_id and status are reliable at this point */
284         if (wc->status != IB_WC_SUCCESS)
285                 goto out_fail;
286
287         /* status == SUCCESS means all fields in wc are trustworthy */
288         if (wc->opcode != IB_WC_RECV)
289                 return;
290
291         dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
292                 __func__, rep, wc->byte_len);
293
294         ia = &rep->rr_rxprt->rx_ia;
295         rep->rr_len = wc->byte_len;
296         ib_dma_sync_single_for_cpu(ia->ri_id->device,
297                                    rdmab_addr(rep->rr_rdmabuf),
298                                    rep->rr_len, DMA_FROM_DEVICE);
299         prefetch(rdmab_to_msg(rep->rr_rdmabuf));
300
301 out_schedule:
302         list_add_tail(&rep->rr_list, sched_list);
303         return;
304 out_fail:
305         if (wc->status != IB_WC_WR_FLUSH_ERR)
306                 pr_err("RPC:       %s: rep %p: %s\n",
307                        __func__, rep, COMPLETION_MSG(wc->status));
308         rep->rr_len = ~0U;
309         goto out_schedule;
310 }
311
312 static int
313 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
314 {
315         struct list_head sched_list;
316         struct ib_wc *wcs;
317         int budget, count, rc;
318
319         INIT_LIST_HEAD(&sched_list);
320         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
321         do {
322                 wcs = ep->rep_recv_wcs;
323
324                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
325                 if (rc <= 0)
326                         goto out_schedule;
327
328                 count = rc;
329                 while (count-- > 0)
330                         rpcrdma_recvcq_process_wc(wcs++, &sched_list);
331         } while (rc == RPCRDMA_POLLSIZE && --budget);
332         rc = 0;
333
334 out_schedule:
335         rpcrdma_schedule_tasklet(&sched_list);
336         return rc;
337 }
338
339 /*
340  * Handle receive completions.
341  *
342  * It is reentrant, but events are processed one at a time to preserve
343  * the ordering of receives, which keeps server credit accounting correct.
344  *
345  * It is the responsibility of the scheduled tasklet to return
346  * recv buffers to the pool. NOTE: this affects synchronization of
347  * connection shutdown. That is, the structures required for
348  * the completion of the reply handler must remain intact until
349  * all memory has been reclaimed.
350  */
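/*
 * Receive path sketch (as implemented below): the provider upcall runs
 * rpcrdma_recvcq_poll(), each completed rep is queued on a local
 * sched_list by rpcrdma_recvcq_process_wc(), and the list is handed to
 * the tasklet above, which invokes each rep's rr_func or returns the
 * rep to the buffer pool.
 */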
351 static void
352 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
353 {
354         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
355         int rc;
356
357         rc = rpcrdma_recvcq_poll(cq, ep);
358         if (rc) {
359                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
360                         __func__, rc);
361                 return;
362         }
363
364         rc = ib_req_notify_cq(cq,
365                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
366         if (rc == 0)
367                 return;
368         if (rc < 0) {
369                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
370                         __func__, rc);
371                 return;
372         }
373
374         rpcrdma_recvcq_poll(cq, ep);
375 }
376
377 static void
378 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
379 {
380         struct ib_wc wc;
381         LIST_HEAD(sched_list);
382
383         while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
384                 rpcrdma_recvcq_process_wc(&wc, &sched_list);
385         if (!list_empty(&sched_list))
386                 rpcrdma_schedule_tasklet(&sched_list);
387         while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
388                 rpcrdma_sendcq_process_wc(&wc);
389 }
390
391 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
392 static const char * const conn[] = {
393         "address resolved",
394         "address error",
395         "route resolved",
396         "route error",
397         "connect request",
398         "connect response",
399         "connect error",
400         "unreachable",
401         "rejected",
402         "established",
403         "disconnected",
404         "device removal",
405         "multicast join",
406         "multicast error",
407         "address change",
408         "timewait exit",
409 };
410
411 #define CONNECTION_MSG(status)                                          \
412         ((status) < ARRAY_SIZE(conn) ?                                  \
413                 conn[(status)] : "unrecognized connection error")
414 #endif
415
416 static int
417 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
418 {
419         struct rpcrdma_xprt *xprt = id->context;
420         struct rpcrdma_ia *ia = &xprt->rx_ia;
421         struct rpcrdma_ep *ep = &xprt->rx_ep;
422 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
423         struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
424 #endif
425         struct ib_qp_attr *attr = &ia->ri_qp_attr;
426         struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
427         int connstate = 0;
428
429         switch (event->event) {
430         case RDMA_CM_EVENT_ADDR_RESOLVED:
431         case RDMA_CM_EVENT_ROUTE_RESOLVED:
432                 ia->ri_async_rc = 0;
433                 complete(&ia->ri_done);
434                 break;
435         case RDMA_CM_EVENT_ADDR_ERROR:
436                 ia->ri_async_rc = -EHOSTUNREACH;
437                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
438                         __func__, ep);
439                 complete(&ia->ri_done);
440                 break;
441         case RDMA_CM_EVENT_ROUTE_ERROR:
442                 ia->ri_async_rc = -ENETUNREACH;
443                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
444                         __func__, ep);
445                 complete(&ia->ri_done);
446                 break;
447         case RDMA_CM_EVENT_ESTABLISHED:
448                 connstate = 1;
449                 ib_query_qp(ia->ri_id->qp, attr,
450                             IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
451                             iattr);
452                 dprintk("RPC:       %s: %d responder resources"
453                         " (%d initiator)\n",
454                         __func__, attr->max_dest_rd_atomic,
455                         attr->max_rd_atomic);
456                 goto connected;
457         case RDMA_CM_EVENT_CONNECT_ERROR:
458                 connstate = -ENOTCONN;
459                 goto connected;
460         case RDMA_CM_EVENT_UNREACHABLE:
461                 connstate = -ENETDOWN;
462                 goto connected;
463         case RDMA_CM_EVENT_REJECTED:
464                 connstate = -ECONNREFUSED;
465                 goto connected;
466         case RDMA_CM_EVENT_DISCONNECTED:
467                 connstate = -ECONNABORTED;
468                 goto connected;
469         case RDMA_CM_EVENT_DEVICE_REMOVAL:
470                 connstate = -ENODEV;
471 connected:
472                 dprintk("RPC:       %s: %sconnected\n",
473                                         __func__, connstate > 0 ? "" : "dis");
474                 ep->rep_connected = connstate;
475                 rpcrdma_conn_func(ep);
476                 wake_up_all(&ep->rep_connect_wait);
477                 /*FALLTHROUGH*/
478         default:
479                 dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
480                         __func__, sap, rpc_get_port(sap), ep,
481                         CONNECTION_MSG(event->event));
482                 break;
483         }
484
485 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
486         if (connstate == 1) {
487                 int ird = attr->max_dest_rd_atomic;
488                 int tird = ep->rep_remote_cma.responder_resources;
489
490                 pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
491                         sap, rpc_get_port(sap),
492                         ia->ri_id->device->name,
493                         ia->ri_ops->ro_displayname,
494                         xprt->rx_buf.rb_max_requests,
495                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
496         } else if (connstate < 0) {
497                 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
498                         sap, rpc_get_port(sap), connstate);
499         }
500 #endif
501
502         return 0;
503 }
504
505 static struct rdma_cm_id *
506 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
507                         struct rpcrdma_ia *ia, struct sockaddr *addr)
508 {
509         struct rdma_cm_id *id;
510         int rc;
511
512         init_completion(&ia->ri_done);
513
514         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
515         if (IS_ERR(id)) {
516                 rc = PTR_ERR(id);
517                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
518                         __func__, rc);
519                 return id;
520         }
521
522         ia->ri_async_rc = -ETIMEDOUT;
523         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
524         if (rc) {
525                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
526                         __func__, rc);
527                 goto out;
528         }
529         wait_for_completion_interruptible_timeout(&ia->ri_done,
530                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
531         rc = ia->ri_async_rc;
532         if (rc)
533                 goto out;
534
535         ia->ri_async_rc = -ETIMEDOUT;
536         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
537         if (rc) {
538                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
539                         __func__, rc);
540                 goto out;
541         }
542         wait_for_completion_interruptible_timeout(&ia->ri_done,
543                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
544         rc = ia->ri_async_rc;
545         if (rc)
546                 goto out;
547
548         return id;
549
550 out:
551         rdma_destroy_id(id);
552         return ERR_PTR(rc);
553 }
554
555 /*
556  * Drain any cq, prior to teardown.
557  */
558 static void
559 rpcrdma_clean_cq(struct ib_cq *cq)
560 {
561         struct ib_wc wc;
562         int count = 0;
563
564         while (1 == ib_poll_cq(cq, 1, &wc))
565                 ++count;
566
567         if (count)
568                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
569                         __func__, count, wc.opcode);
570 }
571
572 /*
573  * Exported functions.
574  */
575
576 /*
577  * Open and initialize an Interface Adapter.
578  *  o initializes fields of struct rpcrdma_ia, including
579  *    interface and provider attributes and protection zone.
580  */
581 int
582 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
583 {
584         int rc, mem_priv;
585         struct rpcrdma_ia *ia = &xprt->rx_ia;
586         struct ib_device_attr *devattr = &ia->ri_devattr;
587
588         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
589         if (IS_ERR(ia->ri_id)) {
590                 rc = PTR_ERR(ia->ri_id);
591                 goto out1;
592         }
593
594         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
595         if (IS_ERR(ia->ri_pd)) {
596                 rc = PTR_ERR(ia->ri_pd);
597                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
598                         __func__, rc);
599                 goto out2;
600         }
601
602         rc = ib_query_device(ia->ri_id->device, devattr);
603         if (rc) {
604                 dprintk("RPC:       %s: ib_query_device failed %d\n",
605                         __func__, rc);
606                 goto out3;
607         }
608
609         if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
610                 ia->ri_have_dma_lkey = 1;
611                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
612         }
613
614         if (memreg == RPCRDMA_FRMR) {
615                 /* Requires both frmr reg and local dma lkey */
616                 if (((devattr->device_cap_flags &
617                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
618                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
619                       (devattr->max_fast_reg_page_list_len == 0)) {
620                         dprintk("RPC:       %s: FRMR registration "
621                                 "not supported by HCA\n", __func__);
622                         memreg = RPCRDMA_MTHCAFMR;
623                 }
624         }
625         if (memreg == RPCRDMA_MTHCAFMR) {
626                 if (!ia->ri_id->device->alloc_fmr) {
627                         dprintk("RPC:       %s: MTHCAFMR registration "
628                                 "not supported by HCA\n", __func__);
629                         memreg = RPCRDMA_ALLPHYSICAL;
630                 }
631         }
632
633         /*
634          * Optionally obtain an underlying physical identity mapping in
635          * order to do a memory window-based bind. This base registration
636          * is protected from remote access - that is enabled only by binding
637          * for the specific bytes targeted during each RPC operation, and
638          * revoked after the corresponding completion similar to a storage
639          * adapter.
640          */
641         switch (memreg) {
642         case RPCRDMA_FRMR:
643                 ia->ri_ops = &rpcrdma_frwr_memreg_ops;
644                 break;
645         case RPCRDMA_ALLPHYSICAL:
646                 ia->ri_ops = &rpcrdma_physical_memreg_ops;
647                 mem_priv = IB_ACCESS_LOCAL_WRITE |
648                                 IB_ACCESS_REMOTE_WRITE |
649                                 IB_ACCESS_REMOTE_READ;
650                 goto register_setup;
651         case RPCRDMA_MTHCAFMR:
652                 ia->ri_ops = &rpcrdma_fmr_memreg_ops;
653                 if (ia->ri_have_dma_lkey)
654                         break;
655                 mem_priv = IB_ACCESS_LOCAL_WRITE;
656         register_setup:
657                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
658                 if (IS_ERR(ia->ri_bind_mem)) {
659                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
660                                 "phys register failed with %lX\n",
661                                 __func__, PTR_ERR(ia->ri_bind_mem));
662                         rc = -ENOMEM;
663                         goto out3;
664                 }
665                 break;
666         default:
667                 printk(KERN_ERR "RPC: Unsupported memory "
668                                 "registration mode: %d\n", memreg);
669                 rc = -ENOMEM;
670                 goto out3;
671         }
672         dprintk("RPC:       %s: memory registration strategy is '%s'\n",
673                 __func__, ia->ri_ops->ro_displayname);
674
675         /* Else will do memory reg/dereg for each chunk */
676         ia->ri_memreg_strategy = memreg;
677
678         rwlock_init(&ia->ri_qplock);
679         return 0;
680
681 out3:
682         ib_dealloc_pd(ia->ri_pd);
683         ia->ri_pd = NULL;
684 out2:
685         rdma_destroy_id(ia->ri_id);
686         ia->ri_id = NULL;
687 out1:
688         return rc;
689 }
690
691 /*
692  * Clean up/close an IA.
693  *   o if event handles and PD have been initialized, free them.
694  *   o close the IA
695  */
696 void
697 rpcrdma_ia_close(struct rpcrdma_ia *ia)
698 {
699         int rc;
700
701         dprintk("RPC:       %s: entering\n", __func__);
702         if (ia->ri_bind_mem != NULL) {
703                 rc = ib_dereg_mr(ia->ri_bind_mem);
704                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
705                         __func__, rc);
706         }
707
708         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
709                 if (ia->ri_id->qp)
710                         rdma_destroy_qp(ia->ri_id);
711                 rdma_destroy_id(ia->ri_id);
712                 ia->ri_id = NULL;
713         }
714
715         /* If the pd is still busy, xprtrdma missed freeing a resource */
716         if (ia->ri_pd && !IS_ERR(ia->ri_pd))
717                 WARN_ON(ib_dealloc_pd(ia->ri_pd));
718 }
719
720 /*
721  * Create unconnected endpoint.
722  */
723 int
724 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
725                                 struct rpcrdma_create_data_internal *cdata)
726 {
727         struct ib_device_attr *devattr = &ia->ri_devattr;
728         struct ib_cq *sendcq, *recvcq;
729         int rc, err;
730
731         /* check provider's send/recv wr limits */
732         if (cdata->max_requests > devattr->max_qp_wr)
733                 cdata->max_requests = devattr->max_qp_wr;
734
735         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
736         ep->rep_attr.qp_context = ep;
737         ep->rep_attr.srq = NULL;
738         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
739         rc = ia->ri_ops->ro_open(ia, ep, cdata);
740         if (rc)
741                 return rc;
742         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
743         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
744         ep->rep_attr.cap.max_recv_sge = 1;
745         ep->rep_attr.cap.max_inline_data = 0;
746         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
747         ep->rep_attr.qp_type = IB_QPT_RC;
748         ep->rep_attr.port_num = ~0;
749
750         if (cdata->padding) {
751                 ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
752                                                       GFP_KERNEL);
753                 if (IS_ERR(ep->rep_padbuf))
754                         return PTR_ERR(ep->rep_padbuf);
755         } else
756                 ep->rep_padbuf = NULL;
757
758         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
759                 "iovs: send %d recv %d\n",
760                 __func__,
761                 ep->rep_attr.cap.max_send_wr,
762                 ep->rep_attr.cap.max_recv_wr,
763                 ep->rep_attr.cap.max_send_sge,
764                 ep->rep_attr.cap.max_recv_sge);
765
766         /* set trigger for requesting send completion */
767         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
768         if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
769                 ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
770         else if (ep->rep_cqinit <= 2)
771                 ep->rep_cqinit = 0;
772         INIT_CQCOUNT(ep);
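        /* Illustrative: with max_send_wr == 128 the initial value is 63,
         * which is clamped to RPCRDMA_MAX_UNSIGNALED_SENDS if it exceeds
         * that limit; a value of 2 or less disables the optimization, so
         * every send is posted signaled (see rpcrdma_ep_post()).
         */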
773         init_waitqueue_head(&ep->rep_connect_wait);
774         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
775
776         sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
777                                   rpcrdma_cq_async_error_upcall, ep,
778                                   ep->rep_attr.cap.max_send_wr + 1, 0);
779         if (IS_ERR(sendcq)) {
780                 rc = PTR_ERR(sendcq);
781                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
782                         __func__, rc);
783                 goto out1;
784         }
785
786         rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
787         if (rc) {
788                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
789                         __func__, rc);
790                 goto out2;
791         }
792
793         recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
794                                   rpcrdma_cq_async_error_upcall, ep,
795                                   ep->rep_attr.cap.max_recv_wr + 1, 0);
796         if (IS_ERR(recvcq)) {
797                 rc = PTR_ERR(recvcq);
798                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
799                         __func__, rc);
800                 goto out2;
801         }
802
803         rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
804         if (rc) {
805                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
806                         __func__, rc);
807                 ib_destroy_cq(recvcq);
808                 goto out2;
809         }
810
811         ep->rep_attr.send_cq = sendcq;
812         ep->rep_attr.recv_cq = recvcq;
813
814         /* Initialize cma parameters */
815
816         /* RPC/RDMA does not use private data */
817         ep->rep_remote_cma.private_data = NULL;
818         ep->rep_remote_cma.private_data_len = 0;
819
820         /* Client offers RDMA Read but does not initiate */
821         ep->rep_remote_cma.initiator_depth = 0;
822         if (devattr->max_qp_rd_atom > 32)       /* arbitrary but <= 255 */
823                 ep->rep_remote_cma.responder_resources = 32;
824         else
825                 ep->rep_remote_cma.responder_resources =
826                                                 devattr->max_qp_rd_atom;
827
828         ep->rep_remote_cma.retry_count = 7;
829         ep->rep_remote_cma.flow_control = 0;
830         ep->rep_remote_cma.rnr_retry_count = 0;
831
832         return 0;
833
834 out2:
835         err = ib_destroy_cq(sendcq);
836         if (err)
837                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
838                         __func__, err);
839 out1:
840         rpcrdma_free_regbuf(ia, ep->rep_padbuf);
841         return rc;
842 }
843
844 /*
845  * rpcrdma_ep_destroy
846  *
847  * Disconnect and destroy endpoint. After this, the only
848  * valid operations on the ep are to free it (if dynamically
849  * allocated) or re-create it.
850  */
851 void
852 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
853 {
854         int rc;
855
856         dprintk("RPC:       %s: entering, connected is %d\n",
857                 __func__, ep->rep_connected);
858
859         cancel_delayed_work_sync(&ep->rep_connect_worker);
860
861         if (ia->ri_id->qp) {
862                 rpcrdma_ep_disconnect(ep, ia);
863                 rdma_destroy_qp(ia->ri_id);
864                 ia->ri_id->qp = NULL;
865         }
866
867         rpcrdma_free_regbuf(ia, ep->rep_padbuf);
868
869         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
870         rc = ib_destroy_cq(ep->rep_attr.recv_cq);
871         if (rc)
872                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
873                         __func__, rc);
874
875         rpcrdma_clean_cq(ep->rep_attr.send_cq);
876         rc = ib_destroy_cq(ep->rep_attr.send_cq);
877         if (rc)
878                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
879                         __func__, rc);
880 }
881
882 /*
883  * Connect unconnected endpoint.
884  */
885 int
886 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
887 {
888         struct rdma_cm_id *id, *old;
889         int rc = 0;
890         int retry_count = 0;
891
892         if (ep->rep_connected != 0) {
893                 struct rpcrdma_xprt *xprt;
894 retry:
895                 dprintk("RPC:       %s: reconnecting...\n", __func__);
896
897                 rpcrdma_ep_disconnect(ep, ia);
898                 rpcrdma_flush_cqs(ep);
899
900                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
901                 ia->ri_ops->ro_reset(xprt);
902
903                 id = rpcrdma_create_id(xprt, ia,
904                                 (struct sockaddr *)&xprt->rx_data.addr);
905                 if (IS_ERR(id)) {
906                         rc = -EHOSTUNREACH;
907                         goto out;
908                 }
909                 /* TEMP TEMP TEMP - fail if new device:
910                  * Deregister/remarshal *all* requests!
911                  * Close and recreate adapter, pd, etc!
912                  * Re-determine all attributes still sane!
913                  * More stuff I haven't thought of!
914                  * Rrrgh!
915                  */
916                 if (ia->ri_id->device != id->device) {
917                         printk("RPC:       %s: can't reconnect on "
918                                 "different device!\n", __func__);
919                         rdma_destroy_id(id);
920                         rc = -ENETUNREACH;
921                         goto out;
922                 }
923                 /* END TEMP */
924                 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
925                 if (rc) {
926                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
927                                 __func__, rc);
928                         rdma_destroy_id(id);
929                         rc = -ENETUNREACH;
930                         goto out;
931                 }
932
933                 write_lock(&ia->ri_qplock);
934                 old = ia->ri_id;
935                 ia->ri_id = id;
936                 write_unlock(&ia->ri_qplock);
937
938                 rdma_destroy_qp(old);
939                 rdma_destroy_id(old);
940         } else {
941                 dprintk("RPC:       %s: connecting...\n", __func__);
942                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
943                 if (rc) {
944                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
945                                 __func__, rc);
946                         /* do not update ep->rep_connected */
947                         return -ENETUNREACH;
948                 }
949         }
950
951         ep->rep_connected = 0;
952
953         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
954         if (rc) {
955                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
956                                 __func__, rc);
957                 goto out;
958         }
959
960         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
961
962         /*
963          * Check state. A non-peer reject indicates no listener
964          * (ECONNREFUSED), which may be a transient state. All
965          * others indicate a transport condition for which a best-effort
966          * recovery attempt has already been made.
967          */
968         if (ep->rep_connected == -ECONNREFUSED &&
969             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
970                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
971                 goto retry;
972         }
973         if (ep->rep_connected <= 0) {
974                 /* Sometimes, the only way to reliably connect to remote
975                  * CMs is to use the same nonzero values for ORD and IRD. */
976                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
977                     (ep->rep_remote_cma.responder_resources == 0 ||
978                      ep->rep_remote_cma.initiator_depth !=
979                                 ep->rep_remote_cma.responder_resources)) {
980                         if (ep->rep_remote_cma.responder_resources == 0)
981                                 ep->rep_remote_cma.responder_resources = 1;
982                         ep->rep_remote_cma.initiator_depth =
983                                 ep->rep_remote_cma.responder_resources;
984                         goto retry;
985                 }
986                 rc = ep->rep_connected;
987         } else {
988                 dprintk("RPC:       %s: connected\n", __func__);
989         }
990
991 out:
992         if (rc)
993                 ep->rep_connected = rc;
994         return rc;
995 }
996
997 /*
998  * rpcrdma_ep_disconnect
999  *
1000  * This is kept separate from destroy so that the transport can
1001  * reconnect without recreating the endpoint.
1002  *
1003  * This call is not reentrant, and must not be made in parallel
1004  * on the same endpoint.
1005  */
1006 void
1007 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
1008 {
1009         int rc;
1010
1011         rpcrdma_flush_cqs(ep);
1012         rc = rdma_disconnect(ia->ri_id);
1013         if (!rc) {
1014                 /* returns without wait if not connected */
1015                 wait_event_interruptible(ep->rep_connect_wait,
1016                                                         ep->rep_connected != 1);
1017                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
1018                         (ep->rep_connected == 1) ? "still " : "dis");
1019         } else {
1020                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
1021                 ep->rep_connected = rc;
1022         }
1023 }
1024
1025 static struct rpcrdma_req *
1026 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1027 {
1028         struct rpcrdma_req *req;
1029
1030         req = kzalloc(sizeof(*req), GFP_KERNEL);
1031         if (req == NULL)
1032                 return ERR_PTR(-ENOMEM);
1033
1034         req->rl_buffer = &r_xprt->rx_buf;
1035         return req;
1036 }
1037
1038 static struct rpcrdma_rep *
1039 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1040 {
1041         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1042         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1043         struct rpcrdma_rep *rep;
1044         int rc;
1045
1046         rc = -ENOMEM;
1047         rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1048         if (rep == NULL)
1049                 goto out;
1050
1051         rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
1052                                                GFP_KERNEL);
1053         if (IS_ERR(rep->rr_rdmabuf)) {
1054                 rc = PTR_ERR(rep->rr_rdmabuf);
1055                 goto out_free;
1056         }
1057
1058         rep->rr_rxprt = r_xprt;
1059         return rep;
1060
1061 out_free:
1062         kfree(rep);
1063 out:
1064         return ERR_PTR(rc);
1065 }
1066
1067 int
1068 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1069 {
1070         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1071         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1072         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1073         char *p;
1074         size_t len;
1075         int i, rc;
1076
1077         buf->rb_max_requests = cdata->max_requests;
1078         spin_lock_init(&buf->rb_lock);
1079
1080         /* Need to allocate:
1081          *   1.  arrays for send and recv pointers
1082          *   2.  arrays of struct rpcrdma_req to fill in pointers
1083          *   3.  array of struct rpcrdma_rep for replies
1084          * Send/recv buffers in req/rep need to be registered
1085          */
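        /*
         * Resulting layout of the single rb_pool allocation (sketch):
         *   [ rb_send_bufs: rb_max_requests x struct rpcrdma_req * ]
         *   [ rb_recv_bufs: rb_max_requests x struct rpcrdma_rep * ]
         * The req and rep structures themselves are allocated one at a
         * time by rpcrdma_create_req() and rpcrdma_create_rep() below.
         */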
1086         len = buf->rb_max_requests *
1087                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1088
1089         p = kzalloc(len, GFP_KERNEL);
1090         if (p == NULL) {
1091                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1092                         __func__, len);
1093                 rc = -ENOMEM;
1094                 goto out;
1095         }
1096         buf->rb_pool = p;       /* for freeing it later */
1097
1098         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1099         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1100         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1101         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1102
1103         rc = ia->ri_ops->ro_init(r_xprt);
1104         if (rc)
1105                 goto out;
1106
1107         for (i = 0; i < buf->rb_max_requests; i++) {
1108                 struct rpcrdma_req *req;
1109                 struct rpcrdma_rep *rep;
1110
1111                 req = rpcrdma_create_req(r_xprt);
1112                 if (IS_ERR(req)) {
1113                         dprintk("RPC:       %s: request buffer %d alloc"
1114                                 " failed\n", __func__, i);
1115                         rc = PTR_ERR(req);
1116                         goto out;
1117                 }
1118                 buf->rb_send_bufs[i] = req;
1119
1120                 rep = rpcrdma_create_rep(r_xprt);
1121                 if (IS_ERR(rep)) {
1122                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1123                                 __func__, i);
1124                         rc = PTR_ERR(rep);
1125                         goto out;
1126                 }
1127                 buf->rb_recv_bufs[i] = rep;
1128         }
1129
1130         return 0;
1131 out:
1132         rpcrdma_buffer_destroy(buf);
1133         return rc;
1134 }
1135
1136 static void
1137 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1138 {
1139         if (!rep)
1140                 return;
1141
1142         rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1143         kfree(rep);
1144 }
1145
1146 static void
1147 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1148 {
1149         if (!req)
1150                 return;
1151
1152         rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1153         rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1154         kfree(req);
1155 }
1156
1157 void
1158 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1159 {
1160         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1161         int i;
1162
1163         /* clean up in reverse order from create
1164          *   1.  recv mr memory (mr free, then kfree)
1165          *   2.  send mr memory (mr free, then kfree)
1166          *   3.  MWs
1167          */
1168         dprintk("RPC:       %s: entering\n", __func__);
1169
1170         for (i = 0; i < buf->rb_max_requests; i++) {
1171                 if (buf->rb_recv_bufs)
1172                         rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1173                 if (buf->rb_send_bufs)
1174                         rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1175         }
1176
1177         ia->ri_ops->ro_destroy(buf);
1178
1179         kfree(buf->rb_pool);
1180 }
1181
1182 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1183  * some req segments uninitialized.
1184  */
1185 static void
1186 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1187 {
1188         if (*mw) {
1189                 list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1190                 *mw = NULL;
1191         }
1192 }
1193
1194 /* Cycle mw's back in reverse order, and "spin" them.
1195  * This delays and scrambles reuse as much as possible.
1196  */
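/*
 * Concretely: rl_segments[1..RPCRDMA_MAX_SEGS-1] go back onto rb_mws
 * first and rl_segments[0] last, while rpcrdma_buffer_get_frmrs() and
 * rpcrdma_buffer_get_fmrs() fill segments from the highest index down,
 * so MWs re-enter the list in a different order than they left it.
 */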
1197 static void
1198 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1199 {
1200         struct rpcrdma_mr_seg *seg = req->rl_segments;
1201         struct rpcrdma_mr_seg *seg1 = seg;
1202         int i;
1203
1204         for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1205                 rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
1206         rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
1207 }
1208
1209 static void
1210 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1211 {
1212         buf->rb_send_bufs[--buf->rb_send_index] = req;
1213         req->rl_niovs = 0;
1214         if (req->rl_reply) {
1215                 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1216                 req->rl_reply->rr_func = NULL;
1217                 req->rl_reply = NULL;
1218         }
1219 }
1220
1221 /* rpcrdma_unmap_one() was already done during deregistration.
1222  * Redo only the ib_post_send().
1223  */
1224 static void
1225 rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1226 {
1227         struct rpcrdma_xprt *r_xprt =
1228                                 container_of(ia, struct rpcrdma_xprt, rx_ia);
1229         struct ib_send_wr invalidate_wr, *bad_wr;
1230         int rc;
1231
1232         dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
1233
1234         /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1235         r->r.frmr.fr_state = FRMR_IS_INVALID;
1236
1237         memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1238         invalidate_wr.wr_id = (unsigned long)(void *)r;
1239         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1240         invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1241         DECR_CQCOUNT(&r_xprt->rx_ep);
1242
1243         dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
1244                 __func__, r, r->r.frmr.fr_mr->rkey);
1245
1246         read_lock(&ia->ri_qplock);
1247         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1248         read_unlock(&ia->ri_qplock);
1249         if (rc) {
1250                 /* Force rpcrdma_buffer_get() to retry */
1251                 r->r.frmr.fr_state = FRMR_IS_STALE;
1252                 dprintk("RPC:       %s: ib_post_send failed, %i\n",
1253                         __func__, rc);
1254         }
1255 }
1256
1257 static void
1258 rpcrdma_retry_flushed_linv(struct list_head *stale,
1259                            struct rpcrdma_buffer *buf)
1260 {
1261         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1262         struct list_head *pos;
1263         struct rpcrdma_mw *r;
1264         unsigned long flags;
1265
1266         list_for_each(pos, stale) {
1267                 r = list_entry(pos, struct rpcrdma_mw, mw_list);
1268                 rpcrdma_retry_local_inv(r, ia);
1269         }
1270
1271         spin_lock_irqsave(&buf->rb_lock, flags);
1272         list_splice_tail(stale, &buf->rb_mws);
1273         spin_unlock_irqrestore(&buf->rb_lock, flags);
1274 }
1275
1276 static struct rpcrdma_req *
1277 rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1278                          struct list_head *stale)
1279 {
1280         struct rpcrdma_mw *r;
1281         int i;
1282
1283         i = RPCRDMA_MAX_SEGS - 1;
1284         while (!list_empty(&buf->rb_mws)) {
1285                 r = list_entry(buf->rb_mws.next,
1286                                struct rpcrdma_mw, mw_list);
1287                 list_del(&r->mw_list);
1288                 if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1289                         list_add(&r->mw_list, stale);
1290                         continue;
1291                 }
1292                 req->rl_segments[i].rl_mw = r;
1293                 if (unlikely(i-- == 0))
1294                         return req;     /* Success */
1295         }
1296
1297         /* Not enough entries on rb_mws for this req */
1298         rpcrdma_buffer_put_sendbuf(req, buf);
1299         rpcrdma_buffer_put_mrs(req, buf);
1300         return NULL;
1301 }
1302
1303 static struct rpcrdma_req *
1304 rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1305 {
1306         struct rpcrdma_mw *r;
1307         int i;
1308
1309         i = RPCRDMA_MAX_SEGS - 1;
1310         while (!list_empty(&buf->rb_mws)) {
1311                 r = list_entry(buf->rb_mws.next,
1312                                struct rpcrdma_mw, mw_list);
1313                 list_del(&r->mw_list);
1314                 req->rl_segments[i].rl_mw = r;
1315                 if (unlikely(i-- == 0))
1316                         return req;     /* Success */
1317         }
1318
1319         /* Not enough entries on rb_mws for this req */
1320         rpcrdma_buffer_put_sendbuf(req, buf);
1321         rpcrdma_buffer_put_mrs(req, buf);
1322         return NULL;
1323 }
1324
1325 /*
1326  * Get a set of request/reply buffers.
1327  *
1328  * Reply buffer (if needed) is attached to send buffer upon return.
1329  * Rule:
1330  *    rb_send_index and rb_recv_index MUST always be pointing to the
1331  *    *next* available buffer (non-NULL). They are incremented after
1332  *    removing buffers, and decremented *before* returning them.
1333  */
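/*
 * Illustrative example of the rule above: with two requests outstanding,
 * rb_send_index points at the next non-NULL entry in rb_send_bufs;
 * rpcrdma_buffer_get() takes that entry and post-increments the index,
 * while rpcrdma_buffer_put_sendbuf() pre-decrements before storing the
 * request back.
 */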
1334 struct rpcrdma_req *
1335 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1336 {
1337         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1338         struct list_head stale;
1339         struct rpcrdma_req *req;
1340         unsigned long flags;
1341
1342         spin_lock_irqsave(&buffers->rb_lock, flags);
1343         if (buffers->rb_send_index == buffers->rb_max_requests) {
1344                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1345                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1346                 return ((struct rpcrdma_req *)NULL);
1347         }
1348
1349         req = buffers->rb_send_bufs[buffers->rb_send_index];
1350         if (buffers->rb_send_index < buffers->rb_recv_index) {
1351                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1352                         __func__,
1353                         buffers->rb_recv_index - buffers->rb_send_index);
1354                 req->rl_reply = NULL;
1355         } else {
1356                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1357                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1358         }
1359         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1360
1361         INIT_LIST_HEAD(&stale);
1362         switch (ia->ri_memreg_strategy) {
1363         case RPCRDMA_FRMR:
1364                 req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1365                 break;
1366         case RPCRDMA_MTHCAFMR:
1367                 req = rpcrdma_buffer_get_fmrs(req, buffers);
1368                 break;
1369         default:
1370                 break;
1371         }
1372         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1373         if (!list_empty(&stale))
1374                 rpcrdma_retry_flushed_linv(&stale, buffers);
1375         return req;
1376 }
1377
1378 /*
1379  * Put request/reply buffers back into pool.
1380  * Pre-decrement counter/array index.
1381  */
1382 void
1383 rpcrdma_buffer_put(struct rpcrdma_req *req)
1384 {
1385         struct rpcrdma_buffer *buffers = req->rl_buffer;
1386         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1387         unsigned long flags;
1388
1389         spin_lock_irqsave(&buffers->rb_lock, flags);
1390         rpcrdma_buffer_put_sendbuf(req, buffers);
1391         switch (ia->ri_memreg_strategy) {
1392         case RPCRDMA_FRMR:
1393         case RPCRDMA_MTHCAFMR:
1394                 rpcrdma_buffer_put_mrs(req, buffers);
1395                 break;
1396         default:
1397                 break;
1398         }
1399         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1400 }
1401
1402 /*
1403  * Recover reply buffers from pool.
1404  * This happens when recovering from error conditions.
1405  * Post-increment counter/array index.
1406  */
1407 void
1408 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1409 {
1410         struct rpcrdma_buffer *buffers = req->rl_buffer;
1411         unsigned long flags;
1412
1413         spin_lock_irqsave(&buffers->rb_lock, flags);
1414         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1415                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1416                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1417         }
1418         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1419 }
1420
1421 /*
1422  * Put reply buffers back into pool when not attached to
1423  * request. This happens in error conditions.
1424  */
1425 void
1426 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1427 {
1428         struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1429         unsigned long flags;
1430
1431         rep->rr_func = NULL;
1432         spin_lock_irqsave(&buffers->rb_lock, flags);
1433         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1434         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1435 }
1436
1437 /*
1438  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1439  */
1440
1441 void
1442 rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
1443 {
1444         dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
1445                 seg->mr_offset,
1446                 (unsigned long long)seg->mr_dma, seg->mr_dmalen);
1447 }
1448
1449 static int
1450 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1451                                 struct ib_mr **mrp, struct ib_sge *iov)
1452 {
1453         struct ib_phys_buf ipb;
1454         struct ib_mr *mr;
1455         int rc;
1456
1457         /*
1458          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1459          */
1460         iov->addr = ib_dma_map_single(ia->ri_id->device,
1461                         va, len, DMA_BIDIRECTIONAL);
1462         if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1463                 return -ENOMEM;
1464
1465         iov->length = len;
1466
1467         if (ia->ri_have_dma_lkey) {
1468                 *mrp = NULL;
1469                 iov->lkey = ia->ri_dma_lkey;
1470                 return 0;
1471         } else if (ia->ri_bind_mem != NULL) {
1472                 *mrp = NULL;
1473                 iov->lkey = ia->ri_bind_mem->lkey;
1474                 return 0;
1475         }
1476
1477         ipb.addr = iov->addr;
1478         ipb.size = iov->length;
1479         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1480                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1481
1482         dprintk("RPC:       %s: phys convert: 0x%llx "
1483                         "registered 0x%llx length %d\n",
1484                         __func__, (unsigned long long)ipb.addr,
1485                         (unsigned long long)iov->addr, len);
1486
1487         if (IS_ERR(mr)) {
1488                 *mrp = NULL;
1489                 rc = PTR_ERR(mr);
1490                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1491         } else {
1492                 *mrp = mr;
1493                 iov->lkey = mr->lkey;
1494                 rc = 0;
1495         }
1496
1497         return rc;
1498 }
1499
1500 static int
1501 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1502                                 struct ib_mr *mr, struct ib_sge *iov)
1503 {
1504         int rc;
1505
1506         ib_dma_unmap_single(ia->ri_id->device,
1507                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1508
1509         if (NULL == mr)
1510                 return 0;
1511
1512         rc = ib_dereg_mr(mr);
1513         if (rc)
1514                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1515         return rc;
1516 }
1517
1518 /**
1519  * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1520  * @ia: controlling rpcrdma_ia
1521  * @size: size of buffer to be allocated, in bytes
1522  * @flags: GFP flags
1523  *
1524  * Returns pointer to private header of an area of internally
1525  * registered memory, or an ERR_PTR. The registered buffer follows
1526  * the end of the private header.
1527  *
1528  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1529  * receiving the payload of RDMA RECV operations. regbufs are not
1530  * used for RDMA READ/WRITE operations, thus are registered only for
1531  * LOCAL access.
1532  */
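/*
 * Typical usage, sketched from the callers in this file: allocate with
 * rpcrdma_alloc_regbuf(), build SGEs from rdmab_addr()/rdmab_length()
 * (see rpcrdma_ep_post_recv()), and release with rpcrdma_free_regbuf().
 */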
1533 struct rpcrdma_regbuf *
1534 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1535 {
1536         struct rpcrdma_regbuf *rb;
1537         int rc;
1538
1539         rc = -ENOMEM;
1540         rb = kmalloc(sizeof(*rb) + size, flags);
1541         if (rb == NULL)
1542                 goto out;
1543
1544         rb->rg_size = size;
1545         rb->rg_owner = NULL;
1546         rc = rpcrdma_register_internal(ia, rb->rg_base, size,
1547                                        &rb->rg_mr, &rb->rg_iov);
1548         if (rc)
1549                 goto out_free;
1550
1551         return rb;
1552
1553 out_free:
1554         kfree(rb);
1555 out:
1556         return ERR_PTR(rc);
1557 }
1558
1559 /**
1560  * rpcrdma_free_regbuf - deregister and free registered buffer
1561  * @ia: controlling rpcrdma_ia
1562  * @rb: regbuf to be deregistered and freed
1563  */
1564 void
1565 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1566 {
1567         if (rb) {
1568                 rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
1569                 kfree(rb);
1570         }
1571 }
1572
1573 /*
1574  * Prepost any receive buffer, then post send.
1575  *
1576  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1577  */
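/*
 * Note: the send WR below uses RPCRDMA_IGNORE_COMPLETION as its wr_id,
 * so rpcrdma_sendcq_process_wc() reports it only on error; registration
 * WRs instead carry a struct rpcrdma_mw pointer and receive their
 * mw_sendcompletion callback.
 */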
1578 int
1579 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1580                 struct rpcrdma_ep *ep,
1581                 struct rpcrdma_req *req)
1582 {
1583         struct ib_send_wr send_wr, *send_wr_fail;
1584         struct rpcrdma_rep *rep = req->rl_reply;
1585         int rc;
1586
1587         if (rep) {
1588                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1589                 if (rc)
1590                         goto out;
1591                 req->rl_reply = NULL;
1592         }
1593
1594         send_wr.next = NULL;
1595         send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
1596         send_wr.sg_list = req->rl_send_iov;
1597         send_wr.num_sge = req->rl_niovs;
1598         send_wr.opcode = IB_WR_SEND;
1599         if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
1600                 ib_dma_sync_single_for_device(ia->ri_id->device,
1601                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1602                         DMA_TO_DEVICE);
1603         ib_dma_sync_single_for_device(ia->ri_id->device,
1604                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1605                 DMA_TO_DEVICE);
1606         ib_dma_sync_single_for_device(ia->ri_id->device,
1607                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1608                 DMA_TO_DEVICE);
1609
1610         if (DECR_CQCOUNT(ep) > 0)
1611                 send_wr.send_flags = 0;
1612         else { /* Provider must take a send completion every now and then */
1613                 INIT_CQCOUNT(ep);
1614                 send_wr.send_flags = IB_SEND_SIGNALED;
1615         }
1616
1617         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1618         if (rc)
1619                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1620                         rc);
1621 out:
1622         return rc;
1623 }
1624
1625 /*
1626  * (Re)post a receive buffer.
1627  */
1628 int
1629 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1630                      struct rpcrdma_ep *ep,
1631                      struct rpcrdma_rep *rep)
1632 {
1633         struct ib_recv_wr recv_wr, *recv_wr_fail;
1634         int rc;
1635
1636         recv_wr.next = NULL;
1637         recv_wr.wr_id = (u64) (unsigned long) rep;
1638         recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1639         recv_wr.num_sge = 1;
1640
1641         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1642                                    rdmab_addr(rep->rr_rdmabuf),
1643                                    rdmab_length(rep->rr_rdmabuf),
1644                                    DMA_BIDIRECTIONAL);
1645
1646         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1647
1648         if (rc)
1649                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1650                         rc);
1651         return rc;
1652 }
1653
1654 /* How many chunk list items fit within our inline buffers?
1655  */
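/* Illustrative arithmetic, assuming a 1024-byte inline threshold, a
 * 28-byte RPCRDMA_HDRLEN_MIN, and a 16-byte struct rpcrdma_segment:
 * bytes = 1024 - 28 = 996, 996 / 16 = 62, and 1 << (fls(62) - 1) = 32,
 * so up to 32 chunk list segments would be advertised.
 */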
1656 unsigned int
1657 rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
1658 {
1659         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1660         int bytes, segments;
1661
1662         bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
1663         bytes -= RPCRDMA_HDRLEN_MIN;
1664         if (bytes < sizeof(struct rpcrdma_segment) * 2) {
1665                 pr_warn("RPC:       %s: inline threshold too small\n",
1666                         __func__);
1667                 return 0;
1668         }
1669
1670         segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
1671         dprintk("RPC:       %s: max chunk list size = %d segments\n",
1672                 __func__, segments);
1673         return segments;
1674 }