/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

/*
 * Handle replies in tasklet context, using a single, global list.
 * The rdma tasklet function just turns around and calls the reply
 * handler for each rep on the list.
 */
static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
static LIST_HEAD(rpcrdma_tasklets_g);

static void
rpcrdma_run_tasklet(unsigned long data)
{
	struct rpcrdma_rep *rep;
	void (*func)(struct rpcrdma_rep *);
	unsigned long flags;

	/* "data" is unused; it is required by the tasklet prototype */
	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	while (!list_empty(&rpcrdma_tasklets_g)) {
		rep = list_entry(rpcrdma_tasklets_g.next,
				 struct rpcrdma_rep, rr_list);
		list_del(&rep->rr_list);
		func = rep->rr_func;
		rep->rr_func = NULL;
		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);

		if (func)
			func(rep);
		else
			rpcrdma_recv_buffer_put(rep);

		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	}
	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
}

static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
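
/* A minimal sketch of the handoff above (illustrative only): completion
 * upcalls run in interrupt context, so each reply is parked on a list
 * and the global tasklet is scheduled to invoke the reply handler
 * later, outside the hard IRQ:
 *
 *	LIST_HEAD(sched_list);
 *
 *	list_add_tail(&rep->rr_list, &sched_list);
 *	rpcrdma_schedule_tasklet(&sched_list);
 */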

static const char * const async_event[] = {
	"CQ error",
	"QP fatal error",
	"QP request error",
	"QP access error",
	"communication established",
	"send queue drained",
	"path migration successful",
	"path mig error",
	"device fatal error",
	"port active",
	"port error",
	"LID change",
	"P_key change",
	"SM change",
	"SRQ error",
	"SRQ limit reached",
	"last WQE reached",
	"client reregister",
	"GID change",
};

#define ASYNC_MSG(status)					\
	((status) < ARRAY_SIZE(async_event) ?			\
		async_event[(status)] : "unknown async error")

static void
rpcrdma_schedule_tasklet(struct list_head *sched_list)
{
	unsigned long flags;

	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	list_splice_tail(sched_list, &rpcrdma_tasklets_g);
	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
	tasklet_schedule(&rpcrdma_tasklet_g);
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC: %s: %s on device %s ep %p\n",
	       __func__, ASYNC_MSG(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

static void
rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC: %s: %s on device %s ep %p\n",
	       __func__, ASYNC_MSG(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

static const char * const wc_status[] = {
	"success",
	"local length error",
	"local QP operation error",
	"local EE context operation error",
	"local protection error",
	"WR flushed",
	"memory management operation error",
	"bad response error",
	"local access error",
	"remote invalid request error",
	"remote access error",
	"remote operation error",
	"transport retry counter exceeded",
	"RNR retry counter exceeded",
	"local RDD violation error",
	"remote invalid RD request",
	"operation aborted",
	"invalid EE context number",
	"invalid EE context state",
	"fatal error",
	"response timeout error",
	"general error",
};

#define COMPLETION_MSG(status)					\
	((status) < ARRAY_SIZE(wc_status) ?			\
		wc_status[(status)] : "unexpected completion error")

static void
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
{
	if (likely(wc->status == IB_WC_SUCCESS))
		return;

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->wr_id == 0ULL) {
		if (wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("RPC: %s: SEND: %s\n",
			       __func__, COMPLETION_MSG(wc->status));
	} else {
		struct rpcrdma_mw *r;

		r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
		r->r.frmr.fr_state = FRMR_IS_STALE;
		pr_err("RPC: %s: frmr %p (stale): %s\n",
		       __func__, r, COMPLETION_MSG(wc->status));
	}
}

static int
rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
	struct ib_wc *wcs;
	int budget, count, rc;

	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
	do {
		wcs = ep->rep_send_wcs;

		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
		if (rc <= 0)
			return rc;

		count = rc;
		while (count-- > 0)
			rpcrdma_sendcq_process_wc(wcs++);
	} while (rc == RPCRDMA_POLLSIZE && --budget);
	return 0;
}

/*
 * Handle send, fast_reg_mr, and local_inv completions.
 *
 * Send events are typically suppressed and thus do not result
 * in an upcall. Occasionally one is signaled, however. This
 * prevents the provider's completion queue from wrapping and
 * losing a completion.
 */
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
{
	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
	int rc;

	rc = rpcrdma_sendcq_poll(cq, ep);
	if (rc) {
		dprintk("RPC: %s: ib_poll_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rc = ib_req_notify_cq(cq,
			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
	if (rc == 0)
		return;
	if (rc < 0) {
		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rpcrdma_sendcq_poll(cq, ep);
}
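
/* A minimal sketch of the poll -> re-arm -> re-poll discipline used by
 * the upcalls in this file (illustrative; process_one_wc() is a
 * hypothetical consumer, not part of this transport):
 *
 *	while (ib_poll_cq(cq, 1, &wc) > 0)
 *		process_one_wc(&wc);
 *	if (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
 *			     IB_CQ_REPORT_MISSED_EVENTS) > 0)
 *		while (ib_poll_cq(cq, 1, &wc) > 0)
 *			process_one_wc(&wc);
 *
 * A positive return from ib_req_notify_cq() means completions may have
 * arrived between the final poll and re-arming, so the CQ must be
 * polled once more before the handler returns.
 */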

static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
{
	struct rpcrdma_rep *rep =
			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);
	prefetch(rdmab_to_msg(rep->rr_rdmabuf));

out_schedule:
	list_add_tail(&rep->rr_list, sched_list);
	return;
out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("RPC: %s: rep %p: %s\n",
		       __func__, rep, COMPLETION_MSG(wc->status));
	rep->rr_len = ~0U;
	goto out_schedule;
}

static int
rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
	struct list_head sched_list;
	struct ib_wc *wcs;
	int budget, count, rc;

	INIT_LIST_HEAD(&sched_list);
	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
	do {
		wcs = ep->rep_recv_wcs;

		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
		if (rc <= 0)
			goto out_schedule;

		count = rc;
		while (count-- > 0)
			rpcrdma_recvcq_process_wc(wcs++, &sched_list);
	} while (rc == RPCRDMA_POLLSIZE && --budget);
	rc = 0;

out_schedule:
	rpcrdma_schedule_tasklet(&sched_list);
	return rc;
}

/*
 * Handle receive completions.
 *
 * This is reentrant, but processes events one at a time to preserve
 * receive ordering and thus the server credit accounting.
 *
 * It is the responsibility of the scheduled tasklet to return
 * recv buffers to the pool. NOTE: this affects synchronization of
 * connection shutdown. That is, the structures required for
 * the completion of the reply handler must remain intact until
 * all memory has been reclaimed.
 */
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
	int rc;

	rc = rpcrdma_recvcq_poll(cq, ep);
	if (rc) {
		dprintk("RPC: %s: ib_poll_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rc = ib_req_notify_cq(cq,
			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
	if (rc == 0)
		return;
	if (rc < 0) {
		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rpcrdma_recvcq_poll(cq, ep);
}

static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
	struct ib_wc wc;
	LIST_HEAD(sched_list);

	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
		rpcrdma_recvcq_process_wc(&wc, &sched_list);
	if (!list_empty(&sched_list))
		rpcrdma_schedule_tasklet(&sched_list);
	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
		rpcrdma_sendcq_process_wc(&wc);
}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static const char * const conn[] = {
	"address resolved",
	"address error",
	"route resolved",
	"route error",
	"connect request",
	"connect response",
	"connect error",
	"unreachable",
	"rejected",
	"established",
	"disconnected",
	"device removal",
	"multicast join",
	"multicast error",
	"address change",
	"timewait exit",
};

#define CONNECTION_MSG(status)					\
	((status) < ARRAY_SIZE(conn) ?				\
		conn[(status)] : "unrecognized connection error")
#endif

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC: %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC: %s: %sconnected\n",
			__func__, connstate > 0 ? "" : "dis");
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			CONNECTION_MSG(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_id->device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
	}
#endif

	return 0;
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
		  struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC: %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}
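
/* Note on the handshake above: rdma_resolve_addr() and
 * rdma_resolve_route() complete asynchronously. The CM upcall
 * (rpcrdma_conn_upcall) records the verdict in ia->ri_async_rc and
 * signals ia->ri_done. Pre-loading ri_async_rc with -ETIMEDOUT makes a
 * timed-out wait indistinguishable from an explicit CM failure, so
 * both paths fall through the same error check.
 */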

/*
 * Drain any cq, prior to teardown.
 */
static void
rpcrdma_clean_cq(struct ib_cq *cq)
{
	struct ib_wc wc;
	int count = 0;

	while (1 == ib_poll_cq(cq, 1, &wc))
		++count;

	if (count)
		dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
			__func__, count, wc.opcode);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	int rc, mem_priv;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct ib_device_attr *devattr = &ia->ri_devattr;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}

	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
			__func__, rc);
		goto out2;
	}

	rc = ib_query_device(ia->ri_id->device, devattr);
	if (rc) {
		dprintk("RPC: %s: ib_query_device failed %d\n",
			__func__, rc);
		goto out3;
	}

	if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
		ia->ri_have_dma_lkey = 1;
		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
	}

	if (memreg == RPCRDMA_FRMR) {
		/* Requires both frmr reg and local dma lkey */
		if (((devattr->device_cap_flags &
		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
		      (devattr->max_fast_reg_page_list_len == 0)) {
			dprintk("RPC: %s: FRMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_MTHCAFMR;
		}
	}
	if (memreg == RPCRDMA_MTHCAFMR) {
		if (!ia->ri_id->device->alloc_fmr) {
			dprintk("RPC: %s: MTHCAFMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_ALLPHYSICAL;
		}
	}

	/*
	 * Optionally obtain an underlying physical identity mapping in
	 * order to do a memory window-based bind. This base registration
	 * is protected from remote access - that is enabled only by binding
	 * for the specific bytes targeted during each RPC operation, and
	 * revoked after the corresponding completion, similar to a storage
	 * adapter.
	 */
	switch (memreg) {
	case RPCRDMA_FRMR:
		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
		break;
	case RPCRDMA_ALLPHYSICAL:
		ia->ri_ops = &rpcrdma_physical_memreg_ops;
		mem_priv = IB_ACCESS_LOCAL_WRITE |
				IB_ACCESS_REMOTE_WRITE |
				IB_ACCESS_REMOTE_READ;
		goto register_setup;
	case RPCRDMA_MTHCAFMR:
		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
		if (ia->ri_have_dma_lkey)
			break;
		mem_priv = IB_ACCESS_LOCAL_WRITE;
	register_setup:
		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
		if (IS_ERR(ia->ri_bind_mem)) {
			printk(KERN_ALERT "%s: ib_get_dma_mr for "
				"phys register failed with %lX\n",
				__func__, PTR_ERR(ia->ri_bind_mem));
			rc = -ENOMEM;
			goto out3;
		}
		break;
	default:
		printk(KERN_ERR "RPC: Unsupported memory "
				"registration mode: %d\n", memreg);
		rc = -EINVAL;
		goto out3;
	}
	dprintk("RPC: %s: memory registration strategy is '%s'\n",
		__func__, ia->ri_ops->ro_displayname);

	/* Else will do memory reg/dereg for each chunk */
	ia->ri_memreg_strategy = memreg;

	rwlock_init(&ia->ri_qplock);
	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}
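
/* The fallback chain above runs RPCRDMA_FRMR (preferred) ->
 * RPCRDMA_MTHCAFMR -> RPCRDMA_ALLPHYSICAL, downgrading whenever the
 * HCA lacks the required capability. The requested mode is supplied by
 * the caller; it typically originates from the xprt_rdma_memreg_strategy
 * module parameter (an assumption about the call site, not enforced
 * here).
 */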

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	int rc;

	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_bind_mem != NULL) {
		rc = ib_dereg_mr(ia->ri_bind_mem);
		dprintk("RPC: %s: ib_dereg_mr returned %i\n",
			__func__, rc);
	}

	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
		rc = ib_dealloc_pd(ia->ri_pd);
		dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
			__func__, rc);
	}
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct ib_device_attr *devattr = &ia->ri_devattr;
	struct ib_cq *sendcq, *recvcq;
	int rc, err;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > devattr->max_qp_wr)
		cdata->max_requests = devattr->max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	if (cdata->padding) {
		ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
						      GFP_KERNEL);
		if (IS_ERR(ep->rep_padbuf))
			return PTR_ERR(ep->rep_padbuf);
	} else
		ep->rep_padbuf = NULL;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
		ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
	else if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
			      rpcrdma_cq_async_error_upcall, ep,
			      ep->rep_attr.cap.max_send_wr + 1, 0);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
	if (rc) {
		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		goto out2;
	}

	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
			      rpcrdma_cq_async_error_upcall, ep,
			      ep->rep_attr.cap.max_recv_wr + 1, 0);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
	if (rc) {
		dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		ib_destroy_cq(recvcq);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (devattr->max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						devattr->max_qp_rd_atom;

	ep->rep_remote_cma.retry_count = 7;
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	err = ib_destroy_cq(sendcq);
	if (err)
		dprintk("RPC: %s: ib_destroy_cq returned %i\n",
			__func__, err);
out1:
	rpcrdma_free_regbuf(ia, ep->rep_padbuf);
	return rc;
}
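
/* Worked example of the send-completion trigger above (values are
 * illustrative): with max_send_wr = 64, rep_cqinit = 64/2 - 1 = 31,
 * so roughly one SEND in every 31 is posted IB_SEND_SIGNALED (see
 * rpcrdma_ep_post() below). With a tiny send queue (max_send_wr <= 6),
 * rep_cqinit is forced to 0 and every SEND is signaled. This keeps the
 * send CQ from overflowing without taking an interrupt per RPC.
 */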

/*
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	rpcrdma_free_regbuf(ia, ep->rep_padbuf);

	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
	if (rc)
		dprintk("RPC: %s: ib_destroy_cq returned %i\n",
			__func__, rc);

	rpcrdma_clean_cq(ep->rep_attr.send_cq);
	rc = ib_destroy_cq(ep->rep_attr.send_cq);
	if (rc)
		dprintk("RPC: %s: ib_destroy_cq returned %i\n",
			__func__, rc);
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC: %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);
		rpcrdma_flush_cqs(ep);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		ia->ri_ops->ro_reset(xprt);

		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 */
		if (ia->ri_id->device != id->device) {
			printk("RPC: %s: can't reconnect on "
				"different device!\n", __func__);
			rdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		write_lock(&ia->ri_qplock);
		old = ia->ri_id;
		ia->ri_id = id;
		write_unlock(&ia->ri_qplock);

		rdma_destroy_qp(old);
		rdma_destroy_id(old);
	} else {
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All other
	 * errors indicate a transport condition that has already been
	 * handled on a best-effort basis.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		dprintk("RPC: %s: connected\n", __func__);
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}
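
/* Worked example of the ORD/IRD retry above (illustrative): if the
 * first attempt advertised responder_resources = 32 with
 * initiator_depth = 0 and the peer rejected it, the retry is made with
 * initiator_depth = responder_resources = 32; had responder_resources
 * been 0, both would be bumped to 1. Per the comment in the code, some
 * remote CMs accept a connection only when both values are equal and
 * nonzero.
 */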

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rpcrdma_flush_cqs(ep);
	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC: %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}
}

static struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	req->rl_buffer = &r_xprt->rx_buf;
	return req;
}

static struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
					       GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}

	rep->rr_buffer = &r_xprt->rx_buf;
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	char *p;
	size_t len;
	int i, rc;

	buf->rb_max_requests = cdata->max_requests;
	spin_lock_init(&buf->rb_lock);

	/* Need to allocate:
	 *   1.  arrays for send and recv pointers
	 *   2.  arrays of struct rpcrdma_req to fill in pointers
	 *   3.  array of struct rpcrdma_rep for replies
	 * Send/recv buffers in req/rep need to be registered
	 */
	len = buf->rb_max_requests *
		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));

	p = kzalloc(len, GFP_KERNEL);
	if (p == NULL) {
		dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
			__func__, len);
		rc = -ENOMEM;
		goto out;
	}
	buf->rb_pool = p;	/* for freeing it later */

	buf->rb_send_bufs = (struct rpcrdma_req **) p;
	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];

	rc = ia->ri_ops->ro_init(r_xprt);
	if (rc)
		goto out;

	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;
		struct rpcrdma_rep *rep;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC: %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		buf->rb_send_bufs[i] = req;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC: %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		buf->rb_recv_bufs[i] = rep;
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
	if (!rep)
		return;

	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
	kfree(rep);
}

static void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	if (!req)
		return;

	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	int i;

	/* clean up in reverse order from create
	 *   1.  recv mr memory (mr free, then kfree)
	 *   2.  send mr memory (mr free, then kfree)
	 */
	dprintk("RPC: %s: entering\n", __func__);

	for (i = 0; i < buf->rb_max_requests; i++) {
		if (buf->rb_recv_bufs)
			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
		if (buf->rb_send_bufs)
			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
	}

	ia->ri_ops->ro_destroy(buf);

	kfree(buf->rb_pool);
}
1184 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1185 * some req segments uninitialized.
1188 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1191 list_add_tail(&(*mw)->mw_list, &buf->rb_mws);

/* Cycle mw's back in reverse order, and "spin" them.
 * This delays and scrambles reuse as much as possible.
 */
static void
rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
	struct rpcrdma_mr_seg *seg = req->rl_segments;
	struct rpcrdma_mr_seg *seg1 = seg;
	int i;

	for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
		rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
	rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
}

static void
rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
	buf->rb_send_bufs[--buf->rb_send_index] = req;
	req->rl_niovs = 0;
	if (req->rl_reply) {
		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
		req->rl_reply->rr_func = NULL;
		req->rl_reply = NULL;
	}
}

/* rpcrdma_unmap_one() was already done during deregistration.
 * Redo only the ib_post_send().
 */
static void
rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt =
				container_of(ia, struct rpcrdma_xprt, rx_ia);
	struct ib_send_wr invalidate_wr, *bad_wr;
	int rc;

	dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);

	/* When this FRMR is re-inserted into rb_mws, it is no longer stale */
	r->r.frmr.fr_state = FRMR_IS_INVALID;

	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
	invalidate_wr.wr_id = (unsigned long)(void *)r;
	invalidate_wr.opcode = IB_WR_LOCAL_INV;
	invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
	DECR_CQCOUNT(&r_xprt->rx_ep);

	dprintk("RPC: %s: frmr %p invalidating rkey %08x\n",
		__func__, r, r->r.frmr.fr_mr->rkey);

	read_lock(&ia->ri_qplock);
	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
	read_unlock(&ia->ri_qplock);
	if (rc) {
		/* Force rpcrdma_buffer_get() to retry */
		r->r.frmr.fr_state = FRMR_IS_STALE;
		dprintk("RPC: %s: ib_post_send failed, %i\n",
			__func__, rc);
	}
}

static void
rpcrdma_retry_flushed_linv(struct list_head *stale,
			   struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct list_head *pos;
	struct rpcrdma_mw *r;
	unsigned long flags;

	list_for_each(pos, stale) {
		r = list_entry(pos, struct rpcrdma_mw, mw_list);
		rpcrdma_retry_local_inv(r, ia);
	}

	spin_lock_irqsave(&buf->rb_lock, flags);
	list_splice_tail(stale, &buf->rb_mws);
	spin_unlock_irqrestore(&buf->rb_lock, flags);
}

static struct rpcrdma_req *
rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
			 struct list_head *stale)
{
	struct rpcrdma_mw *r;
	int i;

	i = RPCRDMA_MAX_SEGS - 1;
	while (!list_empty(&buf->rb_mws)) {
		r = list_entry(buf->rb_mws.next,
			       struct rpcrdma_mw, mw_list);
		list_del(&r->mw_list);
		if (r->r.frmr.fr_state == FRMR_IS_STALE) {
			list_add(&r->mw_list, stale);
			continue;
		}
		req->rl_segments[i].rl_mw = r;
		if (unlikely(i-- == 0))
			return req;	/* Success */
	}

	/* Not enough entries on rb_mws for this req */
	rpcrdma_buffer_put_sendbuf(req, buf);
	rpcrdma_buffer_put_mrs(req, buf);
	return NULL;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
	struct rpcrdma_mw *r;
	int i;

	i = RPCRDMA_MAX_SEGS - 1;
	while (!list_empty(&buf->rb_mws)) {
		r = list_entry(buf->rb_mws.next,
			       struct rpcrdma_mw, mw_list);
		list_del(&r->mw_list);
		req->rl_segments[i].rl_mw = r;
		if (unlikely(i-- == 0))
			return req;	/* Success */
	}

	/* Not enough entries on rb_mws for this req */
	rpcrdma_buffer_put_sendbuf(req, buf);
	rpcrdma_buffer_put_mrs(req, buf);
	return NULL;
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if needed) is attached to send buffer upon return.
 * Rule:
 *    rb_send_index and rb_recv_index MUST always be pointing to the
 *    *next* available buffer (non-NULL). They are incremented after
 *    removing buffers, and decremented *before* returning them.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
	struct list_head stale;
	struct rpcrdma_req *req;
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	if (buffers->rb_send_index == buffers->rb_max_requests) {
		spin_unlock_irqrestore(&buffers->rb_lock, flags);
		dprintk("RPC: %s: out of request buffers\n", __func__);
		return NULL;
	}

	req = buffers->rb_send_bufs[buffers->rb_send_index];
	if (buffers->rb_send_index < buffers->rb_recv_index) {
		dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
			__func__,
			buffers->rb_recv_index - buffers->rb_send_index);
		req->rl_reply = NULL;
	} else {
		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
	}
	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;

	INIT_LIST_HEAD(&stale);
	switch (ia->ri_memreg_strategy) {
	case RPCRDMA_FRMR:
		req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
		break;
	case RPCRDMA_MTHCAFMR:
		req = rpcrdma_buffer_get_fmrs(req, buffers);
		break;
	default:
		break;
	}
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
	if (!list_empty(&stale))
		rpcrdma_retry_flushed_linv(&stale, buffers);
	return req;
}
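
/* Example of the index rule above (illustrative): with
 * rb_max_requests = 2, a fresh pool has rb_send_index ==
 * rb_recv_index == 0. One rpcrdma_buffer_get() consumes slot 0 and
 * leaves both indexes at 1; rpcrdma_buffer_put() pre-decrements back
 * to 0 and restores the pointers. rb_send_index == rb_max_requests
 * therefore means every request buffer is in use.
 */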

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	rpcrdma_buffer_put_sendbuf(req, buffers);
	switch (ia->ri_memreg_strategy) {
	case RPCRDMA_FRMR:
	case RPCRDMA_MTHCAFMR:
		rpcrdma_buffer_put_mrs(req, buffers);
		break;
	default:
		break;
	}
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from error conditions.
 * Post-increment counter/array index.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	if (buffers->rb_recv_index < buffers->rb_max_requests) {
		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
	}
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = rep->rr_buffer;
	unsigned long flags;

	rep->rr_func = NULL;
	spin_lock_irqsave(&buffers->rb_lock, flags);
	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

static int
rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
			  struct ib_mr **mrp, struct ib_sge *iov)
{
	struct ib_phys_buf ipb;
	struct ib_mr *mr;
	int rc;

	/*
	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
	 */
	iov->addr = ib_dma_map_single(ia->ri_id->device,
			va, len, DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
		return -ENOMEM;

	iov->length = len;

	if (ia->ri_have_dma_lkey) {
		*mrp = NULL;
		iov->lkey = ia->ri_dma_lkey;
		return 0;
	} else if (ia->ri_bind_mem != NULL) {
		*mrp = NULL;
		iov->lkey = ia->ri_bind_mem->lkey;
		return 0;
	}

	ipb.addr = iov->addr;
	ipb.size = iov->length;
	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
			IB_ACCESS_LOCAL_WRITE, &iov->addr);

	dprintk("RPC: %s: phys convert: 0x%llx "
			"registered 0x%llx length %d\n",
			__func__, (unsigned long long)ipb.addr,
			(unsigned long long)iov->addr, len);

	if (IS_ERR(mr)) {
		*mrp = NULL;
		rc = PTR_ERR(mr);
		dprintk("RPC: %s: failed with %i\n", __func__, rc);
	} else {
		*mrp = mr;
		iov->lkey = mr->lkey;
		rc = 0;
	}

	return rc;
}

static int
rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
			    struct ib_mr *mr, struct ib_sge *iov)
{
	int rc;

	ib_dma_unmap_single(ia->ri_id->device,
			iov->addr, iov->length, DMA_BIDIRECTIONAL);

	if (NULL == mr)
		return 0;

	rc = ib_dereg_mr(mr);
	if (rc)
		dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
	return rc;
}

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	int rc;

	rc = -ENOMEM;
	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	rb->rg_size = size;
	rb->rg_owner = NULL;
	rc = rpcrdma_register_internal(ia, rb->rg_base, size,
				       &rb->rg_mr, &rb->rg_iov);
	if (rc)
		goto out_free;

	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(rc);
}
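
/* Typical caller pattern (cf. rpcrdma_create_rep() above); a minimal
 * sketch only:
 *
 *	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
 *	if (IS_ERR(rb))
 *		return PTR_ERR(rb);
 *
 * The registered memory is then addressed via rdmab_addr() and
 * rdmab_length(), or posted directly by pointing a work request's
 * sg_list at rb->rg_iov (see rpcrdma_ep_post_recv() below).
 */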

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	if (rb) {
		rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
		kfree(rb);
	}
}

/*
 * Wrappers for chunk registration, shared by read/write chunk code.
 */

void
rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, bool writing)
{
	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
	seg->mr_dmalen = seg->mr_len;
	if (seg->mr_page)
		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
				seg->mr_page, offset_in_page(seg->mr_offset),
				seg->mr_dmalen, seg->mr_dir);
	else
		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
				seg->mr_offset,
				seg->mr_dmalen, seg->mr_dir);
	if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
		dprintk("RPC: %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
			__func__,
			(unsigned long long)seg->mr_dma,
			seg->mr_offset, seg->mr_dmalen);
	}
}

void
rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
{
	if (seg->mr_page)
		ib_dma_unmap_page(ia->ri_id->device,
				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
	else
		ib_dma_unmap_single(ia->ri_id->device,
				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	int rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_id = 0ULL;	/* no send cookie */
	send_wr.sg_list = req->rl_send_iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;
	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
		ib_dma_sync_single_for_device(ia->ri_id->device,
			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
			DMA_TO_DEVICE);
	ib_dma_sync_single_for_device(ia->ri_id->device,
		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
		DMA_TO_DEVICE);
	ib_dma_sync_single_for_device(ia->ri_id->device,
		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
		DMA_TO_DEVICE);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else {	/* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
			rc);
out:
	return rc;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_id = (u64) (unsigned long) rep;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_id->device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
	if (rc)
		dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
			rc);
	return rc;
}

/* How many chunk list items fit within our inline buffers?
 */
unsigned int
rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	int bytes, segments;

	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
	bytes -= RPCRDMA_HDRLEN_MIN;
	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
		pr_warn("RPC: %s: inline threshold too small\n",
			__func__);
		return 0;
	}

	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
	dprintk("RPC: %s: max chunk list size = %d segments\n",
		__func__, segments);
	return segments;
}
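
/* Worked example (assuming the common 1024-byte inline thresholds, a
 * 28-byte minimal RPC/RDMA header, and a 16-byte struct
 * rpcrdma_segment): bytes = 1024 - 28 = 996, which holds
 * 996 / 16 = 62 segments; rounding down to a power of two gives
 * 1 << (fls(62) - 1) = 32 segments.
 */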