/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_transmit().
 *  -   xprt_transmit sends the message and installs the caller on the
 *      transport's wait list. At the same time, it installs a timer that
 *      is run after the packet's timeout has expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that transport. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */
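/*
 * A rough sketch of that sequence in terms of the entry points below
 * (marshalling and error handling elided; the real state machine lives
 * in sched.c and clnt.c):
 *
 *      xprt_reserve(task);     allocate a slot or sleep on ->backlog
 *      xprt_transmit(task);    queue on ->recv, send, arm the timer
 *      (task sleeps on ->pending until data_ready or xprt_timer fires)
 *      xprt_release(task);     return the slot and wake ->backlog
 */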

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/random.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void      do_xprt_reserve(struct rpc_task *);
static void     xprt_connect_status(struct rpc_task *task);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

/*
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *      -       a reply is received and
 *      -       a full number of requests are outstanding and
 *      -       the congestion window hasn't been updated recently.
 */
#define RPC_CWNDSHIFT           (8U)
#define RPC_CWNDSCALE           (1U << RPC_CWNDSHIFT)
#define RPC_INITCWND            RPC_CWNDSCALE
#define RPC_MAXCWND(xprt)       ((xprt)->max_reqs << RPC_CWNDSHIFT)

#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
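
/*
 * Worked example with these constants: RPC_CWNDSCALE is 256, so a window
 * of four requests is cwnd = 4 << 8 = 1024.  The additive increase in
 * xprt_adjust_cwnd() below then adds roughly
 * RPC_CWNDSCALE * RPC_CWNDSCALE / cwnd = 65536 / 1024 = 64 units per
 * reply, i.e. a quarter of a request: the classic 1/cwnd growth in
 * fixed point.  A major timeout halves cwnd to 512, i.e. two requests.
 */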

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                if (task == NULL)
                        return 0;
                goto out_sleep;
        }
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return 1;

out_sleep:
        dprintk("RPC: %4d failed to lock transport %p\n",
                        task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        return 0;
}
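
/*
 * A transport picks its locking discipline through rpc_xprt_ops.  As an
 * illustrative sketch (the real tables live in xprtsock.c), a datagram
 * transport would pair the congestion-controlled variants
 *
 *      .reserve_xprt   = xprt_reserve_xprt_cong,
 *      .release_xprt   = xprt_release_xprt_cong,
 *
 * while a stream transport uses xprt_reserve_xprt/xprt_release_xprt.
 */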

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
        xprt->snd_task = NULL;
        if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_clear_bit();
        } else
                schedule_work(&xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 */
int xprt_reserve_xprt_cong(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
        if (__xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return 1;
        }
        xprt_clear_locked(xprt);
out_sleep:
        dprintk("RPC: %4d failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        return 0;
}

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        spin_lock_bh(&xprt->transport_lock);
        retval = xprt->ops->reserve_xprt(task);
        spin_unlock_bh(&xprt->transport_lock);
        return retval;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;
        struct rpc_rqst *req;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;

        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }

        req = task->tk_rqstp;
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return;

out_unlock:
        xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }
        if (__xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return;
        }
out_unlock:
        xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
}

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next_cong(xprt);
        }
}

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req->rq_cong)
                return 1;
        dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        __xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
        __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
void xprt_adjust_cwnd(struct rpc_task *task, int result)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = task->tk_xprt;
        unsigned long cwnd = xprt->cwnd;

        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC:      cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
        __xprt_put_cong(xprt, req);
}

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
}

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 *
 */
void xprt_wait_for_buffer_space(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        task->tk_timeout = req->rq_timeout;
        rpc_sleep_on(&xprt->pending, task, NULL, NULL);
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
        if (unlikely(xprt->shutdown))
                return;

        spin_lock_bh(&xprt->transport_lock);
        if (xprt->snd_task) {
                dprintk("RPC:      write space: waking waiting task on xprt %p\n",
                                xprt);
                rpc_wake_up_task(xprt->snd_task);
        }
        spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters.  Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
        task->tk_timeout = task->tk_rqstp->rq_timeout;
}

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
        int timer = task->tk_msg.rpc_proc->p_timer;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long max_timeout = req->rq_xprt->timeout.to_maxval;

        task->tk_timeout = rpc_calc_rto(rtt, timer);
        task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
        if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
                task->tk_timeout = max_timeout;
}
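
/*
 * Illustrative numbers: if rpc_calc_rto() returns 200ms for this timer
 * class and the request has been retransmitted twice (rq_retries == 2),
 * the shift above scales the estimate by at least 1 << 2, giving 800ms
 * or more, capped at to_maxval.
 */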

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        struct rpc_timeout *to = &req->rq_xprt->timeout;

        req->rq_majortimeo = req->rq_timeout;
        if (to->to_exponential)
                req->rq_majortimeo <<= to->to_retries;
        else
                req->rq_majortimeo += to->to_increment * to->to_retries;
        if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
                req->rq_majortimeo = to->to_maxval;
        req->rq_majortimeo += jiffies;
}
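
/*
 * Hypothetical parameters: with to_initval = to_increment = 5s,
 * to_retries = 3 and to_exponential clear, the major timeout lands at
 * 5s + 3 * 5s = 20s past the current jiffies; the exponential flavour
 * would shift instead, 5s << 3 = 40s, before clamping to to_maxval.
 */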

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct rpc_timeout *to = &xprt->timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
                pprintk("RPC: %lu retrans\n", jiffies);
        } else {
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock_bh(&xprt->transport_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock_bh(&xprt->transport_lock);
                pprintk("RPC: %lu timeout\n", jiffies);
                status = -ETIMEDOUT;
        }

        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}

static void xprt_autoclose(void *args)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)args;

        xprt_disconnect(xprt);
        xprt->ops->close(xprt);
        xprt_release_write(xprt, NULL);
}

/**
 * xprt_disconnect - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect(struct rpc_xprt *xprt)
{
        dprintk("RPC:      disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        spin_unlock_bh(&xprt->transport_lock);
}

static void
xprt_init_autodisconnect(unsigned long data)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)data;

        spin_lock(&xprt->transport_lock);
        if (!list_empty(&xprt->recv) || xprt->shutdown)
                goto out_abort;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                goto out_abort;
        spin_unlock(&xprt->transport_lock);
        if (xprt_connecting(xprt))
                xprt_release_write(xprt, NULL);
        else
                schedule_work(&xprt->task_cleanup);
        return;
out_abort:
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        if (!xprt->addr.sin_port) {
                task->tk_status = -EIO;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;
        if (xprt_connected(xprt))
                xprt_release_write(xprt, task);
        else {
                if (task->tk_rqstp)
                        task->tk_rqstp->rq_bytes_sent = 0;

                task->tk_timeout = xprt->connect_timeout;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
                xprt->stat.connect_start = jiffies;
                xprt->ops->connect(task);
        }
        return;
}

static void xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status >= 0) {
                xprt->stat.connect_count++;
                xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
                dprintk("RPC: %4d xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        switch (task->tk_status) {
        case -ECONNREFUSED:
        case -ECONNRESET:
                dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n",
                                task->tk_pid, task->tk_client->cl_server);
                break;
        case -ENOTCONN:
                dprintk("RPC: %4d xprt_connect_status: connection broken\n",
                                task->tk_pid);
                break;
        case -ETIMEDOUT:
                dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n",
                                task->tk_pid);
                break;
        default:
                dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n",
                                task->tk_pid, -task->tk_status, task->tk_client->cl_server);
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
                return;
        }

        /* if soft mounted, just cause this RPC to fail */
        if (RPC_IS_SOFT(task)) {
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
        }
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
        struct list_head *pos;

        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid)
                        return entry;
        }
        xprt->stat.bad_xids++;
        return NULL;
}

/**
 * xprt_update_rtt - update an RPC client's RTT state after receiving a reply
 * @task: RPC request that recently completed
 *
 */
void xprt_update_rtt(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        unsigned timer = task->tk_msg.rpc_proc->p_timer;

        if (timer) {
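                /* Per Karn's algorithm, only sample the round trip when
                 * the request went out exactly once, so that a reply to
                 * an earlier transmission cannot skew the estimate. */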
                if (req->rq_ntrans == 1)
                        rpc_update_rtt(rtt, timer,
                                        (long)jiffies - req->rq_xtime);
                rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
        }
}

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds transport lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
        struct rpc_rqst *req = task->tk_rqstp;

        dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
                        task->tk_pid, ntohl(req->rq_xid), copied);

        task->tk_xprt->stat.recvs++;
        task->tk_rtt = (long)jiffies - req->rq_xtime;

        list_del_init(&req->rq_list);
        req->rq_received = req->rq_private_buf.len = copied;
        rpc_wake_up_task(task);
}

static void xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %4d xprt_timer\n", task->tk_pid);

        spin_lock(&xprt->transport_lock);
        if (!req->rq_received) {
                if (xprt->ops->timer)
                        xprt->ops->timer(task);
                task->tk_status = -ETIMEDOUT;
        }
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
int xprt_prepare_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int err = 0;

        dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);

        spin_lock_bh(&xprt->transport_lock);
        if (req->rq_received && !req->rq_bytes_sent) {
                err = req->rq_received;
                goto out_unlock;
        }
        if (!xprt->ops->reserve_xprt(task)) {
                err = -EAGAIN;
                goto out_unlock;
        }

        if (!xprt_connected(xprt)) {
                err = -ENOTCONN;
                goto out_unlock;
        }
out_unlock:
        spin_unlock_bh(&xprt->transport_lock);
        return err;
}

void
xprt_abort_transmit(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        xprt_release_write(xprt, task);
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status;

        dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

        smp_rmb();
        if (!req->rq_received) {
                if (list_empty(&req->rq_list)) {
                        spin_lock_bh(&xprt->transport_lock);
                        /* Update the softirq receive buffer */
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                        sizeof(req->rq_private_buf));
                        /* Add request to the receive list */
                        list_add_tail(&req->rq_list, &xprt->recv);
                        spin_unlock_bh(&xprt->transport_lock);
                        xprt_reset_majortimeo(req);
                        /* Turn off autodisconnect */
                        del_singleshot_timer_sync(&xprt->timer);
                }
        } else if (!req->rq_bytes_sent)
                return;

        status = xprt->ops->send_request(task);
        if (status == 0) {
                dprintk("RPC: %4d xmit complete\n", task->tk_pid);
                spin_lock_bh(&xprt->transport_lock);

                xprt->ops->set_retrans_timeout(task);

                xprt->stat.sends++;
                xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
                xprt->stat.bklog_u += xprt->backlog.qlen;

                /* Don't race with disconnect */
                if (!xprt_connected(xprt))
                        task->tk_status = -ENOTCONN;
                else if (!req->rq_received)
                        rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);

                xprt->ops->release_xprt(xprt, task);
                spin_unlock_bh(&xprt->transport_lock);
                return;
        }

        /* Note: at this point, task->tk_sleeping has not yet been set,
         *       hence there is no danger of the waking up task being put on
         *       schedq, and being picked up by a parallel run of rpciod().
         */
        task->tk_status = status;

        switch (status) {
        case -ECONNREFUSED:
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
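                /* fall through: task stays queued, as for -EAGAIN */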
        case -EAGAIN:
        case -ENOTCONN:
                return;
        default:
                break;
        }
        xprt_release_write(xprt, task);
        return;
}

static inline void do_xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        if (task->tk_rqstp)
                return;
        if (!list_empty(&xprt->free)) {
                struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
                list_del_init(&req->rq_list);
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
                return;
        }
        dprintk("RPC:      waiting for request slot\n");
        task->tk_status = -EAGAIN;
        task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = -EIO;
        spin_lock(&xprt->reserve_lock);
        do_xprt_reserve(task);
        spin_unlock(&xprt->reserve_lock);
}

static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
        return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
        get_random_bytes(&xprt->xid, sizeof(xprt->xid));
}

static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;

        req->rq_timeout = xprt->timeout.to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_buffer  = NULL;
        req->rq_bufsize = 0;
        req->rq_xid     = xprt_alloc_xid(xprt);
        req->rq_release_snd_buf = NULL;
        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        rpc_count_iostats(task);
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        if (xprt->ops->release_request)
                xprt->ops->release_request(task);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
        if (list_empty(&xprt->recv))
                mod_timer(&xprt->timer,
                                xprt->last_used + xprt->idle_timeout);
        spin_unlock_bh(&xprt->transport_lock);
        xprt->ops->buf_free(task);
        task->tk_rqstp = NULL;
        if (req->rq_release_snd_buf)
                req->rq_release_snd_buf(req);
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

        spin_lock(&xprt->reserve_lock);
        list_add(&req->rq_list, &xprt->free);
        rpc_wake_up_next(&xprt->backlog);
        spin_unlock(&xprt->reserve_lock);
}

/**
 * xprt_set_timeout - set constant RPC timeout
 * @to: RPC timeout parameters to set up
 * @retr: number of retries
 * @incr: amount of increase after each retry
 *
 */
void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
        to->to_initval   =
        to->to_increment = incr;
        to->to_maxval    = to->to_initval + (incr * retr);
        to->to_retries   = retr;
        to->to_exponential = 0;
}
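
/*
 * Note the chained assignment above: to_initval and to_increment both
 * become incr.  For example, xprt_set_timeout(to, 5, 10 * HZ) yields a
 * 10 second initial timeout that grows linearly by 10s per retry up to
 * to_maxval = 10s + 5 * 10s = 60s.
 */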

static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
        int result;
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;

        if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
                return ERR_PTR(-ENOMEM);
        memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

        xprt->addr = *ap;

        switch (proto) {
        case IPPROTO_UDP:
                result = xs_setup_udp(xprt, to);
                break;
        case IPPROTO_TCP:
                result = xs_setup_tcp(xprt, to);
                break;
        default:
                printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n",
                                proto);
                result = -EIO;
                break;
        }
        if (result) {
                kfree(xprt);
                return ERR_PTR(result);
        }

        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);

        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
        INIT_WORK(&xprt->task_cleanup, xprt_autoclose, xprt);
        init_timer(&xprt->timer);
        xprt->timer.function = xprt_init_autodisconnect;
        xprt->timer.data = (unsigned long) xprt;
        xprt->last_used = jiffies;
        xprt->cwnd = RPC_INITCWND;

        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

        /* initialize free list */
        for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
                list_add(&req->rq_list, &xprt->free);

        xprt_init_xid(xprt);

        dprintk("RPC:      created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);

        return xprt;
}

/**
 * xprt_create_proto - create an RPC client transport
 * @proto: requested transport protocol
 * @sap: remote peer's address
 * @to: timeout parameters for new transport
 *
 */
struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;

        xprt = xprt_setup(proto, sap, to);
        if (IS_ERR(xprt))
                dprintk("RPC:      xprt_create_proto failed\n");
        else
                dprintk("RPC:      xprt_create_proto created xprt %p\n", xprt);
        return xprt;
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
int xprt_destroy(struct rpc_xprt *xprt)
{
        dprintk("RPC:      destroying transport %p\n", xprt);
        xprt->shutdown = 1;
        del_timer_sync(&xprt->timer);
        xprt->ops->destroy(xprt);
        kfree(xprt);

        return 0;
}