1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 struct after_state_chg_work {
60         struct drbd_work w;
61         union drbd_state os;
62         union drbd_state ns;
63         enum chg_state_flags flags;
64         struct completion *done;
65 };
66
67 static DEFINE_MUTEX(drbd_main_mutex);
68 int drbdd_init(struct drbd_thread *);
69 int drbd_worker(struct drbd_thread *);
70 int drbd_asender(struct drbd_thread *);
71
72 int drbd_init(void);
73 static int drbd_open(struct block_device *bdev, fmode_t mode);
74 static int drbd_release(struct gendisk *gd, fmode_t mode);
75 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
76 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
77                            union drbd_state ns, enum chg_state_flags flags);
78 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
79 static void md_sync_timer_fn(unsigned long data);
80 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
82
83 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
84               "Lars Ellenberg <lars@linbit.com>");
85 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
86 MODULE_VERSION(REL_VERSION);
87 MODULE_LICENSE("GPL");
88 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices ("
89                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
90 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
91
92 #include <linux/moduleparam.h>
93 /* allow_open_on_secondary */
94 MODULE_PARM_DESC(allow_oos, "DONT USE!");
95 /* thanks to these macros, if compiled into the kernel (not as a module),
96  * this becomes the boot parameter drbd.minor_count */
97 module_param(minor_count, uint, 0444);
98 module_param(disable_sendpage, bool, 0644);
99 module_param(allow_oos, bool, 0);
100 module_param(cn_idx, uint, 0444);
101 module_param(proc_details, int, 0644);
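/*
 * Illustrative examples (editorial sketch, not part of the driver): the
 * parameters declared above are set either at module load time or, for a
 * built-in driver, on the kernel command line with the "drbd." prefix.
 * The values shown here are arbitrary:
 *
 *      modprobe drbd minor_count=8 disable_sendpage=1
 *
 *      drbd.minor_count=8          (boot parameter form, built-in driver)
 */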
102
103 #ifdef CONFIG_DRBD_FAULT_INJECTION
104 int enable_faults;
105 int fault_rate;
106 static int fault_count;
107 int fault_devs;
108 /* bitmap of enabled faults */
109 module_param(enable_faults, int, 0664);
110 /* fault rate % value - applies to all enabled faults */
111 module_param(fault_rate, int, 0664);
112 /* count of faults inserted */
113 module_param(fault_count, int, 0664);
114 /* bitmap of devices to insert faults on */
115 module_param(fault_devs, int, 0644);
116 #endif
117
118 /* module parameter, defined */
119 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
120 int disable_sendpage;
121 int allow_oos;
122 unsigned int cn_idx = CN_IDX_DRBD;
123 int proc_details;       /* Detail level in proc drbd*/
124
125 /* Module parameter for setting the user mode helper program
126  * to run. Default is /sbin/drbdadm */
127 char usermode_helper[80] = "/sbin/drbdadm";
128
129 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
130
131 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
132  * as member "struct gendisk *vdisk;"
133  */
134 struct drbd_conf **minor_table;
135 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
136
137 struct kmem_cache *drbd_request_cache;
138 struct kmem_cache *drbd_ee_cache;       /* epoch entries */
139 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
140 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
141 mempool_t *drbd_request_mempool;
142 mempool_t *drbd_ee_mempool;
143
144 /* I do not use a standard mempool, because:
145    1) I want to hand out the pre-allocated objects first.
146    2) I want to be able to interrupt sleeping allocation with a signal.
147    Note: This is a singly linked list; the next pointer is the private
148          member of struct page.
149  */
150 struct page *drbd_pp_pool;
151 spinlock_t   drbd_pp_lock;
152 int          drbd_pp_vacant;
153 wait_queue_head_t drbd_pp_wait;
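/*
 * Editorial sketch (not part of the driver): as the comment above says, the
 * pool is a singly linked chain of struct page with the "next" pointer kept
 * in the page's private member, so popping one page under drbd_pp_lock
 * looks roughly like this (the real allocator lives elsewhere in drbd):
 *
 *      spin_lock(&drbd_pp_lock);
 *      page = drbd_pp_pool;
 *      if (page) {
 *              drbd_pp_pool = (struct page *)page_private(page);
 *              set_page_private(page, 0);
 *              drbd_pp_vacant--;
 *      }
 *      spin_unlock(&drbd_pp_lock);
 */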
154
155 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
156
157 static const struct block_device_operations drbd_ops = {
158         .owner =   THIS_MODULE,
159         .open =    drbd_open,
160         .release = drbd_release,
161 };
162
163 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
164
165 #ifdef __CHECKER__
166 /* When checking with sparse, and this is an inline function, sparse will
167    give tons of false positives. When this is a real function, sparse works.
168  */
169 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
170 {
171         int io_allowed;
172
173         atomic_inc(&mdev->local_cnt);
174         io_allowed = (mdev->state.disk >= mins);
175         if (!io_allowed) {
176                 if (atomic_dec_and_test(&mdev->local_cnt))
177                         wake_up(&mdev->misc_wait);
178         }
179         return io_allowed;
180 }
181
182 #endif
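/*
 * Editorial sketch (not part of the driver): callers pin the local backing
 * device with get_ldev_if_state()/put_ldev() around any access to
 * mdev->ldev, roughly
 *
 *      if (get_ldev_if_state(mdev, D_INCONSISTENT)) {
 *              ... safe to dereference mdev->ldev here ...
 *              put_ldev(mdev);
 *      }
 *
 * The local_cnt reference taken above is what keeps the backing device from
 * being destroyed underneath such a section.
 */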
183
184 /**
185  * DOC: The transfer log
186  *
187  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
188  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
189  * of the list. There is always at least one &struct drbd_tl_epoch object.
190  *
191  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
192  * attached.
193  */
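/*
 * Editorial sketch (not part of the driver): conceptually the transfer log
 * maintained below looks like
 *
 *      oldest_tle -> [epoch] -> [epoch] -> ... -> newest_tle -> NULL
 *
 * where every epoch carries its own ring of struct drbd_request linked via
 * ->tl_requests, so walking all logged requests is roughly
 *
 *      for (b = mdev->tconn->oldest_tle; b; b = b->next)
 *              list_for_each_entry(req, &b->requests, tl_requests)
 *                      ... inspect req ...
 */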
194 static int tl_init(struct drbd_conf *mdev)
195 {
196         struct drbd_tl_epoch *b;
197
198         /* during device minor initialization, we may well use GFP_KERNEL */
199         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
200         if (!b)
201                 return 0;
202         INIT_LIST_HEAD(&b->requests);
203         INIT_LIST_HEAD(&b->w.list);
204         b->next = NULL;
205         b->br_number = 4711;
206         b->n_writes = 0;
207         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
208
209         mdev->tconn->oldest_tle = b;
210         mdev->tconn->newest_tle = b;
211         INIT_LIST_HEAD(&mdev->tconn->out_of_sequence_requests);
212
213         return 1;
214 }
215
216 static void tl_cleanup(struct drbd_conf *mdev)
217 {
218         D_ASSERT(mdev->tconn->oldest_tle == mdev->tconn->newest_tle);
219         D_ASSERT(list_empty(&mdev->tconn->out_of_sequence_requests));
220         kfree(mdev->tconn->oldest_tle);
221         mdev->tconn->oldest_tle = NULL;
222         kfree(mdev->tconn->unused_spare_tle);
223         mdev->tconn->unused_spare_tle = NULL;
224 }
225
226 /**
227  * _tl_add_barrier() - Adds a barrier to the transfer log
228  * @mdev:       DRBD device.
229  * @new:        Barrier to be added before the current head of the TL.
230  *
231  * The caller must hold the req_lock.
232  */
233 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
234 {
235         struct drbd_tl_epoch *newest_before;
236
237         INIT_LIST_HEAD(&new->requests);
238         INIT_LIST_HEAD(&new->w.list);
239         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
240         new->next = NULL;
241         new->n_writes = 0;
242
243         newest_before = mdev->tconn->newest_tle;
244         /* never send a barrier number == 0, because that is special-cased
245          * when using TCQ for our write ordering code */
246         new->br_number = (newest_before->br_number+1) ?: 1;
247         if (mdev->tconn->newest_tle != new) {
248                 mdev->tconn->newest_tle->next = new;
249                 mdev->tconn->newest_tle = new;
250         }
251 }
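/*
 * Editorial sketch (not part of the driver): since _tl_add_barrier()
 * requires the request lock, a caller that is not already inside a locked
 * region would wrap it roughly like
 *
 *      spin_lock_irq(&mdev->tconn->req_lock);
 *      _tl_add_barrier(mdev, new_epoch);
 *      spin_unlock_irq(&mdev->tconn->req_lock);
 *
 * where new_epoch is a hypothetical, freshly allocated struct drbd_tl_epoch.
 */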
252
253 /**
254  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
255  * @mdev:       DRBD device.
256  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
257  * @set_size:   Expected number of requests before that barrier.
258  *
259  * In case the passed barrier_nr or set_size does not match the oldest
260  * &struct drbd_tl_epoch objects this function will cause a termination
261  * of the connection.
262  */
263 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
264                        unsigned int set_size)
265 {
266         struct drbd_tl_epoch *b, *nob; /* next old barrier */
267         struct list_head *le, *tle;
268         struct drbd_request *r;
269
270         spin_lock_irq(&mdev->tconn->req_lock);
271
272         b = mdev->tconn->oldest_tle;
273
274         /* first some paranoia code */
275         if (b == NULL) {
276                 dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
277                         barrier_nr);
278                 goto bail;
279         }
280         if (b->br_number != barrier_nr) {
281                 dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
282                         barrier_nr, b->br_number);
283                 goto bail;
284         }
285         if (b->n_writes != set_size) {
286                 dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
287                         barrier_nr, set_size, b->n_writes);
288                 goto bail;
289         }
290
291         /* Clean up list of requests processed during current epoch */
292         list_for_each_safe(le, tle, &b->requests) {
293                 r = list_entry(le, struct drbd_request, tl_requests);
294                 _req_mod(r, BARRIER_ACKED);
295         }
296         /* There could be requests on the list waiting for completion
297            of the write to the local disk. To avoid corruption of the
298            slab's data structures we have to remove the list's head.
299
300            Also there could have been a barrier ack out of sequence, overtaking
301            the write acks - which would be a bug and violate write ordering.
302            To not deadlock in case we lose connection while such requests are
303            still pending, we need some way to find them for the
304            _req_mod(CONNECTION_LOST_WHILE_PENDING).
305
306            These have been list_move'd to the out_of_sequence_requests list in
307            _req_mod(, BARRIER_ACKED) above.
308            */
309         list_del_init(&b->requests);
310
311         nob = b->next;
312         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
313                 _tl_add_barrier(mdev, b);
314                 if (nob)
315                         mdev->tconn->oldest_tle = nob;
316                 /* if nob == NULL b was the only barrier, and becomes the new
317                    barrier. Therefore mdev->tconn->oldest_tle points already to b */
318         } else {
319                 D_ASSERT(nob != NULL);
320                 mdev->tconn->oldest_tle = nob;
321                 kfree(b);
322         }
323
324         spin_unlock_irq(&mdev->tconn->req_lock);
325         dec_ap_pending(mdev);
326
327         return;
328
329 bail:
330         spin_unlock_irq(&mdev->tconn->req_lock);
331         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
332 }
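/*
 * Editorial sketch (not part of the driver): tl_release() is meant to be
 * fed the identifier and write count announced by the peer's barrier ack,
 * roughly
 *
 *      tl_release(mdev, barrier_nr_from_peer, n_writes_from_peer);
 *
 * where both names are made up for illustration; any mismatch with the
 * oldest epoch takes the C_PROTOCOL_ERROR path above.
 */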
333
334
335 /**
336  * _tl_restart() - Walks the transfer log, and applies an action to all requests
337  * @mdev:       DRBD device.
338  * @what:       The action/event to perform with all request objects
339  *
340  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
341  * RESTART_FROZEN_DISK_IO.
342  */
343 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
344 {
345         struct drbd_tl_epoch *b, *tmp, **pn;
346         struct list_head *le, *tle, carry_reads;
347         struct drbd_request *req;
348         int rv, n_writes, n_reads;
349
350         b = mdev->tconn->oldest_tle;
351         pn = &mdev->tconn->oldest_tle;
352         while (b) {
353                 n_writes = 0;
354                 n_reads = 0;
355                 INIT_LIST_HEAD(&carry_reads);
356                 list_for_each_safe(le, tle, &b->requests) {
357                         req = list_entry(le, struct drbd_request, tl_requests);
358                         rv = _req_mod(req, what);
359
360                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
361                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
362                 }
363                 tmp = b->next;
364
365                 if (n_writes) {
366                         if (what == RESEND) {
367                                 b->n_writes = n_writes;
368                                 if (b->w.cb == NULL) {
369                                         b->w.cb = w_send_barrier;
370                                         inc_ap_pending(mdev);
371                                         set_bit(CREATE_BARRIER, &mdev->flags);
372                                 }
373
374                                 drbd_queue_work(&mdev->tconn->data.work, &b->w);
375                         }
376                         pn = &b->next;
377                 } else {
378                         if (n_reads)
379                                 list_add(&carry_reads, &b->requests);
380                         /* there could still be requests on that ring list,
381                          * in case local io is still pending */
382                         list_del(&b->requests);
383
384                         /* dec_ap_pending corresponding to queue_barrier.
385                          * the newest barrier may not have been queued yet,
386                          * in which case w.cb is still NULL. */
387                         if (b->w.cb != NULL)
388                                 dec_ap_pending(mdev);
389
390                         if (b == mdev->tconn->newest_tle) {
391                                 /* recycle, but reinit! */
392                                 D_ASSERT(tmp == NULL);
393                                 INIT_LIST_HEAD(&b->requests);
394                                 list_splice(&carry_reads, &b->requests);
395                                 INIT_LIST_HEAD(&b->w.list);
396                                 b->w.cb = NULL;
397                                 b->br_number = net_random();
398                                 b->n_writes = 0;
399
400                                 *pn = b;
401                                 break;
402                         }
403                         *pn = tmp;
404                         kfree(b);
405                 }
406                 b = tmp;
407                 list_splice(&carry_reads, &b->requests);
408         }
409 }
410
411
412 /**
413  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
414  * @mdev:       DRBD device.
415  *
416  * This is called after the connection to the peer was lost. The storage covered
417  * by the requests on the transfer log gets marked as out of sync. Called from the
418  * receiver thread and the worker thread.
419  */
420 void tl_clear(struct drbd_conf *mdev)
421 {
422         struct list_head *le, *tle;
423         struct drbd_request *r;
424
425         spin_lock_irq(&mdev->tconn->req_lock);
426
427         _tl_restart(mdev, CONNECTION_LOST_WHILE_PENDING);
428
429         /* we expect this list to be empty. */
430         D_ASSERT(list_empty(&mdev->tconn->out_of_sequence_requests));
431
432         /* but just in case, clean it up anyways! */
433         list_for_each_safe(le, tle, &mdev->tconn->out_of_sequence_requests) {
434                 r = list_entry(le, struct drbd_request, tl_requests);
435                 /* It would be nice to complete outside of spinlock.
436                  * But this is easier for now. */
437                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
438         }
439
440         /* ensure bit indicating barrier is required is clear */
441         clear_bit(CREATE_BARRIER, &mdev->flags);
442
443         spin_unlock_irq(&mdev->tconn->req_lock);
444 }
445
446 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
447 {
448         spin_lock_irq(&mdev->tconn->req_lock);
449         _tl_restart(mdev, what);
450         spin_unlock_irq(&mdev->tconn->req_lock);
451 }
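/*
 * Editorial sketch (not part of the driver): the "what" argument selects
 * the action applied to every logged request, e.g. resuming frozen disk I/O
 * once the local disk is usable again could look roughly like
 *
 *      tl_restart(mdev, RESTART_FROZEN_DISK_IO);
 *
 * while losing the peer goes through tl_clear(), which runs
 * _tl_restart(mdev, CONNECTION_LOST_WHILE_PENDING) as shown above.
 */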
452
453 /**
454  * cl_wide_st_chg() - true if the state change is a cluster wide one
455  * @mdev:       DRBD device.
456  * @os:         old (current) state.
457  * @ns:         new (wanted) state.
458  */
459 static int cl_wide_st_chg(struct drbd_conf *mdev,
460                           union drbd_state os, union drbd_state ns)
461 {
462         return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
463                  ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
464                   (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
465                   (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
466                   (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
467                 (os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
468                 (os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
469 }
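/*
 * Editorial sketch (not part of the driver): a cluster-wide change in the
 * sense of the test above is, for example, promoting a connected node,
 * i.e. with hypothetical states
 *
 *      os.conn == C_CONNECTED, os.role == R_SECONDARY
 *      ns.conn == C_CONNECTED, ns.role == R_PRIMARY
 *
 * cl_wide_st_chg() returns true and the peer must agree first, whereas a
 * purely local change such as disk( D_ATTACHING -> D_UP_TO_DATE ) returns
 * false.
 */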
470
471 enum drbd_state_rv
472 drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
473                   union drbd_state mask, union drbd_state val)
474 {
475         unsigned long flags;
476         union drbd_state os, ns;
477         enum drbd_state_rv rv;
478
479         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
480         os = mdev->state;
481         ns.i = (os.i & ~mask.i) | val.i;
482         rv = _drbd_set_state(mdev, ns, f, NULL);
483         ns = mdev->state;
484         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
485
486         return rv;
487 }
488
489 /**
490  * drbd_force_state() - Impose a change which happens outside our control on our state
491  * @mdev:       DRBD device.
492  * @mask:       mask of state bits to change.
493  * @val:        value of new state bits.
494  */
495 void drbd_force_state(struct drbd_conf *mdev,
496         union drbd_state mask, union drbd_state val)
497 {
498         drbd_change_state(mdev, CS_HARD, mask, val);
499 }
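/*
 * Editorial sketch (not part of the driver): the usual way to build the
 * mask/val pair is the NS()/_NS() helpers from drbd_int.h, e.g. the
 * protocol-error handling in tl_release() above does
 *
 *      drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 *
 * which changes only the connection field and leaves all other state bits
 * untouched.
 */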
500
501 static enum drbd_state_rv is_valid_state(struct drbd_conf *, union drbd_state);
502 static enum drbd_state_rv is_valid_state_transition(struct drbd_conf *,
503                                                     union drbd_state,
504                                                     union drbd_state);
505 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
506                                        union drbd_state ns, const char **warn_sync_abort);
507 int drbd_send_state_req(struct drbd_conf *,
508                         union drbd_state, union drbd_state);
509
510 static enum drbd_state_rv
511 _req_st_cond(struct drbd_conf *mdev, union drbd_state mask,
512              union drbd_state val)
513 {
514         union drbd_state os, ns;
515         unsigned long flags;
516         enum drbd_state_rv rv;
517
518         if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
519                 return SS_CW_SUCCESS;
520
521         if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
522                 return SS_CW_FAILED_BY_PEER;
523
524         rv = 0;
525         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
526         os = mdev->state;
527         ns.i = (os.i & ~mask.i) | val.i;
528         ns = sanitize_state(mdev, os, ns, NULL);
529
530         if (!cl_wide_st_chg(mdev, os, ns))
531                 rv = SS_CW_NO_NEED;
532         if (!rv) {
533                 rv = is_valid_state(mdev, ns);
534                 if (rv == SS_SUCCESS) {
535                         rv = is_valid_state_transition(mdev, ns, os);
536                         if (rv == SS_SUCCESS)
537                                 rv = SS_UNKNOWN_ERROR; /* cont waiting, otherwise fail. */
538                 }
539         }
540         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
541
542         return rv;
543 }
544
545 /**
546  * drbd_req_state() - Perform a possibly cluster-wide state change
547  * @mdev:       DRBD device.
548  * @mask:       mask of state bits to change.
549  * @val:        value of new state bits.
550  * @f:          flags
551  *
552  * Should not be called directly, use drbd_request_state() or
553  * _drbd_request_state().
554  */
555 static enum drbd_state_rv
556 drbd_req_state(struct drbd_conf *mdev, union drbd_state mask,
557                union drbd_state val, enum chg_state_flags f)
558 {
559         struct completion done;
560         unsigned long flags;
561         union drbd_state os, ns;
562         enum drbd_state_rv rv;
563
564         init_completion(&done);
565
566         if (f & CS_SERIALIZE)
567                 mutex_lock(&mdev->state_mutex);
568
569         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
570         os = mdev->state;
571         ns.i = (os.i & ~mask.i) | val.i;
572         ns = sanitize_state(mdev, os, ns, NULL);
573
574         if (cl_wide_st_chg(mdev, os, ns)) {
575                 rv = is_valid_state(mdev, ns);
576                 if (rv == SS_SUCCESS)
577                         rv = is_valid_state_transition(mdev, ns, os);
578                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
579
580                 if (rv < SS_SUCCESS) {
581                         if (f & CS_VERBOSE)
582                                 print_st_err(mdev, os, ns, rv);
583                         goto abort;
584                 }
585
586                 drbd_state_lock(mdev);
587                 if (!drbd_send_state_req(mdev, mask, val)) {
588                         drbd_state_unlock(mdev);
589                         rv = SS_CW_FAILED_BY_PEER;
590                         if (f & CS_VERBOSE)
591                                 print_st_err(mdev, os, ns, rv);
592                         goto abort;
593                 }
594
595                 wait_event(mdev->state_wait,
596                         (rv = _req_st_cond(mdev, mask, val)));
597
598                 if (rv < SS_SUCCESS) {
599                         drbd_state_unlock(mdev);
600                         if (f & CS_VERBOSE)
601                                 print_st_err(mdev, os, ns, rv);
602                         goto abort;
603                 }
604                 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
605                 os = mdev->state;
606                 ns.i = (os.i & ~mask.i) | val.i;
607                 rv = _drbd_set_state(mdev, ns, f, &done);
608                 drbd_state_unlock(mdev);
609         } else {
610                 rv = _drbd_set_state(mdev, ns, f, &done);
611         }
612
613         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
614
615         if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
616                 D_ASSERT(current != mdev->tconn->worker.task);
617                 wait_for_completion(&done);
618         }
619
620 abort:
621         if (f & CS_SERIALIZE)
622                 mutex_unlock(&mdev->state_mutex);
623
624         return rv;
625 }
626
627 /**
628  * _drbd_request_state() - Request a state change (with flags)
629  * @mdev:       DRBD device.
630  * @mask:       mask of state bits to change.
631  * @val:        value of new state bits.
632  * @f:          flags
633  *
634  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
635  * flag, or when logging of failed state change requests is not desired.
636  */
637 enum drbd_state_rv
638 _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
639                     union drbd_state val, enum chg_state_flags f)
640 {
641         enum drbd_state_rv rv;
642
643         wait_event(mdev->state_wait,
644                    (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
645
646         return rv;
647 }
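/*
 * Editorial sketch (not part of the driver): a typical caller requests a
 * single-field change and checks whether the state machine vetoed it,
 * roughly (compare abw_start_sync() further down)
 *
 *      rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
 *      if (rv < SS_SUCCESS)
 *              ... the change was refused, rv names the reason ...
 */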
648
649 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
650 {
651         dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
652             name,
653             drbd_conn_str(ns.conn),
654             drbd_role_str(ns.role),
655             drbd_role_str(ns.peer),
656             drbd_disk_str(ns.disk),
657             drbd_disk_str(ns.pdsk),
658             is_susp(ns) ? 's' : 'r',
659             ns.aftr_isp ? 'a' : '-',
660             ns.peer_isp ? 'p' : '-',
661             ns.user_isp ? 'u' : '-'
662             );
663 }
664
665 void print_st_err(struct drbd_conf *mdev, union drbd_state os,
666                   union drbd_state ns, enum drbd_state_rv err)
667 {
668         if (err == SS_IN_TRANSIENT_STATE)
669                 return;
670         dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
671         print_st(mdev, " state", os);
672         print_st(mdev, "wanted", ns);
673 }
674
675
676 /**
677  * is_valid_state() - Returns an SS_ error code if ns is not valid
678  * @mdev:       DRBD device.
679  * @ns:         State to consider.
680  */
681 static enum drbd_state_rv
682 is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
683 {
684         /* See drbd_state_sw_errors in drbd_strings.c */
685
686         enum drbd_fencing_p fp;
687         enum drbd_state_rv rv = SS_SUCCESS;
688
689         fp = FP_DONT_CARE;
690         if (get_ldev(mdev)) {
691                 fp = mdev->ldev->dc.fencing;
692                 put_ldev(mdev);
693         }
694
695         if (get_net_conf(mdev->tconn)) {
696                 if (!mdev->tconn->net_conf->two_primaries &&
697                     ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
698                         rv = SS_TWO_PRIMARIES;
699                 put_net_conf(mdev->tconn);
700         }
701
702         if (rv <= 0)
703                 /* already found a reason to abort */;
704         else if (ns.role == R_SECONDARY && mdev->open_cnt)
705                 rv = SS_DEVICE_IN_USE;
706
707         else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
708                 rv = SS_NO_UP_TO_DATE_DISK;
709
710         else if (fp >= FP_RESOURCE &&
711                  ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
712                 rv = SS_PRIMARY_NOP;
713
714         else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
715                 rv = SS_NO_UP_TO_DATE_DISK;
716
717         else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
718                 rv = SS_NO_LOCAL_DISK;
719
720         else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
721                 rv = SS_NO_REMOTE_DISK;
722
723         else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
724                 rv = SS_NO_UP_TO_DATE_DISK;
725
726         else if ((ns.conn == C_CONNECTED ||
727                   ns.conn == C_WF_BITMAP_S ||
728                   ns.conn == C_SYNC_SOURCE ||
729                   ns.conn == C_PAUSED_SYNC_S) &&
730                   ns.disk == D_OUTDATED)
731                 rv = SS_CONNECTED_OUTDATES;
732
733         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
734                  (mdev->sync_conf.verify_alg[0] == 0))
735                 rv = SS_NO_VERIFY_ALG;
736
737         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
738                   mdev->tconn->agreed_pro_version < 88)
739                 rv = SS_NOT_SUPPORTED;
740
741         else if (ns.conn >= C_CONNECTED && ns.pdsk == D_UNKNOWN)
742                 rv = SS_CONNECTED_OUTDATES;
743
744         return rv;
745 }
746
747 /**
748  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
749  * @mdev:       DRBD device.
750  * @ns:         new state.
751  * @os:         old state.
752  */
753 static enum drbd_state_rv
754 is_valid_state_transition(struct drbd_conf *mdev, union drbd_state ns,
755                           union drbd_state os)
756 {
757         enum drbd_state_rv rv = SS_SUCCESS;
758
759         if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
760             os.conn > C_CONNECTED)
761                 rv = SS_RESYNC_RUNNING;
762
763         if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
764                 rv = SS_ALREADY_STANDALONE;
765
766         if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
767                 rv = SS_IS_DISKLESS;
768
769         if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
770                 rv = SS_NO_NET_CONFIG;
771
772         if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
773                 rv = SS_LOWER_THAN_OUTDATED;
774
775         if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
776                 rv = SS_IN_TRANSIENT_STATE;
777
778         if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
779                 rv = SS_IN_TRANSIENT_STATE;
780
781         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
782                 rv = SS_NEED_CONNECTION;
783
784         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
785             ns.conn != os.conn && os.conn > C_CONNECTED)
786                 rv = SS_RESYNC_RUNNING;
787
788         if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
789             os.conn < C_CONNECTED)
790                 rv = SS_NEED_CONNECTION;
791
792         if ((ns.conn == C_SYNC_TARGET || ns.conn == C_SYNC_SOURCE)
793             && os.conn < C_WF_REPORT_PARAMS)
794                 rv = SS_NEED_CONNECTION; /* No NetworkFailure -> SyncTarget etc... */
795
796         return rv;
797 }
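/*
 * Editorial sketch (not part of the driver): with hypothetical states
 *
 *      os.conn == C_STANDALONE and ns.conn == C_DISCONNECTING
 *
 * the check above reports SS_ALREADY_STANDALONE, and requesting an online
 * verify (ns.conn == C_VERIFY_S) while os.conn is below C_CONNECTED reports
 * SS_NEED_CONNECTION; only a transition that passes both is_valid_state()
 * and this function is carried out.
 */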
798
799 /**
800  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
801  * @mdev:       DRBD device.
802  * @os:         old state.
803  * @ns:         new state.
804  * @warn_sync_abort:
805  *
806  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
807  * to D_UNKNOWN. This rule and many more along those lines are in this function.
808  */
809 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
810                                        union drbd_state ns, const char **warn_sync_abort)
811 {
812         enum drbd_fencing_p fp;
813         enum drbd_disk_state disk_min, disk_max, pdsk_min, pdsk_max;
814
815         fp = FP_DONT_CARE;
816         if (get_ldev(mdev)) {
817                 fp = mdev->ldev->dc.fencing;
818                 put_ldev(mdev);
819         }
820
821         /* Disallow Network errors to configure a device's network part */
822         if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
823             os.conn <= C_DISCONNECTING)
824                 ns.conn = os.conn;
825
826         /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
827          * If you try to go into some Sync* state, that shall fail (elsewhere). */
828         if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
829             ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_TEAR_DOWN)
830                 ns.conn = os.conn;
831
832         /* we cannot fail (again) if we already detached */
833         if (ns.disk == D_FAILED && os.disk == D_DISKLESS)
834                 ns.disk = D_DISKLESS;
835
836         /* if we are only D_ATTACHING yet,
837          * we can (and should) go directly to D_DISKLESS. */
838         if (ns.disk == D_FAILED && os.disk == D_ATTACHING)
839                 ns.disk = D_DISKLESS;
840
841         /* After C_DISCONNECTING only C_STANDALONE may follow */
842         if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
843                 ns.conn = os.conn;
844
845         if (ns.conn < C_CONNECTED) {
846                 ns.peer_isp = 0;
847                 ns.peer = R_UNKNOWN;
848                 if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
849                         ns.pdsk = D_UNKNOWN;
850         }
851
852         /* Clear the aftr_isp when becoming unconfigured */
853         if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
854                 ns.aftr_isp = 0;
855
856         /* Abort resync if a disk fails/detaches */
857         if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
858             (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
859                 if (warn_sync_abort)
860                         *warn_sync_abort =
861                                 os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
862                                 "Online-verify" : "Resync";
863                 ns.conn = C_CONNECTED;
864         }
865
866         /* Connection breaks down before we finished "Negotiating" */
867         if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
868             get_ldev_if_state(mdev, D_NEGOTIATING)) {
869                 if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
870                         ns.disk = mdev->new_state_tmp.disk;
871                         ns.pdsk = mdev->new_state_tmp.pdsk;
872                 } else {
873                         dev_alert(DEV, "Connection lost while negotiating, no data!\n");
874                         ns.disk = D_DISKLESS;
875                         ns.pdsk = D_UNKNOWN;
876                 }
877                 put_ldev(mdev);
878         }
879
880         /* D_CONSISTENT and D_OUTDATED vanish when we get connected */
881         if (ns.conn >= C_CONNECTED && ns.conn < C_AHEAD) {
882                 if (ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED)
883                         ns.disk = D_UP_TO_DATE;
884                 if (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)
885                         ns.pdsk = D_UP_TO_DATE;
886         }
887
888         /* Implications of the connection state on the disk states */
889         disk_min = D_DISKLESS;
890         disk_max = D_UP_TO_DATE;
891         pdsk_min = D_INCONSISTENT;
892         pdsk_max = D_UNKNOWN;
893         switch ((enum drbd_conns)ns.conn) {
894         case C_WF_BITMAP_T:
895         case C_PAUSED_SYNC_T:
896         case C_STARTING_SYNC_T:
897         case C_WF_SYNC_UUID:
898         case C_BEHIND:
899                 disk_min = D_INCONSISTENT;
900                 disk_max = D_OUTDATED;
901                 pdsk_min = D_UP_TO_DATE;
902                 pdsk_max = D_UP_TO_DATE;
903                 break;
904         case C_VERIFY_S:
905         case C_VERIFY_T:
906                 disk_min = D_UP_TO_DATE;
907                 disk_max = D_UP_TO_DATE;
908                 pdsk_min = D_UP_TO_DATE;
909                 pdsk_max = D_UP_TO_DATE;
910                 break;
911         case C_CONNECTED:
912                 disk_min = D_DISKLESS;
913                 disk_max = D_UP_TO_DATE;
914                 pdsk_min = D_DISKLESS;
915                 pdsk_max = D_UP_TO_DATE;
916                 break;
917         case C_WF_BITMAP_S:
918         case C_PAUSED_SYNC_S:
919         case C_STARTING_SYNC_S:
920         case C_AHEAD:
921                 disk_min = D_UP_TO_DATE;
922                 disk_max = D_UP_TO_DATE;
923                 pdsk_min = D_INCONSISTENT;
924                 pdsk_max = D_CONSISTENT; /* D_OUTDATED would be nice. But explicit outdate necessary*/
925                 break;
926         case C_SYNC_TARGET:
927                 disk_min = D_INCONSISTENT;
928                 disk_max = D_INCONSISTENT;
929                 pdsk_min = D_UP_TO_DATE;
930                 pdsk_max = D_UP_TO_DATE;
931                 break;
932         case C_SYNC_SOURCE:
933                 disk_min = D_UP_TO_DATE;
934                 disk_max = D_UP_TO_DATE;
935                 pdsk_min = D_INCONSISTENT;
936                 pdsk_max = D_INCONSISTENT;
937                 break;
938         case C_STANDALONE:
939         case C_DISCONNECTING:
940         case C_UNCONNECTED:
941         case C_TIMEOUT:
942         case C_BROKEN_PIPE:
943         case C_NETWORK_FAILURE:
944         case C_PROTOCOL_ERROR:
945         case C_TEAR_DOWN:
946         case C_WF_CONNECTION:
947         case C_WF_REPORT_PARAMS:
948         case C_MASK:
949                 break;
950         }
951         if (ns.disk > disk_max)
952                 ns.disk = disk_max;
953
954         if (ns.disk < disk_min) {
955                 dev_warn(DEV, "Implicitly set disk from %s to %s\n",
956                          drbd_disk_str(ns.disk), drbd_disk_str(disk_min));
957                 ns.disk = disk_min;
958         }
959         if (ns.pdsk > pdsk_max)
960                 ns.pdsk = pdsk_max;
961
962         if (ns.pdsk < pdsk_min) {
963                 dev_warn(DEV, "Implicitly set pdsk from %s to %s\n",
964                          drbd_disk_str(ns.pdsk), drbd_disk_str(pdsk_min));
965                 ns.pdsk = pdsk_min;
966         }
967
968         if (fp == FP_STONITH &&
969             (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
970             !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
971                 ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
972
973         if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
974             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
975             !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
976                 ns.susp_nod = 1; /* Suspend IO while no accessible data is available */
977
978         if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
979                 if (ns.conn == C_SYNC_SOURCE)
980                         ns.conn = C_PAUSED_SYNC_S;
981                 if (ns.conn == C_SYNC_TARGET)
982                         ns.conn = C_PAUSED_SYNC_T;
983         } else {
984                 if (ns.conn == C_PAUSED_SYNC_S)
985                         ns.conn = C_SYNC_SOURCE;
986                 if (ns.conn == C_PAUSED_SYNC_T)
987                         ns.conn = C_SYNC_TARGET;
988         }
989
990         return ns;
991 }
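/*
 * Editorial sketch (not part of the driver): as a worked example of the
 * rules above, handing sanitize_state() a hypothetical request that merely
 * drops the connection while the peer's disk was D_UP_TO_DATE,
 *
 *      ns.conn = C_UNCONNECTED;    (ns.peer, ns.pdsk untouched by the caller)
 *
 * comes back with ns.peer == R_UNKNOWN and ns.pdsk == D_UNKNOWN: below
 * C_CONNECTED we can no longer make such a strong claim about the peer.
 */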
992
993 /* helper for __drbd_set_state */
994 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
995 {
996         if (mdev->tconn->agreed_pro_version < 90)
997                 mdev->ov_start_sector = 0;
998         mdev->rs_total = drbd_bm_bits(mdev);
999         mdev->ov_position = 0;
1000         if (cs == C_VERIFY_T) {
1001                 /* starting online verify from an arbitrary position
1002                  * does not fit well into the existing protocol.
1003                  * on C_VERIFY_T, we initialize ov_left and friends
1004                  * implicitly in receive_DataRequest once the
1005                  * first P_OV_REQUEST is received */
1006                 mdev->ov_start_sector = ~(sector_t)0;
1007         } else {
1008                 unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
1009                 if (bit >= mdev->rs_total) {
1010                         mdev->ov_start_sector =
1011                                 BM_BIT_TO_SECT(mdev->rs_total - 1);
1012                         mdev->rs_total = 1;
1013                 } else
1014                         mdev->rs_total -= bit;
1015                 mdev->ov_position = mdev->ov_start_sector;
1016         }
1017         mdev->ov_left = mdev->rs_total;
1018 }
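/*
 * Editorial sketch (not part of the driver): the BM_SECT_TO_BIT() and
 * BM_BIT_TO_SECT() conversions used above come from drbd_int.h; assuming
 * the usual 4 KiB bitmap granularity, one bit covers eight 512-byte
 * sectors, e.g.
 *
 *      BM_SECT_TO_BIT(80) == 10        BM_BIT_TO_SECT(10) == 80
 *
 * which is why rs_total, counted in bits, can simply be reduced by the
 * start bit when the verify run begins at ov_start_sector.
 */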
1019
1020 static void drbd_resume_al(struct drbd_conf *mdev)
1021 {
1022         if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
1023                 dev_info(DEV, "Resumed AL updates\n");
1024 }
1025
1026 /**
1027  * __drbd_set_state() - Set a new DRBD state
1028  * @mdev:       DRBD device.
1029  * @ns:         new state.
1030  * @flags:      Flags
1031  * @done:       Optional completion, that will get completed after the after_state_ch() finished
1032  *
1033  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
1034  */
1035 enum drbd_state_rv
1036 __drbd_set_state(struct drbd_conf *mdev, union drbd_state ns,
1037                  enum chg_state_flags flags, struct completion *done)
1038 {
1039         union drbd_state os;
1040         enum drbd_state_rv rv = SS_SUCCESS;
1041         const char *warn_sync_abort = NULL;
1042         struct after_state_chg_work *ascw;
1043
1044         os = mdev->state;
1045
1046         ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
1047
1048         if (ns.i == os.i)
1049                 return SS_NOTHING_TO_DO;
1050
1051         if (!(flags & CS_HARD)) {
1052                 /*  pre-state-change checks ; only look at ns  */
1053                 /* See drbd_state_sw_errors in drbd_strings.c */
1054
1055                 rv = is_valid_state(mdev, ns);
1056                 if (rv < SS_SUCCESS) {
1057                         /* If the old state was illegal as well, then let
1058                            this happen...*/
1059
1060                         if (is_valid_state(mdev, os) == rv)
1061                                 rv = is_valid_state_transition(mdev, ns, os);
1062                 } else
1063                         rv = is_valid_state_transition(mdev, ns, os);
1064         }
1065
1066         if (rv < SS_SUCCESS) {
1067                 if (flags & CS_VERBOSE)
1068                         print_st_err(mdev, os, ns, rv);
1069                 return rv;
1070         }
1071
1072         if (warn_sync_abort)
1073                 dev_warn(DEV, "%s aborted.\n", warn_sync_abort);
1074
1075         {
1076         char *pbp, pb[300];
1077         pbp = pb;
1078         *pbp = 0;
1079         if (ns.role != os.role)
1080                 pbp += sprintf(pbp, "role( %s -> %s ) ",
1081                                drbd_role_str(os.role),
1082                                drbd_role_str(ns.role));
1083         if (ns.peer != os.peer)
1084                 pbp += sprintf(pbp, "peer( %s -> %s ) ",
1085                                drbd_role_str(os.peer),
1086                                drbd_role_str(ns.peer));
1087         if (ns.conn != os.conn)
1088                 pbp += sprintf(pbp, "conn( %s -> %s ) ",
1089                                drbd_conn_str(os.conn),
1090                                drbd_conn_str(ns.conn));
1091         if (ns.disk != os.disk)
1092                 pbp += sprintf(pbp, "disk( %s -> %s ) ",
1093                                drbd_disk_str(os.disk),
1094                                drbd_disk_str(ns.disk));
1095         if (ns.pdsk != os.pdsk)
1096                 pbp += sprintf(pbp, "pdsk( %s -> %s ) ",
1097                                drbd_disk_str(os.pdsk),
1098                                drbd_disk_str(ns.pdsk));
1099         if (is_susp(ns) != is_susp(os))
1100                 pbp += sprintf(pbp, "susp( %d -> %d ) ",
1101                                is_susp(os),
1102                                is_susp(ns));
1103         if (ns.aftr_isp != os.aftr_isp)
1104                 pbp += sprintf(pbp, "aftr_isp( %d -> %d ) ",
1105                                os.aftr_isp,
1106                                ns.aftr_isp);
1107         if (ns.peer_isp != os.peer_isp)
1108                 pbp += sprintf(pbp, "peer_isp( %d -> %d ) ",
1109                                os.peer_isp,
1110                                ns.peer_isp);
1111         if (ns.user_isp != os.user_isp)
1112                 pbp += sprintf(pbp, "user_isp( %d -> %d ) ",
1113                                os.user_isp,
1114                                ns.user_isp);
1115         dev_info(DEV, "%s\n", pb);
1116         }
1117
1118         /* solve the race between becoming unconfigured,
1119          * worker doing the cleanup, and
1120          * admin reconfiguring us:
1121          * on (re)configure, first set CONFIG_PENDING,
1122          * then wait for a potentially exiting worker,
1123          * start the worker, and schedule one no_op.
1124          * then proceed with configuration.
1125          */
1126         if (ns.disk == D_DISKLESS &&
1127             ns.conn == C_STANDALONE &&
1128             ns.role == R_SECONDARY &&
1129             !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1130                 set_bit(DEVICE_DYING, &mdev->flags);
1131
1132         /* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
1133          * on the ldev here, to be sure the transition -> D_DISKLESS resp.
1134          * drbd_ldev_destroy() won't happen before our corresponding
1135          * after_state_ch works run, where we put_ldev again. */
1136         if ((os.disk != D_FAILED && ns.disk == D_FAILED) ||
1137             (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
1138                 atomic_inc(&mdev->local_cnt);
1139
1140         mdev->state = ns;
1141
1142         if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING)
1143                 drbd_print_uuids(mdev, "attached to UUIDs");
1144
1145         wake_up(&mdev->misc_wait);
1146         wake_up(&mdev->state_wait);
1147
1148         /* aborted verify run. log the last position */
1149         if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1150             ns.conn < C_CONNECTED) {
1151                 mdev->ov_start_sector =
1152                         BM_BIT_TO_SECT(drbd_bm_bits(mdev) - mdev->ov_left);
1153                 dev_info(DEV, "Online Verify reached sector %llu\n",
1154                         (unsigned long long)mdev->ov_start_sector);
1155         }
1156
1157         if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1158             (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1159                 dev_info(DEV, "Syncer continues.\n");
1160                 mdev->rs_paused += (long)jiffies
1161                                   -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1162                 if (ns.conn == C_SYNC_TARGET)
1163                         mod_timer(&mdev->resync_timer, jiffies);
1164         }
1165
1166         if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1167             (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1168                 dev_info(DEV, "Resync suspended\n");
1169                 mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1170         }
1171
1172         if (os.conn == C_CONNECTED &&
1173             (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1174                 unsigned long now = jiffies;
1175                 int i;
1176
1177                 set_ov_position(mdev, ns.conn);
1178                 mdev->rs_start = now;
1179                 mdev->rs_last_events = 0;
1180                 mdev->rs_last_sect_ev = 0;
1181                 mdev->ov_last_oos_size = 0;
1182                 mdev->ov_last_oos_start = 0;
1183
1184                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1185                         mdev->rs_mark_left[i] = mdev->ov_left;
1186                         mdev->rs_mark_time[i] = now;
1187                 }
1188
1189                 drbd_rs_controller_reset(mdev);
1190
1191                 if (ns.conn == C_VERIFY_S) {
1192                         dev_info(DEV, "Starting Online Verify from sector %llu\n",
1193                                         (unsigned long long)mdev->ov_position);
1194                         mod_timer(&mdev->resync_timer, jiffies);
1195                 }
1196         }
1197
1198         if (get_ldev(mdev)) {
1199                 u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1200                                                  MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1201                                                  MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1202
1203                 if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1204                         mdf |= MDF_CRASHED_PRIMARY;
1205                 if (mdev->state.role == R_PRIMARY ||
1206                     (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1207                         mdf |= MDF_PRIMARY_IND;
1208                 if (mdev->state.conn > C_WF_REPORT_PARAMS)
1209                         mdf |= MDF_CONNECTED_IND;
1210                 if (mdev->state.disk > D_INCONSISTENT)
1211                         mdf |= MDF_CONSISTENT;
1212                 if (mdev->state.disk > D_OUTDATED)
1213                         mdf |= MDF_WAS_UP_TO_DATE;
1214                 if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1215                         mdf |= MDF_PEER_OUT_DATED;
1216                 if (mdf != mdev->ldev->md.flags) {
1217                         mdev->ldev->md.flags = mdf;
1218                         drbd_md_mark_dirty(mdev);
1219                 }
1220                 if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1221                         drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1222                 put_ldev(mdev);
1223         }
1224
1225         /* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider to resync */
1226         if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1227             os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1228                 set_bit(CONSIDER_RESYNC, &mdev->flags);
1229
1230         /* Receiver should clean up itself */
1231         if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1232                 drbd_thread_stop_nowait(&mdev->tconn->receiver);
1233
1234         /* Now the receiver finished cleaning up itself, it should die */
1235         if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1236                 drbd_thread_stop_nowait(&mdev->tconn->receiver);
1237
1238         /* Upon network failure, we need to restart the receiver. */
1239         if (os.conn > C_TEAR_DOWN &&
1240             ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1241                 drbd_thread_restart_nowait(&mdev->tconn->receiver);
1242
1243         /* Resume AL writing if we get a connection */
1244         if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1245                 drbd_resume_al(mdev);
1246
1247         ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1248         if (ascw) {
1249                 ascw->os = os;
1250                 ascw->ns = ns;
1251                 ascw->flags = flags;
1252                 ascw->w.cb = w_after_state_ch;
1253                 ascw->done = done;
1254                 drbd_queue_work(&mdev->tconn->data.work, &ascw->w);
1255         } else {
1256                 dev_warn(DEV, "Could not kmalloc an ascw\n");
1257         }
1258
1259         return rv;
1260 }
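/*
 * Editorial sketch (not part of the driver): per the kernel-doc above,
 * __drbd_set_state() is not called directly; callers go through the
 * _drbd_set_state()/_NS() wrappers while holding the request lock, e.g.
 *
 *      spin_lock_irq(&mdev->tconn->req_lock);
 *      _drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
 *      spin_unlock_irq(&mdev->tconn->req_lock);
 *
 * exactly as done in after_state_ch() further down.
 */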
1261
1262 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1263 {
1264         struct after_state_chg_work *ascw =
1265                 container_of(w, struct after_state_chg_work, w);
1266         after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1267         if (ascw->flags & CS_WAIT_COMPLETE) {
1268                 D_ASSERT(ascw->done != NULL);
1269                 complete(ascw->done);
1270         }
1271         kfree(ascw);
1272
1273         return 1;
1274 }
1275
1276 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1277 {
1278         if (rv) {
1279                 dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1280                 _drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1281                 return;
1282         }
1283
1284         switch (mdev->state.conn) {
1285         case C_STARTING_SYNC_T:
1286                 _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1287                 break;
1288         case C_STARTING_SYNC_S:
1289                 drbd_start_resync(mdev, C_SYNC_SOURCE);
1290                 break;
1291         }
1292 }
1293
1294 int drbd_bitmap_io_from_worker(struct drbd_conf *mdev,
1295                 int (*io_fn)(struct drbd_conf *),
1296                 char *why, enum bm_flag flags)
1297 {
1298         int rv;
1299
1300         D_ASSERT(current == mdev->tconn->worker.task);
1301
1302         /* open coded non-blocking drbd_suspend_io(mdev); */
1303         set_bit(SUSPEND_IO, &mdev->flags);
1304
1305         drbd_bm_lock(mdev, why, flags);
1306         rv = io_fn(mdev);
1307         drbd_bm_unlock(mdev);
1308
1309         drbd_resume_io(mdev);
1310
1311         return rv;
1312 }
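/*
 * Editorial sketch (not part of the driver): a worker-context caller hands
 * in the bitmap I/O routine to be serialized, plus a reason string and
 * bitmap locking flags, roughly
 *
 *      rv = drbd_bitmap_io_from_worker(mdev, &drbd_send_bitmap,
 *                      "send_bitmap (example)", BM_LOCKED_TEST_ALLOWED);
 *
 * mirroring the drbd_queue_bitmap_io() call for WFBitMapS further down;
 * the reason string here is made up for illustration.
 */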
1313
1314 /**
1315  * after_state_ch() - Perform after state change actions that may sleep
1316  * @mdev:       DRBD device.
1317  * @os:         old state.
1318  * @ns:         new state.
1319  * @flags:      Flags
1320  */
1321 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1322                            union drbd_state ns, enum chg_state_flags flags)
1323 {
1324         enum drbd_fencing_p fp;
1325         enum drbd_req_event what = NOTHING;
1326         union drbd_state nsm = (union drbd_state){ .i = -1 };
1327
1328         if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1329                 clear_bit(CRASHED_PRIMARY, &mdev->flags);
1330                 if (mdev->p_uuid)
1331                         mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1332         }
1333
1334         fp = FP_DONT_CARE;
1335         if (get_ldev(mdev)) {
1336                 fp = mdev->ldev->dc.fencing;
1337                 put_ldev(mdev);
1338         }
1339
1340         /* Inform userspace about the change... */
1341         drbd_bcast_state(mdev, ns);
1342
1343         if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1344             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1345                 drbd_khelper(mdev, "pri-on-incon-degr");
1346
1347         /* Here we have the actions that are performed after a
1348            state change. This function might sleep */
1349
1350         nsm.i = -1;
1351         if (ns.susp_nod) {
1352                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1353                         what = RESEND;
1354
1355                 if (os.disk == D_ATTACHING && ns.disk > D_ATTACHING)
1356                         what = RESTART_FROZEN_DISK_IO;
1357
1358                 if (what != NOTHING)
1359                         nsm.susp_nod = 0;
1360         }
1361
1362         if (ns.susp_fen) {
1363                 /* case1: The outdate peer handler is successful: */
1364                 if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1365                         tl_clear(mdev);
1366                         if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1367                                 drbd_uuid_new_current(mdev);
1368                                 clear_bit(NEW_CUR_UUID, &mdev->flags);
1369                         }
1370                         spin_lock_irq(&mdev->tconn->req_lock);
1371                         _drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1372                         spin_unlock_irq(&mdev->tconn->req_lock);
1373                 }
1374                 /* case2: The connection was established again: */
1375                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1376                         clear_bit(NEW_CUR_UUID, &mdev->flags);
1377                         what = RESEND;
1378                         nsm.susp_fen = 0;
1379                 }
1380         }
1381
1382         if (what != NOTHING) {
1383                 spin_lock_irq(&mdev->tconn->req_lock);
1384                 _tl_restart(mdev, what);
1385                 nsm.i &= mdev->state.i;
1386                 _drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1387                 spin_unlock_irq(&mdev->tconn->req_lock);
1388         }
1389
1390         /* Became sync source.  With protocol >= 96, we still need to send out
1391          * the sync uuid now. Need to do that before any drbd_send_state, or
1392          * the other side may go "paused sync" before receiving the sync uuids,
1393          * which is unexpected. */
1394         if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) &&
1395             (ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) &&
1396             mdev->tconn->agreed_pro_version >= 96 && get_ldev(mdev)) {
1397                 drbd_gen_and_send_sync_uuid(mdev);
1398                 put_ldev(mdev);
1399         }
1400
1401         /* Do not change the order of the if above and the two below... */
1402         if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1403                 drbd_send_uuids(mdev);
1404                 drbd_send_state(mdev);
1405         }
1406         /* No point in queuing send_bitmap if we don't have a connection
1407          * anymore, so check also the _current_ state, not only the new state
1408          * at the time this work was queued. */
1409         if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S &&
1410             mdev->state.conn == C_WF_BITMAP_S)
1411                 drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL,
1412                                 "send_bitmap (WFBitMapS)",
1413                                 BM_LOCKED_TEST_ALLOWED);
1414
1415         /* Lost contact to peer's copy of the data */
1416         if ((os.pdsk >= D_INCONSISTENT &&
1417              os.pdsk != D_UNKNOWN &&
1418              os.pdsk != D_OUTDATED)
1419         &&  (ns.pdsk < D_INCONSISTENT ||
1420              ns.pdsk == D_UNKNOWN ||
1421              ns.pdsk == D_OUTDATED)) {
1422                 if (get_ldev(mdev)) {
1423                         if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1424                             mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1425                                 if (is_susp(mdev->state)) {
1426                                         set_bit(NEW_CUR_UUID, &mdev->flags);
1427                                 } else {
1428                                         drbd_uuid_new_current(mdev);
1429                                         drbd_send_uuids(mdev);
1430                                 }
1431                         }
1432                         put_ldev(mdev);
1433                 }
1434         }
1435
1436         if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1437                 if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0) {
1438                         drbd_uuid_new_current(mdev);
1439                         drbd_send_uuids(mdev);
1440                 }
1441
1442                 /* D_DISKLESS Peer becomes secondary */
1443                 if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1444                         /* We may still be Primary ourselves.
1445                          * No harm done if the bitmap still changes,
1446                          * redirtied pages will follow later. */
1447                         drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1448                                 "demote diskless peer", BM_LOCKED_SET_ALLOWED);
1449                 put_ldev(mdev);
1450         }
1451
1452         /* Write out all changed bits on demote.
1453          * Though, no need to do that just yet
1454          * if there is still a resync going on. */
1455         if (os.role == R_PRIMARY && ns.role == R_SECONDARY &&
1456                 mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
1457                 /* No changes to the bitmap expected this time, so assert that,
1458                  * even though no harm was done if it did change. */
1459                 drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1460                                 "demote", BM_LOCKED_TEST_ALLOWED);
1461                 put_ldev(mdev);
1462         }
1463
1464         /* Last part of the attaching process ... */
1465         if (ns.conn >= C_CONNECTED &&
1466             os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1467                 drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1468                 drbd_send_uuids(mdev);
1469                 drbd_send_state(mdev);
1470         }
1471
1472         /* We want to pause/continue resync, tell peer. */
1473         if (ns.conn >= C_CONNECTED &&
1474              ((os.aftr_isp != ns.aftr_isp) ||
1475               (os.user_isp != ns.user_isp)))
1476                 drbd_send_state(mdev);
1477
1478         /* In case one of the isp bits got set, suspend other devices. */
1479         if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1480             (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1481                 suspend_other_sg(mdev);
1482
1483         /* Make sure the peer gets informed about state changes
1484            (ISP bits) that happened while we were in WFReportParams. */
1485         if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1486                 drbd_send_state(mdev);
1487
1488         if (os.conn != C_AHEAD && ns.conn == C_AHEAD)
1489                 drbd_send_state(mdev);
1490
1491         /* We are in the process of starting a full sync... */
1492         if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1493             (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1494                 /* no other bitmap changes expected during this phase */
1495                 drbd_queue_bitmap_io(mdev,
1496                         &drbd_bmio_set_n_write, &abw_start_sync,
1497                         "set_n_write from StartingSync", BM_LOCKED_TEST_ALLOWED);
1498
1499         /* We are invalidating ourselves... */
1500         if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1501             os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1502                 /* other bitmap operation expected during this phase */
1503                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
1504                         "set_n_write from invalidate", BM_LOCKED_MASK);
1505
1506         /* first half of local IO error, failure to attach,
1507          * or administrative detach */
1508         if (os.disk != D_FAILED && ns.disk == D_FAILED) {
1509                 enum drbd_io_error_p eh;
1510                 int was_io_error;
1511                 /* corresponding get_ldev was in __drbd_set_state, to serialize
1512                  * our cleanup here with the transition to D_DISKLESS,
1513          * so it is safe to dereference ldev here. */
1514                 eh = mdev->ldev->dc.on_io_error;
1515                 was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
1516
1517                 /* current state still has to be D_FAILED,
1518                  * there is only one way out: to D_DISKLESS,
1519                  * and that may only happen after our put_ldev below. */
1520                 if (mdev->state.disk != D_FAILED)
1521                         dev_err(DEV,
1522                                 "ASSERT FAILED: disk is %s during detach\n",
1523                                 drbd_disk_str(mdev->state.disk));
1524
1525                 if (drbd_send_state(mdev))
1526                         dev_warn(DEV, "Notified peer that I am detaching my disk\n");
1527                 else
1528                         dev_err(DEV, "Sending state for detaching disk failed\n");
1529
1530                 drbd_rs_cancel_all(mdev);
1531
1532                 /* In case we want to get something to stable storage still,
1533                  * this may be the last chance.
1534                  * Following put_ldev may transition to D_DISKLESS. */
1535                 drbd_md_sync(mdev);
1536                 put_ldev(mdev);
1537
1538                 if (was_io_error && eh == EP_CALL_HELPER)
1539                         drbd_khelper(mdev, "local-io-error");
1540         }
1541
1542         /* second half of local IO error, failure to attach,
1543          * or administrative detach,
1544          * after local_cnt references have reached zero again */
1545         if (os.disk != D_DISKLESS && ns.disk == D_DISKLESS) {
1546                 /* We must still be diskless,
1547                  * re-attach has to be serialized with this! */
1548                 if (mdev->state.disk != D_DISKLESS)
1549                         dev_err(DEV,
1550                                 "ASSERT FAILED: disk is %s while going diskless\n",
1551                                 drbd_disk_str(mdev->state.disk));
1552
1553                 mdev->rs_total = 0;
1554                 mdev->rs_failed = 0;
1555                 atomic_set(&mdev->rs_pending_cnt, 0);
1556
1557                 if (drbd_send_state(mdev))
1558                         dev_warn(DEV, "Notified peer that I'm now diskless.\n");
1559                 /* corresponding get_ldev in __drbd_set_state
1560                  * this may finally trigger drbd_ldev_destroy. */
1561                 put_ldev(mdev);
1562         }
1563
1564         /* Notify peer that I had a local IO error, and did not detach. */
1565         if (os.disk == D_UP_TO_DATE && ns.disk == D_INCONSISTENT)
1566                 drbd_send_state(mdev);
1567
1568         /* Disks got bigger while they were detached */
1569         if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1570             test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1571                 if (ns.conn == C_CONNECTED)
1572                         resync_after_online_grow(mdev);
1573         }
1574
1575         /* A resync finished or aborted, wake paused devices... */
1576         if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1577             (os.peer_isp && !ns.peer_isp) ||
1578             (os.user_isp && !ns.user_isp))
1579                 resume_next_sg(mdev);
1580
1581         /* sync target done with resync.  Explicitly notify peer, even though
1582          * it should (at least for non-empty resyncs) already know itself. */
1583         if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1584                 drbd_send_state(mdev);
1585
1586         /* This triggers bitmap writeout of potentially still unwritten pages
1587          * if the resync finished cleanly, or aborted because of peer disk
1588          * failure, or because of connection loss.
1589          * For resync aborted because of local disk failure, we cannot do
1590          * any bitmap writeout anymore.
1591          * No harm done if some bits change during this phase.
1592          */
1593         if (os.conn > C_CONNECTED && ns.conn <= C_CONNECTED && get_ldev(mdev)) {
1594                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL,
1595                         "write from resync_finished", BM_LOCKED_SET_ALLOWED);
1596                 put_ldev(mdev);
1597         }
1598
1599         /* Upon network connection, we need to start the receiver */
1600         /* Upon network configuration, we need to start the receiver */
1601                 drbd_thread_start(&mdev->tconn->receiver);
1602
1603         /* Terminate worker thread if we are unconfigured - it will be
1604            restarted as needed... */
1605         if (ns.disk == D_DISKLESS &&
1606             ns.conn == C_STANDALONE &&
1607             ns.role == R_SECONDARY) {
1608                 if (os.aftr_isp != ns.aftr_isp)
1609                         resume_next_sg(mdev);
1610                 /* set in __drbd_set_state, unless CONFIG_PENDING was set */
1611                 if (test_bit(DEVICE_DYING, &mdev->flags))
1612                         drbd_thread_stop_nowait(&mdev->tconn->worker);
1613         }
1614
1615         drbd_md_sync(mdev);
1616 }
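
/*
 * Added summary comment (not part of the original file): after_state_ch()
 * above roughly groups its work as follows, all of it allowed to sleep:
 * notify userspace (drbd_bcast_state), run userspace helpers where needed
 * (pri-on-incon-degr, local-io-error), unfreeze suspended IO via RESEND /
 * RESTART_FROZEN_DISK_IO, exchange UUIDs, sizes, states and bitmaps with the
 * peer as connection and disk states change, handle the two-step
 * D_FAILED -> D_DISKLESS detach path, start/stop the receiver and worker
 * threads where needed, and finally flush the meta data with drbd_md_sync().
 */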
1617
1618
1619 static int drbd_thread_setup(void *arg)
1620 {
1621         struct drbd_thread *thi = (struct drbd_thread *) arg;
1622         struct drbd_conf *mdev = thi->mdev;
1623         unsigned long flags;
1624         int retval;
1625
1626 restart:
1627         retval = thi->function(thi);
1628
1629         spin_lock_irqsave(&thi->t_lock, flags);
1630
1631         /* if the receiver has been "EXITING", the last thing it did
1632          * was set the conn state to "StandAlone",
1633          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1634          * and receiver thread will be "started".
1635          * drbd_thread_start needs to set "RESTARTING" in that case.
1636          * t_state check and assignment needs to be within the same spinlock,
1637          * so either thread_start sees EXITING, and can remap to RESTARTING,
1638          * or thread_start sees NONE, and can proceed as normal.
1639          */
1640
1641         if (thi->t_state == RESTARTING) {
1642                 dev_info(DEV, "Restarting %s\n", current->comm);
1643                 thi->t_state = RUNNING;
1644                 spin_unlock_irqrestore(&thi->t_lock, flags);
1645                 goto restart;
1646         }
1647
1648         thi->task = NULL;
1649         thi->t_state = NONE;
1650         smp_mb();
1651         complete(&thi->stop);
1652         spin_unlock_irqrestore(&thi->t_lock, flags);
1653
1654         dev_info(DEV, "Terminating %s\n", current->comm);
1655
1656         /* Release mod reference taken when thread was started */
1657         module_put(THIS_MODULE);
1658         return retval;
1659 }
1660
1661 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1662                       int (*func) (struct drbd_thread *))
1663 {
1664         spin_lock_init(&thi->t_lock);
1665         thi->task    = NULL;
1666         thi->t_state = NONE;
1667         thi->function = func;
1668         thi->mdev = mdev;
1669 }
1670
1671 int drbd_thread_start(struct drbd_thread *thi)
1672 {
1673         struct drbd_conf *mdev = thi->mdev;
1674         struct task_struct *nt;
1675         unsigned long flags;
1676
1677         const char *me =
1678                 thi == &mdev->tconn->receiver ? "receiver" :
1679                 thi == &mdev->tconn->asender  ? "asender"  :
1680                 thi == &mdev->tconn->worker   ? "worker"   : "NONSENSE";
1681
1682         /* is used from state engine doing drbd_thread_stop_nowait,
1683          * while holding the req lock irqsave */
1684         spin_lock_irqsave(&thi->t_lock, flags);
1685
1686         switch (thi->t_state) {
1687         case NONE:
1688                 dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1689                                 me, current->comm, current->pid);
1690
1691                 /* Get ref on module for thread - this is released when thread exits */
1692                 if (!try_module_get(THIS_MODULE)) {
1693                         dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1694                         spin_unlock_irqrestore(&thi->t_lock, flags);
1695                         return false;
1696                 }
1697
1698                 init_completion(&thi->stop);
1699                 D_ASSERT(thi->task == NULL);
1700                 thi->reset_cpu_mask = 1;
1701                 thi->t_state = RUNNING;
1702                 spin_unlock_irqrestore(&thi->t_lock, flags);
1703                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1704
1705                 nt = kthread_create(drbd_thread_setup, (void *) thi,
1706                                     "drbd%d_%s", mdev_to_minor(mdev), me);
1707
1708                 if (IS_ERR(nt)) {
1709                         dev_err(DEV, "Couldn't start thread\n");
1710
1711                         module_put(THIS_MODULE);
1712                         return false;
1713                 }
1714                 spin_lock_irqsave(&thi->t_lock, flags);
1715                 thi->task = nt;
1716                 thi->t_state = RUNNING;
1717                 spin_unlock_irqrestore(&thi->t_lock, flags);
1718                 wake_up_process(nt);
1719                 break;
1720         case EXITING:
1721                 thi->t_state = RESTARTING;
1722                 dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1723                                 me, current->comm, current->pid);
1724                 /* fall through */
1725         case RUNNING:
1726         case RESTARTING:
1727         default:
1728                 spin_unlock_irqrestore(&thi->t_lock, flags);
1729                 break;
1730         }
1731
1732         return true;
1733 }
1734
1735
1736 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1737 {
1738         unsigned long flags;
1739
1740         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
1741
1742         /* may be called from state engine, holding the req lock irqsave */
1743         spin_lock_irqsave(&thi->t_lock, flags);
1744
1745         if (thi->t_state == NONE) {
1746                 spin_unlock_irqrestore(&thi->t_lock, flags);
1747                 if (restart)
1748                         drbd_thread_start(thi);
1749                 return;
1750         }
1751
1752         if (thi->t_state != ns) {
1753                 if (thi->task == NULL) {
1754                         spin_unlock_irqrestore(&thi->t_lock, flags);
1755                         return;
1756                 }
1757
1758                 thi->t_state = ns;
1759                 smp_mb();
1760                 init_completion(&thi->stop);
1761                 if (thi->task != current)
1762                         force_sig(DRBD_SIGKILL, thi->task);
1763
1764         }
1765
1766         spin_unlock_irqrestore(&thi->t_lock, flags);
1767
1768         if (wait)
1769                 wait_for_completion(&thi->stop);
1770 }
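
/*
 * Summary of the drbd_thread t_state transitions implemented above
 * (added comment, not part of the original file):
 *
 *   NONE       --drbd_thread_start()---> RUNNING    (kthread created)
 *   RUNNING    --_drbd_thread_stop()---> EXITING    (or RESTARTING if restart)
 *   EXITING    --drbd_thread_start()---> RESTARTING (remapped under t_lock)
 *   RESTARTING --drbd_thread_setup()---> RUNNING    (loops, runs thi->function again)
 *   EXITING    --drbd_thread_setup()---> NONE       (complete(&thi->stop), thread exits)
 */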
1771
1772 #ifdef CONFIG_SMP
1773 /**
1774  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1775  * @mdev:       DRBD device.
1776  *
1777  * Forces all threads of a device onto the same CPU. This is beneficial for
1778  * DRBD's performance. May be overridden by the user's configuration.
1779  */
1780 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1781 {
1782         int ord, cpu;
1783
1784         /* user override. */
1785         if (cpumask_weight(mdev->cpu_mask))
1786                 return;
1787
1788         ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1789         for_each_online_cpu(cpu) {
1790                 if (ord-- == 0) {
1791                         cpumask_set_cpu(cpu, mdev->cpu_mask);
1792                         return;
1793                 }
1794         }
1795         /* should not be reached */
1796         cpumask_setall(mdev->cpu_mask);
1797 }
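
/*
 * Illustrative sketch (not part of the original file): the round-robin CPU
 * selection drbd_calc_cpu_mask() performs above, reduced to plain array
 * indexing.  Minor N ends up pinned to the (N mod nr_online)-th online CPU,
 * unless the user already configured a non-empty cpu_mask.  The function and
 * parameter names below are made up for this example.
 */
#if 0
static int example_pick_cpu_for_minor(unsigned int minor,
				       const int *online_cpus,
				       unsigned int nr_online)
{
	/* same "count ord down while walking the online CPUs" idea */
	return online_cpus[minor % nr_online];
}
#endif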
1798
1799 /**
1800  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1801  * @mdev:       DRBD device.
1802  *
1803  * Call this from the "main loop" of _all_ threads; no mutex is needed, current won't
1804  * die prematurely.
1805  */
1806 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1807 {
1808         struct task_struct *p = current;
1809         struct drbd_thread *thi =
1810                 p == mdev->tconn->asender.task  ? &mdev->tconn->asender  :
1811                 p == mdev->tconn->receiver.task ? &mdev->tconn->receiver :
1812                 p == mdev->tconn->worker.task   ? &mdev->tconn->worker   :
1813                 NULL;
1814         if (!expect(thi != NULL))
1815                 return;
1816         if (!thi->reset_cpu_mask)
1817                 return;
1818         thi->reset_cpu_mask = 0;
1819         set_cpus_allowed_ptr(p, mdev->cpu_mask);
1820 }
1821 #endif
1822
1823 static void prepare_header80(struct drbd_conf *mdev, struct p_header80 *h,
1824                              enum drbd_packets cmd, int size)
1825 {
1826         h->magic   = cpu_to_be32(DRBD_MAGIC);
1827         h->command = cpu_to_be16(cmd);
1828         h->length  = cpu_to_be16(size);
1829 }
1830
1831 static void prepare_header95(struct drbd_conf *mdev, struct p_header95 *h,
1832                              enum drbd_packets cmd, int size)
1833 {
1834         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
1835         h->command = cpu_to_be16(cmd);
1836         h->length  = cpu_to_be32(size);
1837 }
1838
1839 static void prepare_header(struct drbd_conf *mdev, struct p_header *h,
1840                            enum drbd_packets cmd, int size)
1841 {
1842         if (mdev->tconn->agreed_pro_version >= 100 || size > DRBD_MAX_SIZE_H80_PACKET)
1843                 prepare_header95(mdev, &h->h95, cmd, size);
1844         else
1845                 prepare_header80(mdev, &h->h80, cmd, size);
1846 }
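
/*
 * Illustrative sketch (not part of the original file): the two on-the-wire
 * header layouts selected by prepare_header() above, written out as a
 * stand-alone user-space packer.  The field widths and the selection rule
 * (protocol >= 100, or a payload larger than the h80 limit,
 * DRBD_MAX_SIZE_H80_PACKET) mirror prepare_header80()/prepare_header95();
 * the magic constants and all names below are placeholders, not the real
 * DRBD values.
 */
#if 0
#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>	/* htons()/htonl() for big-endian conversion */

struct example_h80 { uint32_t magic; uint16_t command; uint16_t length; };
struct example_h95 { uint16_t magic; uint16_t command; uint32_t length; };

static size_t example_pack_header(void *buf, int pro_version, uint16_t cmd,
				  uint32_t payload_len, uint32_t max_h80_payload)
{
	if (pro_version >= 100 || payload_len > max_h80_payload) {
		struct example_h95 h = {
			.magic   = htons(0x1234),	/* placeholder magic */
			.command = htons(cmd),
			.length  = htonl(payload_len),	/* 32 bit length */
		};
		memcpy(buf, &h, sizeof(h));
		return sizeof(h);
	} else {
		struct example_h80 h = {
			.magic   = htonl(0x12345678),	/* placeholder magic */
			.command = htons(cmd),
			.length  = htons(payload_len),	/* 16 bit length */
		};
		memcpy(buf, &h, sizeof(h));
		return sizeof(h);
	}
}
#endif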
1847
1848 /* the appropriate socket mutex must be held already */
1849 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1850                           enum drbd_packets cmd, struct p_header *h,
1851                           size_t size, unsigned msg_flags)
1852 {
1853         int sent, ok;
1854
1855         if (!expect(h))
1856                 return false;
1857         if (!expect(size))
1858                 return false;
1859
1860         prepare_header(mdev, h, cmd, size - sizeof(struct p_header));
1861
1862         sent = drbd_send(mdev, sock, h, size, msg_flags);
1863
1864         ok = (sent == size);
1865         if (!ok && !signal_pending(current))
1866                 dev_warn(DEV, "short sent %s size=%d sent=%d\n",
1867                     cmdname(cmd), (int)size, sent);
1868         return ok;
1869 }
1870
1871 /* don't pass the socket. we may only look at it
1872  * when we hold the appropriate socket mutex.
1873  */
1874 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1875                   enum drbd_packets cmd, struct p_header *h, size_t size)
1876 {
1877         int ok = 0;
1878         struct socket *sock;
1879
1880         if (use_data_socket) {
1881                 mutex_lock(&mdev->tconn->data.mutex);
1882                 sock = mdev->tconn->data.socket;
1883         } else {
1884                 mutex_lock(&mdev->tconn->meta.mutex);
1885                 sock = mdev->tconn->meta.socket;
1886         }
1887
1888         /* drbd_disconnect() could have called drbd_free_sock()
1889          * while we were waiting in down()... */
1890         if (likely(sock != NULL))
1891                 ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1892
1893         if (use_data_socket)
1894                 mutex_unlock(&mdev->tconn->data.mutex);
1895         else
1896                 mutex_unlock(&mdev->tconn->meta.mutex);
1897         return ok;
1898 }
1899
1900 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1901                    size_t size)
1902 {
1903         struct p_header h;
1904         int ok;
1905
1906         prepare_header(mdev, &h, cmd, size);
1907
1908         if (!drbd_get_data_sock(mdev))
1909                 return 0;
1910
1911         ok = (sizeof(h) ==
1912                 drbd_send(mdev, mdev->tconn->data.socket, &h, sizeof(h), 0));
1913         ok = ok && (size ==
1914                 drbd_send(mdev, mdev->tconn->data.socket, data, size, 0));
1915
1916         drbd_put_data_sock(mdev);
1917
1918         return ok;
1919 }
1920
1921 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1922 {
1923         struct p_rs_param_95 *p;
1924         struct socket *sock;
1925         int size, rv;
1926         const int apv = mdev->tconn->agreed_pro_version;
1927
1928         size = apv <= 87 ? sizeof(struct p_rs_param)
1929                 : apv == 88 ? sizeof(struct p_rs_param)
1930                         + strlen(mdev->sync_conf.verify_alg) + 1
1931                 : apv <= 94 ? sizeof(struct p_rs_param_89)
1932                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
1933
1934         /* used from admin command context and receiver/worker context.
1935          * to avoid kmalloc, grab the socket right here,
1936          * then use the pre-allocated sbuf there */
1937         mutex_lock(&mdev->tconn->data.mutex);
1938         sock = mdev->tconn->data.socket;
1939
1940         if (likely(sock != NULL)) {
1941                 enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1942
1943                 p = &mdev->tconn->data.sbuf.rs_param_95;
1944
1945                 /* initialize verify_alg and csums_alg */
1946                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1947
1948                 p->rate = cpu_to_be32(sc->rate);
1949                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
1950                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
1951                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
1952                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
1953
1954                 if (apv >= 88)
1955                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1956                 if (apv >= 89)
1957                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1958
1959                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1960         } else
1961                 rv = 0; /* not ok */
1962
1963         mutex_unlock(&mdev->tconn->data.mutex);
1964
1965         return rv;
1966 }
1967
1968 int drbd_send_protocol(struct drbd_conf *mdev)
1969 {
1970         struct p_protocol *p;
1971         int size, cf, rv;
1972
1973         size = sizeof(struct p_protocol);
1974
1975         if (mdev->tconn->agreed_pro_version >= 87)
1976                 size += strlen(mdev->tconn->net_conf->integrity_alg) + 1;
1977
1978         /* we must not recurse into our own queue,
1979          * as that is blocked during handshake */
1980         p = kmalloc(size, GFP_NOIO);
1981         if (p == NULL)
1982                 return 0;
1983
1984         p->protocol      = cpu_to_be32(mdev->tconn->net_conf->wire_protocol);
1985         p->after_sb_0p   = cpu_to_be32(mdev->tconn->net_conf->after_sb_0p);
1986         p->after_sb_1p   = cpu_to_be32(mdev->tconn->net_conf->after_sb_1p);
1987         p->after_sb_2p   = cpu_to_be32(mdev->tconn->net_conf->after_sb_2p);
1988         p->two_primaries = cpu_to_be32(mdev->tconn->net_conf->two_primaries);
1989
1990         cf = 0;
1991         if (mdev->tconn->net_conf->want_lose)
1992                 cf |= CF_WANT_LOSE;
1993         if (mdev->tconn->net_conf->dry_run) {
1994                 if (mdev->tconn->agreed_pro_version >= 92)
1995                         cf |= CF_DRY_RUN;
1996                 else {
1997                         dev_err(DEV, "--dry-run is not supported by peer");
1998                         kfree(p);
1999                         return -1;
2000                 }
2001         }
2002         p->conn_flags    = cpu_to_be32(cf);
2003
2004         if (mdev->tconn->agreed_pro_version >= 87)
2005                 strcpy(p->integrity_alg, mdev->tconn->net_conf->integrity_alg);
2006
2007         rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL, &p->head, size);
2008         kfree(p);
2009         return rv;
2010 }
2011
2012 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
2013 {
2014         struct p_uuids p;
2015         int i;
2016
2017         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
2018                 return 1;
2019
2020         for (i = UI_CURRENT; i < UI_SIZE; i++)
2021                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
2022
2023         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
2024         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
2025         uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
2026         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
2027         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
2028         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
2029
2030         put_ldev(mdev);
2031
2032         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS, &p.head, sizeof(p));
2033 }
2034
2035 int drbd_send_uuids(struct drbd_conf *mdev)
2036 {
2037         return _drbd_send_uuids(mdev, 0);
2038 }
2039
2040 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
2041 {
2042         return _drbd_send_uuids(mdev, 8);
2043 }
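
/*
 * Illustrative sketch (not part of the original file): the uuid_flags bits
 * assembled in _drbd_send_uuids() above, spelled out as named constants.
 * The numeric values come straight from the code in this file; the names
 * are invented here for readability only.
 */
#if 0
enum example_uuid_flags {
	EX_UUID_FLAG_DISCARD_MY_DATA   = 1, /* net_conf->want_lose set */
	EX_UUID_FLAG_CRASHED_PRIMARY   = 2, /* CRASHED_PRIMARY still set; cleared
					     * again from the locally stored
					     * p_uuid[UI_FLAGS] once C_CONNECTED
					     * is reached, see after_state_ch() */
	EX_UUID_FLAG_INCONSISTENT      = 4, /* new_state_tmp.disk == D_INCONSISTENT */
	EX_UUID_FLAG_SKIP_INITIAL_SYNC = 8, /* passed by
					     * drbd_send_uuids_skip_initial_sync() */
};
#endif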
2044
2045 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
2046 {
2047         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2048                 u64 *uuid = mdev->ldev->md.uuid;
2049                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
2050                      text,
2051                      (unsigned long long)uuid[UI_CURRENT],
2052                      (unsigned long long)uuid[UI_BITMAP],
2053                      (unsigned long long)uuid[UI_HISTORY_START],
2054                      (unsigned long long)uuid[UI_HISTORY_END]);
2055                 put_ldev(mdev);
2056         } else {
2057                 dev_info(DEV, "%s effective data uuid: %016llX\n",
2058                                 text,
2059                                 (unsigned long long)mdev->ed_uuid);
2060         }
2061 }
2062
2063 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
2064 {
2065         struct p_rs_uuid p;
2066         u64 uuid;
2067
2068         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
2069
2070         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
2071         drbd_uuid_set(mdev, UI_BITMAP, uuid);
2072         drbd_print_uuids(mdev, "updated sync UUID");
2073         drbd_md_sync(mdev);
2074         p.uuid = cpu_to_be64(uuid);
2075
2076         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID, &p.head, sizeof(p));
2077 }
2078
2079 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
2080 {
2081         struct p_sizes p;
2082         sector_t d_size, u_size;
2083         int q_order_type, max_bio_size;
2084         int ok;
2085
2086         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2087                 D_ASSERT(mdev->ldev->backing_bdev);
2088                 d_size = drbd_get_max_capacity(mdev->ldev);
2089                 u_size = mdev->ldev->dc.disk_size;
2090                 q_order_type = drbd_queue_order_type(mdev);
2091                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
2092                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
2093                 put_ldev(mdev);
2094         } else {
2095                 d_size = 0;
2096                 u_size = 0;
2097                 q_order_type = QUEUE_ORDERED_NONE;
2098                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
2099         }
2100
2101         p.d_size = cpu_to_be64(d_size);
2102         p.u_size = cpu_to_be64(u_size);
2103         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
2104         p.max_bio_size = cpu_to_be32(max_bio_size);
2105         p.queue_order_type = cpu_to_be16(q_order_type);
2106         p.dds_flags = cpu_to_be16(flags);
2107
2108         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES, &p.head, sizeof(p));
2109         return ok;
2110 }
2111
2112 /**
2113  * drbd_send_state() - Sends the drbd state to the peer
2114  * @mdev:       DRBD device.
2115  */
2116 int drbd_send_state(struct drbd_conf *mdev)
2117 {
2118         struct socket *sock;
2119         struct p_state p;
2120         int ok = 0;
2121
2122         /* Grab state lock so we won't send state if we're in the middle
2123          * of a cluster wide state change on another thread */
2124         drbd_state_lock(mdev);
2125
2126         mutex_lock(&mdev->tconn->data.mutex);
2127
2128         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
2129         sock = mdev->tconn->data.socket;
2130
2131         if (likely(sock != NULL)) {
2132                 ok = _drbd_send_cmd(mdev, sock, P_STATE, &p.head, sizeof(p), 0);
2133         }
2134
2135         mutex_unlock(&mdev->tconn->data.mutex);
2136
2137         drbd_state_unlock(mdev);
2138         return ok;
2139 }
2140
2141 int drbd_send_state_req(struct drbd_conf *mdev,
2142         union drbd_state mask, union drbd_state val)
2143 {
2144         struct p_req_state p;
2145
2146         p.mask    = cpu_to_be32(mask.i);
2147         p.val     = cpu_to_be32(val.i);
2148
2149         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ, &p.head, sizeof(p));
2150 }
2151
2152 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
2153 {
2154         struct p_req_state_reply p;
2155
2156         p.retcode    = cpu_to_be32(retcode);
2157
2158         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY, &p.head, sizeof(p));
2159 }
2160
2161 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
2162         struct p_compressed_bm *p,
2163         struct bm_xfer_ctx *c)
2164 {
2165         struct bitstream bs;
2166         unsigned long plain_bits;
2167         unsigned long tmp;
2168         unsigned long rl;
2169         unsigned len;
2170         unsigned toggle;
2171         int bits;
2172
2173         /* may we use this feature? */
2174         if ((mdev->sync_conf.use_rle == 0) ||
2175             (mdev->tconn->agreed_pro_version < 90))
2176                 return 0;
2177
2178         if (c->bit_offset >= c->bm_bits)
2179                 return 0; /* nothing to do. */
2180
2181         /* use at most thus many bytes */
2182         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
2183         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
2184         /* plain bits covered in this code string */
2185         plain_bits = 0;
2186
2187         /* p->encoding & 0x80 stores whether the first run length is set.
2188          * bit offset is implicit.
2189          * start with toggle == 2 to be able to tell the first iteration */
2190         toggle = 2;
2191
2192         /* see how much plain bits we can stuff into one packet
2193          * using RLE and VLI. */
2194         do {
2195                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2196                                     : _drbd_bm_find_next(mdev, c->bit_offset);
2197                 if (tmp == -1UL)
2198                         tmp = c->bm_bits;
2199                 rl = tmp - c->bit_offset;
2200
2201                 if (toggle == 2) { /* first iteration */
2202                         if (rl == 0) {
2203                                 /* the first checked bit was set,
2204                                  * store start value, */
2205                                 DCBP_set_start(p, 1);
2206                                 /* but skip encoding of zero run length */
2207                                 toggle = !toggle;
2208                                 continue;
2209                         }
2210                         DCBP_set_start(p, 0);
2211                 }
2212
2213                 /* paranoia: catch zero runlength.
2214                  * can only happen if bitmap is modified while we scan it. */
2215                 if (rl == 0) {
2216                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2217                             "t:%u bo:%lu\n", toggle, c->bit_offset);
2218                         return -1;
2219                 }
2220
2221                 bits = vli_encode_bits(&bs, rl);
2222                 if (bits == -ENOBUFS) /* buffer full */
2223                         break;
2224                 if (bits <= 0) {
2225                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2226                         return 0;
2227                 }
2228
2229                 toggle = !toggle;
2230                 plain_bits += rl;
2231                 c->bit_offset = tmp;
2232         } while (c->bit_offset < c->bm_bits);
2233
2234         len = bs.cur.b - p->code + !!bs.cur.bit;
2235
2236         if (plain_bits < (len << 3)) {
2237                 /* incompressible with this method.
2238                  * we need to rewind both word and bit position. */
2239                 c->bit_offset -= plain_bits;
2240                 bm_xfer_ctx_bit_to_word_offset(c);
2241                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2242                 return 0;
2243         }
2244
2245         /* RLE + VLI was able to compress it just fine.
2246          * update c->word_offset. */
2247         bm_xfer_ctx_bit_to_word_offset(c);
2248
2249         /* store pad_bits */
2250         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2251
2252         return len;
2253 }
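
/*
 * Illustrative sketch (not part of the original file): the run-length pass
 * that fill_bitmap_rle_bits() performs before VLI-encoding each run.  The
 * real code walks the device bitmap via _drbd_bm_find_next*() and feeds the
 * run lengths into vli_encode_bits(); here a single uint64_t stands in for
 * the bitmap and the runs are only printed.  Everything below is a made-up
 * user-space example.
 */
#if 0
#include <stdint.h>
#include <stdio.h>

/* nbits must be <= 64 for this toy bitmap */
static void example_emit_runs(uint64_t bm, unsigned int nbits)
{
	unsigned int bit = 0;
	int cur = bm & 1;	/* value of the first run; the real code records
				 * this in the packet via DCBP_set_start() */

	printf("first run consists of %s bits\n", cur ? "set" : "cleared");
	while (bit < nbits) {
		unsigned int start = bit;

		while (bit < nbits && (int)((bm >> bit) & 1) == cur)
			bit++;
		/* the real code VLI-encodes (bit - start) here */
		printf("run length %u\n", bit - start);
		cur = !cur;
	}
}
#endif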
2254
2255 /**
2256  * send_bitmap_rle_or_plain() - send one bitmap packet, RLE/VLI compressed if that pays off
2257  *
2258  * Return 0 when done, 1 when another iteration is needed, and a negative error
2259  * code upon failure.
2260  */
2261 static int
2262 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2263                          struct p_header *h, struct bm_xfer_ctx *c)
2264 {
2265         struct p_compressed_bm *p = (void*)h;
2266         unsigned long num_words;
2267         int len;
2268         int ok;
2269
2270         len = fill_bitmap_rle_bits(mdev, p, c);
2271
2272         if (len < 0)
2273                 return -EIO;
2274
2275         if (len) {
2276                 DCBP_set_code(p, RLE_VLI_Bits);
2277                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_COMPRESSED_BITMAP, h,
2278                         sizeof(*p) + len, 0);
2279
2280                 c->packets[0]++;
2281                 c->bytes[0] += sizeof(*p) + len;
2282
2283                 if (c->bit_offset >= c->bm_bits)
2284                         len = 0; /* DONE */
2285         } else {
2286                 /* was not compressible.
2287                  * send a buffer full of plain text bits instead. */
2288                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2289                 len = num_words * sizeof(long);
2290                 if (len)
2291                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2292                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BITMAP,
2293                                    h, sizeof(struct p_header80) + len, 0);
2294                 c->word_offset += num_words;
2295                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2296
2297                 c->packets[1]++;
2298                 c->bytes[1] += sizeof(struct p_header80) + len;
2299
2300                 if (c->bit_offset > c->bm_bits)
2301                         c->bit_offset = c->bm_bits;
2302         }
2303         if (ok) {
2304                 if (len == 0) {
2305                         INFO_bm_xfer_stats(mdev, "send", c);
2306                         return 0;
2307                 } else
2308                         return 1;
2309         }
2310         return -EIO;
2311 }
2312
2313 /* See the comment at receive_bitmap() */
2314 int _drbd_send_bitmap(struct drbd_conf *mdev)
2315 {
2316         struct bm_xfer_ctx c;
2317         struct p_header *p;
2318         int err;
2319
2320         if (!expect(mdev->bitmap))
2321                 return false;
2322
2323         /* maybe we should use some per thread scratch page,
2324          * and allocate that during initial device creation? */
2325         p = (struct p_header *) __get_free_page(GFP_NOIO);
2326         if (!p) {
2327                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2328                 return false;
2329         }
2330
2331         if (get_ldev(mdev)) {
2332                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2333                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2334                         drbd_bm_set_all(mdev);
2335                         if (drbd_bm_write(mdev)) {
2336                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
2337                                  * but otherwise process as per normal - need to tell other
2338                                  * side that a full resync is required! */
2339                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
2340                         } else {
2341                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2342                                 drbd_md_sync(mdev);
2343                         }
2344                 }
2345                 put_ldev(mdev);
2346         }
2347
2348         c = (struct bm_xfer_ctx) {
2349                 .bm_bits = drbd_bm_bits(mdev),
2350                 .bm_words = drbd_bm_words(mdev),
2351         };
2352
2353         do {
2354                 err = send_bitmap_rle_or_plain(mdev, p, &c);
2355         } while (err > 0);
2356
2357         free_page((unsigned long) p);
2358         return err == 0;
2359 }
2360
2361 int drbd_send_bitmap(struct drbd_conf *mdev)
2362 {
2363         int err;
2364
2365         if (!drbd_get_data_sock(mdev))
2366                 return -1;
2367         err = !_drbd_send_bitmap(mdev);
2368         drbd_put_data_sock(mdev);
2369         return err;
2370 }
2371
2372 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2373 {
2374         int ok;
2375         struct p_barrier_ack p;
2376
2377         p.barrier  = barrier_nr;
2378         p.set_size = cpu_to_be32(set_size);
2379
2380         if (mdev->state.conn < C_CONNECTED)
2381                 return false;
2382         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK, &p.head, sizeof(p));
2383         return ok;
2384 }
2385
2386 /**
2387  * _drbd_send_ack() - Sends an ack packet
2388  * @mdev:       DRBD device.
2389  * @cmd:        Packet command code.
2390  * @sector:     sector, needs to be in big endian byte order
2391  * @blksize:    size in bytes, needs to be in big endian byte order
2392  * @block_id:   Id, big endian byte order
2393  */
2394 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2395                           u64 sector,
2396                           u32 blksize,
2397                           u64 block_id)
2398 {
2399         int ok;
2400         struct p_block_ack p;
2401
2402         p.sector   = sector;
2403         p.block_id = block_id;
2404         p.blksize  = blksize;
2405         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2406
2407         if (!mdev->tconn->meta.socket || mdev->state.conn < C_CONNECTED)
2408                 return false;
2409         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd, &p.head, sizeof(p));
2410         return ok;
2411 }
2412
2413 /* dp->sector and dp->block_id already/still in network byte order,
2414  * data_size is payload size according to dp->head,
2415  * and may need to be corrected for digest size. */
2416 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2417                      struct p_data *dp, int data_size)
2418 {
2419         data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
2420                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
2421         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2422                               dp->block_id);
2423 }
2424
2425 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2426                      struct p_block_req *rp)
2427 {
2428         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2429 }
2430
2431 /**
2432  * drbd_send_ack() - Sends an ack packet
2433  * @mdev:       DRBD device.
2434  * @cmd:        Packet command code.
2435  * @e:          Epoch entry.
2436  */
2437 int drbd_send_ack(struct drbd_conf *mdev,
2438         enum drbd_packets cmd, struct drbd_epoch_entry *e)
2439 {
2440         return _drbd_send_ack(mdev, cmd,
2441                               cpu_to_be64(e->i.sector),
2442                               cpu_to_be32(e->i.size),
2443                               e->block_id);
2444 }
2445
2446 /* This function misuses the block_id field to signal if the blocks
2447  * are in sync or not. */
2448 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2449                      sector_t sector, int blksize, u64 block_id)
2450 {
2451         return _drbd_send_ack(mdev, cmd,
2452                               cpu_to_be64(sector),
2453                               cpu_to_be32(blksize),
2454                               cpu_to_be64(block_id));
2455 }
2456
2457 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2458                        sector_t sector, int size, u64 block_id)
2459 {
2460         int ok;
2461         struct p_block_req p;
2462
2463         p.sector   = cpu_to_be64(sector);
2464         p.block_id = block_id;
2465         p.blksize  = cpu_to_be32(size);
2466
2467         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd, &p.head, sizeof(p));
2468         return ok;
2469 }
2470
2471 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2472                             sector_t sector, int size,
2473                             void *digest, int digest_size,
2474                             enum drbd_packets cmd)
2475 {
2476         int ok;
2477         struct p_block_req p;
2478
2479         prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header) + digest_size);
2480         p.sector   = cpu_to_be64(sector);
2481         p.block_id = ID_SYNCER /* unused */;
2482         p.blksize  = cpu_to_be32(size);
2483
2484         mutex_lock(&mdev->tconn->data.mutex);
2485
2486         ok = (sizeof(p) == drbd_send(mdev, mdev->tconn->data.socket, &p, sizeof(p), 0));
2487         ok = ok && (digest_size == drbd_send(mdev, mdev->tconn->data.socket, digest, digest_size, 0));
2488
2489         mutex_unlock(&mdev->tconn->data.mutex);
2490
2491         return ok;
2492 }
2493
2494 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2495 {
2496         int ok;
2497         struct p_block_req p;
2498
2499         p.sector   = cpu_to_be64(sector);
2500         p.block_id = ID_SYNCER /* unused */;
2501         p.blksize  = cpu_to_be32(size);
2502
2503         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST, &p.head, sizeof(p));
2504         return ok;
2505 }
2506
2507 /* called on sndtimeo
2508  * returns false if we should retry,
2509  * true if we think connection is dead
2510  */
2511 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2512 {
2513         int drop_it;
2514         /* long elapsed = (long)(jiffies - mdev->last_received); */
2515
2516         drop_it =   mdev->tconn->meta.socket == sock
2517                 || !mdev->tconn->asender.task
2518                 || get_t_state(&mdev->tconn->asender) != RUNNING
2519                 || mdev->state.conn < C_CONNECTED;
2520
2521         if (drop_it)
2522                 return true;
2523
2524         drop_it = !--mdev->tconn->ko_count;
2525         if (!drop_it) {
2526                 dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2527                        current->comm, current->pid, mdev->tconn->ko_count);
2528                 request_ping(mdev);
2529         }
2530
2531         return drop_it; /* && (mdev->state == R_PRIMARY) */
2532 }
2533
2534 /* The idea of sendpage seems to be to put some kind of reference
2535  * to the page into the skb, and to hand it over to the NIC. In
2536  * this process get_page() gets called.
2537  *
2538  * As soon as the page was really sent over the network put_page()
2539  * gets called by some part of the network layer. [ NIC driver? ]
2540  *
2541  * [ get_page() / put_page() increment/decrement the count. If count
2542  *   reaches 0 the page will be freed. ]
2543  *
2544  * This works nicely with pages from FSs.
2545  * But this means that in protocol A we might signal IO completion too early!
2546  *
2547  * In order not to corrupt data during a resync we must make sure
2548  * that we do not reuse our own buffer pages (EEs) too early, therefore
2549  * we have the net_ee list.
2550  *
2551  * XFS seems to have problems, still, it submits pages with page_count == 0!
2552  * As a workaround, we disable sendpage on pages
2553  * with page_count == 0 or PageSlab.
2554  */
2555 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2556                    int offset, size_t size, unsigned msg_flags)
2557 {
2558         int sent = drbd_send(mdev, mdev->tconn->data.socket, kmap(page) + offset, size, msg_flags);
2559         kunmap(page);
2560         if (sent == size)
2561                 mdev->send_cnt += size>>9;
2562         return sent == size;
2563 }
2564
2565 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2566                     int offset, size_t size, unsigned msg_flags)
2567 {
2568         mm_segment_t oldfs = get_fs();
2569         int sent, ok;
2570         int len = size;
2571
2572         /* e.g. XFS meta- & log-data is in slab pages, which have a
2573          * page_count of 0 and/or have PageSlab() set.
2574          * we cannot use send_page for those, as that does get_page();
2575          * put_page(); and would cause either a VM_BUG directly, or
2576          * __page_cache_release a page that would actually still be referenced
2577          * by someone, leading to some obscure delayed Oops somewhere else. */
2578         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2579                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2580
2581         msg_flags |= MSG_NOSIGNAL;
2582         drbd_update_congested(mdev);
2583         set_fs(KERNEL_DS);
2584         do {
2585                 sent = mdev->tconn->data.socket->ops->sendpage(mdev->tconn->data.socket, page,
2586                                                         offset, len,
2587                                                         msg_flags);
2588                 if (sent == -EAGAIN) {
2589                         if (we_should_drop_the_connection(mdev,
2590                                                           mdev->tconn->data.socket))
2591                                 break;
2592                         else
2593                                 continue;
2594                 }
2595                 if (sent <= 0) {
2596                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2597                              __func__, (int)size, len, sent);
2598                         break;
2599                 }
2600                 len    -= sent;
2601                 offset += sent;
2602         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2603         set_fs(oldfs);
2604         clear_bit(NET_CONGESTED, &mdev->flags);
2605
2606         ok = (len == 0);
2607         if (likely(ok))
2608                 mdev->send_cnt += size>>9;
2609         return ok;
2610 }
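
/*
 * Illustrative sketch (not part of the original file): the generic "keep
 * sending until everything went out" loop that _drbd_send_page() implements
 * above with ->sendpage(), shown here for a plain user-space socket.  In the
 * kernel code an -EAGAIN additionally consults we_should_drop_the_connection()
 * before retrying; this sketch simply retries.
 */
#if 0
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>

static int example_send_all(int fd, const char *buf, size_t len)
{
	while (len > 0) {
		ssize_t sent = send(fd, buf, len, MSG_NOSIGNAL);

		if (sent == -1 && errno == EAGAIN)
			continue;		/* short on socket buffer, retry */
		if (sent <= 0)
			return 0;		/* error or connection closed */
		buf += sent;
		len -= sent;
	}
	return 1;				/* everything was sent */
}
#endif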
2611
2612 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2613 {
2614         struct bio_vec *bvec;
2615         int i;
2616         /* hint all but last page with MSG_MORE */
2617         __bio_for_each_segment(bvec, bio, i, 0) {
2618                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
2619                                      bvec->bv_offset, bvec->bv_len,
2620                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2621                         return 0;
2622         }
2623         return 1;
2624 }
2625
2626 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2627 {
2628         struct bio_vec *bvec;
2629         int i;
2630         /* hint all but last page with MSG_MORE */
2631         __bio_for_each_segment(bvec, bio, i, 0) {
2632                 if (!_drbd_send_page(mdev, bvec->bv_page,
2633                                      bvec->bv_offset, bvec->bv_len,
2634                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2635                         return 0;
2636         }
2637         return 1;
2638 }
2639
2640 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2641 {
2642         struct page *page = e->pages;
2643         unsigned len = e->i.size;
2644         /* hint all but last page with MSG_MORE */
2645         page_chain_for_each(page) {
2646                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2647                 if (!_drbd_send_page(mdev, page, 0, l,
2648                                 page_chain_next(page) ? MSG_MORE : 0))
2649                         return 0;
2650                 len -= l;
2651         }
2652         return 1;
2653 }
2654
2655 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2656 {
2657         if (mdev->tconn->agreed_pro_version >= 95)
2658                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2659                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
2660                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2661                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2662         else
2663                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
2664 }
2665
2666 /* Used to send write requests
2667  * R_PRIMARY -> Peer    (P_DATA)
2668  */
2669 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2670 {
2671         int ok = 1;
2672         struct p_data p;
2673         unsigned int dp_flags = 0;
2674         void *dgb;
2675         int dgs;
2676
2677         if (!drbd_get_data_sock(mdev))
2678                 return 0;
2679
2680         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
2681                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
2682
2683         prepare_header(mdev, &p.head, P_DATA, sizeof(p) - sizeof(struct p_header) + dgs + req->i.size);
2684         p.sector   = cpu_to_be64(req->i.sector);
2685         p.block_id = (unsigned long)req;
2686         p.seq_num  = cpu_to_be32(req->seq_num = atomic_add_return(1, &mdev->packet_seq));
2687
2688         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2689
2690         if (mdev->state.conn >= C_SYNC_SOURCE &&
2691             mdev->state.conn <= C_PAUSED_SYNC_T)
2692                 dp_flags |= DP_MAY_SET_IN_SYNC;
2693
2694         p.dp_flags = cpu_to_be32(dp_flags);
2695         set_bit(UNPLUG_REMOTE, &mdev->flags);
2696         ok = (sizeof(p) ==
2697                 drbd_send(mdev, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2698         if (ok && dgs) {
2699                 dgb = mdev->tconn->int_dig_out;
2700                 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, dgb);
2701                 ok = dgs == drbd_send(mdev, mdev->tconn->data.socket, dgb, dgs, 0);
2702         }
2703         if (ok) {
2704                 /* For protocol A, we have to memcpy the payload into
2705                  * socket buffers, as we may complete right away
2706                  * as soon as we handed it over to tcp, at which point the data
2707                  * pages may become invalid.
2708                  *
2709                  * For data-integrity enabled, we copy it as well, so we can be
2710                  * sure that even if the bio pages may still be modified, it
2711                  * won't change the data on the wire, thus if the digest checks
2712                  * out ok after sending on this side, but does not fit on the
2713                  * receiving side, we sure have detected corruption elsewhere.
2714                  */
2715                 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
2716                         ok = _drbd_send_bio(mdev, req->master_bio);
2717                 else
2718                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
2719
2720                 /* double check digest, sometimes buffers have been modified in flight. */
2721                 if (dgs > 0 && dgs <= 64) {
2722                         /* 64 byte, 512 bit, is the largest digest size
2723                          * currently supported in kernel crypto. */
2724                         unsigned char digest[64];
2725                         drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
2726                         if (memcmp(mdev->tconn->int_dig_out, digest, dgs)) {
2727                                 dev_warn(DEV,
2728                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
2729                                         (unsigned long long)req->i.sector, req->i.size);
2730                         }
2731                 } /* else if (dgs > 64) {
2732                      ... Be noisy about digest too large ...
2733                 } */
2734         }
2735
2736         drbd_put_data_sock(mdev);
2737
2738         return ok;
2739 }
2740
2741 /* answer packet, used to send data back for read requests:
2742  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2743  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
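 *
 * Unlike drbd_send_dblock() above, this always uses the zero copy send path
 * and does not re-verify the digest after sending, presumably because the
 * epoch entry pages are owned by DRBD itself rather than by upper layers.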
2744  */
2745 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2746                     struct drbd_epoch_entry *e)
2747 {
2748         int ok;
2749         struct p_data p;
2750         void *dgb;
2751         int dgs;
2752
2753         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
2754                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
2755
2756         prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header80) + dgs + e->i.size);
2757         p.sector   = cpu_to_be64(e->i.sector);
2758         p.block_id = e->block_id;
2759         /* p.seq_num  = 0;    No sequence numbers here.. */
2760
2761         /* Only called by our kernel thread.
2762          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2763          * in response to an admin command or module unload.
2764          */
2765         if (!drbd_get_data_sock(mdev))
2766                 return 0;
2767
2768         ok = sizeof(p) == drbd_send(mdev, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2769         if (ok && dgs) {
2770                 dgb = mdev->tconn->int_dig_out;
2771                 drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, e, dgb);
2772                 ok = dgs == drbd_send(mdev, mdev->tconn->data.socket, dgb, dgs, 0);
2773         }
2774         if (ok)
2775                 ok = _drbd_send_zc_ee(mdev, e);
2776
2777         drbd_put_data_sock(mdev);
2778
2779         return ok;
2780 }
2781
2782 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
2783 {
2784         struct p_block_desc p;
2785
2786         p.sector  = cpu_to_be64(req->i.sector);
2787         p.blksize = cpu_to_be32(req->i.size);
2788
2789         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
2790 }
2791
2792 /*
2793   drbd_send distinguishes two cases:
2794
2795   Packets sent via the data socket "sock"
2796   and packets sent via the meta data socket "msock"
2797
2798                     sock                      msock
2799   -----------------+-------------------------+------------------------------
2800   timeout           conf.timeout / 2          conf.timeout / 2
2801   timeout action    send a ping via msock     Abort communication
2802                                               and close all sockets
2803 */
2804
2805 /*
2806  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
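 *
 * Returns the number of bytes actually handed to the socket, which is less
 * than the requested size if sending failed part way (or -1000 if no socket
 * was passed in); callers typically compare the result against the size they
 * asked for, e.g.
 *     ok = (sizeof(p) == drbd_send(mdev, sock, &p, sizeof(p), 0));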
2807  */
2808 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2809               void *buf, size_t size, unsigned msg_flags)
2810 {
2811         struct kvec iov;
2812         struct msghdr msg;
2813         int rv, sent = 0;
2814
2815         if (!sock)
2816                 return -1000;
2817
2818         /* THINK  if (signal_pending) return ... ? */
2819
2820         iov.iov_base = buf;
2821         iov.iov_len  = size;
2822
2823         msg.msg_name       = NULL;
2824         msg.msg_namelen    = 0;
2825         msg.msg_control    = NULL;
2826         msg.msg_controllen = 0;
2827         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2828
2829         if (sock == mdev->tconn->data.socket) {
2830                 mdev->tconn->ko_count = mdev->tconn->net_conf->ko_count;
2831                 drbd_update_congested(mdev);
2832         }
2833         do {
2834                 /* STRANGE
2835                  * tcp_sendmsg does _not_ use its size parameter at all ?
2836                  *
2837                  * -EAGAIN on timeout, -EINTR on signal.
2838                  */
2839 /* THINK
2840  * do we need to block DRBD_SIG if sock == &meta.socket ??
2841  * otherwise wake_asender() might interrupt some send_*Ack !
2842  */
2843                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2844                 if (rv == -EAGAIN) {
2845                         if (we_should_drop_the_connection(mdev, sock))
2846                                 break;
2847                         else
2848                                 continue;
2849                 }
2850                 D_ASSERT(rv != 0);
2851                 if (rv == -EINTR) {
2852                         flush_signals(current);
2853                         rv = 0;
2854                 }
2855                 if (rv < 0)
2856                         break;
2857                 sent += rv;
2858                 iov.iov_base += rv;
2859                 iov.iov_len  -= rv;
2860         } while (sent < size);
2861
2862         if (sock == mdev->tconn->data.socket)
2863                 clear_bit(NET_CONGESTED, &mdev->flags);
2864
2865         if (rv <= 0) {
2866                 if (rv != -EAGAIN) {
2867                         dev_err(DEV, "%s_sendmsg returned %d\n",
2868                             sock == mdev->tconn->meta.socket ? "msock" : "sock",
2869                             rv);
2870                         drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2871                 } else
2872                         drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2873         }
2874
2875         return sent;
2876 }
2877
2878 static int drbd_open(struct block_device *bdev, fmode_t mode)
2879 {
2880         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2881         unsigned long flags;
2882         int rv = 0;
2883
2884         mutex_lock(&drbd_main_mutex);
2885         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
2886         /* to have a stable mdev->state.role
2887          * and no race with updating open_cnt */
2888
2889         if (mdev->state.role != R_PRIMARY) {
2890                 if (mode & FMODE_WRITE)
2891                         rv = -EROFS;
2892                 else if (!allow_oos)
2893                         rv = -EMEDIUMTYPE;
2894         }
2895
2896         if (!rv)
2897                 mdev->open_cnt++;
2898         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
2899         mutex_unlock(&drbd_main_mutex);
2900
2901         return rv;
2902 }
2903
2904 static int drbd_release(struct gendisk *gd, fmode_t mode)
2905 {
2906         struct drbd_conf *mdev = gd->private_data;
2907         mutex_lock(&drbd_main_mutex);
2908         mdev->open_cnt--;
2909         mutex_unlock(&drbd_main_mutex);
2910         return 0;
2911 }
2912
2913 static void drbd_set_defaults(struct drbd_conf *mdev)
2914 {
2915         /* This way we get a compile error when sync_conf grows
2916            and we forget to initialize it here */
2917         mdev->sync_conf = (struct syncer_conf) {
2918                 /* .rate = */           DRBD_RATE_DEF,
2919                 /* .after = */          DRBD_AFTER_DEF,
2920                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
2921                 /* .verify_alg = */     {}, 0,
2922                 /* .cpu_mask = */       {}, 0,
2923                 /* .csums_alg = */      {}, 0,
2924                 /* .use_rle = */        0,
2925                 /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
2926                 /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
2927                 /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
2928                 /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
2929                 /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
2930                 /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
2931         };
2932
2933         /* Have to assign it this way, because the bit-field layout differs
2934            between big endian and little endian */
2935         mdev->state = (union drbd_state) {
2936                 { .role = R_SECONDARY,
2937                   .peer = R_UNKNOWN,
2938                   .conn = C_STANDALONE,
2939                   .disk = D_DISKLESS,
2940                   .pdsk = D_UNKNOWN,
2941                   .susp = 0,
2942                   .susp_nod = 0,
2943                   .susp_fen = 0
2944                 } };
2945 }
2946
2947 void drbd_init_set_defaults(struct drbd_conf *mdev)
2948 {
2949         /* the memset(,0,) did most of this.
2950          * note: only assignments, no allocation in here */
2951
2952         drbd_set_defaults(mdev);
2953
2954         atomic_set(&mdev->ap_bio_cnt, 0);
2955         atomic_set(&mdev->ap_pending_cnt, 0);
2956         atomic_set(&mdev->rs_pending_cnt, 0);
2957         atomic_set(&mdev->unacked_cnt, 0);
2958         atomic_set(&mdev->local_cnt, 0);
2959         atomic_set(&mdev->packet_seq, 0);
2960         atomic_set(&mdev->pp_in_use, 0);
2961         atomic_set(&mdev->pp_in_use_by_net, 0);
2962         atomic_set(&mdev->rs_sect_in, 0);
2963         atomic_set(&mdev->rs_sect_ev, 0);
2964         atomic_set(&mdev->ap_in_flight, 0);
2965
2966         mutex_init(&mdev->md_io_mutex);
2967         mutex_init(&mdev->tconn->data.mutex);
2968         mutex_init(&mdev->tconn->meta.mutex);
2969         sema_init(&mdev->tconn->data.work.s, 0);
2970         sema_init(&mdev->tconn->meta.work.s, 0);
2971         mutex_init(&mdev->state_mutex);
2972
2973         spin_lock_init(&mdev->tconn->data.work.q_lock);
2974         spin_lock_init(&mdev->tconn->meta.work.q_lock);
2975
2976         spin_lock_init(&mdev->al_lock);
2977         spin_lock_init(&mdev->tconn->req_lock);
2978         spin_lock_init(&mdev->peer_seq_lock);
2979         spin_lock_init(&mdev->epoch_lock);
2980
2981         INIT_LIST_HEAD(&mdev->active_ee);
2982         INIT_LIST_HEAD(&mdev->sync_ee);
2983         INIT_LIST_HEAD(&mdev->done_ee);
2984         INIT_LIST_HEAD(&mdev->read_ee);
2985         INIT_LIST_HEAD(&mdev->net_ee);
2986         INIT_LIST_HEAD(&mdev->resync_reads);
2987         INIT_LIST_HEAD(&mdev->tconn->data.work.q);
2988         INIT_LIST_HEAD(&mdev->tconn->meta.work.q);
2989         INIT_LIST_HEAD(&mdev->resync_work.list);
2990         INIT_LIST_HEAD(&mdev->unplug_work.list);
2991         INIT_LIST_HEAD(&mdev->go_diskless.list);
2992         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2993         INIT_LIST_HEAD(&mdev->start_resync_work.list);
2994         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2995
2996         mdev->resync_work.cb  = w_resync_timer;
2997         mdev->unplug_work.cb  = w_send_write_hint;
2998         mdev->go_diskless.cb  = w_go_diskless;
2999         mdev->md_sync_work.cb = w_md_sync;
3000         mdev->bm_io_work.w.cb = w_bitmap_io;
3001         mdev->start_resync_work.cb = w_start_resync;
3002         init_timer(&mdev->resync_timer);
3003         init_timer(&mdev->md_sync_timer);
3004         init_timer(&mdev->start_resync_timer);
3005         init_timer(&mdev->request_timer);
3006         mdev->resync_timer.function = resync_timer_fn;
3007         mdev->resync_timer.data = (unsigned long) mdev;
3008         mdev->md_sync_timer.function = md_sync_timer_fn;
3009         mdev->md_sync_timer.data = (unsigned long) mdev;
3010         mdev->start_resync_timer.function = start_resync_timer_fn;
3011         mdev->start_resync_timer.data = (unsigned long) mdev;
3012         mdev->request_timer.function = request_timer_fn;
3013         mdev->request_timer.data = (unsigned long) mdev;
3014
3015         init_waitqueue_head(&mdev->misc_wait);
3016         init_waitqueue_head(&mdev->state_wait);
3017         init_waitqueue_head(&mdev->ee_wait);
3018         init_waitqueue_head(&mdev->al_wait);
3019         init_waitqueue_head(&mdev->seq_wait);
3020
3021         drbd_thread_init(mdev, &mdev->tconn->receiver, drbdd_init);
3022         drbd_thread_init(mdev, &mdev->tconn->worker, drbd_worker);
3023         drbd_thread_init(mdev, &mdev->tconn->asender, drbd_asender);
3024
3025         /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
3026         mdev->write_ordering = WO_bdev_flush;
3027         mdev->resync_wenr = LC_FREE;
3028         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3029         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3030 }
3031
3032 void drbd_mdev_cleanup(struct drbd_conf *mdev)
3033 {
3034         int i;
3035         if (mdev->tconn->receiver.t_state != NONE)
3036                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
3037                                 mdev->tconn->receiver.t_state);
3038
3039         /* no need to lock it, I'm the only thread alive */
3040         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
3041                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
3042         mdev->al_writ_cnt  =
3043         mdev->bm_writ_cnt  =
3044         mdev->read_cnt     =
3045         mdev->recv_cnt     =
3046         mdev->send_cnt     =
3047         mdev->writ_cnt     =
3048         mdev->p_size       =
3049         mdev->rs_start     =
3050         mdev->rs_total     =
3051         mdev->rs_failed    = 0;
3052         mdev->rs_last_events = 0;
3053         mdev->rs_last_sect_ev = 0;
3054         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
3055                 mdev->rs_mark_left[i] = 0;
3056                 mdev->rs_mark_time[i] = 0;
3057         }
3058         D_ASSERT(mdev->tconn->net_conf == NULL);
3059
3060         drbd_set_my_capacity(mdev, 0);
3061         if (mdev->bitmap) {
3062                 /* maybe never allocated. */
3063                 drbd_bm_resize(mdev, 0, 1);
3064                 drbd_bm_cleanup(mdev);
3065         }
3066
3067         drbd_free_resources(mdev);
3068         clear_bit(AL_SUSPENDED, &mdev->flags);
3069
3070         /*
3071          * currently we call drbd_init_ee only on module load, so
3072          * we may call drbd_release_ee only on module unload!
3073          */
3074         D_ASSERT(list_empty(&mdev->active_ee));
3075         D_ASSERT(list_empty(&mdev->sync_ee));
3076         D_ASSERT(list_empty(&mdev->done_ee));
3077         D_ASSERT(list_empty(&mdev->read_ee));
3078         D_ASSERT(list_empty(&mdev->net_ee));
3079         D_ASSERT(list_empty(&mdev->resync_reads));
3080         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
3081         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
3082         D_ASSERT(list_empty(&mdev->resync_work.list));
3083         D_ASSERT(list_empty(&mdev->unplug_work.list));
3084         D_ASSERT(list_empty(&mdev->go_diskless.list));
3085
3086         drbd_set_defaults(mdev);
3087 }
3088
3089
3090 static void drbd_destroy_mempools(void)
3091 {
3092         struct page *page;
3093
3094         while (drbd_pp_pool) {
3095                 page = drbd_pp_pool;
3096                 drbd_pp_pool = (struct page *)page_private(page);
3097                 __free_page(page);
3098                 drbd_pp_vacant--;
3099         }
3100
3101         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
3102
3103         if (drbd_ee_mempool)
3104                 mempool_destroy(drbd_ee_mempool);
3105         if (drbd_request_mempool)
3106                 mempool_destroy(drbd_request_mempool);
3107         if (drbd_ee_cache)
3108                 kmem_cache_destroy(drbd_ee_cache);
3109         if (drbd_request_cache)
3110                 kmem_cache_destroy(drbd_request_cache);
3111         if (drbd_bm_ext_cache)
3112                 kmem_cache_destroy(drbd_bm_ext_cache);
3113         if (drbd_al_ext_cache)
3114                 kmem_cache_destroy(drbd_al_ext_cache);
3115
3116         drbd_ee_mempool      = NULL;
3117         drbd_request_mempool = NULL;
3118         drbd_ee_cache        = NULL;
3119         drbd_request_cache   = NULL;
3120         drbd_bm_ext_cache    = NULL;
3121         drbd_al_ext_cache    = NULL;
3122
3123         return;
3124 }
3125
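/* Pre-allocate roughly enough pool pages for each configured minor to keep
 * one maximally sized bio in flight: DRBD_MAX_BIO_SIZE/PAGE_SIZE pages per
 * minor (with 4 KiB pages, DRBD_MAX_BIO_SIZE/4096 pages each), as computed
 * in "number" below.  The same count sizes the request and epoch entry
 * mempools. */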
3126 static int drbd_create_mempools(void)
3127 {
3128         struct page *page;
3129         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
3130         int i;
3131
3132         /* prepare our caches and mempools */
3133         drbd_request_mempool = NULL;
3134         drbd_ee_cache        = NULL;
3135         drbd_request_cache   = NULL;
3136         drbd_bm_ext_cache    = NULL;
3137         drbd_al_ext_cache    = NULL;
3138         drbd_pp_pool         = NULL;
3139
3140         /* caches */
3141         drbd_request_cache = kmem_cache_create(
3142                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
3143         if (drbd_request_cache == NULL)
3144                 goto Enomem;
3145
3146         drbd_ee_cache = kmem_cache_create(
3147                 "drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
3148         if (drbd_ee_cache == NULL)
3149                 goto Enomem;
3150
3151         drbd_bm_ext_cache = kmem_cache_create(
3152                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
3153         if (drbd_bm_ext_cache == NULL)
3154                 goto Enomem;
3155
3156         drbd_al_ext_cache = kmem_cache_create(
3157                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
3158         if (drbd_al_ext_cache == NULL)
3159                 goto Enomem;
3160
3161         /* mempools */
3162         drbd_request_mempool = mempool_create(number,
3163                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
3164         if (drbd_request_mempool == NULL)
3165                 goto Enomem;
3166
3167         drbd_ee_mempool = mempool_create(number,
3168                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
3169         if (drbd_ee_mempool == NULL)
3170                 goto Enomem;
3171
3172         /* drbd's page pool */
3173         spin_lock_init(&drbd_pp_lock);
3174
3175         for (i = 0; i < number; i++) {
3176                 page = alloc_page(GFP_HIGHUSER);
3177                 if (!page)
3178                         goto Enomem;
3179                 set_page_private(page, (unsigned long)drbd_pp_pool);
3180                 drbd_pp_pool = page;
3181         }
3182         drbd_pp_vacant = number;
3183
3184         return 0;
3185
3186 Enomem:
3187         drbd_destroy_mempools(); /* in case we allocated some */
3188         return -ENOMEM;
3189 }
3190
3191 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
3192         void *unused)
3193 {
3194         /* just so we have it.  you never know what interesting things we
3195          * might want to do here some day...
3196          */
3197
3198         return NOTIFY_DONE;
3199 }
3200
3201 static struct notifier_block drbd_notifier = {
3202         .notifier_call = drbd_notify_sys,
3203 };
3204
3205 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3206 {
3207         int rr;
3208
3209         rr = drbd_release_ee(mdev, &mdev->active_ee);
3210         if (rr)
3211                 dev_err(DEV, "%d EEs in active list found!\n", rr);
3212
3213         rr = drbd_release_ee(mdev, &mdev->sync_ee);
3214         if (rr)
3215                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
3216
3217         rr = drbd_release_ee(mdev, &mdev->read_ee);
3218         if (rr)
3219                 dev_err(DEV, "%d EEs in read list found!\n", rr);
3220
3221         rr = drbd_release_ee(mdev, &mdev->done_ee);
3222         if (rr)
3223                 dev_err(DEV, "%d EEs in done list found!\n", rr);
3224
3225         rr = drbd_release_ee(mdev, &mdev->net_ee);
3226         if (rr)
3227                 dev_err(DEV, "%d EEs in net list found!\n", rr);
3228 }
3229
3230 /* caution. no locking.
3231  * currently only used from module cleanup code. */
3232 static void drbd_delete_device(unsigned int minor)
3233 {
3234         struct drbd_conf *mdev = minor_to_mdev(minor);
3235
3236         if (!mdev)
3237                 return;
3238
3239         /* paranoia asserts */
3240         D_ASSERT(mdev->open_cnt == 0);
3241         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
3242         /* end paranoia asserts */
3243
3244         del_gendisk(mdev->vdisk);
3245
3246         /* cleanup stuff that may have been allocated during
3247          * device (re-)configuration or state changes */
3248
3249         if (mdev->this_bdev)
3250                 bdput(mdev->this_bdev);
3251
3252         drbd_free_resources(mdev);
3253         drbd_free_tconn(mdev->tconn);
3254
3255         drbd_release_ee_lists(mdev);
3256
3257         lc_destroy(mdev->act_log);
3258         lc_destroy(mdev->resync);
3259
3260         kfree(mdev->p_uuid);
3261         /* mdev->p_uuid = NULL; */
3262
3263         /* cleanup the rest that has been
3264          * allocated from drbd_new_device
3265          * and actually free the mdev itself */
3266         drbd_free_mdev(mdev);
3267 }
3268
3269 static void drbd_cleanup(void)
3270 {
3271         unsigned int i;
3272
3273         unregister_reboot_notifier(&drbd_notifier);
3274
3275         /* first remove proc,
3276          * drbdsetup uses its presence to detect
3277          * whether DRBD is loaded.
3278          * If we got stuck in proc removal
3279          * while netlink was already deregistered,
3280          * some drbdsetup commands could wait forever
3281          * for an answer.
3282          */
3283         if (drbd_proc)
3284                 remove_proc_entry("drbd", NULL);
3285
3286         drbd_nl_cleanup();
3287
3288         if (minor_table) {
3289                 i = minor_count;
3290                 while (i--)
3291                         drbd_delete_device(i);
3292                 drbd_destroy_mempools();
3293         }
3294
3295         kfree(minor_table);
3296
3297         unregister_blkdev(DRBD_MAJOR, "drbd");
3298
3299         printk(KERN_INFO "drbd: module cleanup done.\n");
3300 }
3301
3302 /**
3303  * drbd_congested() - Callback for pdflush
3304  * @congested_data:     User data
3305  * @bdi_bits:           Bits pdflush is currently interested in
3306  *
3307  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
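 *
 * As a side effect, mdev->congestion_reason records why congestion was
 * reported: 'd' if DRBD itself has frozen IO, 'b' if the local backing
 * device is congested, 'n' if only the network is congested, 'a' if both
 * backing device and network are congested, and '-' if we are not congested.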
3308  */
3309 static int drbd_congested(void *congested_data, int bdi_bits)
3310 {
3311         struct drbd_conf *mdev = congested_data;
3312         struct request_queue *q;
3313         char reason = '-';
3314         int r = 0;
3315
3316         if (!may_inc_ap_bio(mdev)) {
3317                 /* DRBD has frozen IO */
3318                 r = bdi_bits;
3319                 reason = 'd';
3320                 goto out;
3321         }
3322
3323         if (get_ldev(mdev)) {
3324                 q = bdev_get_queue(mdev->ldev->backing_bdev);
3325                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
3326                 put_ldev(mdev);
3327                 if (r)
3328                         reason = 'b';
3329         }
3330
3331         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3332                 r |= (1 << BDI_async_congested);
3333                 reason = reason == 'b' ? 'a' : 'n';
3334         }
3335
3336 out:
3337         mdev->congestion_reason = reason;
3338         return r;
3339 }
3340
3341 struct drbd_tconn *drbd_new_tconn(char *name)
3342 {
3343         struct drbd_tconn *tconn;
3344
3345         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
3346         if (!tconn)
3347                 return NULL;
3348
3349         tconn->name = kstrdup(name, GFP_KERNEL);
3350         if (!tconn->name)
3351                 goto fail;
3352
3353         atomic_set(&tconn->net_cnt, 0);
3354         init_waitqueue_head(&tconn->net_cnt_wait);
3355
3356         write_lock_irq(&global_state_lock);
3357         list_add(&tconn->all_tconn, &drbd_tconns);
3358         write_unlock_irq(&global_state_lock);
3359
3360         return tconn;
3361
3362 fail:
3363         kfree(tconn->name);
3364         kfree(tconn);
3365
3366         return NULL;
3367 }
3368
3369 void drbd_free_tconn(struct drbd_tconn *tconn)
3370 {
3371         write_lock_irq(&global_state_lock);
3372         list_del(&tconn->all_tconn);
3373         write_unlock_irq(&global_state_lock);
3374
3375         kfree(tconn->name);
3376         kfree(tconn->int_dig_out);
3377         kfree(tconn->int_dig_in);
3378         kfree(tconn->int_dig_vv);
3379         kfree(tconn);
3380 }
3381
3382 struct drbd_conf *drbd_new_device(unsigned int minor)
3383 {
3384         struct drbd_conf *mdev;
3385         struct gendisk *disk;
3386         struct request_queue *q;
3387
3388         /* GFP_KERNEL, we are outside of all write-out paths */
3389         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3390         if (!mdev)
3391                 return NULL;
3392         mdev->tconn = drbd_new_tconn("dummy");
3393         if (!mdev->tconn)
3394                 goto out_no_tconn;
3395
3396         if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3397                 goto out_no_cpumask;
3398
3399         mdev->tconn->volume0 = mdev;
3400         mdev->minor = minor;
3401
3402         drbd_init_set_defaults(mdev);
3403
3404         q = blk_alloc_queue(GFP_KERNEL);
3405         if (!q)
3406                 goto out_no_q;
3407         mdev->rq_queue = q;
3408         q->queuedata   = mdev;
3409
3410         disk = alloc_disk(1);
3411         if (!disk)
3412                 goto out_no_disk;
3413         mdev->vdisk = disk;
3414
3415         set_disk_ro(disk, true);
3416
3417         disk->queue = q;
3418         disk->major = DRBD_MAJOR;
3419         disk->first_minor = minor;
3420         disk->fops = &drbd_ops;
3421         sprintf(disk->disk_name, "drbd%d", minor);
3422         disk->private_data = mdev;
3423
3424         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3425         /* we have no partitions. we contain only ourselves. */
3426         mdev->this_bdev->bd_contains = mdev->this_bdev;
3427
3428         q->backing_dev_info.congested_fn = drbd_congested;
3429         q->backing_dev_info.congested_data = mdev;
3430
3431         blk_queue_make_request(q, drbd_make_request);
3432         /* Setting the max_hw_sectors to an odd value of 8 KiB here.
3433            This triggers a max_bio_size message upon first attach or connect */
3434         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
3435         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3436         blk_queue_merge_bvec(q, drbd_merge_bvec);
3437         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
3438
3439         mdev->md_io_page = alloc_page(GFP_KERNEL);
3440         if (!mdev->md_io_page)
3441                 goto out_no_io_page;
3442
3443         if (drbd_bm_init(mdev))
3444                 goto out_no_bitmap;
3445         /* no need to lock access, we are still initializing this minor device. */
3446         if (!tl_init(mdev))
3447                 goto out_no_tl;
3448         mdev->read_requests = RB_ROOT;
3449         mdev->write_requests = RB_ROOT;
3450         mdev->epoch_entries = RB_ROOT;
3451
3452         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3453         if (!mdev->current_epoch)
3454                 goto out_no_epoch;
3455
3456         INIT_LIST_HEAD(&mdev->current_epoch->list);
3457         mdev->epochs = 1;
3458
3459         return mdev;
3460
3461 /* out_whatever_else:
3462         kfree(mdev->current_epoch); */
3463 out_no_epoch:
3464         tl_cleanup(mdev);
3465 out_no_tl:
3466         drbd_bm_cleanup(mdev);
3467 out_no_bitmap:
3468         __free_page(mdev->md_io_page);
3469 out_no_io_page:
3470         put_disk(disk);
3471 out_no_disk:
3472         blk_cleanup_queue(q);
3473 out_no_q:
3474         free_cpumask_var(mdev->cpu_mask);
3475 out_no_cpumask:
3476         drbd_free_tconn(mdev->tconn);
3477 out_no_tconn:
3478         kfree(mdev);
3479         return NULL;
3480 }
3481
3482 /* counterpart of drbd_new_device.
3483  * last part of drbd_delete_device. */
3484 void drbd_free_mdev(struct drbd_conf *mdev)
3485 {
3486         kfree(mdev->current_epoch);
3487         tl_cleanup(mdev);
3488         if (mdev->bitmap) /* should no longer be there. */
3489                 drbd_bm_cleanup(mdev);
3490         __free_page(mdev->md_io_page);
3491         put_disk(mdev->vdisk);
3492         blk_cleanup_queue(mdev->rq_queue);
3493         free_cpumask_var(mdev->cpu_mask);
3494         kfree(mdev);
3495 }
3496
3497
3498 int __init drbd_init(void)
3499 {
3500         int err;
3501
3502         BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
3503         BUILD_BUG_ON(sizeof(struct p_handshake) != 80);
3504
3505         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
3506                 printk(KERN_ERR
3507                         "drbd: invalid minor_count (%d)\n", minor_count);
3508 #ifdef MODULE
3509                 return -EINVAL;
3510 #else
3511                 minor_count = 8;
3512 #endif
3513         }
3514
3515         err = drbd_nl_init();
3516         if (err)
3517                 return err;
3518
3519         err = register_blkdev(DRBD_MAJOR, "drbd");
3520         if (err) {
3521                 printk(KERN_ERR
3522                        "drbd: unable to register block device major %d\n",
3523                        DRBD_MAJOR);
3524                 return err;
3525         }
3526
3527         register_reboot_notifier(&drbd_notifier);
3528
3529         /*
3530          * allocate all necessary structs
3531          */
3532         err = -ENOMEM;
3533
3534         init_waitqueue_head(&drbd_pp_wait);
3535
3536         drbd_proc = NULL; /* play safe for drbd_cleanup */
3537         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3538                                 GFP_KERNEL);
3539         if (!minor_table)
3540                 goto Enomem;
3541
3542         err = drbd_create_mempools();
3543         if (err)
3544                 goto Enomem;
3545
3546         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3547         if (!drbd_proc) {
3548                 printk(KERN_ERR "drbd: unable to register proc file\n");
3549                 goto Enomem;
3550         }
3551
3552         rwlock_init(&global_state_lock);
3553         INIT_LIST_HEAD(&drbd_tconns);
3554
3555         printk(KERN_INFO "drbd: initialized. "
3556                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3557                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3558         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3559         printk(KERN_INFO "drbd: registered as block device major %d\n",
3560                 DRBD_MAJOR);
3561         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3562
3563         return 0; /* Success! */
3564
3565 Enomem:
3566         drbd_cleanup();
3567         if (err == -ENOMEM)
3568                 /* currently always the case */
3569                 printk(KERN_ERR "drbd: ran out of memory\n");
3570         else
3571                 printk(KERN_ERR "drbd: initialization failure\n");
3572         return err;
3573 }
3574
3575 void drbd_free_bc(struct drbd_backing_dev *ldev)
3576 {
3577         if (ldev == NULL)
3578                 return;
3579
3580         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3581         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3582
3583         kfree(ldev);
3584 }
3585
3586 void drbd_free_sock(struct drbd_conf *mdev)
3587 {
3588         if (mdev->tconn->data.socket) {
3589                 mutex_lock(&mdev->tconn->data.mutex);
3590                 kernel_sock_shutdown(mdev->tconn->data.socket, SHUT_RDWR);
3591                 sock_release(mdev->tconn->data.socket);
3592                 mdev->tconn->data.socket = NULL;
3593                 mutex_unlock(&mdev->tconn->data.mutex);
3594         }
3595         if (mdev->tconn->meta.socket) {
3596                 mutex_lock(&mdev->tconn->meta.mutex);
3597                 kernel_sock_shutdown(mdev->tconn->meta.socket, SHUT_RDWR);
3598                 sock_release(mdev->tconn->meta.socket);
3599                 mdev->tconn->meta.socket = NULL;
3600                 mutex_unlock(&mdev->tconn->meta.mutex);
3601         }
3602 }
3603
3604
3605 void drbd_free_resources(struct drbd_conf *mdev)
3606 {
3607         crypto_free_hash(mdev->csums_tfm);
3608         mdev->csums_tfm = NULL;
3609         crypto_free_hash(mdev->verify_tfm);
3610         mdev->verify_tfm = NULL;
3611         crypto_free_hash(mdev->tconn->cram_hmac_tfm);
3612         mdev->tconn->cram_hmac_tfm = NULL;
3613         crypto_free_hash(mdev->tconn->integrity_w_tfm);
3614         mdev->tconn->integrity_w_tfm = NULL;
3615         crypto_free_hash(mdev->tconn->integrity_r_tfm);
3616         mdev->tconn->integrity_r_tfm = NULL;
3617
3618         drbd_free_sock(mdev);
3619
3620         __no_warn(local,
3621                   drbd_free_bc(mdev->ldev);
3622                   mdev->ldev = NULL;);
3623 }
3624
3625 /* meta data management */
3626
3627 struct meta_data_on_disk {
3628         u64 la_size;           /* last agreed size. */
3629         u64 uuid[UI_SIZE];   /* UUIDs. */
3630         u64 device_uuid;
3631         u64 reserved_u64_1;
3632         u32 flags;             /* MDF */
3633         u32 magic;
3634         u32 md_size_sect;
3635         u32 al_offset;         /* offset to this block */
3636         u32 al_nr_extents;     /* important for restoring the AL */
3637               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3638         u32 bm_offset;         /* offset to the bitmap, from here */
3639         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3640         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
3641         u32 reserved_u32[3];
3642
3643 } __packed;
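/* All multi-byte fields of struct meta_data_on_disk are stored big-endian
 * (note the cpu_to_be*()/be*_to_cpu() conversions in drbd_md_sync() and
 * drbd_md_read() below); the super block is written into a zeroed 512 byte
 * buffer at the device's md_offset. */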
3644
3645 /**
3646  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3647  * @mdev:       DRBD device.
3648  */
3649 void drbd_md_sync(struct drbd_conf *mdev)
3650 {
3651         struct meta_data_on_disk *buffer;
3652         sector_t sector;
3653         int i;
3654
3655         del_timer(&mdev->md_sync_timer);
3656         /* timer may be rearmed by drbd_md_mark_dirty() now. */
3657         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3658                 return;
3659
3660         /* We use D_FAILED here, and not D_ATTACHING, because we try to write
3661          * metadata even if we detach due to a disk failure! */
3662         if (!get_ldev_if_state(mdev, D_FAILED))
3663                 return;
3664
3665         mutex_lock(&mdev->md_io_mutex);
3666         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3667         memset(buffer, 0, 512);
3668
3669         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3670         for (i = UI_CURRENT; i < UI_SIZE; i++)
3671                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3672         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3673         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3674
3675         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3676         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3677         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3678         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3679         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3680
3681         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3682         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
3683
3684         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3685         sector = mdev->ldev->md.md_offset;
3686
3687         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3688                 /* this was a try anyways ... */
3689                 dev_err(DEV, "meta data update failed!\n");
3690                 drbd_chk_io_error(mdev, 1, true);
3691         }
3692
3693         /* Update mdev->ldev->md.la_size_sect,
3694          * since we updated it on metadata. */
3695         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3696
3697         mutex_unlock(&mdev->md_io_mutex);
3698         put_ldev(mdev);
3699 }
3700
3701 /**
3702  * drbd_md_read() - Reads in the meta data super block
3703  * @mdev:       DRBD device.
3704  * @bdev:       Device from which the meta data should be read in.
3705  *
3706  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
3707  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
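 *
 * Besides checking the on-disk offsets and sizes against the values computed
 * for @bdev, this also restores the activity log size (al_extents) from disk;
 * an implausibly small value (< 7) is replaced with 127.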
3708  */
3709 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3710 {
3711         struct meta_data_on_disk *buffer;
3712         int i, rv = NO_ERROR;
3713
3714         if (!get_ldev_if_state(mdev, D_ATTACHING))
3715                 return ERR_IO_MD_DISK;
3716
3717         mutex_lock(&mdev->md_io_mutex);
3718         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3719
3720         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3721                 /* NOTE: can't do normal error processing here as this is
3722                    called BEFORE disk is attached */
3723                 dev_err(DEV, "Error while reading metadata.\n");
3724                 rv = ERR_IO_MD_DISK;
3725                 goto err;
3726         }
3727
3728         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
3729                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
3730                 rv = ERR_MD_INVALID;
3731                 goto err;
3732         }
3733         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3734                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3735                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3736                 rv = ERR_MD_INVALID;
3737                 goto err;
3738         }
3739         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3740                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3741                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3742                 rv = ERR_MD_INVALID;
3743                 goto err;
3744         }
3745         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3746                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3747                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3748                 rv = ERR_MD_INVALID;
3749                 goto err;
3750         }
3751
3752         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3753                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3754                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3755                 rv = ERR_MD_INVALID;
3756                 goto err;
3757         }
3758
3759         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3760         for (i = UI_CURRENT; i < UI_SIZE; i++)
3761                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3762         bdev->md.flags = be32_to_cpu(buffer->flags);
3763         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3764         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3765
3766         spin_lock_irq(&mdev->tconn->req_lock);
3767         if (mdev->state.conn < C_CONNECTED) {
3768                 int peer;
3769                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3770                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
3771                 mdev->peer_max_bio_size = peer;
3772         }
3773         spin_unlock_irq(&mdev->tconn->req_lock);
3774
3775         if (mdev->sync_conf.al_extents < 7)
3776                 mdev->sync_conf.al_extents = 127;
3777
3778  err:
3779         mutex_unlock(&mdev->md_io_mutex);
3780         put_ldev(mdev);
3781
3782         return rv;
3783 }
3784
3785 /**
3786  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3787  * @mdev:       DRBD device.
3788  *
3789  * Call this function if you change anything that should be written to
3790  * the meta-data super block. This function sets MD_DIRTY, and starts a
3791  * timer that ensures that within five seconds you have to call drbd_md_sync().
3792  */
3793 #ifdef DEBUG
3794 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3795 {
3796         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3797                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3798                 mdev->last_md_mark_dirty.line = line;
3799                 mdev->last_md_mark_dirty.func = func;
3800         }
3801 }
3802 #else
3803 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3804 {
3805         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3806                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3807 }
3808 #endif
3809
3810 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3811 {
3812         int i;
3813
3814         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3815                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3816 }
3817
3818 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3819 {
3820         if (idx == UI_CURRENT) {
3821                 if (mdev->state.role == R_PRIMARY)
3822                         val |= 1;
3823                 else
3824                         val &= ~((u64)1);
3825
3826                 drbd_set_ed_uuid(mdev, val);
3827         }
3828
3829         mdev->ldev->md.uuid[idx] = val;
3830         drbd_md_mark_dirty(mdev);
3831 }
3832
3833
3834 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3835 {
3836         if (mdev->ldev->md.uuid[idx]) {
3837                 drbd_uuid_move_history(mdev);
3838                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3839         }
3840         _drbd_uuid_set(mdev, idx, val);
3841 }
3842
3843 /**
3844  * drbd_uuid_new_current() - Creates a new current UUID
3845  * @mdev:       DRBD device.
3846  *
3847  * Creates a new current UUID, and rotates the old current UUID into
3848  * the bitmap slot. Causes an incremental resync upon next connect.
3849  */
3850 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3851 {
3852         u64 val;
3853         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3854
3855         if (bm_uuid)
3856                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3857
3858         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3859
3860         get_random_bytes(&val, sizeof(u64));
3861         _drbd_uuid_set(mdev, UI_CURRENT, val);
3862         drbd_print_uuids(mdev, "new current UUID");
3863         /* get it to stable storage _now_ */
3864         drbd_md_sync(mdev);
3865 }
3866
3867 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3868 {
3869         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3870                 return;
3871
3872         if (val == 0) {
3873                 drbd_uuid_move_history(mdev);
3874                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3875                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3876         } else {
3877                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3878                 if (bm_uuid)
3879                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3880
3881                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3882         }
3883         drbd_md_mark_dirty(mdev);
3884 }
3885
3886 /**
3887  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3888  * @mdev:       DRBD device.
3889  *
3890  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
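 *
 * The MDF_FULL_SYNC flag is set and synced to the meta data before the bits
 * are written, and cleared again only after the bitmap write succeeded, so
 * an interruption in between still leaves the full-sync marker on disk.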
3891  */
3892 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3893 {
3894         int rv = -EIO;
3895
3896         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3897                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3898                 drbd_md_sync(mdev);
3899                 drbd_bm_set_all(mdev);
3900
3901                 rv = drbd_bm_write(mdev);
3902
3903                 if (!rv) {
3904                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3905                         drbd_md_sync(mdev);
3906                 }
3907
3908                 put_ldev(mdev);
3909         }
3910
3911         return rv;
3912 }
3913
3914 /**
3915  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3916  * @mdev:       DRBD device.
3917  *
3918  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3919  */
3920 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3921 {
3922         int rv = -EIO;
3923
3924         drbd_resume_al(mdev);
3925         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3926                 drbd_bm_clear_all(mdev);
3927                 rv = drbd_bm_write(mdev);
3928                 put_ldev(mdev);
3929         }
3930
3931         return rv;
3932 }
3933
3934 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3935 {
3936         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3937         int rv = -EIO;
3938
3939         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3940
3941         if (get_ldev(mdev)) {
3942                 drbd_bm_lock(mdev, work->why, work->flags);
3943                 rv = work->io_fn(mdev);
3944                 drbd_bm_unlock(mdev);
3945                 put_ldev(mdev);
3946         }
3947
3948         clear_bit(BITMAP_IO, &mdev->flags);
3949         smp_mb__after_clear_bit();
3950         wake_up(&mdev->misc_wait);
3951
3952         if (work->done)
3953                 work->done(mdev, rv);
3954
3955         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3956         work->why = NULL;
3957         work->flags = 0;
3958
3959         return 1;
3960 }
3961
3962 void drbd_ldev_destroy(struct drbd_conf *mdev)
3963 {
3964         lc_destroy(mdev->resync);
3965         mdev->resync = NULL;
3966         lc_destroy(mdev->act_log);
3967         mdev->act_log = NULL;
3968         __no_warn(local,
3969                 drbd_free_bc(mdev->ldev);
3970                 mdev->ldev = NULL;);
3971
3972         if (mdev->md_io_tmpp) {
3973                 __free_page(mdev->md_io_tmpp);
3974                 mdev->md_io_tmpp = NULL;
3975         }
3976         clear_bit(GO_DISKLESS, &mdev->flags);
3977 }
3978
3979 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3980 {
3981         D_ASSERT(mdev->state.disk == D_FAILED);
3982         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3983          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3984          * the protected members anymore, though, so once put_ldev reaches zero
3985          * again, it will be safe to free them. */
3986         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3987         return 1;
3988 }
3989
3990 void drbd_go_diskless(struct drbd_conf *mdev)
3991 {
3992         D_ASSERT(mdev->state.disk == D_FAILED);
3993         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3994                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3995 }
3996
3997 /**
3998  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3999  * @mdev:       DRBD device.
4000  * @io_fn:      IO callback to be called when bitmap IO is possible
4001  * @done:       callback to be called after the bitmap IO was performed
4002  * @why:        Descriptive text of the reason for doing the IO
4003  *
4004  * While IO on the bitmap happens we freeze application IO, thus ensuring
4005  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
4006  * called from worker context. It MUST NOT be used while a previous such
4007  * work is still pending!
4008  */
4009 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
4010                           int (*io_fn)(struct drbd_conf *),
4011                           void (*done)(struct drbd_conf *, int),
4012                           char *why, enum bm_flag flags)
4013 {
4014         D_ASSERT(current == mdev->tconn->worker.task);
4015
4016         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
4017         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
4018         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
4019         if (mdev->bm_io_work.why)
4020                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
4021                         why, mdev->bm_io_work.why);
4022
4023         mdev->bm_io_work.io_fn = io_fn;
4024         mdev->bm_io_work.done = done;
4025         mdev->bm_io_work.why = why;
4026         mdev->bm_io_work.flags = flags;
4027
4028         spin_lock_irq(&mdev->tconn->req_lock);
4029         set_bit(BITMAP_IO, &mdev->flags);
4030         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
4031                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
4032                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
4033         }
4034         spin_unlock_irq(&mdev->tconn->req_lock);
4035 }
4036
4037 /**
4038  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
4039  * @mdev:       DRBD device.
4040  * @io_fn:      IO callback to be called when bitmap IO is possible
4041  * @why:        Descriptive text of the reason for doing the IO
4042  *
4043  * Freezes application IO while the actual IO operation runs. This
4044  * function MAY NOT be called from worker context.
4045  */
4046 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
4047                 char *why, enum bm_flag flags)
4048 {
4049         int rv;
4050
4051         D_ASSERT(current != mdev->tconn->worker.task);
4052
4053         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4054                 drbd_suspend_io(mdev);
4055
4056         drbd_bm_lock(mdev, why, flags);
4057         rv = io_fn(mdev);
4058         drbd_bm_unlock(mdev);
4059
4060         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4061                 drbd_resume_io(mdev);
4062
4063         return rv;
4064 }
4065
4066 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4067 {
4068         if ((mdev->ldev->md.flags & flag) != flag) {
4069                 drbd_md_mark_dirty(mdev);
4070                 mdev->ldev->md.flags |= flag;
4071         }
4072 }
4073
4074 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4075 {
4076         if ((mdev->ldev->md.flags & flag) != 0) {
4077                 drbd_md_mark_dirty(mdev);
4078                 mdev->ldev->md.flags &= ~flag;
4079         }
4080 }
4081 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
4082 {
4083         return (bdev->md.flags & flag) != 0;
4084 }
4085
4086 static void md_sync_timer_fn(unsigned long data)
4087 {
4088         struct drbd_conf *mdev = (struct drbd_conf *) data;
4089
4090         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
4091 }
4092
4093 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4094 {
4095         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
4096 #ifdef DEBUG
4097         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
4098                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
4099 #endif
4100         drbd_md_sync(mdev);
4101         return 1;
4102 }
4103
4104 const char *cmdname(enum drbd_packets cmd)
4105 {
4106         /* THINK may need to become several global tables
4107          * when we want to support more than
4108          * one PRO_VERSION */
4109         static const char *cmdnames[] = {
4110                 [P_DATA]                = "Data",
4111                 [P_DATA_REPLY]          = "DataReply",
4112                 [P_RS_DATA_REPLY]       = "RSDataReply",
4113                 [P_BARRIER]             = "Barrier",
4114                 [P_BITMAP]              = "ReportBitMap",
4115                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
4116                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
4117                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
4118                 [P_DATA_REQUEST]        = "DataRequest",
4119                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
4120                 [P_SYNC_PARAM]          = "SyncParam",
4121                 [P_SYNC_PARAM89]        = "SyncParam89",
4122                 [P_PROTOCOL]            = "ReportProtocol",
4123                 [P_UUIDS]               = "ReportUUIDs",
4124                 [P_SIZES]               = "ReportSizes",
4125                 [P_STATE]               = "ReportState",
4126                 [P_SYNC_UUID]           = "ReportSyncUUID",
4127                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
4128                 [P_AUTH_RESPONSE]       = "AuthResponse",
4129                 [P_PING]                = "Ping",
4130                 [P_PING_ACK]            = "PingAck",
4131                 [P_RECV_ACK]            = "RecvAck",
4132                 [P_WRITE_ACK]           = "WriteAck",
4133                 [P_RS_WRITE_ACK]        = "RSWriteAck",
4134                 [P_DISCARD_ACK]         = "DiscardAck",
4135                 [P_NEG_ACK]             = "NegAck",
4136                 [P_NEG_DREPLY]          = "NegDReply",
4137                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
4138                 [P_BARRIER_ACK]         = "BarrierAck",
4139                 [P_STATE_CHG_REQ]       = "StateChgRequest",
4140                 [P_STATE_CHG_REPLY]     = "StateChgReply",
4141                 [P_OV_REQUEST]          = "OVRequest",
4142                 [P_OV_REPLY]            = "OVReply",
4143                 [P_OV_RESULT]           = "OVResult",
4144                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
4145                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
4146                 [P_COMPRESSED_BITMAP]   = "CBitmap",
4147                 [P_DELAY_PROBE]         = "DelayProbe",
4148                 [P_OUT_OF_SYNC]         = "OutOfSync",
4149                 [P_MAX_CMD]             = NULL,
4150         };
4151
4152         if (cmd == P_HAND_SHAKE_M)
4153                 return "HandShakeM";
4154         if (cmd == P_HAND_SHAKE_S)
4155                 return "HandShakeS";
4156         if (cmd == P_HAND_SHAKE)
4157                 return "HandShake";
4158         if (cmd >= P_MAX_CMD)
4159                 return "Unknown";
4160         return cmdnames[cmd];
4161 }
4162
4163 #ifdef CONFIG_DRBD_FAULT_INJECTION
4164 /* Fault insertion support including random number generator shamelessly
4165  * stolen from kernel/rcutorture.c */
4166 struct fault_random_state {
4167         unsigned long state;
4168         unsigned long count;
4169 };
4170
4171 #define FAULT_RANDOM_MULT 39916801  /* prime */
4172 #define FAULT_RANDOM_ADD        479001701 /* prime */
4173 #define FAULT_RANDOM_REFRESH 10000
4174
4175 /*
4176  * Crude but fast random-number generator.  Uses a linear congruential
4177  * generator, with occasional help from get_random_bytes().
4178  */
4179 static unsigned long
4180 _drbd_fault_random(struct fault_random_state *rsp)
4181 {
4182         long refresh;
4183
4184         if (!rsp->count--) {
4185                 get_random_bytes(&refresh, sizeof(refresh));
4186                 rsp->state += refresh;
4187                 rsp->count = FAULT_RANDOM_REFRESH;
4188         }
4189         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
4190         return swahw32(rsp->state);
4191 }
4192
4193 static char *
4194 _drbd_fault_str(unsigned int type) {
4195         static char *_faults[] = {
4196                 [DRBD_FAULT_MD_WR] = "Meta-data write",
4197                 [DRBD_FAULT_MD_RD] = "Meta-data read",
4198                 [DRBD_FAULT_RS_WR] = "Resync write",
4199                 [DRBD_FAULT_RS_RD] = "Resync read",
4200                 [DRBD_FAULT_DT_WR] = "Data write",
4201                 [DRBD_FAULT_DT_RD] = "Data read",
4202                 [DRBD_FAULT_DT_RA] = "Data read ahead",
4203                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
4204                 [DRBD_FAULT_AL_EE] = "EE allocation",
4205                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
4206         };
4207
4208         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
4209 }
4210
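/* Decide whether to inject a fault of the given type on this device:
 * fault_devs is a bitmask of minor numbers (0 selects all devices) and
 * fault_rate is a percentage; roughly fault_rate percent of the calls
 * return true, and each injected fault is counted in fault_count and
 * logged rate-limited. */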
4211 unsigned int
4212 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
4213 {
4214         static struct fault_random_state rrs = {0, 0};
4215
4216         unsigned int ret = (
4217                 (fault_devs == 0 ||
4218                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
4219                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
4220
4221         if (ret) {
4222                 fault_count++;
4223
4224                 if (__ratelimit(&drbd_ratelimit_state))
4225                         dev_warn(DEV, "***Simulating %s failure\n",
4226                                 _drbd_fault_str(type));
4227         }
4228
4229         return ret;
4230 }
4231 #endif
4232
4233 const char *drbd_buildtag(void)
4234 {
4235         /* When DRBD is built from external sources, this holds a reference
4236            to the git hash of the source code. */
4237
4238         static char buildtag[38] = "\0uilt-in";
4239
4240         if (buildtag[0] == 0) {
4241 #ifdef CONFIG_MODULES
4242                 if (THIS_MODULE != NULL)
4243                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
4244                 else
4245 #endif
4246                         buildtag[0] = 'b';
4247         }
4248
4249         return buildtag;
4250 }
4251
4252 module_init(drbd_init)
4253 module_exit(drbd_cleanup)
4254
4255 EXPORT_SYMBOL(drbd_conn_str);
4256 EXPORT_SYMBOL(drbd_role_str);
4257 EXPORT_SYMBOL(drbd_disk_str);
4258 EXPORT_SYMBOL(drbd_set_st_err_str);