net/sunrpc/xprtrdma/verbs.c (karo-tx-linux.git, at commit "xprtrdma: Remove BUG_ON() call sites")
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <asm/bitops.h>
53
54 #include "xprt_rdma.h"
55
56 /*
57  * Globals/Macros
58  */
59
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY        RPCDBG_TRANS
62 #endif
63
64 /*
65  * internal functions
66  */
67
68 /*
69  * handle replies in tasklet context, using a single, global list
70  * rdma tasklet function -- just turn around and call the func
71  * for all replies on the list
72  */
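/*
 * Flow sketch (for orientation; no new mechanism): a receive completion
 * calls rpcrdma_schedule_tasklet(), which queues the rpcrdma_rep on
 * rpcrdma_tasklets_g and schedules rpcrdma_tasklet_g. rpcrdma_run_tasklet()
 * below then invokes rep->rr_func -- normally the transport's reply
 * handler (see rpc_rdma.c) -- or returns the buffer to the pool if no
 * handler is set.
 */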
73
74 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
75 static LIST_HEAD(rpcrdma_tasklets_g);
76
77 static void
78 rpcrdma_run_tasklet(unsigned long data)
79 {
80         struct rpcrdma_rep *rep;
81         void (*func)(struct rpcrdma_rep *);
82         unsigned long flags;
83
84         data = data;    /* tasklet argument is unused */
85         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
86         while (!list_empty(&rpcrdma_tasklets_g)) {
87                 rep = list_entry(rpcrdma_tasklets_g.next,
88                                  struct rpcrdma_rep, rr_list);
89                 list_del(&rep->rr_list);
90                 func = rep->rr_func;
91                 rep->rr_func = NULL;
92                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93
94                 if (func)
95                         func(rep);
96                 else
97                         rpcrdma_recv_buffer_put(rep);
98
99                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100         }
101         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102 }
103
104 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105
106 static inline void
107 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108 {
109         unsigned long flags;
110
111         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
112         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
113         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
114         tasklet_schedule(&rpcrdma_tasklet_g);
115 }
116
117 static void
118 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119 {
120         struct rpcrdma_ep *ep = context;
121
122         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
123                 __func__, event->event, event->device->name, context);
124         if (ep->rep_connected == 1) {
125                 ep->rep_connected = -EIO;
126                 ep->rep_func(ep);
127                 wake_up_all(&ep->rep_connect_wait);
128         }
129 }
130
131 static void
132 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133 {
134         struct rpcrdma_ep *ep = context;
135
136         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
137                 __func__, event->event, event->device->name, context);
138         if (ep->rep_connected == 1) {
139                 ep->rep_connected = -EIO;
140                 ep->rep_func(ep);
141                 wake_up_all(&ep->rep_connect_wait);
142         }
143 }
144
145 static void
146 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
147 {
148         struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
149
150         dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
151                 __func__, frmr, wc->status, wc->opcode);
152
153         if (wc->wr_id == 0ULL)
154                 return;
155         if (wc->status != IB_WC_SUCCESS)
156                 return;
157
158         if (wc->opcode == IB_WC_FAST_REG_MR)
159                 frmr->r.frmr.state = FRMR_IS_VALID;
160         else if (wc->opcode == IB_WC_LOCAL_INV)
161                 frmr->r.frmr.state = FRMR_IS_INVALID;
162 }
163
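/*
 * Polling is budgeted so a busy CQ cannot monopolize this context:
 * each upcall drains at most RPCRDMA_WC_BUDGET completions,
 * RPCRDMA_POLLSIZE at a time. Illustration only (see xprt_rdma.h for
 * the real constants): a budget of 1024 polled 16 at a time means at
 * most 64 ib_poll_cq() calls per upcall.
 */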
164 static int
165 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
166 {
167         struct ib_wc *wcs;
168         int budget, count, rc;
169
170         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
171         do {
172                 wcs = ep->rep_send_wcs;
173
174                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
175                 if (rc <= 0)
176                         return rc;
177
178                 count = rc;
179                 while (count-- > 0)
180                         rpcrdma_sendcq_process_wc(wcs++);
181         } while (rc == RPCRDMA_POLLSIZE && --budget);
182         return 0;
183 }
184
185 /*
186  * Handle send, fast_reg_mr, and local_inv completions.
187  *
188  * Send events are typically suppressed and thus do not result
189  * in an upcall. Occasionally one is signaled, however. This
190  * prevents the provider's completion queue from wrapping and
191  * losing a completion.
192  */
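/*
 * Re-arm note: ib_req_notify_cq(IB_CQ_REPORT_MISSED_EVENTS) returns a
 * positive value when completions arrived after the final poll, so the
 * handler below polls one more time rather than waiting for an upcall
 * that will never be delivered.
 */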
193 static void
194 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
195 {
196         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
197         int rc;
198
199         rc = rpcrdma_sendcq_poll(cq, ep);
200         if (rc) {
201                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
202                         __func__, rc);
203                 return;
204         }
205
206         rc = ib_req_notify_cq(cq,
207                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
208         if (rc == 0)
209                 return;
210         if (rc < 0) {
211                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
212                         __func__, rc);
213                 return;
214         }
215
216         rpcrdma_sendcq_poll(cq, ep);
217 }
218
219 static void
220 rpcrdma_recvcq_process_wc(struct ib_wc *wc)
221 {
222         struct rpcrdma_rep *rep =
223                         (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
224
225         dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
226                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
227
228         if (wc->status != IB_WC_SUCCESS) {
229                 rep->rr_len = ~0U;
230                 goto out_schedule;
231         }
232         if (wc->opcode != IB_WC_RECV)
233                 return;
234
235         rep->rr_len = wc->byte_len;
236         ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
237                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
238
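        /*
         * The RPC/RDMA header begins with four 32-bit XDR words
         * (rm_xid, rm_vers, rm_credit and the message type), so a
         * 16-byte receive is enough to read the server's credit grant.
         */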
239         if (rep->rr_len >= 16) {
240                 struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
241                 unsigned int credits = ntohl(p->rm_credit);
242
243                 if (credits == 0)
244                         credits = 1;    /* don't deadlock */
245                 else if (credits > rep->rr_buffer->rb_max_requests)
246                         credits = rep->rr_buffer->rb_max_requests;
247                 atomic_set(&rep->rr_buffer->rb_credits, credits);
248         }
249
250 out_schedule:
251         rpcrdma_schedule_tasklet(rep);
252 }
253
254 static int
255 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
256 {
257         struct ib_wc *wcs;
258         int budget, count, rc;
259
260         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
261         do {
262                 wcs = ep->rep_recv_wcs;
263
264                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
265                 if (rc <= 0)
266                         return rc;
267
268                 count = rc;
269                 while (count-- > 0)
270                         rpcrdma_recvcq_process_wc(wcs++);
271         } while (rc == RPCRDMA_POLLSIZE && --budget);
272         return 0;
273 }
274
275 /*
276  * Handle receive completions.
277  *
278  * It is reentrant but processes single events in order to maintain
279  * ordering of receives to keep server credits.
280  *
281  * It is the responsibility of the scheduled tasklet to return
282  * recv buffers to the pool. NOTE: this affects synchronization of
283  * connection shutdown. That is, the structures required for
284  * the completion of the reply handler must remain intact until
285  * all memory has been reclaimed.
286  */
287 static void
288 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
289 {
290         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
291         int rc;
292
293         rc = rpcrdma_recvcq_poll(cq, ep);
294         if (rc) {
295                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
296                         __func__, rc);
297                 return;
298         }
299
300         rc = ib_req_notify_cq(cq,
301                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
302         if (rc == 0)
303                 return;
304         if (rc < 0) {
305                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
306                         __func__, rc);
307                 return;
308         }
309
310         rpcrdma_recvcq_poll(cq, ep);
311 }
312
313 #ifdef RPC_DEBUG
314 static const char * const conn[] = {
315         "address resolved",
316         "address error",
317         "route resolved",
318         "route error",
319         "connect request",
320         "connect response",
321         "connect error",
322         "unreachable",
323         "rejected",
324         "established",
325         "disconnected",
326         "device removal"
327 };
328 #endif
329
330 static int
331 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
332 {
333         struct rpcrdma_xprt *xprt = id->context;
334         struct rpcrdma_ia *ia = &xprt->rx_ia;
335         struct rpcrdma_ep *ep = &xprt->rx_ep;
336 #ifdef RPC_DEBUG
337         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
338 #endif
339         struct ib_qp_attr attr;
340         struct ib_qp_init_attr iattr;
341         int connstate = 0;
342
343         switch (event->event) {
344         case RDMA_CM_EVENT_ADDR_RESOLVED:
345         case RDMA_CM_EVENT_ROUTE_RESOLVED:
346                 ia->ri_async_rc = 0;
347                 complete(&ia->ri_done);
348                 break;
349         case RDMA_CM_EVENT_ADDR_ERROR:
350                 ia->ri_async_rc = -EHOSTUNREACH;
351                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
352                         __func__, ep);
353                 complete(&ia->ri_done);
354                 break;
355         case RDMA_CM_EVENT_ROUTE_ERROR:
356                 ia->ri_async_rc = -ENETUNREACH;
357                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
358                         __func__, ep);
359                 complete(&ia->ri_done);
360                 break;
361         case RDMA_CM_EVENT_ESTABLISHED:
362                 connstate = 1;
363                 ib_query_qp(ia->ri_id->qp, &attr,
364                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
365                         &iattr);
366                 dprintk("RPC:       %s: %d responder resources"
367                         " (%d initiator)\n",
368                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
369                 goto connected;
370         case RDMA_CM_EVENT_CONNECT_ERROR:
371                 connstate = -ENOTCONN;
372                 goto connected;
373         case RDMA_CM_EVENT_UNREACHABLE:
374                 connstate = -ENETDOWN;
375                 goto connected;
376         case RDMA_CM_EVENT_REJECTED:
377                 connstate = -ECONNREFUSED;
378                 goto connected;
379         case RDMA_CM_EVENT_DISCONNECTED:
380                 connstate = -ECONNABORTED;
381                 goto connected;
382         case RDMA_CM_EVENT_DEVICE_REMOVAL:
383                 connstate = -ENODEV;
384 connected:
385                 dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
386                         __func__,
387                         (event->event <= 11) ? conn[event->event] :
388                                                 "unknown connection error",
389                         &addr->sin_addr.s_addr,
390                         ntohs(addr->sin_port),
391                         ep, event->event);
392                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
393                 dprintk("RPC:       %s: %sconnected\n",
394                                         __func__, connstate > 0 ? "" : "dis");
395                 ep->rep_connected = connstate;
396                 ep->rep_func(ep);
397                 wake_up_all(&ep->rep_connect_wait);
398                 break;
399         default:
400                 dprintk("RPC:       %s: unexpected CM event %d\n",
401                         __func__, event->event);
402                 break;
403         }
404
405 #ifdef RPC_DEBUG
406         if (connstate == 1) {
407                 int ird = attr.max_dest_rd_atomic;
408                 int tird = ep->rep_remote_cma.responder_resources;
409                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
410                         "on %s, memreg %d slots %d ird %d%s\n",
411                         &addr->sin_addr.s_addr,
412                         ntohs(addr->sin_port),
413                         ia->ri_id->device->name,
414                         ia->ri_memreg_strategy,
415                         xprt->rx_buf.rb_max_requests,
416                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
417         } else if (connstate < 0) {
418                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
419                         &addr->sin_addr.s_addr,
420                         ntohs(addr->sin_port),
421                         connstate);
422         }
423 #endif
424
425         return 0;
426 }
427
428 static struct rdma_cm_id *
429 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
430                         struct rpcrdma_ia *ia, struct sockaddr *addr)
431 {
432         struct rdma_cm_id *id;
433         int rc;
434
435         init_completion(&ia->ri_done);
436
437         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
438         if (IS_ERR(id)) {
439                 rc = PTR_ERR(id);
440                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
441                         __func__, rc);
442                 return id;
443         }
444
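        /*
         * Address and route resolution are asynchronous: the CM upcall
         * records the outcome in ri_async_rc and completes ri_done.
         * ri_async_rc is primed with -ETIMEDOUT so a missed or late
         * upcall is reported as a timeout.
         */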
445         ia->ri_async_rc = -ETIMEDOUT;
446         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
447         if (rc) {
448                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
449                         __func__, rc);
450                 goto out;
451         }
452         wait_for_completion_interruptible_timeout(&ia->ri_done,
453                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
454         rc = ia->ri_async_rc;
455         if (rc)
456                 goto out;
457
458         ia->ri_async_rc = -ETIMEDOUT;
459         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
460         if (rc) {
461                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
462                         __func__, rc);
463                 goto out;
464         }
465         wait_for_completion_interruptible_timeout(&ia->ri_done,
466                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
467         rc = ia->ri_async_rc;
468         if (rc)
469                 goto out;
470
471         return id;
472
473 out:
474         rdma_destroy_id(id);
475         return ERR_PTR(rc);
476 }
477
478 /*
479  * Drain any cq, prior to teardown.
480  */
481 static void
482 rpcrdma_clean_cq(struct ib_cq *cq)
483 {
484         struct ib_wc wc;
485         int count = 0;
486
487         while (1 == ib_poll_cq(cq, 1, &wc))
488                 ++count;
489
490         if (count)
491                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
492                         __func__, count, wc.opcode);
493 }
494
495 /*
496  * Exported functions.
497  */
498
499 /*
500  * Open and initialize an Interface Adapter.
501  *  o initializes fields of struct rpcrdma_ia, including
502  *    interface and provider attributes and protection zone.
503  */
504 int
505 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
506 {
507         int rc, mem_priv;
508         struct ib_device_attr devattr;
509         struct rpcrdma_ia *ia = &xprt->rx_ia;
510
511         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
512         if (IS_ERR(ia->ri_id)) {
513                 rc = PTR_ERR(ia->ri_id);
514                 goto out1;
515         }
516
517         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
518         if (IS_ERR(ia->ri_pd)) {
519                 rc = PTR_ERR(ia->ri_pd);
520                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
521                         __func__, rc);
522                 goto out2;
523         }
524
525         /*
526          * Query the device to determine if the requested memory
527          * registration strategy is supported. If it isn't, set the
528          * strategy to a globally supported model.
529          */
530         rc = ib_query_device(ia->ri_id->device, &devattr);
531         if (rc) {
532                 dprintk("RPC:       %s: ib_query_device failed %d\n",
533                         __func__, rc);
534                 goto out2;
535         }
536
537         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
538                 ia->ri_have_dma_lkey = 1;
539                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
540         }
541
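        /*
         * Strategy fallback, in order: FRMR is preferred; without both
         * FRMR support and a local DMA lkey, FMR (MTHCAFMR) is tried;
         * if the device has no alloc_fmr verb, all-physical registration
         * is used when RPCRDMA_PERSISTENT_REGISTRATION is built in,
         * otherwise the open fails.
         */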
542         if (memreg == RPCRDMA_FRMR) {
543                 /* Requires both frmr reg and local dma lkey */
544                 if ((devattr.device_cap_flags &
545                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
546                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
547                         dprintk("RPC:       %s: FRMR registration "
548                                 "not supported by HCA\n", __func__);
549                         memreg = RPCRDMA_MTHCAFMR;
550                 } else {
551                         /* Mind the ia limit on FRMR page list depth */
552                         ia->ri_max_frmr_depth = min_t(unsigned int,
553                                 RPCRDMA_MAX_DATA_SEGS,
554                                 devattr.max_fast_reg_page_list_len);
555                 }
556         }
557         if (memreg == RPCRDMA_MTHCAFMR) {
558                 if (!ia->ri_id->device->alloc_fmr) {
559                         dprintk("RPC:       %s: MTHCAFMR registration "
560                                 "not supported by HCA\n", __func__);
561 #if RPCRDMA_PERSISTENT_REGISTRATION
562                         memreg = RPCRDMA_ALLPHYSICAL;
563 #else
564                         rc = -ENOMEM;
565                         goto out2;
566 #endif
567                 }
568         }
569
570         /*
571          * Optionally obtain an underlying physical identity mapping in
572          * order to do a memory window-based bind. This base registration
573          * is protected from remote access - that is enabled only by binding
574          * for the specific bytes targeted during each RPC operation, and
575          * revoked after the corresponding completion similar to a storage
576          * adapter.
577          */
578         switch (memreg) {
579         case RPCRDMA_FRMR:
580                 break;
581 #if RPCRDMA_PERSISTENT_REGISTRATION
582         case RPCRDMA_ALLPHYSICAL:
583                 mem_priv = IB_ACCESS_LOCAL_WRITE |
584                                 IB_ACCESS_REMOTE_WRITE |
585                                 IB_ACCESS_REMOTE_READ;
586                 goto register_setup;
587 #endif
588         case RPCRDMA_MTHCAFMR:
589                 if (ia->ri_have_dma_lkey)
590                         break;
591                 mem_priv = IB_ACCESS_LOCAL_WRITE;
592 #if RPCRDMA_PERSISTENT_REGISTRATION
593         register_setup:
594 #endif
595                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
596                 if (IS_ERR(ia->ri_bind_mem)) {
597                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
598                                 "phys register failed with %lX\n",
599                                 __func__, PTR_ERR(ia->ri_bind_mem));
600                         rc = -ENOMEM;
601                         goto out2;
602                 }
603                 break;
604         default:
605                 printk(KERN_ERR "RPC: Unsupported memory "
606                                 "registration mode: %d\n", memreg);
607                 rc = -ENOMEM;
608                 goto out2;
609         }
610         dprintk("RPC:       %s: memory registration strategy is %d\n",
611                 __func__, memreg);
612
613         /* Else will do memory reg/dereg for each chunk */
614         ia->ri_memreg_strategy = memreg;
615
616         return 0;
617 out2:
618         rdma_destroy_id(ia->ri_id);
619         ia->ri_id = NULL;
620 out1:
621         return rc;
622 }
623
624 /*
625  * Clean up/close an IA.
626  *   o if event handles and PD have been initialized, free them.
627  *   o close the IA
628  */
629 void
630 rpcrdma_ia_close(struct rpcrdma_ia *ia)
631 {
632         int rc;
633
634         dprintk("RPC:       %s: entering\n", __func__);
635         if (ia->ri_bind_mem != NULL) {
636                 rc = ib_dereg_mr(ia->ri_bind_mem);
637                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
638                         __func__, rc);
639         }
640         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
641                 if (ia->ri_id->qp)
642                         rdma_destroy_qp(ia->ri_id);
643                 rdma_destroy_id(ia->ri_id);
644                 ia->ri_id = NULL;
645         }
646         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
647                 rc = ib_dealloc_pd(ia->ri_pd);
648                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
649                         __func__, rc);
650         }
651 }
652
653 /*
654  * Create unconnected endpoint.
655  */
656 int
657 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
658                                 struct rpcrdma_create_data_internal *cdata)
659 {
660         struct ib_device_attr devattr;
661         struct ib_cq *sendcq, *recvcq;
662         int rc, err;
663
664         rc = ib_query_device(ia->ri_id->device, &devattr);
665         if (rc) {
666                 dprintk("RPC:       %s: ib_query_device failed %d\n",
667                         __func__, rc);
668                 return rc;
669         }
670
671         /* check provider's send/recv wr limits */
672         if (cdata->max_requests > devattr.max_qp_wr)
673                 cdata->max_requests = devattr.max_qp_wr;
674
675         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
676         ep->rep_attr.qp_context = ep;
677         /* send_cq and recv_cq initialized below */
678         ep->rep_attr.srq = NULL;
679         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
680         switch (ia->ri_memreg_strategy) {
681         case RPCRDMA_FRMR: {
682                 int depth = 7;
683
684                 /* Add room for frmr register and invalidate WRs.
685                  * 1. FRMR reg WR for head
686                  * 2. FRMR invalidate WR for head
687                  * 3. N FRMR reg WRs for pagelist
688                  * 4. N FRMR invalidate WRs for pagelist
689                  * 5. FRMR reg WR for tail
690                  * 6. FRMR invalidate WR for tail
691                  * 7. The RDMA_SEND WR
692                  */
693
694                 /* Calculate N if the device max FRMR depth is smaller than
695                  * RPCRDMA_MAX_DATA_SEGS.
696                  */
697                 if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
698                         int delta = RPCRDMA_MAX_DATA_SEGS -
699                                     ia->ri_max_frmr_depth;
700
701                         do {
702                                 depth += 2; /* FRMR reg + invalidate */
703                                 delta -= ia->ri_max_frmr_depth;
704                         } while (delta > 0);
705
706                 }
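                /* Illustration only (constants vary by build and device):
                 * with RPCRDMA_MAX_DATA_SEGS == 64 and ri_max_frmr_depth
                 * == 16, delta starts at 48 and the loop runs three times,
                 * giving depth == 7 + 3 * 2 == 13 WRs per RPC.
                 */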
707                 ep->rep_attr.cap.max_send_wr *= depth;
708                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
709                         cdata->max_requests = devattr.max_qp_wr / depth;
710                         if (!cdata->max_requests)
711                                 return -EINVAL;
712                         ep->rep_attr.cap.max_send_wr = cdata->max_requests *
713                                                        depth;
714                 }
715                 break;
716         }
717         default:
718                 break;
719         }
720         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
721         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
722         ep->rep_attr.cap.max_recv_sge = 1;
723         ep->rep_attr.cap.max_inline_data = 0;
724         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
725         ep->rep_attr.qp_type = IB_QPT_RC;
726         ep->rep_attr.port_num = ~0;
727
728         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
729                 "iovs: send %d recv %d\n",
730                 __func__,
731                 ep->rep_attr.cap.max_send_wr,
732                 ep->rep_attr.cap.max_recv_wr,
733                 ep->rep_attr.cap.max_send_sge,
734                 ep->rep_attr.cap.max_recv_sge);
735
736         /* set trigger for requesting send completion */
737         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
738         if (ep->rep_cqinit <= 2)
739                 ep->rep_cqinit = 0;
740         INIT_CQCOUNT(ep);
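        /* Example (numbers are illustrative): with 64 send WRs, rep_cqinit
         * is 31, so roughly one send in every 32 is posted signaled; the
         * CQCOUNT macros in xprt_rdma.h do the actual counting.
         */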
741         ep->rep_ia = ia;
742         init_waitqueue_head(&ep->rep_connect_wait);
743         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
744
745         sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
746                                   rpcrdma_cq_async_error_upcall, ep,
747                                   ep->rep_attr.cap.max_send_wr + 1, 0);
748         if (IS_ERR(sendcq)) {
749                 rc = PTR_ERR(sendcq);
750                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
751                         __func__, rc);
752                 goto out1;
753         }
754
755         rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
756         if (rc) {
757                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
758                         __func__, rc);
759                 goto out2;
760         }
761
762         recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
763                                   rpcrdma_cq_async_error_upcall, ep,
764                                   ep->rep_attr.cap.max_recv_wr + 1, 0);
765         if (IS_ERR(recvcq)) {
766                 rc = PTR_ERR(recvcq);
767                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
768                         __func__, rc);
769                 goto out2;
770         }
771
772         rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
773         if (rc) {
774                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
775                         __func__, rc);
776                 ib_destroy_cq(recvcq);
777                 goto out2;
778         }
779
780         ep->rep_attr.send_cq = sendcq;
781         ep->rep_attr.recv_cq = recvcq;
782
783         /* Initialize cma parameters */
784
785         /* RPC/RDMA does not use private data */
786         ep->rep_remote_cma.private_data = NULL;
787         ep->rep_remote_cma.private_data_len = 0;
788
789         /* Client offers RDMA Read but does not initiate */
790         ep->rep_remote_cma.initiator_depth = 0;
791         if (devattr.max_qp_rd_atom > 32)        /* arbitrary but <= 255 */
792                 ep->rep_remote_cma.responder_resources = 32;
793         else
794                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
795
796         ep->rep_remote_cma.retry_count = 7;
797         ep->rep_remote_cma.flow_control = 0;
798         ep->rep_remote_cma.rnr_retry_count = 0;
799
800         return 0;
801
802 out2:
803         err = ib_destroy_cq(sendcq);
804         if (err)
805                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
806                         __func__, err);
807 out1:
808         return rc;
809 }
810
811 /*
812  * rpcrdma_ep_destroy
813  *
814  * Disconnect and destroy endpoint. After this, the only
815  * valid operations on the ep are to free it (if dynamically
816  * allocated) or re-create it.
817  */
818 void
819 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
820 {
821         int rc;
822
823         dprintk("RPC:       %s: entering, connected is %d\n",
824                 __func__, ep->rep_connected);
825
826         cancel_delayed_work_sync(&ep->rep_connect_worker);
827
828         if (ia->ri_id->qp) {
829                 rc = rpcrdma_ep_disconnect(ep, ia);
830                 if (rc)
831                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
832                                 " returned %i\n", __func__, rc);
833                 rdma_destroy_qp(ia->ri_id);
834                 ia->ri_id->qp = NULL;
835         }
836
837         /* padding - could be done in rpcrdma_buffer_destroy... */
838         if (ep->rep_pad_mr) {
839                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
840                 ep->rep_pad_mr = NULL;
841         }
842
843         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
844         rc = ib_destroy_cq(ep->rep_attr.recv_cq);
845         if (rc)
846                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
847                         __func__, rc);
848
849         rpcrdma_clean_cq(ep->rep_attr.send_cq);
850         rc = ib_destroy_cq(ep->rep_attr.send_cq);
851         if (rc)
852                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
853                         __func__, rc);
854 }
855
856 /*
857  * Connect unconnected endpoint.
858  */
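/*
 * ep->rep_connected is the shared state word: 0 while a connect is in
 * progress, 1 once RDMA_CM_EVENT_ESTABLISHED arrives, and a negative
 * errno after an error or disconnect (set here and by the CM and async
 * upcalls above).
 */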
859 int
860 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
861 {
862         struct rdma_cm_id *id;
863         int rc = 0;
864         int retry_count = 0;
865
866         if (ep->rep_connected != 0) {
867                 struct rpcrdma_xprt *xprt;
868 retry:
869                 dprintk("RPC:       %s: reconnecting...\n", __func__);
870                 rc = rpcrdma_ep_disconnect(ep, ia);
871                 if (rc && rc != -ENOTCONN)
872                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
873                                 " status %i\n", __func__, rc);
874
875                 rpcrdma_clean_cq(ep->rep_attr.recv_cq);
876                 rpcrdma_clean_cq(ep->rep_attr.send_cq);
877
878                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
879                 id = rpcrdma_create_id(xprt, ia,
880                                 (struct sockaddr *)&xprt->rx_data.addr);
881                 if (IS_ERR(id)) {
882                         rc = -EHOSTUNREACH;
883                         goto out;
884                 }
885                 /* TEMP TEMP TEMP - fail if new device:
886                  * Deregister/remarshal *all* requests!
887                  * Close and recreate adapter, pd, etc!
888                  * Re-determine all attributes still sane!
889                  * More stuff I haven't thought of!
890                  * Rrrgh!
891                  */
892                 if (ia->ri_id->device != id->device) {
893                         printk("RPC:       %s: can't reconnect on "
894                                 "different device!\n", __func__);
895                         rdma_destroy_id(id);
896                         rc = -ENETUNREACH;
897                         goto out;
898                 }
899                 /* END TEMP */
900                 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
901                 if (rc) {
902                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
903                                 __func__, rc);
904                         rdma_destroy_id(id);
905                         rc = -ENETUNREACH;
906                         goto out;
907                 }
908                 rdma_destroy_qp(ia->ri_id);
909                 rdma_destroy_id(ia->ri_id);
910                 ia->ri_id = id;
911         } else {
912                 dprintk("RPC:       %s: connecting...\n", __func__);
913                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
914                 if (rc) {
915                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
916                                 __func__, rc);
917                         /* do not update ep->rep_connected */
918                         return -ENETUNREACH;
919                 }
920         }
921
922         ep->rep_connected = 0;
923
924         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
925         if (rc) {
926                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
927                                 __func__, rc);
928                 goto out;
929         }
930
931         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
932
933         /*
934          * Check state. A non-peer reject indicates no listener
935          * (ECONNREFUSED), which may be a transient state. All
936          * others indicate a transport condition that the connection
937          * manager has already handled on a best-effort basis.
938          */
939         if (ep->rep_connected == -ECONNREFUSED &&
940             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
941                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
942                 goto retry;
943         }
944         if (ep->rep_connected <= 0) {
945                 /* Sometimes, the only way to reliably connect to remote
946                  * CMs is to use the same nonzero values for ORD and IRD. */
947                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
948                     (ep->rep_remote_cma.responder_resources == 0 ||
949                      ep->rep_remote_cma.initiator_depth !=
950                                 ep->rep_remote_cma.responder_resources)) {
951                         if (ep->rep_remote_cma.responder_resources == 0)
952                                 ep->rep_remote_cma.responder_resources = 1;
953                         ep->rep_remote_cma.initiator_depth =
954                                 ep->rep_remote_cma.responder_resources;
955                         goto retry;
956                 }
957                 rc = ep->rep_connected;
958         } else {
959                 dprintk("RPC:       %s: connected\n", __func__);
960         }
961
962 out:
963         if (rc)
964                 ep->rep_connected = rc;
965         return rc;
966 }
967
968 /*
969  * rpcrdma_ep_disconnect
970  *
971  * This is separate from destroy to facilitate the ability
972  * to reconnect without recreating the endpoint.
973  *
974  * This call is not reentrant, and must not be made in parallel
975  * on the same endpoint.
976  */
977 int
978 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
979 {
980         int rc;
981
982         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
983         rpcrdma_clean_cq(ep->rep_attr.send_cq);
984         rc = rdma_disconnect(ia->ri_id);
985         if (!rc) {
986                 /* returns without wait if not connected */
987                 wait_event_interruptible(ep->rep_connect_wait,
988                                                         ep->rep_connected != 1);
989                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
990                         (ep->rep_connected == 1) ? "still " : "dis");
991         } else {
992                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
993                 ep->rep_connected = rc;
994         }
995         return rc;
996 }
997
998 /*
999  * Initialize buffer memory
1000  */
1001 int
1002 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
1003         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
1004 {
1005         char *p;
1006         size_t len, rlen, wlen;
1007         int i, rc;
1008         struct rpcrdma_mw *r;
1009
1010         buf->rb_max_requests = cdata->max_requests;
1011         spin_lock_init(&buf->rb_lock);
1012         atomic_set(&buf->rb_credits, 1);
1013
1014         /* Need to allocate:
1015          *   1.  arrays for send and recv pointers
1016          *   2.  arrays of struct rpcrdma_req to fill in pointers
1017          *   3.  array of struct rpcrdma_rep for replies
1018          *   4.  padding, if any
1019          *   5.  mw's, fmr's or frmr's, if any
1020          * Send/recv buffers in req/rep need to be registered
1021          */
1022
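        /*
         * The single allocation below is carved up in order: the
         * rb_send_bufs and rb_recv_bufs pointer arrays, the optional
         * zeroed pad buffer, then the rpcrdma_mw array; "p" advances
         * past each region as it is handed out.
         */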
1023         len = buf->rb_max_requests *
1024                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1025         len += cdata->padding;
1026         switch (ia->ri_memreg_strategy) {
1027         case RPCRDMA_FRMR:
1028                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
1029                                 sizeof(struct rpcrdma_mw);
1030                 break;
1031         case RPCRDMA_MTHCAFMR:
1032                 /* TBD we are perhaps overallocating here */
1033                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1034                                 sizeof(struct rpcrdma_mw);
1035                 break;
1036         default:
1037                 break;
1038         }
1039
1040         /* allocate 1, 4 and 5 in one shot */
1041         p = kzalloc(len, GFP_KERNEL);
1042         if (p == NULL) {
1043                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1044                         __func__, len);
1045                 rc = -ENOMEM;
1046                 goto out;
1047         }
1048         buf->rb_pool = p;       /* for freeing it later */
1049
1050         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1051         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1052         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1053         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1054
1055         /*
1056          * Register the zeroed pad buffer, if any.
1057          */
1058         if (cdata->padding) {
1059                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1060                                             &ep->rep_pad_mr, &ep->rep_pad);
1061                 if (rc)
1062                         goto out;
1063         }
1064         p += cdata->padding;
1065
1066         INIT_LIST_HEAD(&buf->rb_mws);
1067         r = (struct rpcrdma_mw *)p;
1068         switch (ia->ri_memreg_strategy) {
1069         case RPCRDMA_FRMR:
1070                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1071                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1072                                                 ia->ri_max_frmr_depth);
1073                         if (IS_ERR(r->r.frmr.fr_mr)) {
1074                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1075                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1076                                         " failed %i\n", __func__, rc);
1077                                 goto out;
1078                         }
1079                         r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1080                                                 ia->ri_id->device,
1081                                                 ia->ri_max_frmr_depth);
1082                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1083                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1084                                 dprintk("RPC:       %s: "
1085                                         "ib_alloc_fast_reg_page_list "
1086                                         "failed %i\n", __func__, rc);
1087
1088                                 ib_dereg_mr(r->r.frmr.fr_mr);
1089                                 goto out;
1090                         }
1091                         list_add(&r->mw_list, &buf->rb_mws);
1092                         ++r;
1093                 }
1094                 break;
1095         case RPCRDMA_MTHCAFMR:
1096                 /* TBD we are perhaps overallocating here */
1097                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1098                         static struct ib_fmr_attr fa =
1099                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1100                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1101                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1102                                 &fa);
1103                         if (IS_ERR(r->r.fmr)) {
1104                                 rc = PTR_ERR(r->r.fmr);
1105                                 dprintk("RPC:       %s: ib_alloc_fmr"
1106                                         " failed %i\n", __func__, rc);
1107                                 goto out;
1108                         }
1109                         list_add(&r->mw_list, &buf->rb_mws);
1110                         ++r;
1111                 }
1112                 break;
1113         default:
1114                 break;
1115         }
1116
1117         /*
1118          * Allocate/init the request/reply buffers. Doing this
1119          * using kmalloc for now -- one for each buf.
1120          */
1121         wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
1122         rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
1123         dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
1124                 __func__, wlen, rlen);
1125
1126         for (i = 0; i < buf->rb_max_requests; i++) {
1127                 struct rpcrdma_req *req;
1128                 struct rpcrdma_rep *rep;
1129
1130                 req = kmalloc(wlen, GFP_KERNEL);
1131                 if (req == NULL) {
1132                         dprintk("RPC:       %s: request buffer %d alloc"
1133                                 " failed\n", __func__, i);
1134                         rc = -ENOMEM;
1135                         goto out;
1136                 }
1137                 memset(req, 0, sizeof(struct rpcrdma_req));
1138                 buf->rb_send_bufs[i] = req;
1139                 buf->rb_send_bufs[i]->rl_buffer = buf;
1140
1141                 rc = rpcrdma_register_internal(ia, req->rl_base,
1142                                 wlen - offsetof(struct rpcrdma_req, rl_base),
1143                                 &buf->rb_send_bufs[i]->rl_handle,
1144                                 &buf->rb_send_bufs[i]->rl_iov);
1145                 if (rc)
1146                         goto out;
1147
1148                 buf->rb_send_bufs[i]->rl_size = wlen -
1149                                                 sizeof(struct rpcrdma_req);
1150
1151                 rep = kmalloc(rlen, GFP_KERNEL);
1152                 if (rep == NULL) {
1153                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1154                                 __func__, i);
1155                         rc = -ENOMEM;
1156                         goto out;
1157                 }
1158                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1159                 buf->rb_recv_bufs[i] = rep;
1160                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1161
1162                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1163                                 rlen - offsetof(struct rpcrdma_rep, rr_base),
1164                                 &buf->rb_recv_bufs[i]->rr_handle,
1165                                 &buf->rb_recv_bufs[i]->rr_iov);
1166                 if (rc)
1167                         goto out;
1168
1169         }
1170         dprintk("RPC:       %s: max_requests %d\n",
1171                 __func__, buf->rb_max_requests);
1172         /* done */
1173         return 0;
1174 out:
1175         rpcrdma_buffer_destroy(buf);
1176         return rc;
1177 }
1178
1179 /*
1180  * Unregister and destroy buffer memory. Need to deal with
1181  * partial initialization, so it's callable from failed create.
1182  * Must be called before destroying endpoint, as registrations
1183  * reference it.
1184  */
1185 void
1186 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1187 {
1188         int rc, i;
1189         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1190         struct rpcrdma_mw *r;
1191
1192         /* clean up in reverse order from create
1193          *   1.  recv mr memory (mr free, then kfree)
1194          *   2.  send mr memory (mr free, then kfree)
1195          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1196          *   4.  arrays
1197          */
1198         dprintk("RPC:       %s: entering\n", __func__);
1199
1200         for (i = 0; i < buf->rb_max_requests; i++) {
1201                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1202                         rpcrdma_deregister_internal(ia,
1203                                         buf->rb_recv_bufs[i]->rr_handle,
1204                                         &buf->rb_recv_bufs[i]->rr_iov);
1205                         kfree(buf->rb_recv_bufs[i]);
1206                 }
1207                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1208                         rpcrdma_deregister_internal(ia,
1209                                         buf->rb_send_bufs[i]->rl_handle,
1210                                         &buf->rb_send_bufs[i]->rl_iov);
1211                         kfree(buf->rb_send_bufs[i]);
1212                 }
1213         }
1214
1215         while (!list_empty(&buf->rb_mws)) {
1216                 r = list_entry(buf->rb_mws.next,
1217                         struct rpcrdma_mw, mw_list);
1218                 list_del(&r->mw_list);
1219                 switch (ia->ri_memreg_strategy) {
1220                 case RPCRDMA_FRMR:
1221                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1222                         if (rc)
1223                                 dprintk("RPC:       %s:"
1224                                         " ib_dereg_mr"
1225                                         " failed %i\n",
1226                                         __func__, rc);
1227                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1228                         break;
1229                 case RPCRDMA_MTHCAFMR:
1230                         rc = ib_dealloc_fmr(r->r.fmr);
1231                         if (rc)
1232                                 dprintk("RPC:       %s:"
1233                                         " ib_dealloc_fmr"
1234                                         " failed %i\n",
1235                                         __func__, rc);
1236                         break;
1237                 default:
1238                         break;
1239                 }
1240         }
1241
1242         kfree(buf->rb_pool);
1243 }
1244
1245 /*
1246  * Get a set of request/reply buffers.
1247  *
1248  * Reply buffer (if needed) is attached to send buffer upon return.
1249  * Rule:
1250  *    rb_send_index and rb_recv_index MUST always be pointing to the
1251  *    *next* available buffer (non-NULL). They are incremented after
1252  *    removing buffers, and decremented *before* returning them.
1253  */
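/*
 * Worked example of the rule above: after three rpcrdma_buffer_get()
 * calls that each took a reply, rb_send_index == rb_recv_index == 3 and
 * slots 0-2 are NULL; each rpcrdma_buffer_put() pre-decrements the
 * indexes and restores the pointers, so the invariant holds under
 * rb_lock.
 */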
1254 struct rpcrdma_req *
1255 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1256 {
1257         struct rpcrdma_req *req;
1258         unsigned long flags;
1259         int i;
1260         struct rpcrdma_mw *r;
1261
1262         spin_lock_irqsave(&buffers->rb_lock, flags);
1263         if (buffers->rb_send_index == buffers->rb_max_requests) {
1264                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1265                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1266                 return ((struct rpcrdma_req *)NULL);
1267         }
1268
1269         req = buffers->rb_send_bufs[buffers->rb_send_index];
1270         if (buffers->rb_send_index < buffers->rb_recv_index) {
1271                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1272                         __func__,
1273                         buffers->rb_recv_index - buffers->rb_send_index);
1274                 req->rl_reply = NULL;
1275         } else {
1276                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1277                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1278         }
1279         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1280         if (!list_empty(&buffers->rb_mws)) {
1281                 i = RPCRDMA_MAX_SEGS - 1;
1282                 do {
1283                         r = list_entry(buffers->rb_mws.next,
1284                                         struct rpcrdma_mw, mw_list);
1285                         list_del(&r->mw_list);
1286                         req->rl_segments[i].mr_chunk.rl_mw = r;
1287                 } while (--i >= 0);
1288         }
1289         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1290         return req;
1291 }
1292
1293 /*
1294  * Put request/reply buffers back into pool.
1295  * Pre-decrement counter/array index.
1296  */
1297 void
1298 rpcrdma_buffer_put(struct rpcrdma_req *req)
1299 {
1300         struct rpcrdma_buffer *buffers = req->rl_buffer;
1301         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1302         int i;
1303         unsigned long flags;
1304
1305         spin_lock_irqsave(&buffers->rb_lock, flags);
1306         buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1307         req->rl_niovs = 0;
1308         if (req->rl_reply) {
1309                 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1310                 req->rl_reply->rr_func = NULL;
1311                 req->rl_reply = NULL;
1312         }
1313         switch (ia->ri_memreg_strategy) {
1314         case RPCRDMA_FRMR:
1315         case RPCRDMA_MTHCAFMR:
1316                 /*
1317                  * Cycle mw's back in reverse order, and "spin" them.
1318                  * This delays and scrambles reuse as much as possible.
1319                  */
1320                 i = 1;
1321                 do {
1322                         struct rpcrdma_mw **mw;
1323                         mw = &req->rl_segments[i].mr_chunk.rl_mw;
1324                         list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1325                         *mw = NULL;
1326                 } while (++i < RPCRDMA_MAX_SEGS);
1327                 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1328                                         &buffers->rb_mws);
1329                 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1330                 break;
1331         default:
1332                 break;
1333         }
1334         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1335 }
1336
1337 /*
1338  * Recover reply buffers from pool.
1339  * This happens when recovering from error conditions.
1340  * Post-increment counter/array index.
1341  */
1342 void
1343 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1344 {
1345         struct rpcrdma_buffer *buffers = req->rl_buffer;
1346         unsigned long flags;
1347
1348         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1349                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1350         spin_lock_irqsave(&buffers->rb_lock, flags);
1351         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1352                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1353                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1354         }
1355         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1356 }
1357
1358 /*
1359  * Put reply buffers back into pool when not attached to
1360  * request. This happens in error conditions.
1361  */
1362 void
1363 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1364 {
1365         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1366         unsigned long flags;
1367
1368         rep->rr_func = NULL;
1369         spin_lock_irqsave(&buffers->rb_lock, flags);
1370         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1371         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1372 }
1373
1374 /*
1375  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1376  */
1377
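/*
 * Three cases, in order of preference: the device's global DMA lkey,
 * the lkey of the ib_get_dma_mr() registration set up at IA open, and
 * as a last resort a one-off ib_reg_phys_mr() registration (kmalloc
 * memory is physically contiguous, so one phys buf entry is enough).
 */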
1378 int
1379 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1380                                 struct ib_mr **mrp, struct ib_sge *iov)
1381 {
1382         struct ib_phys_buf ipb;
1383         struct ib_mr *mr;
1384         int rc;
1385
1386         /*
1387          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1388          */
1389         iov->addr = ib_dma_map_single(ia->ri_id->device,
1390                         va, len, DMA_BIDIRECTIONAL);
1391         iov->length = len;
1392
1393         if (ia->ri_have_dma_lkey) {
1394                 *mrp = NULL;
1395                 iov->lkey = ia->ri_dma_lkey;
1396                 return 0;
1397         } else if (ia->ri_bind_mem != NULL) {
1398                 *mrp = NULL;
1399                 iov->lkey = ia->ri_bind_mem->lkey;
1400                 return 0;
1401         }
1402
1403         ipb.addr = iov->addr;
1404         ipb.size = iov->length;
1405         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1406                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1407
1408         dprintk("RPC:       %s: phys convert: 0x%llx "
1409                         "registered 0x%llx length %d\n",
1410                         __func__, (unsigned long long)ipb.addr,
1411                         (unsigned long long)iov->addr, len);
1412
1413         if (IS_ERR(mr)) {
1414                 *mrp = NULL;
1415                 rc = PTR_ERR(mr);
1416                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1417         } else {
1418                 *mrp = mr;
1419                 iov->lkey = mr->lkey;
1420                 rc = 0;
1421         }
1422
1423         return rc;
1424 }
1425
1426 int
1427 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1428                                 struct ib_mr *mr, struct ib_sge *iov)
1429 {
1430         int rc;
1431
1432         ib_dma_unmap_single(ia->ri_id->device,
1433                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1434
1435         if (mr == NULL)
1436                 return 0;
1437
1438         rc = ib_dereg_mr(mr);
1439         if (rc)
1440                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1441         return rc;
1442 }
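/*
 * Usage sketch (editorial addition; example_* is a hypothetical caller,
 * error handling trimmed): register a kmalloc'ed buffer so a single
 * ib_sge can describe it, then tear the registration down again.
 */
static int example_internal_reg(struct rpcrdma_ia *ia, void *buf, int len)
{
	struct ib_mr *mr;
	struct ib_sge iov;
	int rc;

	rc = rpcrdma_register_internal(ia, buf, len, &mr, &iov);
	if (rc)
		return rc;
	/* iov.addr, iov.length and iov.lkey are now valid for a send/recv WR */
	return rpcrdma_deregister_internal(ia, mr, &iov);
}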
1443
1444 /*
1445  * Wrappers for chunk registration, shared by read/write chunk code.
1446  */
1447
1448 static void
1449 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1450 {
1451         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1452         seg->mr_dmalen = seg->mr_len;
1453         if (seg->mr_page)
1454                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1455                                 seg->mr_page, offset_in_page(seg->mr_offset),
1456                                 seg->mr_dmalen, seg->mr_dir);
1457         else
1458                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1459                                 seg->mr_offset,
1460                                 seg->mr_dmalen, seg->mr_dir);
1461         if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1462                 dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1463                         __func__,
1464                         (unsigned long long)seg->mr_dma,
1465                         seg->mr_offset, seg->mr_dmalen);
1466         }
1467 }
1468
1469 static void
1470 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1471 {
1472         if (seg->mr_page)
1473                 ib_dma_unmap_page(ia->ri_id->device,
1474                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1475         else
1476                 ib_dma_unmap_single(ia->ri_id->device,
1477                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1478 }
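/*
 * Note (editorial, derived from the code above): rpcrdma_map_one() fills in
 * seg->mr_dma, seg->mr_dmalen and seg->mr_dir, using ib_dma_map_page() for
 * page-based segments and ib_dma_map_single() for kva-based ones.  A DMA
 * mapping error is only logged (the function returns void); the
 * registration paths below unwind any segments they mapped with
 * rpcrdma_unmap_one() on their own failure paths.
 */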
1479
1480 static int
1481 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1482                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1483                         struct rpcrdma_xprt *r_xprt)
1484 {
1485         struct rpcrdma_mr_seg *seg1 = seg;
1486         struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1487
1488         u8 key;
1489         int len, pageoff;
1490         int i, rc;
1491         int seg_len;
1492         u64 pa;
1493         int page_no;
1494
1495         pageoff = offset_in_page(seg1->mr_offset);
1496         seg1->mr_offset -= pageoff;     /* start of page */
1497         seg1->mr_len += pageoff;
1498         len = -pageoff;
1499         if (*nsegs > ia->ri_max_frmr_depth)
1500                 *nsegs = ia->ri_max_frmr_depth;
1501         for (page_no = i = 0; i < *nsegs;) {
1502                 rpcrdma_map_one(ia, seg, writing);
1503                 pa = seg->mr_dma;
1504                 for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1505                         seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->
1506                                 page_list[page_no++] = pa;
1507                         pa += PAGE_SIZE;
1508                 }
1509                 len += seg->mr_len;
1510                 ++seg;
1511                 ++i;
1512                 /* Check for holes: next seg must start, and this seg end, on a page boundary */
1513                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1514                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1515                         break;
1516         }
1517         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1518                 __func__, seg1->mr_chunk.rl_mw, i);
1519
1520         if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
1521                 dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
1522                         __func__,
1523                         seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
1524                 /* Invalidate before using. */
1525                 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1526                 invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1527                 invalidate_wr.next = &frmr_wr;
1528                 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1529                 invalidate_wr.send_flags = IB_SEND_SIGNALED;
1530                 invalidate_wr.ex.invalidate_rkey =
1531                         seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1532                 DECR_CQCOUNT(&r_xprt->rx_ep);
1533                 post_wr = &invalidate_wr;
1534         } else
1535                 post_wr = &frmr_wr;
1536
1537         /* Prepare FRMR WR */
1538         memset(&frmr_wr, 0, sizeof frmr_wr);
1539         frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1540         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1541         frmr_wr.send_flags = IB_SEND_SIGNALED;
1542         frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1543         frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1544         frmr_wr.wr.fast_reg.page_list_len = page_no;
1545         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1546         frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1547         if (frmr_wr.wr.fast_reg.length < len) {
1548                 while (i--)             /* seg1->mr_nsegs is not set yet */
1549                         rpcrdma_unmap_one(ia, --seg);
1550                 return -EIO;
1551         }
1552
1553         /* Bump the key */
1554         key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1555         ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1556
1557         frmr_wr.wr.fast_reg.access_flags = (writing ?
1558                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1559                                 IB_ACCESS_REMOTE_READ);
1560         frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1561         DECR_CQCOUNT(&r_xprt->rx_ep);
1562
1563         rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1564
1565         if (rc) {
1566                 dprintk("RPC:       %s: failed ib_post_send for register,"
1567                         " status %i\n", __func__, rc);
1568                 while (i--)
1569                         rpcrdma_unmap_one(ia, --seg);
1570         } else {
1571                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1572                 seg1->mr_base = seg1->mr_dma + pageoff;
1573                 seg1->mr_nsegs = i;
1574                 seg1->mr_len = len;
1575         }
1576         *nsegs = i;
1577         return rc;
1578 }
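/*
 * Worked example of the accounting above (editorial addition, assuming
 * PAGE_SIZE == 4096): a single 6000-byte segment starting 100 bytes into a
 * page gives pageoff = 100, seg->mr_len = 6100 and len = 6000.  The inner
 * loop records two page addresses in fr_pgl, so fast_reg.length =
 * 2 << PAGE_SHIFT = 8192, which covers len; on success the caller sees
 * mr_base = mr_dma + pageoff and mr_len = 6000, i.e. the original byte
 * range.
 */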
1579
1580 static int
1581 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1582                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1583 {
1584         struct rpcrdma_mr_seg *seg1 = seg;
1585         struct ib_send_wr invalidate_wr, *bad_wr;
1586         int rc;
1587
1588         while (seg1->mr_nsegs--)
1589                 rpcrdma_unmap_one(ia, seg++);
1590
1591         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1592         invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1593         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1594         invalidate_wr.send_flags = IB_SEND_SIGNALED;
1595         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1596         DECR_CQCOUNT(&r_xprt->rx_ep);
1597
1598         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1599         if (rc)
1600                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1601                         " status %i\n", __func__, rc);
1602         return rc;
1603 }
1604
1605 static int
1606 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1607                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1608 {
1609         struct rpcrdma_mr_seg *seg1 = seg;
1610         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1611         int len, pageoff, i, rc;
1612
1613         pageoff = offset_in_page(seg1->mr_offset);
1614         seg1->mr_offset -= pageoff;     /* start of page */
1615         seg1->mr_len += pageoff;
1616         len = -pageoff;
1617         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1618                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1619         for (i = 0; i < *nsegs;) {
1620                 rpcrdma_map_one(ia, seg, writing);
1621                 physaddrs[i] = seg->mr_dma;
1622                 len += seg->mr_len;
1623                 ++seg;
1624                 ++i;
1625                 /* Check for holes: next seg must start, and this seg end, on a page boundary */
1626                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1627                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1628                         break;
1629         }
1630         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1631                                 physaddrs, i, seg1->mr_dma);
1632         if (rc) {
1633                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1634                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1635                         len, (unsigned long long)seg1->mr_dma,
1636                         pageoff, i, rc);
1637                 while (i--)
1638                         rpcrdma_unmap_one(ia, --seg);
1639         } else {
1640                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1641                 seg1->mr_base = seg1->mr_dma + pageoff;
1642                 seg1->mr_nsegs = i;
1643                 seg1->mr_len = len;
1644         }
1645         *nsegs = i;
1646         return rc;
1647 }
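/*
 * Note (editorial): ib_map_phys_fmr() above is given one DMA address per
 * segment as its page list, with seg1->mr_dma as the I/O virtual address.
 * The hole check keeps the accepted segments virtually contiguous, so the
 * caller-visible range (mr_base = mr_dma + pageoff, mr_len = len) matches
 * the region the FMR maps.
 */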
1648
1649 static int
1650 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1651                         struct rpcrdma_ia *ia)
1652 {
1653         struct rpcrdma_mr_seg *seg1 = seg;
1654         LIST_HEAD(l);
1655         int rc;
1656
1657         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1658         rc = ib_unmap_fmr(&l);
1659         while (seg1->mr_nsegs--)
1660                 rpcrdma_unmap_one(ia, seg++);
1661         if (rc)
1662                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1663                         " status %i\n", __func__, rc);
1664         return rc;
1665 }
1666
1667 int
1668 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1669                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1670 {
1671         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1672         int rc = 0;
1673
1674         switch (ia->ri_memreg_strategy) {
1675
1676 #if RPCRDMA_PERSISTENT_REGISTRATION
1677         case RPCRDMA_ALLPHYSICAL:
1678                 rpcrdma_map_one(ia, seg, writing);
1679                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1680                 seg->mr_base = seg->mr_dma;
1681                 seg->mr_nsegs = 1;
1682                 nsegs = 1;
1683                 break;
1684 #endif
1685
1686         /* Registration using frmr registration */
1687         case RPCRDMA_FRMR:
1688                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1689                 break;
1690
1691         /* Registration using fmr memory registration */
1692         case RPCRDMA_MTHCAFMR:
1693                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1694                 break;
1695
1696         default:
1697                 return -1;
1698         }
1699         if (rc)
1700                 return -1;
1701
1702         return nsegs;
1703 }
1704
1705 int
1706 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1707                 struct rpcrdma_xprt *r_xprt)
1708 {
1709         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1710         int nsegs = seg->mr_nsegs, rc;
1711
1712         switch (ia->ri_memreg_strategy) {
1713
1714 #if RPCRDMA_PERSISTENT_REGISTRATION
1715         case RPCRDMA_ALLPHYSICAL:
1716                 rpcrdma_unmap_one(ia, seg);
1717                 break;
1718 #endif
1719
1720         case RPCRDMA_FRMR:
1721                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1722                 break;
1723
1724         case RPCRDMA_MTHCAFMR:
1725                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1726                 break;
1727
1728         default:
1729                 break;
1730         }
1731         return nsegs;
1732 }
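/*
 * Usage sketch (editorial addition; example_* is a hypothetical caller,
 * the real chunk-building code lives in the RPC/RDMA marshaling layer):
 * register a run of segments for a write chunk, advertise the result,
 * then deregister it.
 */
static int example_map_write_chunk(struct rpcrdma_xprt *r_xprt,
				   struct rpcrdma_mr_seg *seg, int nsegs)
{
	int n;

	n = rpcrdma_register_external(seg, nsegs, 1 /* writing */, r_xprt);
	if (n < 0)
		return -EIO;	/* register_external() reports failure as -1 */
	/* seg->mr_rkey, seg->mr_base and seg->mr_len describe the chunk;
	 * in real use, deregistration happens after the reply is processed */
	rpcrdma_deregister_external(seg, r_xprt);
	return n;
}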
1733
1734 /*
1735  * Prepost any receive buffer, then post send.
1736  *
1737  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1738  */
1739 int
1740 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1741                 struct rpcrdma_ep *ep,
1742                 struct rpcrdma_req *req)
1743 {
1744         struct ib_send_wr send_wr, *send_wr_fail;
1745         struct rpcrdma_rep *rep = req->rl_reply;
1746         int rc;
1747
1748         if (rep) {
1749                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1750                 if (rc)
1751                         goto out;
1752                 req->rl_reply = NULL;
1753         }
1754
1755         send_wr.next = NULL;
1756         send_wr.wr_id = 0ULL;   /* no send cookie */
1757         send_wr.sg_list = req->rl_send_iov;
1758         send_wr.num_sge = req->rl_niovs;
1759         send_wr.opcode = IB_WR_SEND;
1760         if (send_wr.num_sge == 4)       /* sync the tail (iov[3]); the constant zero pad (iov[2]) needs no sync */
1761                 ib_dma_sync_single_for_device(ia->ri_id->device,
1762                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1763                         DMA_TO_DEVICE);
1764         ib_dma_sync_single_for_device(ia->ri_id->device,
1765                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1766                 DMA_TO_DEVICE);
1767         ib_dma_sync_single_for_device(ia->ri_id->device,
1768                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1769                 DMA_TO_DEVICE);
1770
1771         if (DECR_CQCOUNT(ep) > 0)
1772                 send_wr.send_flags = 0;
1773         else { /* Provider must take a send completion every now and then */
1774                 INIT_CQCOUNT(ep);
1775                 send_wr.send_flags = IB_SEND_SIGNALED;
1776         }
1777
1778         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1779         if (rc)
1780                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1781                         rc);
1782 out:
1783         return rc;
1784 }
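/*
 * Usage sketch (editorial addition; example_* is a hypothetical send
 * path): make sure a reply buffer is attached, then hand the marshaled
 * request to the verbs layer, which pre-posts the receive before posting
 * the send.
 */
static int example_send_request(struct rpcrdma_xprt *r_xprt,
				struct rpcrdma_req *req)
{
	if (req->rl_reply == NULL)
		rpcrdma_recv_buffer_get(req);
	return rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req);
}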
1785
1786 /*
1787  * (Re)post a receive buffer.
1788  */
1789 int
1790 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1791                      struct rpcrdma_ep *ep,
1792                      struct rpcrdma_rep *rep)
1793 {
1794         struct ib_recv_wr recv_wr, *recv_wr_fail;
1795         int rc;
1796
1797         recv_wr.next = NULL;
1798         recv_wr.wr_id = (u64) (unsigned long) rep;
1799         recv_wr.sg_list = &rep->rr_iov;
1800         recv_wr.num_sge = 1;
1801
1802         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1803                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1804
1805         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1806
1807         if (rc)
1808                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1809                         rc);
1810         return rc;
1811 }
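/*
 * Usage sketch (editorial addition; example_* is hypothetical): a reply
 * buffer recovered from the pool, or returned to it, is handed back to the
 * provider so receive resources stay posted.
 */
static int example_repost_reply(struct rpcrdma_xprt *r_xprt,
				struct rpcrdma_rep *rep)
{
	return rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep);
}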