2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
43 * Encapsulates the major functions managing RPC/RDMA adapters, endpoints, connections, and buffer memory.
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <linux/prefetch.h>
53 #include <asm/bitops.h>
55 #include "xprt_rdma.h"
61 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
62 # define RPCDBG_FACILITY RPCDBG_TRANS
65 static void rpcrdma_reset_frmrs(struct rpcrdma_ia *);
66 static void rpcrdma_reset_fmrs(struct rpcrdma_ia *);
73 * handle replies in tasklet context, using a single, global list.
74 * The rdma tasklet function just turns around and calls the reply
75 * handler for each reply on the list.
78 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
79 static LIST_HEAD(rpcrdma_tasklets_g);
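/* Completion upcalls run in hard-interrupt context and only queue reps on
 * rpcrdma_tasklets_g under rpcrdma_tk_lock_g; rpcrdma_run_tasklet() drains
 * the list later in softirq context, so reply processing never runs inside
 * the upcall itself.
 */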
82 rpcrdma_run_tasklet(unsigned long data)
84 struct rpcrdma_rep *rep;
85 void (*func)(struct rpcrdma_rep *);
89 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
90 while (!list_empty(&rpcrdma_tasklets_g)) {
91 rep = list_entry(rpcrdma_tasklets_g.next,
92 struct rpcrdma_rep, rr_list);
93 list_del(&rep->rr_list);
96 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
101 rpcrdma_recv_buffer_put(rep);
103 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
105 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
108 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
110 static const char * const async_event[] = {
115 "communication established",
116 "send queue drained",
117 "path migration successful",
119 "device fatal error",
132 #define ASYNC_MSG(status) \
133 ((status) < ARRAY_SIZE(async_event) ? \
134 async_event[(status)] : "unknown async error")
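/* ASYNC_MSG() indexes async_event[] directly by the ib_event_type value in
 * ib_event::event; any value past the end of the table is reported as an
 * unknown async error.
 */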
137 rpcrdma_schedule_tasklet(struct list_head *sched_list)
141 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
142 list_splice_tail(sched_list, &rpcrdma_tasklets_g);
143 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
144 tasklet_schedule(&rpcrdma_tasklet_g);
148 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
150 struct rpcrdma_ep *ep = context;
152 pr_err("RPC: %s: %s on device %s ep %p\n",
153 __func__, ASYNC_MSG(event->event),
154 event->device->name, context);
155 if (ep->rep_connected == 1) {
156 ep->rep_connected = -EIO;
157 rpcrdma_conn_func(ep);
158 wake_up_all(&ep->rep_connect_wait);
163 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
165 struct rpcrdma_ep *ep = context;
167 pr_err("RPC: %s: %s on device %s ep %p\n",
168 __func__, ASYNC_MSG(event->event),
169 event->device->name, context);
170 if (ep->rep_connected == 1) {
171 ep->rep_connected = -EIO;
172 rpcrdma_conn_func(ep);
173 wake_up_all(&ep->rep_connect_wait);
177 static const char * const wc_status[] = {
179 "local length error",
180 "local QP operation error",
181 "local EE context operation error",
182 "local protection error",
184 "memory management operation error",
185 "bad response error",
186 "local access error",
187 "remote invalid request error",
188 "remote access error",
189 "remote operation error",
190 "transport retry counter exceeded",
191 "RNR retrycounter exceeded",
192 "local RDD violation error",
193 "remove invalid RD request",
195 "invalid EE context number",
196 "invalid EE context state",
198 "response timeout error",
202 #define COMPLETION_MSG(status) \
203 ((status) < ARRAY_SIZE(wc_status) ? \
204 wc_status[(status)] : "unexpected completion error")
207 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
209 if (likely(wc->status == IB_WC_SUCCESS))
212 /* WARNING: Only wr_id and status are reliable at this point */
213 if (wc->wr_id == 0ULL) {
214 if (wc->status != IB_WC_WR_FLUSH_ERR)
215 pr_err("RPC: %s: SEND: %s\n",
216 __func__, COMPLETION_MSG(wc->status));
218 struct rpcrdma_mw *r;
220 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
221 r->r.frmr.fr_state = FRMR_IS_STALE;
222 pr_err("RPC: %s: frmr %p (stale): %s\n",
223 __func__, r, COMPLETION_MSG(wc->status));
228 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
231 int budget, count, rc;
233 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
235 wcs = ep->rep_send_wcs;
237 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
243 rpcrdma_sendcq_process_wc(wcs++);
244 } while (rc == RPCRDMA_POLLSIZE && --budget);
249 * Handle send, fast_reg_mr, and local_inv completions.
251 * Send events are typically suppressed and thus do not result
252 * in an upcall. Occasionally one is signaled, however. This
253 * prevents the provider's completion queue from wrapping and
254 * losing a completion.
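 *
 * Sends are normally posted unsignaled: rpcrdma_ep_post() sets
 * IB_SEND_SIGNALED only when DECR_CQCOUNT() falls to zero, i.e. roughly
 * every ep->rep_cqinit sends (the trigger computed in rpcrdma_ep_create()).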
257 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
259 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
262 rc = rpcrdma_sendcq_poll(cq, ep);
264 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
269 rc = ib_req_notify_cq(cq,
270 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
274 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
279 rpcrdma_sendcq_poll(cq, ep);
283 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
285 struct rpcrdma_rep *rep =
286 (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
288 /* WARNING: Only wr_id and status are reliable at this point */
289 if (wc->status != IB_WC_SUCCESS)
292 /* status == SUCCESS means all fields in wc are trustworthy */
293 if (wc->opcode != IB_WC_RECV)
296 dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
297 __func__, rep, wc->byte_len);
299 rep->rr_len = wc->byte_len;
300 ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
301 rdmab_addr(rep->rr_rdmabuf),
302 rep->rr_len, DMA_FROM_DEVICE);
303 prefetch(rdmab_to_msg(rep->rr_rdmabuf));
306 list_add_tail(&rep->rr_list, sched_list);
309 if (wc->status != IB_WC_WR_FLUSH_ERR)
310 pr_err("RPC: %s: rep %p: %s\n",
311 __func__, rep, COMPLETION_MSG(wc->status));
317 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
319 struct list_head sched_list;
321 int budget, count, rc;
323 INIT_LIST_HEAD(&sched_list);
324 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
326 wcs = ep->rep_recv_wcs;
328 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
334 rpcrdma_recvcq_process_wc(wcs++, &sched_list);
335 } while (rc == RPCRDMA_POLLSIZE && --budget);
339 rpcrdma_schedule_tasklet(&sched_list);
344 * Handle receive completions.
346 * It is reentrant but processes single events in order to maintain
347 * the ordering of receives, which preserves server credit accounting.
349 * It is the responsibility of the scheduled tasklet to return
350 * recv buffers to the pool. NOTE: this affects synchronization of
351 * connection shutdown. That is, the structures required for
352 * the completion of the reply handler must remain intact until
353 * all memory has been reclaimed.
356 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
358 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
361 rc = rpcrdma_recvcq_poll(cq, ep);
363 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
368 rc = ib_req_notify_cq(cq,
369 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
373 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
378 rpcrdma_recvcq_poll(cq, ep);
382 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
385 LIST_HEAD(sched_list);
387 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
388 rpcrdma_recvcq_process_wc(&wc, &sched_list);
389 if (!list_empty(&sched_list))
390 rpcrdma_schedule_tasklet(&sched_list);
391 while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
392 rpcrdma_sendcq_process_wc(&wc);
395 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
396 static const char * const conn[] = {
415 #define CONNECTION_MSG(status) \
416 ((status) < ARRAY_SIZE(conn) ? \
417 conn[(status)] : "unrecognized connection error")
421 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
423 struct rpcrdma_xprt *xprt = id->context;
424 struct rpcrdma_ia *ia = &xprt->rx_ia;
425 struct rpcrdma_ep *ep = &xprt->rx_ep;
426 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
427 struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
429 struct ib_qp_attr *attr = &ia->ri_qp_attr;
430 struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
433 switch (event->event) {
434 case RDMA_CM_EVENT_ADDR_RESOLVED:
435 case RDMA_CM_EVENT_ROUTE_RESOLVED:
437 complete(&ia->ri_done);
439 case RDMA_CM_EVENT_ADDR_ERROR:
440 ia->ri_async_rc = -EHOSTUNREACH;
441 dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
443 complete(&ia->ri_done);
445 case RDMA_CM_EVENT_ROUTE_ERROR:
446 ia->ri_async_rc = -ENETUNREACH;
447 dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
449 complete(&ia->ri_done);
451 case RDMA_CM_EVENT_ESTABLISHED:
453 ib_query_qp(ia->ri_id->qp, attr,
454 IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
456 dprintk("RPC: %s: %d responder resources"
458 __func__, attr->max_dest_rd_atomic,
459 attr->max_rd_atomic);
461 case RDMA_CM_EVENT_CONNECT_ERROR:
462 connstate = -ENOTCONN;
464 case RDMA_CM_EVENT_UNREACHABLE:
465 connstate = -ENETDOWN;
467 case RDMA_CM_EVENT_REJECTED:
468 connstate = -ECONNREFUSED;
470 case RDMA_CM_EVENT_DISCONNECTED:
471 connstate = -ECONNABORTED;
473 case RDMA_CM_EVENT_DEVICE_REMOVAL:
476 dprintk("RPC: %s: %sconnected\n",
477 __func__, connstate > 0 ? "" : "dis");
478 ep->rep_connected = connstate;
479 rpcrdma_conn_func(ep);
480 wake_up_all(&ep->rep_connect_wait);
483 dprintk("RPC: %s: %pI4:%u (ep 0x%p): %s\n",
484 __func__, &addr->sin_addr.s_addr,
485 ntohs(addr->sin_port), ep,
486 CONNECTION_MSG(event->event));
490 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
491 if (connstate == 1) {
492 int ird = attr->max_dest_rd_atomic;
493 int tird = ep->rep_remote_cma.responder_resources;
494 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
495 "on %s, memreg %d slots %d ird %d%s\n",
496 &addr->sin_addr.s_addr,
497 ntohs(addr->sin_port),
498 ia->ri_id->device->name,
499 ia->ri_memreg_strategy,
500 xprt->rx_buf.rb_max_requests,
501 ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
502 } else if (connstate < 0) {
503 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
504 &addr->sin_addr.s_addr,
505 ntohs(addr->sin_port),
513 static struct rdma_cm_id *
514 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
515 struct rpcrdma_ia *ia, struct sockaddr *addr)
517 struct rdma_cm_id *id;
520 init_completion(&ia->ri_done);
522 id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
525 dprintk("RPC: %s: rdma_create_id() failed %i\n",
530 ia->ri_async_rc = -ETIMEDOUT;
531 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
533 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
537 wait_for_completion_interruptible_timeout(&ia->ri_done,
538 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
539 rc = ia->ri_async_rc;
543 ia->ri_async_rc = -ETIMEDOUT;
544 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
546 dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
550 wait_for_completion_interruptible_timeout(&ia->ri_done,
551 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
552 rc = ia->ri_async_rc;
564 * Drain any CQ prior to teardown.
567 rpcrdma_clean_cq(struct ib_cq *cq)
572 while (1 == ib_poll_cq(cq, 1, &wc))
576 dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
577 __func__, count, wc.opcode);
581 * Exported functions.
585 * Open and initialize an Interface Adapter.
586 * o initializes fields of struct rpcrdma_ia, including
587 * interface and provider attributes and protection zone.
590 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
593 struct rpcrdma_ia *ia = &xprt->rx_ia;
594 struct ib_device_attr *devattr = &ia->ri_devattr;
596 ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
597 if (IS_ERR(ia->ri_id)) {
598 rc = PTR_ERR(ia->ri_id);
602 ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
603 if (IS_ERR(ia->ri_pd)) {
604 rc = PTR_ERR(ia->ri_pd);
605 dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
610 rc = ib_query_device(ia->ri_id->device, devattr);
612 dprintk("RPC: %s: ib_query_device failed %d\n",
617 if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
618 ia->ri_have_dma_lkey = 1;
619 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
622 if (memreg == RPCRDMA_FRMR) {
623 /* Requires both frmr reg and local dma lkey */
624 if ((devattr->device_cap_flags &
625 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
626 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
627 dprintk("RPC: %s: FRMR registration "
628 "not supported by HCA\n", __func__);
629 memreg = RPCRDMA_MTHCAFMR;
631 /* Mind the ia limit on FRMR page list depth */
632 ia->ri_max_frmr_depth = min_t(unsigned int,
633 RPCRDMA_MAX_DATA_SEGS,
634 devattr->max_fast_reg_page_list_len);
637 if (memreg == RPCRDMA_MTHCAFMR) {
638 if (!ia->ri_id->device->alloc_fmr) {
639 dprintk("RPC: %s: MTHCAFMR registration "
640 "not supported by HCA\n", __func__);
641 memreg = RPCRDMA_ALLPHYSICAL;
646 * Optionally obtain an underlying physical identity mapping in
647 * order to do a memory window-based bind. This base registration
648 * is protected from remote access - that is enabled only by binding
649 * for the specific bytes targeted during each RPC operation, and
650 * revoked after the corresponding completion, similar to a storage adapter.
656 case RPCRDMA_ALLPHYSICAL:
657 mem_priv = IB_ACCESS_LOCAL_WRITE |
658 IB_ACCESS_REMOTE_WRITE |
659 IB_ACCESS_REMOTE_READ;
661 case RPCRDMA_MTHCAFMR:
662 if (ia->ri_have_dma_lkey)
664 mem_priv = IB_ACCESS_LOCAL_WRITE;
666 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
667 if (IS_ERR(ia->ri_bind_mem)) {
668 printk(KERN_ALERT "%s: ib_get_dma_mr for "
669 "phys register failed with %lX\n",
670 __func__, PTR_ERR(ia->ri_bind_mem));
676 printk(KERN_ERR "RPC: Unsupported memory "
677 "registration mode: %d\n", memreg);
681 dprintk("RPC: %s: memory registration strategy is %d\n",
684 /* Else will do memory reg/dereg for each chunk */
685 ia->ri_memreg_strategy = memreg;
687 rwlock_init(&ia->ri_qplock);
691 ib_dealloc_pd(ia->ri_pd);
694 rdma_destroy_id(ia->ri_id);
701 * Clean up/close an IA.
702 * o if event handles and PD have been initialized, free them.
706 rpcrdma_ia_close(struct rpcrdma_ia *ia)
710 dprintk("RPC: %s: entering\n", __func__);
711 if (ia->ri_bind_mem != NULL) {
712 rc = ib_dereg_mr(ia->ri_bind_mem);
713 dprintk("RPC: %s: ib_dereg_mr returned %i\n",
716 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
718 rdma_destroy_qp(ia->ri_id);
719 rdma_destroy_id(ia->ri_id);
722 if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
723 rc = ib_dealloc_pd(ia->ri_pd);
724 dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
730 * Create unconnected endpoint.
733 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
734 struct rpcrdma_create_data_internal *cdata)
736 struct ib_device_attr *devattr = &ia->ri_devattr;
737 struct ib_cq *sendcq, *recvcq;
740 /* check provider's send/recv wr limits */
741 if (cdata->max_requests > devattr->max_qp_wr)
742 cdata->max_requests = devattr->max_qp_wr;
744 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
745 ep->rep_attr.qp_context = ep;
746 /* send_cq and recv_cq initialized below */
747 ep->rep_attr.srq = NULL;
748 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
749 switch (ia->ri_memreg_strategy) {
753 /* Add room for frmr register and invalidate WRs.
754 * 1. FRMR reg WR for head
755 * 2. FRMR invalidate WR for head
756 * 3. N FRMR reg WRs for pagelist
757 * 4. N FRMR invalidate WRs for pagelist
758 * 5. FRMR reg WR for tail
759 * 6. FRMR invalidate WR for tail
760 * 7. The RDMA_SEND WR
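 *
 * For example (illustrative figures): with ri_max_frmr_depth = 16 and
 * RPCRDMA_MAX_DATA_SEGS = 64, the pagelist needs ceil(64 / 16) = 4 FRMRs,
 * so the loop below grows depth from the 7 WRs listed above (N = 1) to
 * 7 + 2 * 3 = 13 send queue slots per RPC.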
763 /* Calculate N if the device max FRMR depth is smaller than
764 * RPCRDMA_MAX_DATA_SEGS.
766 if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
767 int delta = RPCRDMA_MAX_DATA_SEGS -
768 ia->ri_max_frmr_depth;
771 depth += 2; /* FRMR reg + invalidate */
772 delta -= ia->ri_max_frmr_depth;
776 ep->rep_attr.cap.max_send_wr *= depth;
777 if (ep->rep_attr.cap.max_send_wr > devattr->max_qp_wr) {
778 cdata->max_requests = devattr->max_qp_wr / depth;
779 if (!cdata->max_requests)
781 ep->rep_attr.cap.max_send_wr = cdata->max_requests *
789 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
790 ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
791 ep->rep_attr.cap.max_recv_sge = 1;
792 ep->rep_attr.cap.max_inline_data = 0;
793 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
794 ep->rep_attr.qp_type = IB_QPT_RC;
795 ep->rep_attr.port_num = ~0;
797 if (cdata->padding) {
798 ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
800 if (IS_ERR(ep->rep_padbuf))
801 return PTR_ERR(ep->rep_padbuf);
803 ep->rep_padbuf = NULL;
805 dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
806 "iovs: send %d recv %d\n",
808 ep->rep_attr.cap.max_send_wr,
809 ep->rep_attr.cap.max_recv_wr,
810 ep->rep_attr.cap.max_send_sge,
811 ep->rep_attr.cap.max_recv_sge);
813 /* set trigger for requesting send completion */
814 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
815 if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
816 ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
817 else if (ep->rep_cqinit <= 2)
820 init_waitqueue_head(&ep->rep_connect_wait);
821 INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
823 sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
824 rpcrdma_cq_async_error_upcall, ep,
825 ep->rep_attr.cap.max_send_wr + 1, 0);
826 if (IS_ERR(sendcq)) {
827 rc = PTR_ERR(sendcq);
828 dprintk("RPC: %s: failed to create send CQ: %i\n",
833 rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
835 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
840 recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
841 rpcrdma_cq_async_error_upcall, ep,
842 ep->rep_attr.cap.max_recv_wr + 1, 0);
843 if (IS_ERR(recvcq)) {
844 rc = PTR_ERR(recvcq);
845 dprintk("RPC: %s: failed to create recv CQ: %i\n",
850 rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
852 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
854 ib_destroy_cq(recvcq);
858 ep->rep_attr.send_cq = sendcq;
859 ep->rep_attr.recv_cq = recvcq;
861 /* Initialize cma parameters */
863 /* RPC/RDMA does not use private data */
864 ep->rep_remote_cma.private_data = NULL;
865 ep->rep_remote_cma.private_data_len = 0;
867 /* Client offers RDMA Read but does not initiate */
868 ep->rep_remote_cma.initiator_depth = 0;
869 if (devattr->max_qp_rd_atom > 32) /* arbitrary but <= 255 */
870 ep->rep_remote_cma.responder_resources = 32;
872 ep->rep_remote_cma.responder_resources =
873 devattr->max_qp_rd_atom;
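/* responder_resources is the connection's IRD (how many incoming RDMA Read
 * requests the peer may have outstanding here); initiator_depth is the ORD,
 * left at zero above because the client never initiates RDMA Reads.
 */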
875 ep->rep_remote_cma.retry_count = 7;
876 ep->rep_remote_cma.flow_control = 0;
877 ep->rep_remote_cma.rnr_retry_count = 0;
882 err = ib_destroy_cq(sendcq);
884 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
887 rpcrdma_free_regbuf(ia, ep->rep_padbuf);
894 * Disconnect and destroy endpoint. After this, the only
895 * valid operations on the ep are to free it (if dynamically
896 * allocated) or re-create it.
899 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
903 dprintk("RPC: %s: entering, connected is %d\n",
904 __func__, ep->rep_connected);
906 cancel_delayed_work_sync(&ep->rep_connect_worker);
909 rpcrdma_ep_disconnect(ep, ia);
910 rdma_destroy_qp(ia->ri_id);
911 ia->ri_id->qp = NULL;
914 rpcrdma_free_regbuf(ia, ep->rep_padbuf);
916 rpcrdma_clean_cq(ep->rep_attr.recv_cq);
917 rc = ib_destroy_cq(ep->rep_attr.recv_cq);
919 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
922 rpcrdma_clean_cq(ep->rep_attr.send_cq);
923 rc = ib_destroy_cq(ep->rep_attr.send_cq);
925 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
930 * Connect unconnected endpoint.
933 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
935 struct rdma_cm_id *id, *old;
939 if (ep->rep_connected != 0) {
940 struct rpcrdma_xprt *xprt;
942 dprintk("RPC: %s: reconnecting...\n", __func__);
944 rpcrdma_ep_disconnect(ep, ia);
945 rpcrdma_flush_cqs(ep);
947 switch (ia->ri_memreg_strategy) {
949 rpcrdma_reset_frmrs(ia);
951 case RPCRDMA_MTHCAFMR:
952 rpcrdma_reset_fmrs(ia);
954 case RPCRDMA_ALLPHYSICAL:
961 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
962 id = rpcrdma_create_id(xprt, ia,
963 (struct sockaddr *)&xprt->rx_data.addr);
968 /* TEMP TEMP TEMP - fail if new device:
969 * Deregister/remarshal *all* requests!
970 * Close and recreate adapter, pd, etc!
971 * Re-determine all attributes still sane!
972 * More stuff I haven't thought of!
975 if (ia->ri_id->device != id->device) {
976 printk("RPC: %s: can't reconnect on "
977 "different device!\n", __func__);
983 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
985 dprintk("RPC: %s: rdma_create_qp failed %i\n",
992 write_lock(&ia->ri_qplock);
995 write_unlock(&ia->ri_qplock);
997 rdma_destroy_qp(old);
998 rdma_destroy_id(old);
1000 dprintk("RPC: %s: connecting...\n", __func__);
1001 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
1003 dprintk("RPC: %s: rdma_create_qp failed %i\n",
1005 /* do not update ep->rep_connected */
1006 return -ENETUNREACH;
1010 ep->rep_connected = 0;
1012 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
1014 dprintk("RPC: %s: rdma_connect() failed with %i\n",
1019 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
1022 * Check state. A non-peer reject indicates no listener
1023 * (ECONNREFUSED), which may be a transient state. All
1024 * others indicate a transport condition which has already
1025 * undergone a best-effort recovery attempt.
1027 if (ep->rep_connected == -ECONNREFUSED &&
1028 ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
1029 dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
1032 if (ep->rep_connected <= 0) {
1033 /* Sometimes, the only way to reliably connect to remote
1034 * CMs is to use the same nonzero values for ORD and IRD. */
1035 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
1036 (ep->rep_remote_cma.responder_resources == 0 ||
1037 ep->rep_remote_cma.initiator_depth !=
1038 ep->rep_remote_cma.responder_resources)) {
1039 if (ep->rep_remote_cma.responder_resources == 0)
1040 ep->rep_remote_cma.responder_resources = 1;
1041 ep->rep_remote_cma.initiator_depth =
1042 ep->rep_remote_cma.responder_resources;
1045 rc = ep->rep_connected;
1047 dprintk("RPC: %s: connected\n", __func__);
1052 ep->rep_connected = rc;
1057 * rpcrdma_ep_disconnect
1059 * This is separate from destroy to facilitate the ability
1060 * to reconnect without recreating the endpoint.
1062 * This call is not reentrant, and must not be made in parallel
1063 * on the same endpoint.
1066 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
1070 rpcrdma_flush_cqs(ep);
1071 rc = rdma_disconnect(ia->ri_id);
1073 /* returns without wait if not connected */
1074 wait_event_interruptible(ep->rep_connect_wait,
1075 ep->rep_connected != 1);
1076 dprintk("RPC: %s: after wait, %sconnected\n", __func__,
1077 (ep->rep_connected == 1) ? "still " : "dis");
1079 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
1080 ep->rep_connected = rc;
1084 static struct rpcrdma_req *
1085 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1087 struct rpcrdma_req *req;
1089 req = kzalloc(sizeof(*req), GFP_KERNEL);
1091 return ERR_PTR(-ENOMEM);
1093 req->rl_buffer = &r_xprt->rx_buf;
1097 static struct rpcrdma_rep *
1098 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1100 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1101 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1102 struct rpcrdma_rep *rep;
1106 rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1110 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
1112 if (IS_ERR(rep->rr_rdmabuf)) {
1113 rc = PTR_ERR(rep->rr_rdmabuf);
1117 rep->rr_buffer = &r_xprt->rx_buf;
1127 rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1129 int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ;
1130 struct ib_fmr_attr fmr_attr = {
1131 .max_pages = RPCRDMA_MAX_DATA_SEGS,
1133 .page_shift = PAGE_SHIFT
1135 struct rpcrdma_mw *r;
1138 i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1139 dprintk("RPC: %s: initializing %d FMRs\n", __func__, i);
1142 r = kzalloc(sizeof(*r), GFP_KERNEL);
1146 r->r.fmr = ib_alloc_fmr(ia->ri_pd, mr_access_flags, &fmr_attr);
1147 if (IS_ERR(r->r.fmr)) {
1148 rc = PTR_ERR(r->r.fmr);
1149 dprintk("RPC: %s: ib_alloc_fmr failed %i\n",
1154 list_add(&r->mw_list, &buf->rb_mws);
1155 list_add(&r->mw_all, &buf->rb_all);
1165 rpcrdma_init_frmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1167 struct rpcrdma_frmr *f;
1168 struct rpcrdma_mw *r;
1171 i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1172 dprintk("RPC: %s: initializing %d FRMRs\n", __func__, i);
1175 r = kzalloc(sizeof(*r), GFP_KERNEL);
1180 f->fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1181 ia->ri_max_frmr_depth);
1182 if (IS_ERR(f->fr_mr)) {
1183 rc = PTR_ERR(f->fr_mr);
1184 dprintk("RPC: %s: ib_alloc_fast_reg_mr "
1185 "failed %i\n", __func__, rc);
1189 f->fr_pgl = ib_alloc_fast_reg_page_list(ia->ri_id->device,
1190 ia->ri_max_frmr_depth);
1191 if (IS_ERR(f->fr_pgl)) {
1192 rc = PTR_ERR(f->fr_pgl);
1193 dprintk("RPC: %s: ib_alloc_fast_reg_page_list "
1194 "failed %i\n", __func__, rc);
1196 ib_dereg_mr(f->fr_mr);
1200 list_add(&r->mw_list, &buf->rb_mws);
1201 list_add(&r->mw_all, &buf->rb_all);
1212 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1214 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1215 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1216 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1221 buf->rb_max_requests = cdata->max_requests;
1222 spin_lock_init(&buf->rb_lock);
1224 /* Need to allocate:
1225 * 1. arrays for send and recv pointers
1226 * 2. arrays of struct rpcrdma_req to fill in pointers
1227 * 3. array of struct rpcrdma_rep for replies
1228 * Send/recv buffers in req/rep need to be registered
1230 len = buf->rb_max_requests *
1231 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1233 p = kzalloc(len, GFP_KERNEL);
1235 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1240 buf->rb_pool = p; /* for freeing it later */
1242 buf->rb_send_bufs = (struct rpcrdma_req **) p;
1243 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1244 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1245 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1247 INIT_LIST_HEAD(&buf->rb_mws);
1248 INIT_LIST_HEAD(&buf->rb_all);
1249 switch (ia->ri_memreg_strategy) {
1251 rc = rpcrdma_init_frmrs(ia, buf);
1255 case RPCRDMA_MTHCAFMR:
1256 rc = rpcrdma_init_fmrs(ia, buf);
1264 for (i = 0; i < buf->rb_max_requests; i++) {
1265 struct rpcrdma_req *req;
1266 struct rpcrdma_rep *rep;
1268 req = rpcrdma_create_req(r_xprt);
1270 dprintk("RPC: %s: request buffer %d alloc"
1271 " failed\n", __func__, i);
1275 buf->rb_send_bufs[i] = req;
1277 rep = rpcrdma_create_rep(r_xprt);
1279 dprintk("RPC: %s: reply buffer %d alloc failed\n",
1284 buf->rb_recv_bufs[i] = rep;
1289 rpcrdma_buffer_destroy(buf);
1294 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1299 rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1304 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1309 rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1310 rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1315 rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
1317 struct rpcrdma_mw *r;
1320 while (!list_empty(&buf->rb_all)) {
1321 r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1322 list_del(&r->mw_all);
1323 list_del(&r->mw_list);
1325 rc = ib_dealloc_fmr(r->r.fmr);
1327 dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
1335 rpcrdma_destroy_frmrs(struct rpcrdma_buffer *buf)
1337 struct rpcrdma_mw *r;
1340 while (!list_empty(&buf->rb_all)) {
1341 r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1342 list_del(&r->mw_all);
1343 list_del(&r->mw_list);
1345 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1347 dprintk("RPC: %s: ib_dereg_mr failed %i\n",
1349 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1356 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1358 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1361 /* clean up in reverse order from create
1362 * 1. recv mr memory (mr free, then kfree)
1363 * 2. send mr memory (mr free, then kfree)
1366 dprintk("RPC: %s: entering\n", __func__);
1368 for (i = 0; i < buf->rb_max_requests; i++) {
1369 if (buf->rb_recv_bufs)
1370 rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1371 if (buf->rb_send_bufs)
1372 rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1375 switch (ia->ri_memreg_strategy) {
1377 rpcrdma_destroy_frmrs(buf);
1379 case RPCRDMA_MTHCAFMR:
1380 rpcrdma_destroy_fmrs(buf);
1386 kfree(buf->rb_pool);
1389 /* After a disconnect, unmap all FMRs.
1391 * This is invoked only in the transport connect worker in order
1392 * to serialize with rpcrdma_register_fmr_external().
1395 rpcrdma_reset_fmrs(struct rpcrdma_ia *ia)
1397 struct rpcrdma_xprt *r_xprt =
1398 container_of(ia, struct rpcrdma_xprt, rx_ia);
1399 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1400 struct list_head *pos;
1401 struct rpcrdma_mw *r;
1405 list_for_each(pos, &buf->rb_all) {
1406 r = list_entry(pos, struct rpcrdma_mw, mw_all);
1409 list_add(&r->r.fmr->list, &l);
1410 rc = ib_unmap_fmr(&l);
1412 dprintk("RPC: %s: ib_unmap_fmr failed %i\n",
1417 /* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
1418 * an unusable state. Find FRMRs in this state and dereg / reg
1419 * each. FRMRs that are VALID and attached to an rpcrdma_req are
1422 * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
1424 * This is invoked only in the transport connect worker in order
1425 * to serialize with rpcrdma_register_frmr_external().
1428 rpcrdma_reset_frmrs(struct rpcrdma_ia *ia)
1430 struct rpcrdma_xprt *r_xprt =
1431 container_of(ia, struct rpcrdma_xprt, rx_ia);
1432 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1433 struct list_head *pos;
1434 struct rpcrdma_mw *r;
1437 list_for_each(pos, &buf->rb_all) {
1438 r = list_entry(pos, struct rpcrdma_mw, mw_all);
1440 if (r->r.frmr.fr_state == FRMR_IS_INVALID)
1443 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1445 dprintk("RPC: %s: ib_dereg_mr failed %i\n",
1447 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1449 r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1450 ia->ri_max_frmr_depth);
1451 if (IS_ERR(r->r.frmr.fr_mr)) {
1452 rc = PTR_ERR(r->r.frmr.fr_mr);
1453 dprintk("RPC: %s: ib_alloc_fast_reg_mr"
1454 " failed %i\n", __func__, rc);
1457 r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1459 ia->ri_max_frmr_depth);
1460 if (IS_ERR(r->r.frmr.fr_pgl)) {
1461 rc = PTR_ERR(r->r.frmr.fr_pgl);
1463 "ib_alloc_fast_reg_page_list "
1464 "failed %i\n", __func__, rc);
1466 ib_dereg_mr(r->r.frmr.fr_mr);
1469 r->r.frmr.fr_state = FRMR_IS_INVALID;
1473 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1474 * some req segments uninitialized.
1477 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1480 list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1485 /* Cycle mw's back in reverse order, and "spin" them.
1486 * This delays and scrambles reuse as much as possible.
1489 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1491 struct rpcrdma_mr_seg *seg = req->rl_segments;
1492 struct rpcrdma_mr_seg *seg1 = seg;
1495 for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1496 rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
1497 rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
1501 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1503 buf->rb_send_bufs[--buf->rb_send_index] = req;
1505 if (req->rl_reply) {
1506 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1507 req->rl_reply->rr_func = NULL;
1508 req->rl_reply = NULL;
1512 /* rpcrdma_unmap_one() was already done by rpcrdma_deregister_frmr_external().
1513 * Redo only the ib_post_send().
1516 rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1518 struct rpcrdma_xprt *r_xprt =
1519 container_of(ia, struct rpcrdma_xprt, rx_ia);
1520 struct ib_send_wr invalidate_wr, *bad_wr;
1523 dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);
1525 /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1526 r->r.frmr.fr_state = FRMR_IS_INVALID;
1528 memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1529 invalidate_wr.wr_id = (unsigned long)(void *)r;
1530 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1531 invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1532 DECR_CQCOUNT(&r_xprt->rx_ep);
1534 dprintk("RPC: %s: frmr %p invalidating rkey %08x\n",
1535 __func__, r, r->r.frmr.fr_mr->rkey);
1537 read_lock(&ia->ri_qplock);
1538 rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1539 read_unlock(&ia->ri_qplock);
1541 /* Force rpcrdma_buffer_get() to retry */
1542 r->r.frmr.fr_state = FRMR_IS_STALE;
1543 dprintk("RPC: %s: ib_post_send failed, %i\n",
1549 rpcrdma_retry_flushed_linv(struct list_head *stale,
1550 struct rpcrdma_buffer *buf)
1552 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1553 struct list_head *pos;
1554 struct rpcrdma_mw *r;
1555 unsigned long flags;
1557 list_for_each(pos, stale) {
1558 r = list_entry(pos, struct rpcrdma_mw, mw_list);
1559 rpcrdma_retry_local_inv(r, ia);
1562 spin_lock_irqsave(&buf->rb_lock, flags);
1563 list_splice_tail(stale, &buf->rb_mws);
1564 spin_unlock_irqrestore(&buf->rb_lock, flags);
1567 static struct rpcrdma_req *
1568 rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1569 struct list_head *stale)
1571 struct rpcrdma_mw *r;
1574 i = RPCRDMA_MAX_SEGS - 1;
1575 while (!list_empty(&buf->rb_mws)) {
1576 r = list_entry(buf->rb_mws.next,
1577 struct rpcrdma_mw, mw_list);
1578 list_del(&r->mw_list);
1579 if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1580 list_add(&r->mw_list, stale);
1583 req->rl_segments[i].rl_mw = r;
1584 if (unlikely(i-- == 0))
1585 return req; /* Success */
1588 /* Not enough entries on rb_mws for this req */
1589 rpcrdma_buffer_put_sendbuf(req, buf);
1590 rpcrdma_buffer_put_mrs(req, buf);
1594 static struct rpcrdma_req *
1595 rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1597 struct rpcrdma_mw *r;
1600 i = RPCRDMA_MAX_SEGS - 1;
1601 while (!list_empty(&buf->rb_mws)) {
1602 r = list_entry(buf->rb_mws.next,
1603 struct rpcrdma_mw, mw_list);
1604 list_del(&r->mw_list);
1605 req->rl_segments[i].rl_mw = r;
1606 if (unlikely(i-- == 0))
1607 return req; /* Success */
1610 /* Not enough entries on rb_mws for this req */
1611 rpcrdma_buffer_put_sendbuf(req, buf);
1612 rpcrdma_buffer_put_mrs(req, buf);
1617 * Get a set of request/reply buffers.
1619 * Reply buffer (if needed) is attached to send buffer upon return.
1621 * rb_send_index and rb_recv_index MUST always be pointing to the
1622 * *next* available buffer (non-NULL). They are incremented after
1623 * removing buffers, and decremented *before* returning them.
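 *
 * Thus rb_send_index == rb_max_requests means the request pool is
 * exhausted, which is the out-of-buffers check at the top of
 * rpcrdma_buffer_get().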
1625 struct rpcrdma_req *
1626 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1628 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1629 struct list_head stale;
1630 struct rpcrdma_req *req;
1631 unsigned long flags;
1633 spin_lock_irqsave(&buffers->rb_lock, flags);
1634 if (buffers->rb_send_index == buffers->rb_max_requests) {
1635 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1636 dprintk("RPC: %s: out of request buffers\n", __func__);
1637 return ((struct rpcrdma_req *)NULL);
1640 req = buffers->rb_send_bufs[buffers->rb_send_index];
1641 if (buffers->rb_send_index < buffers->rb_recv_index) {
1642 dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
1644 buffers->rb_recv_index - buffers->rb_send_index);
1645 req->rl_reply = NULL;
1647 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1648 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1650 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1652 INIT_LIST_HEAD(&stale);
1653 switch (ia->ri_memreg_strategy) {
1655 req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1657 case RPCRDMA_MTHCAFMR:
1658 req = rpcrdma_buffer_get_fmrs(req, buffers);
1663 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1664 if (!list_empty(&stale))
1665 rpcrdma_retry_flushed_linv(&stale, buffers);
1670 * Put request/reply buffers back into pool.
1671 * Pre-decrement counter/array index.
1674 rpcrdma_buffer_put(struct rpcrdma_req *req)
1676 struct rpcrdma_buffer *buffers = req->rl_buffer;
1677 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1678 unsigned long flags;
1680 spin_lock_irqsave(&buffers->rb_lock, flags);
1681 rpcrdma_buffer_put_sendbuf(req, buffers);
1682 switch (ia->ri_memreg_strategy) {
1684 case RPCRDMA_MTHCAFMR:
1685 rpcrdma_buffer_put_mrs(req, buffers);
1690 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1694 * Recover reply buffers from pool.
1695 * This happens when recovering from error conditions.
1696 * Post-increment counter/array index.
1699 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1701 struct rpcrdma_buffer *buffers = req->rl_buffer;
1702 unsigned long flags;
1704 spin_lock_irqsave(&buffers->rb_lock, flags);
1705 if (buffers->rb_recv_index < buffers->rb_max_requests) {
1706 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1707 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1709 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1713 * Put reply buffers back into pool when not attached to
1714 * request. This happens in error conditions.
1717 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1719 struct rpcrdma_buffer *buffers = rep->rr_buffer;
1720 unsigned long flags;
1722 rep->rr_func = NULL;
1723 spin_lock_irqsave(&buffers->rb_lock, flags);
1724 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1725 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1729 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1733 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1734 struct ib_mr **mrp, struct ib_sge *iov)
1736 struct ib_phys_buf ipb;
1741 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1743 iov->addr = ib_dma_map_single(ia->ri_id->device,
1744 va, len, DMA_BIDIRECTIONAL);
1745 if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1750 if (ia->ri_have_dma_lkey) {
1752 iov->lkey = ia->ri_dma_lkey;
1754 } else if (ia->ri_bind_mem != NULL) {
1756 iov->lkey = ia->ri_bind_mem->lkey;
1760 ipb.addr = iov->addr;
1761 ipb.size = iov->length;
1762 mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1763 IB_ACCESS_LOCAL_WRITE, &iov->addr);
1765 dprintk("RPC: %s: phys convert: 0x%llx "
1766 "registered 0x%llx length %d\n",
1767 __func__, (unsigned long long)ipb.addr,
1768 (unsigned long long)iov->addr, len);
1773 dprintk("RPC: %s: failed with %i\n", __func__, rc);
1776 iov->lkey = mr->lkey;
1784 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1785 struct ib_mr *mr, struct ib_sge *iov)
1789 ib_dma_unmap_single(ia->ri_id->device,
1790 iov->addr, iov->length, DMA_BIDIRECTIONAL);
1795 rc = ib_dereg_mr(mr);
1797 dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
1802 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1803 * @ia: controlling rpcrdma_ia
1804 * @size: size of buffer to be allocated, in bytes
1807 * Returns pointer to private header of an area of internally
1808 * registered memory, or an ERR_PTR. The registered buffer follows
1809 * the end of the private header.
1811 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1812 * receiving the payload of RDMA RECV operations. regbufs are not
1813 * used for RDMA READ/WRITE operations, thus are registered only for local access.
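 *
 * Typical usage, as in rpcrdma_create_rep() (sketch):
 *
 *	rb = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize, GFP_KERNEL);
 *	if (IS_ERR(rb))
 *		return PTR_ERR(rb);
 *	...
 *	rpcrdma_free_regbuf(ia, rb);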
1816 struct rpcrdma_regbuf *
1817 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1819 struct rpcrdma_regbuf *rb;
1823 rb = kmalloc(sizeof(*rb) + size, flags);
1828 rb->rg_owner = NULL;
1829 rc = rpcrdma_register_internal(ia, rb->rg_base, size,
1830 &rb->rg_mr, &rb->rg_iov);
1843 * rpcrdma_free_regbuf - deregister and free registered buffer
1844 * @ia: controlling rpcrdma_ia
1845 * @rb: regbuf to be deregistered and freed
1848 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1851 rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
1857 * Wrappers for chunk registration, shared by read/write chunk code.
1861 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1863 seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1864 seg->mr_dmalen = seg->mr_len;
1866 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1867 seg->mr_page, offset_in_page(seg->mr_offset),
1868 seg->mr_dmalen, seg->mr_dir);
1870 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1872 seg->mr_dmalen, seg->mr_dir);
1873 if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1874 dprintk("RPC: %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1876 (unsigned long long)seg->mr_dma,
1877 seg->mr_offset, seg->mr_dmalen);
1882 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1885 ib_dma_unmap_page(ia->ri_id->device,
1886 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1888 ib_dma_unmap_single(ia->ri_id->device,
1889 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1893 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1894 int *nsegs, int writing, struct rpcrdma_ia *ia,
1895 struct rpcrdma_xprt *r_xprt)
1897 struct rpcrdma_mr_seg *seg1 = seg;
1898 struct rpcrdma_mw *mw = seg1->rl_mw;
1899 struct rpcrdma_frmr *frmr = &mw->r.frmr;
1900 struct ib_mr *mr = frmr->fr_mr;
1901 struct ib_send_wr fastreg_wr, *bad_wr;
1909 pageoff = offset_in_page(seg1->mr_offset);
1910 seg1->mr_offset -= pageoff; /* start of page */
1911 seg1->mr_len += pageoff;
1913 if (*nsegs > ia->ri_max_frmr_depth)
1914 *nsegs = ia->ri_max_frmr_depth;
1915 for (page_no = i = 0; i < *nsegs;) {
1916 rpcrdma_map_one(ia, seg, writing);
1918 for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1919 frmr->fr_pgl->page_list[page_no++] = pa;
1925 /* Check for holes */
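/* A hole is a segment that does not abut its neighbor on a page boundary;
 * a single FRMR presents one virtually contiguous range, so mapping stops
 * at the first hole.
 */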
1926 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1927 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1930 dprintk("RPC: %s: Using frmr %p to map %d segments\n",
1933 frmr->fr_state = FRMR_IS_VALID;
1935 memset(&fastreg_wr, 0, sizeof(fastreg_wr));
1936 fastreg_wr.wr_id = (unsigned long)(void *)mw;
1937 fastreg_wr.opcode = IB_WR_FAST_REG_MR;
1938 fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1939 fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
1940 fastreg_wr.wr.fast_reg.page_list_len = page_no;
1941 fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1942 fastreg_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1943 if (fastreg_wr.wr.fast_reg.length < len) {
1949 key = (u8)(mr->rkey & 0x000000FF);
1950 ib_update_fast_reg_key(mr, ++key);
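/* The low-order byte of the rkey is consumer-owned; bumping it for each
 * registration gives every mapping a distinct rkey, so any previously
 * advertised key for this MR becomes stale.
 */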
1952 fastreg_wr.wr.fast_reg.access_flags = (writing ?
1953 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1954 IB_ACCESS_REMOTE_READ);
1955 fastreg_wr.wr.fast_reg.rkey = mr->rkey;
1956 DECR_CQCOUNT(&r_xprt->rx_ep);
1958 rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
1960 dprintk("RPC: %s: failed ib_post_send for register,"
1961 " status %i\n", __func__, rc);
1962 ib_update_fast_reg_key(mr, --key);
1965 seg1->mr_rkey = mr->rkey;
1966 seg1->mr_base = seg1->mr_dma + pageoff;
1973 frmr->fr_state = FRMR_IS_INVALID;
1975 rpcrdma_unmap_one(ia, --seg);
1980 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1981 struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1983 struct rpcrdma_mr_seg *seg1 = seg;
1984 struct ib_send_wr invalidate_wr, *bad_wr;
1987 seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
1989 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1990 invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
1991 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1992 invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
1993 DECR_CQCOUNT(&r_xprt->rx_ep);
1995 read_lock(&ia->ri_qplock);
1996 while (seg1->mr_nsegs--)
1997 rpcrdma_unmap_one(ia, seg++);
1998 rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1999 read_unlock(&ia->ri_qplock);
2001 /* Force rpcrdma_buffer_get() to retry */
2002 seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
2003 dprintk("RPC: %s: failed ib_post_send for invalidate,"
2004 " status %i\n", __func__, rc);
2010 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
2011 int *nsegs, int writing, struct rpcrdma_ia *ia)
2013 struct rpcrdma_mr_seg *seg1 = seg;
2014 u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
2015 int len, pageoff, i, rc;
2017 pageoff = offset_in_page(seg1->mr_offset);
2018 seg1->mr_offset -= pageoff; /* start of page */
2019 seg1->mr_len += pageoff;
2021 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
2022 *nsegs = RPCRDMA_MAX_DATA_SEGS;
2023 for (i = 0; i < *nsegs;) {
2024 rpcrdma_map_one(ia, seg, writing);
2025 physaddrs[i] = seg->mr_dma;
2029 /* Check for holes */
2030 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
2031 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
2034 rc = ib_map_phys_fmr(seg1->rl_mw->r.fmr, physaddrs, i, seg1->mr_dma);
2036 dprintk("RPC: %s: failed ib_map_phys_fmr "
2037 "%u@0x%llx+%i (%d)... status %i\n", __func__,
2038 len, (unsigned long long)seg1->mr_dma,
2041 rpcrdma_unmap_one(ia, --seg);
2043 seg1->mr_rkey = seg1->rl_mw->r.fmr->rkey;
2044 seg1->mr_base = seg1->mr_dma + pageoff;
2053 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
2054 struct rpcrdma_ia *ia)
2056 struct rpcrdma_mr_seg *seg1 = seg;
2060 list_add(&seg1->rl_mw->r.fmr->list, &l);
2061 rc = ib_unmap_fmr(&l);
2062 read_lock(&ia->ri_qplock);
2063 while (seg1->mr_nsegs--)
2064 rpcrdma_unmap_one(ia, seg++);
2065 read_unlock(&ia->ri_qplock);
2067 dprintk("RPC: %s: failed ib_unmap_fmr,"
2068 " status %i\n", __func__, rc);
2073 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
2074 int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
2076 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
2079 switch (ia->ri_memreg_strategy) {
2081 case RPCRDMA_ALLPHYSICAL:
2082 rpcrdma_map_one(ia, seg, writing);
2083 seg->mr_rkey = ia->ri_bind_mem->rkey;
2084 seg->mr_base = seg->mr_dma;
2089 /* Registration using frmr registration */
2091 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
2094 /* Registration using fmr memory registration */
2095 case RPCRDMA_MTHCAFMR:
2096 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
2109 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
2110 struct rpcrdma_xprt *r_xprt)
2112 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
2113 int nsegs = seg->mr_nsegs, rc;
2115 switch (ia->ri_memreg_strategy) {
2117 case RPCRDMA_ALLPHYSICAL:
2118 read_lock(&ia->ri_qplock);
2119 rpcrdma_unmap_one(ia, seg);
2120 read_unlock(&ia->ri_qplock);
2124 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
2127 case RPCRDMA_MTHCAFMR:
2128 rc = rpcrdma_deregister_fmr_external(seg, ia);
2138 * Prepost any receive buffer, then post send.
2140 * Receive buffer is donated to hardware, reclaimed upon recv completion.
2143 rpcrdma_ep_post(struct rpcrdma_ia *ia,
2144 struct rpcrdma_ep *ep,
2145 struct rpcrdma_req *req)
2147 struct ib_send_wr send_wr, *send_wr_fail;
2148 struct rpcrdma_rep *rep = req->rl_reply;
2152 rc = rpcrdma_ep_post_recv(ia, ep, rep);
2155 req->rl_reply = NULL;
2158 send_wr.next = NULL;
2159 send_wr.wr_id = 0ULL; /* no send cookie */
2160 send_wr.sg_list = req->rl_send_iov;
2161 send_wr.num_sge = req->rl_niovs;
2162 send_wr.opcode = IB_WR_SEND;
2163 if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
2164 ib_dma_sync_single_for_device(ia->ri_id->device,
2165 req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
2167 ib_dma_sync_single_for_device(ia->ri_id->device,
2168 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
2170 ib_dma_sync_single_for_device(ia->ri_id->device,
2171 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
2174 if (DECR_CQCOUNT(ep) > 0)
2175 send_wr.send_flags = 0;
2176 else { /* Provider must take a send completion every now and then */
2178 send_wr.send_flags = IB_SEND_SIGNALED;
2181 rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
2183 dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
2190 * (Re)post a receive buffer.
2193 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
2194 struct rpcrdma_ep *ep,
2195 struct rpcrdma_rep *rep)
2197 struct ib_recv_wr recv_wr, *recv_wr_fail;
2200 recv_wr.next = NULL;
2201 recv_wr.wr_id = (u64) (unsigned long) rep;
2202 recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
2203 recv_wr.num_sge = 1;
2205 ib_dma_sync_single_for_cpu(ia->ri_id->device,
2206 rdmab_addr(rep->rr_rdmabuf),
2207 rdmab_length(rep->rr_rdmabuf),
2210 rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
2213 dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
2218 /* Physical mapping means one Read/Write list entry per-page.
2219 * All list entries must fit within an inline buffer
2221 * NB: The server must return a Write list for NFS READ,
2222 * which has the same constraint. Factor in the inline buffer size.
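 *
 * For example (illustrative figures): with 1024-byte inline buffers,
 * a 28-byte minimum RPC/RDMA header, and 16-byte chunk segments,
 * (1024 - 28) / 16 = 62 list entries fit, capping the physical-mode
 * payload at 62 pages (248KB with 4KB pages).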
2226 rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
2228 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
2229 unsigned int inline_size, pages;
2231 inline_size = min_t(unsigned int,
2232 cdata->inline_wsize, cdata->inline_rsize);
2233 inline_size -= RPCRDMA_HDRLEN_MIN;
2234 pages = inline_size / sizeof(struct rpcrdma_segment);
2235 return pages << PAGE_SHIFT;
2239 rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
2241 return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
2245 rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
2249 switch (r_xprt->rx_ia.ri_memreg_strategy) {
2250 case RPCRDMA_ALLPHYSICAL:
2251 result = rpcrdma_physical_max_payload(r_xprt);
2254 result = rpcrdma_mr_max_payload(r_xprt);