]> git.karo-electronics.de Git - karo-tx-linux.git/blob - fs/ceph/mds_client.c
ceph: don't call truncate_pagecache in ceph_writepages_start
[karo-tx-linux.git] / fs / ceph / mds_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/fs.h>
4 #include <linux/wait.h>
5 #include <linux/slab.h>
6 #include <linux/gfp.h>
7 #include <linux/sched.h>
8 #include <linux/debugfs.h>
9 #include <linux/seq_file.h>
10 #include <linux/utsname.h>
11 #include <linux/ratelimit.h>
12
13 #include "super.h"
14 #include "mds_client.h"
15
16 #include <linux/ceph/ceph_features.h>
17 #include <linux/ceph/messenger.h>
18 #include <linux/ceph/decode.h>
19 #include <linux/ceph/pagelist.h>
20 #include <linux/ceph/auth.h>
21 #include <linux/ceph/debugfs.h>
22
23 /*
24  * A cluster of MDS (metadata server) daemons is responsible for
25  * managing the file system namespace (the directory hierarchy and
26  * inodes) and for coordinating shared access to storage.  Metadata is
27  * partitioned hierarchically across a number of servers, and that
28  * partition varies over time as the cluster adjusts the distribution
29  * in order to balance load.
30  *
31  * The MDS client is primarily responsible for managing synchronous
32  * metadata requests for operations like open, unlink, and so forth.
33  * If there is a MDS failure, we find out about it when we (possibly
34  * request and) receive a new MDS map, and can resubmit affected
35  * requests.
36  *
37  * For the most part, though, we take advantage of a lossless
38  * communications channel to the MDS, and do not need to worry about
39  * timing out or resubmitting requests.
40  *
41  * We maintain a stateful "session" with each MDS we interact with.
42  * Within each session, we send periodic heartbeat messages to ensure
43  * any capabilities or leases we have been issued remain valid.  If
44  * the session times out and goes stale, our leases and capabilities
45  * are no longer valid.
46  */
47
/*
 * Bookkeeping bundle for a reconnect.
 *
 * NOTE(review): the code that uses this struct is outside the visible
 * part of the file; the per-field notes below are inferred from the
 * field names only -- confirm against the users.
 */
struct ceph_reconnect_state {
        int nr_caps;                    /* presumably a running count of caps */
        struct ceph_pagelist *pagelist; /* presumably the buffer being filled */
        bool flock;                     /* presumably: include file-lock state? */
};
53
/*
 * Forward declaration; the definition is not in the visible part of
 * the file (presumably further down -- confirm).
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);

/* Connection operations table; register_session() passes it to ceph_con_init(). */
static const struct ceph_connection_operations mds_con_ops;
58
59
60 /*
61  * mds reply parsing
62  */
63
64 /*
65  * parse individual inode info
66  */
67 static int parse_reply_info_in(void **p, void *end,
68                                struct ceph_mds_reply_info_in *info,
69                                u64 features)
70 {
71         int err = -EIO;
72
73         info->in = *p;
74         *p += sizeof(struct ceph_mds_reply_inode) +
75                 sizeof(*info->in->fragtree.splits) *
76                 le32_to_cpu(info->in->fragtree.nsplits);
77
78         ceph_decode_32_safe(p, end, info->symlink_len, bad);
79         ceph_decode_need(p, end, info->symlink_len, bad);
80         info->symlink = *p;
81         *p += info->symlink_len;
82
83         if (features & CEPH_FEATURE_DIRLAYOUTHASH)
84                 ceph_decode_copy_safe(p, end, &info->dir_layout,
85                                       sizeof(info->dir_layout), bad);
86         else
87                 memset(&info->dir_layout, 0, sizeof(info->dir_layout));
88
89         ceph_decode_32_safe(p, end, info->xattr_len, bad);
90         ceph_decode_need(p, end, info->xattr_len, bad);
91         info->xattr_data = *p;
92         *p += info->xattr_len;
93
94         if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
95                 ceph_decode_64_safe(p, end, info->inline_version, bad);
96                 ceph_decode_32_safe(p, end, info->inline_len, bad);
97                 ceph_decode_need(p, end, info->inline_len, bad);
98                 info->inline_data = *p;
99                 *p += info->inline_len;
100         } else
101                 info->inline_version = CEPH_INLINE_NONE;
102
103         if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
104                 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
105                 ceph_decode_need(p, end, info->pool_ns_len, bad);
106                 *p += info->pool_ns_len;
107         } else {
108                 info->pool_ns_len = 0;
109         }
110
111         return 0;
112 bad:
113         return err;
114 }
115
116 /*
117  * parse a normal reply, which may contain a (dir+)dentry and/or a
118  * target inode.
119  */
120 static int parse_reply_info_trace(void **p, void *end,
121                                   struct ceph_mds_reply_info_parsed *info,
122                                   u64 features)
123 {
124         int err;
125
126         if (info->head->is_dentry) {
127                 err = parse_reply_info_in(p, end, &info->diri, features);
128                 if (err < 0)
129                         goto out_bad;
130
131                 if (unlikely(*p + sizeof(*info->dirfrag) > end))
132                         goto bad;
133                 info->dirfrag = *p;
134                 *p += sizeof(*info->dirfrag) +
135                         sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
136                 if (unlikely(*p > end))
137                         goto bad;
138
139                 ceph_decode_32_safe(p, end, info->dname_len, bad);
140                 ceph_decode_need(p, end, info->dname_len, bad);
141                 info->dname = *p;
142                 *p += info->dname_len;
143                 info->dlease = *p;
144                 *p += sizeof(*info->dlease);
145         }
146
147         if (info->head->is_target) {
148                 err = parse_reply_info_in(p, end, &info->targeti, features);
149                 if (err < 0)
150                         goto out_bad;
151         }
152
153         if (unlikely(*p != end))
154                 goto bad;
155         return 0;
156
157 bad:
158         err = -EIO;
159 out_bad:
160         pr_err("problem parsing mds trace %d\n", err);
161         return err;
162 }
163
164 /*
165  * parse readdir results
166  */
167 static int parse_reply_info_dir(void **p, void *end,
168                                 struct ceph_mds_reply_info_parsed *info,
169                                 u64 features)
170 {
171         u32 num, i = 0;
172         int err;
173
174         info->dir_dir = *p;
175         if (*p + sizeof(*info->dir_dir) > end)
176                 goto bad;
177         *p += sizeof(*info->dir_dir) +
178                 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
179         if (*p > end)
180                 goto bad;
181
182         ceph_decode_need(p, end, sizeof(num) + 2, bad);
183         num = ceph_decode_32(p);
184         info->dir_end = ceph_decode_8(p);
185         info->dir_complete = ceph_decode_8(p);
186         if (num == 0)
187                 goto done;
188
189         BUG_ON(!info->dir_in);
190         info->dir_dname = (void *)(info->dir_in + num);
191         info->dir_dname_len = (void *)(info->dir_dname + num);
192         info->dir_dlease = (void *)(info->dir_dname_len + num);
193         if ((unsigned long)(info->dir_dlease + num) >
194             (unsigned long)info->dir_in + info->dir_buf_size) {
195                 pr_err("dir contents are larger than expected\n");
196                 WARN_ON(1);
197                 goto bad;
198         }
199
200         info->dir_nr = num;
201         while (num) {
202                 /* dentry */
203                 ceph_decode_need(p, end, sizeof(u32)*2, bad);
204                 info->dir_dname_len[i] = ceph_decode_32(p);
205                 ceph_decode_need(p, end, info->dir_dname_len[i], bad);
206                 info->dir_dname[i] = *p;
207                 *p += info->dir_dname_len[i];
208                 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
209                      info->dir_dname[i]);
210                 info->dir_dlease[i] = *p;
211                 *p += sizeof(struct ceph_mds_reply_lease);
212
213                 /* inode */
214                 err = parse_reply_info_in(p, end, &info->dir_in[i], features);
215                 if (err < 0)
216                         goto out_bad;
217                 i++;
218                 num--;
219         }
220
221 done:
222         if (*p != end)
223                 goto bad;
224         return 0;
225
226 bad:
227         err = -EIO;
228 out_bad:
229         pr_err("problem parsing dir contents %d\n", err);
230         return err;
231 }
232
233 /*
234  * parse fcntl F_GETLK results
235  */
236 static int parse_reply_info_filelock(void **p, void *end,
237                                      struct ceph_mds_reply_info_parsed *info,
238                                      u64 features)
239 {
240         if (*p + sizeof(*info->filelock_reply) > end)
241                 goto bad;
242
243         info->filelock_reply = *p;
244         *p += sizeof(*info->filelock_reply);
245
246         if (unlikely(*p != end))
247                 goto bad;
248         return 0;
249
250 bad:
251         return -EIO;
252 }
253
254 /*
255  * parse create results
256  */
257 static int parse_reply_info_create(void **p, void *end,
258                                   struct ceph_mds_reply_info_parsed *info,
259                                   u64 features)
260 {
261         if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
262                 if (*p == end) {
263                         info->has_create_ino = false;
264                 } else {
265                         info->has_create_ino = true;
266                         info->ino = ceph_decode_64(p);
267                 }
268         }
269
270         if (unlikely(*p != end))
271                 goto bad;
272         return 0;
273
274 bad:
275         return -EIO;
276 }
277
278 /*
279  * parse extra results
280  */
281 static int parse_reply_info_extra(void **p, void *end,
282                                   struct ceph_mds_reply_info_parsed *info,
283                                   u64 features)
284 {
285         if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
286                 return parse_reply_info_filelock(p, end, info, features);
287         else if (info->head->op == CEPH_MDS_OP_READDIR ||
288                  info->head->op == CEPH_MDS_OP_LSSNAP)
289                 return parse_reply_info_dir(p, end, info, features);
290         else if (info->head->op == CEPH_MDS_OP_CREATE)
291                 return parse_reply_info_create(p, end, info, features);
292         else
293                 return -EIO;
294 }
295
296 /*
297  * parse entire mds reply
298  */
299 static int parse_reply_info(struct ceph_msg *msg,
300                             struct ceph_mds_reply_info_parsed *info,
301                             u64 features)
302 {
303         void *p, *end;
304         u32 len;
305         int err;
306
307         info->head = msg->front.iov_base;
308         p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
309         end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
310
311         /* trace */
312         ceph_decode_32_safe(&p, end, len, bad);
313         if (len > 0) {
314                 ceph_decode_need(&p, end, len, bad);
315                 err = parse_reply_info_trace(&p, p+len, info, features);
316                 if (err < 0)
317                         goto out_bad;
318         }
319
320         /* extra */
321         ceph_decode_32_safe(&p, end, len, bad);
322         if (len > 0) {
323                 ceph_decode_need(&p, end, len, bad);
324                 err = parse_reply_info_extra(&p, p+len, info, features);
325                 if (err < 0)
326                         goto out_bad;
327         }
328
329         /* snap blob */
330         ceph_decode_32_safe(&p, end, len, bad);
331         info->snapblob_len = len;
332         info->snapblob = p;
333         p += len;
334
335         if (p != end)
336                 goto bad;
337         return 0;
338
339 bad:
340         err = -EIO;
341 out_bad:
342         pr_err("mds parse_reply err %d\n", err);
343         return err;
344 }
345
346 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
347 {
348         if (!info->dir_in)
349                 return;
350         free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
351 }
352
353
354 /*
355  * sessions
356  */
357 const char *ceph_session_state_name(int s)
358 {
359         switch (s) {
360         case CEPH_MDS_SESSION_NEW: return "new";
361         case CEPH_MDS_SESSION_OPENING: return "opening";
362         case CEPH_MDS_SESSION_OPEN: return "open";
363         case CEPH_MDS_SESSION_HUNG: return "hung";
364         case CEPH_MDS_SESSION_CLOSING: return "closing";
365         case CEPH_MDS_SESSION_RESTARTING: return "restarting";
366         case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
367         default: return "???";
368         }
369 }
370
371 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
372 {
373         if (atomic_inc_not_zero(&s->s_ref)) {
374                 dout("mdsc get_session %p %d -> %d\n", s,
375                      atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
376                 return s;
377         } else {
378                 dout("mdsc get_session %p 0 -- FAIL", s);
379                 return NULL;
380         }
381 }
382
383 void ceph_put_mds_session(struct ceph_mds_session *s)
384 {
385         dout("mdsc put_session %p %d -> %d\n", s,
386              atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
387         if (atomic_dec_and_test(&s->s_ref)) {
388                 if (s->s_auth.authorizer)
389                         ceph_auth_destroy_authorizer(s->s_auth.authorizer);
390                 kfree(s);
391         }
392 }
393
394 /*
395  * called under mdsc->mutex
396  */
397 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
398                                                    int mds)
399 {
400         struct ceph_mds_session *session;
401
402         if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
403                 return NULL;
404         session = mdsc->sessions[mds];
405         dout("lookup_mds_session %p %d\n", session,
406              atomic_read(&session->s_ref));
407         get_session(session);
408         return session;
409 }
410
411 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
412 {
413         if (mds >= mdsc->max_sessions)
414                 return false;
415         return mdsc->sessions[mds];
416 }
417
418 static int __verify_registered_session(struct ceph_mds_client *mdsc,
419                                        struct ceph_mds_session *s)
420 {
421         if (s->s_mds >= mdsc->max_sessions ||
422             mdsc->sessions[s->s_mds] != s)
423                 return -ENOENT;
424         return 0;
425 }
426
427 /*
428  * create+register a new session for given mds.
429  * called under mdsc->mutex.
430  */
431 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
432                                                  int mds)
433 {
434         struct ceph_mds_session *s;
435
436         if (mds >= mdsc->mdsmap->m_max_mds)
437                 return ERR_PTR(-EINVAL);
438
439         s = kzalloc(sizeof(*s), GFP_NOFS);
440         if (!s)
441                 return ERR_PTR(-ENOMEM);
442         s->s_mdsc = mdsc;
443         s->s_mds = mds;
444         s->s_state = CEPH_MDS_SESSION_NEW;
445         s->s_ttl = 0;
446         s->s_seq = 0;
447         mutex_init(&s->s_mutex);
448
449         ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
450
451         spin_lock_init(&s->s_gen_ttl_lock);
452         s->s_cap_gen = 0;
453         s->s_cap_ttl = jiffies - 1;
454
455         spin_lock_init(&s->s_cap_lock);
456         s->s_renew_requested = 0;
457         s->s_renew_seq = 0;
458         INIT_LIST_HEAD(&s->s_caps);
459         s->s_nr_caps = 0;
460         s->s_trim_caps = 0;
461         atomic_set(&s->s_ref, 1);
462         INIT_LIST_HEAD(&s->s_waiting);
463         INIT_LIST_HEAD(&s->s_unsafe);
464         s->s_num_cap_releases = 0;
465         s->s_cap_reconnect = 0;
466         s->s_cap_iterator = NULL;
467         INIT_LIST_HEAD(&s->s_cap_releases);
468         INIT_LIST_HEAD(&s->s_cap_flushing);
469         INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
470
471         dout("register_session mds%d\n", mds);
472         if (mds >= mdsc->max_sessions) {
473                 int newmax = 1 << get_count_order(mds+1);
474                 struct ceph_mds_session **sa;
475
476                 dout("register_session realloc to %d\n", newmax);
477                 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
478                 if (sa == NULL)
479                         goto fail_realloc;
480                 if (mdsc->sessions) {
481                         memcpy(sa, mdsc->sessions,
482                                mdsc->max_sessions * sizeof(void *));
483                         kfree(mdsc->sessions);
484                 }
485                 mdsc->sessions = sa;
486                 mdsc->max_sessions = newmax;
487         }
488         mdsc->sessions[mds] = s;
489         atomic_inc(&mdsc->num_sessions);
490         atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
491
492         ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
493                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
494
495         return s;
496
497 fail_realloc:
498         kfree(s);
499         return ERR_PTR(-ENOMEM);
500 }
501
502 /*
503  * called under mdsc->mutex
504  */
505 static void __unregister_session(struct ceph_mds_client *mdsc,
506                                struct ceph_mds_session *s)
507 {
508         dout("__unregister_session mds%d %p\n", s->s_mds, s);
509         BUG_ON(mdsc->sessions[s->s_mds] != s);
510         mdsc->sessions[s->s_mds] = NULL;
511         ceph_con_close(&s->s_con);
512         ceph_put_mds_session(s);
513         atomic_dec(&mdsc->num_sessions);
514 }
515
516 /*
517  * drop session refs in request.
518  *
519  * should be last request ref, or hold mdsc->mutex
520  */
521 static void put_request_session(struct ceph_mds_request *req)
522 {
523         if (req->r_session) {
524                 ceph_put_mds_session(req->r_session);
525                 req->r_session = NULL;
526         }
527 }
528
529 void ceph_mdsc_release_request(struct kref *kref)
530 {
531         struct ceph_mds_request *req = container_of(kref,
532                                                     struct ceph_mds_request,
533                                                     r_kref);
534         destroy_reply_info(&req->r_reply_info);
535         if (req->r_request)
536                 ceph_msg_put(req->r_request);
537         if (req->r_reply)
538                 ceph_msg_put(req->r_reply);
539         if (req->r_inode) {
540                 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
541                 iput(req->r_inode);
542         }
543         if (req->r_locked_dir)
544                 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
545         iput(req->r_target_inode);
546         if (req->r_dentry)
547                 dput(req->r_dentry);
548         if (req->r_old_dentry)
549                 dput(req->r_old_dentry);
550         if (req->r_old_dentry_dir) {
551                 /*
552                  * track (and drop pins for) r_old_dentry_dir
553                  * separately, since r_old_dentry's d_parent may have
554                  * changed between the dir mutex being dropped and
555                  * this request being freed.
556                  */
557                 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
558                                   CEPH_CAP_PIN);
559                 iput(req->r_old_dentry_dir);
560         }
561         kfree(req->r_path1);
562         kfree(req->r_path2);
563         if (req->r_pagelist)
564                 ceph_pagelist_release(req->r_pagelist);
565         put_request_session(req);
566         ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
567         kfree(req);
568 }
569
570 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
571
572 /*
573  * lookup session, bump ref if found.
574  *
575  * called under mdsc->mutex.
576  */
577 static struct ceph_mds_request *
578 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
579 {
580         struct ceph_mds_request *req;
581
582         req = lookup_request(&mdsc->request_tree, tid);
583         if (req)
584                 ceph_mdsc_get_request(req);
585
586         return req;
587 }
588
589 /*
590  * Register an in-flight request, and assign a tid.  Link to directory
591  * are modifying (if any).
592  *
593  * Called under mdsc->mutex.
594  */
595 static void __register_request(struct ceph_mds_client *mdsc,
596                                struct ceph_mds_request *req,
597                                struct inode *dir)
598 {
599         req->r_tid = ++mdsc->last_tid;
600         if (req->r_num_caps)
601                 ceph_reserve_caps(mdsc, &req->r_caps_reservation,
602                                   req->r_num_caps);
603         dout("__register_request %p tid %lld\n", req, req->r_tid);
604         ceph_mdsc_get_request(req);
605         insert_request(&mdsc->request_tree, req);
606
607         req->r_uid = current_fsuid();
608         req->r_gid = current_fsgid();
609
610         if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
611                 mdsc->oldest_tid = req->r_tid;
612
613         if (dir) {
614                 ihold(dir);
615                 req->r_unsafe_dir = dir;
616         }
617 }
618
619 static void __unregister_request(struct ceph_mds_client *mdsc,
620                                  struct ceph_mds_request *req)
621 {
622         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
623
624         if (req->r_tid == mdsc->oldest_tid) {
625                 struct rb_node *p = rb_next(&req->r_node);
626                 mdsc->oldest_tid = 0;
627                 while (p) {
628                         struct ceph_mds_request *next_req =
629                                 rb_entry(p, struct ceph_mds_request, r_node);
630                         if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
631                                 mdsc->oldest_tid = next_req->r_tid;
632                                 break;
633                         }
634                         p = rb_next(p);
635                 }
636         }
637
638         erase_request(&mdsc->request_tree, req);
639
640         if (req->r_unsafe_dir && req->r_got_unsafe) {
641                 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
642                 spin_lock(&ci->i_unsafe_lock);
643                 list_del_init(&req->r_unsafe_dir_item);
644                 spin_unlock(&ci->i_unsafe_lock);
645         }
646         if (req->r_target_inode && req->r_got_unsafe) {
647                 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
648                 spin_lock(&ci->i_unsafe_lock);
649                 list_del_init(&req->r_unsafe_target_item);
650                 spin_unlock(&ci->i_unsafe_lock);
651         }
652
653         if (req->r_unsafe_dir) {
654                 iput(req->r_unsafe_dir);
655                 req->r_unsafe_dir = NULL;
656         }
657
658         complete_all(&req->r_safe_completion);
659
660         ceph_mdsc_put_request(req);
661 }
662
663 /*
664  * Choose mds to send request to next.  If there is a hint set in the
665  * request (e.g., due to a prior forward hint from the mds), use that.
666  * Otherwise, consult frag tree and/or caps to identify the
667  * appropriate mds.  If all else fails, choose randomly.
668  *
669  * Called under mdsc->mutex.
670  */
671 static struct dentry *get_nonsnap_parent(struct dentry *dentry)
672 {
673         /*
674          * we don't need to worry about protecting the d_parent access
675          * here because we never renaming inside the snapped namespace
676          * except to resplice to another snapdir, and either the old or new
677          * result is a valid result.
678          */
679         while (!IS_ROOT(dentry) && ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
680                 dentry = dentry->d_parent;
681         return dentry;
682 }
683
684 static int __choose_mds(struct ceph_mds_client *mdsc,
685                         struct ceph_mds_request *req)
686 {
687         struct inode *inode;
688         struct ceph_inode_info *ci;
689         struct ceph_cap *cap;
690         int mode = req->r_direct_mode;
691         int mds = -1;
692         u32 hash = req->r_direct_hash;
693         bool is_hash = req->r_direct_is_hash;
694
695         /*
696          * is there a specific mds we should try?  ignore hint if we have
697          * no session and the mds is not up (active or recovering).
698          */
699         if (req->r_resend_mds >= 0 &&
700             (__have_session(mdsc, req->r_resend_mds) ||
701              ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
702                 dout("choose_mds using resend_mds mds%d\n",
703                      req->r_resend_mds);
704                 return req->r_resend_mds;
705         }
706
707         if (mode == USE_RANDOM_MDS)
708                 goto random;
709
710         inode = NULL;
711         if (req->r_inode) {
712                 inode = req->r_inode;
713         } else if (req->r_dentry) {
714                 /* ignore race with rename; old or new d_parent is okay */
715                 struct dentry *parent = req->r_dentry->d_parent;
716                 struct inode *dir = d_inode(parent);
717
718                 if (dir->i_sb != mdsc->fsc->sb) {
719                         /* not this fs! */
720                         inode = d_inode(req->r_dentry);
721                 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
722                         /* direct snapped/virtual snapdir requests
723                          * based on parent dir inode */
724                         struct dentry *dn = get_nonsnap_parent(parent);
725                         inode = d_inode(dn);
726                         dout("__choose_mds using nonsnap parent %p\n", inode);
727                 } else {
728                         /* dentry target */
729                         inode = d_inode(req->r_dentry);
730                         if (!inode || mode == USE_AUTH_MDS) {
731                                 /* dir + name */
732                                 inode = dir;
733                                 hash = ceph_dentry_hash(dir, req->r_dentry);
734                                 is_hash = true;
735                         }
736                 }
737         }
738
739         dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
740              (int)hash, mode);
741         if (!inode)
742                 goto random;
743         ci = ceph_inode(inode);
744
745         if (is_hash && S_ISDIR(inode->i_mode)) {
746                 struct ceph_inode_frag frag;
747                 int found;
748
749                 ceph_choose_frag(ci, hash, &frag, &found);
750                 if (found) {
751                         if (mode == USE_ANY_MDS && frag.ndist > 0) {
752                                 u8 r;
753
754                                 /* choose a random replica */
755                                 get_random_bytes(&r, 1);
756                                 r %= frag.ndist;
757                                 mds = frag.dist[r];
758                                 dout("choose_mds %p %llx.%llx "
759                                      "frag %u mds%d (%d/%d)\n",
760                                      inode, ceph_vinop(inode),
761                                      frag.frag, mds,
762                                      (int)r, frag.ndist);
763                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
764                                     CEPH_MDS_STATE_ACTIVE)
765                                         return mds;
766                         }
767
768                         /* since this file/dir wasn't known to be
769                          * replicated, then we want to look for the
770                          * authoritative mds. */
771                         mode = USE_AUTH_MDS;
772                         if (frag.mds >= 0) {
773                                 /* choose auth mds */
774                                 mds = frag.mds;
775                                 dout("choose_mds %p %llx.%llx "
776                                      "frag %u mds%d (auth)\n",
777                                      inode, ceph_vinop(inode), frag.frag, mds);
778                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
779                                     CEPH_MDS_STATE_ACTIVE)
780                                         return mds;
781                         }
782                 }
783         }
784
785         spin_lock(&ci->i_ceph_lock);
786         cap = NULL;
787         if (mode == USE_AUTH_MDS)
788                 cap = ci->i_auth_cap;
789         if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
790                 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
791         if (!cap) {
792                 spin_unlock(&ci->i_ceph_lock);
793                 goto random;
794         }
795         mds = cap->session->s_mds;
796         dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
797              inode, ceph_vinop(inode), mds,
798              cap == ci->i_auth_cap ? "auth " : "", cap);
799         spin_unlock(&ci->i_ceph_lock);
800         return mds;
801
802 random:
803         mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
804         dout("choose_mds chose random mds%d\n", mds);
805         return mds;
806 }
807
808
809 /*
810  * session messages
811  */
812 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
813 {
814         struct ceph_msg *msg;
815         struct ceph_mds_session_head *h;
816
817         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
818                            false);
819         if (!msg) {
820                 pr_err("create_session_msg ENOMEM creating msg\n");
821                 return NULL;
822         }
823         h = msg->front.iov_base;
824         h->op = cpu_to_le32(op);
825         h->seq = cpu_to_le64(seq);
826
827         return msg;
828 }
829
830 /*
831  * session message, specialization for CEPH_SESSION_REQUEST_OPEN
832  * to include additional client metadata fields.
833  */
834 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
835 {
836         struct ceph_msg *msg;
837         struct ceph_mds_session_head *h;
838         int i = -1;
839         int metadata_bytes = 0;
840         int metadata_key_count = 0;
841         struct ceph_options *opt = mdsc->fsc->client->options;
842         void *p;
843
844         const char* metadata[][2] = {
845                 {"hostname", utsname()->nodename},
846                 {"kernel_version", utsname()->release},
847                 {"entity_id", opt->name ? opt->name : ""},
848                 {NULL, NULL}
849         };
850
851         /* Calculate serialized length of metadata */
852         metadata_bytes = 4;  /* map length */
853         for (i = 0; metadata[i][0] != NULL; ++i) {
854                 metadata_bytes += 8 + strlen(metadata[i][0]) +
855                         strlen(metadata[i][1]);
856                 metadata_key_count++;
857         }
858
859         /* Allocate the message */
860         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
861                            GFP_NOFS, false);
862         if (!msg) {
863                 pr_err("create_session_msg ENOMEM creating msg\n");
864                 return NULL;
865         }
866         h = msg->front.iov_base;
867         h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
868         h->seq = cpu_to_le64(seq);
869
870         /*
871          * Serialize client metadata into waiting buffer space, using
872          * the format that userspace expects for map<string, string>
873          *
874          * ClientSession messages with metadata are v2
875          */
876         msg->hdr.version = cpu_to_le16(2);
877         msg->hdr.compat_version = cpu_to_le16(1);
878
879         /* The write pointer, following the session_head structure */
880         p = msg->front.iov_base + sizeof(*h);
881
882         /* Number of entries in the map */
883         ceph_encode_32(&p, metadata_key_count);
884
885         /* Two length-prefixed strings for each entry in the map */
886         for (i = 0; metadata[i][0] != NULL; ++i) {
887                 size_t const key_len = strlen(metadata[i][0]);
888                 size_t const val_len = strlen(metadata[i][1]);
889
890                 ceph_encode_32(&p, key_len);
891                 memcpy(p, metadata[i][0], key_len);
892                 p += key_len;
893                 ceph_encode_32(&p, val_len);
894                 memcpy(p, metadata[i][1], val_len);
895                 p += val_len;
896         }
897
898         return msg;
899 }
900
901 /*
902  * send session open request.
903  *
904  * called under mdsc->mutex
905  */
906 static int __open_session(struct ceph_mds_client *mdsc,
907                           struct ceph_mds_session *session)
908 {
909         struct ceph_msg *msg;
910         int mstate;
911         int mds = session->s_mds;
912
913         /* wait for mds to go active? */
914         mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
915         dout("open_session to mds%d (%s)\n", mds,
916              ceph_mds_state_name(mstate));
917         session->s_state = CEPH_MDS_SESSION_OPENING;
918         session->s_renew_requested = jiffies;
919
920         /* send connect message */
921         msg = create_session_open_msg(mdsc, session->s_seq);
922         if (!msg)
923                 return -ENOMEM;
924         ceph_con_send(&session->s_con, msg);
925         return 0;
926 }
927
928 /*
929  * open sessions for any export targets for the given mds
930  *
931  * called under mdsc->mutex
932  */
933 static struct ceph_mds_session *
934 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
935 {
936         struct ceph_mds_session *session;
937
938         session = __ceph_lookup_mds_session(mdsc, target);
939         if (!session) {
940                 session = register_session(mdsc, target);
941                 if (IS_ERR(session))
942                         return session;
943         }
944         if (session->s_state == CEPH_MDS_SESSION_NEW ||
945             session->s_state == CEPH_MDS_SESSION_CLOSING)
946                 __open_session(mdsc, session);
947
948         return session;
949 }
950
951 struct ceph_mds_session *
952 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
953 {
954         struct ceph_mds_session *session;
955
956         dout("open_export_target_session to mds%d\n", target);
957
958         mutex_lock(&mdsc->mutex);
959         session = __open_export_target_session(mdsc, target);
960         mutex_unlock(&mdsc->mutex);
961
962         return session;
963 }
964
965 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
966                                           struct ceph_mds_session *session)
967 {
968         struct ceph_mds_info *mi;
969         struct ceph_mds_session *ts;
970         int i, mds = session->s_mds;
971
972         if (mds >= mdsc->mdsmap->m_max_mds)
973                 return;
974
975         mi = &mdsc->mdsmap->m_info[mds];
976         dout("open_export_target_sessions for mds%d (%d targets)\n",
977              session->s_mds, mi->num_export_targets);
978
979         for (i = 0; i < mi->num_export_targets; i++) {
980                 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
981                 if (!IS_ERR(ts))
982                         ceph_put_mds_session(ts);
983         }
984 }
985
986 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
987                                            struct ceph_mds_session *session)
988 {
989         mutex_lock(&mdsc->mutex);
990         __open_export_target_sessions(mdsc, session);
991         mutex_unlock(&mdsc->mutex);
992 }
993
994 /*
995  * session caps
996  */
997
998 /* caller holds s_cap_lock, we drop it */
999 static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
1000                                  struct ceph_mds_session *session)
1001         __releases(session->s_cap_lock)
1002 {
1003         LIST_HEAD(tmp_list);
1004         list_splice_init(&session->s_cap_releases, &tmp_list);
1005         session->s_num_cap_releases = 0;
1006         spin_unlock(&session->s_cap_lock);
1007
1008         dout("cleanup_cap_releases mds%d\n", session->s_mds);
1009         while (!list_empty(&tmp_list)) {
1010                 struct ceph_cap *cap;
1011                 /* zero out the in-progress message */
1012                 cap = list_first_entry(&tmp_list,
1013                                         struct ceph_cap, session_caps);
1014                 list_del(&cap->session_caps);
1015                 ceph_put_cap(mdsc, cap);
1016         }
1017 }
1018
1019 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1020                                      struct ceph_mds_session *session)
1021 {
1022         struct ceph_mds_request *req;
1023         struct rb_node *p;
1024
1025         dout("cleanup_session_requests mds%d\n", session->s_mds);
1026         mutex_lock(&mdsc->mutex);
1027         while (!list_empty(&session->s_unsafe)) {
1028                 req = list_first_entry(&session->s_unsafe,
1029                                        struct ceph_mds_request, r_unsafe_item);
1030                 list_del_init(&req->r_unsafe_item);
1031                 pr_warn_ratelimited(" dropping unsafe request %llu\n",
1032                                     req->r_tid);
1033                 __unregister_request(mdsc, req);
1034         }
1035         /* zero r_attempts, so kick_requests() will re-send requests */
1036         p = rb_first(&mdsc->request_tree);
1037         while (p) {
1038                 req = rb_entry(p, struct ceph_mds_request, r_node);
1039                 p = rb_next(p);
1040                 if (req->r_session &&
1041                     req->r_session->s_mds == session->s_mds)
1042                         req->r_attempts = 0;
1043         }
1044         mutex_unlock(&mdsc->mutex);
1045 }
1046
1047 /*
1048  * Helper to safely iterate over all caps associated with a session, with
1049  * special care taken to handle a racing __ceph_remove_cap().
1050  *
1051  * Caller must hold session s_mutex.
1052  */
1053 static int iterate_session_caps(struct ceph_mds_session *session,
1054                                  int (*cb)(struct inode *, struct ceph_cap *,
1055                                             void *), void *arg)
1056 {
1057         struct list_head *p;
1058         struct ceph_cap *cap;
1059         struct inode *inode, *last_inode = NULL;
1060         struct ceph_cap *old_cap = NULL;
1061         int ret;
1062
1063         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1064         spin_lock(&session->s_cap_lock);
1065         p = session->s_caps.next;
1066         while (p != &session->s_caps) {
1067                 cap = list_entry(p, struct ceph_cap, session_caps);
1068                 inode = igrab(&cap->ci->vfs_inode);
1069                 if (!inode) {
1070                         p = p->next;
1071                         continue;
1072                 }
1073                 session->s_cap_iterator = cap;
1074                 spin_unlock(&session->s_cap_lock);
1075
1076                 if (last_inode) {
1077                         iput(last_inode);
1078                         last_inode = NULL;
1079                 }
1080                 if (old_cap) {
1081                         ceph_put_cap(session->s_mdsc, old_cap);
1082                         old_cap = NULL;
1083                 }
1084
1085                 ret = cb(inode, cap, arg);
1086                 last_inode = inode;
1087
1088                 spin_lock(&session->s_cap_lock);
1089                 p = p->next;
1090                 if (cap->ci == NULL) {
1091                         dout("iterate_session_caps  finishing cap %p removal\n",
1092                              cap);
1093                         BUG_ON(cap->session != session);
1094                         cap->session = NULL;
1095                         list_del_init(&cap->session_caps);
1096                         session->s_nr_caps--;
1097                         if (cap->queue_release) {
1098                                 list_add_tail(&cap->session_caps,
1099                                               &session->s_cap_releases);
1100                                 session->s_num_cap_releases++;
1101                         } else {
1102                                 old_cap = cap;  /* put_cap it w/o locks held */
1103                         }
1104                 }
1105                 if (ret < 0)
1106                         goto out;
1107         }
1108         ret = 0;
1109 out:
1110         session->s_cap_iterator = NULL;
1111         spin_unlock(&session->s_cap_lock);
1112
1113         iput(last_inode);
1114         if (old_cap)
1115                 ceph_put_cap(session->s_mdsc, old_cap);
1116
1117         return ret;
1118 }
1119
1120 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1121                                   void *arg)
1122 {
1123         struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1124         struct ceph_inode_info *ci = ceph_inode(inode);
1125         LIST_HEAD(to_remove);
1126         bool drop = false;
1127         bool invalidate = false;
1128
1129         dout("removing cap %p, ci is %p, inode is %p\n",
1130              cap, ci, &ci->vfs_inode);
1131         spin_lock(&ci->i_ceph_lock);
1132         __ceph_remove_cap(cap, false);
1133         if (!ci->i_auth_cap) {
1134                 struct ceph_cap_flush *cf;
1135                 struct ceph_mds_client *mdsc = fsc->mdsc;
1136
1137                 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1138
1139                 if (ci->i_wrbuffer_ref > 0 &&
1140                     ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
1141                         invalidate = true;
1142
1143                 while (true) {
1144                         struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
1145                         if (!n)
1146                                 break;
1147                         cf = rb_entry(n, struct ceph_cap_flush, i_node);
1148                         rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
1149                         list_add(&cf->list, &to_remove);
1150                 }
1151
1152                 spin_lock(&mdsc->cap_dirty_lock);
1153
1154                 list_for_each_entry(cf, &to_remove, list)
1155                         rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
1156
1157                 if (!list_empty(&ci->i_dirty_item)) {
1158                         pr_warn_ratelimited(
1159                                 " dropping dirty %s state for %p %lld\n",
1160                                 ceph_cap_string(ci->i_dirty_caps),
1161                                 inode, ceph_ino(inode));
1162                         ci->i_dirty_caps = 0;
1163                         list_del_init(&ci->i_dirty_item);
1164                         drop = true;
1165                 }
1166                 if (!list_empty(&ci->i_flushing_item)) {
1167                         pr_warn_ratelimited(
1168                                 " dropping dirty+flushing %s state for %p %lld\n",
1169                                 ceph_cap_string(ci->i_flushing_caps),
1170                                 inode, ceph_ino(inode));
1171                         ci->i_flushing_caps = 0;
1172                         list_del_init(&ci->i_flushing_item);
1173                         mdsc->num_cap_flushing--;
1174                         drop = true;
1175                 }
1176                 spin_unlock(&mdsc->cap_dirty_lock);
1177
1178                 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1179                         list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
1180                         ci->i_prealloc_cap_flush = NULL;
1181                 }
1182         }
1183         spin_unlock(&ci->i_ceph_lock);
1184         while (!list_empty(&to_remove)) {
1185                 struct ceph_cap_flush *cf;
1186                 cf = list_first_entry(&to_remove,
1187                                       struct ceph_cap_flush, list);
1188                 list_del(&cf->list);
1189                 ceph_free_cap_flush(cf);
1190         }
1191
1192         wake_up_all(&ci->i_cap_wq);
1193         if (invalidate)
1194                 ceph_queue_invalidate(inode);
1195         if (drop)
1196                 iput(inode);
1197         return 0;
1198 }
1199
1200 /*
1201  * caller must hold session s_mutex
1202  */
1203 static void remove_session_caps(struct ceph_mds_session *session)
1204 {
1205         struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1206         struct super_block *sb = fsc->sb;
1207         dout("remove_session_caps on %p\n", session);
1208         iterate_session_caps(session, remove_session_caps_cb, fsc);
1209
1210         spin_lock(&session->s_cap_lock);
1211         if (session->s_nr_caps > 0) {
1212                 struct inode *inode;
1213                 struct ceph_cap *cap, *prev = NULL;
1214                 struct ceph_vino vino;
1215                 /*
1216                  * iterate_session_caps() skips inodes that are being
1217                  * deleted, we need to wait until deletions are complete.
1218                  * __wait_on_freeing_inode() is designed for the job,
1219                  * but it is not exported, so use lookup inode function
1220                  * to access it.
1221                  */
1222                 while (!list_empty(&session->s_caps)) {
1223                         cap = list_entry(session->s_caps.next,
1224                                          struct ceph_cap, session_caps);
1225                         if (cap == prev)
1226                                 break;
1227                         prev = cap;
1228                         vino = cap->ci->i_vino;
1229                         spin_unlock(&session->s_cap_lock);
1230
1231                         inode = ceph_find_inode(sb, vino);
1232                         iput(inode);
1233
1234                         spin_lock(&session->s_cap_lock);
1235                 }
1236         }
1237
1238         // drop cap expires and unlock s_cap_lock
1239         cleanup_cap_releases(session->s_mdsc, session);
1240
1241         BUG_ON(session->s_nr_caps > 0);
1242         BUG_ON(!list_empty(&session->s_cap_flushing));
1243 }
1244
1245 /*
1246  * wake up any threads waiting on this session's caps.  if the cap is
1247  * old (didn't get renewed on the client reconnect), remove it now.
1248  *
1249  * caller must hold s_mutex.
1250  */
1251 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1252                               void *arg)
1253 {
1254         struct ceph_inode_info *ci = ceph_inode(inode);
1255
1256         wake_up_all(&ci->i_cap_wq);
1257         if (arg) {
1258                 spin_lock(&ci->i_ceph_lock);
1259                 ci->i_wanted_max_size = 0;
1260                 ci->i_requested_max_size = 0;
1261                 spin_unlock(&ci->i_ceph_lock);
1262         }
1263         return 0;
1264 }
1265
1266 static void wake_up_session_caps(struct ceph_mds_session *session,
1267                                  int reconnect)
1268 {
1269         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1270         iterate_session_caps(session, wake_up_session_cb,
1271                              (void *)(unsigned long)reconnect);
1272 }
1273
1274 /*
1275  * Send periodic message to MDS renewing all currently held caps.  The
1276  * ack will reset the expiration for all caps from this session.
1277  *
1278  * caller holds s_mutex
1279  */
1280 static int send_renew_caps(struct ceph_mds_client *mdsc,
1281                            struct ceph_mds_session *session)
1282 {
1283         struct ceph_msg *msg;
1284         int state;
1285
1286         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1287             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1288                 pr_info("mds%d caps stale\n", session->s_mds);
1289         session->s_renew_requested = jiffies;
1290
1291         /* do not try to renew caps until a recovering mds has reconnected
1292          * with its clients. */
1293         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1294         if (state < CEPH_MDS_STATE_RECONNECT) {
1295                 dout("send_renew_caps ignoring mds%d (%s)\n",
1296                      session->s_mds, ceph_mds_state_name(state));
1297                 return 0;
1298         }
1299
1300         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1301                 ceph_mds_state_name(state));
1302         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1303                                  ++session->s_renew_seq);
1304         if (!msg)
1305                 return -ENOMEM;
1306         ceph_con_send(&session->s_con, msg);
1307         return 0;
1308 }
1309
1310 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1311                              struct ceph_mds_session *session, u64 seq)
1312 {
1313         struct ceph_msg *msg;
1314
1315         dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1316              session->s_mds, ceph_session_state_name(session->s_state), seq);
1317         msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1318         if (!msg)
1319                 return -ENOMEM;
1320         ceph_con_send(&session->s_con, msg);
1321         return 0;
1322 }
1323
1324
1325 /*
1326  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1327  *
1328  * Called under session->s_mutex
1329  */
1330 static void renewed_caps(struct ceph_mds_client *mdsc,
1331                          struct ceph_mds_session *session, int is_renew)
1332 {
1333         int was_stale;
1334         int wake = 0;
1335
1336         spin_lock(&session->s_cap_lock);
1337         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1338
1339         session->s_cap_ttl = session->s_renew_requested +
1340                 mdsc->mdsmap->m_session_timeout*HZ;
1341
1342         if (was_stale) {
1343                 if (time_before(jiffies, session->s_cap_ttl)) {
1344                         pr_info("mds%d caps renewed\n", session->s_mds);
1345                         wake = 1;
1346                 } else {
1347                         pr_info("mds%d caps still stale\n", session->s_mds);
1348                 }
1349         }
1350         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1351              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1352              time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1353         spin_unlock(&session->s_cap_lock);
1354
1355         if (wake)
1356                 wake_up_session_caps(session, 0);
1357 }
1358
1359 /*
1360  * send a session close request
1361  */
1362 static int request_close_session(struct ceph_mds_client *mdsc,
1363                                  struct ceph_mds_session *session)
1364 {
1365         struct ceph_msg *msg;
1366
1367         dout("request_close_session mds%d state %s seq %lld\n",
1368              session->s_mds, ceph_session_state_name(session->s_state),
1369              session->s_seq);
1370         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1371         if (!msg)
1372                 return -ENOMEM;
1373         ceph_con_send(&session->s_con, msg);
1374         return 0;
1375 }
1376
1377 /*
1378  * Called with s_mutex held.
1379  */
1380 static int __close_session(struct ceph_mds_client *mdsc,
1381                          struct ceph_mds_session *session)
1382 {
1383         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1384                 return 0;
1385         session->s_state = CEPH_MDS_SESSION_CLOSING;
1386         return request_close_session(mdsc, session);
1387 }
1388
1389 /*
1390  * Trim old(er) caps.
1391  *
1392  * Because we can't cache an inode without one or more caps, we do
1393  * this indirectly: if a cap is unused, we prune its aliases, at which
1394  * point the inode will hopefully get dropped to.
1395  *
1396  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1397  * memory pressure from the MDS, though, so it needn't be perfect.
1398  */
1399 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1400 {
1401         struct ceph_mds_session *session = arg;
1402         struct ceph_inode_info *ci = ceph_inode(inode);
1403         int used, wanted, oissued, mine;
1404
1405         if (session->s_trim_caps <= 0)
1406                 return -1;
1407
1408         spin_lock(&ci->i_ceph_lock);
1409         mine = cap->issued | cap->implemented;
1410         used = __ceph_caps_used(ci);
1411         wanted = __ceph_caps_file_wanted(ci);
1412         oissued = __ceph_caps_issued_other(ci, cap);
1413
1414         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1415              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1416              ceph_cap_string(used), ceph_cap_string(wanted));
1417         if (cap == ci->i_auth_cap) {
1418                 if (ci->i_dirty_caps || ci->i_flushing_caps ||
1419                     !list_empty(&ci->i_cap_snaps))
1420                         goto out;
1421                 if ((used | wanted) & CEPH_CAP_ANY_WR)
1422                         goto out;
1423         }
1424         /* The inode has cached pages, but it's no longer used.
1425          * we can safely drop it */
1426         if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1427             !(oissued & CEPH_CAP_FILE_CACHE)) {
1428           used = 0;
1429           oissued = 0;
1430         }
1431         if ((used | wanted) & ~oissued & mine)
1432                 goto out;   /* we need these caps */
1433
1434         session->s_trim_caps--;
1435         if (oissued) {
1436                 /* we aren't the only cap.. just remove us */
1437                 __ceph_remove_cap(cap, true);
1438         } else {
1439                 /* try dropping referring dentries */
1440                 spin_unlock(&ci->i_ceph_lock);
1441                 d_prune_aliases(inode);
1442                 dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1443                      inode, cap, atomic_read(&inode->i_count));
1444                 return 0;
1445         }
1446
1447 out:
1448         spin_unlock(&ci->i_ceph_lock);
1449         return 0;
1450 }
1451
1452 /*
1453  * Trim session cap count down to some max number.
1454  */
1455 static int trim_caps(struct ceph_mds_client *mdsc,
1456                      struct ceph_mds_session *session,
1457                      int max_caps)
1458 {
1459         int trim_caps = session->s_nr_caps - max_caps;
1460
1461         dout("trim_caps mds%d start: %d / %d, trim %d\n",
1462              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1463         if (trim_caps > 0) {
1464                 session->s_trim_caps = trim_caps;
1465                 iterate_session_caps(session, trim_caps_cb, session);
1466                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1467                      session->s_mds, session->s_nr_caps, max_caps,
1468                         trim_caps - session->s_trim_caps);
1469                 session->s_trim_caps = 0;
1470         }
1471
1472         ceph_send_cap_releases(mdsc, session);
1473         return 0;
1474 }
1475
1476 static int check_capsnap_flush(struct ceph_inode_info *ci,
1477                                u64 want_snap_seq)
1478 {
1479         int ret = 1;
1480         spin_lock(&ci->i_ceph_lock);
1481         if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
1482                 struct ceph_cap_snap *capsnap =
1483                         list_first_entry(&ci->i_cap_snaps,
1484                                          struct ceph_cap_snap, ci_item);
1485                 ret = capsnap->follows >= want_snap_seq;
1486         }
1487         spin_unlock(&ci->i_ceph_lock);
1488         return ret;
1489 }
1490
1491 static int check_caps_flush(struct ceph_mds_client *mdsc,
1492                             u64 want_flush_tid)
1493 {
1494         struct rb_node *n;
1495         struct ceph_cap_flush *cf;
1496         int ret = 1;
1497
1498         spin_lock(&mdsc->cap_dirty_lock);
1499         n = rb_first(&mdsc->cap_flush_tree);
1500         cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
1501         if (cf && cf->tid <= want_flush_tid) {
1502                 dout("check_caps_flush still flushing tid %llu <= %llu\n",
1503                      cf->tid, want_flush_tid);
1504                 ret = 0;
1505         }
1506         spin_unlock(&mdsc->cap_dirty_lock);
1507         return ret;
1508 }
1509
1510 /*
1511  * flush all dirty inode data to disk.
1512  *
1513  * returns true if we've flushed through want_flush_tid
1514  */
1515 static void wait_caps_flush(struct ceph_mds_client *mdsc,
1516                             u64 want_flush_tid, u64 want_snap_seq)
1517 {
1518         int mds;
1519
1520         dout("check_caps_flush want %llu snap want %llu\n",
1521              want_flush_tid, want_snap_seq);
1522         mutex_lock(&mdsc->mutex);
1523         for (mds = 0; mds < mdsc->max_sessions; ) {
1524                 struct ceph_mds_session *session = mdsc->sessions[mds];
1525                 struct inode *inode = NULL;
1526
1527                 if (!session) {
1528                         mds++;
1529                         continue;
1530                 }
1531                 get_session(session);
1532                 mutex_unlock(&mdsc->mutex);
1533
1534                 mutex_lock(&session->s_mutex);
1535                 if (!list_empty(&session->s_cap_snaps_flushing)) {
1536                         struct ceph_cap_snap *capsnap =
1537                                 list_first_entry(&session->s_cap_snaps_flushing,
1538                                                  struct ceph_cap_snap,
1539                                                  flushing_item);
1540                         struct ceph_inode_info *ci = capsnap->ci;
1541                         if (!check_capsnap_flush(ci, want_snap_seq)) {
1542                                 dout("check_cap_flush still flushing snap %p "
1543                                      "follows %lld <= %lld to mds%d\n",
1544                                      &ci->vfs_inode, capsnap->follows,
1545                                      want_snap_seq, mds);
1546                                 inode = igrab(&ci->vfs_inode);
1547                         }
1548                 }
1549                 mutex_unlock(&session->s_mutex);
1550                 ceph_put_mds_session(session);
1551
1552                 if (inode) {
1553                         wait_event(mdsc->cap_flushing_wq,
1554                                    check_capsnap_flush(ceph_inode(inode),
1555                                                        want_snap_seq));
1556                         iput(inode);
1557                 } else {
1558                         mds++;
1559                 }
1560
1561                 mutex_lock(&mdsc->mutex);
1562         }
1563         mutex_unlock(&mdsc->mutex);
1564
1565         wait_event(mdsc->cap_flushing_wq,
1566                    check_caps_flush(mdsc, want_flush_tid));
1567
1568         dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1569 }
1570
1571 /*
1572  * called under s_mutex
1573  */
1574 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1575                             struct ceph_mds_session *session)
1576 {
1577         struct ceph_msg *msg = NULL;
1578         struct ceph_mds_cap_release *head;
1579         struct ceph_mds_cap_item *item;
1580         struct ceph_cap *cap;
1581         LIST_HEAD(tmp_list);
1582         int num_cap_releases;
1583
1584         spin_lock(&session->s_cap_lock);
1585 again:
1586         list_splice_init(&session->s_cap_releases, &tmp_list);
1587         num_cap_releases = session->s_num_cap_releases;
1588         session->s_num_cap_releases = 0;
1589         spin_unlock(&session->s_cap_lock);
1590
1591         while (!list_empty(&tmp_list)) {
1592                 if (!msg) {
1593                         msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1594                                         PAGE_SIZE, GFP_NOFS, false);
1595                         if (!msg)
1596                                 goto out_err;
1597                         head = msg->front.iov_base;
1598                         head->num = cpu_to_le32(0);
1599                         msg->front.iov_len = sizeof(*head);
1600                 }
1601                 cap = list_first_entry(&tmp_list, struct ceph_cap,
1602                                         session_caps);
1603                 list_del(&cap->session_caps);
1604                 num_cap_releases--;
1605
1606                 head = msg->front.iov_base;
1607                 le32_add_cpu(&head->num, 1);
1608                 item = msg->front.iov_base + msg->front.iov_len;
1609                 item->ino = cpu_to_le64(cap->cap_ino);
1610                 item->cap_id = cpu_to_le64(cap->cap_id);
1611                 item->migrate_seq = cpu_to_le32(cap->mseq);
1612                 item->seq = cpu_to_le32(cap->issue_seq);
1613                 msg->front.iov_len += sizeof(*item);
1614
1615                 ceph_put_cap(mdsc, cap);
1616
1617                 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1618                         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1619                         dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1620                         ceph_con_send(&session->s_con, msg);
1621                         msg = NULL;
1622                 }
1623         }
1624
1625         BUG_ON(num_cap_releases != 0);
1626
1627         spin_lock(&session->s_cap_lock);
1628         if (!list_empty(&session->s_cap_releases))
1629                 goto again;
1630         spin_unlock(&session->s_cap_lock);
1631
1632         if (msg) {
1633                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1634                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1635                 ceph_con_send(&session->s_con, msg);
1636         }
1637         return;
1638 out_err:
1639         pr_err("send_cap_releases mds%d, failed to allocate message\n",
1640                 session->s_mds);
1641         spin_lock(&session->s_cap_lock);
1642         list_splice(&tmp_list, &session->s_cap_releases);
1643         session->s_num_cap_releases += num_cap_releases;
1644         spin_unlock(&session->s_cap_lock);
1645 }
1646
1647 /*
1648  * requests
1649  */
1650
1651 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1652                                     struct inode *dir)
1653 {
1654         struct ceph_inode_info *ci = ceph_inode(dir);
1655         struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1656         struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1657         size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
1658                       sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
1659         int order, num_entries;
1660
1661         spin_lock(&ci->i_ceph_lock);
1662         num_entries = ci->i_files + ci->i_subdirs;
1663         spin_unlock(&ci->i_ceph_lock);
1664         num_entries = max(num_entries, 1);
1665         num_entries = min(num_entries, opt->max_readdir);
1666
1667         order = get_order(size * num_entries);
1668         while (order >= 0) {
1669                 rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL |
1670                                                         __GFP_NOWARN,
1671                                                         order);
1672                 if (rinfo->dir_in)
1673                         break;
1674                 order--;
1675         }
1676         if (!rinfo->dir_in)
1677                 return -ENOMEM;
1678
1679         num_entries = (PAGE_SIZE << order) / size;
1680         num_entries = min(num_entries, opt->max_readdir);
1681
1682         rinfo->dir_buf_size = PAGE_SIZE << order;
1683         req->r_num_caps = num_entries + 1;
1684         req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
1685         req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
1686         return 0;
1687 }
1688
1689 /*
1690  * Create an mds request.
1691  */
1692 struct ceph_mds_request *
1693 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1694 {
1695         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1696
1697         if (!req)
1698                 return ERR_PTR(-ENOMEM);
1699
1700         mutex_init(&req->r_fill_mutex);
1701         req->r_mdsc = mdsc;
1702         req->r_started = jiffies;
1703         req->r_resend_mds = -1;
1704         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1705         INIT_LIST_HEAD(&req->r_unsafe_target_item);
1706         req->r_fmode = -1;
1707         kref_init(&req->r_kref);
1708         RB_CLEAR_NODE(&req->r_node);
1709         INIT_LIST_HEAD(&req->r_wait);
1710         init_completion(&req->r_completion);
1711         init_completion(&req->r_safe_completion);
1712         INIT_LIST_HEAD(&req->r_unsafe_item);
1713
1714         req->r_stamp = current_fs_time(mdsc->fsc->sb);
1715
1716         req->r_op = op;
1717         req->r_direct_mode = mode;
1718         return req;
1719 }
1720
1721 /*
1722  * return oldest (lowest) request, tid in request tree, 0 if none.
1723  *
1724  * called under mdsc->mutex.
1725  */
1726 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1727 {
1728         if (RB_EMPTY_ROOT(&mdsc->request_tree))
1729                 return NULL;
1730         return rb_entry(rb_first(&mdsc->request_tree),
1731                         struct ceph_mds_request, r_node);
1732 }
1733
1734 static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1735 {
1736         return mdsc->oldest_tid;
1737 }
1738
1739 /*
1740  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1741  * on build_path_from_dentry in fs/cifs/dir.c.
1742  *
1743  * If @stop_on_nosnap, generate path relative to the first non-snapped
1744  * inode.
1745  *
1746  * Encode hidden .snap dirs as a double /, i.e.
1747  *   foo/.snap/bar -> foo//bar
1748  */
1749 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1750                            int stop_on_nosnap)
1751 {
1752         struct dentry *temp;
1753         char *path;
1754         int len, pos;
1755         unsigned seq;
1756
1757         if (dentry == NULL)
1758                 return ERR_PTR(-EINVAL);
1759
1760 retry:
1761         len = 0;
1762         seq = read_seqbegin(&rename_lock);
1763         rcu_read_lock();
1764         for (temp = dentry; !IS_ROOT(temp);) {
1765                 struct inode *inode = d_inode(temp);
1766                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1767                         len++;  /* slash only */
1768                 else if (stop_on_nosnap && inode &&
1769                          ceph_snap(inode) == CEPH_NOSNAP)
1770                         break;
1771                 else
1772                         len += 1 + temp->d_name.len;
1773                 temp = temp->d_parent;
1774         }
1775         rcu_read_unlock();
1776         if (len)
1777                 len--;  /* no leading '/' */
1778
1779         path = kmalloc(len+1, GFP_NOFS);
1780         if (path == NULL)
1781                 return ERR_PTR(-ENOMEM);
1782         pos = len;
1783         path[pos] = 0;  /* trailing null */
1784         rcu_read_lock();
1785         for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1786                 struct inode *inode;
1787
1788                 spin_lock(&temp->d_lock);
1789                 inode = d_inode(temp);
1790                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1791                         dout("build_path path+%d: %p SNAPDIR\n",
1792                              pos, temp);
1793                 } else if (stop_on_nosnap && inode &&
1794                            ceph_snap(inode) == CEPH_NOSNAP) {
1795                         spin_unlock(&temp->d_lock);
1796                         break;
1797                 } else {
1798                         pos -= temp->d_name.len;
1799                         if (pos < 0) {
1800                                 spin_unlock(&temp->d_lock);
1801                                 break;
1802                         }
1803                         strncpy(path + pos, temp->d_name.name,
1804                                 temp->d_name.len);
1805                 }
1806                 spin_unlock(&temp->d_lock);
1807                 if (pos)
1808                         path[--pos] = '/';
1809                 temp = temp->d_parent;
1810         }
1811         rcu_read_unlock();
1812         if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1813                 pr_err("build_path did not end path lookup where "
1814                        "expected, namelen is %d, pos is %d\n", len, pos);
1815                 /* presumably this is only possible if racing with a
1816                    rename of one of the parent directories (we can not
1817                    lock the dentries above us to prevent this, but
1818                    retrying should be harmless) */
1819                 kfree(path);
1820                 goto retry;
1821         }
1822
1823         *base = ceph_ino(d_inode(temp));
1824         *plen = len;
1825         dout("build_path on %p %d built %llx '%.*s'\n",
1826              dentry, d_count(dentry), *base, len, path);
1827         return path;
1828 }
1829
1830 static int build_dentry_path(struct dentry *dentry,
1831                              const char **ppath, int *ppathlen, u64 *pino,
1832                              int *pfreepath)
1833 {
1834         char *path;
1835
1836         if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP) {
1837                 *pino = ceph_ino(d_inode(dentry->d_parent));
1838                 *ppath = dentry->d_name.name;
1839                 *ppathlen = dentry->d_name.len;
1840                 return 0;
1841         }
1842         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1843         if (IS_ERR(path))
1844                 return PTR_ERR(path);
1845         *ppath = path;
1846         *pfreepath = 1;
1847         return 0;
1848 }
1849
1850 static int build_inode_path(struct inode *inode,
1851                             const char **ppath, int *ppathlen, u64 *pino,
1852                             int *pfreepath)
1853 {
1854         struct dentry *dentry;
1855         char *path;
1856
1857         if (ceph_snap(inode) == CEPH_NOSNAP) {
1858                 *pino = ceph_ino(inode);
1859                 *ppathlen = 0;
1860                 return 0;
1861         }
1862         dentry = d_find_alias(inode);
1863         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1864         dput(dentry);
1865         if (IS_ERR(path))
1866                 return PTR_ERR(path);
1867         *ppath = path;
1868         *pfreepath = 1;
1869         return 0;
1870 }
1871
1872 /*
1873  * request arguments may be specified via an inode *, a dentry *, or
1874  * an explicit ino+path.
1875  */
1876 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1877                                   const char *rpath, u64 rino,
1878                                   const char **ppath, int *pathlen,
1879                                   u64 *ino, int *freepath)
1880 {
1881         int r = 0;
1882
1883         if (rinode) {
1884                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1885                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1886                      ceph_snap(rinode));
1887         } else if (rdentry) {
1888                 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1889                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1890                      *ppath);
1891         } else if (rpath || rino) {
1892                 *ino = rino;
1893                 *ppath = rpath;
1894                 *pathlen = rpath ? strlen(rpath) : 0;
1895                 dout(" path %.*s\n", *pathlen, rpath);
1896         }
1897
1898         return r;
1899 }
1900
1901 /*
1902  * called under mdsc->mutex
1903  */
1904 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1905                                                struct ceph_mds_request *req,
1906                                                int mds, bool drop_cap_releases)
1907 {
1908         struct ceph_msg *msg;
1909         struct ceph_mds_request_head *head;
1910         const char *path1 = NULL;
1911         const char *path2 = NULL;
1912         u64 ino1 = 0, ino2 = 0;
1913         int pathlen1 = 0, pathlen2 = 0;
1914         int freepath1 = 0, freepath2 = 0;
1915         int len;
1916         u16 releases;
1917         void *p, *end;
1918         int ret;
1919
1920         ret = set_request_path_attr(req->r_inode, req->r_dentry,
1921                               req->r_path1, req->r_ino1.ino,
1922                               &path1, &pathlen1, &ino1, &freepath1);
1923         if (ret < 0) {
1924                 msg = ERR_PTR(ret);
1925                 goto out;
1926         }
1927
1928         ret = set_request_path_attr(NULL, req->r_old_dentry,
1929                               req->r_path2, req->r_ino2.ino,
1930                               &path2, &pathlen2, &ino2, &freepath2);
1931         if (ret < 0) {
1932                 msg = ERR_PTR(ret);
1933                 goto out_free1;
1934         }
1935
1936         len = sizeof(*head) +
1937                 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
1938                 sizeof(struct ceph_timespec);
1939
1940         /* calculate (max) length for cap releases */
1941         len += sizeof(struct ceph_mds_request_release) *
1942                 (!!req->r_inode_drop + !!req->r_dentry_drop +
1943                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1944         if (req->r_dentry_drop)
1945                 len += req->r_dentry->d_name.len;
1946         if (req->r_old_dentry_drop)
1947                 len += req->r_old_dentry->d_name.len;
1948
1949         msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1950         if (!msg) {
1951                 msg = ERR_PTR(-ENOMEM);
1952                 goto out_free2;
1953         }
1954
1955         msg->hdr.version = cpu_to_le16(2);
1956         msg->hdr.tid = cpu_to_le64(req->r_tid);
1957
1958         head = msg->front.iov_base;
1959         p = msg->front.iov_base + sizeof(*head);
1960         end = msg->front.iov_base + msg->front.iov_len;
1961
1962         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1963         head->op = cpu_to_le32(req->r_op);
1964         head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
1965         head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
1966         head->args = req->r_args;
1967
1968         ceph_encode_filepath(&p, end, ino1, path1);
1969         ceph_encode_filepath(&p, end, ino2, path2);
1970
1971         /* make note of release offset, in case we need to replay */
1972         req->r_request_release_offset = p - msg->front.iov_base;
1973
1974         /* cap releases */
1975         releases = 0;
1976         if (req->r_inode_drop)
1977                 releases += ceph_encode_inode_release(&p,
1978                       req->r_inode ? req->r_inode : d_inode(req->r_dentry),
1979                       mds, req->r_inode_drop, req->r_inode_unless, 0);
1980         if (req->r_dentry_drop)
1981                 releases += ceph_encode_dentry_release(&p, req->r_dentry,
1982                        mds, req->r_dentry_drop, req->r_dentry_unless);
1983         if (req->r_old_dentry_drop)
1984                 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1985                        mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1986         if (req->r_old_inode_drop)
1987                 releases += ceph_encode_inode_release(&p,
1988                       d_inode(req->r_old_dentry),
1989                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1990
1991         if (drop_cap_releases) {
1992                 releases = 0;
1993                 p = msg->front.iov_base + req->r_request_release_offset;
1994         }
1995
1996         head->num_releases = cpu_to_le16(releases);
1997
1998         /* time stamp */
1999         {
2000                 struct ceph_timespec ts;
2001                 ceph_encode_timespec(&ts, &req->r_stamp);
2002                 ceph_encode_copy(&p, &ts, sizeof(ts));
2003         }
2004
2005         BUG_ON(p > end);
2006         msg->front.iov_len = p - msg->front.iov_base;
2007         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2008
2009         if (req->r_pagelist) {
2010                 struct ceph_pagelist *pagelist = req->r_pagelist;
2011                 atomic_inc(&pagelist->refcnt);
2012                 ceph_msg_data_add_pagelist(msg, pagelist);
2013                 msg->hdr.data_len = cpu_to_le32(pagelist->length);
2014         } else {
2015                 msg->hdr.data_len = 0;
2016         }
2017
2018         msg->hdr.data_off = cpu_to_le16(0);
2019
2020 out_free2:
2021         if (freepath2)
2022                 kfree((char *)path2);
2023 out_free1:
2024         if (freepath1)
2025                 kfree((char *)path1);
2026 out:
2027         return msg;
2028 }
2029
2030 /*
2031  * called under mdsc->mutex if error, under no mutex if
2032  * success.
2033  */
2034 static void complete_request(struct ceph_mds_client *mdsc,
2035                              struct ceph_mds_request *req)
2036 {
2037         if (req->r_callback)
2038                 req->r_callback(mdsc, req);
2039         else
2040                 complete_all(&req->r_completion);
2041 }
2042
2043 /*
2044  * called under mdsc->mutex
2045  */
2046 static int __prepare_send_request(struct ceph_mds_client *mdsc,
2047                                   struct ceph_mds_request *req,
2048                                   int mds, bool drop_cap_releases)
2049 {
2050         struct ceph_mds_request_head *rhead;
2051         struct ceph_msg *msg;
2052         int flags = 0;
2053
2054         req->r_attempts++;
2055         if (req->r_inode) {
2056                 struct ceph_cap *cap =
2057                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2058
2059                 if (cap)
2060                         req->r_sent_on_mseq = cap->mseq;
2061                 else
2062                         req->r_sent_on_mseq = -1;
2063         }
2064         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2065              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2066
2067         if (req->r_got_unsafe) {
2068                 void *p;
2069                 /*
2070                  * Replay.  Do not regenerate message (and rebuild
2071                  * paths, etc.); just use the original message.
2072                  * Rebuilding paths will break for renames because
2073                  * d_move mangles the src name.
2074                  */
2075                 msg = req->r_request;
2076                 rhead = msg->front.iov_base;
2077
2078                 flags = le32_to_cpu(rhead->flags);
2079                 flags |= CEPH_MDS_FLAG_REPLAY;
2080                 rhead->flags = cpu_to_le32(flags);
2081
2082                 if (req->r_target_inode)
2083                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2084
2085                 rhead->num_retry = req->r_attempts - 1;
2086
2087                 /* remove cap/dentry releases from message */
2088                 rhead->num_releases = 0;
2089
2090                 /* time stamp */
2091                 p = msg->front.iov_base + req->r_request_release_offset;
2092                 {
2093                         struct ceph_timespec ts;
2094                         ceph_encode_timespec(&ts, &req->r_stamp);
2095                         ceph_encode_copy(&p, &ts, sizeof(ts));
2096                 }
2097
2098                 msg->front.iov_len = p - msg->front.iov_base;
2099                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2100                 return 0;
2101         }
2102
2103         if (req->r_request) {
2104                 ceph_msg_put(req->r_request);
2105                 req->r_request = NULL;
2106         }
2107         msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2108         if (IS_ERR(msg)) {
2109                 req->r_err = PTR_ERR(msg);
2110                 return PTR_ERR(msg);
2111         }
2112         req->r_request = msg;
2113
2114         rhead = msg->front.iov_base;
2115         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2116         if (req->r_got_unsafe)
2117                 flags |= CEPH_MDS_FLAG_REPLAY;
2118         if (req->r_locked_dir)
2119                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2120         rhead->flags = cpu_to_le32(flags);
2121         rhead->num_fwd = req->r_num_fwd;
2122         rhead->num_retry = req->r_attempts - 1;
2123         rhead->ino = 0;
2124
2125         dout(" r_locked_dir = %p\n", req->r_locked_dir);
2126         return 0;
2127 }
2128
2129 /*
2130  * send request, or put it on the appropriate wait list.
2131  */
2132 static int __do_request(struct ceph_mds_client *mdsc,
2133                         struct ceph_mds_request *req)
2134 {
2135         struct ceph_mds_session *session = NULL;
2136         int mds = -1;
2137         int err = 0;
2138
2139         if (req->r_err || req->r_got_result) {
2140                 if (req->r_aborted)
2141                         __unregister_request(mdsc, req);
2142                 goto out;
2143         }
2144
2145         if (req->r_timeout &&
2146             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2147                 dout("do_request timed out\n");
2148                 err = -EIO;
2149                 goto finish;
2150         }
2151         if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2152                 dout("do_request forced umount\n");
2153                 err = -EIO;
2154                 goto finish;
2155         }
2156
2157         put_request_session(req);
2158
2159         mds = __choose_mds(mdsc, req);
2160         if (mds < 0 ||
2161             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2162                 dout("do_request no mds or not active, waiting for map\n");
2163                 list_add(&req->r_wait, &mdsc->waiting_for_map);
2164                 goto out;
2165         }
2166
2167         /* get, open session */
2168         session = __ceph_lookup_mds_session(mdsc, mds);
2169         if (!session) {
2170                 session = register_session(mdsc, mds);
2171                 if (IS_ERR(session)) {
2172                         err = PTR_ERR(session);
2173                         goto finish;
2174                 }
2175         }
2176         req->r_session = get_session(session);
2177
2178         dout("do_request mds%d session %p state %s\n", mds, session,
2179              ceph_session_state_name(session->s_state));
2180         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2181             session->s_state != CEPH_MDS_SESSION_HUNG) {
2182                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
2183                     session->s_state == CEPH_MDS_SESSION_CLOSING)
2184                         __open_session(mdsc, session);
2185                 list_add(&req->r_wait, &session->s_waiting);
2186                 goto out_session;
2187         }
2188
2189         /* send request */
2190         req->r_resend_mds = -1;   /* forget any previous mds hint */
2191
2192         if (req->r_request_started == 0)   /* note request start time */
2193                 req->r_request_started = jiffies;
2194
2195         err = __prepare_send_request(mdsc, req, mds, false);
2196         if (!err) {
2197                 ceph_msg_get(req->r_request);
2198                 ceph_con_send(&session->s_con, req->r_request);
2199         }
2200
2201 out_session:
2202         ceph_put_mds_session(session);
2203 finish:
2204         if (err) {
2205                 dout("__do_request early error %d\n", err);
2206                 req->r_err = err;
2207                 complete_request(mdsc, req);
2208                 __unregister_request(mdsc, req);
2209         }
2210 out:
2211         return err;
2212 }
2213
2214 /*
2215  * called under mdsc->mutex
2216  */
2217 static void __wake_requests(struct ceph_mds_client *mdsc,
2218                             struct list_head *head)
2219 {
2220         struct ceph_mds_request *req;
2221         LIST_HEAD(tmp_list);
2222
2223         list_splice_init(head, &tmp_list);
2224
2225         while (!list_empty(&tmp_list)) {
2226                 req = list_entry(tmp_list.next,
2227                                  struct ceph_mds_request, r_wait);
2228                 list_del_init(&req->r_wait);
2229                 dout(" wake request %p tid %llu\n", req, req->r_tid);
2230                 __do_request(mdsc, req);
2231         }
2232 }
2233
2234 /*
2235  * Wake up threads with requests pending for @mds, so that they can
2236  * resubmit their requests to a possibly different mds.
2237  */
2238 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2239 {
2240         struct ceph_mds_request *req;
2241         struct rb_node *p = rb_first(&mdsc->request_tree);
2242
2243         dout("kick_requests mds%d\n", mds);
2244         while (p) {
2245                 req = rb_entry(p, struct ceph_mds_request, r_node);
2246                 p = rb_next(p);
2247                 if (req->r_got_unsafe)
2248                         continue;
2249                 if (req->r_attempts > 0)
2250                         continue; /* only new requests */
2251                 if (req->r_session &&
2252                     req->r_session->s_mds == mds) {
2253                         dout(" kicking tid %llu\n", req->r_tid);
2254                         list_del_init(&req->r_wait);
2255                         __do_request(mdsc, req);
2256                 }
2257         }
2258 }
2259
2260 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
2261                               struct ceph_mds_request *req)
2262 {
2263         dout("submit_request on %p\n", req);
2264         mutex_lock(&mdsc->mutex);
2265         __register_request(mdsc, req, NULL);
2266         __do_request(mdsc, req);
2267         mutex_unlock(&mdsc->mutex);
2268 }
2269
2270 /*
2271  * Synchrously perform an mds request.  Take care of all of the
2272  * session setup, forwarding, retry details.
2273  */
2274 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2275                          struct inode *dir,
2276                          struct ceph_mds_request *req)
2277 {
2278         int err;
2279
2280         dout("do_request on %p\n", req);
2281
2282         /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
2283         if (req->r_inode)
2284                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2285         if (req->r_locked_dir)
2286                 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
2287         if (req->r_old_dentry_dir)
2288                 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2289                                   CEPH_CAP_PIN);
2290
2291         /* deny access to directories with pool_ns layouts */
2292         if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
2293             ceph_inode(req->r_inode)->i_pool_ns_len)
2294                 return -EIO;
2295         if (req->r_locked_dir &&
2296             ceph_inode(req->r_locked_dir)->i_pool_ns_len)
2297                 return -EIO;
2298
2299         /* issue */
2300         mutex_lock(&mdsc->mutex);
2301         __register_request(mdsc, req, dir);
2302         __do_request(mdsc, req);
2303
2304         if (req->r_err) {
2305                 err = req->r_err;
2306                 goto out;
2307         }
2308
2309         /* wait */
2310         mutex_unlock(&mdsc->mutex);
2311         dout("do_request waiting\n");
2312         if (!req->r_timeout && req->r_wait_for_completion) {
2313                 err = req->r_wait_for_completion(mdsc, req);
2314         } else {
2315                 long timeleft = wait_for_completion_killable_timeout(
2316                                         &req->r_completion,
2317                                         ceph_timeout_jiffies(req->r_timeout));
2318                 if (timeleft > 0)
2319                         err = 0;
2320                 else if (!timeleft)
2321                         err = -EIO;  /* timed out */
2322                 else
2323                         err = timeleft;  /* killed */
2324         }
2325         dout("do_request waited, got %d\n", err);
2326         mutex_lock(&mdsc->mutex);
2327
2328         /* only abort if we didn't race with a real reply */
2329         if (req->r_got_result) {
2330                 err = le32_to_cpu(req->r_reply_info.head->result);
2331         } else if (err < 0) {
2332                 dout("aborted request %lld with %d\n", req->r_tid, err);
2333
2334                 /*
2335                  * ensure we aren't running concurrently with
2336                  * ceph_fill_trace or ceph_readdir_prepopulate, which
2337                  * rely on locks (dir mutex) held by our caller.
2338                  */
2339                 mutex_lock(&req->r_fill_mutex);
2340                 req->r_err = err;
2341                 req->r_aborted = true;
2342                 mutex_unlock(&req->r_fill_mutex);
2343
2344                 if (req->r_locked_dir &&
2345                     (req->r_op & CEPH_MDS_OP_WRITE))
2346                         ceph_invalidate_dir_request(req);
2347         } else {
2348                 err = req->r_err;
2349         }
2350
2351 out:
2352         mutex_unlock(&mdsc->mutex);
2353         dout("do_request %p done, result %d\n", req, err);
2354         return err;
2355 }
2356
2357 /*
2358  * Invalidate dir's completeness, dentry lease state on an aborted MDS
2359  * namespace request.
2360  */
2361 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2362 {
2363         struct inode *inode = req->r_locked_dir;
2364
2365         dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
2366
2367         ceph_dir_clear_complete(inode);
2368         if (req->r_dentry)
2369                 ceph_invalidate_dentry_lease(req->r_dentry);
2370         if (req->r_old_dentry)
2371                 ceph_invalidate_dentry_lease(req->r_old_dentry);
2372 }
2373
2374 /*
2375  * Handle mds reply.
2376  *
2377  * We take the session mutex and parse and process the reply immediately.
2378  * This preserves the logical ordering of replies, capabilities, etc., sent
2379  * by the MDS as they are applied to our local cache.
2380  */
2381 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2382 {
2383         struct ceph_mds_client *mdsc = session->s_mdsc;
2384         struct ceph_mds_request *req;
2385         struct ceph_mds_reply_head *head = msg->front.iov_base;
2386         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2387         struct ceph_snap_realm *realm;
2388         u64 tid;
2389         int err, result;
2390         int mds = session->s_mds;
2391
2392         if (msg->front.iov_len < sizeof(*head)) {
2393                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2394                 ceph_msg_dump(msg);
2395                 return;
2396         }
2397
2398         /* get request, session */
2399         tid = le64_to_cpu(msg->hdr.tid);
2400         mutex_lock(&mdsc->mutex);
2401         req = lookup_get_request(mdsc, tid);
2402         if (!req) {
2403                 dout("handle_reply on unknown tid %llu\n", tid);
2404                 mutex_unlock(&mdsc->mutex);
2405                 return;
2406         }
2407         dout("handle_reply %p\n", req);
2408
2409         /* correct session? */
2410         if (req->r_session != session) {
2411                 pr_err("mdsc_handle_reply got %llu on session mds%d"
2412                        " not mds%d\n", tid, session->s_mds,
2413                        req->r_session ? req->r_session->s_mds : -1);
2414                 mutex_unlock(&mdsc->mutex);
2415                 goto out;
2416         }
2417
2418         /* dup? */
2419         if ((req->r_got_unsafe && !head->safe) ||
2420             (req->r_got_safe && head->safe)) {
2421                 pr_warn("got a dup %s reply on %llu from mds%d\n",
2422                            head->safe ? "safe" : "unsafe", tid, mds);
2423                 mutex_unlock(&mdsc->mutex);
2424                 goto out;
2425         }
2426         if (req->r_got_safe) {
2427                 pr_warn("got unsafe after safe on %llu from mds%d\n",
2428                            tid, mds);
2429                 mutex_unlock(&mdsc->mutex);
2430                 goto out;
2431         }
2432
2433         result = le32_to_cpu(head->result);
2434
2435         /*
2436          * Handle an ESTALE
2437          * if we're not talking to the authority, send to them
2438          * if the authority has changed while we weren't looking,
2439          * send to new authority
2440          * Otherwise we just have to return an ESTALE
2441          */
2442         if (result == -ESTALE) {
2443                 dout("got ESTALE on request %llu", req->r_tid);
2444                 req->r_resend_mds = -1;
2445                 if (req->r_direct_mode != USE_AUTH_MDS) {
2446                         dout("not using auth, setting for that now");
2447                         req->r_direct_mode = USE_AUTH_MDS;
2448                         __do_request(mdsc, req);
2449                         mutex_unlock(&mdsc->mutex);
2450                         goto out;
2451                 } else  {
2452                         int mds = __choose_mds(mdsc, req);
2453                         if (mds >= 0 && mds != req->r_session->s_mds) {
2454                                 dout("but auth changed, so resending");
2455                                 __do_request(mdsc, req);
2456                                 mutex_unlock(&mdsc->mutex);
2457                                 goto out;
2458                         }
2459                 }
2460                 dout("have to return ESTALE on request %llu", req->r_tid);
2461         }
2462
2463
2464         if (head->safe) {
2465                 req->r_got_safe = true;
2466                 __unregister_request(mdsc, req);
2467
2468                 if (req->r_got_unsafe) {
2469                         /*
2470                          * We already handled the unsafe response, now do the
2471                          * cleanup.  No need to examine the response; the MDS
2472                          * doesn't include any result info in the safe
2473                          * response.  And even if it did, there is nothing
2474                          * useful we could do with a revised return value.
2475                          */
2476                         dout("got safe reply %llu, mds%d\n", tid, mds);
2477                         list_del_init(&req->r_unsafe_item);
2478
2479                         /* last unsafe request during umount? */
2480                         if (mdsc->stopping && !__get_oldest_req(mdsc))
2481                                 complete_all(&mdsc->safe_umount_waiters);
2482                         mutex_unlock(&mdsc->mutex);
2483                         goto out;
2484                 }
2485         } else {
2486                 req->r_got_unsafe = true;
2487                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2488                 if (req->r_unsafe_dir) {
2489                         struct ceph_inode_info *ci =
2490                                         ceph_inode(req->r_unsafe_dir);
2491                         spin_lock(&ci->i_unsafe_lock);
2492                         list_add_tail(&req->r_unsafe_dir_item,
2493                                       &ci->i_unsafe_dirops);
2494                         spin_unlock(&ci->i_unsafe_lock);
2495                 }
2496         }
2497
2498         dout("handle_reply tid %lld result %d\n", tid, result);
2499         rinfo = &req->r_reply_info;
2500         err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2501         mutex_unlock(&mdsc->mutex);
2502
2503         mutex_lock(&session->s_mutex);
2504         if (err < 0) {
2505                 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2506                 ceph_msg_dump(msg);
2507                 goto out_err;
2508         }
2509
2510         /* snap trace */
2511         realm = NULL;
2512         if (rinfo->snapblob_len) {
2513                 down_write(&mdsc->snap_rwsem);
2514                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2515                                 rinfo->snapblob + rinfo->snapblob_len,
2516                                 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2517                                 &realm);
2518                 downgrade_write(&mdsc->snap_rwsem);
2519         } else {
2520                 down_read(&mdsc->snap_rwsem);
2521         }
2522
2523         /* insert trace into our cache */
2524         mutex_lock(&req->r_fill_mutex);
2525         current->journal_info = req;
2526         err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2527         if (err == 0) {
2528                 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2529                                     req->r_op == CEPH_MDS_OP_LSSNAP))
2530                         ceph_readdir_prepopulate(req, req->r_session);
2531                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2532         }
2533         current->journal_info = NULL;
2534         mutex_unlock(&req->r_fill_mutex);
2535
2536         up_read(&mdsc->snap_rwsem);
2537         if (realm)
2538                 ceph_put_snap_realm(mdsc, realm);
2539
2540         if (err == 0 && req->r_got_unsafe && req->r_target_inode) {
2541                 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
2542                 spin_lock(&ci->i_unsafe_lock);
2543                 list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
2544                 spin_unlock(&ci->i_unsafe_lock);
2545         }
2546 out_err:
2547         mutex_lock(&mdsc->mutex);
2548         if (!req->r_aborted) {
2549                 if (err) {
2550                         req->r_err = err;
2551                 } else {
2552                         req->r_reply =  ceph_msg_get(msg);
2553                         req->r_got_result = true;
2554                 }
2555         } else {
2556                 dout("reply arrived after request %lld was aborted\n", tid);
2557         }
2558         mutex_unlock(&mdsc->mutex);
2559
2560         mutex_unlock(&session->s_mutex);
2561
2562         /* kick calling process */
2563         complete_request(mdsc, req);
2564 out:
2565         ceph_mdsc_put_request(req);
2566         return;
2567 }
2568
2569
2570
2571 /*
2572  * handle mds notification that our request has been forwarded.
2573  */
2574 static void handle_forward(struct ceph_mds_client *mdsc,
2575                            struct ceph_mds_session *session,
2576                            struct ceph_msg *msg)
2577 {
2578         struct ceph_mds_request *req;
2579         u64 tid = le64_to_cpu(msg->hdr.tid);
2580         u32 next_mds;
2581         u32 fwd_seq;
2582         int err = -EINVAL;
2583         void *p = msg->front.iov_base;
2584         void *end = p + msg->front.iov_len;
2585
2586         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2587         next_mds = ceph_decode_32(&p);
2588         fwd_seq = ceph_decode_32(&p);
2589
2590         mutex_lock(&mdsc->mutex);
2591         req = lookup_get_request(mdsc, tid);
2592         if (!req) {
2593                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2594                 goto out;  /* dup reply? */
2595         }
2596
2597         if (req->r_aborted) {
2598                 dout("forward tid %llu aborted, unregistering\n", tid);
2599                 __unregister_request(mdsc, req);
2600         } else if (fwd_seq <= req->r_num_fwd) {
2601                 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2602                      tid, next_mds, req->r_num_fwd, fwd_seq);
2603         } else {
2604                 /* resend. forward race not possible; mds would drop */
2605                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2606                 BUG_ON(req->r_err);
2607                 BUG_ON(req->r_got_result);
2608                 req->r_attempts = 0;
2609                 req->r_num_fwd = fwd_seq;
2610                 req->r_resend_mds = next_mds;
2611                 put_request_session(req);
2612                 __do_request(mdsc, req);
2613         }
2614         ceph_mdsc_put_request(req);
2615 out:
2616         mutex_unlock(&mdsc->mutex);
2617         return;
2618
2619 bad:
2620         pr_err("mdsc_handle_forward decode error err=%d\n", err);
2621 }
2622
/*
 * handle a mds session control message
 *
 * The message front must be exactly one struct ceph_mds_session_head;
 * anything else is logged as corrupt and dropped.  The op and seq are
 * taken from that header.
 *
 * Locking: mdsc->mutex is taken briefly first (to unregister the
 * session on CLOSE and to refresh s_ttl), then session->s_mutex is held
 * while the op is processed, and finally mdsc->mutex is taken again if
 * waiting requests have to be woken.  The two mutexes are never held at
 * the same time in this function.
 */
static void handle_session(struct ceph_mds_session *session,
                           struct ceph_msg *msg)
{
        struct ceph_mds_client *mdsc = session->s_mdsc;
        u32 op;
        u64 seq;
        int mds = session->s_mds;
        struct ceph_mds_session_head *h = msg->front.iov_base;
        /*
         * 0: nothing to wake
         * 1: wake requests parked on session->s_waiting
         * 2: as 1, and additionally kick_requests() for this mds
         */
        int wake = 0;

        /* decode */
        if (msg->front.iov_len != sizeof(*h))
                goto bad;
        op = le32_to_cpu(h->op);
        seq = le64_to_cpu(h->seq);

        mutex_lock(&mdsc->mutex);
        if (op == CEPH_SESSION_CLOSE)
                __unregister_session(mdsc, session);
        /* FIXME: this ttl calculation is generous */
        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
        mutex_unlock(&mdsc->mutex);

        mutex_lock(&session->s_mutex);

        dout("handle_session mds%d %s %p state %s seq %llu\n",
             mds, ceph_session_op_name(op), session,
             ceph_session_state_name(session->s_state), seq);

        /* any message from a session we considered hung revives it */
        if (session->s_state == CEPH_MDS_SESSION_HUNG) {
                session->s_state = CEPH_MDS_SESSION_OPEN;
                pr_info("mds%d came back\n", session->s_mds);
        }

        switch (op) {
        case CEPH_SESSION_OPEN:
                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
                        pr_info("mds%d reconnect success\n", session->s_mds);
                session->s_state = CEPH_MDS_SESSION_OPEN;
                renewed_caps(mdsc, session, 0);
                wake = 1;
                /* session finished opening while we are shutting down */
                if (mdsc->stopping)
                        __close_session(mdsc, session);
                break;

        case CEPH_SESSION_RENEWCAPS:
                /* only honour the reply to our most recent renew request */
                if (session->s_renew_seq == seq)
                        renewed_caps(mdsc, session, 1);
                break;

        case CEPH_SESSION_CLOSE:
                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
                        pr_info("mds%d reconnect denied\n", session->s_mds);
                cleanup_session_requests(mdsc, session);
                remove_session_caps(session);
                wake = 2; /* for good measure */
                wake_up_all(&mdsc->session_close_wq);
                break;

        case CEPH_SESSION_STALE:
                pr_info("mds%d caps went stale, renewing\n",
                        session->s_mds);
                /* bump the cap generation and set the cap ttl in the past */
                spin_lock(&session->s_gen_ttl_lock);
                session->s_cap_gen++;
                session->s_cap_ttl = jiffies - 1;
                spin_unlock(&session->s_gen_ttl_lock);
                send_renew_caps(mdsc, session);
                break;

        case CEPH_SESSION_RECALL_STATE:
                /* trim our caps for this session; the limit comes from max_caps */
                trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
                break;

        case CEPH_SESSION_FLUSHMSG:
                send_flushmsg_ack(mdsc, session, seq);
                break;

        case CEPH_SESSION_FORCE_RO:
                dout("force_session_readonly %p\n", session);
                spin_lock(&session->s_cap_lock);
                session->s_readonly = true;
                spin_unlock(&session->s_cap_lock);
                wake_up_session_caps(session, 0);
                break;

        default:
                pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
                WARN_ON(1);
        }

        mutex_unlock(&session->s_mutex);
        if (wake) {
                mutex_lock(&mdsc->mutex);
                __wake_requests(mdsc, &session->s_waiting);
                if (wake == 2)
                        kick_requests(mdsc, mds);
                mutex_unlock(&mdsc->mutex);
        }
        return;

bad:
        /* reached only from the length check, before any lock is taken */
        pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
               (int)msg->front.iov_len);
        ceph_msg_dump(msg);
        return;
}
2732
2733
2734 /*
2735  * called under session->mutex.
2736  */
2737 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2738                                    struct ceph_mds_session *session)
2739 {
2740         struct ceph_mds_request *req, *nreq;
2741         struct rb_node *p;
2742         int err;
2743
2744         dout("replay_unsafe_requests mds%d\n", session->s_mds);
2745
2746         mutex_lock(&mdsc->mutex);
2747         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2748                 err = __prepare_send_request(mdsc, req, session->s_mds, true);
2749                 if (!err) {
2750                         ceph_msg_get(req->r_request);
2751                         ceph_con_send(&session->s_con, req->r_request);
2752                 }
2753         }
2754
2755         /*
2756          * also re-send old requests when MDS enters reconnect stage. So that MDS
2757          * can process completed request in clientreplay stage.
2758          */
2759         p = rb_first(&mdsc->request_tree);
2760         while (p) {
2761                 req = rb_entry(p, struct ceph_mds_request, r_node);
2762                 p = rb_next(p);
2763                 if (req->r_got_unsafe)
2764                         continue;
2765                 if (req->r_attempts == 0)
2766                         continue; /* only old requests */
2767                 if (req->r_session &&
2768                     req->r_session->s_mds == session->s_mds) {
2769                         err = __prepare_send_request(mdsc, req,
2770                                                      session->s_mds, true);
2771                         if (!err) {
2772                                 ceph_msg_get(req->r_request);
2773                                 ceph_con_send(&session->s_con, req->r_request);
2774                         }
2775                 }
2776         }
2777         mutex_unlock(&mdsc->mutex);
2778 }
2779
2780 /*
2781  * Encode information about a cap for a reconnect with the MDS.
2782  */
2783 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2784                           void *arg)
2785 {
2786         union {
2787                 struct ceph_mds_cap_reconnect v2;
2788                 struct ceph_mds_cap_reconnect_v1 v1;
2789         } rec;
2790         size_t reclen;
2791         struct ceph_inode_info *ci;
2792         struct ceph_reconnect_state *recon_state = arg;
2793         struct ceph_pagelist *pagelist = recon_state->pagelist;
2794         char *path;
2795         int pathlen, err;
2796         u64 pathbase;
2797         struct dentry *dentry;
2798
2799         ci = cap->ci;
2800
2801         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2802              inode, ceph_vinop(inode), cap, cap->cap_id,
2803              ceph_cap_string(cap->issued));
2804         err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2805         if (err)
2806                 return err;
2807
2808         dentry = d_find_alias(inode);
2809         if (dentry) {
2810                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2811                 if (IS_ERR(path)) {
2812                         err = PTR_ERR(path);
2813                         goto out_dput;
2814                 }
2815         } else {
2816                 path = NULL;
2817                 pathlen = 0;
2818         }
2819         err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2820         if (err)
2821                 goto out_free;
2822
2823         spin_lock(&ci->i_ceph_lock);
2824         cap->seq = 0;        /* reset cap seq */
2825         cap->issue_seq = 0;  /* and issue_seq */
2826         cap->mseq = 0;       /* and migrate_seq */
2827         cap->cap_gen = cap->session->s_cap_gen;
2828
2829         if (recon_state->flock) {
2830                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2831                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2832                 rec.v2.issued = cpu_to_le32(cap->issued);
2833                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2834                 rec.v2.pathbase = cpu_to_le64(pathbase);
2835                 rec.v2.flock_len = 0;
2836                 reclen = sizeof(rec.v2);
2837         } else {
2838                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2839                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2840                 rec.v1.issued = cpu_to_le32(cap->issued);
2841                 rec.v1.size = cpu_to_le64(inode->i_size);
2842                 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2843                 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2844                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2845                 rec.v1.pathbase = cpu_to_le64(pathbase);
2846                 reclen = sizeof(rec.v1);
2847         }
2848         spin_unlock(&ci->i_ceph_lock);
2849
2850         if (recon_state->flock) {
2851                 int num_fcntl_locks, num_flock_locks;
2852                 struct ceph_filelock *flocks;
2853
2854 encode_again:
2855                 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
2856                 flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
2857                                  sizeof(struct ceph_filelock), GFP_NOFS);
2858                 if (!flocks) {
2859                         err = -ENOMEM;
2860                         goto out_free;
2861                 }
2862                 err = ceph_encode_locks_to_buffer(inode, flocks,
2863                                                   num_fcntl_locks,
2864                                                   num_flock_locks);
2865                 if (err) {
2866                         kfree(flocks);
2867                         if (err == -ENOSPC)
2868                                 goto encode_again;
2869                         goto out_free;
2870                 }
2871                 /*
2872                  * number of encoded locks is stable, so copy to pagelist
2873                  */
2874                 rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
2875                                     (num_fcntl_locks+num_flock_locks) *
2876                                     sizeof(struct ceph_filelock));
2877                 err = ceph_pagelist_append(pagelist, &rec, reclen);
2878                 if (!err)
2879                         err = ceph_locks_to_pagelist(flocks, pagelist,
2880                                                      num_fcntl_locks,
2881                                                      num_flock_locks);
2882                 kfree(flocks);
2883         } else {
2884                 err = ceph_pagelist_append(pagelist, &rec, reclen);
2885         }
2886
2887         recon_state->nr_caps++;
2888 out_free:
2889         kfree(path);
2890 out_dput:
2891         dput(dentry);
2892         return err;
2893 }
2894
2895
2896 /*
2897  * If an MDS fails and recovers, clients need to reconnect in order to
2898  * reestablish shared state.  This includes all caps issued through
2899  * this session _and_ the snap_realm hierarchy.  Because it's not
2900  * clear which snap realms the mds cares about, we send everything we
2901  * know about.. that ensures we'll then get any new info the
2902  * recovering MDS might have.
2903  *
2904  * This is a relatively heavyweight operation, but it's rare.
2905  *
2906  * called with mdsc->mutex held.
2907  */
2908 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2909                                struct ceph_mds_session *session)
2910 {
2911         struct ceph_msg *reply;
2912         struct rb_node *p;
2913         int mds = session->s_mds;
2914         int err = -ENOMEM;
2915         int s_nr_caps;
2916         struct ceph_pagelist *pagelist;
2917         struct ceph_reconnect_state recon_state;
2918
2919         pr_info("mds%d reconnect start\n", mds);
2920
2921         pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2922         if (!pagelist)
2923                 goto fail_nopagelist;
2924         ceph_pagelist_init(pagelist);
2925
2926         reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2927         if (!reply)
2928                 goto fail_nomsg;
2929
2930         mutex_lock(&session->s_mutex);
2931         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2932         session->s_seq = 0;
2933
2934         dout("session %p state %s\n", session,
2935              ceph_session_state_name(session->s_state));
2936
2937         spin_lock(&session->s_gen_ttl_lock);
2938         session->s_cap_gen++;
2939         spin_unlock(&session->s_gen_ttl_lock);
2940
2941         spin_lock(&session->s_cap_lock);
2942         /* don't know if session is readonly */
2943         session->s_readonly = 0;
2944         /*
2945          * notify __ceph_remove_cap() that we are composing cap reconnect.
2946          * If a cap get released before being added to the cap reconnect,
2947          * __ceph_remove_cap() should skip queuing cap release.
2948          */
2949         session->s_cap_reconnect = 1;
2950         /* drop old cap expires; we're about to reestablish that state */
2951         cleanup_cap_releases(mdsc, session);
2952
2953         /* trim unused caps to reduce MDS's cache rejoin time */
2954         if (mdsc->fsc->sb->s_root)
2955                 shrink_dcache_parent(mdsc->fsc->sb->s_root);
2956
2957         ceph_con_close(&session->s_con);
2958         ceph_con_open(&session->s_con,
2959                       CEPH_ENTITY_TYPE_MDS, mds,
2960                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2961
2962         /* replay unsafe requests */
2963         replay_unsafe_requests(mdsc, session);
2964
2965         down_read(&mdsc->snap_rwsem);
2966
2967         /* traverse this session's caps */
2968         s_nr_caps = session->s_nr_caps;
2969         err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
2970         if (err)
2971                 goto fail;
2972
2973         recon_state.nr_caps = 0;
2974         recon_state.pagelist = pagelist;
2975         recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2976         err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2977         if (err < 0)
2978                 goto fail;
2979
2980         spin_lock(&session->s_cap_lock);
2981         session->s_cap_reconnect = 0;
2982         spin_unlock(&session->s_cap_lock);
2983
2984         /*
2985          * snaprealms.  we provide mds with the ino, seq (version), and
2986          * parent for all of our realms.  If the mds has any newer info,
2987          * it will tell us.
2988          */
2989         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2990                 struct ceph_snap_realm *realm =
2991                         rb_entry(p, struct ceph_snap_realm, node);
2992                 struct ceph_mds_snaprealm_reconnect sr_rec;
2993
2994                 dout(" adding snap realm %llx seq %lld parent %llx\n",
2995                      realm->ino, realm->seq, realm->parent_ino);
2996                 sr_rec.ino = cpu_to_le64(realm->ino);
2997                 sr_rec.seq = cpu_to_le64(realm->seq);
2998                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
2999                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3000                 if (err)
3001                         goto fail;
3002         }
3003
3004         if (recon_state.flock)
3005                 reply->hdr.version = cpu_to_le16(2);
3006
3007         /* raced with cap release? */
3008         if (s_nr_caps != recon_state.nr_caps) {
3009                 struct page *page = list_first_entry(&pagelist->head,
3010                                                      struct page, lru);
3011                 __le32 *addr = kmap_atomic(page);
3012                 *addr = cpu_to_le32(recon_state.nr_caps);
3013                 kunmap_atomic(addr);
3014         }
3015
3016         reply->hdr.data_len = cpu_to_le32(pagelist->length);
3017         ceph_msg_data_add_pagelist(reply, pagelist);
3018
3019         ceph_early_kick_flushing_caps(mdsc, session);
3020
3021         ceph_con_send(&session->s_con, reply);
3022
3023         mutex_unlock(&session->s_mutex);
3024
3025         mutex_lock(&mdsc->mutex);
3026         __wake_requests(mdsc, &session->s_waiting);
3027         mutex_unlock(&mdsc->mutex);
3028
3029         up_read(&mdsc->snap_rwsem);
3030         return;
3031
3032 fail:
3033         ceph_msg_put(reply);
3034         up_read(&mdsc->snap_rwsem);
3035         mutex_unlock(&session->s_mutex);
3036 fail_nomsg:
3037         ceph_pagelist_release(pagelist);
3038 fail_nopagelist:
3039         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3040         return;
3041 }
3042
3043
/*
 * compare old and new mdsmaps, kicking requests
 * and closing out old connections as necessary
 *
 * First pass: for every existing session whose rank is below the old
 * map's m_max_mds, compare the old and new state and address of that
 * rank and react (close/restart the session, send a reconnect, kick
 * requests).  Second pass: for every session whose mds is laggy in the
 * new map, open sessions to that mds's export targets.
 *
 * called under mdsc->mutex.  Note that the mutex is temporarily
 * dropped and re-taken inside the first loop.
 */
static void check_new_map(struct ceph_mds_client *mdsc,
                          struct ceph_mdsmap *newmap,
                          struct ceph_mdsmap *oldmap)
{
        int i;
        int oldstate, newstate;
        struct ceph_mds_session *s;

        dout("check_new_map new %u old %u\n",
             newmap->m_epoch, oldmap->m_epoch);

        for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
                if (mdsc->sessions[i] == NULL)
                        continue;
                s = mdsc->sessions[i];
                oldstate = ceph_mdsmap_get_state(oldmap, i);
                newstate = ceph_mdsmap_get_state(newmap, i);

                dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
                     i, ceph_mds_state_name(oldstate),
                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
                     ceph_mds_state_name(newstate),
                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
                     ceph_session_state_name(s->s_state));

                /*
                 * The rank no longer exists in the new map, or its
                 * address changed.  The rank check comes first so the
                 * new map's address is only fetched for ranks it covers.
                 */
                if (i >= newmap->m_max_mds ||
                    memcmp(ceph_mdsmap_get_addr(oldmap, i),
                           ceph_mdsmap_get_addr(newmap, i),
                           sizeof(struct ceph_entity_addr))) {
                        if (s->s_state == CEPH_MDS_SESSION_OPENING) {
                                /* the session never opened, just close it
                                 * out now */
                                __wake_requests(mdsc, &s->s_waiting);
                                __unregister_session(mdsc, s);
                                /*
                                 * NOTE(review): s is still dereferenced
                                 * below after being unregistered; whether
                                 * it is guaranteed to remain valid is not
                                 * visible from here -- confirm.
                                 */
                        } else {
                                /* just close it */
                                /*
                                 * mdsc->mutex is released so that s_mutex
                                 * can be taken first, then re-acquired
                                 * (presumably to respect lock ordering --
                                 * confirm).
                                 */
                                mutex_unlock(&mdsc->mutex);
                                mutex_lock(&s->s_mutex);
                                mutex_lock(&mdsc->mutex);
                                ceph_con_close(&s->s_con);
                                mutex_unlock(&s->s_mutex);
                                /* set with mdsc->mutex held, s_mutex released */
                                s->s_state = CEPH_MDS_SESSION_RESTARTING;
                        }
                } else if (oldstate == newstate) {
                        continue;  /* nothing new with this mds */
                }

                /*
                 * send reconnect?
                 */
                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
                    newstate >= CEPH_MDS_STATE_RECONNECT) {
                        /* send_mds_reconnect() takes mdsc->mutex itself */
                        mutex_unlock(&mdsc->mutex);
                        send_mds_reconnect(mdsc, s);
                        mutex_lock(&mdsc->mutex);
                }

                /*
                 * kick request on any mds that has gone active.
                 */
                if (oldstate < CEPH_MDS_STATE_ACTIVE &&
                    newstate >= CEPH_MDS_STATE_ACTIVE) {
                        /* no "recovery completed" message when coming from creating/starting */
                        if (oldstate != CEPH_MDS_STATE_CREATING &&
                            oldstate != CEPH_MDS_STATE_STARTING)
                                pr_info("mds%d recovery completed\n", s->s_mds);
                        kick_requests(mdsc, i);
                        ceph_kick_flushing_caps(mdsc, s);
                        wake_up_session_caps(s, 1);
                }
        }

        /* second pass: laggy mds's in the new map */
        for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
                s = mdsc->sessions[i];
                if (!s)
                        continue;
                if (!ceph_mdsmap_is_laggy(newmap, i))
                        continue;
                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
                    s->s_state == CEPH_MDS_SESSION_HUNG ||
                    s->s_state == CEPH_MDS_SESSION_CLOSING) {
                        dout(" connecting to export targets of laggy mds%d\n",
                             i);
                        __open_export_target_sessions(mdsc, s);
                }
        }
}
3136
3137
3138
3139 /*
3140  * leases
3141  */
3142
3143 /*
3144  * caller must hold session s_mutex, dentry->d_lock
3145  */
3146 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3147 {
3148         struct ceph_dentry_info *di = ceph_dentry(dentry);
3149
3150         ceph_put_mds_session(di->lease_session);
3151         di->lease_session = NULL;
3152 }
3153
3154 static void handle_lease(struct ceph_mds_client *mdsc,
3155                          struct ceph_mds_session *session,
3156                          struct ceph_msg *msg)
3157 {
3158         struct super_block *sb = mdsc->fsc->sb;
3159         struct inode *inode;
3160         struct dentry *parent, *dentry;
3161         struct ceph_dentry_info *di;
3162         int mds = session->s_mds;
3163         struct ceph_mds_lease *h = msg->front.iov_base;
3164         u32 seq;
3165         struct ceph_vino vino;
3166         struct qstr dname;
3167         int release = 0;
3168
3169         dout("handle_lease from mds%d\n", mds);
3170
3171         /* decode */
3172         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3173                 goto bad;
3174         vino.ino = le64_to_cpu(h->ino);
3175         vino.snap = CEPH_NOSNAP;
3176         seq = le32_to_cpu(h->seq);
3177         dname.name = (void *)h + sizeof(*h) + sizeof(u32);
3178         dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
3179         if (dname.len != get_unaligned_le32(h+1))
3180                 goto bad;
3181
3182         /* lookup inode */
3183         inode = ceph_find_inode(sb, vino);
3184         dout("handle_lease %s, ino %llx %p %.*s\n",
3185              ceph_lease_op_name(h->action), vino.ino, inode,
3186              dname.len, dname.name);
3187
3188         mutex_lock(&session->s_mutex);
3189         session->s_seq++;
3190
3191         if (inode == NULL) {
3192                 dout("handle_lease no inode %llx\n", vino.ino);
3193                 goto release;
3194         }
3195
3196         /* dentry */
3197         parent = d_find_alias(inode);
3198         if (!parent) {
3199                 dout("no parent dentry on inode %p\n", inode);
3200                 WARN_ON(1);
3201                 goto release;  /* hrm... */
3202         }
3203         dname.hash = full_name_hash(dname.name, dname.len);
3204         dentry = d_lookup(parent, &dname);
3205         dput(parent);
3206         if (!dentry)
3207                 goto release;
3208
3209         spin_lock(&dentry->d_lock);
3210         di = ceph_dentry(dentry);
3211         switch (h->action) {
3212         case CEPH_MDS_LEASE_REVOKE:
3213                 if (di->lease_session == session) {
3214                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3215                                 h->seq = cpu_to_le32(di->lease_seq);
3216                         __ceph_mdsc_drop_dentry_lease(dentry);
3217                 }
3218                 release = 1;
3219                 break;
3220
3221         case CEPH_MDS_LEASE_RENEW:
3222                 if (di->lease_session == session &&
3223                     di->lease_gen == session->s_cap_gen &&
3224                     di->lease_renew_from &&
3225                     di->lease_renew_after == 0) {
3226                         unsigned long duration =
3227                                 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3228
3229                         di->lease_seq = seq;
3230                         dentry->d_time = di->lease_renew_from + duration;
3231                         di->lease_renew_after = di->lease_renew_from +
3232                                 (duration >> 1);
3233                         di->lease_renew_from = 0;
3234                 }
3235                 break;
3236         }
3237         spin_unlock(&dentry->d_lock);
3238         dput(dentry);
3239
3240         if (!release)
3241                 goto out;
3242
3243 release:
3244         /* let's just reuse the same message */
3245         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3246         ceph_msg_get(msg);
3247         ceph_con_send(&session->s_con, msg);
3248
3249 out:
3250         iput(inode);
3251         mutex_unlock(&session->s_mutex);
3252         return;
3253
3254 bad:
3255         pr_err("corrupt lease message\n");
3256         ceph_msg_dump(msg);
3257 }
3258
3259 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3260                               struct inode *inode,
3261                               struct dentry *dentry, char action,
3262                               u32 seq)
3263 {
3264         struct ceph_msg *msg;
3265         struct ceph_mds_lease *lease;
3266         int len = sizeof(*lease) + sizeof(u32);
3267         int dnamelen = 0;
3268
3269         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
3270              inode, dentry, ceph_lease_op_name(action), session->s_mds);
3271         dnamelen = dentry->d_name.len;
3272         len += dnamelen;
3273
3274         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3275         if (!msg)
3276                 return;
3277         lease = msg->front.iov_base;
3278         lease->action = action;
3279         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
3280         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
3281         lease->seq = cpu_to_le32(seq);
3282         put_unaligned_le32(dnamelen, lease + 1);
3283         memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
3284
3285         /*
3286          * if this is a preemptive lease RELEASE, no need to
3287          * flush request stream, since the actual request will
3288          * soon follow.
3289          */
3290         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3291
3292         ceph_con_send(&session->s_con, msg);
3293 }
3294
3295 /*
3296  * Preemptively release a lease we expect to invalidate anyway.
3297  * Pass @inode always, @dentry is optional.
3298  */
3299 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
3300                              struct dentry *dentry)
3301 {
3302         struct ceph_dentry_info *di;
3303         struct ceph_mds_session *session;
3304         u32 seq;
3305
3306         BUG_ON(inode == NULL);
3307         BUG_ON(dentry == NULL);
3308
3309         /* is dentry lease valid? */
3310         spin_lock(&dentry->d_lock);
3311         di = ceph_dentry(dentry);
3312         if (!di || !di->lease_session ||
3313             di->lease_session->s_mds < 0 ||
3314             di->lease_gen != di->lease_session->s_cap_gen ||
3315             !time_before(jiffies, dentry->d_time)) {
3316                 dout("lease_release inode %p dentry %p -- "
3317                      "no lease\n",
3318                      inode, dentry);
3319                 spin_unlock(&dentry->d_lock);
3320                 return;
3321         }
3322
3323         /* we do have a lease on this dentry; note mds and seq */
3324         session = ceph_get_mds_session(di->lease_session);
3325         seq = di->lease_seq;
3326         __ceph_mdsc_drop_dentry_lease(dentry);
3327         spin_unlock(&dentry->d_lock);
3328
3329         dout("lease_release inode %p dentry %p to mds%d\n",
3330              inode, dentry, session->s_mds);
3331         ceph_mdsc_lease_send_msg(session, inode, dentry,
3332                                  CEPH_MDS_LEASE_RELEASE, seq);
3333         ceph_put_mds_session(session);
3334 }
3335
3336 /*
3337  * drop all leases (and dentry refs) in preparation for umount
3338  */
3339 static void drop_leases(struct ceph_mds_client *mdsc)
3340 {
3341         int i;
3342
3343         dout("drop_leases\n");
3344         mutex_lock(&mdsc->mutex);
3345         for (i = 0; i < mdsc->max_sessions; i++) {
3346                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3347                 if (!s)
3348                         continue;
3349                 mutex_unlock(&mdsc->mutex);
3350                 mutex_lock(&s->s_mutex);
3351                 mutex_unlock(&s->s_mutex);
3352                 ceph_put_mds_session(s);
3353                 mutex_lock(&mdsc->mutex);
3354         }
3355         mutex_unlock(&mdsc->mutex);
3356 }
3357
3358
3359
3360 /*
3361  * delayed work -- periodically trim expired leases, renew caps with mds
3362  */
3363 static void schedule_delayed(struct ceph_mds_client *mdsc)
3364 {
3365         int delay = 5;
3366         unsigned hz = round_jiffies_relative(HZ * delay);
3367         schedule_delayed_work(&mdsc->delayed_work, hz);
3368 }
3369
3370 static void delayed_work(struct work_struct *work)
3371 {
3372         int i;
3373         struct ceph_mds_client *mdsc =
3374                 container_of(work, struct ceph_mds_client, delayed_work.work);
3375         int renew_interval;
3376         int renew_caps;
3377
3378         dout("mdsc delayed_work\n");
3379         ceph_check_delayed_caps(mdsc);
3380
3381         mutex_lock(&mdsc->mutex);
3382         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
3383         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
3384                                    mdsc->last_renew_caps);
3385         if (renew_caps)
3386                 mdsc->last_renew_caps = jiffies;
3387
3388         for (i = 0; i < mdsc->max_sessions; i++) {
3389                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3390                 if (s == NULL)
3391                         continue;
3392                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
3393                         dout("resending session close request for mds%d\n",
3394                              s->s_mds);
3395                         request_close_session(mdsc, s);
3396                         ceph_put_mds_session(s);
3397                         continue;
3398                 }
3399                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3400                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
3401                                 s->s_state = CEPH_MDS_SESSION_HUNG;
3402                                 pr_info("mds%d hung\n", s->s_mds);
3403                         }
3404                 }
3405                 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
3406                         /* this mds is failed or recovering, just wait */
3407                         ceph_put_mds_session(s);
3408                         continue;
3409                 }
3410                 mutex_unlock(&mdsc->mutex);
3411
3412                 mutex_lock(&s->s_mutex);
3413                 if (renew_caps)
3414                         send_renew_caps(mdsc, s);
3415                 else
3416                         ceph_con_keepalive(&s->s_con);
3417                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3418                     s->s_state == CEPH_MDS_SESSION_HUNG)
3419                         ceph_send_cap_releases(mdsc, s);
3420                 mutex_unlock(&s->s_mutex);
3421                 ceph_put_mds_session(s);
3422
3423                 mutex_lock(&mdsc->mutex);
3424         }
3425         mutex_unlock(&mdsc->mutex);
3426
3427         schedule_delayed(mdsc);
3428 }
3429
3430 int ceph_mdsc_init(struct ceph_fs_client *fsc)
3431
3432 {
3433         struct ceph_mds_client *mdsc;
3434
3435         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3436         if (!mdsc)
3437                 return -ENOMEM;
3438         mdsc->fsc = fsc;
3439         fsc->mdsc = mdsc;
3440         mutex_init(&mdsc->mutex);
3441         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3442         if (mdsc->mdsmap == NULL) {
3443                 kfree(mdsc);
3444                 return -ENOMEM;
3445         }
3446
3447         init_completion(&mdsc->safe_umount_waiters);
3448         init_waitqueue_head(&mdsc->session_close_wq);
3449         INIT_LIST_HEAD(&mdsc->waiting_for_map);
3450         mdsc->sessions = NULL;
3451         atomic_set(&mdsc->num_sessions, 0);
3452         mdsc->max_sessions = 0;
3453         mdsc->stopping = 0;
3454         mdsc->last_snap_seq = 0;
3455         init_rwsem(&mdsc->snap_rwsem);
3456         mdsc->snap_realms = RB_ROOT;
3457         INIT_LIST_HEAD(&mdsc->snap_empty);
3458         spin_lock_init(&mdsc->snap_empty_lock);
3459         mdsc->last_tid = 0;
3460         mdsc->oldest_tid = 0;
3461         mdsc->request_tree = RB_ROOT;
3462         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3463         mdsc->last_renew_caps = jiffies;
3464         INIT_LIST_HEAD(&mdsc->cap_delay_list);
3465         spin_lock_init(&mdsc->cap_delay_lock);
3466         INIT_LIST_HEAD(&mdsc->snap_flush_list);
3467         spin_lock_init(&mdsc->snap_flush_lock);
3468         mdsc->last_cap_flush_tid = 1;
3469         mdsc->cap_flush_tree = RB_ROOT;
3470         INIT_LIST_HEAD(&mdsc->cap_dirty);
3471         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3472         mdsc->num_cap_flushing = 0;
3473         spin_lock_init(&mdsc->cap_dirty_lock);
3474         init_waitqueue_head(&mdsc->cap_flushing_wq);
3475         spin_lock_init(&mdsc->dentry_lru_lock);
3476         INIT_LIST_HEAD(&mdsc->dentry_lru);
3477
3478         ceph_caps_init(mdsc);
3479         ceph_adjust_min_caps(mdsc, fsc->min_caps);
3480
3481         init_rwsem(&mdsc->pool_perm_rwsem);
3482         mdsc->pool_perm_tree = RB_ROOT;
3483
3484         return 0;
3485 }
3486
3487 /*
3488  * Wait for safe replies on open mds requests.  If we time out, drop
3489  * all requests from the tree to avoid dangling dentry refs.
3490  */
3491 static void wait_requests(struct ceph_mds_client *mdsc)
3492 {
3493         struct ceph_options *opts = mdsc->fsc->client->options;
3494         struct ceph_mds_request *req;
3495
3496         mutex_lock(&mdsc->mutex);
3497         if (__get_oldest_req(mdsc)) {
3498                 mutex_unlock(&mdsc->mutex);
3499
3500                 dout("wait_requests waiting for requests\n");
3501                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3502                                     ceph_timeout_jiffies(opts->mount_timeout));
3503
3504                 /* tear down remaining requests */
3505                 mutex_lock(&mdsc->mutex);
3506                 while ((req = __get_oldest_req(mdsc))) {
3507                         dout("wait_requests timed out on tid %llu\n",
3508                              req->r_tid);
3509                         __unregister_request(mdsc, req);
3510                 }
3511         }
3512         mutex_unlock(&mdsc->mutex);
3513         dout("wait_requests done\n");
3514 }
3515
3516 /*
3517  * called before mount is ro, and before dentries are torn down.
3518  * (hmm, does this still race with new lookups?)
3519  */
3520 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3521 {
3522         dout("pre_umount\n");
3523         mdsc->stopping = 1;
3524
3525         drop_leases(mdsc);
3526         ceph_flush_dirty_caps(mdsc);
3527         wait_requests(mdsc);
3528
3529         /*
3530          * wait for reply handlers to drop their request refs and
3531          * their inode/dcache refs
3532          */
3533         ceph_msgr_flush();
3534 }
3535
3536 /*
3537  * wait for all write mds requests to flush.
3538  */
3539 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3540 {
3541         struct ceph_mds_request *req = NULL, *nextreq;
3542         struct rb_node *n;
3543
3544         mutex_lock(&mdsc->mutex);
3545         dout("wait_unsafe_requests want %lld\n", want_tid);
3546 restart:
3547         req = __get_oldest_req(mdsc);
3548         while (req && req->r_tid <= want_tid) {
3549                 /* find next request */
3550                 n = rb_next(&req->r_node);
3551                 if (n)
3552                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3553                 else
3554                         nextreq = NULL;
3555                 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
3556                     (req->r_op & CEPH_MDS_OP_WRITE)) {
3557                         /* write op */
3558                         ceph_mdsc_get_request(req);
3559                         if (nextreq)
3560                                 ceph_mdsc_get_request(nextreq);
3561                         mutex_unlock(&mdsc->mutex);
3562                         dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3563                              req->r_tid, want_tid);
3564                         wait_for_completion(&req->r_safe_completion);
3565                         mutex_lock(&mdsc->mutex);
3566                         ceph_mdsc_put_request(req);
3567                         if (!nextreq)
3568                                 break;  /* next dne before, so we're done! */
3569                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
3570                                 /* next request was removed from tree */
3571                                 ceph_mdsc_put_request(nextreq);
3572                                 goto restart;
3573                         }
3574                         ceph_mdsc_put_request(nextreq);  /* won't go away */
3575                 }
3576                 req = nextreq;
3577         }
3578         mutex_unlock(&mdsc->mutex);
3579         dout("wait_unsafe_requests done\n");
3580 }
3581
3582 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3583 {
3584         u64 want_tid, want_flush, want_snap;
3585
3586         if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3587                 return;
3588
3589         dout("sync\n");
3590         mutex_lock(&mdsc->mutex);
3591         want_tid = mdsc->last_tid;
3592         mutex_unlock(&mdsc->mutex);
3593
3594         ceph_flush_dirty_caps(mdsc);
3595         spin_lock(&mdsc->cap_dirty_lock);
3596         want_flush = mdsc->last_cap_flush_tid;
3597         spin_unlock(&mdsc->cap_dirty_lock);
3598
3599         down_read(&mdsc->snap_rwsem);
3600         want_snap = mdsc->last_snap_seq;
3601         up_read(&mdsc->snap_rwsem);
3602
3603         dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
3604              want_tid, want_flush, want_snap);
3605
3606         wait_unsafe_requests(mdsc, want_tid);
3607         wait_caps_flush(mdsc, want_flush, want_snap);
3608 }
3609
3610 /*
3611  * true if all sessions are closed, or we force unmount
3612  */
3613 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3614 {
3615         if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3616                 return true;
3617         return atomic_read(&mdsc->num_sessions) == 0;
3618 }
3619
3620 /*
3621  * called after sb is ro.
3622  */
3623 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3624 {
3625         struct ceph_options *opts = mdsc->fsc->client->options;
3626         struct ceph_mds_session *session;
3627         int i;
3628
3629         dout("close_sessions\n");
3630
3631         /* close sessions */
3632         mutex_lock(&mdsc->mutex);
3633         for (i = 0; i < mdsc->max_sessions; i++) {
3634                 session = __ceph_lookup_mds_session(mdsc, i);
3635                 if (!session)
3636                         continue;
3637                 mutex_unlock(&mdsc->mutex);
3638                 mutex_lock(&session->s_mutex);
3639                 __close_session(mdsc, session);
3640                 mutex_unlock(&session->s_mutex);
3641                 ceph_put_mds_session(session);
3642                 mutex_lock(&mdsc->mutex);
3643         }
3644         mutex_unlock(&mdsc->mutex);
3645
3646         dout("waiting for sessions to close\n");
3647         wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3648                            ceph_timeout_jiffies(opts->mount_timeout));
3649
3650         /* tear down remaining sessions */
3651         mutex_lock(&mdsc->mutex);
3652         for (i = 0; i < mdsc->max_sessions; i++) {
3653                 if (mdsc->sessions[i]) {
3654                         session = get_session(mdsc->sessions[i]);
3655                         __unregister_session(mdsc, session);
3656                         mutex_unlock(&mdsc->mutex);
3657                         mutex_lock(&session->s_mutex);
3658                         remove_session_caps(session);
3659                         mutex_unlock(&session->s_mutex);
3660                         ceph_put_mds_session(session);
3661                         mutex_lock(&mdsc->mutex);
3662                 }
3663         }
3664         WARN_ON(!list_empty(&mdsc->cap_delay_list));
3665         mutex_unlock(&mdsc->mutex);
3666
3667         ceph_cleanup_empty_realms(mdsc);
3668
3669         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3670
3671         dout("stopped\n");
3672 }
3673
3674 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
3675 {
3676         struct ceph_mds_session *session;
3677         int mds;
3678
3679         dout("force umount\n");
3680
3681         mutex_lock(&mdsc->mutex);
3682         for (mds = 0; mds < mdsc->max_sessions; mds++) {
3683                 session = __ceph_lookup_mds_session(mdsc, mds);
3684                 if (!session)
3685                         continue;
3686                 mutex_unlock(&mdsc->mutex);
3687                 mutex_lock(&session->s_mutex);
3688                 __close_session(mdsc, session);
3689                 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
3690                         cleanup_session_requests(mdsc, session);
3691                         remove_session_caps(session);
3692                 }
3693                 mutex_unlock(&session->s_mutex);
3694                 ceph_put_mds_session(session);
3695                 mutex_lock(&mdsc->mutex);
3696                 kick_requests(mdsc, mds);
3697         }
3698         __wake_requests(mdsc, &mdsc->waiting_for_map);
3699         mutex_unlock(&mdsc->mutex);
3700 }
3701
3702 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3703 {
3704         dout("stop\n");
3705         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3706         if (mdsc->mdsmap)
3707                 ceph_mdsmap_destroy(mdsc->mdsmap);
3708         kfree(mdsc->sessions);
3709         ceph_caps_finalize(mdsc);
3710         ceph_pool_perm_destroy(mdsc);
3711 }
3712
3713 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3714 {
3715         struct ceph_mds_client *mdsc = fsc->mdsc;
3716
3717         dout("mdsc_destroy %p\n", mdsc);
3718         ceph_mdsc_stop(mdsc);
3719
3720         /* flush out any connection work with references to us */
3721         ceph_msgr_flush();
3722
3723         fsc->mdsc = NULL;
3724         kfree(mdsc);
3725         dout("mdsc_destroy %p done\n", mdsc);
3726 }
3727
3728
3729 /*
3730  * handle mds map update.
3731  */
3732 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3733 {
3734         u32 epoch;
3735         u32 maplen;
3736         void *p = msg->front.iov_base;
3737         void *end = p + msg->front.iov_len;
3738         struct ceph_mdsmap *newmap, *oldmap;
3739         struct ceph_fsid fsid;
3740         int err = -EINVAL;
3741
3742         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3743         ceph_decode_copy(&p, &fsid, sizeof(fsid));
3744         if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3745                 return;
3746         epoch = ceph_decode_32(&p);
3747         maplen = ceph_decode_32(&p);
3748         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3749
3750         /* do we need it? */
3751         mutex_lock(&mdsc->mutex);
3752         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3753                 dout("handle_map epoch %u <= our %u\n",
3754                      epoch, mdsc->mdsmap->m_epoch);
3755                 mutex_unlock(&mdsc->mutex);
3756                 return;
3757         }
3758
3759         newmap = ceph_mdsmap_decode(&p, end);
3760         if (IS_ERR(newmap)) {
3761                 err = PTR_ERR(newmap);
3762                 goto bad_unlock;
3763         }
3764
3765         /* swap into place */
3766         if (mdsc->mdsmap) {
3767                 oldmap = mdsc->mdsmap;
3768                 mdsc->mdsmap = newmap;
3769                 check_new_map(mdsc, newmap, oldmap);
3770                 ceph_mdsmap_destroy(oldmap);
3771         } else {
3772                 mdsc->mdsmap = newmap;  /* first mds map */
3773         }
3774         mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3775
3776         __wake_requests(mdsc, &mdsc->waiting_for_map);
3777         ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
3778                           mdsc->mdsmap->m_epoch);
3779
3780         mutex_unlock(&mdsc->mutex);
3781         schedule_delayed(mdsc);
3782         return;
3783
3784 bad_unlock:
3785         mutex_unlock(&mdsc->mutex);
3786 bad:
3787         pr_err("error decoding mdsmap %d\n", err);
3788         return;
3789 }
3790
3791 static struct ceph_connection *con_get(struct ceph_connection *con)
3792 {
3793         struct ceph_mds_session *s = con->private;
3794
3795         if (get_session(s)) {
3796                 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3797                 return con;
3798         }
3799         dout("mdsc con_get %p FAIL\n", s);
3800         return NULL;
3801 }
3802
3803 static void con_put(struct ceph_connection *con)
3804 {
3805         struct ceph_mds_session *s = con->private;
3806
3807         dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3808         ceph_put_mds_session(s);
3809 }
3810
3811 /*
3812  * if the client is unresponsive for long enough, the mds will kill
3813  * the session entirely.
3814  */
3815 static void peer_reset(struct ceph_connection *con)
3816 {
3817         struct ceph_mds_session *s = con->private;
3818         struct ceph_mds_client *mdsc = s->s_mdsc;
3819
3820         pr_warn("mds%d closed our session\n", s->s_mds);
3821         send_mds_reconnect(mdsc, s);
3822 }
3823
3824 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3825 {
3826         struct ceph_mds_session *s = con->private;
3827         struct ceph_mds_client *mdsc = s->s_mdsc;
3828         int type = le16_to_cpu(msg->hdr.type);
3829
3830         mutex_lock(&mdsc->mutex);
3831         if (__verify_registered_session(mdsc, s) < 0) {
3832                 mutex_unlock(&mdsc->mutex);
3833                 goto out;
3834         }
3835         mutex_unlock(&mdsc->mutex);
3836
3837         switch (type) {
3838         case CEPH_MSG_MDS_MAP:
3839                 ceph_mdsc_handle_map(mdsc, msg);
3840                 break;
3841         case CEPH_MSG_CLIENT_SESSION:
3842                 handle_session(s, msg);
3843                 break;
3844         case CEPH_MSG_CLIENT_REPLY:
3845                 handle_reply(s, msg);
3846                 break;
3847         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3848                 handle_forward(mdsc, s, msg);
3849                 break;
3850         case CEPH_MSG_CLIENT_CAPS:
3851                 ceph_handle_caps(s, msg);
3852                 break;
3853         case CEPH_MSG_CLIENT_SNAP:
3854                 ceph_handle_snap(mdsc, s, msg);
3855                 break;
3856         case CEPH_MSG_CLIENT_LEASE:
3857                 handle_lease(mdsc, s, msg);
3858                 break;
3859
3860         default:
3861                 pr_err("received unknown message type %d %s\n", type,
3862                        ceph_msg_type_name(type));
3863         }
3864 out:
3865         ceph_msg_put(msg);
3866 }
3867
3868 /*
3869  * authentication
3870  */
3871
3872 /*
3873  * Note: returned pointer is the address of a structure that's
3874  * managed separately.  Caller must *not* attempt to free it.
3875  */
3876 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3877                                         int *proto, int force_new)
3878 {
3879         struct ceph_mds_session *s = con->private;
3880         struct ceph_mds_client *mdsc = s->s_mdsc;
3881         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3882         struct ceph_auth_handshake *auth = &s->s_auth;
3883
3884         if (force_new && auth->authorizer) {
3885                 ceph_auth_destroy_authorizer(auth->authorizer);
3886                 auth->authorizer = NULL;
3887         }
3888         if (!auth->authorizer) {
3889                 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3890                                                       auth);
3891                 if (ret)
3892                         return ERR_PTR(ret);
3893         } else {
3894                 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3895                                                       auth);
3896                 if (ret)
3897                         return ERR_PTR(ret);
3898         }
3899         *proto = ac->protocol;
3900
3901         return auth;
3902 }
3903
3904
3905 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3906 {
3907         struct ceph_mds_session *s = con->private;
3908         struct ceph_mds_client *mdsc = s->s_mdsc;
3909         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3910
3911         return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3912 }
3913
3914 static int invalidate_authorizer(struct ceph_connection *con)
3915 {
3916         struct ceph_mds_session *s = con->private;
3917         struct ceph_mds_client *mdsc = s->s_mdsc;
3918         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3919
3920         ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3921
3922         return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3923 }
3924
3925 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
3926                                 struct ceph_msg_header *hdr, int *skip)
3927 {
3928         struct ceph_msg *msg;
3929         int type = (int) le16_to_cpu(hdr->type);
3930         int front_len = (int) le32_to_cpu(hdr->front_len);
3931
3932         if (con->in_msg)
3933                 return con->in_msg;
3934
3935         *skip = 0;
3936         msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
3937         if (!msg) {
3938                 pr_err("unable to allocate msg type %d len %d\n",
3939                        type, front_len);
3940                 return NULL;
3941         }
3942
3943         return msg;
3944 }
3945
3946 static int mds_sign_message(struct ceph_msg *msg)
3947 {
3948        struct ceph_mds_session *s = msg->con->private;
3949        struct ceph_auth_handshake *auth = &s->s_auth;
3950
3951        return ceph_auth_sign_message(auth, msg);
3952 }
3953
3954 static int mds_check_message_signature(struct ceph_msg *msg)
3955 {
3956        struct ceph_mds_session *s = msg->con->private;
3957        struct ceph_auth_handshake *auth = &s->s_auth;
3958
3959        return ceph_auth_check_message_signature(auth, msg);
3960 }
3961
3962 static const struct ceph_connection_operations mds_con_ops = {
3963         .get = con_get,
3964         .put = con_put,
3965         .dispatch = dispatch,
3966         .get_authorizer = get_authorizer,
3967         .verify_authorizer_reply = verify_authorizer_reply,
3968         .invalidate_authorizer = invalidate_authorizer,
3969         .peer_reset = peer_reset,
3970         .alloc_msg = mds_alloc_msg,
3971         .sign_message = mds_sign_message,
3972         .check_message_signature = mds_check_message_signature,
3973 };
3974
3975 /* eof */