net/sunrpc/xprtrdma/verbs.c  [karo-tx-linux.git]
xprtrdma: Reset FRMRs after a flushed LOCAL_INV Work Request
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <asm/bitops.h>
53
54 #include "xprt_rdma.h"
55
56 /*
57  * Globals/Macros
58  */
59
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY        RPCDBG_TRANS
62 #endif
63
64 static void rpcrdma_reset_frmrs(struct rpcrdma_ia *);
65
66 /*
67  * internal functions
68  */
69
70 /*
71  * handle replies in tasklet context, using a single, global list
72  * rdma tasklet function -- just turn around and call the func
73  * for all replies on the list
74  */
75
76 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
77 static LIST_HEAD(rpcrdma_tasklets_g);
78
79 static void
80 rpcrdma_run_tasklet(unsigned long data)
81 {
82         struct rpcrdma_rep *rep;
83         void (*func)(struct rpcrdma_rep *);
84         unsigned long flags;
85
86         data = data;    /* the tasklet argument is unused */
87         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
88         while (!list_empty(&rpcrdma_tasklets_g)) {
89                 rep = list_entry(rpcrdma_tasklets_g.next,
90                                  struct rpcrdma_rep, rr_list);
91                 list_del(&rep->rr_list);
92                 func = rep->rr_func;
93                 rep->rr_func = NULL;
94                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
95
96                 if (func)
97                         func(rep);
98                 else
99                         rpcrdma_recv_buffer_put(rep);
100
101                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
102         }
103         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
104 }
105
106 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
107
108 static inline void
109 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
110 {
111         unsigned long flags;
112
113         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
114         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
115         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
116         tasklet_schedule(&rpcrdma_tasklet_g);
117 }
118
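/*
 * QP and CQ asynchronous error upcalls. Both mark a connected endpoint
 * as failed (-EIO), notify the transport via rep_func, and wake any
 * thread sleeping on rep_connect_wait.
 */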
119 static void
120 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
121 {
122         struct rpcrdma_ep *ep = context;
123
124         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
125                 __func__, event->event, event->device->name, context);
126         if (ep->rep_connected == 1) {
127                 ep->rep_connected = -EIO;
128                 ep->rep_func(ep);
129                 wake_up_all(&ep->rep_connect_wait);
130         }
131 }
132
133 static void
134 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
135 {
136         struct rpcrdma_ep *ep = context;
137
138         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
139                 __func__, event->event, event->device->name, context);
140         if (ep->rep_connected == 1) {
141                 ep->rep_connected = -EIO;
142                 ep->rep_func(ep);
143                 wake_up_all(&ep->rep_connect_wait);
144         }
145 }
146
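/*
 * Process one send completion. For FRMR work requests the wr_id carries
 * a pointer to the rpcrdma_mw; a wr_id of zero means there is no MW to
 * update, so the completion is ignored. A failed or flushed WR leaves
 * the FRMR's hardware state unknown, so the MW is marked FRMR_IS_STALE
 * and recovered later (see rpcrdma_reset_frmrs() and
 * rpcrdma_retry_flushed_linv()).
 */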
147 static void
148 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
149 {
150         struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
151
152         dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
153                 __func__, frmr, wc->status, wc->opcode);
154
155         if (wc->wr_id == 0ULL)
156                 return;
157         if (wc->status != IB_WC_SUCCESS) {
158                 frmr->r.frmr.fr_state = FRMR_IS_STALE;
159                 return;
160         }
161
162         if (wc->opcode == IB_WC_FAST_REG_MR)
163                 frmr->r.frmr.fr_state = FRMR_IS_VALID;
164         else if (wc->opcode == IB_WC_LOCAL_INV)
165                 frmr->r.frmr.fr_state = FRMR_IS_INVALID;
166 }
167
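/*
 * Poll the send CQ in batches of RPCRDMA_POLLSIZE WCs, up to a total
 * budget of RPCRDMA_WC_BUDGET completions per upcall, so that a busy
 * CQ cannot monopolize the completion context.
 */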
168 static int
169 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
170 {
171         struct ib_wc *wcs;
172         int budget, count, rc;
173
174         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
175         do {
176                 wcs = ep->rep_send_wcs;
177
178                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
179                 if (rc <= 0)
180                         return rc;
181
182                 count = rc;
183                 while (count-- > 0)
184                         rpcrdma_sendcq_process_wc(wcs++);
185         } while (rc == RPCRDMA_POLLSIZE && --budget);
186         return 0;
187 }
188
189 /*
190  * Handle send, fast_reg_mr, and local_inv completions.
191  *
192  * Send events are typically suppressed and thus do not result
193  * in an upcall. Occasionally one is signaled, however. This
194  * prevents the provider's completion queue from wrapping and
195  * losing a completion.
196  */
197 static void
198 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
199 {
200         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
201         int rc;
202
203         rc = rpcrdma_sendcq_poll(cq, ep);
204         if (rc) {
205                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
206                         __func__, rc);
207                 return;
208         }
209
210         rc = ib_req_notify_cq(cq,
211                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
212         if (rc == 0)
213                 return;
214         if (rc < 0) {
215                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
216                         __func__, rc);
217                 return;
218         }
219
220         rpcrdma_sendcq_poll(cq, ep);
221 }
222
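/*
 * Process one receive completion: record the reply length, sync the
 * DMA-mapped receive buffer for the CPU, snoop the credit value from
 * the RPC-over-RDMA header, and hand the rpcrdma_rep to the reply
 * tasklet. A failed or flushed receive is passed on with a length of
 * ~0U so the reply handler can discard it.
 */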
223 static void
224 rpcrdma_recvcq_process_wc(struct ib_wc *wc)
225 {
226         struct rpcrdma_rep *rep =
227                         (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
228
229         dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
230                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
231
232         if (wc->status != IB_WC_SUCCESS) {
233                 rep->rr_len = ~0U;
234                 goto out_schedule;
235         }
236         if (wc->opcode != IB_WC_RECV)
237                 return;
238
239         rep->rr_len = wc->byte_len;
240         ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
241                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
242
243         if (rep->rr_len >= 16) {
244                 struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
245                 unsigned int credits = ntohl(p->rm_credit);
246
247                 if (credits == 0)
248                         credits = 1;    /* don't deadlock */
249                 else if (credits > rep->rr_buffer->rb_max_requests)
250                         credits = rep->rr_buffer->rb_max_requests;
251                 atomic_set(&rep->rr_buffer->rb_credits, credits);
252         }
253
254 out_schedule:
255         rpcrdma_schedule_tasklet(rep);
256 }
257
258 static int
259 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
260 {
261         struct ib_wc *wcs;
262         int budget, count, rc;
263
264         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
265         do {
266                 wcs = ep->rep_recv_wcs;
267
268                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
269                 if (rc <= 0)
270                         return rc;
271
272                 count = rc;
273                 while (count-- > 0)
274                         rpcrdma_recvcq_process_wc(wcs++);
275         } while (rc == RPCRDMA_POLLSIZE && --budget);
276         return 0;
277 }
278
279 /*
280  * Handle receive completions.
281  *
282  * It is reentrant but handles one completion at a time so that
283  * receives stay in order, which is needed to track server credits.
284  *
285  * It is the responsibility of the scheduled tasklet to return
286  * recv buffers to the pool. NOTE: this affects synchronization of
287  * connection shutdown. That is, the structures required for
288  * the completion of the reply handler must remain intact until
289  * all memory has been reclaimed.
290  */
291 static void
292 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
293 {
294         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
295         int rc;
296
297         rc = rpcrdma_recvcq_poll(cq, ep);
298         if (rc) {
299                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
300                         __func__, rc);
301                 return;
302         }
303
304         rc = ib_req_notify_cq(cq,
305                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
306         if (rc == 0)
307                 return;
308         if (rc < 0) {
309                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
310                         __func__, rc);
311                 return;
312         }
313
314         rpcrdma_recvcq_poll(cq, ep);
315 }
316
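/*
 * Drain both completion queues by invoking the upcall handlers
 * directly. This runs before a disconnect or reconnect so that flushed
 * work requests still update FRMR and reply state instead of being
 * silently dropped.
 */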
317 static void
318 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
319 {
320         rpcrdma_recvcq_upcall(ep->rep_attr.recv_cq, ep);
321         rpcrdma_sendcq_upcall(ep->rep_attr.send_cq, ep);
322 }
323
324 #ifdef RPC_DEBUG
325 static const char * const conn[] = {
326         "address resolved",
327         "address error",
328         "route resolved",
329         "route error",
330         "connect request",
331         "connect response",
332         "connect error",
333         "unreachable",
334         "rejected",
335         "established",
336         "disconnected",
337         "device removal"
338 };
339 #endif
340
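/*
 * RDMA CM event handler. Address and route resolution results are
 * reported through ia->ri_async_rc and ia->ri_done; connection events
 * are folded into ep->rep_connected (1 when established, a negative
 * errno otherwise) before waking rpcrdma_ep_connect().
 */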
341 static int
342 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
343 {
344         struct rpcrdma_xprt *xprt = id->context;
345         struct rpcrdma_ia *ia = &xprt->rx_ia;
346         struct rpcrdma_ep *ep = &xprt->rx_ep;
347 #ifdef RPC_DEBUG
348         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
349 #endif
350         struct ib_qp_attr attr;
351         struct ib_qp_init_attr iattr;
352         int connstate = 0;
353
354         switch (event->event) {
355         case RDMA_CM_EVENT_ADDR_RESOLVED:
356         case RDMA_CM_EVENT_ROUTE_RESOLVED:
357                 ia->ri_async_rc = 0;
358                 complete(&ia->ri_done);
359                 break;
360         case RDMA_CM_EVENT_ADDR_ERROR:
361                 ia->ri_async_rc = -EHOSTUNREACH;
362                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
363                         __func__, ep);
364                 complete(&ia->ri_done);
365                 break;
366         case RDMA_CM_EVENT_ROUTE_ERROR:
367                 ia->ri_async_rc = -ENETUNREACH;
368                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
369                         __func__, ep);
370                 complete(&ia->ri_done);
371                 break;
372         case RDMA_CM_EVENT_ESTABLISHED:
373                 connstate = 1;
374                 ib_query_qp(ia->ri_id->qp, &attr,
375                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
376                         &iattr);
377                 dprintk("RPC:       %s: %d responder resources"
378                         " (%d initiator)\n",
379                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
380                 goto connected;
381         case RDMA_CM_EVENT_CONNECT_ERROR:
382                 connstate = -ENOTCONN;
383                 goto connected;
384         case RDMA_CM_EVENT_UNREACHABLE:
385                 connstate = -ENETDOWN;
386                 goto connected;
387         case RDMA_CM_EVENT_REJECTED:
388                 connstate = -ECONNREFUSED;
389                 goto connected;
390         case RDMA_CM_EVENT_DISCONNECTED:
391                 connstate = -ECONNABORTED;
392                 goto connected;
393         case RDMA_CM_EVENT_DEVICE_REMOVAL:
394                 connstate = -ENODEV;
395 connected:
396                 dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
397                         __func__,
398                         (event->event <= 11) ? conn[event->event] :
399                                                 "unknown connection error",
400                         &addr->sin_addr.s_addr,
401                         ntohs(addr->sin_port),
402                         ep, event->event);
403                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
404                 dprintk("RPC:       %s: %sconnected\n",
405                                         __func__, connstate > 0 ? "" : "dis");
406                 ep->rep_connected = connstate;
407                 ep->rep_func(ep);
408                 wake_up_all(&ep->rep_connect_wait);
409                 break;
410         default:
411                 dprintk("RPC:       %s: unexpected CM event %d\n",
412                         __func__, event->event);
413                 break;
414         }
415
416 #ifdef RPC_DEBUG
417         if (connstate == 1) {
418                 int ird = attr.max_dest_rd_atomic;
419                 int tird = ep->rep_remote_cma.responder_resources;
420                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
421                         "on %s, memreg %d slots %d ird %d%s\n",
422                         &addr->sin_addr.s_addr,
423                         ntohs(addr->sin_port),
424                         ia->ri_id->device->name,
425                         ia->ri_memreg_strategy,
426                         xprt->rx_buf.rb_max_requests,
427                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
428         } else if (connstate < 0) {
429                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
430                         &addr->sin_addr.s_addr,
431                         ntohs(addr->sin_port),
432                         connstate);
433         }
434 #endif
435
436         return 0;
437 }
438
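/*
 * Create an rdma_cm_id and synchronously resolve the server's address
 * and route. Each step is completed by rpcrdma_conn_upcall() via
 * ia->ri_done, and waits at most RDMA_RESOLVE_TIMEOUT before failing
 * with -ETIMEDOUT.
 */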
439 static struct rdma_cm_id *
440 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
441                         struct rpcrdma_ia *ia, struct sockaddr *addr)
442 {
443         struct rdma_cm_id *id;
444         int rc;
445
446         init_completion(&ia->ri_done);
447
448         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
449         if (IS_ERR(id)) {
450                 rc = PTR_ERR(id);
451                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
452                         __func__, rc);
453                 return id;
454         }
455
456         ia->ri_async_rc = -ETIMEDOUT;
457         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
458         if (rc) {
459                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
460                         __func__, rc);
461                 goto out;
462         }
463         wait_for_completion_interruptible_timeout(&ia->ri_done,
464                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
465         rc = ia->ri_async_rc;
466         if (rc)
467                 goto out;
468
469         ia->ri_async_rc = -ETIMEDOUT;
470         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
471         if (rc) {
472                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
473                         __func__, rc);
474                 goto out;
475         }
476         wait_for_completion_interruptible_timeout(&ia->ri_done,
477                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
478         rc = ia->ri_async_rc;
479         if (rc)
480                 goto out;
481
482         return id;
483
484 out:
485         rdma_destroy_id(id);
486         return ERR_PTR(rc);
487 }
488
489 /*
490  * Drain any cq, prior to teardown.
491  */
492 static void
493 rpcrdma_clean_cq(struct ib_cq *cq)
494 {
495         struct ib_wc wc;
496         int count = 0;
497
498         while (1 == ib_poll_cq(cq, 1, &wc))
499                 ++count;
500
501         if (count)
502                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
503                         __func__, count, wc.opcode);
504 }
505
506 /*
507  * Exported functions.
508  */
509
510 /*
511  * Open and initialize an Interface Adapter.
512  *  o initializes fields of struct rpcrdma_ia, including
513  *    interface and provider attributes and protection zone.
514  */
515 int
516 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
517 {
518         int rc, mem_priv;
519         struct ib_device_attr devattr;
520         struct rpcrdma_ia *ia = &xprt->rx_ia;
521
522         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
523         if (IS_ERR(ia->ri_id)) {
524                 rc = PTR_ERR(ia->ri_id);
525                 goto out1;
526         }
527
528         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
529         if (IS_ERR(ia->ri_pd)) {
530                 rc = PTR_ERR(ia->ri_pd);
531                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
532                         __func__, rc);
533                 goto out2;
534         }
535
536         /*
537          * Query the device to determine if the requested memory
538          * registration strategy is supported. If it isn't, set the
539          * strategy to a globally supported model.
540          */
541         rc = ib_query_device(ia->ri_id->device, &devattr);
542         if (rc) {
543                 dprintk("RPC:       %s: ib_query_device failed %d\n",
544                         __func__, rc);
545                 goto out2;
546         }
547
548         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
549                 ia->ri_have_dma_lkey = 1;
550                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
551         }
552
553         if (memreg == RPCRDMA_FRMR) {
554                 /* Requires both frmr reg and local dma lkey */
555                 if ((devattr.device_cap_flags &
556                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
557                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
558                         dprintk("RPC:       %s: FRMR registration "
559                                 "not supported by HCA\n", __func__);
560                         memreg = RPCRDMA_MTHCAFMR;
561                 } else {
562                         /* Mind the ia limit on FRMR page list depth */
563                         ia->ri_max_frmr_depth = min_t(unsigned int,
564                                 RPCRDMA_MAX_DATA_SEGS,
565                                 devattr.max_fast_reg_page_list_len);
566                 }
567         }
568         if (memreg == RPCRDMA_MTHCAFMR) {
569                 if (!ia->ri_id->device->alloc_fmr) {
570                         dprintk("RPC:       %s: MTHCAFMR registration "
571                                 "not supported by HCA\n", __func__);
572 #if RPCRDMA_PERSISTENT_REGISTRATION
573                         memreg = RPCRDMA_ALLPHYSICAL;
574 #else
575                         rc = -ENOMEM;
576                         goto out2;
577 #endif
578                 }
579         }
580
581         /*
582          * Optionally obtain an underlying physical identity mapping in
583          * order to do a memory window-based bind. This base registration
584          * is protected from remote access - that is enabled only by binding
585          * for the specific bytes targeted during each RPC operation, and
586          * revoked after the corresponding completion similar to a storage
587          * adapter.
588          */
589         switch (memreg) {
590         case RPCRDMA_FRMR:
591                 break;
592 #if RPCRDMA_PERSISTENT_REGISTRATION
593         case RPCRDMA_ALLPHYSICAL:
594                 mem_priv = IB_ACCESS_LOCAL_WRITE |
595                                 IB_ACCESS_REMOTE_WRITE |
596                                 IB_ACCESS_REMOTE_READ;
597                 goto register_setup;
598 #endif
599         case RPCRDMA_MTHCAFMR:
600                 if (ia->ri_have_dma_lkey)
601                         break;
602                 mem_priv = IB_ACCESS_LOCAL_WRITE;
603 #if RPCRDMA_PERSISTENT_REGISTRATION
604         register_setup:
605 #endif
606                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
607                 if (IS_ERR(ia->ri_bind_mem)) {
608                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
609                                 "phys register failed with %lX\n",
610                                 __func__, PTR_ERR(ia->ri_bind_mem));
611                         rc = -ENOMEM;
612                         goto out2;
613                 }
614                 break;
615         default:
616                 printk(KERN_ERR "RPC: Unsupported memory "
617                                 "registration mode: %d\n", memreg);
618                 rc = -ENOMEM;
619                 goto out2;
620         }
621         dprintk("RPC:       %s: memory registration strategy is %d\n",
622                 __func__, memreg);
623
624         /* Else will do memory reg/dereg for each chunk */
625         ia->ri_memreg_strategy = memreg;
626
627         rwlock_init(&ia->ri_qplock);
628         return 0;
629 out2:
630         rdma_destroy_id(ia->ri_id);
631         ia->ri_id = NULL;
632 out1:
633         return rc;
634 }
635
636 /*
637  * Clean up/close an IA.
638  *   o if event handles and PD have been initialized, free them.
639  *   o close the IA
640  */
641 void
642 rpcrdma_ia_close(struct rpcrdma_ia *ia)
643 {
644         int rc;
645
646         dprintk("RPC:       %s: entering\n", __func__);
647         if (ia->ri_bind_mem != NULL) {
648                 rc = ib_dereg_mr(ia->ri_bind_mem);
649                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
650                         __func__, rc);
651         }
652         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
653                 if (ia->ri_id->qp)
654                         rdma_destroy_qp(ia->ri_id);
655                 rdma_destroy_id(ia->ri_id);
656                 ia->ri_id = NULL;
657         }
658         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
659                 rc = ib_dealloc_pd(ia->ri_pd);
660                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
661                         __func__, rc);
662         }
663 }
664
665 /*
666  * Create unconnected endpoint.
667  */
668 int
669 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
670                                 struct rpcrdma_create_data_internal *cdata)
671 {
672         struct ib_device_attr devattr;
673         struct ib_cq *sendcq, *recvcq;
674         int rc, err;
675
676         rc = ib_query_device(ia->ri_id->device, &devattr);
677         if (rc) {
678                 dprintk("RPC:       %s: ib_query_device failed %d\n",
679                         __func__, rc);
680                 return rc;
681         }
682
683         /* check provider's send/recv wr limits */
684         if (cdata->max_requests > devattr.max_qp_wr)
685                 cdata->max_requests = devattr.max_qp_wr;
686
687         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
688         ep->rep_attr.qp_context = ep;
689         /* send_cq and recv_cq initialized below */
690         ep->rep_attr.srq = NULL;
691         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
692         switch (ia->ri_memreg_strategy) {
693         case RPCRDMA_FRMR: {
694                 int depth = 7;
695
696                 /* Add room for frmr register and invalidate WRs.
697                  * 1. FRMR reg WR for head
698                  * 2. FRMR invalidate WR for head
699                  * 3. N FRMR reg WRs for pagelist
700                  * 4. N FRMR invalidate WRs for pagelist
701                  * 5. FRMR reg WR for tail
702                  * 6. FRMR invalidate WR for tail
703                  * 7. The RDMA_SEND WR
704                  */
705
706                 /* Calculate N if the device max FRMR depth is smaller than
707                  * RPCRDMA_MAX_DATA_SEGS.
708                  */
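                /* For illustration only: if RPCRDMA_MAX_DATA_SEGS were 64
                 * and the device capped FRMR depth at 16, delta would start
                 * at 48 and the loop below would run three times, giving a
                 * depth of 13: one SEND plus reg/invalidate pairs for the
                 * head, the tail, and four page-list FRMRs of up to 16
                 * pages each.
                 */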
709                 if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
710                         int delta = RPCRDMA_MAX_DATA_SEGS -
711                                     ia->ri_max_frmr_depth;
712
713                         do {
714                                 depth += 2; /* FRMR reg + invalidate */
715                                 delta -= ia->ri_max_frmr_depth;
716                         } while (delta > 0);
717
718                 }
719                 ep->rep_attr.cap.max_send_wr *= depth;
720                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
721                         cdata->max_requests = devattr.max_qp_wr / depth;
722                         if (!cdata->max_requests)
723                                 return -EINVAL;
724                         ep->rep_attr.cap.max_send_wr = cdata->max_requests *
725                                                        depth;
726                 }
727                 break;
728         }
729         default:
730                 break;
731         }
732         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
733         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
734         ep->rep_attr.cap.max_recv_sge = 1;
735         ep->rep_attr.cap.max_inline_data = 0;
736         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
737         ep->rep_attr.qp_type = IB_QPT_RC;
738         ep->rep_attr.port_num = ~0;
739
740         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
741                 "iovs: send %d recv %d\n",
742                 __func__,
743                 ep->rep_attr.cap.max_send_wr,
744                 ep->rep_attr.cap.max_recv_wr,
745                 ep->rep_attr.cap.max_send_sge,
746                 ep->rep_attr.cap.max_recv_sge);
747
748         /* set trigger for requesting send completion */
749         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
750         if (ep->rep_cqinit <= 2)
751                 ep->rep_cqinit = 0;
752         INIT_CQCOUNT(ep);
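        /* The cqinit value set above requests a signaled send completion
         * roughly every max_send_wr/2 posts (see DECR_CQCOUNT), which is
         * enough to keep the send CQ from overflowing while suppressing
         * most send completions. Tiny queues disable suppression entirely.
         */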
753         ep->rep_ia = ia;
754         init_waitqueue_head(&ep->rep_connect_wait);
755         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
756
757         sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
758                                   rpcrdma_cq_async_error_upcall, ep,
759                                   ep->rep_attr.cap.max_send_wr + 1, 0);
760         if (IS_ERR(sendcq)) {
761                 rc = PTR_ERR(sendcq);
762                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
763                         __func__, rc);
764                 goto out1;
765         }
766
767         rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
768         if (rc) {
769                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
770                         __func__, rc);
771                 goto out2;
772         }
773
774         recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
775                                   rpcrdma_cq_async_error_upcall, ep,
776                                   ep->rep_attr.cap.max_recv_wr + 1, 0);
777         if (IS_ERR(recvcq)) {
778                 rc = PTR_ERR(recvcq);
779                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
780                         __func__, rc);
781                 goto out2;
782         }
783
784         rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
785         if (rc) {
786                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
787                         __func__, rc);
788                 ib_destroy_cq(recvcq);
789                 goto out2;
790         }
791
792         ep->rep_attr.send_cq = sendcq;
793         ep->rep_attr.recv_cq = recvcq;
794
795         /* Initialize cma parameters */
796
797         /* RPC/RDMA does not use private data */
798         ep->rep_remote_cma.private_data = NULL;
799         ep->rep_remote_cma.private_data_len = 0;
800
801         /* Client offers RDMA Read but does not initiate */
802         ep->rep_remote_cma.initiator_depth = 0;
803         if (devattr.max_qp_rd_atom > 32)        /* arbitrary but <= 255 */
804                 ep->rep_remote_cma.responder_resources = 32;
805         else
806                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
807
808         ep->rep_remote_cma.retry_count = 7;
809         ep->rep_remote_cma.flow_control = 0;
810         ep->rep_remote_cma.rnr_retry_count = 0;
811
812         return 0;
813
814 out2:
815         err = ib_destroy_cq(sendcq);
816         if (err)
817                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
818                         __func__, err);
819 out1:
820         return rc;
821 }
822
823 /*
824  * rpcrdma_ep_destroy
825  *
826  * Disconnect and destroy endpoint. After this, the only
827  * valid operations on the ep are to free it (if dynamically
828  * allocated) or re-create it.
829  */
830 void
831 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
832 {
833         int rc;
834
835         dprintk("RPC:       %s: entering, connected is %d\n",
836                 __func__, ep->rep_connected);
837
838         cancel_delayed_work_sync(&ep->rep_connect_worker);
839
840         if (ia->ri_id->qp) {
841                 rc = rpcrdma_ep_disconnect(ep, ia);
842                 if (rc)
843                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
844                                 " returned %i\n", __func__, rc);
845                 rdma_destroy_qp(ia->ri_id);
846                 ia->ri_id->qp = NULL;
847         }
848
849         /* padding - could be done in rpcrdma_buffer_destroy... */
850         if (ep->rep_pad_mr) {
851                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
852                 ep->rep_pad_mr = NULL;
853         }
854
855         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
856         rc = ib_destroy_cq(ep->rep_attr.recv_cq);
857         if (rc)
858                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
859                         __func__, rc);
860
861         rpcrdma_clean_cq(ep->rep_attr.send_cq);
862         rc = ib_destroy_cq(ep->rep_attr.send_cq);
863         if (rc)
864                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
865                         __func__, rc);
866 }
867
868 /*
869  * Connect unconnected endpoint.
870  */
871 int
872 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
873 {
874         struct rdma_cm_id *id, *old;
875         int rc = 0;
876         int retry_count = 0;
877
878         if (ep->rep_connected != 0) {
879                 struct rpcrdma_xprt *xprt;
880 retry:
881                 dprintk("RPC:       %s: reconnecting...\n", __func__);
882                 rc = rpcrdma_ep_disconnect(ep, ia);
883                 if (rc && rc != -ENOTCONN)
884                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
885                                 " status %i\n", __func__, rc);
886                 rpcrdma_flush_cqs(ep);
887
888                 if (ia->ri_memreg_strategy == RPCRDMA_FRMR)
889                         rpcrdma_reset_frmrs(ia);
890
891                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
892                 id = rpcrdma_create_id(xprt, ia,
893                                 (struct sockaddr *)&xprt->rx_data.addr);
894                 if (IS_ERR(id)) {
895                         rc = -EHOSTUNREACH;
896                         goto out;
897                 }
898                 /* TEMP TEMP TEMP - fail if new device:
899                  * Deregister/remarshal *all* requests!
900                  * Close and recreate adapter, pd, etc!
901                  * Re-determine all attributes still sane!
902                  * More stuff I haven't thought of!
903                  * Rrrgh!
904                  */
905                 if (ia->ri_id->device != id->device) {
906                         printk("RPC:       %s: can't reconnect on "
907                                 "different device!\n", __func__);
908                         rdma_destroy_id(id);
909                         rc = -ENETUNREACH;
910                         goto out;
911                 }
912                 /* END TEMP */
913                 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
914                 if (rc) {
915                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
916                                 __func__, rc);
917                         rdma_destroy_id(id);
918                         rc = -ENETUNREACH;
919                         goto out;
920                 }
921
922                 write_lock(&ia->ri_qplock);
923                 old = ia->ri_id;
924                 ia->ri_id = id;
925                 write_unlock(&ia->ri_qplock);
926
927                 rdma_destroy_qp(old);
928                 rdma_destroy_id(old);
929         } else {
930                 dprintk("RPC:       %s: connecting...\n", __func__);
931                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
932                 if (rc) {
933                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
934                                 __func__, rc);
935                         /* do not update ep->rep_connected */
936                         return -ENETUNREACH;
937                 }
938         }
939
940         ep->rep_connected = 0;
941
942         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
943         if (rc) {
944                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
945                                 __func__, rc);
946                 goto out;
947         }
948
949         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
950
951         /*
952          * Check state. A non-peer reject indicates no listener
953          * (ECONNREFUSED), which may be a transient state. All
954          * others indicate a transport condition that has already
955          * received best-effort handling.
956          */
957         if (ep->rep_connected == -ECONNREFUSED &&
958             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
959                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
960                 goto retry;
961         }
962         if (ep->rep_connected <= 0) {
963                 /* Sometimes, the only way to reliably connect to remote
964                  * CMs is to use the same nonzero values for ORD and IRD. */
965                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
966                     (ep->rep_remote_cma.responder_resources == 0 ||
967                      ep->rep_remote_cma.initiator_depth !=
968                                 ep->rep_remote_cma.responder_resources)) {
969                         if (ep->rep_remote_cma.responder_resources == 0)
970                                 ep->rep_remote_cma.responder_resources = 1;
971                         ep->rep_remote_cma.initiator_depth =
972                                 ep->rep_remote_cma.responder_resources;
973                         goto retry;
974                 }
975                 rc = ep->rep_connected;
976         } else {
977                 dprintk("RPC:       %s: connected\n", __func__);
978         }
979
980 out:
981         if (rc)
982                 ep->rep_connected = rc;
983         return rc;
984 }
985
986 /*
987  * rpcrdma_ep_disconnect
988  *
989  * This is separate from destroy to facilitate the ability
990  * to reconnect without recreating the endpoint.
991  *
992  * This call is not reentrant, and must not be made in parallel
993  * on the same endpoint.
994  */
995 int
996 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
997 {
998         int rc;
999
1000         rpcrdma_flush_cqs(ep);
1001         rc = rdma_disconnect(ia->ri_id);
1002         if (!rc) {
1003                 /* returns without wait if not connected */
1004                 wait_event_interruptible(ep->rep_connect_wait,
1005                                                         ep->rep_connected != 1);
1006                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
1007                         (ep->rep_connected == 1) ? "still " : "dis");
1008         } else {
1009                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
1010                 ep->rep_connected = rc;
1011         }
1012         return rc;
1013 }
1014
1015 /*
1016  * Initialize buffer memory
1017  */
1018 int
1019 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
1020         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
1021 {
1022         char *p;
1023         size_t len, rlen, wlen;
1024         int i, rc;
1025         struct rpcrdma_mw *r;
1026
1027         buf->rb_max_requests = cdata->max_requests;
1028         spin_lock_init(&buf->rb_lock);
1029         atomic_set(&buf->rb_credits, 1);
1030
1031         /* Need to allocate:
1032          *   1.  arrays for send and recv pointers
1033          *   2.  arrays of struct rpcrdma_req to fill in pointers
1034          *   3.  array of struct rpcrdma_rep for replies
1035          *   4.  padding, if any
1036          *   5.  mw's, fmr's or frmr's, if any
1037          * Send/recv buffers in req/rep need to be registered
1038          */
1039
1040         len = buf->rb_max_requests *
1041                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1042         len += cdata->padding;
1043         switch (ia->ri_memreg_strategy) {
1044         case RPCRDMA_FRMR:
1045                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
1046                                 sizeof(struct rpcrdma_mw);
1047                 break;
1048         case RPCRDMA_MTHCAFMR:
1049                 /* TBD we are perhaps overallocating here */
1050                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1051                                 sizeof(struct rpcrdma_mw);
1052                 break;
1053         default:
1054                 break;
1055         }
1056
1057         /* allocate 1, 4 and 5 in one shot */
1058         p = kzalloc(len, GFP_KERNEL);
1059         if (p == NULL) {
1060                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1061                         __func__, len);
1062                 rc = -ENOMEM;
1063                 goto out;
1064         }
1065         buf->rb_pool = p;       /* for freeing it later */
1066
1067         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1068         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1069         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1070         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1071
1072         /*
1073          * Register the zeroed pad buffer, if any.
1074          */
1075         if (cdata->padding) {
1076                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1077                                             &ep->rep_pad_mr, &ep->rep_pad);
1078                 if (rc)
1079                         goto out;
1080         }
1081         p += cdata->padding;
1082
1083         INIT_LIST_HEAD(&buf->rb_mws);
1084         INIT_LIST_HEAD(&buf->rb_all);
1085         r = (struct rpcrdma_mw *)p;
1086         switch (ia->ri_memreg_strategy) {
1087         case RPCRDMA_FRMR:
1088                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1089                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1090                                                 ia->ri_max_frmr_depth);
1091                         if (IS_ERR(r->r.frmr.fr_mr)) {
1092                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1093                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1094                                         " failed %i\n", __func__, rc);
1095                                 goto out;
1096                         }
1097                         r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1098                                                 ia->ri_id->device,
1099                                                 ia->ri_max_frmr_depth);
1100                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1101                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1102                                 dprintk("RPC:       %s: "
1103                                         "ib_alloc_fast_reg_page_list "
1104                                         "failed %i\n", __func__, rc);
1105
1106                                 ib_dereg_mr(r->r.frmr.fr_mr);
1107                                 goto out;
1108                         }
1109                         list_add(&r->mw_all, &buf->rb_all);
1110                         list_add(&r->mw_list, &buf->rb_mws);
1111                         ++r;
1112                 }
1113                 break;
1114         case RPCRDMA_MTHCAFMR:
1115                 /* TBD we are perhaps overallocating here */
1116                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1117                         static struct ib_fmr_attr fa =
1118                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1119                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1120                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1121                                 &fa);
1122                         if (IS_ERR(r->r.fmr)) {
1123                                 rc = PTR_ERR(r->r.fmr);
1124                                 dprintk("RPC:       %s: ib_alloc_fmr"
1125                                         " failed %i\n", __func__, rc);
1126                                 goto out;
1127                         }
1128                         list_add(&r->mw_all, &buf->rb_all);
1129                         list_add(&r->mw_list, &buf->rb_mws);
1130                         ++r;
1131                 }
1132                 break;
1133         default:
1134                 break;
1135         }
1136
1137         /*
1138          * Allocate/init the request/reply buffers. Doing this
1139          * using kmalloc for now -- one for each buf.
1140          */
1141         wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
1142         rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
1143         dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
1144                 __func__, wlen, rlen);
1145
1146         for (i = 0; i < buf->rb_max_requests; i++) {
1147                 struct rpcrdma_req *req;
1148                 struct rpcrdma_rep *rep;
1149
1150                 req = kmalloc(wlen, GFP_KERNEL);
1151                 if (req == NULL) {
1152                         dprintk("RPC:       %s: request buffer %d alloc"
1153                                 " failed\n", __func__, i);
1154                         rc = -ENOMEM;
1155                         goto out;
1156                 }
1157                 memset(req, 0, sizeof(struct rpcrdma_req));
1158                 buf->rb_send_bufs[i] = req;
1159                 buf->rb_send_bufs[i]->rl_buffer = buf;
1160
1161                 rc = rpcrdma_register_internal(ia, req->rl_base,
1162                                 wlen - offsetof(struct rpcrdma_req, rl_base),
1163                                 &buf->rb_send_bufs[i]->rl_handle,
1164                                 &buf->rb_send_bufs[i]->rl_iov);
1165                 if (rc)
1166                         goto out;
1167
1168                 buf->rb_send_bufs[i]->rl_size = wlen -
1169                                                 sizeof(struct rpcrdma_req);
1170
1171                 rep = kmalloc(rlen, GFP_KERNEL);
1172                 if (rep == NULL) {
1173                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1174                                 __func__, i);
1175                         rc = -ENOMEM;
1176                         goto out;
1177                 }
1178                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1179                 buf->rb_recv_bufs[i] = rep;
1180                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1181
1182                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1183                                 rlen - offsetof(struct rpcrdma_rep, rr_base),
1184                                 &buf->rb_recv_bufs[i]->rr_handle,
1185                                 &buf->rb_recv_bufs[i]->rr_iov);
1186                 if (rc)
1187                         goto out;
1188
1189         }
1190         dprintk("RPC:       %s: max_requests %d\n",
1191                 __func__, buf->rb_max_requests);
1192         /* done */
1193         return 0;
1194 out:
1195         rpcrdma_buffer_destroy(buf);
1196         return rc;
1197 }
1198
1199 /*
1200  * Unregister and destroy buffer memory. Need to deal with
1201  * partial initialization, so it's callable from failed create.
1202  * Must be called before destroying endpoint, as registrations
1203  * reference it.
1204  */
1205 void
1206 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1207 {
1208         int rc, i;
1209         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1210         struct rpcrdma_mw *r;
1211
1212         /* clean up in reverse order from create
1213          *   1.  recv mr memory (mr free, then kfree)
1214          *   2.  send mr memory (mr free, then kfree)
1215          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1216          *   4.  arrays
1217          */
1218         dprintk("RPC:       %s: entering\n", __func__);
1219
1220         for (i = 0; i < buf->rb_max_requests; i++) {
1221                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1222                         rpcrdma_deregister_internal(ia,
1223                                         buf->rb_recv_bufs[i]->rr_handle,
1224                                         &buf->rb_recv_bufs[i]->rr_iov);
1225                         kfree(buf->rb_recv_bufs[i]);
1226                 }
1227                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1228                         rpcrdma_deregister_internal(ia,
1229                                         buf->rb_send_bufs[i]->rl_handle,
1230                                         &buf->rb_send_bufs[i]->rl_iov);
1231                         kfree(buf->rb_send_bufs[i]);
1232                 }
1233         }
1234
1235         while (!list_empty(&buf->rb_mws)) {
1236                 r = list_entry(buf->rb_mws.next,
1237                         struct rpcrdma_mw, mw_list);
1238                 list_del(&r->mw_all);
1239                 list_del(&r->mw_list);
1240                 switch (ia->ri_memreg_strategy) {
1241                 case RPCRDMA_FRMR:
1242                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1243                         if (rc)
1244                                 dprintk("RPC:       %s:"
1245                                         " ib_dereg_mr"
1246                                         " failed %i\n",
1247                                         __func__, rc);
1248                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1249                         break;
1250                 case RPCRDMA_MTHCAFMR:
1251                         rc = ib_dealloc_fmr(r->r.fmr);
1252                         if (rc)
1253                                 dprintk("RPC:       %s:"
1254                                         " ib_dealloc_fmr"
1255                                         " failed %i\n",
1256                                         __func__, rc);
1257                         break;
1258                 default:
1259                         break;
1260                 }
1261         }
1262
1263         kfree(buf->rb_pool);
1264 }
1265
1266 /* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
1267  * an unusable state. Find FRMRs in this state and dereg / reg
1268  * each.  FRMRs that are VALID and attached to an rpcrdma_req are
1269  * also torn down.
1270  *
1271  * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
1272  *
1273  * This is invoked only in the transport connect worker in order
1274  * to serialize with rpcrdma_register_frmr_external().
1275  */
1276 static void
1277 rpcrdma_reset_frmrs(struct rpcrdma_ia *ia)
1278 {
1279         struct rpcrdma_xprt *r_xprt =
1280                                 container_of(ia, struct rpcrdma_xprt, rx_ia);
1281         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1282         struct list_head *pos;
1283         struct rpcrdma_mw *r;
1284         int rc;
1285
1286         list_for_each(pos, &buf->rb_all) {
1287                 r = list_entry(pos, struct rpcrdma_mw, mw_all);
1288
1289                 if (r->r.frmr.fr_state == FRMR_IS_INVALID)
1290                         continue;
1291
1292                 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1293                 if (rc)
1294                         dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1295                                 __func__, rc);
1296                 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1297
1298                 r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1299                                         ia->ri_max_frmr_depth);
1300                 if (IS_ERR(r->r.frmr.fr_mr)) {
1301                         rc = PTR_ERR(r->r.frmr.fr_mr);
1302                         dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1303                                 " failed %i\n", __func__, rc);
1304                         continue;
1305                 }
1306                 r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1307                                         ia->ri_id->device,
1308                                         ia->ri_max_frmr_depth);
1309                 if (IS_ERR(r->r.frmr.fr_pgl)) {
1310                         rc = PTR_ERR(r->r.frmr.fr_pgl);
1311                         dprintk("RPC:       %s: "
1312                                 "ib_alloc_fast_reg_page_list "
1313                                 "failed %i\n", __func__, rc);
1314
1315                         ib_dereg_mr(r->r.frmr.fr_mr);
1316                         continue;
1317                 }
1318                 r->r.frmr.fr_state = FRMR_IS_INVALID;
1319         }
1320 }
1321
1322 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1323  * some req segments uninitialized.
1324  */
1325 static void
1326 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1327 {
1328         if (*mw) {
1329                 list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1330                 *mw = NULL;
1331         }
1332 }
1333
1334 /* Cycle mw's back in reverse order, and "spin" them.
1335  * This delays and scrambles reuse as much as possible.
1336  */
1337 static void
1338 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1339 {
1340         struct rpcrdma_mr_seg *seg = req->rl_segments;
1341         struct rpcrdma_mr_seg *seg1 = seg;
1342         int i;
1343
1344         for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1345                 rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf);
1346         rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf);
1347 }
1348
1349 static void
1350 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1351 {
1352         buf->rb_send_bufs[--buf->rb_send_index] = req;
1353         req->rl_niovs = 0;
1354         if (req->rl_reply) {
1355                 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1356                 req->rl_reply->rr_func = NULL;
1357                 req->rl_reply = NULL;
1358         }
1359 }
1360
1361 /* rpcrdma_unmap_one() was already done by rpcrdma_deregister_frmr_external().
1362  * Redo only the ib_post_send().
1363  */
1364 static void
1365 rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1366 {
1367         struct rpcrdma_xprt *r_xprt =
1368                                 container_of(ia, struct rpcrdma_xprt, rx_ia);
1369         struct ib_send_wr invalidate_wr, *bad_wr;
1370         int rc;
1371
1372         dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
1373
1374         /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1375         r->r.frmr.fr_state = FRMR_IS_VALID;
1376
1377         memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1378         invalidate_wr.wr_id = (unsigned long)(void *)r;
1379         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1380         invalidate_wr.send_flags = IB_SEND_SIGNALED;
1381         invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1382         DECR_CQCOUNT(&r_xprt->rx_ep);
1383
1384         dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
1385                 __func__, r, r->r.frmr.fr_mr->rkey);
1386
1387         read_lock(&ia->ri_qplock);
1388         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1389         read_unlock(&ia->ri_qplock);
1390         if (rc) {
1391                 /* Force rpcrdma_buffer_get() to retry */
1392                 r->r.frmr.fr_state = FRMR_IS_STALE;
1393                 dprintk("RPC:       %s: ib_post_send failed, %i\n",
1394                         __func__, rc);
1395         }
1396 }
1397
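/*
 * Recover the FRMRs that rpcrdma_buffer_get_frmrs() found to be stale:
 * post a fresh LOCAL_INV for each one, then splice them back onto
 * rb_mws so they can be handed out again.
 */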
1398 static void
1399 rpcrdma_retry_flushed_linv(struct list_head *stale,
1400                            struct rpcrdma_buffer *buf)
1401 {
1402         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1403         struct list_head *pos;
1404         struct rpcrdma_mw *r;
1405         unsigned long flags;
1406
1407         list_for_each(pos, stale) {
1408                 r = list_entry(pos, struct rpcrdma_mw, mw_list);
1409                 rpcrdma_retry_local_inv(r, ia);
1410         }
1411
1412         spin_lock_irqsave(&buf->rb_lock, flags);
1413         list_splice_tail(stale, &buf->rb_mws);
1414         spin_unlock_irqrestore(&buf->rb_lock, flags);
1415 }
1416
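/*
 * Fill req->rl_segments (from the last entry downward) with MWs taken
 * off rb_mws. FRMRs whose last work request was flushed are diverted
 * onto the caller's "stale" list for recovery instead of being handed
 * out. If rb_mws runs dry, everything is put back and NULL is returned.
 */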
1417 static struct rpcrdma_req *
1418 rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1419                          struct list_head *stale)
1420 {
1421         struct rpcrdma_mw *r;
1422         int i;
1423
1424         i = RPCRDMA_MAX_SEGS - 1;
1425         while (!list_empty(&buf->rb_mws)) {
1426                 r = list_entry(buf->rb_mws.next,
1427                                struct rpcrdma_mw, mw_list);
1428                 list_del(&r->mw_list);
1429                 if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1430                         list_add(&r->mw_list, stale);
1431                         continue;
1432                 }
1433                 req->rl_segments[i].mr_chunk.rl_mw = r;
1434                 if (unlikely(i-- == 0))
1435                         return req;     /* Success */
1436         }
1437
1438         /* Not enough entries on rb_mws for this req */
1439         rpcrdma_buffer_put_sendbuf(req, buf);
1440         rpcrdma_buffer_put_mrs(req, buf);
1441         return NULL;
1442 }
1443
1444 static struct rpcrdma_req *
1445 rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1446 {
1447         struct rpcrdma_mw *r;
1448         int i;
1449
1450         i = RPCRDMA_MAX_SEGS - 1;
1451         while (!list_empty(&buf->rb_mws)) {
1452                 r = list_entry(buf->rb_mws.next,
1453                                struct rpcrdma_mw, mw_list);
1454                 list_del(&r->mw_list);
1455                 req->rl_segments[i].mr_chunk.rl_mw = r;
1456                 if (unlikely(i-- == 0))
1457                         return req;     /* Success */
1458         }
1459
1460         /* Not enough entries on rb_mws for this req */
1461         rpcrdma_buffer_put_sendbuf(req, buf);
1462         rpcrdma_buffer_put_mrs(req, buf);
1463         return NULL;
1464 }
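
/*
 * Editorial note (not in the original source): both helpers above fill
 * req->rl_segments[] from the top index down and succeed only if every
 * slot receives an MW.  With a hypothetical RPCRDMA_MAX_SEGS of 8, the
 * loop stores MWs into rl_segments[7] through rl_segments[0]; if rb_mws
 * runs dry first, the send buffer and the MWs already taken are returned
 * to the pool and NULL is returned so the caller can back off.
 */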
1465
1466 /*
1467  * Get a set of request/reply buffers.
1468  *
1469  * A reply buffer (if needed) is attached to the send buffer upon return.
1470  * Rule:
1471  *    rb_send_index and rb_recv_index MUST always point to the *next*
1472  *    available (non-NULL) buffer. They are incremented after removing
1473  *    buffers, and decremented *before* returning them.
1474  */
1475 struct rpcrdma_req *
1476 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1477 {
1478         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1479         struct list_head stale;
1480         struct rpcrdma_req *req;
1481         unsigned long flags;
1482
1483         spin_lock_irqsave(&buffers->rb_lock, flags);
1484         if (buffers->rb_send_index == buffers->rb_max_requests) {
1485                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1486                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1487                 return NULL;
1488         }
1489
1490         req = buffers->rb_send_bufs[buffers->rb_send_index];
1491         if (buffers->rb_send_index < buffers->rb_recv_index) {
1492                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1493                         __func__,
1494                         buffers->rb_recv_index - buffers->rb_send_index);
1495                 req->rl_reply = NULL;
1496         } else {
1497                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1498                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1499         }
1500         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1501
1502         INIT_LIST_HEAD(&stale);
1503         switch (ia->ri_memreg_strategy) {
1504         case RPCRDMA_FRMR:
1505                 req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1506                 break;
1507         case RPCRDMA_MTHCAFMR:
1508                 req = rpcrdma_buffer_get_fmrs(req, buffers);
1509                 break;
1510         default:
1511                 break;
1512         }
1513         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1514         if (!list_empty(&stale))
1515                 rpcrdma_retry_flushed_linv(&stale, buffers);
1516         return req;
1517 }
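
/*
 * Illustrative usage sketch (editorial addition, not part of the original
 * file): a caller is assumed to pair the get/put entry points roughly as
 * below; everything except rpcrdma_buffer_get() and rpcrdma_buffer_put()
 * is a placeholder.
 *
 *	struct rpcrdma_req *req;
 *
 *	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 *	if (req == NULL)
 *		return NULL;		(pool exhausted, caller backs off)
 *	... marshal the RPC, post it, handle the reply ...
 *	rpcrdma_buffer_put(req);	(returns req and any attached rep)
 */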
1518
1519 /*
1520  * Put request/reply buffers back into the pool.
1521  * Pre-decrement counter/array index.
1522  */
1523 void
1524 rpcrdma_buffer_put(struct rpcrdma_req *req)
1525 {
1526         struct rpcrdma_buffer *buffers = req->rl_buffer;
1527         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1528         unsigned long flags;
1529
1530         spin_lock_irqsave(&buffers->rb_lock, flags);
1531         rpcrdma_buffer_put_sendbuf(req, buffers);
1532         switch (ia->ri_memreg_strategy) {
1533         case RPCRDMA_FRMR:
1534         case RPCRDMA_MTHCAFMR:
1535                 rpcrdma_buffer_put_mrs(req, buffers);
1536                 break;
1537         default:
1538                 break;
1539         }
1540         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1541 }
1542
1543 /*
1544  * Recover a reply buffer from the pool and attach it to the request.
1545  * This happens when recovering from error conditions.
1546  * Post-increment counter/array index.
1547  */
1548 void
1549 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1550 {
1551         struct rpcrdma_buffer *buffers = req->rl_buffer;
1552         unsigned long flags;
1553
1554         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1555                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1556         spin_lock_irqsave(&buffers->rb_lock, flags);
1557         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1558                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1559                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1560         }
1561         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1562 }
1563
1564 /*
1565  * Put a reply buffer back into the pool when it is not attached to
1566  * a request. This happens in error conditions.
1567  */
1568 void
1569 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1570 {
1571         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1572         unsigned long flags;
1573
1574         rep->rr_func = NULL;
1575         spin_lock_irqsave(&buffers->rb_lock, flags);
1576         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1577         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1578 }
1579
1580 /*
1581  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1582  */
1583
1584 int
1585 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1586                                 struct ib_mr **mrp, struct ib_sge *iov)
1587 {
1588         struct ib_phys_buf ipb;
1589         struct ib_mr *mr;
1590         int rc;
1591
1592         /*
1593          * All memory passed here was kmalloc'ed, therefore physically contiguous.
1594          */
1595         iov->addr = ib_dma_map_single(ia->ri_id->device,
1596                         va, len, DMA_BIDIRECTIONAL);
1597         if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1598                 return -ENOMEM;
1599
1600         iov->length = len;
1601
1602         if (ia->ri_have_dma_lkey) {
1603                 *mrp = NULL;
1604                 iov->lkey = ia->ri_dma_lkey;
1605                 return 0;
1606         } else if (ia->ri_bind_mem != NULL) {
1607                 *mrp = NULL;
1608                 iov->lkey = ia->ri_bind_mem->lkey;
1609                 return 0;
1610         }
1611
1612         ipb.addr = iov->addr;
1613         ipb.size = iov->length;
1614         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1615                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1616
1617         dprintk("RPC:       %s: phys convert: 0x%llx "
1618                         "registered 0x%llx length %d\n",
1619                         __func__, (unsigned long long)ipb.addr,
1620                         (unsigned long long)iov->addr, len);
1621
1622         if (IS_ERR(mr)) {
1623                 *mrp = NULL;
1624                 rc = PTR_ERR(mr);
1625                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1626         } else {
1627                 *mrp = mr;
1628                 iov->lkey = mr->lkey;
1629                 rc = 0;
1630         }
1631
1632         return rc;
1633 }
1634
1635 int
1636 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1637                                 struct ib_mr *mr, struct ib_sge *iov)
1638 {
1639         int rc;
1640
1641         ib_dma_unmap_single(ia->ri_id->device,
1642                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1643
1644         if (mr == NULL)
1645                 return 0;
1646
1647         rc = ib_dereg_mr(mr);
1648         if (rc)
1649                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1650         return rc;
1651 }
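
/*
 * Illustrative pairing (editorial sketch): buffer-creation code is assumed
 * to use the two wrappers above roughly as follows for a kmalloc'ed region
 * "p" of "len" bytes; error handling is elided.
 *
 *	struct ib_mr *mr;
 *	struct ib_sge iov;
 *
 *	rc = rpcrdma_register_internal(ia, p, len, &mr, &iov);
 *	... hand iov.addr/iov.length/iov.lkey to send or receive WRs ...
 *	rpcrdma_deregister_internal(ia, mr, &iov);
 */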
1652
1653 /*
1654  * Wrappers for chunk registration, shared by read/write chunk code.
1655  */
1656
1657 static void
1658 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1659 {
1660         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1661         seg->mr_dmalen = seg->mr_len;
1662         if (seg->mr_page)
1663                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1664                                 seg->mr_page, offset_in_page(seg->mr_offset),
1665                                 seg->mr_dmalen, seg->mr_dir);
1666         else
1667                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1668                                 seg->mr_offset,
1669                                 seg->mr_dmalen, seg->mr_dir);
1670         if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1671                 dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1672                         __func__,
1673                         (unsigned long long)seg->mr_dma,
1674                         seg->mr_offset, seg->mr_dmalen);
1675         }
1676 }
1677
1678 static void
1679 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1680 {
1681         if (seg->mr_page)
1682                 ib_dma_unmap_page(ia->ri_id->device,
1683                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1684         else
1685                 ib_dma_unmap_single(ia->ri_id->device,
1686                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1687 }
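
/*
 * Editorial note: per rpcrdma_map_one() above, a segment registered for
 * remote writing (the server RDMA-writes reply data into client memory)
 * is mapped DMA_FROM_DEVICE, while a read chunk the server pulls from
 * client memory is mapped DMA_TO_DEVICE.  Every successful map must be
 * balanced by exactly one rpcrdma_unmap_one() on the same seg, as the
 * registration strategies below take care to do.
 */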
1688
1689 static int
1690 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1691                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1692                         struct rpcrdma_xprt *r_xprt)
1693 {
1694         struct rpcrdma_mr_seg *seg1 = seg;
1695         struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
1696         struct rpcrdma_frmr *frmr = &mw->r.frmr;
1697         struct ib_mr *mr = frmr->fr_mr;
1698         struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1699
1700         u8 key;
1701         int len, pageoff;
1702         int i, rc;
1703         int seg_len;
1704         u64 pa;
1705         int page_no;
1706
1707         pageoff = offset_in_page(seg1->mr_offset);
1708         seg1->mr_offset -= pageoff;     /* start of page */
1709         seg1->mr_len += pageoff;
1710         len = -pageoff;
1711         if (*nsegs > ia->ri_max_frmr_depth)
1712                 *nsegs = ia->ri_max_frmr_depth;
1713         for (page_no = i = 0; i < *nsegs;) {
1714                 rpcrdma_map_one(ia, seg, writing);
1715                 pa = seg->mr_dma;
1716                 for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1717                         frmr->fr_pgl->page_list[page_no++] = pa;
1718                         pa += PAGE_SIZE;
1719                 }
1720                 len += seg->mr_len;
1721                 ++seg;
1722                 ++i;
1723                 /* Check for holes */
1724                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1725                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1726                         break;
1727         }
1728         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1729                 __func__, mw, i);
1730
1731         if (unlikely(frmr->fr_state != FRMR_IS_INVALID)) {
1732                 dprintk("RPC:       %s: frmr rkey %08x left valid, posting invalidate\n",
1733                         __func__, mr->rkey);
1734                 /* Invalidate before using. */
1735                 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1736                 invalidate_wr.wr_id = (unsigned long)(void *)mw;
1737                 invalidate_wr.next = &frmr_wr;
1738                 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1739                 invalidate_wr.send_flags = IB_SEND_SIGNALED;
1740                 invalidate_wr.ex.invalidate_rkey = mr->rkey;
1741                 DECR_CQCOUNT(&r_xprt->rx_ep);
1742                 post_wr = &invalidate_wr;
1743         } else
1744                 post_wr = &frmr_wr;
1745
1746         /* Prepare FRMR WR */
1747         memset(&frmr_wr, 0, sizeof frmr_wr);
1748         frmr_wr.wr_id = (unsigned long)(void *)mw;
1749         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1750         frmr_wr.send_flags = IB_SEND_SIGNALED;
1751         frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1752         frmr_wr.wr.fast_reg.page_list = frmr->fr_pgl;
1753         frmr_wr.wr.fast_reg.page_list_len = page_no;
1754         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1755         frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1756         if (frmr_wr.wr.fast_reg.length < len) {
1757                 rc = -EIO;
1758                 goto out_err;
1759         }
1760
1761         /* Bump the key */
1762         key = (u8)(mr->rkey & 0x000000FF);
1763         ib_update_fast_reg_key(mr, ++key);
1764
1765         frmr_wr.wr.fast_reg.access_flags = (writing ?
1766                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1767                                 IB_ACCESS_REMOTE_READ);
1768         frmr_wr.wr.fast_reg.rkey = mr->rkey;
1769         DECR_CQCOUNT(&r_xprt->rx_ep);
1770
1771         rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1772
1773         if (rc) {
1774                 dprintk("RPC:       %s: failed ib_post_send for register,"
1775                         " status %i\n", __func__, rc);
1776                 ib_update_fast_reg_key(mr, --key);
1777                 goto out_err;
1778         } else {
1779                 seg1->mr_rkey = mr->rkey;
1780                 seg1->mr_base = seg1->mr_dma + pageoff;
1781                 seg1->mr_nsegs = i;
1782                 seg1->mr_len = len;
1783         }
1784         *nsegs = i;
1785         return 0;
1786 out_err:
1787         while (i--)
1788                 rpcrdma_unmap_one(ia, --seg);
1789         return rc;
1790 }
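
/*
 * Worked example of the key bump above (editorial illustration):
 * ib_update_fast_reg_key() replaces only the low-order byte of the key.
 * If mr->rkey is 0x12345607 on entry, "key" becomes 0x07, the update
 * installs 0x08, and the FAST_REG WR advertises rkey 0x12345608.  If
 * ib_post_send() fails, the byte is stepped back to 0x07 so a later
 * retry re-derives the same sequence.
 */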
1791
1792 static int
1793 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1794                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1795 {
1796         struct rpcrdma_mr_seg *seg1 = seg;
1797         struct ib_send_wr invalidate_wr, *bad_wr;
1798         int rc;
1799
1800         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1801         invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1802         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1803         invalidate_wr.send_flags = IB_SEND_SIGNALED;
1804         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1805         DECR_CQCOUNT(&r_xprt->rx_ep);
1806
1807         read_lock(&ia->ri_qplock);
1808         while (seg1->mr_nsegs--)
1809                 rpcrdma_unmap_one(ia, seg++);
1810         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1811         read_unlock(&ia->ri_qplock);
1812         if (rc)
1813                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1814                         " status %i\n", __func__, rc);
1815         return rc;
1816 }
1817
1818 static int
1819 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1820                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1821 {
1822         struct rpcrdma_mr_seg *seg1 = seg;
1823         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1824         int len, pageoff, i, rc;
1825
1826         pageoff = offset_in_page(seg1->mr_offset);
1827         seg1->mr_offset -= pageoff;     /* start of page */
1828         seg1->mr_len += pageoff;
1829         len = -pageoff;
1830         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1831                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1832         for (i = 0; i < *nsegs;) {
1833                 rpcrdma_map_one(ia, seg, writing);
1834                 physaddrs[i] = seg->mr_dma;
1835                 len += seg->mr_len;
1836                 ++seg;
1837                 ++i;
1838                 /* Check for holes */
1839                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1840                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1841                         break;
1842         }
1843         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1844                                 physaddrs, i, seg1->mr_dma);
1845         if (rc) {
1846                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1847                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1848                         len, (unsigned long long)seg1->mr_dma,
1849                         pageoff, i, rc);
1850                 while (i--)
1851                         rpcrdma_unmap_one(ia, --seg);
1852         } else {
1853                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1854                 seg1->mr_base = seg1->mr_dma + pageoff;
1855                 seg1->mr_nsegs = i;
1856                 seg1->mr_len = len;
1857         }
1858         *nsegs = i;
1859         return rc;
1860 }
1861
1862 static int
1863 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1864                         struct rpcrdma_ia *ia)
1865 {
1866         struct rpcrdma_mr_seg *seg1 = seg;
1867         LIST_HEAD(l);
1868         int rc;
1869
1870         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1871         rc = ib_unmap_fmr(&l);
1872         read_lock(&ia->ri_qplock);
1873         while (seg1->mr_nsegs--)
1874                 rpcrdma_unmap_one(ia, seg++);
1875         read_unlock(&ia->ri_qplock);
1876         if (rc)
1877                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1878                         " status %i\n", __func__, rc);
1879         return rc;
1880 }
1881
1882 int
1883 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1884                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1885 {
1886         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1887         int rc = 0;
1888
1889         switch (ia->ri_memreg_strategy) {
1890
1891 #if RPCRDMA_PERSISTENT_REGISTRATION
1892         case RPCRDMA_ALLPHYSICAL:
1893                 rpcrdma_map_one(ia, seg, writing);
1894                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1895                 seg->mr_base = seg->mr_dma;
1896                 seg->mr_nsegs = 1;
1897                 nsegs = 1;
1898                 break;
1899 #endif
1900
1901         /* Registration using fast registration work requests (FRMR) */
1902         case RPCRDMA_FRMR:
1903                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1904                 break;
1905
1906         /* Registration using the fast memory region (FMR) API */
1907         case RPCRDMA_MTHCAFMR:
1908                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1909                 break;
1910
1911         default:
1912                 return -1;
1913         }
1914         if (rc)
1915                 return -1;
1916
1917         return nsegs;
1918 }
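
/*
 * Illustrative pairing (editorial sketch): the chunk-marshaling code is
 * assumed to drive these entry points per chunk roughly as follows; the
 * error handling is simplified and "n" is a placeholder.
 *
 *	n = rpcrdma_register_external(seg, nsegs, writing, r_xprt);
 *	if (n < 0)
 *		return n;		(registration failed)
 *	... advertise seg->mr_rkey, seg->mr_base, seg->mr_len in the header ...
 *	rpcrdma_deregister_external(seg, r_xprt);	(after the RPC retires)
 */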
1919
1920 int
1921 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1922                 struct rpcrdma_xprt *r_xprt)
1923 {
1924         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1925         int nsegs = seg->mr_nsegs, rc;
1926
1927         switch (ia->ri_memreg_strategy) {
1928
1929 #if RPCRDMA_PERSISTENT_REGISTRATION
1930         case RPCRDMA_ALLPHYSICAL:
1931                 read_lock(&ia->ri_qplock);
1932                 rpcrdma_unmap_one(ia, seg);
1933                 read_unlock(&ia->ri_qplock);
1934                 break;
1935 #endif
1936
1937         case RPCRDMA_FRMR:
1938                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1939                 break;
1940
1941         case RPCRDMA_MTHCAFMR:
1942                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1943                 break;
1944
1945         default:
1946                 break;
1947         }
1948         return nsegs;
1949 }
1950
1951 /*
1952  * Prepost any receive buffer, then post send.
1953  *
1954  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1955  */
1956 int
1957 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1958                 struct rpcrdma_ep *ep,
1959                 struct rpcrdma_req *req)
1960 {
1961         struct ib_send_wr send_wr, *send_wr_fail;
1962         struct rpcrdma_rep *rep = req->rl_reply;
1963         int rc;
1964
1965         if (rep) {
1966                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1967                 if (rc)
1968                         goto out;
1969                 req->rl_reply = NULL;
1970         }
1971
1972         send_wr.next = NULL;
1973         send_wr.wr_id = 0ULL;   /* no send cookie */
1974         send_wr.sg_list = req->rl_send_iov;
1975         send_wr.num_sge = req->rl_niovs;
1976         send_wr.opcode = IB_WR_SEND;
1977         if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
1978                 ib_dma_sync_single_for_device(ia->ri_id->device,
1979                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1980                         DMA_TO_DEVICE);
1981         ib_dma_sync_single_for_device(ia->ri_id->device,
1982                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1983                 DMA_TO_DEVICE);
1984         ib_dma_sync_single_for_device(ia->ri_id->device,
1985                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1986                 DMA_TO_DEVICE);
1987
1988         if (DECR_CQCOUNT(ep) > 0)
1989                 send_wr.send_flags = 0;
1990         else { /* Provider must take a send completion every now and then */
1991                 INIT_CQCOUNT(ep);
1992                 send_wr.send_flags = IB_SEND_SIGNALED;
1993         }
1994
1995         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1996         if (rc)
1997                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1998                         rc);
1999 out:
2000         return rc;
2001 }
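
/*
 * Editorial note on the send-completion heuristic above: with a
 * hypothetical rep_cqinit of 100, DECR_CQCOUNT() stays positive for 99
 * consecutive sends, which are posted unsignaled; the 100th send drives
 * the count to zero, INIT_CQCOUNT() re-arms it, and that WR is posted
 * with IB_SEND_SIGNALED so the provider retires the whole batch of send
 * WRs with a single completion.
 */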
2002
2003 /*
2004  * (Re)post a receive buffer.
2005  */
2006 int
2007 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
2008                      struct rpcrdma_ep *ep,
2009                      struct rpcrdma_rep *rep)
2010 {
2011         struct ib_recv_wr recv_wr, *recv_wr_fail;
2012         int rc;
2013
2014         recv_wr.next = NULL;
2015         recv_wr.wr_id = (u64) (unsigned long) rep;
2016         recv_wr.sg_list = &rep->rr_iov;
2017         recv_wr.num_sge = 1;
2018
2019         ib_dma_sync_single_for_cpu(ia->ri_id->device,
2020                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
2021
2022         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
2023
2024         if (rc)
2025                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
2026                         rc);
2027         return rc;
2028 }
2029
2030 /* Physical mapping means one Read/Write list entry per page.
2031  * All list entries must fit within an inline buffer.
2032  *
2033  * NB: The server must return a Write list for NFS READ,
2034  *     which has the same constraint. Factor in the inline
2035  *     rsize as well.
2036  */
2037 static size_t
2038 rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
2039 {
2040         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
2041         unsigned int inline_size, pages;
2042
2043         inline_size = min_t(unsigned int,
2044                             cdata->inline_wsize, cdata->inline_rsize);
2045         inline_size -= RPCRDMA_HDRLEN_MIN;
2046         pages = inline_size / sizeof(struct rpcrdma_segment);
2047         return pages << PAGE_SHIFT;
2048 }
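
/*
 * Worked example (editorial, using assumed values): with 1024-byte inline
 * buffers, a 28-byte RPCRDMA_HDRLEN_MIN, 16-byte struct rpcrdma_segment
 * entries, and 4KB pages, inline_size is 996, pages is 62, and the
 * physical strategy caps the payload at 62 << 12 = 253952 bytes
 * (roughly 248KB).
 */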
2049
2050 static size_t
2051 rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
2052 {
2053         return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
2054 }
2055
2056 size_t
2057 rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
2058 {
2059         size_t result;
2060
2061         switch (r_xprt->rx_ia.ri_memreg_strategy) {
2062         case RPCRDMA_ALLPHYSICAL:
2063                 result = rpcrdma_physical_max_payload(r_xprt);
2064                 break;
2065         default:
2066                 result = rpcrdma_mr_max_payload(r_xprt);
2067         }
2068         return result;
2069 }