2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
43 * Encapsulates the major functions managing:
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <linux/prefetch.h>
53 #include <linux/sunrpc/addr.h>
54 #include <asm/bitops.h>
56 #include "xprt_rdma.h"
62 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
63 # define RPCDBG_FACILITY RPCDBG_TRANS
71  * Handle replies in tasklet context, using a single, global list.
72  * The RDMA tasklet function simply turns around and calls the reply
73  * handler for each reply on the list.
76 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
77 static LIST_HEAD(rpcrdma_tasklets_g);
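/* Producer/consumer pattern: receive completions queue rpcrdma_rep
 * entries on the global rpcrdma_tasklets_g list under rpcrdma_tk_lock_g;
 * the single tasklet below drains that list and either runs each reply's
 * callback or returns the buffer to the pool.
 */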
80 rpcrdma_run_tasklet(unsigned long data)
82 struct rpcrdma_rep *rep;
83 void (*func)(struct rpcrdma_rep *);
87 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
88 while (!list_empty(&rpcrdma_tasklets_g)) {
89 rep = list_entry(rpcrdma_tasklets_g.next,
90 struct rpcrdma_rep, rr_list);
91 list_del(&rep->rr_list);
94 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
99 rpcrdma_recv_buffer_put(rep);
101 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
103 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
106 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
108 static const char * const async_event[] = {
113 "communication established",
114 "send queue drained",
115 "path migration successful",
117 "device fatal error",
130 #define ASYNC_MSG(status) \
131 ((status) < ARRAY_SIZE(async_event) ? \
132 async_event[(status)] : "unknown async error")
135 rpcrdma_schedule_tasklet(struct list_head *sched_list)
139 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
140 list_splice_tail(sched_list, &rpcrdma_tasklets_g);
141 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
142 tasklet_schedule(&rpcrdma_tasklet_g);
146 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
148 struct rpcrdma_ep *ep = context;
150 pr_err("RPC: %s: %s on device %s ep %p\n",
151 __func__, ASYNC_MSG(event->event),
152 event->device->name, context);
153 if (ep->rep_connected == 1) {
154 ep->rep_connected = -EIO;
155 rpcrdma_conn_func(ep);
156 wake_up_all(&ep->rep_connect_wait);
161 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
163 struct rpcrdma_ep *ep = context;
165 pr_err("RPC: %s: %s on device %s ep %p\n",
166 __func__, ASYNC_MSG(event->event),
167 event->device->name, context);
168 if (ep->rep_connected == 1) {
169 ep->rep_connected = -EIO;
170 rpcrdma_conn_func(ep);
171 wake_up_all(&ep->rep_connect_wait);
175 static const char * const wc_status[] = {
177 "local length error",
178 "local QP operation error",
179 "local EE context operation error",
180 "local protection error",
182 "memory management operation error",
183 "bad response error",
184 "local access error",
185 "remote invalid request error",
186 "remote access error",
187 "remote operation error",
188 "transport retry counter exceeded",
189 "RNR retry counter exceeded",
190 "local RDD violation error",
191 	"remote invalid RD request",
193 "invalid EE context number",
194 "invalid EE context state",
196 "response timeout error",
200 #define COMPLETION_MSG(status) \
201 ((status) < ARRAY_SIZE(wc_status) ? \
202 wc_status[(status)] : "unexpected completion error")
205 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
207 /* WARNING: Only wr_id and status are reliable at this point */
208 if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
209 if (wc->status != IB_WC_SUCCESS &&
210 wc->status != IB_WC_WR_FLUSH_ERR)
211 pr_err("RPC: %s: SEND: %s\n",
212 __func__, COMPLETION_MSG(wc->status));
214 struct rpcrdma_mw *r;
216 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
217 r->mw_sendcompletion(wc);
222 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
225 int budget, count, rc;
227 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
229 wcs = ep->rep_send_wcs;
231 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
237 rpcrdma_sendcq_process_wc(wcs++);
238 } while (rc == RPCRDMA_POLLSIZE && --budget);
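	/* Each ib_poll_cq() above reaps at most RPCRDMA_POLLSIZE completions
	 * into ep->rep_send_wcs; the loop stops on a short batch or once the
	 * budget (RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE iterations) is spent,
	 * bounding the work done in a single upcall.
	 */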
243 * Handle send, fast_reg_mr, and local_inv completions.
245 * Send events are typically suppressed and thus do not result
246 * in an upcall. Occasionally one is signaled, however. This
247 * prevents the provider's completion queue from wrapping and
248 * losing a completion.
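 *
 * (The suppression itself is implemented in rpcrdma_ep_post() below,
 * where DECR_CQCOUNT() requests a signaled SEND only about once every
 * ep->rep_cqinit posts.)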
251 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
253 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
256 rc = rpcrdma_sendcq_poll(cq, ep);
258 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
263 rc = ib_req_notify_cq(cq,
264 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
268 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
273 rpcrdma_sendcq_poll(cq, ep);
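	/* A positive return from ib_req_notify_cq() with
	 * IB_CQ_REPORT_MISSED_EVENTS means completions arrived while the CQ
	 * was being re-armed, so the CQ is polled once more above instead of
	 * waiting for the next interrupt.
	 */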
277 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
279 struct rpcrdma_rep *rep =
280 (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
281 struct rpcrdma_ia *ia;
283 /* WARNING: Only wr_id and status are reliable at this point */
284 if (wc->status != IB_WC_SUCCESS)
287 /* status == SUCCESS means all fields in wc are trustworthy */
288 if (wc->opcode != IB_WC_RECV)
291 dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
292 __func__, rep, wc->byte_len);
294 ia = &rep->rr_rxprt->rx_ia;
295 rep->rr_len = wc->byte_len;
296 ib_dma_sync_single_for_cpu(ia->ri_id->device,
297 rdmab_addr(rep->rr_rdmabuf),
298 rep->rr_len, DMA_FROM_DEVICE);
299 prefetch(rdmab_to_msg(rep->rr_rdmabuf));
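	/* The DMA sync makes the device-written reply visible to the CPU,
	 * and the prefetch warms the cache before the reply handler parses
	 * the RPC/RDMA header in tasklet context.
	 */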
302 list_add_tail(&rep->rr_list, sched_list);
305 if (wc->status != IB_WC_WR_FLUSH_ERR)
306 pr_err("RPC: %s: rep %p: %s\n",
307 __func__, rep, COMPLETION_MSG(wc->status));
313 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
315 struct list_head sched_list;
317 int budget, count, rc;
319 INIT_LIST_HEAD(&sched_list);
320 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
322 wcs = ep->rep_recv_wcs;
324 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
330 rpcrdma_recvcq_process_wc(wcs++, &sched_list);
331 } while (rc == RPCRDMA_POLLSIZE && --budget);
335 rpcrdma_schedule_tasklet(&sched_list);
340 * Handle receive completions.
342  * It is reentrant but processes single events in order to maintain
343  * the ordering of receives on which server credit accounting depends.
345 * It is the responsibility of the scheduled tasklet to return
346 * recv buffers to the pool. NOTE: this affects synchronization of
347 * connection shutdown. That is, the structures required for
348 * the completion of the reply handler must remain intact until
349 * all memory has been reclaimed.
352 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
354 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
357 rc = rpcrdma_recvcq_poll(cq, ep);
359 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
364 rc = ib_req_notify_cq(cq,
365 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
369 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
374 rpcrdma_recvcq_poll(cq, ep);
378 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
381 LIST_HEAD(sched_list);
383 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
384 rpcrdma_recvcq_process_wc(&wc, &sched_list);
385 if (!list_empty(&sched_list))
386 rpcrdma_schedule_tasklet(&sched_list);
387 while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
388 rpcrdma_sendcq_process_wc(&wc);
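	/* Both CQs are drained one completion at a time so that pending
	 * receives still reach the reply tasklet, and flushed sends are
	 * reported, before the endpoint is disconnected or reused.
	 */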
391 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
392 static const char * const conn[] = {
411 #define CONNECTION_MSG(status) \
412 ((status) < ARRAY_SIZE(conn) ? \
413 conn[(status)] : "unrecognized connection error")
417 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
419 struct rpcrdma_xprt *xprt = id->context;
420 struct rpcrdma_ia *ia = &xprt->rx_ia;
421 struct rpcrdma_ep *ep = &xprt->rx_ep;
422 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
423 struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
425 struct ib_qp_attr *attr = &ia->ri_qp_attr;
426 struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
429 switch (event->event) {
430 case RDMA_CM_EVENT_ADDR_RESOLVED:
431 case RDMA_CM_EVENT_ROUTE_RESOLVED:
433 complete(&ia->ri_done);
435 case RDMA_CM_EVENT_ADDR_ERROR:
436 ia->ri_async_rc = -EHOSTUNREACH;
437 dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
439 complete(&ia->ri_done);
441 case RDMA_CM_EVENT_ROUTE_ERROR:
442 ia->ri_async_rc = -ENETUNREACH;
443 dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
445 complete(&ia->ri_done);
447 case RDMA_CM_EVENT_ESTABLISHED:
449 ib_query_qp(ia->ri_id->qp, attr,
450 IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
452 dprintk("RPC: %s: %d responder resources"
454 __func__, attr->max_dest_rd_atomic,
455 attr->max_rd_atomic);
457 case RDMA_CM_EVENT_CONNECT_ERROR:
458 connstate = -ENOTCONN;
460 case RDMA_CM_EVENT_UNREACHABLE:
461 connstate = -ENETDOWN;
463 case RDMA_CM_EVENT_REJECTED:
464 connstate = -ECONNREFUSED;
466 case RDMA_CM_EVENT_DISCONNECTED:
467 connstate = -ECONNABORTED;
469 case RDMA_CM_EVENT_DEVICE_REMOVAL:
472 dprintk("RPC: %s: %sconnected\n",
473 __func__, connstate > 0 ? "" : "dis");
474 ep->rep_connected = connstate;
475 rpcrdma_conn_func(ep);
476 wake_up_all(&ep->rep_connect_wait);
479 dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
480 __func__, sap, rpc_get_port(sap), ep,
481 CONNECTION_MSG(event->event));
485 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
486 if (connstate == 1) {
487 int ird = attr->max_dest_rd_atomic;
488 int tird = ep->rep_remote_cma.responder_resources;
490 pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
491 sap, rpc_get_port(sap),
492 ia->ri_id->device->name,
493 ia->ri_ops->ro_displayname,
494 xprt->rx_buf.rb_max_requests,
495 ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
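		/* The " (low!)" tag flags a negotiated IRD (ird) that is both
		 * small in absolute terms and well below the responder
		 * resources this side offered (tird); a low value limits how
		 * many RDMA Reads the peer can keep in flight against us.
		 */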
496 } else if (connstate < 0) {
497 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
498 sap, rpc_get_port(sap), connstate);
505 static struct rdma_cm_id *
506 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
507 struct rpcrdma_ia *ia, struct sockaddr *addr)
509 struct rdma_cm_id *id;
512 init_completion(&ia->ri_done);
514 id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
517 dprintk("RPC: %s: rdma_create_id() failed %i\n",
522 ia->ri_async_rc = -ETIMEDOUT;
523 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
525 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
529 wait_for_completion_interruptible_timeout(&ia->ri_done,
530 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
531 rc = ia->ri_async_rc;
535 ia->ri_async_rc = -ETIMEDOUT;
536 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
538 dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
542 wait_for_completion_interruptible_timeout(&ia->ri_done,
543 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
544 rc = ia->ri_async_rc;
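	/* Address and route resolution are both asynchronous: the CM upcall
	 * (rpcrdma_conn_upcall) records the outcome in ia->ri_async_rc and
	 * completes ia->ri_done, which the waits above block on for roughly
	 * RDMA_RESOLVE_TIMEOUT.
	 */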
556  * Drain any CQ prior to teardown.
559 rpcrdma_clean_cq(struct ib_cq *cq)
564 while (1 == ib_poll_cq(cq, 1, &wc))
568 dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
569 __func__, count, wc.opcode);
573 * Exported functions.
577 * Open and initialize an Interface Adapter.
578 * o initializes fields of struct rpcrdma_ia, including
579 * interface and provider attributes and protection zone.
582 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
585 struct rpcrdma_ia *ia = &xprt->rx_ia;
586 struct ib_device_attr *devattr = &ia->ri_devattr;
588 ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
589 if (IS_ERR(ia->ri_id)) {
590 rc = PTR_ERR(ia->ri_id);
594 ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
595 if (IS_ERR(ia->ri_pd)) {
596 rc = PTR_ERR(ia->ri_pd);
597 dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
602 rc = ib_query_device(ia->ri_id->device, devattr);
604 dprintk("RPC: %s: ib_query_device failed %d\n",
609 if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
610 ia->ri_have_dma_lkey = 1;
611 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
614 if (memreg == RPCRDMA_FRMR) {
615 /* Requires both frmr reg and local dma lkey */
616 if (((devattr->device_cap_flags &
617 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
618 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
619 (devattr->max_fast_reg_page_list_len == 0)) {
620 dprintk("RPC: %s: FRMR registration "
621 "not supported by HCA\n", __func__);
622 memreg = RPCRDMA_MTHCAFMR;
625 if (memreg == RPCRDMA_MTHCAFMR) {
626 if (!ia->ri_id->device->alloc_fmr) {
627 dprintk("RPC: %s: MTHCAFMR registration "
628 "not supported by HCA\n", __func__);
629 memreg = RPCRDMA_ALLPHYSICAL;
634 * Optionally obtain an underlying physical identity mapping in
635 * order to do a memory window-based bind. This base registration
636 * is protected from remote access - that is enabled only by binding
637 * for the specific bytes targeted during each RPC operation, and
638  * revoked after the corresponding completion, similar to a storage adapter.
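 *
 * (The registration mode that reaches the switch below reflects the
 * fallback chain above: FRMR when the HCA supports fast registration,
 * else FMR when it provides alloc_fmr, else ALLPHYSICAL as a last
 * resort.)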
643 ia->ri_ops = &rpcrdma_frwr_memreg_ops;
645 case RPCRDMA_ALLPHYSICAL:
646 ia->ri_ops = &rpcrdma_physical_memreg_ops;
647 mem_priv = IB_ACCESS_LOCAL_WRITE |
648 IB_ACCESS_REMOTE_WRITE |
649 IB_ACCESS_REMOTE_READ;
651 case RPCRDMA_MTHCAFMR:
652 ia->ri_ops = &rpcrdma_fmr_memreg_ops;
653 if (ia->ri_have_dma_lkey)
655 mem_priv = IB_ACCESS_LOCAL_WRITE;
657 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
658 if (IS_ERR(ia->ri_bind_mem)) {
659 printk(KERN_ALERT "%s: ib_get_dma_mr for "
660 "phys register failed with %lX\n",
661 __func__, PTR_ERR(ia->ri_bind_mem));
667 printk(KERN_ERR "RPC: Unsupported memory "
668 "registration mode: %d\n", memreg);
672 dprintk("RPC: %s: memory registration strategy is '%s'\n",
673 __func__, ia->ri_ops->ro_displayname);
675 /* Else will do memory reg/dereg for each chunk */
676 ia->ri_memreg_strategy = memreg;
678 rwlock_init(&ia->ri_qplock);
682 ib_dealloc_pd(ia->ri_pd);
685 rdma_destroy_id(ia->ri_id);
692 * Clean up/close an IA.
693 * o if event handles and PD have been initialized, free them.
697 rpcrdma_ia_close(struct rpcrdma_ia *ia)
701 dprintk("RPC: %s: entering\n", __func__);
702 if (ia->ri_bind_mem != NULL) {
703 rc = ib_dereg_mr(ia->ri_bind_mem);
704 dprintk("RPC: %s: ib_dereg_mr returned %i\n",
708 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
710 rdma_destroy_qp(ia->ri_id);
711 rdma_destroy_id(ia->ri_id);
715 /* If the pd is still busy, xprtrdma missed freeing a resource */
716 if (ia->ri_pd && !IS_ERR(ia->ri_pd))
717 WARN_ON(ib_dealloc_pd(ia->ri_pd));
721 * Create unconnected endpoint.
724 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
725 struct rpcrdma_create_data_internal *cdata)
727 struct ib_device_attr *devattr = &ia->ri_devattr;
728 struct ib_cq *sendcq, *recvcq;
731 /* check provider's send/recv wr limits */
732 if (cdata->max_requests > devattr->max_qp_wr)
733 cdata->max_requests = devattr->max_qp_wr;
735 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
736 ep->rep_attr.qp_context = ep;
737 ep->rep_attr.srq = NULL;
738 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
739 rc = ia->ri_ops->ro_open(ia, ep, cdata);
742 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
743 ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
744 ep->rep_attr.cap.max_recv_sge = 1;
745 ep->rep_attr.cap.max_inline_data = 0;
746 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
747 ep->rep_attr.qp_type = IB_QPT_RC;
748 ep->rep_attr.port_num = ~0;
750 if (cdata->padding) {
751 ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
753 if (IS_ERR(ep->rep_padbuf))
754 return PTR_ERR(ep->rep_padbuf);
756 ep->rep_padbuf = NULL;
758 dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
759 "iovs: send %d recv %d\n",
761 ep->rep_attr.cap.max_send_wr,
762 ep->rep_attr.cap.max_recv_wr,
763 ep->rep_attr.cap.max_send_sge,
764 ep->rep_attr.cap.max_recv_sge);
766 /* set trigger for requesting send completion */
767 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
768 if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
769 ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
770 else if (ep->rep_cqinit <= 2)
773 init_waitqueue_head(&ep->rep_connect_wait);
774 INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
776 sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
777 rpcrdma_cq_async_error_upcall, ep,
778 ep->rep_attr.cap.max_send_wr + 1, 0);
779 if (IS_ERR(sendcq)) {
780 rc = PTR_ERR(sendcq);
781 dprintk("RPC: %s: failed to create send CQ: %i\n",
786 rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
788 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
793 recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
794 rpcrdma_cq_async_error_upcall, ep,
795 ep->rep_attr.cap.max_recv_wr + 1, 0);
796 if (IS_ERR(recvcq)) {
797 rc = PTR_ERR(recvcq);
798 dprintk("RPC: %s: failed to create recv CQ: %i\n",
803 rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
805 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
807 ib_destroy_cq(recvcq);
811 ep->rep_attr.send_cq = sendcq;
812 ep->rep_attr.recv_cq = recvcq;
814 /* Initialize cma parameters */
816 /* RPC/RDMA does not use private data */
817 ep->rep_remote_cma.private_data = NULL;
818 ep->rep_remote_cma.private_data_len = 0;
820 /* Client offers RDMA Read but does not initiate */
821 ep->rep_remote_cma.initiator_depth = 0;
822 if (devattr->max_qp_rd_atom > 32) /* arbitrary but <= 255 */
823 ep->rep_remote_cma.responder_resources = 32;
825 ep->rep_remote_cma.responder_resources =
826 devattr->max_qp_rd_atom;
828 ep->rep_remote_cma.retry_count = 7;
829 ep->rep_remote_cma.flow_control = 0;
830 ep->rep_remote_cma.rnr_retry_count = 0;
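	/* A retry_count of 7 asks the CM for its maximum number of connect
	 * retries; RNR retries are left at zero, presumably because receive
	 * buffers are always posted before a reply can arrive.
	 */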
835 err = ib_destroy_cq(sendcq);
837 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
840 rpcrdma_free_regbuf(ia, ep->rep_padbuf);
847 * Disconnect and destroy endpoint. After this, the only
848 * valid operations on the ep are to free it (if dynamically
849 * allocated) or re-create it.
852 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
856 dprintk("RPC: %s: entering, connected is %d\n",
857 __func__, ep->rep_connected);
859 cancel_delayed_work_sync(&ep->rep_connect_worker);
862 rpcrdma_ep_disconnect(ep, ia);
863 rdma_destroy_qp(ia->ri_id);
864 ia->ri_id->qp = NULL;
867 rpcrdma_free_regbuf(ia, ep->rep_padbuf);
869 rpcrdma_clean_cq(ep->rep_attr.recv_cq);
870 rc = ib_destroy_cq(ep->rep_attr.recv_cq);
872 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
875 rpcrdma_clean_cq(ep->rep_attr.send_cq);
876 rc = ib_destroy_cq(ep->rep_attr.send_cq);
878 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
883 * Connect unconnected endpoint.
886 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
888 struct rdma_cm_id *id, *old;
892 if (ep->rep_connected != 0) {
893 struct rpcrdma_xprt *xprt;
895 dprintk("RPC: %s: reconnecting...\n", __func__);
897 rpcrdma_ep_disconnect(ep, ia);
898 rpcrdma_flush_cqs(ep);
900 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
901 ia->ri_ops->ro_reset(xprt);
903 id = rpcrdma_create_id(xprt, ia,
904 (struct sockaddr *)&xprt->rx_data.addr);
909 /* TEMP TEMP TEMP - fail if new device:
910 * Deregister/remarshal *all* requests!
911 * Close and recreate adapter, pd, etc!
912 * Re-determine all attributes still sane!
913 * More stuff I haven't thought of!
916 if (ia->ri_id->device != id->device) {
917 printk("RPC: %s: can't reconnect on "
918 "different device!\n", __func__);
924 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
926 dprintk("RPC: %s: rdma_create_qp failed %i\n",
933 write_lock(&ia->ri_qplock);
936 write_unlock(&ia->ri_qplock);
938 rdma_destroy_qp(old);
939 rdma_destroy_id(old);
941 dprintk("RPC: %s: connecting...\n", __func__);
942 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
944 dprintk("RPC: %s: rdma_create_qp failed %i\n",
946 /* do not update ep->rep_connected */
951 ep->rep_connected = 0;
953 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
955 dprintk("RPC: %s: rdma_connect() failed with %i\n",
960 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
963 * Check state. A non-peer reject indicates no listener
964 * (ECONNREFUSED), which may be a transient state. All
965 * others indicate a transport condition which has already
966  * undergone a best-effort recovery attempt.
968 if (ep->rep_connected == -ECONNREFUSED &&
969 ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
970 dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
973 if (ep->rep_connected <= 0) {
974 /* Sometimes, the only way to reliably connect to remote
975 	 * CMs is to use the same nonzero values for ORD and IRD. */
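		/* Concretely: when the connect fails and ORD/IRD look
		 * inconsistent (zero responder resources, or initiator depth
		 * not equal to responder resources), bump responder_resources
		 * to at least 1, mirror it into initiator_depth, and retry.
		 */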
976 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
977 (ep->rep_remote_cma.responder_resources == 0 ||
978 ep->rep_remote_cma.initiator_depth !=
979 ep->rep_remote_cma.responder_resources)) {
980 if (ep->rep_remote_cma.responder_resources == 0)
981 ep->rep_remote_cma.responder_resources = 1;
982 ep->rep_remote_cma.initiator_depth =
983 ep->rep_remote_cma.responder_resources;
986 rc = ep->rep_connected;
988 dprintk("RPC: %s: connected\n", __func__);
993 ep->rep_connected = rc;
998 * rpcrdma_ep_disconnect
1000 * This is separate from destroy to facilitate the ability
1001 * to reconnect without recreating the endpoint.
1003 * This call is not reentrant, and must not be made in parallel
1004 * on the same endpoint.
1007 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
1011 rpcrdma_flush_cqs(ep);
1012 rc = rdma_disconnect(ia->ri_id);
1014 /* returns without wait if not connected */
1015 wait_event_interruptible(ep->rep_connect_wait,
1016 ep->rep_connected != 1);
1017 dprintk("RPC: %s: after wait, %sconnected\n", __func__,
1018 (ep->rep_connected == 1) ? "still " : "dis");
1020 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
1021 ep->rep_connected = rc;
1025 static struct rpcrdma_req *
1026 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1028 struct rpcrdma_req *req;
1030 req = kzalloc(sizeof(*req), GFP_KERNEL);
1032 return ERR_PTR(-ENOMEM);
1034 req->rl_buffer = &r_xprt->rx_buf;
1038 static struct rpcrdma_rep *
1039 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1041 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1042 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1043 struct rpcrdma_rep *rep;
1047 rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1051 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
1053 if (IS_ERR(rep->rr_rdmabuf)) {
1054 rc = PTR_ERR(rep->rr_rdmabuf);
1058 rep->rr_rxprt = r_xprt;
1068 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1070 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1071 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1072 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1077 buf->rb_max_requests = cdata->max_requests;
1078 spin_lock_init(&buf->rb_lock);
1080 /* Need to allocate:
1081 * 1. arrays for send and recv pointers
1082 * 2. arrays of struct rpcrdma_req to fill in pointers
1083 * 3. array of struct rpcrdma_rep for replies
1084 * Send/recv buffers in req/rep need to be registered
1086 len = buf->rb_max_requests *
1087 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
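	/* For example, with rb_max_requests of 32 and 8-byte pointers this
	 * works out to 32 * (8 + 8) = 512 bytes: just the two pointer arrays
	 * carved out of 'p' below.
	 */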
1089 p = kzalloc(len, GFP_KERNEL);
1091 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1096 buf->rb_pool = p; /* for freeing it later */
1098 buf->rb_send_bufs = (struct rpcrdma_req **) p;
1099 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1100 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1101 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1103 rc = ia->ri_ops->ro_init(r_xprt);
1107 for (i = 0; i < buf->rb_max_requests; i++) {
1108 struct rpcrdma_req *req;
1109 struct rpcrdma_rep *rep;
1111 req = rpcrdma_create_req(r_xprt);
1113 dprintk("RPC: %s: request buffer %d alloc"
1114 " failed\n", __func__, i);
1118 buf->rb_send_bufs[i] = req;
1120 rep = rpcrdma_create_rep(r_xprt);
1122 dprintk("RPC: %s: reply buffer %d alloc failed\n",
1127 buf->rb_recv_bufs[i] = rep;
1132 rpcrdma_buffer_destroy(buf);
1137 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1142 rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1147 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1152 rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1153 rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1158 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1160 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1163 /* clean up in reverse order from create
1164 * 1. recv mr memory (mr free, then kfree)
1165 * 2. send mr memory (mr free, then kfree)
1168 dprintk("RPC: %s: entering\n", __func__);
1170 for (i = 0; i < buf->rb_max_requests; i++) {
1171 if (buf->rb_recv_bufs)
1172 rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1173 if (buf->rb_send_bufs)
1174 rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1177 ia->ri_ops->ro_destroy(buf);
1179 kfree(buf->rb_pool);
1182 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1183 * some req segments uninitialized.
1186 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1189 list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1194 /* Cycle mw's back in reverse order, and "spin" them.
1195 * This delays and scrambles reuse as much as possible.
1198 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1200 struct rpcrdma_mr_seg *seg = req->rl_segments;
1201 struct rpcrdma_mr_seg *seg1 = seg;
1204 for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1205 rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
1206 rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
1210 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1212 buf->rb_send_bufs[--buf->rb_send_index] = req;
1214 if (req->rl_reply) {
1215 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1216 req->rl_reply->rr_func = NULL;
1217 req->rl_reply = NULL;
1221 /* rpcrdma_unmap_one() was already done during deregistration.
1222 * Redo only the ib_post_send().
1225 rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1227 struct rpcrdma_xprt *r_xprt =
1228 container_of(ia, struct rpcrdma_xprt, rx_ia);
1229 struct ib_send_wr invalidate_wr, *bad_wr;
1232 dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);
1234 /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1235 r->r.frmr.fr_state = FRMR_IS_INVALID;
1237 memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1238 invalidate_wr.wr_id = (unsigned long)(void *)r;
1239 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1240 invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1241 DECR_CQCOUNT(&r_xprt->rx_ep);
1243 dprintk("RPC: %s: frmr %p invalidating rkey %08x\n",
1244 __func__, r, r->r.frmr.fr_mr->rkey);
1246 read_lock(&ia->ri_qplock);
1247 rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1248 read_unlock(&ia->ri_qplock);
1250 /* Force rpcrdma_buffer_get() to retry */
1251 r->r.frmr.fr_state = FRMR_IS_STALE;
1252 dprintk("RPC: %s: ib_post_send failed, %i\n",
1258 rpcrdma_retry_flushed_linv(struct list_head *stale,
1259 struct rpcrdma_buffer *buf)
1261 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1262 struct list_head *pos;
1263 struct rpcrdma_mw *r;
1264 unsigned long flags;
1266 list_for_each(pos, stale) {
1267 r = list_entry(pos, struct rpcrdma_mw, mw_list);
1268 rpcrdma_retry_local_inv(r, ia);
1271 spin_lock_irqsave(&buf->rb_lock, flags);
1272 list_splice_tail(stale, &buf->rb_mws);
1273 spin_unlock_irqrestore(&buf->rb_lock, flags);
1276 static struct rpcrdma_req *
1277 rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1278 struct list_head *stale)
1280 struct rpcrdma_mw *r;
1283 i = RPCRDMA_MAX_SEGS - 1;
1284 while (!list_empty(&buf->rb_mws)) {
1285 r = list_entry(buf->rb_mws.next,
1286 struct rpcrdma_mw, mw_list);
1287 list_del(&r->mw_list);
1288 if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1289 list_add(&r->mw_list, stale);
1292 req->rl_segments[i].rl_mw = r;
1293 if (unlikely(i-- == 0))
1294 return req; /* Success */
1297 /* Not enough entries on rb_mws for this req */
1298 rpcrdma_buffer_put_sendbuf(req, buf);
1299 rpcrdma_buffer_put_mrs(req, buf);
1303 static struct rpcrdma_req *
1304 rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1306 struct rpcrdma_mw *r;
1309 i = RPCRDMA_MAX_SEGS - 1;
1310 while (!list_empty(&buf->rb_mws)) {
1311 r = list_entry(buf->rb_mws.next,
1312 struct rpcrdma_mw, mw_list);
1313 list_del(&r->mw_list);
1314 req->rl_segments[i].rl_mw = r;
1315 if (unlikely(i-- == 0))
1316 return req; /* Success */
1319 /* Not enough entries on rb_mws for this req */
1320 rpcrdma_buffer_put_sendbuf(req, buf);
1321 rpcrdma_buffer_put_mrs(req, buf);
1326 * Get a set of request/reply buffers.
1328 * Reply buffer (if needed) is attached to send buffer upon return.
1330 * rb_send_index and rb_recv_index MUST always be pointing to the
1331 * *next* available buffer (non-NULL). They are incremented after
1332 * removing buffers, and decremented *before* returning them.
1334 struct rpcrdma_req *
1335 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1337 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1338 struct list_head stale;
1339 struct rpcrdma_req *req;
1340 unsigned long flags;
1342 spin_lock_irqsave(&buffers->rb_lock, flags);
1343 if (buffers->rb_send_index == buffers->rb_max_requests) {
1344 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1345 dprintk("RPC: %s: out of request buffers\n", __func__);
1346 		return NULL;
1349 req = buffers->rb_send_bufs[buffers->rb_send_index];
1350 if (buffers->rb_send_index < buffers->rb_recv_index) {
1351 dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
1353 buffers->rb_recv_index - buffers->rb_send_index);
1354 req->rl_reply = NULL;
1356 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1357 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1359 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1361 INIT_LIST_HEAD(&stale);
1362 switch (ia->ri_memreg_strategy) {
1364 req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1366 case RPCRDMA_MTHCAFMR:
1367 req = rpcrdma_buffer_get_fmrs(req, buffers);
1372 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1373 if (!list_empty(&stale))
1374 rpcrdma_retry_flushed_linv(&stale, buffers);
1379 * Put request/reply buffers back into pool.
1380 * Pre-decrement counter/array index.
1383 rpcrdma_buffer_put(struct rpcrdma_req *req)
1385 struct rpcrdma_buffer *buffers = req->rl_buffer;
1386 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1387 unsigned long flags;
1389 spin_lock_irqsave(&buffers->rb_lock, flags);
1390 rpcrdma_buffer_put_sendbuf(req, buffers);
1391 switch (ia->ri_memreg_strategy) {
1393 case RPCRDMA_MTHCAFMR:
1394 rpcrdma_buffer_put_mrs(req, buffers);
1399 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1403 * Recover reply buffers from pool.
1404 * This happens when recovering from error conditions.
1405 * Post-increment counter/array index.
1408 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1410 struct rpcrdma_buffer *buffers = req->rl_buffer;
1411 unsigned long flags;
1413 spin_lock_irqsave(&buffers->rb_lock, flags);
1414 if (buffers->rb_recv_index < buffers->rb_max_requests) {
1415 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1416 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1418 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1422 * Put reply buffers back into pool when not attached to
1423 * request. This happens in error conditions.
1426 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1428 struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1429 unsigned long flags;
1431 rep->rr_func = NULL;
1432 spin_lock_irqsave(&buffers->rb_lock, flags);
1433 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1434 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1438 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1442 rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
1444 dprintk("RPC: map_one: offset %p iova %llx len %zu\n",
1446 (unsigned long long)seg->mr_dma, seg->mr_dmalen);
1450 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1451 struct ib_mr **mrp, struct ib_sge *iov)
1453 struct ib_phys_buf ipb;
1458 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1460 iov->addr = ib_dma_map_single(ia->ri_id->device,
1461 va, len, DMA_BIDIRECTIONAL);
1462 if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1467 if (ia->ri_have_dma_lkey) {
1469 iov->lkey = ia->ri_dma_lkey;
1471 } else if (ia->ri_bind_mem != NULL) {
1473 iov->lkey = ia->ri_bind_mem->lkey;
1477 ipb.addr = iov->addr;
1478 ipb.size = iov->length;
1479 mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1480 IB_ACCESS_LOCAL_WRITE, &iov->addr);
1482 dprintk("RPC: %s: phys convert: 0x%llx "
1483 "registered 0x%llx length %d\n",
1484 __func__, (unsigned long long)ipb.addr,
1485 (unsigned long long)iov->addr, len);
1490 dprintk("RPC: %s: failed with %i\n", __func__, rc);
1493 iov->lkey = mr->lkey;
1501 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1502 struct ib_mr *mr, struct ib_sge *iov)
1506 ib_dma_unmap_single(ia->ri_id->device,
1507 iov->addr, iov->length, DMA_BIDIRECTIONAL);
1512 rc = ib_dereg_mr(mr);
1514 dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
1519 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1520 * @ia: controlling rpcrdma_ia
1521 * @size: size of buffer to be allocated, in bytes
1524 * Returns pointer to private header of an area of internally
1525 * registered memory, or an ERR_PTR. The registered buffer follows
1526 * the end of the private header.
1528 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1529 * receiving the payload of RDMA RECV operations. regbufs are not
1530  * used for RDMA READ/WRITE operations, thus are registered only for local access.
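 *
 * For example, rpcrdma_create_rep() above sizes each reply's rr_rdmabuf
 * to the inline receive threshold (cdata->inline_rsize) using this
 * helper.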
1533 struct rpcrdma_regbuf *
1534 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1536 struct rpcrdma_regbuf *rb;
1540 rb = kmalloc(sizeof(*rb) + size, flags);
1545 rb->rg_owner = NULL;
1546 rc = rpcrdma_register_internal(ia, rb->rg_base, size,
1547 &rb->rg_mr, &rb->rg_iov);
1560 * rpcrdma_free_regbuf - deregister and free registered buffer
1561 * @ia: controlling rpcrdma_ia
1562 * @rb: regbuf to be deregistered and freed
1565 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1568 rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
1574 * Prepost any receive buffer, then post send.
1576 * Receive buffer is donated to hardware, reclaimed upon recv completion.
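 *
 * The receive buffer is posted before the send so that a reply can
 * never arrive while no receive is available to catch it.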
1579 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1580 struct rpcrdma_ep *ep,
1581 struct rpcrdma_req *req)
1583 struct ib_send_wr send_wr, *send_wr_fail;
1584 struct rpcrdma_rep *rep = req->rl_reply;
1588 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1591 req->rl_reply = NULL;
1594 send_wr.next = NULL;
1595 send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
1596 send_wr.sg_list = req->rl_send_iov;
1597 send_wr.num_sge = req->rl_niovs;
1598 send_wr.opcode = IB_WR_SEND;
1599 if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
1600 ib_dma_sync_single_for_device(ia->ri_id->device,
1601 req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1603 ib_dma_sync_single_for_device(ia->ri_id->device,
1604 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1606 ib_dma_sync_single_for_device(ia->ri_id->device,
1607 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1610 if (DECR_CQCOUNT(ep) > 0)
1611 send_wr.send_flags = 0;
1612 else { /* Provider must take a send completion every now and then */
1614 send_wr.send_flags = IB_SEND_SIGNALED;
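		/* Only about one SEND in ep->rep_cqinit is signaled this way;
		 * the rest complete silently, which keeps the send CQ from
		 * overflowing without taking an interrupt per RPC.
		 */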
1617 rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1619 dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
1626 * (Re)post a receive buffer.
1629 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1630 struct rpcrdma_ep *ep,
1631 struct rpcrdma_rep *rep)
1633 struct ib_recv_wr recv_wr, *recv_wr_fail;
1636 recv_wr.next = NULL;
1637 recv_wr.wr_id = (u64) (unsigned long) rep;
1638 recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1639 recv_wr.num_sge = 1;
1641 ib_dma_sync_single_for_cpu(ia->ri_id->device,
1642 rdmab_addr(rep->rr_rdmabuf),
1643 rdmab_length(rep->rr_rdmabuf),
1646 rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1649 dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
1654 /* How many chunk list items fit within our inline buffers?
1657 rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
1659 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1660 int bytes, segments;
1662 bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
1663 bytes -= RPCRDMA_HDRLEN_MIN;
1664 if (bytes < sizeof(struct rpcrdma_segment) * 2) {
1665 pr_warn("RPC: %s: inline threshold too small\n",
1670 segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
1671 dprintk("RPC: %s: max chunk list size = %d segments\n",
1672 __func__, segments);
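	/* Worked example, assuming the usual defaults of 1024-byte inline
	 * buffers, a 28-byte minimum RPC/RDMA header, and 16-byte
	 * rpcrdma_segment entries: bytes = 1024 - 28 = 996, 996 / 16 = 62
	 * segments fit, and rounding down to a power of two yields 32.
	 */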