1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <asm/bitops.h>
53
54 #include "xprt_rdma.h"
55
56 /*
57  * Globals/Macros
58  */
59
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY        RPCDBG_TRANS
62 #endif
63
64 /*
65  * internal functions
66  */
67
68 /*
69  * handle replies in tasklet context, using a single, global list
70  * rdma tasklet function -- just turn around and call the func
71  * for all replies on the list
72  */
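/*
 * The completion upcalls queue each reply on rpcrdma_tasklets_g via
 * rpcrdma_schedule_tasklet() below; the tasklet then dispatches them.
 * rr_func is read and cleared under rpcrdma_tk_lock_g so each reply is
 * handed off exactly once, and a reply with no rr_func is simply
 * returned to the receive buffer pool.
 */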
73
74 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
75 static LIST_HEAD(rpcrdma_tasklets_g);
76
77 static void
78 rpcrdma_run_tasklet(unsigned long data)
79 {
80         struct rpcrdma_rep *rep;
81         void (*func)(struct rpcrdma_rep *);
82         unsigned long flags;
83
84         data = data;    /* unused; required by the tasklet prototype */
85         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
86         while (!list_empty(&rpcrdma_tasklets_g)) {
87                 rep = list_entry(rpcrdma_tasklets_g.next,
88                                  struct rpcrdma_rep, rr_list);
89                 list_del(&rep->rr_list);
90                 func = rep->rr_func;
91                 rep->rr_func = NULL;
92                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93
94                 if (func)
95                         func(rep);
96                 else
97                         rpcrdma_recv_buffer_put(rep);
98
99                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100         }
101         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102 }
103
104 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105
106 static inline void
107 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108 {
109         unsigned long flags;
110
111         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
112         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
113         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
114         tasklet_schedule(&rpcrdma_tasklet_g);
115 }
116
117 static void
118 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119 {
120         struct rpcrdma_ep *ep = context;
121
122         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
123                 __func__, event->event, event->device->name, context);
124         if (ep->rep_connected == 1) {
125                 ep->rep_connected = -EIO;
126                 ep->rep_func(ep);
127                 wake_up_all(&ep->rep_connect_wait);
128         }
129 }
130
131 static void
132 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133 {
134         struct rpcrdma_ep *ep = context;
135
136         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
137                 __func__, event->event, event->device->name, context);
138         if (ep->rep_connected == 1) {
139                 ep->rep_connected = -EIO;
140                 ep->rep_func(ep);
141                 wake_up_all(&ep->rep_connect_wait);
142         }
143 }
144
145 static void
146 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
147 {
148         struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
149
150         dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
151                 __func__, frmr, wc->status, wc->opcode);
152
153         if (wc->wr_id == 0ULL)
154                 return;
155         if (wc->status != IB_WC_SUCCESS)
156                 return;
157
158         if (wc->opcode == IB_WC_FAST_REG_MR)
159                 frmr->r.frmr.fr_state = FRMR_IS_VALID;
160         else if (wc->opcode == IB_WC_LOCAL_INV)
161                 frmr->r.frmr.fr_state = FRMR_IS_INVALID;
162 }
163
164 static int
165 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
166 {
167         struct ib_wc *wcs;
168         int budget, count, rc;
169
170         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
171         do {
172                 wcs = ep->rep_send_wcs;
173
174                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
175                 if (rc <= 0)
176                         return rc;
177
178                 count = rc;
179                 while (count-- > 0)
180                         rpcrdma_sendcq_process_wc(wcs++);
181         } while (rc == RPCRDMA_POLLSIZE && --budget);
182         return 0;
183 }
184
185 /*
186  * Handle send, fast_reg_mr, and local_inv completions.
187  *
188  * Send events are typically suppressed and thus do not result
189  * in an upcall. Occasionally one is signaled, however. This
190  * prevents the provider's completion queue from wrapping and
191  * losing a completion.
192  */
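/*
 * The pattern below is: drain the CQ, re-arm it, and poll once more if
 * ib_req_notify_cq() with IB_CQ_REPORT_MISSED_EVENTS reports that
 * completions arrived while the CQ was unarmed.  That final poll closes
 * the race between draining and re-arming.
 */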
193 static void
194 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
195 {
196         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
197         int rc;
198
199         rc = rpcrdma_sendcq_poll(cq, ep);
200         if (rc) {
201                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
202                         __func__, rc);
203                 return;
204         }
205
206         rc = ib_req_notify_cq(cq,
207                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
208         if (rc == 0)
209                 return;
210         if (rc < 0) {
211                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
212                         __func__, rc);
213                 return;
214         }
215
216         rpcrdma_sendcq_poll(cq, ep);
217 }
218
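/*
 * A failed or flushed receive is marked with rr_len = ~0U but is still
 * scheduled, so the tasklet can release its buffer.  A successful
 * receive records its length, syncs the buffer for the CPU, and, when
 * at least 16 bytes (enough to cover rm_credit) have arrived, refreshes
 * the credit count advertised by the server.
 */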
219 static void
220 rpcrdma_recvcq_process_wc(struct ib_wc *wc)
221 {
222         struct rpcrdma_rep *rep =
223                         (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
224
225         dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
226                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
227
228         if (wc->status != IB_WC_SUCCESS) {
229                 rep->rr_len = ~0U;
230                 goto out_schedule;
231         }
232         if (wc->opcode != IB_WC_RECV)
233                 return;
234
235         rep->rr_len = wc->byte_len;
236         ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
237                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
238
239         if (rep->rr_len >= 16) {
240                 struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
241                 unsigned int credits = ntohl(p->rm_credit);
242
243                 if (credits == 0)
244                         credits = 1;    /* don't deadlock */
245                 else if (credits > rep->rr_buffer->rb_max_requests)
246                         credits = rep->rr_buffer->rb_max_requests;
247                 atomic_set(&rep->rr_buffer->rb_credits, credits);
248         }
249
250 out_schedule:
251         rpcrdma_schedule_tasklet(rep);
252 }
253
254 static int
255 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
256 {
257         struct ib_wc *wcs;
258         int budget, count, rc;
259
260         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
261         do {
262                 wcs = ep->rep_recv_wcs;
263
264                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
265                 if (rc <= 0)
266                         return rc;
267
268                 count = rc;
269                 while (count-- > 0)
270                         rpcrdma_recvcq_process_wc(wcs++);
271         } while (rc == RPCRDMA_POLLSIZE && --budget);
272         return 0;
273 }
274
275 /*
276  * Handle receive completions.
277  *
278  * It is reentrant but processes events one at a time to preserve the
279  * ordering of receives, and with it the server's credit accounting.
280  *
281  * It is the responsibility of the scheduled tasklet to return
282  * recv buffers to the pool. NOTE: this affects synchronization of
283  * connection shutdown. That is, the structures required for
284  * the completion of the reply handler must remain intact until
285  * all memory has been reclaimed.
286  */
287 static void
288 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
289 {
290         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
291         int rc;
292
293         rc = rpcrdma_recvcq_poll(cq, ep);
294         if (rc) {
295                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
296                         __func__, rc);
297                 return;
298         }
299
300         rc = ib_req_notify_cq(cq,
301                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
302         if (rc == 0)
303                 return;
304         if (rc < 0) {
305                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
306                         __func__, rc);
307                 return;
308         }
309
310         rpcrdma_recvcq_poll(cq, ep);
311 }
312
313 static void
314 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
315 {
316         rpcrdma_recvcq_upcall(ep->rep_attr.recv_cq, ep);
317         rpcrdma_sendcq_upcall(ep->rep_attr.send_cq, ep);
318 }
319
320 #ifdef RPC_DEBUG
321 static const char * const conn[] = {
322         "address resolved",
323         "address error",
324         "route resolved",
325         "route error",
326         "connect request",
327         "connect response",
328         "connect error",
329         "unreachable",
330         "rejected",
331         "established",
332         "disconnected",
333         "device removal"
334 };
335 #endif
336
337 static int
338 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
339 {
340         struct rpcrdma_xprt *xprt = id->context;
341         struct rpcrdma_ia *ia = &xprt->rx_ia;
342         struct rpcrdma_ep *ep = &xprt->rx_ep;
343 #ifdef RPC_DEBUG
344         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
345 #endif
346         struct ib_qp_attr attr;
347         struct ib_qp_init_attr iattr;
348         int connstate = 0;
349
350         switch (event->event) {
351         case RDMA_CM_EVENT_ADDR_RESOLVED:
352         case RDMA_CM_EVENT_ROUTE_RESOLVED:
353                 ia->ri_async_rc = 0;
354                 complete(&ia->ri_done);
355                 break;
356         case RDMA_CM_EVENT_ADDR_ERROR:
357                 ia->ri_async_rc = -EHOSTUNREACH;
358                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
359                         __func__, ep);
360                 complete(&ia->ri_done);
361                 break;
362         case RDMA_CM_EVENT_ROUTE_ERROR:
363                 ia->ri_async_rc = -ENETUNREACH;
364                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
365                         __func__, ep);
366                 complete(&ia->ri_done);
367                 break;
368         case RDMA_CM_EVENT_ESTABLISHED:
369                 connstate = 1;
370                 ib_query_qp(ia->ri_id->qp, &attr,
371                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
372                         &iattr);
373                 dprintk("RPC:       %s: %d responder resources"
374                         " (%d initiator)\n",
375                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
376                 goto connected;
377         case RDMA_CM_EVENT_CONNECT_ERROR:
378                 connstate = -ENOTCONN;
379                 goto connected;
380         case RDMA_CM_EVENT_UNREACHABLE:
381                 connstate = -ENETDOWN;
382                 goto connected;
383         case RDMA_CM_EVENT_REJECTED:
384                 connstate = -ECONNREFUSED;
385                 goto connected;
386         case RDMA_CM_EVENT_DISCONNECTED:
387                 connstate = -ECONNABORTED;
388                 goto connected;
389         case RDMA_CM_EVENT_DEVICE_REMOVAL:
390                 connstate = -ENODEV;
391 connected:
392                 dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
393                         __func__,
394                         (event->event <= 11) ? conn[event->event] :
395                                                 "unknown connection error",
396                         &addr->sin_addr.s_addr,
397                         ntohs(addr->sin_port),
398                         ep, event->event);
399                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
400                 dprintk("RPC:       %s: %sconnected\n",
401                                         __func__, connstate > 0 ? "" : "dis");
402                 ep->rep_connected = connstate;
403                 ep->rep_func(ep);
404                 wake_up_all(&ep->rep_connect_wait);
405                 break;
406         default:
407                 dprintk("RPC:       %s: unexpected CM event %d\n",
408                         __func__, event->event);
409                 break;
410         }
411
412 #ifdef RPC_DEBUG
413         if (connstate == 1) {
414                 int ird = attr.max_dest_rd_atomic;
415                 int tird = ep->rep_remote_cma.responder_resources;
416                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
417                         "on %s, memreg %d slots %d ird %d%s\n",
418                         &addr->sin_addr.s_addr,
419                         ntohs(addr->sin_port),
420                         ia->ri_id->device->name,
421                         ia->ri_memreg_strategy,
422                         xprt->rx_buf.rb_max_requests,
423                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
424         } else if (connstate < 0) {
425                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
426                         &addr->sin_addr.s_addr,
427                         ntohs(addr->sin_port),
428                         connstate);
429         }
430 #endif
431
432         return 0;
433 }
434
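/*
 * Create an rdma_cm_id for this transport and synchronously resolve the
 * server's address, then a route to it.  Each step completes
 * asynchronously via rpcrdma_conn_upcall(), which records the outcome
 * in ia->ri_async_rc and signals ia->ri_done; if no upcall arrives
 * within roughly RDMA_RESOLVE_TIMEOUT, the pre-loaded -ETIMEDOUT is
 * returned instead.
 */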
435 static struct rdma_cm_id *
436 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
437                         struct rpcrdma_ia *ia, struct sockaddr *addr)
438 {
439         struct rdma_cm_id *id;
440         int rc;
441
442         init_completion(&ia->ri_done);
443
444         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
445         if (IS_ERR(id)) {
446                 rc = PTR_ERR(id);
447                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
448                         __func__, rc);
449                 return id;
450         }
451
452         ia->ri_async_rc = -ETIMEDOUT;
453         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
454         if (rc) {
455                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
456                         __func__, rc);
457                 goto out;
458         }
459         wait_for_completion_interruptible_timeout(&ia->ri_done,
460                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
461         rc = ia->ri_async_rc;
462         if (rc)
463                 goto out;
464
465         ia->ri_async_rc = -ETIMEDOUT;
466         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
467         if (rc) {
468                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
469                         __func__, rc);
470                 goto out;
471         }
472         wait_for_completion_interruptible_timeout(&ia->ri_done,
473                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
474         rc = ia->ri_async_rc;
475         if (rc)
476                 goto out;
477
478         return id;
479
480 out:
481         rdma_destroy_id(id);
482         return ERR_PTR(rc);
483 }
484
485 /*
486  * Drain any cq, prior to teardown.
487  */
488 static void
489 rpcrdma_clean_cq(struct ib_cq *cq)
490 {
491         struct ib_wc wc;
492         int count = 0;
493
494         while (1 == ib_poll_cq(cq, 1, &wc))
495                 ++count;
496
497         if (count)
498                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
499                         __func__, count, wc.opcode);
500 }
501
502 /*
503  * Exported functions.
504  */
505
506 /*
507  * Open and initialize an Interface Adapter.
508  *  o initializes fields of struct rpcrdma_ia, including
509  *    interface and provider attributes and protection zone.
510  */
511 int
512 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
513 {
514         int rc, mem_priv;
515         struct ib_device_attr devattr;
516         struct rpcrdma_ia *ia = &xprt->rx_ia;
517
518         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
519         if (IS_ERR(ia->ri_id)) {
520                 rc = PTR_ERR(ia->ri_id);
521                 goto out1;
522         }
523
524         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
525         if (IS_ERR(ia->ri_pd)) {
526                 rc = PTR_ERR(ia->ri_pd);
527                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
528                         __func__, rc);
529                 goto out2;
530         }
531
532         /*
533          * Query the device to determine if the requested memory
534          * registration strategy is supported. If it isn't, set the
535          * strategy to a globally supported model.
536          */
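        /*
         * Fallback order when the requested strategy is unsupported:
         * FRMR needs both IB_DEVICE_MEM_MGT_EXTENSIONS and a local DMA
         * lkey, otherwise fall back to FMR; FMR needs the device's
         * alloc_fmr verb, otherwise fall back to an all-physical DMA MR
         * (when RPCRDMA_PERSISTENT_REGISTRATION is enabled) or fail.
         */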
537         rc = ib_query_device(ia->ri_id->device, &devattr);
538         if (rc) {
539                 dprintk("RPC:       %s: ib_query_device failed %d\n",
540                         __func__, rc);
541                 goto out2;
542         }
543
544         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
545                 ia->ri_have_dma_lkey = 1;
546                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
547         }
548
549         if (memreg == RPCRDMA_FRMR) {
550                 /* Requires both frmr reg and local dma lkey */
551                 if ((devattr.device_cap_flags &
552                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
553                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
554                         dprintk("RPC:       %s: FRMR registration "
555                                 "not supported by HCA\n", __func__);
556                         memreg = RPCRDMA_MTHCAFMR;
557                 } else {
558                         /* Mind the ia limit on FRMR page list depth */
559                         ia->ri_max_frmr_depth = min_t(unsigned int,
560                                 RPCRDMA_MAX_DATA_SEGS,
561                                 devattr.max_fast_reg_page_list_len);
562                 }
563         }
564         if (memreg == RPCRDMA_MTHCAFMR) {
565                 if (!ia->ri_id->device->alloc_fmr) {
566                         dprintk("RPC:       %s: MTHCAFMR registration "
567                                 "not supported by HCA\n", __func__);
568 #if RPCRDMA_PERSISTENT_REGISTRATION
569                         memreg = RPCRDMA_ALLPHYSICAL;
570 #else
571                         rc = -ENOMEM;
572                         goto out2;
573 #endif
574                 }
575         }
576
577         /*
578          * Optionally obtain an underlying physical identity mapping in
579          * order to do a memory window-based bind. This base registration
580          * is protected from remote access - that is enabled only by binding
581          * for the specific bytes targeted during each RPC operation, and
582          * revoked after the corresponding completion similar to a storage
583          * adapter.
584          */
585         switch (memreg) {
586         case RPCRDMA_FRMR:
587                 break;
588 #if RPCRDMA_PERSISTENT_REGISTRATION
589         case RPCRDMA_ALLPHYSICAL:
590                 mem_priv = IB_ACCESS_LOCAL_WRITE |
591                                 IB_ACCESS_REMOTE_WRITE |
592                                 IB_ACCESS_REMOTE_READ;
593                 goto register_setup;
594 #endif
595         case RPCRDMA_MTHCAFMR:
596                 if (ia->ri_have_dma_lkey)
597                         break;
598                 mem_priv = IB_ACCESS_LOCAL_WRITE;
599 #if RPCRDMA_PERSISTENT_REGISTRATION
600         register_setup:
601 #endif
602                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
603                 if (IS_ERR(ia->ri_bind_mem)) {
604                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
605                                 "phys register failed with %lX\n",
606                                 __func__, PTR_ERR(ia->ri_bind_mem));
607                         rc = -ENOMEM;
608                         goto out2;
609                 }
610                 break;
611         default:
612                 printk(KERN_ERR "RPC: Unsupported memory "
613                                 "registration mode: %d\n", memreg);
614                 rc = -ENOMEM;
615                 goto out2;
616         }
617         dprintk("RPC:       %s: memory registration strategy is %d\n",
618                 __func__, memreg);
619
620         /* Else will do memory reg/dereg for each chunk */
621         ia->ri_memreg_strategy = memreg;
622
623         rwlock_init(&ia->ri_qplock);
624         return 0;
625 out2:
626         rdma_destroy_id(ia->ri_id);
627         ia->ri_id = NULL;
628 out1:
629         return rc;
630 }
631
632 /*
633  * Clean up/close an IA.
634  *   o if event handles and PD have been initialized, free them.
635  *   o close the IA
636  */
637 void
638 rpcrdma_ia_close(struct rpcrdma_ia *ia)
639 {
640         int rc;
641
642         dprintk("RPC:       %s: entering\n", __func__);
643         if (ia->ri_bind_mem != NULL) {
644                 rc = ib_dereg_mr(ia->ri_bind_mem);
645                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
646                         __func__, rc);
647         }
648         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
649                 if (ia->ri_id->qp)
650                         rdma_destroy_qp(ia->ri_id);
651                 rdma_destroy_id(ia->ri_id);
652                 ia->ri_id = NULL;
653         }
654         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
655                 rc = ib_dealloc_pd(ia->ri_pd);
656                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
657                         __func__, rc);
658         }
659 }
660
661 /*
662  * Create unconnected endpoint.
663  */
664 int
665 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
666                                 struct rpcrdma_create_data_internal *cdata)
667 {
668         struct ib_device_attr devattr;
669         struct ib_cq *sendcq, *recvcq;
670         int rc, err;
671
672         rc = ib_query_device(ia->ri_id->device, &devattr);
673         if (rc) {
674                 dprintk("RPC:       %s: ib_query_device failed %d\n",
675                         __func__, rc);
676                 return rc;
677         }
678
679         /* check provider's send/recv wr limits */
680         if (cdata->max_requests > devattr.max_qp_wr)
681                 cdata->max_requests = devattr.max_qp_wr;
682
683         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
684         ep->rep_attr.qp_context = ep;
685         /* send_cq and recv_cq initialized below */
686         ep->rep_attr.srq = NULL;
687         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
688         switch (ia->ri_memreg_strategy) {
689         case RPCRDMA_FRMR: {
690                 int depth = 7;
691
692                 /* Add room for frmr register and invalidate WRs.
693                  * 1. FRMR reg WR for head
694                  * 2. FRMR invalidate WR for head
695                  * 3. N FRMR reg WRs for pagelist
696                  * 4. N FRMR invalidate WRs for pagelist
697                  * 5. FRMR reg WR for tail
698                  * 6. FRMR invalidate WR for tail
699                  * 7. The RDMA_SEND WR
700                  */
701
702                 /* Calculate N if the device max FRMR depth is smaller than
703                  * RPCRDMA_MAX_DATA_SEGS.
704                  */
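                /* A worked example, with values chosen purely for
                 * illustration: were RPCRDMA_MAX_DATA_SEGS 64 and
                 * ri_max_frmr_depth 16, delta would start at 48 and the
                 * loop below would run three times (48 -> 32 -> 16 -> 0),
                 * adding a reg/invalidate pair each pass, so depth
                 * becomes 7 + 3 * 2 = 13 WRs per credit.
                 */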
705                 if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
706                         int delta = RPCRDMA_MAX_DATA_SEGS -
707                                     ia->ri_max_frmr_depth;
708
709                         do {
710                                 depth += 2; /* FRMR reg + invalidate */
711                                 delta -= ia->ri_max_frmr_depth;
712                         } while (delta > 0);
713
714                 }
715                 ep->rep_attr.cap.max_send_wr *= depth;
716                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
717                         cdata->max_requests = devattr.max_qp_wr / depth;
718                         if (!cdata->max_requests)
719                                 return -EINVAL;
720                         ep->rep_attr.cap.max_send_wr = cdata->max_requests *
721                                                        depth;
722                 }
723                 break;
724         }
725         default:
726                 break;
727         }
728         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
729         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
730         ep->rep_attr.cap.max_recv_sge = 1;
731         ep->rep_attr.cap.max_inline_data = 0;
732         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
733         ep->rep_attr.qp_type = IB_QPT_RC;
734         ep->rep_attr.port_num = ~0;
735
736         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
737                 "iovs: send %d recv %d\n",
738                 __func__,
739                 ep->rep_attr.cap.max_send_wr,
740                 ep->rep_attr.cap.max_recv_wr,
741                 ep->rep_attr.cap.max_send_sge,
742                 ep->rep_attr.cap.max_recv_sge);
743
744         /* set trigger for requesting send completion */
745         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
746         if (ep->rep_cqinit <= 2)
747                 ep->rep_cqinit = 0;
748         INIT_CQCOUNT(ep);
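        /*
         * rep_cqinit throttles send-completion signaling: the CQCOUNT
         * macros in xprt_rdma.h count posted sends down from this value,
         * and the posting path sets IB_SEND_SIGNALED only when the count
         * runs out (a value of 0 signals every send).  This keeps the
         * send CQ from overflowing while most completions stay
         * suppressed, as noted above rpcrdma_sendcq_upcall().
         */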
749         ep->rep_ia = ia;
750         init_waitqueue_head(&ep->rep_connect_wait);
751         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
752
753         sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
754                                   rpcrdma_cq_async_error_upcall, ep,
755                                   ep->rep_attr.cap.max_send_wr + 1, 0);
756         if (IS_ERR(sendcq)) {
757                 rc = PTR_ERR(sendcq);
758                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
759                         __func__, rc);
760                 goto out1;
761         }
762
763         rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
764         if (rc) {
765                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
766                         __func__, rc);
767                 goto out2;
768         }
769
770         recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
771                                   rpcrdma_cq_async_error_upcall, ep,
772                                   ep->rep_attr.cap.max_recv_wr + 1, 0);
773         if (IS_ERR(recvcq)) {
774                 rc = PTR_ERR(recvcq);
775                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
776                         __func__, rc);
777                 goto out2;
778         }
779
780         rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
781         if (rc) {
782                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
783                         __func__, rc);
784                 ib_destroy_cq(recvcq);
785                 goto out2;
786         }
787
788         ep->rep_attr.send_cq = sendcq;
789         ep->rep_attr.recv_cq = recvcq;
790
791         /* Initialize cma parameters */
792
793         /* RPC/RDMA does not use private data */
794         ep->rep_remote_cma.private_data = NULL;
795         ep->rep_remote_cma.private_data_len = 0;
796
797         /* Client offers RDMA Read but does not initiate */
798         ep->rep_remote_cma.initiator_depth = 0;
799         if (devattr.max_qp_rd_atom > 32)        /* arbitrary but <= 255 */
800                 ep->rep_remote_cma.responder_resources = 32;
801         else
802                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
803
804         ep->rep_remote_cma.retry_count = 7;
805         ep->rep_remote_cma.flow_control = 0;
806         ep->rep_remote_cma.rnr_retry_count = 0;
807
808         return 0;
809
810 out2:
811         err = ib_destroy_cq(sendcq);
812         if (err)
813                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
814                         __func__, err);
815 out1:
816         return rc;
817 }
818
819 /*
820  * rpcrdma_ep_destroy
821  *
822  * Disconnect and destroy endpoint. After this, the only
823  * valid operations on the ep are to free it (if dynamically
824  * allocated) or re-create it.
825  */
826 void
827 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
828 {
829         int rc;
830
831         dprintk("RPC:       %s: entering, connected is %d\n",
832                 __func__, ep->rep_connected);
833
834         cancel_delayed_work_sync(&ep->rep_connect_worker);
835
836         if (ia->ri_id->qp) {
837                 rc = rpcrdma_ep_disconnect(ep, ia);
838                 if (rc)
839                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
840                                 " returned %i\n", __func__, rc);
841                 rdma_destroy_qp(ia->ri_id);
842                 ia->ri_id->qp = NULL;
843         }
844
845         /* padding - could be done in rpcrdma_buffer_destroy... */
846         if (ep->rep_pad_mr) {
847                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
848                 ep->rep_pad_mr = NULL;
849         }
850
851         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
852         rc = ib_destroy_cq(ep->rep_attr.recv_cq);
853         if (rc)
854                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
855                         __func__, rc);
856
857         rpcrdma_clean_cq(ep->rep_attr.send_cq);
858         rc = ib_destroy_cq(ep->rep_attr.send_cq);
859         if (rc)
860                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
861                         __func__, rc);
862 }
863
864 /*
865  * Connect unconnected endpoint.
866  */
867 int
868 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
869 {
870         struct rdma_cm_id *id, *old;
871         int rc = 0;
872         int retry_count = 0;
873
874         if (ep->rep_connected != 0) {
875                 struct rpcrdma_xprt *xprt;
876 retry:
877                 dprintk("RPC:       %s: reconnecting...\n", __func__);
878                 rc = rpcrdma_ep_disconnect(ep, ia);
879                 if (rc && rc != -ENOTCONN)
880                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
881                                 " status %i\n", __func__, rc);
882                 rpcrdma_flush_cqs(ep);
883
884                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
885                 id = rpcrdma_create_id(xprt, ia,
886                                 (struct sockaddr *)&xprt->rx_data.addr);
887                 if (IS_ERR(id)) {
888                         rc = -EHOSTUNREACH;
889                         goto out;
890                 }
891                 /* TEMP TEMP TEMP - fail if new device:
892                  * Deregister/remarshal *all* requests!
893                  * Close and recreate adapter, pd, etc!
894                  * Re-determine all attributes still sane!
895                  * More stuff I haven't thought of!
896                  * Rrrgh!
897                  */
898                 if (ia->ri_id->device != id->device) {
899                         printk("RPC:       %s: can't reconnect on "
900                                 "different device!\n", __func__);
901                         rdma_destroy_id(id);
902                         rc = -ENETUNREACH;
903                         goto out;
904                 }
905                 /* END TEMP */
906                 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
907                 if (rc) {
908                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
909                                 __func__, rc);
910                         rdma_destroy_id(id);
911                         rc = -ENETUNREACH;
912                         goto out;
913                 }
914
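                /*
                 * Swap in the new id/QP under the write side of
                 * ri_qplock so that code paths dereferencing ia->ri_id
                 * under read_lock() never observe a half-torn-down id.
                 * The old id and QP are destroyed only after the swap,
                 * outside the lock.
                 */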
915                 write_lock(&ia->ri_qplock);
916                 old = ia->ri_id;
917                 ia->ri_id = id;
918                 write_unlock(&ia->ri_qplock);
919
920                 rdma_destroy_qp(old);
921                 rdma_destroy_id(old);
922         } else {
923                 dprintk("RPC:       %s: connecting...\n", __func__);
924                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
925                 if (rc) {
926                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
927                                 __func__, rc);
928                         /* do not update ep->rep_connected */
929                         return -ENETUNREACH;
930                 }
931         }
932
933         ep->rep_connected = 0;
934
935         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
936         if (rc) {
937                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
938                                 __func__, rc);
939                 goto out;
940         }
941
942         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
943
944         /*
945          * Check state. A non-peer reject indicates no listener
946          * (ECONNREFUSED), which may be a transient state. All
947          * others indicate a transport condition which has already
948          * undergone a best-effort connection attempt.
949          */
950         if (ep->rep_connected == -ECONNREFUSED &&
951             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
952                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
953                 goto retry;
954         }
955         if (ep->rep_connected <= 0) {
956                 /* Sometimes, the only way to reliably connect to remote
957                  * CMs is to use the same nonzero values for ORD and IRD. */
958                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
959                     (ep->rep_remote_cma.responder_resources == 0 ||
960                      ep->rep_remote_cma.initiator_depth !=
961                                 ep->rep_remote_cma.responder_resources)) {
962                         if (ep->rep_remote_cma.responder_resources == 0)
963                                 ep->rep_remote_cma.responder_resources = 1;
964                         ep->rep_remote_cma.initiator_depth =
965                                 ep->rep_remote_cma.responder_resources;
966                         goto retry;
967                 }
968                 rc = ep->rep_connected;
969         } else {
970                 dprintk("RPC:       %s: connected\n", __func__);
971         }
972
973 out:
974         if (rc)
975                 ep->rep_connected = rc;
976         return rc;
977 }
978
979 /*
980  * rpcrdma_ep_disconnect
981  *
982  * This is separate from destroy to facilitate the ability
983  * to reconnect without recreating the endpoint.
984  *
985  * This call is not reentrant, and must not be made in parallel
986  * on the same endpoint.
987  */
988 int
989 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
990 {
991         int rc;
992
993         rpcrdma_flush_cqs(ep);
994         rc = rdma_disconnect(ia->ri_id);
995         if (!rc) {
996                 /* returns without wait if not connected */
997                 wait_event_interruptible(ep->rep_connect_wait,
998                                                         ep->rep_connected != 1);
999                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
1000                         (ep->rep_connected == 1) ? "still " : "dis");
1001         } else {
1002                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
1003                 ep->rep_connected = rc;
1004         }
1005         return rc;
1006 }
1007
1008 /*
1009  * Initialize buffer memory
1010  */
1011 int
1012 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
1013         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
1014 {
1015         char *p;
1016         size_t len, rlen, wlen;
1017         int i, rc;
1018         struct rpcrdma_mw *r;
1019
1020         buf->rb_max_requests = cdata->max_requests;
1021         spin_lock_init(&buf->rb_lock);
1022         atomic_set(&buf->rb_credits, 1);
1023
1024         /* Need to allocate:
1025          *   1.  arrays for send and recv pointers
1026          *   2.  arrays of struct rpcrdma_req to fill in pointers
1027          *   3.  array of struct rpcrdma_rep for replies
1028          *   4.  padding, if any
1029          *   5.  mw's, fmr's or frmr's, if any
1030          * Send/recv buffers in req/rep need to be registered
1031          */
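        /*
         * Layout of the single rb_pool allocation, in the order the
         * cursor "p" advances below:
         *
         *   [ rb_send_bufs pointers | rb_recv_bufs pointers |
         *     pad buffer (if any)   | rpcrdma_mw array (FRMR/FMR) ]
         */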
1032
1033         len = buf->rb_max_requests *
1034                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1035         len += cdata->padding;
1036         switch (ia->ri_memreg_strategy) {
1037         case RPCRDMA_FRMR:
1038                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
1039                                 sizeof(struct rpcrdma_mw);
1040                 break;
1041         case RPCRDMA_MTHCAFMR:
1042                 /* TBD we are perhaps overallocating here */
1043                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1044                                 sizeof(struct rpcrdma_mw);
1045                 break;
1046         default:
1047                 break;
1048         }
1049
1050         /* allocate 1, 4 and 5 in one shot */
1051         p = kzalloc(len, GFP_KERNEL);
1052         if (p == NULL) {
1053                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1054                         __func__, len);
1055                 rc = -ENOMEM;
1056                 goto out;
1057         }
1058         buf->rb_pool = p;       /* for freeing it later */
1059
1060         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1061         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1062         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1063         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1064
1065         /*
1066          * Register the zeroed pad buffer, if any.
1067          */
1068         if (cdata->padding) {
1069                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1070                                             &ep->rep_pad_mr, &ep->rep_pad);
1071                 if (rc)
1072                         goto out;
1073         }
1074         p += cdata->padding;
1075
1076         INIT_LIST_HEAD(&buf->rb_mws);
1077         INIT_LIST_HEAD(&buf->rb_all);
1078         r = (struct rpcrdma_mw *)p;
1079         switch (ia->ri_memreg_strategy) {
1080         case RPCRDMA_FRMR:
1081                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1082                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1083                                                 ia->ri_max_frmr_depth);
1084                         if (IS_ERR(r->r.frmr.fr_mr)) {
1085                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1086                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1087                                         " failed %i\n", __func__, rc);
1088                                 goto out;
1089                         }
1090                         r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1091                                                 ia->ri_id->device,
1092                                                 ia->ri_max_frmr_depth);
1093                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1094                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1095                                 dprintk("RPC:       %s: "
1096                                         "ib_alloc_fast_reg_page_list "
1097                                         "failed %i\n", __func__, rc);
1098
1099                                 ib_dereg_mr(r->r.frmr.fr_mr);
1100                                 goto out;
1101                         }
1102                         list_add(&r->mw_all, &buf->rb_all);
1103                         list_add(&r->mw_list, &buf->rb_mws);
1104                         ++r;
1105                 }
1106                 break;
1107         case RPCRDMA_MTHCAFMR:
1108                 /* TBD we are perhaps overallocating here */
1109                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1110                         static struct ib_fmr_attr fa =
1111                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1112                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1113                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1114                                 &fa);
1115                         if (IS_ERR(r->r.fmr)) {
1116                                 rc = PTR_ERR(r->r.fmr);
1117                                 dprintk("RPC:       %s: ib_alloc_fmr"
1118                                         " failed %i\n", __func__, rc);
1119                                 goto out;
1120                         }
1121                         list_add(&r->mw_all, &buf->rb_all);
1122                         list_add(&r->mw_list, &buf->rb_mws);
1123                         ++r;
1124                 }
1125                 break;
1126         default:
1127                 break;
1128         }
1129
1130         /*
1131          * Allocate/init the request/reply buffers. Doing this
1132          * using kmalloc for now -- one for each buf.
1133          */
1134         wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
1135         rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
1136         dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
1137                 __func__, wlen, rlen);
1138
1139         for (i = 0; i < buf->rb_max_requests; i++) {
1140                 struct rpcrdma_req *req;
1141                 struct rpcrdma_rep *rep;
1142
1143                 req = kmalloc(wlen, GFP_KERNEL);
1144                 if (req == NULL) {
1145                         dprintk("RPC:       %s: request buffer %d alloc"
1146                                 " failed\n", __func__, i);
1147                         rc = -ENOMEM;
1148                         goto out;
1149                 }
1150                 memset(req, 0, sizeof(struct rpcrdma_req));
1151                 buf->rb_send_bufs[i] = req;
1152                 buf->rb_send_bufs[i]->rl_buffer = buf;
1153
1154                 rc = rpcrdma_register_internal(ia, req->rl_base,
1155                                 wlen - offsetof(struct rpcrdma_req, rl_base),
1156                                 &buf->rb_send_bufs[i]->rl_handle,
1157                                 &buf->rb_send_bufs[i]->rl_iov);
1158                 if (rc)
1159                         goto out;
1160
1161                 buf->rb_send_bufs[i]->rl_size = wlen -
1162                                                 sizeof(struct rpcrdma_req);
1163
1164                 rep = kmalloc(rlen, GFP_KERNEL);
1165                 if (rep == NULL) {
1166                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1167                                 __func__, i);
1168                         rc = -ENOMEM;
1169                         goto out;
1170                 }
1171                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1172                 buf->rb_recv_bufs[i] = rep;
1173                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1174
1175                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1176                                 rlen - offsetof(struct rpcrdma_rep, rr_base),
1177                                 &buf->rb_recv_bufs[i]->rr_handle,
1178                                 &buf->rb_recv_bufs[i]->rr_iov);
1179                 if (rc)
1180                         goto out;
1181
1182         }
1183         dprintk("RPC:       %s: max_requests %d\n",
1184                 __func__, buf->rb_max_requests);
1185         /* done */
1186         return 0;
1187 out:
1188         rpcrdma_buffer_destroy(buf);
1189         return rc;
1190 }
1191
1192 /*
1193  * Unregister and destroy buffer memory. Need to deal with
1194  * partial initialization, so it's callable from failed create.
1195  * Must be called before destroying endpoint, as registrations
1196  * reference it.
1197  */
1198 void
1199 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1200 {
1201         int rc, i;
1202         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1203         struct rpcrdma_mw *r;
1204
1205         /* clean up in reverse order from create
1206          *   1.  recv mr memory (mr free, then kfree)
1207          *   2.  send mr memory (mr free, then kfree)
1208          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1209          *   4.  arrays
1210          */
1211         dprintk("RPC:       %s: entering\n", __func__);
1212
1213         for (i = 0; i < buf->rb_max_requests; i++) {
1214                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1215                         rpcrdma_deregister_internal(ia,
1216                                         buf->rb_recv_bufs[i]->rr_handle,
1217                                         &buf->rb_recv_bufs[i]->rr_iov);
1218                         kfree(buf->rb_recv_bufs[i]);
1219                 }
1220                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1221                         rpcrdma_deregister_internal(ia,
1222                                         buf->rb_send_bufs[i]->rl_handle,
1223                                         &buf->rb_send_bufs[i]->rl_iov);
1224                         kfree(buf->rb_send_bufs[i]);
1225                 }
1226         }
1227
1228         while (!list_empty(&buf->rb_mws)) {
1229                 r = list_entry(buf->rb_mws.next,
1230                         struct rpcrdma_mw, mw_list);
1231                 list_del(&r->mw_all);
1232                 list_del(&r->mw_list);
1233                 switch (ia->ri_memreg_strategy) {
1234                 case RPCRDMA_FRMR:
1235                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1236                         if (rc)
1237                                 dprintk("RPC:       %s:"
1238                                         " ib_dereg_mr"
1239                                         " failed %i\n",
1240                                         __func__, rc);
1241                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1242                         break;
1243                 case RPCRDMA_MTHCAFMR:
1244                         rc = ib_dealloc_fmr(r->r.fmr);
1245                         if (rc)
1246                                 dprintk("RPC:       %s:"
1247                                         " ib_dealloc_fmr"
1248                                         " failed %i\n",
1249                                         __func__, rc);
1250                         break;
1251                 default:
1252                         break;
1253                 }
1254         }
1255
1256         kfree(buf->rb_pool);
1257 }
1258
1259 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1260  * some req segments uninitialized.
1261  */
1262 static void
1263 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1264 {
1265         if (*mw) {
1266                 list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1267                 *mw = NULL;
1268         }
1269 }
1270
1271 /* Cycle mw's back in reverse order, and "spin" them.
1272  * This delays and scrambles reuse as much as possible.
1273  */
1274 static void
1275 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1276 {
1277         struct rpcrdma_mr_seg *seg = req->rl_segments;
1278         struct rpcrdma_mr_seg *seg1 = seg;
1279         int i;
1280
1281         for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1282                 rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf);
1283         rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf);
1284 }
1285
1286 static void
1287 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1288 {
1289         buf->rb_send_bufs[--buf->rb_send_index] = req;
1290         req->rl_niovs = 0;
1291         if (req->rl_reply) {
1292                 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1293                 req->rl_reply->rr_func = NULL;
1294                 req->rl_reply = NULL;
1295         }
1296 }
1297
1298 static struct rpcrdma_req *
1299 rpcrdma_buffer_get_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1300 {
1301         struct rpcrdma_mw *r;
1302         int i;
1303
1304         i = RPCRDMA_MAX_SEGS - 1;
1305         while (!list_empty(&buf->rb_mws)) {
1306                 r = list_entry(buf->rb_mws.next,
1307                                struct rpcrdma_mw, mw_list);
1308                 list_del(&r->mw_list);
1309                 req->rl_segments[i].mr_chunk.rl_mw = r;
1310                 if (unlikely(i-- == 0))
1311                         return req;     /* Success */
1312         }
1313
1314         /* Not enough entries on rb_mws for this req */
1315         rpcrdma_buffer_put_sendbuf(req, buf);
1316         rpcrdma_buffer_put_mrs(req, buf);
1317         return NULL;
1318 }
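/*
 * A NULL return from rpcrdma_buffer_get_mrs() means rb_mws could not
 * supply a full set of MWs for this request.  The req, its reply
 * buffer, and any MWs it did acquire have already been put back, so
 * rpcrdma_buffer_get() simply reports buffer exhaustion by returning
 * NULL to its caller.
 */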
1319
1320 /*
1321  * Get a set of request/reply buffers.
1322  *
1323  * Reply buffer (if needed) is attached to send buffer upon return.
1324  * Rule:
1325  *    rb_send_index and rb_recv_index MUST always be pointing to the
1326  *    *next* available buffer (non-NULL). They are incremented after
1327  *    removing buffers, and decremented *before* returning them.
1328  */
1329 struct rpcrdma_req *
1330 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1331 {
1332         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1333         struct rpcrdma_req *req;
1334         unsigned long flags;
1335
1336         spin_lock_irqsave(&buffers->rb_lock, flags);
1337         if (buffers->rb_send_index == buffers->rb_max_requests) {
1338                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1339                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1340                 return ((struct rpcrdma_req *)NULL);
1341         }
1342
1343         req = buffers->rb_send_bufs[buffers->rb_send_index];
1344         if (buffers->rb_send_index < buffers->rb_recv_index) {
1345                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1346                         __func__,
1347                         buffers->rb_recv_index - buffers->rb_send_index);
1348                 req->rl_reply = NULL;
1349         } else {
1350                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1351                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1352         }
1353         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1354         switch (ia->ri_memreg_strategy) {
1355         case RPCRDMA_FRMR:
1356         case RPCRDMA_MTHCAFMR:
1357                 req = rpcrdma_buffer_get_mrs(req, buffers);
1358                 break;
1359         default:
1360                 break;
1361         }
1362         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1363         return req;
1364 }
1365
1366 /*
1367  * Put request/reply buffers back into pool.
1368  * Pre-decrement counter/array index.
1369  */
1370 void
1371 rpcrdma_buffer_put(struct rpcrdma_req *req)
1372 {
1373         struct rpcrdma_buffer *buffers = req->rl_buffer;
1374         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1375         unsigned long flags;
1376
1377         spin_lock_irqsave(&buffers->rb_lock, flags);
1378         rpcrdma_buffer_put_sendbuf(req, buffers);
1379         switch (ia->ri_memreg_strategy) {
1380         case RPCRDMA_FRMR:
1381         case RPCRDMA_MTHCAFMR:
1382                 rpcrdma_buffer_put_mrs(req, buffers);
1383                 break;
1384         default:
1385                 break;
1386         }
1387         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1388 }
1389
1390 /*
1391  * Recover reply buffers from pool.
1392  * This happens when recovering from error conditions.
1393  * Post-increment counter/array index.
1394  */
1395 void
1396 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1397 {
1398         struct rpcrdma_buffer *buffers = req->rl_buffer;
1399         unsigned long flags;
1400
1401         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1402                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1403         spin_lock_irqsave(&buffers->rb_lock, flags);
1404         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1405                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1406                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1407         }
1408         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1409 }
1410
1411 /*
1412  * Put reply buffers back into pool when not attached to
1413  * request. This happens in error conditions.
1414  */
1415 void
1416 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1417 {
1418         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1419         unsigned long flags;
1420
1421         rep->rr_func = NULL;
1422         spin_lock_irqsave(&buffers->rb_lock, flags);
1423         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1424         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1425 }
1426
1427 /*
1428  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1429  */
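/*
 * rpcrdma_register_internal() chooses an lkey in this order: the
 * device's local DMA lkey if the HCA exposes one, then the all-physical
 * DMA MR set up at IA open time (ri_bind_mem), and only as a last
 * resort a one-off ib_reg_phys_mr() registration, which must later be
 * torn down by rpcrdma_deregister_internal().
 */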
1430
1431 int
1432 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1433                                 struct ib_mr **mrp, struct ib_sge *iov)
1434 {
1435         struct ib_phys_buf ipb;
1436         struct ib_mr *mr;
1437         int rc;
1438
1439         /*
1440          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1441          */
1442         iov->addr = ib_dma_map_single(ia->ri_id->device,
1443                         va, len, DMA_BIDIRECTIONAL);
1444         if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1445                 return -ENOMEM;
1446
1447         iov->length = len;
1448
1449         if (ia->ri_have_dma_lkey) {
1450                 *mrp = NULL;
1451                 iov->lkey = ia->ri_dma_lkey;
1452                 return 0;
1453         } else if (ia->ri_bind_mem != NULL) {
1454                 *mrp = NULL;
1455                 iov->lkey = ia->ri_bind_mem->lkey;
1456                 return 0;
1457         }
1458
1459         ipb.addr = iov->addr;
1460         ipb.size = iov->length;
1461         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1462                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1463
1464         dprintk("RPC:       %s: phys convert: 0x%llx "
1465                         "registered 0x%llx length %d\n",
1466                         __func__, (unsigned long long)ipb.addr,
1467                         (unsigned long long)iov->addr, len);
1468
1469         if (IS_ERR(mr)) {
1470                 *mrp = NULL;
1471                 rc = PTR_ERR(mr);
1472                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1473         } else {
1474                 *mrp = mr;
1475                 iov->lkey = mr->lkey;
1476                 rc = 0;
1477         }
1478
1479         return rc;
1480 }
1481
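/*
 * rpcrdma_deregister_internal - undo rpcrdma_register_internal()
 *
 * Unmaps the DMA mapping, then deregisters the MR if a throwaway MR
 * was created at registration time.
 */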
1482 int
1483 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1484                                 struct ib_mr *mr, struct ib_sge *iov)
1485 {
1486         int rc;
1487
1488         ib_dma_unmap_single(ia->ri_id->device,
1489                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1490
1491         if (mr == NULL)
1492                 return 0;
1493
1494         rc = ib_dereg_mr(mr);
1495         if (rc)
1496                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1497         return rc;
1498 }
1499
1500 /*
1501  * Wrappers for chunk registration, shared by read/write chunk code.
1502  */
1503
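/*
 * rpcrdma_map_one - DMA-map a single chunk segment
 *
 * Page-based segments are mapped with ib_dma_map_page(); segments
 * described only by a kernel virtual address use ib_dma_map_single().
 * The DMA direction follows @writing: DMA_FROM_DEVICE when the remote
 * peer will RDMA Write into this memory, DMA_TO_DEVICE otherwise.
 */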
1504 static void
1505 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1506 {
1507         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1508         seg->mr_dmalen = seg->mr_len;
1509         if (seg->mr_page)
1510                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1511                                 seg->mr_page, offset_in_page(seg->mr_offset),
1512                                 seg->mr_dmalen, seg->mr_dir);
1513         else
1514                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1515                                 seg->mr_offset,
1516                                 seg->mr_dmalen, seg->mr_dir);
1517         if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1518                 dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1519                         __func__,
1520                         (unsigned long long)seg->mr_dma,
1521                         seg->mr_offset, seg->mr_dmalen);
1522         }
1523 }
1524
1525 static void
1526 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1527 {
1528         if (seg->mr_page)
1529                 ib_dma_unmap_page(ia->ri_id->device,
1530                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1531         else
1532                 ib_dma_unmap_single(ia->ri_id->device,
1533                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1534 }
1535
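/*
 * rpcrdma_register_frmr_external - register chunk segments via FRMR
 *
 * Maps up to ri_max_frmr_depth segments, builds a page list, and posts
 * a FAST_REG_MR work request to register them under a single rkey.
 * If the FRMR is unexpectedly still valid, a LOCAL_INV work request is
 * chained ahead of the fast-register WR. Mapping stops early at a
 * "hole" (a segment boundary that is not page-aligned), because an
 * FRMR can describe only one logically contiguous region.
 */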
1536 static int
1537 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1538                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1539                         struct rpcrdma_xprt *r_xprt)
1540 {
1541         struct rpcrdma_mr_seg *seg1 = seg;
1542         struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
1543         struct rpcrdma_frmr *frmr = &mw->r.frmr;
1544         struct ib_mr *mr = frmr->fr_mr;
1545         struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1546
1547         u8 key;
1548         int len, pageoff;
1549         int i, rc;
1550         int seg_len;
1551         u64 pa;
1552         int page_no;
1553
1554         pageoff = offset_in_page(seg1->mr_offset);
1555         seg1->mr_offset -= pageoff;     /* start of page */
1556         seg1->mr_len += pageoff;
1557         len = -pageoff;
1558         if (*nsegs > ia->ri_max_frmr_depth)
1559                 *nsegs = ia->ri_max_frmr_depth;
1560         for (page_no = i = 0; i < *nsegs;) {
1561                 rpcrdma_map_one(ia, seg, writing);
1562                 pa = seg->mr_dma;
1563                 for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1564                         frmr->fr_pgl->page_list[page_no++] = pa;
1565                         pa += PAGE_SIZE;
1566                 }
1567                 len += seg->mr_len;
1568                 ++seg;
1569                 ++i;
1570                 /* Check for holes: stop if a segment boundary is not page-aligned */
1571                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1572                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1573                         break;
1574         }
1575         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1576                 __func__, mw, i);
1577
1578         if (unlikely(frmr->fr_state == FRMR_IS_VALID)) {
1579                 dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
1580                         __func__, mr->rkey);
1581                 /* Invalidate before using. */
1582                 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1583                 invalidate_wr.wr_id = (unsigned long)(void *)mw;
1584                 invalidate_wr.next = &frmr_wr;
1585                 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1586                 invalidate_wr.send_flags = IB_SEND_SIGNALED;
1587                 invalidate_wr.ex.invalidate_rkey = mr->rkey;
1588                 DECR_CQCOUNT(&r_xprt->rx_ep);
1589                 post_wr = &invalidate_wr;
1590         } else
1591                 post_wr = &frmr_wr;
1592
1593         /* Prepare FRMR WR */
1594         memset(&frmr_wr, 0, sizeof frmr_wr);
1595         frmr_wr.wr_id = (unsigned long)(void *)mw;
1596         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1597         frmr_wr.send_flags = IB_SEND_SIGNALED;
1598         frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1599         frmr_wr.wr.fast_reg.page_list = frmr->fr_pgl;
1600         frmr_wr.wr.fast_reg.page_list_len = page_no;
1601         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1602         frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1603         if (frmr_wr.wr.fast_reg.length < len) {
1604                 rc = -EIO;
1605                 goto out_err;
1606         }
1607
1608         /* Bump the key */
1609         key = (u8)(mr->rkey & 0x000000FF);
1610         ib_update_fast_reg_key(mr, ++key);
1611
1612         frmr_wr.wr.fast_reg.access_flags = (writing ?
1613                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1614                                 IB_ACCESS_REMOTE_READ);
1615         frmr_wr.wr.fast_reg.rkey = mr->rkey;
1616         DECR_CQCOUNT(&r_xprt->rx_ep);
1617
1618         rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1619
1620         if (rc) {
1621                 dprintk("RPC:       %s: failed ib_post_send for register,"
1622                         " status %i\n", __func__, rc);
1623                 ib_update_fast_reg_key(mr, --key);
1624                 goto out_err;
1625         } else {
1626                 seg1->mr_rkey = mr->rkey;
1627                 seg1->mr_base = seg1->mr_dma + pageoff;
1628                 seg1->mr_nsegs = i;
1629                 seg1->mr_len = len;
1630         }
1631         *nsegs = i;
1632         return 0;
1633 out_err:
1634         while (i--)
1635                 rpcrdma_unmap_one(ia, --seg);
1636         return rc;
1637 }
1638
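/*
 * rpcrdma_deregister_frmr_external - invalidate an FRMR registration
 *
 * Unmaps each segment and posts a LOCAL_INV work request for the
 * FRMR's rkey. ri_qplock is held for read so the QP cannot be
 * replaced by a reconnect while the unmap and post are in flight.
 */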
1639 static int
1640 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1641                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1642 {
1643         struct rpcrdma_mr_seg *seg1 = seg;
1644         struct ib_send_wr invalidate_wr, *bad_wr;
1645         int rc;
1646
1647         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1648         invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1649         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1650         invalidate_wr.send_flags = IB_SEND_SIGNALED;
1651         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1652         DECR_CQCOUNT(&r_xprt->rx_ep);
1653
1654         read_lock(&ia->ri_qplock);
1655         while (seg1->mr_nsegs--)
1656                 rpcrdma_unmap_one(ia, seg++);
1657         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1658         read_unlock(&ia->ri_qplock);
1659         if (rc)
1660                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1661                         " status %i\n", __func__, rc);
1662         return rc;
1663 }
1664
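/*
 * rpcrdma_register_fmr_external - register chunk segments via FMR
 *
 * Collects one DMA address per segment, up to RPCRDMA_MAX_DATA_SEGS,
 * and maps them with ib_map_phys_fmr(). As in the FRMR path, mapping
 * stops early at a segment boundary that is not page-aligned.
 */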
1665 static int
1666 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1667                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1668 {
1669         struct rpcrdma_mr_seg *seg1 = seg;
1670         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1671         int len, pageoff, i, rc;
1672
1673         pageoff = offset_in_page(seg1->mr_offset);
1674         seg1->mr_offset -= pageoff;     /* start of page */
1675         seg1->mr_len += pageoff;
1676         len = -pageoff;
1677         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1678                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1679         for (i = 0; i < *nsegs;) {
1680                 rpcrdma_map_one(ia, seg, writing);
1681                 physaddrs[i] = seg->mr_dma;
1682                 len += seg->mr_len;
1683                 ++seg;
1684                 ++i;
1685                 /* Check for holes: stop if a segment boundary is not page-aligned */
1686                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1687                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1688                         break;
1689         }
1690         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1691                                 physaddrs, i, seg1->mr_dma);
1692         if (rc) {
1693                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1694                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1695                         len, (unsigned long long)seg1->mr_dma,
1696                         pageoff, i, rc);
1697                 while (i--)
1698                         rpcrdma_unmap_one(ia, --seg);
1699         } else {
1700                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1701                 seg1->mr_base = seg1->mr_dma + pageoff;
1702                 seg1->mr_nsegs = i;
1703                 seg1->mr_len = len;
1704         }
1705         *nsegs = i;
1706         return rc;
1707 }
1708
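/*
 * rpcrdma_deregister_fmr_external - unmap an FMR registration
 *
 * Releases the mapping with ib_unmap_fmr(), then unmaps each
 * segment's DMA mapping under ri_qplock.
 */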
1709 static int
1710 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1711                         struct rpcrdma_ia *ia)
1712 {
1713         struct rpcrdma_mr_seg *seg1 = seg;
1714         LIST_HEAD(l);
1715         int rc;
1716
1717         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1718         rc = ib_unmap_fmr(&l);
1719         read_lock(&ia->ri_qplock);
1720         while (seg1->mr_nsegs--)
1721                 rpcrdma_unmap_one(ia, seg++);
1722         read_unlock(&ia->ri_qplock);
1723         if (rc)
1724                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1725                         " status %i\n", __func__, rc);
1726         return rc;
1727 }
1728
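/*
 * rpcrdma_register_external - register a chunk for RDMA
 *
 * Dispatches on the memory registration strategy selected when the
 * transport was created. Returns the number of segments actually
 * registered, or -1 on error.
 */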
1729 int
1730 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1731                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1732 {
1733         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1734         int rc = 0;
1735
1736         switch (ia->ri_memreg_strategy) {
1737
1738 #if RPCRDMA_PERSISTENT_REGISTRATION
1739         case RPCRDMA_ALLPHYSICAL:
1740                 rpcrdma_map_one(ia, seg, writing);
1741                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1742                 seg->mr_base = seg->mr_dma;
1743                 seg->mr_nsegs = 1;
1744                 nsegs = 1;
1745                 break;
1746 #endif
1747
1748         /* Registration using frmr memory registration */
1749         case RPCRDMA_FRMR:
1750                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1751                 break;
1752
1753         /* Registration using fmr memory registration */
1754         case RPCRDMA_MTHCAFMR:
1755                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1756                 break;
1757
1758         default:
1759                 return -1;
1760         }
1761         if (rc)
1762                 return -1;
1763
1764         return nsegs;
1765 }
1766
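/*
 * rpcrdma_deregister_external - invalidate a chunk registration
 *
 * The matching teardown for rpcrdma_register_external(). Returns the
 * number of segments that had been registered.
 */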
1767 int
1768 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1769                 struct rpcrdma_xprt *r_xprt)
1770 {
1771         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1772         int nsegs = seg->mr_nsegs, rc;
1773
1774         switch (ia->ri_memreg_strategy) {
1775
1776 #if RPCRDMA_PERSISTENT_REGISTRATION
1777         case RPCRDMA_ALLPHYSICAL:
1778                 read_lock(&ia->ri_qplock);
1779                 rpcrdma_unmap_one(ia, seg);
1780                 read_unlock(&ia->ri_qplock);
1781                 break;
1782 #endif
1783
1784         case RPCRDMA_FRMR:
1785                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1786                 break;
1787
1788         case RPCRDMA_MTHCAFMR:
1789                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1790                 break;
1791
1792         default:
1793                 break;
1794         }
1795         return nsegs;
1796 }
1797
1798 /*
1799  * Prepost any receive buffer, then post send.
1800  *
1801  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1802  */
1803 int
1804 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1805                 struct rpcrdma_ep *ep,
1806                 struct rpcrdma_req *req)
1807 {
1808         struct ib_send_wr send_wr, *send_wr_fail;
1809         struct rpcrdma_rep *rep = req->rl_reply;
1810         int rc;
1811
1812         if (rep) {
1813                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1814                 if (rc)
1815                         goto out;
1816                 req->rl_reply = NULL;
1817         }
1818
1819         send_wr.next = NULL;
1820         send_wr.wr_id = 0ULL;   /* no send cookie */
1821         send_wr.sg_list = req->rl_send_iov;
1822         send_wr.num_sge = req->rl_niovs;
1823         send_wr.opcode = IB_WR_SEND;
1824         if (send_wr.num_sge == 4)       /* [2] is the constant zero pad, no sync needed */
1825                 ib_dma_sync_single_for_device(ia->ri_id->device,
1826                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1827                         DMA_TO_DEVICE);
1828         ib_dma_sync_single_for_device(ia->ri_id->device,
1829                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1830                 DMA_TO_DEVICE);
1831         ib_dma_sync_single_for_device(ia->ri_id->device,
1832                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1833                 DMA_TO_DEVICE);
1834
1835         if (DECR_CQCOUNT(ep) > 0)
1836                 send_wr.send_flags = 0;
1837         else { /* Provider must take a send completion every now and then */
1838                 INIT_CQCOUNT(ep);
1839                 send_wr.send_flags = IB_SEND_SIGNALED;
1840         }
1841
1842         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1843         if (rc)
1844                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1845                         rc);
1846 out:
1847         return rc;
1848 }
1849
1850 /*
1851  * (Re)post a receive buffer.
1852  */
1853 int
1854 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1855                      struct rpcrdma_ep *ep,
1856                      struct rpcrdma_rep *rep)
1857 {
1858         struct ib_recv_wr recv_wr, *recv_wr_fail;
1859         int rc;
1860
1861         recv_wr.next = NULL;
1862         recv_wr.wr_id = (u64) (unsigned long) rep;
1863         recv_wr.sg_list = &rep->rr_iov;
1864         recv_wr.num_sge = 1;
1865
1866         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1867                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1868
1869         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1870
1871         if (rc)
1872                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1873                         rc);
1874         return rc;
1875 }
1876
1877 /* Physical mapping means one Read/Write list entry per page.
1878  * All list entries must fit within an inline buffer.
1879  *
1880  * NB: The server must return a Write list for NFS READ,
1881  *     which has the same constraint. Factor in the inline
1882  *     rsize as well.
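 *
 * Worked example (illustrative only, assuming the default 1024-byte
 * inline buffers, a 28-byte minimum RPC/RDMA header, 16-byte segment
 * entries, and 4KB pages): (1024 - 28) / 16 = 62 list entries, for a
 * maximum payload of 62 * 4096 = 253952 bytes.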
1883  */
1884 static size_t
1885 rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
1886 {
1887         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1888         unsigned int inline_size, pages;
1889
1890         inline_size = min_t(unsigned int,
1891                             cdata->inline_wsize, cdata->inline_rsize);
1892         inline_size -= RPCRDMA_HDRLEN_MIN;
1893         pages = inline_size / sizeof(struct rpcrdma_segment);
1894         return pages << PAGE_SHIFT;
1895 }
1896
1897 static size_t
1898 rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
1899 {
1900         return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
1901 }
1902
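/*
 * rpcrdma_max_payload - largest RPC payload this transport can move
 *
 * ALLPHYSICAL is limited by how many segment entries fit inline; the
 * MR-based strategies (FRMR, FMR) are limited only by the fixed
 * RPCRDMA_MAX_DATA_SEGS ceiling.
 */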
1903 size_t
1904 rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
1905 {
1906         size_t result;
1907
1908         switch (r_xprt->rx_ia.ri_memreg_strategy) {
1909         case RPCRDMA_ALLPHYSICAL:
1910                 result = rpcrdma_physical_max_payload(r_xprt);
1911                 break;
1912         default:
1913                 result = rpcrdma_mr_max_payload(r_xprt);
1914         }
1915         return result;
1916 }