1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 struct after_state_chg_work {
60         struct drbd_work w;
61         union drbd_state os;
62         union drbd_state ns;
63         enum chg_state_flags flags;
64         struct completion *done;
65 };
66
67 static DEFINE_MUTEX(drbd_main_mutex);
68 int drbdd_init(struct drbd_thread *);
69 int drbd_worker(struct drbd_thread *);
70 int drbd_asender(struct drbd_thread *);
71
72 int drbd_init(void);
73 static int drbd_open(struct block_device *bdev, fmode_t mode);
74 static int drbd_release(struct gendisk *gd, fmode_t mode);
75 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
76 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
77                            union drbd_state ns, enum chg_state_flags flags);
78 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
79 static void md_sync_timer_fn(unsigned long data);
80 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
82
83 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
84               "Lars Ellenberg <lars@linbit.com>");
85 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
86 MODULE_VERSION(REL_VERSION);
87 MODULE_LICENSE("GPL");
88 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices ("
89                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
90 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
91
92 #include <linux/moduleparam.h>
93 /* allow_open_on_secondary */
94 MODULE_PARM_DESC(allow_oos, "DONT USE!");
95 /* thanks to these macros, if compiled into the kernel (not as a module),
96  * this becomes the boot parameter drbd.minor_count */
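/* Example (illustrative): with DRBD built into the kernel, booting with
 * "drbd.minor_count=8" on the kernel command line has the same effect that
 * "modprobe drbd minor_count=8" has for the modular build. */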
97 module_param(minor_count, uint, 0444);
98 module_param(disable_sendpage, bool, 0644);
99 module_param(allow_oos, bool, 0);
100 module_param(cn_idx, uint, 0444);
101 module_param(proc_details, int, 0644);
102
103 #ifdef CONFIG_DRBD_FAULT_INJECTION
104 int enable_faults;
105 int fault_rate;
106 static int fault_count;
107 int fault_devs;
108 /* bitmap of enabled faults */
109 module_param(enable_faults, int, 0664);
110 /* fault rate % value - applies to all enabled faults */
111 module_param(fault_rate, int, 0664);
112 /* count of faults inserted */
113 module_param(fault_count, int, 0664);
114 /* bitmap of devices to insert faults on */
115 module_param(fault_devs, int, 0644);
116 #endif
117
118 /* module parameters, defined here */
119 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
120 bool disable_sendpage;
121 bool allow_oos;
122 unsigned int cn_idx = CN_IDX_DRBD;
123 int proc_details;       /* Detail level in proc drbd */
124
125 /* Module parameter for setting the user mode helper program
126  * to run. Default is /sbin/drbdadm */
127 char usermode_helper[80] = "/sbin/drbdadm";
128
129 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
130
131 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
132  * as member "struct gendisk *vdisk;"
133  */
134 struct drbd_conf **minor_table;
135
136 struct kmem_cache *drbd_request_cache;
137 struct kmem_cache *drbd_ee_cache;       /* epoch entries */
138 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
139 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
140 mempool_t *drbd_request_mempool;
141 mempool_t *drbd_ee_mempool;
142
143 /* I do not use a standard mempool, because:
144    1) I want to hand out the pre-allocated objects first.
145    2) I want to be able to interrupt sleeping allocation with a signal.
146    Note: This is a singly linked list; the next pointer is the private
147          member of struct page.
148  */
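/* A minimal sketch of popping one page off this pool (illustrative only;
 * locking and the empty/refill cases are omitted, the real helpers live
 * elsewhere in the driver):
 *
 *	struct page *page = drbd_pp_pool;
 *	if (page) {
 *		drbd_pp_pool = (struct page *)page_private(page);
 *		set_page_private(page, 0);
 *		drbd_pp_vacant--;
 *	}
 */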
149 struct page *drbd_pp_pool;
150 spinlock_t   drbd_pp_lock;
151 int          drbd_pp_vacant;
152 wait_queue_head_t drbd_pp_wait;
153
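/* limit log flooding: at most 5 messages per 5*HZ interval, intended to be
 * checked with __ratelimit(&drbd_ratelimit_state) at noisy call sites */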
154 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
155
156 static const struct block_device_operations drbd_ops = {
157         .owner =   THIS_MODULE,
158         .open =    drbd_open,
159         .release = drbd_release,
160 };
161
162 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
163
164 #ifdef __CHECKER__
165 /* When checking with sparse, and this is an inline function, sparse will
166    give tons of false positives. When this is a real function, sparse works.
167  */
168 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
169 {
170         int io_allowed;
171
172         atomic_inc(&mdev->local_cnt);
173         io_allowed = (mdev->state.disk >= mins);
174         if (!io_allowed) {
175                 if (atomic_dec_and_test(&mdev->local_cnt))
176                         wake_up(&mdev->misc_wait);
177         }
178         return io_allowed;
179 }
180
181 #endif
182
183 /**
184  * DOC: The transfer log
185  *
186  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
187  * mdev->newest_tle points to the newest epoch (new epochs are appended there),
188  * mdev->oldest_tle to the oldest. There is always at least one &struct drbd_tl_epoch object.
189  *
190  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
191  * attached.
192  */
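/* A rough picture of the structure described above (illustrative only):
 *
 *   mdev->oldest_tle                              mdev->newest_tle
 *         |                                              |
 *         v                                              v
 *    [ tl_epoch ] --next--> [ tl_epoch ] --next--> [ tl_epoch ] --> NULL
 *     {requests}             {requests}             {requests}
 *
 * where each {requests} is a circular, doubly linked list of
 * &struct drbd_request, linked through their tl_requests member.
 */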
193 static int tl_init(struct drbd_conf *mdev)
194 {
195         struct drbd_tl_epoch *b;
196
197         /* during device minor initialization, we may well use GFP_KERNEL */
198         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
199         if (!b)
200                 return 0;
201         INIT_LIST_HEAD(&b->requests);
202         INIT_LIST_HEAD(&b->w.list);
203         b->next = NULL;
204         b->br_number = 4711;
205         b->n_writes = 0;
206         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
207
208         mdev->oldest_tle = b;
209         mdev->newest_tle = b;
210         INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
211
212         mdev->tl_hash = NULL;
213         mdev->tl_hash_s = 0;
214
215         return 1;
216 }
217
218 static void tl_cleanup(struct drbd_conf *mdev)
219 {
220         D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
221         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
222         kfree(mdev->oldest_tle);
223         mdev->oldest_tle = NULL;
224         kfree(mdev->unused_spare_tle);
225         mdev->unused_spare_tle = NULL;
226         kfree(mdev->tl_hash);
227         mdev->tl_hash = NULL;
228         mdev->tl_hash_s = 0;
229 }
230
231 /**
232  * _tl_add_barrier() - Adds a barrier to the transfer log
233  * @mdev:       DRBD device.
234  * @new:        Barrier to be added before the current head of the TL.
235  *
236  * The caller must hold the req_lock.
237  */
238 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
239 {
240         struct drbd_tl_epoch *newest_before;
241
242         INIT_LIST_HEAD(&new->requests);
243         INIT_LIST_HEAD(&new->w.list);
244         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
245         new->next = NULL;
246         new->n_writes = 0;
247
248         newest_before = mdev->newest_tle;
249         /* never send a barrier number == 0, because that is special-cased
250          * when using TCQ for our write ordering code */
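        /* Note: "(x) ?: 1" is the GCC conditional-with-omitted-middle-operand
         * extension; it evaluates to x unless x is 0, in which case it yields
         * 1, so the barrier number can never wrap to 0. */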
251         new->br_number = (newest_before->br_number+1) ?: 1;
252         if (mdev->newest_tle != new) {
253                 mdev->newest_tle->next = new;
254                 mdev->newest_tle = new;
255         }
256 }
257
258 /**
259  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
260  * @mdev:       DRBD device.
261  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
262  * @set_size:   Expected number of requests before that barrier.
263  *
264  * In case the passed barrier_nr or set_size does not match the oldest
265  * &struct drbd_tl_epoch object, this function will cause a termination
266  * of the connection.
267  */
268 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
269                        unsigned int set_size)
270 {
271         struct drbd_tl_epoch *b, *nob; /* next old barrier */
272         struct list_head *le, *tle;
273         struct drbd_request *r;
274
275         spin_lock_irq(&mdev->req_lock);
276
277         b = mdev->oldest_tle;
278
279         /* first some paranoia code */
280         if (b == NULL) {
281                 dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
282                         barrier_nr);
283                 goto bail;
284         }
285         if (b->br_number != barrier_nr) {
286                 dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
287                         barrier_nr, b->br_number);
288                 goto bail;
289         }
290         if (b->n_writes != set_size) {
291                 dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
292                         barrier_nr, set_size, b->n_writes);
293                 goto bail;
294         }
295
296         /* Clean up list of requests processed during current epoch */
297         list_for_each_safe(le, tle, &b->requests) {
298                 r = list_entry(le, struct drbd_request, tl_requests);
299                 _req_mod(r, barrier_acked);
300         }
301         /* There could be requests on the list waiting for completion
302            of the write to the local disk. To avoid corruption of the
303            slab's data structures we have to remove the list's head.
304
305            Also there could have been a barrier ack out of sequence, overtaking
306            the write acks - which would be a bug and would violate write ordering.
307            To avoid deadlocking in case we lose the connection while such requests
308            are still pending, we need some way to find them for
309            _req_mod(connection_lost_while_pending).
310
311            These have been list_move'd to the out_of_sequence_requests list in
312            _req_mod(, barrier_acked) above.
313            */
314         list_del_init(&b->requests);
315
316         nob = b->next;
317         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
318                 _tl_add_barrier(mdev, b);
319                 if (nob)
320                         mdev->oldest_tle = nob;
321                 /* if nob == NULL, b was the only barrier and becomes the new
322                    barrier. Therefore mdev->oldest_tle already points to b */
323         } else {
324                 D_ASSERT(nob != NULL);
325                 mdev->oldest_tle = nob;
326                 kfree(b);
327         }
328
329         spin_unlock_irq(&mdev->req_lock);
330         dec_ap_pending(mdev);
331
332         return;
333
334 bail:
335         spin_unlock_irq(&mdev->req_lock);
336         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
337 }
338
339
340 /**
341  * _tl_restart() - Walks the transfer log, and applies an action to all requests
342  * @mdev:       DRBD device.
343  * @what:       The action/event to perform with all request objects
344  *
345  * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
346  * restart_frozen_disk_io.
347  */
348 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
349 {
350         struct drbd_tl_epoch *b, *tmp, **pn;
351         struct list_head *le, *tle, carry_reads;
352         struct drbd_request *req;
353         int rv, n_writes, n_reads;
354
355         b = mdev->oldest_tle;
356         pn = &mdev->oldest_tle;
357         while (b) {
358                 n_writes = 0;
359                 n_reads = 0;
360                 INIT_LIST_HEAD(&carry_reads);
361                 list_for_each_safe(le, tle, &b->requests) {
362                         req = list_entry(le, struct drbd_request, tl_requests);
363                         rv = _req_mod(req, what);
364
365                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
366                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
367                 }
368                 tmp = b->next;
369
370                 if (n_writes) {
371                         if (what == resend) {
372                                 b->n_writes = n_writes;
373                                 if (b->w.cb == NULL) {
374                                         b->w.cb = w_send_barrier;
375                                         inc_ap_pending(mdev);
376                                         set_bit(CREATE_BARRIER, &mdev->flags);
377                                 }
378
379                                 drbd_queue_work(&mdev->data.work, &b->w);
380                         }
381                         pn = &b->next;
382                 } else {
383                         if (n_reads)
384                                 list_add(&carry_reads, &b->requests);
385                         /* there could still be requests on that ring list,
386                          * in case local io is still pending */
387                         list_del(&b->requests);
388
389                         /* dec_ap_pending corresponding to queue_barrier.
390                          * the newest barrier may not have been queued yet,
391                          * in which case w.cb is still NULL. */
392                         if (b->w.cb != NULL)
393                                 dec_ap_pending(mdev);
394
395                         if (b == mdev->newest_tle) {
396                                 /* recycle, but reinit! */
397                                 D_ASSERT(tmp == NULL);
398                                 INIT_LIST_HEAD(&b->requests);
399                                 list_splice(&carry_reads, &b->requests);
400                                 INIT_LIST_HEAD(&b->w.list);
401                                 b->w.cb = NULL;
402                                 b->br_number = net_random();
403                                 b->n_writes = 0;
404
405                                 *pn = b;
406                                 break;
407                         }
408                         *pn = tmp;
409                         kfree(b);
410                 }
411                 b = tmp;
412                 list_splice(&carry_reads, &b->requests);
413         }
414 }
415
416
417 /**
418  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
419  * @mdev:       DRBD device.
420  *
421  * This is called after the connection to the peer was lost. The storage covered
422  * by the requests on the transfer log gets marked as out of sync. Called from the
423  * receiver thread and the worker thread.
424  */
425 void tl_clear(struct drbd_conf *mdev)
426 {
427         struct list_head *le, *tle;
428         struct drbd_request *r;
429
430         spin_lock_irq(&mdev->req_lock);
431
432         _tl_restart(mdev, connection_lost_while_pending);
433
434         /* we expect this list to be empty. */
435         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
436
437         /* but just in case, clean it up anyway! */
438         list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
439                 r = list_entry(le, struct drbd_request, tl_requests);
440                 /* It would be nice to complete outside of spinlock.
441                  * But this is easier for now. */
442                 _req_mod(r, connection_lost_while_pending);
443         }
444
445         /* ensure bit indicating barrier is required is clear */
446         clear_bit(CREATE_BARRIER, &mdev->flags);
447
448         memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
449
450         spin_unlock_irq(&mdev->req_lock);
451 }
452
453 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
454 {
455         spin_lock_irq(&mdev->req_lock);
456         _tl_restart(mdev, what);
457         spin_unlock_irq(&mdev->req_lock);
458 }
459
460 /**
461  * cl_wide_st_chg() - true if the state change is a cluster wide one
462  * @mdev:       DRBD device.
463  * @os:         old (current) state.
464  * @ns:         new (wanted) state.
465  */
466 static int cl_wide_st_chg(struct drbd_conf *mdev,
467                           union drbd_state os, union drbd_state ns)
468 {
469         return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
470                  ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
471                   (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
472                   (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
473                   (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
474                 (os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
475                 (os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
476 }
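/* Example (illustrative): promoting a connected Secondary to Primary
 * (os.role == R_SECONDARY, ns.role == R_PRIMARY, both sides >= C_CONNECTED)
 * is such a cluster wide change; drbd_req_state() will then ask the peer
 * via drbd_send_state_req() before committing the new state locally. */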
477
478 enum drbd_state_rv
479 drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
480                   union drbd_state mask, union drbd_state val)
481 {
482         unsigned long flags;
483         union drbd_state os, ns;
484         enum drbd_state_rv rv;
485
486         spin_lock_irqsave(&mdev->req_lock, flags);
487         os = mdev->state;
488         ns.i = (os.i & ~mask.i) | val.i;
489         rv = _drbd_set_state(mdev, ns, f, NULL);
490         ns = mdev->state;
491         spin_unlock_irqrestore(&mdev->req_lock, flags);
492
493         return rv;
494 }
495
496 /**
497  * drbd_force_state() - Impose a change which happens outside our control on our state
498  * @mdev:       DRBD device.
499  * @mask:       mask of state bits to change.
500  * @val:        value of new state bits.
501  */
502 void drbd_force_state(struct drbd_conf *mdev,
503         union drbd_state mask, union drbd_state val)
504 {
505         drbd_change_state(mdev, CS_HARD, mask, val);
506 }
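/* Typical use, as seen in the bail path of tl_release() above (illustrative):
 *	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 * where NS() builds the mask/val pair for changing a single state field. */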
507
508 static enum drbd_state_rv is_valid_state(struct drbd_conf *, union drbd_state);
509 static enum drbd_state_rv is_valid_state_transition(struct drbd_conf *,
510                                                     union drbd_state,
511                                                     union drbd_state);
512 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
513                                        union drbd_state ns, const char **warn_sync_abort);
514 int drbd_send_state_req(struct drbd_conf *,
515                         union drbd_state, union drbd_state);
516
517 static enum drbd_state_rv
518 _req_st_cond(struct drbd_conf *mdev, union drbd_state mask,
519              union drbd_state val)
520 {
521         union drbd_state os, ns;
522         unsigned long flags;
523         enum drbd_state_rv rv;
524
525         if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
526                 return SS_CW_SUCCESS;
527
528         if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
529                 return SS_CW_FAILED_BY_PEER;
530
531         rv = 0;
532         spin_lock_irqsave(&mdev->req_lock, flags);
533         os = mdev->state;
534         ns.i = (os.i & ~mask.i) | val.i;
535         ns = sanitize_state(mdev, os, ns, NULL);
536
537         if (!cl_wide_st_chg(mdev, os, ns))
538                 rv = SS_CW_NO_NEED;
539         if (!rv) {
540                 rv = is_valid_state(mdev, ns);
541                 if (rv == SS_SUCCESS) {
542                         rv = is_valid_state_transition(mdev, ns, os);
543                         if (rv == SS_SUCCESS)
544                                 rv = SS_UNKNOWN_ERROR; /* continue waiting, otherwise fail. */
545                 }
546         }
547         spin_unlock_irqrestore(&mdev->req_lock, flags);
548
549         return rv;
550 }
551
552 /**
553  * drbd_req_state() - Perform a possibly cluster wide state change
554  * @mdev:       DRBD device.
555  * @mask:       mask of state bits to change.
556  * @val:        value of new state bits.
557  * @f:          flags
558  *
559  * Should not be called directly, use drbd_request_state() or
560  * _drbd_request_state().
561  */
562 static enum drbd_state_rv
563 drbd_req_state(struct drbd_conf *mdev, union drbd_state mask,
564                union drbd_state val, enum chg_state_flags f)
565 {
566         struct completion done;
567         unsigned long flags;
568         union drbd_state os, ns;
569         enum drbd_state_rv rv;
570
571         init_completion(&done);
572
573         if (f & CS_SERIALIZE)
574                 mutex_lock(&mdev->state_mutex);
575
576         spin_lock_irqsave(&mdev->req_lock, flags);
577         os = mdev->state;
578         ns.i = (os.i & ~mask.i) | val.i;
579         ns = sanitize_state(mdev, os, ns, NULL);
580
581         if (cl_wide_st_chg(mdev, os, ns)) {
582                 rv = is_valid_state(mdev, ns);
583                 if (rv == SS_SUCCESS)
584                         rv = is_valid_state_transition(mdev, ns, os);
585                 spin_unlock_irqrestore(&mdev->req_lock, flags);
586
587                 if (rv < SS_SUCCESS) {
588                         if (f & CS_VERBOSE)
589                                 print_st_err(mdev, os, ns, rv);
590                         goto abort;
591                 }
592
593                 drbd_state_lock(mdev);
594                 if (!drbd_send_state_req(mdev, mask, val)) {
595                         drbd_state_unlock(mdev);
596                         rv = SS_CW_FAILED_BY_PEER;
597                         if (f & CS_VERBOSE)
598                                 print_st_err(mdev, os, ns, rv);
599                         goto abort;
600                 }
601
602                 wait_event(mdev->state_wait,
603                         (rv = _req_st_cond(mdev, mask, val)));
604
605                 if (rv < SS_SUCCESS) {
606                         drbd_state_unlock(mdev);
607                         if (f & CS_VERBOSE)
608                                 print_st_err(mdev, os, ns, rv);
609                         goto abort;
610                 }
611                 spin_lock_irqsave(&mdev->req_lock, flags);
612                 os = mdev->state;
613                 ns.i = (os.i & ~mask.i) | val.i;
614                 rv = _drbd_set_state(mdev, ns, f, &done);
615                 drbd_state_unlock(mdev);
616         } else {
617                 rv = _drbd_set_state(mdev, ns, f, &done);
618         }
619
620         spin_unlock_irqrestore(&mdev->req_lock, flags);
621
622         if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
623                 D_ASSERT(current != mdev->worker.task);
624                 wait_for_completion(&done);
625         }
626
627 abort:
628         if (f & CS_SERIALIZE)
629                 mutex_unlock(&mdev->state_mutex);
630
631         return rv;
632 }
633
634 /**
635  * _drbd_request_state() - Request a state change (with flags)
636  * @mdev:       DRBD device.
637  * @mask:       mask of state bits to change.
638  * @val:        value of new state bits.
639  * @f:          flags
640  *
641  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
642  * flag, or when logging of failed state change requests is not desired.
643  */
644 enum drbd_state_rv
645 _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
646                     union drbd_state val, enum chg_state_flags f)
647 {
648         enum drbd_state_rv rv;
649
650         wait_event(mdev->state_wait,
651                    (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
652
653         return rv;
654 }
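/* The common entry point is drbd_request_state() (a small wrapper in
 * drbd_int.h that supplies a default set of chg_state_flags). An
 * illustrative call:
 *	drbd_request_state(mdev, NS(disk, D_OUTDATED));
 */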
655
656 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
657 {
658         dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
659             name,
660             drbd_conn_str(ns.conn),
661             drbd_role_str(ns.role),
662             drbd_role_str(ns.peer),
663             drbd_disk_str(ns.disk),
664             drbd_disk_str(ns.pdsk),
665             is_susp(ns) ? 's' : 'r',
666             ns.aftr_isp ? 'a' : '-',
667             ns.peer_isp ? 'p' : '-',
668             ns.user_isp ? 'u' : '-'
669             );
670 }
671
672 void print_st_err(struct drbd_conf *mdev, union drbd_state os,
673                   union drbd_state ns, enum drbd_state_rv err)
674 {
675         if (err == SS_IN_TRANSIENT_STATE)
676                 return;
677         dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
678         print_st(mdev, " state", os);
679         print_st(mdev, "wanted", ns);
680 }
681
682
683 /**
684  * is_valid_state() - Returns an SS_ error code if ns is not valid
685  * @mdev:       DRBD device.
686  * @ns:         State to consider.
687  */
688 static enum drbd_state_rv
689 is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
690 {
691         /* See drbd_state_sw_errors in drbd_strings.c */
692
693         enum drbd_fencing_p fp;
694         enum drbd_state_rv rv = SS_SUCCESS;
695
696         fp = FP_DONT_CARE;
697         if (get_ldev(mdev)) {
698                 fp = mdev->ldev->dc.fencing;
699                 put_ldev(mdev);
700         }
701
702         if (get_net_conf(mdev)) {
703                 if (!mdev->net_conf->two_primaries &&
704                     ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
705                         rv = SS_TWO_PRIMARIES;
706                 put_net_conf(mdev);
707         }
708
709         if (rv <= 0)
710                 /* already found a reason to abort */;
711         else if (ns.role == R_SECONDARY && mdev->open_cnt)
712                 rv = SS_DEVICE_IN_USE;
713
714         else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
715                 rv = SS_NO_UP_TO_DATE_DISK;
716
717         else if (fp >= FP_RESOURCE &&
718                  ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
719                 rv = SS_PRIMARY_NOP;
720
721         else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
722                 rv = SS_NO_UP_TO_DATE_DISK;
723
724         else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
725                 rv = SS_NO_LOCAL_DISK;
726
727         else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
728                 rv = SS_NO_REMOTE_DISK;
729
730         else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
731                 rv = SS_NO_UP_TO_DATE_DISK;
732
733         else if ((ns.conn == C_CONNECTED ||
734                   ns.conn == C_WF_BITMAP_S ||
735                   ns.conn == C_SYNC_SOURCE ||
736                   ns.conn == C_PAUSED_SYNC_S) &&
737                   ns.disk == D_OUTDATED)
738                 rv = SS_CONNECTED_OUTDATES;
739
740         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
741                  (mdev->sync_conf.verify_alg[0] == 0))
742                 rv = SS_NO_VERIFY_ALG;
743
744         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
745                   mdev->agreed_pro_version < 88)
746                 rv = SS_NOT_SUPPORTED;
747
748         else if (ns.conn >= C_CONNECTED && ns.pdsk == D_UNKNOWN)
749                 rv = SS_CONNECTED_OUTDATES;
750
751         return rv;
752 }
753
754 /**
755  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
756  * @mdev:       DRBD device.
757  * @ns:         new state.
758  * @os:         old state.
759  */
760 static enum drbd_state_rv
761 is_valid_state_transition(struct drbd_conf *mdev, union drbd_state ns,
762                           union drbd_state os)
763 {
764         enum drbd_state_rv rv = SS_SUCCESS;
765
766         if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
767             os.conn > C_CONNECTED)
768                 rv = SS_RESYNC_RUNNING;
769
770         if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
771                 rv = SS_ALREADY_STANDALONE;
772
773         if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
774                 rv = SS_IS_DISKLESS;
775
776         if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
777                 rv = SS_NO_NET_CONFIG;
778
779         if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
780                 rv = SS_LOWER_THAN_OUTDATED;
781
782         if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
783                 rv = SS_IN_TRANSIENT_STATE;
784
785         if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
786                 rv = SS_IN_TRANSIENT_STATE;
787
788         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
789                 rv = SS_NEED_CONNECTION;
790
791         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
792             ns.conn != os.conn && os.conn > C_CONNECTED)
793                 rv = SS_RESYNC_RUNNING;
794
795         if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
796             os.conn < C_CONNECTED)
797                 rv = SS_NEED_CONNECTION;
798
799         if ((ns.conn == C_SYNC_TARGET || ns.conn == C_SYNC_SOURCE)
800             && os.conn < C_WF_REPORT_PARAMS)
801                 rv = SS_NEED_CONNECTION; /* No NetworkFailure -> SyncTarget etc... */
802
803         return rv;
804 }
805
806 /**
807  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
808  * @mdev:       DRBD device.
809  * @os:         old state.
810  * @ns:         new state.
811  * @warn_sync_abort: set to the name of an aborted resync/verify run, if any.
812  *
813  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
814  * to D_UNKNOWN. This rule and many more along those lines are in this function.
815  */
816 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
817                                        union drbd_state ns, const char **warn_sync_abort)
818 {
819         enum drbd_fencing_p fp;
820         enum drbd_disk_state disk_min, disk_max, pdsk_min, pdsk_max;
821
822         fp = FP_DONT_CARE;
823         if (get_ldev(mdev)) {
824                 fp = mdev->ldev->dc.fencing;
825                 put_ldev(mdev);
826         }
827
828         /* Do not allow network error states to configure a device's network part */
829         if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
830             os.conn <= C_DISCONNECTING)
831                 ns.conn = os.conn;
832
833         /* After a network error (including C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
834          * If you try to go into some Sync* state, that shall fail (elsewhere). */
835         if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
836             ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_TEAR_DOWN)
837                 ns.conn = os.conn;
838
839         /* we cannot fail (again) if we already detached */
840         if (ns.disk == D_FAILED && os.disk == D_DISKLESS)
841                 ns.disk = D_DISKLESS;
842
843         /* if we are only D_ATTACHING yet,
844          * we can (and should) go directly to D_DISKLESS. */
845         if (ns.disk == D_FAILED && os.disk == D_ATTACHING)
846                 ns.disk = D_DISKLESS;
847
848         /* After C_DISCONNECTING only C_STANDALONE may follow */
849         if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
850                 ns.conn = os.conn;
851
852         if (ns.conn < C_CONNECTED) {
853                 ns.peer_isp = 0;
854                 ns.peer = R_UNKNOWN;
855                 if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
856                         ns.pdsk = D_UNKNOWN;
857         }
858
859         /* Clear the aftr_isp when becoming unconfigured */
860         if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
861                 ns.aftr_isp = 0;
862
863         /* Abort resync if a disk fails/detaches */
864         if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
865             (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
866                 if (warn_sync_abort)
867                         *warn_sync_abort =
868                                 os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
869                                 "Online-verify" : "Resync";
870                 ns.conn = C_CONNECTED;
871         }
872
873         /* The connection broke down before we finished "Negotiating" */
874         if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
875             get_ldev_if_state(mdev, D_NEGOTIATING)) {
876                 if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
877                         ns.disk = mdev->new_state_tmp.disk;
878                         ns.pdsk = mdev->new_state_tmp.pdsk;
879                 } else {
880                         dev_alert(DEV, "Connection lost while negotiating, no data!\n");
881                         ns.disk = D_DISKLESS;
882                         ns.pdsk = D_UNKNOWN;
883                 }
884                 put_ldev(mdev);
885         }
886
887         /* D_CONSISTENT and D_OUTDATED vanish when we get connected */
888         if (ns.conn >= C_CONNECTED && ns.conn < C_AHEAD) {
889                 if (ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED)
890                         ns.disk = D_UP_TO_DATE;
891                 if (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)
892                         ns.pdsk = D_UP_TO_DATE;
893         }
894
895         /* Implications of the connection state on the disk states */
896         disk_min = D_DISKLESS;
897         disk_max = D_UP_TO_DATE;
898         pdsk_min = D_INCONSISTENT;
899         pdsk_max = D_UNKNOWN;
900         switch ((enum drbd_conns)ns.conn) {
901         case C_WF_BITMAP_T:
902         case C_PAUSED_SYNC_T:
903         case C_STARTING_SYNC_T:
904         case C_WF_SYNC_UUID:
905         case C_BEHIND:
906                 disk_min = D_INCONSISTENT;
907                 disk_max = D_OUTDATED;
908                 pdsk_min = D_UP_TO_DATE;
909                 pdsk_max = D_UP_TO_DATE;
910                 break;
911         case C_VERIFY_S:
912         case C_VERIFY_T:
913                 disk_min = D_UP_TO_DATE;
914                 disk_max = D_UP_TO_DATE;
915                 pdsk_min = D_UP_TO_DATE;
916                 pdsk_max = D_UP_TO_DATE;
917                 break;
918         case C_CONNECTED:
919                 disk_min = D_DISKLESS;
920                 disk_max = D_UP_TO_DATE;
921                 pdsk_min = D_DISKLESS;
922                 pdsk_max = D_UP_TO_DATE;
923                 break;
924         case C_WF_BITMAP_S:
925         case C_PAUSED_SYNC_S:
926         case C_STARTING_SYNC_S:
927         case C_AHEAD:
928                 disk_min = D_UP_TO_DATE;
929                 disk_max = D_UP_TO_DATE;
930                 pdsk_min = D_INCONSISTENT;
931                 pdsk_max = D_CONSISTENT; /* D_OUTDATED would be nice. But explicit outdate necessary */
932                 break;
933         case C_SYNC_TARGET:
934                 disk_min = D_INCONSISTENT;
935                 disk_max = D_INCONSISTENT;
936                 pdsk_min = D_UP_TO_DATE;
937                 pdsk_max = D_UP_TO_DATE;
938                 break;
939         case C_SYNC_SOURCE:
940                 disk_min = D_UP_TO_DATE;
941                 disk_max = D_UP_TO_DATE;
942                 pdsk_min = D_INCONSISTENT;
943                 pdsk_max = D_INCONSISTENT;
944                 break;
945         case C_STANDALONE:
946         case C_DISCONNECTING:
947         case C_UNCONNECTED:
948         case C_TIMEOUT:
949         case C_BROKEN_PIPE:
950         case C_NETWORK_FAILURE:
951         case C_PROTOCOL_ERROR:
952         case C_TEAR_DOWN:
953         case C_WF_CONNECTION:
954         case C_WF_REPORT_PARAMS:
955         case C_MASK:
956                 break;
957         }
958         if (ns.disk > disk_max)
959                 ns.disk = disk_max;
960
961         if (ns.disk < disk_min) {
962                 dev_warn(DEV, "Implicitly set disk from %s to %s\n",
963                          drbd_disk_str(ns.disk), drbd_disk_str(disk_min));
964                 ns.disk = disk_min;
965         }
966         if (ns.pdsk > pdsk_max)
967                 ns.pdsk = pdsk_max;
968
969         if (ns.pdsk < pdsk_min) {
970                 dev_warn(DEV, "Implicitly set pdsk from %s to %s\n",
971                          drbd_disk_str(ns.pdsk), drbd_disk_str(pdsk_min));
972                 ns.pdsk = pdsk_min;
973         }
974
975         if (fp == FP_STONITH &&
976             (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
977             !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
978                 ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
979
980         if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
981             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
982             !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
983                 ns.susp_nod = 1; /* Suspend IO while no up-to-date data is accessible */
984
985         if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
986                 if (ns.conn == C_SYNC_SOURCE)
987                         ns.conn = C_PAUSED_SYNC_S;
988                 if (ns.conn == C_SYNC_TARGET)
989                         ns.conn = C_PAUSED_SYNC_T;
990         } else {
991                 if (ns.conn == C_PAUSED_SYNC_S)
992                         ns.conn = C_SYNC_SOURCE;
993                 if (ns.conn == C_PAUSED_SYNC_T)
994                         ns.conn = C_SYNC_TARGET;
995         }
996
997         return ns;
998 }
999
1000 /* helper for __drbd_set_state */
1001 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
1002 {
1003         if (mdev->agreed_pro_version < 90)
1004                 mdev->ov_start_sector = 0;
1005         mdev->rs_total = drbd_bm_bits(mdev);
1006         mdev->ov_position = 0;
1007         if (cs == C_VERIFY_T) {
1008                 /* starting online verify from an arbitrary position
1009                  * does not fit well into the existing protocol.
1010                  * on C_VERIFY_T, we initialize ov_left and friends
1011                  * implicitly in receive_DataRequest once the
1012                  * first P_OV_REQUEST is received */
1013                 mdev->ov_start_sector = ~(sector_t)0;
1014         } else {
1015                 unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
1016                 if (bit >= mdev->rs_total) {
1017                         mdev->ov_start_sector =
1018                                 BM_BIT_TO_SECT(mdev->rs_total - 1);
1019                         mdev->rs_total = 1;
1020                 } else
1021                         mdev->rs_total -= bit;
1022                 mdev->ov_position = mdev->ov_start_sector;
1023         }
1024         mdev->ov_left = mdev->rs_total;
1025 }
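/* Worked example (illustrative, assuming the usual 4 KiB per bitmap bit,
 * i.e. 8 sectors of 512 bytes): starting an online verify as C_VERIFY_S
 * with ov_start_sector == 80000 gives bit == 10000; if the bitmap has more
 * bits than that, rs_total is reduced by 10000 and ov_position stays at
 * sector 80000, so progress accounting starts at the requested offset. */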
1026
1027 static void drbd_resume_al(struct drbd_conf *mdev)
1028 {
1029         if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
1030                 dev_info(DEV, "Resumed AL updates\n");
1031 }
1032
1033 /**
1034  * __drbd_set_state() - Set a new DRBD state
1035  * @mdev:       DRBD device.
1036  * @ns:         new state.
1037  * @flags:      Flags
1038  * @done:       Optional completion, that will get completed after the after_state_ch() finished
1039  *
1040  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
1041  */
1042 enum drbd_state_rv
1043 __drbd_set_state(struct drbd_conf *mdev, union drbd_state ns,
1044                  enum chg_state_flags flags, struct completion *done)
1045 {
1046         union drbd_state os;
1047         enum drbd_state_rv rv = SS_SUCCESS;
1048         const char *warn_sync_abort = NULL;
1049         struct after_state_chg_work *ascw;
1050
1051         os = mdev->state;
1052
1053         ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
1054
1055         if (ns.i == os.i)
1056                 return SS_NOTHING_TO_DO;
1057
1058         if (!(flags & CS_HARD)) {
1059                 /* pre-state-change checks; only look at ns */
1060                 /* See drbd_state_sw_errors in drbd_strings.c */
1061
1062                 rv = is_valid_state(mdev, ns);
1063                 if (rv < SS_SUCCESS) {
1064                         /* If the old state was illegal as well, then let
1065                            this happen...*/
1066
1067                         if (is_valid_state(mdev, os) == rv)
1068                                 rv = is_valid_state_transition(mdev, ns, os);
1069                 } else
1070                         rv = is_valid_state_transition(mdev, ns, os);
1071         }
1072
1073         if (rv < SS_SUCCESS) {
1074                 if (flags & CS_VERBOSE)
1075                         print_st_err(mdev, os, ns, rv);
1076                 return rv;
1077         }
1078
1079         if (warn_sync_abort)
1080                 dev_warn(DEV, "%s aborted.\n", warn_sync_abort);
1081
1082         {
1083         char *pbp, pb[300];
1084         pbp = pb;
1085         *pbp = 0;
1086         if (ns.role != os.role)
1087                 pbp += sprintf(pbp, "role( %s -> %s ) ",
1088                                drbd_role_str(os.role),
1089                                drbd_role_str(ns.role));
1090         if (ns.peer != os.peer)
1091                 pbp += sprintf(pbp, "peer( %s -> %s ) ",
1092                                drbd_role_str(os.peer),
1093                                drbd_role_str(ns.peer));
1094         if (ns.conn != os.conn)
1095                 pbp += sprintf(pbp, "conn( %s -> %s ) ",
1096                                drbd_conn_str(os.conn),
1097                                drbd_conn_str(ns.conn));
1098         if (ns.disk != os.disk)
1099                 pbp += sprintf(pbp, "disk( %s -> %s ) ",
1100                                drbd_disk_str(os.disk),
1101                                drbd_disk_str(ns.disk));
1102         if (ns.pdsk != os.pdsk)
1103                 pbp += sprintf(pbp, "pdsk( %s -> %s ) ",
1104                                drbd_disk_str(os.pdsk),
1105                                drbd_disk_str(ns.pdsk));
1106         if (is_susp(ns) != is_susp(os))
1107                 pbp += sprintf(pbp, "susp( %d -> %d ) ",
1108                                is_susp(os),
1109                                is_susp(ns));
1110         if (ns.aftr_isp != os.aftr_isp)
1111                 pbp += sprintf(pbp, "aftr_isp( %d -> %d ) ",
1112                                os.aftr_isp,
1113                                ns.aftr_isp);
1114         if (ns.peer_isp != os.peer_isp)
1115                 pbp += sprintf(pbp, "peer_isp( %d -> %d ) ",
1116                                os.peer_isp,
1117                                ns.peer_isp);
1118         if (ns.user_isp != os.user_isp)
1119                 pbp += sprintf(pbp, "user_isp( %d -> %d ) ",
1120                                os.user_isp,
1121                                ns.user_isp);
1122         dev_info(DEV, "%s\n", pb);
1123         }
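        /* The resulting log line looks e.g. like this (illustrative):
         *   block drbd0: role( Secondary -> Primary ) disk( Inconsistent -> UpToDate )
         */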
1124
1125         /* solve the race between becoming unconfigured,
1126          * worker doing the cleanup, and
1127          * admin reconfiguring us:
1128          * on (re)configure, first set CONFIG_PENDING,
1129          * then wait for a potentially exiting worker,
1130          * start the worker, and schedule one no_op.
1131          * then proceed with configuration.
1132          */
1133         if (ns.disk == D_DISKLESS &&
1134             ns.conn == C_STANDALONE &&
1135             ns.role == R_SECONDARY &&
1136             !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1137                 set_bit(DEVICE_DYING, &mdev->flags);
1138
1139         /* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
1140          * on the ldev here, to be sure the transition -> D_DISKLESS resp.
1141          * drbd_ldev_destroy() won't happen before our corresponding
1142          * after_state_ch works run, where we put_ldev again. */
1143         if ((os.disk != D_FAILED && ns.disk == D_FAILED) ||
1144             (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
1145                 atomic_inc(&mdev->local_cnt);
1146
1147         mdev->state = ns;
1148
1149         if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING)
1150                 drbd_print_uuids(mdev, "attached to UUIDs");
1151
1152         wake_up(&mdev->misc_wait);
1153         wake_up(&mdev->state_wait);
1154
1155         /* aborted verify run. log the last position */
1156         if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1157             ns.conn < C_CONNECTED) {
1158                 mdev->ov_start_sector =
1159                         BM_BIT_TO_SECT(drbd_bm_bits(mdev) - mdev->ov_left);
1160                 dev_info(DEV, "Online Verify reached sector %llu\n",
1161                         (unsigned long long)mdev->ov_start_sector);
1162         }
1163
1164         if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1165             (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1166                 dev_info(DEV, "Syncer continues.\n");
1167                 mdev->rs_paused += (long)jiffies
1168                                   -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1169                 if (ns.conn == C_SYNC_TARGET)
1170                         mod_timer(&mdev->resync_timer, jiffies);
1171         }
1172
1173         if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1174             (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1175                 dev_info(DEV, "Resync suspended\n");
1176                 mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1177         }
1178
1179         if (os.conn == C_CONNECTED &&
1180             (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1181                 unsigned long now = jiffies;
1182                 int i;
1183
1184                 set_ov_position(mdev, ns.conn);
1185                 mdev->rs_start = now;
1186                 mdev->rs_last_events = 0;
1187                 mdev->rs_last_sect_ev = 0;
1188                 mdev->ov_last_oos_size = 0;
1189                 mdev->ov_last_oos_start = 0;
1190
1191                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1192                         mdev->rs_mark_left[i] = mdev->ov_left;
1193                         mdev->rs_mark_time[i] = now;
1194                 }
1195
1196                 drbd_rs_controller_reset(mdev);
1197
1198                 if (ns.conn == C_VERIFY_S) {
1199                         dev_info(DEV, "Starting Online Verify from sector %llu\n",
1200                                         (unsigned long long)mdev->ov_position);
1201                         mod_timer(&mdev->resync_timer, jiffies);
1202                 }
1203         }
1204
1205         if (get_ldev(mdev)) {
1206                 u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1207                                                  MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1208                                                  MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1209
1210                 if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1211                         mdf |= MDF_CRASHED_PRIMARY;
1212                 if (mdev->state.role == R_PRIMARY ||
1213                     (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1214                         mdf |= MDF_PRIMARY_IND;
1215                 if (mdev->state.conn > C_WF_REPORT_PARAMS)
1216                         mdf |= MDF_CONNECTED_IND;
1217                 if (mdev->state.disk > D_INCONSISTENT)
1218                         mdf |= MDF_CONSISTENT;
1219                 if (mdev->state.disk > D_OUTDATED)
1220                         mdf |= MDF_WAS_UP_TO_DATE;
1221                 if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1222                         mdf |= MDF_PEER_OUT_DATED;
1223                 if (mdf != mdev->ldev->md.flags) {
1224                         mdev->ldev->md.flags = mdf;
1225                         drbd_md_mark_dirty(mdev);
1226                 }
1227                 if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1228                         drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1229                 put_ldev(mdev);
1230         }
1231
1232         /* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1233         if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1234             os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1235                 set_bit(CONSIDER_RESYNC, &mdev->flags);
1236
1237         /* Receiver should clean up itself */
1238         if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1239                 drbd_thread_stop_nowait(&mdev->receiver);
1240
1241         /* Now the receiver finished cleaning up itself, it should die */
1242         if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1243                 drbd_thread_stop_nowait(&mdev->receiver);
1244
1245         /* Upon network failure, we need to restart the receiver. */
1246         if (os.conn > C_TEAR_DOWN &&
1247             ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1248                 drbd_thread_restart_nowait(&mdev->receiver);
1249
1250         /* Resume AL writing if we get a connection */
1251         if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1252                 drbd_resume_al(mdev);
1253
1254         ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1255         if (ascw) {
1256                 ascw->os = os;
1257                 ascw->ns = ns;
1258                 ascw->flags = flags;
1259                 ascw->w.cb = w_after_state_ch;
1260                 ascw->done = done;
1261                 drbd_queue_work(&mdev->data.work, &ascw->w);
1262         } else {
1263                 dev_warn(DEV, "Could not kmalloc an ascw\n");
1264         }
1265
1266         return rv;
1267 }
1268
1269 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1270 {
1271         struct after_state_chg_work *ascw =
1272                 container_of(w, struct after_state_chg_work, w);
1273         after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1274         if (ascw->flags & CS_WAIT_COMPLETE) {
1275                 D_ASSERT(ascw->done != NULL);
1276                 complete(ascw->done);
1277         }
1278         kfree(ascw);
1279
1280         return 1;
1281 }
1282
1283 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1284 {
1285         if (rv) {
1286                 dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1287                 _drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1288                 return;
1289         }
1290
1291         switch (mdev->state.conn) {
1292         case C_STARTING_SYNC_T:
1293                 _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1294                 break;
1295         case C_STARTING_SYNC_S:
1296                 drbd_start_resync(mdev, C_SYNC_SOURCE);
1297                 break;
1298         }
1299 }
1300
1301 int drbd_bitmap_io_from_worker(struct drbd_conf *mdev,
1302                 int (*io_fn)(struct drbd_conf *),
1303                 char *why, enum bm_flag flags)
1304 {
1305         int rv;
1306
1307         D_ASSERT(current == mdev->worker.task);
1308
1309         /* open coded non-blocking drbd_suspend_io(mdev); */
1310         set_bit(SUSPEND_IO, &mdev->flags);
1311
1312         drbd_bm_lock(mdev, why, flags);
1313         rv = io_fn(mdev);
1314         drbd_bm_unlock(mdev);
1315
1316         drbd_resume_io(mdev);
1317
1318         return rv;
1319 }
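/* Used from worker context; for example (see the call near the end of
 * after_state_ch() below):
 *	drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
 *			"demote diskless peer", BM_LOCKED_SET_ALLOWED);
 */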
1320
1321 /**
1322  * after_state_ch() - Perform after state change actions that may sleep
1323  * @mdev:       DRBD device.
1324  * @os:         old state.
1325  * @ns:         new state.
1326  * @flags:      Flags
1327  */
1328 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1329                            union drbd_state ns, enum chg_state_flags flags)
1330 {
1331         enum drbd_fencing_p fp;
1332         enum drbd_req_event what = nothing;
1333         union drbd_state nsm = (union drbd_state){ .i = -1 };
1334
1335         if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1336                 clear_bit(CRASHED_PRIMARY, &mdev->flags);
1337                 if (mdev->p_uuid)
1338                         mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1339         }
1340
1341         fp = FP_DONT_CARE;
1342         if (get_ldev(mdev)) {
1343                 fp = mdev->ldev->dc.fencing;
1344                 put_ldev(mdev);
1345         }
1346
1347         /* Inform userspace about the change... */
1348         drbd_bcast_state(mdev, ns);
1349
1350         if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1351             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1352                 drbd_khelper(mdev, "pri-on-incon-degr");
1353
1354         /* Here we have the actions that are performed after a
1355            state change. This function might sleep */
1356
1357         nsm.i = -1;
1358         if (ns.susp_nod) {
1359                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1360                         what = resend;
1361
1362                 if (os.disk == D_ATTACHING && ns.disk > D_ATTACHING)
1363                         what = restart_frozen_disk_io;
1364
1365                 if (what != nothing)
1366                         nsm.susp_nod = 0;
1367         }
1368
1369         if (ns.susp_fen) {
1370                 /* case1: The outdate peer handler is successful: */
1371                 if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1372                         tl_clear(mdev);
1373                         if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1374                                 drbd_uuid_new_current(mdev);
1375                                 clear_bit(NEW_CUR_UUID, &mdev->flags);
1376                         }
1377                         spin_lock_irq(&mdev->req_lock);
1378                         _drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1379                         spin_unlock_irq(&mdev->req_lock);
1380                 }
1381                 /* case2: The connection was established again: */
1382                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1383                         clear_bit(NEW_CUR_UUID, &mdev->flags);
1384                         what = resend;
1385                         nsm.susp_fen = 0;
1386                 }
1387         }
1388
1389         if (what != nothing) {
1390                 spin_lock_irq(&mdev->req_lock);
1391                 _tl_restart(mdev, what);
1392                 nsm.i &= mdev->state.i;
1393                 _drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1394                 spin_unlock_irq(&mdev->req_lock);
1395         }
1396
1397         /* Became sync source.  With protocol >= 96, we still need to send out
1398          * the sync uuid now. Need to do that before any drbd_send_state, or
1399          * the other side may go "paused sync" before receiving the sync uuids,
1400          * which is unexpected. */
1401         if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) &&
1402             (ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) &&
1403             mdev->agreed_pro_version >= 96 && get_ldev(mdev)) {
1404                 drbd_gen_and_send_sync_uuid(mdev);
1405                 put_ldev(mdev);
1406         }
1407
1408         /* Do not change the order of the if above and the two below... */
1409         if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1410                 drbd_send_uuids(mdev);
1411                 drbd_send_state(mdev);
1412         }
1413         /* No point in queuing send_bitmap if we don't have a connection
1414          * anymore, so check also the _current_ state, not only the new state
1415          * at the time this work was queued. */
1416         if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S &&
1417             mdev->state.conn == C_WF_BITMAP_S)
1418                 drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL,
1419                                 "send_bitmap (WFBitMapS)",
1420                                 BM_LOCKED_TEST_ALLOWED);
1421
1422         /* Lost contact to peer's copy of the data */
1423         if ((os.pdsk >= D_INCONSISTENT &&
1424              os.pdsk != D_UNKNOWN &&
1425              os.pdsk != D_OUTDATED)
1426         &&  (ns.pdsk < D_INCONSISTENT ||
1427              ns.pdsk == D_UNKNOWN ||
1428              ns.pdsk == D_OUTDATED)) {
1429                 if (get_ldev(mdev)) {
1430                         if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1431                             mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1432                                 if (is_susp(mdev->state)) {
1433                                         set_bit(NEW_CUR_UUID, &mdev->flags);
1434                                 } else {
1435                                         drbd_uuid_new_current(mdev);
1436                                         drbd_send_uuids(mdev);
1437                                 }
1438                         }
1439                         put_ldev(mdev);
1440                 }
1441         }
1442
1443         if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1444                 if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0) {
1445                         drbd_uuid_new_current(mdev);
1446                         drbd_send_uuids(mdev);
1447                 }
1448
1449                 /* D_DISKLESS Peer becomes secondary */
1450                 if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1451                         /* We may still be Primary ourselves.
1452                          * No harm done if the bitmap still changes,
1453                          * redirtied pages will follow later. */
1454                         drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1455                                 "demote diskless peer", BM_LOCKED_SET_ALLOWED);
1456                 put_ldev(mdev);
1457         }
1458
1459         /* Write out all changed bits on demote.
1460          * Though, no need to do that just yet
1461          * if there is a resync still going on. */
1462         if (os.role == R_PRIMARY && ns.role == R_SECONDARY &&
1463                 mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
1464                 /* No changes to the bitmap expected this time, so assert that,
1465                  * even though no harm is done if it does change. */
1466                 drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1467                                 "demote", BM_LOCKED_TEST_ALLOWED);
1468                 put_ldev(mdev);
1469         }
1470
1471         /* Last part of the attaching process ... */
1472         if (ns.conn >= C_CONNECTED &&
1473             os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1474                 drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1475                 drbd_send_uuids(mdev);
1476                 drbd_send_state(mdev);
1477         }
1478
1479         /* We want to pause/continue resync, tell peer. */
1480         if (ns.conn >= C_CONNECTED &&
1481              ((os.aftr_isp != ns.aftr_isp) ||
1482               (os.user_isp != ns.user_isp)))
1483                 drbd_send_state(mdev);
1484
1485         /* In case one of the isp bits got set, suspend other devices. */
1486         if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1487             (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1488                 suspend_other_sg(mdev);
1489
1490         /* Make sure the peer gets informed about any state
1491            changes (ISP bits) that happened while we were in WFReportParams. */
1492         if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1493                 drbd_send_state(mdev);
1494
1495         if (os.conn != C_AHEAD && ns.conn == C_AHEAD)
1496                 drbd_send_state(mdev);
1497
1498         /* We are in the process of starting a full sync... */
1499         if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1500             (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1501                 /* no other bitmap changes expected during this phase */
1502                 drbd_queue_bitmap_io(mdev,
1503                         &drbd_bmio_set_n_write, &abw_start_sync,
1504                         "set_n_write from StartingSync", BM_LOCKED_TEST_ALLOWED);
1505
1506         /* We are invalidating ourselves... */
1507         if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1508             os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1509                 /* other bitmap operation expected during this phase */
1510                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
1511                         "set_n_write from invalidate", BM_LOCKED_MASK);
1512
1513         /* first half of local IO error, failure to attach,
1514          * or administrative detach */
1515         if (os.disk != D_FAILED && ns.disk == D_FAILED) {
1516                 enum drbd_io_error_p eh;
1517                 int was_io_error;
1518                 /* corresponding get_ldev was in __drbd_set_state, to serialize
1519                  * our cleanup here with the transition to D_DISKLESS,
1520          * so it is safe to dereference ldev here. */
1521                 eh = mdev->ldev->dc.on_io_error;
1522                 was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
1523
1524                 /* current state still has to be D_FAILED,
1525                  * there is only one way out: to D_DISKLESS,
1526                  * and that may only happen after our put_ldev below. */
1527                 if (mdev->state.disk != D_FAILED)
1528                         dev_err(DEV,
1529                                 "ASSERT FAILED: disk is %s during detach\n",
1530                                 drbd_disk_str(mdev->state.disk));
1531
1532                 if (drbd_send_state(mdev))
1533                         dev_info(DEV, "Notified peer that I am detaching my disk\n");
1534
1535                 drbd_rs_cancel_all(mdev);
1536
1537                 /* In case we want to get something to stable storage still,
1538                  * this may be the last chance.
1539                  * Following put_ldev may transition to D_DISKLESS. */
1540                 drbd_md_sync(mdev);
1541                 put_ldev(mdev);
1542
1543                 if (was_io_error && eh == EP_CALL_HELPER)
1544                         drbd_khelper(mdev, "local-io-error");
1545         }
1546
1547         /* second half of local IO error, failure to attach,
1548          * or administrative detach,
1549          * after local_cnt references have reached zero again */
1550         if (os.disk != D_DISKLESS && ns.disk == D_DISKLESS) {
1551                 /* We must still be diskless,
1552                  * re-attach has to be serialized with this! */
1553                 if (mdev->state.disk != D_DISKLESS)
1554                         dev_err(DEV,
1555                                 "ASSERT FAILED: disk is %s while going diskless\n",
1556                                 drbd_disk_str(mdev->state.disk));
1557
1558                 mdev->rs_total = 0;
1559                 mdev->rs_failed = 0;
1560                 atomic_set(&mdev->rs_pending_cnt, 0);
1561
1562                 if (drbd_send_state(mdev))
1563                         dev_info(DEV, "Notified peer that I'm now diskless.\n");
1564                 /* corresponding get_ldev in __drbd_set_state;
1565                  * this may finally trigger drbd_ldev_destroy. */
1566                 put_ldev(mdev);
1567         }
1568
1569         /* Notify peer that I had a local IO error, and did not detach. */
1570         if (os.disk == D_UP_TO_DATE && ns.disk == D_INCONSISTENT)
1571                 drbd_send_state(mdev);
1572
1573         /* Disks got bigger while they were detached */
1574         if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1575             test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1576                 if (ns.conn == C_CONNECTED)
1577                         resync_after_online_grow(mdev);
1578         }
1579
1580         /* A resync finished or aborted, wake paused devices... */
1581         if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1582             (os.peer_isp && !ns.peer_isp) ||
1583             (os.user_isp && !ns.user_isp))
1584                 resume_next_sg(mdev);
1585
1586         /* sync target done with resync.  Explicitly notify peer, even though
1587          * it should (at least for non-empty resyncs) already know itself. */
1588         if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1589                 drbd_send_state(mdev);
1590
1591         /* This triggers bitmap writeout of potentially still unwritten pages
1592          * if the resync finished cleanly, or aborted because of peer disk
1593          * failure, or because of connection loss.
1594          * For resync aborted because of local disk failure, we cannot do
1595          * any bitmap writeout anymore.
1596          * No harm done if some bits change during this phase.
1597          */
1598         if (os.conn > C_CONNECTED && ns.conn <= C_CONNECTED && get_ldev(mdev)) {
1599                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL,
1600                         "write from resync_finished", BM_LOCKED_SET_ALLOWED);
1601                 put_ldev(mdev);
1602         }
1603
1604         /* free tl_hash if we got thawed and are C_STANDALONE */
1605         if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
1606                 drbd_free_tl_hash(mdev);
1607
1608         /* Upon network connection, we need to start the receiver */
1609         if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1610                 drbd_thread_start(&mdev->receiver);
1611
1612         /* Terminate worker thread if we are unconfigured - it will be
1613            restarted as needed... */
1614         if (ns.disk == D_DISKLESS &&
1615             ns.conn == C_STANDALONE &&
1616             ns.role == R_SECONDARY) {
1617                 if (os.aftr_isp != ns.aftr_isp)
1618                         resume_next_sg(mdev);
1619                 /* set in __drbd_set_state, unless CONFIG_PENDING was set */
1620                 if (test_bit(DEVICE_DYING, &mdev->flags))
1621                         drbd_thread_stop_nowait(&mdev->worker);
1622         }
1623
1624         drbd_md_sync(mdev);
1625 }
1626
1627
1628 static int drbd_thread_setup(void *arg)
1629 {
1630         struct drbd_thread *thi = (struct drbd_thread *) arg;
1631         struct drbd_conf *mdev = thi->mdev;
1632         unsigned long flags;
1633         int retval;
1634
1635 restart:
1636         retval = thi->function(thi);
1637
1638         spin_lock_irqsave(&thi->t_lock, flags);
1639
1640         /* if the receiver has been "Exiting", the last thing it did
1641          * was set the conn state to "StandAlone",
1642          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1643          * and receiver thread will be "started".
1644          * drbd_thread_start needs to set "Restarting" in that case.
1645          * t_state check and assignment needs to be within the same spinlock,
1646          * so either thread_start sees Exiting, and can remap to Restarting,
1647          * or thread_start sees None, and can proceed as normal.
1648          */
1649
1650         if (thi->t_state == Restarting) {
1651                 dev_info(DEV, "Restarting %s\n", current->comm);
1652                 thi->t_state = Running;
1653                 spin_unlock_irqrestore(&thi->t_lock, flags);
1654                 goto restart;
1655         }
1656
1657         thi->task = NULL;
1658         thi->t_state = None;
1659         smp_mb();
1660         complete(&thi->stop);
1661         spin_unlock_irqrestore(&thi->t_lock, flags);
1662
1663         dev_info(DEV, "Terminating %s\n", current->comm);
1664
1665         /* Release mod reference taken when thread was started */
1666         module_put(THIS_MODULE);
1667         return retval;
1668 }
1669
1670 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1671                       int (*func) (struct drbd_thread *))
1672 {
1673         spin_lock_init(&thi->t_lock);
1674         thi->task    = NULL;
1675         thi->t_state = None;
1676         thi->function = func;
1677         thi->mdev = mdev;
1678 }
1679
1680 int drbd_thread_start(struct drbd_thread *thi)
1681 {
1682         struct drbd_conf *mdev = thi->mdev;
1683         struct task_struct *nt;
1684         unsigned long flags;
1685
1686         const char *me =
1687                 thi == &mdev->receiver ? "receiver" :
1688                 thi == &mdev->asender  ? "asender"  :
1689                 thi == &mdev->worker   ? "worker"   : "NONSENSE";
1690
1691         /* is used from state engine doing drbd_thread_stop_nowait,
1692          * while holding the req lock irqsave */
1693         spin_lock_irqsave(&thi->t_lock, flags);
1694
1695         switch (thi->t_state) {
1696         case None:
1697                 dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1698                                 me, current->comm, current->pid);
1699
1700                 /* Get ref on module for thread - this is released when thread exits */
1701                 if (!try_module_get(THIS_MODULE)) {
1702                         dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1703                         spin_unlock_irqrestore(&thi->t_lock, flags);
1704                         return false;
1705                 }
1706
1707                 init_completion(&thi->stop);
1708                 D_ASSERT(thi->task == NULL);
1709                 thi->reset_cpu_mask = 1;
1710                 thi->t_state = Running;
1711                 spin_unlock_irqrestore(&thi->t_lock, flags);
1712                 flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
1713
1714                 nt = kthread_create(drbd_thread_setup, (void *) thi,
1715                                     "drbd%d_%s", mdev_to_minor(mdev), me);
1716
1717                 if (IS_ERR(nt)) {
1718                         dev_err(DEV, "Couldn't start thread\n");
1719
1720                         module_put(THIS_MODULE);
1721                         return false;
1722                 }
1723                 spin_lock_irqsave(&thi->t_lock, flags);
1724                 thi->task = nt;
1725                 thi->t_state = Running;
1726                 spin_unlock_irqrestore(&thi->t_lock, flags);
1727                 wake_up_process(nt);
1728                 break;
1729         case Exiting:
1730                 thi->t_state = Restarting;
1731                 dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1732                                 me, current->comm, current->pid);
1733                 /* fall through */
1734         case Running:
1735         case Restarting:
1736         default:
1737                 spin_unlock_irqrestore(&thi->t_lock, flags);
1738                 break;
1739         }
1740
1741         return true;
1742 }
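
/*
 * Editor's illustration (not part of the original file): a minimal sketch,
 * with hypothetical names, of the module-reference hand-off used by
 * drbd_thread_start()/drbd_thread_setup() above.  The creator pins the module
 * with try_module_get() before kthread_create(); on failure it drops the
 * reference itself, otherwise the thread function drops it with module_put()
 * right before returning, just like drbd_thread_setup() does.
 */
#if 0   /* illustrative sketch only */
static int example_thread_fn(void *arg)
{
        /* ... run until told to stop ... */
        module_put(THIS_MODULE);        /* release the creator's reference */
        return 0;
}

static int example_thread_start(void)
{
        struct task_struct *t;

        if (!try_module_get(THIS_MODULE))       /* pin the module while the thread runs */
                return -ENODEV;

        t = kthread_create(example_thread_fn, NULL, "example_thread");
        if (IS_ERR(t)) {
                module_put(THIS_MODULE);        /* thread never ran, drop the reference here */
                return PTR_ERR(t);
        }
        wake_up_process(t);
        return 0;
}
#endif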
1743
1744
1745 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1746 {
1747         unsigned long flags;
1748
1749         enum drbd_thread_state ns = restart ? Restarting : Exiting;
1750
1751         /* may be called from state engine, holding the req lock irqsave */
1752         spin_lock_irqsave(&thi->t_lock, flags);
1753
1754         if (thi->t_state == None) {
1755                 spin_unlock_irqrestore(&thi->t_lock, flags);
1756                 if (restart)
1757                         drbd_thread_start(thi);
1758                 return;
1759         }
1760
1761         if (thi->t_state != ns) {
1762                 if (thi->task == NULL) {
1763                         spin_unlock_irqrestore(&thi->t_lock, flags);
1764                         return;
1765                 }
1766
1767                 thi->t_state = ns;
1768                 smp_mb();
1769                 init_completion(&thi->stop);
1770                 if (thi->task != current)
1771                         force_sig(DRBD_SIGKILL, thi->task);
1772
1773         }
1774
1775         spin_unlock_irqrestore(&thi->t_lock, flags);
1776
1777         if (wait)
1778                 wait_for_completion(&thi->stop);
1779 }
1780
1781 #ifdef CONFIG_SMP
1782 /**
1783  * drbd_calc_cpu_mask() - Generate a CPU mask for this device, spreading devices over all CPUs
1784  * @mdev:       DRBD device.
1785  *
1786  * Forces all threads of a device onto the same CPU. This is beneficial for
1787  * DRBD's performance. May be overridden by the user's configuration.
1788  */
1789 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1790 {
1791         int ord, cpu;
1792
1793         /* user override. */
1794         if (cpumask_weight(mdev->cpu_mask))
1795                 return;
1796
1797         ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1798         for_each_online_cpu(cpu) {
1799                 if (ord-- == 0) {
1800                         cpumask_set_cpu(cpu, mdev->cpu_mask);
1801                         return;
1802                 }
1803         }
1804         /* should not be reached */
1805         cpumask_setall(mdev->cpu_mask);
1806 }
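
/*
 * Editor's illustration (not part of the original file): the spreading rule
 * implemented above, reduced to plain C.  Device minor N ends up on the
 * (N mod number-of-online-CPUs)-th online CPU; the sketch models the online
 * set as a simple array of CPU ids instead of cpu_online_mask.  All names
 * here are hypothetical.
 */
#if 0   /* illustrative sketch only */
static int example_pick_cpu(int minor, const int *online_cpus, int n_online)
{
        return online_cpus[minor % n_online];
}
/* e.g. with CPUs {0, 1, 2, 3} online: minor 0 -> CPU 0, minor 5 -> CPU 1 */
#endif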
1807
1808 /**
1809  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1810  * @mdev:       DRBD device.
1811  *
1812  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
1813  * prematurely.
1814  */
1815 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1816 {
1817         struct task_struct *p = current;
1818         struct drbd_thread *thi =
1819                 p == mdev->asender.task  ? &mdev->asender  :
1820                 p == mdev->receiver.task ? &mdev->receiver :
1821                 p == mdev->worker.task   ? &mdev->worker   :
1822                 NULL;
1823         ERR_IF(thi == NULL)
1824                 return;
1825         if (!thi->reset_cpu_mask)
1826                 return;
1827         thi->reset_cpu_mask = 0;
1828         set_cpus_allowed_ptr(p, mdev->cpu_mask);
1829 }
1830 #endif
1831
1832 /* the appropriate socket mutex must be held already */
1833 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1834                           enum drbd_packets cmd, struct p_header80 *h,
1835                           size_t size, unsigned msg_flags)
1836 {
1837         int sent, ok;
1838
1839         ERR_IF(!h) return false;
1840         ERR_IF(!size) return false;
1841
1842         h->magic   = BE_DRBD_MAGIC;
1843         h->command = cpu_to_be16(cmd);
1844         h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1845
1846         sent = drbd_send(mdev, sock, h, size, msg_flags);
1847
1848         ok = (sent == size);
1849         if (!ok && !signal_pending(current))
1850                 dev_warn(DEV, "short sent %s size=%d sent=%d\n",
1851                     cmdname(cmd), (int)size, sent);
1852         return ok;
1853 }
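
/*
 * Editor's illustration (not part of the original file): the on-wire header
 * convention used by _drbd_send_cmd() above.  The "size" handed to
 * drbd_send() covers header plus payload, while the length field carries only
 * the payload byte count.  The struct below merely mirrors the apparent
 * layout of p_header80 (the real definition lives in drbd_int.h).
 */
#if 0   /* illustrative sketch only */
struct example_header80 {
        u32 magic;      /* BE_DRBD_MAGIC (already big endian)              */
        u16 command;    /* cpu_to_be16(cmd)                                */
        u16 length;     /* cpu_to_be16(size - sizeof(struct p_header80))   */
} __packed;
#endif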
1854
1855 /* don't pass the socket. we may only look at it
1856  * when we hold the appropriate socket mutex.
1857  */
1858 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1859                   enum drbd_packets cmd, struct p_header80 *h, size_t size)
1860 {
1861         int ok = 0;
1862         struct socket *sock;
1863
1864         if (use_data_socket) {
1865                 mutex_lock(&mdev->data.mutex);
1866                 sock = mdev->data.socket;
1867         } else {
1868                 mutex_lock(&mdev->meta.mutex);
1869                 sock = mdev->meta.socket;
1870         }
1871
1872         /* drbd_disconnect() could have called drbd_free_sock()
1873          * while we were waiting in down()... */
1874         if (likely(sock != NULL))
1875                 ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1876
1877         if (use_data_socket)
1878                 mutex_unlock(&mdev->data.mutex);
1879         else
1880                 mutex_unlock(&mdev->meta.mutex);
1881         return ok;
1882 }
1883
1884 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1885                    size_t size)
1886 {
1887         struct p_header80 h;
1888         int ok;
1889
1890         h.magic   = BE_DRBD_MAGIC;
1891         h.command = cpu_to_be16(cmd);
1892         h.length  = cpu_to_be16(size);
1893
1894         if (!drbd_get_data_sock(mdev))
1895                 return 0;
1896
1897         ok = (sizeof(h) ==
1898                 drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1899         ok = ok && (size ==
1900                 drbd_send(mdev, mdev->data.socket, data, size, 0));
1901
1902         drbd_put_data_sock(mdev);
1903
1904         return ok;
1905 }
1906
1907 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1908 {
1909         struct p_rs_param_95 *p;
1910         struct socket *sock;
1911         int size, rv;
1912         const int apv = mdev->agreed_pro_version;
1913
1914         size = apv <= 87 ? sizeof(struct p_rs_param)
1915                 : apv == 88 ? sizeof(struct p_rs_param)
1916                         + strlen(mdev->sync_conf.verify_alg) + 1
1917                 : apv <= 94 ? sizeof(struct p_rs_param_89)
1918                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
1919
1920         /* used from admin command context and receiver/worker context.
1921          * to avoid kmalloc, grab the socket right here,
1922          * then use the pre-allocated sbuf there */
1923         mutex_lock(&mdev->data.mutex);
1924         sock = mdev->data.socket;
1925
1926         if (likely(sock != NULL)) {
1927                 enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1928
1929                 p = &mdev->data.sbuf.rs_param_95;
1930
1931                 /* initialize verify_alg and csums_alg */
1932                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1933
1934                 p->rate = cpu_to_be32(sc->rate);
1935                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
1936                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
1937                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
1938                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
1939
1940                 if (apv >= 88)
1941                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1942                 if (apv >= 89)
1943                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1944
1945                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1946         } else
1947                 rv = 0; /* not ok */
1948
1949         mutex_unlock(&mdev->data.mutex);
1950
1951         return rv;
1952 }
1953
1954 int drbd_send_protocol(struct drbd_conf *mdev)
1955 {
1956         struct p_protocol *p;
1957         int size, cf, rv;
1958
1959         size = sizeof(struct p_protocol);
1960
1961         if (mdev->agreed_pro_version >= 87)
1962                 size += strlen(mdev->net_conf->integrity_alg) + 1;
1963
1964         /* we must not recurse into our own queue,
1965          * as that is blocked during handshake */
1966         p = kmalloc(size, GFP_NOIO);
1967         if (p == NULL)
1968                 return 0;
1969
1970         p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1971         p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1972         p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1973         p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1974         p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1975
1976         cf = 0;
1977         if (mdev->net_conf->want_lose)
1978                 cf |= CF_WANT_LOSE;
1979         if (mdev->net_conf->dry_run) {
1980                 if (mdev->agreed_pro_version >= 92)
1981                         cf |= CF_DRY_RUN;
1982                 else {
1983                         dev_err(DEV, "--dry-run is not supported by peer");
1984                         kfree(p);
1985                         return -1;
1986                 }
1987         }
1988         p->conn_flags    = cpu_to_be32(cf);
1989
1990         if (mdev->agreed_pro_version >= 87)
1991                 strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1992
1993         rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1994                            (struct p_header80 *)p, size);
1995         kfree(p);
1996         return rv;
1997 }
1998
1999 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
2000 {
2001         struct p_uuids p;
2002         int i;
2003
2004         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
2005                 return 1;
2006
2007         for (i = UI_CURRENT; i < UI_SIZE; i++)
2008                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
2009
2010         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
2011         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
2012         uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
2013         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
2014         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
2015         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
2016
2017         put_ldev(mdev);
2018
2019         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
2020                              (struct p_header80 *)&p, sizeof(p));
2021 }
2022
2023 int drbd_send_uuids(struct drbd_conf *mdev)
2024 {
2025         return _drbd_send_uuids(mdev, 0);
2026 }
2027
2028 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
2029 {
2030         return _drbd_send_uuids(mdev, 8);
2031 }
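
/*
 * Editor's illustration (not part of the original file): what the numeric
 * uuid_flags bits used above appear to encode.  The symbolic names are
 * invented for this sketch; the values come from _drbd_send_uuids() and its
 * callers (and bit 2 is what after_state_ch() clears in p_uuid[UI_FLAGS]).
 */
#if 0   /* illustrative sketch only */
enum example_uuid_flags {
        EX_UUID_FLAG_DISCARD_MY_DATA   = 1, /* net_conf->want_lose                  */
        EX_UUID_FLAG_CRASHED_PRIMARY   = 2, /* CRASHED_PRIMARY flag was set         */
        EX_UUID_FLAG_INCONSISTENT      = 4, /* new_state_tmp.disk == D_INCONSISTENT */
        EX_UUID_FLAG_SKIP_INITIAL_SYNC = 8, /* drbd_send_uuids_skip_initial_sync()  */
};
#endif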
2032
2033 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
2034 {
2035         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2036                 u64 *uuid = mdev->ldev->md.uuid;
2037                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
2038                      text,
2039                      (unsigned long long)uuid[UI_CURRENT],
2040                      (unsigned long long)uuid[UI_BITMAP],
2041                      (unsigned long long)uuid[UI_HISTORY_START],
2042                      (unsigned long long)uuid[UI_HISTORY_END]);
2043                 put_ldev(mdev);
2044         } else {
2045                 dev_info(DEV, "%s effective data uuid: %016llX\n",
2046                                 text,
2047                                 (unsigned long long)mdev->ed_uuid);
2048         }
2049 }
2050
2051 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
2052 {
2053         struct p_rs_uuid p;
2054         u64 uuid;
2055
2056         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
2057
2058         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
2059         drbd_uuid_set(mdev, UI_BITMAP, uuid);
2060         drbd_print_uuids(mdev, "updated sync UUID");
2061         drbd_md_sync(mdev);
2062         p.uuid = cpu_to_be64(uuid);
2063
2064         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
2065                              (struct p_header80 *)&p, sizeof(p));
2066 }
2067
2068 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
2069 {
2070         struct p_sizes p;
2071         sector_t d_size, u_size;
2072         int q_order_type, max_bio_size;
2073         int ok;
2074
2075         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2076                 D_ASSERT(mdev->ldev->backing_bdev);
2077                 d_size = drbd_get_max_capacity(mdev->ldev);
2078                 u_size = mdev->ldev->dc.disk_size;
2079                 q_order_type = drbd_queue_order_type(mdev);
2080                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
2081                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
2082                 put_ldev(mdev);
2083         } else {
2084                 d_size = 0;
2085                 u_size = 0;
2086                 q_order_type = QUEUE_ORDERED_NONE;
2087                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
2088         }
2089
2090         p.d_size = cpu_to_be64(d_size);
2091         p.u_size = cpu_to_be64(u_size);
2092         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
2093         p.max_bio_size = cpu_to_be32(max_bio_size);
2094         p.queue_order_type = cpu_to_be16(q_order_type);
2095         p.dds_flags = cpu_to_be16(flags);
2096
2097         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
2098                            (struct p_header80 *)&p, sizeof(p));
2099         return ok;
2100 }
2101
2102 /**
2103  * drbd_send_state() - Sends the drbd state to the peer
2104  * @mdev:       DRBD device.
2105  */
2106 int drbd_send_state(struct drbd_conf *mdev)
2107 {
2108         struct socket *sock;
2109         struct p_state p;
2110         int ok = 0;
2111
2112         /* Grab state lock so we won't send state if we're in the middle
2113          * of a cluster-wide state change on another thread. */
2114         drbd_state_lock(mdev);
2115
2116         mutex_lock(&mdev->data.mutex);
2117
2118         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
2119         sock = mdev->data.socket;
2120
2121         if (likely(sock != NULL)) {
2122                 ok = _drbd_send_cmd(mdev, sock, P_STATE,
2123                                     (struct p_header80 *)&p, sizeof(p), 0);
2124         }
2125
2126         mutex_unlock(&mdev->data.mutex);
2127
2128         drbd_state_unlock(mdev);
2129         return ok;
2130 }
2131
2132 int drbd_send_state_req(struct drbd_conf *mdev,
2133         union drbd_state mask, union drbd_state val)
2134 {
2135         struct p_req_state p;
2136
2137         p.mask    = cpu_to_be32(mask.i);
2138         p.val     = cpu_to_be32(val.i);
2139
2140         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
2141                              (struct p_header80 *)&p, sizeof(p));
2142 }
2143
2144 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
2145 {
2146         struct p_req_state_reply p;
2147
2148         p.retcode    = cpu_to_be32(retcode);
2149
2150         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
2151                              (struct p_header80 *)&p, sizeof(p));
2152 }
2153
2154 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
2155         struct p_compressed_bm *p,
2156         struct bm_xfer_ctx *c)
2157 {
2158         struct bitstream bs;
2159         unsigned long plain_bits;
2160         unsigned long tmp;
2161         unsigned long rl;
2162         unsigned len;
2163         unsigned toggle;
2164         int bits;
2165
2166         /* may we use this feature? */
2167         if ((mdev->sync_conf.use_rle == 0) ||
2168                 (mdev->agreed_pro_version < 90))
2169                         return 0;
2170
2171         if (c->bit_offset >= c->bm_bits)
2172                 return 0; /* nothing to do. */
2173
2174         /* use at most this many bytes */
2175         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
2176         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
2177         /* plain bits covered in this code string */
2178         plain_bits = 0;
2179
2180         /* p->encoding & 0x80 stores whether the first run is a run of set bits.
2181          * bit offset is implicit.
2182          * start with toggle == 2 to be able to tell the first iteration */
2183         toggle = 2;
2184
2185         /* see how many plain bits we can stuff into one packet
2186          * using RLE and VLI. */
2187         do {
2188                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2189                                     : _drbd_bm_find_next(mdev, c->bit_offset);
2190                 if (tmp == -1UL)
2191                         tmp = c->bm_bits;
2192                 rl = tmp - c->bit_offset;
2193
2194                 if (toggle == 2) { /* first iteration */
2195                         if (rl == 0) {
2196                                 /* the first checked bit was set,
2197                                  * store start value, */
2198                                 DCBP_set_start(p, 1);
2199                                 /* but skip encoding of zero run length */
2200                                 toggle = !toggle;
2201                                 continue;
2202                         }
2203                         DCBP_set_start(p, 0);
2204                 }
2205
2206                 /* paranoia: catch zero runlength.
2207                  * can only happen if bitmap is modified while we scan it. */
2208                 if (rl == 0) {
2209                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2210                             "t:%u bo:%lu\n", toggle, c->bit_offset);
2211                         return -1;
2212                 }
2213
2214                 bits = vli_encode_bits(&bs, rl);
2215                 if (bits == -ENOBUFS) /* buffer full */
2216                         break;
2217                 if (bits <= 0) {
2218                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2219                         return 0;
2220                 }
2221
2222                 toggle = !toggle;
2223                 plain_bits += rl;
2224                 c->bit_offset = tmp;
2225         } while (c->bit_offset < c->bm_bits);
2226
2227         len = bs.cur.b - p->code + !!bs.cur.bit;
2228
2229         if (plain_bits < (len << 3)) {
2230                 /* incompressible with this method.
2231                  * we need to rewind both word and bit position. */
2232                 c->bit_offset -= plain_bits;
2233                 bm_xfer_ctx_bit_to_word_offset(c);
2234                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2235                 return 0;
2236         }
2237
2238         /* RLE + VLI was able to compress it just fine.
2239          * update c->word_offset. */
2240         bm_xfer_ctx_bit_to_word_offset(c);
2241
2242         /* store pad_bits */
2243         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2244
2245         return len;
2246 }
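
/*
 * Editor's illustration (not part of the original file): the core idea of
 * fill_bitmap_rle_bits() above, as a self-contained sketch.  The bitmap is
 * described by run lengths of alternating clear/set runs plus one flag for
 * whether the very first run consists of set bits (what DCBP_set_start()
 * records); the variable-length integer coding of each run length
 * (vli_encode_bits) is omitted.  All names are hypothetical.
 */
#if 0   /* illustrative sketch only */
static int example_rle_toggle_encode(const unsigned char *bits, int n_bits,
                                     unsigned long *runs, int max_runs,
                                     int *first_is_set)
{
        int i = 0, n_runs = 0;

        if (n_bits <= 0)
                return 0;
        *first_is_set = bits[0];
        while (i < n_bits && n_runs < max_runs) {
                unsigned char cur = bits[i];
                unsigned long rl = 0;

                while (i < n_bits && bits[i] == cur) {
                        rl++;
                        i++;
                }
                runs[n_runs++] = rl;    /* runs alternate set/clear polarity */
        }
        return n_runs;
}
/* e.g. bits = {0,0,0,1,1,0} -> *first_is_set = 0, runs = {3, 2, 1} */
#endif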
2247
2248 /**
2249  * send_bitmap_rle_or_plain() - send the next bitmap packet, RLE-compressed when that pays off
2250  *
2251  * Return 0 when done, 1 when another iteration is needed, and a negative error
2252  * code upon failure.
2253  */
2254 static int
2255 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2256                          struct p_header80 *h, struct bm_xfer_ctx *c)
2257 {
2258         struct p_compressed_bm *p = (void*)h;
2259         unsigned long num_words;
2260         int len;
2261         int ok;
2262
2263         len = fill_bitmap_rle_bits(mdev, p, c);
2264
2265         if (len < 0)
2266                 return -EIO;
2267
2268         if (len) {
2269                 DCBP_set_code(p, RLE_VLI_Bits);
2270                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2271                         sizeof(*p) + len, 0);
2272
2273                 c->packets[0]++;
2274                 c->bytes[0] += sizeof(*p) + len;
2275
2276                 if (c->bit_offset >= c->bm_bits)
2277                         len = 0; /* DONE */
2278         } else {
2279                 /* was not compressible.
2280                  * send a buffer full of plain text bits instead. */
2281                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2282                 len = num_words * sizeof(long);
2283                 if (len)
2284                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2285                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2286                                    h, sizeof(struct p_header80) + len, 0);
2287                 c->word_offset += num_words;
2288                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2289
2290                 c->packets[1]++;
2291                 c->bytes[1] += sizeof(struct p_header80) + len;
2292
2293                 if (c->bit_offset > c->bm_bits)
2294                         c->bit_offset = c->bm_bits;
2295         }
2296         if (ok) {
2297                 if (len == 0) {
2298                         INFO_bm_xfer_stats(mdev, "send", c);
2299                         return 0;
2300                 } else
2301                         return 1;
2302         }
2303         return -EIO;
2304 }
2305
2306 /* See the comment at receive_bitmap() */
2307 int _drbd_send_bitmap(struct drbd_conf *mdev)
2308 {
2309         struct bm_xfer_ctx c;
2310         struct p_header80 *p;
2311         int err;
2312
2313         ERR_IF(!mdev->bitmap) return false;
2314
2315         /* maybe we should use some per thread scratch page,
2316          * and allocate that during initial device creation? */
2317         p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2318         if (!p) {
2319                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2320                 return false;
2321         }
2322
2323         if (get_ldev(mdev)) {
2324                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2325                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2326                         drbd_bm_set_all(mdev);
2327                         if (drbd_bm_write(mdev)) {
2328                                 /* write_bm did fail! Leave the full sync flag set in the meta data,
2329                                  * but otherwise process as per normal - need to tell the other
2330                                  * side that a full resync is required! */
2331                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
2332                         } else {
2333                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2334                                 drbd_md_sync(mdev);
2335                         }
2336                 }
2337                 put_ldev(mdev);
2338         }
2339
2340         c = (struct bm_xfer_ctx) {
2341                 .bm_bits = drbd_bm_bits(mdev),
2342                 .bm_words = drbd_bm_words(mdev),
2343         };
2344
2345         do {
2346                 err = send_bitmap_rle_or_plain(mdev, p, &c);
2347         } while (err > 0);
2348
2349         free_page((unsigned long) p);
2350         return err == 0;
2351 }
2352
2353 int drbd_send_bitmap(struct drbd_conf *mdev)
2354 {
2355         int err;
2356
2357         if (!drbd_get_data_sock(mdev))
2358                 return -1;
2359         err = !_drbd_send_bitmap(mdev);
2360         drbd_put_data_sock(mdev);
2361         return err;
2362 }
2363
2364 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2365 {
2366         int ok;
2367         struct p_barrier_ack p;
2368
2369         p.barrier  = barrier_nr;
2370         p.set_size = cpu_to_be32(set_size);
2371
2372         if (mdev->state.conn < C_CONNECTED)
2373                 return false;
2374         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2375                         (struct p_header80 *)&p, sizeof(p));
2376         return ok;
2377 }
2378
2379 /**
2380  * _drbd_send_ack() - Sends an ack packet
2381  * @mdev:       DRBD device.
2382  * @cmd:        Packet command code.
2383  * @sector:     sector, needs to be in big endian byte order
2384  * @blksize:    size in byte, needs to be in big endian byte order
2385  * @block_id:   Id, big endian byte order
2386  */
2387 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2388                           u64 sector,
2389                           u32 blksize,
2390                           u64 block_id)
2391 {
2392         int ok;
2393         struct p_block_ack p;
2394
2395         p.sector   = sector;
2396         p.block_id = block_id;
2397         p.blksize  = blksize;
2398         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2399
2400         if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2401                 return false;
2402         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2403                                 (struct p_header80 *)&p, sizeof(p));
2404         return ok;
2405 }
2406
2407 /* dp->sector and dp->block_id already/still in network byte order,
2408  * data_size is payload size according to dp->head,
2409  * and may need to be corrected for digest size. */
2410 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2411                      struct p_data *dp, int data_size)
2412 {
2413         data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
2414                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
2415         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2416                               dp->block_id);
2417 }
2418
2419 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2420                      struct p_block_req *rp)
2421 {
2422         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2423 }
2424
2425 /**
2426  * drbd_send_ack() - Sends an ack packet
2427  * @mdev:       DRBD device.
2428  * @cmd:        Packet command code.
2429  * @e:          Epoch entry.
2430  */
2431 int drbd_send_ack(struct drbd_conf *mdev,
2432         enum drbd_packets cmd, struct drbd_epoch_entry *e)
2433 {
2434         return _drbd_send_ack(mdev, cmd,
2435                               cpu_to_be64(e->sector),
2436                               cpu_to_be32(e->size),
2437                               e->block_id);
2438 }
2439
2440 /* This function misuses the block_id field to signal if the blocks
2441  * are in sync or not. */
2442 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2443                      sector_t sector, int blksize, u64 block_id)
2444 {
2445         return _drbd_send_ack(mdev, cmd,
2446                               cpu_to_be64(sector),
2447                               cpu_to_be32(blksize),
2448                               cpu_to_be64(block_id));
2449 }
2450
2451 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2452                        sector_t sector, int size, u64 block_id)
2453 {
2454         int ok;
2455         struct p_block_req p;
2456
2457         p.sector   = cpu_to_be64(sector);
2458         p.block_id = block_id;
2459         p.blksize  = cpu_to_be32(size);
2460
2461         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2462                                 (struct p_header80 *)&p, sizeof(p));
2463         return ok;
2464 }
2465
2466 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2467                             sector_t sector, int size,
2468                             void *digest, int digest_size,
2469                             enum drbd_packets cmd)
2470 {
2471         int ok;
2472         struct p_block_req p;
2473
2474         p.sector   = cpu_to_be64(sector);
2475         p.block_id = BE_DRBD_MAGIC + 0xbeef;
2476         p.blksize  = cpu_to_be32(size);
2477
2478         p.head.magic   = BE_DRBD_MAGIC;
2479         p.head.command = cpu_to_be16(cmd);
2480         p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2481
2482         mutex_lock(&mdev->data.mutex);
2483
2484         ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2485         ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2486
2487         mutex_unlock(&mdev->data.mutex);
2488
2489         return ok;
2490 }
2491
2492 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2493 {
2494         int ok;
2495         struct p_block_req p;
2496
2497         p.sector   = cpu_to_be64(sector);
2498         p.block_id = BE_DRBD_MAGIC + 0xbabe;
2499         p.blksize  = cpu_to_be32(size);
2500
2501         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2502                            (struct p_header80 *)&p, sizeof(p));
2503         return ok;
2504 }
2505
2506 /* called on sndtimeo
2507  * returns false if we should retry,
2508  * true if we think connection is dead
2509  */
2510 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2511 {
2512         int drop_it;
2513         /* long elapsed = (long)(jiffies - mdev->last_received); */
2514
2515         drop_it =   mdev->meta.socket == sock
2516                 || !mdev->asender.task
2517                 || get_t_state(&mdev->asender) != Running
2518                 || mdev->state.conn < C_CONNECTED;
2519
2520         if (drop_it)
2521                 return true;
2522
2523         drop_it = !--mdev->ko_count;
2524         if (!drop_it) {
2525                 dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2526                        current->comm, current->pid, mdev->ko_count);
2527                 request_ping(mdev);
2528         }
2529
2530         return drop_it; /* && (mdev->state == R_PRIMARY) */
2531 }
2532
2533 /* The idea of sendpage seems to be to put some kind of reference
2534  * to the page into the skb, and to hand it over to the NIC. In
2535  * this process get_page() gets called.
2536  *
2537  * As soon as the page was really sent over the network put_page()
2538  * gets called by some part of the network layer. [ NIC driver? ]
2539  *
2540  * [ get_page() / put_page() increment/decrement the count. If count
2541  *   reaches 0 the page will be freed. ]
2542  *
2543  * This works nicely with pages from FSs.
2544  * But this means that in protocol A we might signal IO completion too early!
2545  *
2546  * In order not to corrupt data during a resync we must make sure
2547  * that we do not reuse our own buffer pages (EEs) too early, therefore
2548  * we have the net_ee list.
2549  *
2550  * XFS seems to have problems: it still submits pages with page_count == 0!
2551  * As a workaround, we disable sendpage on pages
2552  * with page_count == 0 or PageSlab.
2553  */
2554 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2555                    int offset, size_t size, unsigned msg_flags)
2556 {
2557         int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2558         kunmap(page);
2559         if (sent == size)
2560                 mdev->send_cnt += size>>9;
2561         return sent == size;
2562 }
2563
2564 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2565                     int offset, size_t size, unsigned msg_flags)
2566 {
2567         mm_segment_t oldfs = get_fs();
2568         int sent, ok;
2569         int len = size;
2570
2571         /* e.g. XFS meta- & log-data is in slab pages, which have a
2572          * page_count of 0 and/or have PageSlab() set.
2573          * we cannot use send_page for those, as that does get_page();
2574          * put_page(); and would cause either a VM_BUG directly, or
2575          * __page_cache_release a page that would actually still be referenced
2576          * by someone, leading to some obscure delayed Oops somewhere else. */
2577         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2578                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2579
2580         msg_flags |= MSG_NOSIGNAL;
2581         drbd_update_congested(mdev);
2582         set_fs(KERNEL_DS);
2583         do {
2584                 sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2585                                                         offset, len,
2586                                                         msg_flags);
2587                 if (sent == -EAGAIN) {
2588                         if (we_should_drop_the_connection(mdev,
2589                                                           mdev->data.socket))
2590                                 break;
2591                         else
2592                                 continue;
2593                 }
2594                 if (sent <= 0) {
2595                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2596                              __func__, (int)size, len, sent);
2597                         break;
2598                 }
2599                 len    -= sent;
2600                 offset += sent;
2601         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2602         set_fs(oldfs);
2603         clear_bit(NET_CONGESTED, &mdev->flags);
2604
2605         ok = (len == 0);
2606         if (likely(ok))
2607                 mdev->send_cnt += size>>9;
2608         return ok;
2609 }
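
/*
 * Editor's illustration (not part of the original file): the workaround
 * described in the comment block above boils down to this predicate -- fall
 * back to the copying path (_drbd_no_send_page) whenever zero-copy sendpage
 * would be unsafe.  The helper name is hypothetical; disable_sendpage is the
 * module parameter checked in _drbd_send_page().
 */
#if 0   /* illustrative sketch only */
static bool example_sendpage_ok(struct page *page)
{
        return !disable_sendpage && page_count(page) >= 1 && !PageSlab(page);
}
#endif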
2610
2611 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2612 {
2613         struct bio_vec *bvec;
2614         int i;
2615         /* hint all but last page with MSG_MORE */
2616         __bio_for_each_segment(bvec, bio, i, 0) {
2617                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
2618                                      bvec->bv_offset, bvec->bv_len,
2619                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2620                         return 0;
2621         }
2622         return 1;
2623 }
2624
2625 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2626 {
2627         struct bio_vec *bvec;
2628         int i;
2629         /* hint all but last page with MSG_MORE */
2630         __bio_for_each_segment(bvec, bio, i, 0) {
2631                 if (!_drbd_send_page(mdev, bvec->bv_page,
2632                                      bvec->bv_offset, bvec->bv_len,
2633                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2634                         return 0;
2635         }
2636         return 1;
2637 }
2638
2639 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2640 {
2641         struct page *page = e->pages;
2642         unsigned len = e->size;
2643         /* hint all but last page with MSG_MORE */
2644         page_chain_for_each(page) {
2645                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2646                 if (!_drbd_send_page(mdev, page, 0, l,
2647                                 page_chain_next(page) ? MSG_MORE : 0))
2648                         return 0;
2649                 len -= l;
2650         }
2651         return 1;
2652 }
2653
2654 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2655 {
2656         if (mdev->agreed_pro_version >= 95)
2657                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2658                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
2659                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2660                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2661         else
2662                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
2663 }
2664
2665 /* Used to send write requests
2666  * R_PRIMARY -> Peer    (P_DATA)
2667  */
2668 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2669 {
2670         int ok = 1;
2671         struct p_data p;
2672         unsigned int dp_flags = 0;
2673         void *dgb;
2674         int dgs;
2675
2676         if (!drbd_get_data_sock(mdev))
2677                 return 0;
2678
2679         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2680                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2681
2682         if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
2683                 p.head.h80.magic   = BE_DRBD_MAGIC;
2684                 p.head.h80.command = cpu_to_be16(P_DATA);
2685                 p.head.h80.length  =
2686                         cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2687         } else {
2688                 p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2689                 p.head.h95.command = cpu_to_be16(P_DATA);
2690                 p.head.h95.length  =
2691                         cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2692         }
2693
2694         p.sector   = cpu_to_be64(req->sector);
2695         p.block_id = (unsigned long)req;
2696         p.seq_num  = cpu_to_be32(req->seq_num =
2697                                  atomic_add_return(1, &mdev->packet_seq));
2698
2699         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2700
2701         if (mdev->state.conn >= C_SYNC_SOURCE &&
2702             mdev->state.conn <= C_PAUSED_SYNC_T)
2703                 dp_flags |= DP_MAY_SET_IN_SYNC;
2704
2705         p.dp_flags = cpu_to_be32(dp_flags);
2706         set_bit(UNPLUG_REMOTE, &mdev->flags);
2707         ok = (sizeof(p) ==
2708                 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2709         if (ok && dgs) {
2710                 dgb = mdev->int_dig_out;
2711                 drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2712                 ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2713         }
2714         if (ok) {
2715                 /* For protocol A, we have to memcpy the payload into
2716                  * socket buffers, as we may complete right away
2717                  * as soon as we handed it over to tcp, at which point the data
2718                  * pages may become invalid.
2719                  *
2720                  * With data integrity enabled, we copy it as well, so we can be
2721                  * sure that even if the bio pages are still being modified, that
2722                  * won't change the data on the wire; thus if the digest checks
2723                  * out ok after sending on this side, but does not match on the
2724                  * receiving side, we have certainly detected corruption elsewhere.
2725                  */
2726                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A || dgs)
2727                         ok = _drbd_send_bio(mdev, req->master_bio);
2728                 else
2729                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
2730
2731                 /* double check digest, sometimes buffers have been modified in flight. */
2732                 if (dgs > 0 && dgs <= 64) {
2733                         /* 64 byte, 512 bit, is the largest digest size
2734                          * currently supported in kernel crypto. */
2735                         unsigned char digest[64];
2736                         drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, digest);
2737                         if (memcmp(mdev->int_dig_out, digest, dgs)) {
2738                                 dev_warn(DEV,
2739                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
2740                                         (unsigned long long)req->sector, req->size);
2741                         }
2742                 } /* else if (dgs > 64) {
2743                      ... Be noisy about digest too large ...
2744                 } */
2745         }
2746
2747         drbd_put_data_sock(mdev);
2748
2749         return ok;
2750 }
2751
2752 /* answer packet, used to send data back for read requests:
2753  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2754  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2755  */
2756 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2757                     struct drbd_epoch_entry *e)
2758 {
2759         int ok;
2760         struct p_data p;
2761         void *dgb;
2762         int dgs;
2763
2764         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2765                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2766
2767         if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
2768                 p.head.h80.magic   = BE_DRBD_MAGIC;
2769                 p.head.h80.command = cpu_to_be16(cmd);
2770                 p.head.h80.length  =
2771                         cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2772         } else {
2773                 p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2774                 p.head.h95.command = cpu_to_be16(cmd);
2775                 p.head.h95.length  =
2776                         cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2777         }
2778
2779         p.sector   = cpu_to_be64(e->sector);
2780         p.block_id = e->block_id;
2781         /* p.seq_num  = 0;    No sequence numbers here.. */
2782
2783         /* Only called by our kernel thread.
2784          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2785          * in response to admin command or module unload.
2786          */
2787         if (!drbd_get_data_sock(mdev))
2788                 return 0;
2789
2790         ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2791         if (ok && dgs) {
2792                 dgb = mdev->int_dig_out;
2793                 drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2794                 ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2795         }
2796         if (ok)
2797                 ok = _drbd_send_zc_ee(mdev, e);
2798
2799         drbd_put_data_sock(mdev);
2800
2801         return ok;
2802 }
2803
2804 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
2805 {
2806         struct p_block_desc p;
2807
2808         p.sector  = cpu_to_be64(req->sector);
2809         p.blksize = cpu_to_be32(req->size);
2810
2811         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
2812 }
2813
2814 /*
2815   drbd_send distinguishes two cases:
2816
2817   Packets sent via the data socket "sock"
2818   and packets sent via the meta data socket "msock"
2819
2820                     sock                      msock
2821   -----------------+-------------------------+------------------------------
2822   timeout           conf.timeout / 2          conf.timeout / 2
2823   timeout action    send a ping via msock     Abort communication
2824                                               and close all sockets
2825 */
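/*
 * A minimal usage sketch (the packet "p" and the error handling are purely
 * illustrative): callers that write a header to the data socket typically
 * bracket the transfer with drbd_get_data_sock()/drbd_put_data_sock(),
 * as drbd_send_block() above does:
 *
 *	if (drbd_get_data_sock(mdev)) {
 *		ok = sizeof(p) ==
 *			drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0);
 *		drbd_put_data_sock(mdev);
 *	}
 */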
2826
2827 /*
2828  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2829  */
2830 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2831               void *buf, size_t size, unsigned msg_flags)
2832 {
2833         struct kvec iov;
2834         struct msghdr msg;
2835         int rv, sent = 0;
2836
2837         if (!sock)
2838                 return -1000;
2839
2840         /* THINK  if (signal_pending) return ... ? */
2841
2842         iov.iov_base = buf;
2843         iov.iov_len  = size;
2844
2845         msg.msg_name       = NULL;
2846         msg.msg_namelen    = 0;
2847         msg.msg_control    = NULL;
2848         msg.msg_controllen = 0;
2849         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2850
2851         if (sock == mdev->data.socket) {
2852                 mdev->ko_count = mdev->net_conf->ko_count;
2853                 drbd_update_congested(mdev);
2854         }
2855         do {
2856                 /* STRANGE
2857                  * tcp_sendmsg does _not_ use its size parameter at all ?
2858                  *
2859                  * -EAGAIN on timeout, -EINTR on signal.
2860                  */
2861 /* THINK
2862  * do we need to block DRBD_SIG if sock == &meta.socket ??
2863  * otherwise wake_asender() might interrupt some send_*Ack !
2864  */
2865                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2866                 if (rv == -EAGAIN) {
2867                         if (we_should_drop_the_connection(mdev, sock))
2868                                 break;
2869                         else
2870                                 continue;
2871                 }
2872                 D_ASSERT(rv != 0);
2873                 if (rv == -EINTR) {
2874                         flush_signals(current);
2875                         rv = 0;
2876                 }
2877                 if (rv < 0)
2878                         break;
2879                 sent += rv;
2880                 iov.iov_base += rv;
2881                 iov.iov_len  -= rv;
2882         } while (sent < size);
2883
2884         if (sock == mdev->data.socket)
2885                 clear_bit(NET_CONGESTED, &mdev->flags);
2886
2887         if (rv <= 0) {
2888                 if (rv != -EAGAIN) {
2889                         dev_err(DEV, "%s_sendmsg returned %d\n",
2890                             sock == mdev->meta.socket ? "msock" : "sock",
2891                             rv);
2892                         drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2893                 } else
2894                         drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2895         }
2896
2897         return sent;
2898 }
2899
2900 static int drbd_open(struct block_device *bdev, fmode_t mode)
2901 {
2902         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2903         unsigned long flags;
2904         int rv = 0;
2905
2906         mutex_lock(&drbd_main_mutex);
2907         spin_lock_irqsave(&mdev->req_lock, flags);
2908         /* to have a stable mdev->state.role
2909          * and no race with updating open_cnt */
2910
2911         if (mdev->state.role != R_PRIMARY) {
2912                 if (mode & FMODE_WRITE)
2913                         rv = -EROFS;
2914                 else if (!allow_oos)
2915                         rv = -EMEDIUMTYPE;
2916         }
2917
2918         if (!rv)
2919                 mdev->open_cnt++;
2920         spin_unlock_irqrestore(&mdev->req_lock, flags);
2921         mutex_unlock(&drbd_main_mutex);
2922
2923         return rv;
2924 }
2925
2926 static int drbd_release(struct gendisk *gd, fmode_t mode)
2927 {
2928         struct drbd_conf *mdev = gd->private_data;
2929         mutex_lock(&drbd_main_mutex);
2930         mdev->open_cnt--;
2931         mutex_unlock(&drbd_main_mutex);
2932         return 0;
2933 }
2934
2935 static void drbd_set_defaults(struct drbd_conf *mdev)
2936 {
2937         /* This way we get a compile error when sync_conf grows
2938            and we forget to initialize it here */
2939         mdev->sync_conf = (struct syncer_conf) {
2940                 /* .rate = */           DRBD_RATE_DEF,
2941                 /* .after = */          DRBD_AFTER_DEF,
2942                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
2943                 /* .verify_alg = */     {}, 0,
2944                 /* .cpu_mask = */       {}, 0,
2945                 /* .csums_alg = */      {}, 0,
2946                 /* .use_rle = */        0,
2947                 /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
2948                 /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
2949                 /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
2950                 /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
2951                 /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
2952                 /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
2953         };
2954
2955         /* Have to do it this way, because the layout differs between
2956            big endian and little endian */
2957         mdev->state = (union drbd_state) {
2958                 { .role = R_SECONDARY,
2959                   .peer = R_UNKNOWN,
2960                   .conn = C_STANDALONE,
2961                   .disk = D_DISKLESS,
2962                   .pdsk = D_UNKNOWN,
2963                   .susp = 0,
2964                   .susp_nod = 0,
2965                   .susp_fen = 0
2966                 } };
2967 }
2968
2969 void drbd_init_set_defaults(struct drbd_conf *mdev)
2970 {
2971         /* the memset(,0,) did most of this.
2972          * note: only assignments, no allocation in here */
2973
2974         drbd_set_defaults(mdev);
2975
2976         atomic_set(&mdev->ap_bio_cnt, 0);
2977         atomic_set(&mdev->ap_pending_cnt, 0);
2978         atomic_set(&mdev->rs_pending_cnt, 0);
2979         atomic_set(&mdev->unacked_cnt, 0);
2980         atomic_set(&mdev->local_cnt, 0);
2981         atomic_set(&mdev->net_cnt, 0);
2982         atomic_set(&mdev->packet_seq, 0);
2983         atomic_set(&mdev->pp_in_use, 0);
2984         atomic_set(&mdev->pp_in_use_by_net, 0);
2985         atomic_set(&mdev->rs_sect_in, 0);
2986         atomic_set(&mdev->rs_sect_ev, 0);
2987         atomic_set(&mdev->ap_in_flight, 0);
2988
2989         mutex_init(&mdev->md_io_mutex);
2990         mutex_init(&mdev->data.mutex);
2991         mutex_init(&mdev->meta.mutex);
2992         sema_init(&mdev->data.work.s, 0);
2993         sema_init(&mdev->meta.work.s, 0);
2994         mutex_init(&mdev->state_mutex);
2995
2996         spin_lock_init(&mdev->data.work.q_lock);
2997         spin_lock_init(&mdev->meta.work.q_lock);
2998
2999         spin_lock_init(&mdev->al_lock);
3000         spin_lock_init(&mdev->req_lock);
3001         spin_lock_init(&mdev->peer_seq_lock);
3002         spin_lock_init(&mdev->epoch_lock);
3003
3004         INIT_LIST_HEAD(&mdev->active_ee);
3005         INIT_LIST_HEAD(&mdev->sync_ee);
3006         INIT_LIST_HEAD(&mdev->done_ee);
3007         INIT_LIST_HEAD(&mdev->read_ee);
3008         INIT_LIST_HEAD(&mdev->net_ee);
3009         INIT_LIST_HEAD(&mdev->resync_reads);
3010         INIT_LIST_HEAD(&mdev->data.work.q);
3011         INIT_LIST_HEAD(&mdev->meta.work.q);
3012         INIT_LIST_HEAD(&mdev->resync_work.list);
3013         INIT_LIST_HEAD(&mdev->unplug_work.list);
3014         INIT_LIST_HEAD(&mdev->go_diskless.list);
3015         INIT_LIST_HEAD(&mdev->md_sync_work.list);
3016         INIT_LIST_HEAD(&mdev->start_resync_work.list);
3017         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
3018
3019         mdev->resync_work.cb  = w_resync_timer;
3020         mdev->unplug_work.cb  = w_send_write_hint;
3021         mdev->go_diskless.cb  = w_go_diskless;
3022         mdev->md_sync_work.cb = w_md_sync;
3023         mdev->bm_io_work.w.cb = w_bitmap_io;
3024         mdev->start_resync_work.cb = w_start_resync;
3025         init_timer(&mdev->resync_timer);
3026         init_timer(&mdev->md_sync_timer);
3027         init_timer(&mdev->start_resync_timer);
3028         init_timer(&mdev->request_timer);
3029         mdev->resync_timer.function = resync_timer_fn;
3030         mdev->resync_timer.data = (unsigned long) mdev;
3031         mdev->md_sync_timer.function = md_sync_timer_fn;
3032         mdev->md_sync_timer.data = (unsigned long) mdev;
3033         mdev->start_resync_timer.function = start_resync_timer_fn;
3034         mdev->start_resync_timer.data = (unsigned long) mdev;
3035         mdev->request_timer.function = request_timer_fn;
3036         mdev->request_timer.data = (unsigned long) mdev;
3037
3038         init_waitqueue_head(&mdev->misc_wait);
3039         init_waitqueue_head(&mdev->state_wait);
3040         init_waitqueue_head(&mdev->net_cnt_wait);
3041         init_waitqueue_head(&mdev->ee_wait);
3042         init_waitqueue_head(&mdev->al_wait);
3043         init_waitqueue_head(&mdev->seq_wait);
3044
3045         drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
3046         drbd_thread_init(mdev, &mdev->worker, drbd_worker);
3047         drbd_thread_init(mdev, &mdev->asender, drbd_asender);
3048
3049         mdev->agreed_pro_version = PRO_VERSION_MAX;
3050         mdev->write_ordering = WO_bdev_flush;
3051         mdev->resync_wenr = LC_FREE;
3052         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3053         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3054 }
3055
3056 void drbd_mdev_cleanup(struct drbd_conf *mdev)
3057 {
3058         int i;
3059         if (mdev->receiver.t_state != None)
3060                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
3061                                 mdev->receiver.t_state);
3062
3063         /* no need to lock it, I'm the only thread alive */
3064         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
3065                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
3066         mdev->al_writ_cnt  =
3067         mdev->bm_writ_cnt  =
3068         mdev->read_cnt     =
3069         mdev->recv_cnt     =
3070         mdev->send_cnt     =
3071         mdev->writ_cnt     =
3072         mdev->p_size       =
3073         mdev->rs_start     =
3074         mdev->rs_total     =
3075         mdev->rs_failed    = 0;
3076         mdev->rs_last_events = 0;
3077         mdev->rs_last_sect_ev = 0;
3078         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
3079                 mdev->rs_mark_left[i] = 0;
3080                 mdev->rs_mark_time[i] = 0;
3081         }
3082         D_ASSERT(mdev->net_conf == NULL);
3083
3084         drbd_set_my_capacity(mdev, 0);
3085         if (mdev->bitmap) {
3086                 /* maybe never allocated. */
3087                 drbd_bm_resize(mdev, 0, 1);
3088                 drbd_bm_cleanup(mdev);
3089         }
3090
3091         drbd_free_resources(mdev);
3092         clear_bit(AL_SUSPENDED, &mdev->flags);
3093
3094         /*
3095          * currently we call drbd_init_ee only on module load, so
3096          * we may call drbd_release_ee only on module unload!
3097          */
3098         D_ASSERT(list_empty(&mdev->active_ee));
3099         D_ASSERT(list_empty(&mdev->sync_ee));
3100         D_ASSERT(list_empty(&mdev->done_ee));
3101         D_ASSERT(list_empty(&mdev->read_ee));
3102         D_ASSERT(list_empty(&mdev->net_ee));
3103         D_ASSERT(list_empty(&mdev->resync_reads));
3104         D_ASSERT(list_empty(&mdev->data.work.q));
3105         D_ASSERT(list_empty(&mdev->meta.work.q));
3106         D_ASSERT(list_empty(&mdev->resync_work.list));
3107         D_ASSERT(list_empty(&mdev->unplug_work.list));
3108         D_ASSERT(list_empty(&mdev->go_diskless.list));
3109
3110         drbd_set_defaults(mdev);
3111 }
3112
3113
3114 static void drbd_destroy_mempools(void)
3115 {
3116         struct page *page;
3117
3118         while (drbd_pp_pool) {
3119                 page = drbd_pp_pool;
3120                 drbd_pp_pool = (struct page *)page_private(page);
3121                 __free_page(page);
3122                 drbd_pp_vacant--;
3123         }
3124
3125         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
3126
3127         if (drbd_ee_mempool)
3128                 mempool_destroy(drbd_ee_mempool);
3129         if (drbd_request_mempool)
3130                 mempool_destroy(drbd_request_mempool);
3131         if (drbd_ee_cache)
3132                 kmem_cache_destroy(drbd_ee_cache);
3133         if (drbd_request_cache)
3134                 kmem_cache_destroy(drbd_request_cache);
3135         if (drbd_bm_ext_cache)
3136                 kmem_cache_destroy(drbd_bm_ext_cache);
3137         if (drbd_al_ext_cache)
3138                 kmem_cache_destroy(drbd_al_ext_cache);
3139
3140         drbd_ee_mempool      = NULL;
3141         drbd_request_mempool = NULL;
3142         drbd_ee_cache        = NULL;
3143         drbd_request_cache   = NULL;
3144         drbd_bm_ext_cache    = NULL;
3145         drbd_al_ext_cache    = NULL;
3146
3147         return;
3148 }
3149
3150 static int drbd_create_mempools(void)
3151 {
3152         struct page *page;
3153         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
3154         int i;
3155
3156         /* prepare our caches and mempools */
3157         drbd_request_mempool = NULL;
3158         drbd_ee_cache        = NULL;
3159         drbd_request_cache   = NULL;
3160         drbd_bm_ext_cache    = NULL;
3161         drbd_al_ext_cache    = NULL;
3162         drbd_pp_pool         = NULL;
3163
3164         /* caches */
3165         drbd_request_cache = kmem_cache_create(
3166                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
3167         if (drbd_request_cache == NULL)
3168                 goto Enomem;
3169
3170         drbd_ee_cache = kmem_cache_create(
3171                 "drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
3172         if (drbd_ee_cache == NULL)
3173                 goto Enomem;
3174
3175         drbd_bm_ext_cache = kmem_cache_create(
3176                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
3177         if (drbd_bm_ext_cache == NULL)
3178                 goto Enomem;
3179
3180         drbd_al_ext_cache = kmem_cache_create(
3181                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
3182         if (drbd_al_ext_cache == NULL)
3183                 goto Enomem;
3184
3185         /* mempools */
3186         drbd_request_mempool = mempool_create(number,
3187                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
3188         if (drbd_request_mempool == NULL)
3189                 goto Enomem;
3190
3191         drbd_ee_mempool = mempool_create(number,
3192                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
3193         if (drbd_ee_mempool == NULL)
3194                 goto Enomem;
3195
3196         /* drbd's page pool */
3197         spin_lock_init(&drbd_pp_lock);
3198
3199         for (i = 0; i < number; i++) {
3200                 page = alloc_page(GFP_HIGHUSER);
3201                 if (!page)
3202                         goto Enomem;
3203                 set_page_private(page, (unsigned long)drbd_pp_pool);
3204                 drbd_pp_pool = page;
3205         }
3206         drbd_pp_vacant = number;
3207
3208         return 0;
3209
3210 Enomem:
3211         drbd_destroy_mempools(); /* in case we allocated some */
3212         return -ENOMEM;
3213 }
3214
3215 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
3216         void *unused)
3217 {
3218         /* just so we have it.  you never know what interesting things we
3219          * might want to do here some day...
3220          */
3221
3222         return NOTIFY_DONE;
3223 }
3224
3225 static struct notifier_block drbd_notifier = {
3226         .notifier_call = drbd_notify_sys,
3227 };
3228
3229 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3230 {
3231         int rr;
3232
3233         rr = drbd_release_ee(mdev, &mdev->active_ee);
3234         if (rr)
3235                 dev_err(DEV, "%d EEs in active list found!\n", rr);
3236
3237         rr = drbd_release_ee(mdev, &mdev->sync_ee);
3238         if (rr)
3239                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
3240
3241         rr = drbd_release_ee(mdev, &mdev->read_ee);
3242         if (rr)
3243                 dev_err(DEV, "%d EEs in read list found!\n", rr);
3244
3245         rr = drbd_release_ee(mdev, &mdev->done_ee);
3246         if (rr)
3247                 dev_err(DEV, "%d EEs in done list found!\n", rr);
3248
3249         rr = drbd_release_ee(mdev, &mdev->net_ee);
3250         if (rr)
3251                 dev_err(DEV, "%d EEs in net list found!\n", rr);
3252 }
3253
3254 /* caution. no locking.
3255  * currently only used from module cleanup code. */
3256 static void drbd_delete_device(unsigned int minor)
3257 {
3258         struct drbd_conf *mdev = minor_to_mdev(minor);
3259
3260         if (!mdev)
3261                 return;
3262
3263         /* paranoia asserts */
3264         if (mdev->open_cnt != 0)
3265                 dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
3266                                 __FILE__ , __LINE__);
3267
3268         ERR_IF (!list_empty(&mdev->data.work.q)) {
3269                 struct list_head *lp;
3270                 list_for_each(lp, &mdev->data.work.q) {
3271                         dev_err(DEV, "lp = %p\n", lp);
3272                 }
3273         };
3274         /* end paranoia asserts */
3275
3276         del_gendisk(mdev->vdisk);
3277
3278         /* cleanup stuff that may have been allocated during
3279          * device (re-)configuration or state changes */
3280
3281         if (mdev->this_bdev)
3282                 bdput(mdev->this_bdev);
3283
3284         drbd_free_resources(mdev);
3285
3286         drbd_release_ee_lists(mdev);
3287
3288         /* should be freed on disconnect? */
3289         kfree(mdev->ee_hash);
3290         /*
3291         mdev->ee_hash_s = 0;
3292         mdev->ee_hash = NULL;
3293         */
3294
3295         lc_destroy(mdev->act_log);
3296         lc_destroy(mdev->resync);
3297
3298         kfree(mdev->p_uuid);
3299         /* mdev->p_uuid = NULL; */
3300
3301         kfree(mdev->int_dig_out);
3302         kfree(mdev->int_dig_in);
3303         kfree(mdev->int_dig_vv);
3304
3305         /* cleanup the rest that has been
3306          * allocated from drbd_new_device
3307          * and actually free the mdev itself */
3308         drbd_free_mdev(mdev);
3309 }
3310
3311 static void drbd_cleanup(void)
3312 {
3313         unsigned int i;
3314
3315         unregister_reboot_notifier(&drbd_notifier);
3316
3317         /* first remove proc,
3318          * drbdsetup uses its presence to detect
3319          * whether DRBD is loaded.
3320          * If we were to get stuck in proc removal
3321          * with netlink already deregistered,
3322          * some drbdsetup commands may wait forever
3323          * for an answer.
3324          */
3325         if (drbd_proc)
3326                 remove_proc_entry("drbd", NULL);
3327
3328         drbd_nl_cleanup();
3329
3330         if (minor_table) {
3331                 i = minor_count;
3332                 while (i--)
3333                         drbd_delete_device(i);
3334                 drbd_destroy_mempools();
3335         }
3336
3337         kfree(minor_table);
3338
3339         unregister_blkdev(DRBD_MAJOR, "drbd");
3340
3341         printk(KERN_INFO "drbd: module cleanup done.\n");
3342 }
3343
3344 /**
3345  * drbd_congested() - Callback for pdflush
3346  * @congested_data:     User data
3347  * @bdi_bits:           Bits pdflush is currently interested in
3348  *
3349  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3350  */
3351 static int drbd_congested(void *congested_data, int bdi_bits)
3352 {
3353         struct drbd_conf *mdev = congested_data;
3354         struct request_queue *q;
3355         char reason = '-';
3356         int r = 0;
3357
3358         if (!may_inc_ap_bio(mdev)) {
3359                 /* DRBD has frozen IO */
3360                 r = bdi_bits;
3361                 reason = 'd';
3362                 goto out;
3363         }
3364
3365         if (get_ldev(mdev)) {
3366                 q = bdev_get_queue(mdev->ldev->backing_bdev);
3367                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
3368                 put_ldev(mdev);
3369                 if (r)
3370                         reason = 'b';
3371         }
3372
3373         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3374                 r |= (1 << BDI_async_congested);
3375                 reason = reason == 'b' ? 'a' : 'n';
3376         }
3377
3378 out:
3379         mdev->congestion_reason = reason;
3380         return r;
3381 }
3382
3383 struct drbd_conf *drbd_new_device(unsigned int minor)
3384 {
3385         struct drbd_conf *mdev;
3386         struct gendisk *disk;
3387         struct request_queue *q;
3388
3389         /* GFP_KERNEL, we are outside of all write-out paths */
3390         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3391         if (!mdev)
3392                 return NULL;
3393         if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3394                 goto out_no_cpumask;
3395
3396         mdev->minor = minor;
3397
3398         drbd_init_set_defaults(mdev);
3399
3400         q = blk_alloc_queue(GFP_KERNEL);
3401         if (!q)
3402                 goto out_no_q;
3403         mdev->rq_queue = q;
3404         q->queuedata   = mdev;
3405
3406         disk = alloc_disk(1);
3407         if (!disk)
3408                 goto out_no_disk;
3409         mdev->vdisk = disk;
3410
3411         set_disk_ro(disk, true);
3412
3413         disk->queue = q;
3414         disk->major = DRBD_MAJOR;
3415         disk->first_minor = minor;
3416         disk->fops = &drbd_ops;
3417         sprintf(disk->disk_name, "drbd%d", minor);
3418         disk->private_data = mdev;
3419
3420         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3421         /* we have no partitions. we contain only ourselves. */
3422         mdev->this_bdev->bd_contains = mdev->this_bdev;
3423
3424         q->backing_dev_info.congested_fn = drbd_congested;
3425         q->backing_dev_info.congested_data = mdev;
3426
3427         blk_queue_make_request(q, drbd_make_request);
3428         /* Setting the max_hw_sectors to an odd value of 8 KiB here.
3429            This triggers a max_bio_size message upon first attach or connect. */
3430         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
3431         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3432         blk_queue_merge_bvec(q, drbd_merge_bvec);
3433         q->queue_lock = &mdev->req_lock;
3434
3435         mdev->md_io_page = alloc_page(GFP_KERNEL);
3436         if (!mdev->md_io_page)
3437                 goto out_no_io_page;
3438
3439         if (drbd_bm_init(mdev))
3440                 goto out_no_bitmap;
3441         /* no need to lock access, we are still initializing this minor device. */
3442         if (!tl_init(mdev))
3443                 goto out_no_tl;
3444
3445         mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3446         if (!mdev->app_reads_hash)
3447                 goto out_no_app_reads;
3448
3449         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3450         if (!mdev->current_epoch)
3451                 goto out_no_epoch;
3452
3453         INIT_LIST_HEAD(&mdev->current_epoch->list);
3454         mdev->epochs = 1;
3455
3456         return mdev;
3457
3458 /* out_whatever_else:
3459         kfree(mdev->current_epoch); */
3460 out_no_epoch:
3461         kfree(mdev->app_reads_hash);
3462 out_no_app_reads:
3463         tl_cleanup(mdev);
3464 out_no_tl:
3465         drbd_bm_cleanup(mdev);
3466 out_no_bitmap:
3467         __free_page(mdev->md_io_page);
3468 out_no_io_page:
3469         put_disk(disk);
3470 out_no_disk:
3471         blk_cleanup_queue(q);
3472 out_no_q:
3473         free_cpumask_var(mdev->cpu_mask);
3474 out_no_cpumask:
3475         kfree(mdev);
3476         return NULL;
3477 }
3478
3479 /* counterpart of drbd_new_device.
3480  * last part of drbd_delete_device. */
3481 void drbd_free_mdev(struct drbd_conf *mdev)
3482 {
3483         kfree(mdev->current_epoch);
3484         kfree(mdev->app_reads_hash);
3485         tl_cleanup(mdev);
3486         if (mdev->bitmap) /* should no longer be there. */
3487                 drbd_bm_cleanup(mdev);
3488         __free_page(mdev->md_io_page);
3489         put_disk(mdev->vdisk);
3490         blk_cleanup_queue(mdev->rq_queue);
3491         free_cpumask_var(mdev->cpu_mask);
3492         drbd_free_tl_hash(mdev);
3493         kfree(mdev);
3494 }
3495
3496
3497 int __init drbd_init(void)
3498 {
3499         int err;
3500
3501         if (sizeof(struct p_handshake) != 80) {
3502                 printk(KERN_ERR
3503                        "drbd: never change the size or layout "
3504                        "of the HandShake packet.\n");
3505                 return -EINVAL;
3506         }
3507
3508         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
3509                 printk(KERN_ERR
3510                         "drbd: invalid minor_count (%d)\n", minor_count);
3511 #ifdef MODULE
3512                 return -EINVAL;
3513 #else
3514                 minor_count = 8;
3515 #endif
3516         }
3517
3518         err = drbd_nl_init();
3519         if (err)
3520                 return err;
3521
3522         err = register_blkdev(DRBD_MAJOR, "drbd");
3523         if (err) {
3524                 printk(KERN_ERR
3525                        "drbd: unable to register block device major %d\n",
3526                        DRBD_MAJOR);
3527                 return err;
3528         }
3529
3530         register_reboot_notifier(&drbd_notifier);
3531
3532         /*
3533          * allocate all necessary structs
3534          */
3535         err = -ENOMEM;
3536
3537         init_waitqueue_head(&drbd_pp_wait);
3538
3539         drbd_proc = NULL; /* play safe for drbd_cleanup */
3540         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3541                                 GFP_KERNEL);
3542         if (!minor_table)
3543                 goto Enomem;
3544
3545         err = drbd_create_mempools();
3546         if (err)
3547                 goto Enomem;
3548
3549         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3550         if (!drbd_proc) {
3551                 printk(KERN_ERR "drbd: unable to register proc file\n");
3552                 goto Enomem;
3553         }
3554
3555         rwlock_init(&global_state_lock);
3556
3557         printk(KERN_INFO "drbd: initialized. "
3558                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3559                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3560         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3561         printk(KERN_INFO "drbd: registered as block device major %d\n",
3562                 DRBD_MAJOR);
3563         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3564
3565         return 0; /* Success! */
3566
3567 Enomem:
3568         drbd_cleanup();
3569         if (err == -ENOMEM)
3570                 /* currently always the case */
3571                 printk(KERN_ERR "drbd: ran out of memory\n");
3572         else
3573                 printk(KERN_ERR "drbd: initialization failure\n");
3574         return err;
3575 }
3576
3577 void drbd_free_bc(struct drbd_backing_dev *ldev)
3578 {
3579         if (ldev == NULL)
3580                 return;
3581
3582         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3583         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3584
3585         kfree(ldev);
3586 }
3587
3588 void drbd_free_sock(struct drbd_conf *mdev)
3589 {
3590         if (mdev->data.socket) {
3591                 mutex_lock(&mdev->data.mutex);
3592                 kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3593                 sock_release(mdev->data.socket);
3594                 mdev->data.socket = NULL;
3595                 mutex_unlock(&mdev->data.mutex);
3596         }
3597         if (mdev->meta.socket) {
3598                 mutex_lock(&mdev->meta.mutex);
3599                 kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3600                 sock_release(mdev->meta.socket);
3601                 mdev->meta.socket = NULL;
3602                 mutex_unlock(&mdev->meta.mutex);
3603         }
3604 }
3605
3606
3607 void drbd_free_resources(struct drbd_conf *mdev)
3608 {
3609         crypto_free_hash(mdev->csums_tfm);
3610         mdev->csums_tfm = NULL;
3611         crypto_free_hash(mdev->verify_tfm);
3612         mdev->verify_tfm = NULL;
3613         crypto_free_hash(mdev->cram_hmac_tfm);
3614         mdev->cram_hmac_tfm = NULL;
3615         crypto_free_hash(mdev->integrity_w_tfm);
3616         mdev->integrity_w_tfm = NULL;
3617         crypto_free_hash(mdev->integrity_r_tfm);
3618         mdev->integrity_r_tfm = NULL;
3619
3620         drbd_free_sock(mdev);
3621
3622         __no_warn(local,
3623                   drbd_free_bc(mdev->ldev);
3624                   mdev->ldev = NULL;);
3625 }
3626
3627 /* meta data management */
3628
3629 struct meta_data_on_disk {
3630         u64 la_size;           /* last agreed size. */
3631         u64 uuid[UI_SIZE];   /* UUIDs. */
3632         u64 device_uuid;
3633         u64 reserved_u64_1;
3634         u32 flags;             /* MDF */
3635         u32 magic;
3636         u32 md_size_sect;
3637         u32 al_offset;         /* offset to this block */
3638         u32 al_nr_extents;     /* important for restoring the AL */
3639               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3640         u32 bm_offset;         /* offset to the bitmap, from here */
3641         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3642         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
3643         u32 reserved_u32[3];
3644
3645 } __packed;
3646
3647 /**
3648  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3649  * @mdev:       DRBD device.
3650  */
3651 void drbd_md_sync(struct drbd_conf *mdev)
3652 {
3653         struct meta_data_on_disk *buffer;
3654         sector_t sector;
3655         int i;
3656
3657         del_timer(&mdev->md_sync_timer);
3658         /* timer may be rearmed by drbd_md_mark_dirty() now. */
3659         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3660                 return;
3661
3662         /* We use here D_FAILED and not D_ATTACHING because we try to write
3663          * metadata even if we detach due to a disk failure! */
3664         if (!get_ldev_if_state(mdev, D_FAILED))
3665                 return;
3666
3667         mutex_lock(&mdev->md_io_mutex);
3668         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3669         memset(buffer, 0, 512);
3670
3671         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3672         for (i = UI_CURRENT; i < UI_SIZE; i++)
3673                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3674         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3675         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3676
3677         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3678         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3679         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3680         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3681         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3682
3683         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3684         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
3685
3686         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3687         sector = mdev->ldev->md.md_offset;
3688
3689         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3690                 /* this was a try anyway ... */
3691                 dev_err(DEV, "meta data update failed!\n");
3692                 drbd_chk_io_error(mdev, 1, true);
3693         }
3694
3695         /* Update mdev->ldev->md.la_size_sect,
3696          * since we updated it on metadata. */
3697         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3698
3699         mutex_unlock(&mdev->md_io_mutex);
3700         put_ldev(mdev);
3701 }
3702
3703 /**
3704  * drbd_md_read() - Reads in the meta data super block
3705  * @mdev:       DRBD device.
3706  * @bdev:       Device from which the meta data should be read in.
3707  *
3708  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
3709  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3710  */
3711 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3712 {
3713         struct meta_data_on_disk *buffer;
3714         int i, rv = NO_ERROR;
3715
3716         if (!get_ldev_if_state(mdev, D_ATTACHING))
3717                 return ERR_IO_MD_DISK;
3718
3719         mutex_lock(&mdev->md_io_mutex);
3720         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3721
3722         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3723                 /* NOTE: can't do normal error processing here as this is
3724                    called BEFORE disk is attached */
3725                 dev_err(DEV, "Error while reading metadata.\n");
3726                 rv = ERR_IO_MD_DISK;
3727                 goto err;
3728         }
3729
3730         if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3731                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
3732                 rv = ERR_MD_INVALID;
3733                 goto err;
3734         }
3735         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3736                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3737                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3738                 rv = ERR_MD_INVALID;
3739                 goto err;
3740         }
3741         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3742                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3743                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3744                 rv = ERR_MD_INVALID;
3745                 goto err;
3746         }
3747         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3748                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3749                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3750                 rv = ERR_MD_INVALID;
3751                 goto err;
3752         }
3753
3754         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3755                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3756                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3757                 rv = ERR_MD_INVALID;
3758                 goto err;
3759         }
3760
3761         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3762         for (i = UI_CURRENT; i < UI_SIZE; i++)
3763                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3764         bdev->md.flags = be32_to_cpu(buffer->flags);
3765         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3766         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3767
3768         spin_lock_irq(&mdev->req_lock);
3769         if (mdev->state.conn < C_CONNECTED) {
3770                 int peer;
3771                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3772                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
3773                 mdev->peer_max_bio_size = peer;
3774         }
3775         spin_unlock_irq(&mdev->req_lock);
3776
3777         if (mdev->sync_conf.al_extents < 7)
3778                 mdev->sync_conf.al_extents = 127;
3779
3780  err:
3781         mutex_unlock(&mdev->md_io_mutex);
3782         put_ldev(mdev);
3783
3784         return rv;
3785 }
3786
3787 /**
3788  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3789  * @mdev:       DRBD device.
3790  *
3791  * Call this function if you change anything that should be written to
3792  * the meta-data super block. This function sets MD_DIRTY, and starts a
3793  * timer that ensures drbd_md_sync() gets called within five seconds.
3794  */
3795 #ifdef DEBUG
3796 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3797 {
3798         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3799                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3800                 mdev->last_md_mark_dirty.line = line;
3801                 mdev->last_md_mark_dirty.func = func;
3802         }
3803 }
3804 #else
3805 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3806 {
3807         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3808                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3809 }
3810 #endif
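/*
 * A minimal usage sketch (illustrative only): code that modifies on-disk
 * meta data state marks it dirty and lets drbd_md_sync() (possibly
 * triggered by the timer armed above) write it out, e.g. the pattern
 * used by _drbd_uuid_set() below:
 *
 *	mdev->ldev->md.uuid[idx] = val;
 *	drbd_md_mark_dirty(mdev);
 */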
3811
3812 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3813 {
3814         int i;
3815
3816         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3817                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3818 }
3819
3820 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3821 {
3822         if (idx == UI_CURRENT) {
3823                 if (mdev->state.role == R_PRIMARY)
3824                         val |= 1;
3825                 else
3826                         val &= ~((u64)1);
3827
3828                 drbd_set_ed_uuid(mdev, val);
3829         }
3830
3831         mdev->ldev->md.uuid[idx] = val;
3832         drbd_md_mark_dirty(mdev);
3833 }
3834
3835
3836 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3837 {
3838         if (mdev->ldev->md.uuid[idx]) {
3839                 drbd_uuid_move_history(mdev);
3840                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3841         }
3842         _drbd_uuid_set(mdev, idx, val);
3843 }
3844
3845 /**
3846  * drbd_uuid_new_current() - Creates a new current UUID
3847  * @mdev:       DRBD device.
3848  *
3849  * Creates a new current UUID, and rotates the old current UUID into
3850  * the bitmap slot. Causes an incremental resync upon next connect.
3851  */
3852 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3853 {
3854         u64 val;
3855         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3856
3857         if (bm_uuid)
3858                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3859
3860         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3861
3862         get_random_bytes(&val, sizeof(u64));
3863         _drbd_uuid_set(mdev, UI_CURRENT, val);
3864         drbd_print_uuids(mdev, "new current UUID");
3865         /* get it to stable storage _now_ */
3866         drbd_md_sync(mdev);
3867 }
3868
3869 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3870 {
3871         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3872                 return;
3873
3874         if (val == 0) {
3875                 drbd_uuid_move_history(mdev);
3876                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3877                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3878         } else {
3879                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3880                 if (bm_uuid)
3881                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3882
3883                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3884         }
3885         drbd_md_mark_dirty(mdev);
3886 }
3887
3888 /**
3889  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3890  * @mdev:       DRBD device.
3891  *
3892  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3893  */
3894 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3895 {
3896         int rv = -EIO;
3897
3898         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3899                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3900                 drbd_md_sync(mdev);
3901                 drbd_bm_set_all(mdev);
3902
3903                 rv = drbd_bm_write(mdev);
3904
3905                 if (!rv) {
3906                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3907                         drbd_md_sync(mdev);
3908                 }
3909
3910                 put_ldev(mdev);
3911         }
3912
3913         return rv;
3914 }
3915
3916 /**
3917  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3918  * @mdev:       DRBD device.
3919  *
3920  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3921  */
3922 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3923 {
3924         int rv = -EIO;
3925
3926         drbd_resume_al(mdev);
3927         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3928                 drbd_bm_clear_all(mdev);
3929                 rv = drbd_bm_write(mdev);
3930                 put_ldev(mdev);
3931         }
3932
3933         return rv;
3934 }
3935
3936 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3937 {
3938         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3939         int rv = -EIO;
3940
3941         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3942
3943         if (get_ldev(mdev)) {
3944                 drbd_bm_lock(mdev, work->why, work->flags);
3945                 rv = work->io_fn(mdev);
3946                 drbd_bm_unlock(mdev);
3947                 put_ldev(mdev);
3948         }
3949
3950         clear_bit(BITMAP_IO, &mdev->flags);
3951         smp_mb__after_clear_bit();
3952         wake_up(&mdev->misc_wait);
3953
3954         if (work->done)
3955                 work->done(mdev, rv);
3956
3957         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3958         work->why = NULL;
3959         work->flags = 0;
3960
3961         return 1;
3962 }
3963
3964 void drbd_ldev_destroy(struct drbd_conf *mdev)
3965 {
3966         lc_destroy(mdev->resync);
3967         mdev->resync = NULL;
3968         lc_destroy(mdev->act_log);
3969         mdev->act_log = NULL;
3970         __no_warn(local,
3971                 drbd_free_bc(mdev->ldev);
3972                 mdev->ldev = NULL;);
3973
3974         if (mdev->md_io_tmpp) {
3975                 __free_page(mdev->md_io_tmpp);
3976                 mdev->md_io_tmpp = NULL;
3977         }
3978         clear_bit(GO_DISKLESS, &mdev->flags);
3979 }
3980
3981 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3982 {
3983         D_ASSERT(mdev->state.disk == D_FAILED);
3984         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3985          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3986          * the protected members anymore, though, so once put_ldev reaches zero
3987          * again, it will be safe to free them. */
3988         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3989         return 1;
3990 }
3991
3992 void drbd_go_diskless(struct drbd_conf *mdev)
3993 {
3994         D_ASSERT(mdev->state.disk == D_FAILED);
3995         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3996                 drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
3997 }
3998
3999 /**
4000  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
4001  * @mdev:       DRBD device.
4002  * @io_fn:      IO callback to be called when bitmap IO is possible
4003  * @done:       callback to be called after the bitmap IO was performed
4004  * @why:        Descriptive text of the reason for doing the IO
4005  *
4006  * While IO on the bitmap happens we freeze application IO, thus ensuring
4007  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
4008  * called from worker context. It MUST NOT be used while a previous such
4009  * work is still pending!
4010  */
4011 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
4012                           int (*io_fn)(struct drbd_conf *),
4013                           void (*done)(struct drbd_conf *, int),
4014                           char *why, enum bm_flag flags)
4015 {
4016         D_ASSERT(current == mdev->worker.task);
4017
4018         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
4019         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
4020         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
4021         if (mdev->bm_io_work.why)
4022                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
4023                         why, mdev->bm_io_work.why);
4024
4025         mdev->bm_io_work.io_fn = io_fn;
4026         mdev->bm_io_work.done = done;
4027         mdev->bm_io_work.why = why;
4028         mdev->bm_io_work.flags = flags;
4029
4030         spin_lock_irq(&mdev->req_lock);
4031         set_bit(BITMAP_IO, &mdev->flags);
4032         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
4033                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
4034                         drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
4035         }
4036         spin_unlock_irq(&mdev->req_lock);
4037 }
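/*
 * A minimal usage sketch (io_fn, reason string and flags are illustrative,
 * pick whatever the call site actually needs): from worker context, one
 * could queue a "set all bits and write" pass without a completion
 * callback like this:
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
 *			     "example: set all bits", BM_LOCKED_SET_ALLOWED);
 *
 * The done callback may be NULL; w_bitmap_io() above checks for that.
 */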
4038
4039 /**
4040  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
4041  * @mdev:       DRBD device.
4042  * @io_fn:      IO callback to be called when bitmap IO is possible
4043  * @why:        Descriptive text of the reason for doing the IO
4044  *
4045  * Freezes application IO while the actual IO operation runs. This
4046  * function MAY NOT be called from worker context.
4047  */
4048 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
4049                 char *why, enum bm_flag flags)
4050 {
4051         int rv;
4052
4053         D_ASSERT(current != mdev->worker.task);
4054
4055         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4056                 drbd_suspend_io(mdev);
4057
4058         drbd_bm_lock(mdev, why, flags);
4059         rv = io_fn(mdev);
4060         drbd_bm_unlock(mdev);
4061
4062         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4063                 drbd_resume_io(mdev);
4064
4065         return rv;
4066 }
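/*
 * A minimal usage sketch (reason string and flag choice are illustrative):
 * a non-worker caller that wants to set all bits and flush the whole
 * bitmap synchronously could do
 *
 *	rv = drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *			    "example: set all bits", BM_LOCKED_SET_ALLOWED);
 */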
4067
4068 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4069 {
4070         if ((mdev->ldev->md.flags & flag) != flag) {
4071                 drbd_md_mark_dirty(mdev);
4072                 mdev->ldev->md.flags |= flag;
4073         }
4074 }
4075
4076 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4077 {
4078         if ((mdev->ldev->md.flags & flag) != 0) {
4079                 drbd_md_mark_dirty(mdev);
4080                 mdev->ldev->md.flags &= ~flag;
4081         }
4082 }
4083 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
4084 {
4085         return (bdev->md.flags & flag) != 0;
4086 }
4087
4088 static void md_sync_timer_fn(unsigned long data)
4089 {
4090         struct drbd_conf *mdev = (struct drbd_conf *) data;
4091
4092         drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
4093 }
4094
4095 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4096 {
4097         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
4098 #ifdef DEBUG
4099         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
4100                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
4101 #endif
4102         drbd_md_sync(mdev);
4103         return 1;
4104 }
4105
4106 #ifdef CONFIG_DRBD_FAULT_INJECTION
4107 /* Fault insertion support including random number generator shamelessly
4108  * stolen from kernel/rcutorture.c */
4109 struct fault_random_state {
4110         unsigned long state;
4111         unsigned long count;
4112 };
4113
4114 #define FAULT_RANDOM_MULT 39916801  /* prime */
4115 #define FAULT_RANDOM_ADD        479001701 /* prime */
4116 #define FAULT_RANDOM_REFRESH 10000
4117
4118 /*
4119  * Crude but fast random-number generator.  Uses a linear congruential
4120  * generator, with occasional help from get_random_bytes().
4121  */
4122 static unsigned long
4123 _drbd_fault_random(struct fault_random_state *rsp)
4124 {
4125         long refresh;
4126
4127         if (!rsp->count--) {
4128                 get_random_bytes(&refresh, sizeof(refresh));
4129                 rsp->state += refresh;
4130                 rsp->count = FAULT_RANDOM_REFRESH;
4131         }
4132         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
4133         return swahw32(rsp->state);
4134 }
4135
4136 static char *
4137 _drbd_fault_str(unsigned int type) {
4138         static char *_faults[] = {
4139                 [DRBD_FAULT_MD_WR] = "Meta-data write",
4140                 [DRBD_FAULT_MD_RD] = "Meta-data read",
4141                 [DRBD_FAULT_RS_WR] = "Resync write",
4142                 [DRBD_FAULT_RS_RD] = "Resync read",
4143                 [DRBD_FAULT_DT_WR] = "Data write",
4144                 [DRBD_FAULT_DT_RD] = "Data read",
4145                 [DRBD_FAULT_DT_RA] = "Data read ahead",
4146                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
4147                 [DRBD_FAULT_AL_EE] = "EE allocation",
4148                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
4149         };
4150
4151         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
4152 }
4153
4154 unsigned int
4155 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
4156 {
4157         static struct fault_random_state rrs = {0, 0};
4158
4159         unsigned int ret = (
4160                 (fault_devs == 0 ||
4161                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
4162                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
4163
4164         if (ret) {
4165                 fault_count++;
4166
4167                 if (__ratelimit(&drbd_ratelimit_state))
4168                         dev_warn(DEV, "***Simulating %s failure\n",
4169                                 _drbd_fault_str(type));
4170         }
4171
4172         return ret;
4173 }
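/*
 * Sketch of a typical call site (illustrative; real call sites live in the
 * IO submission and receive paths and usually go through a small wrapper):
 * fail a data write instead of submitting it when the fault generator fires:
 *
 *	if (_drbd_insert_fault(mdev, DRBD_FAULT_DT_WR))
 *		bio_endio(bio, -EIO);
 *	else
 *		generic_make_request(bio);
 */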
4174 #endif
4175
4176 const char *drbd_buildtag(void)
4177 {
4178         /* DRBD built from external sources has a reference here to the
4179            git hash of the source code. */
4180
4181         static char buildtag[38] = "\0uilt-in";
4182
4183         if (buildtag[0] == 0) {
4184 #ifdef CONFIG_MODULES
4185                 if (THIS_MODULE != NULL)
4186                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
4187                 else
4188 #endif
4189                         buildtag[0] = 'b';
4190         }
4191
4192         return buildtag;
4193 }
4194
4195 module_init(drbd_init)
4196 module_exit(drbd_cleanup)
4197
4198 EXPORT_SYMBOL(drbd_conn_str);
4199 EXPORT_SYMBOL(drbd_role_str);
4200 EXPORT_SYMBOL(drbd_disk_str);
4201 EXPORT_SYMBOL(drbd_set_st_err_str);