#include "ceph_debug.h"

#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/sched.h>

#include "mds_client.h"
#include "mon_client.h"
#include "super.h"
#include "messenger.h"
#include "decode.h"
#include "auth.h"
#include "pagelist.h"

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * that any capabilities or leases we have been issued remain valid.
 * If the session times out and goes stale, our leases and
 * capabilities are no longer valid.
 */

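/*
 * State carried through the per-cap reconnect walk.  The pagelist
 * accumulates the encoded reconnect payload; flock records whether
 * the recipient understands the v2 (file-lock-aware) reconnect
 * encoding.  (Descriptive note inferred from usage; the struct is
 * otherwise uncommented.)
 */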
struct ceph_reconnect_state {
        struct ceph_pagelist *pagelist;
        bool flock;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

/*
 * parse individual inode info
 */
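/*
 * Wire layout, as implied by the decode sequence below (a sketch,
 * not an authoritative protocol description):
 *
 *   struct ceph_mds_reply_inode (+ nsplits fragtree split entries)
 *   u32 symlink_len, symlink bytes
 *   u32 xattr_len, xattr blob
 */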
static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info)
{
        int err = -EIO;

        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
                le32_to_cpu(info->in->fragtree.nsplits);

        ceph_decode_32_safe(p, end, info->symlink_len, bad);
        ceph_decode_need(p, end, info->symlink_len, bad);
        info->symlink = *p;
        *p += info->symlink_len;

        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
        *p += info->xattr_len;
        return 0;
bad:
        return err;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info)
{
        int err;

        if (info->head->is_dentry) {
                err = parse_reply_info_in(p, end, &info->diri);
                if (err < 0)
                        goto out_bad;

                if (unlikely(*p + sizeof(*info->dirfrag) > end))
                        goto bad;
                info->dirfrag = *p;
                *p += sizeof(*info->dirfrag) +
                        sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
                if (unlikely(*p > end))
                        goto bad;

                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;
                info->dlease = *p;
                *p += sizeof(*info->dlease);
        }

        if (info->head->is_target) {
                err = parse_reply_info_in(p, end, &info->targeti);
                if (err < 0)
                        goto out_bad;
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing mds trace %d\n", err);
        return err;
}

/*
 * parse readdir results
 */
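/*
 * Wire layout, as implied by the decode sequence below (a sketch,
 * not an authoritative protocol description):
 *
 *   struct ceph_mds_reply_dirfrag (+ ndist u32 dist entries)
 *   u32 num, u8 dir_end, u8 dir_complete
 *   num x { u32 dname_len, dname bytes,
 *           struct ceph_mds_reply_lease, inode info }
 */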
static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_info_parsed *info)
{
        u32 num, i = 0;
        int err;

        info->dir_dir = *p;
        if (*p + sizeof(*info->dir_dir) > end)
                goto bad;
        *p += sizeof(*info->dir_dir) +
                sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
        if (*p > end)
                goto bad;

        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        info->dir_end = ceph_decode_8(p);
        info->dir_complete = ceph_decode_8(p);
        if (num == 0)
                goto done;

        /* alloc large array */
        info->dir_nr = num;
        info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
                               sizeof(*info->dir_dname) +
                               sizeof(*info->dir_dname_len) +
                               sizeof(*info->dir_dlease),
                               GFP_NOFS);
        if (info->dir_in == NULL) {
                err = -ENOMEM;
                goto out_bad;
        }
        info->dir_dname = (void *)(info->dir_in + num);
        info->dir_dname_len = (void *)(info->dir_dname + num);
        info->dir_dlease = (void *)(info->dir_dname_len + num);

        while (num) {
                /* dentry */
                ceph_decode_need(p, end, sizeof(u32)*2, bad);
                info->dir_dname_len[i] = ceph_decode_32(p);
                ceph_decode_need(p, end, info->dir_dname_len[i], bad);
                info->dir_dname[i] = *p;
                *p += info->dir_dname_len[i];
                dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
                     info->dir_dname[i]);
                info->dir_dlease[i] = *p;
                *p += sizeof(struct ceph_mds_reply_lease);

                /* inode */
                err = parse_reply_info_in(p, end, &info->dir_in[i]);
                if (err < 0)
                        goto out_bad;
                i++;
                num--;
        }

done:
        if (*p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse entire mds reply
 */
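/*
 * Overall front layout, as implied by the decode calls below (a
 * sketch, not an authoritative protocol description):
 *
 *   struct ceph_mds_reply_head
 *   u32 len, trace bytes      (dentry + target inode trace)
 *   u32 len, dir bytes        (readdir contents, if any)
 *   u32 len, snap blob        (snap realm trace/update)
 */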
static int parse_reply_info(struct ceph_msg *msg,
                            struct ceph_mds_reply_info_parsed *info)
{
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                err = parse_reply_info_trace(&p, p+len, info);
                if (err < 0)
                        goto out_bad;
        }

        /* dir content */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                err = parse_reply_info_dir(&p, p+len, info);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        kfree(info->dir_in);
}


/*
 * sessions
 */
static const char *session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        default: return "???";
        }
}

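/*
 * Take a session reference only if s_ref is still non-zero; a zero
 * count means the session is already being torn down and must not be
 * resurrected.
 */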
static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
        if (atomic_inc_not_zero(&s->s_ref)) {
                dout("mdsc get_session %p %d -> %d\n", s,
                     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
                return s;
        } else {
                dout("mdsc get_session %p 0 -- FAIL\n", s);
                return NULL;
        }
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
        dout("mdsc put_session %p %d -> %d\n", s,
             atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
        if (atomic_dec_and_test(&s->s_ref)) {
                if (s->s_authorizer)
                        s->s_mdsc->client->monc.auth->ops->destroy_authorizer(
                                s->s_mdsc->client->monc.auth, s->s_authorizer);
                kfree(s);
        }
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
                                                   int mds)
{
        struct ceph_mds_session *session;

        if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
                return NULL;
        session = mdsc->sessions[mds];
        dout("lookup_mds_session %p %d\n", session,
             atomic_read(&session->s_ref));
        get_session(session);
        return session;
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
        if (mds >= mdsc->max_sessions)
                return false;
        return mdsc->sessions[mds];
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
                                       struct ceph_mds_session *s)
{
        if (s->s_mds >= mdsc->max_sessions ||
            mdsc->sessions[s->s_mds] != s)
                return -ENOENT;
        return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_ttl = 0;
        s->s_seq = 0;
        mutex_init(&s->s_mutex);

        ceph_con_init(mdsc->client->msgr, &s->s_con);
        s->s_con.private = s;
        s->s_con.ops = &mds_con_ops;
        s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
        s->s_con.peer_name.num = cpu_to_le64(mds);

        spin_lock_init(&s->s_cap_lock);
        s->s_cap_gen = 0;
        s->s_cap_ttl = 0;
        s->s_renew_requested = 0;
        s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
        s->s_nr_caps = 0;
        s->s_trim_caps = 0;
        atomic_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_releases_done);
        INIT_LIST_HEAD(&s->s_cap_flushing);
        INIT_LIST_HEAD(&s->s_cap_snaps_flushing);

        dout("register_session mds%d\n", mds);
        if (mds >= mdsc->max_sessions) {
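                /* round the array size up to a power of two;
                 * e.g. mds 5 -> get_count_order(6) == 3 -> newmax 8 */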
                int newmax = 1 << get_count_order(mds+1);
                struct ceph_mds_session **sa;

                dout("register_session realloc to %d\n", newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (sa == NULL)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }
        mdsc->sessions[mds] = s;
        atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply) {
                ceph_msg_put(req->r_reply);
                destroy_reply_info(&req->r_reply_info);
        }
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode),
                                  CEPH_CAP_PIN);
                iput(req->r_inode);
        }
        if (req->r_locked_dir)
                ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
                                  CEPH_CAP_PIN);
        if (req->r_target_inode)
                iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry) {
                ceph_put_cap_refs(
                        ceph_inode(req->r_old_dentry->d_parent->d_inode),
                        CEPH_CAP_PIN);
                dput(req->r_old_dentry);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        kfree(req);
}

/*
 * lookup a request by tid, and bump its ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
                                             u64 tid)
{
        struct ceph_mds_request *req;
        struct rb_node *n = mdsc->request_tree.rb_node;

        while (n) {
                req = rb_entry(n, struct ceph_mds_request, r_node);
                if (tid < req->r_tid)
                        n = n->rb_left;
                else if (tid > req->r_tid)
                        n = n->rb_right;
                else {
                        ceph_mdsc_get_request(req);
                        return req;
                }
        }
        return NULL;
}

static void __insert_request(struct ceph_mds_client *mdsc,
                             struct ceph_mds_request *new)
{
        struct rb_node **p = &mdsc->request_tree.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_mds_request *req = NULL;

        while (*p) {
                parent = *p;
                req = rb_entry(parent, struct ceph_mds_request, r_node);
                if (new->r_tid < req->r_tid)
                        p = &(*p)->rb_left;
                else if (new->r_tid > req->r_tid)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->r_node, parent, p);
        rb_insert_color(&new->r_node, &mdsc->request_tree);
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
{
        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps)
                ceph_reserve_caps(mdsc, &req->r_caps_reservation,
                                  req->r_num_caps);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        __insert_request(mdsc, req);

        if (dir) {
                struct ceph_inode_info *ci = ceph_inode(dir);

                spin_lock(&ci->i_unsafe_lock);
                req->r_unsafe_dir = dir;
                list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
                spin_unlock(&ci->i_unsafe_lock);
        }
}

static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
{
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
        rb_erase(&req->r_node, &mdsc->request_tree);
        RB_CLEAR_NODE(&req->r_node);

        if (req->r_unsafe_dir) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);

                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
        }

        ceph_mdsc_put_request(req);
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        int mode = req->r_direct_mode;
        int mds = -1;
        u32 hash = req->r_direct_hash;
        bool is_hash = req->r_direct_is_hash;

        /*
         * is there a specific mds we should try?  ignore hint if we have
         * no session and the mds is not up (active or recovering).
         */
        if (req->r_resend_mds >= 0 &&
            (__have_session(mdsc, req->r_resend_mds) ||
             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
                dout("choose_mds using resend_mds mds%d\n",
                     req->r_resend_mds);
                return req->r_resend_mds;
        }

        if (mode == USE_RANDOM_MDS)
                goto random;

        inode = NULL;
        if (req->r_inode) {
                inode = req->r_inode;
        } else if (req->r_dentry) {
                if (req->r_dentry->d_inode) {
                        inode = req->r_dentry->d_inode;
                } else {
                        inode = req->r_dentry->d_parent->d_inode;
                        hash = req->r_dentry->d_name.hash;
                        is_hash = true;
                }
        }
        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)
                goto random;
        ci = ceph_inode(inode);

        if (is_hash && S_ISDIR(inode->i_mode)) {
                struct ceph_inode_frag frag;
                int found;

                ceph_choose_frag(ci, hash, &frag, &found);
                if (found) {
                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
                                u8 r;

                                /* choose a random replica */
                                get_random_bytes(&r, 1);
                                r %= frag.ndist;
                                mds = frag.dist[r];
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (%d/%d)\n",
                                     inode, ceph_vinop(inode),
                                     frag.frag, mds,
                                     (int)r, frag.ndist);
                                return mds;
                        }

                        /* since this file/dir wasn't known to be
                         * replicated, then we want to look for the
                         * authoritative mds. */
                        mode = USE_AUTH_MDS;
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (auth)\n",
                                     inode, ceph_vinop(inode), frag.frag, mds);
                                return mds;
                        }
                }
        }

        spin_lock(&inode->i_lock);
        cap = NULL;
        if (mode == USE_AUTH_MDS)
                cap = ci->i_auth_cap;
        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
        if (!cap) {
                spin_unlock(&inode->i_lock);
                goto random;
        }
        mds = cap->session->s_mds;
        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
             inode, ceph_vinop(inode), mds,
             cap == ci->i_auth_cap ? "auth " : "", cap);
        spin_unlock(&inode->i_lock);
        return mds;

random:
        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout("choose_mds chose random mds%d\n", mds);
        return mds;
}


/*
 * session messages
 */
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;

        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(op);
        h->seq = cpu_to_le64(seq);
        return msg;
}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;

        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
        dout("open_session to mds%d (%s)\n", mds,
             ceph_mds_state_name(mstate));
        session->s_state = CEPH_MDS_SESSION_OPENING;
        session->s_renew_requested = jiffies;

        /* send connect message */
        msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
                                          struct ceph_mds_session *session)
{
        struct ceph_mds_info *mi;
        struct ceph_mds_session *ts;
        int i, mds = session->s_mds;
        int target;

        if (mds >= mdsc->mdsmap->m_max_mds)
                return;
        mi = &mdsc->mdsmap->m_info[mds];
        dout("open_export_target_sessions for mds%d (%d targets)\n",
             session->s_mds, mi->num_export_targets);

        for (i = 0; i < mi->num_export_targets; i++) {
                target = mi->export_targets[i];
                ts = __ceph_lookup_mds_session(mdsc, target);
                if (!ts) {
                        ts = register_session(mdsc, target);
                        if (IS_ERR(ts))
                                return;
                }
                /* open the target session, not the one we came in on */
                if (ts->s_state == CEPH_MDS_SESSION_NEW ||
                    ts->s_state == CEPH_MDS_SESSION_CLOSING)
                        __open_session(mdsc, ts);
                else
                        dout(" mds%d target mds%d %p is %s\n", session->s_mds,
                             target, ts, session_state_name(ts->s_state));
                ceph_put_mds_session(ts);
        }
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                           struct ceph_mds_session *session)
{
        mutex_lock(&mdsc->mutex);
        __open_export_target_sessions(mdsc, session);
        mutex_unlock(&mdsc->mutex);
}

/*
 * session caps
 */

/*
 * Free preallocated cap messages assigned to this session
 */
static void cleanup_cap_releases(struct ceph_mds_session *session)
{
        struct ceph_msg *msg;

        spin_lock(&session->s_cap_lock);
        while (!list_empty(&session->s_cap_releases)) {
                msg = list_first_entry(&session->s_cap_releases,
                                       struct ceph_msg, list_head);
                list_del_init(&msg->list_head);
                ceph_msg_put(msg);
        }
        while (!list_empty(&session->s_cap_releases_done)) {
                msg = list_first_entry(&session->s_cap_releases_done,
                                       struct ceph_msg, list_head);
                list_del_init(&msg->list_head);
                ceph_msg_put(msg);
        }
        spin_unlock(&session->s_cap_lock);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
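/*
 * Implementation note: s_cap_iterator pins our position in s_caps
 * while s_cap_lock is dropped around the callback.  If a racing
 * __ceph_remove_cap() hits the cap we are parked on, it leaves it on
 * the list with ci == NULL and we finish the unlink here.  The inode
 * and cap references (last_inode/old_cap) are only dropped after the
 * lock has been retaken or released.
 */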
static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
                                            void *), void *arg)
{
        struct list_head *p;
        struct ceph_cap *cap;
        struct inode *inode, *last_inode = NULL;
        struct ceph_cap *old_cap = NULL;
        int ret;

        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
        spin_lock(&session->s_cap_lock);
        p = session->s_caps.next;
        while (p != &session->s_caps) {
                cap = list_entry(p, struct ceph_cap, session_caps);
                inode = igrab(&cap->ci->vfs_inode);
                if (!inode) {
                        p = p->next;
                        continue;
                }
                session->s_cap_iterator = cap;
                spin_unlock(&session->s_cap_lock);

                if (last_inode) {
                        iput(last_inode);
                        last_inode = NULL;
                }
                if (old_cap) {
                        ceph_put_cap(session->s_mdsc, old_cap);
                        old_cap = NULL;
                }

                ret = cb(inode, cap, arg);
                last_inode = inode;

                spin_lock(&session->s_cap_lock);
                p = p->next;
                if (cap->ci == NULL) {
                        dout("iterate_session_caps  finishing cap %p removal\n",
                             cap);
                        BUG_ON(cap->session != session);
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
                        cap->session = NULL;
                        old_cap = cap;  /* put_cap it w/o locks held */
                }
                if (ret < 0)
                        goto out;
        }
        ret = 0;
out:
        session->s_cap_iterator = NULL;
        spin_unlock(&session->s_cap_lock);

        if (last_inode)
                iput(last_inode);
        if (old_cap)
                ceph_put_cap(session->s_mdsc, old_cap);

        return ret;
}

static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        int drop = 0;

        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
        spin_lock(&inode->i_lock);
        __ceph_remove_cap(cap);
        if (!__ceph_is_any_real_caps(ci)) {
                struct ceph_mds_client *mdsc =
                        &ceph_sb_to_client(inode->i_sb)->mdsc;

                spin_lock(&mdsc->cap_dirty_lock);
                if (!list_empty(&ci->i_dirty_item)) {
                        pr_info(" dropping dirty %s state for %p %lld\n",
                                ceph_cap_string(ci->i_dirty_caps),
                                inode, ceph_ino(inode));
                        ci->i_dirty_caps = 0;
                        list_del_init(&ci->i_dirty_item);
                        drop = 1;
                }
                if (!list_empty(&ci->i_flushing_item)) {
                        pr_info(" dropping dirty+flushing %s state for %p %lld\n",
                                ceph_cap_string(ci->i_flushing_caps),
                                inode, ceph_ino(inode));
                        ci->i_flushing_caps = 0;
                        list_del_init(&ci->i_flushing_item);
                        mdsc->num_cap_flushing--;
                        drop = 1;
                }
                if (drop && ci->i_wrbuffer_ref) {
                        pr_info(" dropping dirty data for %p %lld\n",
                                inode, ceph_ino(inode));
                        ci->i_wrbuffer_ref = 0;
                        ci->i_wrbuffer_ref_head = 0;
                        drop++;
                }
                spin_unlock(&mdsc->cap_dirty_lock);
        }
        spin_unlock(&inode->i_lock);
        while (drop--)
                iput(inode);
        return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
        dout("remove_session_caps on %p\n", session);
        iterate_session_caps(session, remove_session_caps_cb, NULL);
        BUG_ON(session->s_nr_caps > 0);
        BUG_ON(!list_empty(&session->s_cap_flushing));
        cleanup_cap_releases(session);
}

/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
                              void *arg)
{
        struct ceph_inode_info *ci = ceph_inode(inode);

        wake_up_all(&ci->i_cap_wq);
        if (arg) {
                spin_lock(&inode->i_lock);
                ci->i_wanted_max_size = 0;
                ci->i_requested_max_size = 0;
                spin_unlock(&inode->i_lock);
        }
        return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session,
                                 int reconnect)
{
        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
        iterate_session_caps(session, wake_up_session_cb,
                             (void *)(unsigned long)reconnect);
}

/*
 * Send periodic message to MDS renewing all currently held caps.  The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
                           struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int state;

        if (time_after_eq(jiffies, session->s_cap_ttl) &&
            time_after_eq(session->s_cap_ttl, session->s_renew_requested))
                pr_info("mds%d caps stale\n", session->s_mds);
        session->s_renew_requested = jiffies;

        /* do not try to renew caps until a recovering mds has reconnected
         * with its clients. */
        state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
        if (state < CEPH_MDS_STATE_RECONNECT) {
                dout("send_renew_caps ignoring mds%d (%s)\n",
                     session->s_mds, ceph_mds_state_name(state));
                return 0;
        }

        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
                ceph_mds_state_name(state));
        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
                                 ++session->s_renew_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * Called under session->s_mutex
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
                         struct ceph_mds_session *session, int is_renew)
{
        int was_stale;
        int wake = 0;

        spin_lock(&session->s_cap_lock);
        was_stale = is_renew && (session->s_cap_ttl == 0 ||
                                 time_after_eq(jiffies, session->s_cap_ttl));

        session->s_cap_ttl = session->s_renew_requested +
                mdsc->mdsmap->m_session_timeout*HZ;

        if (was_stale) {
                if (time_before(jiffies, session->s_cap_ttl)) {
                        pr_info("mds%d caps renewed\n", session->s_mds);
                        wake = 1;
                } else {
                        pr_info("mds%d caps still stale\n", session->s_mds);
                }
        }
        dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
             session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
             time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
        spin_unlock(&session->s_cap_lock);

        if (wake)
                wake_up_session_caps(session, 0);
}

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_session *session)
{
        struct ceph_msg *msg;

        dout("request_close_session mds%d state %s seq %lld\n",
             session->s_mds, session_state_name(session->s_state),
             session->s_seq);
        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
                         struct ceph_mds_session *session)
{
        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
                return 0;
        session->s_state = CEPH_MDS_SESSION_CLOSING;
        return request_close_session(mdsc, session);
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
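/*
 * Per-cap decision sketch: keep the cap if the inode has dirty caps
 * or if bits we hold are in use and not also issued by another mds;
 * otherwise remove just this cap when others exist, else prune the
 * dentry aliases and let the inode (and cap) fall out of the cache.
 */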
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
        struct ceph_mds_session *session = arg;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int used, oissued, mine;

        if (session->s_trim_caps <= 0)
                return -1;

        spin_lock(&inode->i_lock);
        mine = cap->issued | cap->implemented;
        used = __ceph_caps_used(ci);
        oissued = __ceph_caps_issued_other(ci, cap);

        dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
             ceph_cap_string(used));
        if (ci->i_dirty_caps)
                goto out;   /* dirty caps */
        if ((used & ~oissued) & mine)
                goto out;   /* we need these caps */

        session->s_trim_caps--;
        if (oissued) {
                /* we aren't the only cap.. just remove us */
                __ceph_remove_cap(cap);
        } else {
                /* try to drop referring dentries */
                spin_unlock(&inode->i_lock);
                d_prune_aliases(inode);
                dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
                     inode, cap, atomic_read(&inode->i_count));
                return 0;
        }

out:
        spin_unlock(&inode->i_lock);
        return 0;
}

/*
 * Trim session cap count down to some max number.
 */
static int trim_caps(struct ceph_mds_client *mdsc,
                     struct ceph_mds_session *session,
                     int max_caps)
{
        int trim_caps = session->s_nr_caps - max_caps;

        dout("trim_caps mds%d start: %d / %d, trim %d\n",
             session->s_mds, session->s_nr_caps, max_caps, trim_caps);
        if (trim_caps > 0) {
                session->s_trim_caps = trim_caps;
                iterate_session_caps(session, trim_caps_cb, session);
                dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
                     session->s_mds, session->s_nr_caps, max_caps,
                     trim_caps - session->s_trim_caps);
                session->s_trim_caps = 0;
        }
        return 0;
}

/*
 * Allocate cap_release messages.  If there is a partially full message
 * in the queue, try to allocate enough to cover its remainder, so that
 * we can send it immediately.
 *
 * Called under s_mutex.
 */
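/*
 * Accounting sketch: s_num_cap_releases counts preallocated release
 * slots.  Each new message contributes CEPH_CAPS_PER_RELEASE slots;
 * moving a partially used message to the done list gives back only
 * its unused remainder.
 */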
int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg, *partial = NULL;
        struct ceph_mds_cap_release *head;
        int err = -ENOMEM;
        int extra = mdsc->client->mount_args->cap_release_safety;
        int num;

        dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
             extra);

        spin_lock(&session->s_cap_lock);

        if (!list_empty(&session->s_cap_releases)) {
                msg = list_first_entry(&session->s_cap_releases,
                                       struct ceph_msg,
                                       list_head);
                head = msg->front.iov_base;
                num = le32_to_cpu(head->num);
                if (num) {
                        dout(" partial %p with (%d/%d)\n", msg, num,
                             (int)CEPH_CAPS_PER_RELEASE);
                        extra += CEPH_CAPS_PER_RELEASE - num;
                        partial = msg;
                }
        }
        while (session->s_num_cap_releases < session->s_nr_caps + extra) {
                spin_unlock(&session->s_cap_lock);
                msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
                                   GFP_NOFS);
                if (!msg)
                        goto out_unlocked;
                dout("add_cap_releases %p msg %p now %d\n", session, msg,
                     (int)msg->front.iov_len);
                head = msg->front.iov_base;
                head->num = cpu_to_le32(0);
                msg->front.iov_len = sizeof(*head);
                spin_lock(&session->s_cap_lock);
                list_add(&msg->list_head, &session->s_cap_releases);
                session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
        }

        if (partial) {
                head = partial->front.iov_base;
                num = le32_to_cpu(head->num);
                dout(" queueing partial %p with %d/%d\n", partial, num,
                     (int)CEPH_CAPS_PER_RELEASE);
                list_move_tail(&partial->list_head,
                               &session->s_cap_releases_done);
                session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
        }
        err = 0;
        spin_unlock(&session->s_cap_lock);
out_unlocked:
        return err;
}

/*
 * Check whether all dirty inode data has been flushed.
 *
 * Returns true if we've flushed through want_flush_seq.
 */
static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
{
        int mds, ret = 1;

        dout("check_cap_flush want %lld\n", want_flush_seq);
        mutex_lock(&mdsc->mutex);
        for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
                struct ceph_mds_session *session = mdsc->sessions[mds];

                if (!session)
                        continue;
                get_session(session);
                mutex_unlock(&mdsc->mutex);

                mutex_lock(&session->s_mutex);
                if (!list_empty(&session->s_cap_flushing)) {
                        struct ceph_inode_info *ci =
                                list_entry(session->s_cap_flushing.next,
                                           struct ceph_inode_info,
                                           i_flushing_item);
                        struct inode *inode = &ci->vfs_inode;

                        spin_lock(&inode->i_lock);
                        if (ci->i_cap_flush_seq <= want_flush_seq) {
                                dout("check_cap_flush still flushing %p "
                                     "seq %lld <= %lld to mds%d\n", inode,
                                     ci->i_cap_flush_seq, want_flush_seq,
                                     session->s_mds);
                                ret = 0;
                        }
                        spin_unlock(&inode->i_lock);
                }
                mutex_unlock(&session->s_mutex);
                ceph_put_mds_session(session);

                if (!ret)
                        return ret;
                mutex_lock(&mdsc->mutex);
        }

        mutex_unlock(&mdsc->mutex);
        dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
        return ret;
}

/*
 * called under s_mutex
 */
void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
                            struct ceph_mds_session *session)
{
        struct ceph_msg *msg;

        dout("send_cap_releases mds%d\n", session->s_mds);
        spin_lock(&session->s_cap_lock);
        while (!list_empty(&session->s_cap_releases_done)) {
                msg = list_first_entry(&session->s_cap_releases_done,
                                       struct ceph_msg, list_head);
                list_del_init(&msg->list_head);
                spin_unlock(&session->s_cap_lock);
                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
                ceph_con_send(&session->s_con, msg);
                spin_lock(&session->s_cap_lock);
        }
        spin_unlock(&session->s_cap_lock);
}

static void discard_cap_releases(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        struct ceph_mds_cap_release *head;
        unsigned num;

        dout("discard_cap_releases mds%d\n", session->s_mds);
        spin_lock(&session->s_cap_lock);

        /* zero out the in-progress message */
        msg = list_first_entry(&session->s_cap_releases,
                               struct ceph_msg, list_head);
        head = msg->front.iov_base;
        num = le32_to_cpu(head->num);
        dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
        head->num = cpu_to_le32(0);
        session->s_num_cap_releases += num;

        /* requeue completed messages */
        while (!list_empty(&session->s_cap_releases_done)) {
                msg = list_first_entry(&session->s_cap_releases_done,
                                       struct ceph_msg, list_head);
                list_del_init(&msg->list_head);

                head = msg->front.iov_base;
                num = le32_to_cpu(head->num);
                dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
                     num);
                session->s_num_cap_releases += num;
                head->num = cpu_to_le32(0);
                msg->front.iov_len = sizeof(*head);
                list_add(&msg->list_head, &session->s_cap_releases);
        }

        spin_unlock(&session->s_cap_lock);
}

/*
 * requests
 */

/*
 * Create an mds request.
 */
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
        struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);

        if (!req)
                return ERR_PTR(-ENOMEM);

        mutex_init(&req->r_fill_mutex);
        req->r_mdsc = mdsc;
        req->r_started = jiffies;
        req->r_resend_mds = -1;
        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
        req->r_fmode = -1;
        kref_init(&req->r_kref);
        INIT_LIST_HEAD(&req->r_wait);
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
        INIT_LIST_HEAD(&req->r_unsafe_item);

        req->r_op = op;
        req->r_direct_mode = mode;
        return req;
}

/*
 * Return the oldest (lowest tid) request in the request tree, or NULL
 * if the tree is empty; __get_oldest_tid() maps that to a tid, or 0.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
{
        if (RB_EMPTY_ROOT(&mdsc->request_tree))
                return NULL;
        return rb_entry(rb_first(&mdsc->request_tree),
                        struct ceph_mds_request, r_node);
}

static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
{
        struct ceph_mds_request *req = __get_oldest_req(mdsc);

        if (req)
                return req->r_tid;
        return 0;
}

/*
 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
 * on build_path_from_dentry in fs/cifs/dir.c.
 *
 * If @stop_on_nosnap, generate path relative to the first non-snapped
 * inode.
 *
 * Encode hidden .snap dirs as a double /, i.e.
 *   foo/.snap/bar -> foo//bar
 */
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
                           int stop_on_nosnap)
{
        struct dentry *temp;
        char *path;
        int len, pos;

        if (dentry == NULL)
                return ERR_PTR(-EINVAL);

retry:
        len = 0;
        for (temp = dentry; !IS_ROOT(temp);) {
                struct inode *inode = temp->d_inode;
                if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
                        len++;  /* slash only */
                else if (stop_on_nosnap && inode &&
                         ceph_snap(inode) == CEPH_NOSNAP)
                        break;
                else
                        len += 1 + temp->d_name.len;
                temp = temp->d_parent;
                if (temp == NULL) {
                        pr_err("build_path corrupt dentry %p\n", dentry);
                        return ERR_PTR(-EINVAL);
                }
        }
        if (len)
                len--;  /* no leading '/' */

        path = kmalloc(len+1, GFP_NOFS);
        if (path == NULL)
                return ERR_PTR(-ENOMEM);
        pos = len;
        path[pos] = 0;  /* trailing null */
        for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
                struct inode *inode = temp->d_inode;

                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
                        dout("build_path path+%d: %p SNAPDIR\n",
                             pos, temp);
                } else if (stop_on_nosnap && inode &&
                           ceph_snap(inode) == CEPH_NOSNAP) {
                        break;
                } else {
                        pos -= temp->d_name.len;
                        if (pos < 0)
                                break;
                        strncpy(path + pos, temp->d_name.name,
                                temp->d_name.len);
                }
                if (pos)
                        path[--pos] = '/';
                temp = temp->d_parent;
                if (temp == NULL) {
                        pr_err("build_path corrupt dentry\n");
                        kfree(path);
                        return ERR_PTR(-EINVAL);
                }
        }
        if (pos != 0) {
                pr_err("build_path did not end path lookup where "
                       "expected, namelen is %d, pos is %d\n", len, pos);
                /* presumably this is only possible if racing with a
                   rename of one of the parent directories (we can not
                   lock the dentries above us to prevent this, but
                   retrying should be harmless) */
                kfree(path);
                goto retry;
        }

        *base = ceph_ino(temp->d_inode);
        *plen = len;
        dout("build_path on %p %d built %llx '%.*s'\n",
             dentry, atomic_read(&dentry->d_count), *base, len, path);
        return path;
}

static int build_dentry_path(struct dentry *dentry,
                             const char **ppath, int *ppathlen, u64 *pino,
                             int *pfreepath)
{
        char *path;

        if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
                *pino = ceph_ino(dentry->d_parent->d_inode);
                *ppath = dentry->d_name.name;
                *ppathlen = dentry->d_name.len;
                return 0;
        }
        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
        if (IS_ERR(path))
                return PTR_ERR(path);
        *ppath = path;
        *pfreepath = 1;
        return 0;
}

static int build_inode_path(struct inode *inode,
                            const char **ppath, int *ppathlen, u64 *pino,
                            int *pfreepath)
{
        struct dentry *dentry;
        char *path;

        if (ceph_snap(inode) == CEPH_NOSNAP) {
                *pino = ceph_ino(inode);
                *ppathlen = 0;
                return 0;
        }
        dentry = d_find_alias(inode);
        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
        dput(dentry);
        if (IS_ERR(path))
                return PTR_ERR(path);
        *ppath = path;
        *pfreepath = 1;
        return 0;
}

/*
 * request arguments may be specified via an inode *, a dentry *, or
 * an explicit ino+path.
 */
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
                                  const char *rpath, u64 rino,
                                  const char **ppath, int *pathlen,
                                  u64 *ino, int *freepath)
{
        int r = 0;

        if (rinode) {
                r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
                dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
                     ceph_snap(rinode));
        } else if (rdentry) {
                r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
                     *ppath);
        } else if (rpath) {
                *ino = rino;
                *ppath = rpath;
                *pathlen = strlen(rpath);
                dout(" path %.*s\n", *pathlen, rpath);
        }

        return r;
}

/*
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
                                               struct ceph_mds_request *req,
                                               int mds)
{
        struct ceph_msg *msg;
        struct ceph_mds_request_head *head;
        const char *path1 = NULL;
        const char *path2 = NULL;
        u64 ino1 = 0, ino2 = 0;
        int pathlen1 = 0, pathlen2 = 0;
        int freepath1 = 0, freepath2 = 0;
        int len;
        u16 releases;
        void *p, *end;
        int ret;

        ret = set_request_path_attr(req->r_inode, req->r_dentry,
                              req->r_path1, req->r_ino1.ino,
                              &path1, &pathlen1, &ino1, &freepath1);
        if (ret < 0) {
                msg = ERR_PTR(ret);
                goto out;
        }

        ret = set_request_path_attr(NULL, req->r_old_dentry,
                              req->r_path2, req->r_ino2.ino,
                              &path2, &pathlen2, &ino2, &freepath2);
        if (ret < 0) {
                msg = ERR_PTR(ret);
                goto out_free1;
        }

        len = sizeof(*head) +
                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));

        /* calculate (max) length for cap releases */
        len += sizeof(struct ceph_mds_request_release) *
                (!!req->r_inode_drop + !!req->r_dentry_drop +
                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
        if (req->r_dentry_drop)
                len += req->r_dentry->d_name.len;
        if (req->r_old_dentry_drop)
                len += req->r_old_dentry->d_name.len;

        msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
        if (!msg) {
                msg = ERR_PTR(-ENOMEM);
                goto out_free2;
        }

        msg->hdr.tid = cpu_to_le64(req->r_tid);

        head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(*head);
        end = msg->front.iov_base + msg->front.iov_len;

        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
        head->op = cpu_to_le32(req->r_op);
        head->caller_uid = cpu_to_le32(current_fsuid());
        head->caller_gid = cpu_to_le32(current_fsgid());
        head->args = req->r_args;

        ceph_encode_filepath(&p, end, ino1, path1);
        ceph_encode_filepath(&p, end, ino2, path2);

        /* make note of release offset, in case we need to replay */
        req->r_request_release_offset = p - msg->front.iov_base;

        /* cap releases */
        releases = 0;
        if (req->r_inode_drop)
                releases += ceph_encode_inode_release(&p,
                      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
                      mds, req->r_inode_drop, req->r_inode_unless, 0);
        if (req->r_dentry_drop)
                releases += ceph_encode_dentry_release(&p, req->r_dentry,
                       mds, req->r_dentry_drop, req->r_dentry_unless);
        if (req->r_old_dentry_drop)
                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
                       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
        if (req->r_old_inode_drop)
                releases += ceph_encode_inode_release(&p,
1587                       req->r_old_dentry->d_inode,
1588                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1589         head->num_releases = cpu_to_le16(releases);
1590
1591         BUG_ON(p > end);
1592         msg->front.iov_len = p - msg->front.iov_base;
1593         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1594
1595         msg->pages = req->r_pages;
1596         msg->nr_pages = req->r_num_pages;
1597         msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1598         msg->hdr.data_off = cpu_to_le16(0);
1599
1600 out_free2:
1601         if (freepath2)
1602                 kfree((char *)path2);
1603 out_free1:
1604         if (freepath1)
1605                 kfree((char *)path1);
1606 out:
1607         return msg;
1608 }
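/*
 * Roughly, the message front assembled above lays out as follows (a
 * sketch inferred from the length calculation and encoding order
 * above, not an authoritative wire-format description):
 *
 *	struct ceph_mds_request_head
 *	filepath 1:  u8 version + le64 ino1 + le32 len + path bytes
 *	filepath 2:  u8 version + le64 ino2 + le32 len + path bytes
 *	0-4 cap/dentry release records   <-- r_request_release_offset
 */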
1609
1610 /*
1611  * called under mdsc->mutex if error, under no mutex if
1612  * success.
1613  */
1614 static void complete_request(struct ceph_mds_client *mdsc,
1615                              struct ceph_mds_request *req)
1616 {
1617         if (req->r_callback)
1618                 req->r_callback(mdsc, req);
1619         else
1620                 complete_all(&req->r_completion);
1621 }
1622
1623 /*
1624  * called under mdsc->mutex
1625  */
1626 static int __prepare_send_request(struct ceph_mds_client *mdsc,
1627                                   struct ceph_mds_request *req,
1628                                   int mds)
1629 {
1630         struct ceph_mds_request_head *rhead;
1631         struct ceph_msg *msg;
1632         int flags = 0;
1633
1634         req->r_mds = mds;
1635         req->r_attempts++;
1636         if (req->r_inode) {
1637                 struct ceph_cap *cap =
1638                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1639
1640                 if (cap)
1641                         req->r_sent_on_mseq = cap->mseq;
1642                 else
1643                         req->r_sent_on_mseq = -1;
1644         }
1645         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1646              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1647
1648         if (req->r_got_unsafe) {
1649                 /*
1650                  * Replay.  Do not regenerate message (and rebuild
1651                  * paths, etc.); just use the original message.
1652                  * Rebuilding paths will break for renames because
1653                  * d_move mangles the src name.
1654                  */
1655                 msg = req->r_request;
1656                 rhead = msg->front.iov_base;
1657
1658                 flags = le32_to_cpu(rhead->flags);
1659                 flags |= CEPH_MDS_FLAG_REPLAY;
1660                 rhead->flags = cpu_to_le32(flags);
1661
1662                 if (req->r_target_inode)
1663                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1664
1665                 rhead->num_retry = req->r_attempts - 1;
1666
1667                 /* remove cap/dentry releases from message */
1668                 rhead->num_releases = 0;
1669                 msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
1670                 msg->front.iov_len = req->r_request_release_offset;
1671                 return 0;
1672         }
1673
1674         if (req->r_request) {
1675                 ceph_msg_put(req->r_request);
1676                 req->r_request = NULL;
1677         }
1678         msg = create_request_message(mdsc, req, mds);
1679         if (IS_ERR(msg)) {
1680                 req->r_err = PTR_ERR(msg);
1681                 complete_request(mdsc, req);
1682                 return PTR_ERR(msg);
1683         }
1684         req->r_request = msg;
1685
1686         rhead = msg->front.iov_base;
1687         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1688         if (req->r_got_unsafe)
1689                 flags |= CEPH_MDS_FLAG_REPLAY;
1690         if (req->r_locked_dir)
1691                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1692         rhead->flags = cpu_to_le32(flags);
1693         rhead->num_fwd = req->r_num_fwd;
1694         rhead->num_retry = req->r_attempts - 1;
1695         rhead->ino = 0;
1696
1697         dout(" r_locked_dir = %p\n", req->r_locked_dir);
1698         return 0;
1699 }
1700
1701 /*
1702  * send request, or put it on the appropriate wait list.
1703  */
1704 static int __do_request(struct ceph_mds_client *mdsc,
1705                         struct ceph_mds_request *req)
1706 {
1707         struct ceph_mds_session *session = NULL;
1708         int mds = -1;
1709         int err = -EAGAIN;
1710
1711         if (req->r_err || req->r_got_result)
1712                 goto out;
1713
1714         if (req->r_timeout &&
1715             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1716                 dout("do_request timed out\n");
1717                 err = -EIO;
1718                 goto finish;
1719         }
1720
1721         mds = __choose_mds(mdsc, req);
1722         if (mds < 0 ||
1723             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1724                 dout("do_request no mds or not active, waiting for map\n");
1725                 list_add(&req->r_wait, &mdsc->waiting_for_map);
1726                 goto out;
1727         }
1728
1729         /* get, open session */
1730         session = __ceph_lookup_mds_session(mdsc, mds);
1731         if (!session) {
1732                 session = register_session(mdsc, mds);
1733                 if (IS_ERR(session)) {
1734                         err = PTR_ERR(session);
1735                         goto finish;
1736                 }
1737         }
1738         dout("do_request mds%d session %p state %s\n", mds, session,
1739              session_state_name(session->s_state));
1740         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1741             session->s_state != CEPH_MDS_SESSION_HUNG) {
1742                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1743                     session->s_state == CEPH_MDS_SESSION_CLOSING)
1744                         __open_session(mdsc, session);
1745                 list_add(&req->r_wait, &session->s_waiting);
1746                 goto out_session;
1747         }
1748
1749         /* send request */
1750         req->r_session = get_session(session);
1751         req->r_resend_mds = -1;   /* forget any previous mds hint */
1752
1753         if (req->r_request_started == 0)   /* note request start time */
1754                 req->r_request_started = jiffies;
1755
1756         err = __prepare_send_request(mdsc, req, mds);
1757         if (!err) {
1758                 ceph_msg_get(req->r_request);
1759                 ceph_con_send(&session->s_con, req->r_request);
1760         }
1761
1762 out_session:
1763         ceph_put_mds_session(session);
1764 out:
1765         return err;
1766
1767 finish:
1768         req->r_err = err;
1769         complete_request(mdsc, req);
1770         goto out;
1771 }
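/*
 * Request life cycle, roughly (a sketch of the paths through the
 * helpers above and the handlers below):
 *
 *	ceph_mdsc_do_request/submit_request
 *	  -> __do_request        choose mds; wait for map/session if needed
 *	     -> __prepare_send_request -> ceph_con_send
 *	  <- handle_reply        unsafe and/or safe reply
 *	  <- handle_forward      resend to the mds we were forwarded to
 */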
1772
1773 /*
1774  * called under mdsc->mutex
1775  */
1776 static void __wake_requests(struct ceph_mds_client *mdsc,
1777                             struct list_head *head)
1778 {
1779         struct ceph_mds_request *req, *nreq;
1780
1781         list_for_each_entry_safe(req, nreq, head, r_wait) {
1782                 list_del_init(&req->r_wait);
1783                 __do_request(mdsc, req);
1784         }
1785 }
1786
1787 /*
1788  * Wake up threads with requests pending for @mds, so that they can
1789  * resubmit their requests to a possibly different mds.
1790  */
1791 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1792 {
1793         struct ceph_mds_request *req;
1794         struct rb_node *p;
1795
1796         dout("kick_requests mds%d\n", mds);
1797         for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1798                 req = rb_entry(p, struct ceph_mds_request, r_node);
1799                 if (req->r_got_unsafe)
1800                         continue;
1801                 if (req->r_session &&
1802                     req->r_session->s_mds == mds) {
1803                         dout(" kicking tid %llu\n", req->r_tid);
1804                         put_request_session(req);
1805                         __do_request(mdsc, req);
1806                 }
1807         }
1808 }
1809
1810 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1811                               struct ceph_mds_request *req)
1812 {
1813         dout("submit_request on %p\n", req);
1814         mutex_lock(&mdsc->mutex);
1815         __register_request(mdsc, req, NULL);
1816         __do_request(mdsc, req);
1817         mutex_unlock(&mdsc->mutex);
1818 }
1819
1820 /*
1821  * Synchronously perform an mds request, taking care of all of the
1822  * session setup, forwarding, and retry details.
1823  */
1824 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1825                          struct inode *dir,
1826                          struct ceph_mds_request *req)
1827 {
1828         int err;
1829
1830         dout("do_request on %p\n", req);
1831
1832         /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1833         if (req->r_inode)
1834                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1835         if (req->r_locked_dir)
1836                 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1837         if (req->r_old_dentry)
1838                 ceph_get_cap_refs(
1839                         ceph_inode(req->r_old_dentry->d_parent->d_inode),
1840                         CEPH_CAP_PIN);
1841
1842         /* issue */
1843         mutex_lock(&mdsc->mutex);
1844         __register_request(mdsc, req, dir);
1845         __do_request(mdsc, req);
1846
1847         if (req->r_err) {
1848                 err = req->r_err;
1849                 __unregister_request(mdsc, req);
1850                 dout("do_request early error %d\n", err);
1851                 goto out;
1852         }
1853
1854         /* wait */
1855         mutex_unlock(&mdsc->mutex);
1856         dout("do_request waiting\n");
1857         if (req->r_timeout) {
1858                 err = (long)wait_for_completion_killable_timeout(
1859                         &req->r_completion, req->r_timeout);
1860                 if (err == 0)
1861                         err = -EIO;
1862         } else {
1863                 err = wait_for_completion_killable(&req->r_completion);
1864         }
1865         dout("do_request waited, got %d\n", err);
1866         mutex_lock(&mdsc->mutex);
1867
1868         /* only abort if we didn't race with a real reply */
1869         if (req->r_got_result) {
1870                 err = le32_to_cpu(req->r_reply_info.head->result);
1871         } else if (err < 0) {
1872                 dout("aborted request %lld with %d\n", req->r_tid, err);
1873
1874                 /*
1875                  * ensure we aren't running concurrently with
1876                  * ceph_fill_trace or ceph_readdir_prepopulate, which
1877                  * rely on locks (dir mutex) held by our caller.
1878                  */
1879                 mutex_lock(&req->r_fill_mutex);
1880                 req->r_err = err;
1881                 req->r_aborted = true;
1882                 mutex_unlock(&req->r_fill_mutex);
1883
1884                 if (req->r_locked_dir &&
1885                     (req->r_op & CEPH_MDS_OP_WRITE))
1886                         ceph_invalidate_dir_request(req);
1887         } else {
1888                 err = req->r_err;
1889         }
1890
1891 out:
1892         mutex_unlock(&mdsc->mutex);
1893         dout("do_request %p done, result %d\n", req, err);
1894         return err;
1895 }
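/*
 * For illustration, a typical synchronous caller looks roughly like
 * this (a hedged sketch in the style of the fs/ceph callers; the op
 * and cap counts here are hypothetical):
 *
 *	struct ceph_mds_request *req;
 *	int err;
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_UNLINK,
 *				       USE_AUTH_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	req->r_locked_dir = dir;    (i_mutex held by the caller)
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);
 */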
1896
1897 /*
1898  * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
1899  * namespace request.
1900  */
1901 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
1902 {
1903         struct inode *inode = req->r_locked_dir;
1904         struct ceph_inode_info *ci = ceph_inode(inode);
1905
1906         dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
1907         spin_lock(&inode->i_lock);
1908         ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
1909         ci->i_release_count++;
1910         spin_unlock(&inode->i_lock);
1911
1912         if (req->r_dentry)
1913                 ceph_invalidate_dentry_lease(req->r_dentry);
1914         if (req->r_old_dentry)
1915                 ceph_invalidate_dentry_lease(req->r_old_dentry);
1916 }
1917
1918 /*
1919  * Handle mds reply.
1920  *
1921  * We take the session mutex and parse and process the reply immediately.
1922  * This preserves the logical ordering of replies, capabilities, etc., sent
1923  * by the MDS as they are applied to our local cache.
1924  */
1925 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1926 {
1927         struct ceph_mds_client *mdsc = session->s_mdsc;
1928         struct ceph_mds_request *req;
1929         struct ceph_mds_reply_head *head = msg->front.iov_base;
1930         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
1931         u64 tid;
1932         int err, result;
1933         int mds = session->s_mds;
1934
1935         if (msg->front.iov_len < sizeof(*head)) {
1936                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
1937                 ceph_msg_dump(msg);
1938                 return;
1939         }
1940
1941         /* get request, session */
1942         tid = le64_to_cpu(msg->hdr.tid);
1943         mutex_lock(&mdsc->mutex);
1944         req = __lookup_request(mdsc, tid);
1945         if (!req) {
1946                 dout("handle_reply on unknown tid %llu\n", tid);
1947                 mutex_unlock(&mdsc->mutex);
1948                 return;
1949         }
1950         dout("handle_reply %p\n", req);
1951
1952         /* correct session? */
1953         if (req->r_session != session) {
1954                 pr_err("mdsc_handle_reply got %llu on session mds%d"
1955                        " not mds%d\n", tid, session->s_mds,
1956                        req->r_session ? req->r_session->s_mds : -1);
1957                 mutex_unlock(&mdsc->mutex);
1958                 goto out;
1959         }
1960
1961         /* dup? */
1962         if ((req->r_got_unsafe && !head->safe) ||
1963             (req->r_got_safe && head->safe)) {
1964                 pr_warning("got a dup %s reply on %llu from mds%d\n",
1965                            head->safe ? "safe" : "unsafe", tid, mds);
1966                 mutex_unlock(&mdsc->mutex);
1967                 goto out;
1968         }
1969         if (req->r_got_safe && !head->safe) {
1970                 pr_warning("got unsafe after safe on %llu from mds%d\n",
1971                            tid, mds);
1972                 mutex_unlock(&mdsc->mutex);
1973                 goto out;
1974         }
1975
1976         result = le32_to_cpu(head->result);
1977
1978         /*
1979          * Handle an ESTALE:
1980          *  - if we're not talking to the authority, send to them
1981          *  - if the authority has changed while we weren't looking,
1982          *    send to the new authority
1983          *  - otherwise we just have to return an ESTALE
1984          */
1985         if (result == -ESTALE) {
1986                 dout("got ESTALE on request %llu\n", req->r_tid);
1987                 if (!req->r_inode) {  /* do nothing; not an authority problem */
1988                 } else if (req->r_direct_mode != USE_AUTH_MDS) {
1989                         dout("not using auth, setting for that now\n");
1990                         req->r_direct_mode = USE_AUTH_MDS;
1991                         __do_request(mdsc, req);
1992                         mutex_unlock(&mdsc->mutex);
1993                         goto out;
1994                 } else  {
1995                         struct ceph_inode_info *ci = ceph_inode(req->r_inode);
1996                         struct ceph_cap *cap =
1997                                 ceph_get_cap_for_mds(ci, req->r_mds);
1998
1999                         dout("already using auth\n");
2000                         if ((!cap || cap != ci->i_auth_cap) ||
2001                             (cap->mseq != req->r_sent_on_mseq)) {
2002                                 dout("but cap changed, so resending\n");
2003                                 __do_request(mdsc, req);
2004                                 mutex_unlock(&mdsc->mutex);
2005                                 goto out;
2006                         }
2007                 }
2008                 dout("have to return ESTALE on request %llu\n", req->r_tid);
2009         }
2010
2011
2012         if (head->safe) {
2013                 req->r_got_safe = true;
2014                 __unregister_request(mdsc, req);
2015                 complete_all(&req->r_safe_completion);
2016
2017                 if (req->r_got_unsafe) {
2018                         /*
2019                          * We already handled the unsafe response, now do the
2020                          * cleanup.  No need to examine the response; the MDS
2021                          * doesn't include any result info in the safe
2022                          * response.  And even if it did, there is nothing
2023                          * useful we could do with a revised return value.
2024                          */
2025                         dout("got safe reply %llu, mds%d\n", tid, mds);
2026                         list_del_init(&req->r_unsafe_item);
2027
2028                         /* last unsafe request during umount? */
2029                         if (mdsc->stopping && !__get_oldest_req(mdsc))
2030                                 complete_all(&mdsc->safe_umount_waiters);
2031                         mutex_unlock(&mdsc->mutex);
2032                         goto out;
2033                 }
2034         } else {
2035                 req->r_got_unsafe = true;
2036                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2037         }
2038
2039         dout("handle_reply tid %lld result %d\n", tid, result);
2040         rinfo = &req->r_reply_info;
2041         err = parse_reply_info(msg, rinfo);
2042         mutex_unlock(&mdsc->mutex);
2043
2044         mutex_lock(&session->s_mutex);
2045         if (err < 0) {
2046                 pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds);
2047                 ceph_msg_dump(msg);
2048                 goto out_err;
2049         }
2050
2051         /* snap trace */
2052         if (rinfo->snapblob_len) {
2053                 down_write(&mdsc->snap_rwsem);
2054                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2055                                rinfo->snapblob + rinfo->snapblob_len,
2056                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2057                 downgrade_write(&mdsc->snap_rwsem);
2058         } else {
2059                 down_read(&mdsc->snap_rwsem);
2060         }
2061
2062         /* insert trace into our cache */
2063         mutex_lock(&req->r_fill_mutex);
2064         err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
2065         if (err == 0) {
2066                 if (result == 0 && rinfo->dir_nr)
2067                         ceph_readdir_prepopulate(req, req->r_session);
2068                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2069         }
2070         mutex_unlock(&req->r_fill_mutex);
2071
2072         up_read(&mdsc->snap_rwsem);
2073 out_err:
2074         mutex_lock(&mdsc->mutex);
2075         if (!req->r_aborted) {
2076                 if (err) {
2077                         req->r_err = err;
2078                 } else {
2079                         req->r_reply = msg;
2080                         ceph_msg_get(msg);
2081                         req->r_got_result = true;
2082                 }
2083         } else {
2084                 dout("reply arrived after request %lld was aborted\n", tid);
2085         }
2086         mutex_unlock(&mdsc->mutex);
2087
2088         ceph_add_cap_releases(mdsc, req->r_session);
2089         mutex_unlock(&session->s_mutex);
2090
2091         /* kick calling process */
2092         complete_request(mdsc, req);
2093 out:
2094         ceph_mdsc_put_request(req);
2095         return;
2096 }
2097
2098
2099
2100 /*
2101  * handle mds notification that our request has been forwarded.
2102  */
2103 static void handle_forward(struct ceph_mds_client *mdsc,
2104                            struct ceph_mds_session *session,
2105                            struct ceph_msg *msg)
2106 {
2107         struct ceph_mds_request *req;
2108         u64 tid = le64_to_cpu(msg->hdr.tid);
2109         u32 next_mds;
2110         u32 fwd_seq;
2111         int err = -EINVAL;
2112         void *p = msg->front.iov_base;
2113         void *end = p + msg->front.iov_len;
2114
2115         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2116         next_mds = ceph_decode_32(&p);
2117         fwd_seq = ceph_decode_32(&p);
2118
2119         mutex_lock(&mdsc->mutex);
2120         req = __lookup_request(mdsc, tid);
2121         if (!req) {
2122                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2123                 goto out;  /* dup reply? */
2124         }
2125
2126         if (req->r_aborted) {
2127                 dout("forward tid %llu aborted, unregistering\n", tid);
2128                 __unregister_request(mdsc, req);
2129         } else if (fwd_seq <= req->r_num_fwd) {
2130                 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2131                      tid, next_mds, req->r_num_fwd, fwd_seq);
2132         } else {
2133                 /* resend. forward race not possible; mds would drop */
2134                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2135                 BUG_ON(req->r_err);
2136                 BUG_ON(req->r_got_result);
2137                 req->r_num_fwd = fwd_seq;
2138                 req->r_resend_mds = next_mds;
2139                 put_request_session(req);
2140                 __do_request(mdsc, req);
2141         }
2142         ceph_mdsc_put_request(req);
2143 out:
2144         mutex_unlock(&mdsc->mutex);
2145         return;
2146
2147 bad:
2148         pr_err("mdsc_handle_forward decode error err=%d\n", err);
2149 }
2150
2151 /*
2152  * handle a mds session control message
2153  */
2154 static void handle_session(struct ceph_mds_session *session,
2155                            struct ceph_msg *msg)
2156 {
2157         struct ceph_mds_client *mdsc = session->s_mdsc;
2158         u32 op;
2159         u64 seq;
2160         int mds = session->s_mds;
2161         struct ceph_mds_session_head *h = msg->front.iov_base;
2162         int wake = 0;
2163
2164         /* decode */
2165         if (msg->front.iov_len != sizeof(*h))
2166                 goto bad;
2167         op = le32_to_cpu(h->op);
2168         seq = le64_to_cpu(h->seq);
2169
2170         mutex_lock(&mdsc->mutex);
2171         if (op == CEPH_SESSION_CLOSE)
2172                 __unregister_session(mdsc, session);
2173         /* FIXME: this ttl calculation is generous */
2174         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2175         mutex_unlock(&mdsc->mutex);
2176
2177         mutex_lock(&session->s_mutex);
2178
2179         dout("handle_session mds%d %s %p state %s seq %llu\n",
2180              mds, ceph_session_op_name(op), session,
2181              session_state_name(session->s_state), seq);
2182
2183         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2184                 session->s_state = CEPH_MDS_SESSION_OPEN;
2185                 pr_info("mds%d came back\n", session->s_mds);
2186         }
2187
2188         switch (op) {
2189         case CEPH_SESSION_OPEN:
2190                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2191                         pr_info("mds%d reconnect success\n", session->s_mds);
2192                 session->s_state = CEPH_MDS_SESSION_OPEN;
2193                 renewed_caps(mdsc, session, 0);
2194                 wake = 1;
2195                 if (mdsc->stopping)
2196                         __close_session(mdsc, session);
2197                 break;
2198
2199         case CEPH_SESSION_RENEWCAPS:
2200                 if (session->s_renew_seq == seq)
2201                         renewed_caps(mdsc, session, 1);
2202                 break;
2203
2204         case CEPH_SESSION_CLOSE:
2205                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2206                         pr_info("mds%d reconnect denied\n", session->s_mds);
2207                 remove_session_caps(session);
2208                 wake = 1; /* for good measure */
2209                 complete_all(&mdsc->session_close_waiters);
2210                 kick_requests(mdsc, mds);
2211                 break;
2212
2213         case CEPH_SESSION_STALE:
2214                 pr_info("mds%d caps went stale, renewing\n",
2215                         session->s_mds);
2216                 spin_lock(&session->s_cap_lock);
2217                 session->s_cap_gen++;
2218                 session->s_cap_ttl = 0;
2219                 spin_unlock(&session->s_cap_lock);
2220                 send_renew_caps(mdsc, session);
2221                 break;
2222
2223         case CEPH_SESSION_RECALL_STATE:
2224                 trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2225                 break;
2226
2227         default:
2228                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2229                 WARN_ON(1);
2230         }
2231
2232         mutex_unlock(&session->s_mutex);
2233         if (wake) {
2234                 mutex_lock(&mdsc->mutex);
2235                 __wake_requests(mdsc, &session->s_waiting);
2236                 mutex_unlock(&mdsc->mutex);
2237         }
2238         return;
2239
2240 bad:
2241         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2242                (int)msg->front.iov_len);
2243         ceph_msg_dump(msg);
2244         return;
2245 }
2246
2247
2248 /*
2249  * called under session->mutex.
2250  */
2251 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2252                                    struct ceph_mds_session *session)
2253 {
2254         struct ceph_mds_request *req, *nreq;
2255         int err;
2256
2257         dout("replay_unsafe_requests mds%d\n", session->s_mds);
2258
2259         mutex_lock(&mdsc->mutex);
2260         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2261                 err = __prepare_send_request(mdsc, req, session->s_mds);
2262                 if (!err) {
2263                         ceph_msg_get(req->r_request);
2264                         ceph_con_send(&session->s_con, req->r_request);
2265                 }
2266         }
2267         mutex_unlock(&mdsc->mutex);
2268 }
2269
2270 /*
2271  * Encode information about a cap for a reconnect with the MDS.
2272  */
2273 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2274                           void *arg)
2275 {
2276         union {
2277                 struct ceph_mds_cap_reconnect v2;
2278                 struct ceph_mds_cap_reconnect_v1 v1;
2279         } rec;
2280         size_t reclen;
2281         struct ceph_inode_info *ci;
2282         struct ceph_reconnect_state *recon_state = arg;
2283         struct ceph_pagelist *pagelist = recon_state->pagelist;
2284         char *path;
2285         int pathlen, err;
2286         u64 pathbase;
2287         struct dentry *dentry;
2288
2289         ci = cap->ci;
2290
2291         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2292              inode, ceph_vinop(inode), cap, cap->cap_id,
2293              ceph_cap_string(cap->issued));
2294         err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2295         if (err)
2296                 return err;
2297
2298         dentry = d_find_alias(inode);
2299         if (dentry) {
2300                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2301                 if (IS_ERR(path)) {
2302                         err = PTR_ERR(path);
2303                         BUG_ON(err);
2304                 }
2305         } else {
2306                 path = NULL;
2307                 pathlen = 0;
2308         }
2309         err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2310         if (err)
2311                 goto out;
2312
2313         spin_lock(&inode->i_lock);
2314         cap->seq = 0;        /* reset cap seq */
2315         cap->issue_seq = 0;  /* and issue_seq */
2316
2317         if (recon_state->flock) {
2318                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2319                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2320                 rec.v2.issued = cpu_to_le32(cap->issued);
2321                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2322                 rec.v2.pathbase = cpu_to_le64(pathbase);
2323                 rec.v2.flock_len = 0;
2324                 reclen = sizeof(rec.v2);
2325         } else {
2326                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2327                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2328                 rec.v1.issued = cpu_to_le32(cap->issued);
2329                 rec.v1.size = cpu_to_le64(inode->i_size);
2330                 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2331                 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2332                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2333                 rec.v1.pathbase = cpu_to_le64(pathbase);
2334                 reclen = sizeof(rec.v1);
2335         }
2336         spin_unlock(&inode->i_lock);
2337
2338         err = ceph_pagelist_append(pagelist, &rec, reclen);
2339
2340 out:
2341         kfree(path);
2342         dput(dentry);
2343         return err;
2344 }
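/*
 * Per-cap reconnect record, as encoded above (sketch):
 *
 *	le64   ino
 *	string path (le32 length + bytes)
 *	struct ceph_mds_cap_reconnect      if the peer has CEPH_FEATURE_FLOCK
 *	struct ceph_mds_cap_reconnect_v1   otherwise
 */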
2345
2346
2347 /*
2348  * If an MDS fails and recovers, clients need to reconnect in order to
2349  * reestablish shared state.  This includes all caps issued through
2350  * this session _and_ the snap_realm hierarchy.  Because it's not
2351  * clear which snap realms the mds cares about, we send everything we
2352  * know about; that ensures we'll then get any new info the
2353  * recovering MDS might have.
2354  *
2355  * This is a relatively heavyweight operation, but it's rare.
2356  *
2357  * called with mdsc->mutex held.
2358  */
2359 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2360                                struct ceph_mds_session *session)
2361 {
2362         struct ceph_msg *reply;
2363         struct rb_node *p;
2364         int mds = session->s_mds;
2365         int err = -ENOMEM;
2366         struct ceph_pagelist *pagelist;
2367         struct ceph_reconnect_state recon_state;
2368
2369         pr_info("mds%d reconnect start\n", mds);
2370
2371         pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2372         if (!pagelist)
2373                 goto fail_nopagelist;
2374         ceph_pagelist_init(pagelist);
2375
2376         reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
2377         if (!reply)
2378                 goto fail_nomsg;
2379
2380         mutex_lock(&session->s_mutex);
2381         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2382         session->s_seq = 0;
2383
2384         ceph_con_open(&session->s_con,
2385                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2386
2387         /* replay unsafe requests */
2388         replay_unsafe_requests(mdsc, session);
2389
2390         down_read(&mdsc->snap_rwsem);
2391
2392         dout("session %p state %s\n", session,
2393              session_state_name(session->s_state));
2394
2395         /* drop old cap expires; we're about to reestablish that state */
2396         discard_cap_releases(mdsc, session);
2397
2398         /* traverse this session's caps */
2399         err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2400         if (err)
2401                 goto fail;
2402
2403         recon_state.pagelist = pagelist;
2404         recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2405         err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2406         if (err < 0)
2407                 goto fail;
2408
2409         /*
2410          * snaprealms.  we provide the mds with the ino, seq (version), and
2411          * parent for all of our realms.  If the mds has any newer info,
2412          * it will tell us.
2413          */
2414         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2415                 struct ceph_snap_realm *realm =
2416                         rb_entry(p, struct ceph_snap_realm, node);
2417                 struct ceph_mds_snaprealm_reconnect sr_rec;
2418
2419                 dout(" adding snap realm %llx seq %lld parent %llx\n",
2420                      realm->ino, realm->seq, realm->parent_ino);
2421                 sr_rec.ino = cpu_to_le64(realm->ino);
2422                 sr_rec.seq = cpu_to_le64(realm->seq);
2423                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
2424                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2425                 if (err)
2426                         goto fail;
2427         }
2428
2429         reply->pagelist = pagelist;
2430         if (recon_state.flock)
2431                 reply->hdr.version = cpu_to_le16(2);
2432         reply->hdr.data_len = cpu_to_le32(pagelist->length);
2433         reply->nr_pages = calc_pages_for(0, pagelist->length);
2434         ceph_con_send(&session->s_con, reply);
2435
2436         mutex_unlock(&session->s_mutex);
2437
2438         mutex_lock(&mdsc->mutex);
2439         __wake_requests(mdsc, &session->s_waiting);
2440         mutex_unlock(&mdsc->mutex);
2441
2442         up_read(&mdsc->snap_rwsem);
2443         return;
2444
2445 fail:
2446         ceph_msg_put(reply);
2447         up_read(&mdsc->snap_rwsem);
2448         mutex_unlock(&session->s_mutex);
2449 fail_nomsg:
2450         ceph_pagelist_release(pagelist);
2451         kfree(pagelist);
2452 fail_nopagelist:
2453         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2454         return;
2455 }
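/*
 * The reconnect payload assembled above is, in order (sketch):
 *
 *	le32 nr_caps (session->s_nr_caps)
 *	one per-cap record per cap (see encode_caps_cb)
 *	one struct ceph_mds_snaprealm_reconnect per known snap realm
 *
 * hdr.version is bumped to 2 when the flock-capable (v2) cap records
 * are used.
 */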
2456
2457
2458 /*
2459  * compare old and new mdsmaps, kicking requests
2460  * and closing out old connections as necessary
2461  *
2462  * called under mdsc->mutex.
2463  */
2464 static void check_new_map(struct ceph_mds_client *mdsc,
2465                           struct ceph_mdsmap *newmap,
2466                           struct ceph_mdsmap *oldmap)
2467 {
2468         int i;
2469         int oldstate, newstate;
2470         struct ceph_mds_session *s;
2471
2472         dout("check_new_map new %u old %u\n",
2473              newmap->m_epoch, oldmap->m_epoch);
2474
2475         for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2476                 if (mdsc->sessions[i] == NULL)
2477                         continue;
2478                 s = mdsc->sessions[i];
2479                 oldstate = ceph_mdsmap_get_state(oldmap, i);
2480                 newstate = ceph_mdsmap_get_state(newmap, i);
2481
2482                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2483                      i, ceph_mds_state_name(oldstate),
2484                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2485                      ceph_mds_state_name(newstate),
2486                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2487                      session_state_name(s->s_state));
2488
2489                 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2490                            ceph_mdsmap_get_addr(newmap, i),
2491                            sizeof(struct ceph_entity_addr))) {
2492                         if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2493                                 /* the session never opened, just close it
2494                                  * out now */
2495                                 __wake_requests(mdsc, &s->s_waiting);
2496                                 __unregister_session(mdsc, s);
2497                         } else {
2498                                 /* just close it */
2499                                 mutex_unlock(&mdsc->mutex);
2500                                 mutex_lock(&s->s_mutex);
2501                                 mutex_lock(&mdsc->mutex);
2502                                 ceph_con_close(&s->s_con);
2503                                 mutex_unlock(&s->s_mutex);
2504                                 s->s_state = CEPH_MDS_SESSION_RESTARTING;
2505                         }
2506
2507                         /* kick any requests waiting on the recovering mds */
2508                         kick_requests(mdsc, i);
2509                 } else if (oldstate == newstate) {
2510                         continue;  /* nothing new with this mds */
2511                 }
2512
2513                 /*
2514                  * send reconnect?
2515                  */
2516                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2517                     newstate >= CEPH_MDS_STATE_RECONNECT) {
2518                         mutex_unlock(&mdsc->mutex);
2519                         send_mds_reconnect(mdsc, s);
2520                         mutex_lock(&mdsc->mutex);
2521                 }
2522
2523                 /*
2524          * kick requests on any mds that has gone active.
2525                  */
2526                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2527                     newstate >= CEPH_MDS_STATE_ACTIVE) {
2528                         if (oldstate != CEPH_MDS_STATE_CREATING &&
2529                             oldstate != CEPH_MDS_STATE_STARTING)
2530                                 pr_info("mds%d recovery completed\n", s->s_mds);
2531                         kick_requests(mdsc, i);
2532                         ceph_kick_flushing_caps(mdsc, s);
2533                         wake_up_session_caps(s, 1);
2534                 }
2535         }
2536
2537         for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2538                 s = mdsc->sessions[i];
2539                 if (!s)
2540                         continue;
2541                 if (!ceph_mdsmap_is_laggy(newmap, i))
2542                         continue;
2543                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2544                     s->s_state == CEPH_MDS_SESSION_HUNG ||
2545                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
2546                         dout(" connecting to export targets of laggy mds%d\n",
2547                              i);
2548                         __open_export_target_sessions(mdsc, s);
2549                 }
2550         }
2551 }
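/*
 * In brief, the transitions handled above:
 *
 *	mds addr changed          close/restart the session, kick requests
 *	RESTARTING -> RECONNECT+  send_mds_reconnect
 *	-> ACTIVE+                kick requests, flush caps, wake waiters
 *	laggy in new map          open sessions to its export targets
 */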
2552
2553
2554
2555 /*
2556  * leases
2557  */
2558
2559 /*
2560  * caller must hold session s_mutex, dentry->d_lock
2561  */
2562 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2563 {
2564         struct ceph_dentry_info *di = ceph_dentry(dentry);
2565
2566         ceph_put_mds_session(di->lease_session);
2567         di->lease_session = NULL;
2568 }
2569
2570 static void handle_lease(struct ceph_mds_client *mdsc,
2571                          struct ceph_mds_session *session,
2572                          struct ceph_msg *msg)
2573 {
2574         struct super_block *sb = mdsc->client->sb;
2575         struct inode *inode;
2576         struct ceph_inode_info *ci;
2577         struct dentry *parent, *dentry;
2578         struct ceph_dentry_info *di;
2579         int mds = session->s_mds;
2580         struct ceph_mds_lease *h = msg->front.iov_base;
2581         u32 seq;
2582         struct ceph_vino vino;
2583         int mask;
2584         struct qstr dname;
2585         int release = 0;
2586
2587         dout("handle_lease from mds%d\n", mds);
2588
2589         /* decode */
2590         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2591                 goto bad;
2592         vino.ino = le64_to_cpu(h->ino);
2593         vino.snap = CEPH_NOSNAP;
2594         mask = le16_to_cpu(h->mask);
2595         seq = le32_to_cpu(h->seq);
2596         dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2597         dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2598         if (dname.len != get_unaligned_le32(h+1))
2599                 goto bad;
2600
2601         mutex_lock(&session->s_mutex);
2602         session->s_seq++;
2603
2604         /* lookup inode */
2605         inode = ceph_find_inode(sb, vino);
2606         dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
2607              ceph_lease_op_name(h->action), mask, vino.ino, inode,
2608              dname.len, dname.name);
2609         if (inode == NULL) {
2610                 dout("handle_lease no inode %llx\n", vino.ino);
2611                 goto release;
2612         }
2613         ci = ceph_inode(inode);
2614
2615         /* dentry */
2616         parent = d_find_alias(inode);
2617         if (!parent) {
2618                 dout("no parent dentry on inode %p\n", inode);
2619                 WARN_ON(1);
2620                 goto release;  /* hrm... */
2621         }
2622         dname.hash = full_name_hash(dname.name, dname.len);
2623         dentry = d_lookup(parent, &dname);
2624         dput(parent);
2625         if (!dentry)
2626                 goto release;
2627
2628         spin_lock(&dentry->d_lock);
2629         di = ceph_dentry(dentry);
2630         switch (h->action) {
2631         case CEPH_MDS_LEASE_REVOKE:
2632                 if (di && di->lease_session == session) {
2633                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
2634                                 h->seq = cpu_to_le32(di->lease_seq);
2635                         __ceph_mdsc_drop_dentry_lease(dentry);
2636                 }
2637                 release = 1;
2638                 break;
2639
2640         case CEPH_MDS_LEASE_RENEW:
2641                 if (di && di->lease_session == session &&
2642                     di->lease_gen == session->s_cap_gen &&
2643                     di->lease_renew_from &&
2644                     di->lease_renew_after == 0) {
2645                         unsigned long duration =
2646                                 le32_to_cpu(h->duration_ms) * HZ / 1000;
2647
2648                         di->lease_seq = seq;
2649                         dentry->d_time = di->lease_renew_from + duration;
2650                         di->lease_renew_after = di->lease_renew_from +
2651                                 (duration >> 1);
2652                         di->lease_renew_from = 0;
2653                 }
2654                 break;
2655         }
2656         spin_unlock(&dentry->d_lock);
2657         dput(dentry);
2658
2659         if (!release)
2660                 goto out;
2661
2662 release:
2663         /* let's just reuse the same message */
2664         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2665         ceph_msg_get(msg);
2666         ceph_con_send(&session->s_con, msg);
2667
2668 out:
2669         iput(inode);
2670         mutex_unlock(&session->s_mutex);
2671         return;
2672
2673 bad:
2674         pr_err("corrupt lease message\n");
2675         ceph_msg_dump(msg);
2676 }
2677
2678 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2679                               struct inode *inode,
2680                               struct dentry *dentry, char action,
2681                               u32 seq)
2682 {
2683         struct ceph_msg *msg;
2684         struct ceph_mds_lease *lease;
2685         int len = sizeof(*lease) + sizeof(u32);
2686         int dnamelen = 0;
2687
2688         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2689              inode, dentry, ceph_lease_op_name(action), session->s_mds);
2690         dnamelen = dentry->d_name.len;
2691         len += dnamelen;
2692
2693         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
2694         if (!msg)
2695                 return;
2696         lease = msg->front.iov_base;
2697         lease->action = action;
2698         lease->mask = cpu_to_le16(1);
2699         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2700         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2701         lease->seq = cpu_to_le32(seq);
2702         put_unaligned_le32(dnamelen, lease + 1);
2703         memcpy((void *)(lease + 1) + sizeof(u32), dentry->d_name.name, dnamelen);
2704
2705         /*
2706          * if this is a preemptive lease RELEASE, no need to
2707          * flush request stream, since the actual request will
2708          * soon follow.
2709          */
2710         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2711
2712         ceph_con_send(&session->s_con, msg);
2713 }
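/*
 * The lease message built above is simply (sketch):
 *
 *	struct ceph_mds_lease
 *	le32 name length
 *	dentry name bytes
 */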
2714
2715 /*
2716  * Preemptively release a lease we expect to invalidate anyway.
2717  * Both @inode and @dentry are required (note the BUG_ONs below).
2718  */
2719 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2720                              struct dentry *dentry, int mask)
2721 {
2722         struct ceph_dentry_info *di;
2723         struct ceph_mds_session *session;
2724         u32 seq;
2725
2726         BUG_ON(inode == NULL);
2727         BUG_ON(dentry == NULL);
2728         BUG_ON(mask == 0);
2729
2730         /* is dentry lease valid? */
2731         spin_lock(&dentry->d_lock);
2732         di = ceph_dentry(dentry);
2733         if (!di || !di->lease_session ||
2734             di->lease_session->s_mds < 0 ||
2735             di->lease_gen != di->lease_session->s_cap_gen ||
2736             !time_before(jiffies, dentry->d_time)) {
2737                 dout("lease_release inode %p dentry %p -- "
2738                      "no lease on %d\n",
2739                      inode, dentry, mask);
2740                 spin_unlock(&dentry->d_lock);
2741                 return;
2742         }
2743
2744         /* we do have a lease on this dentry; note mds and seq */
2745         session = ceph_get_mds_session(di->lease_session);
2746         seq = di->lease_seq;
2747         __ceph_mdsc_drop_dentry_lease(dentry);
2748         spin_unlock(&dentry->d_lock);
2749
2750         dout("lease_release inode %p dentry %p mask %d to mds%d\n",
2751              inode, dentry, mask, session->s_mds);
2752         ceph_mdsc_lease_send_msg(session, inode, dentry,
2753                                  CEPH_MDS_LEASE_RELEASE, seq);
2754         ceph_put_mds_session(session);
2755 }
2756
2757 /*
2758  * drop all leases (and dentry refs) in preparation for umount
2759  */
2760 static void drop_leases(struct ceph_mds_client *mdsc)
2761 {
2762         int i;
2763
2764         dout("drop_leases\n");
2765         mutex_lock(&mdsc->mutex);
2766         for (i = 0; i < mdsc->max_sessions; i++) {
2767                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2768                 if (!s)
2769                         continue;
2770                 mutex_unlock(&mdsc->mutex);
2771                 mutex_lock(&s->s_mutex);
2772                 mutex_unlock(&s->s_mutex);
2773                 ceph_put_mds_session(s);
2774                 mutex_lock(&mdsc->mutex);
2775         }
2776         mutex_unlock(&mdsc->mutex);
2777 }
2778
2779
2780
2781 /*
2782  * delayed work -- periodically trim expired leases, renew caps with mds
2783  */
2784 static void schedule_delayed(struct ceph_mds_client *mdsc)
2785 {
2786         int delay = 5;
2787         unsigned hz = round_jiffies_relative(HZ * delay);
2788         schedule_delayed_work(&mdsc->delayed_work, hz);
2789 }
2790
2791 static void delayed_work(struct work_struct *work)
2792 {
2793         int i;
2794         struct ceph_mds_client *mdsc =
2795                 container_of(work, struct ceph_mds_client, delayed_work.work);
2796         int renew_interval;
2797         int renew_caps;
2798
2799         dout("mdsc delayed_work\n");
2800         ceph_check_delayed_caps(mdsc);
2801
2802         mutex_lock(&mdsc->mutex);
2803         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2804         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2805                                    mdsc->last_renew_caps);
2806         if (renew_caps)
2807                 mdsc->last_renew_caps = jiffies;
2808
2809         for (i = 0; i < mdsc->max_sessions; i++) {
2810                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2811                 if (s == NULL)
2812                         continue;
2813                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2814                         dout("resending session close request for mds%d\n",
2815                              s->s_mds);
2816                         request_close_session(mdsc, s);
2817                         ceph_put_mds_session(s);
2818                         continue;
2819                 }
2820                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2821                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2822                                 s->s_state = CEPH_MDS_SESSION_HUNG;
2823                                 pr_info("mds%d hung\n", s->s_mds);
2824                         }
2825                 }
2826                 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2827                         /* this mds is failed or recovering, just wait */
2828                         ceph_put_mds_session(s);
2829                         continue;
2830                 }
2831                 mutex_unlock(&mdsc->mutex);
2832
2833                 mutex_lock(&s->s_mutex);
2834                 if (renew_caps)
2835                         send_renew_caps(mdsc, s);
2836                 else
2837                         ceph_con_keepalive(&s->s_con);
2838                 ceph_add_cap_releases(mdsc, s);
2839                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2840                     s->s_state == CEPH_MDS_SESSION_HUNG)
2841                         ceph_send_cap_releases(mdsc, s);
2842                 mutex_unlock(&s->s_mutex);
2843                 ceph_put_mds_session(s);
2844
2845                 mutex_lock(&mdsc->mutex);
2846         }
2847         mutex_unlock(&mdsc->mutex);
2848
2849         schedule_delayed(mdsc);
2850 }
2851
2852
2853 int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2854 {
2855         mdsc->client = client;
2856         mutex_init(&mdsc->mutex);
2857         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2858         if (mdsc->mdsmap == NULL)
2859                 return -ENOMEM;
2860
2861         init_completion(&mdsc->safe_umount_waiters);
2862         init_completion(&mdsc->session_close_waiters);
2863         INIT_LIST_HEAD(&mdsc->waiting_for_map);
2864         mdsc->sessions = NULL;
2865         mdsc->max_sessions = 0;
2866         mdsc->stopping = 0;
2867         init_rwsem(&mdsc->snap_rwsem);
2868         mdsc->snap_realms = RB_ROOT;
2869         INIT_LIST_HEAD(&mdsc->snap_empty);
2870         spin_lock_init(&mdsc->snap_empty_lock);
2871         mdsc->last_tid = 0;
2872         mdsc->request_tree = RB_ROOT;
2873         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
2874         mdsc->last_renew_caps = jiffies;
2875         INIT_LIST_HEAD(&mdsc->cap_delay_list);
2876         spin_lock_init(&mdsc->cap_delay_lock);
2877         INIT_LIST_HEAD(&mdsc->snap_flush_list);
2878         spin_lock_init(&mdsc->snap_flush_lock);
2879         mdsc->cap_flush_seq = 0;
2880         INIT_LIST_HEAD(&mdsc->cap_dirty);
2881         mdsc->num_cap_flushing = 0;
2882         spin_lock_init(&mdsc->cap_dirty_lock);
2883         init_waitqueue_head(&mdsc->cap_flushing_wq);
2884         spin_lock_init(&mdsc->dentry_lru_lock);
2885         INIT_LIST_HEAD(&mdsc->dentry_lru);
2886
2887         ceph_caps_init(mdsc);
2888         ceph_adjust_min_caps(mdsc, client->min_caps);
2889
2890         return 0;
2891 }
2892
2893 /*
2894  * Wait for safe replies on open mds requests.  If we time out, drop
2895  * all requests from the tree to avoid dangling dentry refs.
2896  */
2897 static void wait_requests(struct ceph_mds_client *mdsc)
2898 {
2899         struct ceph_mds_request *req;
2900         struct ceph_client *client = mdsc->client;
2901
2902         mutex_lock(&mdsc->mutex);
2903         if (__get_oldest_req(mdsc)) {
2904                 mutex_unlock(&mdsc->mutex);
2905
2906                 dout("wait_requests waiting for requests\n");
2907                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
2908                                     client->mount_args->mount_timeout * HZ);
2909
2910                 /* tear down remaining requests */
2911                 mutex_lock(&mdsc->mutex);
2912                 while ((req = __get_oldest_req(mdsc))) {
2913                         dout("wait_requests timed out on tid %llu\n",
2914                              req->r_tid);
2915                         __unregister_request(mdsc, req);
2916                 }
2917         }
2918         mutex_unlock(&mdsc->mutex);
2919         dout("wait_requests done\n");
2920 }
2921
2922 /*
2923  * called before mount is ro, and before dentries are torn down.
2924  * (hmm, does this still race with new lookups?)
2925  */
2926 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
2927 {
2928         dout("pre_umount\n");
2929         mdsc->stopping = 1;
2930
2931         drop_leases(mdsc);
2932         ceph_flush_dirty_caps(mdsc);
2933         wait_requests(mdsc);
2934
2935         /*
2936          * wait for reply handlers to drop their request refs and
2937          * their inode/dcache refs
2938          */
2939         ceph_msgr_flush();
2940 }
2941
2942 /*
2943  * wait for all unsafe (write) mds requests up to want_tid to complete.
2944  */
2945 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
2946 {
2947         struct ceph_mds_request *req = NULL, *nextreq;
2948         struct rb_node *n;
2949
2950         mutex_lock(&mdsc->mutex);
2951         dout("wait_unsafe_requests want %llu\n", want_tid);
2952 restart:
2953         req = __get_oldest_req(mdsc);
2954         while (req && req->r_tid <= want_tid) {
2955                 /* find next request */
2956                 n = rb_next(&req->r_node);
2957                 if (n)
2958                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
2959                 else
2960                         nextreq = NULL;
2961                 if ((req->r_op & CEPH_MDS_OP_WRITE)) {
2962                         /* write op */
2963                         ceph_mdsc_get_request(req);
2964                         if (nextreq)
2965                                 ceph_mdsc_get_request(nextreq);
2966                         mutex_unlock(&mdsc->mutex);
2967                         dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
2968                              req->r_tid, want_tid);
2969                         wait_for_completion(&req->r_safe_completion);
2970                         mutex_lock(&mdsc->mutex);
2971                         ceph_mdsc_put_request(req);
2972                         if (!nextreq)
2973                                 break;  /* next did not exist before we slept; done */
2974                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
2975                                 /* next request was removed from tree */
2976                                 ceph_mdsc_put_request(nextreq);
2977                                 goto restart;
2978                         }
2979                         ceph_mdsc_put_request(nextreq);  /* still in tree; won't go away */
2980                 }
2981                 req = nextreq;
2982         }
2983         mutex_unlock(&mdsc->mutex);
2984         dout("wait_unsafe_requests done\n");
2985 }
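
/*
 * Editor's note: the loop above illustrates a common pattern for
 * sleeping mid-walk over a mutex-protected rbtree: pin the *next*
 * node with a reference before dropping the lock, and on relock use
 * RB_EMPTY_NODE() to detect that it was unregistered while we slept.
 * In sketch form (names as in wait_unsafe_requests above):
 *
 *      ceph_mdsc_get_request(nextreq);
 *      mutex_unlock(&mdsc->mutex);
 *      wait_for_completion(&req->r_safe_completion);
 *      mutex_lock(&mdsc->mutex);
 *      if (RB_EMPTY_NODE(&nextreq->r_node)) {
 *              ceph_mdsc_put_request(nextreq);
 *              goto restart;                    [rescan from oldest]
 *      }
 *      ceph_mdsc_put_request(nextreq);          [still in the tree]
 */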
2986
2987 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
2988 {
2989         u64 want_tid, want_flush;
2990
2991         if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
2992                 return;
2993
2994         dout("sync\n");
2995         mutex_lock(&mdsc->mutex);
2996         want_tid = mdsc->last_tid;
2997         want_flush = mdsc->cap_flush_seq;
2998         mutex_unlock(&mdsc->mutex);
2999         dout("sync want tid %llu flush_seq %llu\n", want_tid, want_flush);
3000
3001         ceph_flush_dirty_caps(mdsc);
3002
3003         wait_unsafe_requests(mdsc, want_tid);
3004         wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3005 }
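
/*
 * Editor's note: sync snapshots last_tid and cap_flush_seq under
 * mdsc->mutex and then waits only for work issued before this call;
 * requests and cap flushes started afterwards are intentionally not
 * waited for.
 */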
3006
3007
3008 /*
3009  * called after the superblock has gone read-only.
3010  */
3011 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3012 {
3013         struct ceph_mds_session *session;
3014         int i;
3015         int n;
3016         struct ceph_client *client = mdsc->client;
3017         unsigned long started, timeout = client->mount_args->mount_timeout * HZ;
3018
3019         dout("close_sessions\n");
3020
3021         mutex_lock(&mdsc->mutex);
3022
3023         /* close sessions */
3024         started = jiffies;
3025         while (time_before(jiffies, started + timeout)) {
3026                 dout("closing sessions\n");
3027                 n = 0;
3028                 for (i = 0; i < mdsc->max_sessions; i++) {
3029                         session = __ceph_lookup_mds_session(mdsc, i);
3030                         if (!session)
3031                                 continue;
3032                         mutex_unlock(&mdsc->mutex);
3033                         mutex_lock(&session->s_mutex);
3034                         __close_session(mdsc, session);
3035                         mutex_unlock(&session->s_mutex);
3036                         ceph_put_mds_session(session);
3037                         mutex_lock(&mdsc->mutex);
3038                         n++;
3039                 }
3040                 if (n == 0)
3041                         break;
3042
3043                 if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
3044                         break;
3045
3046                 dout("waiting for sessions to close\n");
3047                 mutex_unlock(&mdsc->mutex);
3048                 wait_for_completion_timeout(&mdsc->session_close_waiters,
3049                                             timeout);
3050                 mutex_lock(&mdsc->mutex);
3051         }
3052
3053         /* tear down remaining sessions */
3054         for (i = 0; i < mdsc->max_sessions; i++) {
3055                 if (mdsc->sessions[i]) {
3056                         session = get_session(mdsc->sessions[i]);
3057                         __unregister_session(mdsc, session);
3058                         mutex_unlock(&mdsc->mutex);
3059                         mutex_lock(&session->s_mutex);
3060                         remove_session_caps(session);
3061                         mutex_unlock(&session->s_mutex);
3062                         ceph_put_mds_session(session);
3063                         mutex_lock(&mdsc->mutex);
3064                 }
3065         }
3066
3067         WARN_ON(!list_empty(&mdsc->cap_delay_list));
3068
3069         mutex_unlock(&mdsc->mutex);
3070
3071         ceph_cleanup_empty_realms(mdsc);
3072
3073         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3074
3075         dout("stopped\n");
3076 }
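
/*
 * Editor's note: in both loops above, mdsc->mutex is dropped before a
 * session's s_mutex is taken, so the two are never held together; the
 * reference obtained via __ceph_lookup_mds_session()/get_session()
 * keeps the session alive across the unlocked window.
 */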
3077
3078 void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3079 {
3080         dout("stop\n");
3081         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3082         if (mdsc->mdsmap)
3083                 ceph_mdsmap_destroy(mdsc->mdsmap);
3084         kfree(mdsc->sessions);
3085         ceph_caps_finalize(mdsc);
3086 }
3087
3088
3089 /*
3090  * handle mds map update.
3091  */
3092 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3093 {
3094         u32 epoch;
3095         u32 maplen;
3096         void *p = msg->front.iov_base;
3097         void *end = p + msg->front.iov_len;
3098         struct ceph_mdsmap *newmap, *oldmap;
3099         struct ceph_fsid fsid;
3100         int err = -EINVAL;
3101
3102         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3103         ceph_decode_copy(&p, &fsid, sizeof(fsid));
3104         if (ceph_check_fsid(mdsc->client, &fsid) < 0)
3105                 return;
3106         epoch = ceph_decode_32(&p);
3107         maplen = ceph_decode_32(&p);
3108         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3109
3110         /* do we need it? */
3111         ceph_monc_got_mdsmap(&mdsc->client->monc, epoch);
3112         mutex_lock(&mdsc->mutex);
3113         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3114                 dout("handle_map epoch %u <= our %u\n",
3115                      epoch, mdsc->mdsmap->m_epoch);
3116                 mutex_unlock(&mdsc->mutex);
3117                 return;
3118         }
3119
3120         newmap = ceph_mdsmap_decode(&p, end);
3121         if (IS_ERR(newmap)) {
3122                 err = PTR_ERR(newmap);
3123                 goto bad_unlock;
3124         }
3125
3126         /* swap into place */
3127         if (mdsc->mdsmap) {
3128                 oldmap = mdsc->mdsmap;
3129                 mdsc->mdsmap = newmap;
3130                 check_new_map(mdsc, newmap, oldmap);
3131                 ceph_mdsmap_destroy(oldmap);
3132         } else {
3133                 mdsc->mdsmap = newmap;  /* first mds map */
3134         }
3135         mdsc->client->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3136
3137         __wake_requests(mdsc, &mdsc->waiting_for_map);
3138
3139         mutex_unlock(&mdsc->mutex);
3140         schedule_delayed(mdsc);
3141         return;
3142
3143 bad_unlock:
3144         mutex_unlock(&mdsc->mutex);
3145 bad:
3146         pr_err("error decoding mdsmap %d\n", err);
3147         return;
3148 }
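
/*
 * Editor's note: ceph_decode_need() bounds-checks against 'end' and
 * jumps to the given label on a short buffer, which is why a 'bad'
 * label must be in scope; the plain ceph_decode_32() reads assume the
 * check was already done.  The same pattern in miniature:
 *
 *      ceph_decode_need(&p, end, 2*sizeof(u32), bad);
 *      epoch  = ceph_decode_32(&p);
 *      maplen = ceph_decode_32(&p);
 */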
3149
3150 static struct ceph_connection *con_get(struct ceph_connection *con)
3151 {
3152         struct ceph_mds_session *s = con->private;
3153
3154         if (get_session(s)) {
3155                 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3156                 return con;
3157         }
3158         dout("mdsc con_get %p FAIL\n", s);
3159         return NULL;
3160 }
3161
3162 static void con_put(struct ceph_connection *con)
3163 {
3164         struct ceph_mds_session *s = con->private;
3165
3166         ceph_put_mds_session(s);
3167         dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref));
3168 }
3169
3170 /*
3171  * if the client is unresponsive for long enough, the mds will kill
3172  * the session; on the ensuing connection reset, try to reconnect.
3173  */
3174 static void peer_reset(struct ceph_connection *con)
3175 {
3176         struct ceph_mds_session *s = con->private;
3177         struct ceph_mds_client *mdsc = s->s_mdsc;
3178
3179         pr_warning("mds%d closed our session\n", s->s_mds);
3180         send_mds_reconnect(mdsc, s);
3181 }
3182
3183 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3184 {
3185         struct ceph_mds_session *s = con->private;
3186         struct ceph_mds_client *mdsc = s->s_mdsc;
3187         int type = le16_to_cpu(msg->hdr.type);
3188
3189         mutex_lock(&mdsc->mutex);
3190         if (__verify_registered_session(mdsc, s) < 0) {
3191                 mutex_unlock(&mdsc->mutex);
3192                 goto out;
3193         }
3194         mutex_unlock(&mdsc->mutex);
3195
3196         switch (type) {
3197         case CEPH_MSG_MDS_MAP:
3198                 ceph_mdsc_handle_map(mdsc, msg);
3199                 break;
3200         case CEPH_MSG_CLIENT_SESSION:
3201                 handle_session(s, msg);
3202                 break;
3203         case CEPH_MSG_CLIENT_REPLY:
3204                 handle_reply(s, msg);
3205                 break;
3206         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3207                 handle_forward(mdsc, s, msg);
3208                 break;
3209         case CEPH_MSG_CLIENT_CAPS:
3210                 ceph_handle_caps(s, msg);
3211                 break;
3212         case CEPH_MSG_CLIENT_SNAP:
3213                 ceph_handle_snap(mdsc, s, msg);
3214                 break;
3215         case CEPH_MSG_CLIENT_LEASE:
3216                 handle_lease(mdsc, s, msg);
3217                 break;
3218
3219         default:
3220                 pr_err("received unknown message type %d %s\n", type,
3221                        ceph_msg_type_name(type));
3222         }
3223 out:
3224         ceph_msg_put(msg);
3225 }
3226
3227 /*
3228  * authentication
3229  */
3230 static int get_authorizer(struct ceph_connection *con,
3231                           void **buf, int *len, int *proto,
3232                           void **reply_buf, int *reply_len, int force_new)
3233 {
3234         struct ceph_mds_session *s = con->private;
3235         struct ceph_mds_client *mdsc = s->s_mdsc;
3236         struct ceph_auth_client *ac = mdsc->client->monc.auth;
3237         int ret = 0;
3238
3239         if (force_new && s->s_authorizer) {
3240                 ac->ops->destroy_authorizer(ac, s->s_authorizer);
3241                 s->s_authorizer = NULL;
3242         }
3243         if (s->s_authorizer == NULL) {
3244                 if (ac->ops->create_authorizer) {
3245                         ret = ac->ops->create_authorizer(
3246                                 ac, CEPH_ENTITY_TYPE_MDS,
3247                                 &s->s_authorizer,
3248                                 &s->s_authorizer_buf,
3249                                 &s->s_authorizer_buf_len,
3250                                 &s->s_authorizer_reply_buf,
3251                                 &s->s_authorizer_reply_buf_len);
3252                         if (ret)
3253                                 return ret;
3254                 }
3255         }
3256
3257         *proto = ac->protocol;
3258         *buf = s->s_authorizer_buf;
3259         *len = s->s_authorizer_buf_len;
3260         *reply_buf = s->s_authorizer_reply_buf;
3261         *reply_len = s->s_authorizer_reply_buf_len;
3262         return 0;
3263 }
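
/*
 * Editor's note: the authorizer is built lazily on first use and
 * cached on the session; force_new (set by the messenger when it
 * retries after an authentication failure) throws the cached one
 * away so a fresh ticket is obtained on the next attempt.
 */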
3264
3265
3266 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3267 {
3268         struct ceph_mds_session *s = con->private;
3269         struct ceph_mds_client *mdsc = s->s_mdsc;
3270         struct ceph_auth_client *ac = mdsc->client->monc.auth;
3271
3272         return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
3273 }
3274
3275 static int invalidate_authorizer(struct ceph_connection *con)
3276 {
3277         struct ceph_mds_session *s = con->private;
3278         struct ceph_mds_client *mdsc = s->s_mdsc;
3279         struct ceph_auth_client *ac = mdsc->client->monc.auth;
3280
3281         if (ac->ops->invalidate_authorizer)
3282                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3283
3284         return ceph_monc_validate_auth(&mdsc->client->monc);
3285 }
3286
3287 static const struct ceph_connection_operations mds_con_ops = {
3288         .get = con_get,
3289         .put = con_put,
3290         .dispatch = dispatch,
3291         .get_authorizer = get_authorizer,
3292         .verify_authorizer_reply = verify_authorizer_reply,
3293         .invalidate_authorizer = invalidate_authorizer,
3294         .peer_reset = peer_reset,
3295 };
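
/*
 * Editor's note: these ops are wired up when a session is registered,
 * roughly as follows (see register_session earlier in this file):
 *
 *      ceph_con_init(mdsc->client->msgr, &s->s_con);
 *      s->s_con.private = s;
 *      s->s_con.ops = &mds_con_ops;
 *      s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
 */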
3296
3297
3298
3299
3300 /* eof */