]> git.karo-electronics.de Git - mv-sheeva.git/blob - net/sunrpc/xprtsock.c
SUNRPC: Move rpc_xprt socket connect fields into private data structure
[mv-sheeva.git] / net / sunrpc / xprtsock.c
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
7  * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  */
17
18 #include <linux/types.h>
19 #include <linux/slab.h>
20 #include <linux/capability.h>
21 #include <linux/sched.h>
22 #include <linux/pagemap.h>
23 #include <linux/errno.h>
24 #include <linux/socket.h>
25 #include <linux/in.h>
26 #include <linux/net.h>
27 #include <linux/mm.h>
28 #include <linux/udp.h>
29 #include <linux/tcp.h>
30 #include <linux/sunrpc/clnt.h>
31 #include <linux/sunrpc/sched.h>
32 #include <linux/file.h>
33
34 #include <net/sock.h>
35 #include <net/checksum.h>
36 #include <net/udp.h>
37 #include <net/tcp.h>
38
39 /*
40  * xprtsock tunables
41  */
42 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
43 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
44
45 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
46 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
47
48 /*
49  * How many times to try sending a request on a socket before waiting
50  * for the socket buffer to clear.
51  */
52 #define XS_SENDMSG_RETRY        (10U)
53
54 /*
55  * Time out for an RPC UDP socket connect.  UDP socket connects are
56  * synchronous, but we set a timeout anyway in case of resource
57  * exhaustion on the local host.
58  */
59 #define XS_UDP_CONN_TO          (5U * HZ)
60
61 /*
62  * Wait duration for an RPC TCP connection to be established.  Solaris
63  * NFS over TCP uses 60 seconds, for example, which is in line with how
64  * long a server takes to reboot.
65  */
66 #define XS_TCP_CONN_TO          (60U * HZ)
67
68 /*
69  * Wait duration for a reply from the RPC portmapper.
70  */
71 #define XS_BIND_TO              (60U * HZ)
72
73 /*
74  * Delay if a UDP socket connect error occurs.  This is most likely some
75  * kind of resource problem on the local host.
76  */
77 #define XS_UDP_REEST_TO         (2U * HZ)
78
79 /*
80  * The reestablish timeout allows clients to delay for a bit before attempting
81  * to reconnect to a server that just dropped our connection.
82  *
83  * We implement an exponential backoff when trying to reestablish a TCP
84  * transport connection with the server.  Some servers like to drop a TCP
85  * connection when they are overworked, so we start with a short timeout and
86  * increase over time if the server is down or not responding.
87  */
88 #define XS_TCP_INIT_REEST_TO    (3U * HZ)
89 #define XS_TCP_MAX_REEST_TO     (5U * 60 * HZ)
90
91 /*
92  * TCP idle timeout; client drops the transport socket if it is idle
93  * for this long.  Note that we also timeout UDP sockets to prevent
94  * holding port numbers when there is no RPC traffic.
95  */
96 #define XS_IDLE_DISC_TO         (5U * 60 * HZ)
97
98 #ifdef RPC_DEBUG
99 # undef  RPC_DEBUG_DATA
100 # define RPCDBG_FACILITY        RPCDBG_TRANS
101 #endif
102
103 #ifdef RPC_DEBUG_DATA
104 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
105 {
106         u8 *buf = (u8 *) packet;
107         int j;
108
109         dprintk("RPC:      %s\n", msg);
110         for (j = 0; j < count && j < 128; j += 4) {
111                 if (!(j & 31)) {
112                         if (j)
113                                 dprintk("\n");
114                         dprintk("0x%04x ", j);
115                 }
116                 dprintk("%02x%02x%02x%02x ",
117                         buf[j], buf[j+1], buf[j+2], buf[j+3]);
118         }
119         dprintk("\n");
120 }
121 #else
122 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
123 {
124         /* NOP */
125 }
126 #endif
127
128 struct sock_xprt {
129         struct rpc_xprt         xprt;
130
131         /*
132          * Network layer
133          */
134         struct socket *         sock;
135         struct sock *           inet;
136
137         /*
138          * State of TCP reply receive
139          */
140         __be32                  tcp_fraghdr,
141                                 tcp_xid;
142
143         u32                     tcp_offset,
144                                 tcp_reclen;
145
146         unsigned long           tcp_copied,
147                                 tcp_flags;
148
149         /*
150          * Connection of transports
151          */
152         struct work_struct      connect_worker;
153         unsigned short          port;
154 };
155
156 /*
157  * TCP receive state flags
158  */
159 #define TCP_RCV_LAST_FRAG       (1UL << 0)
160 #define TCP_RCV_COPY_FRAGHDR    (1UL << 1)
161 #define TCP_RCV_COPY_XID        (1UL << 2)
162 #define TCP_RCV_COPY_DATA       (1UL << 3)
163
164 static void xs_format_peer_addresses(struct rpc_xprt *xprt)
165 {
166         struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr;
167         char *buf;
168
169         buf = kzalloc(20, GFP_KERNEL);
170         if (buf) {
171                 snprintf(buf, 20, "%u.%u.%u.%u",
172                                 NIPQUAD(addr->sin_addr.s_addr));
173         }
174         xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
175
176         buf = kzalloc(8, GFP_KERNEL);
177         if (buf) {
178                 snprintf(buf, 8, "%u",
179                                 ntohs(addr->sin_port));
180         }
181         xprt->address_strings[RPC_DISPLAY_PORT] = buf;
182
183         if (xprt->prot == IPPROTO_UDP)
184                 xprt->address_strings[RPC_DISPLAY_PROTO] = "udp";
185         else
186                 xprt->address_strings[RPC_DISPLAY_PROTO] = "tcp";
187
188         buf = kzalloc(48, GFP_KERNEL);
189         if (buf) {
190                 snprintf(buf, 48, "addr=%u.%u.%u.%u port=%u proto=%s",
191                         NIPQUAD(addr->sin_addr.s_addr),
192                         ntohs(addr->sin_port),
193                         xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
194         }
195         xprt->address_strings[RPC_DISPLAY_ALL] = buf;
196 }
197
198 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
199 {
200         kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
201         kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
202         kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
203 }
204
205 #define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
206
207 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
208 {
209         struct msghdr msg = {
210                 .msg_name       = addr,
211                 .msg_namelen    = addrlen,
212                 .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
213         };
214         struct kvec iov = {
215                 .iov_base       = vec->iov_base + base,
216                 .iov_len        = vec->iov_len - base,
217         };
218
219         if (iov.iov_len != 0)
220                 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
221         return kernel_sendmsg(sock, &msg, NULL, 0, 0);
222 }
223
224 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
225 {
226         struct page **ppage;
227         unsigned int remainder;
228         int err, sent = 0;
229
230         remainder = xdr->page_len - base;
231         base += xdr->page_base;
232         ppage = xdr->pages + (base >> PAGE_SHIFT);
233         base &= ~PAGE_MASK;
234         for(;;) {
235                 unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
236                 int flags = XS_SENDMSG_FLAGS;
237
238                 remainder -= len;
239                 if (remainder != 0 || more)
240                         flags |= MSG_MORE;
241                 err = sock->ops->sendpage(sock, *ppage, base, len, flags);
242                 if (remainder == 0 || err != len)
243                         break;
244                 sent += err;
245                 ppage++;
246                 base = 0;
247         }
248         if (sent == 0)
249                 return err;
250         if (err > 0)
251                 sent += err;
252         return sent;
253 }
254
255 /**
256  * xs_sendpages - write pages directly to a socket
257  * @sock: socket to send on
258  * @addr: UDP only -- address of destination
259  * @addrlen: UDP only -- length of destination address
260  * @xdr: buffer containing this request
261  * @base: starting position in the buffer
262  *
263  */
264 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
265 {
266         unsigned int remainder = xdr->len - base;
267         int err, sent = 0;
268
269         if (unlikely(!sock))
270                 return -ENOTCONN;
271
272         clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
273         if (base != 0) {
274                 addr = NULL;
275                 addrlen = 0;
276         }
277
278         if (base < xdr->head[0].iov_len || addr != NULL) {
279                 unsigned int len = xdr->head[0].iov_len - base;
280                 remainder -= len;
281                 err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
282                 if (remainder == 0 || err != len)
283                         goto out;
284                 sent += err;
285                 base = 0;
286         } else
287                 base -= xdr->head[0].iov_len;
288
289         if (base < xdr->page_len) {
290                 unsigned int len = xdr->page_len - base;
291                 remainder -= len;
292                 err = xs_send_pagedata(sock, xdr, base, remainder != 0);
293                 if (remainder == 0 || err != len)
294                         goto out;
295                 sent += err;
296                 base = 0;
297         } else
298                 base -= xdr->page_len;
299
300         if (base >= xdr->tail[0].iov_len)
301                 return sent;
302         err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
303 out:
304         if (sent == 0)
305                 return err;
306         if (err > 0)
307                 sent += err;
308         return sent;
309 }
310
311 /**
312  * xs_nospace - place task on wait queue if transmit was incomplete
313  * @task: task to put to sleep
314  *
315  */
316 static void xs_nospace(struct rpc_task *task)
317 {
318         struct rpc_rqst *req = task->tk_rqstp;
319         struct rpc_xprt *xprt = req->rq_xprt;
320         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
321
322         dprintk("RPC: %4d xmit incomplete (%u left of %u)\n",
323                         task->tk_pid, req->rq_slen - req->rq_bytes_sent,
324                         req->rq_slen);
325
326         if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
327                 /* Protect against races with write_space */
328                 spin_lock_bh(&xprt->transport_lock);
329
330                 /* Don't race with disconnect */
331                 if (!xprt_connected(xprt))
332                         task->tk_status = -ENOTCONN;
333                 else if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
334                         xprt_wait_for_buffer_space(task);
335
336                 spin_unlock_bh(&xprt->transport_lock);
337         } else
338                 /* Keep holding the socket if it is blocked */
339                 rpc_delay(task, HZ>>4);
340 }
341
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Bytes handed to the socket are added to task->tk_bytes_sent, matching
 * the TCP send path.  A send that makes no progress (returns 0) is treated
 * as incomplete, not as success.
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	req->rq_xtime = jiffies;
	status = xs_sendpages(transport->sock,
			      (struct sockaddr *) &xprt->addr,
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC:      xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		task->tk_bytes_sent += status;
		if (likely(status >= (int) req->rq_slen))
			return 0;
		/*
		 * Still some bytes left, or nothing was sent at all.  Set up
		 * for a retry later instead of reporting success.
		 */
		status = -EAGAIN;
	}

	switch (status) {
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		break;
	case -EAGAIN:
		xs_nospace(task);
		break;
	default:
		dprintk("RPC:      sendmsg returned unrecognized error %d\n",
			-status);
		break;
	}

	return status;
}
399
400 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
401 {
402         u32 reclen = buf->len - sizeof(rpc_fraghdr);
403         rpc_fraghdr *base = buf->head[0].iov_base;
404         *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
405 }
406
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Encodes the record marker and keeps pushing the record into the socket
 * while it makes progress, up to a bounded number of partial sends.
 * Progress is tracked in req->rq_bytes_sent, which is reset to 0 once the
 * whole record has gone out.
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *      if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int tries = 0;
	int status;

	xs_encode_tcp_record_marker(xdr);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/*
	 * Keep transmitting the record.  Write-space callbacks may arrive
	 * after sendmsg() has returned, so progress is re-checked each pass.
	 */
	for (;;) {
		req->rq_xtime = jiffies;
		status = xs_sendpages(transport->sock, NULL, 0, xdr,
				      req->rq_bytes_sent);

		dprintk("RPC:      xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			/* Whole record sent: reset the count of bytes sent. */
			req->rq_bytes_sent = 0;
			return 0;
		}

		status = -EAGAIN;
		if (tries++ > XS_SENDMSG_RETRY)
			break;
	}

	if (status == -EAGAIN) {
		xs_nospace(task);
	} else if (status == -ECONNREFUSED || status == -ECONNRESET ||
		   status == -ENOTCONN || status == -EPIPE) {
		status = -ENOTCONN;
	} else {
		dprintk("RPC:      sendmsg returned unrecognized error %d\n",
			-status);
		xprt_disconnect(xprt);
	}

	return status;
}
482
483 /**
484  * xs_tcp_release_xprt - clean up after a tcp transmission
485  * @xprt: transport
486  * @task: rpc task
487  *
488  * This cleans up if an error causes us to abort the transmission of a request.
489  * In this case, the socket may need to be reset in order to avoid confusing
490  * the server.
491  */
492 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
493 {
494         struct rpc_rqst *req;
495
496         if (task != xprt->snd_task)
497                 return;
498         if (task == NULL)
499                 goto out_release;
500         req = task->tk_rqstp;
501         if (req->rq_bytes_sent == 0)
502                 goto out_release;
503         if (req->rq_bytes_sent == req->rq_snd_buf.len)
504                 goto out_release;
505         set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
506 out_release:
507         xprt_release_xprt(xprt, task);
508 }
509
510 /**
511  * xs_close - close a socket
512  * @xprt: transport
513  *
514  * This is used when all requests are complete; ie, no DRC state remains
515  * on the server we want to save.
516  */
517 static void xs_close(struct rpc_xprt *xprt)
518 {
519         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
520         struct socket *sock = transport->sock;
521         struct sock *sk = transport->inet;
522
523         if (!sk)
524                 goto clear_close_wait;
525
526         dprintk("RPC:      xs_close xprt %p\n", xprt);
527
528         write_lock_bh(&sk->sk_callback_lock);
529         transport->inet = NULL;
530         transport->sock = NULL;
531
532         sk->sk_user_data = NULL;
533         sk->sk_data_ready = xprt->old_data_ready;
534         sk->sk_state_change = xprt->old_state_change;
535         sk->sk_write_space = xprt->old_write_space;
536         write_unlock_bh(&sk->sk_callback_lock);
537
538         sk->sk_no_check = 0;
539
540         sock_release(sock);
541 clear_close_wait:
542         smp_mb__before_clear_bit();
543         clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
544         smp_mb__after_clear_bit();
545 }
546
547 /**
548  * xs_destroy - prepare to shutdown a transport
549  * @xprt: doomed transport
550  *
551  */
552 static void xs_destroy(struct rpc_xprt *xprt)
553 {
554         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
555
556         dprintk("RPC:      xs_destroy xprt %p\n", xprt);
557
558         cancel_delayed_work(&transport->connect_worker);
559         flush_scheduled_work();
560
561         xprt_disconnect(xprt);
562         xs_close(xprt);
563         xs_free_peer_addresses(xprt);
564         kfree(xprt->slot);
565         kfree(xprt);
566 }
567
568 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
569 {
570         return (struct rpc_xprt *) sk->sk_user_data;
571 }
572
573 /**
574  * xs_udp_data_ready - "data ready" callback for UDP sockets
575  * @sk: socket with data to read
576  * @len: how much data to read
577  *
578  */
579 static void xs_udp_data_ready(struct sock *sk, int len)
580 {
581         struct rpc_task *task;
582         struct rpc_xprt *xprt;
583         struct rpc_rqst *rovr;
584         struct sk_buff *skb;
585         int err, repsize, copied;
586         u32 _xid;
587         __be32 *xp;
588
589         read_lock(&sk->sk_callback_lock);
590         dprintk("RPC:      xs_udp_data_ready...\n");
591         if (!(xprt = xprt_from_sock(sk)))
592                 goto out;
593
594         if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
595                 goto out;
596
597         if (xprt->shutdown)
598                 goto dropit;
599
600         repsize = skb->len - sizeof(struct udphdr);
601         if (repsize < 4) {
602                 dprintk("RPC:      impossible RPC reply size %d!\n", repsize);
603                 goto dropit;
604         }
605
606         /* Copy the XID from the skb... */
607         xp = skb_header_pointer(skb, sizeof(struct udphdr),
608                                 sizeof(_xid), &_xid);
609         if (xp == NULL)
610                 goto dropit;
611
612         /* Look up and lock the request corresponding to the given XID */
613         spin_lock(&xprt->transport_lock);
614         rovr = xprt_lookup_rqst(xprt, *xp);
615         if (!rovr)
616                 goto out_unlock;
617         task = rovr->rq_task;
618
619         if ((copied = rovr->rq_private_buf.buflen) > repsize)
620                 copied = repsize;
621
622         /* Suck it into the iovec, verify checksum if not done by hw. */
623         if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
624                 goto out_unlock;
625
626         /* Something worked... */
627         dst_confirm(skb->dst);
628
629         xprt_adjust_cwnd(task, copied);
630         xprt_update_rtt(task);
631         xprt_complete_rqst(task, copied);
632
633  out_unlock:
634         spin_unlock(&xprt->transport_lock);
635  dropit:
636         skb_free_datagram(sk, skb);
637  out:
638         read_unlock(&sk->sk_callback_lock);
639 }
640
641 static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
642 {
643         if (len > desc->count)
644                 len = desc->count;
645         if (skb_copy_bits(desc->skb, desc->offset, p, len)) {
646                 dprintk("RPC:      failed to copy %zu bytes from skb. %zu bytes remain\n",
647                                 len, desc->count);
648                 return 0;
649         }
650         desc->offset += len;
651         desc->count -= len;
652         dprintk("RPC:      copied %zu bytes from skb. %zu bytes remain\n",
653                         len, desc->count);
654         return len;
655 }
656
657 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
658 {
659         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
660         size_t len, used;
661         char *p;
662
663         p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
664         len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
665         used = xs_tcp_copy_data(desc, p, len);
666         transport->tcp_offset += used;
667         if (used != len)
668                 return;
669
670         transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
671         if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
672                 transport->tcp_flags |= TCP_RCV_LAST_FRAG;
673         else
674                 transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
675         transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
676
677         transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
678         transport->tcp_offset = 0;
679
680         /* Sanity check of the record length */
681         if (unlikely(transport->tcp_reclen < 4)) {
682                 dprintk("RPC:      invalid TCP record fragment length\n");
683                 xprt_disconnect(xprt);
684                 return;
685         }
686         dprintk("RPC:      reading TCP record fragment of length %d\n",
687                         transport->tcp_reclen);
688 }
689
690 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
691 {
692         if (transport->tcp_offset == transport->tcp_reclen) {
693                 transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
694                 transport->tcp_offset = 0;
695                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
696                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
697                         transport->tcp_flags |= TCP_RCV_COPY_XID;
698                         transport->tcp_copied = 0;
699                 }
700         }
701 }
702
703 static inline void xs_tcp_read_xid(struct sock_xprt *transport, skb_reader_t *desc)
704 {
705         size_t len, used;
706         char *p;
707
708         len = sizeof(transport->tcp_xid) - transport->tcp_offset;
709         dprintk("RPC:      reading XID (%Zu bytes)\n", len);
710         p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
711         used = xs_tcp_copy_data(desc, p, len);
712         transport->tcp_offset += used;
713         if (used != len)
714                 return;
715         transport->tcp_flags &= ~TCP_RCV_COPY_XID;
716         transport->tcp_flags |= TCP_RCV_COPY_DATA;
717         transport->tcp_copied = 4;
718         dprintk("RPC:      reading reply for XID %08x\n",
719                         ntohl(transport->tcp_xid));
720         xs_tcp_check_fraghdr(transport);
721 }
722
723 static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
724 {
725         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
726         struct rpc_rqst *req;
727         struct xdr_buf *rcvbuf;
728         size_t len;
729         ssize_t r;
730
731         /* Find and lock the request corresponding to this xid */
732         spin_lock(&xprt->transport_lock);
733         req = xprt_lookup_rqst(xprt, transport->tcp_xid);
734         if (!req) {
735                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
736                 dprintk("RPC:      XID %08x request not found!\n",
737                                 ntohl(transport->tcp_xid));
738                 spin_unlock(&xprt->transport_lock);
739                 return;
740         }
741
742         rcvbuf = &req->rq_private_buf;
743         len = desc->count;
744         if (len > transport->tcp_reclen - transport->tcp_offset) {
745                 skb_reader_t my_desc;
746
747                 len = transport->tcp_reclen - transport->tcp_offset;
748                 memcpy(&my_desc, desc, sizeof(my_desc));
749                 my_desc.count = len;
750                 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
751                                           &my_desc, xs_tcp_copy_data);
752                 desc->count -= r;
753                 desc->offset += r;
754         } else
755                 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
756                                           desc, xs_tcp_copy_data);
757
758         if (r > 0) {
759                 transport->tcp_copied += r;
760                 transport->tcp_offset += r;
761         }
762         if (r != len) {
763                 /* Error when copying to the receive buffer,
764                  * usually because we weren't able to allocate
765                  * additional buffer pages. All we can do now
766                  * is turn off TCP_RCV_COPY_DATA, so the request
767                  * will not receive any additional updates,
768                  * and time out.
769                  * Any remaining data from this record will
770                  * be discarded.
771                  */
772                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
773                 dprintk("RPC:      XID %08x truncated request\n",
774                                 ntohl(transport->tcp_xid));
775                 dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
776                                 xprt, transport->tcp_copied, transport->tcp_offset,
777                                         transport->tcp_reclen);
778                 goto out;
779         }
780
781         dprintk("RPC:      XID %08x read %Zd bytes\n",
782                         ntohl(transport->tcp_xid), r);
783         dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
784                         xprt, transport->tcp_copied, transport->tcp_offset,
785                                 transport->tcp_reclen);
786
787         if (transport->tcp_copied == req->rq_private_buf.buflen)
788                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
789         else if (transport->tcp_offset == transport->tcp_reclen) {
790                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
791                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
792         }
793
794 out:
795         if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
796                 xprt_complete_rqst(req->rq_task, transport->tcp_copied);
797         spin_unlock(&xprt->transport_lock);
798         xs_tcp_check_fraghdr(transport);
799 }
800
801 static inline void xs_tcp_read_discard(struct sock_xprt *transport, skb_reader_t *desc)
802 {
803         size_t len;
804
805         len = transport->tcp_reclen - transport->tcp_offset;
806         if (len > desc->count)
807                 len = desc->count;
808         desc->count -= len;
809         desc->offset += len;
810         transport->tcp_offset += len;
811         dprintk("RPC:      discarded %Zu bytes\n", len);
812         xs_tcp_check_fraghdr(transport);
813 }
814
815 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
816 {
817         struct rpc_xprt *xprt = rd_desc->arg.data;
818         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
819         skb_reader_t desc = {
820                 .skb    = skb,
821                 .offset = offset,
822                 .count  = len,
823         };
824
825         dprintk("RPC:      xs_tcp_data_recv started\n");
826         do {
827                 /* Read in a new fragment marker if necessary */
828                 /* Can we ever really expect to get completely empty fragments? */
829                 if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
830                         xs_tcp_read_fraghdr(xprt, &desc);
831                         continue;
832                 }
833                 /* Read in the xid if necessary */
834                 if (transport->tcp_flags & TCP_RCV_COPY_XID) {
835                         xs_tcp_read_xid(transport, &desc);
836                         continue;
837                 }
838                 /* Read in the request data */
839                 if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
840                         xs_tcp_read_request(xprt, &desc);
841                         continue;
842                 }
843                 /* Skip over any trailing bytes on short reads */
844                 xs_tcp_read_discard(transport, &desc);
845         } while (desc.count);
846         dprintk("RPC:      xs_tcp_data_recv done\n");
847         return len - desc.count;
848 }
849
850 /**
851  * xs_tcp_data_ready - "data ready" callback for TCP sockets
852  * @sk: socket with data to read
853  * @bytes: how much data to read
854  *
855  */
856 static void xs_tcp_data_ready(struct sock *sk, int bytes)
857 {
858         struct rpc_xprt *xprt;
859         read_descriptor_t rd_desc;
860
861         read_lock(&sk->sk_callback_lock);
862         dprintk("RPC:      xs_tcp_data_ready...\n");
863         if (!(xprt = xprt_from_sock(sk)))
864                 goto out;
865         if (xprt->shutdown)
866                 goto out;
867
868         /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
869         rd_desc.arg.data = xprt;
870         rd_desc.count = 65536;
871         tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
872 out:
873         read_unlock(&sk->sk_callback_lock);
874 }
875
876 /**
877  * xs_tcp_state_change - callback to handle TCP socket state changes
878  * @sk: socket whose state has changed
879  *
880  */
881 static void xs_tcp_state_change(struct sock *sk)
882 {
883         struct rpc_xprt *xprt;
884
885         read_lock(&sk->sk_callback_lock);
886         if (!(xprt = xprt_from_sock(sk)))
887                 goto out;
888         dprintk("RPC:      xs_tcp_state_change client %p...\n", xprt);
889         dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
890                                 sk->sk_state, xprt_connected(xprt),
891                                 sock_flag(sk, SOCK_DEAD),
892                                 sock_flag(sk, SOCK_ZAPPED));
893
894         switch (sk->sk_state) {
895         case TCP_ESTABLISHED:
896                 spin_lock_bh(&xprt->transport_lock);
897                 if (!xprt_test_and_set_connected(xprt)) {
898                         struct sock_xprt *transport = container_of(xprt,
899                                         struct sock_xprt, xprt);
900
901                         /* Reset TCP record info */
902                         transport->tcp_offset = 0;
903                         transport->tcp_reclen = 0;
904                         transport->tcp_copied = 0;
905                         transport->tcp_flags =
906                                 TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
907
908                         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
909                         xprt_wake_pending_tasks(xprt, 0);
910                 }
911                 spin_unlock_bh(&xprt->transport_lock);
912                 break;
913         case TCP_SYN_SENT:
914         case TCP_SYN_RECV:
915                 break;
916         case TCP_CLOSE_WAIT:
917                 /* Try to schedule an autoclose RPC calls */
918                 set_bit(XPRT_CLOSE_WAIT, &xprt->state);
919                 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
920                         schedule_work(&xprt->task_cleanup);
921         default:
922                 xprt_disconnect(xprt);
923         }
924  out:
925         read_unlock(&sk->sk_callback_lock);
926 }
927
928 /**
929  * xs_udp_write_space - callback invoked when socket buffer space
930  *                             becomes available
931  * @sk: socket whose state has changed
932  *
933  * Called when more output buffer space is available for this socket.
934  * We try not to wake our writers until they can make "significant"
935  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
936  * with a bunch of small requests.
937  */
938 static void xs_udp_write_space(struct sock *sk)
939 {
940         read_lock(&sk->sk_callback_lock);
941
942         /* from net/core/sock.c:sock_def_write_space */
943         if (sock_writeable(sk)) {
944                 struct socket *sock;
945                 struct rpc_xprt *xprt;
946
947                 if (unlikely(!(sock = sk->sk_socket)))
948                         goto out;
949                 if (unlikely(!(xprt = xprt_from_sock(sk))))
950                         goto out;
951                 if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
952                         goto out;
953
954                 xprt_write_space(xprt);
955         }
956
957  out:
958         read_unlock(&sk->sk_callback_lock);
959 }
960
961 /**
962  * xs_tcp_write_space - callback invoked when socket buffer space
963  *                             becomes available
964  * @sk: socket whose state has changed
965  *
966  * Called when more output buffer space is available for this socket.
967  * We try not to wake our writers until they can make "significant"
968  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
969  * with a bunch of small requests.
970  */
971 static void xs_tcp_write_space(struct sock *sk)
972 {
973         read_lock(&sk->sk_callback_lock);
974
975         /* from net/core/stream.c:sk_stream_write_space */
976         if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
977                 struct socket *sock;
978                 struct rpc_xprt *xprt;
979
980                 if (unlikely(!(sock = sk->sk_socket)))
981                         goto out;
982                 if (unlikely(!(xprt = xprt_from_sock(sk))))
983                         goto out;
984                 if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
985                         goto out;
986
987                 xprt_write_space(xprt);
988         }
989
990  out:
991         read_unlock(&sk->sk_callback_lock);
992 }
993
994 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
995 {
996         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
997         struct sock *sk = transport->inet;
998
999         if (xprt->rcvsize) {
1000                 sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1001                 sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs *  2;
1002         }
1003         if (xprt->sndsize) {
1004                 sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1005                 sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2;
1006                 sk->sk_write_space(sk);
1007         }
1008 }
1009
1010 /**
1011  * xs_udp_set_buffer_size - set send and receive limits
1012  * @xprt: generic transport
1013  * @sndsize: requested size of send buffer, in bytes
1014  * @rcvsize: requested size of receive buffer, in bytes
1015  *
1016  * Set socket send and receive buffer size limits.
1017  */
1018 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1019 {
1020         xprt->sndsize = 0;
1021         if (sndsize)
1022                 xprt->sndsize = sndsize + 1024;
1023         xprt->rcvsize = 0;
1024         if (rcvsize)
1025                 xprt->rcvsize = rcvsize + 1024;
1026
1027         xs_udp_do_set_buffer_size(xprt);
1028 }
1029
1030 /**
1031  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1032  * @task: task that timed out
1033  *
1034  * Adjust the congestion window after a retransmit timeout has occurred.
1035  */
1036 static void xs_udp_timer(struct rpc_task *task)
1037 {
1038         xprt_adjust_cwnd(task, -ETIMEDOUT);
1039 }
1040
1041 static unsigned short xs_get_random_port(void)
1042 {
1043         unsigned short range = xprt_max_resvport - xprt_min_resvport;
1044         unsigned short rand = (unsigned short) net_random() % range;
1045         return rand + xprt_min_resvport;
1046 }
1047
1048 /**
1049  * xs_print_peer_address - format an IPv4 address for printing
1050  * @xprt: generic transport
1051  * @format: flags field indicating which parts of the address to render
1052  */
1053 static char *xs_print_peer_address(struct rpc_xprt *xprt, enum rpc_display_format_t format)
1054 {
1055         if (xprt->address_strings[format] != NULL)
1056                 return xprt->address_strings[format];
1057         else
1058                 return "unprintable";
1059 }
1060
1061 /**
1062  * xs_set_port - reset the port number in the remote endpoint address
1063  * @xprt: generic transport
1064  * @port: new port number
1065  *
1066  */
1067 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1068 {
1069         struct sockaddr_in *sap = (struct sockaddr_in *) &xprt->addr;
1070
1071         dprintk("RPC:      setting port for xprt %p to %u\n", xprt, port);
1072
1073         sap->sin_port = htons(port);
1074 }
1075
1076 static int xs_bindresvport(struct sock_xprt *transport, struct socket *sock)
1077 {
1078         struct sockaddr_in myaddr = {
1079                 .sin_family = AF_INET,
1080         };
1081         int err;
1082         unsigned short port = transport->port;
1083
1084         do {
1085                 myaddr.sin_port = htons(port);
1086                 err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1087                                                 sizeof(myaddr));
1088                 if (err == 0) {
1089                         transport->port = port;
1090                         dprintk("RPC:      xs_bindresvport bound to port %u\n",
1091                                         port);
1092                         return 0;
1093                 }
1094                 if (port <= xprt_min_resvport)
1095                         port = xprt_max_resvport;
1096                 else
1097                         port--;
1098         } while (err == -EADDRINUSE && port != transport->port);
1099
1100         dprintk("RPC:      can't bind to reserved port (%d).\n", -err);
1101         return err;
1102 }
1103
1104 /**
1105  * xs_udp_connect_worker - set up a UDP socket
1106  * @args: RPC transport to connect
1107  *
1108  * Invoked by a work queue tasklet.
1109  */
1110 static void xs_udp_connect_worker(void *args)
1111 {
1112         struct sock_xprt *transport = (struct sock_xprt *)args;
1113         struct rpc_xprt *xprt = &transport->xprt;
1114         struct socket *sock = transport->sock;
1115         int err, status = -EIO;
1116
1117         if (xprt->shutdown || !xprt_bound(xprt))
1118                 goto out;
1119
1120         /* Start by resetting any existing state */
1121         xs_close(xprt);
1122
1123         if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
1124                 dprintk("RPC:      can't create UDP transport socket (%d).\n", -err);
1125                 goto out;
1126         }
1127
1128         if (xprt->resvport && xs_bindresvport(transport, sock) < 0) {
1129                 sock_release(sock);
1130                 goto out;
1131         }
1132
1133         dprintk("RPC:      worker connecting xprt %p to address: %s\n",
1134                         xprt, xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
1135
1136         if (!transport->inet) {
1137                 struct sock *sk = sock->sk;
1138
1139                 write_lock_bh(&sk->sk_callback_lock);
1140
1141                 sk->sk_user_data = xprt;
1142                 xprt->old_data_ready = sk->sk_data_ready;
1143                 xprt->old_state_change = sk->sk_state_change;
1144                 xprt->old_write_space = sk->sk_write_space;
1145                 sk->sk_data_ready = xs_udp_data_ready;
1146                 sk->sk_write_space = xs_udp_write_space;
1147                 sk->sk_no_check = UDP_CSUM_NORCV;
1148                 sk->sk_allocation = GFP_ATOMIC;
1149
1150                 xprt_set_connected(xprt);
1151
1152                 /* Reset to new socket */
1153                 transport->sock = sock;
1154                 transport->inet = sk;
1155
1156                 write_unlock_bh(&sk->sk_callback_lock);
1157         }
1158         xs_udp_do_set_buffer_size(xprt);
1159         status = 0;
1160 out:
1161         xprt_wake_pending_tasks(xprt, status);
1162         xprt_clear_connecting(xprt);
1163 }
1164
1165 /*
1166  * We need to preserve the port number so the reply cache on the server can
1167  * find our cached RPC replies when we get around to reconnecting.
1168  */
1169 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
1170 {
1171         int result;
1172         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1173         struct sockaddr any;
1174
1175         dprintk("RPC:      disconnecting xprt %p to reuse port\n", xprt);
1176
1177         /*
1178          * Disconnect the transport socket by doing a connect operation
1179          * with AF_UNSPEC.  This should return immediately...
1180          */
1181         memset(&any, 0, sizeof(any));
1182         any.sa_family = AF_UNSPEC;
1183         result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1184         if (result)
1185                 dprintk("RPC:      AF_UNSPEC connect return code %d\n",
1186                                 result);
1187 }
1188
1189 /**
1190  * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
1191  * @args: RPC transport to connect
1192  *
1193  * Invoked by a work queue tasklet.
1194  */
1195 static void xs_tcp_connect_worker(void *args)
1196 {
1197         struct sock_xprt *transport = (struct sock_xprt *)args;
1198         struct rpc_xprt *xprt = &transport->xprt;
1199         struct socket *sock = transport->sock;
1200         int err, status = -EIO;
1201
1202         if (xprt->shutdown || !xprt_bound(xprt))
1203                 goto out;
1204
1205         if (!sock) {
1206                 /* start from scratch */
1207                 if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
1208                         dprintk("RPC:      can't create TCP transport socket (%d).\n", -err);
1209                         goto out;
1210                 }
1211
1212                 if (xprt->resvport && xs_bindresvport(transport, sock) < 0) {
1213                         sock_release(sock);
1214                         goto out;
1215                 }
1216         } else
1217                 /* "close" the socket, preserving the local port */
1218                 xs_tcp_reuse_connection(xprt);
1219
1220         dprintk("RPC:      worker connecting xprt %p to address: %s\n",
1221                         xprt, xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
1222
1223         if (!transport->inet) {
1224                 struct sock *sk = sock->sk;
1225
1226                 write_lock_bh(&sk->sk_callback_lock);
1227
1228                 sk->sk_user_data = xprt;
1229                 xprt->old_data_ready = sk->sk_data_ready;
1230                 xprt->old_state_change = sk->sk_state_change;
1231                 xprt->old_write_space = sk->sk_write_space;
1232                 sk->sk_data_ready = xs_tcp_data_ready;
1233                 sk->sk_state_change = xs_tcp_state_change;
1234                 sk->sk_write_space = xs_tcp_write_space;
1235                 sk->sk_allocation = GFP_ATOMIC;
1236
1237                 /* socket options */
1238                 sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1239                 sock_reset_flag(sk, SOCK_LINGER);
1240                 tcp_sk(sk)->linger2 = 0;
1241                 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1242
1243                 xprt_clear_connected(xprt);
1244
1245                 /* Reset to new socket */
1246                 transport->sock = sock;
1247                 transport->inet = sk;
1248
1249                 write_unlock_bh(&sk->sk_callback_lock);
1250         }
1251
1252         /* Tell the socket layer to start connecting... */
1253         xprt->stat.connect_count++;
1254         xprt->stat.connect_start = jiffies;
1255         status = kernel_connect(sock, (struct sockaddr *) &xprt->addr,
1256                         xprt->addrlen, O_NONBLOCK);
1257         dprintk("RPC: %p  connect status %d connected %d sock state %d\n",
1258                         xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
1259         if (status < 0) {
1260                 switch (status) {
1261                         case -EINPROGRESS:
1262                         case -EALREADY:
1263                                 goto out_clear;
1264                         case -ECONNREFUSED:
1265                         case -ECONNRESET:
1266                                 /* retry with existing socket, after a delay */
1267                                 break;
1268                         default:
1269                                 /* get rid of existing socket, and retry */
1270                                 xs_close(xprt);
1271                                 break;
1272                 }
1273         }
1274 out:
1275         xprt_wake_pending_tasks(xprt, status);
1276 out_clear:
1277         xprt_clear_connecting(xprt);
1278 }
1279
1280 /**
1281  * xs_connect - connect a socket to a remote endpoint
1282  * @task: address of RPC task that manages state of connect request
1283  *
1284  * TCP: If the remote end dropped the connection, delay reconnecting.
1285  *
1286  * UDP socket connects are synchronous, but we use a work queue anyway
1287  * to guarantee that even unprivileged user processes can set up a
1288  * socket on a privileged port.
1289  *
1290  * If a UDP socket connect fails, the delay behavior here prevents
1291  * retry floods (hard mounts).
1292  */
1293 static void xs_connect(struct rpc_task *task)
1294 {
1295         struct rpc_xprt *xprt = task->tk_xprt;
1296         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1297
1298         if (xprt_test_and_set_connecting(xprt))
1299                 return;
1300
1301         if (transport->sock != NULL) {
1302                 dprintk("RPC:      xs_connect delayed xprt %p for %lu seconds\n",
1303                                 xprt, xprt->reestablish_timeout / HZ);
1304                 schedule_delayed_work(&transport->connect_worker,
1305                                         xprt->reestablish_timeout);
1306                 xprt->reestablish_timeout <<= 1;
1307                 if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
1308                         xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
1309         } else {
1310                 dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
1311                 schedule_work(&transport->connect_worker);
1312
1313                 /* flush_scheduled_work can sleep... */
1314                 if (!RPC_IS_ASYNC(task))
1315                         flush_scheduled_work();
1316         }
1317 }
1318
1319 /**
1320  * xs_udp_print_stats - display UDP socket-specifc stats
1321  * @xprt: rpc_xprt struct containing statistics
1322  * @seq: output file
1323  *
1324  */
1325 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
1326 {
1327         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1328
1329         seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
1330                         transport->port,
1331                         xprt->stat.bind_count,
1332                         xprt->stat.sends,
1333                         xprt->stat.recvs,
1334                         xprt->stat.bad_xids,
1335                         xprt->stat.req_u,
1336                         xprt->stat.bklog_u);
1337 }
1338
1339 /**
1340  * xs_tcp_print_stats - display TCP socket-specifc stats
1341  * @xprt: rpc_xprt struct containing statistics
1342  * @seq: output file
1343  *
1344  */
1345 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
1346 {
1347         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1348         long idle_time = 0;
1349
1350         if (xprt_connected(xprt))
1351                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
1352
1353         seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
1354                         transport->port,
1355                         xprt->stat.bind_count,
1356                         xprt->stat.connect_count,
1357                         xprt->stat.connect_time,
1358                         idle_time,
1359                         xprt->stat.sends,
1360                         xprt->stat.recvs,
1361                         xprt->stat.bad_xids,
1362                         xprt->stat.req_u,
1363                         xprt->stat.bklog_u);
1364 }
1365
1366 static struct rpc_xprt_ops xs_udp_ops = {
1367         .set_buffer_size        = xs_udp_set_buffer_size,
1368         .print_addr             = xs_print_peer_address,
1369         .reserve_xprt           = xprt_reserve_xprt_cong,
1370         .release_xprt           = xprt_release_xprt_cong,
1371         .rpcbind                = rpc_getport,
1372         .set_port               = xs_set_port,
1373         .connect                = xs_connect,
1374         .buf_alloc              = rpc_malloc,
1375         .buf_free               = rpc_free,
1376         .send_request           = xs_udp_send_request,
1377         .set_retrans_timeout    = xprt_set_retrans_timeout_rtt,
1378         .timer                  = xs_udp_timer,
1379         .release_request        = xprt_release_rqst_cong,
1380         .close                  = xs_close,
1381         .destroy                = xs_destroy,
1382         .print_stats            = xs_udp_print_stats,
1383 };
1384
1385 static struct rpc_xprt_ops xs_tcp_ops = {
1386         .print_addr             = xs_print_peer_address,
1387         .reserve_xprt           = xprt_reserve_xprt,
1388         .release_xprt           = xs_tcp_release_xprt,
1389         .rpcbind                = rpc_getport,
1390         .set_port               = xs_set_port,
1391         .connect                = xs_connect,
1392         .buf_alloc              = rpc_malloc,
1393         .buf_free               = rpc_free,
1394         .send_request           = xs_tcp_send_request,
1395         .set_retrans_timeout    = xprt_set_retrans_timeout_def,
1396         .close                  = xs_close,
1397         .destroy                = xs_destroy,
1398         .print_stats            = xs_tcp_print_stats,
1399 };
1400
1401 static struct rpc_xprt *xs_setup_xprt(struct sockaddr *addr, size_t addrlen, unsigned int slot_table_size)
1402 {
1403         struct rpc_xprt *xprt;
1404         struct sock_xprt *new;
1405
1406         if (addrlen > sizeof(xprt->addr)) {
1407                 dprintk("RPC:      xs_setup_xprt: address too large\n");
1408                 return ERR_PTR(-EBADF);
1409         }
1410
1411         new = kzalloc(sizeof(*new), GFP_KERNEL);
1412         if (new == NULL) {
1413                 dprintk("RPC:      xs_setup_xprt: couldn't allocate rpc_xprt\n");
1414                 return ERR_PTR(-ENOMEM);
1415         }
1416         xprt = &new->xprt;
1417
1418         xprt->max_reqs = slot_table_size;
1419         xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
1420         if (xprt->slot == NULL) {
1421                 kfree(xprt);
1422                 dprintk("RPC:      xs_setup_xprt: couldn't allocate slot table\n");
1423                 return ERR_PTR(-ENOMEM);
1424         }
1425
1426         memcpy(&xprt->addr, addr, addrlen);
1427         xprt->addrlen = addrlen;
1428         new->port = xs_get_random_port();
1429
1430         return xprt;
1431 }
1432
1433 /**
1434  * xs_setup_udp - Set up transport to use a UDP socket
1435  * @addr: address of remote server
1436  * @addrlen: length of address in bytes
1437  * @to:   timeout parameters
1438  *
1439  */
1440 struct rpc_xprt *xs_setup_udp(struct sockaddr *addr, size_t addrlen, struct rpc_timeout *to)
1441 {
1442         struct rpc_xprt *xprt;
1443         struct sock_xprt *transport;
1444
1445         xprt = xs_setup_xprt(addr, addrlen, xprt_udp_slot_table_entries);
1446         if (IS_ERR(xprt))
1447                 return xprt;
1448         transport = container_of(xprt, struct sock_xprt, xprt);
1449
1450         if (ntohs(((struct sockaddr_in *)addr)->sin_port) != 0)
1451                 xprt_set_bound(xprt);
1452
1453         xprt->prot = IPPROTO_UDP;
1454         xprt->tsh_size = 0;
1455         /* XXX: header size can vary due to auth type, IPv6, etc. */
1456         xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
1457
1458         INIT_WORK(&transport->connect_worker, xs_udp_connect_worker, transport);
1459         xprt->bind_timeout = XS_BIND_TO;
1460         xprt->connect_timeout = XS_UDP_CONN_TO;
1461         xprt->reestablish_timeout = XS_UDP_REEST_TO;
1462         xprt->idle_timeout = XS_IDLE_DISC_TO;
1463
1464         xprt->ops = &xs_udp_ops;
1465
1466         if (to)
1467                 xprt->timeout = *to;
1468         else
1469                 xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
1470
1471         xs_format_peer_addresses(xprt);
1472         dprintk("RPC:      set up transport to address %s\n",
1473                         xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
1474
1475         return xprt;
1476 }
1477
1478 /**
1479  * xs_setup_tcp - Set up transport to use a TCP socket
1480  * @addr: address of remote server
1481  * @addrlen: length of address in bytes
1482  * @to: timeout parameters
1483  *
1484  */
1485 struct rpc_xprt *xs_setup_tcp(struct sockaddr *addr, size_t addrlen, struct rpc_timeout *to)
1486 {
1487         struct rpc_xprt *xprt;
1488         struct sock_xprt *transport;
1489
1490         xprt = xs_setup_xprt(addr, addrlen, xprt_tcp_slot_table_entries);
1491         if (IS_ERR(xprt))
1492                 return xprt;
1493         transport = container_of(xprt, struct sock_xprt, xprt);
1494
1495         if (ntohs(((struct sockaddr_in *)addr)->sin_port) != 0)
1496                 xprt_set_bound(xprt);
1497
1498         xprt->prot = IPPROTO_TCP;
1499         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
1500         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
1501
1502         INIT_WORK(&transport->connect_worker, xs_tcp_connect_worker, transport);
1503         xprt->bind_timeout = XS_BIND_TO;
1504         xprt->connect_timeout = XS_TCP_CONN_TO;
1505         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1506         xprt->idle_timeout = XS_IDLE_DISC_TO;
1507
1508         xprt->ops = &xs_tcp_ops;
1509
1510         if (to)
1511                 xprt->timeout = *to;
1512         else
1513                 xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
1514
1515         xs_format_peer_addresses(xprt);
1516         dprintk("RPC:      set up transport to address %s\n",
1517                         xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
1518
1519         return xprt;
1520 }