]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/obdclass/cl_object.c
staging/lustre: adapt proc_dir_entry change
[karo-tx-linux.git] / drivers / staging / lustre / lustre / obdclass / cl_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client Lustre Object.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 /*
42  * Locking.
43  *
44  *  i_mutex
45  *      PG_locked
46  *        ->coh_page_guard
47  *        ->coh_lock_guard
48  *        ->coh_attr_guard
49  *        ->ls_guard
50  */
51
52 #define DEBUG_SUBSYSTEM S_CLASS
53
54 #include <linux/libcfs/libcfs.h>
55 /* class_put_type() */
56 #include <obd_class.h>
57 #include <obd_support.h>
58 #include <lustre_fid.h>
59 #include <linux/list.h>
60 #include <linux/libcfs/libcfs_hash.h> /* for cfs_hash stuff */
61 #include <cl_object.h>
62 #include "cl_internal.h"
63
64 static struct kmem_cache *cl_env_kmem;
65
66 /** Lock class of cl_object_header::coh_page_guard */
67 static struct lock_class_key cl_page_guard_class;
68 /** Lock class of cl_object_header::coh_lock_guard */
69 static struct lock_class_key cl_lock_guard_class;
70 /** Lock class of cl_object_header::coh_attr_guard */
71 static struct lock_class_key cl_attr_guard_class;
72
73 extern __u32 lu_context_tags_default;
74 extern __u32 lu_session_tags_default;
75 /**
76  * Initialize cl_object_header.
77  */
78 int cl_object_header_init(struct cl_object_header *h)
79 {
80         int result;
81
82         ENTRY;
83         result = lu_object_header_init(&h->coh_lu);
84         if (result == 0) {
85                 spin_lock_init(&h->coh_page_guard);
86                 spin_lock_init(&h->coh_lock_guard);
87                 spin_lock_init(&h->coh_attr_guard);
88                 lockdep_set_class(&h->coh_page_guard, &cl_page_guard_class);
89                 lockdep_set_class(&h->coh_lock_guard, &cl_lock_guard_class);
90                 lockdep_set_class(&h->coh_attr_guard, &cl_attr_guard_class);
91                 h->coh_pages = 0;
92                 /* XXX hard coded GFP_* mask. */
93                 INIT_RADIX_TREE(&h->coh_tree, GFP_ATOMIC);
94                 INIT_LIST_HEAD(&h->coh_locks);
95                 h->coh_page_bufsize = ALIGN(sizeof(struct cl_page), 8);
96         }
97         RETURN(result);
98 }
99 EXPORT_SYMBOL(cl_object_header_init);
100
101 /**
102  * Finalize cl_object_header.
103  */
104 void cl_object_header_fini(struct cl_object_header *h)
105 {
106         LASSERT(list_empty(&h->coh_locks));
107         lu_object_header_fini(&h->coh_lu);
108 }
109 EXPORT_SYMBOL(cl_object_header_fini);
110
111 /**
112  * Returns a cl_object with a given \a fid.
113  *
114  * Returns either cached or newly created object. Additional reference on the
115  * returned object is acquired.
116  *
117  * \see lu_object_find(), cl_page_find(), cl_lock_find()
118  */
119 struct cl_object *cl_object_find(const struct lu_env *env,
120                                  struct cl_device *cd, const struct lu_fid *fid,
121                                  const struct cl_object_conf *c)
122 {
123         might_sleep();
124         return lu2cl(lu_object_find_slice(env, cl2lu_dev(cd), fid, &c->coc_lu));
125 }
126 EXPORT_SYMBOL(cl_object_find);
127
128 /**
129  * Releases a reference on \a o.
130  *
131  * When last reference is released object is returned to the cache, unless
132  * lu_object_header_flags::LU_OBJECT_HEARD_BANSHEE bit is set in its header.
133  *
134  * \see cl_page_put(), cl_lock_put().
135  */
136 void cl_object_put(const struct lu_env *env, struct cl_object *o)
137 {
138         lu_object_put(env, &o->co_lu);
139 }
140 EXPORT_SYMBOL(cl_object_put);
141
142 /**
143  * Acquire an additional reference to the object \a o.
144  *
145  * This can only be used to acquire _additional_ reference, i.e., caller
146  * already has to possess at least one reference to \a o before calling this.
147  *
148  * \see cl_page_get(), cl_lock_get().
149  */
150 void cl_object_get(struct cl_object *o)
151 {
152         lu_object_get(&o->co_lu);
153 }
154 EXPORT_SYMBOL(cl_object_get);
155
156 /**
157  * Returns the top-object for a given \a o.
158  *
159  * \see cl_page_top(), cl_io_top()
160  */
161 struct cl_object *cl_object_top(struct cl_object *o)
162 {
163         struct cl_object_header *hdr = cl_object_header(o);
164         struct cl_object *top;
165
166         while (hdr->coh_parent != NULL)
167                 hdr = hdr->coh_parent;
168
169         top = lu2cl(lu_object_top(&hdr->coh_lu));
170         CDEBUG(D_TRACE, "%p -> %p\n", o, top);
171         return top;
172 }
173 EXPORT_SYMBOL(cl_object_top);
174
175 /**
176  * Returns pointer to the lock protecting data-attributes for the given object
177  * \a o.
178  *
179  * Data-attributes are protected by the cl_object_header::coh_attr_guard
180  * spin-lock in the top-object.
181  *
182  * \see cl_attr, cl_object_attr_lock(), cl_object_operations::coo_attr_get().
183  */
184 static spinlock_t *cl_object_attr_guard(struct cl_object *o)
185 {
186         return &cl_object_header(cl_object_top(o))->coh_attr_guard;
187 }
188
189 /**
190  * Locks data-attributes.
191  *
192  * Prevents data-attributes from changing, until lock is released by
193  * cl_object_attr_unlock(). This has to be called before calls to
194  * cl_object_attr_get(), cl_object_attr_set().
195  */
196 void cl_object_attr_lock(struct cl_object *o)
197 {
198         spin_lock(cl_object_attr_guard(o));
199 }
200 EXPORT_SYMBOL(cl_object_attr_lock);
201
202 /**
203  * Releases data-attributes lock, acquired by cl_object_attr_lock().
204  */
205 void cl_object_attr_unlock(struct cl_object *o)
206 {
207         spin_unlock(cl_object_attr_guard(o));
208 }
209 EXPORT_SYMBOL(cl_object_attr_unlock);
210
211 /**
212  * Returns data-attributes of an object \a obj.
213  *
214  * Every layer is asked (by calling cl_object_operations::coo_attr_get())
215  * top-to-bottom to fill in parts of \a attr that this layer is responsible
216  * for.
217  */
218 int cl_object_attr_get(const struct lu_env *env, struct cl_object *obj,
219                        struct cl_attr *attr)
220 {
221         struct lu_object_header *top;
222         int result;
223
224         LASSERT(spin_is_locked(cl_object_attr_guard(obj)));
225         ENTRY;
226
227         top = obj->co_lu.lo_header;
228         result = 0;
229         list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
230                 if (obj->co_ops->coo_attr_get != NULL) {
231                         result = obj->co_ops->coo_attr_get(env, obj, attr);
232                         if (result != 0) {
233                                 if (result > 0)
234                                         result = 0;
235                                 break;
236                         }
237                 }
238         }
239         RETURN(result);
240 }
241 EXPORT_SYMBOL(cl_object_attr_get);
242
243 /**
244  * Updates data-attributes of an object \a obj.
245  *
246  * Only attributes, mentioned in a validness bit-mask \a v are
247  * updated. Calls cl_object_operations::coo_attr_set() on every layer, bottom
248  * to top.
249  */
250 int cl_object_attr_set(const struct lu_env *env, struct cl_object *obj,
251                        const struct cl_attr *attr, unsigned v)
252 {
253         struct lu_object_header *top;
254         int result;
255
256         LASSERT(spin_is_locked(cl_object_attr_guard(obj)));
257         ENTRY;
258
259         top = obj->co_lu.lo_header;
260         result = 0;
261         list_for_each_entry_reverse(obj, &top->loh_layers,
262                                         co_lu.lo_linkage) {
263                 if (obj->co_ops->coo_attr_set != NULL) {
264                         result = obj->co_ops->coo_attr_set(env, obj, attr, v);
265                         if (result != 0) {
266                                 if (result > 0)
267                                         result = 0;
268                                 break;
269                         }
270                 }
271         }
272         RETURN(result);
273 }
274 EXPORT_SYMBOL(cl_object_attr_set);
275
276 /**
277  * Notifies layers (bottom-to-top) that glimpse AST was received.
278  *
279  * Layers have to fill \a lvb fields with information that will be shipped
280  * back to glimpse issuer.
281  *
282  * \see cl_lock_operations::clo_glimpse()
283  */
284 int cl_object_glimpse(const struct lu_env *env, struct cl_object *obj,
285                       struct ost_lvb *lvb)
286 {
287         struct lu_object_header *top;
288         int result;
289
290         ENTRY;
291         top = obj->co_lu.lo_header;
292         result = 0;
293         list_for_each_entry_reverse(obj, &top->loh_layers,
294                                         co_lu.lo_linkage) {
295                 if (obj->co_ops->coo_glimpse != NULL) {
296                         result = obj->co_ops->coo_glimpse(env, obj, lvb);
297                         if (result != 0)
298                                 break;
299                 }
300         }
301         LU_OBJECT_HEADER(D_DLMTRACE, env, lu_object_top(top),
302                          "size: "LPU64" mtime: "LPU64" atime: "LPU64" "
303                          "ctime: "LPU64" blocks: "LPU64"\n",
304                          lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_atime,
305                          lvb->lvb_ctime, lvb->lvb_blocks);
306         RETURN(result);
307 }
308 EXPORT_SYMBOL(cl_object_glimpse);
309
310 /**
311  * Updates a configuration of an object \a obj.
312  */
313 int cl_conf_set(const struct lu_env *env, struct cl_object *obj,
314                 const struct cl_object_conf *conf)
315 {
316         struct lu_object_header *top;
317         int result;
318
319         ENTRY;
320         top = obj->co_lu.lo_header;
321         result = 0;
322         list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
323                 if (obj->co_ops->coo_conf_set != NULL) {
324                         result = obj->co_ops->coo_conf_set(env, obj, conf);
325                         if (result != 0)
326                                 break;
327                 }
328         }
329         RETURN(result);
330 }
331 EXPORT_SYMBOL(cl_conf_set);
332
333 /**
334  * Helper function removing all object locks, and marking object for
335  * deletion. All object pages must have been deleted at this point.
336  *
337  * This is called by cl_inode_fini() and lov_object_delete() to destroy top-
338  * and sub- objects respectively.
339  */
340 void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
341 {
342         struct cl_object_header *hdr;
343
344         hdr = cl_object_header(obj);
345         LASSERT(hdr->coh_tree.rnode == NULL);
346         LASSERT(hdr->coh_pages == 0);
347
348         set_bit(LU_OBJECT_HEARD_BANSHEE, &hdr->coh_lu.loh_flags);
349         /*
350          * Destroy all locks. Object destruction (including cl_inode_fini())
351          * cannot cancel the locks, because in the case of a local client,
352          * where client and server share the same thread running
353          * prune_icache(), this can dead-lock with ldlm_cancel_handler()
354          * waiting on __wait_on_freeing_inode().
355          */
356         cl_locks_prune(env, obj, 0);
357 }
358 EXPORT_SYMBOL(cl_object_kill);
359
360 /**
361  * Prunes caches of pages and locks for this object.
362  */
363 void cl_object_prune(const struct lu_env *env, struct cl_object *obj)
364 {
365         ENTRY;
366         cl_pages_prune(env, obj);
367         cl_locks_prune(env, obj, 1);
368         EXIT;
369 }
370 EXPORT_SYMBOL(cl_object_prune);
371
372 /**
373  * Check if the object has locks.
374  */
375 int cl_object_has_locks(struct cl_object *obj)
376 {
377         struct cl_object_header *head = cl_object_header(obj);
378         int has;
379
380         spin_lock(&head->coh_lock_guard);
381         has = list_empty(&head->coh_locks);
382         spin_unlock(&head->coh_lock_guard);
383
384         return (has == 0);
385 }
386 EXPORT_SYMBOL(cl_object_has_locks);
387
388 void cache_stats_init(struct cache_stats *cs, const char *name)
389 {
390         int i;
391
392         cs->cs_name = name;
393         for (i = 0; i < CS_NR; i++)
394                 atomic_set(&cs->cs_stats[i], 0);
395 }
396
397 int cache_stats_print(const struct cache_stats *cs, struct seq_file *m, int h)
398 {
399         int i;
400         /*
401          *   lookup    hit    total  cached create
402          * env: ...... ...... ...... ...... ......
403          */
404         if (h) {
405                 const char *names[CS_NR] = CS_NAMES;
406
407                 seq_printf(m, "%6s", " ");
408                 for (i = 0; i < CS_NR; i++)
409                         seq_printf(m, "%8s", names[i]);
410                 seq_printf(m, "\n");
411         }
412
413         seq_printf(m, "%5.5s:", cs->cs_name);
414         for (i = 0; i < CS_NR; i++)
415                 seq_printf(m, "%8u", atomic_read(&cs->cs_stats[i]));
416         return 0;
417 }
418
419 /**
420  * Initialize client site.
421  *
422  * Perform common initialization (lu_site_init()), and initialize statistical
423  * counters. Also perform global initializations on the first call.
424  */
425 int cl_site_init(struct cl_site *s, struct cl_device *d)
426 {
427         int i;
428         int result;
429
430         result = lu_site_init(&s->cs_lu, &d->cd_lu_dev);
431         if (result == 0) {
432                 cache_stats_init(&s->cs_pages, "pages");
433                 cache_stats_init(&s->cs_locks, "locks");
434                 for (i = 0; i < ARRAY_SIZE(s->cs_pages_state); ++i)
435                         atomic_set(&s->cs_pages_state[0], 0);
436                 for (i = 0; i < ARRAY_SIZE(s->cs_locks_state); ++i)
437                         atomic_set(&s->cs_locks_state[i], 0);
438         }
439         return result;
440 }
441 EXPORT_SYMBOL(cl_site_init);
442
443 /**
444  * Finalize client site. Dual to cl_site_init().
445  */
446 void cl_site_fini(struct cl_site *s)
447 {
448         lu_site_fini(&s->cs_lu);
449 }
450 EXPORT_SYMBOL(cl_site_fini);
451
452 static struct cache_stats cl_env_stats = {
453         .cs_name    = "envs",
454         .cs_stats = { ATOMIC_INIT(0), }
455 };
456
457 /**
458  * Outputs client site statistical counters into a buffer. Suitable for
459  * ll_rd_*()-style functions.
460  */
461 int cl_site_stats_print(const struct cl_site *site, struct seq_file *m)
462 {
463         int i;
464         static const char *pstate[] = {
465                 [CPS_CACHED]  = "c",
466                 [CPS_OWNED]   = "o",
467                 [CPS_PAGEOUT] = "w",
468                 [CPS_PAGEIN]  = "r",
469                 [CPS_FREEING] = "f"
470         };
471         static const char *lstate[] = {
472                 [CLS_NEW]       = "n",
473                 [CLS_QUEUING]   = "q",
474                 [CLS_ENQUEUED]  = "e",
475                 [CLS_HELD]      = "h",
476                 [CLS_INTRANSIT] = "t",
477                 [CLS_CACHED]    = "c",
478                 [CLS_FREEING]   = "f"
479         };
480 /*
481        lookup    hit  total   busy create
482 pages: ...... ...... ...... ...... ...... [...... ...... ...... ......]
483 locks: ...... ...... ...... ...... ...... [...... ...... ...... ...... ......]
484   env: ...... ...... ...... ...... ......
485  */
486         lu_site_stats_print(&site->cs_lu, m);
487         cache_stats_print(&site->cs_pages, m, 1);
488         seq_printf(m, " [");
489         for (i = 0; i < ARRAY_SIZE(site->cs_pages_state); ++i)
490                 seq_printf(m, "%s: %u ", pstate[i],
491                                 atomic_read(&site->cs_pages_state[i]));
492         seq_printf(m, "]\n");
493         cache_stats_print(&site->cs_locks, m, 0);
494         seq_printf(m, " [");
495         for (i = 0; i < ARRAY_SIZE(site->cs_locks_state); ++i)
496                 seq_printf(m, "%s: %u ", lstate[i],
497                                 atomic_read(&site->cs_locks_state[i]));
498         seq_printf(m, "]\n");
499         cache_stats_print(&cl_env_stats, m, 0);
500         seq_printf(m, "\n");
501         return 0;
502 }
503 EXPORT_SYMBOL(cl_site_stats_print);
504
505 /*****************************************************************************
506  *
507  * lu_env handling on client.
508  *
509  */
510
511 /**
512  * The most efficient way is to store cl_env pointer in task specific
513  * structures. On Linux, it wont' be easy to use task_struct->journal_info
514  * because Lustre code may call into other fs which has certain assumptions
515  * about journal_info. Currently following fields in task_struct are identified
516  * can be used for this purpose:
517  *  - cl_env: for liblustre.
518  *  - tux_info: ony on RedHat kernel.
519  *  - ...
520  * \note As long as we use task_struct to store cl_env, we assume that once
521  * called into Lustre, we'll never call into the other part of the kernel
522  * which will use those fields in task_struct without explicitly exiting
523  * Lustre.
524  *
525  * If there's no space in task_struct is available, hash will be used.
526  * bz20044, bz22683.
527  */
528
529 struct cl_env {
530         void         *ce_magic;
531         struct lu_env     ce_lu;
532         struct lu_context ce_ses;
533
534         /**
535          * This allows cl_env to be entered into cl_env_hash which implements
536          * the current thread -> client environment lookup.
537          */
538         struct hlist_node  ce_node;
539         /**
540          * Owner for the current cl_env.
541          *
542          * If LL_TASK_CL_ENV is defined, this point to the owning current,
543          * only for debugging purpose ;
544          * Otherwise hash is used, and this is the key for cfs_hash.
545          * Now current thread pid is stored. Note using thread pointer would
546          * lead to unbalanced hash because of its specific allocation locality
547          * and could be varied for different platforms and OSes, even different
548          * OS versions.
549          */
550         void         *ce_owner;
551
552         /*
553          * Linkage into global list of all client environments. Used for
554          * garbage collection.
555          */
556         struct list_head        ce_linkage;
557         /*
558          *
559          */
560         int            ce_ref;
561         /*
562          * Debugging field: address of the caller who made original
563          * allocation.
564          */
565         void         *ce_debug;
566 };
567
568 #define CL_ENV_INC(counter)
569 #define CL_ENV_DEC(counter)
570
571 static void cl_env_init0(struct cl_env *cle, void *debug)
572 {
573         LASSERT(cle->ce_ref == 0);
574         LASSERT(cle->ce_magic == &cl_env_init0);
575         LASSERT(cle->ce_debug == NULL && cle->ce_owner == NULL);
576
577         cle->ce_ref = 1;
578         cle->ce_debug = debug;
579         CL_ENV_INC(busy);
580 }
581
582
583 /*
584  * The implementation of using hash table to connect cl_env and thread
585  */
586
587 static cfs_hash_t *cl_env_hash;
588
589 static unsigned cl_env_hops_hash(cfs_hash_t *lh,
590                                  const void *key, unsigned mask)
591 {
592 #if BITS_PER_LONG == 64
593         return cfs_hash_u64_hash((__u64)key, mask);
594 #else
595         return cfs_hash_u32_hash((__u32)key, mask);
596 #endif
597 }
598
599 static void *cl_env_hops_obj(struct hlist_node *hn)
600 {
601         struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
602         LASSERT(cle->ce_magic == &cl_env_init0);
603         return (void *)cle;
604 }
605
606 static int cl_env_hops_keycmp(const void *key, struct hlist_node *hn)
607 {
608         struct cl_env *cle = cl_env_hops_obj(hn);
609
610         LASSERT(cle->ce_owner != NULL);
611         return (key == cle->ce_owner);
612 }
613
614 static void cl_env_hops_noop(cfs_hash_t *hs, struct hlist_node *hn)
615 {
616         struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
617         LASSERT(cle->ce_magic == &cl_env_init0);
618 }
619
620 static cfs_hash_ops_t cl_env_hops = {
621         .hs_hash        = cl_env_hops_hash,
622         .hs_key  = cl_env_hops_obj,
623         .hs_keycmp      = cl_env_hops_keycmp,
624         .hs_object      = cl_env_hops_obj,
625         .hs_get  = cl_env_hops_noop,
626         .hs_put_locked  = cl_env_hops_noop,
627 };
628
629 static inline struct cl_env *cl_env_fetch(void)
630 {
631         struct cl_env *cle;
632
633         cle = cfs_hash_lookup(cl_env_hash, (void *) (long) current->pid);
634         LASSERT(ergo(cle, cle->ce_magic == &cl_env_init0));
635         return cle;
636 }
637
638 static inline void cl_env_attach(struct cl_env *cle)
639 {
640         if (cle) {
641                 int rc;
642
643                 LASSERT(cle->ce_owner == NULL);
644                 cle->ce_owner = (void *) (long) current->pid;
645                 rc = cfs_hash_add_unique(cl_env_hash, cle->ce_owner,
646                                          &cle->ce_node);
647                 LASSERT(rc == 0);
648         }
649 }
650
651 static inline void cl_env_do_detach(struct cl_env *cle)
652 {
653         void *cookie;
654
655         LASSERT(cle->ce_owner == (void *) (long) current->pid);
656         cookie = cfs_hash_del(cl_env_hash, cle->ce_owner,
657                               &cle->ce_node);
658         LASSERT(cookie == cle);
659         cle->ce_owner = NULL;
660 }
661
662 static int cl_env_store_init(void) {
663         cl_env_hash = cfs_hash_create("cl_env",
664                                       HASH_CL_ENV_BITS, HASH_CL_ENV_BITS,
665                                       HASH_CL_ENV_BKT_BITS, 0,
666                                       CFS_HASH_MIN_THETA,
667                                       CFS_HASH_MAX_THETA,
668                                       &cl_env_hops,
669                                       CFS_HASH_RW_BKTLOCK);
670         return cl_env_hash != NULL ? 0 :-ENOMEM;
671 }
672
673 static void cl_env_store_fini(void) {
674         cfs_hash_putref(cl_env_hash);
675 }
676
677
678 static inline struct cl_env *cl_env_detach(struct cl_env *cle)
679 {
680         if (cle == NULL)
681                 cle = cl_env_fetch();
682
683         if (cle && cle->ce_owner)
684                 cl_env_do_detach(cle);
685
686         return cle;
687 }
688
689 static struct lu_env *cl_env_new(__u32 ctx_tags, __u32 ses_tags, void *debug)
690 {
691         struct lu_env *env;
692         struct cl_env *cle;
693
694         OBD_SLAB_ALLOC_PTR_GFP(cle, cl_env_kmem, __GFP_IO);
695         if (cle != NULL) {
696                 int rc;
697
698                 INIT_LIST_HEAD(&cle->ce_linkage);
699                 cle->ce_magic = &cl_env_init0;
700                 env = &cle->ce_lu;
701                 rc = lu_env_init(env, LCT_CL_THREAD|ctx_tags);
702                 if (rc == 0) {
703                         rc = lu_context_init(&cle->ce_ses,
704                                              LCT_SESSION | ses_tags);
705                         if (rc == 0) {
706                                 lu_context_enter(&cle->ce_ses);
707                                 env->le_ses = &cle->ce_ses;
708                                 cl_env_init0(cle, debug);
709                         } else
710                                 lu_env_fini(env);
711                 }
712                 if (rc != 0) {
713                         OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
714                         env = ERR_PTR(rc);
715                 } else {
716                         CL_ENV_INC(create);
717                         CL_ENV_INC(total);
718                 }
719         } else
720                 env = ERR_PTR(-ENOMEM);
721         return env;
722 }
723
724 static void cl_env_fini(struct cl_env *cle)
725 {
726         CL_ENV_DEC(total);
727         lu_context_fini(&cle->ce_lu.le_ctx);
728         lu_context_fini(&cle->ce_ses);
729         OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
730 }
731
732 static inline struct cl_env *cl_env_container(struct lu_env *env)
733 {
734         return container_of(env, struct cl_env, ce_lu);
735 }
736
737 struct lu_env *cl_env_peek(int *refcheck)
738 {
739         struct lu_env *env;
740         struct cl_env *cle;
741
742         CL_ENV_INC(lookup);
743
744         /* check that we don't go far from untrusted pointer */
745         CLASSERT(offsetof(struct cl_env, ce_magic) == 0);
746
747         env = NULL;
748         cle = cl_env_fetch();
749         if (cle != NULL) {
750                 CL_ENV_INC(hit);
751                 env = &cle->ce_lu;
752                 *refcheck = ++cle->ce_ref;
753         }
754         CDEBUG(D_OTHER, "%d@%p\n", cle ? cle->ce_ref : 0, cle);
755         return env;
756 }
757 EXPORT_SYMBOL(cl_env_peek);
758
759 /**
760  * Returns lu_env: if there already is an environment associated with the
761  * current thread, it is returned, otherwise, new environment is allocated.
762  *
763  * \param refcheck pointer to a counter used to detect environment leaks. In
764  * the usual case cl_env_get() and cl_env_put() are called in the same lexical
765  * scope and pointer to the same integer is passed as \a refcheck. This is
766  * used to detect missed cl_env_put().
767  *
768  * \see cl_env_put()
769  */
770 struct lu_env *cl_env_get(int *refcheck)
771 {
772         struct lu_env *env;
773
774         env = cl_env_peek(refcheck);
775         if (env == NULL) {
776                 env = cl_env_new(lu_context_tags_default,
777                                  lu_session_tags_default,
778                                  __builtin_return_address(0));
779
780                 if (!IS_ERR(env)) {
781                         struct cl_env *cle;
782
783                         cle = cl_env_container(env);
784                         cl_env_attach(cle);
785                         *refcheck = cle->ce_ref;
786                         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
787                 }
788         }
789         return env;
790 }
791 EXPORT_SYMBOL(cl_env_get);
792
793 /**
794  * Forces an allocation of a fresh environment with given tags.
795  *
796  * \see cl_env_get()
797  */
798 struct lu_env *cl_env_alloc(int *refcheck, __u32 tags)
799 {
800         struct lu_env *env;
801
802         LASSERT(cl_env_peek(refcheck) == NULL);
803         env = cl_env_new(tags, tags, __builtin_return_address(0));
804         if (!IS_ERR(env)) {
805                 struct cl_env *cle;
806
807                 cle = cl_env_container(env);
808                 *refcheck = cle->ce_ref;
809                 CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
810         }
811         return env;
812 }
813 EXPORT_SYMBOL(cl_env_alloc);
814
815 static void cl_env_exit(struct cl_env *cle)
816 {
817         LASSERT(cle->ce_owner == NULL);
818         lu_context_exit(&cle->ce_lu.le_ctx);
819         lu_context_exit(&cle->ce_ses);
820 }
821
822 /**
823  * Release an environment.
824  *
825  * Decrement \a env reference counter. When counter drops to 0, nothing in
826  * this thread is using environment and it is returned to the allocation
827  * cache, or freed straight away, if cache is large enough.
828  */
829 void cl_env_put(struct lu_env *env, int *refcheck)
830 {
831         struct cl_env *cle;
832
833         cle = cl_env_container(env);
834
835         LASSERT(cle->ce_ref > 0);
836         LASSERT(ergo(refcheck != NULL, cle->ce_ref == *refcheck));
837
838         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
839         if (--cle->ce_ref == 0) {
840                 CL_ENV_DEC(busy);
841                 cl_env_detach(cle);
842                 cle->ce_debug = NULL;
843                 cl_env_exit(cle);
844                 cl_env_fini(cle);
845         }
846 }
847 EXPORT_SYMBOL(cl_env_put);
848
849 /**
850  * Declares a point of re-entrancy.
851  *
852  * \see cl_env_reexit()
853  */
854 void *cl_env_reenter(void)
855 {
856         return cl_env_detach(NULL);
857 }
858 EXPORT_SYMBOL(cl_env_reenter);
859
860 /**
861  * Exits re-entrancy.
862  */
863 void cl_env_reexit(void *cookie)
864 {
865         cl_env_detach(NULL);
866         cl_env_attach(cookie);
867 }
868 EXPORT_SYMBOL(cl_env_reexit);
869
870 /**
871  * Setup user-supplied \a env as a current environment. This is to be used to
872  * guaranteed that environment exists even when cl_env_get() fails. It is up
873  * to user to ensure proper concurrency control.
874  *
875  * \see cl_env_unplant()
876  */
877 void cl_env_implant(struct lu_env *env, int *refcheck)
878 {
879         struct cl_env *cle = cl_env_container(env);
880
881         LASSERT(cle->ce_ref > 0);
882
883         cl_env_attach(cle);
884         cl_env_get(refcheck);
885         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
886 }
887 EXPORT_SYMBOL(cl_env_implant);
888
889 /**
890  * Detach environment installed earlier by cl_env_implant().
891  */
892 void cl_env_unplant(struct lu_env *env, int *refcheck)
893 {
894         struct cl_env *cle = cl_env_container(env);
895
896         LASSERT(cle->ce_ref > 1);
897
898         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
899
900         cl_env_detach(cle);
901         cl_env_put(env, refcheck);
902 }
903 EXPORT_SYMBOL(cl_env_unplant);
904
905 struct lu_env *cl_env_nested_get(struct cl_env_nest *nest)
906 {
907         struct lu_env *env;
908
909         nest->cen_cookie = NULL;
910         env = cl_env_peek(&nest->cen_refcheck);
911         if (env != NULL) {
912                 if (!cl_io_is_going(env))
913                         return env;
914                 else {
915                         cl_env_put(env, &nest->cen_refcheck);
916                         nest->cen_cookie = cl_env_reenter();
917                 }
918         }
919         env = cl_env_get(&nest->cen_refcheck);
920         if (IS_ERR(env)) {
921                 cl_env_reexit(nest->cen_cookie);
922                 return env;
923         }
924
925         LASSERT(!cl_io_is_going(env));
926         return env;
927 }
928 EXPORT_SYMBOL(cl_env_nested_get);
929
930 void cl_env_nested_put(struct cl_env_nest *nest, struct lu_env *env)
931 {
932         cl_env_put(env, &nest->cen_refcheck);
933         cl_env_reexit(nest->cen_cookie);
934 }
935 EXPORT_SYMBOL(cl_env_nested_put);
936
937 /**
938  * Converts struct cl_attr to struct ost_lvb.
939  *
940  * \see cl_lvb2attr
941  */
942 void cl_attr2lvb(struct ost_lvb *lvb, const struct cl_attr *attr)
943 {
944         ENTRY;
945         lvb->lvb_size   = attr->cat_size;
946         lvb->lvb_mtime  = attr->cat_mtime;
947         lvb->lvb_atime  = attr->cat_atime;
948         lvb->lvb_ctime  = attr->cat_ctime;
949         lvb->lvb_blocks = attr->cat_blocks;
950         EXIT;
951 }
952 EXPORT_SYMBOL(cl_attr2lvb);
953
954 /**
955  * Converts struct ost_lvb to struct cl_attr.
956  *
957  * \see cl_attr2lvb
958  */
959 void cl_lvb2attr(struct cl_attr *attr, const struct ost_lvb *lvb)
960 {
961         ENTRY;
962         attr->cat_size   = lvb->lvb_size;
963         attr->cat_mtime  = lvb->lvb_mtime;
964         attr->cat_atime  = lvb->lvb_atime;
965         attr->cat_ctime  = lvb->lvb_ctime;
966         attr->cat_blocks = lvb->lvb_blocks;
967         EXIT;
968 }
969 EXPORT_SYMBOL(cl_lvb2attr);
970
971 /*****************************************************************************
972  *
973  * Temporary prototype thing: mirror obd-devices into cl devices.
974  *
975  */
976
977 struct cl_device *cl_type_setup(const struct lu_env *env, struct lu_site *site,
978                                 struct lu_device_type *ldt,
979                                 struct lu_device *next)
980 {
981         const char       *typename;
982         struct lu_device *d;
983
984         LASSERT(ldt != NULL);
985
986         typename = ldt->ldt_name;
987         d = ldt->ldt_ops->ldto_device_alloc(env, ldt, NULL);
988         if (!IS_ERR(d)) {
989                 int rc;
990
991                 if (site != NULL)
992                         d->ld_site = site;
993                 rc = ldt->ldt_ops->ldto_device_init(env, d, typename, next);
994                 if (rc == 0) {
995                         lu_device_get(d);
996                         lu_ref_add(&d->ld_reference,
997                                    "lu-stack", &lu_site_init);
998                 } else {
999                         ldt->ldt_ops->ldto_device_free(env, d);
1000                         CERROR("can't init device '%s', %d\n", typename, rc);
1001                         d = ERR_PTR(rc);
1002                 }
1003         } else
1004                 CERROR("Cannot allocate device: '%s'\n", typename);
1005         return lu2cl_dev(d);
1006 }
1007 EXPORT_SYMBOL(cl_type_setup);
1008
1009 /**
1010  * Finalize device stack by calling lu_stack_fini().
1011  */
1012 void cl_stack_fini(const struct lu_env *env, struct cl_device *cl)
1013 {
1014         lu_stack_fini(env, cl2lu_dev(cl));
1015 }
1016 EXPORT_SYMBOL(cl_stack_fini);
1017
1018 int  cl_lock_init(void);
1019 void cl_lock_fini(void);
1020
1021 int  cl_page_init(void);
1022 void cl_page_fini(void);
1023
1024 static struct lu_context_key cl_key;
1025
1026 struct cl_thread_info *cl_env_info(const struct lu_env *env)
1027 {
1028         return lu_context_key_get(&env->le_ctx, &cl_key);
1029 }
1030
1031 /* defines cl0_key_{init,fini}() */
1032 LU_KEY_INIT_FINI(cl0, struct cl_thread_info);
1033
1034 static void *cl_key_init(const struct lu_context *ctx,
1035                          struct lu_context_key *key)
1036 {
1037         struct cl_thread_info *info;
1038
1039         info = cl0_key_init(ctx, key);
1040         if (!IS_ERR(info)) {
1041                 int i;
1042
1043                 for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
1044                         lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
1045         }
1046         return info;
1047 }
1048
1049 static void cl_key_fini(const struct lu_context *ctx,
1050                         struct lu_context_key *key, void *data)
1051 {
1052         struct cl_thread_info *info;
1053         int i;
1054
1055         info = data;
1056         for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
1057                 lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
1058         cl0_key_fini(ctx, key, data);
1059 }
1060
1061 static void cl_key_exit(const struct lu_context *ctx,
1062                         struct lu_context_key *key, void *data)
1063 {
1064         struct cl_thread_info *info = data;
1065         int i;
1066
1067         for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i) {
1068                 LASSERT(info->clt_counters[i].ctc_nr_held == 0);
1069                 LASSERT(info->clt_counters[i].ctc_nr_used == 0);
1070                 LASSERT(info->clt_counters[i].ctc_nr_locks_acquired == 0);
1071                 LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
1072                 lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
1073                 lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
1074         }
1075 }
1076
1077 static struct lu_context_key cl_key = {
1078         .lct_tags = LCT_CL_THREAD,
1079         .lct_init = cl_key_init,
1080         .lct_fini = cl_key_fini,
1081         .lct_exit = cl_key_exit
1082 };
1083
1084 static struct lu_kmem_descr cl_object_caches[] = {
1085         {
1086                 .ckd_cache = &cl_env_kmem,
1087                 .ckd_name  = "cl_env_kmem",
1088                 .ckd_size  = sizeof (struct cl_env)
1089         },
1090         {
1091                 .ckd_cache = NULL
1092         }
1093 };
1094
1095 /**
1096  * Global initialization of cl-data. Create kmem caches, register
1097  * lu_context_key's, etc.
1098  *
1099  * \see cl_global_fini()
1100  */
1101 int cl_global_init(void)
1102 {
1103         int result;
1104
1105         result = cl_env_store_init();
1106         if (result)
1107                 return result;
1108
1109         result = lu_kmem_init(cl_object_caches);
1110         if (result)
1111                 goto out_store;
1112
1113         LU_CONTEXT_KEY_INIT(&cl_key);
1114         result = lu_context_key_register(&cl_key);
1115         if (result)
1116                 goto out_kmem;
1117
1118         result = cl_lock_init();
1119         if (result)
1120                 goto out_context;
1121
1122         result = cl_page_init();
1123         if (result)
1124                 goto out_lock;
1125
1126         return 0;
1127 out_lock:
1128         cl_lock_fini();
1129 out_context:
1130         lu_context_key_degister(&cl_key);
1131 out_kmem:
1132         lu_kmem_fini(cl_object_caches);
1133 out_store:
1134         cl_env_store_fini();
1135         return result;
1136 }
1137
1138 /**
1139  * Finalization of global cl-data. Dual to cl_global_init().
1140  */
1141 void cl_global_fini(void)
1142 {
1143         cl_lock_fini();
1144         cl_page_fini();
1145         lu_context_key_degister(&cl_key);
1146         lu_kmem_fini(cl_object_caches);
1147         cl_env_store_fini();
1148 }