]> git.karo-electronics.de Git - karo-tx-linux.git/blob - net/sunrpc/clnt.c
SUNRPC: Ensure that rpc_mkpipe returns a refcounted dentry
[karo-tx-linux.git] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may be have to be moved to the VFS layer.
15  *
16  *  NB: BSD uses a more intelligent approach to guessing when a request
17  *  or reply has been lost by keeping the RTO estimate for each procedure.
18  *  We currently make do with a constant timeout value.
19  *
20  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22  */
23
24 #include <asm/system.h>
25
26 #include <linux/module.h>
27 #include <linux/types.h>
28 #include <linux/mm.h>
29 #include <linux/slab.h>
30 #include <linux/utsname.h>
31
32 #include <linux/sunrpc/clnt.h>
33 #include <linux/workqueue.h>
34 #include <linux/sunrpc/rpc_pipe_fs.h>
35
36 #include <linux/nfs.h>
37
38
39 #define RPC_SLACK_SPACE         (1024)  /* total overkill */
40
41 #ifdef RPC_DEBUG
42 # define RPCDBG_FACILITY        RPCDBG_CALL
43 #endif
44
45 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
46
47
48 static void     call_start(struct rpc_task *task);
49 static void     call_reserve(struct rpc_task *task);
50 static void     call_reserveresult(struct rpc_task *task);
51 static void     call_allocate(struct rpc_task *task);
52 static void     call_encode(struct rpc_task *task);
53 static void     call_decode(struct rpc_task *task);
54 static void     call_bind(struct rpc_task *task);
55 static void     call_bind_status(struct rpc_task *task);
56 static void     call_transmit(struct rpc_task *task);
57 static void     call_status(struct rpc_task *task);
58 static void     call_transmit_status(struct rpc_task *task);
59 static void     call_refresh(struct rpc_task *task);
60 static void     call_refreshresult(struct rpc_task *task);
61 static void     call_timeout(struct rpc_task *task);
62 static void     call_connect(struct rpc_task *task);
63 static void     call_connect_status(struct rpc_task *task);
64 static u32 *    call_header(struct rpc_task *task);
65 static u32 *    call_verify(struct rpc_task *task);
66
67
68 static int
69 rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
70 {
71         static uint32_t clntid;
72         int error;
73
74         if (dir_name == NULL)
75                 return 0;
76         for (;;) {
77                 snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
78                                 "%s/clnt%x", dir_name,
79                                 (unsigned int)clntid++);
80                 clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
81                 clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
82                 if (!IS_ERR(clnt->cl_dentry))
83                         return 0;
84                 error = PTR_ERR(clnt->cl_dentry);
85                 if (error != -EEXIST) {
86                         printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
87                                         clnt->cl_pathname, error);
88                         return error;
89                 }
90         }
91 }
92
93 /*
94  * Create an RPC client
95  * FIXME: This should also take a flags argument (as in task->tk_flags).
96  * It's called (among others) from pmap_create_client, which may in
97  * turn be called by an async task. In this case, rpciod should not be
98  * made to sleep too long.
99  */
100 struct rpc_clnt *
101 rpc_new_client(struct rpc_xprt *xprt, char *servname,
102                   struct rpc_program *program, u32 vers,
103                   rpc_authflavor_t flavor)
104 {
105         struct rpc_version      *version;
106         struct rpc_clnt         *clnt = NULL;
107         struct rpc_auth         *auth;
108         int err;
109         int len;
110
111         dprintk("RPC: creating %s client for %s (xprt %p)\n",
112                 program->name, servname, xprt);
113
114         err = -EINVAL;
115         if (!xprt)
116                 goto out_no_xprt;
117         if (vers >= program->nrvers || !(version = program->version[vers]))
118                 goto out_err;
119
120         err = -ENOMEM;
121         clnt = kmalloc(sizeof(*clnt), GFP_KERNEL);
122         if (!clnt)
123                 goto out_err;
124         memset(clnt, 0, sizeof(*clnt));
125         atomic_set(&clnt->cl_users, 0);
126         atomic_set(&clnt->cl_count, 1);
127         clnt->cl_parent = clnt;
128
129         clnt->cl_server = clnt->cl_inline_name;
130         len = strlen(servname) + 1;
131         if (len > sizeof(clnt->cl_inline_name)) {
132                 char *buf = kmalloc(len, GFP_KERNEL);
133                 if (buf != 0)
134                         clnt->cl_server = buf;
135                 else
136                         len = sizeof(clnt->cl_inline_name);
137         }
138         strlcpy(clnt->cl_server, servname, len);
139
140         clnt->cl_xprt     = xprt;
141         clnt->cl_procinfo = version->procs;
142         clnt->cl_maxproc  = version->nrprocs;
143         clnt->cl_protname = program->name;
144         clnt->cl_pmap     = &clnt->cl_pmap_default;
145         clnt->cl_port     = xprt->addr.sin_port;
146         clnt->cl_prog     = program->number;
147         clnt->cl_vers     = version->number;
148         clnt->cl_prot     = xprt->prot;
149         clnt->cl_stats    = program->stats;
150         rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
151
152         if (!clnt->cl_port)
153                 clnt->cl_autobind = 1;
154
155         clnt->cl_rtt = &clnt->cl_rtt_default;
156         rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
157
158         err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
159         if (err < 0)
160                 goto out_no_path;
161
162         auth = rpcauth_create(flavor, clnt);
163         if (IS_ERR(auth)) {
164                 printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
165                                 flavor);
166                 err = PTR_ERR(auth);
167                 goto out_no_auth;
168         }
169
170         /* save the nodename */
171         clnt->cl_nodelen = strlen(system_utsname.nodename);
172         if (clnt->cl_nodelen > UNX_MAXNODENAME)
173                 clnt->cl_nodelen = UNX_MAXNODENAME;
174         memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen);
175         return clnt;
176
177 out_no_auth:
178         rpc_rmdir(clnt->cl_pathname);
179 out_no_path:
180         if (clnt->cl_server != clnt->cl_inline_name)
181                 kfree(clnt->cl_server);
182         kfree(clnt);
183 out_err:
184         xprt_destroy(xprt);
185 out_no_xprt:
186         return ERR_PTR(err);
187 }
188
189 /**
190  * Create an RPC client
191  * @xprt - pointer to xprt struct
192  * @servname - name of server
193  * @info - rpc_program
194  * @version - rpc_program version
195  * @authflavor - rpc_auth flavour to use
196  *
197  * Creates an RPC client structure, then pings the server in order to
198  * determine if it is up, and if it supports this program and version.
199  *
200  * This function should never be called by asynchronous tasks such as
201  * the portmapper.
202  */
203 struct rpc_clnt *rpc_create_client(struct rpc_xprt *xprt, char *servname,
204                 struct rpc_program *info, u32 version, rpc_authflavor_t authflavor)
205 {
206         struct rpc_clnt *clnt;
207         int err;
208         
209         clnt = rpc_new_client(xprt, servname, info, version, authflavor);
210         if (IS_ERR(clnt))
211                 return clnt;
212         err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
213         if (err == 0)
214                 return clnt;
215         rpc_shutdown_client(clnt);
216         return ERR_PTR(err);
217 }
218
219 /*
220  * This function clones the RPC client structure. It allows us to share the
221  * same transport while varying parameters such as the authentication
222  * flavour.
223  */
224 struct rpc_clnt *
225 rpc_clone_client(struct rpc_clnt *clnt)
226 {
227         struct rpc_clnt *new;
228
229         new = kmalloc(sizeof(*new), GFP_KERNEL);
230         if (!new)
231                 goto out_no_clnt;
232         memcpy(new, clnt, sizeof(*new));
233         atomic_set(&new->cl_count, 1);
234         atomic_set(&new->cl_users, 0);
235         new->cl_parent = clnt;
236         atomic_inc(&clnt->cl_count);
237         /* Duplicate portmapper */
238         rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
239         /* Turn off autobind on clones */
240         new->cl_autobind = 0;
241         new->cl_oneshot = 0;
242         new->cl_dead = 0;
243         dget(new->cl_dentry);
244         rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
245         if (new->cl_auth)
246                 atomic_inc(&new->cl_auth->au_count);
247         new->cl_pmap            = &new->cl_pmap_default;
248         rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
249         return new;
250 out_no_clnt:
251         printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
252         return ERR_PTR(-ENOMEM);
253 }
254
255 /*
256  * Properly shut down an RPC client, terminating all outstanding
257  * requests. Note that we must be certain that cl_oneshot and
258  * cl_dead are cleared, or else the client would be destroyed
259  * when the last task releases it.
260  */
261 int
262 rpc_shutdown_client(struct rpc_clnt *clnt)
263 {
264         dprintk("RPC: shutting down %s client for %s, tasks=%d\n",
265                         clnt->cl_protname, clnt->cl_server,
266                         atomic_read(&clnt->cl_users));
267
268         while (atomic_read(&clnt->cl_users) > 0) {
269                 /* Don't let rpc_release_client destroy us */
270                 clnt->cl_oneshot = 0;
271                 clnt->cl_dead = 0;
272                 rpc_killall_tasks(clnt);
273                 wait_event_timeout(destroy_wait,
274                         !atomic_read(&clnt->cl_users), 1*HZ);
275         }
276
277         if (atomic_read(&clnt->cl_users) < 0) {
278                 printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
279                                 clnt, atomic_read(&clnt->cl_users));
280 #ifdef RPC_DEBUG
281                 rpc_show_tasks();
282 #endif
283                 BUG();
284         }
285
286         return rpc_destroy_client(clnt);
287 }
288
289 /*
290  * Delete an RPC client
291  */
292 int
293 rpc_destroy_client(struct rpc_clnt *clnt)
294 {
295         if (!atomic_dec_and_test(&clnt->cl_count))
296                 return 1;
297         BUG_ON(atomic_read(&clnt->cl_users) != 0);
298
299         dprintk("RPC: destroying %s client for %s\n",
300                         clnt->cl_protname, clnt->cl_server);
301         if (clnt->cl_auth) {
302                 rpcauth_destroy(clnt->cl_auth);
303                 clnt->cl_auth = NULL;
304         }
305         if (clnt->cl_parent != clnt) {
306                 rpc_destroy_client(clnt->cl_parent);
307                 goto out_free;
308         }
309         if (clnt->cl_pathname[0])
310                 rpc_rmdir(clnt->cl_pathname);
311         if (clnt->cl_xprt) {
312                 xprt_destroy(clnt->cl_xprt);
313                 clnt->cl_xprt = NULL;
314         }
315         if (clnt->cl_server != clnt->cl_inline_name)
316                 kfree(clnt->cl_server);
317 out_free:
318         if (clnt->cl_dentry)
319                 dput(clnt->cl_dentry);
320         kfree(clnt);
321         return 0;
322 }
323
324 /*
325  * Release an RPC client
326  */
327 void
328 rpc_release_client(struct rpc_clnt *clnt)
329 {
330         dprintk("RPC:      rpc_release_client(%p, %d)\n",
331                                 clnt, atomic_read(&clnt->cl_users));
332
333         if (!atomic_dec_and_test(&clnt->cl_users))
334                 return;
335         wake_up(&destroy_wait);
336         if (clnt->cl_oneshot || clnt->cl_dead)
337                 rpc_destroy_client(clnt);
338 }
339
340 /**
341  * rpc_bind_new_program - bind a new RPC program to an existing client
342  * @old - old rpc_client
343  * @program - rpc program to set
344  * @vers - rpc program version
345  *
346  * Clones the rpc client and sets up a new RPC program. This is mainly
347  * of use for enabling different RPC programs to share the same transport.
348  * The Sun NFSv2/v3 ACL protocol can do this.
349  */
350 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
351                                       struct rpc_program *program,
352                                       int vers)
353 {
354         struct rpc_clnt *clnt;
355         struct rpc_version *version;
356         int err;
357
358         BUG_ON(vers >= program->nrvers || !program->version[vers]);
359         version = program->version[vers];
360         clnt = rpc_clone_client(old);
361         if (IS_ERR(clnt))
362                 goto out;
363         clnt->cl_procinfo = version->procs;
364         clnt->cl_maxproc  = version->nrprocs;
365         clnt->cl_protname = program->name;
366         clnt->cl_prog     = program->number;
367         clnt->cl_vers     = version->number;
368         clnt->cl_stats    = program->stats;
369         err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
370         if (err != 0) {
371                 rpc_shutdown_client(clnt);
372                 clnt = ERR_PTR(err);
373         }
374 out:    
375         return clnt;
376 }
377
378 /*
379  * Default callback for async RPC calls
380  */
381 static void
382 rpc_default_callback(struct rpc_task *task, void *data)
383 {
384 }
385
386 static const struct rpc_call_ops rpc_default_ops = {
387         .rpc_call_done = rpc_default_callback,
388 };
389
390 /*
391  *      Export the signal mask handling for synchronous code that
392  *      sleeps on RPC calls
393  */
394 #define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))
395  
396 static void rpc_save_sigmask(sigset_t *oldset, int intr)
397 {
398         unsigned long   sigallow = sigmask(SIGKILL);
399         sigset_t sigmask;
400
401         /* Block all signals except those listed in sigallow */
402         if (intr)
403                 sigallow |= RPC_INTR_SIGNALS;
404         siginitsetinv(&sigmask, sigallow);
405         sigprocmask(SIG_BLOCK, &sigmask, oldset);
406 }
407
408 static inline void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
409 {
410         rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
411 }
412
413 static inline void rpc_restore_sigmask(sigset_t *oldset)
414 {
415         sigprocmask(SIG_SETMASK, oldset, NULL);
416 }
417
418 void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
419 {
420         rpc_save_sigmask(oldset, clnt->cl_intr);
421 }
422
423 void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
424 {
425         rpc_restore_sigmask(oldset);
426 }
427
428 /*
429  * New rpc_call implementation
430  */
431 int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
432 {
433         struct rpc_task *task;
434         sigset_t        oldset;
435         int             status;
436
437         /* If this client is slain all further I/O fails */
438         if (clnt->cl_dead) 
439                 return -EIO;
440
441         BUG_ON(flags & RPC_TASK_ASYNC);
442
443         status = -ENOMEM;
444         task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL);
445         if (task == NULL)
446                 goto out;
447
448         /* Mask signals on RPC calls _and_ GSS_AUTH upcalls */
449         rpc_task_sigmask(task, &oldset);
450
451         rpc_call_setup(task, msg, 0);
452
453         /* Set up the call info struct and execute the task */
454         status = task->tk_status;
455         if (status == 0) {
456                 atomic_inc(&task->tk_count);
457                 status = rpc_execute(task);
458                 if (status == 0)
459                         status = task->tk_status;
460         }
461         rpc_restore_sigmask(&oldset);
462         rpc_release_task(task);
463 out:
464         return status;
465 }
466
467 /*
468  * New rpc_call implementation
469  */
470 int
471 rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
472                const struct rpc_call_ops *tk_ops, void *data)
473 {
474         struct rpc_task *task;
475         sigset_t        oldset;
476         int             status;
477
478         /* If this client is slain all further I/O fails */
479         if (clnt->cl_dead) 
480                 return -EIO;
481
482         flags |= RPC_TASK_ASYNC;
483
484         /* Create/initialize a new RPC task */
485         status = -ENOMEM;
486         if (!(task = rpc_new_task(clnt, flags, tk_ops, data)))
487                 goto out;
488
489         /* Mask signals on GSS_AUTH upcalls */
490         rpc_task_sigmask(task, &oldset);                
491
492         rpc_call_setup(task, msg, 0);
493
494         /* Set up the call info struct and execute the task */
495         status = task->tk_status;
496         if (status == 0)
497                 rpc_execute(task);
498         else
499                 rpc_release_task(task);
500
501         rpc_restore_sigmask(&oldset);           
502 out:
503         return status;
504 }
505
506
507 void
508 rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
509 {
510         task->tk_msg   = *msg;
511         task->tk_flags |= flags;
512         /* Bind the user cred */
513         if (task->tk_msg.rpc_cred != NULL)
514                 rpcauth_holdcred(task);
515         else
516                 rpcauth_bindcred(task);
517
518         if (task->tk_status == 0)
519                 task->tk_action = call_start;
520         else
521                 task->tk_action = rpc_exit_task;
522 }
523
524 void
525 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
526 {
527         struct rpc_xprt *xprt = clnt->cl_xprt;
528         if (xprt->ops->set_buffer_size)
529                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
530 }
531
532 /*
533  * Return size of largest payload RPC client can support, in bytes
534  *
535  * For stream transports, this is one RPC record fragment (see RFC
536  * 1831), as we don't support multi-record requests yet.  For datagram
537  * transports, this is the size of an IP packet minus the IP, UDP, and
538  * RPC header sizes.
539  */
540 size_t rpc_max_payload(struct rpc_clnt *clnt)
541 {
542         return clnt->cl_xprt->max_payload;
543 }
544 EXPORT_SYMBOL(rpc_max_payload);
545
546 /**
547  * rpc_force_rebind - force transport to check that remote port is unchanged
548  * @clnt: client to rebind
549  *
550  */
551 void rpc_force_rebind(struct rpc_clnt *clnt)
552 {
553         if (clnt->cl_autobind)
554                 clnt->cl_port = 0;
555 }
556 EXPORT_SYMBOL(rpc_force_rebind);
557
558 /*
559  * Restart an (async) RPC call. Usually called from within the
560  * exit handler.
561  */
562 void
563 rpc_restart_call(struct rpc_task *task)
564 {
565         if (RPC_ASSASSINATED(task))
566                 return;
567
568         task->tk_action = call_start;
569 }
570
571 /*
572  * 0.  Initial state
573  *
574  *     Other FSM states can be visited zero or more times, but
575  *     this state is visited exactly once for each RPC.
576  */
577 static void
578 call_start(struct rpc_task *task)
579 {
580         struct rpc_clnt *clnt = task->tk_client;
581
582         dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid,
583                 clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc,
584                 (RPC_IS_ASYNC(task) ? "async" : "sync"));
585
586         /* Increment call count */
587         task->tk_msg.rpc_proc->p_count++;
588         clnt->cl_stats->rpccnt++;
589         task->tk_action = call_reserve;
590 }
591
592 /*
593  * 1.   Reserve an RPC call slot
594  */
595 static void
596 call_reserve(struct rpc_task *task)
597 {
598         dprintk("RPC: %4d call_reserve\n", task->tk_pid);
599
600         if (!rpcauth_uptodatecred(task)) {
601                 task->tk_action = call_refresh;
602                 return;
603         }
604
605         task->tk_status  = 0;
606         task->tk_action  = call_reserveresult;
607         xprt_reserve(task);
608 }
609
610 /*
611  * 1b.  Grok the result of xprt_reserve()
612  */
613 static void
614 call_reserveresult(struct rpc_task *task)
615 {
616         int status = task->tk_status;
617
618         dprintk("RPC: %4d call_reserveresult (status %d)\n",
619                                 task->tk_pid, task->tk_status);
620
621         /*
622          * After a call to xprt_reserve(), we must have either
623          * a request slot or else an error status.
624          */
625         task->tk_status = 0;
626         if (status >= 0) {
627                 if (task->tk_rqstp) {
628                         task->tk_action = call_allocate;
629                         return;
630                 }
631
632                 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
633                                 __FUNCTION__, status);
634                 rpc_exit(task, -EIO);
635                 return;
636         }
637
638         /*
639          * Even though there was an error, we may have acquired
640          * a request slot somehow.  Make sure not to leak it.
641          */
642         if (task->tk_rqstp) {
643                 printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
644                                 __FUNCTION__, status);
645                 xprt_release(task);
646         }
647
648         switch (status) {
649         case -EAGAIN:   /* woken up; retry */
650                 task->tk_action = call_reserve;
651                 return;
652         case -EIO:      /* probably a shutdown */
653                 break;
654         default:
655                 printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
656                                 __FUNCTION__, status);
657                 break;
658         }
659         rpc_exit(task, status);
660 }
661
662 /*
663  * 2.   Allocate the buffer. For details, see sched.c:rpc_malloc.
664  *      (Note: buffer memory is freed in xprt_release).
665  */
666 static void
667 call_allocate(struct rpc_task *task)
668 {
669         struct rpc_rqst *req = task->tk_rqstp;
670         struct rpc_xprt *xprt = task->tk_xprt;
671         unsigned int    bufsiz;
672
673         dprintk("RPC: %4d call_allocate (status %d)\n", 
674                                 task->tk_pid, task->tk_status);
675         task->tk_action = call_bind;
676         if (req->rq_buffer)
677                 return;
678
679         /* FIXME: compute buffer requirements more exactly using
680          * auth->au_wslack */
681         bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;
682
683         if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL)
684                 return;
685         printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task); 
686
687         if (RPC_IS_ASYNC(task) || !signalled()) {
688                 xprt_release(task);
689                 task->tk_action = call_reserve;
690                 rpc_delay(task, HZ>>4);
691                 return;
692         }
693
694         rpc_exit(task, -ERESTARTSYS);
695 }
696
697 static inline int
698 rpc_task_need_encode(struct rpc_task *task)
699 {
700         return task->tk_rqstp->rq_snd_buf.len == 0;
701 }
702
703 static inline void
704 rpc_task_force_reencode(struct rpc_task *task)
705 {
706         task->tk_rqstp->rq_snd_buf.len = 0;
707 }
708
709 /*
710  * 3.   Encode arguments of an RPC call
711  */
712 static void
713 call_encode(struct rpc_task *task)
714 {
715         struct rpc_rqst *req = task->tk_rqstp;
716         struct xdr_buf *sndbuf = &req->rq_snd_buf;
717         struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
718         unsigned int    bufsiz;
719         kxdrproc_t      encode;
720         u32             *p;
721
722         dprintk("RPC: %4d call_encode (status %d)\n", 
723                                 task->tk_pid, task->tk_status);
724
725         /* Default buffer setup */
726         bufsiz = req->rq_bufsize >> 1;
727         sndbuf->head[0].iov_base = (void *)req->rq_buffer;
728         sndbuf->head[0].iov_len  = bufsiz;
729         sndbuf->tail[0].iov_len  = 0;
730         sndbuf->page_len         = 0;
731         sndbuf->len              = 0;
732         sndbuf->buflen           = bufsiz;
733         rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz);
734         rcvbuf->head[0].iov_len  = bufsiz;
735         rcvbuf->tail[0].iov_len  = 0;
736         rcvbuf->page_len         = 0;
737         rcvbuf->len              = 0;
738         rcvbuf->buflen           = bufsiz;
739
740         /* Encode header and provided arguments */
741         encode = task->tk_msg.rpc_proc->p_encode;
742         if (!(p = call_header(task))) {
743                 printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
744                 rpc_exit(task, -EIO);
745                 return;
746         }
747         if (encode == NULL)
748                 return;
749
750         task->tk_status = rpcauth_wrap_req(task, encode, req, p,
751                         task->tk_msg.rpc_argp);
752         if (task->tk_status == -ENOMEM) {
753                 /* XXX: Is this sane? */
754                 rpc_delay(task, 3*HZ);
755                 task->tk_status = -EAGAIN;
756         }
757 }
758
759 /*
760  * 4.   Get the server port number if not yet set
761  */
762 static void
763 call_bind(struct rpc_task *task)
764 {
765         struct rpc_clnt *clnt = task->tk_client;
766
767         dprintk("RPC: %4d call_bind (status %d)\n",
768                                 task->tk_pid, task->tk_status);
769
770         task->tk_action = call_connect;
771         if (!clnt->cl_port) {
772                 task->tk_action = call_bind_status;
773                 task->tk_timeout = task->tk_xprt->bind_timeout;
774                 rpc_getport(task, clnt);
775         }
776 }
777
778 /*
779  * 4a.  Sort out bind result
780  */
781 static void
782 call_bind_status(struct rpc_task *task)
783 {
784         int status = -EACCES;
785
786         if (task->tk_status >= 0) {
787                 dprintk("RPC: %4d call_bind_status (status %d)\n",
788                                         task->tk_pid, task->tk_status);
789                 task->tk_status = 0;
790                 task->tk_action = call_connect;
791                 return;
792         }
793
794         switch (task->tk_status) {
795         case -EACCES:
796                 dprintk("RPC: %4d remote rpcbind: RPC program/version unavailable\n",
797                                 task->tk_pid);
798                 rpc_delay(task, 3*HZ);
799                 goto retry_bind;
800         case -ETIMEDOUT:
801                 dprintk("RPC: %4d rpcbind request timed out\n",
802                                 task->tk_pid);
803                 if (RPC_IS_SOFT(task)) {
804                         status = -EIO;
805                         break;
806                 }
807                 goto retry_bind;
808         case -EPFNOSUPPORT:
809                 dprintk("RPC: %4d remote rpcbind service unavailable\n",
810                                 task->tk_pid);
811                 break;
812         case -EPROTONOSUPPORT:
813                 dprintk("RPC: %4d remote rpcbind version 2 unavailable\n",
814                                 task->tk_pid);
815                 break;
816         default:
817                 dprintk("RPC: %4d unrecognized rpcbind error (%d)\n",
818                                 task->tk_pid, -task->tk_status);
819                 status = -EIO;
820                 break;
821         }
822
823         rpc_exit(task, status);
824         return;
825
826 retry_bind:
827         task->tk_status = 0;
828         task->tk_action = call_bind;
829         return;
830 }
831
832 /*
833  * 4b.  Connect to the RPC server
834  */
835 static void
836 call_connect(struct rpc_task *task)
837 {
838         struct rpc_xprt *xprt = task->tk_xprt;
839
840         dprintk("RPC: %4d call_connect xprt %p %s connected\n",
841                         task->tk_pid, xprt,
842                         (xprt_connected(xprt) ? "is" : "is not"));
843
844         task->tk_action = call_transmit;
845         if (!xprt_connected(xprt)) {
846                 task->tk_action = call_connect_status;
847                 if (task->tk_status < 0)
848                         return;
849                 xprt_connect(task);
850         }
851 }
852
853 /*
854  * 4c.  Sort out connect result
855  */
856 static void
857 call_connect_status(struct rpc_task *task)
858 {
859         struct rpc_clnt *clnt = task->tk_client;
860         int status = task->tk_status;
861
862         dprintk("RPC: %5u call_connect_status (status %d)\n", 
863                                 task->tk_pid, task->tk_status);
864
865         task->tk_status = 0;
866         if (status >= 0) {
867                 clnt->cl_stats->netreconn++;
868                 task->tk_action = call_transmit;
869                 return;
870         }
871
872         /* Something failed: remote service port may have changed */
873         rpc_force_rebind(clnt);
874
875         switch (status) {
876         case -ENOTCONN:
877         case -ETIMEDOUT:
878         case -EAGAIN:
879                 task->tk_action = call_bind;
880                 break;
881         default:
882                 rpc_exit(task, -EIO);
883                 break;
884         }
885 }
886
887 /*
888  * 5.   Transmit the RPC request, and wait for reply
889  */
890 static void
891 call_transmit(struct rpc_task *task)
892 {
893         dprintk("RPC: %4d call_transmit (status %d)\n", 
894                                 task->tk_pid, task->tk_status);
895
896         task->tk_action = call_status;
897         if (task->tk_status < 0)
898                 return;
899         task->tk_status = xprt_prepare_transmit(task);
900         if (task->tk_status != 0)
901                 return;
902         /* Encode here so that rpcsec_gss can use correct sequence number. */
903         if (rpc_task_need_encode(task)) {
904                 task->tk_rqstp->rq_bytes_sent = 0;
905                 call_encode(task);
906                 /* Did the encode result in an error condition? */
907                 if (task->tk_status != 0)
908                         goto out_nosend;
909         }
910         task->tk_action = call_transmit_status;
911         xprt_transmit(task);
912         if (task->tk_status < 0)
913                 return;
914         if (!task->tk_msg.rpc_proc->p_decode) {
915                 task->tk_action = rpc_exit_task;
916                 rpc_wake_up_task(task);
917         }
918         return;
919 out_nosend:
920         /* release socket write lock before attempting to handle error */
921         xprt_abort_transmit(task);
922         rpc_task_force_reencode(task);
923 }
924
925 /*
926  * 6.   Sort out the RPC call status
927  */
928 static void
929 call_status(struct rpc_task *task)
930 {
931         struct rpc_clnt *clnt = task->tk_client;
932         struct rpc_rqst *req = task->tk_rqstp;
933         int             status;
934
935         if (req->rq_received > 0 && !req->rq_bytes_sent)
936                 task->tk_status = req->rq_received;
937
938         dprintk("RPC: %4d call_status (status %d)\n", 
939                                 task->tk_pid, task->tk_status);
940
941         status = task->tk_status;
942         if (status >= 0) {
943                 task->tk_action = call_decode;
944                 return;
945         }
946
947         task->tk_status = 0;
948         switch(status) {
949         case -ETIMEDOUT:
950                 task->tk_action = call_timeout;
951                 break;
952         case -ECONNREFUSED:
953         case -ENOTCONN:
954                 rpc_force_rebind(clnt);
955                 task->tk_action = call_bind;
956                 break;
957         case -EAGAIN:
958                 task->tk_action = call_transmit;
959                 break;
960         case -EIO:
961                 /* shutdown or soft timeout */
962                 rpc_exit(task, status);
963                 break;
964         default:
965                 printk("%s: RPC call returned error %d\n",
966                                clnt->cl_protname, -status);
967                 rpc_exit(task, status);
968                 break;
969         }
970 }
971
972 /*
973  * 6a.  Handle transmission errors.
974  */
975 static void
976 call_transmit_status(struct rpc_task *task)
977 {
978         if (task->tk_status != -EAGAIN)
979                 rpc_task_force_reencode(task);
980         call_status(task);
981 }
982
983 /*
984  * 6b.  Handle RPC timeout
985  *      We do not release the request slot, so we keep using the
986  *      same XID for all retransmits.
987  */
988 static void
989 call_timeout(struct rpc_task *task)
990 {
991         struct rpc_clnt *clnt = task->tk_client;
992
993         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
994                 dprintk("RPC: %4d call_timeout (minor)\n", task->tk_pid);
995                 goto retry;
996         }
997
998         dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
999         if (RPC_IS_SOFT(task)) {
1000                 printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1001                                 clnt->cl_protname, clnt->cl_server);
1002                 rpc_exit(task, -EIO);
1003                 return;
1004         }
1005
1006         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1007                 task->tk_flags |= RPC_CALL_MAJORSEEN;
1008                 printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1009                         clnt->cl_protname, clnt->cl_server);
1010         }
1011         rpc_force_rebind(clnt);
1012
1013 retry:
1014         clnt->cl_stats->rpcretrans++;
1015         task->tk_action = call_bind;
1016         task->tk_status = 0;
1017 }
1018
1019 /*
1020  * 7.   Decode the RPC reply
1021  */
1022 static void
1023 call_decode(struct rpc_task *task)
1024 {
1025         struct rpc_clnt *clnt = task->tk_client;
1026         struct rpc_rqst *req = task->tk_rqstp;
1027         kxdrproc_t      decode = task->tk_msg.rpc_proc->p_decode;
1028         u32             *p;
1029
1030         dprintk("RPC: %4d call_decode (status %d)\n", 
1031                                 task->tk_pid, task->tk_status);
1032
1033         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1034                 printk(KERN_NOTICE "%s: server %s OK\n",
1035                         clnt->cl_protname, clnt->cl_server);
1036                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1037         }
1038
1039         if (task->tk_status < 12) {
1040                 if (!RPC_IS_SOFT(task)) {
1041                         task->tk_action = call_bind;
1042                         clnt->cl_stats->rpcretrans++;
1043                         goto out_retry;
1044                 }
1045                 printk(KERN_WARNING "%s: too small RPC reply size (%d bytes)\n",
1046                         clnt->cl_protname, task->tk_status);
1047                 rpc_exit(task, -EIO);
1048                 return;
1049         }
1050
1051         req->rq_rcv_buf.len = req->rq_private_buf.len;
1052
1053         /* Check that the softirq receive buffer is valid */
1054         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1055                                 sizeof(req->rq_rcv_buf)) != 0);
1056
1057         /* Verify the RPC header */
1058         p = call_verify(task);
1059         if (IS_ERR(p)) {
1060                 if (p == ERR_PTR(-EAGAIN))
1061                         goto out_retry;
1062                 return;
1063         }
1064
1065         task->tk_action = rpc_exit_task;
1066
1067         if (decode)
1068                 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1069                                                       task->tk_msg.rpc_resp);
1070         dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
1071                                         task->tk_status);
1072         return;
1073 out_retry:
1074         req->rq_received = req->rq_private_buf.len = 0;
1075         task->tk_status = 0;
1076 }
1077
1078 /*
1079  * 8.   Refresh the credentials if rejected by the server
1080  */
1081 static void
1082 call_refresh(struct rpc_task *task)
1083 {
1084         dprintk("RPC: %4d call_refresh\n", task->tk_pid);
1085
1086         xprt_release(task);     /* Must do to obtain new XID */
1087         task->tk_action = call_refreshresult;
1088         task->tk_status = 0;
1089         task->tk_client->cl_stats->rpcauthrefresh++;
1090         rpcauth_refreshcred(task);
1091 }
1092
1093 /*
1094  * 8a.  Process the results of a credential refresh
1095  */
1096 static void
1097 call_refreshresult(struct rpc_task *task)
1098 {
1099         int status = task->tk_status;
1100         dprintk("RPC: %4d call_refreshresult (status %d)\n", 
1101                                 task->tk_pid, task->tk_status);
1102
1103         task->tk_status = 0;
1104         task->tk_action = call_reserve;
1105         if (status >= 0 && rpcauth_uptodatecred(task))
1106                 return;
1107         if (status == -EACCES) {
1108                 rpc_exit(task, -EACCES);
1109                 return;
1110         }
1111         task->tk_action = call_refresh;
1112         if (status != -ETIMEDOUT)
1113                 rpc_delay(task, 3*HZ);
1114         return;
1115 }
1116
1117 /*
1118  * Call header serialization
1119  */
1120 static u32 *
1121 call_header(struct rpc_task *task)
1122 {
1123         struct rpc_clnt *clnt = task->tk_client;
1124         struct rpc_rqst *req = task->tk_rqstp;
1125         u32             *p = req->rq_svec[0].iov_base;
1126
1127         /* FIXME: check buffer size? */
1128
1129         p = xprt_skip_transport_header(task->tk_xprt, p);
1130         *p++ = req->rq_xid;             /* XID */
1131         *p++ = htonl(RPC_CALL);         /* CALL */
1132         *p++ = htonl(RPC_VERSION);      /* RPC version */
1133         *p++ = htonl(clnt->cl_prog);    /* program number */
1134         *p++ = htonl(clnt->cl_vers);    /* program version */
1135         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
1136         p = rpcauth_marshcred(task, p);
1137         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1138         return p;
1139 }
1140
1141 /*
1142  * Reply header verification
1143  */
1144 static u32 *
1145 call_verify(struct rpc_task *task)
1146 {
1147         struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
1148         int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
1149         u32     *p = iov->iov_base, n;
1150         int error = -EACCES;
1151
1152         if ((len -= 3) < 0)
1153                 goto out_overflow;
1154         p += 1; /* skip XID */
1155
1156         if ((n = ntohl(*p++)) != RPC_REPLY) {
1157                 printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
1158                 goto out_garbage;
1159         }
1160         if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
1161                 if (--len < 0)
1162                         goto out_overflow;
1163                 switch ((n = ntohl(*p++))) {
1164                         case RPC_AUTH_ERROR:
1165                                 break;
1166                         case RPC_MISMATCH:
1167                                 dprintk("%s: RPC call version mismatch!\n", __FUNCTION__);
1168                                 error = -EPROTONOSUPPORT;
1169                                 goto out_err;
1170                         default:
1171                                 dprintk("%s: RPC call rejected, unknown error: %x\n", __FUNCTION__, n);
1172                                 goto out_eio;
1173                 }
1174                 if (--len < 0)
1175                         goto out_overflow;
1176                 switch ((n = ntohl(*p++))) {
1177                 case RPC_AUTH_REJECTEDCRED:
1178                 case RPC_AUTH_REJECTEDVERF:
1179                 case RPCSEC_GSS_CREDPROBLEM:
1180                 case RPCSEC_GSS_CTXPROBLEM:
1181                         if (!task->tk_cred_retry)
1182                                 break;
1183                         task->tk_cred_retry--;
1184                         dprintk("RPC: %4d call_verify: retry stale creds\n",
1185                                                         task->tk_pid);
1186                         rpcauth_invalcred(task);
1187                         task->tk_action = call_refresh;
1188                         goto out_retry;
1189                 case RPC_AUTH_BADCRED:
1190                 case RPC_AUTH_BADVERF:
1191                         /* possibly garbled cred/verf? */
1192                         if (!task->tk_garb_retry)
1193                                 break;
1194                         task->tk_garb_retry--;
1195                         dprintk("RPC: %4d call_verify: retry garbled creds\n",
1196                                                         task->tk_pid);
1197                         task->tk_action = call_bind;
1198                         goto out_retry;
1199                 case RPC_AUTH_TOOWEAK:
1200                         printk(KERN_NOTICE "call_verify: server requires stronger "
1201                                "authentication.\n");
1202                         break;
1203                 default:
1204                         printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
1205                         error = -EIO;
1206                 }
1207                 dprintk("RPC: %4d call_verify: call rejected %d\n",
1208                                                 task->tk_pid, n);
1209                 goto out_err;
1210         }
1211         if (!(p = rpcauth_checkverf(task, p))) {
1212                 printk(KERN_WARNING "call_verify: auth check failed\n");
1213                 goto out_garbage;               /* bad verifier, retry */
1214         }
1215         len = p - (u32 *)iov->iov_base - 1;
1216         if (len < 0)
1217                 goto out_overflow;
1218         switch ((n = ntohl(*p++))) {
1219         case RPC_SUCCESS:
1220                 return p;
1221         case RPC_PROG_UNAVAIL:
1222                 dprintk("RPC: call_verify: program %u is unsupported by server %s\n",
1223                                 (unsigned int)task->tk_client->cl_prog,
1224                                 task->tk_client->cl_server);
1225                 error = -EPFNOSUPPORT;
1226                 goto out_err;
1227         case RPC_PROG_MISMATCH:
1228                 dprintk("RPC: call_verify: program %u, version %u unsupported by server %s\n",
1229                                 (unsigned int)task->tk_client->cl_prog,
1230                                 (unsigned int)task->tk_client->cl_vers,
1231                                 task->tk_client->cl_server);
1232                 error = -EPROTONOSUPPORT;
1233                 goto out_err;
1234         case RPC_PROC_UNAVAIL:
1235                 dprintk("RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
1236                                 task->tk_msg.rpc_proc,
1237                                 task->tk_client->cl_prog,
1238                                 task->tk_client->cl_vers,
1239                                 task->tk_client->cl_server);
1240                 error = -EOPNOTSUPP;
1241                 goto out_err;
1242         case RPC_GARBAGE_ARGS:
1243                 dprintk("RPC: %4d %s: server saw garbage\n", task->tk_pid, __FUNCTION__);
1244                 break;                  /* retry */
1245         default:
1246                 printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
1247                 /* Also retry */
1248         }
1249
1250 out_garbage:
1251         task->tk_client->cl_stats->rpcgarbage++;
1252         if (task->tk_garb_retry) {
1253                 task->tk_garb_retry--;
1254                 dprintk("RPC %s: retrying %4d\n", __FUNCTION__, task->tk_pid);
1255                 task->tk_action = call_bind;
1256 out_retry:
1257                 return ERR_PTR(-EAGAIN);
1258         }
1259         printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
1260 out_eio:
1261         error = -EIO;
1262 out_err:
1263         rpc_exit(task, error);
1264         return ERR_PTR(error);
1265 out_overflow:
1266         printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
1267         goto out_garbage;
1268 }
1269
1270 static int rpcproc_encode_null(void *rqstp, u32 *data, void *obj)
1271 {
1272         return 0;
1273 }
1274
1275 static int rpcproc_decode_null(void *rqstp, u32 *data, void *obj)
1276 {
1277         return 0;
1278 }
1279
1280 static struct rpc_procinfo rpcproc_null = {
1281         .p_encode = rpcproc_encode_null,
1282         .p_decode = rpcproc_decode_null,
1283 };
1284
1285 int rpc_ping(struct rpc_clnt *clnt, int flags)
1286 {
1287         struct rpc_message msg = {
1288                 .rpc_proc = &rpcproc_null,
1289         };
1290         int err;
1291         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1292         err = rpc_call_sync(clnt, &msg, flags);
1293         put_rpccred(msg.rpc_cred);
1294         return err;
1295 }