/* net/sunrpc/xprtrdma/svc_rdma_sendto.c */
/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

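/* Number of pad bytes needed to round a length up to the next XDR
 * 4-byte boundary. For example, a 5-byte pagelist needs 3 pad bytes;
 * an 8-byte pagelist needs none.
 */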
static u32 xdr_padsize(u32 len)
{
        return (len & 3) ? (4 - (len & 3)) : 0;
}

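/* Build a vector of iovecs describing the reply in rqstp->rq_res:
 * vec->sge[0] is reserved for the transport header, then the
 * xdr_buf's head, pagelist, and tail follow in order. When a Write
 * chunk is present, the pagelist's XDR pad at the start of the tail
 * is skipped rather than sent inline.
 */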
int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
                     struct xdr_buf *xdr,
                     struct svc_rdma_req_map *vec,
                     bool write_chunk_present)
{
        int sge_no;
        u32 sge_bytes;
        u32 page_bytes;
        u32 page_off;
        int page_no;

        if (xdr->len !=
            (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
                pr_err("svcrdma: %s: XDR buffer length error\n", __func__);
                return -EIO;
        }

        /* Skip the first sge, this is for the RPCRDMA header */
        sge_no = 1;

        /* Head SGE */
        vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
        vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
        sge_no++;

        /* pages SGE */
        page_no = 0;
        page_bytes = xdr->page_len;
        page_off = xdr->page_base;
        while (page_bytes) {
                vec->sge[sge_no].iov_base =
                        page_address(xdr->pages[page_no]) + page_off;
                sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
                page_bytes -= sge_bytes;
                vec->sge[sge_no].iov_len = sge_bytes;

                sge_no++;
                page_no++;
                page_off = 0; /* reset for next time through loop */
        }

        /* Tail SGE */
        if (xdr->tail[0].iov_len) {
                unsigned char *base = xdr->tail[0].iov_base;
                size_t len = xdr->tail[0].iov_len;
                u32 xdr_pad = xdr_padsize(xdr->page_len);

                if (write_chunk_present && xdr_pad) {
                        base += xdr_pad;
                        len -= xdr_pad;
                }

                if (len) {
                        vec->sge[sge_no].iov_base = base;
                        vec->sge[sge_no].iov_len = len;
                        sge_no++;
                }
        }

        dprintk("svcrdma: %s: sge_no %d page_no %d "
                "page_base %u page_len %u head_len %zu tail_len %zu\n",
                __func__, sge_no, page_no, xdr->page_base, xdr->page_len,
                xdr->head[0].iov_len, xdr->tail[0].iov_len);

        vec->count = sge_no;
        return 0;
}

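/* Resolve an offset into rqstp->rq_res to the page backing it
 * (head, pagelist, or tail) and DMA-map at most one page's worth
 * of data starting at that offset.
 */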
static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
                              struct xdr_buf *xdr,
                              u32 xdr_off, size_t len, int dir)
{
        struct page *page;
        dma_addr_t dma_addr;
        if (xdr_off < xdr->head[0].iov_len) {
                /* This offset is in the head */
                xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
                page = virt_to_page(xdr->head[0].iov_base);
        } else {
                xdr_off -= xdr->head[0].iov_len;
                if (xdr_off < xdr->page_len) {
                        /* This offset is in the page list */
                        xdr_off += xdr->page_base;
                        page = xdr->pages[xdr_off >> PAGE_SHIFT];
                        xdr_off &= ~PAGE_MASK;
                } else {
                        /* This offset is in the tail */
                        xdr_off -= xdr->page_len;
                        xdr_off += (unsigned long)
                                xdr->tail[0].iov_base & ~PAGE_MASK;
                        page = virt_to_page(xdr->tail[0].iov_base);
                }
        }
        dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
                                   min_t(size_t, PAGE_SIZE, len), dir);
        return dma_addr;
}

/* Parse the RPC Call's transport header.
 */
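/* The chunk lists follow the four fixed header fields (xid, vers,
 * credits, proc). Each Read list entry is a one-word discriminator,
 * a one-word position, and a four-word segment (handle, length,
 * 8-byte offset). The Write list is a sequence of chunks, each a
 * one-word discriminator, a segment count, and that many four-word
 * segments. The Reply chunk is at most one such chunk. Every list is
 * terminated by an xdr_zero discriminator, which is what the pointer
 * arithmetic below steps over.
 */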
static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
                                      struct rpcrdma_write_array **write,
                                      struct rpcrdma_write_array **reply)
{
        __be32 *p;

        p = (__be32 *)&rmsgp->rm_body.rm_chunks[0];

        /* Read list */
        while (*p++ != xdr_zero)
                p += 5;

        /* Write list */
        if (*p != xdr_zero) {
                *write = (struct rpcrdma_write_array *)p;
                while (*p++ != xdr_zero)
                        p += 1 + be32_to_cpu(*p) * 4;
        } else {
                *write = NULL;
                p++;
        }

        /* Reply chunk */
        if (*p != xdr_zero)
                *reply = (struct rpcrdma_write_array *)p;
        else
                *reply = NULL;
}

/* RPC-over-RDMA Version One private extension: Remote Invalidation.
 * Responder's choice: requester signals it can handle Send With
 * Invalidate, and responder chooses one rkey to invalidate.
 *
 * Find a candidate rkey to invalidate when sending a reply.  Picks the
 * first R_key it finds in the chunk lists.
 *
 * Returns zero if RPC's chunk lists are empty.
 */
static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
                                 __be32 *wr_lst, __be32 *rp_ch)
{
        __be32 *p;

        p = rdma_argp + rpcrdma_fixed_maxsz;
        if (*p != xdr_zero)
                p += 2;
        else if (wr_lst && be32_to_cpup(wr_lst + 1))
                p = wr_lst + 2;
        else if (rp_ch && be32_to_cpup(rp_ch + 1))
                p = rp_ch + 2;
        else
                return 0;
        return be32_to_cpup(p);
}

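/* DMA-map one page fragment into the ctxt's SGE array at sge_no,
 * using the PD's local DMA lkey, and account for the new mapping
 * so it is unmapped when the Send completes.
 */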
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
                                 struct svc_rdma_op_ctxt *ctxt,
                                 unsigned int sge_no,
                                 struct page *page,
                                 unsigned int offset,
                                 unsigned int len)
{
        struct ib_device *dev = rdma->sc_cm_id->device;
        dma_addr_t dma_addr;

        dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
        if (ib_dma_mapping_error(dev, dma_addr))
                return -EIO;

        ctxt->sge[sge_no].addr = dma_addr;
        ctxt->sge[sge_no].length = len;
        ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
        svc_rdma_count_mappings(rdma, ctxt);
        return 0;
}

/**
 * svc_rdma_map_reply_hdr - DMA map the transport header buffer
 * @rdma: controlling transport
 * @ctxt: op_ctxt for the Send WR
 * @rdma_resp: buffer containing transport header
 * @len: length of transport header
 *
 * Returns:
 *      %0 if the header is DMA mapped,
 *      %-EIO if DMA mapping failed.
 */
int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
                           struct svc_rdma_op_ctxt *ctxt,
                           __be32 *rdma_resp,
                           unsigned int len)
{
        ctxt->direction = DMA_TO_DEVICE;
        ctxt->pages[0] = virt_to_page(rdma_resp);
        ctxt->count = 1;
        return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
}

/* Assumptions:
 * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
 */
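/* send_write() posts a single RDMA Write of at most sc_max_sge
 * segments and returns the number of bytes it posted, which may be
 * less than write_len; callers loop until the chunk is consumed.
 */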
static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
                      u32 rmr, u64 to,
                      u32 xdr_off, int write_len,
                      struct svc_rdma_req_map *vec)
{
        struct ib_rdma_wr write_wr;
        struct ib_sge *sge;
        int xdr_sge_no;
        int sge_no;
        int sge_bytes;
        int sge_off;
        int bc;
        struct svc_rdma_op_ctxt *ctxt;

        if (vec->count > RPCSVC_MAXPAGES) {
                pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
                return -EIO;
        }

        dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
                "write_len=%d, vec->sge=%p, vec->count=%lu\n",
                rmr, (unsigned long long)to, xdr_off,
                write_len, vec->sge, vec->count);

        ctxt = svc_rdma_get_context(xprt);
        ctxt->direction = DMA_TO_DEVICE;
        sge = ctxt->sge;

        /* Find the SGE associated with xdr_off */
        for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
             xdr_sge_no++) {
                if (vec->sge[xdr_sge_no].iov_len > bc)
                        break;
                bc -= vec->sge[xdr_sge_no].iov_len;
        }

        sge_off = bc;
        bc = write_len;
        sge_no = 0;

        /* Copy the remaining SGE */
        while (bc != 0) {
                sge_bytes = min_t(size_t,
                          bc, vec->sge[xdr_sge_no].iov_len-sge_off);
                sge[sge_no].length = sge_bytes;
                sge[sge_no].addr =
                        dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
                                    sge_bytes, DMA_TO_DEVICE);
                xdr_off += sge_bytes;
                if (ib_dma_mapping_error(xprt->sc_cm_id->device,
                                         sge[sge_no].addr))
                        goto err;
                svc_rdma_count_mappings(xprt, ctxt);
                sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
                ctxt->count++;
                sge_off = 0;
                sge_no++;
                xdr_sge_no++;
                if (xdr_sge_no > vec->count) {
                        pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
                        goto err;
                }
                bc -= sge_bytes;
                if (sge_no == xprt->sc_max_sge)
                        break;
        }

        /* Prepare WRITE WR */
        memset(&write_wr, 0, sizeof write_wr);
        ctxt->cqe.done = svc_rdma_wc_write;
        write_wr.wr.wr_cqe = &ctxt->cqe;
        write_wr.wr.sg_list = &sge[0];
        write_wr.wr.num_sge = sge_no;
        write_wr.wr.opcode = IB_WR_RDMA_WRITE;
        write_wr.wr.send_flags = IB_SEND_SIGNALED;
        write_wr.rkey = rmr;
        write_wr.remote_addr = to;

        /* Post It */
        atomic_inc(&rdma_stat_write);
        if (svc_rdma_send(xprt, &write_wr.wr))
                goto err;
        return write_len - bc;
 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
        return -EIO;
}

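/* Write the pagelist portion of the reply into the Write chunks
 * provided by the requester, one chunk at a time, and encode a
 * matching Write list (with the lengths actually written) in the
 * Reply's transport header. Returns the pagelist length on success.
 */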
noinline
static int send_write_chunks(struct svcxprt_rdma *xprt,
                             struct rpcrdma_write_array *wr_ary,
                             struct rpcrdma_msg *rdma_resp,
                             struct svc_rqst *rqstp,
                             struct svc_rdma_req_map *vec)
{
        u32 xfer_len = rqstp->rq_res.page_len;
        int write_len;
        u32 xdr_off;
        int chunk_off;
        int chunk_no;
        int nchunks;
        struct rpcrdma_write_array *res_ary;
        int ret;

        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[1];

        /* Write chunks start at the pagelist */
        nchunks = be32_to_cpu(wr_ary->wc_nchunks);
        for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
             xfer_len && chunk_no < nchunks;
             chunk_no++) {
                struct rpcrdma_segment *arg_ch;
                u64 rs_offset;

                arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
                write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));

                /* Prepare the response chunk given the length actually
                 * written */
                xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
                svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
                                                arg_ch->rs_handle,
                                                arg_ch->rs_offset,
                                                write_len);
                chunk_off = 0;
                while (write_len) {
                        ret = send_write(xprt, rqstp,
                                         be32_to_cpu(arg_ch->rs_handle),
                                         rs_offset + chunk_off,
                                         xdr_off,
                                         write_len,
                                         vec);
                        if (ret <= 0)
                                goto out_err;
                        chunk_off += ret;
                        xdr_off += ret;
                        xfer_len -= ret;
                        write_len -= ret;
                }
        }
        /* Update the req with the number of chunks actually used */
        svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);

        return rqstp->rq_res.page_len;

out_err:
        pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
        return -EIO;
}

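/* Write the whole RPC reply into the requester's Reply chunk,
 * segment by segment, and encode the reply array (with the lengths
 * actually written) in the Reply's transport header. Returns
 * rq_res.len on success.
 */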
noinline
static int send_reply_chunks(struct svcxprt_rdma *xprt,
                             struct rpcrdma_write_array *rp_ary,
                             struct rpcrdma_msg *rdma_resp,
                             struct svc_rqst *rqstp,
                             struct svc_rdma_req_map *vec)
{
        u32 xfer_len = rqstp->rq_res.len;
        int write_len;
        u32 xdr_off;
        int chunk_no;
        int chunk_off;
        int nchunks;
        struct rpcrdma_segment *ch;
        struct rpcrdma_write_array *res_ary;
        int ret;

        /* XXX: need to fix when reply lists occur with read-list and/or
         * write-list */
        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[2];

        /* xdr offset starts at RPC message */
        nchunks = be32_to_cpu(rp_ary->wc_nchunks);
        for (xdr_off = 0, chunk_no = 0;
             xfer_len && chunk_no < nchunks;
             chunk_no++) {
                u64 rs_offset;
                ch = &rp_ary->wc_array[chunk_no].wc_target;
                write_len = min(xfer_len, be32_to_cpu(ch->rs_length));

                /* Prepare the reply chunk given the length actually
                 * written */
                xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
                svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
                                                ch->rs_handle, ch->rs_offset,
                                                write_len);
                chunk_off = 0;
                while (write_len) {
                        ret = send_write(xprt, rqstp,
                                         be32_to_cpu(ch->rs_handle),
                                         rs_offset + chunk_off,
                                         xdr_off,
                                         write_len,
                                         vec);
                        if (ret <= 0)
                                goto out_err;
                        chunk_off += ret;
                        xdr_off += ret;
                        xfer_len -= ret;
                        write_len -= ret;
                }
        }
        /* Update the req with the number of chunks actually used */
        svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);

        return rqstp->rq_res.len;

out_err:
        pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
        return -EIO;
}

/* The svc_rqst and all resources it owns are released as soon as
 * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
 * so they are released by the Send completion handler.
 */
static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
                                   struct svc_rdma_op_ctxt *ctxt)
{
        int i, pages = rqstp->rq_next_page - rqstp->rq_respages;

        ctxt->count += pages;
        for (i = 0; i < pages; i++) {
                ctxt->pages[i + 1] = rqstp->rq_respages[i];
                rqstp->rq_respages[i] = NULL;
        }
        rqstp->rq_next_page = rqstp->rq_respages + 1;
}

/**
 * svc_rdma_post_send_wr - Set up and post one Send Work Request
 * @rdma: controlling transport
 * @ctxt: op_ctxt for transmitting the Send WR
 * @num_sge: number of SGEs to send
 * @inv_rkey: R_key argument to Send With Invalidate, or zero
 *
 * Returns:
 *      %0 if the Send* was posted successfully,
 *      %-ENOTCONN if the connection was lost or dropped,
 *      %-EINVAL if there was a problem with the Send we built,
 *      %-ENOMEM if ib_post_send failed.
 */
int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
                          struct svc_rdma_op_ctxt *ctxt, int num_sge,
                          u32 inv_rkey)
{
        struct ib_send_wr *send_wr = &ctxt->send_wr;

        dprintk("svcrdma: posting Send WR with %u sge(s)\n", num_sge);

        send_wr->next = NULL;
        ctxt->cqe.done = svc_rdma_wc_send;
        send_wr->wr_cqe = &ctxt->cqe;
        send_wr->sg_list = ctxt->sge;
        send_wr->num_sge = num_sge;
        send_wr->send_flags = IB_SEND_SIGNALED;
        if (inv_rkey) {
                send_wr->opcode = IB_WR_SEND_WITH_INV;
                send_wr->ex.invalidate_rkey = inv_rkey;
        } else {
                send_wr->opcode = IB_WR_SEND;
        }

        return svc_rdma_send(rdma, send_wr);
}

/* This function prepares the portion of the RPCRDMA message to be
 * sent in the RDMA_SEND. This function is called after data sent via
 * RDMA has already been transmitted. There are three cases:
 * - The RPCRDMA header, RPC header, and payload are all sent in a
 *   single RDMA_SEND. This is the "inline" case.
 * - The RPCRDMA header and some portion of the RPC header and data
 *   are sent via this RDMA_SEND and another portion of the data is
 *   sent via RDMA.
 * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
 *   header and data are all transmitted via RDMA.
 * In all three cases, this function prepares the RPCRDMA header in
 * sge[0]. The caller has already encoded the header's message type
 * (rdma_msg or rdma_nomsg) in rdma_resp, and the 'byte_count'
 * argument indicates how much of the XDR to include in this
 * RDMA_SEND. NB: The offset of the payload to send is zero in the
 * XDR.
 */
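/* sge[0] carries the already-marshaled transport header; the loop
 * below maps 'byte_count' bytes of rq_res into sge[1..n]. The pages
 * under I/O are then handed off to the ctxt so that the Send
 * completion handler, not this function, releases them.
 */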
static int send_reply(struct svcxprt_rdma *rdma,
                      struct svc_rqst *rqstp,
                      struct page *page,
                      struct rpcrdma_msg *rdma_resp,
                      struct svc_rdma_req_map *vec,
                      int byte_count,
                      u32 inv_rkey)
{
        struct svc_rdma_op_ctxt *ctxt;
        u32 xdr_off;
        int sge_no;
        int sge_bytes;
        int ret = -EIO;

        /* Prepare the context */
        ctxt = svc_rdma_get_context(rdma);
        ctxt->direction = DMA_TO_DEVICE;
        ctxt->pages[0] = page;
        ctxt->count = 1;

        /* Prepare the SGE for the RPCRDMA Header */
        ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
        ctxt->sge[0].length =
            svc_rdma_xdr_get_reply_hdr_len((__be32 *)rdma_resp);
        ctxt->sge[0].addr =
            ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
                            ctxt->sge[0].length, DMA_TO_DEVICE);
        if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
                goto err;
        svc_rdma_count_mappings(rdma, ctxt);

        ctxt->direction = DMA_TO_DEVICE;

        /* Map the payload indicated by 'byte_count' */
        xdr_off = 0;
        for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
                sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
                byte_count -= sge_bytes;
                ctxt->sge[sge_no].addr =
                        dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
                                    sge_bytes, DMA_TO_DEVICE);
                xdr_off += sge_bytes;
                if (ib_dma_mapping_error(rdma->sc_cm_id->device,
                                         ctxt->sge[sge_no].addr))
                        goto err;
                svc_rdma_count_mappings(rdma, ctxt);
                ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
                ctxt->sge[sge_no].length = sge_bytes;
        }
        if (byte_count != 0) {
                pr_err("svcrdma: Could not map %d bytes\n", byte_count);
                goto err;
        }

        svc_rdma_save_io_pages(rqstp, ctxt);

        if (sge_no > rdma->sc_max_sge) {
                pr_err("svcrdma: Too many sges (%d)\n", sge_no);
                goto err;
        }

        ret = svc_rdma_post_send_wr(rdma, ctxt, sge_no, inv_rkey);
        if (ret)
                goto err;

        return 0;

 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 1);
        return ret;
}

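/* Nothing to do here: the RPC-over-RDMA transport header is not
 * carried in the rq_res xdr_buf, so no header space needs to be
 * reserved. svc_rdma_sendto() builds the header in its own page.
 */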
void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
}

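/* Send an RPC reply. The sequence is: parse the chunk lists in the
 * Call's transport header, map rq_res, build the Reply's transport
 * header, push chunk payloads with RDMA Write, post a replacement
 * Receive, then Send the transport header plus any inline remainder.
 * On failure the transport is marked for close.
 */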
int svc_rdma_sendto(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct rpcrdma_msg *rdma_argp;
        struct rpcrdma_msg *rdma_resp;
        struct rpcrdma_write_array *wr_ary, *rp_ary;
        int ret;
        int inline_bytes;
        struct page *res_page;
        struct svc_rdma_req_map *vec;
        u32 inv_rkey;
        __be32 *p;

        dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);

        /* Get the RDMA request header. The receive logic always
         * places this at the start of page 0.
         */
        rdma_argp = page_address(rqstp->rq_pages[0]);
        svc_rdma_get_write_arrays(rdma_argp, &wr_ary, &rp_ary);

        inv_rkey = 0;
        if (rdma->sc_snd_w_inv)
                inv_rkey = svc_rdma_get_inv_rkey(&rdma_argp->rm_xid,
                                                 (__be32 *)wr_ary,
                                                 (__be32 *)rp_ary);

        /* Build a req vec for the XDR */
        vec = svc_rdma_get_req_map(rdma);
        ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
        if (ret)
                goto err0;
        inline_bytes = rqstp->rq_res.len;

        /* Create the RDMA response header. xprt->xpt_mutex,
         * acquired in svc_send(), serializes RPC replies. The
         * code path below that inserts the credit grant value
         * into each transport header runs only inside this
         * critical section.
         */
        ret = -ENOMEM;
        res_page = alloc_page(GFP_KERNEL);
        if (!res_page)
                goto err0;
        rdma_resp = page_address(res_page);

        p = &rdma_resp->rm_xid;
        *p++ = rdma_argp->rm_xid;
        *p++ = rdma_argp->rm_vers;
        *p++ = rdma->sc_fc_credits;
        *p++ = rp_ary ? rdma_nomsg : rdma_msg;

        /* Start with empty chunks */
        *p++ = xdr_zero;
        *p++ = xdr_zero;
        *p   = xdr_zero;

        /* Send any write-chunk data and build resp write-list */
        if (wr_ary) {
                ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
                if (ret < 0)
                        goto err1;
                inline_bytes -= ret + xdr_padsize(ret);
        }

        /* Send any reply-list data and update resp reply-list */
        if (rp_ary) {
                ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
                if (ret < 0)
                        goto err1;
                inline_bytes -= ret;
        }

        /* Post a fresh Receive buffer _before_ sending the reply */
        ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
        if (ret)
                goto err1;

        ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec,
                         inline_bytes, inv_rkey);
        if (ret < 0)
                goto err0;

        svc_rdma_put_req_map(rdma, vec);
        dprintk("svcrdma: send_reply returns %d\n", ret);
        return ret;

 err1:
        put_page(res_page);
 err0:
        svc_rdma_put_req_map(rdma, vec);
        pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
               ret);
        set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
        return -ENOTCONN;
}

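/* Send a transport-level error response: ERR_VERS if the requester
 * used an unsupported protocol version, otherwise ERR_CHUNK. Only
 * the transport header is sent; there is no RPC message payload.
 */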
void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
                         int status)
{
        struct page *p;
        struct svc_rdma_op_ctxt *ctxt;
        enum rpcrdma_errcode err;
        __be32 *va;
        int length;
        int ret;

        ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
        if (ret)
                return;

        p = alloc_page(GFP_KERNEL);
        if (!p)
                return;
        va = page_address(p);

        /* XDR encode an error reply */
        err = ERR_CHUNK;
        if (status == -EPROTONOSUPPORT)
                err = ERR_VERS;
        length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);

        /* Map transport header; no RPC message payload */
        ctxt = svc_rdma_get_context(xprt);
        ret = svc_rdma_map_reply_hdr(xprt, ctxt, &rmsgp->rm_xid, length);
        if (ret) {
                dprintk("svcrdma: Error %d mapping send for protocol error\n",
                        ret);
                return;
        }

        ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0);
        if (ret) {
                dprintk("svcrdma: Error %d posting send for protocol error\n",
                        ret);
                svc_rdma_unmap_dma(ctxt);
                svc_rdma_put_context(ctxt, 1);
        }
}