]> git.karo-electronics.de Git - karo-tx-linux.git/blob - net/sunrpc/xprtsock.c
SUNRPC: Remove sock and inet fields from rpc_xprt
[karo-tx-linux.git] / net / sunrpc / xprtsock.c
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
7  * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  */
17
18 #include <linux/types.h>
19 #include <linux/slab.h>
20 #include <linux/capability.h>
21 #include <linux/sched.h>
22 #include <linux/pagemap.h>
23 #include <linux/errno.h>
24 #include <linux/socket.h>
25 #include <linux/in.h>
26 #include <linux/net.h>
27 #include <linux/mm.h>
28 #include <linux/udp.h>
29 #include <linux/tcp.h>
30 #include <linux/sunrpc/clnt.h>
31 #include <linux/sunrpc/sched.h>
32 #include <linux/file.h>
33
34 #include <net/sock.h>
35 #include <net/checksum.h>
36 #include <net/udp.h>
37 #include <net/tcp.h>
38
/*
 * xprtsock tunables
 *
 * The slot table entries bound the number of in-flight RPC requests
 * per transport; the resvport values bound the privileged source-port
 * range used when a reserved port is required.
 */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

/*
 * How many times to try sending a request on a socket before waiting
 * for the socket buffer to clear.
 */
#define XS_SENDMSG_RETRY        (10U)

/*
 * Time out for an RPC UDP socket connect.  UDP socket connects are
 * synchronous, but we set a timeout anyway in case of resource
 * exhaustion on the local host.
 */
#define XS_UDP_CONN_TO          (5U * HZ)

/*
 * Wait duration for an RPC TCP connection to be established.  Solaris
 * NFS over TCP uses 60 seconds, for example, which is in line with how
 * long a server takes to reboot.
 */
#define XS_TCP_CONN_TO          (60U * HZ)

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO              (60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO         (2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO    (3U * HZ)
#define XS_TCP_MAX_REEST_TO     (5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO         (5U * 60 * HZ)

#ifdef RPC_DEBUG
/* Define RPC_DEBUG_DATA to enable the xs_pktdump() hex dumps below */
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif
102
103 #ifdef RPC_DEBUG_DATA
104 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
105 {
106         u8 *buf = (u8 *) packet;
107         int j;
108
109         dprintk("RPC:      %s\n", msg);
110         for (j = 0; j < count && j < 128; j += 4) {
111                 if (!(j & 31)) {
112                         if (j)
113                                 dprintk("\n");
114                         dprintk("0x%04x ", j);
115                 }
116                 dprintk("%02x%02x%02x%02x ",
117                         buf[j], buf[j+1], buf[j+2], buf[j+3]);
118         }
119         dprintk("\n");
120 }
121 #else
122 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
123 {
124         /* NOP */
125 }
126 #endif
127
struct sock_xprt {
        struct rpc_xprt         xprt;   /* generic transport state; container_of()
                                         * maps an rpc_xprt back to its sock_xprt */

        /*
         * Network layer
         */
        struct socket *         sock;   /* BSD socket used by this transport */
        struct sock *           inet;   /* network-layer socket; NULL once closed
                                         * (see xs_close) */
};
137
138 static void xs_format_peer_addresses(struct rpc_xprt *xprt)
139 {
140         struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr;
141         char *buf;
142
143         buf = kzalloc(20, GFP_KERNEL);
144         if (buf) {
145                 snprintf(buf, 20, "%u.%u.%u.%u",
146                                 NIPQUAD(addr->sin_addr.s_addr));
147         }
148         xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
149
150         buf = kzalloc(8, GFP_KERNEL);
151         if (buf) {
152                 snprintf(buf, 8, "%u",
153                                 ntohs(addr->sin_port));
154         }
155         xprt->address_strings[RPC_DISPLAY_PORT] = buf;
156
157         if (xprt->prot == IPPROTO_UDP)
158                 xprt->address_strings[RPC_DISPLAY_PROTO] = "udp";
159         else
160                 xprt->address_strings[RPC_DISPLAY_PROTO] = "tcp";
161
162         buf = kzalloc(48, GFP_KERNEL);
163         if (buf) {
164                 snprintf(buf, 48, "addr=%u.%u.%u.%u port=%u proto=%s",
165                         NIPQUAD(addr->sin_addr.s_addr),
166                         ntohs(addr->sin_port),
167                         xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
168         }
169         xprt->address_strings[RPC_DISPLAY_ALL] = buf;
170 }
171
172 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
173 {
174         kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
175         kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
176         kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
177 }
178
179 #define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
180
181 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
182 {
183         struct msghdr msg = {
184                 .msg_name       = addr,
185                 .msg_namelen    = addrlen,
186                 .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
187         };
188         struct kvec iov = {
189                 .iov_base       = vec->iov_base + base,
190                 .iov_len        = vec->iov_len - base,
191         };
192
193         if (iov.iov_len != 0)
194                 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
195         return kernel_sendmsg(sock, &msg, NULL, 0, 0);
196 }
197
/*
 * Push the page section of @xdr, starting @base bytes into it, through
 * the socket's sendpage op.  @more requests MSG_MORE on the final page
 * so following data can be coalesced.  Returns the number of bytes
 * sent, or a negative errno if nothing at all was sent.
 */
static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
{
        struct page **ppage;
        unsigned int remainder;
        int err, sent = 0;

        /* translate @base into a starting page plus in-page offset */
        remainder = xdr->page_len - base;
        base += xdr->page_base;
        ppage = xdr->pages + (base >> PAGE_SHIFT);
        base &= ~PAGE_MASK;
        for(;;) {
                unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
                int flags = XS_SENDMSG_FLAGS;

                remainder -= len;
                if (remainder != 0 || more)
                        flags |= MSG_MORE;
                err = sock->ops->sendpage(sock, *ppage, base, len, flags);
                /* stop on the last page, a short send, or an error */
                if (remainder == 0 || err != len)
                        break;
                sent += err;
                ppage++;
                base = 0;
        }
        /* nothing sent yet: propagate the raw sendpage result */
        if (sent == 0)
                return err;
        /* fold a trailing partial send into the running total */
        if (err > 0)
                sent += err;
        return sent;
}
228
/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
 * Returns the number of bytes sent, or a negative errno if nothing
 * could be sent.  A short count means the socket ran out of buffer
 * space partway through the request.
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
{
        unsigned int remainder = xdr->len - base;
        int err, sent = 0;

        if (unlikely(!sock))
                return -ENOTCONN;

        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
        /* A non-zero @base means this is a continuation of an earlier
         * partial send; the datagram destination only accompanies the
         * first attempt. */
        if (base != 0) {
                addr = NULL;
                addrlen = 0;
        }

        /* 1. whatever remains of the head kvec */
        if (base < xdr->head[0].iov_len || addr != NULL) {
                unsigned int len = xdr->head[0].iov_len - base;
                remainder -= len;
                err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
                if (remainder == 0 || err != len)
                        goto out;
                sent += err;
                base = 0;
        } else
                base -= xdr->head[0].iov_len;

        /* 2. the page data */
        if (base < xdr->page_len) {
                unsigned int len = xdr->page_len - base;
                remainder -= len;
                err = xs_send_pagedata(sock, xdr, base, remainder != 0);
                if (remainder == 0 || err != len)
                        goto out;
                sent += err;
                base = 0;
        } else
                base -= xdr->page_len;

        /* 3. whatever remains of the tail kvec */
        if (base >= xdr->tail[0].iov_len)
                return sent;
        err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
        if (sent == 0)
                return err;
        if (err > 0)
                sent += err;
        return sent;
}
284
/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 * Called after a send came up short.  If the socket flagged that it ran
 * out of buffer space, park the task until the write_space callback
 * wakes it; otherwise just delay briefly and let the caller retry.
 */
static void xs_nospace(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

        dprintk("RPC: %4d xmit incomplete (%u left of %u)\n",
                        task->tk_pid, req->rq_slen - req->rq_bytes_sent,
                        req->rq_slen);

        if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
                /* Protect against races with write_space */
                spin_lock_bh(&xprt->transport_lock);

                /* Don't race with disconnect */
                if (!xprt_connected(xprt))
                        task->tk_status = -ENOTCONN;
                else if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
                        xprt_wait_for_buffer_space(task);

                spin_unlock_bh(&xprt->transport_lock);
        } else
                /* Keep holding the socket if it is blocked */
                rpc_delay(task, HZ>>4);
}
315
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct xdr_buf *xdr = &req->rq_snd_buf;
        int status;

        xs_pktdump("packet data:",
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);

        /* record when we (re)transmitted */
        req->rq_xtime = jiffies;
        status = xs_sendpages(transport->sock,
                              (struct sockaddr *) &xprt->addr,
                              xprt->addrlen, xdr,
                              req->rq_bytes_sent);

        dprintk("RPC:      xs_udp_send_request(%u) = %d\n",
                        xdr->len - req->rq_bytes_sent, status);

        /* the entire datagram went out */
        if (likely(status >= (int) req->rq_slen))
                return 0;

        /* Still some bytes left; set up for a retry later. */
        if (status > 0)
                status = -EAGAIN;

        switch (status) {
        case -ENETUNREACH:
        case -EPIPE:
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
                 * prompts ECONNREFUSED. */
                break;
        case -EAGAIN:
                xs_nospace(task);
                break;
        default:
                dprintk("RPC:      sendmsg returned unrecognized error %d\n",
                        -status);
                break;
        }

        return status;
}
373
374 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
375 {
376         u32 reclen = buf->len - sizeof(rpc_fraghdr);
377         rpc_fraghdr *base = buf->head[0].iov_base;
378         *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
379 }
380
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *      if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct xdr_buf *xdr = &req->rq_snd_buf;
        int status, retry = 0;

        /* the send buffer length is final: stamp the record marker */
        xs_encode_tcp_record_marker(&req->rq_snd_buf);

        xs_pktdump("packet data:",
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);

        /* Continue transmitting the packet/record. We must be careful
         * to cope with writespace callbacks arriving _after_ we have
         * called sendmsg(). */
        while (1) {
                req->rq_xtime = jiffies;
                status = xs_sendpages(transport->sock,
                                        NULL, 0, xdr, req->rq_bytes_sent);

                dprintk("RPC:      xs_tcp_send_request(%u) = %d\n",
                                xdr->len - req->rq_bytes_sent, status);

                if (unlikely(status < 0))
                        break;

                /* If we've sent the entire packet, immediately
                 * reset the count of bytes sent. */
                req->rq_bytes_sent += status;
                task->tk_bytes_sent += status;
                if (likely(req->rq_bytes_sent >= req->rq_slen)) {
                        req->rq_bytes_sent = 0;
                        return 0;
                }

                /* partial send: retry a bounded number of times */
                status = -EAGAIN;
                if (retry++ > XS_SENDMSG_RETRY)
                        break;
        }

        switch (status) {
        case -EAGAIN:
                xs_nospace(task);
                break;
        case -ECONNREFUSED:
        case -ECONNRESET:
        case -ENOTCONN:
        case -EPIPE:
                /* connection-level failures: ask caller to reconnect */
                status = -ENOTCONN;
                break;
        default:
                dprintk("RPC:      sendmsg returned unrecognized error %d\n",
                        -status);
                xprt_disconnect(xprt);
                break;
        }

        return status;
}
456
457 /**
458  * xs_tcp_release_xprt - clean up after a tcp transmission
459  * @xprt: transport
460  * @task: rpc task
461  *
462  * This cleans up if an error causes us to abort the transmission of a request.
463  * In this case, the socket may need to be reset in order to avoid confusing
464  * the server.
465  */
466 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
467 {
468         struct rpc_rqst *req;
469
470         if (task != xprt->snd_task)
471                 return;
472         if (task == NULL)
473                 goto out_release;
474         req = task->tk_rqstp;
475         if (req->rq_bytes_sent == 0)
476                 goto out_release;
477         if (req->rq_bytes_sent == req->rq_snd_buf.len)
478                 goto out_release;
479         set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
480 out_release:
481         xprt_release_xprt(xprt, task);
482 }
483
/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
 */
static void xs_close(struct rpc_xprt *xprt)
{
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct socket *sock = transport->sock;
        struct sock *sk = transport->inet;

        /* already closed (or never connected) */
        if (!sk)
                goto clear_close_wait;

        dprintk("RPC:      xs_close xprt %p\n", xprt);

        /* Detach the transport from the socket and restore the
         * callbacks saved when it was set up; taking sk_callback_lock
         * keeps this from racing with the data_ready/state_change/
         * write_space callbacks above. */
        write_lock_bh(&sk->sk_callback_lock);
        transport->inet = NULL;
        transport->sock = NULL;

        sk->sk_user_data = NULL;
        sk->sk_data_ready = xprt->old_data_ready;
        sk->sk_state_change = xprt->old_state_change;
        sk->sk_write_space = xprt->old_write_space;
        write_unlock_bh(&sk->sk_callback_lock);

        /* NOTE(review): written after dropping sk_callback_lock but
         * before sock_release() -- confirm this ordering is safe */
        sk->sk_no_check = 0;

        sock_release(sock);
clear_close_wait:
        smp_mb__before_clear_bit();
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        smp_mb__after_clear_bit();
}
520
/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
        dprintk("RPC:      xs_destroy xprt %p\n", xprt);

        /* stop a pending connect attempt and wait out a running one */
        cancel_delayed_work(&xprt->connect_worker);
        flush_scheduled_work();

        xprt_disconnect(xprt);
        xs_close(xprt);
        xs_free_peer_addresses(xprt);
        kfree(xprt->slot);
        kfree(xprt);
}
539
/*
 * Retrieve the rpc_xprt attached to @sk via sk_user_data, or NULL if
 * the transport has already been detached (see xs_close).
 */
static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
        return (struct rpc_xprt *) sk->sk_user_data;
}
544
/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 * Pulls a datagram off the socket, matches its XID to a pending
 * request, copies the reply into that request's receive buffer and
 * completes it.  Runs under sk_callback_lock (read side), with
 * transport_lock nested inside for the request lookup/completion.
 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
        struct rpc_task *task;
        struct rpc_xprt *xprt;
        struct rpc_rqst *rovr;
        struct sk_buff *skb;
        int err, repsize, copied;
        u32 _xid;
        __be32 *xp;

        read_lock(&sk->sk_callback_lock);
        dprintk("RPC:      xs_udp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk)))
                goto out;

        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
                goto out;

        if (xprt->shutdown)
                goto dropit;

        /* a reply must carry at least a 4-byte XID past the UDP header */
        repsize = skb->len - sizeof(struct udphdr);
        if (repsize < 4) {
                dprintk("RPC:      impossible RPC reply size %d!\n", repsize);
                goto dropit;
        }

        /* Copy the XID from the skb... */
        xp = skb_header_pointer(skb, sizeof(struct udphdr),
                                sizeof(_xid), &_xid);
        if (xp == NULL)
                goto dropit;

        /* Look up and lock the request corresponding to the given XID */
        spin_lock(&xprt->transport_lock);
        rovr = xprt_lookup_rqst(xprt, *xp);
        if (!rovr)
                goto out_unlock;
        task = rovr->rq_task;

        /* never copy more than the receive buffer can hold */
        if ((copied = rovr->rq_private_buf.buflen) > repsize)
                copied = repsize;

        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
                goto out_unlock;

        /* Something worked... */
        dst_confirm(skb->dst);

        xprt_adjust_cwnd(task, copied);
        xprt_update_rtt(task);
        xprt_complete_rqst(task, copied);

 out_unlock:
        spin_unlock(&xprt->transport_lock);
 dropit:
        skb_free_datagram(sk, skb);
 out:
        read_unlock(&sk->sk_callback_lock);
}
612
613 static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
614 {
615         if (len > desc->count)
616                 len = desc->count;
617         if (skb_copy_bits(desc->skb, desc->offset, p, len)) {
618                 dprintk("RPC:      failed to copy %zu bytes from skb. %zu bytes remain\n",
619                                 len, desc->count);
620                 return 0;
621         }
622         desc->offset += len;
623         desc->count -= len;
624         dprintk("RPC:      copied %zu bytes from skb. %zu bytes remain\n",
625                         len, desc->count);
626         return len;
627 }
628
/*
 * Assemble the 4-byte RPC record marker, which may itself arrive split
 * across several skbs (tcp_offset tracks how much of it we have).
 * Once complete, extract the fragment length and last-fragment flag
 * and move the state machine on to the record payload.
 */
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len, used;
        char *p;

        p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
        len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
        used = xs_tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        /* marker still incomplete; wait for more data */
        if (used != len)
                return;

        xprt->tcp_reclen = ntohl(xprt->tcp_recm);
        if (xprt->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
                xprt->tcp_flags |= XPRT_LAST_FRAG;
        else
                xprt->tcp_flags &= ~XPRT_LAST_FRAG;
        xprt->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

        /* marker consumed: tcp_offset now counts into the fragment */
        xprt->tcp_flags &= ~XPRT_COPY_RECM;
        xprt->tcp_offset = 0;

        /* Sanity check of the record length */
        if (unlikely(xprt->tcp_reclen < 4)) {
                dprintk("RPC:      invalid TCP record fragment length\n");
                xprt_disconnect(xprt);
                return;
        }
        dprintk("RPC:      reading TCP record fragment of length %d\n",
                        xprt->tcp_reclen);
}
660
/*
 * Advance the receive state machine after consuming data: when the
 * current record fragment is fully consumed, go back to reading a
 * record marker, and -- if it was the last fragment of a reply --
 * prepare to read the next reply's XID.
 */
static void xs_tcp_check_recm(struct rpc_xprt *xprt)
{
        dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
                        xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
        if (xprt->tcp_offset == xprt->tcp_reclen) {
                xprt->tcp_flags |= XPRT_COPY_RECM;
                xprt->tcp_offset = 0;
                if (xprt->tcp_flags & XPRT_LAST_FRAG) {
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
                        xprt->tcp_flags |= XPRT_COPY_XID;
                        xprt->tcp_copied = 0;
                }
        }
}
675
/*
 * Read the 4-byte XID that starts a reply record; like the marker, it
 * may be split across skbs.  When complete, switch to copying reply
 * data; tcp_copied is primed to 4 because the XID bytes count as
 * already-consumed reply data.
 */
static inline void xs_tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len, used;
        char *p;

        len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
        dprintk("RPC:      reading XID (%Zu bytes)\n", len);
        p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
        used = xs_tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        /* XID still incomplete; wait for more data */
        if (used != len)
                return;
        xprt->tcp_flags &= ~XPRT_COPY_XID;
        xprt->tcp_flags |= XPRT_COPY_DATA;
        xprt->tcp_copied = 4;
        dprintk("RPC:      reading reply for XID %08x\n",
                                                ntohl(xprt->tcp_xid));
        xs_tcp_check_recm(xprt);
}
695
/*
 * Copy reply data from the current record fragment into the receive
 * buffer of the request matching the XID read by xs_tcp_read_xid().
 * Runs under transport_lock so the request cannot be released while
 * we copy into it.
 */
static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        struct rpc_rqst *req;
        struct xdr_buf *rcvbuf;
        size_t len;
        ssize_t r;

        /* Find and lock the request corresponding to this xid */
        spin_lock(&xprt->transport_lock);
        req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
        if (!req) {
                /* no such request: discard the rest of this record */
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
                dprintk("RPC:      XID %08x request not found!\n",
                                ntohl(xprt->tcp_xid));
                spin_unlock(&xprt->transport_lock);
                return;
        }

        rcvbuf = &req->rq_private_buf;
        len = desc->count;
        if (len > xprt->tcp_reclen - xprt->tcp_offset) {
                /* The skb extends past this record; clamp a private
                 * copy of the reader so we do not consume bytes that
                 * belong to the next record. */
                skb_reader_t my_desc;

                len = xprt->tcp_reclen - xprt->tcp_offset;
                memcpy(&my_desc, desc, sizeof(my_desc));
                my_desc.count = len;
                r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
                                          &my_desc, xs_tcp_copy_data);
                desc->count -= r;
                desc->offset += r;
        } else
                r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
                                          desc, xs_tcp_copy_data);

        if (r > 0) {
                xprt->tcp_copied += r;
                xprt->tcp_offset += r;
        }
        if (r != len) {
                /* Error when copying to the receive buffer,
                 * usually because we weren't able to allocate
                 * additional buffer pages. All we can do now
                 * is turn off XPRT_COPY_DATA, so the request
                 * will not receive any additional updates,
                 * and time out.
                 * Any remaining data from this record will
                 * be discarded.
                 */
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
                dprintk("RPC:      XID %08x truncated request\n",
                                ntohl(xprt->tcp_xid));
                dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
                                xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);
                goto out;
        }

        dprintk("RPC:      XID %08x read %Zd bytes\n",
                        ntohl(xprt->tcp_xid), r);
        dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
                        xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);

        /* receive buffer full: stop copying; otherwise only stop at the
         * end of the record if it carried the last-fragment flag */
        if (xprt->tcp_copied == req->rq_private_buf.buflen)
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
        else if (xprt->tcp_offset == xprt->tcp_reclen) {
                if (xprt->tcp_flags & XPRT_LAST_FRAG)
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
        }

out:
        if (!(xprt->tcp_flags & XPRT_COPY_DATA))
                xprt_complete_rqst(req->rq_task, xprt->tcp_copied);
        spin_unlock(&xprt->transport_lock);
        xs_tcp_check_recm(xprt);
}
770
771 static inline void xs_tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
772 {
773         size_t len;
774
775         len = xprt->tcp_reclen - xprt->tcp_offset;
776         if (len > desc->count)
777                 len = desc->count;
778         desc->count -= len;
779         desc->offset += len;
780         xprt->tcp_offset += len;
781         dprintk("RPC:      discarded %Zu bytes\n", len);
782         xs_tcp_check_recm(xprt);
783 }
784
/*
 * Callback handed to tcp_read_sock(); consumes the skb according to
 * the current receive state (record marker -> XID -> reply data ->
 * discard).  Returns the number of bytes consumed.
 */
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
        struct rpc_xprt *xprt = rd_desc->arg.data;
        skb_reader_t desc = {
                .skb    = skb,
                .offset = offset,
                .count  = len,
                .csum   = 0
        };

        dprintk("RPC:      xs_tcp_data_recv started\n");
        do {
                /* Read in a new fragment marker if necessary */
                /* Can we ever really expect to get completely empty fragments? */
                if (xprt->tcp_flags & XPRT_COPY_RECM) {
                        xs_tcp_read_fraghdr(xprt, &desc);
                        continue;
                }
                /* Read in the xid if necessary */
                if (xprt->tcp_flags & XPRT_COPY_XID) {
                        xs_tcp_read_xid(xprt, &desc);
                        continue;
                }
                /* Read in the request data */
                if (xprt->tcp_flags & XPRT_COPY_DATA) {
                        xs_tcp_read_request(xprt, &desc);
                        continue;
                }
                /* Skip over any trailing bytes on short reads */
                xs_tcp_read_discard(xprt, &desc);
        } while (desc.count);
        dprintk("RPC:      xs_tcp_data_recv done\n");
        return len - desc.count;
}
819
/**
 * xs_tcp_data_ready - "data ready" callback for TCP sockets
 * @sk: socket with data to read
 * @bytes: how much data to read
 *
 * Feeds newly arrived stream data into the receive state machine via
 * tcp_read_sock()/xs_tcp_data_recv().
 */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
{
        struct rpc_xprt *xprt;
        read_descriptor_t rd_desc;

        read_lock(&sk->sk_callback_lock);
        dprintk("RPC:      xs_tcp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        if (xprt->shutdown)
                goto out;

        /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
        rd_desc.arg.data = xprt;
        /* presumably an upper bound per tcp_read_sock call -- TODO confirm */
        rd_desc.count = 65536;
        tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
out:
        read_unlock(&sk->sk_callback_lock);
}
845
/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 */
static void xs_tcp_state_change(struct sock *sk)
{
        struct rpc_xprt *xprt;

        read_lock(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        dprintk("RPC:      xs_tcp_state_change client %p...\n", xprt);
        dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
                                sk->sk_state, xprt_connected(xprt),
                                sock_flag(sk, SOCK_DEAD),
                                sock_flag(sk, SOCK_ZAPPED));

        switch (sk->sk_state) {
        case TCP_ESTABLISHED:
                spin_lock_bh(&xprt->transport_lock);
                if (!xprt_test_and_set_connected(xprt)) {
                        /* Reset TCP record info */
                        xprt->tcp_offset = 0;
                        xprt->tcp_reclen = 0;
                        xprt->tcp_copied = 0;
                        xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
                        xprt_wake_pending_tasks(xprt, 0);
                }
                spin_unlock_bh(&xprt->transport_lock);
                break;
        case TCP_SYN_SENT:
        case TCP_SYN_RECV:
                /* connect in progress: nothing to do yet */
                break;
        case TCP_CLOSE_WAIT:
                /* Try to schedule an autoclose RPC calls */
                set_bit(XPRT_CLOSE_WAIT, &xprt->state);
                if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                        schedule_work(&xprt->task_cleanup);
                /* fall through: also mark the transport disconnected */
        default:
                xprt_disconnect(xprt);
        }
 out:
        read_unlock(&sk->sk_callback_lock);
}
892
893 /**
894  * xs_udp_write_space - callback invoked when socket buffer space
895  *                             becomes available
896  * @sk: socket whose state has changed
897  *
898  * Called when more output buffer space is available for this socket.
899  * We try not to wake our writers until they can make "significant"
900  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
901  * with a bunch of small requests.
902  */
903 static void xs_udp_write_space(struct sock *sk)
904 {
905         read_lock(&sk->sk_callback_lock);
906
907         /* from net/core/sock.c:sock_def_write_space */
908         if (sock_writeable(sk)) {
909                 struct socket *sock;
910                 struct rpc_xprt *xprt;
911
912                 if (unlikely(!(sock = sk->sk_socket)))
913                         goto out;
914                 if (unlikely(!(xprt = xprt_from_sock(sk))))
915                         goto out;
916                 if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
917                         goto out;
918
919                 xprt_write_space(xprt);
920         }
921
922  out:
923         read_unlock(&sk->sk_callback_lock);
924 }
925
926 /**
927  * xs_tcp_write_space - callback invoked when socket buffer space
928  *                             becomes available
929  * @sk: socket whose state has changed
930  *
931  * Called when more output buffer space is available for this socket.
932  * We try not to wake our writers until they can make "significant"
933  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
934  * with a bunch of small requests.
935  */
936 static void xs_tcp_write_space(struct sock *sk)
937 {
938         read_lock(&sk->sk_callback_lock);
939
940         /* from net/core/stream.c:sk_stream_write_space */
941         if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
942                 struct socket *sock;
943                 struct rpc_xprt *xprt;
944
945                 if (unlikely(!(sock = sk->sk_socket)))
946                         goto out;
947                 if (unlikely(!(xprt = xprt_from_sock(sk))))
948                         goto out;
949                 if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
950                         goto out;
951
952                 xprt_write_space(xprt);
953         }
954
955  out:
956         read_unlock(&sk->sk_callback_lock);
957 }
958
959 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
960 {
961         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
962         struct sock *sk = transport->inet;
963
964         if (xprt->rcvsize) {
965                 sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
966                 sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs *  2;
967         }
968         if (xprt->sndsize) {
969                 sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
970                 sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2;
971                 sk->sk_write_space(sk);
972         }
973 }
974
975 /**
976  * xs_udp_set_buffer_size - set send and receive limits
977  * @xprt: generic transport
978  * @sndsize: requested size of send buffer, in bytes
979  * @rcvsize: requested size of receive buffer, in bytes
980  *
981  * Set socket send and receive buffer size limits.
982  */
983 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
984 {
985         xprt->sndsize = 0;
986         if (sndsize)
987                 xprt->sndsize = sndsize + 1024;
988         xprt->rcvsize = 0;
989         if (rcvsize)
990                 xprt->rcvsize = rcvsize + 1024;
991
992         xs_udp_do_set_buffer_size(xprt);
993 }
994
/**
 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @task: task that timed out
 *
 * Adjust the congestion window after a retransmit timeout has occurred.
 */
static void xs_udp_timer(struct rpc_task *task)
{
	/* a timeout is treated as congestion: shrink the window */
	xprt_adjust_cwnd(task, -ETIMEDOUT);
}
1005
1006 static unsigned short xs_get_random_port(void)
1007 {
1008         unsigned short range = xprt_max_resvport - xprt_min_resvport;
1009         unsigned short rand = (unsigned short) net_random() % range;
1010         return rand + xprt_min_resvport;
1011 }
1012
1013 /**
1014  * xs_print_peer_address - format an IPv4 address for printing
1015  * @xprt: generic transport
1016  * @format: flags field indicating which parts of the address to render
1017  */
1018 static char *xs_print_peer_address(struct rpc_xprt *xprt, enum rpc_display_format_t format)
1019 {
1020         if (xprt->address_strings[format] != NULL)
1021                 return xprt->address_strings[format];
1022         else
1023                 return "unprintable";
1024 }
1025
1026 /**
1027  * xs_set_port - reset the port number in the remote endpoint address
1028  * @xprt: generic transport
1029  * @port: new port number
1030  *
1031  */
1032 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1033 {
1034         struct sockaddr_in *sap = (struct sockaddr_in *) &xprt->addr;
1035
1036         dprintk("RPC:      setting port for xprt %p to %u\n", xprt, port);
1037
1038         sap->sin_port = htons(port);
1039 }
1040
/*
 * Bind @sock to a reserved local port, starting the search at the
 * transport's current port and scanning downward, wrapping from
 * xprt_min_resvport back up to xprt_max_resvport.  On success the
 * chosen port is remembered in xprt->port.  Returns 0 or the last
 * bind error (the scan stops on any error other than -EADDRINUSE,
 * or once every port in the range has been tried).
 */
static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sockaddr_in myaddr = {
		.sin_family = AF_INET,
	};
	int err;
	unsigned short port = xprt->port;

	do {
		myaddr.sin_port = htons(port);
		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
		if (err == 0) {
			xprt->port = port;
			dprintk("RPC:      xs_bindresvport bound to port %u\n",
					port);
			return 0;
		}
		/* step downward, wrapping to the top of the range */
		if (port <= xprt_min_resvport)
			port = xprt_max_resvport;
		else
			port--;
	} while (err == -EADDRINUSE && port != xprt->port);

	dprintk("RPC:      can't bind to reserved port (%d).\n", -err);
	return err;
}
1068
/**
 * xs_udp_connect_worker - set up a UDP socket
 * @args: RPC transport to connect
 *
 * Invoked by a work queue tasklet.  Tears down any existing socket,
 * creates a fresh UDP socket (binding to a reserved port if the
 * transport requires one), installs the transport's socket callbacks,
 * and wakes any tasks waiting on the connect.
 */
static void xs_udp_connect_worker(void *args)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *) args;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;
	int err, status = -EIO;

	/* bail out (waking waiters with -EIO) if we are shutting down
	 * or the remote port is not yet known */
	if (xprt->shutdown || !xprt_bound(xprt))
		goto out;

	/* Start by resetting any existing state */
	xs_close(xprt);

	if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
		dprintk("RPC:      can't create UDP transport socket (%d).\n", -err);
		goto out;
	}

	if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
		sock_release(sock);
		goto out;
	}

	dprintk("RPC:      worker connecting xprt %p to address: %s\n",
			xprt, xs_print_peer_address(xprt, RPC_DISPLAY_ALL));

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		/* swap in our callbacks under the callback lock so the
		 * network layer never sees a half-updated socket */
		write_lock_bh(&sk->sk_callback_lock);

		sk->sk_user_data = xprt;
		/* save the originals so they can be restored on close */
		xprt->old_data_ready = sk->sk_data_ready;
		xprt->old_state_change = sk->sk_state_change;
		xprt->old_write_space = sk->sk_write_space;
		sk->sk_data_ready = xs_udp_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		sk->sk_no_check = UDP_CSUM_NORCV;
		/* NOTE(review): GFP_ATOMIC presumably because socket
		 * callbacks cannot sleep - confirm */
		sk->sk_allocation = GFP_ATOMIC;

		xprt_set_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}
	xs_udp_do_set_buffer_size(xprt);
	status = 0;
out:
	xprt_wake_pending_tasks(xprt, status);
	xprt_clear_connecting(xprt);
}
1129
1130 /*
1131  * We need to preserve the port number so the reply cache on the server can
1132  * find our cached RPC replies when we get around to reconnecting.
1133  */
1134 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
1135 {
1136         int result;
1137         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1138         struct sockaddr any;
1139
1140         dprintk("RPC:      disconnecting xprt %p to reuse port\n", xprt);
1141
1142         /*
1143          * Disconnect the transport socket by doing a connect operation
1144          * with AF_UNSPEC.  This should return immediately...
1145          */
1146         memset(&any, 0, sizeof(any));
1147         any.sa_family = AF_UNSPEC;
1148         result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1149         if (result)
1150                 dprintk("RPC:      AF_UNSPEC connect return code %d\n",
1151                                 result);
1152 }
1153
/**
 * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
 * @args: RPC transport to connect
 *
 * Invoked by a work queue tasklet.  Creates a socket on first use, or
 * disconnects-and-reuses an existing one so the local port (and hence
 * the server's reply cache entries) survive a reconnect.  The connect
 * itself is non-blocking; completion is reported asynchronously via
 * xs_tcp_state_change().
 */
static void xs_tcp_connect_worker(void *args)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)args;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;
	int err, status = -EIO;

	/* bail out (waking waiters with -EIO) if we are shutting down
	 * or the remote port is not yet known */
	if (xprt->shutdown || !xprt_bound(xprt))
		goto out;

	if (!sock) {
		/* start from scratch */
		if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
			dprintk("RPC:      can't create TCP transport socket (%d).\n", -err);
			goto out;
		}

		if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
			sock_release(sock);
			goto out;
		}
	} else
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt);

	dprintk("RPC:      worker connecting xprt %p to address: %s\n",
			xprt, xs_print_peer_address(xprt, RPC_DISPLAY_ALL));

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		/* swap in our callbacks under the callback lock so the
		 * network layer never sees a half-updated socket */
		write_lock_bh(&sk->sk_callback_lock);

		sk->sk_user_data = xprt;
		/* save the originals so they can be restored on close */
		xprt->old_data_ready = sk->sk_data_ready;
		xprt->old_state_change = sk->sk_state_change;
		xprt->old_write_space = sk->sk_write_space;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
		/* NOTE(review): GFP_ATOMIC presumably because socket
		 * callbacks cannot sleep - confirm */
		sk->sk_allocation = GFP_ATOMIC;

		/* socket options */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	status = kernel_connect(sock, (struct sockaddr *) &xprt->addr,
			xprt->addrlen, O_NONBLOCK);
	dprintk("RPC: %p  connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
	if (status < 0) {
		switch (status) {
			case -EINPROGRESS:
			case -EALREADY:
				/* connect still pending: don't wake
				 * waiters, just stop "connecting" */
				goto out_clear;
			case -ECONNREFUSED:
			case -ECONNRESET:
				/* retry with existing socket, after a delay */
				break;
			default:
				/* get rid of existing socket, and retry */
				xs_close(xprt);
				break;
		}
	}
out:
	xprt_wake_pending_tasks(xprt, status);
out_clear:
	xprt_clear_connecting(xprt);
}
1244
1245 /**
1246  * xs_connect - connect a socket to a remote endpoint
1247  * @task: address of RPC task that manages state of connect request
1248  *
1249  * TCP: If the remote end dropped the connection, delay reconnecting.
1250  *
1251  * UDP socket connects are synchronous, but we use a work queue anyway
1252  * to guarantee that even unprivileged user processes can set up a
1253  * socket on a privileged port.
1254  *
1255  * If a UDP socket connect fails, the delay behavior here prevents
1256  * retry floods (hard mounts).
1257  */
1258 static void xs_connect(struct rpc_task *task)
1259 {
1260         struct rpc_xprt *xprt = task->tk_xprt;
1261         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1262
1263         if (xprt_test_and_set_connecting(xprt))
1264                 return;
1265
1266         if (transport->sock != NULL) {
1267                 dprintk("RPC:      xs_connect delayed xprt %p for %lu seconds\n",
1268                                 xprt, xprt->reestablish_timeout / HZ);
1269                 schedule_delayed_work(&xprt->connect_worker,
1270                                         xprt->reestablish_timeout);
1271                 xprt->reestablish_timeout <<= 1;
1272                 if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
1273                         xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
1274         } else {
1275                 dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
1276                 schedule_work(&xprt->connect_worker);
1277
1278                 /* flush_scheduled_work can sleep... */
1279                 if (!RPC_IS_ASYNC(task))
1280                         flush_scheduled_work();
1281         }
1282 }
1283
/**
 * xs_udp_print_stats - display UDP socket-specifc stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 * Emits a single line: local port, bind count, sends, receives,
 * bad XIDs, and the req_u/bklog_u utilization counters.
 */
static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
			xprt->port,
			xprt->stat.bind_count,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}
1301
1302 /**
1303  * xs_tcp_print_stats - display TCP socket-specifc stats
1304  * @xprt: rpc_xprt struct containing statistics
1305  * @seq: output file
1306  *
1307  */
1308 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
1309 {
1310         long idle_time = 0;
1311
1312         if (xprt_connected(xprt))
1313                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
1314
1315         seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
1316                         xprt->port,
1317                         xprt->stat.bind_count,
1318                         xprt->stat.connect_count,
1319                         xprt->stat.connect_time,
1320                         idle_time,
1321                         xprt->stat.sends,
1322                         xprt->stat.recvs,
1323                         xprt->stat.bad_xids,
1324                         xprt->stat.req_u,
1325                         xprt->stat.bklog_u);
1326 }
1327
/*
 * RPC transport method table for UDP sockets.  Uses the
 * congestion-controlled (_cong) slot reservation/release helpers,
 * an RTT-based retransmit timeout, and the cwnd-adjusting timer.
 */
static struct rpc_xprt_ops xs_udp_ops = {
	.set_buffer_size	= xs_udp_set_buffer_size,
	.print_addr		= xs_print_peer_address,
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.rpcbind		= rpc_getport,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_udp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
	.timer			= xs_udp_timer,
	.release_request	= xprt_release_rqst_cong,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_udp_print_stats,
};
1346
/*
 * RPC transport method table for TCP sockets.  Unlike the UDP table,
 * it omits the congestion-control callbacks (set_buffer_size, timer,
 * release_request) and uses the default retransmit timeout.
 */
static struct rpc_xprt_ops xs_tcp_ops = {
	.print_addr		= xs_print_peer_address,
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.rpcbind		= rpc_getport,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_tcp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_tcp_print_stats,
};
1362
1363 static struct rpc_xprt *xs_setup_xprt(struct sockaddr *addr, size_t addrlen, unsigned int slot_table_size)
1364 {
1365         struct rpc_xprt *xprt;
1366         struct sock_xprt *new;
1367
1368         if (addrlen > sizeof(xprt->addr)) {
1369                 dprintk("RPC:      xs_setup_xprt: address too large\n");
1370                 return ERR_PTR(-EBADF);
1371         }
1372
1373         new = kzalloc(sizeof(*new), GFP_KERNEL);
1374         if (new == NULL) {
1375                 dprintk("RPC:      xs_setup_xprt: couldn't allocate rpc_xprt\n");
1376                 return ERR_PTR(-ENOMEM);
1377         }
1378         xprt = &new->xprt;
1379
1380         xprt->max_reqs = slot_table_size;
1381         xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
1382         if (xprt->slot == NULL) {
1383                 kfree(xprt);
1384                 dprintk("RPC:      xs_setup_xprt: couldn't allocate slot table\n");
1385                 return ERR_PTR(-ENOMEM);
1386         }
1387
1388         memcpy(&xprt->addr, addr, addrlen);
1389         xprt->addrlen = addrlen;
1390         xprt->port = xs_get_random_port();
1391
1392         return xprt;
1393 }
1394
1395 /**
1396  * xs_setup_udp - Set up transport to use a UDP socket
1397  * @addr: address of remote server
1398  * @addrlen: length of address in bytes
1399  * @to:   timeout parameters
1400  *
1401  */
1402 struct rpc_xprt *xs_setup_udp(struct sockaddr *addr, size_t addrlen, struct rpc_timeout *to)
1403 {
1404         struct rpc_xprt *xprt;
1405
1406         xprt = xs_setup_xprt(addr, addrlen, xprt_udp_slot_table_entries);
1407         if (IS_ERR(xprt))
1408                 return xprt;
1409
1410         if (ntohs(((struct sockaddr_in *)addr)->sin_port) != 0)
1411                 xprt_set_bound(xprt);
1412
1413         xprt->prot = IPPROTO_UDP;
1414         xprt->tsh_size = 0;
1415         /* XXX: header size can vary due to auth type, IPv6, etc. */
1416         xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
1417
1418         INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt);
1419         xprt->bind_timeout = XS_BIND_TO;
1420         xprt->connect_timeout = XS_UDP_CONN_TO;
1421         xprt->reestablish_timeout = XS_UDP_REEST_TO;
1422         xprt->idle_timeout = XS_IDLE_DISC_TO;
1423
1424         xprt->ops = &xs_udp_ops;
1425
1426         if (to)
1427                 xprt->timeout = *to;
1428         else
1429                 xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
1430
1431         xs_format_peer_addresses(xprt);
1432         dprintk("RPC:      set up transport to address %s\n",
1433                         xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
1434
1435         return xprt;
1436 }
1437
1438 /**
1439  * xs_setup_tcp - Set up transport to use a TCP socket
1440  * @addr: address of remote server
1441  * @addrlen: length of address in bytes
1442  * @to: timeout parameters
1443  *
1444  */
1445 struct rpc_xprt *xs_setup_tcp(struct sockaddr *addr, size_t addrlen, struct rpc_timeout *to)
1446 {
1447         struct rpc_xprt *xprt;
1448
1449         xprt = xs_setup_xprt(addr, addrlen, xprt_tcp_slot_table_entries);
1450         if (IS_ERR(xprt))
1451                 return xprt;
1452
1453         if (ntohs(((struct sockaddr_in *)addr)->sin_port) != 0)
1454                 xprt_set_bound(xprt);
1455
1456         xprt->prot = IPPROTO_TCP;
1457         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
1458         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
1459
1460         INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);
1461         xprt->bind_timeout = XS_BIND_TO;
1462         xprt->connect_timeout = XS_TCP_CONN_TO;
1463         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1464         xprt->idle_timeout = XS_IDLE_DISC_TO;
1465
1466         xprt->ops = &xs_tcp_ops;
1467
1468         if (to)
1469                 xprt->timeout = *to;
1470         else
1471                 xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
1472
1473         xs_format_peer_addresses(xprt);
1474         dprintk("RPC:      set up transport to address %s\n",
1475                         xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
1476
1477         return xprt;
1478 }