1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 struct after_state_chg_work {
60         struct drbd_work w;
61         union drbd_state os;
62         union drbd_state ns;
63         enum chg_state_flags flags;
64         struct completion *done;
65 };
66
67 static DEFINE_MUTEX(drbd_main_mutex);
68 int drbdd_init(struct drbd_thread *);
69 int drbd_worker(struct drbd_thread *);
70 int drbd_asender(struct drbd_thread *);
71
72 int drbd_init(void);
73 static int drbd_open(struct block_device *bdev, fmode_t mode);
74 static int drbd_release(struct gendisk *gd, fmode_t mode);
75 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
76 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
77                            union drbd_state ns, enum chg_state_flags flags);
78 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
79 static void md_sync_timer_fn(unsigned long data);
80 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
82
83 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
84               "Lars Ellenberg <lars@linbit.com>");
85 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
86 MODULE_VERSION(REL_VERSION);
87 MODULE_LICENSE("GPL");
88 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices ("
89                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
90 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
91
92 #include <linux/moduleparam.h>
93 /* allow_open_on_secondary */
94 MODULE_PARM_DESC(allow_oos, "DONT USE!");
95 /* thanks to these macros, if compiled into the kernel (not-module),
96  * this becomes the boot parameter drbd.minor_count */
97 module_param(minor_count, uint, 0444);
98 module_param(disable_sendpage, bool, 0644);
99 module_param(allow_oos, bool, 0);
100 module_param(cn_idx, uint, 0444);
101 module_param(proc_details, int, 0644);
102
103 #ifdef CONFIG_DRBD_FAULT_INJECTION
104 int enable_faults;
105 int fault_rate;
106 static int fault_count;
107 int fault_devs;
108 /* bitmap of enabled faults */
109 module_param(enable_faults, int, 0664);
110 /* fault rate % value - applies to all enabled faults */
111 module_param(fault_rate, int, 0664);
112 /* count of faults inserted */
113 module_param(fault_count, int, 0664);
114 /* bitmap of devices to insert faults on */
115 module_param(fault_devs, int, 0644);
116 #endif
117
118 /* module parameter, defined */
119 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
120 int disable_sendpage;
121 int allow_oos;
122 unsigned int cn_idx = CN_IDX_DRBD;
123 int proc_details;       /* Detail level in proc drbd */
124
125 /* Module parameter for setting the user mode helper program
126  * to run. Default is /sbin/drbdadm */
127 char usermode_helper[80] = "/sbin/drbdadm";
128
129 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
130
131 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
132  * as member "struct gendisk *vdisk;"
133  */
134 struct drbd_conf **minor_table;
135 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
136
137 struct kmem_cache *drbd_request_cache;
138 struct kmem_cache *drbd_ee_cache;       /* epoch entries */
139 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
140 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
141 mempool_t *drbd_request_mempool;
142 mempool_t *drbd_ee_mempool;
143
144 /* I do not use a standard mempool, because:
145    1) I want to hand out the pre-allocated objects first.
146    2) I want to be able to interrupt sleeping allocation with a signal.
147    Note: This is a singly linked list; the next pointer is the private
148          member of struct page.
149  */
150 struct page *drbd_pp_pool;
151 spinlock_t   drbd_pp_lock;
152 int          drbd_pp_vacant;
153 wait_queue_head_t drbd_pp_wait;
154
155 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
156
157 static const struct block_device_operations drbd_ops = {
158         .owner =   THIS_MODULE,
159         .open =    drbd_open,
160         .release = drbd_release,
161 };
162
163 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
164
165 #ifdef __CHECKER__
166 /* When checking with sparse, and this is an inline function, sparse will
167    give tons of false positives. When this is a real function, sparse works.
168  */
169 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
170 {
171         int io_allowed;
172
173         atomic_inc(&mdev->local_cnt);
174         io_allowed = (mdev->state.disk >= mins);
175         if (!io_allowed) {
176                 if (atomic_dec_and_test(&mdev->local_cnt))
177                         wake_up(&mdev->misc_wait);
178         }
179         return io_allowed;
180 }
181
182 #endif
183
184 /**
185  * DOC: The transfer log
186  *
187  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
188  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
189  * of the list. There is always at least one &struct drbd_tl_epoch object.
190  *
191  * Each &struct drbd_tl_epoch has a circular double linked list of requests
192  * attached.
193  */
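/*
 * A minimal sketch of how the two lists fit together (illustration only, not
 * used by the driver): epochs are chained through ->next starting at
 * mdev->oldest_tle, and each epoch keeps its requests on ->requests, linked
 * via &struct drbd_request.tl_requests.  Walking every pending write, oldest
 * epoch first, would look roughly like this (req_lock held; handle() stands
 * for whatever per-request work the caller needs and is a hypothetical name):
 *
 *        struct drbd_tl_epoch *b;
 *        struct drbd_request *req;
 *
 *        for (b = mdev->oldest_tle; b; b = b->next)
 *                list_for_each_entry(req, &b->requests, tl_requests)
 *                        handle(req);
 */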
194 static int tl_init(struct drbd_conf *mdev)
195 {
196         struct drbd_tl_epoch *b;
197
198         /* during device minor initialization, we may well use GFP_KERNEL */
199         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
200         if (!b)
201                 return 0;
202         INIT_LIST_HEAD(&b->requests);
203         INIT_LIST_HEAD(&b->w.list);
204         b->next = NULL;
205         b->br_number = 4711;
206         b->n_writes = 0;
207         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
208
209         mdev->oldest_tle = b;
210         mdev->newest_tle = b;
211         INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
212
213         return 1;
214 }
215
216 static void tl_cleanup(struct drbd_conf *mdev)
217 {
218         D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
219         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
220         kfree(mdev->oldest_tle);
221         mdev->oldest_tle = NULL;
222         kfree(mdev->unused_spare_tle);
223         mdev->unused_spare_tle = NULL;
224 }
225
226 /**
227  * _tl_add_barrier() - Adds a barrier to the transfer log
228  * @mdev:       DRBD device.
229  * @new:        Barrier to be added before the current head of the TL.
230  *
231  * The caller must hold the req_lock.
232  */
233 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
234 {
235         struct drbd_tl_epoch *newest_before;
236
237         INIT_LIST_HEAD(&new->requests);
238         INIT_LIST_HEAD(&new->w.list);
239         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
240         new->next = NULL;
241         new->n_writes = 0;
242
243         newest_before = mdev->newest_tle;
244         /* never send a barrier number == 0, because that is special-cased
245          * when using TCQ for our write ordering code */
246         new->br_number = (newest_before->br_number+1) ?: 1;
247         if (mdev->newest_tle != new) {
248                 mdev->newest_tle->next = new;
249                 mdev->newest_tle = new;
250         }
251 }
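/*
 * Worked example for the barrier number assignment above: "(x + 1) ?: 1" uses
 * the GNU "?:" extension (return the left operand if it is non-zero), so the
 * counter wraps around but skips 0, which is special-cased by the write
 * ordering code:
 *
 *        ..., 0xfffffffe, 0xffffffff, 1, 2, ...
 */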
252
253 /**
254  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
255  * @mdev:       DRBD device.
256  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
257  * @set_size:   Expected number of requests before that barrier.
258  *
259  * In case the passed barrier_nr or set_size does not match the oldest
260  * &struct drbd_tl_epoch objects this function will cause a termination
261  * of the connection.
262  */
263 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
264                        unsigned int set_size)
265 {
266         struct drbd_tl_epoch *b, *nob; /* next old barrier */
267         struct list_head *le, *tle;
268         struct drbd_request *r;
269
270         spin_lock_irq(&mdev->req_lock);
271
272         b = mdev->oldest_tle;
273
274         /* first some paranoia code */
275         if (b == NULL) {
276                 dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
277                         barrier_nr);
278                 goto bail;
279         }
280         if (b->br_number != barrier_nr) {
281                 dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
282                         barrier_nr, b->br_number);
283                 goto bail;
284         }
285         if (b->n_writes != set_size) {
286                 dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
287                         barrier_nr, set_size, b->n_writes);
288                 goto bail;
289         }
290
291         /* Clean up list of requests processed during current epoch */
292         list_for_each_safe(le, tle, &b->requests) {
293                 r = list_entry(le, struct drbd_request, tl_requests);
294                 _req_mod(r, BARRIER_ACKED);
295         }
296         /* There could be requests on the list waiting for completion
297            of the write to the local disk. To avoid corruptions of
298    slab's data structures we have to remove the list's head.
299
300    Also there could have been a barrier ack out of sequence, overtaking
301    the write acks - which would be a bug and violate write ordering.
302    To not deadlock in case we lose connection while such requests are
303    still pending, we need some way to find them for the
304            _req_mod(CONNECTION_LOST_WHILE_PENDING).
305
306            These have been list_move'd to the out_of_sequence_requests list in
307            _req_mod(, BARRIER_ACKED) above.
308            */
309         list_del_init(&b->requests);
310
311         nob = b->next;
312         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
313                 _tl_add_barrier(mdev, b);
314                 if (nob)
315                         mdev->oldest_tle = nob;
316                 /* if nob == NULL, b was the only barrier and becomes the new
317                    barrier. Therefore mdev->oldest_tle already points to b */
318         } else {
319                 D_ASSERT(nob != NULL);
320                 mdev->oldest_tle = nob;
321                 kfree(b);
322         }
323
324         spin_unlock_irq(&mdev->req_lock);
325         dec_ap_pending(mdev);
326
327         return;
328
329 bail:
330         spin_unlock_irq(&mdev->req_lock);
331         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
332 }
333
334
335 /**
336  * _tl_restart() - Walks the transfer log, and applies an action to all requests
337  * @mdev:       DRBD device.
338  * @what:       The action/event to perform with all request objects
339  *
340  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
341  * RESTART_FROZEN_DISK_IO.
342  */
343 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
344 {
345         struct drbd_tl_epoch *b, *tmp, **pn;
346         struct list_head *le, *tle, carry_reads;
347         struct drbd_request *req;
348         int rv, n_writes, n_reads;
349
350         b = mdev->oldest_tle;
351         pn = &mdev->oldest_tle;
352         while (b) {
353                 n_writes = 0;
354                 n_reads = 0;
355                 INIT_LIST_HEAD(&carry_reads);
356                 list_for_each_safe(le, tle, &b->requests) {
357                         req = list_entry(le, struct drbd_request, tl_requests);
358                         rv = _req_mod(req, what);
359
360                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
361                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
362                 }
363                 tmp = b->next;
364
365                 if (n_writes) {
366                         if (what == RESEND) {
367                                 b->n_writes = n_writes;
368                                 if (b->w.cb == NULL) {
369                                         b->w.cb = w_send_barrier;
370                                         inc_ap_pending(mdev);
371                                         set_bit(CREATE_BARRIER, &mdev->flags);
372                                 }
373
374                                 drbd_queue_work(&mdev->data.work, &b->w);
375                         }
376                         pn = &b->next;
377                 } else {
378                         if (n_reads)
379                                 list_add(&carry_reads, &b->requests);
380                         /* there could still be requests on that ring list,
381                          * in case local io is still pending */
382                         list_del(&b->requests);
383
384                         /* dec_ap_pending corresponding to queue_barrier.
385                          * the newest barrier may not have been queued yet,
386                          * in which case w.cb is still NULL. */
387                         if (b->w.cb != NULL)
388                                 dec_ap_pending(mdev);
389
390                         if (b == mdev->newest_tle) {
391                                 /* recycle, but reinit! */
392                                 D_ASSERT(tmp == NULL);
393                                 INIT_LIST_HEAD(&b->requests);
394                                 list_splice(&carry_reads, &b->requests);
395                                 INIT_LIST_HEAD(&b->w.list);
396                                 b->w.cb = NULL;
397                                 b->br_number = net_random();
398                                 b->n_writes = 0;
399
400                                 *pn = b;
401                                 break;
402                         }
403                         *pn = tmp;
404                         kfree(b);
405                 }
406                 b = tmp;
407                 list_splice(&carry_reads, &b->requests);
408         }
409 }
410
411
412 /**
413  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
414  * @mdev:       DRBD device.
415  *
416  * This is called after the connection to the peer was lost. The storage covered
417  * by the requests on the transfer log gets marked as out of sync. Called from the
418  * receiver thread and the worker thread.
419  */
420 void tl_clear(struct drbd_conf *mdev)
421 {
422         struct list_head *le, *tle;
423         struct drbd_request *r;
424
425         spin_lock_irq(&mdev->req_lock);
426
427         _tl_restart(mdev, CONNECTION_LOST_WHILE_PENDING);
428
429         /* we expect this list to be empty. */
430         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
431
432         /* but just in case, clean it up anyways! */
433         list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
434                 r = list_entry(le, struct drbd_request, tl_requests);
435                 /* It would be nice to complete outside of spinlock.
436                  * But this is easier for now. */
437                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
438         }
439
440         /* ensure bit indicating barrier is required is clear */
441         clear_bit(CREATE_BARRIER, &mdev->flags);
442
443         spin_unlock_irq(&mdev->req_lock);
444 }
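/*
 * tl_clear() is what after_state_ch() further down uses once the connection
 * is gone for good (e.g. after a successful fence-peer handler), so every
 * request still in flight gets completed as CONNECTION_LOST_WHILE_PENDING
 * instead of lingering in the transfer log forever.
 */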
445
446 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
447 {
448         spin_lock_irq(&mdev->req_lock);
449         _tl_restart(mdev, what);
450         spin_unlock_irq(&mdev->req_lock);
451 }
452
453 /**
454  * cl_wide_st_chg() - true if the state change is a cluster wide one
455  * @mdev:       DRBD device.
456  * @os:         old (current) state.
457  * @ns:         new (wanted) state.
458  */
459 static int cl_wide_st_chg(struct drbd_conf *mdev,
460                           union drbd_state os, union drbd_state ns)
461 {
462         return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
463                  ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
464                   (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
465                   (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
466                   (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
467                 (os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
468                 (os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
469 }
470
471 enum drbd_state_rv
472 drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
473                   union drbd_state mask, union drbd_state val)
474 {
475         unsigned long flags;
476         union drbd_state os, ns;
477         enum drbd_state_rv rv;
478
479         spin_lock_irqsave(&mdev->req_lock, flags);
480         os = mdev->state;
481         ns.i = (os.i & ~mask.i) | val.i;
482         rv = _drbd_set_state(mdev, ns, f, NULL);
483         ns = mdev->state;
484         spin_unlock_irqrestore(&mdev->req_lock, flags);
485
486         return rv;
487 }
488
489 /**
490  * drbd_force_state() - Impose a change which happens outside our control on our state
491  * @mdev:       DRBD device.
492  * @mask:       mask of state bits to change.
493  * @val:        value of new state bits.
494  */
495 void drbd_force_state(struct drbd_conf *mdev,
496         union drbd_state mask, union drbd_state val)
497 {
498         drbd_change_state(mdev, CS_HARD, mask, val);
499 }
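/*
 * Example: drbd_change_state() applies a mask/val pair to the state union,
 * ns.i = (os.i & ~mask.i) | val.i, so only the selected bit-field is touched.
 * The NS(field, value) helper used throughout this file builds such a pair,
 * e.g. forcing only the connection state while leaving role and disk alone:
 *
 *        drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 */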
500
501 static enum drbd_state_rv is_valid_state(struct drbd_conf *, union drbd_state);
502 static enum drbd_state_rv is_valid_state_transition(struct drbd_conf *,
503                                                     union drbd_state,
504                                                     union drbd_state);
505 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
506                                        union drbd_state ns, const char **warn_sync_abort);
507 int drbd_send_state_req(struct drbd_conf *,
508                         union drbd_state, union drbd_state);
509
510 static enum drbd_state_rv
511 _req_st_cond(struct drbd_conf *mdev, union drbd_state mask,
512              union drbd_state val)
513 {
514         union drbd_state os, ns;
515         unsigned long flags;
516         enum drbd_state_rv rv;
517
518         if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
519                 return SS_CW_SUCCESS;
520
521         if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
522                 return SS_CW_FAILED_BY_PEER;
523
524         rv = 0;
525         spin_lock_irqsave(&mdev->req_lock, flags);
526         os = mdev->state;
527         ns.i = (os.i & ~mask.i) | val.i;
528         ns = sanitize_state(mdev, os, ns, NULL);
529
530         if (!cl_wide_st_chg(mdev, os, ns))
531                 rv = SS_CW_NO_NEED;
532         if (!rv) {
533                 rv = is_valid_state(mdev, ns);
534                 if (rv == SS_SUCCESS) {
535                         rv = is_valid_state_transition(mdev, ns, os);
536                         if (rv == SS_SUCCESS)
537                                 rv = SS_UNKNOWN_ERROR; /* cont waiting, otherwise fail. */
538                 }
539         }
540         spin_unlock_irqrestore(&mdev->req_lock, flags);
541
542         return rv;
543 }
544
545 /**
546  * drbd_req_state() - Perform a possibly cluster-wide state change
547  * @mdev:       DRBD device.
548  * @mask:       mask of state bits to change.
549  * @val:        value of new state bits.
550  * @f:          flags
551  *
552  * Should not be called directly, use drbd_request_state() or
553  * _drbd_request_state().
554  */
555 static enum drbd_state_rv
556 drbd_req_state(struct drbd_conf *mdev, union drbd_state mask,
557                union drbd_state val, enum chg_state_flags f)
558 {
559         struct completion done;
560         unsigned long flags;
561         union drbd_state os, ns;
562         enum drbd_state_rv rv;
563
564         init_completion(&done);
565
566         if (f & CS_SERIALIZE)
567                 mutex_lock(&mdev->state_mutex);
568
569         spin_lock_irqsave(&mdev->req_lock, flags);
570         os = mdev->state;
571         ns.i = (os.i & ~mask.i) | val.i;
572         ns = sanitize_state(mdev, os, ns, NULL);
573
574         if (cl_wide_st_chg(mdev, os, ns)) {
575                 rv = is_valid_state(mdev, ns);
576                 if (rv == SS_SUCCESS)
577                         rv = is_valid_state_transition(mdev, ns, os);
578                 spin_unlock_irqrestore(&mdev->req_lock, flags);
579
580                 if (rv < SS_SUCCESS) {
581                         if (f & CS_VERBOSE)
582                                 print_st_err(mdev, os, ns, rv);
583                         goto abort;
584                 }
585
586                 drbd_state_lock(mdev);
587                 if (!drbd_send_state_req(mdev, mask, val)) {
588                         drbd_state_unlock(mdev);
589                         rv = SS_CW_FAILED_BY_PEER;
590                         if (f & CS_VERBOSE)
591                                 print_st_err(mdev, os, ns, rv);
592                         goto abort;
593                 }
594
595                 wait_event(mdev->state_wait,
596                         (rv = _req_st_cond(mdev, mask, val)));
597
598                 if (rv < SS_SUCCESS) {
599                         drbd_state_unlock(mdev);
600                         if (f & CS_VERBOSE)
601                                 print_st_err(mdev, os, ns, rv);
602                         goto abort;
603                 }
604                 spin_lock_irqsave(&mdev->req_lock, flags);
605                 os = mdev->state;
606                 ns.i = (os.i & ~mask.i) | val.i;
607                 rv = _drbd_set_state(mdev, ns, f, &done);
608                 drbd_state_unlock(mdev);
609         } else {
610                 rv = _drbd_set_state(mdev, ns, f, &done);
611         }
612
613         spin_unlock_irqrestore(&mdev->req_lock, flags);
614
615         if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
616                 D_ASSERT(current != mdev->worker.task);
617                 wait_for_completion(&done);
618         }
619
620 abort:
621         if (f & CS_SERIALIZE)
622                 mutex_unlock(&mdev->state_mutex);
623
624         return rv;
625 }
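/*
 * Outline of the cluster-wide path above: validate ns locally, send the
 * mask/val pair to the peer via drbd_send_state_req(), sleep on
 * mdev->state_wait until _req_st_cond() sees CL_ST_CHG_SUCCESS or
 * CL_ST_CHG_FAIL (set once the peer's reply arrives) or decides no
 * cluster-wide change is needed, then commit with _drbd_set_state() under
 * req_lock.  A caller would typically request something like
 *
 *        rv = _drbd_request_state(mdev, NS(role, R_PRIMARY),
 *                                 CS_VERBOSE + CS_WAIT_COMPLETE + CS_SERIALIZE);
 *
 * (the flag combination here is illustrative only).
 */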
626
627 /**
628  * _drbd_request_state() - Request a state change (with flags)
629  * @mdev:       DRBD device.
630  * @mask:       mask of state bits to change.
631  * @val:        value of new state bits.
632  * @f:          flags
633  *
634  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
635  * flag, or when logging of failed state change requests is not desired.
636  */
637 enum drbd_state_rv
638 _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
639                     union drbd_state val, enum chg_state_flags f)
640 {
641         enum drbd_state_rv rv;
642
643         wait_event(mdev->state_wait,
644                    (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
645
646         return rv;
647 }
648
649 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
650 {
651         dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
652             name,
653             drbd_conn_str(ns.conn),
654             drbd_role_str(ns.role),
655             drbd_role_str(ns.peer),
656             drbd_disk_str(ns.disk),
657             drbd_disk_str(ns.pdsk),
658             is_susp(ns) ? 's' : 'r',
659             ns.aftr_isp ? 'a' : '-',
660             ns.peer_isp ? 'p' : '-',
661             ns.user_isp ? 'u' : '-'
662             );
663 }
664
665 void print_st_err(struct drbd_conf *mdev, union drbd_state os,
666                   union drbd_state ns, enum drbd_state_rv err)
667 {
668         if (err == SS_IN_TRANSIENT_STATE)
669                 return;
670         dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
671         print_st(mdev, " state", os);
672         print_st(mdev, "wanted", ns);
673 }
674
675
676 /**
677  * is_valid_state() - Returns an SS_ error code if ns is not valid
678  * @mdev:       DRBD device.
679  * @ns:         State to consider.
680  */
681 static enum drbd_state_rv
682 is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
683 {
684         /* See drbd_state_sw_errors in drbd_strings.c */
685
686         enum drbd_fencing_p fp;
687         enum drbd_state_rv rv = SS_SUCCESS;
688
689         fp = FP_DONT_CARE;
690         if (get_ldev(mdev)) {
691                 fp = mdev->ldev->dc.fencing;
692                 put_ldev(mdev);
693         }
694
695         if (get_net_conf(mdev->tconn)) {
696                 if (!mdev->tconn->net_conf->two_primaries &&
697                     ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
698                         rv = SS_TWO_PRIMARIES;
699                 put_net_conf(mdev->tconn);
700         }
701
702         if (rv <= 0)
703                 /* already found a reason to abort */;
704         else if (ns.role == R_SECONDARY && mdev->open_cnt)
705                 rv = SS_DEVICE_IN_USE;
706
707         else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
708                 rv = SS_NO_UP_TO_DATE_DISK;
709
710         else if (fp >= FP_RESOURCE &&
711                  ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
712                 rv = SS_PRIMARY_NOP;
713
714         else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
715                 rv = SS_NO_UP_TO_DATE_DISK;
716
717         else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
718                 rv = SS_NO_LOCAL_DISK;
719
720         else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
721                 rv = SS_NO_REMOTE_DISK;
722
723         else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
724                 rv = SS_NO_UP_TO_DATE_DISK;
725
726         else if ((ns.conn == C_CONNECTED ||
727                   ns.conn == C_WF_BITMAP_S ||
728                   ns.conn == C_SYNC_SOURCE ||
729                   ns.conn == C_PAUSED_SYNC_S) &&
730                   ns.disk == D_OUTDATED)
731                 rv = SS_CONNECTED_OUTDATES;
732
733         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
734                  (mdev->sync_conf.verify_alg[0] == 0))
735                 rv = SS_NO_VERIFY_ALG;
736
737         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
738                   mdev->agreed_pro_version < 88)
739                 rv = SS_NOT_SUPPORTED;
740
741         else if (ns.conn >= C_CONNECTED && ns.pdsk == D_UNKNOWN)
742                 rv = SS_CONNECTED_OUTDATES;
743
744         return rv;
745 }
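/*
 * Example of a request this function refuses (illustration): with
 * net_conf->two_primaries not set, asking for ns.role == R_PRIMARY while
 * ns.peer is already R_PRIMARY yields SS_TWO_PRIMARIES, and the state change
 * is rejected before anything is committed.
 */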
746
747 /**
748  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
749  * @mdev:       DRBD device.
750  * @ns:         new state.
751  * @os:         old state.
752  */
753 static enum drbd_state_rv
754 is_valid_state_transition(struct drbd_conf *mdev, union drbd_state ns,
755                           union drbd_state os)
756 {
757         enum drbd_state_rv rv = SS_SUCCESS;
758
759         if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
760             os.conn > C_CONNECTED)
761                 rv = SS_RESYNC_RUNNING;
762
763         if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
764                 rv = SS_ALREADY_STANDALONE;
765
766         if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
767                 rv = SS_IS_DISKLESS;
768
769         if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
770                 rv = SS_NO_NET_CONFIG;
771
772         if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
773                 rv = SS_LOWER_THAN_OUTDATED;
774
775         if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
776                 rv = SS_IN_TRANSIENT_STATE;
777
778         if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
779                 rv = SS_IN_TRANSIENT_STATE;
780
781         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
782                 rv = SS_NEED_CONNECTION;
783
784         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
785             ns.conn != os.conn && os.conn > C_CONNECTED)
786                 rv = SS_RESYNC_RUNNING;
787
788         if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
789             os.conn < C_CONNECTED)
790                 rv = SS_NEED_CONNECTION;
791
792         if ((ns.conn == C_SYNC_TARGET || ns.conn == C_SYNC_SOURCE)
793             && os.conn < C_WF_REPORT_PARAMS)
794                 rv = SS_NEED_CONNECTION; /* No NetworkFailure -> SyncTarget etc... */
795
796         return rv;
797 }
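/*
 * Example (illustration only): with os.disk == D_DISKLESS, a request for any
 * disk state above D_ATTACHING, say D_NEGOTIATING, is refused with
 * SS_IS_DISKLESS; the disk has to go through D_ATTACHING first.
 */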
798
799 /**
800  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
801  * @mdev:       DRBD device.
802  * @os:         old state.
803  * @ns:         new state.
804  * @warn_sync_abort:
805  *
806  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
807  * to D_UNKNOWN. This rule and many more along those lines are in this function.
808  */
809 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
810                                        union drbd_state ns, const char **warn_sync_abort)
811 {
812         enum drbd_fencing_p fp;
813         enum drbd_disk_state disk_min, disk_max, pdsk_min, pdsk_max;
814
815         fp = FP_DONT_CARE;
816         if (get_ldev(mdev)) {
817                 fp = mdev->ldev->dc.fencing;
818                 put_ldev(mdev);
819         }
820
821         /* Do not let a network error state reach a device whose network part is not configured */
822         if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
823             os.conn <= C_DISCONNECTING)
824                 ns.conn = os.conn;
825
826         /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
827          * If you try to go into some Sync* state, that shall fail (elsewhere). */
828         if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
829             ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_TEAR_DOWN)
830                 ns.conn = os.conn;
831
832         /* we cannot fail (again) if we already detached */
833         if (ns.disk == D_FAILED && os.disk == D_DISKLESS)
834                 ns.disk = D_DISKLESS;
835
836         /* if we are only D_ATTACHING yet,
837          * we can (and should) go directly to D_DISKLESS. */
838         if (ns.disk == D_FAILED && os.disk == D_ATTACHING)
839                 ns.disk = D_DISKLESS;
840
841         /* After C_DISCONNECTING only C_STANDALONE may follow */
842         if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
843                 ns.conn = os.conn;
844
845         if (ns.conn < C_CONNECTED) {
846                 ns.peer_isp = 0;
847                 ns.peer = R_UNKNOWN;
848                 if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
849                         ns.pdsk = D_UNKNOWN;
850         }
851
852         /* Clear the aftr_isp when becoming unconfigured */
853         if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
854                 ns.aftr_isp = 0;
855
856         /* Abort resync if a disk fails/detaches */
857         if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
858             (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
859                 if (warn_sync_abort)
860                         *warn_sync_abort =
861                                 os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
862                                 "Online-verify" : "Resync";
863                 ns.conn = C_CONNECTED;
864         }
865
866         /* Connection breaks down before we finished "Negotiating" */
867         if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
868             get_ldev_if_state(mdev, D_NEGOTIATING)) {
869                 if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
870                         ns.disk = mdev->new_state_tmp.disk;
871                         ns.pdsk = mdev->new_state_tmp.pdsk;
872                 } else {
873                         dev_alert(DEV, "Connection lost while negotiating, no data!\n");
874                         ns.disk = D_DISKLESS;
875                         ns.pdsk = D_UNKNOWN;
876                 }
877                 put_ldev(mdev);
878         }
879
880         /* D_CONSISTENT and D_OUTDATED vanish when we get connected */
881         if (ns.conn >= C_CONNECTED && ns.conn < C_AHEAD) {
882                 if (ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED)
883                         ns.disk = D_UP_TO_DATE;
884                 if (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)
885                         ns.pdsk = D_UP_TO_DATE;
886         }
887
888         /* Implications of the connection state on the disk states */
889         disk_min = D_DISKLESS;
890         disk_max = D_UP_TO_DATE;
891         pdsk_min = D_INCONSISTENT;
892         pdsk_max = D_UNKNOWN;
893         switch ((enum drbd_conns)ns.conn) {
894         case C_WF_BITMAP_T:
895         case C_PAUSED_SYNC_T:
896         case C_STARTING_SYNC_T:
897         case C_WF_SYNC_UUID:
898         case C_BEHIND:
899                 disk_min = D_INCONSISTENT;
900                 disk_max = D_OUTDATED;
901                 pdsk_min = D_UP_TO_DATE;
902                 pdsk_max = D_UP_TO_DATE;
903                 break;
904         case C_VERIFY_S:
905         case C_VERIFY_T:
906                 disk_min = D_UP_TO_DATE;
907                 disk_max = D_UP_TO_DATE;
908                 pdsk_min = D_UP_TO_DATE;
909                 pdsk_max = D_UP_TO_DATE;
910                 break;
911         case C_CONNECTED:
912                 disk_min = D_DISKLESS;
913                 disk_max = D_UP_TO_DATE;
914                 pdsk_min = D_DISKLESS;
915                 pdsk_max = D_UP_TO_DATE;
916                 break;
917         case C_WF_BITMAP_S:
918         case C_PAUSED_SYNC_S:
919         case C_STARTING_SYNC_S:
920         case C_AHEAD:
921                 disk_min = D_UP_TO_DATE;
922                 disk_max = D_UP_TO_DATE;
923                 pdsk_min = D_INCONSISTENT;
924                 pdsk_max = D_CONSISTENT; /* D_OUTDATED would be nice. But explicit outdate necessary*/
925                 break;
926         case C_SYNC_TARGET:
927                 disk_min = D_INCONSISTENT;
928                 disk_max = D_INCONSISTENT;
929                 pdsk_min = D_UP_TO_DATE;
930                 pdsk_max = D_UP_TO_DATE;
931                 break;
932         case C_SYNC_SOURCE:
933                 disk_min = D_UP_TO_DATE;
934                 disk_max = D_UP_TO_DATE;
935                 pdsk_min = D_INCONSISTENT;
936                 pdsk_max = D_INCONSISTENT;
937                 break;
938         case C_STANDALONE:
939         case C_DISCONNECTING:
940         case C_UNCONNECTED:
941         case C_TIMEOUT:
942         case C_BROKEN_PIPE:
943         case C_NETWORK_FAILURE:
944         case C_PROTOCOL_ERROR:
945         case C_TEAR_DOWN:
946         case C_WF_CONNECTION:
947         case C_WF_REPORT_PARAMS:
948         case C_MASK:
949                 break;
950         }
951         if (ns.disk > disk_max)
952                 ns.disk = disk_max;
953
954         if (ns.disk < disk_min) {
955                 dev_warn(DEV, "Implicitly set disk from %s to %s\n",
956                          drbd_disk_str(ns.disk), drbd_disk_str(disk_min));
957                 ns.disk = disk_min;
958         }
959         if (ns.pdsk > pdsk_max)
960                 ns.pdsk = pdsk_max;
961
962         if (ns.pdsk < pdsk_min) {
963                 dev_warn(DEV, "Implicitly set pdsk from %s to %s\n",
964                          drbd_disk_str(ns.pdsk), drbd_disk_str(pdsk_min));
965                 ns.pdsk = pdsk_min;
966         }
967
968         if (fp == FP_STONITH &&
969             (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
970             !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
971                 ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
972
973         if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
974             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
975             !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
976                 ns.susp_nod = 1; /* Suspend IO while no accessible, up-to-date data is available */
977
978         if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
979                 if (ns.conn == C_SYNC_SOURCE)
980                         ns.conn = C_PAUSED_SYNC_S;
981                 if (ns.conn == C_SYNC_TARGET)
982                         ns.conn = C_PAUSED_SYNC_T;
983         } else {
984                 if (ns.conn == C_PAUSED_SYNC_S)
985                         ns.conn = C_SYNC_SOURCE;
986                 if (ns.conn == C_PAUSED_SYNC_T)
987                         ns.conn = C_SYNC_TARGET;
988         }
989
990         return ns;
991 }
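/*
 * Example of an implicit adjustment made above (illustration only): once a
 * requested state drops ns.conn below C_CONNECTED, the peer related fields
 * are cleaned up as well:
 *
 *        ns.peer     -> R_UNKNOWN
 *        ns.peer_isp -> 0
 *        ns.pdsk     -> D_UNKNOWN  (unless already between D_INCONSISTENT
 *                                   and D_UNKNOWN)
 *
 * so callers never need to spell out these follow-on changes themselves.
 */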
992
993 /* helper for __drbd_set_state */
994 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
995 {
996         if (mdev->agreed_pro_version < 90)
997                 mdev->ov_start_sector = 0;
998         mdev->rs_total = drbd_bm_bits(mdev);
999         mdev->ov_position = 0;
1000         if (cs == C_VERIFY_T) {
1001                 /* starting online verify from an arbitrary position
1002                  * does not fit well into the existing protocol.
1003                  * on C_VERIFY_T, we initialize ov_left and friends
1004                  * implicitly in receive_DataRequest once the
1005                  * first P_OV_REQUEST is received */
1006                 mdev->ov_start_sector = ~(sector_t)0;
1007         } else {
1008                 unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
1009                 if (bit >= mdev->rs_total) {
1010                         mdev->ov_start_sector =
1011                                 BM_BIT_TO_SECT(mdev->rs_total - 1);
1012                         mdev->rs_total = 1;
1013                 } else
1014                         mdev->rs_total -= bit;
1015                 mdev->ov_position = mdev->ov_start_sector;
1016         }
1017         mdev->ov_left = mdev->rs_total;
1018 }
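/*
 * Worked example for the non-C_VERIFY_T branch above (numbers illustrative,
 * assuming agreed_pro_version >= 90 so ov_start_sector is kept, and the usual
 * one bitmap bit per 4 kB, i.e. 8 sectors): starting an online verify at
 * ov_start_sector == 80 gives BM_SECT_TO_BIT(80) == 10, so with
 * drbd_bm_bits(mdev) == 1000 we end up with rs_total = ov_left = 990 and
 * ov_position = 80; the first 10 bits worth of sectors are skipped.
 */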
1019
1020 static void drbd_resume_al(struct drbd_conf *mdev)
1021 {
1022         if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
1023                 dev_info(DEV, "Resumed AL updates\n");
1024 }
1025
1026 /**
1027  * __drbd_set_state() - Set a new DRBD state
1028  * @mdev:       DRBD device.
1029  * @ns:         new state.
1030  * @flags:      Flags
1031  * @done:       Optional completion, that will get completed after the after_state_ch() finished
1032  *
1033  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
1034  */
1035 enum drbd_state_rv
1036 __drbd_set_state(struct drbd_conf *mdev, union drbd_state ns,
1037                  enum chg_state_flags flags, struct completion *done)
1038 {
1039         union drbd_state os;
1040         enum drbd_state_rv rv = SS_SUCCESS;
1041         const char *warn_sync_abort = NULL;
1042         struct after_state_chg_work *ascw;
1043
1044         os = mdev->state;
1045
1046         ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
1047
1048         if (ns.i == os.i)
1049                 return SS_NOTHING_TO_DO;
1050
1051         if (!(flags & CS_HARD)) {
1052                 /*  pre-state-change checks ; only look at ns  */
1053                 /* See drbd_state_sw_errors in drbd_strings.c */
1054
1055                 rv = is_valid_state(mdev, ns);
1056                 if (rv < SS_SUCCESS) {
1057                         /* If the old state was illegal as well, then let
1058                            this happen...*/
1059
1060                         if (is_valid_state(mdev, os) == rv)
1061                                 rv = is_valid_state_transition(mdev, ns, os);
1062                 } else
1063                         rv = is_valid_state_transition(mdev, ns, os);
1064         }
1065
1066         if (rv < SS_SUCCESS) {
1067                 if (flags & CS_VERBOSE)
1068                         print_st_err(mdev, os, ns, rv);
1069                 return rv;
1070         }
1071
1072         if (warn_sync_abort)
1073                 dev_warn(DEV, "%s aborted.\n", warn_sync_abort);
1074
1075         {
1076         char *pbp, pb[300];
1077         pbp = pb;
1078         *pbp = 0;
1079         if (ns.role != os.role)
1080                 pbp += sprintf(pbp, "role( %s -> %s ) ",
1081                                drbd_role_str(os.role),
1082                                drbd_role_str(ns.role));
1083         if (ns.peer != os.peer)
1084                 pbp += sprintf(pbp, "peer( %s -> %s ) ",
1085                                drbd_role_str(os.peer),
1086                                drbd_role_str(ns.peer));
1087         if (ns.conn != os.conn)
1088                 pbp += sprintf(pbp, "conn( %s -> %s ) ",
1089                                drbd_conn_str(os.conn),
1090                                drbd_conn_str(ns.conn));
1091         if (ns.disk != os.disk)
1092                 pbp += sprintf(pbp, "disk( %s -> %s ) ",
1093                                drbd_disk_str(os.disk),
1094                                drbd_disk_str(ns.disk));
1095         if (ns.pdsk != os.pdsk)
1096                 pbp += sprintf(pbp, "pdsk( %s -> %s ) ",
1097                                drbd_disk_str(os.pdsk),
1098                                drbd_disk_str(ns.pdsk));
1099         if (is_susp(ns) != is_susp(os))
1100                 pbp += sprintf(pbp, "susp( %d -> %d ) ",
1101                                is_susp(os),
1102                                is_susp(ns));
1103         if (ns.aftr_isp != os.aftr_isp)
1104                 pbp += sprintf(pbp, "aftr_isp( %d -> %d ) ",
1105                                os.aftr_isp,
1106                                ns.aftr_isp);
1107         if (ns.peer_isp != os.peer_isp)
1108                 pbp += sprintf(pbp, "peer_isp( %d -> %d ) ",
1109                                os.peer_isp,
1110                                ns.peer_isp);
1111         if (ns.user_isp != os.user_isp)
1112                 pbp += sprintf(pbp, "user_isp( %d -> %d ) ",
1113                                os.user_isp,
1114                                ns.user_isp);
1115         dev_info(DEV, "%s\n", pb);
1116         }
1117
1118         /* solve the race between becoming unconfigured,
1119          * worker doing the cleanup, and
1120          * admin reconfiguring us:
1121          * on (re)configure, first set CONFIG_PENDING,
1122          * then wait for a potentially exiting worker,
1123          * start the worker, and schedule one no_op.
1124          * then proceed with configuration.
1125          */
1126         if (ns.disk == D_DISKLESS &&
1127             ns.conn == C_STANDALONE &&
1128             ns.role == R_SECONDARY &&
1129             !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1130                 set_bit(DEVICE_DYING, &mdev->flags);
1131
1132         /* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
1133          * on the ldev here, to be sure that neither the transition to D_DISKLESS
1134          * nor drbd_ldev_destroy() happens before our corresponding
1135          * after_state_ch() work has run, where we put_ldev again. */
1136         if ((os.disk != D_FAILED && ns.disk == D_FAILED) ||
1137             (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
1138                 atomic_inc(&mdev->local_cnt);
1139
1140         mdev->state = ns;
1141
1142         if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING)
1143                 drbd_print_uuids(mdev, "attached to UUIDs");
1144
1145         wake_up(&mdev->misc_wait);
1146         wake_up(&mdev->state_wait);
1147
1148         /* aborted verify run. log the last position */
1149         if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1150             ns.conn < C_CONNECTED) {
1151                 mdev->ov_start_sector =
1152                         BM_BIT_TO_SECT(drbd_bm_bits(mdev) - mdev->ov_left);
1153                 dev_info(DEV, "Online Verify reached sector %llu\n",
1154                         (unsigned long long)mdev->ov_start_sector);
1155         }
1156
1157         if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1158             (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1159                 dev_info(DEV, "Syncer continues.\n");
1160                 mdev->rs_paused += (long)jiffies
1161                                   -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1162                 if (ns.conn == C_SYNC_TARGET)
1163                         mod_timer(&mdev->resync_timer, jiffies);
1164         }
1165
1166         if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1167             (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1168                 dev_info(DEV, "Resync suspended\n");
1169                 mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1170         }
1171
1172         if (os.conn == C_CONNECTED &&
1173             (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1174                 unsigned long now = jiffies;
1175                 int i;
1176
1177                 set_ov_position(mdev, ns.conn);
1178                 mdev->rs_start = now;
1179                 mdev->rs_last_events = 0;
1180                 mdev->rs_last_sect_ev = 0;
1181                 mdev->ov_last_oos_size = 0;
1182                 mdev->ov_last_oos_start = 0;
1183
1184                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1185                         mdev->rs_mark_left[i] = mdev->ov_left;
1186                         mdev->rs_mark_time[i] = now;
1187                 }
1188
1189                 drbd_rs_controller_reset(mdev);
1190
1191                 if (ns.conn == C_VERIFY_S) {
1192                         dev_info(DEV, "Starting Online Verify from sector %llu\n",
1193                                         (unsigned long long)mdev->ov_position);
1194                         mod_timer(&mdev->resync_timer, jiffies);
1195                 }
1196         }
1197
1198         if (get_ldev(mdev)) {
1199                 u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1200                                                  MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1201                                                  MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1202
1203                 if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1204                         mdf |= MDF_CRASHED_PRIMARY;
1205                 if (mdev->state.role == R_PRIMARY ||
1206                     (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1207                         mdf |= MDF_PRIMARY_IND;
1208                 if (mdev->state.conn > C_WF_REPORT_PARAMS)
1209                         mdf |= MDF_CONNECTED_IND;
1210                 if (mdev->state.disk > D_INCONSISTENT)
1211                         mdf |= MDF_CONSISTENT;
1212                 if (mdev->state.disk > D_OUTDATED)
1213                         mdf |= MDF_WAS_UP_TO_DATE;
1214                 if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1215                         mdf |= MDF_PEER_OUT_DATED;
1216                 if (mdf != mdev->ldev->md.flags) {
1217                         mdev->ldev->md.flags = mdf;
1218                         drbd_md_mark_dirty(mdev);
1219                 }
1220                 if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1221                         drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1222                 put_ldev(mdev);
1223         }
1224
1225         /* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider to resync */
1226         if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1227             os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1228                 set_bit(CONSIDER_RESYNC, &mdev->flags);
1229
1230         /* Receiver should clean up itself */
1231         if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1232                 drbd_thread_stop_nowait(&mdev->receiver);
1233
1234         /* Now the receiver finished cleaning up itself, it should die */
1235         if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1236                 drbd_thread_stop_nowait(&mdev->receiver);
1237
1238         /* Upon network failure, we need to restart the receiver. */
1239         if (os.conn > C_TEAR_DOWN &&
1240             ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1241                 drbd_thread_restart_nowait(&mdev->receiver);
1242
1243         /* Resume AL writing if we get a connection */
1244         if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1245                 drbd_resume_al(mdev);
1246
1247         ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1248         if (ascw) {
1249                 ascw->os = os;
1250                 ascw->ns = ns;
1251                 ascw->flags = flags;
1252                 ascw->w.cb = w_after_state_ch;
1253                 ascw->done = done;
1254                 drbd_queue_work(&mdev->data.work, &ascw->w);
1255         } else {
1256                 dev_warn(DEV, "Could not kmalloc an ascw\n");
1257         }
1258
1259         return rv;
1260 }
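/*
 * Example of the state-change log line assembled in pb[] above; only the
 * fields that actually changed are printed, so a promotion with a
 * simultaneous disk upgrade is logged roughly as
 *
 *        role( Secondary -> Primary ) disk( Inconsistent -> UpToDate )
 *
 * (illustrative; the exact strings come from drbd_role_str()/drbd_disk_str()).
 */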
1261
1262 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1263 {
1264         struct after_state_chg_work *ascw =
1265                 container_of(w, struct after_state_chg_work, w);
1266         after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1267         if (ascw->flags & CS_WAIT_COMPLETE) {
1268                 D_ASSERT(ascw->done != NULL);
1269                 complete(ascw->done);
1270         }
1271         kfree(ascw);
1272
1273         return 1;
1274 }
1275
1276 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1277 {
1278         if (rv) {
1279                 dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1280                 _drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1281                 return;
1282         }
1283
1284         switch (mdev->state.conn) {
1285         case C_STARTING_SYNC_T:
1286                 _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1287                 break;
1288         case C_STARTING_SYNC_S:
1289                 drbd_start_resync(mdev, C_SYNC_SOURCE);
1290                 break;
1291         }
1292 }
1293
1294 int drbd_bitmap_io_from_worker(struct drbd_conf *mdev,
1295                 int (*io_fn)(struct drbd_conf *),
1296                 char *why, enum bm_flag flags)
1297 {
1298         int rv;
1299
1300         D_ASSERT(current == mdev->worker.task);
1301
1302         /* open coded non-blocking drbd_suspend_io(mdev); */
1303         set_bit(SUSPEND_IO, &mdev->flags);
1304
1305         drbd_bm_lock(mdev, why, flags);
1306         rv = io_fn(mdev);
1307         drbd_bm_unlock(mdev);
1308
1309         drbd_resume_io(mdev);
1310
1311         return rv;
1312 }
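/*
 * Usage example from further down in this file: when a diskless Primary peer
 * demotes, the whole bitmap is written out while application IO is suspended
 * and the bitmap is locked:
 *
 *        drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
 *                "demote diskless peer", BM_LOCKED_SET_ALLOWED);
 *
 * The caller must be the worker thread, as asserted above.
 */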
1313
1314 /**
1315  * after_state_ch() - Perform after state change actions that may sleep
1316  * @mdev:       DRBD device.
1317  * @os:         old state.
1318  * @ns:         new state.
1319  * @flags:      Flags
1320  */
1321 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1322                            union drbd_state ns, enum chg_state_flags flags)
1323 {
1324         enum drbd_fencing_p fp;
1325         enum drbd_req_event what = NOTHING;
1326         union drbd_state nsm = (union drbd_state){ .i = -1 };
1327
1328         if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1329                 clear_bit(CRASHED_PRIMARY, &mdev->flags);
1330                 if (mdev->p_uuid)
1331                         mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1332         }
1333
1334         fp = FP_DONT_CARE;
1335         if (get_ldev(mdev)) {
1336                 fp = mdev->ldev->dc.fencing;
1337                 put_ldev(mdev);
1338         }
1339
1340         /* Inform userspace about the change... */
1341         drbd_bcast_state(mdev, ns);
1342
1343         if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1344             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1345                 drbd_khelper(mdev, "pri-on-incon-degr");
1346
1347         /* Here we have the actions that are performed after a
1348            state change. This function might sleep */
1349
1350         nsm.i = -1;
1351         if (ns.susp_nod) {
1352                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1353                         what = RESEND;
1354
1355                 if (os.disk == D_ATTACHING && ns.disk > D_ATTACHING)
1356                         what = RESTART_FROZEN_DISK_IO;
1357
1358                 if (what != NOTHING)
1359                         nsm.susp_nod = 0;
1360         }
1361
1362         if (ns.susp_fen) {
1363                 /* case1: The outdate peer handler is successful: */
1364                 if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1365                         tl_clear(mdev);
1366                         if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1367                                 drbd_uuid_new_current(mdev);
1368                                 clear_bit(NEW_CUR_UUID, &mdev->flags);
1369                         }
1370                         spin_lock_irq(&mdev->req_lock);
1371                         _drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1372                         spin_unlock_irq(&mdev->req_lock);
1373                 }
1374                 /* case2: The connection was established again: */
1375                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1376                         clear_bit(NEW_CUR_UUID, &mdev->flags);
1377                         what = RESEND;
1378                         nsm.susp_fen = 0;
1379                 }
1380         }
1381
1382         if (what != NOTHING) {
1383                 spin_lock_irq(&mdev->req_lock);
1384                 _tl_restart(mdev, what);
1385                 nsm.i &= mdev->state.i;
1386                 _drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1387                 spin_unlock_irq(&mdev->req_lock);
1388         }
1389
1390         /* Became sync source.  With protocol >= 96, we still need to send out
1391          * the sync uuid now. Need to do that before any drbd_send_state, or
1392          * the other side may go "paused sync" before receiving the sync uuids,
1393          * which is unexpected. */
1394         if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) &&
1395             (ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) &&
1396             mdev->agreed_pro_version >= 96 && get_ldev(mdev)) {
1397                 drbd_gen_and_send_sync_uuid(mdev);
1398                 put_ldev(mdev);
1399         }
1400
1401         /* Do not change the order of the if above and the two below... */
1402         if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1403                 drbd_send_uuids(mdev);
1404                 drbd_send_state(mdev);
1405         }
1406         /* No point in queuing send_bitmap if we don't have a connection
1407          * anymore, so check also the _current_ state, not only the new state
1408          * at the time this work was queued. */
1409         if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S &&
1410             mdev->state.conn == C_WF_BITMAP_S)
1411                 drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL,
1412                                 "send_bitmap (WFBitMapS)",
1413                                 BM_LOCKED_TEST_ALLOWED);
1414
1415         /* Lost contact to peer's copy of the data */
1416         if ((os.pdsk >= D_INCONSISTENT &&
1417              os.pdsk != D_UNKNOWN &&
1418              os.pdsk != D_OUTDATED)
1419         &&  (ns.pdsk < D_INCONSISTENT ||
1420              ns.pdsk == D_UNKNOWN ||
1421              ns.pdsk == D_OUTDATED)) {
1422                 if (get_ldev(mdev)) {
1423                         if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1424                             mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1425                                 if (is_susp(mdev->state)) {
1426                                         set_bit(NEW_CUR_UUID, &mdev->flags);
1427                                 } else {
1428                                         drbd_uuid_new_current(mdev);
1429                                         drbd_send_uuids(mdev);
1430                                 }
1431                         }
1432                         put_ldev(mdev);
1433                 }
1434         }
1435
1436         if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1437                 if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0) {
1438                         drbd_uuid_new_current(mdev);
1439                         drbd_send_uuids(mdev);
1440                 }
1441
1442                 /* D_DISKLESS Peer becomes secondary */
1443                 if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1444                         /* We may still be Primary ourselves.
1445                          * No harm done if the bitmap still changes,
1446                          * redirtied pages will follow later. */
1447                         drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1448                                 "demote diskless peer", BM_LOCKED_SET_ALLOWED);
1449                 put_ldev(mdev);
1450         }
1451
1452         /* Write out all changed bits on demote.
1453          * Though, no need to do that just yet
1454          * if there is still a resync going on */
1455         if (os.role == R_PRIMARY && ns.role == R_SECONDARY &&
1456                 mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
1457                 /* No changes to the bitmap expected this time, so assert that,
1458                  * even though no harm would be done if it did change. */
1459                 drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1460                                 "demote", BM_LOCKED_TEST_ALLOWED);
1461                 put_ldev(mdev);
1462         }
1463
1464         /* Last part of the attaching process ... */
1465         if (ns.conn >= C_CONNECTED &&
1466             os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1467                 drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1468                 drbd_send_uuids(mdev);
1469                 drbd_send_state(mdev);
1470         }
1471
1472         /* We want to pause/continue resync, tell peer. */
1473         if (ns.conn >= C_CONNECTED &&
1474              ((os.aftr_isp != ns.aftr_isp) ||
1475               (os.user_isp != ns.user_isp)))
1476                 drbd_send_state(mdev);
1477
1478         /* In case one of the isp bits got set, suspend other devices. */
1479         if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1480             (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1481                 suspend_other_sg(mdev);
1482
1483         /* Make sure the peer gets informed about any state
1484            changes (ISP bits) that happened while we were in WFReportParams. */
1485         if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1486                 drbd_send_state(mdev);
1487
1488         if (os.conn != C_AHEAD && ns.conn == C_AHEAD)
1489                 drbd_send_state(mdev);
1490
1491         /* We are in the process of starting a full sync... */
1492         if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1493             (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1494                 /* no other bitmap changes expected during this phase */
1495                 drbd_queue_bitmap_io(mdev,
1496                         &drbd_bmio_set_n_write, &abw_start_sync,
1497                         "set_n_write from StartingSync", BM_LOCKED_TEST_ALLOWED);
1498
1499         /* We are invalidating ourselves... */
1500         if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1501             os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1502                 /* other bitmap operation expected during this phase */
1503                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
1504                         "set_n_write from invalidate", BM_LOCKED_MASK);
1505
1506         /* first half of local IO error, failure to attach,
1507          * or administrative detach */
1508         if (os.disk != D_FAILED && ns.disk == D_FAILED) {
1509                 enum drbd_io_error_p eh;
1510                 int was_io_error;
1511                 /* corresponding get_ldev was in __drbd_set_state, to serialize
1512                  * our cleanup here with the transition to D_DISKLESS,
1513                  * so it is safe to dereference ldev here. */
1514                 eh = mdev->ldev->dc.on_io_error;
1515                 was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
1516
1517                 /* current state still has to be D_FAILED,
1518                  * there is only one way out: to D_DISKLESS,
1519                  * and that may only happen after our put_ldev below. */
1520                 if (mdev->state.disk != D_FAILED)
1521                         dev_err(DEV,
1522                                 "ASSERT FAILED: disk is %s during detach\n",
1523                                 drbd_disk_str(mdev->state.disk));
1524
1525                 if (drbd_send_state(mdev))
1526                         dev_warn(DEV, "Notified peer that I am detaching my disk\n");
1527                 else
1528                         dev_err(DEV, "Sending state for detaching disk failed\n");
1529
1530                 drbd_rs_cancel_all(mdev);
1531
1532                 /* In case we want to get something to stable storage still,
1533                  * this may be the last chance.
1534                  * Following put_ldev may transition to D_DISKLESS. */
1535                 drbd_md_sync(mdev);
1536                 put_ldev(mdev);
1537
1538                 if (was_io_error && eh == EP_CALL_HELPER)
1539                         drbd_khelper(mdev, "local-io-error");
1540         }
1541
1542         /* second half of local IO error, failure to attach,
1543          * or administrative detach,
1544          * after local_cnt references have reached zero again */
1545         if (os.disk != D_DISKLESS && ns.disk == D_DISKLESS) {
1546                 /* We must still be diskless,
1547                  * re-attach has to be serialized with this! */
1548                 if (mdev->state.disk != D_DISKLESS)
1549                         dev_err(DEV,
1550                                 "ASSERT FAILED: disk is %s while going diskless\n",
1551                                 drbd_disk_str(mdev->state.disk));
1552
1553                 mdev->rs_total = 0;
1554                 mdev->rs_failed = 0;
1555                 atomic_set(&mdev->rs_pending_cnt, 0);
1556
1557                 if (drbd_send_state(mdev))
1558                         dev_warn(DEV, "Notified peer that I'm now diskless.\n");
1559                 /* corresponding get_ldev in __drbd_set_state
1560                  * this may finally trigger drbd_ldev_destroy. */
1561                 put_ldev(mdev);
1562         }
1563
1564         /* Notify peer that I had a local IO error, and did not detach. */
1565         if (os.disk == D_UP_TO_DATE && ns.disk == D_INCONSISTENT)
1566                 drbd_send_state(mdev);
1567
1568         /* Disks got bigger while they were detached */
1569         if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1570             test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1571                 if (ns.conn == C_CONNECTED)
1572                         resync_after_online_grow(mdev);
1573         }
1574
1575         /* A resync finished or aborted, wake paused devices... */
1576         if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1577             (os.peer_isp && !ns.peer_isp) ||
1578             (os.user_isp && !ns.user_isp))
1579                 resume_next_sg(mdev);
1580
1581         /* sync target done with resync.  Explicitly notify peer, even though
1582          * it should (at least for non-empty resyncs) already know itself. */
1583         if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1584                 drbd_send_state(mdev);
1585
1586         /* This triggers bitmap writeout of potentially still unwritten pages
1587          * if the resync finished cleanly, or aborted because of peer disk
1588          * failure, or because of connection loss.
1589          * For resync aborted because of local disk failure, we cannot do
1590          * any bitmap writeout anymore.
1591          * No harm done if some bits change during this phase.
1592          */
1593         if (os.conn > C_CONNECTED && ns.conn <= C_CONNECTED && get_ldev(mdev)) {
1594                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL,
1595                         "write from resync_finished", BM_LOCKED_SET_ALLOWED);
1596                 put_ldev(mdev);
1597         }
1598
1599         /* Upon network connection, we need to start the receiver */
1600         if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1601                 drbd_thread_start(&mdev->receiver);
1602
1603         /* Terminate worker thread if we are unconfigured - it will be
1604            restarted as needed... */
1605         if (ns.disk == D_DISKLESS &&
1606             ns.conn == C_STANDALONE &&
1607             ns.role == R_SECONDARY) {
1608                 if (os.aftr_isp != ns.aftr_isp)
1609                         resume_next_sg(mdev);
1610                 /* set in __drbd_set_state, unless CONFIG_PENDING was set */
1611                 if (test_bit(DEVICE_DYING, &mdev->flags))
1612                         drbd_thread_stop_nowait(&mdev->worker);
1613         }
1614
1615         drbd_md_sync(mdev);
1616 }
1617
1618
1619 static int drbd_thread_setup(void *arg)
1620 {
1621         struct drbd_thread *thi = (struct drbd_thread *) arg;
1622         struct drbd_conf *mdev = thi->mdev;
1623         unsigned long flags;
1624         int retval;
1625
1626 restart:
1627         retval = thi->function(thi);
1628
1629         spin_lock_irqsave(&thi->t_lock, flags);
1630
1631         /* if the receiver has been "EXITING", the last thing it did
1632          * was set the conn state to "StandAlone";
1633          * if a re-connect request now comes in, the conn state goes to C_UNCONNECTED,
1634          * and the receiver thread will be "started" again.
1635          * drbd_thread_start needs to set "RESTARTING" in that case.
1636          * The t_state check and assignment need to be within the same spinlock,
1637          * so either thread_start sees EXITING and can remap it to RESTARTING,
1638          * or thread_start sees NONE and can proceed as normal.
1639          */
1640
1641         if (thi->t_state == RESTARTING) {
1642                 dev_info(DEV, "Restarting %s\n", current->comm);
1643                 thi->t_state = RUNNING;
1644                 spin_unlock_irqrestore(&thi->t_lock, flags);
1645                 goto restart;
1646         }
1647
1648         thi->task = NULL;
1649         thi->t_state = NONE;
1650         smp_mb();
1651         complete(&thi->stop);
1652         spin_unlock_irqrestore(&thi->t_lock, flags);
1653
1654         dev_info(DEV, "Terminating %s\n", current->comm);
1655
1656         /* Release mod reference taken when thread was started */
1657         module_put(THIS_MODULE);
1658         return retval;
1659 }
1660
1661 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1662                       int (*func) (struct drbd_thread *))
1663 {
1664         spin_lock_init(&thi->t_lock);
1665         thi->task    = NULL;
1666         thi->t_state = NONE;
1667         thi->function = func;
1668         thi->mdev = mdev;
1669 }
1670
1671 int drbd_thread_start(struct drbd_thread *thi)
1672 {
1673         struct drbd_conf *mdev = thi->mdev;
1674         struct task_struct *nt;
1675         unsigned long flags;
1676
1677         const char *me =
1678                 thi == &mdev->receiver ? "receiver" :
1679                 thi == &mdev->asender  ? "asender"  :
1680                 thi == &mdev->worker   ? "worker"   : "NONSENSE";
1681
1682         /* is used from state engine doing drbd_thread_stop_nowait,
1683          * while holding the req lock irqsave */
1684         spin_lock_irqsave(&thi->t_lock, flags);
1685
1686         switch (thi->t_state) {
1687         case NONE:
1688                 dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1689                                 me, current->comm, current->pid);
1690
1691                 /* Get ref on module for thread - this is released when thread exits */
1692                 if (!try_module_get(THIS_MODULE)) {
1693                         dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1694                         spin_unlock_irqrestore(&thi->t_lock, flags);
1695                         return false;
1696                 }
1697
1698                 init_completion(&thi->stop);
1699                 D_ASSERT(thi->task == NULL);
1700                 thi->reset_cpu_mask = 1;
1701                 thi->t_state = RUNNING;
1702                 spin_unlock_irqrestore(&thi->t_lock, flags);
1703                 flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
1704
1705                 nt = kthread_create(drbd_thread_setup, (void *) thi,
1706                                     "drbd%d_%s", mdev_to_minor(mdev), me);
1707
1708                 if (IS_ERR(nt)) {
1709                         dev_err(DEV, "Couldn't start thread\n");
1710
1711                         module_put(THIS_MODULE);
1712                         return false;
1713                 }
1714                 spin_lock_irqsave(&thi->t_lock, flags);
1715                 thi->task = nt;
1716                 thi->t_state = RUNNING;
1717                 spin_unlock_irqrestore(&thi->t_lock, flags);
1718                 wake_up_process(nt);
1719                 break;
1720         case EXITING:
1721                 thi->t_state = RESTARTING;
1722                 dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1723                                 me, current->comm, current->pid);
1724                 /* fall through */
1725         case RUNNING:
1726         case RESTARTING:
1727         default:
1728                 spin_unlock_irqrestore(&thi->t_lock, flags);
1729                 break;
1730         }
1731
1732         return true;
1733 }
1734
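/*
 * [Editor's illustrative sketch - not part of the original file.]
 * How the two helpers above are typically combined when a device is set up:
 * each of the three per-device threads gets a drbd_thread_init() with its
 * thread main (the prototypes near the top of this file), and is later
 * brought up on demand with drbd_thread_start().  Roughly:
 *
 *	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
 *	drbd_thread_init(mdev, &mdev->worker,   drbd_worker);
 *	drbd_thread_init(mdev, &mdev->asender,  drbd_asender);
 *	...
 *	if (!drbd_thread_start(&mdev->worker))
 *		dev_err(DEV, "worker thread could not be started\n");
 *
 * Calling drbd_thread_start() on a thread that is already RUNNING is a
 * no-op that returns true; calling it while the thread is EXITING remaps
 * its state to RESTARTING (see drbd_thread_setup() above).
 */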
1735
1736 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1737 {
1738         unsigned long flags;
1739
1740         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
1741
1742         /* may be called from state engine, holding the req lock irqsave */
1743         spin_lock_irqsave(&thi->t_lock, flags);
1744
1745         if (thi->t_state == NONE) {
1746                 spin_unlock_irqrestore(&thi->t_lock, flags);
1747                 if (restart)
1748                         drbd_thread_start(thi);
1749                 return;
1750         }
1751
1752         if (thi->t_state != ns) {
1753                 if (thi->task == NULL) {
1754                         spin_unlock_irqrestore(&thi->t_lock, flags);
1755                         return;
1756                 }
1757
1758                 thi->t_state = ns;
1759                 smp_mb();
1760                 init_completion(&thi->stop);
1761                 if (thi->task != current)
1762                         force_sig(DRBD_SIGKILL, thi->task);
1763
1764         }
1765
1766         spin_unlock_irqrestore(&thi->t_lock, flags);
1767
1768         if (wait)
1769                 wait_for_completion(&thi->stop);
1770 }
1771
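/*
 * [Editor's note - hedged, not in the original file.]  _drbd_thread_stop()
 * is normally not called directly; drbd_int.h wraps it in convenience
 * macros along the lines of:
 *
 *	drbd_thread_stop(thi)           => _drbd_thread_stop(thi, false, true)
 *	drbd_thread_stop_nowait(thi)    => _drbd_thread_stop(thi, false, false)
 *	drbd_thread_restart_nowait(thi) => _drbd_thread_stop(thi, true, false)
 *
 * The _nowait variants exist because this may be called from the state
 * engine while holding the req lock (see drbd_thread_stop_nowait() in
 * after_state_ch() above), where waiting for the completion could not sleep.
 */
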
1772 #ifdef CONFIG_SMP
1773 /**
1774  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1775  * @mdev:       DRBD device.
1776  *
1777  * Forces all threads of a device onto the same CPU. This is beneficial for
1778  * DRBD's performance. May be overridden by the user's configuration.
1779  */
1780 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1781 {
1782         int ord, cpu;
1783
1784         /* user override. */
1785         if (cpumask_weight(mdev->cpu_mask))
1786                 return;
1787
1788         ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1789         for_each_online_cpu(cpu) {
1790                 if (ord-- == 0) {
1791                         cpumask_set_cpu(cpu, mdev->cpu_mask);
1792                         return;
1793                 }
1794         }
1795         /* should not be reached */
1796         cpumask_setall(mdev->cpu_mask);
1797 }
1798
1799 /**
1800  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1801  * @mdev:       DRBD device.
1802  *
1803  * Call this in the "main loop" of _all_ threads; no mutex is needed, current won't
1804  * die prematurely.
1805  */
1806 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1807 {
1808         struct task_struct *p = current;
1809         struct drbd_thread *thi =
1810                 p == mdev->asender.task  ? &mdev->asender  :
1811                 p == mdev->receiver.task ? &mdev->receiver :
1812                 p == mdev->worker.task   ? &mdev->worker   :
1813                 NULL;
1814         if (!expect(thi != NULL))
1815                 return;
1816         if (!thi->reset_cpu_mask)
1817                 return;
1818         thi->reset_cpu_mask = 0;
1819         set_cpus_allowed_ptr(p, mdev->cpu_mask);
1820 }
1821 #endif
1822
1823 /* the appropriate socket mutex must be held already */
1824 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1825                           enum drbd_packets cmd, struct p_header80 *h,
1826                           size_t size, unsigned msg_flags)
1827 {
1828         int sent, ok;
1829
1830         if (!expect(h))
1831                 return false;
1832         if (!expect(size))
1833                 return false;
1834
1835         h->magic   = cpu_to_be32(DRBD_MAGIC);
1836         h->command = cpu_to_be16(cmd);
1837         h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1838
1839         sent = drbd_send(mdev, sock, h, size, msg_flags);
1840
1841         ok = (sent == size);
1842         if (!ok && !signal_pending(current))
1843                 dev_warn(DEV, "short sent %s size=%d sent=%d\n",
1844                     cmdname(cmd), (int)size, sent);
1845         return ok;
1846 }
1847
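/*
 * [Editor's worked example - hedged, not in the original file.]
 * The on-wire h->length above counts only the payload that follows the
 * 8-byte p_header80 (be32 magic, be16 command, be16 length).  E.g. for
 * drbd_send_state() further below, which sends the header plus one be32
 * state word (assuming the usual packed packet layout):
 *
 *	size      = sizeof(struct p_state);             // 8 + 4 = 12 bytes
 *	h->length = size - sizeof(struct p_header80);   // 4 bytes
 *
 * so the peer reads the header first and then exactly "length" more bytes.
 */
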
1848 /* don't pass the socket. we may only look at it
1849  * when we hold the appropriate socket mutex.
1850  */
1851 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1852                   enum drbd_packets cmd, struct p_header80 *h, size_t size)
1853 {
1854         int ok = 0;
1855         struct socket *sock;
1856
1857         if (use_data_socket) {
1858                 mutex_lock(&mdev->data.mutex);
1859                 sock = mdev->data.socket;
1860         } else {
1861                 mutex_lock(&mdev->meta.mutex);
1862                 sock = mdev->meta.socket;
1863         }
1864
1865         /* drbd_disconnect() could have called drbd_free_sock()
1866          * while we were waiting for the mutex... */
1867         if (likely(sock != NULL))
1868                 ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1869
1870         if (use_data_socket)
1871                 mutex_unlock(&mdev->data.mutex);
1872         else
1873                 mutex_unlock(&mdev->meta.mutex);
1874         return ok;
1875 }
1876
1877 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1878                    size_t size)
1879 {
1880         struct p_header80 h;
1881         int ok;
1882
1883         h.magic   = cpu_to_be32(DRBD_MAGIC);
1884         h.command = cpu_to_be16(cmd);
1885         h.length  = cpu_to_be16(size);
1886
1887         if (!drbd_get_data_sock(mdev))
1888                 return 0;
1889
1890         ok = (sizeof(h) ==
1891                 drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1892         ok = ok && (size ==
1893                 drbd_send(mdev, mdev->data.socket, data, size, 0));
1894
1895         drbd_put_data_sock(mdev);
1896
1897         return ok;
1898 }
1899
1900 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1901 {
1902         struct p_rs_param_95 *p;
1903         struct socket *sock;
1904         int size, rv;
1905         const int apv = mdev->agreed_pro_version;
1906
1907         size = apv <= 87 ? sizeof(struct p_rs_param)
1908                 : apv == 88 ? sizeof(struct p_rs_param)
1909                         + strlen(mdev->sync_conf.verify_alg) + 1
1910                 : apv <= 94 ? sizeof(struct p_rs_param_89)
1911                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
1912
1913         /* used from admin command context and receiver/worker context.
1914          * to avoid kmalloc, grab the socket right here,
1915          * then use the pre-allocated sbuf there */
1916         mutex_lock(&mdev->data.mutex);
1917         sock = mdev->data.socket;
1918
1919         if (likely(sock != NULL)) {
1920                 enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1921
1922                 p = &mdev->data.sbuf.rs_param_95;
1923
1924                 /* initialize verify_alg and csums_alg */
1925                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1926
1927                 p->rate = cpu_to_be32(sc->rate);
1928                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
1929                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
1930                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
1931                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
1932
1933                 if (apv >= 88)
1934                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1935                 if (apv >= 89)
1936                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1937
1938                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1939         } else
1940                 rv = 0; /* not ok */
1941
1942         mutex_unlock(&mdev->data.mutex);
1943
1944         return rv;
1945 }
1946
1947 int drbd_send_protocol(struct drbd_conf *mdev)
1948 {
1949         struct p_protocol *p;
1950         int size, cf, rv;
1951
1952         size = sizeof(struct p_protocol);
1953
1954         if (mdev->agreed_pro_version >= 87)
1955                 size += strlen(mdev->tconn->net_conf->integrity_alg) + 1;
1956
1957         /* we must not recurse into our own queue,
1958          * as that is blocked during handshake */
1959         p = kmalloc(size, GFP_NOIO);
1960         if (p == NULL)
1961                 return 0;
1962
1963         p->protocol      = cpu_to_be32(mdev->tconn->net_conf->wire_protocol);
1964         p->after_sb_0p   = cpu_to_be32(mdev->tconn->net_conf->after_sb_0p);
1965         p->after_sb_1p   = cpu_to_be32(mdev->tconn->net_conf->after_sb_1p);
1966         p->after_sb_2p   = cpu_to_be32(mdev->tconn->net_conf->after_sb_2p);
1967         p->two_primaries = cpu_to_be32(mdev->tconn->net_conf->two_primaries);
1968
1969         cf = 0;
1970         if (mdev->tconn->net_conf->want_lose)
1971                 cf |= CF_WANT_LOSE;
1972         if (mdev->tconn->net_conf->dry_run) {
1973                 if (mdev->agreed_pro_version >= 92)
1974                         cf |= CF_DRY_RUN;
1975                 else {
1976                         dev_err(DEV, "--dry-run is not supported by peer");
1977                         kfree(p);
1978                         return -1;
1979                 }
1980         }
1981         p->conn_flags    = cpu_to_be32(cf);
1982
1983         if (mdev->agreed_pro_version >= 87)
1984                 strcpy(p->integrity_alg, mdev->tconn->net_conf->integrity_alg);
1985
1986         rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1987                            (struct p_header80 *)p, size);
1988         kfree(p);
1989         return rv;
1990 }
1991
1992 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1993 {
1994         struct p_uuids p;
1995         int i;
1996
1997         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1998                 return 1;
1999
2000         for (i = UI_CURRENT; i < UI_SIZE; i++)
2001                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
2002
2003         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
2004         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
2005         uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
2006         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
2007         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
2008         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
2009
2010         put_ldev(mdev);
2011
2012         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
2013                              (struct p_header80 *)&p, sizeof(p));
2014 }
2015
2016 int drbd_send_uuids(struct drbd_conf *mdev)
2017 {
2018         return _drbd_send_uuids(mdev, 0);
2019 }
2020
2021 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
2022 {
2023         return _drbd_send_uuids(mdev, 8);
2024 }
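
/*
 * [Editor's note - grounded in the code above; the bit names are descriptive
 * only, not taken from the original source.]  The uuid_flags word sent in
 * p.uuid[UI_FLAGS] is a small bit field:
 *
 *	bit 0 (0x1): we are willing to discard our data (net_conf->want_lose)
 *	bit 1 (0x2): CRASHED_PRIMARY was set locally
 *	bit 2 (0x4): our disk is still D_INCONSISTENT (new_state_tmp)
 *	bit 3 (0x8): tell the peer it may skip the initial sync
 *	             (drbd_send_uuids_skip_initial_sync() above)
 */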
2025
2026 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
2027 {
2028         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2029                 u64 *uuid = mdev->ldev->md.uuid;
2030                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
2031                      text,
2032                      (unsigned long long)uuid[UI_CURRENT],
2033                      (unsigned long long)uuid[UI_BITMAP],
2034                      (unsigned long long)uuid[UI_HISTORY_START],
2035                      (unsigned long long)uuid[UI_HISTORY_END]);
2036                 put_ldev(mdev);
2037         } else {
2038                 dev_info(DEV, "%s effective data uuid: %016llX\n",
2039                                 text,
2040                                 (unsigned long long)mdev->ed_uuid);
2041         }
2042 }
2043
2044 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
2045 {
2046         struct p_rs_uuid p;
2047         u64 uuid;
2048
2049         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
2050
2051         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
2052         drbd_uuid_set(mdev, UI_BITMAP, uuid);
2053         drbd_print_uuids(mdev, "updated sync UUID");
2054         drbd_md_sync(mdev);
2055         p.uuid = cpu_to_be64(uuid);
2056
2057         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
2058                              (struct p_header80 *)&p, sizeof(p));
2059 }
2060
2061 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
2062 {
2063         struct p_sizes p;
2064         sector_t d_size, u_size;
2065         int q_order_type, max_bio_size;
2066         int ok;
2067
2068         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2069                 D_ASSERT(mdev->ldev->backing_bdev);
2070                 d_size = drbd_get_max_capacity(mdev->ldev);
2071                 u_size = mdev->ldev->dc.disk_size;
2072                 q_order_type = drbd_queue_order_type(mdev);
2073                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
2074                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
2075                 put_ldev(mdev);
2076         } else {
2077                 d_size = 0;
2078                 u_size = 0;
2079                 q_order_type = QUEUE_ORDERED_NONE;
2080                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
2081         }
2082
2083         p.d_size = cpu_to_be64(d_size);
2084         p.u_size = cpu_to_be64(u_size);
2085         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
2086         p.max_bio_size = cpu_to_be32(max_bio_size);
2087         p.queue_order_type = cpu_to_be16(q_order_type);
2088         p.dds_flags = cpu_to_be16(flags);
2089
2090         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
2091                            (struct p_header80 *)&p, sizeof(p));
2092         return ok;
2093 }
2094
2095 /**
2096  * drbd_send_state() - Sends the drbd state to the peer
2097  * @mdev:       DRBD device.
2098  */
2099 int drbd_send_state(struct drbd_conf *mdev)
2100 {
2101         struct socket *sock;
2102         struct p_state p;
2103         int ok = 0;
2104
2105         /* Grab the state lock so we won't send state while we are in the middle
2106          * of a cluster-wide state change on another thread */
2107         drbd_state_lock(mdev);
2108
2109         mutex_lock(&mdev->data.mutex);
2110
2111         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
2112         sock = mdev->data.socket;
2113
2114         if (likely(sock != NULL)) {
2115                 ok = _drbd_send_cmd(mdev, sock, P_STATE,
2116                                     (struct p_header80 *)&p, sizeof(p), 0);
2117         }
2118
2119         mutex_unlock(&mdev->data.mutex);
2120
2121         drbd_state_unlock(mdev);
2122         return ok;
2123 }
2124
2125 int drbd_send_state_req(struct drbd_conf *mdev,
2126         union drbd_state mask, union drbd_state val)
2127 {
2128         struct p_req_state p;
2129
2130         p.mask    = cpu_to_be32(mask.i);
2131         p.val     = cpu_to_be32(val.i);
2132
2133         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
2134                              (struct p_header80 *)&p, sizeof(p));
2135 }
2136
2137 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
2138 {
2139         struct p_req_state_reply p;
2140
2141         p.retcode    = cpu_to_be32(retcode);
2142
2143         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
2144                              (struct p_header80 *)&p, sizeof(p));
2145 }
2146
2147 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
2148         struct p_compressed_bm *p,
2149         struct bm_xfer_ctx *c)
2150 {
2151         struct bitstream bs;
2152         unsigned long plain_bits;
2153         unsigned long tmp;
2154         unsigned long rl;
2155         unsigned len;
2156         unsigned toggle;
2157         int bits;
2158
2159         /* may we use this feature? */
2160         if ((mdev->sync_conf.use_rle == 0) ||
2161                 (mdev->agreed_pro_version < 90))
2162                         return 0;
2163
2164         if (c->bit_offset >= c->bm_bits)
2165                 return 0; /* nothing to do. */
2166
2167         /* use at most this many bytes */
2168         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
2169         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
2170         /* plain bits covered in this code string */
2171         plain_bits = 0;
2172
2173         /* p->encoding & 0x80 stores whether the first run length is set.
2174          * bit offset is implicit.
2175          * start with toggle == 2 to be able to tell the first iteration */
2176         toggle = 2;
2177
2178         /* see how many plain bits we can stuff into one packet
2179          * using RLE and VLI. */
2180         do {
2181                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2182                                     : _drbd_bm_find_next(mdev, c->bit_offset);
2183                 if (tmp == -1UL)
2184                         tmp = c->bm_bits;
2185                 rl = tmp - c->bit_offset;
2186
2187                 if (toggle == 2) { /* first iteration */
2188                         if (rl == 0) {
2189                                 /* the first checked bit was set,
2190                                  * store start value, */
2191                                 DCBP_set_start(p, 1);
2192                                 /* but skip encoding of zero run length */
2193                                 toggle = !toggle;
2194                                 continue;
2195                         }
2196                         DCBP_set_start(p, 0);
2197                 }
2198
2199                 /* paranoia: catch zero runlength.
2200                  * can only happen if bitmap is modified while we scan it. */
2201                 if (rl == 0) {
2202                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2203                             "t:%u bo:%lu\n", toggle, c->bit_offset);
2204                         return -1;
2205                 }
2206
2207                 bits = vli_encode_bits(&bs, rl);
2208                 if (bits == -ENOBUFS) /* buffer full */
2209                         break;
2210                 if (bits <= 0) {
2211                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2212                         return 0;
2213                 }
2214
2215                 toggle = !toggle;
2216                 plain_bits += rl;
2217                 c->bit_offset = tmp;
2218         } while (c->bit_offset < c->bm_bits);
2219
2220         len = bs.cur.b - p->code + !!bs.cur.bit;
2221
2222         if (plain_bits < (len << 3)) {
2223                 /* incompressible with this method.
2224                  * we need to rewind both word and bit position. */
2225                 c->bit_offset -= plain_bits;
2226                 bm_xfer_ctx_bit_to_word_offset(c);
2227                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2228                 return 0;
2229         }
2230
2231         /* RLE + VLI was able to compress it just fine.
2232          * update c->word_offset. */
2233         bm_xfer_ctx_bit_to_word_offset(c);
2234
2235         /* store pad_bits */
2236         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2237
2238         return len;
2239 }
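
/*
 * [Editor's worked example - hedged, not in the original file.]
 * The "plain_bits < (len << 3)" check above compares the number of bitmap
 * bits covered by the code string against the number of bits the code
 * string itself occupies.  E.g. if the RLE/VLI stream needs len = 100
 * bytes, it must describe more than 100 * 8 = 800 bitmap bits, otherwise
 * sending those bits as plain words would be no worse; in that case we
 * rewind c->bit_offset and fall back to the plain P_BITMAP path.
 */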
2240
2241 /**
2242  * send_bitmap_rle_or_plain
2243  *
2244  * Return 0 when done, 1 when another iteration is needed, and a negative error
2245  * code upon failure.
2246  */
2247 static int
2248 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2249                          struct p_header80 *h, struct bm_xfer_ctx *c)
2250 {
2251         struct p_compressed_bm *p = (void*)h;
2252         unsigned long num_words;
2253         int len;
2254         int ok;
2255
2256         len = fill_bitmap_rle_bits(mdev, p, c);
2257
2258         if (len < 0)
2259                 return -EIO;
2260
2261         if (len) {
2262                 DCBP_set_code(p, RLE_VLI_Bits);
2263                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2264                         sizeof(*p) + len, 0);
2265
2266                 c->packets[0]++;
2267                 c->bytes[0] += sizeof(*p) + len;
2268
2269                 if (c->bit_offset >= c->bm_bits)
2270                         len = 0; /* DONE */
2271         } else {
2272                 /* was not compressible.
2273                  * send a buffer full of plain text bits instead. */
2274                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2275                 len = num_words * sizeof(long);
2276                 if (len)
2277                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2278                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2279                                    h, sizeof(struct p_header80) + len, 0);
2280                 c->word_offset += num_words;
2281                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2282
2283                 c->packets[1]++;
2284                 c->bytes[1] += sizeof(struct p_header80) + len;
2285
2286                 if (c->bit_offset > c->bm_bits)
2287                         c->bit_offset = c->bm_bits;
2288         }
2289         if (ok) {
2290                 if (len == 0) {
2291                         INFO_bm_xfer_stats(mdev, "send", c);
2292                         return 0;
2293                 } else
2294                         return 1;
2295         }
2296         return -EIO;
2297 }
2298
2299 /* See the comment at receive_bitmap() */
2300 int _drbd_send_bitmap(struct drbd_conf *mdev)
2301 {
2302         struct bm_xfer_ctx c;
2303         struct p_header80 *p;
2304         int err;
2305
2306         if (!expect(mdev->bitmap))
2307                 return false;
2308
2309         /* maybe we should use some per thread scratch page,
2310          * and allocate that during initial device creation? */
2311         p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2312         if (!p) {
2313                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2314                 return false;
2315         }
2316
2317         if (get_ldev(mdev)) {
2318                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2319                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2320                         drbd_bm_set_all(mdev);
2321                         if (drbd_bm_write(mdev)) {
2322                                 /* write_bm failed! Leave the full sync flag set in the meta data,
2323                                  * but otherwise process as per normal - we need to tell the other
2324                                  * side that a full resync is required! */
2325                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
2326                         } else {
2327                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2328                                 drbd_md_sync(mdev);
2329                         }
2330                 }
2331                 put_ldev(mdev);
2332         }
2333
2334         c = (struct bm_xfer_ctx) {
2335                 .bm_bits = drbd_bm_bits(mdev),
2336                 .bm_words = drbd_bm_words(mdev),
2337         };
2338
2339         do {
2340                 err = send_bitmap_rle_or_plain(mdev, p, &c);
2341         } while (err > 0);
2342
2343         free_page((unsigned long) p);
2344         return err == 0;
2345 }
2346
2347 int drbd_send_bitmap(struct drbd_conf *mdev)
2348 {
2349         int err;
2350
2351         if (!drbd_get_data_sock(mdev))
2352                 return -1;
2353         err = !_drbd_send_bitmap(mdev);
2354         drbd_put_data_sock(mdev);
2355         return err;
2356 }
2357
2358 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2359 {
2360         int ok;
2361         struct p_barrier_ack p;
2362
2363         p.barrier  = barrier_nr;
2364         p.set_size = cpu_to_be32(set_size);
2365
2366         if (mdev->state.conn < C_CONNECTED)
2367                 return false;
2368         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2369                         (struct p_header80 *)&p, sizeof(p));
2370         return ok;
2371 }
2372
2373 /**
2374  * _drbd_send_ack() - Sends an ack packet
2375  * @mdev:       DRBD device.
2376  * @cmd:        Packet command code.
2377  * @sector:     sector, needs to be in big endian byte order
2378  * @blksize:    size in byte, needs to be in big endian byte order
2379  * @block_id:   Id, big endian byte order
2380  */
2381 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2382                           u64 sector,
2383                           u32 blksize,
2384                           u64 block_id)
2385 {
2386         int ok;
2387         struct p_block_ack p;
2388
2389         p.sector   = sector;
2390         p.block_id = block_id;
2391         p.blksize  = blksize;
2392         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2393
2394         if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2395                 return false;
2396         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2397                                 (struct p_header80 *)&p, sizeof(p));
2398         return ok;
2399 }
2400
2401 /* dp->sector and dp->block_id already/still in network byte order,
2402  * data_size is payload size according to dp->head,
2403  * and may need to be corrected for digest size. */
2404 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2405                      struct p_data *dp, int data_size)
2406 {
2407         data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
2408                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
2409         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2410                               dp->block_id);
2411 }
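
/*
 * [Editor's worked example - hedged, not in the original file.]
 * Example of the digest correction above: with an "md5" integrity_alg
 * (16-byte digest), a 4096-byte P_DATA packet announces
 * data_size = 4096 + 16 in its header, but the ack must report the size
 * of the data block itself, i.e. 4096 bytes.
 */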
2412
2413 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2414                      struct p_block_req *rp)
2415 {
2416         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2417 }
2418
2419 /**
2420  * drbd_send_ack() - Sends an ack packet
2421  * @mdev:       DRBD device.
2422  * @cmd:        Packet command code.
2423  * @e:          Epoch entry.
2424  */
2425 int drbd_send_ack(struct drbd_conf *mdev,
2426         enum drbd_packets cmd, struct drbd_epoch_entry *e)
2427 {
2428         return _drbd_send_ack(mdev, cmd,
2429                               cpu_to_be64(e->i.sector),
2430                               cpu_to_be32(e->i.size),
2431                               e->block_id);
2432 }
2433
2434 /* This function misuses the block_id field to signal whether the blocks
2435  * are in sync or not. */
2436 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2437                      sector_t sector, int blksize, u64 block_id)
2438 {
2439         return _drbd_send_ack(mdev, cmd,
2440                               cpu_to_be64(sector),
2441                               cpu_to_be32(blksize),
2442                               cpu_to_be64(block_id));
2443 }
2444
2445 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2446                        sector_t sector, int size, u64 block_id)
2447 {
2448         int ok;
2449         struct p_block_req p;
2450
2451         p.sector   = cpu_to_be64(sector);
2452         p.block_id = block_id;
2453         p.blksize  = cpu_to_be32(size);
2454
2455         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2456                                 (struct p_header80 *)&p, sizeof(p));
2457         return ok;
2458 }
2459
2460 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2461                             sector_t sector, int size,
2462                             void *digest, int digest_size,
2463                             enum drbd_packets cmd)
2464 {
2465         int ok;
2466         struct p_block_req p;
2467
2468         p.sector   = cpu_to_be64(sector);
2469         p.block_id = ID_SYNCER /* unused */;
2470         p.blksize  = cpu_to_be32(size);
2471
2472         p.head.magic   = cpu_to_be32(DRBD_MAGIC);
2473         p.head.command = cpu_to_be16(cmd);
2474         p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2475
2476         mutex_lock(&mdev->data.mutex);
2477
2478         ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2479         ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2480
2481         mutex_unlock(&mdev->data.mutex);
2482
2483         return ok;
2484 }
2485
2486 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2487 {
2488         int ok;
2489         struct p_block_req p;
2490
2491         p.sector   = cpu_to_be64(sector);
2492         p.block_id = ID_SYNCER /* unused */;
2493         p.blksize  = cpu_to_be32(size);
2494
2495         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2496                            (struct p_header80 *)&p, sizeof(p));
2497         return ok;
2498 }
2499
2500 /* called on sndtimeo
2501  * returns false if we should retry,
2502  * true if we think connection is dead
2503  */
2504 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2505 {
2506         int drop_it;
2507         /* long elapsed = (long)(jiffies - mdev->last_received); */
2508
2509         drop_it =   mdev->meta.socket == sock
2510                 || !mdev->asender.task
2511                 || get_t_state(&mdev->asender) != RUNNING
2512                 || mdev->state.conn < C_CONNECTED;
2513
2514         if (drop_it)
2515                 return true;
2516
2517         drop_it = !--mdev->ko_count;
2518         if (!drop_it) {
2519                 dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2520                        current->comm, current->pid, mdev->ko_count);
2521                 request_ping(mdev);
2522         }
2523
2524         return drop_it; /* && (mdev->state == R_PRIMARY) */
2525 }
2526
2527 /* The idea of sendpage seems to be to put some kind of reference
2528  * to the page into the skb, and to hand it over to the NIC. In
2529  * this process get_page() gets called.
2530  *
2531  * As soon as the page was really sent over the network put_page()
2532  * gets called by some part of the network layer. [ NIC driver? ]
2533  *
2534  * [ get_page() / put_page() increment/decrement the count. If count
2535  *   reaches 0 the page will be freed. ]
2536  *
2537  * This works nicely with pages from FSs.
2538  * But this means that in protocol A we might signal IO completion too early!
2539  *
2540  * In order not to corrupt data during a resync we must make sure
2541  * that we do not reuse our own buffer pages (EEs) too early; therefore
2542  * we have the net_ee list.
2543  *
2544  * XFS still seems to have problems: it submits pages with page_count == 0!
2545  * As a workaround, we disable sendpage on pages
2546  * with page_count == 0 or PageSlab.
2547  */
2548 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2549                    int offset, size_t size, unsigned msg_flags)
2550 {
2551         int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2552         kunmap(page);
2553         if (sent == size)
2554                 mdev->send_cnt += size>>9;
2555         return sent == size;
2556 }
2557
2558 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2559                     int offset, size_t size, unsigned msg_flags)
2560 {
2561         mm_segment_t oldfs = get_fs();
2562         int sent, ok;
2563         int len = size;
2564
2565         /* e.g. XFS meta- & log-data is in slab pages, which have a
2566          * page_count of 0 and/or have PageSlab() set.
2567          * We cannot use sendpage for those, as that does get_page();
2568          * put_page(); and would cause either a VM_BUG directly, or
2569          * __page_cache_release() of a page that is actually still referenced
2570          * by someone, leading to some obscure delayed Oops somewhere else. */
2571         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2572                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2573
2574         msg_flags |= MSG_NOSIGNAL;
2575         drbd_update_congested(mdev);
2576         set_fs(KERNEL_DS);
2577         do {
2578                 sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2579                                                         offset, len,
2580                                                         msg_flags);
2581                 if (sent == -EAGAIN) {
2582                         if (we_should_drop_the_connection(mdev,
2583                                                           mdev->data.socket))
2584                                 break;
2585                         else
2586                                 continue;
2587                 }
2588                 if (sent <= 0) {
2589                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2590                              __func__, (int)size, len, sent);
2591                         break;
2592                 }
2593                 len    -= sent;
2594                 offset += sent;
2595         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2596         set_fs(oldfs);
2597         clear_bit(NET_CONGESTED, &mdev->flags);
2598
2599         ok = (len == 0);
2600         if (likely(ok))
2601                 mdev->send_cnt += size>>9;
2602         return ok;
2603 }
2604
2605 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2606 {
2607         struct bio_vec *bvec;
2608         int i;
2609         /* hint all but last page with MSG_MORE */
2610         __bio_for_each_segment(bvec, bio, i, 0) {
2611                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
2612                                      bvec->bv_offset, bvec->bv_len,
2613                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2614                         return 0;
2615         }
2616         return 1;
2617 }
2618
2619 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2620 {
2621         struct bio_vec *bvec;
2622         int i;
2623         /* hint all but last page with MSG_MORE */
2624         __bio_for_each_segment(bvec, bio, i, 0) {
2625                 if (!_drbd_send_page(mdev, bvec->bv_page,
2626                                      bvec->bv_offset, bvec->bv_len,
2627                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2628                         return 0;
2629         }
2630         return 1;
2631 }
2632
2633 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2634 {
2635         struct page *page = e->pages;
2636         unsigned len = e->i.size;
2637         /* hint all but last page with MSG_MORE */
2638         page_chain_for_each(page) {
2639                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2640                 if (!_drbd_send_page(mdev, page, 0, l,
2641                                 page_chain_next(page) ? MSG_MORE : 0))
2642                         return 0;
2643                 len -= l;
2644         }
2645         return 1;
2646 }
2647
2648 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2649 {
2650         if (mdev->agreed_pro_version >= 95)
2651                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2652                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
2653                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2654                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2655         else
2656                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
2657 }
2658
2659 /* Used to send write requests
2660  * R_PRIMARY -> Peer    (P_DATA)
2661  */
2662 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2663 {
2664         int ok = 1;
2665         struct p_data p;
2666         unsigned int dp_flags = 0;
2667         void *dgb;
2668         int dgs;
2669
2670         if (!drbd_get_data_sock(mdev))
2671                 return 0;
2672
2673         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2674                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2675
2676         if (req->i.size <= DRBD_MAX_SIZE_H80_PACKET) {
2677                 p.head.h80.magic   = cpu_to_be32(DRBD_MAGIC);
2678                 p.head.h80.command = cpu_to_be16(P_DATA);
2679                 p.head.h80.length  =
2680                         cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->i.size);
2681         } else {
2682                 p.head.h95.magic   = cpu_to_be16(DRBD_MAGIC_BIG);
2683                 p.head.h95.command = cpu_to_be16(P_DATA);
2684                 p.head.h95.length  =
2685                         cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->i.size);
2686         }
2687
2688         p.sector   = cpu_to_be64(req->i.sector);
2689         p.block_id = (unsigned long)req;
2690         p.seq_num  = cpu_to_be32(req->seq_num =
2691                                  atomic_add_return(1, &mdev->packet_seq));
2692
2693         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2694
2695         if (mdev->state.conn >= C_SYNC_SOURCE &&
2696             mdev->state.conn <= C_PAUSED_SYNC_T)
2697                 dp_flags |= DP_MAY_SET_IN_SYNC;
2698
2699         p.dp_flags = cpu_to_be32(dp_flags);
2700         set_bit(UNPLUG_REMOTE, &mdev->flags);
2701         ok = (sizeof(p) ==
2702                 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2703         if (ok && dgs) {
2704                 dgb = mdev->int_dig_out;
2705                 drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2706                 ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2707         }
2708         if (ok) {
2709                 /* For protocol A, we have to memcpy the payload into
2710                  * socket buffers, as we may complete right away
2711                  * as soon as we have handed it over to tcp, at which point the data
2712                  * pages may become invalid.
2713                  *
2714                  * With data integrity enabled, we copy it as well, so we can be
2715                  * sure that even if the bio pages may still be modified, it
2716                  * won't change the data on the wire; thus if the digest checks
2717                  * out ok after sending on this side, but does not match on the
2718                  * receiving side, we have surely detected corruption elsewhere.
2719                  */
2720                 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
2721                         ok = _drbd_send_bio(mdev, req->master_bio);
2722                 else
2723                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
2724
2725                 /* double check digest, sometimes buffers have been modified in flight. */
2726                 if (dgs > 0 && dgs <= 64) {
2727                         /* 64 bytes (512 bits) is the largest digest size
2728                          * currently supported by the kernel crypto API. */
2729                         unsigned char digest[64];
2730                         drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, digest);
2731                         if (memcmp(mdev->int_dig_out, digest, dgs)) {
2732                                 dev_warn(DEV,
2733                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
2734                                         (unsigned long long)req->i.sector, req->i.size);
2735                         }
2736                 } /* else if (dgs > 64) {
2737                      ... Be noisy about digest too large ...
2738                 } */
2739         }
2740
2741         drbd_put_data_sock(mdev);
2742
2743         return ok;
2744 }
2745
2746 /* answer packet, used to send data back for read requests:
2747  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2748  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2749  */
2750 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2751                     struct drbd_epoch_entry *e)
2752 {
2753         int ok;
2754         struct p_data p;
2755         void *dgb;
2756         int dgs;
2757
2758         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2759                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2760
2761         if (e->i.size <= DRBD_MAX_SIZE_H80_PACKET) {
2762                 p.head.h80.magic   = cpu_to_be32(DRBD_MAGIC);
2763                 p.head.h80.command = cpu_to_be16(cmd);
2764                 p.head.h80.length  =
2765                         cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->i.size);
2766         } else {
2767                 p.head.h95.magic   = cpu_to_be16(DRBD_MAGIC_BIG);
2768                 p.head.h95.command = cpu_to_be16(cmd);
2769                 p.head.h95.length  =
2770                         cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->i.size);
2771         }
2772
2773         p.sector   = cpu_to_be64(e->i.sector);
2774         p.block_id = e->block_id;
2775         /* p.seq_num  = 0;    No sequence numbers here.. */
2776
2777         /* Only called by our kernel thread.
2778          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2779          * in response to an admin command or module unload.
2780          */
2781         if (!drbd_get_data_sock(mdev))
2782                 return 0;
2783
2784         ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2785         if (ok && dgs) {
2786                 dgb = mdev->int_dig_out;
2787                 drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2788                 ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2789         }
2790         if (ok)
2791                 ok = _drbd_send_zc_ee(mdev, e);
2792
2793         drbd_put_data_sock(mdev);
2794
2795         return ok;
2796 }
2797
2798 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
2799 {
2800         struct p_block_desc p;
2801
2802         p.sector  = cpu_to_be64(req->i.sector);
2803         p.blksize = cpu_to_be32(req->i.size);
2804
2805         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
2806 }
2807
2808 /*
2809   drbd_send distinguishes two cases:
2810
2811   Packets sent via the data socket "sock"
2812   and packets sent via the meta data socket "msock"
2813
2814                     sock                      msock
2815   -----------------+-------------------------+------------------------------
2816   timeout           conf.timeout / 2          conf.timeout / 2
2817   timeout action    send a ping via msock     Abort communication
2818                                               and close all sockets
2819 */
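
/*
 * Illustrative only, not an actual call site in this driver: a caller that
 * wants to push a buffer over the data socket is expected to serialize
 * access itself, e.g. via drbd_get_data_sock()/drbd_put_data_sock() as the
 * send functions above do, before handing the buffer to drbd_send():
 *
 *	if (drbd_get_data_sock(mdev)) {
 *		ok = (len == drbd_send(mdev, mdev->data.socket, buf, len, 0));
 *		drbd_put_data_sock(mdev);
 *	}
 *
 * where "buf" and "len" stand for any on-stack packet, as in
 * drbd_send_block() above.
 */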
2820
2821 /*
2822  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2823  */
2824 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2825               void *buf, size_t size, unsigned msg_flags)
2826 {
2827         struct kvec iov;
2828         struct msghdr msg;
2829         int rv, sent = 0;
2830
2831         if (!sock)
2832                 return -1000;
2833
2834         /* THINK  if (signal_pending) return ... ? */
2835
2836         iov.iov_base = buf;
2837         iov.iov_len  = size;
2838
2839         msg.msg_name       = NULL;
2840         msg.msg_namelen    = 0;
2841         msg.msg_control    = NULL;
2842         msg.msg_controllen = 0;
2843         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2844
2845         if (sock == mdev->data.socket) {
2846                 mdev->ko_count = mdev->tconn->net_conf->ko_count;
2847                 drbd_update_congested(mdev);
2848         }
2849         do {
2850                 /* STRANGE
2851                  * tcp_sendmsg does _not_ use its size parameter at all ?
2852                  *
2853                  * -EAGAIN on timeout, -EINTR on signal.
2854                  */
2855 /* THINK
2856  * do we need to block DRBD_SIG if sock == &meta.socket ??
2857  * otherwise wake_asender() might interrupt some send_*Ack !
2858  */
2859                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2860                 if (rv == -EAGAIN) {
2861                         if (we_should_drop_the_connection(mdev, sock))
2862                                 break;
2863                         else
2864                                 continue;
2865                 }
2866                 D_ASSERT(rv != 0);
2867                 if (rv == -EINTR) {
2868                         flush_signals(current);
2869                         rv = 0;
2870                 }
2871                 if (rv < 0)
2872                         break;
2873                 sent += rv;
2874                 iov.iov_base += rv;
2875                 iov.iov_len  -= rv;
2876         } while (sent < size);
2877
2878         if (sock == mdev->data.socket)
2879                 clear_bit(NET_CONGESTED, &mdev->flags);
2880
2881         if (rv <= 0) {
2882                 if (rv != -EAGAIN) {
2883                         dev_err(DEV, "%s_sendmsg returned %d\n",
2884                             sock == mdev->meta.socket ? "msock" : "sock",
2885                             rv);
2886                         drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2887                 } else
2888                         drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2889         }
2890
2891         return sent;
2892 }
2893
2894 static int drbd_open(struct block_device *bdev, fmode_t mode)
2895 {
2896         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2897         unsigned long flags;
2898         int rv = 0;
2899
2900         mutex_lock(&drbd_main_mutex);
2901         spin_lock_irqsave(&mdev->req_lock, flags);
2902         /* to have a stable mdev->state.role
2903          * and no race with updating open_cnt */
2904
2905         if (mdev->state.role != R_PRIMARY) {
2906                 if (mode & FMODE_WRITE)
2907                         rv = -EROFS;
2908                 else if (!allow_oos)
2909                         rv = -EMEDIUMTYPE;
2910         }
2911
2912         if (!rv)
2913                 mdev->open_cnt++;
2914         spin_unlock_irqrestore(&mdev->req_lock, flags);
2915         mutex_unlock(&drbd_main_mutex);
2916
2917         return rv;
2918 }
2919
2920 static int drbd_release(struct gendisk *gd, fmode_t mode)
2921 {
2922         struct drbd_conf *mdev = gd->private_data;
2923         mutex_lock(&drbd_main_mutex);
2924         mdev->open_cnt--;
2925         mutex_unlock(&drbd_main_mutex);
2926         return 0;
2927 }
2928
2929 static void drbd_set_defaults(struct drbd_conf *mdev)
2930 {
2931         /* This way we get a compile error when sync_conf grows,
2932            and forget to initialize it here */
2933         mdev->sync_conf = (struct syncer_conf) {
2934                 /* .rate = */           DRBD_RATE_DEF,
2935                 /* .after = */          DRBD_AFTER_DEF,
2936                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
2937                 /* .verify_alg = */     {}, 0,
2938                 /* .cpu_mask = */       {}, 0,
2939                 /* .csums_alg = */      {}, 0,
2940                 /* .use_rle = */        0,
2941                 /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
2942                 /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
2943                 /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
2944                 /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
2945                 /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
2946                 /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
2947         };
2948
2949         /* We have to initialize it this way, because the layout differs
2950            between big endian and little endian machines */
2951         mdev->state = (union drbd_state) {
2952                 { .role = R_SECONDARY,
2953                   .peer = R_UNKNOWN,
2954                   .conn = C_STANDALONE,
2955                   .disk = D_DISKLESS,
2956                   .pdsk = D_UNKNOWN,
2957                   .susp = 0,
2958                   .susp_nod = 0,
2959                   .susp_fen = 0
2960                 } };
2961 }
2962
2963 void drbd_init_set_defaults(struct drbd_conf *mdev)
2964 {
2965         /* the memset(,0,) did most of this.
2966          * note: only assignments, no allocation in here */
2967
2968         drbd_set_defaults(mdev);
2969
2970         atomic_set(&mdev->ap_bio_cnt, 0);
2971         atomic_set(&mdev->ap_pending_cnt, 0);
2972         atomic_set(&mdev->rs_pending_cnt, 0);
2973         atomic_set(&mdev->unacked_cnt, 0);
2974         atomic_set(&mdev->local_cnt, 0);
2975         atomic_set(&mdev->packet_seq, 0);
2976         atomic_set(&mdev->pp_in_use, 0);
2977         atomic_set(&mdev->pp_in_use_by_net, 0);
2978         atomic_set(&mdev->rs_sect_in, 0);
2979         atomic_set(&mdev->rs_sect_ev, 0);
2980         atomic_set(&mdev->ap_in_flight, 0);
2981
2982         mutex_init(&mdev->md_io_mutex);
2983         mutex_init(&mdev->data.mutex);
2984         mutex_init(&mdev->meta.mutex);
2985         sema_init(&mdev->data.work.s, 0);
2986         sema_init(&mdev->meta.work.s, 0);
2987         mutex_init(&mdev->state_mutex);
2988
2989         spin_lock_init(&mdev->data.work.q_lock);
2990         spin_lock_init(&mdev->meta.work.q_lock);
2991
2992         spin_lock_init(&mdev->al_lock);
2993         spin_lock_init(&mdev->req_lock);
2994         spin_lock_init(&mdev->peer_seq_lock);
2995         spin_lock_init(&mdev->epoch_lock);
2996
2997         INIT_LIST_HEAD(&mdev->active_ee);
2998         INIT_LIST_HEAD(&mdev->sync_ee);
2999         INIT_LIST_HEAD(&mdev->done_ee);
3000         INIT_LIST_HEAD(&mdev->read_ee);
3001         INIT_LIST_HEAD(&mdev->net_ee);
3002         INIT_LIST_HEAD(&mdev->resync_reads);
3003         INIT_LIST_HEAD(&mdev->data.work.q);
3004         INIT_LIST_HEAD(&mdev->meta.work.q);
3005         INIT_LIST_HEAD(&mdev->resync_work.list);
3006         INIT_LIST_HEAD(&mdev->unplug_work.list);
3007         INIT_LIST_HEAD(&mdev->go_diskless.list);
3008         INIT_LIST_HEAD(&mdev->md_sync_work.list);
3009         INIT_LIST_HEAD(&mdev->start_resync_work.list);
3010         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
3011
3012         mdev->resync_work.cb  = w_resync_timer;
3013         mdev->unplug_work.cb  = w_send_write_hint;
3014         mdev->go_diskless.cb  = w_go_diskless;
3015         mdev->md_sync_work.cb = w_md_sync;
3016         mdev->bm_io_work.w.cb = w_bitmap_io;
3017         mdev->start_resync_work.cb = w_start_resync;
3018         init_timer(&mdev->resync_timer);
3019         init_timer(&mdev->md_sync_timer);
3020         init_timer(&mdev->start_resync_timer);
3021         init_timer(&mdev->request_timer);
3022         mdev->resync_timer.function = resync_timer_fn;
3023         mdev->resync_timer.data = (unsigned long) mdev;
3024         mdev->md_sync_timer.function = md_sync_timer_fn;
3025         mdev->md_sync_timer.data = (unsigned long) mdev;
3026         mdev->start_resync_timer.function = start_resync_timer_fn;
3027         mdev->start_resync_timer.data = (unsigned long) mdev;
3028         mdev->request_timer.function = request_timer_fn;
3029         mdev->request_timer.data = (unsigned long) mdev;
3030
3031         init_waitqueue_head(&mdev->misc_wait);
3032         init_waitqueue_head(&mdev->state_wait);
3033         init_waitqueue_head(&mdev->ee_wait);
3034         init_waitqueue_head(&mdev->al_wait);
3035         init_waitqueue_head(&mdev->seq_wait);
3036
3037         drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
3038         drbd_thread_init(mdev, &mdev->worker, drbd_worker);
3039         drbd_thread_init(mdev, &mdev->asender, drbd_asender);
3040
3041         mdev->agreed_pro_version = PRO_VERSION_MAX;
3042         mdev->write_ordering = WO_bdev_flush;
3043         mdev->resync_wenr = LC_FREE;
3044         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3045         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3046 }
3047
3048 void drbd_mdev_cleanup(struct drbd_conf *mdev)
3049 {
3050         int i;
3051         if (mdev->receiver.t_state != NONE)
3052                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
3053                                 mdev->receiver.t_state);
3054
3055         /* no need to lock it, I'm the only thread alive */
3056         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
3057                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
3058         mdev->al_writ_cnt  =
3059         mdev->bm_writ_cnt  =
3060         mdev->read_cnt     =
3061         mdev->recv_cnt     =
3062         mdev->send_cnt     =
3063         mdev->writ_cnt     =
3064         mdev->p_size       =
3065         mdev->rs_start     =
3066         mdev->rs_total     =
3067         mdev->rs_failed    = 0;
3068         mdev->rs_last_events = 0;
3069         mdev->rs_last_sect_ev = 0;
3070         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
3071                 mdev->rs_mark_left[i] = 0;
3072                 mdev->rs_mark_time[i] = 0;
3073         }
3074         D_ASSERT(mdev->tconn->net_conf == NULL);
3075
3076         drbd_set_my_capacity(mdev, 0);
3077         if (mdev->bitmap) {
3078                 /* maybe never allocated. */
3079                 drbd_bm_resize(mdev, 0, 1);
3080                 drbd_bm_cleanup(mdev);
3081         }
3082
3083         drbd_free_resources(mdev);
3084         clear_bit(AL_SUSPENDED, &mdev->flags);
3085
3086         /*
3087          * currently we call drbd_init_ee only on module load, so
3088          * we may call drbd_release_ee only on module unload!
3089          */
3090         D_ASSERT(list_empty(&mdev->active_ee));
3091         D_ASSERT(list_empty(&mdev->sync_ee));
3092         D_ASSERT(list_empty(&mdev->done_ee));
3093         D_ASSERT(list_empty(&mdev->read_ee));
3094         D_ASSERT(list_empty(&mdev->net_ee));
3095         D_ASSERT(list_empty(&mdev->resync_reads));
3096         D_ASSERT(list_empty(&mdev->data.work.q));
3097         D_ASSERT(list_empty(&mdev->meta.work.q));
3098         D_ASSERT(list_empty(&mdev->resync_work.list));
3099         D_ASSERT(list_empty(&mdev->unplug_work.list));
3100         D_ASSERT(list_empty(&mdev->go_diskless.list));
3101
3102         drbd_set_defaults(mdev);
3103 }
3104
3105
3106 static void drbd_destroy_mempools(void)
3107 {
3108         struct page *page;
3109
3110         while (drbd_pp_pool) {
3111                 page = drbd_pp_pool;
3112                 drbd_pp_pool = (struct page *)page_private(page);
3113                 __free_page(page);
3114                 drbd_pp_vacant--;
3115         }
3116
3117         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
3118
3119         if (drbd_ee_mempool)
3120                 mempool_destroy(drbd_ee_mempool);
3121         if (drbd_request_mempool)
3122                 mempool_destroy(drbd_request_mempool);
3123         if (drbd_ee_cache)
3124                 kmem_cache_destroy(drbd_ee_cache);
3125         if (drbd_request_cache)
3126                 kmem_cache_destroy(drbd_request_cache);
3127         if (drbd_bm_ext_cache)
3128                 kmem_cache_destroy(drbd_bm_ext_cache);
3129         if (drbd_al_ext_cache)
3130                 kmem_cache_destroy(drbd_al_ext_cache);
3131
3132         drbd_ee_mempool      = NULL;
3133         drbd_request_mempool = NULL;
3134         drbd_ee_cache        = NULL;
3135         drbd_request_cache   = NULL;
3136         drbd_bm_ext_cache    = NULL;
3137         drbd_al_ext_cache    = NULL;
3138
3139         return;
3140 }
3141
3142 static int drbd_create_mempools(void)
3143 {
3144         struct page *page;
3145         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
3146         int i;
3147
3148         /* prepare our caches and mempools */
3149         drbd_request_mempool = NULL;
3150         drbd_ee_cache        = NULL;
3151         drbd_request_cache   = NULL;
3152         drbd_bm_ext_cache    = NULL;
3153         drbd_al_ext_cache    = NULL;
3154         drbd_pp_pool         = NULL;
3155
3156         /* caches */
3157         drbd_request_cache = kmem_cache_create(
3158                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
3159         if (drbd_request_cache == NULL)
3160                 goto Enomem;
3161
3162         drbd_ee_cache = kmem_cache_create(
3163                 "drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
3164         if (drbd_ee_cache == NULL)
3165                 goto Enomem;
3166
3167         drbd_bm_ext_cache = kmem_cache_create(
3168                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
3169         if (drbd_bm_ext_cache == NULL)
3170                 goto Enomem;
3171
3172         drbd_al_ext_cache = kmem_cache_create(
3173                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
3174         if (drbd_al_ext_cache == NULL)
3175                 goto Enomem;
3176
3177         /* mempools */
3178         drbd_request_mempool = mempool_create(number,
3179                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
3180         if (drbd_request_mempool == NULL)
3181                 goto Enomem;
3182
3183         drbd_ee_mempool = mempool_create(number,
3184                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
3185         if (drbd_ee_mempool == NULL)
3186                 goto Enomem;
3187
3188         /* drbd's page pool */
3189         spin_lock_init(&drbd_pp_lock);
3190
3191         for (i = 0; i < number; i++) {
3192                 page = alloc_page(GFP_HIGHUSER);
3193                 if (!page)
3194                         goto Enomem;
3195                 set_page_private(page, (unsigned long)drbd_pp_pool);
3196                 drbd_pp_pool = page;
3197         }
3198         drbd_pp_vacant = number;
3199
3200         return 0;
3201
3202 Enomem:
3203         drbd_destroy_mempools(); /* in case we allocated some */
3204         return -ENOMEM;
3205 }
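
/*
 * The page pool built above is a plain LIFO: pages are chained through
 * their page_private field, guarded by drbd_pp_lock.  A hedged sketch of
 * the matching "pop" (the real allocation path lives in the receiver code,
 * not in this file, and its locking details may differ):
 *
 *	spin_lock(&drbd_pp_lock);
 *	page = drbd_pp_pool;
 *	if (page) {
 *		drbd_pp_pool = (struct page *)page_private(page);
 *		set_page_private(page, 0);
 *		drbd_pp_vacant--;
 *	}
 *	spin_unlock(&drbd_pp_lock);
 */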
3206
3207 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
3208         void *unused)
3209 {
3210         /* just so we have it.  you never know what interesting things we
3211          * might want to do here some day...
3212          */
3213
3214         return NOTIFY_DONE;
3215 }
3216
3217 static struct notifier_block drbd_notifier = {
3218         .notifier_call = drbd_notify_sys,
3219 };
3220
3221 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3222 {
3223         int rr;
3224
3225         rr = drbd_release_ee(mdev, &mdev->active_ee);
3226         if (rr)
3227                 dev_err(DEV, "%d EEs in active list found!\n", rr);
3228
3229         rr = drbd_release_ee(mdev, &mdev->sync_ee);
3230         if (rr)
3231                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
3232
3233         rr = drbd_release_ee(mdev, &mdev->read_ee);
3234         if (rr)
3235                 dev_err(DEV, "%d EEs in read list found!\n", rr);
3236
3237         rr = drbd_release_ee(mdev, &mdev->done_ee);
3238         if (rr)
3239                 dev_err(DEV, "%d EEs in done list found!\n", rr);
3240
3241         rr = drbd_release_ee(mdev, &mdev->net_ee);
3242         if (rr)
3243                 dev_err(DEV, "%d EEs in net list found!\n", rr);
3244 }
3245
3246 /* caution. no locking.
3247  * currently only used from module cleanup code. */
3248 static void drbd_delete_device(unsigned int minor)
3249 {
3250         struct drbd_conf *mdev = minor_to_mdev(minor);
3251
3252         if (!mdev)
3253                 return;
3254
3255         /* paranoia asserts */
3256         D_ASSERT(mdev->open_cnt == 0);
3257         D_ASSERT(list_empty(&mdev->data.work.q));
3258         /* end paranoia asserts */
3259
3260         del_gendisk(mdev->vdisk);
3261
3262         /* cleanup stuff that may have been allocated during
3263          * device (re-)configuration or state changes */
3264
3265         if (mdev->this_bdev)
3266                 bdput(mdev->this_bdev);
3267
3268         drbd_free_resources(mdev);
3269         drbd_free_tconn(mdev->tconn);
3270
3271         drbd_release_ee_lists(mdev);
3272
3273         lc_destroy(mdev->act_log);
3274         lc_destroy(mdev->resync);
3275
3276         kfree(mdev->p_uuid);
3277         /* mdev->p_uuid = NULL; */
3278
3279         kfree(mdev->int_dig_out);
3280         kfree(mdev->int_dig_in);
3281         kfree(mdev->int_dig_vv);
3282
3283         /* cleanup the rest that has been
3284          * allocated from drbd_new_device
3285          * and actually free the mdev itself */
3286         drbd_free_mdev(mdev);
3287 }
3288
3289 static void drbd_cleanup(void)
3290 {
3291         unsigned int i;
3292
3293         unregister_reboot_notifier(&drbd_notifier);
3294
3295         /* first remove proc,
3296          * drbdsetup uses its presence to detect
3297          * whether DRBD is loaded.
3298          * If we were to get stuck in proc removal,
3299          * but had netlink already deregistered,
3300          * some drbdsetup commands might wait forever
3301          * for an answer.
3302          */
3303         if (drbd_proc)
3304                 remove_proc_entry("drbd", NULL);
3305
3306         drbd_nl_cleanup();
3307
3308         if (minor_table) {
3309                 i = minor_count;
3310                 while (i--)
3311                         drbd_delete_device(i);
3312                 drbd_destroy_mempools();
3313         }
3314
3315         kfree(minor_table);
3316
3317         unregister_blkdev(DRBD_MAJOR, "drbd");
3318
3319         printk(KERN_INFO "drbd: module cleanup done.\n");
3320 }
3321
3322 /**
3323  * drbd_congested() - Callback for pdflush
3324  * @congested_data:     User data
3325  * @bdi_bits:           Bits pdflush is currently interested in
3326  *
3327  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3328  */
3329 static int drbd_congested(void *congested_data, int bdi_bits)
3330 {
3331         struct drbd_conf *mdev = congested_data;
3332         struct request_queue *q;
3333         char reason = '-';
3334         int r = 0;
3335
3336         if (!may_inc_ap_bio(mdev)) {
3337                 /* DRBD has frozen IO */
3338                 r = bdi_bits;
3339                 reason = 'd';
3340                 goto out;
3341         }
3342
3343         if (get_ldev(mdev)) {
3344                 q = bdev_get_queue(mdev->ldev->backing_bdev);
3345                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
3346                 put_ldev(mdev);
3347                 if (r)
3348                         reason = 'b';
3349         }
3350
3351         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3352                 r |= (1 << BDI_async_congested);
3353                 reason = reason == 'b' ? 'a' : 'n';
3354         }
3355
3356 out:
3357         mdev->congestion_reason = reason;
3358         return r;
3359 }
3360
3361 struct drbd_tconn *drbd_new_tconn(char *name)
3362 {
3363         struct drbd_tconn *tconn;
3364
3365         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
3366         if (!tconn)
3367                 return NULL;
3368
3369         tconn->name = kstrdup(name, GFP_KERNEL);
3370         if (!tconn->name)
3371                 goto fail;
3372
3373         atomic_set(&tconn->net_cnt, 0);
3374         init_waitqueue_head(&tconn->net_cnt_wait);
3375
3376         write_lock_irq(&global_state_lock);
3377         list_add(&tconn->all_tconn, &drbd_tconns);
3378         write_unlock_irq(&global_state_lock);
3379
3380         return tconn;
3381
3382 fail:
3383         kfree(tconn->name);
3384         kfree(tconn);
3385
3386         return NULL;
3387 }
3388
3389 void drbd_free_tconn(struct drbd_tconn *tconn)
3390 {
3391         write_lock_irq(&global_state_lock);
3392         list_del(&tconn->all_tconn);
3393         write_unlock_irq(&global_state_lock);
3394
3395         kfree(tconn->name);
3396         kfree(tconn);
3397 }
3398
3399 struct drbd_conf *drbd_new_device(unsigned int minor)
3400 {
3401         struct drbd_conf *mdev;
3402         struct gendisk *disk;
3403         struct request_queue *q;
3404
3405         /* GFP_KERNEL, we are outside of all write-out paths */
3406         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3407         if (!mdev)
3408                 return NULL;
3409         mdev->tconn = drbd_new_tconn("dummy");
3410         if (!mdev->tconn)
3411                 goto out_no_tconn;
3412
3413         if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3414                 goto out_no_cpumask;
3415
3416         mdev->tconn->volume0 = mdev;
3417         mdev->minor = minor;
3418
3419         drbd_init_set_defaults(mdev);
3420
3421         q = blk_alloc_queue(GFP_KERNEL);
3422         if (!q)
3423                 goto out_no_q;
3424         mdev->rq_queue = q;
3425         q->queuedata   = mdev;
3426
3427         disk = alloc_disk(1);
3428         if (!disk)
3429                 goto out_no_disk;
3430         mdev->vdisk = disk;
3431
3432         set_disk_ro(disk, true);
3433
3434         disk->queue = q;
3435         disk->major = DRBD_MAJOR;
3436         disk->first_minor = minor;
3437         disk->fops = &drbd_ops;
3438         sprintf(disk->disk_name, "drbd%d", minor);
3439         disk->private_data = mdev;
3440
3441         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3442         /* we have no partitions. we contain only ourselves. */
3443         mdev->this_bdev->bd_contains = mdev->this_bdev;
3444
3445         q->backing_dev_info.congested_fn = drbd_congested;
3446         q->backing_dev_info.congested_data = mdev;
3447
3448         blk_queue_make_request(q, drbd_make_request);
3449         /* Setting the max_hw_sectors to an odd value of 8kibyte here;
3450            this triggers a max_bio_size message upon first attach or connect */
3451         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
3452         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3453         blk_queue_merge_bvec(q, drbd_merge_bvec);
3454         q->queue_lock = &mdev->req_lock;
3455
3456         mdev->md_io_page = alloc_page(GFP_KERNEL);
3457         if (!mdev->md_io_page)
3458                 goto out_no_io_page;
3459
3460         if (drbd_bm_init(mdev))
3461                 goto out_no_bitmap;
3462         /* no need to lock access, we are still initializing this minor device. */
3463         if (!tl_init(mdev))
3464                 goto out_no_tl;
3465         mdev->read_requests = RB_ROOT;
3466         mdev->write_requests = RB_ROOT;
3467         mdev->epoch_entries = RB_ROOT;
3468
3469         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3470         if (!mdev->current_epoch)
3471                 goto out_no_epoch;
3472
3473         INIT_LIST_HEAD(&mdev->current_epoch->list);
3474         mdev->epochs = 1;
3475
3476         return mdev;
3477
3478 /* out_whatever_else:
3479         kfree(mdev->current_epoch); */
3480 out_no_epoch:
3481         tl_cleanup(mdev);
3482 out_no_tl:
3483         drbd_bm_cleanup(mdev);
3484 out_no_bitmap:
3485         __free_page(mdev->md_io_page);
3486 out_no_io_page:
3487         put_disk(disk);
3488 out_no_disk:
3489         blk_cleanup_queue(q);
3490 out_no_q:
3491         free_cpumask_var(mdev->cpu_mask);
3492 out_no_cpumask:
3493         drbd_free_tconn(mdev->tconn);
3494 out_no_tconn:
3495         kfree(mdev);
3496         return NULL;
3497 }
3498
3499 /* counterpart of drbd_new_device.
3500  * last part of drbd_delete_device. */
3501 void drbd_free_mdev(struct drbd_conf *mdev)
3502 {
3503         kfree(mdev->current_epoch);
3504         tl_cleanup(mdev);
3505         if (mdev->bitmap) /* should no longer be there. */
3506                 drbd_bm_cleanup(mdev);
3507         __free_page(mdev->md_io_page);
3508         put_disk(mdev->vdisk);
3509         blk_cleanup_queue(mdev->rq_queue);
3510         free_cpumask_var(mdev->cpu_mask);
3511         kfree(mdev);
3512 }
3513
3514
3515 int __init drbd_init(void)
3516 {
3517         int err;
3518
3519         if (sizeof(struct p_handshake) != 80) {
3520                 printk(KERN_ERR
3521                        "drbd: never change the size or layout "
3522                        "of the HandShake packet.\n");
3523                 return -EINVAL;
3524         }
3525
3526         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
3527                 printk(KERN_ERR
3528                         "drbd: invalid minor_count (%d)\n", minor_count);
3529 #ifdef MODULE
3530                 return -EINVAL;
3531 #else
3532                 minor_count = 8;
3533 #endif
3534         }
3535
3536         err = drbd_nl_init();
3537         if (err)
3538                 return err;
3539
3540         err = register_blkdev(DRBD_MAJOR, "drbd");
3541         if (err) {
3542                 printk(KERN_ERR
3543                        "drbd: unable to register block device major %d\n",
3544                        DRBD_MAJOR);
3545                 return err;
3546         }
3547
3548         register_reboot_notifier(&drbd_notifier);
3549
3550         /*
3551          * allocate all necessary structs
3552          */
3553         err = -ENOMEM;
3554
3555         init_waitqueue_head(&drbd_pp_wait);
3556
3557         drbd_proc = NULL; /* play safe for drbd_cleanup */
3558         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3559                                 GFP_KERNEL);
3560         if (!minor_table)
3561                 goto Enomem;
3562
3563         err = drbd_create_mempools();
3564         if (err)
3565                 goto Enomem;
3566
3567         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3568         if (!drbd_proc) {
3569                 printk(KERN_ERR "drbd: unable to register proc file\n");
3570                 goto Enomem;
3571         }
3572
3573         rwlock_init(&global_state_lock);
3574         INIT_LIST_HEAD(&drbd_tconns);
3575
3576         printk(KERN_INFO "drbd: initialized. "
3577                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3578                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3579         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3580         printk(KERN_INFO "drbd: registered as block device major %d\n",
3581                 DRBD_MAJOR);
3582         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3583
3584         return 0; /* Success! */
3585
3586 Enomem:
3587         drbd_cleanup();
3588         if (err == -ENOMEM)
3589                 /* currently always the case */
3590                 printk(KERN_ERR "drbd: ran out of memory\n");
3591         else
3592                 printk(KERN_ERR "drbd: initialization failure\n");
3593         return err;
3594 }
3595
3596 void drbd_free_bc(struct drbd_backing_dev *ldev)
3597 {
3598         if (ldev == NULL)
3599                 return;
3600
3601         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3602         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3603
3604         kfree(ldev);
3605 }
3606
3607 void drbd_free_sock(struct drbd_conf *mdev)
3608 {
3609         if (mdev->data.socket) {
3610                 mutex_lock(&mdev->data.mutex);
3611                 kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3612                 sock_release(mdev->data.socket);
3613                 mdev->data.socket = NULL;
3614                 mutex_unlock(&mdev->data.mutex);
3615         }
3616         if (mdev->meta.socket) {
3617                 mutex_lock(&mdev->meta.mutex);
3618                 kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3619                 sock_release(mdev->meta.socket);
3620                 mdev->meta.socket = NULL;
3621                 mutex_unlock(&mdev->meta.mutex);
3622         }
3623 }
3624
3625
3626 void drbd_free_resources(struct drbd_conf *mdev)
3627 {
3628         crypto_free_hash(mdev->csums_tfm);
3629         mdev->csums_tfm = NULL;
3630         crypto_free_hash(mdev->verify_tfm);
3631         mdev->verify_tfm = NULL;
3632         crypto_free_hash(mdev->cram_hmac_tfm);
3633         mdev->cram_hmac_tfm = NULL;
3634         crypto_free_hash(mdev->integrity_w_tfm);
3635         mdev->integrity_w_tfm = NULL;
3636         crypto_free_hash(mdev->integrity_r_tfm);
3637         mdev->integrity_r_tfm = NULL;
3638
3639         drbd_free_sock(mdev);
3640
3641         __no_warn(local,
3642                   drbd_free_bc(mdev->ldev);
3643                   mdev->ldev = NULL;);
3644 }
3645
3646 /* meta data management */
3647
3648 struct meta_data_on_disk {
3649         u64 la_size;           /* last agreed size. */
3650         u64 uuid[UI_SIZE];   /* UUIDs. */
3651         u64 device_uuid;
3652         u64 reserved_u64_1;
3653         u32 flags;             /* MDF */
3654         u32 magic;
3655         u32 md_size_sect;
3656         u32 al_offset;         /* offset to this block */
3657         u32 al_nr_extents;     /* important for restoring the AL */
3658               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3659         u32 bm_offset;         /* offset to the bitmap, from here */
3660         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3661         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
3662         u32 reserved_u32[3];
3663
3664 } __packed;
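
/*
 * The super block is written out as a single 512 byte sector (see the
 * memset(, 0, 512) in drbd_md_sync() below), so the struct above must stay
 * well below that size.  A compile-time check one could add for this
 * assumption:
 *
 *	BUILD_BUG_ON(sizeof(struct meta_data_on_disk) > 512);
 */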
3665
3666 /**
3667  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3668  * @mdev:       DRBD device.
3669  */
3670 void drbd_md_sync(struct drbd_conf *mdev)
3671 {
3672         struct meta_data_on_disk *buffer;
3673         sector_t sector;
3674         int i;
3675
3676         del_timer(&mdev->md_sync_timer);
3677         /* timer may be rearmed by drbd_md_mark_dirty() now. */
3678         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3679                 return;
3680
3681         /* We use D_FAILED here, and not D_ATTACHING, because we try to write
3682          * metadata even if we detach due to a disk failure! */
3683         if (!get_ldev_if_state(mdev, D_FAILED))
3684                 return;
3685
3686         mutex_lock(&mdev->md_io_mutex);
3687         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3688         memset(buffer, 0, 512);
3689
3690         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3691         for (i = UI_CURRENT; i < UI_SIZE; i++)
3692                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3693         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3694         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3695
3696         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3697         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3698         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3699         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3700         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3701
3702         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3703         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
3704
3705         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3706         sector = mdev->ldev->md.md_offset;
3707
3708         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3709                 /* this was a try anyways ... */
3710                 dev_err(DEV, "meta data update failed!\n");
3711                 drbd_chk_io_error(mdev, 1, true);
3712         }
3713
3714         /* Update mdev->ldev->md.la_size_sect,
3715          * since we just wrote that value to the on-disk meta data. */
3716         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3717
3718         mutex_unlock(&mdev->md_io_mutex);
3719         put_ldev(mdev);
3720 }
3721
3722 /**
3723  * drbd_md_read() - Reads in the meta data super block
3724  * @mdev:       DRBD device.
3725  * @bdev:       Device from which the meta data should be read in.
3726  *
3727  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
3728  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3729  */
3730 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3731 {
3732         struct meta_data_on_disk *buffer;
3733         int i, rv = NO_ERROR;
3734
3735         if (!get_ldev_if_state(mdev, D_ATTACHING))
3736                 return ERR_IO_MD_DISK;
3737
3738         mutex_lock(&mdev->md_io_mutex);
3739         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3740
3741         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3742                 /* NOTE: can't do normal error processing here as this is
3743                    called BEFORE disk is attached */
3744                 dev_err(DEV, "Error while reading metadata.\n");
3745                 rv = ERR_IO_MD_DISK;
3746                 goto err;
3747         }
3748
3749         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
3750                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
3751                 rv = ERR_MD_INVALID;
3752                 goto err;
3753         }
3754         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3755                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3756                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3757                 rv = ERR_MD_INVALID;
3758                 goto err;
3759         }
3760         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3761                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3762                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3763                 rv = ERR_MD_INVALID;
3764                 goto err;
3765         }
3766         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3767                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3768                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3769                 rv = ERR_MD_INVALID;
3770                 goto err;
3771         }
3772
3773         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3774                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3775                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3776                 rv = ERR_MD_INVALID;
3777                 goto err;
3778         }
3779
3780         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3781         for (i = UI_CURRENT; i < UI_SIZE; i++)
3782                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3783         bdev->md.flags = be32_to_cpu(buffer->flags);
3784         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3785         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3786
3787         spin_lock_irq(&mdev->req_lock);
3788         if (mdev->state.conn < C_CONNECTED) {
3789                 int peer;
3790                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3791                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
3792                 mdev->peer_max_bio_size = peer;
3793         }
3794         spin_unlock_irq(&mdev->req_lock);
3795
3796         if (mdev->sync_conf.al_extents < 7)
3797                 mdev->sync_conf.al_extents = 127;
3798
3799  err:
3800         mutex_unlock(&mdev->md_io_mutex);
3801         put_ldev(mdev);
3802
3803         return rv;
3804 }
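
/*
 * Rough usage sketch, illustrative only: the attach code reads the meta
 * data from the prospective backing device ("bdev" here, as in the
 * kernel-doc above) before the disk state leaves D_ATTACHING, and bails
 * out on anything but NO_ERROR:
 *
 *	retcode = drbd_md_read(mdev, bdev);
 *	if (retcode != NO_ERROR)
 *		goto fail;	// hypothetical error label
 */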
3805
3806 /**
3807  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3808  * @mdev:       DRBD device.
3809  *
3810  * Call this function if you change anything that should be written to
3811  * the meta-data super block. This function sets MD_DIRTY, and starts a
3812  * timer that ensures drbd_md_sync() gets called within five seconds.
3813  */
3814 #ifdef DEBUG
3815 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3816 {
3817         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3818                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3819                 mdev->last_md_mark_dirty.line = line;
3820                 mdev->last_md_mark_dirty.func = func;
3821         }
3822 }
3823 #else
3824 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3825 {
3826         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3827                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3828 }
3829 #endif
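
/*
 * Typical pattern, illustrative only: whoever changes on-disk meta data
 * state marks it dirty and leaves the write-out to drbd_md_sync(), either
 * called explicitly or triggered via the md_sync_timer:
 *
 *	mdev->ldev->md.flags |= MDF_FULL_SYNC;	// any meta data change
 *	drbd_md_mark_dirty(mdev);
 *	...
 *	drbd_md_sync(mdev);	// writes the super block iff MD_DIRTY is set
 *
 * which is essentially the sequence drbd_bmio_set_n_write() below uses
 * via drbd_md_set_flag().
 */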
3830
3831 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3832 {
3833         int i;
3834
3835         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3836                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3837 }
3838
3839 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3840 {
3841         if (idx == UI_CURRENT) {
3842                 if (mdev->state.role == R_PRIMARY)
3843                         val |= 1;
3844                 else
3845                         val &= ~((u64)1);
3846
3847                 drbd_set_ed_uuid(mdev, val);
3848         }
3849
3850         mdev->ldev->md.uuid[idx] = val;
3851         drbd_md_mark_dirty(mdev);
3852 }
3853
3854
3855 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3856 {
3857         if (mdev->ldev->md.uuid[idx]) {
3858                 drbd_uuid_move_history(mdev);
3859                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3860         }
3861         _drbd_uuid_set(mdev, idx, val);
3862 }
3863
3864 /**
3865  * drbd_uuid_new_current() - Creates a new current UUID
3866  * @mdev:       DRBD device.
3867  *
3868  * Creates a new current UUID, and rotates the old current UUID into
3869  * the bitmap slot. Causes an incremental resync upon next connect.
3870  */
3871 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3872 {
3873         u64 val;
3874         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3875
3876         if (bm_uuid)
3877                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3878
3879         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3880
3881         get_random_bytes(&val, sizeof(u64));
3882         _drbd_uuid_set(mdev, UI_CURRENT, val);
3883         drbd_print_uuids(mdev, "new current UUID");
3884         /* get it to stable storage _now_ */
3885         drbd_md_sync(mdev);
3886 }
3887
3888 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3889 {
3890         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3891                 return;
3892
3893         if (val == 0) {
3894                 drbd_uuid_move_history(mdev);
3895                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3896                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3897         } else {
3898                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3899                 if (bm_uuid)
3900                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3901
3902                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3903         }
3904         drbd_md_mark_dirty(mdev);
3905 }
3906
3907 /**
3908  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3909  * @mdev:       DRBD device.
3910  *
3911  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3912  */
3913 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3914 {
3915         int rv = -EIO;
3916
3917         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3918                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3919                 drbd_md_sync(mdev);
3920                 drbd_bm_set_all(mdev);
3921
3922                 rv = drbd_bm_write(mdev);
3923
3924                 if (!rv) {
3925                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3926                         drbd_md_sync(mdev);
3927                 }
3928
3929                 put_ldev(mdev);
3930         }
3931
3932         return rv;
3933 }
3934
3935 /**
3936  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3937  * @mdev:       DRBD device.
3938  *
3939  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3940  */
3941 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3942 {
3943         int rv = -EIO;
3944
3945         drbd_resume_al(mdev);
3946         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3947                 drbd_bm_clear_all(mdev);
3948                 rv = drbd_bm_write(mdev);
3949                 put_ldev(mdev);
3950         }
3951
3952         return rv;
3953 }
3954
3955 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3956 {
3957         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3958         int rv = -EIO;
3959
3960         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3961
3962         if (get_ldev(mdev)) {
3963                 drbd_bm_lock(mdev, work->why, work->flags);
3964                 rv = work->io_fn(mdev);
3965                 drbd_bm_unlock(mdev);
3966                 put_ldev(mdev);
3967         }
3968
3969         clear_bit(BITMAP_IO, &mdev->flags);
3970         smp_mb__after_clear_bit();
3971         wake_up(&mdev->misc_wait);
3972
3973         if (work->done)
3974                 work->done(mdev, rv);
3975
3976         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3977         work->why = NULL;
3978         work->flags = 0;
3979
3980         return 1;
3981 }
3982
3983 void drbd_ldev_destroy(struct drbd_conf *mdev)
3984 {
3985         lc_destroy(mdev->resync);
3986         mdev->resync = NULL;
3987         lc_destroy(mdev->act_log);
3988         mdev->act_log = NULL;
3989         __no_warn(local,
3990                 drbd_free_bc(mdev->ldev);
3991                 mdev->ldev = NULL;);
3992
3993         if (mdev->md_io_tmpp) {
3994                 __free_page(mdev->md_io_tmpp);
3995                 mdev->md_io_tmpp = NULL;
3996         }
3997         clear_bit(GO_DISKLESS, &mdev->flags);
3998 }
3999
4000 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4001 {
4002         D_ASSERT(mdev->state.disk == D_FAILED);
4003         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
4004          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
4005          * the protected members anymore, though, so once put_ldev reaches zero
4006          * again, it will be safe to free them. */
4007         drbd_force_state(mdev, NS(disk, D_DISKLESS));
4008         return 1;
4009 }
4010
4011 void drbd_go_diskless(struct drbd_conf *mdev)
4012 {
4013         D_ASSERT(mdev->state.disk == D_FAILED);
4014         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
4015                 drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
4016 }
4017
4018 /**
4019  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
4020  * @mdev:       DRBD device.
4021  * @io_fn:      IO callback to be called when bitmap IO is possible
4022  * @done:       callback to be called after the bitmap IO was performed
4023  * @why:        Descriptive text of the reason for doing the IO
4024  *
4025  * While IO on the bitmap is in progress, we freeze application IO, thus
4026  * ensuring that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
4027  * called from worker context. It MUST NOT be used while a previous such
4028  * work is still pending!
4029  */
4030 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
4031                           int (*io_fn)(struct drbd_conf *),
4032                           void (*done)(struct drbd_conf *, int),
4033                           char *why, enum bm_flag flags)
4034 {
4035         D_ASSERT(current == mdev->worker.task);
4036
4037         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
4038         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
4039         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
4040         if (mdev->bm_io_work.why)
4041                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
4042                         why, mdev->bm_io_work.why);
4043
4044         mdev->bm_io_work.io_fn = io_fn;
4045         mdev->bm_io_work.done = done;
4046         mdev->bm_io_work.why = why;
4047         mdev->bm_io_work.flags = flags;
4048
4049         spin_lock_irq(&mdev->req_lock);
4050         set_bit(BITMAP_IO, &mdev->flags);
4051         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
4052                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
4053                         drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
4054         }
4055         spin_unlock_irq(&mdev->req_lock);
4056 }
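
/*
 * Illustrative call from worker context, using helpers defined in this
 * file (the "why" string and the flag choice are examples only):
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
 *			     "set_n_write (example)", BM_LOCKED_SET_ALLOWED);
 *
 * The actual bitmap IO then runs later from w_bitmap_io(), with new
 * application IO held off via the BITMAP_IO flag until it completes.
 */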
4057
4058 /**
4059  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
4060  * @mdev:       DRBD device.
4061  * @io_fn:      IO callback to be called when bitmap IO is possible
4062  * @why:        Descriptive text of the reason for doing the IO
4063  *
4064  * Freezes application IO while the actual IO operation runs. This
4065  * function MAY NOT be called from worker context.
4066  */
4067 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
4068                 char *why, enum bm_flag flags)
4069 {
4070         int rv;
4071
4072         D_ASSERT(current != mdev->worker.task);
4073
4074         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4075                 drbd_suspend_io(mdev);
4076
4077         drbd_bm_lock(mdev, why, flags);
4078         rv = io_fn(mdev);
4079         drbd_bm_unlock(mdev);
4080
4081         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4082                 drbd_resume_io(mdev);
4083
4084         return rv;
4085 }
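
/*
 * Synchronous counterpart, illustrative only.  With BM_LOCKED_SET_ALLOWED
 * (the flag tested above) application IO is not suspended around the
 * operation; without it, drbd_suspend_io()/drbd_resume_io() bracket it:
 *
 *	rv = drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
 *			    "clear_n_write (example)", BM_LOCKED_SET_ALLOWED);
 */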
4086
4087 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4088 {
4089         if ((mdev->ldev->md.flags & flag) != flag) {
4090                 drbd_md_mark_dirty(mdev);
4091                 mdev->ldev->md.flags |= flag;
4092         }
4093 }
4094
4095 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4096 {
4097         if ((mdev->ldev->md.flags & flag) != 0) {
4098                 drbd_md_mark_dirty(mdev);
4099                 mdev->ldev->md.flags &= ~flag;
4100         }
4101 }
4102 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
4103 {
4104         return (bdev->md.flags & flag) != 0;
4105 }
4106
4107 static void md_sync_timer_fn(unsigned long data)
4108 {
4109         struct drbd_conf *mdev = (struct drbd_conf *) data;
4110
4111         drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
4112 }
4113
4114 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4115 {
4116         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
4117 #ifdef DEBUG
4118         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
4119                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
4120 #endif
4121         drbd_md_sync(mdev);
4122         return 1;
4123 }
4124
4125 #ifdef CONFIG_DRBD_FAULT_INJECTION
4126 /* Fault insertion support including random number generator shamelessly
4127  * stolen from kernel/rcutorture.c */
4128 struct fault_random_state {
4129         unsigned long state;
4130         unsigned long count;
4131 };
4132
4133 #define FAULT_RANDOM_MULT 39916801  /* prime */
4134 #define FAULT_RANDOM_ADD        479001701 /* prime */
4135 #define FAULT_RANDOM_REFRESH 10000
4136
4137 /*
4138  * Crude but fast random-number generator.  Uses a linear congruential
4139  * generator, with occasional help from get_random_bytes().
4140  */
4141 static unsigned long
4142 _drbd_fault_random(struct fault_random_state *rsp)
4143 {
4144         long refresh;
4145
4146         if (!rsp->count--) {
4147                 get_random_bytes(&refresh, sizeof(refresh));
4148                 rsp->state += refresh;
4149                 rsp->count = FAULT_RANDOM_REFRESH;
4150         }
4151         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
4152         return swahw32(rsp->state);
4153 }
4154
4155 static char *
4156 _drbd_fault_str(unsigned int type) {
4157         static char *_faults[] = {
4158                 [DRBD_FAULT_MD_WR] = "Meta-data write",
4159                 [DRBD_FAULT_MD_RD] = "Meta-data read",
4160                 [DRBD_FAULT_RS_WR] = "Resync write",
4161                 [DRBD_FAULT_RS_RD] = "Resync read",
4162                 [DRBD_FAULT_DT_WR] = "Data write",
4163                 [DRBD_FAULT_DT_RD] = "Data read",
4164                 [DRBD_FAULT_DT_RA] = "Data read ahead",
4165                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
4166                 [DRBD_FAULT_AL_EE] = "EE allocation",
4167                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
4168         };
4169
4170         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
4171 }
4172
4173 unsigned int
4174 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
4175 {
4176         static struct fault_random_state rrs = {0, 0};
4177
4178         unsigned int ret = (
4179                 (fault_devs == 0 ||
4180                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
4181                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
4182
4183         if (ret) {
4184                 fault_count++;
4185
4186                 if (__ratelimit(&drbd_ratelimit_state))
4187                         dev_warn(DEV, "***Simulating %s failure\n",
4188                                 _drbd_fault_str(type));
4189         }
4190
4191         return ret;
4192 }
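
/*
 * Illustrative only: an IO path compiled with CONFIG_DRBD_FAULT_INJECTION
 * would gate the fault roughly like this, using one of the fault types
 * listed in _drbd_fault_str() above:
 *
 *	if (_drbd_insert_fault(mdev, DRBD_FAULT_DT_WR))
 *		bio_endio(bio, -EIO);	// pretend the write failed
 *	else
 *		submit_bio(rw, bio);	// rw/bio: hypothetical locals
 *
 * fault_rate is a percentage: (random % 100) + 1 <= fault_rate makes
 * roughly fault_rate out of every 100 calls inject a failure.
 */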
4193 #endif
4194
4195 const char *drbd_buildtag(void)
4196 {
4197         /* DRBD built from external sources carries a reference to the
4198            git hash of the source code here. */
4199
4200         static char buildtag[38] = "\0uilt-in";
4201
4202         if (buildtag[0] == 0) {
4203 #ifdef CONFIG_MODULES
4204                 if (THIS_MODULE != NULL)
4205                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
4206                 else
4207 #endif
4208                         buildtag[0] = 'b';
4209         }
4210
4211         return buildtag;
4212 }
4213
4214 module_init(drbd_init)
4215 module_exit(drbd_cleanup)
4216
4217 EXPORT_SYMBOL(drbd_conn_str);
4218 EXPORT_SYMBOL(drbd_role_str);
4219 EXPORT_SYMBOL(drbd_disk_str);
4220 EXPORT_SYMBOL(drbd_set_st_err_str);