1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <asm/bitops.h>
53
54 #include "xprt_rdma.h"
55
56 /*
57  * Globals/Macros
58  */
59
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY        RPCDBG_TRANS
62 #endif
63
64 /*
65  * internal functions
66  */
67
68 /*
69  * Handle replies in tasklet context, using a single, global list.
70  * The rdma tasklet function simply walks that list and invokes each
71  * reply's handler function.
72  */
73
74 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
75 static LIST_HEAD(rpcrdma_tasklets_g);
76
77 static void
78 rpcrdma_run_tasklet(unsigned long data)
79 {
80         struct rpcrdma_rep *rep;
81         void (*func)(struct rpcrdma_rep *);
82         unsigned long flags;
83
84         data = data;    /* the tasklet data argument is unused */
85         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
86         while (!list_empty(&rpcrdma_tasklets_g)) {
87                 rep = list_entry(rpcrdma_tasklets_g.next,
88                                  struct rpcrdma_rep, rr_list);
89                 list_del(&rep->rr_list);
90                 func = rep->rr_func;
91                 rep->rr_func = NULL;
92                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93
94                 if (func)
95                         func(rep);
96                 else
97                         rpcrdma_recv_buffer_put(rep);
98
99                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100         }
101         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102 }
103
104 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105
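/*
 * Queue a reply for deferred processing and kick the global
 * reply tasklet (rpcrdma_run_tasklet above).
 */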
106 static inline void
107 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108 {
109         unsigned long flags;
110
111         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
112         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
113         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
114         tasklet_schedule(&rpcrdma_tasklet_g);
115 }
116
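/*
 * QP async event handler: if the endpoint is currently connected,
 * mark it failed, run its callback, and wake any connect waiters.
 */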
117 static void
118 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119 {
120         struct rpcrdma_ep *ep = context;
121
122         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
123                 __func__, event->event, event->device->name, context);
124         if (ep->rep_connected == 1) {
125                 ep->rep_connected = -EIO;
126                 ep->rep_func(ep);
127                 wake_up_all(&ep->rep_connect_wait);
128         }
129 }
130
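/* CQ async event handler: same failure handling as the QP handler above. */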
131 static void
132 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133 {
134         struct rpcrdma_ep *ep = context;
135
136         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
137                 __func__, event->event, event->device->name, context);
138         if (ep->rep_connected == 1) {
139                 ep->rep_connected = -EIO;
140                 ep->rep_func(ep);
141                 wake_up_all(&ep->rep_connect_wait);
142         }
143 }
144
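/*
 * A send-side completion: track FRMR validity based on whether a
 * FAST_REG_MR or LOCAL_INV work request just completed successfully.
 */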
145 static void
146 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
147 {
148         struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
149
150         dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
151                 __func__, frmr, wc->status, wc->opcode);
152
153         if (wc->wr_id == 0ULL)
154                 return;
155         if (wc->status != IB_WC_SUCCESS)
156                 return;
157
158         if (wc->opcode == IB_WC_FAST_REG_MR)
159                 frmr->r.frmr.state = FRMR_IS_VALID;
160         else if (wc->opcode == IB_WC_LOCAL_INV)
161                 frmr->r.frmr.state = FRMR_IS_INVALID;
162 }
163
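/*
 * Poll the send CQ in batches of RPCRDMA_POLLSIZE completions,
 * stopping when the CQ is drained or the polling budget is spent.
 */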
164 static int
165 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
166 {
167         struct ib_wc *wcs;
168         int budget, count, rc;
169
170         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
171         do {
172                 wcs = ep->rep_send_wcs;
173
174                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
175                 if (rc <= 0)
176                         return rc;
177
178                 count = rc;
179                 while (count-- > 0)
180                         rpcrdma_sendcq_process_wc(wcs++);
181         } while (rc == RPCRDMA_POLLSIZE && --budget);
182         return 0;
183 }
184
185 /*
186  * Handle send, fast_reg_mr, and local_inv completions.
187  *
188  * Send events are typically suppressed and thus do not result
189  * in an upcall. Occasionally one is signaled, however. This
190  * prevents the provider's completion queue from wrapping and
191  * losing a completion.
192  */
193 static void
194 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
195 {
196         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
197         int rc;
198
199         rc = rpcrdma_sendcq_poll(cq, ep);
200         if (rc) {
201                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
202                         __func__, rc);
203                 return;
204         }
205
206         rc = ib_req_notify_cq(cq,
207                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
208         if (rc == 0)
209                 return;
210         if (rc < 0) {
211                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
212                         __func__, rc);
213                 return;
214         }
215
216         rpcrdma_sendcq_poll(cq, ep);
217 }
218
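/*
 * A receive completion: record the reply length, sync the buffer for
 * the CPU, refresh the credit limit, and hand the reply to the tasklet
 * for processing outside of interrupt context.
 */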
219 static void
220 rpcrdma_recvcq_process_wc(struct ib_wc *wc)
221 {
222         struct rpcrdma_rep *rep =
223                         (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
224
225         dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
226                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
227
228         if (wc->status != IB_WC_SUCCESS) {
229                 rep->rr_len = ~0U;
230                 goto out_schedule;
231         }
232         if (wc->opcode != IB_WC_RECV)
233                 return;
234
235         rep->rr_len = wc->byte_len;
236         ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
237                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
238
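        /* Peek at the credit field in the RPC-over-RDMA header to
         * refresh the server's flow-control grant, clamped to
         * [1, rb_max_requests] so the transport cannot deadlock.
         */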
239         if (rep->rr_len >= 16) {
240                 struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
241                 unsigned int credits = ntohl(p->rm_credit);
242
243                 if (credits == 0)
244                         credits = 1;    /* don't deadlock */
245                 else if (credits > rep->rr_buffer->rb_max_requests)
246                         credits = rep->rr_buffer->rb_max_requests;
247                 atomic_set(&rep->rr_buffer->rb_credits, credits);
248         }
249
250 out_schedule:
251         rpcrdma_schedule_tasklet(rep);
252 }
253
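/* Poll the receive CQ with the same batching and budget as the send side. */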
254 static int
255 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
256 {
257         struct ib_wc *wcs;
258         int budget, count, rc;
259
260         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
261         do {
262                 wcs = ep->rep_recv_wcs;
263
264                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
265                 if (rc <= 0)
266                         return rc;
267
268                 count = rc;
269                 while (count-- > 0)
270                         rpcrdma_recvcq_process_wc(wcs++);
271         } while (rc == RPCRDMA_POLLSIZE && --budget);
272         return 0;
273 }
274
275 /*
276  * Handle receive completions.
277  *
278  * It is reentrant, but it processes events one at a time in order to
279  * preserve receive ordering, which keeps server credit accounting correct.
280  *
281  * It is the responsibility of the scheduled tasklet to return
282  * recv buffers to the pool. NOTE: this affects synchronization of
283  * connection shutdown. That is, the structures required for
284  * the completion of the reply handler must remain intact until
285  * all memory has been reclaimed.
286  */
287 static void
288 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
289 {
290         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
291         int rc;
292
293         rc = rpcrdma_recvcq_poll(cq, ep);
294         if (rc) {
295                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
296                         __func__, rc);
297                 return;
298         }
299
300         rc = ib_req_notify_cq(cq,
301                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
302         if (rc == 0)
303                 return;
304         if (rc < 0) {
305                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
306                         __func__, rc);
307                 return;
308         }
309
310         rpcrdma_recvcq_poll(cq, ep);
311 }
312
313 #ifdef RPC_DEBUG
314 static const char * const conn[] = {
315         "address resolved",
316         "address error",
317         "route resolved",
318         "route error",
319         "connect request",
320         "connect response",
321         "connect error",
322         "unreachable",
323         "rejected",
324         "established",
325         "disconnected",
326         "device removal"
327 };
328 #endif
329
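/*
 * Connection manager event handler, registered via rdma_create_id().
 * Address and route resolution results are reported through ri_async_rc
 * and ri_done; connection state changes update ep->rep_connected and
 * wake anyone waiting on rep_connect_wait.
 */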
330 static int
331 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
332 {
333         struct rpcrdma_xprt *xprt = id->context;
334         struct rpcrdma_ia *ia = &xprt->rx_ia;
335         struct rpcrdma_ep *ep = &xprt->rx_ep;
336 #ifdef RPC_DEBUG
337         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
338 #endif
339         struct ib_qp_attr attr;
340         struct ib_qp_init_attr iattr;
341         int connstate = 0;
342
343         switch (event->event) {
344         case RDMA_CM_EVENT_ADDR_RESOLVED:
345         case RDMA_CM_EVENT_ROUTE_RESOLVED:
346                 ia->ri_async_rc = 0;
347                 complete(&ia->ri_done);
348                 break;
349         case RDMA_CM_EVENT_ADDR_ERROR:
350                 ia->ri_async_rc = -EHOSTUNREACH;
351                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
352                         __func__, ep);
353                 complete(&ia->ri_done);
354                 break;
355         case RDMA_CM_EVENT_ROUTE_ERROR:
356                 ia->ri_async_rc = -ENETUNREACH;
357                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
358                         __func__, ep);
359                 complete(&ia->ri_done);
360                 break;
361         case RDMA_CM_EVENT_ESTABLISHED:
362                 connstate = 1;
363                 ib_query_qp(ia->ri_id->qp, &attr,
364                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
365                         &iattr);
366                 dprintk("RPC:       %s: %d responder resources"
367                         " (%d initiator)\n",
368                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
369                 goto connected;
370         case RDMA_CM_EVENT_CONNECT_ERROR:
371                 connstate = -ENOTCONN;
372                 goto connected;
373         case RDMA_CM_EVENT_UNREACHABLE:
374                 connstate = -ENETDOWN;
375                 goto connected;
376         case RDMA_CM_EVENT_REJECTED:
377                 connstate = -ECONNREFUSED;
378                 goto connected;
379         case RDMA_CM_EVENT_DISCONNECTED:
380                 connstate = -ECONNABORTED;
381                 goto connected;
382         case RDMA_CM_EVENT_DEVICE_REMOVAL:
383                 connstate = -ENODEV;
384 connected:
385                 dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
386                         __func__,
387                         (event->event <= 11) ? conn[event->event] :
388                                                 "unknown connection error",
389                         &addr->sin_addr.s_addr,
390                         ntohs(addr->sin_port),
391                         ep, event->event);
392                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
393                 dprintk("RPC:       %s: %sconnected\n",
394                                         __func__, connstate > 0 ? "" : "dis");
395                 ep->rep_connected = connstate;
396                 ep->rep_func(ep);
397                 wake_up_all(&ep->rep_connect_wait);
398                 break;
399         default:
400                 dprintk("RPC:       %s: unexpected CM event %d\n",
401                         __func__, event->event);
402                 break;
403         }
404
405 #ifdef RPC_DEBUG
406         if (connstate == 1) {
407                 int ird = attr.max_dest_rd_atomic;
408                 int tird = ep->rep_remote_cma.responder_resources;
409                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
410                         "on %s, memreg %d slots %d ird %d%s\n",
411                         &addr->sin_addr.s_addr,
412                         ntohs(addr->sin_port),
413                         ia->ri_id->device->name,
414                         ia->ri_memreg_strategy,
415                         xprt->rx_buf.rb_max_requests,
416                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
417         } else if (connstate < 0) {
418                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
419                         &addr->sin_addr.s_addr,
420                         ntohs(addr->sin_port),
421                         connstate);
422         }
423 #endif
424
425         return 0;
426 }
427
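/*
 * Create a CM ID and synchronously resolve the server's address and
 * route, using ri_done to wait (with a timeout) for the CM callbacks
 * above to report the result of each step.
 */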
428 static struct rdma_cm_id *
429 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
430                         struct rpcrdma_ia *ia, struct sockaddr *addr)
431 {
432         struct rdma_cm_id *id;
433         int rc;
434
435         init_completion(&ia->ri_done);
436
437         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
438         if (IS_ERR(id)) {
439                 rc = PTR_ERR(id);
440                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
441                         __func__, rc);
442                 return id;
443         }
444
445         ia->ri_async_rc = -ETIMEDOUT;
446         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
447         if (rc) {
448                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
449                         __func__, rc);
450                 goto out;
451         }
452         wait_for_completion_interruptible_timeout(&ia->ri_done,
453                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
454         rc = ia->ri_async_rc;
455         if (rc)
456                 goto out;
457
458         ia->ri_async_rc = -ETIMEDOUT;
459         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
460         if (rc) {
461                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
462                         __func__, rc);
463                 goto out;
464         }
465         wait_for_completion_interruptible_timeout(&ia->ri_done,
466                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
467         rc = ia->ri_async_rc;
468         if (rc)
469                 goto out;
470
471         return id;
472
473 out:
474         rdma_destroy_id(id);
475         return ERR_PTR(rc);
476 }
477
478 /*
479  * Drain any CQ prior to teardown.
480  */
481 static void
482 rpcrdma_clean_cq(struct ib_cq *cq)
483 {
484         struct ib_wc wc;
485         int count = 0;
486
487         while (1 == ib_poll_cq(cq, 1, &wc))
488                 ++count;
489
490         if (count)
491                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
492                         __func__, count, wc.opcode);
493 }
494
495 /*
496  * Exported functions.
497  */
498
499 /*
500  * Open and initialize an Interface Adapter.
501  *  o initializes fields of struct rpcrdma_ia, including
502  *    interface and provider attributes and protection domain.
503  */
504 int
505 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
506 {
507         int rc, mem_priv;
508         struct ib_device_attr devattr;
509         struct rpcrdma_ia *ia = &xprt->rx_ia;
510
511         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
512         if (IS_ERR(ia->ri_id)) {
513                 rc = PTR_ERR(ia->ri_id);
514                 goto out1;
515         }
516
517         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
518         if (IS_ERR(ia->ri_pd)) {
519                 rc = PTR_ERR(ia->ri_pd);
520                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
521                         __func__, rc);
522                 goto out2;
523         }
524
525         /*
526          * Query the device to determine if the requested memory
527          * registration strategy is supported. If it isn't, set the
528          * strategy to a globally supported model.
529          */
530         rc = ib_query_device(ia->ri_id->device, &devattr);
531         if (rc) {
532                 dprintk("RPC:       %s: ib_query_device failed %d\n",
533                         __func__, rc);
534                 goto out2;
535         }
536
537         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
538                 ia->ri_have_dma_lkey = 1;
539                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
540         }
541
542         if (memreg == RPCRDMA_FRMR) {
543                 /* Requires both frmr reg and local dma lkey */
544                 if ((devattr.device_cap_flags &
545                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
546                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
547                         dprintk("RPC:       %s: FRMR registration "
548                                 "not supported by HCA\n", __func__);
549                         memreg = RPCRDMA_MTHCAFMR;
550                 } else {
551                         /* Mind the ia limit on FRMR page list depth */
552                         ia->ri_max_frmr_depth = min_t(unsigned int,
553                                 RPCRDMA_MAX_DATA_SEGS,
554                                 devattr.max_fast_reg_page_list_len);
555                 }
556         }
557         if (memreg == RPCRDMA_MTHCAFMR) {
558                 if (!ia->ri_id->device->alloc_fmr) {
559                         dprintk("RPC:       %s: MTHCAFMR registration "
560                                 "not supported by HCA\n", __func__);
561 #if RPCRDMA_PERSISTENT_REGISTRATION
562                         memreg = RPCRDMA_ALLPHYSICAL;
563 #else
564                         rc = -ENOMEM;
565                         goto out2;
566 #endif
567                 }
568         }
569
570         /*
571          * Optionally obtain an underlying physical identity mapping in
572          * order to do a memory window-based bind. This base registration
573          * is protected from remote access; remote access is enabled only
574          * by binding the specific bytes targeted during each RPC
575          * operation, and is revoked after the corresponding completion,
576          * much as a storage adapter would do.
577          */
578         switch (memreg) {
579         case RPCRDMA_FRMR:
580                 break;
581 #if RPCRDMA_PERSISTENT_REGISTRATION
582         case RPCRDMA_ALLPHYSICAL:
583                 mem_priv = IB_ACCESS_LOCAL_WRITE |
584                                 IB_ACCESS_REMOTE_WRITE |
585                                 IB_ACCESS_REMOTE_READ;
586                 goto register_setup;
587 #endif
588         case RPCRDMA_MTHCAFMR:
589                 if (ia->ri_have_dma_lkey)
590                         break;
591                 mem_priv = IB_ACCESS_LOCAL_WRITE;
592 #if RPCRDMA_PERSISTENT_REGISTRATION
593         register_setup:
594 #endif
595                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
596                 if (IS_ERR(ia->ri_bind_mem)) {
597                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
598                                 "phys register failed with %lX\n",
599                                 __func__, PTR_ERR(ia->ri_bind_mem));
600                         rc = -ENOMEM;
601                         goto out2;
602                 }
603                 break;
604         default:
605                 printk(KERN_ERR "RPC: Unsupported memory "
606                                 "registration mode: %d\n", memreg);
607                 rc = -ENOMEM;
608                 goto out2;
609         }
610         dprintk("RPC:       %s: memory registration strategy is %d\n",
611                 __func__, memreg);
612
613         /* Else will do memory reg/dereg for each chunk */
614         ia->ri_memreg_strategy = memreg;
615
616         rwlock_init(&ia->ri_qplock);
617         return 0;
618 out2:
619         rdma_destroy_id(ia->ri_id);
620         ia->ri_id = NULL;
621 out1:
622         return rc;
623 }
624
625 /*
626  * Clean up/close an IA.
627  *   o if the DMA MR, CM ID and PD have been initialized, free them.
628  *   o close the IA
629  */
630 void
631 rpcrdma_ia_close(struct rpcrdma_ia *ia)
632 {
633         int rc;
634
635         dprintk("RPC:       %s: entering\n", __func__);
636         if (ia->ri_bind_mem != NULL) {
637                 rc = ib_dereg_mr(ia->ri_bind_mem);
638                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
639                         __func__, rc);
640         }
641         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
642                 if (ia->ri_id->qp)
643                         rdma_destroy_qp(ia->ri_id);
644                 rdma_destroy_id(ia->ri_id);
645                 ia->ri_id = NULL;
646         }
647         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
648                 rc = ib_dealloc_pd(ia->ri_pd);
649                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
650                         __func__, rc);
651         }
652 }
653
654 /*
655  * Create unconnected endpoint.
656  */
657 int
658 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
659                                 struct rpcrdma_create_data_internal *cdata)
660 {
661         struct ib_device_attr devattr;
662         struct ib_cq *sendcq, *recvcq;
663         int rc, err;
664
665         rc = ib_query_device(ia->ri_id->device, &devattr);
666         if (rc) {
667                 dprintk("RPC:       %s: ib_query_device failed %d\n",
668                         __func__, rc);
669                 return rc;
670         }
671
672         /* check provider's send/recv wr limits */
673         if (cdata->max_requests > devattr.max_qp_wr)
674                 cdata->max_requests = devattr.max_qp_wr;
675
676         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
677         ep->rep_attr.qp_context = ep;
678         /* send_cq and recv_cq initialized below */
679         ep->rep_attr.srq = NULL;
680         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
681         switch (ia->ri_memreg_strategy) {
682         case RPCRDMA_FRMR: {
683                 int depth = 7;
684
685                 /* Add room for frmr register and invalidate WRs.
686                  * 1. FRMR reg WR for head
687                  * 2. FRMR invalidate WR for head
688                  * 3. N FRMR reg WRs for pagelist
689                  * 4. N FRMR invalidate WRs for pagelist
690                  * 5. FRMR reg WR for tail
691                  * 6. FRMR invalidate WR for tail
692                  * 7. The RDMA_SEND WR
693                  */
694
695                 /* Calculate N if the device max FRMR depth is smaller than
696                  * RPCRDMA_MAX_DATA_SEGS.
697                  */
698                 if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
699                         int delta = RPCRDMA_MAX_DATA_SEGS -
700                                     ia->ri_max_frmr_depth;
701
702                         do {
703                                 depth += 2; /* FRMR reg + invalidate */
704                                 delta -= ia->ri_max_frmr_depth;
705                         } while (delta > 0);
706
707                 }
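                /* For example, if RPCRDMA_MAX_DATA_SEGS were 64 and the
                 * device's max page-list depth were 16, delta would start
                 * at 48, the loop above would run three times, and depth
                 * would become 7 + 3 * 2 = 13 send WRs per RPC.
                 */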
708                 ep->rep_attr.cap.max_send_wr *= depth;
709                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
710                         cdata->max_requests = devattr.max_qp_wr / depth;
711                         if (!cdata->max_requests)
712                                 return -EINVAL;
713                         ep->rep_attr.cap.max_send_wr = cdata->max_requests *
714                                                        depth;
715                 }
716                 break;
717         }
718         default:
719                 break;
720         }
721         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
722         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
723         ep->rep_attr.cap.max_recv_sge = 1;
724         ep->rep_attr.cap.max_inline_data = 0;
725         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
726         ep->rep_attr.qp_type = IB_QPT_RC;
727         ep->rep_attr.port_num = ~0;
728
729         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
730                 "iovs: send %d recv %d\n",
731                 __func__,
732                 ep->rep_attr.cap.max_send_wr,
733                 ep->rep_attr.cap.max_recv_wr,
734                 ep->rep_attr.cap.max_send_sge,
735                 ep->rep_attr.cap.max_recv_sge);
736
737         /* set trigger for requesting send completion */
738         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
739         if (ep->rep_cqinit <= 2)
740                 ep->rep_cqinit = 0;
741         INIT_CQCOUNT(ep);
742         ep->rep_ia = ia;
743         init_waitqueue_head(&ep->rep_connect_wait);
744         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
745
746         sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
747                                   rpcrdma_cq_async_error_upcall, ep,
748                                   ep->rep_attr.cap.max_send_wr + 1, 0);
749         if (IS_ERR(sendcq)) {
750                 rc = PTR_ERR(sendcq);
751                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
752                         __func__, rc);
753                 goto out1;
754         }
755
756         rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
757         if (rc) {
758                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
759                         __func__, rc);
760                 goto out2;
761         }
762
763         recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
764                                   rpcrdma_cq_async_error_upcall, ep,
765                                   ep->rep_attr.cap.max_recv_wr + 1, 0);
766         if (IS_ERR(recvcq)) {
767                 rc = PTR_ERR(recvcq);
768                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
769                         __func__, rc);
770                 goto out2;
771         }
772
773         rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
774         if (rc) {
775                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
776                         __func__, rc);
777                 ib_destroy_cq(recvcq);
778                 goto out2;
779         }
780
781         ep->rep_attr.send_cq = sendcq;
782         ep->rep_attr.recv_cq = recvcq;
783
784         /* Initialize cma parameters */
785
786         /* RPC/RDMA does not use private data */
787         ep->rep_remote_cma.private_data = NULL;
788         ep->rep_remote_cma.private_data_len = 0;
789
790         /* Client offers RDMA Read but does not initiate */
791         ep->rep_remote_cma.initiator_depth = 0;
792         if (devattr.max_qp_rd_atom > 32)        /* arbitrary but <= 255 */
793                 ep->rep_remote_cma.responder_resources = 32;
794         else
795                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
796
797         ep->rep_remote_cma.retry_count = 7;
798         ep->rep_remote_cma.flow_control = 0;
799         ep->rep_remote_cma.rnr_retry_count = 0;
800
801         return 0;
802
803 out2:
804         err = ib_destroy_cq(sendcq);
805         if (err)
806                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
807                         __func__, err);
808 out1:
809         return rc;
810 }
811
812 /*
813  * rpcrdma_ep_destroy
814  *
815  * Disconnect and destroy endpoint. After this, the only
816  * valid operations on the ep are to free it (if dynamically
817  * allocated) or re-create it.
818  */
819 void
820 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
821 {
822         int rc;
823
824         dprintk("RPC:       %s: entering, connected is %d\n",
825                 __func__, ep->rep_connected);
826
827         cancel_delayed_work_sync(&ep->rep_connect_worker);
828
829         if (ia->ri_id->qp) {
830                 rc = rpcrdma_ep_disconnect(ep, ia);
831                 if (rc)
832                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
833                                 " returned %i\n", __func__, rc);
834                 rdma_destroy_qp(ia->ri_id);
835                 ia->ri_id->qp = NULL;
836         }
837
838         /* padding - could be done in rpcrdma_buffer_destroy... */
839         if (ep->rep_pad_mr) {
840                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
841                 ep->rep_pad_mr = NULL;
842         }
843
844         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
845         rc = ib_destroy_cq(ep->rep_attr.recv_cq);
846         if (rc)
847                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
848                         __func__, rc);
849
850         rpcrdma_clean_cq(ep->rep_attr.send_cq);
851         rc = ib_destroy_cq(ep->rep_attr.send_cq);
852         if (rc)
853                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
854                         __func__, rc);
855 }
856
857 /*
858  * Connect unconnected endpoint.
859  */
860 int
861 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
862 {
863         struct rdma_cm_id *id, *old;
864         int rc = 0;
865         int retry_count = 0;
866
867         if (ep->rep_connected != 0) {
868                 struct rpcrdma_xprt *xprt;
869 retry:
870                 dprintk("RPC:       %s: reconnecting...\n", __func__);
871                 rc = rpcrdma_ep_disconnect(ep, ia);
872                 if (rc && rc != -ENOTCONN)
873                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
874                                 " status %i\n", __func__, rc);
875
876                 rpcrdma_clean_cq(ep->rep_attr.recv_cq);
877                 rpcrdma_clean_cq(ep->rep_attr.send_cq);
878
879                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
880                 id = rpcrdma_create_id(xprt, ia,
881                                 (struct sockaddr *)&xprt->rx_data.addr);
882                 if (IS_ERR(id)) {
883                         rc = -EHOSTUNREACH;
884                         goto out;
885                 }
886                 /* TEMP TEMP TEMP - fail if new device:
887                  * Deregister/remarshal *all* requests!
888                  * Close and recreate adapter, pd, etc!
889                  * Re-determine all attributes still sane!
890                  * More stuff I haven't thought of!
891                  * Rrrgh!
892                  */
893                 if (ia->ri_id->device != id->device) {
894                         printk("RPC:       %s: can't reconnect on "
895                                 "different device!\n", __func__);
896                         rdma_destroy_id(id);
897                         rc = -ENETUNREACH;
898                         goto out;
899                 }
900                 /* END TEMP */
901                 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
902                 if (rc) {
903                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
904                                 __func__, rc);
905                         rdma_destroy_id(id);
906                         rc = -ENETUNREACH;
907                         goto out;
908                 }
909
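                /* Swap in the new CM ID under the QP lock so that code
                 * dereferencing ia->ri_id (for example when unmapping or
                 * invalidating MRs) never sees a stale pointer.
                 */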
910                 write_lock(&ia->ri_qplock);
911                 old = ia->ri_id;
912                 ia->ri_id = id;
913                 write_unlock(&ia->ri_qplock);
914
915                 rdma_destroy_qp(old);
916                 rdma_destroy_id(old);
917         } else {
918                 dprintk("RPC:       %s: connecting...\n", __func__);
919                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
920                 if (rc) {
921                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
922                                 __func__, rc);
923                         /* do not update ep->rep_connected */
924                         return -ENETUNREACH;
925                 }
926         }
927
928         ep->rep_connected = 0;
929
930         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
931         if (rc) {
932                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
933                                 __func__, rc);
934                 goto out;
935         }
936
937         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
938
939         /*
940          * Check state. A non-peer reject indicates no listener
941          * (ECONNREFUSED), which may be a transient state. All
942          * others indicate a transport condition for which a best-effort
943          * recovery has already been attempted.
944          */
945         if (ep->rep_connected == -ECONNREFUSED &&
946             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
947                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
948                 goto retry;
949         }
950         if (ep->rep_connected <= 0) {
951                 /* Sometimes, the only way to reliably connect to remote
952                  * CMs is to use the same nonzero values for ORD and IRD. */
953                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
954                     (ep->rep_remote_cma.responder_resources == 0 ||
955                      ep->rep_remote_cma.initiator_depth !=
956                                 ep->rep_remote_cma.responder_resources)) {
957                         if (ep->rep_remote_cma.responder_resources == 0)
958                                 ep->rep_remote_cma.responder_resources = 1;
959                         ep->rep_remote_cma.initiator_depth =
960                                 ep->rep_remote_cma.responder_resources;
961                         goto retry;
962                 }
963                 rc = ep->rep_connected;
964         } else {
965                 dprintk("RPC:       %s: connected\n", __func__);
966         }
967
968 out:
969         if (rc)
970                 ep->rep_connected = rc;
971         return rc;
972 }
973
974 /*
975  * rpcrdma_ep_disconnect
976  *
977  * This is separate from destroy to facilitate the ability
978  * to reconnect without recreating the endpoint.
979  *
980  * This call is not reentrant, and must not be made in parallel
981  * on the same endpoint.
982  */
983 int
984 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
985 {
986         int rc;
987
988         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
989         rpcrdma_clean_cq(ep->rep_attr.send_cq);
990         rc = rdma_disconnect(ia->ri_id);
991         if (!rc) {
992                 /* returns without wait if not connected */
993                 wait_event_interruptible(ep->rep_connect_wait,
994                                                         ep->rep_connected != 1);
995                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
996                         (ep->rep_connected == 1) ? "still " : "dis");
997         } else {
998                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
999                 ep->rep_connected = rc;
1000         }
1001         return rc;
1002 }
1003
1004 /*
1005  * Initialize buffer memory
1006  */
1007 int
1008 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
1009         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
1010 {
1011         char *p;
1012         size_t len, rlen, wlen;
1013         int i, rc;
1014         struct rpcrdma_mw *r;
1015
1016         buf->rb_max_requests = cdata->max_requests;
1017         spin_lock_init(&buf->rb_lock);
1018         atomic_set(&buf->rb_credits, 1);
1019
1020         /* Need to allocate:
1021          *   1.  arrays for send and recv pointers
1022          *   2.  arrays of struct rpcrdma_req to fill in pointers
1023          *   3.  array of struct rpcrdma_rep for replies
1024          *   4.  padding, if any
1025          *   5.  mw's, fmr's or frmr's, if any
1026          * Send/recv buffers in req/rep need to be registered
1027          */
1028
1029         len = buf->rb_max_requests *
1030                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1031         len += cdata->padding;
1032         switch (ia->ri_memreg_strategy) {
1033         case RPCRDMA_FRMR:
1034                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
1035                                 sizeof(struct rpcrdma_mw);
1036                 break;
1037         case RPCRDMA_MTHCAFMR:
1038                 /* TBD we are perhaps overallocating here */
1039                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1040                                 sizeof(struct rpcrdma_mw);
1041                 break;
1042         default:
1043                 break;
1044         }
1045
1046         /* allocate 1, 4 and 5 in one shot */
1047         p = kzalloc(len, GFP_KERNEL);
1048         if (p == NULL) {
1049                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1050                         __func__, len);
1051                 rc = -ENOMEM;
1052                 goto out;
1053         }
1054         buf->rb_pool = p;       /* for freeing it later */
1055
1056         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1057         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1058         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1059         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1060
1061         /*
1062          * Register the zeroed pad buffer, if any.
1063          */
1064         if (cdata->padding) {
1065                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1066                                             &ep->rep_pad_mr, &ep->rep_pad);
1067                 if (rc)
1068                         goto out;
1069         }
1070         p += cdata->padding;
1071
1072         INIT_LIST_HEAD(&buf->rb_mws);
1073         r = (struct rpcrdma_mw *)p;
1074         switch (ia->ri_memreg_strategy) {
1075         case RPCRDMA_FRMR:
1076                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1077                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1078                                                 ia->ri_max_frmr_depth);
1079                         if (IS_ERR(r->r.frmr.fr_mr)) {
1080                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1081                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1082                                         " failed %i\n", __func__, rc);
1083                                 goto out;
1084                         }
1085                         r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1086                                                 ia->ri_id->device,
1087                                                 ia->ri_max_frmr_depth);
1088                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1089                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1090                                 dprintk("RPC:       %s: "
1091                                         "ib_alloc_fast_reg_page_list "
1092                                         "failed %i\n", __func__, rc);
1093
1094                                 ib_dereg_mr(r->r.frmr.fr_mr);
1095                                 goto out;
1096                         }
1097                         list_add(&r->mw_list, &buf->rb_mws);
1098                         ++r;
1099                 }
1100                 break;
1101         case RPCRDMA_MTHCAFMR:
1102                 /* TBD we are perhaps overallocating here */
1103                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1104                         static struct ib_fmr_attr fa =
1105                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1106                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1107                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1108                                 &fa);
1109                         if (IS_ERR(r->r.fmr)) {
1110                                 rc = PTR_ERR(r->r.fmr);
1111                                 dprintk("RPC:       %s: ib_alloc_fmr"
1112                                         " failed %i\n", __func__, rc);
1113                                 goto out;
1114                         }
1115                         list_add(&r->mw_list, &buf->rb_mws);
1116                         ++r;
1117                 }
1118                 break;
1119         default:
1120                 break;
1121         }
1122
1123         /*
1124          * Allocate/init the request/reply buffers. Doing this
1125          * using kmalloc for now -- one for each buf.
1126          */
1127         wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
1128         rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
1129         dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
1130                 __func__, wlen, rlen);
1131
1132         for (i = 0; i < buf->rb_max_requests; i++) {
1133                 struct rpcrdma_req *req;
1134                 struct rpcrdma_rep *rep;
1135
1136                 req = kmalloc(wlen, GFP_KERNEL);
1137                 if (req == NULL) {
1138                         dprintk("RPC:       %s: request buffer %d alloc"
1139                                 " failed\n", __func__, i);
1140                         rc = -ENOMEM;
1141                         goto out;
1142                 }
1143                 memset(req, 0, sizeof(struct rpcrdma_req));
1144                 buf->rb_send_bufs[i] = req;
1145                 buf->rb_send_bufs[i]->rl_buffer = buf;
1146
1147                 rc = rpcrdma_register_internal(ia, req->rl_base,
1148                                 wlen - offsetof(struct rpcrdma_req, rl_base),
1149                                 &buf->rb_send_bufs[i]->rl_handle,
1150                                 &buf->rb_send_bufs[i]->rl_iov);
1151                 if (rc)
1152                         goto out;
1153
1154                 buf->rb_send_bufs[i]->rl_size = wlen -
1155                                                 sizeof(struct rpcrdma_req);
1156
1157                 rep = kmalloc(rlen, GFP_KERNEL);
1158                 if (rep == NULL) {
1159                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1160                                 __func__, i);
1161                         rc = -ENOMEM;
1162                         goto out;
1163                 }
1164                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1165                 buf->rb_recv_bufs[i] = rep;
1166                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1167
1168                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1169                                 rlen - offsetof(struct rpcrdma_rep, rr_base),
1170                                 &buf->rb_recv_bufs[i]->rr_handle,
1171                                 &buf->rb_recv_bufs[i]->rr_iov);
1172                 if (rc)
1173                         goto out;
1174
1175         }
1176         dprintk("RPC:       %s: max_requests %d\n",
1177                 __func__, buf->rb_max_requests);
1178         /* done */
1179         return 0;
1180 out:
1181         rpcrdma_buffer_destroy(buf);
1182         return rc;
1183 }
1184
1185 /*
1186  * Unregister and destroy buffer memory. Need to deal with
1187  * partial initialization, so it's callable from failed create.
1188  * Must be called before destroying endpoint, as registrations
1189  * reference it.
1190  */
1191 void
1192 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1193 {
1194         int rc, i;
1195         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1196         struct rpcrdma_mw *r;
1197
1198         /* clean up in reverse order from create
1199          *   1.  recv mr memory (mr free, then kfree)
1200          *   2.  send mr memory (mr free, then kfree)
1201          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1202          *   4.  arrays
1203          */
1204         dprintk("RPC:       %s: entering\n", __func__);
1205
1206         for (i = 0; i < buf->rb_max_requests; i++) {
1207                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1208                         rpcrdma_deregister_internal(ia,
1209                                         buf->rb_recv_bufs[i]->rr_handle,
1210                                         &buf->rb_recv_bufs[i]->rr_iov);
1211                         kfree(buf->rb_recv_bufs[i]);
1212                 }
1213                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1214                         rpcrdma_deregister_internal(ia,
1215                                         buf->rb_send_bufs[i]->rl_handle,
1216                                         &buf->rb_send_bufs[i]->rl_iov);
1217                         kfree(buf->rb_send_bufs[i]);
1218                 }
1219         }
1220
1221         while (!list_empty(&buf->rb_mws)) {
1222                 r = list_entry(buf->rb_mws.next,
1223                         struct rpcrdma_mw, mw_list);
1224                 list_del(&r->mw_list);
1225                 switch (ia->ri_memreg_strategy) {
1226                 case RPCRDMA_FRMR:
1227                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1228                         if (rc)
1229                                 dprintk("RPC:       %s:"
1230                                         " ib_dereg_mr"
1231                                         " failed %i\n",
1232                                         __func__, rc);
1233                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1234                         break;
1235                 case RPCRDMA_MTHCAFMR:
1236                         rc = ib_dealloc_fmr(r->r.fmr);
1237                         if (rc)
1238                                 dprintk("RPC:       %s:"
1239                                         " ib_dealloc_fmr"
1240                                         " failed %i\n",
1241                                         __func__, rc);
1242                         break;
1243                 default:
1244                         break;
1245                 }
1246         }
1247
1248         kfree(buf->rb_pool);
1249 }
1250
1251 /*
1252  * Get a set of request/reply buffers.
1253  *
1254  * Reply buffer (if needed) is attached to send buffer upon return.
1255  * Rule:
1256  *    rb_send_index and rb_recv_index MUST always be pointing to the
1257  *    *next* available buffer (non-NULL). They are incremented after
1258  *    removing buffers, and decremented *before* returning them.
1259  */
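/*
 * For example, rpcrdma_buffer_get() below hands out
 * rb_send_bufs[rb_send_index], NULLs that slot, and post-increments the
 * index; rpcrdma_buffer_put() pre-decrements the index before restoring
 * the pointer, so the index always names the next available buffer.
 */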
1260 struct rpcrdma_req *
1261 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1262 {
1263         struct rpcrdma_req *req;
1264         unsigned long flags;
1265         int i;
1266         struct rpcrdma_mw *r;
1267
1268         spin_lock_irqsave(&buffers->rb_lock, flags);
1269         if (buffers->rb_send_index == buffers->rb_max_requests) {
1270                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1271                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1272                 return ((struct rpcrdma_req *)NULL);
1273         }
1274
1275         req = buffers->rb_send_bufs[buffers->rb_send_index];
1276         if (buffers->rb_send_index < buffers->rb_recv_index) {
1277                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1278                         __func__,
1279                         buffers->rb_recv_index - buffers->rb_send_index);
1280                 req->rl_reply = NULL;
1281         } else {
1282                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1283                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1284         }
1285         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1286         if (!list_empty(&buffers->rb_mws)) {
1287                 i = RPCRDMA_MAX_SEGS - 1;
1288                 do {
1289                         r = list_entry(buffers->rb_mws.next,
1290                                         struct rpcrdma_mw, mw_list);
1291                         list_del(&r->mw_list);
1292                         req->rl_segments[i].mr_chunk.rl_mw = r;
1293                 } while (--i >= 0);
1294         }
1295         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1296         return req;
1297 }
1298
1299 /*
1300  * Put request/reply buffers back into pool.
1301  * Pre-decrement counter/array index.
1302  */
1303 void
1304 rpcrdma_buffer_put(struct rpcrdma_req *req)
1305 {
1306         struct rpcrdma_buffer *buffers = req->rl_buffer;
1307         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1308         int i;
1309         unsigned long flags;
1310
1311         spin_lock_irqsave(&buffers->rb_lock, flags);
1312         buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1313         req->rl_niovs = 0;
1314         if (req->rl_reply) {
1315                 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1316                 req->rl_reply->rr_func = NULL;
1317                 req->rl_reply = NULL;
1318         }
1319         switch (ia->ri_memreg_strategy) {
1320         case RPCRDMA_FRMR:
1321         case RPCRDMA_MTHCAFMR:
1322                 /*
1323                  * Cycle mw's back in reverse order, and "spin" them.
1324                  * This delays and scrambles reuse as much as possible.
1325                  */
1326                 i = 1;
1327                 do {
1328                         struct rpcrdma_mw **mw;
1329                         mw = &req->rl_segments[i].mr_chunk.rl_mw;
1330                         list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1331                         *mw = NULL;
1332                 } while (++i < RPCRDMA_MAX_SEGS);
1333                 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1334                                         &buffers->rb_mws);
1335                 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1336                 break;
1337         default:
1338                 break;
1339         }
1340         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1341 }
1342
1343 /*
1344  * Recover reply buffers from pool.
1345  * This happens when recovering from error conditions.
1346  * Post-increment counter/array index.
1347  */
1348 void
1349 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1350 {
1351         struct rpcrdma_buffer *buffers = req->rl_buffer;
1352         unsigned long flags;
1353
1354         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1355                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1356         spin_lock_irqsave(&buffers->rb_lock, flags);
1357         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1358                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1359                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1360         }
1361         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1362 }
1363
1364 /*
1365  * Put reply buffers back into pool when not attached to
1366  * request. This happens in error conditions.
1367  */
1368 void
1369 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1370 {
1371         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1372         unsigned long flags;
1373
1374         rep->rr_func = NULL;
1375         spin_lock_irqsave(&buffers->rb_lock, flags);
1376         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1377         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1378 }
1379
1380 /*
1381  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1382  */
1383
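/*
 * Register a kmalloc'ed buffer for local DMA. Prefer the device's
 * global DMA lkey, then the pre-registered "bind" MR's lkey, and fall
 * back to registering the physical address range directly.
 */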
1384 int
1385 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1386                                 struct ib_mr **mrp, struct ib_sge *iov)
1387 {
1388         struct ib_phys_buf ipb;
1389         struct ib_mr *mr;
1390         int rc;
1391
1392         /*
1393          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1394          */
1395         iov->addr = ib_dma_map_single(ia->ri_id->device,
1396                         va, len, DMA_BIDIRECTIONAL);
1397         if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1398                 return -ENOMEM;
1399
1400         iov->length = len;
1401
1402         if (ia->ri_have_dma_lkey) {
1403                 *mrp = NULL;
1404                 iov->lkey = ia->ri_dma_lkey;
1405                 return 0;
1406         } else if (ia->ri_bind_mem != NULL) {
1407                 *mrp = NULL;
1408                 iov->lkey = ia->ri_bind_mem->lkey;
1409                 return 0;
1410         }
1411
1412         ipb.addr = iov->addr;
1413         ipb.size = iov->length;
1414         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1415                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1416
1417         dprintk("RPC:       %s: phys convert: 0x%llx "
1418                         "registered 0x%llx length %d\n",
1419                         __func__, (unsigned long long)ipb.addr,
1420                         (unsigned long long)iov->addr, len);
1421
1422         if (IS_ERR(mr)) {
1423                 *mrp = NULL;
1424                 rc = PTR_ERR(mr);
1425                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1426         } else {
1427                 *mrp = mr;
1428                 iov->lkey = mr->lkey;
1429                 rc = 0;
1430         }
1431
1432         return rc;
1433 }
1434
1435 int
1436 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1437                                 struct ib_mr *mr, struct ib_sge *iov)
1438 {
1439         int rc;
1440
1441         ib_dma_unmap_single(ia->ri_id->device,
1442                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1443
1444         if (mr == NULL)
1445                 return 0;
1446
1447         rc = ib_dereg_mr(mr);
1448         if (rc)
1449                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1450         return rc;
1451 }
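/*
 * A minimal usage sketch for the internal registration wrappers,
 * patterned on how the buffer code maps its kmalloc'ed areas.  The
 * helper below and its 1KB buffer are hypothetical.
 */
#if 0	/* illustrative only */
static int
rpcrdma_internal_reg_sketch(struct rpcrdma_ia *ia)
{
	struct ib_sge iov;
	struct ib_mr *mr;
	void *p;
	int rc;

	p = kmalloc(1024, GFP_KERNEL);
	if (p == NULL)
		return -ENOMEM;

	/* DMA-map (and register, if no usable lkey exists); on success
	 * iov.addr/length/lkey can go straight into a WR sg_list. */
	rc = rpcrdma_register_internal(ia, p, 1024, &mr, &iov);
	if (rc == 0) {
		/* ... post send/recv work requests using &iov ... */
		rc = rpcrdma_deregister_internal(ia, mr, &iov);
	}
	kfree(p);
	return rc;
}
#endif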
1452
1453 /*
1454  * Wrappers for chunk registration, shared by read/write chunk code.
1455  */
1456
1457 static void
1458 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1459 {
1460         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1461         seg->mr_dmalen = seg->mr_len;
1462         if (seg->mr_page)
1463                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1464                                 seg->mr_page, offset_in_page(seg->mr_offset),
1465                                 seg->mr_dmalen, seg->mr_dir);
1466         else
1467                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1468                                 seg->mr_offset,
1469                                 seg->mr_dmalen, seg->mr_dir);
1470         if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1471                 dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1472                         __func__,
1473                         (unsigned long long)seg->mr_dma,
1474                         seg->mr_offset, seg->mr_dmalen);
1475         }
1476 }
1477
1478 static void
1479 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1480 {
1481         if (seg->mr_page)
1482                 ib_dma_unmap_page(ia->ri_id->device,
1483                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1484         else
1485                 ib_dma_unmap_single(ia->ri_id->device,
1486                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1487 }
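/*
 * A minimal sketch of how the chunk code drives the helpers above for a
 * single page-based segment.  The initialization shown is illustrative;
 * real segments are built from the RPC xdr_buf.
 */
#if 0	/* illustrative only */
static void
rpcrdma_map_one_sketch(struct rpcrdma_ia *ia, struct page *page)
{
	struct rpcrdma_mr_seg seg;

	seg.mr_page = page;	/* page-based segment ...   */
	seg.mr_offset = NULL;	/* ... starting at offset 0 */
	seg.mr_len = PAGE_SIZE;

	/* writing != 0 means the server will RDMA Write into this
	 * memory, so it is mapped DMA_FROM_DEVICE. */
	rpcrdma_map_one(ia, &seg, 1);

	/* seg.mr_dma now holds the bus address used to build a chunk */

	rpcrdma_unmap_one(ia, &seg);
}
#endif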
1488
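/*
 * Register a run of segments with a Fast Registration MR: the segments
 * are DMA-mapped, their pages gathered into the FRMR page list, and a
 * FAST_REG_MR work request is posted to bind them to a fresh rkey.  If
 * the FRMR was left VALID by an earlier failure, a LOCAL_INV request is
 * chained in front of the fast-register request.
 */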
1489 static int
1490 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1491                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1492                         struct rpcrdma_xprt *r_xprt)
1493 {
1494         struct rpcrdma_mr_seg *seg1 = seg;
1495         struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1496
1497         u8 key;
1498         int len, pageoff;
1499         int i, rc;
1500         int seg_len;
1501         u64 pa;
1502         int page_no;
1503
1504         pageoff = offset_in_page(seg1->mr_offset);
1505         seg1->mr_offset -= pageoff;     /* start of page */
1506         seg1->mr_len += pageoff;
1507         len = -pageoff;
1508         if (*nsegs > ia->ri_max_frmr_depth)
1509                 *nsegs = ia->ri_max_frmr_depth;
1510         for (page_no = i = 0; i < *nsegs;) {
1511                 rpcrdma_map_one(ia, seg, writing);
1512                 pa = seg->mr_dma;
1513                 for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1514                         seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->
1515                                 page_list[page_no++] = pa;
1516                         pa += PAGE_SIZE;
1517                 }
1518                 len += seg->mr_len;
1519                 ++seg;
1520                 ++i;
1521                 /* Check for holes: stop coalescing at the first non-page-aligned boundary */
1522                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1523                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1524                         break;
1525         }
1526         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1527                 __func__, seg1->mr_chunk.rl_mw, i);
1528
1529         if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
1530                 dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
1531                         __func__,
1532                         seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
1533                 /* Invalidate before using. */
1534                 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1535                 invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1536                 invalidate_wr.next = &frmr_wr;
1537                 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1538                 invalidate_wr.send_flags = IB_SEND_SIGNALED;
1539                 invalidate_wr.ex.invalidate_rkey =
1540                         seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1541                 DECR_CQCOUNT(&r_xprt->rx_ep);
1542                 post_wr = &invalidate_wr;
1543         } else
1544                 post_wr = &frmr_wr;
1545
1546         /* Prepare FRMR WR */
1547         memset(&frmr_wr, 0, sizeof frmr_wr);
1548         frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1549         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1550         frmr_wr.send_flags = IB_SEND_SIGNALED;
1551         frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1552         frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1553         frmr_wr.wr.fast_reg.page_list_len = page_no;
1554         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1555         frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1556         if (frmr_wr.wr.fast_reg.length < len) {
1557                 rc = -EIO;
1558                 goto out_err;
1559         }
1560
1561         /* Bump the key so any stale rkey from the previous registration is invalid */
1562         key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1563         ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1564
1565         frmr_wr.wr.fast_reg.access_flags = (writing ?
1566                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1567                                 IB_ACCESS_REMOTE_READ);
1568         frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1569         DECR_CQCOUNT(&r_xprt->rx_ep);
1570
1571         rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1572
1573         if (rc) {
1574                 dprintk("RPC:       %s: failed ib_post_send for register,"
1575                         " status %i\n", __func__, rc);
1576                 goto out_err;
1577         } else {
1578                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1579                 seg1->mr_base = seg1->mr_dma + pageoff;
1580                 seg1->mr_nsegs = i;
1581                 seg1->mr_len = len;
1582         }
1583         *nsegs = i;
1584         return 0;
1585 out_err:
1586         while (i--)
1587                 rpcrdma_unmap_one(ia, --seg);
1588         return rc;
1589 }
1590
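/*
 * Invalidate an FRMR-registered chunk.  The segments are DMA-unmapped
 * and the LOCAL_INV work request is posted while holding ri_qplock for
 * read, so ri_id cannot change underneath us during a reconnect.
 */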
1591 static int
1592 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1593                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1594 {
1595         struct rpcrdma_mr_seg *seg1 = seg;
1596         struct ib_send_wr invalidate_wr, *bad_wr;
1597         int rc;
1598
1599         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1600         invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1601         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1602         invalidate_wr.send_flags = IB_SEND_SIGNALED;
1603         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1604         DECR_CQCOUNT(&r_xprt->rx_ep);
1605
1606         read_lock(&ia->ri_qplock);
1607         while (seg1->mr_nsegs--)
1608                 rpcrdma_unmap_one(ia, seg++);
1609         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1610         read_unlock(&ia->ri_qplock);
1611         if (rc)
1612                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1613                         " status %i\n", __func__, rc);
1614         return rc;
1615 }
1616
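/*
 * Register a run of segments through a pre-allocated FMR.  Coalescing
 * stops at RPCRDMA_MAX_DATA_SEGS or at the first page-alignment hole;
 * if ib_map_phys_fmr() fails, the segments already DMA-mapped are
 * unmapped again before returning.
 */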
1617 static int
1618 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1619                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1620 {
1621         struct rpcrdma_mr_seg *seg1 = seg;
1622         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1623         int len, pageoff, i, rc;
1624
1625         pageoff = offset_in_page(seg1->mr_offset);
1626         seg1->mr_offset -= pageoff;     /* start of page */
1627         seg1->mr_len += pageoff;
1628         len = -pageoff;
1629         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1630                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1631         for (i = 0; i < *nsegs;) {
1632                 rpcrdma_map_one(ia, seg, writing);
1633                 physaddrs[i] = seg->mr_dma;
1634                 len += seg->mr_len;
1635                 ++seg;
1636                 ++i;
1637                 /* Check for holes: stop coalescing at the first non-page-aligned boundary */
1638                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1639                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1640                         break;
1641         }
1642         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1643                                 physaddrs, i, seg1->mr_dma);
1644         if (rc) {
1645                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1646                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1647                         len, (unsigned long long)seg1->mr_dma,
1648                         pageoff, i, rc);
1649                 while (i--)
1650                         rpcrdma_unmap_one(ia, --seg);
1651         } else {
1652                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1653                 seg1->mr_base = seg1->mr_dma + pageoff;
1654                 seg1->mr_nsegs = i;
1655                 seg1->mr_len = len;
1656         }
1657         *nsegs = i;
1658         return rc;
1659 }
1660
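/*
 * Unmap an FMR-registered chunk: ib_unmap_fmr() tears down the mapping,
 * then each segment is DMA-unmapped under ri_qplock.
 */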
1661 static int
1662 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1663                         struct rpcrdma_ia *ia)
1664 {
1665         struct rpcrdma_mr_seg *seg1 = seg;
1666         LIST_HEAD(l);
1667         int rc;
1668
1669         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1670         rc = ib_unmap_fmr(&l);
1671         read_lock(&ia->ri_qplock);
1672         while (seg1->mr_nsegs--)
1673                 rpcrdma_unmap_one(ia, seg++);
1674         read_unlock(&ia->ri_qplock);
1675         if (rc)
1676                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1677                         " status %i\n", __func__, rc);
1678         return rc;
1679 }
1680
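/*
 * Register a chunk of memory using the memory registration strategy
 * selected for this interface adapter.  Returns the number of segments
 * covered by the registration, or -1 on failure.
 */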
1681 int
1682 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1683                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1684 {
1685         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1686         int rc = 0;
1687
1688         switch (ia->ri_memreg_strategy) {
1689
1690 #if RPCRDMA_PERSISTENT_REGISTRATION
1691         case RPCRDMA_ALLPHYSICAL:
1692                 rpcrdma_map_one(ia, seg, writing);
1693                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1694                 seg->mr_base = seg->mr_dma;
1695                 seg->mr_nsegs = 1;
1696                 nsegs = 1;
1697                 break;
1698 #endif
1699
1700         /* Registration using frmr registration */
1701         case RPCRDMA_FRMR:
1702                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1703                 break;
1704
1705         /* Registration using fmr memory registration */
1706         case RPCRDMA_MTHCAFMR:
1707                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1708                 break;
1709
1710         default:
1711                 return -1;
1712         }
1713         if (rc)
1714                 return -1;
1715
1716         return nsegs;
1717 }
1718
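/*
 * Release a registration created by rpcrdma_register_external().
 * Returns the number of segments that made up the registration.
 */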
1719 int
1720 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1721                 struct rpcrdma_xprt *r_xprt)
1722 {
1723         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1724         int nsegs = seg->mr_nsegs, rc;
1725
1726         switch (ia->ri_memreg_strategy) {
1727
1728 #if RPCRDMA_PERSISTENT_REGISTRATION
1729         case RPCRDMA_ALLPHYSICAL:
1730                 read_lock(&ia->ri_qplock);
1731                 rpcrdma_unmap_one(ia, seg);
1732                 read_unlock(&ia->ri_qplock);
1733                 break;
1734 #endif
1735
1736         case RPCRDMA_FRMR:
1737                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1738                 break;
1739
1740         case RPCRDMA_MTHCAFMR:
1741                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1742                 break;
1743
1744         default:
1745                 break;
1746         }
1747         return nsegs;
1748 }
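/*
 * A minimal sketch of the register/deregister pairing, loosely following
 * what the chunk-building code in rpc_rdma.c does.  The simplified caller
 * below is hypothetical.
 */
#if 0	/* illustrative only */
static int
rpcrdma_chunk_sketch(struct rpcrdma_xprt *r_xprt,
		     struct rpcrdma_mr_seg *seg, int nsegs, int writing)
{
	int n;

	n = rpcrdma_register_external(seg, nsegs, writing, r_xprt);
	if (n < 0)
		return -EIO;

	/* ... advertise seg->mr_rkey, seg->mr_base and seg->mr_len in
	 * the RPC-over-RDMA chunk list, then wait for the reply ... */

	rpcrdma_deregister_external(seg, r_xprt);
	return n;
}
#endif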
1749
1750 /*
1751  * Prepost any receive buffer, then post send.
1752  *
1753  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1754  */
1755 int
1756 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1757                 struct rpcrdma_ep *ep,
1758                 struct rpcrdma_req *req)
1759 {
1760         struct ib_send_wr send_wr, *send_wr_fail;
1761         struct rpcrdma_rep *rep = req->rl_reply;
1762         int rc;
1763
1764         if (rep) {
1765                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1766                 if (rc)
1767                         goto out;
1768                 req->rl_reply = NULL;
1769         }
1770
1771         send_wr.next = NULL;
1772         send_wr.wr_id = 0ULL;   /* no send cookie */
1773         send_wr.sg_list = req->rl_send_iov;
1774         send_wr.num_sge = req->rl_niovs;
1775         send_wr.opcode = IB_WR_SEND;
1776         if (send_wr.num_sge == 4)       /* sge[2] is the constant pad; sync only the tail sge[3] */
1777                 ib_dma_sync_single_for_device(ia->ri_id->device,
1778                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1779                         DMA_TO_DEVICE);
1780         ib_dma_sync_single_for_device(ia->ri_id->device,
1781                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1782                 DMA_TO_DEVICE);
1783         ib_dma_sync_single_for_device(ia->ri_id->device,
1784                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1785                 DMA_TO_DEVICE);
1786
1787         if (DECR_CQCOUNT(ep) > 0)
1788                 send_wr.send_flags = 0;
1789         else { /* Provider must take a send completion every now and then */
1790                 INIT_CQCOUNT(ep);
1791                 send_wr.send_flags = IB_SEND_SIGNALED;
1792         }
1793
1794         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1795         if (rc)
1796                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1797                         rc);
1798 out:
1799         return rc;
1800 }
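/*
 * A minimal sketch of error handling around rpcrdma_ep_post() in a
 * hypothetical caller; rpcrdma_buffer_put() recycles the request's send
 * and reply buffers so the RPC can be retried later.
 */
#if 0	/* illustrative only */
static int
rpcrdma_send_sketch(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	int rc;

	rc = rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req);
	if (rc) {
		/* e.g. -ENOMEM when the send queue is full */
		rpcrdma_buffer_put(req);
		return -ENOTCONN;	/* let the RPC layer retry */
	}
	return 0;
}
#endif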
1801
1802 /*
1803  * (Re)post a receive buffer.
1804  */
1805 int
1806 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1807                      struct rpcrdma_ep *ep,
1808                      struct rpcrdma_rep *rep)
1809 {
1810         struct ib_recv_wr recv_wr, *recv_wr_fail;
1811         int rc;
1812
1813         recv_wr.next = NULL;
1814         recv_wr.wr_id = (u64) (unsigned long) rep;
1815         recv_wr.sg_list = &rep->rr_iov;
1816         recv_wr.num_sge = 1;
1817
1818         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1819                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1820
1821         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1822
1823         if (rc)
1824                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1825                         rc);
1826         return rc;
1827 }
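/*
 * A minimal sketch of replenishing the receive queue, for instance after
 * a reconnect, by reposting every reply buffer still sitting in the pool.
 * The loop below is hypothetical and ignores rb_lock for brevity.
 */
#if 0	/* illustrative only */
static void
rpcrdma_repost_sketch(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned int i;

	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_rep *rep = buf->rb_recv_bufs[i];

		if (rep && rpcrdma_ep_post_recv(&r_xprt->rx_ia,
						&r_xprt->rx_ep, rep))
			break;	/* QP not ready; try again later */
	}
}
#endif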