 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 * Encapsulates the major functions managing:
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
 * handle replies in tasklet context, using a single, global list
 * rdma tasklet function -- just turn around and call the func
 * for all replies on the list

static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
static LIST_HEAD(rpcrdma_tasklets_g);

rpcrdma_run_tasklet(unsigned long data)
	struct rpcrdma_rep *rep;
	void (*func)(struct rpcrdma_rep *);

	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	while (!list_empty(&rpcrdma_tasklets_g)) {
		rep = list_entry(rpcrdma_tasklets_g.next,
				 struct rpcrdma_rep, rr_list);
		list_del(&rep->rr_list);
		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);

		rpcrdma_recv_buffer_put(rep);

		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);

static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
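/* rpcrdma_schedule_tasklet() (below) splices completed receives onto
 * rpcrdma_tasklets_g under rpcrdma_tk_lock_g and schedules this tasklet.
 * rpcrdma_run_tasklet() then drains the list, dropping the lock around
 * the handling of each reply, so reply processing happens outside the
 * completion-queue upcall.
 */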
static const char * const async_event[] = {
	"communication established",
	"send queue drained",
	"path migration successful",
	"device fatal error",

#define ASYNC_MSG(status)					\
	((status) < ARRAY_SIZE(async_event) ?			\
		async_event[(status)] : "unknown async error")
rpcrdma_schedule_tasklet(struct list_head *sched_list)
	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	list_splice_tail(sched_list, &rpcrdma_tasklets_g);
	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
	tasklet_schedule(&rpcrdma_tasklet_g);

rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
	struct rpcrdma_ep *ep = context;

	pr_err("RPC: %s: %s on device %s ep %p\n",
	       __func__, ASYNC_MSG(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);

rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
	struct rpcrdma_ep *ep = context;

	pr_err("RPC: %s: %s on device %s ep %p\n",
	       __func__, ASYNC_MSG(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
static const char * const wc_status[] = {
	"local length error",
	"local QP operation error",
	"local EE context operation error",
	"local protection error",
	"memory management operation error",
	"bad response error",
	"local access error",
	"remote invalid request error",
	"remote access error",
	"remote operation error",
	"transport retry counter exceeded",
	"RNR retry counter exceeded",
	"local RDD violation error",
	"remote invalid RD request",
	"invalid EE context number",
	"invalid EE context state",
	"response timeout error",

#define COMPLETION_MSG(status)					\
	((status) < ARRAY_SIZE(wc_status) ?			\
		wc_status[(status)] : "unexpected completion error")
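/* Process one send-side work completion.  For unsignaled SENDs the wr_id
 * is RPCRDMA_IGNORE_COMPLETION and only unexpected (non-flush) errors are
 * reported; otherwise wr_id carries a pointer to the rpcrdma_mw whose
 * ->mw_sendcompletion callback completes the memory-registration work
 * request.
 */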
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
		if (wc->status != IB_WC_SUCCESS &&
		    wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("RPC: %s: SEND: %s\n",
			       __func__, COMPLETION_MSG(wc->status));
	struct rpcrdma_mw *r;

	r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
	r->mw_sendcompletion(wc);

rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
	int budget, count, rc;

	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
	wcs = ep->rep_send_wcs;
	rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
	rpcrdma_sendcq_process_wc(wcs++);
	} while (rc == RPCRDMA_POLLSIZE && --budget);
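	/* Each ib_poll_cq() call above pulls at most RPCRDMA_POLLSIZE
	 * completions into ep->rep_send_wcs; polling repeats while the CQ
	 * keeps returning full batches, and the budget of
	 * RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE rounds bounds the time
	 * spent in a single upcall.
	 */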
 * Handle send, fast_reg_mr, and local_inv completions.
 *
 * Send events are typically suppressed and thus do not result
 * in an upcall. Occasionally one is signaled, however. This
 * prevents the provider's completion queue from wrapping and
 * losing a completion.
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;

	rc = rpcrdma_sendcq_poll(cq, ep);
	dprintk("RPC: %s: ib_poll_cq failed: %i\n",

	rc = ib_req_notify_cq(cq,
			      IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
	dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",

	rpcrdma_sendcq_poll(cq, ep);
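	/* Re-arming the CQ with IB_CQ_REPORT_MISSED_EVENTS returns a
	 * positive value if completions arrived between the last poll and
	 * the notify request, so the CQ is polled one more time here
	 * rather than waiting for another interrupt.
	 */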
rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
	struct rpcrdma_rep *rep =
			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)

	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);
	prefetch(rdmab_to_msg(rep->rr_rdmabuf));

	list_add_tail(&rep->rr_list, sched_list);

	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("RPC: %s: rep %p: %s\n",
		       __func__, rep, COMPLETION_MSG(wc->status));
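	/* On success the reply buffer is synced for CPU access and the rep
	 * is queued on sched_list for the reply tasklet.  Flushed receives
	 * are expected during disconnect and are not logged; any other
	 * failure is reported via COMPLETION_MSG().
	 */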
rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
	struct list_head sched_list;
	int budget, count, rc;

	INIT_LIST_HEAD(&sched_list);
	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
	wcs = ep->rep_recv_wcs;
	rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
	rpcrdma_recvcq_process_wc(wcs++, &sched_list);
	} while (rc == RPCRDMA_POLLSIZE && --budget);
	rpcrdma_schedule_tasklet(&sched_list);

 * Handle receive completions.
 *
 * It is reentrant but processes events one at a time to maintain
 * the ordering of receives on which server credit accounting depends.
 *
 * It is the responsibility of the scheduled tasklet to return
 * recv buffers to the pool. NOTE: this affects synchronization of
 * connection shutdown. That is, the structures required for
 * the completion of the reply handler must remain intact until
 * all memory has been reclaimed.
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;

	rc = rpcrdma_recvcq_poll(cq, ep);
	dprintk("RPC: %s: ib_poll_cq failed: %i\n",

	rc = ib_req_notify_cq(cq,
			      IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
	dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",

	rpcrdma_recvcq_poll(cq, ep);

rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
	LIST_HEAD(sched_list);

	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
		rpcrdma_recvcq_process_wc(&wc, &sched_list);
	if (!list_empty(&sched_list))
		rpcrdma_schedule_tasklet(&sched_list);
	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
		rpcrdma_sendcq_process_wc(&wc);
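	/* Called when connecting and disconnecting an endpoint: both CQs
	 * are drained one work completion at a time so that pending sends
	 * and receives, including flushed ones, are accounted for before
	 * the queue pair is reused or torn down.
	 */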
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static const char * const conn[] = {

#define CONNECTION_MSG(status)					\
	((status) < ARRAY_SIZE(conn) ?				\
		conn[(status)] : "unrecognized connection error")
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		complete(&ia->ri_done);
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
		complete(&ia->ri_done);
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
		complete(&ia->ri_done);
	case RDMA_CM_EVENT_ESTABLISHED:
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
		dprintk("RPC: %s: %d responder resources"
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("RPC: %s: %sconnected\n",
			__func__, connstate > 0 ? "" : "dis");
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			CONNECTION_MSG(event->event));
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_id->device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
			struct rpcrdma_ia *ia, struct sockaddr *addr)
	struct rdma_cm_id *id;

	init_completion(&ia->ri_done);

	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
	dprintk("RPC: %s: rdma_create_id() failed %i\n",

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
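	/* Address and route resolution are asynchronous: each step arms
	 * ri_async_rc with -ETIMEDOUT, issues the rdma_cm request, and
	 * then waits on ia->ri_done, which rpcrdma_conn_upcall() completes
	 * with the actual result left in ri_async_rc.
	 */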
 * Drain any CQ prior to teardown.
rpcrdma_clean_cq(struct ib_cq *cq)
	while (1 == ib_poll_cq(cq, 1, &wc))

	dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
		__func__, count, wc.opcode);
 * Exported functions.

 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct ib_device_attr *devattr = &ia->ri_devattr;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);

	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		dprintk("RPC: %s: ib_alloc_pd() failed %i\n",

	rc = ib_query_device(ia->ri_id->device, devattr);
	dprintk("RPC: %s: ib_query_device failed %d\n",

	if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
		ia->ri_have_dma_lkey = 1;
		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;

	if (memreg == RPCRDMA_FRMR) {
		/* Requires both frmr reg and local dma lkey */
		if (((devattr->device_cap_flags &
		      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
		    (devattr->max_fast_reg_page_list_len == 0)) {
			dprintk("RPC: %s: FRMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_MTHCAFMR;

	if (memreg == RPCRDMA_MTHCAFMR) {
		if (!ia->ri_id->device->alloc_fmr) {
			dprintk("RPC: %s: MTHCAFMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_ALLPHYSICAL;

	 * Optionally obtain an underlying physical identity mapping in
	 * order to do a memory window-based bind. This base registration
	 * is protected from remote access - that is enabled only by binding
	 * for the specific bytes targeted during each RPC operation, and
	 * revoked after the corresponding completion similar to a storage
		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
	case RPCRDMA_ALLPHYSICAL:
		ia->ri_ops = &rpcrdma_physical_memreg_ops;
		mem_priv = IB_ACCESS_LOCAL_WRITE |
			   IB_ACCESS_REMOTE_WRITE |
			   IB_ACCESS_REMOTE_READ;
	case RPCRDMA_MTHCAFMR:
		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
		if (ia->ri_have_dma_lkey)
		mem_priv = IB_ACCESS_LOCAL_WRITE;
		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
		if (IS_ERR(ia->ri_bind_mem)) {
			printk(KERN_ALERT "%s: ib_get_dma_mr for "
				"phys register failed with %lX\n",
				__func__, PTR_ERR(ia->ri_bind_mem));
		printk(KERN_ERR "RPC: Unsupported memory "
			"registration mode: %d\n", memreg);

	dprintk("RPC: %s: memory registration strategy is '%s'\n",
		__func__, ia->ri_ops->ro_displayname);

	/* Else will do memory reg/dereg for each chunk */
	ia->ri_memreg_strategy = memreg;
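	/* The registration mode passed in as "memreg" is downgraded above
	 * when the HCA lacks the needed capability: FRMR falls back to
	 * MTHCAFMR (FMR), and MTHCAFMR to ALLPHYSICAL, which relies on a
	 * single DMA MR covering all of local memory.  The chosen mode
	 * also selects the ri_ops vector used for chunk registration.
	 */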
	rwlock_init(&ia->ri_qplock);

	ib_dealloc_pd(ia->ri_pd);
	rdma_destroy_id(ia->ri_id);

 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
rpcrdma_ia_close(struct rpcrdma_ia *ia)
	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_bind_mem != NULL) {
		rc = ib_dereg_mr(ia->ri_bind_mem);
		dprintk("RPC: %s: ib_dereg_mr returned %i\n",
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
		rc = ib_dealloc_pd(ia->ri_pd);
		dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
 * Create unconnected endpoint.
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
	struct ib_device_attr *devattr = &ia->ri_devattr;
	struct ib_cq *sendcq, *recvcq;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > devattr->max_qp_wr)
		cdata->max_requests = devattr->max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	if (cdata->padding) {
		ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
		if (IS_ERR(ep->rep_padbuf))
			return PTR_ERR(ep->rep_padbuf);
		ep->rep_padbuf = NULL;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
		ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
	else if (ep->rep_cqinit <= 2)

	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
			      rpcrdma_cq_async_error_upcall, ep,
			      ep->rep_attr.cap.max_send_wr + 1, 0);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",

	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
	dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",

	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
			      rpcrdma_cq_async_error_upcall, ep,
			      ep->rep_attr.cap.max_recv_wr + 1, 0);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",

	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
	dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
	ib_destroy_cq(recvcq);

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;
	/* Initialize cma parameters */

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (devattr->max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
		ep->rep_remote_cma.responder_resources =
						devattr->max_qp_rd_atom;

	ep->rep_remote_cma.retry_count = 7;
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	err = ib_destroy_cq(sendcq);
	dprintk("RPC: %s: ib_destroy_cq returned %i\n",
	rpcrdma_free_regbuf(ia, ep->rep_padbuf);

 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	rpcrdma_ep_disconnect(ep, ia);
	rdma_destroy_qp(ia->ri_id);
	ia->ri_id->qp = NULL;

	rpcrdma_free_regbuf(ia, ep->rep_padbuf);

	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
	dprintk("RPC: %s: ib_destroy_cq returned %i\n",

	rpcrdma_clean_cq(ep->rep_attr.send_cq);
	rc = ib_destroy_cq(ep->rep_attr.send_cq);
	dprintk("RPC: %s: ib_destroy_cq returned %i\n",
 * Connect unconnected endpoint.
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
	struct rdma_cm_id *id, *old;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;

		dprintk("RPC: %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);
		rpcrdma_flush_cqs(ep);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		ia->ri_ops->ro_reset(xprt);

		id = rpcrdma_create_id(xprt, ia,
				       (struct sockaddr *)&xprt->rx_data.addr);
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 */
		if (ia->ri_id->device != id->device) {
			printk("RPC: %s: can't reconnect on "
				"different device!\n", __func__);
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		dprintk("RPC: %s: rdma_create_qp failed %i\n",

		write_lock(&ia->ri_qplock);
		write_unlock(&ia->ri_qplock);

		rdma_destroy_qp(old);
		rdma_destroy_id(old);
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		dprintk("RPC: %s: rdma_create_qp failed %i\n",
			/* do not update ep->rep_connected */

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	dprintk("RPC: %s: rdma_connect() failed with %i\n",

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition which has already
	 * undergone best-effort recovery.
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use the same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
		     ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
	rc = ep->rep_connected;
	dprintk("RPC: %s: connected\n", __func__);

	ep->rep_connected = rc;
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
	rpcrdma_flush_cqs(ep);
	rc = rdma_disconnect(ia->ri_id);
	/* returns without wait if not connected */
	wait_event_interruptible(ep->rep_connect_wait,
				 ep->rep_connected != 1);
	dprintk("RPC: %s: after wait, %sconnected\n", __func__,
		(ep->rep_connected == 1) ? "still " : "dis");
	dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
	ep->rep_connected = rc;
static struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
		return ERR_PTR(-ENOMEM);

	req->rl_buffer = &r_xprt->rx_buf;

static struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;

	rep = kzalloc(sizeof(*rep), GFP_KERNEL);

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);

	rep->rr_buffer = &r_xprt->rx_buf;
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;

	buf->rb_max_requests = cdata->max_requests;
	spin_lock_init(&buf->rb_lock);

	/* Need to allocate:
	 *   1. arrays for send and recv pointers
	 *   2. arrays of struct rpcrdma_req to fill in pointers
	 *   3. array of struct rpcrdma_rep for replies
	 *   Send/recv buffers in req/rep need to be registered
	 */
	len = buf->rb_max_requests *
		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));

	p = kzalloc(len, GFP_KERNEL);
	dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",

	buf->rb_pool = p;	/* for freeing it later */

	buf->rb_send_bufs = (struct rpcrdma_req **) p;
	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
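	/* A single kzalloc'd block is carved into two consecutive pointer
	 * arrays: rb_send_bufs[0..rb_max_requests-1] followed by
	 * rb_recv_bufs[0..rb_max_requests-1].  "p" is advanced past each
	 * array as it is assigned, and rb_pool remembers the block so it
	 * can be freed in rpcrdma_buffer_destroy().
	 */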
	rc = ia->ri_ops->ro_init(r_xprt);

	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;
		struct rpcrdma_rep *rep;

		req = rpcrdma_create_req(r_xprt);
		dprintk("RPC: %s: request buffer %d alloc"
			" failed\n", __func__, i);
		buf->rb_send_bufs[i] = req;

		rep = rpcrdma_create_rep(r_xprt);
		dprintk("RPC: %s: reply buffer %d alloc failed\n",
		buf->rb_recv_bufs[i] = rep;

	rpcrdma_buffer_destroy(buf);
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);

rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);

rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);

	/* clean up in reverse order from create
	 *   1. recv mr memory (mr free, then kfree)
	 *   2. send mr memory (mr free, then kfree)
	 */
	dprintk("RPC: %s: entering\n", __func__);

	for (i = 0; i < buf->rb_max_requests; i++) {
		if (buf->rb_recv_bufs)
			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
		if (buf->rb_send_bufs)
			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);

	ia->ri_ops->ro_destroy(buf);

	kfree(buf->rb_pool);
/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
 * some req segments uninitialized.
 */
rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
	list_add_tail(&(*mw)->mw_list, &buf->rb_mws);

/* Cycle mw's back in reverse order, and "spin" them.
 * This delays and scrambles reuse as much as possible.
 */
rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
	struct rpcrdma_mr_seg *seg = req->rl_segments;
	struct rpcrdma_mr_seg *seg1 = seg;

	for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
		rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
	rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);

rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
	buf->rb_send_bufs[--buf->rb_send_index] = req;
	if (req->rl_reply) {
		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
		req->rl_reply->rr_func = NULL;
		req->rl_reply = NULL;
/* rpcrdma_unmap_one() was already done during deregistration.
 * Redo only the ib_post_send().
 */
rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
	struct rpcrdma_xprt *r_xprt =
				container_of(ia, struct rpcrdma_xprt, rx_ia);
	struct ib_send_wr invalidate_wr, *bad_wr;

	dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);

	/* When this FRMR is re-inserted into rb_mws, it is no longer stale */
	r->r.frmr.fr_state = FRMR_IS_INVALID;

	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
	invalidate_wr.wr_id = (unsigned long)(void *)r;
	invalidate_wr.opcode = IB_WR_LOCAL_INV;
	invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
	DECR_CQCOUNT(&r_xprt->rx_ep);

	dprintk("RPC: %s: frmr %p invalidating rkey %08x\n",
		__func__, r, r->r.frmr.fr_mr->rkey);

	read_lock(&ia->ri_qplock);
	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
	read_unlock(&ia->ri_qplock);
		/* Force rpcrdma_buffer_get() to retry */
		r->r.frmr.fr_state = FRMR_IS_STALE;
		dprintk("RPC: %s: ib_post_send failed, %i\n",
rpcrdma_retry_flushed_linv(struct list_head *stale,
			   struct rpcrdma_buffer *buf)
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct list_head *pos;
	struct rpcrdma_mw *r;
	unsigned long flags;

	list_for_each(pos, stale) {
		r = list_entry(pos, struct rpcrdma_mw, mw_list);
		rpcrdma_retry_local_inv(r, ia);

	spin_lock_irqsave(&buf->rb_lock, flags);
	list_splice_tail(stale, &buf->rb_mws);
	spin_unlock_irqrestore(&buf->rb_lock, flags);
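	/* FRMRs caught in a flushed completion are marked stale.  Each one
	 * gets a fresh LOCAL_INV posted by rpcrdma_retry_local_inv(), and
	 * the whole list is then spliced back onto rb_mws so the MWs become
	 * available to later calls of rpcrdma_buffer_get().
	 */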
static struct rpcrdma_req *
rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
			 struct list_head *stale)
	struct rpcrdma_mw *r;

	i = RPCRDMA_MAX_SEGS - 1;
	while (!list_empty(&buf->rb_mws)) {
		r = list_entry(buf->rb_mws.next,
			       struct rpcrdma_mw, mw_list);
		list_del(&r->mw_list);
		if (r->r.frmr.fr_state == FRMR_IS_STALE) {
			list_add(&r->mw_list, stale);
		req->rl_segments[i].rl_mw = r;
		if (unlikely(i-- == 0))
			return req;	/* Success */

	/* Not enough entries on rb_mws for this req */
	rpcrdma_buffer_put_sendbuf(req, buf);
	rpcrdma_buffer_put_mrs(req, buf);
static struct rpcrdma_req *
rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
	struct rpcrdma_mw *r;

	i = RPCRDMA_MAX_SEGS - 1;
	while (!list_empty(&buf->rb_mws)) {
		r = list_entry(buf->rb_mws.next,
			       struct rpcrdma_mw, mw_list);
		list_del(&r->mw_list);
		req->rl_segments[i].rl_mw = r;
		if (unlikely(i-- == 0))
			return req;	/* Success */

	/* Not enough entries on rb_mws for this req */
	rpcrdma_buffer_put_sendbuf(req, buf);
	rpcrdma_buffer_put_mrs(req, buf);
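/* Both helpers above fill req->rl_segments[] from the highest index down;
 * reaching index 0 means a full complement of RPCRDMA_MAX_SEGS MWs was
 * found.  The FRWR variant additionally sets stale FRMRs aside on the
 * "stale" list instead of handing them to the request.  If rb_mws runs
 * out first, the req and any MWs already taken are returned to the pool
 * and no request is returned to the caller.
 */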
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if needed) is attached to send buffer upon return.
 *
 * rb_send_index and rb_recv_index MUST always be pointing to the
 * *next* available buffer (non-NULL). They are incremented after
 * removing buffers, and decremented *before* returning them.
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
	struct list_head stale;
	struct rpcrdma_req *req;
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	if (buffers->rb_send_index == buffers->rb_max_requests) {
		spin_unlock_irqrestore(&buffers->rb_lock, flags);
		dprintk("RPC: %s: out of request buffers\n", __func__);
		return ((struct rpcrdma_req *)NULL);

	req = buffers->rb_send_bufs[buffers->rb_send_index];
	if (buffers->rb_send_index < buffers->rb_recv_index) {
		dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
			buffers->rb_recv_index - buffers->rb_send_index);
		req->rl_reply = NULL;
		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;

	INIT_LIST_HEAD(&stale);
	switch (ia->ri_memreg_strategy) {
		req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
	case RPCRDMA_MTHCAFMR:
		req = rpcrdma_buffer_get_fmrs(req, buffers);
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
	if (!list_empty(&stale))
		rpcrdma_retry_flushed_linv(&stale, buffers);
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
rpcrdma_buffer_put(struct rpcrdma_req *req)
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	rpcrdma_buffer_put_sendbuf(req, buffers);
	switch (ia->ri_memreg_strategy) {
	case RPCRDMA_MTHCAFMR:
		rpcrdma_buffer_put_mrs(req, buffers);
	spin_unlock_irqrestore(&buffers->rb_lock, flags);

 * Recover reply buffers from pool.
 * This happens when recovering from error conditions.
 * Post-increment counter/array index.
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	if (buffers->rb_recv_index < buffers->rb_max_requests) {
		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
	spin_unlock_irqrestore(&buffers->rb_lock, flags);

 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
	struct rpcrdma_buffer *buffers = rep->rr_buffer;
	unsigned long flags;

	rep->rr_func = NULL;
	spin_lock_irqsave(&buffers->rb_lock, flags);
	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
	dprintk("RPC: map_one: offset %p iova %llx len %zu\n",
		(unsigned long long)seg->mr_dma, seg->mr_dmalen);

rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
			  struct ib_mr **mrp, struct ib_sge *iov)
	struct ib_phys_buf ipb;

	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
	iov->addr = ib_dma_map_single(ia->ri_id->device,
				      va, len, DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))

	if (ia->ri_have_dma_lkey) {
		iov->lkey = ia->ri_dma_lkey;
	} else if (ia->ri_bind_mem != NULL) {
		iov->lkey = ia->ri_bind_mem->lkey;

	ipb.addr = iov->addr;
	ipb.size = iov->length;
	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
			    IB_ACCESS_LOCAL_WRITE, &iov->addr);

	dprintk("RPC: %s: phys convert: 0x%llx "
		"registered 0x%llx length %d\n",
		__func__, (unsigned long long)ipb.addr,
		(unsigned long long)iov->addr, len);

		dprintk("RPC: %s: failed with %i\n", __func__, rc);
	iov->lkey = mr->lkey;

rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
			    struct ib_mr *mr, struct ib_sge *iov)
	ib_dma_unmap_single(ia->ri_id->device,
			    iov->addr, iov->length, DMA_BIDIRECTIONAL);

	rc = ib_dereg_mr(mr);
	dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
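/* rpcrdma_register_internal() prefers the cheapest lkey available: the
 * device's local DMA lkey when the HCA advertises one, otherwise the
 * lkey of the pre-allocated ri_bind_mem DMA MR, and only as a last
 * resort a one-off ib_reg_phys_mr() registration of the buffer.
 */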
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);

	rb->rg_owner = NULL;
	rc = rpcrdma_register_internal(ia, rb->rg_base, size,
				       &rb->rg_mr, &rb->rg_iov);

 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
	rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;

	rc = rpcrdma_ep_post_recv(ia, ep, rep);
	req->rl_reply = NULL;

	send_wr.next = NULL;
	send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
	send_wr.sg_list = req->rl_send_iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;
	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
		ib_dma_sync_single_for_device(ia->ri_id->device,
			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
	ib_dma_sync_single_for_device(ia->ri_id->device,
		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
	ib_dma_sync_single_for_device(ia->ri_id->device,
		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		send_wr.send_flags = IB_SEND_SIGNALED;

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
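	/* Most SENDs are posted unsignaled (send_flags == 0).  DECR_CQCOUNT()
	 * counts down from ep->rep_cqinit, which is set up in
	 * rpcrdma_ep_create(); when the count is exhausted one SEND is
	 * posted with IB_SEND_SIGNALED so the provider's send CQ cannot
	 * fill up with unreaped completions.
	 */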
 * (Re)post a receive buffer.
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
	struct ib_recv_wr recv_wr, *recv_wr_fail;

	recv_wr.next = NULL;
	recv_wr.wr_id = (u64) (unsigned long) rep;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_id->device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
	dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
/* How many chunk list items fit within our inline buffers?
 */
rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	int bytes, segments;

	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
	bytes -= RPCRDMA_HDRLEN_MIN;
	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
		pr_warn("RPC: %s: inline threshold too small\n",

	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
	dprintk("RPC: %s: max chunk list size = %d segments\n",
		__func__, segments);
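	/* The budget is the smaller of the two inline thresholds minus the
	 * minimum RPC/RDMA header; dividing by the size of an on-the-wire
	 * segment gives the raw count, which fls() rounds down to the
	 * nearest power of two.  For example, assuming a 1024-byte inline
	 * threshold, a 28-byte minimum header, and 16-byte segments,
	 * (1024 - 28) / 16 = 62 segments, reported as 32 after rounding.
	 */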