]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/obdecho/echo_client.c
Merge tag 'sound-fix-4.5-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git/tiwai...
[karo-tx-linux.git] / drivers / staging / lustre / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_ECHO
38 #include "../../include/linux/libcfs/libcfs.h"
39
40 #include "../include/obd.h"
41 #include "../include/obd_support.h"
42 #include "../include/obd_class.h"
43 #include "../include/lustre_debug.h"
44 #include "../include/lprocfs_status.h"
45 #include "../include/cl_object.h"
46 #include "../include/lustre_fid.h"
47 #include "../include/lustre_acl.h"
48 #include "../include/lustre_net.h"
49
50 #include "echo_internal.h"
51
52 /** \defgroup echo_client Echo Client
53  * @{
54  */
55
56 struct echo_device {
57         struct cl_device        ed_cl;
58         struct echo_client_obd *ed_ec;
59
60         struct cl_site    ed_site_myself;
61         struct cl_site   *ed_site;
62         struct lu_device       *ed_next;
63         int                  ed_next_islov;
64 };
65
66 struct echo_object {
67         struct cl_object        eo_cl;
68         struct cl_object_header eo_hdr;
69
70         struct echo_device     *eo_dev;
71         struct list_head              eo_obj_chain;
72         struct lov_stripe_md   *eo_lsm;
73         atomic_t            eo_npages;
74         int                  eo_deleted;
75 };
76
77 struct echo_object_conf {
78         struct cl_object_conf  eoc_cl;
79         struct lov_stripe_md **eoc_md;
80 };
81
82 struct echo_page {
83         struct cl_page_slice   ep_cl;
84         struct mutex            ep_lock;
85         struct page         *ep_vmpage;
86 };
87
88 struct echo_lock {
89         struct cl_lock_slice   el_cl;
90         struct list_head             el_chain;
91         struct echo_object    *el_object;
92         __u64             el_cookie;
93         atomic_t           el_refcount;
94 };
95
/*
 * obd-level setup/cleanup of the echo client.  They are defined further
 * down but are already needed by the device alloc/free code.
 */
static int echo_client_setup(const struct lu_env *env,
			     struct obd_device *obddev, struct lustre_cfg *lcfg);
static int echo_client_cleanup(struct obd_device *obddev);
100
101 /** \defgroup echo_helpers Helper functions
102  * @{
103  */
104 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
105 {
106         return container_of0(dev, struct echo_device, ed_cl);
107 }
108
109 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
110 {
111         return &d->ed_cl;
112 }
113
114 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
115 {
116         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
117 }
118
119 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
120 {
121         return &eco->eo_cl;
122 }
123
124 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
125 {
126         return container_of(o, struct echo_object, eo_cl);
127 }
128
129 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
130 {
131         return container_of(s, struct echo_page, ep_cl);
132 }
133
134 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
135 {
136         return container_of(s, struct echo_lock, el_cl);
137 }
138
139 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
140 {
141         return ecl->el_cl.cls_lock;
142 }
143
144 static struct lu_context_key echo_thread_key;
145 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
146 {
147         struct echo_thread_info *info;
148
149         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
150         LASSERT(info != NULL);
151         return info;
152 }
153
154 static inline
155 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
156 {
157         return container_of(c, struct echo_object_conf, eoc_cl);
158 }
159
160 /** @} echo_helpers */
161
162 static struct echo_object *cl_echo_object_find(struct echo_device *d,
163                                                struct lov_stripe_md **lsm);
164 static int cl_echo_object_put(struct echo_object *eco);
165 static int cl_echo_enqueue(struct echo_object *eco, u64 start,
166                            u64 end, int mode, __u64 *cookie);
167 static int cl_echo_cancel(struct echo_device *d, __u64 cookie);
168 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
169                               struct page **pages, int npages, int async);
170
171 static struct echo_thread_info *echo_env_info(const struct lu_env *env);
172
173 struct echo_thread_info {
174         struct echo_object_conf eti_conf;
175         struct lustre_md        eti_md;
176
177         struct cl_2queue        eti_queue;
178         struct cl_io        eti_io;
179         struct cl_lock_descr    eti_descr;
180         struct lu_fid      eti_fid;
181         struct lu_fid           eti_fid2;
182 };
183
/*
 * Per-session data.  No session state is used right now, so this only
 * carries a placeholder member.
 */
struct echo_session_info {
	unsigned long	dummy;
};
188
/* Slab caches of this module; each one is described in echo_caches[]. */
static struct kmem_cache *echo_lock_kmem;	/* struct echo_lock */
static struct kmem_cache *echo_object_kmem;	/* struct echo_object */
static struct kmem_cache *echo_thread_kmem;	/* struct echo_thread_info */
static struct kmem_cache *echo_session_kmem;	/* struct echo_session_info */
193
194 static struct lu_kmem_descr echo_caches[] = {
195         {
196                 .ckd_cache = &echo_lock_kmem,
197                 .ckd_name  = "echo_lock_kmem",
198                 .ckd_size  = sizeof(struct echo_lock)
199         },
200         {
201                 .ckd_cache = &echo_object_kmem,
202                 .ckd_name  = "echo_object_kmem",
203                 .ckd_size  = sizeof(struct echo_object)
204         },
205         {
206                 .ckd_cache = &echo_thread_kmem,
207                 .ckd_name  = "echo_thread_kmem",
208                 .ckd_size  = sizeof(struct echo_thread_info)
209         },
210         {
211                 .ckd_cache = &echo_session_kmem,
212                 .ckd_name  = "echo_session_kmem",
213                 .ckd_size  = sizeof(struct echo_session_info)
214         },
215         {
216                 .ckd_cache = NULL
217         }
218 };
219
220 /** \defgroup echo_page Page operations
221  *
222  * Echo page operations.
223  *
224  * @{
225  */
226 static struct page *echo_page_vmpage(const struct lu_env *env,
227                                     const struct cl_page_slice *slice)
228 {
229         return cl2echo_page(slice)->ep_vmpage;
230 }
231
232 static int echo_page_own(const struct lu_env *env,
233                          const struct cl_page_slice *slice,
234                          struct cl_io *io, int nonblock)
235 {
236         struct echo_page *ep = cl2echo_page(slice);
237
238         if (!nonblock)
239                 mutex_lock(&ep->ep_lock);
240         else if (!mutex_trylock(&ep->ep_lock))
241                 return -EAGAIN;
242         return 0;
243 }
244
245 static void echo_page_disown(const struct lu_env *env,
246                              const struct cl_page_slice *slice,
247                              struct cl_io *io)
248 {
249         struct echo_page *ep = cl2echo_page(slice);
250
251         LASSERT(mutex_is_locked(&ep->ep_lock));
252         mutex_unlock(&ep->ep_lock);
253 }
254
255 static void echo_page_discard(const struct lu_env *env,
256                               const struct cl_page_slice *slice,
257                               struct cl_io *unused)
258 {
259         cl_page_delete(env, slice->cpl_page);
260 }
261
262 static int echo_page_is_vmlocked(const struct lu_env *env,
263                                  const struct cl_page_slice *slice)
264 {
265         if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
266                 return -EBUSY;
267         return -ENODATA;
268 }
269
270 static void echo_page_completion(const struct lu_env *env,
271                                  const struct cl_page_slice *slice,
272                                  int ioret)
273 {
274         LASSERT(slice->cpl_page->cp_sync_io != NULL);
275 }
276
277 static void echo_page_fini(const struct lu_env *env,
278                            struct cl_page_slice *slice)
279 {
280         struct echo_page *ep    = cl2echo_page(slice);
281         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
282         struct page *vmpage      = ep->ep_vmpage;
283
284         atomic_dec(&eco->eo_npages);
285         page_cache_release(vmpage);
286 }
287
/* cpo_prep (read and write): no preparation needed, always returns 0. */
static int echo_page_prep(const struct lu_env *env,
			  const struct cl_page_slice *slice,
			  struct cl_io *unused)
{
	int rc = 0;

	return rc;
}
294
295 static int echo_page_print(const struct lu_env *env,
296                            const struct cl_page_slice *slice,
297                            void *cookie, lu_printer_t printer)
298 {
299         struct echo_page *ep = cl2echo_page(slice);
300
301         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
302                    ep, mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
303         return 0;
304 }
305
306 static const struct cl_page_operations echo_page_ops = {
307         .cpo_own           = echo_page_own,
308         .cpo_disown     = echo_page_disown,
309         .cpo_discard       = echo_page_discard,
310         .cpo_vmpage     = echo_page_vmpage,
311         .cpo_fini         = echo_page_fini,
312         .cpo_print       = echo_page_print,
313         .cpo_is_vmlocked   = echo_page_is_vmlocked,
314         .io = {
315                 [CRT_READ] = {
316                         .cpo_prep       = echo_page_prep,
317                         .cpo_completion  = echo_page_completion,
318                 },
319                 [CRT_WRITE] = {
320                         .cpo_prep       = echo_page_prep,
321                         .cpo_completion  = echo_page_completion,
322                 }
323         }
324 };
325
326 /** @} echo_page */
327
328 /** \defgroup echo_lock Locking
329  *
330  * echo lock operations
331  *
332  * @{
333  */
334 static void echo_lock_fini(const struct lu_env *env,
335                            struct cl_lock_slice *slice)
336 {
337         struct echo_lock *ecl = cl2echo_lock(slice);
338
339         LASSERT(list_empty(&ecl->el_chain));
340         kmem_cache_free(echo_lock_kmem, ecl);
341 }
342
343 static void echo_lock_delete(const struct lu_env *env,
344                              const struct cl_lock_slice *slice)
345 {
346         struct echo_lock *ecl      = cl2echo_lock(slice);
347
348         LASSERT(list_empty(&ecl->el_chain));
349 }
350
/* clo_fits_into: the echo layer accepts every request, always returns 1. */
static int echo_lock_fits_into(const struct lu_env *env,
			       const struct cl_lock_slice *slice,
			       const struct cl_lock_descr *need,
			       const struct cl_io *unused)
{
	int fits = 1;

	return fits;
}
358
359 static struct cl_lock_operations echo_lock_ops = {
360         .clo_fini      = echo_lock_fini,
361         .clo_delete    = echo_lock_delete,
362         .clo_fits_into = echo_lock_fits_into
363 };
364
365 /** @} echo_lock */
366
367 /** \defgroup echo_cl_ops cl_object operations
368  *
369  * operations for cl_object
370  *
371  * @{
372  */
373 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
374                         struct cl_page *page, struct page *vmpage)
375 {
376         struct echo_page *ep = cl_object_page_slice(obj, page);
377         struct echo_object *eco = cl2echo_obj(obj);
378
379         ep->ep_vmpage = vmpage;
380         page_cache_get(vmpage);
381         mutex_init(&ep->ep_lock);
382         cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
383         atomic_inc(&eco->eo_npages);
384         return 0;
385 }
386
/* coo_io_init: the echo layer keeps no io state, always returns 0. */
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
			struct cl_io *io)
{
	int rc = 0;

	return rc;
}
392
393 static int echo_lock_init(const struct lu_env *env,
394                           struct cl_object *obj, struct cl_lock *lock,
395                           const struct cl_io *unused)
396 {
397         struct echo_lock *el;
398
399         el = kmem_cache_alloc(echo_lock_kmem, GFP_NOFS | __GFP_ZERO);
400         if (el != NULL) {
401                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
402                 el->el_object = cl2echo_obj(obj);
403                 INIT_LIST_HEAD(&el->el_chain);
404                 atomic_set(&el->el_refcount, 0);
405         }
406         return el == NULL ? -ENOMEM : 0;
407 }
408
/* coo_conf_set: configuration changes are ignored, always returns 0. */
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
			 const struct cl_object_conf *conf)
{
	int rc = 0;

	return rc;
}
414
415 static const struct cl_object_operations echo_cl_obj_ops = {
416         .coo_page_init = echo_page_init,
417         .coo_lock_init = echo_lock_init,
418         .coo_io_init   = echo_io_init,
419         .coo_conf_set  = echo_conf_set
420 };
421
422 /** @} echo_cl_ops */
423
424 /** \defgroup echo_lu_ops lu_object operations
425  *
426  * operations for echo lu object.
427  *
428  * @{
429  */
430 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
431                             const struct lu_object_conf *conf)
432 {
433         struct echo_device *ed   = cl2echo_dev(lu2cl_dev(obj->lo_dev));
434         struct echo_client_obd *ec     = ed->ed_ec;
435         struct echo_object *eco = cl2echo_obj(lu2cl(obj));
436         const struct cl_object_conf *cconf;
437         struct echo_object_conf *econf;
438
439         if (ed->ed_next) {
440                 struct lu_object  *below;
441                 struct lu_device  *under;
442
443                 under = ed->ed_next;
444                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
445                                                         under);
446                 if (below == NULL)
447                         return -ENOMEM;
448                 lu_object_add(obj, below);
449         }
450
451         cconf = lu2cl_conf(conf);
452         econf = cl2echo_conf(cconf);
453
454         LASSERT(econf->eoc_md);
455         eco->eo_lsm = *econf->eoc_md;
456         /* clear the lsm pointer so that it won't get freed. */
457         *econf->eoc_md = NULL;
458
459         eco->eo_dev = ed;
460         atomic_set(&eco->eo_npages, 0);
461         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
462
463         spin_lock(&ec->ec_lock);
464         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
465         spin_unlock(&ec->ec_lock);
466
467         return 0;
468 }
469
470 /* taken from osc_unpackmd() */
471 static int echo_alloc_memmd(struct echo_device *ed,
472                             struct lov_stripe_md **lsmp)
473 {
474         int lsm_size;
475
476         /* If export is lov/osc then use their obd method */
477         if (ed->ed_next != NULL)
478                 return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
479         /* OFD has no unpackmd method, do everything here */
480         lsm_size = lov_stripe_md_size(1);
481
482         LASSERT(*lsmp == NULL);
483         *lsmp = kzalloc(lsm_size, GFP_NOFS);
484         if (!*lsmp)
485                 return -ENOMEM;
486
487         (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo), GFP_NOFS);
488         if (!(*lsmp)->lsm_oinfo[0]) {
489                 kfree(*lsmp);
490                 return -ENOMEM;
491         }
492
493         loi_init((*lsmp)->lsm_oinfo[0]);
494         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
495         ostid_set_seq_echo(&(*lsmp)->lsm_oi);
496
497         return lsm_size;
498 }
499
500 static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
501 {
502         int lsm_size;
503
504         /* If export is lov/osc then use their obd method */
505         if (ed->ed_next != NULL)
506                 return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
507         /* OFD has no unpackmd method, do everything here */
508         lsm_size = lov_stripe_md_size(1);
509
510         LASSERT(*lsmp != NULL);
511         kfree((*lsmp)->lsm_oinfo[0]);
512         kfree(*lsmp);
513         *lsmp = NULL;
514         return 0;
515 }
516
517 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
518 {
519         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
520         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
521
522         LASSERT(atomic_read(&eco->eo_npages) == 0);
523
524         spin_lock(&ec->ec_lock);
525         list_del_init(&eco->eo_obj_chain);
526         spin_unlock(&ec->ec_lock);
527
528         lu_object_fini(obj);
529         lu_object_header_fini(obj->lo_header);
530
531         if (eco->eo_lsm)
532                 echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
533         kmem_cache_free(echo_object_kmem, eco);
534 }
535
536 static int echo_object_print(const struct lu_env *env, void *cookie,
537                             lu_printer_t p, const struct lu_object *o)
538 {
539         struct echo_object *obj = cl2echo_obj(lu2cl(o));
540
541         return (*p)(env, cookie, "echoclient-object@%p", obj);
542 }
543
544 static const struct lu_object_operations echo_lu_obj_ops = {
545         .loo_object_init      = echo_object_init,
546         .loo_object_delete    = NULL,
547         .loo_object_release   = NULL,
548         .loo_object_free      = echo_object_free,
549         .loo_object_print     = echo_object_print,
550         .loo_object_invariant = NULL
551 };
552
553 /** @} echo_lu_ops */
554
555 /** \defgroup echo_lu_dev_ops  lu_device operations
556  *
557  * Operations for echo lu device.
558  *
559  * @{
560  */
561 static struct lu_object *echo_object_alloc(const struct lu_env *env,
562                                            const struct lu_object_header *hdr,
563                                            struct lu_device *dev)
564 {
565         struct echo_object *eco;
566         struct lu_object *obj = NULL;
567
568         /* we're the top dev. */
569         LASSERT(hdr == NULL);
570         eco = kmem_cache_alloc(echo_object_kmem, GFP_NOFS | __GFP_ZERO);
571         if (eco != NULL) {
572                 struct cl_object_header *hdr = &eco->eo_hdr;
573
574                 obj = &echo_obj2cl(eco)->co_lu;
575                 cl_object_header_init(hdr);
576                 lu_object_init(obj, &hdr->coh_lu, dev);
577                 lu_object_add_top(&hdr->coh_lu, obj);
578
579                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
580                 obj->lo_ops       = &echo_lu_obj_ops;
581         }
582         return obj;
583 }
584
585 static struct lu_device_operations echo_device_lu_ops = {
586         .ldo_object_alloc   = echo_object_alloc,
587 };
588
589 /** @} echo_lu_dev_ops */
590
591 static struct cl_device_operations echo_device_cl_ops = {
592 };
593
594 /** \defgroup echo_init Setup and teardown
595  *
596  * Init and fini functions for echo client.
597  *
598  * @{
599  */
600 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
601 {
602         struct cl_site *site = &ed->ed_site_myself;
603         int rc;
604
605         /* initialize site */
606         rc = cl_site_init(site, &ed->ed_cl);
607         if (rc) {
608                 CERROR("Cannot initialize site for echo client(%d)\n", rc);
609                 return rc;
610         }
611
612         rc = lu_site_init_finish(&site->cs_lu);
613         if (rc)
614                 return rc;
615
616         ed->ed_site = site;
617         return 0;
618 }
619
620 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
621 {
622         if (ed->ed_site) {
623                 cl_site_fini(ed->ed_site);
624                 ed->ed_site = NULL;
625         }
626 }
627
628 static void *echo_thread_key_init(const struct lu_context *ctx,
629                           struct lu_context_key *key)
630 {
631         struct echo_thread_info *info;
632
633         info = kmem_cache_alloc(echo_thread_kmem, GFP_NOFS | __GFP_ZERO);
634         if (info == NULL)
635                 info = ERR_PTR(-ENOMEM);
636         return info;
637 }
638
639 static void echo_thread_key_fini(const struct lu_context *ctx,
640                          struct lu_context_key *key, void *data)
641 {
642         struct echo_thread_info *info = data;
643
644         kmem_cache_free(echo_thread_kmem, info);
645 }
646
/* lct_exit of echo_thread_key: nothing to reset, intentionally empty. */
static void echo_thread_key_exit(const struct lu_context *ctx,
			 struct lu_context_key *key, void *data)
{
	/* no per-exit work */
}
651
652 static struct lu_context_key echo_thread_key = {
653         .lct_tags = LCT_CL_THREAD,
654         .lct_init = echo_thread_key_init,
655         .lct_fini = echo_thread_key_fini,
656         .lct_exit = echo_thread_key_exit
657 };
658
659 static void *echo_session_key_init(const struct lu_context *ctx,
660                                   struct lu_context_key *key)
661 {
662         struct echo_session_info *session;
663
664         session = kmem_cache_alloc(echo_session_kmem, GFP_NOFS | __GFP_ZERO);
665         if (session == NULL)
666                 session = ERR_PTR(-ENOMEM);
667         return session;
668 }
669
670 static void echo_session_key_fini(const struct lu_context *ctx,
671                                  struct lu_context_key *key, void *data)
672 {
673         struct echo_session_info *session = data;
674
675         kmem_cache_free(echo_session_kmem, session);
676 }
677
/* lct_exit of echo_session_key: nothing to reset, intentionally empty. */
static void echo_session_key_exit(const struct lu_context *ctx,
				 struct lu_context_key *key, void *data)
{
	/* no per-exit work */
}
682
683 static struct lu_context_key echo_session_key = {
684         .lct_tags = LCT_SESSION,
685         .lct_init = echo_session_key_init,
686         .lct_fini = echo_session_key_fini,
687         .lct_exit = echo_session_key_exit
688 };
689
690 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
691
692 static struct lu_device *echo_device_alloc(const struct lu_env *env,
693                                            struct lu_device_type *t,
694                                            struct lustre_cfg *cfg)
695 {
696         struct lu_device   *next;
697         struct echo_device *ed;
698         struct cl_device   *cd;
699         struct obd_device  *obd = NULL; /* to keep compiler happy */
700         struct obd_device  *tgt;
701         const char *tgt_type_name;
702         int rc;
703         int cleanup = 0;
704
705         ed = kzalloc(sizeof(*ed), GFP_NOFS);
706         if (!ed) {
707                 rc = -ENOMEM;
708                 goto out;
709         }
710
711         cleanup = 1;
712         cd = &ed->ed_cl;
713         rc = cl_device_init(cd, t);
714         if (rc)
715                 goto out;
716
717         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
718         cd->cd_ops = &echo_device_cl_ops;
719
720         cleanup = 2;
721         obd = class_name2obd(lustre_cfg_string(cfg, 0));
722         LASSERT(obd != NULL);
723         LASSERT(env != NULL);
724
725         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
726         if (tgt == NULL) {
727                 CERROR("Can not find tgt device %s\n",
728                         lustre_cfg_string(cfg, 1));
729                 rc = -ENODEV;
730                 goto out;
731         }
732
733         next = tgt->obd_lu_dev;
734         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
735                 CERROR("echo MDT client must be run on server\n");
736                 rc = -EOPNOTSUPP;
737                 goto out;
738         }
739
740         rc = echo_site_init(env, ed);
741         if (rc)
742                 goto out;
743
744         cleanup = 3;
745
746         rc = echo_client_setup(env, obd, cfg);
747         if (rc)
748                 goto out;
749
750         ed->ed_ec = &obd->u.echo_client;
751         cleanup = 4;
752
753         /* if echo client is to be stacked upon ost device, the next is
754          * NULL since ost is not a clio device so far */
755         if (next != NULL && !lu_device_is_cl(next))
756                 next = NULL;
757
758         tgt_type_name = tgt->obd_type->typ_name;
759         if (next != NULL) {
760                 LASSERT(next != NULL);
761                 if (next->ld_site != NULL) {
762                         rc = -EBUSY;
763                         goto out;
764                 }
765
766                 next->ld_site = &ed->ed_site->cs_lu;
767                 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
768                                                 next->ld_type->ldt_name,
769                                                               NULL);
770                 if (rc)
771                         goto out;
772
773                 /* Tricky case, I have to determine the obd type since
774                  * CLIO uses the different parameters to initialize
775                  * objects for lov & osc. */
776                 if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
777                         ed->ed_next_islov = 1;
778                 else
779                         LASSERT(strcmp(tgt_type_name,
780                                        LUSTRE_OSC_NAME) == 0);
781         } else {
782                 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
783         }
784
785         ed->ed_next = next;
786         return &cd->cd_lu_dev;
787 out:
788         switch (cleanup) {
789         case 4: {
790                 int rc2;
791
792                 rc2 = echo_client_cleanup(obd);
793                 if (rc2)
794                         CERROR("Cleanup obd device %s error(%d)\n",
795                                obd->obd_name, rc2);
796         }
797
798         case 3:
799                 echo_site_fini(env, ed);
800         case 2:
801                 cl_device_fini(&ed->ed_cl);
802         case 1:
803                 kfree(ed);
804         case 0:
805         default:
806                 break;
807         }
808         return ERR_PTR(rc);
809 }
810
/*
 * ldto_device_init: must never be called for the echo device; reaching it
 * triggers LBUG().
 */
static int echo_device_init(const struct lu_env *env, struct lu_device *d,
			  const char *name, struct lu_device *next)
{
	LBUG();

	return 0;
}
817
818 static struct lu_device *echo_device_fini(const struct lu_env *env,
819                                           struct lu_device *d)
820 {
821         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
822         struct lu_device *next = ed->ed_next;
823
824         while (next)
825                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
826         return NULL;
827 }
828
829 static void echo_lock_release(const struct lu_env *env,
830                               struct echo_lock *ecl,
831                               int still_used)
832 {
833         struct cl_lock *clk = echo_lock2cl(ecl);
834
835         cl_lock_get(clk);
836         cl_unuse(env, clk);
837         cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
838         if (!still_used) {
839                 cl_lock_mutex_get(env, clk);
840                 cl_lock_cancel(env, clk);
841                 cl_lock_delete(env, clk);
842                 cl_lock_mutex_put(env, clk);
843         }
844         cl_lock_put(env, clk);
845 }
846
847 static struct lu_device *echo_device_free(const struct lu_env *env,
848                                           struct lu_device *d)
849 {
850         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
851         struct echo_client_obd *ec   = ed->ed_ec;
852         struct echo_object     *eco;
853         struct lu_device       *next = ed->ed_next;
854
855         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
856                ed, next);
857
858         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
859
860         /* check if there are objects still alive.
861          * It shouldn't have any object because lu_site_purge would cleanup
862          * all of cached objects. Anyway, probably the echo device is being
863          * parallelly accessed.
864          */
865         spin_lock(&ec->ec_lock);
866         list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
867                 eco->eo_deleted = 1;
868         spin_unlock(&ec->ec_lock);
869
870         /* purge again */
871         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
872
873         CDEBUG(D_INFO,
874                "Waiting for the reference of echo object to be dropped\n");
875
876         /* Wait for the last reference to be dropped. */
877         spin_lock(&ec->ec_lock);
878         while (!list_empty(&ec->ec_objects)) {
879                 spin_unlock(&ec->ec_lock);
880                 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
881                 set_current_state(TASK_UNINTERRUPTIBLE);
882                 schedule_timeout(cfs_time_seconds(1));
883                 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
884                 spin_lock(&ec->ec_lock);
885         }
886         spin_unlock(&ec->ec_lock);
887
888         LASSERT(list_empty(&ec->ec_locks));
889
890         CDEBUG(D_INFO, "No object exists, exiting...\n");
891
892         echo_client_cleanup(d->ld_obd);
893
894         while (next)
895                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
896
897         LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
898         echo_site_fini(env, ed);
899         cl_device_fini(&ed->ed_cl);
900         kfree(ed);
901
902         return NULL;
903 }
904
905 static const struct lu_device_type_operations echo_device_type_ops = {
906         .ldto_init = echo_type_init,
907         .ldto_fini = echo_type_fini,
908
909         .ldto_start = echo_type_start,
910         .ldto_stop  = echo_type_stop,
911
912         .ldto_device_alloc = echo_device_alloc,
913         .ldto_device_free  = echo_device_free,
914         .ldto_device_init  = echo_device_init,
915         .ldto_device_fini  = echo_device_fini
916 };
917
918 static struct lu_device_type echo_device_type = {
919         .ldt_tags     = LU_DEVICE_CL,
920         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
921         .ldt_ops      = &echo_device_type_ops,
922         .ldt_ctx_tags = LCT_CL_THREAD,
923 };
924
925 /** @} echo_init */
926
927 /** \defgroup echo_exports Exported operations
928  *
929  * exporting functions to echo client
930  *
931  * @{
932  */
933
934 /* Interfaces to echo client obd device */
935 static struct echo_object *cl_echo_object_find(struct echo_device *d,
936                                                struct lov_stripe_md **lsmp)
937 {
938         struct lu_env *env;
939         struct echo_thread_info *info;
940         struct echo_object_conf *conf;
941         struct lov_stripe_md    *lsm;
942         struct echo_object *eco;
943         struct cl_object   *obj;
944         struct lu_fid *fid;
945         int refcheck;
946         int rc;
947
948         LASSERT(lsmp);
949         lsm = *lsmp;
950         LASSERT(lsm);
951         LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
952         LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
953                  POSTID(&lsm->lsm_oi));
954
955         /* Never return an object if the obd is to be freed. */
956         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
957                 return ERR_PTR(-ENODEV);
958
959         env = cl_env_get(&refcheck);
960         if (IS_ERR(env))
961                 return (void *)env;
962
963         info = echo_env_info(env);
964         conf = &info->eti_conf;
965         if (d->ed_next) {
966                 if (!d->ed_next_islov) {
967                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
968
969                         LASSERT(oinfo != NULL);
970                         oinfo->loi_oi = lsm->lsm_oi;
971                         conf->eoc_cl.u.coc_oinfo = oinfo;
972                 } else {
973                         struct lustre_md *md;
974
975                         md = &info->eti_md;
976                         memset(md, 0, sizeof(*md));
977                         md->lsm = lsm;
978                         conf->eoc_cl.u.coc_md = md;
979                 }
980         }
981         conf->eoc_md = lsmp;
982
983         fid  = &info->eti_fid;
984         rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
985         if (rc != 0) {
986                 eco = ERR_PTR(rc);
987                 goto out;
988         }
989
990         /* In the function below, .hs_keycmp resolves to
991          * lu_obj_hop_keycmp() */
992         /* coverity[overrun-buffer-val] */
993         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
994         if (IS_ERR(obj)) {
995                 eco = (void *)obj;
996                 goto out;
997         }
998
999         eco = cl2echo_obj(obj);
1000         if (eco->eo_deleted) {
1001                 cl_object_put(env, obj);
1002                 eco = ERR_PTR(-EAGAIN);
1003         }
1004
1005 out:
1006         cl_env_put(env, &refcheck);
1007         return eco;
1008 }
1009
1010 static int cl_echo_object_put(struct echo_object *eco)
1011 {
1012         struct lu_env *env;
1013         struct cl_object *obj = echo_obj2cl(eco);
1014         int refcheck;
1015
1016         env = cl_env_get(&refcheck);
1017         if (IS_ERR(env))
1018                 return PTR_ERR(env);
1019
1020         /* an external function to kill an object? */
1021         if (eco->eo_deleted) {
1022                 struct lu_object_header *loh = obj->co_lu.lo_header;
1023
1024                 LASSERT(&eco->eo_hdr == luh2coh(loh));
1025                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1026         }
1027
1028         cl_object_put(env, obj);
1029         cl_env_put(env, &refcheck);
1030         return 0;
1031 }
1032
1033 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1034                             u64 start, u64 end, int mode,
1035                             __u64 *cookie, __u32 enqflags)
1036 {
1037         struct cl_io *io;
1038         struct cl_lock *lck;
1039         struct cl_object *obj;
1040         struct cl_lock_descr *descr;
1041         struct echo_thread_info *info;
1042         int rc = -ENOMEM;
1043
1044         info = echo_env_info(env);
1045         io = &info->eti_io;
1046         descr = &info->eti_descr;
1047         obj = echo_obj2cl(eco);
1048
1049         descr->cld_obj   = obj;
1050         descr->cld_start = cl_index(obj, start);
1051         descr->cld_end   = cl_index(obj, end);
1052         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1053         descr->cld_enq_flags = enqflags;
1054         io->ci_obj = obj;
1055
1056         lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
1057         if (lck) {
1058                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1059                 struct echo_lock *el;
1060
1061                 rc = cl_wait(env, lck);
1062                 if (rc == 0) {
1063                         el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1064                         spin_lock(&ec->ec_lock);
1065                         if (list_empty(&el->el_chain)) {
1066                                 list_add(&el->el_chain, &ec->ec_locks);
1067                                 el->el_cookie = ++ec->ec_unique;
1068                         }
1069                         atomic_inc(&el->el_refcount);
1070                         *cookie = el->el_cookie;
1071                         spin_unlock(&ec->ec_lock);
1072                 } else {
1073                         cl_lock_release(env, lck, "ec enqueue", current);
1074                 }
1075         }
1076         return rc;
1077 }
1078
1079 static int cl_echo_enqueue(struct echo_object *eco, u64 start, u64 end,
1080                            int mode, __u64 *cookie)
1081 {
1082         struct echo_thread_info *info;
1083         struct lu_env *env;
1084         struct cl_io *io;
1085         int refcheck;
1086         int result;
1087
1088         env = cl_env_get(&refcheck);
1089         if (IS_ERR(env))
1090                 return PTR_ERR(env);
1091
1092         info = echo_env_info(env);
1093         io = &info->eti_io;
1094
1095         io->ci_ignore_layout = 1;
1096         result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
1097         if (result < 0)
1098                 goto out;
1099         LASSERT(result == 0);
1100
1101         result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
1102         cl_io_fini(env, io);
1103
1104 out:
1105         cl_env_put(env, &refcheck);
1106         return result;
1107 }
1108
1109 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1110                            __u64 cookie)
1111 {
1112         struct echo_client_obd *ec = ed->ed_ec;
1113         struct echo_lock       *ecl = NULL;
1114         struct list_head             *el;
1115         int found = 0, still_used = 0;
1116
1117         LASSERT(ec != NULL);
1118         spin_lock(&ec->ec_lock);
1119         list_for_each(el, &ec->ec_locks) {
1120                 ecl = list_entry(el, struct echo_lock, el_chain);
1121                 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1122                 found = (ecl->el_cookie == cookie);
1123                 if (found) {
1124                         if (atomic_dec_and_test(&ecl->el_refcount))
1125                                 list_del_init(&ecl->el_chain);
1126                         else
1127                                 still_used = 1;
1128                         break;
1129                 }
1130         }
1131         spin_unlock(&ec->ec_lock);
1132
1133         if (!found)
1134                 return -ENOENT;
1135
1136         echo_lock_release(env, ecl, still_used);
1137         return 0;
1138 }
1139
1140 static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
1141 {
1142         struct lu_env *env;
1143         int refcheck;
1144         int rc;
1145
1146         env = cl_env_get(&refcheck);
1147         if (IS_ERR(env))
1148                 return PTR_ERR(env);
1149
1150         rc = cl_echo_cancel0(env, ed, cookie);
1151
1152         cl_env_put(env, &refcheck);
1153         return rc;
1154 }
1155
1156 static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
1157                              enum cl_req_type unused, struct cl_2queue *queue)
1158 {
1159         struct cl_page *clp;
1160         struct cl_page *temp;
1161         int result = 0;
1162
1163         cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
1164                 int rc;
1165
1166                 rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
1167                 if (rc == 0)
1168                         continue;
1169                 result = result ?: rc;
1170         }
1171         return result;
1172 }
1173
1174 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1175                               struct page **pages, int npages, int async)
1176 {
1177         struct lu_env      *env;
1178         struct echo_thread_info *info;
1179         struct cl_object        *obj = echo_obj2cl(eco);
1180         struct echo_device      *ed  = eco->eo_dev;
1181         struct cl_2queue        *queue;
1182         struct cl_io        *io;
1183         struct cl_page    *clp;
1184         struct lustre_handle    lh = { 0 };
1185         int page_size = cl_page_size(obj);
1186         int refcheck;
1187         int rc;
1188         int i;
1189
1190         LASSERT((offset & ~CFS_PAGE_MASK) == 0);
1191         LASSERT(ed->ed_next != NULL);
1192         env = cl_env_get(&refcheck);
1193         if (IS_ERR(env))
1194                 return PTR_ERR(env);
1195
1196         info    = echo_env_info(env);
1197         io      = &info->eti_io;
1198         queue   = &info->eti_queue;
1199
1200         cl_2queue_init(queue);
1201
1202         io->ci_ignore_layout = 1;
1203         rc = cl_io_init(env, io, CIT_MISC, obj);
1204         if (rc < 0)
1205                 goto out;
1206         LASSERT(rc == 0);
1207
1208         rc = cl_echo_enqueue0(env, eco, offset,
1209                               offset + npages * PAGE_CACHE_SIZE - 1,
1210                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1211                               CEF_NEVER);
1212         if (rc < 0)
1213                 goto error_lock;
1214
1215         for (i = 0; i < npages; i++) {
1216                 LASSERT(pages[i]);
1217                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1218                                    pages[i], CPT_TRANSIENT);
1219                 if (IS_ERR(clp)) {
1220                         rc = PTR_ERR(clp);
1221                         break;
1222                 }
1223                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1224
1225                 rc = cl_page_own(env, io, clp);
1226                 if (rc) {
1227                         LASSERT(clp->cp_state == CPS_FREEING);
1228                         cl_page_put(env, clp);
1229                         break;
1230                 }
1231                 /*
1232                  * Add a page to the incoming page list of 2-queue.
1233                  */
1234                 cl_page_list_add(&queue->c2_qin, clp);
1235
1236                 /* drop the reference count for cl_page_find, so that the page
1237                  * will be freed in cl_2queue_fini. */
1238                 cl_page_put(env, clp);
1239                 cl_page_clip(env, clp, 0, page_size);
1240
1241                 offset += page_size;
1242         }
1243
1244         if (rc == 0) {
1245                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1246
1247                 async = async && (typ == CRT_WRITE);
1248                 if (async)
1249                         rc = cl_echo_async_brw(env, io, typ, queue);
1250                 else
1251                         rc = cl_io_submit_sync(env, io, typ, queue, 0);
1252                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1253                        async ? "async" : "sync", rc);
1254         }
1255
1256         cl_echo_cancel0(env, ed, lh.cookie);
1257 error_lock:
1258         cl_2queue_discard(env, io, queue);
1259         cl_2queue_disown(env, io, queue);
1260         cl_2queue_fini(env, queue);
1261         cl_io_fini(env, io);
1262 out:
1263         cl_env_put(env, &refcheck);
1264         return rc;
1265 }
1266
1267 /** @} echo_exports */
1268
1269 static u64 last_object_id;
1270
1271 static int
1272 echo_copyout_lsm(struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
1273 {
1274         struct lov_stripe_md *ulsm = _ulsm;
1275         struct lov_oinfo **p;
1276         int nob, i;
1277
1278         nob = offsetof(struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
1279         if (nob > ulsm_nob)
1280                 return -EINVAL;
1281
1282         if (copy_to_user(ulsm, lsm, sizeof(*ulsm)))
1283                 return -EFAULT;
1284
1285         for (i = 0, p = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, p++) {
1286                 struct lov_oinfo __user *up;
1287                 if (get_user(up, ulsm->lsm_oinfo + i) ||
1288                     copy_to_user(up, *p, sizeof(struct lov_oinfo)))
1289                         return -EFAULT;
1290         }
1291         return 0;
1292 }
1293
1294 static int
1295 echo_copyin_lsm(struct echo_device *ed, struct lov_stripe_md *lsm,
1296                 struct lov_stripe_md __user *ulsm, int ulsm_nob)
1297 {
1298         struct echo_client_obd *ec = ed->ed_ec;
1299         struct lov_oinfo **p;
1300         int                  i;
1301
1302         if (ulsm_nob < sizeof(*lsm))
1303                 return -EINVAL;
1304
1305         if (copy_from_user(lsm, ulsm, sizeof(*lsm)))
1306                 return -EFAULT;
1307
1308         if (lsm->lsm_stripe_count > ec->ec_nstripes ||
1309             lsm->lsm_magic != LOV_MAGIC ||
1310             (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
1311             ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
1312                 return -EINVAL;
1313
1314         for (i = 0, p = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, p++) {
1315                 struct lov_oinfo __user *up;
1316                 if (get_user(up, ulsm->lsm_oinfo + i) ||
1317                     copy_from_user(*p, up, sizeof(struct lov_oinfo)))
1318                         return -EFAULT;
1319         }
1320         return 0;
1321 }
1322
1323 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
1324                               int on_target, struct obdo *oa, void *ulsm,
1325                               int ulsm_nob, struct obd_trans_info *oti)
1326 {
1327         struct echo_object     *eco;
1328         struct echo_client_obd *ec = ed->ed_ec;
1329         struct lov_stripe_md   *lsm = NULL;
1330         int                  rc;
1331         int                  created = 0;
1332
1333         if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
1334             (on_target ||                      /* set_stripe */
1335              ec->ec_nstripes != 0)) {      /* LOV */
1336                 CERROR("No valid oid\n");
1337                 return -EINVAL;
1338         }
1339
1340         rc = echo_alloc_memmd(ed, &lsm);
1341         if (rc < 0) {
1342                 CERROR("Cannot allocate md: rc = %d\n", rc);
1343                 goto failed;
1344         }
1345
1346         if (ulsm != NULL) {
1347                 int i, idx;
1348
1349                 rc = echo_copyin_lsm(ed, lsm, ulsm, ulsm_nob);
1350                 if (rc != 0)
1351                         goto failed;
1352
1353                 if (lsm->lsm_stripe_count == 0)
1354                         lsm->lsm_stripe_count = ec->ec_nstripes;
1355
1356                 if (lsm->lsm_stripe_size == 0)
1357                         lsm->lsm_stripe_size = PAGE_CACHE_SIZE;
1358
1359                 idx = cfs_rand();
1360
1361                 /* setup stripes: indices + default ids if required */
1362                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1363                         if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0)
1364                                 lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi;
1365
1366                         lsm->lsm_oinfo[i]->loi_ost_idx =
1367                                 (idx + i) % ec->ec_nstripes;
1368                 }
1369         }
1370
1371         /* setup object ID here for !on_target and LOV hint */
1372         if (oa->o_valid & OBD_MD_FLID) {
1373                 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1374                 lsm->lsm_oi = oa->o_oi;
1375         }
1376
1377         if (ostid_id(&lsm->lsm_oi) == 0)
1378                 ostid_set_id(&lsm->lsm_oi, ++last_object_id);
1379
1380         rc = 0;
1381         if (on_target) {
1382                 /* Only echo objects are allowed to be created */
1383                 LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
1384                         (ostid_seq(&oa->o_oi) == FID_SEQ_ECHO));
1385                 rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
1386                 if (rc != 0) {
1387                         CERROR("Cannot create objects: rc = %d\n", rc);
1388                         goto failed;
1389                 }
1390                 created = 1;
1391         }
1392
1393         /* See what object ID we were given */
1394         oa->o_oi = lsm->lsm_oi;
1395         oa->o_valid |= OBD_MD_FLID;
1396
1397         eco = cl_echo_object_find(ed, &lsm);
1398         if (IS_ERR(eco)) {
1399                 rc = PTR_ERR(eco);
1400                 goto failed;
1401         }
1402         cl_echo_object_put(eco);
1403
1404         CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
1405
1406  failed:
1407         if (created && rc)
1408                 obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL);
1409         if (lsm)
1410                 echo_free_memmd(ed, &lsm);
1411         if (rc)
1412                 CERROR("create object failed with: rc = %d\n", rc);
1413         return rc;
1414 }
1415
1416 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1417                            struct obdo *oa)
1418 {
1419         struct lov_stripe_md   *lsm = NULL;
1420         struct echo_object     *eco;
1421         int                  rc;
1422
1423         if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
1424                 /* disallow use of object id 0 */
1425                 CERROR("No valid oid\n");
1426                 return -EINVAL;
1427         }
1428
1429         rc = echo_alloc_memmd(ed, &lsm);
1430         if (rc < 0)
1431                 return rc;
1432
1433         lsm->lsm_oi = oa->o_oi;
1434         if (!(oa->o_valid & OBD_MD_FLGROUP))
1435                 ostid_set_seq_echo(&lsm->lsm_oi);
1436
1437         rc = 0;
1438         eco = cl_echo_object_find(ed, &lsm);
1439         if (!IS_ERR(eco))
1440                 *ecop = eco;
1441         else
1442                 rc = PTR_ERR(eco);
1443         if (lsm)
1444                 echo_free_memmd(ed, &lsm);
1445         return rc;
1446 }
1447
/*
 * Release a reference obtained with echo_get_object().  A failure of
 * cl_echo_object_put() cannot be reported to the caller, so it is only
 * logged.
 */
static void echo_put_object(struct echo_object *eco)
{
	/* Terminate the message with a newline like every other CERROR here,
	 * so the next log record does not run into it. */
	if (cl_echo_object_put(eco))
		CERROR("echo client: drop an object failed\n");
}
1453
1454 static void
1455 echo_get_stripe_off_id(struct lov_stripe_md *lsm, u64 *offp, u64 *idp)
1456 {
1457         unsigned long stripe_count;
1458         unsigned long stripe_size;
1459         unsigned long width;
1460         unsigned long woffset;
1461         int        stripe_index;
1462         u64       offset;
1463
1464         if (lsm->lsm_stripe_count <= 1)
1465                 return;
1466
1467         offset       = *offp;
1468         stripe_size  = lsm->lsm_stripe_size;
1469         stripe_count = lsm->lsm_stripe_count;
1470
1471         /* width = # bytes in all stripes */
1472         width = stripe_size * stripe_count;
1473
1474         /* woffset = offset within a width; offset = whole number of widths */
1475         woffset = do_div(offset, width);
1476
1477         stripe_index = woffset / stripe_size;
1478
1479         *idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi);
1480         *offp = offset * stripe_size + woffset % stripe_size;
1481 }
1482
1483 static void
1484 echo_client_page_debug_setup(struct lov_stripe_md *lsm,
1485                              struct page *page, int rw, u64 id,
1486                              u64 offset, u64 count)
1487 {
1488         char    *addr;
1489         u64      stripe_off;
1490         u64      stripe_id;
1491         int      delta;
1492
1493         /* no partial pages on the client */
1494         LASSERT(count == PAGE_CACHE_SIZE);
1495
1496         addr = kmap(page);
1497
1498         for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1499                 if (rw == OBD_BRW_WRITE) {
1500                         stripe_off = offset + delta;
1501                         stripe_id = id;
1502                         echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
1503                 } else {
1504                         stripe_off = 0xdeadbeef00c0ffeeULL;
1505                         stripe_id = 0xdeadbeef00c0ffeeULL;
1506                 }
1507                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1508                                   stripe_off, stripe_id);
1509         }
1510
1511         kunmap(page);
1512 }
1513
1514 static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
1515                                         struct page *page, u64 id,
1516                                         u64 offset, u64 count)
1517 {
1518         u64     stripe_off;
1519         u64     stripe_id;
1520         char   *addr;
1521         int     delta;
1522         int     rc;
1523         int     rc2;
1524
1525         /* no partial pages on the client */
1526         LASSERT(count == PAGE_CACHE_SIZE);
1527
1528         addr = kmap(page);
1529
1530         for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1531                 stripe_off = offset + delta;
1532                 stripe_id = id;
1533                 echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
1534
1535                 rc2 = block_debug_check("test_brw",
1536                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
1537                                         stripe_off, stripe_id);
1538                 if (rc2 != 0) {
1539                         CERROR("Error in echo object %#llx\n", id);
1540                         rc = rc2;
1541                 }
1542         }
1543
1544         kunmap(page);
1545         return rc;
1546 }
1547
1548 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
1549                             struct echo_object *eco, u64 offset,
1550                             u64 count, int async,
1551                             struct obd_trans_info *oti)
1552 {
1553         struct lov_stripe_md   *lsm = eco->eo_lsm;
1554         u32            npages;
1555         struct brw_page *pga;
1556         struct brw_page *pgp;
1557         struct page         **pages;
1558         u64              off;
1559         int                  i;
1560         int                  rc;
1561         int                  verify;
1562         gfp_t                gfp_mask;
1563         int                  brw_flags = 0;
1564
1565         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
1566                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1567                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1568
1569         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
1570
1571         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
1572         LASSERT(lsm != NULL);
1573         LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi));
1574
1575         if (count <= 0 ||
1576             (count & (~CFS_PAGE_MASK)) != 0)
1577                 return -EINVAL;
1578
1579         /* XXX think again with misaligned I/O */
1580         npages = count >> PAGE_CACHE_SHIFT;
1581
1582         if (rw == OBD_BRW_WRITE)
1583                 brw_flags = OBD_BRW_ASYNC;
1584
1585         pga = kcalloc(npages, sizeof(*pga), GFP_NOFS);
1586         if (pga == NULL)
1587                 return -ENOMEM;
1588
1589         pages = kcalloc(npages, sizeof(*pages), GFP_NOFS);
1590         if (pages == NULL) {
1591                 kfree(pga);
1592                 return -ENOMEM;
1593         }
1594
1595         for (i = 0, pgp = pga, off = offset;
1596              i < npages;
1597              i++, pgp++, off += PAGE_CACHE_SIZE) {
1598
1599                 LASSERT(pgp->pg == NULL);      /* for cleanup */
1600
1601                 rc = -ENOMEM;
1602                 pgp->pg = alloc_page(gfp_mask);
1603                 if (pgp->pg == NULL)
1604                         goto out;
1605
1606                 pages[i] = pgp->pg;
1607                 pgp->count = PAGE_CACHE_SIZE;
1608                 pgp->off = off;
1609                 pgp->flag = brw_flags;
1610
1611                 if (verify)
1612                         echo_client_page_debug_setup(lsm, pgp->pg, rw,
1613                                                      ostid_id(&oa->o_oi), off,
1614                                                      pgp->count);
1615         }
1616
1617         /* brw mode can only be used at client */
1618         LASSERT(ed->ed_next != NULL);
1619         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1620
1621  out:
1622         if (rc != 0 || rw != OBD_BRW_READ)
1623                 verify = 0;
1624
1625         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
1626                 if (pgp->pg == NULL)
1627                         continue;
1628
1629                 if (verify) {
1630                         int vrc;
1631
1632                         vrc = echo_client_page_debug_check(lsm, pgp->pg,
1633                                                            ostid_id(&oa->o_oi),
1634                                                            pgp->off, pgp->count);
1635                         if (vrc != 0 && rc == 0)
1636                                 rc = vrc;
1637                 }
1638                 __free_page(pgp->pg);
1639         }
1640         kfree(pga);
1641         kfree(pages);
1642         return rc;
1643 }
1644
1645 static int echo_client_prep_commit(const struct lu_env *env,
1646                                    struct obd_export *exp, int rw,
1647                                    struct obdo *oa, struct echo_object *eco,
1648                                    u64 offset, u64 count,
1649                                    u64 batch, struct obd_trans_info *oti,
1650                                    int async)
1651 {
1652         struct lov_stripe_md *lsm = eco->eo_lsm;
1653         struct obd_ioobj ioo;
1654         struct niobuf_local *lnb;
1655         struct niobuf_remote *rnb;
1656         u64 off;
1657         u64 npages, tot_pages;
1658         int i, ret = 0, brw_flags = 0;
1659
1660         if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
1661             (lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi)))
1662                 return -EINVAL;
1663
1664         npages = batch >> PAGE_CACHE_SHIFT;
1665         tot_pages = count >> PAGE_CACHE_SHIFT;
1666
1667         lnb = kcalloc(npages, sizeof(struct niobuf_local), GFP_NOFS);
1668         rnb = kcalloc(npages, sizeof(struct niobuf_remote), GFP_NOFS);
1669
1670         if (lnb == NULL || rnb == NULL) {
1671                 ret = -ENOMEM;
1672                 goto out;
1673         }
1674
1675         if (rw == OBD_BRW_WRITE && async)
1676                 brw_flags |= OBD_BRW_ASYNC;
1677
1678         obdo_to_ioobj(oa, &ioo);
1679
1680         off = offset;
1681
1682         for (; tot_pages; tot_pages -= npages) {
1683                 int lpages;
1684
1685                 if (tot_pages < npages)
1686                         npages = tot_pages;
1687
1688                 for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
1689                         rnb[i].offset = off;
1690                         rnb[i].len = PAGE_CACHE_SIZE;
1691                         rnb[i].flags = brw_flags;
1692                 }
1693
1694                 ioo.ioo_bufcnt = npages;
1695                 oti->oti_transno = 0;
1696
1697                 lpages = npages;
1698                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
1699                                  lnb, oti);
1700                 if (ret != 0)
1701                         goto out;
1702                 LASSERT(lpages == npages);
1703
1704                 for (i = 0; i < lpages; i++) {
1705                         struct page *page = lnb[i].page;
1706
1707                         /* read past eof? */
1708                         if (page == NULL && lnb[i].rc == 0)
1709                                 continue;
1710
1711                         if (async)
1712                                 lnb[i].flags |= OBD_BRW_ASYNC;
1713
1714                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
1715                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1716                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1717                                 continue;
1718
1719                         if (rw == OBD_BRW_WRITE)
1720                                 echo_client_page_debug_setup(lsm, page, rw,
1721                                                             ostid_id(&oa->o_oi),
1722                                                              rnb[i].offset,
1723                                                              rnb[i].len);
1724                         else
1725                                 echo_client_page_debug_check(lsm, page,
1726                                                             ostid_id(&oa->o_oi),
1727                                                              rnb[i].offset,
1728                                                              rnb[i].len);
1729                 }
1730
1731                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
1732                                    rnb, npages, lnb, oti, ret);
1733                 if (ret != 0)
1734                         goto out;
1735
1736                 /* Reset oti otherwise it would confuse ldiskfs. */
1737                 memset(oti, 0, sizeof(*oti));
1738
1739                 /* Reuse env context. */
1740                 lu_context_exit((struct lu_context *)&env->le_ctx);
1741                 lu_context_enter((struct lu_context *)&env->le_ctx);
1742         }
1743
1744 out:
1745         kfree(lnb);
1746         kfree(rnb);
1747         return ret;
1748 }
1749
1750 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1751                                  struct obd_export *exp,
1752                                  struct obd_ioctl_data *data,
1753                                  struct obd_trans_info *dummy_oti)
1754 {
1755         struct obd_device *obd = class_exp2obd(exp);
1756         struct echo_device *ed = obd2echo_dev(obd);
1757         struct echo_client_obd *ec = ed->ed_ec;
1758         struct obdo *oa = &data->ioc_obdo1;
1759         struct echo_object *eco;
1760         int rc;
1761         int async = 1;
1762         long test_mode;
1763
1764         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1765
1766         rc = echo_get_object(&eco, ed, oa);
1767         if (rc)
1768                 return rc;
1769
1770         oa->o_valid &= ~OBD_MD_FLHANDLE;
1771
1772         /* OFD/obdfilter works only via prep/commit */
1773         test_mode = (long)data->ioc_pbuf1;
1774         if (test_mode == 1)
1775                 async = 0;
1776
1777         if (ed->ed_next == NULL && test_mode != 3) {
1778                 test_mode = 3;
1779                 data->ioc_plen1 = data->ioc_count;
1780         }
1781
1782         /* Truncate batch size to maximum */
1783         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1784                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1785
1786         switch (test_mode) {
1787         case 1:
1788                 /* fall through */
1789         case 2:
1790                 rc = echo_client_kbrw(ed, rw, oa,
1791                                       eco, data->ioc_offset,
1792                                       data->ioc_count, async, dummy_oti);
1793                 break;
1794         case 3:
1795                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
1796                                              eco, data->ioc_offset,
1797                                              data->ioc_count, data->ioc_plen1,
1798                                              dummy_oti, async);
1799                 break;
1800         default:
1801                 rc = -EINVAL;
1802         }
1803         echo_put_object(eco);
1804         return rc;
1805 }
1806
1807 static int
1808 echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
1809                     int mode, u64 offset, u64 nob)
1810 {
1811         struct echo_device     *ed = obd2echo_dev(exp->exp_obd);
1812         struct lustre_handle   *ulh = &oa->o_handle;
1813         struct echo_object     *eco;
1814         u64              end;
1815         int                  rc;
1816
1817         if (ed->ed_next == NULL)
1818                 return -EOPNOTSUPP;
1819
1820         if (!(mode == LCK_PR || mode == LCK_PW))
1821                 return -EINVAL;
1822
1823         if ((offset & (~CFS_PAGE_MASK)) != 0 ||
1824             (nob & (~CFS_PAGE_MASK)) != 0)
1825                 return -EINVAL;
1826
1827         rc = echo_get_object(&eco, ed, oa);
1828         if (rc != 0)
1829                 return rc;
1830
1831         end = (nob == 0) ? ((u64) -1) : (offset + nob - 1);
1832         rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
1833         if (rc == 0) {
1834                 oa->o_valid |= OBD_MD_FLHANDLE;
1835                 CDEBUG(D_INFO, "Cookie is %#llx\n", ulh->cookie);
1836         }
1837         echo_put_object(eco);
1838         return rc;
1839 }
1840
1841 static int
1842 echo_client_cancel(struct obd_export *exp, struct obdo *oa)
1843 {
1844         struct echo_device *ed     = obd2echo_dev(exp->exp_obd);
1845         __u64          cookie = oa->o_handle.cookie;
1846
1847         if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
1848                 return -EINVAL;
1849
1850         CDEBUG(D_INFO, "Cookie is %#llx\n", cookie);
1851         return cl_echo_cancel(ed, cookie);
1852 }
1853
1854 static int
1855 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1856                       void *karg, void *uarg)
1857 {
1858         struct obd_device      *obd = exp->exp_obd;
1859         struct echo_device     *ed = obd2echo_dev(obd);
1860         struct echo_client_obd *ec = ed->ed_ec;
1861         struct echo_object     *eco;
1862         struct obd_ioctl_data  *data = karg;
1863         struct obd_trans_info   dummy_oti;
1864         struct lu_env     *env;
1865         struct oti_req_ack_lock *ack_lock;
1866         struct obdo         *oa;
1867         struct lu_fid      fid;
1868         int                  rw = OBD_BRW_READ;
1869         int                  rc = 0;
1870         int                  i;
1871
1872         memset(&dummy_oti, 0, sizeof(dummy_oti));
1873
1874         oa = &data->ioc_obdo1;
1875         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1876                 oa->o_valid |= OBD_MD_FLGROUP;
1877                 ostid_set_seq_echo(&oa->o_oi);
1878         }
1879
1880         /* This FID is unpacked just for validation at this point */
1881         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
1882         if (rc < 0)
1883                 return rc;
1884
1885         env = kzalloc(sizeof(*env), GFP_NOFS);
1886         if (!env)
1887                 return -ENOMEM;
1888
1889         rc = lu_env_init(env, LCT_DT_THREAD);
1890         if (rc) {
1891                 rc = -ENOMEM;
1892                 goto out;
1893         }
1894
1895         switch (cmd) {
1896         case OBD_IOC_CREATE:                /* may create echo object */
1897                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1898                         rc = -EPERM;
1899                         goto out;
1900                 }
1901
1902                 rc = echo_create_object(env, ed, 1, oa, data->ioc_pbuf1,
1903                                         data->ioc_plen1, &dummy_oti);
1904                 goto out;
1905
1906         case OBD_IOC_DESTROY:
1907                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1908                         rc = -EPERM;
1909                         goto out;
1910                 }
1911
1912                 rc = echo_get_object(&eco, ed, oa);
1913                 if (rc == 0) {
1914                         rc = obd_destroy(env, ec->ec_exp, oa, eco->eo_lsm,
1915                                          &dummy_oti, NULL);
1916                         if (rc == 0)
1917                                 eco->eo_deleted = 1;
1918                         echo_put_object(eco);
1919                 }
1920                 goto out;
1921
1922         case OBD_IOC_GETATTR:
1923                 rc = echo_get_object(&eco, ed, oa);
1924                 if (rc == 0) {
1925                         struct obd_info oinfo = { };
1926
1927                         oinfo.oi_md = eco->eo_lsm;
1928                         oinfo.oi_oa = oa;
1929                         rc = obd_getattr(env, ec->ec_exp, &oinfo);
1930                         echo_put_object(eco);
1931                 }
1932                 goto out;
1933
1934         case OBD_IOC_SETATTR:
1935                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1936                         rc = -EPERM;
1937                         goto out;
1938                 }
1939
1940                 rc = echo_get_object(&eco, ed, oa);
1941                 if (rc == 0) {
1942                         struct obd_info oinfo = { };
1943
1944                         oinfo.oi_oa = oa;
1945                         oinfo.oi_md = eco->eo_lsm;
1946
1947                         rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
1948                         echo_put_object(eco);
1949                 }
1950                 goto out;
1951
1952         case OBD_IOC_BRW_WRITE:
1953                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1954                         rc = -EPERM;
1955                         goto out;
1956                 }
1957
1958                 rw = OBD_BRW_WRITE;
1959                 /* fall through */
1960         case OBD_IOC_BRW_READ:
1961                 rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
1962                 goto out;
1963
1964         case ECHO_IOC_GET_STRIPE:
1965                 rc = echo_get_object(&eco, ed, oa);
1966                 if (rc == 0) {
1967                         rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
1968                                               data->ioc_plen1);
1969                         echo_put_object(eco);
1970                 }
1971                 goto out;
1972
1973         case ECHO_IOC_SET_STRIPE:
1974                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1975                         rc = -EPERM;
1976                         goto out;
1977                 }
1978
1979                 if (data->ioc_pbuf1 == NULL) {  /* unset */
1980                         rc = echo_get_object(&eco, ed, oa);
1981                         if (rc == 0) {
1982                                 eco->eo_deleted = 1;
1983                                 echo_put_object(eco);
1984                         }
1985                 } else {
1986                         rc = echo_create_object(env, ed, 0, oa,
1987                                                 data->ioc_pbuf1,
1988                                                 data->ioc_plen1, &dummy_oti);
1989                 }
1990                 goto out;
1991
1992         case ECHO_IOC_ENQUEUE:
1993                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1994                         rc = -EPERM;
1995                         goto out;
1996                 }
1997
1998                 rc = echo_client_enqueue(exp, oa,
1999                                          data->ioc_conn1, /* lock mode */
2000                                          data->ioc_offset,
2001                                          data->ioc_count);/*extent*/
2002                 goto out;
2003
2004         case ECHO_IOC_CANCEL:
2005                 rc = echo_client_cancel(exp, oa);
2006                 goto out;
2007
2008         default:
2009                 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2010                 rc = -ENOTTY;
2011                 goto out;
2012         }
2013
2014 out:
2015         lu_env_fini(env);
2016         kfree(env);
2017
2018         /* XXX this should be in a helper also called by target_send_reply */
2019         for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
2020              i++, ack_lock++) {
2021                 if (!ack_lock->mode)
2022                         break;
2023                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
2024         }
2025
2026         return rc;
2027 }
2028
2029 static int echo_client_setup(const struct lu_env *env,
2030                              struct obd_device *obddev, struct lustre_cfg *lcfg)
2031 {
2032         struct echo_client_obd *ec = &obddev->u.echo_client;
2033         struct obd_device *tgt;
2034         struct obd_uuid echo_uuid = { "ECHO_UUID" };
2035         struct obd_connect_data *ocd = NULL;
2036         int rc;
2037
2038         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2039                 CERROR("requires a TARGET OBD name\n");
2040                 return -EINVAL;
2041         }
2042
2043         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2044         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2045                 CERROR("device not attached or not set up (%s)\n",
2046                        lustre_cfg_string(lcfg, 1));
2047                 return -EINVAL;
2048         }
2049
2050         spin_lock_init(&ec->ec_lock);
2051         INIT_LIST_HEAD(&ec->ec_objects);
2052         INIT_LIST_HEAD(&ec->ec_locks);
2053         ec->ec_unique = 0;
2054         ec->ec_nstripes = 0;
2055
2056         ocd = kzalloc(sizeof(*ocd), GFP_NOFS);
2057         if (!ocd) {
2058                 CERROR("Can't alloc ocd connecting to %s\n",
2059                        lustre_cfg_string(lcfg, 1));
2060                 return -ENOMEM;
2061         }
2062
2063         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2064                                  OBD_CONNECT_BRW_SIZE |
2065                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2066                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2067                                  OBD_CONNECT_FID;
2068         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2069         ocd->ocd_version = LUSTRE_VERSION_CODE;
2070         ocd->ocd_group = FID_SEQ_ECHO;
2071
2072         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2073
2074         kfree(ocd);
2075
2076         if (rc != 0) {
2077                 CERROR("fail to connect to device %s\n",
2078                        lustre_cfg_string(lcfg, 1));
2079                 return rc;
2080         }
2081
2082         return rc;
2083 }
2084
2085 static int echo_client_cleanup(struct obd_device *obddev)
2086 {
2087         struct echo_client_obd *ec = &obddev->u.echo_client;
2088         int rc;
2089
2090         if (!list_empty(&obddev->obd_exports)) {
2091                 CERROR("still has clients!\n");
2092                 return -EBUSY;
2093         }
2094
2095         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2096         rc = obd_disconnect(ec->ec_exp);
2097         if (rc != 0)
2098                 CERROR("fail to disconnect device: %d\n", rc);
2099
2100         return rc;
2101 }
2102
2103 static int echo_client_connect(const struct lu_env *env,
2104                                struct obd_export **exp,
2105                                struct obd_device *src, struct obd_uuid *cluuid,
2106                                struct obd_connect_data *data, void *localdata)
2107 {
2108         int             rc;
2109         struct lustre_handle conn = { 0 };
2110
2111         rc = class_connect(&conn, src, cluuid);
2112         if (rc == 0) {
2113                 *exp = class_conn2export(&conn);
2114         }
2115
2116         return rc;
2117 }
2118
2119 static int echo_client_disconnect(struct obd_export *exp)
2120 {
2121         int                  rc;
2122
2123         if (exp == NULL) {
2124                 rc = -EINVAL;
2125                 goto out;
2126         }
2127
2128         rc = class_disconnect(exp);
2129         goto out;
2130  out:
2131         return rc;
2132 }
2133
2134 static struct obd_ops echo_client_obd_ops = {
2135         .owner          = THIS_MODULE,
2136         .iocontrol      = echo_client_iocontrol,
2137         .connect        = echo_client_connect,
2138         .disconnect     = echo_client_disconnect
2139 };
2140
2141 static int echo_client_init(void)
2142 {
2143         int rc;
2144
2145         rc = lu_kmem_init(echo_caches);
2146         if (rc == 0) {
2147                 rc = class_register_type(&echo_client_obd_ops, NULL,
2148                                          LUSTRE_ECHO_CLIENT_NAME,
2149                                          &echo_device_type);
2150                 if (rc)
2151                         lu_kmem_fini(echo_caches);
2152         }
2153         return rc;
2154 }
2155
2156 static void echo_client_exit(void)
2157 {
2158         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2159         lu_kmem_fini(echo_caches);
2160 }
2161
2162 static int __init obdecho_init(void)
2163 {
2164         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2165
2166         LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2167
2168         return echo_client_init();
2169 }
2170
/* Module exit point: tear down the echo client. */
static void /*__exit*/ obdecho_exit(void)
{
        echo_client_exit();
}
2176
2177 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2178 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
2179 MODULE_LICENSE("GPL");
2180 MODULE_VERSION(LUSTRE_VERSION_STRING);
2181
2182 module_init(obdecho_init);
2183 module_exit(obdecho_exit);
2184
2185 /** @} echo_client */