drivers/block/drbd/drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 struct after_state_chg_work {
60         struct drbd_work w;
61         union drbd_state os;
62         union drbd_state ns;
63         enum chg_state_flags flags;
64         struct completion *done;
65 };
66
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76                            union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80
81 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
82               "Lars Ellenberg <lars@linbit.com>");
83 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
84 MODULE_VERSION(REL_VERSION);
85 MODULE_LICENSE("GPL");
86 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
87 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
88
89 #include <linux/moduleparam.h>
90 /* allow_open_on_secondary */
91 MODULE_PARM_DESC(allow_oos, "DONT USE!");
92 /* thanks to these macros, if compiled into the kernel (not as a module),
93  * this becomes the boot parameter drbd.minor_count */
94 module_param(minor_count, uint, 0444);
95 module_param(disable_sendpage, bool, 0644);
96 module_param(allow_oos, bool, 0);
97 module_param(cn_idx, uint, 0444);
98 module_param(proc_details, int, 0644);
99
100 #ifdef CONFIG_DRBD_FAULT_INJECTION
101 int enable_faults;
102 int fault_rate;
103 static int fault_count;
104 int fault_devs;
105 /* bitmap of enabled faults */
106 module_param(enable_faults, int, 0664);
107 /* fault rate % value - applies to all enabled faults */
108 module_param(fault_rate, int, 0664);
109 /* count of faults inserted */
110 module_param(fault_count, int, 0664);
111 /* bitmap of devices to insert faults on */
112 module_param(fault_devs, int, 0644);
113 #endif
114
115 /* module parameters, defined here */
116 unsigned int minor_count = 32;
117 int disable_sendpage;
118 int allow_oos;
119 unsigned int cn_idx = CN_IDX_DRBD;
120 int proc_details;       /* Detail level in proc drbd */
121
122 /* Module parameter for setting the user mode helper program
123  * to run. Default is /sbin/drbdadm */
124 char usermode_helper[80] = "/sbin/drbdadm";
125
126 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
127
128 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
129  * as member "struct gendisk *vdisk;"
130  */
131 struct drbd_conf **minor_table;
132
133 struct kmem_cache *drbd_request_cache;
134 struct kmem_cache *drbd_ee_cache;       /* epoch entries */
135 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
136 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
137 mempool_t *drbd_request_mempool;
138 mempool_t *drbd_ee_mempool;
139
140 /* I do not use a standard mempool, because:
141    1) I want to hand out the pre-allocated objects first.
142    2) I want to be able to interrupt sleeping allocation with a signal.
143    Note: This is a singly linked list; the next pointer is the private
144          member of struct page.
145  */
146 struct page *drbd_pp_pool;
147 spinlock_t   drbd_pp_lock;
148 int          drbd_pp_vacant;
149 wait_queue_head_t drbd_pp_wait;
150
151 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
152
153 static const struct block_device_operations drbd_ops = {
154         .owner =   THIS_MODULE,
155         .open =    drbd_open,
156         .release = drbd_release,
157 };
158
159 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
160
161 #ifdef __CHECKER__
162 /* When checking with sparse, and this is an inline function, sparse will
163    give tons of false positives. When this is a real function, sparse works.
164  */
165 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
166 {
167         int io_allowed;
168
169         atomic_inc(&mdev->local_cnt);
170         io_allowed = (mdev->state.disk >= mins);
171         if (!io_allowed) {
172                 if (atomic_dec_and_test(&mdev->local_cnt))
173                         wake_up(&mdev->misc_wait);
174         }
175         return io_allowed;
176 }
177
178 #endif
179
180 /**
181  * DOC: The transfer log
182  *
183  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
184  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
185  * of the list. There is always at least one &struct drbd_tl_epoch object.
186  *
187  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
188  * attached.
189  */
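/**
 * tl_init() - Allocate and set up an empty transfer log
 * @mdev:       DRBD device.
 *
 * Creates the initial &struct drbd_tl_epoch (GFP_KERNEL is fine here, this
 * runs during device minor initialization) and makes it both the oldest and
 * the newest epoch. Returns 1 on success, 0 if the allocation failed.
 */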
190 static int tl_init(struct drbd_conf *mdev)
191 {
192         struct drbd_tl_epoch *b;
193
194         /* during device minor initialization, we may well use GFP_KERNEL */
195         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
196         if (!b)
197                 return 0;
198         INIT_LIST_HEAD(&b->requests);
199         INIT_LIST_HEAD(&b->w.list);
200         b->next = NULL;
201         b->br_number = 4711;
202         b->n_writes = 0;
203         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
204
205         mdev->oldest_tle = b;
206         mdev->newest_tle = b;
207         INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
208
209         mdev->tl_hash = NULL;
210         mdev->tl_hash_s = 0;
211
212         return 1;
213 }
214
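/**
 * tl_cleanup() - Free the transfer log objects allocated by tl_init()
 * @mdev:       DRBD device.
 *
 * Expects the transfer log to be empty again, i.e. reduced to its single
 * remaining epoch with no out-of-sequence requests pending.
 */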
215 static void tl_cleanup(struct drbd_conf *mdev)
216 {
217         D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
218         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
219         kfree(mdev->oldest_tle);
220         mdev->oldest_tle = NULL;
221         kfree(mdev->unused_spare_tle);
222         mdev->unused_spare_tle = NULL;
223         kfree(mdev->tl_hash);
224         mdev->tl_hash = NULL;
225         mdev->tl_hash_s = 0;
226 }
227
228 /**
229  * _tl_add_barrier() - Adds a barrier to the transfer log
230  * @mdev:       DRBD device.
231  * @new:        Barrier to be added before the current head of the TL.
232  *
233  * The caller must hold the req_lock.
234  */
235 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
236 {
237         struct drbd_tl_epoch *newest_before;
238
239         INIT_LIST_HEAD(&new->requests);
240         INIT_LIST_HEAD(&new->w.list);
241         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
242         new->next = NULL;
243         new->n_writes = 0;
244
245         newest_before = mdev->newest_tle;
246         /* never send a barrier number == 0, because that is special-cased
247          * when using TCQ for our write ordering code */
248         new->br_number = (newest_before->br_number+1) ?: 1;
249         if (mdev->newest_tle != new) {
250                 mdev->newest_tle->next = new;
251                 mdev->newest_tle = new;
252         }
253 }
254
255 /**
256  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
257  * @mdev:       DRBD device.
258  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
259  * @set_size:   Expected number of requests before that barrier.
260  *
261  * In case the passed barrier_nr or set_size does not match the oldest
262  * &struct drbd_tl_epoch objects this function will cause a termination
263  * of the connection.
264  */
265 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
266                        unsigned int set_size)
267 {
268         struct drbd_tl_epoch *b, *nob; /* next old barrier */
269         struct list_head *le, *tle;
270         struct drbd_request *r;
271
272         spin_lock_irq(&mdev->req_lock);
273
274         b = mdev->oldest_tle;
275
276         /* first some paranoia code */
277         if (b == NULL) {
278                 dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
279                         barrier_nr);
280                 goto bail;
281         }
282         if (b->br_number != barrier_nr) {
283                 dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
284                         barrier_nr, b->br_number);
285                 goto bail;
286         }
287         if (b->n_writes != set_size) {
288                 dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
289                         barrier_nr, set_size, b->n_writes);
290                 goto bail;
291         }
292
293         /* Clean up list of requests processed during current epoch */
294         list_for_each_safe(le, tle, &b->requests) {
295                 r = list_entry(le, struct drbd_request, tl_requests);
296                 _req_mod(r, barrier_acked);
297         }
298         /* There could be requests on the list waiting for completion
299            of the write to the local disk. To avoid corruption of the
300            slab's data structures we have to remove the list's head.
301
302            Also there could have been a barrier ack out of sequence, overtaking
303            the write acks - which would be a bug and violate write ordering.
304            To not deadlock in case we lose connection while such requests are
305            still pending, we need some way to find them for
306            _req_mod(connection_lost_while_pending).
307
308            These have been list_move'd to the out_of_sequence_requests list in
309            _req_mod(, barrier_acked) above.
310            */
311         list_del_init(&b->requests);
312
313         nob = b->next;
314         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
315                 _tl_add_barrier(mdev, b);
316                 if (nob)
317                         mdev->oldest_tle = nob;
318                 /* if nob == NULL, b was the only barrier and becomes the new
319                    barrier. Therefore mdev->oldest_tle already points to b */
320         } else {
321                 D_ASSERT(nob != NULL);
322                 mdev->oldest_tle = nob;
323                 kfree(b);
324         }
325
326         spin_unlock_irq(&mdev->req_lock);
327         dec_ap_pending(mdev);
328
329         return;
330
331 bail:
332         spin_unlock_irq(&mdev->req_lock);
333         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
334 }
335
336 /**
337  * _tl_restart() - Walks the transfer log, and applies an action to all requests
338  * @mdev:       DRBD device.
339  * @what:       The action/event to perform with all request objects
340  *
341  * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
342  * restart_frozen_disk_io.
343  */
344 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
345 {
346         struct drbd_tl_epoch *b, *tmp, **pn;
347         struct list_head *le, *tle, carry_reads;
348         struct drbd_request *req;
349         int rv, n_writes, n_reads;
350
351         b = mdev->oldest_tle;
352         pn = &mdev->oldest_tle;
353         while (b) {
354                 n_writes = 0;
355                 n_reads = 0;
356                 INIT_LIST_HEAD(&carry_reads);
357                 list_for_each_safe(le, tle, &b->requests) {
358                         req = list_entry(le, struct drbd_request, tl_requests);
359                         rv = _req_mod(req, what);
360
361                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
362                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
363                 }
364                 tmp = b->next;
365
366                 if (n_writes) {
367                         if (what == resend) {
368                                 b->n_writes = n_writes;
369                                 if (b->w.cb == NULL) {
370                                         b->w.cb = w_send_barrier;
371                                         inc_ap_pending(mdev);
372                                         set_bit(CREATE_BARRIER, &mdev->flags);
373                                 }
374
375                                 drbd_queue_work(&mdev->data.work, &b->w);
376                         }
377                         pn = &b->next;
378                 } else {
379                         if (n_reads)
380                                 list_add(&carry_reads, &b->requests);
381                         /* there could still be requests on that ring list,
382                          * in case local io is still pending */
383                         list_del(&b->requests);
384
385                         /* dec_ap_pending corresponding to queue_barrier.
386                          * the newest barrier may not have been queued yet,
387                          * in which case w.cb is still NULL. */
388                         if (b->w.cb != NULL)
389                                 dec_ap_pending(mdev);
390
391                         if (b == mdev->newest_tle) {
392                                 /* recycle, but reinit! */
393                                 D_ASSERT(tmp == NULL);
394                                 INIT_LIST_HEAD(&b->requests);
395                                 list_splice(&carry_reads, &b->requests);
396                                 INIT_LIST_HEAD(&b->w.list);
397                                 b->w.cb = NULL;
398                                 b->br_number = net_random();
399                                 b->n_writes = 0;
400
401                                 *pn = b;
402                                 break;
403                         }
404                         *pn = tmp;
405                         kfree(b);
406                 }
407                 b = tmp;
408                 list_splice(&carry_reads, &b->requests);
409         }
410 }
411
412
413 /**
414  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
415  * @mdev:       DRBD device.
416  *
417  * This is called after the connection to the peer was lost. The storage covered
418  * by the requests on the transfer log gets marked as out of sync. Called from the
419  * receiver thread and the worker thread.
420  */
421 void tl_clear(struct drbd_conf *mdev)
422 {
423         struct list_head *le, *tle;
424         struct drbd_request *r;
425
426         spin_lock_irq(&mdev->req_lock);
427
428         _tl_restart(mdev, connection_lost_while_pending);
429
430         /* we expect this list to be empty. */
431         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
432
433         /* but just in case, clean it up anyway! */
434         list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
435                 r = list_entry(le, struct drbd_request, tl_requests);
436                 /* It would be nice to complete outside of the spinlock.
437                  * But this is easier for now. */
438                 _req_mod(r, connection_lost_while_pending);
439         }
440
441         /* ensure the bit indicating that a barrier is required is clear */
442         clear_bit(CREATE_BARRIER, &mdev->flags);
443
444         memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
445
446         spin_unlock_irq(&mdev->req_lock);
447 }
448
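/**
 * tl_restart() - Apply an action to all requests in the transfer log
 * @mdev:       DRBD device.
 * @what:       The action/event to perform with all request objects
 *
 * Wrapper around _tl_restart() that takes and releases the req_lock.
 */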
449 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
450 {
451         spin_lock_irq(&mdev->req_lock);
452         _tl_restart(mdev, what);
453         spin_unlock_irq(&mdev->req_lock);
454 }
455
456 /**
457  * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
458  * @mdev:       DRBD device.
459  * @os:         old (current) state.
460  * @ns:         new (wanted) state.
461  */
462 static int cl_wide_st_chg(struct drbd_conf *mdev,
463                           union drbd_state os, union drbd_state ns)
464 {
465         return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
466                  ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
467                   (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
468                   (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
469                   (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
470                 (os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
471                 (os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
472 }
473
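/**
 * drbd_change_state() - Apply a state change on this node
 * @mdev:       DRBD device.
 * @f:          flags, e.g. CS_HARD.
 * @mask:       mask of state bits to change.
 * @val:        value of new state bits.
 *
 * Applies @mask/@val to the current state under the req_lock and returns
 * the result of _drbd_set_state().
 */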
474 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
475                       union drbd_state mask, union drbd_state val)
476 {
477         unsigned long flags;
478         union drbd_state os, ns;
479         int rv;
480
481         spin_lock_irqsave(&mdev->req_lock, flags);
482         os = mdev->state;
483         ns.i = (os.i & ~mask.i) | val.i;
484         rv = _drbd_set_state(mdev, ns, f, NULL);
485         ns = mdev->state;
486         spin_unlock_irqrestore(&mdev->req_lock, flags);
487
488         return rv;
489 }
490
491 /**
492  * drbd_force_state() - Impose a change which happens outside our control on our state
493  * @mdev:       DRBD device.
494  * @mask:       mask of state bits to change.
495  * @val:        value of new state bits.
496  */
497 void drbd_force_state(struct drbd_conf *mdev,
498         union drbd_state mask, union drbd_state val)
499 {
500         drbd_change_state(mdev, CS_HARD, mask, val);
501 }
502
503 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
504 static int is_valid_state_transition(struct drbd_conf *,
505                                      union drbd_state, union drbd_state);
506 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
507                                        union drbd_state ns, const char **warn_sync_abort);
508 int drbd_send_state_req(struct drbd_conf *,
509                         union drbd_state, union drbd_state);
510
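/* Condition for the wait_event() in drbd_req_state() during a cluster-wide
 * state change. Returns 0 to keep waiting; any other value ends the wait:
 * SS_CW_SUCCESS or SS_CW_FAILED_BY_PEER once the peer has answered,
 * SS_CW_NO_NEED if the change turns out not to be cluster-wide, or an SS_
 * error code if the requested state became invalid in the meantime. */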
511 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
512                                     union drbd_state mask, union drbd_state val)
513 {
514         union drbd_state os, ns;
515         unsigned long flags;
516         int rv;
517
518         if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
519                 return SS_CW_SUCCESS;
520
521         if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
522                 return SS_CW_FAILED_BY_PEER;
523
524         rv = 0;
525         spin_lock_irqsave(&mdev->req_lock, flags);
526         os = mdev->state;
527         ns.i = (os.i & ~mask.i) | val.i;
528         ns = sanitize_state(mdev, os, ns, NULL);
529
530         if (!cl_wide_st_chg(mdev, os, ns))
531                 rv = SS_CW_NO_NEED;
532         if (!rv) {
533                 rv = is_valid_state(mdev, ns);
534                 if (rv == SS_SUCCESS) {
535                         rv = is_valid_state_transition(mdev, ns, os);
536                         if (rv == SS_SUCCESS)
537                                 rv = 0; /* cont waiting, otherwise fail. */
538                 }
539         }
540         spin_unlock_irqrestore(&mdev->req_lock, flags);
541
542         return rv;
543 }
544
545 /**
546  * drbd_req_state() - Perform a possibly cluster-wide state change
547  * @mdev:       DRBD device.
548  * @mask:       mask of state bits to change.
549  * @val:        value of new state bits.
550  * @f:          flags
551  *
552  * Should not be called directly, use drbd_request_state() or
553  * _drbd_request_state().
554  */
555 static int drbd_req_state(struct drbd_conf *mdev,
556                           union drbd_state mask, union drbd_state val,
557                           enum chg_state_flags f)
558 {
559         struct completion done;
560         unsigned long flags;
561         union drbd_state os, ns;
562         int rv;
563
564         init_completion(&done);
565
566         if (f & CS_SERIALIZE)
567                 mutex_lock(&mdev->state_mutex);
568
569         spin_lock_irqsave(&mdev->req_lock, flags);
570         os = mdev->state;
571         ns.i = (os.i & ~mask.i) | val.i;
572         ns = sanitize_state(mdev, os, ns, NULL);
573
574         if (cl_wide_st_chg(mdev, os, ns)) {
575                 rv = is_valid_state(mdev, ns);
576                 if (rv == SS_SUCCESS)
577                         rv = is_valid_state_transition(mdev, ns, os);
578                 spin_unlock_irqrestore(&mdev->req_lock, flags);
579
580                 if (rv < SS_SUCCESS) {
581                         if (f & CS_VERBOSE)
582                                 print_st_err(mdev, os, ns, rv);
583                         goto abort;
584                 }
585
586                 drbd_state_lock(mdev);
587                 if (!drbd_send_state_req(mdev, mask, val)) {
588                         drbd_state_unlock(mdev);
589                         rv = SS_CW_FAILED_BY_PEER;
590                         if (f & CS_VERBOSE)
591                                 print_st_err(mdev, os, ns, rv);
592                         goto abort;
593                 }
594
595                 wait_event(mdev->state_wait,
596                         (rv = _req_st_cond(mdev, mask, val)));
597
598                 if (rv < SS_SUCCESS) {
599                         drbd_state_unlock(mdev);
600                         if (f & CS_VERBOSE)
601                                 print_st_err(mdev, os, ns, rv);
602                         goto abort;
603                 }
604                 spin_lock_irqsave(&mdev->req_lock, flags);
605                 os = mdev->state;
606                 ns.i = (os.i & ~mask.i) | val.i;
607                 rv = _drbd_set_state(mdev, ns, f, &done);
608                 drbd_state_unlock(mdev);
609         } else {
610                 rv = _drbd_set_state(mdev, ns, f, &done);
611         }
612
613         spin_unlock_irqrestore(&mdev->req_lock, flags);
614
615         if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
616                 D_ASSERT(current != mdev->worker.task);
617                 wait_for_completion(&done);
618         }
619
620 abort:
621         if (f & CS_SERIALIZE)
622                 mutex_unlock(&mdev->state_mutex);
623
624         return rv;
625 }
626
627 /**
628  * _drbd_request_state() - Request a state change (with flags)
629  * @mdev:       DRBD device.
630  * @mask:       mask of state bits to change.
631  * @val:        value of new state bits.
632  * @f:          flags
633  *
634  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
635  * flag, or when logging of failed state change requests is not desired.
636  */
637 int _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
638                         union drbd_state val,   enum chg_state_flags f)
639 {
640         int rv;
641
642         wait_event(mdev->state_wait,
643                    (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
644
645         return rv;
646 }
647
648 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
649 {
650         dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
651             name,
652             drbd_conn_str(ns.conn),
653             drbd_role_str(ns.role),
654             drbd_role_str(ns.peer),
655             drbd_disk_str(ns.disk),
656             drbd_disk_str(ns.pdsk),
657             is_susp(ns) ? 's' : 'r',
658             ns.aftr_isp ? 'a' : '-',
659             ns.peer_isp ? 'p' : '-',
660             ns.user_isp ? 'u' : '-'
661             );
662 }
663
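/* Log a failed state change: the reason, the current state and the state we
 * wanted to reach. SS_IN_TRANSIENT_STATE is not logged, callers retry it. */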
664 void print_st_err(struct drbd_conf *mdev,
665         union drbd_state os, union drbd_state ns, int err)
666 {
667         if (err == SS_IN_TRANSIENT_STATE)
668                 return;
669         dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
670         print_st(mdev, " state", os);
671         print_st(mdev, "wanted", ns);
672 }
673
674
675 #define drbd_peer_str drbd_role_str
676 #define drbd_pdsk_str drbd_disk_str
677
678 #define drbd_susp_str(A)     ((A) ? "1" : "0")
679 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
680 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
681 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
682
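/* PSC(A): if state field A differs between os and ns (taken from the
 * enclosing scope), append "A( old -> new ) " to the log buffer at pbp. */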
683 #define PSC(A) \
684         ({ if (ns.A != os.A) { \
685                 pbp += sprintf(pbp, #A "( %s -> %s ) ", \
686                               drbd_##A##_str(os.A), \
687                               drbd_##A##_str(ns.A)); \
688         } })
689
690 /**
691  * is_valid_state() - Returns an SS_ error code if ns is not valid
692  * @mdev:       DRBD device.
693  * @ns:         State to consider.
694  */
695 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
696 {
697         /* See drbd_state_sw_errors in drbd_strings.c */
698
699         enum drbd_fencing_p fp;
700         int rv = SS_SUCCESS;
701
702         fp = FP_DONT_CARE;
703         if (get_ldev(mdev)) {
704                 fp = mdev->ldev->dc.fencing;
705                 put_ldev(mdev);
706         }
707
708         if (get_net_conf(mdev)) {
709                 if (!mdev->net_conf->two_primaries &&
710                     ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
711                         rv = SS_TWO_PRIMARIES;
712                 put_net_conf(mdev);
713         }
714
715         if (rv <= 0)
716                 /* already found a reason to abort */;
717         else if (ns.role == R_SECONDARY && mdev->open_cnt)
718                 rv = SS_DEVICE_IN_USE;
719
720         else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
721                 rv = SS_NO_UP_TO_DATE_DISK;
722
723         else if (fp >= FP_RESOURCE &&
724                  ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
725                 rv = SS_PRIMARY_NOP;
726
727         else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
728                 rv = SS_NO_UP_TO_DATE_DISK;
729
730         else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
731                 rv = SS_NO_LOCAL_DISK;
732
733         else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
734                 rv = SS_NO_REMOTE_DISK;
735
736         else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
737                 rv = SS_NO_UP_TO_DATE_DISK;
738
739         else if ((ns.conn == C_CONNECTED ||
740                   ns.conn == C_WF_BITMAP_S ||
741                   ns.conn == C_SYNC_SOURCE ||
742                   ns.conn == C_PAUSED_SYNC_S) &&
743                   ns.disk == D_OUTDATED)
744                 rv = SS_CONNECTED_OUTDATES;
745
746         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
747                  (mdev->sync_conf.verify_alg[0] == 0))
748                 rv = SS_NO_VERIFY_ALG;
749
750         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
751                   mdev->agreed_pro_version < 88)
752                 rv = SS_NOT_SUPPORTED;
753
754         return rv;
755 }
756
757 /**
758  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
759  * @mdev:       DRBD device.
760  * @ns:         new state.
761  * @os:         old state.
762  */
763 static int is_valid_state_transition(struct drbd_conf *mdev,
764                                      union drbd_state ns, union drbd_state os)
765 {
766         int rv = SS_SUCCESS;
767
768         if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
769             os.conn > C_CONNECTED)
770                 rv = SS_RESYNC_RUNNING;
771
772         if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
773                 rv = SS_ALREADY_STANDALONE;
774
775         if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
776                 rv = SS_IS_DISKLESS;
777
778         if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
779                 rv = SS_NO_NET_CONFIG;
780
781         if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
782                 rv = SS_LOWER_THAN_OUTDATED;
783
784         if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
785                 rv = SS_IN_TRANSIENT_STATE;
786
787         if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
788                 rv = SS_IN_TRANSIENT_STATE;
789
790         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
791                 rv = SS_NEED_CONNECTION;
792
793         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
794             ns.conn != os.conn && os.conn > C_CONNECTED)
795                 rv = SS_RESYNC_RUNNING;
796
797         if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
798             os.conn < C_CONNECTED)
799                 rv = SS_NEED_CONNECTION;
800
801         return rv;
802 }
803
804 /**
805  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
806  * @mdev:       DRBD device.
807  * @os:         old state.
808  * @ns:         new state.
809  * @warn_sync_abort:
810  *
811  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
812  * to D_UNKNOWN. This rule and many more along those lines are in this function.
813  */
814 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
815                                        union drbd_state ns, const char **warn_sync_abort)
816 {
817         enum drbd_fencing_p fp;
818
819         fp = FP_DONT_CARE;
820         if (get_ldev(mdev)) {
821                 fp = mdev->ldev->dc.fencing;
822                 put_ldev(mdev);
823         }
824
825         /* Do not let network error states change the connection state of a device whose network part is not configured */
826         if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
827             os.conn <= C_DISCONNECTING)
828                 ns.conn = os.conn;
829
830         /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
831          * If you try to go into some Sync* state, that shall fail (elsewhere). */
832         if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
833             ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_TEAR_DOWN)
834                 ns.conn = os.conn;
835
836         /* After C_DISCONNECTING only C_STANDALONE may follow */
837         if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
838                 ns.conn = os.conn;
839
840         if (ns.conn < C_CONNECTED) {
841                 ns.peer_isp = 0;
842                 ns.peer = R_UNKNOWN;
843                 if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
844                         ns.pdsk = D_UNKNOWN;
845         }
846
847         /* Clear the aftr_isp when becoming unconfigured */
848         if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
849                 ns.aftr_isp = 0;
850
851         /* Abort resync if a disk fails/detaches */
852         if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
853             (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
854                 if (warn_sync_abort)
855                         *warn_sync_abort =
856                                 os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
857                                 "Online-verify" : "Resync";
858                 ns.conn = C_CONNECTED;
859         }
860
861         if (ns.conn >= C_CONNECTED &&
862             ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
863              (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
864                 switch (ns.conn) {
865                 case C_WF_BITMAP_T:
866                 case C_PAUSED_SYNC_T:
867                         ns.disk = D_OUTDATED;
868                         break;
869                 case C_CONNECTED:
870                 case C_WF_BITMAP_S:
871                 case C_SYNC_SOURCE:
872                 case C_PAUSED_SYNC_S:
873                         ns.disk = D_UP_TO_DATE;
874                         break;
875                 case C_SYNC_TARGET:
876                         ns.disk = D_INCONSISTENT;
877                         dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
878                         break;
879                 }
880                 if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
881                         dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
882         }
883
884         if (ns.conn >= C_CONNECTED &&
885             (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
886                 switch (ns.conn) {
887                 case C_CONNECTED:
888                 case C_WF_BITMAP_T:
889                 case C_PAUSED_SYNC_T:
890                 case C_SYNC_TARGET:
891                         ns.pdsk = D_UP_TO_DATE;
892                         break;
893                 case C_WF_BITMAP_S:
894                 case C_PAUSED_SYNC_S:
895                         /* remap any consistent state to D_OUTDATED,
896                          * but do not "upgrade" states that are not even consistent.
897                          */
898                         ns.pdsk =
899                                 (D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
900                                 ? os.pdsk : D_OUTDATED;
901                         break;
902                 case C_SYNC_SOURCE:
903                         ns.pdsk = D_INCONSISTENT;
904                         dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
905                         break;
906                 }
907                 if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
908                         dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
909         }
910
911         /* Connection breaks down before we finished "Negotiating" */
912         if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
913             get_ldev_if_state(mdev, D_NEGOTIATING)) {
914                 if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
915                         ns.disk = mdev->new_state_tmp.disk;
916                         ns.pdsk = mdev->new_state_tmp.pdsk;
917                 } else {
918                         dev_alert(DEV, "Connection lost while negotiating, no data!\n");
919                         ns.disk = D_DISKLESS;
920                         ns.pdsk = D_UNKNOWN;
921                 }
922                 put_ldev(mdev);
923         }
924
925         if (fp == FP_STONITH &&
926             (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
927             !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
928                 ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
929
930         if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
931             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
932             !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
933                 ns.susp_nod = 1; /* Suspend IO while no accessible, up-to-date data is available */
934
935         if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
936                 if (ns.conn == C_SYNC_SOURCE)
937                         ns.conn = C_PAUSED_SYNC_S;
938                 if (ns.conn == C_SYNC_TARGET)
939                         ns.conn = C_PAUSED_SYNC_T;
940         } else {
941                 if (ns.conn == C_PAUSED_SYNC_S)
942                         ns.conn = C_SYNC_SOURCE;
943                 if (ns.conn == C_PAUSED_SYNC_T)
944                         ns.conn = C_SYNC_TARGET;
945         }
946
947         return ns;
948 }
949
950 /* helper for __drbd_set_state */
951 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
952 {
953         if (cs == C_VERIFY_T) {
954                 /* starting online verify from an arbitrary position
955                  * does not fit well into the existing protocol.
956                  * on C_VERIFY_T, we initialize ov_left and friends
957                  * implicitly in receive_DataRequest once the
958                  * first P_OV_REQUEST is received */
959                 mdev->ov_start_sector = ~(sector_t)0;
960         } else {
961                 unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
962                 if (bit >= mdev->rs_total)
963                         mdev->ov_start_sector =
964                                 BM_BIT_TO_SECT(mdev->rs_total - 1);
965                 mdev->ov_position = mdev->ov_start_sector;
966         }
967 }
968
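/* Resume activity log updates if they had been suspended (AL_SUSPENDED). */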
969 static void drbd_resume_al(struct drbd_conf *mdev)
970 {
971         if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
972                 dev_info(DEV, "Resumed AL updates\n");
973 }
974
975 /**
976  * __drbd_set_state() - Set a new DRBD state
977  * @mdev:       DRBD device.
978  * @ns:         new state.
979  * @flags:      Flags
980  * @done:       Optional completion, that will get completed after the after_state_ch() finished
981  *
982  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
983  */
984 int __drbd_set_state(struct drbd_conf *mdev,
985                     union drbd_state ns, enum chg_state_flags flags,
986                     struct completion *done)
987 {
988         union drbd_state os;
989         int rv = SS_SUCCESS;
990         const char *warn_sync_abort = NULL;
991         struct after_state_chg_work *ascw;
992
993         os = mdev->state;
994
995         ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
996
997         if (ns.i == os.i)
998                 return SS_NOTHING_TO_DO;
999
1000         if (!(flags & CS_HARD)) {
1001                 /* pre-state-change checks; only look at ns */
1002                 /* See drbd_state_sw_errors in drbd_strings.c */
1003
1004                 rv = is_valid_state(mdev, ns);
1005                 if (rv < SS_SUCCESS) {
1006                         /* If the old state was illegal as well, then let
1007                            this happen...*/
1008
1009                         if (is_valid_state(mdev, os) == rv)
1010                                 rv = is_valid_state_transition(mdev, ns, os);
1011                 } else
1012                         rv = is_valid_state_transition(mdev, ns, os);
1013         }
1014
1015         if (rv < SS_SUCCESS) {
1016                 if (flags & CS_VERBOSE)
1017                         print_st_err(mdev, os, ns, rv);
1018                 return rv;
1019         }
1020
1021         if (warn_sync_abort)
1022                 dev_warn(DEV, "%s aborted.\n", warn_sync_abort);
1023
1024         {
1025                 char *pbp, pb[300];
1026                 pbp = pb;
1027                 *pbp = 0;
1028                 PSC(role);
1029                 PSC(peer);
1030                 PSC(conn);
1031                 PSC(disk);
1032                 PSC(pdsk);
1033                 if (is_susp(ns) != is_susp(os))
1034                         pbp += sprintf(pbp, "susp( %s -> %s ) ",
1035                                        drbd_susp_str(is_susp(os)),
1036                                        drbd_susp_str(is_susp(ns)));
1037                 PSC(aftr_isp);
1038                 PSC(peer_isp);
1039                 PSC(user_isp);
1040                 dev_info(DEV, "%s\n", pb);
1041         }
1042
1043         /* solve the race between becoming unconfigured,
1044          * worker doing the cleanup, and
1045          * admin reconfiguring us:
1046          * on (re)configure, first set CONFIG_PENDING,
1047          * then wait for a potentially exiting worker,
1048          * start the worker, and schedule one no_op.
1049          * then proceed with configuration.
1050          */
1051         if (ns.disk == D_DISKLESS &&
1052             ns.conn == C_STANDALONE &&
1053             ns.role == R_SECONDARY &&
1054             !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1055                 set_bit(DEVICE_DYING, &mdev->flags);
1056
1057         mdev->state.i = ns.i;
1058         wake_up(&mdev->misc_wait);
1059         wake_up(&mdev->state_wait);
1060
1061         /* aborted verify run. log the last position */
1062         if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1063             ns.conn < C_CONNECTED) {
1064                 mdev->ov_start_sector =
1065                         BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1066                 dev_info(DEV, "Online Verify reached sector %llu\n",
1067                         (unsigned long long)mdev->ov_start_sector);
1068         }
1069
1070         if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1071             (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1072                 dev_info(DEV, "Syncer continues.\n");
1073                 mdev->rs_paused += (long)jiffies
1074                                   -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1075                 if (ns.conn == C_SYNC_TARGET)
1076                         mod_timer(&mdev->resync_timer, jiffies);
1077         }
1078
1079         if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1080             (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1081                 dev_info(DEV, "Resync suspended\n");
1082                 mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1083         }
1084
1085         if (os.conn == C_CONNECTED &&
1086             (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1087                 unsigned long now = jiffies;
1088                 int i;
1089
1090                 mdev->ov_position = 0;
1091                 mdev->rs_total = drbd_bm_bits(mdev);
1092                 if (mdev->agreed_pro_version >= 90)
1093                         set_ov_position(mdev, ns.conn);
1094                 else
1095                         mdev->ov_start_sector = 0;
1096                 mdev->ov_left = mdev->rs_total
1097                               - BM_SECT_TO_BIT(mdev->ov_position);
1098                 mdev->rs_start = now;
1099                 mdev->rs_last_events = 0;
1100                 mdev->rs_last_sect_ev = 0;
1101                 mdev->ov_last_oos_size = 0;
1102                 mdev->ov_last_oos_start = 0;
1103
1104                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1105                         mdev->rs_mark_left[i] = mdev->rs_total;
1106                         mdev->rs_mark_time[i] = now;
1107                 }
1108
1109                 if (ns.conn == C_VERIFY_S) {
1110                         dev_info(DEV, "Starting Online Verify from sector %llu\n",
1111                                         (unsigned long long)mdev->ov_position);
1112                         mod_timer(&mdev->resync_timer, jiffies);
1113                 }
1114         }
1115
1116         if (get_ldev(mdev)) {
1117                 u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1118                                                  MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1119                                                  MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1120
1121                 if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1122                         mdf |= MDF_CRASHED_PRIMARY;
1123                 if (mdev->state.role == R_PRIMARY ||
1124                     (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1125                         mdf |= MDF_PRIMARY_IND;
1126                 if (mdev->state.conn > C_WF_REPORT_PARAMS)
1127                         mdf |= MDF_CONNECTED_IND;
1128                 if (mdev->state.disk > D_INCONSISTENT)
1129                         mdf |= MDF_CONSISTENT;
1130                 if (mdev->state.disk > D_OUTDATED)
1131                         mdf |= MDF_WAS_UP_TO_DATE;
1132                 if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1133                         mdf |= MDF_PEER_OUT_DATED;
1134                 if (mdf != mdev->ldev->md.flags) {
1135                         mdev->ldev->md.flags = mdf;
1136                         drbd_md_mark_dirty(mdev);
1137                 }
1138                 if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1139                         drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1140                 put_ldev(mdev);
1141         }
1142
1143         /* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1144         if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1145             os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1146                 set_bit(CONSIDER_RESYNC, &mdev->flags);
1147
1148         /* Receiver should clean up itself */
1149         if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1150                 drbd_thread_stop_nowait(&mdev->receiver);
1151
1152         /* Now that the receiver has finished cleaning up, it should die */
1153         if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1154                 drbd_thread_stop_nowait(&mdev->receiver);
1155
1156         /* Upon network failure, we need to restart the receiver. */
1157         if (os.conn > C_TEAR_DOWN &&
1158             ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1159                 drbd_thread_restart_nowait(&mdev->receiver);
1160
1161         /* Resume AL writing if we get a connection */
1162         if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1163                 drbd_resume_al(mdev);
1164
1165         ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1166         if (ascw) {
1167                 ascw->os = os;
1168                 ascw->ns = ns;
1169                 ascw->flags = flags;
1170                 ascw->w.cb = w_after_state_ch;
1171                 ascw->done = done;
1172                 drbd_queue_work(&mdev->data.work, &ascw->w);
1173         } else {
1174                 dev_warn(DEV, "Could not kmalloc an ascw\n");
1175         }
1176
1177         return rv;
1178 }
1179
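/* Worker callback queued by __drbd_set_state(): runs the after-state-change
 * actions that may sleep, and completes the optional waiter for
 * CS_WAIT_COMPLETE. */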
1180 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1181 {
1182         struct after_state_chg_work *ascw =
1183                 container_of(w, struct after_state_chg_work, w);
1184         after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1185         if (ascw->flags & CS_WAIT_COMPLETE) {
1186                 D_ASSERT(ascw->done != NULL);
1187                 complete(ascw->done);
1188         }
1189         kfree(ascw);
1190
1191         return 1;
1192 }
1193
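/* Completion callback of the "set_n_write from StartingSync" bitmap IO:
 * if writing the bitmap failed, fall back to C_CONNECTED; otherwise go to
 * C_WF_SYNC_UUID as sync target, or start the resync as sync source. */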
1194 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1195 {
1196         if (rv) {
1197                 dev_err(DEV, "Writing the bitmap failed not starting resync.\n");
1198                 _drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1199                 return;
1200         }
1201
1202         switch (mdev->state.conn) {
1203         case C_STARTING_SYNC_T:
1204                 _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1205                 break;
1206         case C_STARTING_SYNC_S:
1207                 drbd_start_resync(mdev, C_SYNC_SOURCE);
1208                 break;
1209         }
1210 }
1211
1212 /**
1213  * after_state_ch() - Perform after state change actions that may sleep
1214  * @mdev:       DRBD device.
1215  * @os:         old state.
1216  * @ns:         new state.
1217  * @flags:      Flags
1218  */
1219 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1220                            union drbd_state ns, enum chg_state_flags flags)
1221 {
1222         enum drbd_fencing_p fp;
1223         enum drbd_req_event what = nothing;
1224         union drbd_state nsm = (union drbd_state){ .i = -1 };
1225
1226         if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1227                 clear_bit(CRASHED_PRIMARY, &mdev->flags);
1228                 if (mdev->p_uuid)
1229                         mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1230         }
1231
1232         fp = FP_DONT_CARE;
1233         if (get_ldev(mdev)) {
1234                 fp = mdev->ldev->dc.fencing;
1235                 put_ldev(mdev);
1236         }
1237
1238         /* Inform userspace about the change... */
1239         drbd_bcast_state(mdev, ns);
1240
1241         if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1242             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1243                 drbd_khelper(mdev, "pri-on-incon-degr");
1244
1245         /* Here we have the actions that are performed after a
1246            state change. This function might sleep */
1247
1248         nsm.i = -1;
1249         if (ns.susp_nod) {
1250                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1251                         if (ns.conn == C_CONNECTED)
1252                                 what = resend, nsm.susp_nod = 0;
1253                         else /* ns.conn > C_CONNECTED */
1254                                 dev_err(DEV, "Unexpected resync going on!\n");
1255                 }
1256
1257                 if (os.disk == D_ATTACHING && ns.disk > D_ATTACHING)
1258                         what = restart_frozen_disk_io, nsm.susp_nod = 0;
1259
1260         }
1261
1262         if (ns.susp_fen) {
1263                 /* case1: The outdate peer handler is successful: */
1264                 if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1265                         tl_clear(mdev);
1266                         if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1267                                 drbd_uuid_new_current(mdev);
1268                                 clear_bit(NEW_CUR_UUID, &mdev->flags);
1269                                 drbd_md_sync(mdev);
1270                         }
1271                         spin_lock_irq(&mdev->req_lock);
1272                         _drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1273                         spin_unlock_irq(&mdev->req_lock);
1274                 }
1275                 /* case2: The connection was established again: */
1276                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1277                         clear_bit(NEW_CUR_UUID, &mdev->flags);
1278                         what = resend;
1279                         nsm.susp_fen = 0;
1280                 }
1281         }
1282
1283         if (what != nothing) {
1284                 spin_lock_irq(&mdev->req_lock);
1285                 _tl_restart(mdev, what);
1286                 nsm.i &= mdev->state.i;
1287                 _drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1288                 spin_unlock_irq(&mdev->req_lock);
1289         }
1290
1291         /* Do not change the order of the if above and the two below... */
1292         if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1293                 drbd_send_uuids(mdev);
1294                 drbd_send_state(mdev);
1295         }
1296         if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1297                 drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1298
1299         /* Lost contact with the peer's copy of the data */
1300         if ((os.pdsk >= D_INCONSISTENT &&
1301              os.pdsk != D_UNKNOWN &&
1302              os.pdsk != D_OUTDATED)
1303         &&  (ns.pdsk < D_INCONSISTENT ||
1304              ns.pdsk == D_UNKNOWN ||
1305              ns.pdsk == D_OUTDATED)) {
1306                 if (get_ldev(mdev)) {
1307                         if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1308                             mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1309                                 if (is_susp(mdev->state)) {
1310                                         set_bit(NEW_CUR_UUID, &mdev->flags);
1311                                 } else {
1312                                         drbd_uuid_new_current(mdev);
1313                                         drbd_send_uuids(mdev);
1314                                 }
1315                         }
1316                         put_ldev(mdev);
1317                 }
1318         }
1319
1320         if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1321                 if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0) {
1322                         drbd_uuid_new_current(mdev);
1323                         drbd_send_uuids(mdev);
1324                 }
1325
1326                 /* D_DISKLESS Peer becomes secondary */
1327                 if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1328                         drbd_al_to_on_disk_bm(mdev);
1329                 put_ldev(mdev);
1330         }
1331
1332         /* Last part of the attaching process ... */
1333         if (ns.conn >= C_CONNECTED &&
1334             os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1335                 drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1336                 drbd_send_uuids(mdev);
1337                 drbd_send_state(mdev);
1338         }
1339
1340         /* We want to pause/continue resync, tell peer. */
1341         if (ns.conn >= C_CONNECTED &&
1342              ((os.aftr_isp != ns.aftr_isp) ||
1343               (os.user_isp != ns.user_isp)))
1344                 drbd_send_state(mdev);
1345
1346         /* In case one of the isp bits got set, suspend other devices. */
1347         if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1348             (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1349                 suspend_other_sg(mdev);
1350
1351         /* Make sure the peer gets informed about any state changes
1352            (ISP bits) that happened while we were in WFReportParams. */
1353         if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1354                 drbd_send_state(mdev);
1355
1356         /* We are in the process of starting a full sync... */
1357         if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1358             (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1359                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1360
1361         /* We are invalidating ourselves... */
1362         if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1363             os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1364                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1365
1366         if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1367                 enum drbd_io_error_p eh;
1368
1369                 eh = EP_PASS_ON;
1370                 if (get_ldev_if_state(mdev, D_FAILED)) {
1371                         eh = mdev->ldev->dc.on_io_error;
1372                         put_ldev(mdev);
1373                 }
1374
1375                 drbd_rs_cancel_all(mdev);
1376                 /* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1377                    and the disk is D_FAILED here, local_cnt can only go down, it
1378                    cannot increase... it will reach zero */
1379                 wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1380                 mdev->rs_total = 0;
1381                 mdev->rs_failed = 0;
1382                 atomic_set(&mdev->rs_pending_cnt, 0);
1383
1384                 spin_lock_irq(&mdev->req_lock);
1385                 _drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1386                 spin_unlock_irq(&mdev->req_lock);
1387
1388                 if (eh == EP_CALL_HELPER)
1389                         drbd_khelper(mdev, "local-io-error");
1390         }
1391
1392         if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1393
1394                 if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
1395                         if (drbd_send_state(mdev))
1396                                 dev_warn(DEV, "Notified peer that my disk is broken.\n");
1397                         else
1398                                 dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1399                 }
1400
1401                 wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1402                 lc_destroy(mdev->resync);
1403                 mdev->resync = NULL;
1404                 lc_destroy(mdev->act_log);
1405                 mdev->act_log = NULL;
1406                 __no_warn(local,
1407                         drbd_free_bc(mdev->ldev);
1408                         mdev->ldev = NULL;);
1409
1410                 if (mdev->md_io_tmpp) {
1411                         __free_page(mdev->md_io_tmpp);
                             mdev->md_io_tmpp = NULL;
                     }
1412         }
1413
1414         /* Disks got bigger while they were detached */
1415         if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1416             test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1417                 if (ns.conn == C_CONNECTED)
1418                         resync_after_online_grow(mdev);
1419         }
1420
1421         /* A resync finished or aborted, wake paused devices... */
1422         if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1423             (os.peer_isp && !ns.peer_isp) ||
1424             (os.user_isp && !ns.user_isp))
1425                 resume_next_sg(mdev);
1426
1427         /* free tl_hash if we got thawed and are C_STANDALONE */
1428         if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
1429                 drbd_free_tl_hash(mdev);
1430
1431         /* Upon network connection, we need to start the receiver */
1432         if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1433                 drbd_thread_start(&mdev->receiver);
1434
1435         /* Terminate worker thread if we are unconfigured - it will be
1436            restarted as needed... */
1437         if (ns.disk == D_DISKLESS &&
1438             ns.conn == C_STANDALONE &&
1439             ns.role == R_SECONDARY) {
1440                 if (os.aftr_isp != ns.aftr_isp)
1441                         resume_next_sg(mdev);
1442                 /* set in __drbd_set_state, unless CONFIG_PENDING was set */
1443                 if (test_bit(DEVICE_DYING, &mdev->flags))
1444                         drbd_thread_stop_nowait(&mdev->worker);
1445         }
1446
1447         drbd_md_sync(mdev);
1448 }
1449
1450
1451 static int drbd_thread_setup(void *arg)
1452 {
1453         struct drbd_thread *thi = (struct drbd_thread *) arg;
1454         struct drbd_conf *mdev = thi->mdev;
1455         unsigned long flags;
1456         int retval;
1457
1458 restart:
1459         retval = thi->function(thi);
1460
1461         spin_lock_irqsave(&thi->t_lock, flags);
1462
1463         /* if the receiver has been "Exiting", the last thing it did
1464          * was set the conn state to "StandAlone",
1465          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1466          * and receiver thread will be "started".
1467          * drbd_thread_start needs to set "Restarting" in that case.
1468          * t_state check and assignment needs to be within the same spinlock,
1469          * so either thread_start sees Exiting, and can remap to Restarting,
1470          * or thread_start sees None, and can proceed as normal.
1471          */
1472
1473         if (thi->t_state == Restarting) {
1474                 dev_info(DEV, "Restarting %s\n", current->comm);
1475                 thi->t_state = Running;
1476                 spin_unlock_irqrestore(&thi->t_lock, flags);
1477                 goto restart;
1478         }
1479
1480         thi->task = NULL;
1481         thi->t_state = None;
1482         smp_mb();
1483         complete(&thi->stop);
1484         spin_unlock_irqrestore(&thi->t_lock, flags);
1485
1486         dev_info(DEV, "Terminating %s\n", current->comm);
1487
1488         /* Release mod reference taken when thread was started */
1489         module_put(THIS_MODULE);
1490         return retval;
1491 }
1492
1493 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1494                       int (*func) (struct drbd_thread *))
1495 {
1496         spin_lock_init(&thi->t_lock);
1497         thi->task    = NULL;
1498         thi->t_state = None;
1499         thi->function = func;
1500         thi->mdev = mdev;
1501 }
1502
1503 int drbd_thread_start(struct drbd_thread *thi)
1504 {
1505         struct drbd_conf *mdev = thi->mdev;
1506         struct task_struct *nt;
1507         unsigned long flags;
1508
1509         const char *me =
1510                 thi == &mdev->receiver ? "receiver" :
1511                 thi == &mdev->asender  ? "asender"  :
1512                 thi == &mdev->worker   ? "worker"   : "NONSENSE";
1513
1514         /* is used from state engine doing drbd_thread_stop_nowait,
1515          * while holding the req lock irqsave */
1516         spin_lock_irqsave(&thi->t_lock, flags);
1517
1518         switch (thi->t_state) {
1519         case None:
1520                 dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1521                                 me, current->comm, current->pid);
1522
1523                 /* Get ref on module for thread - this is released when thread exits */
1524                 if (!try_module_get(THIS_MODULE)) {
1525                         dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1526                         spin_unlock_irqrestore(&thi->t_lock, flags);
1527                         return FALSE;
1528                 }
1529
1530                 init_completion(&thi->stop);
1531                 D_ASSERT(thi->task == NULL);
1532                 thi->reset_cpu_mask = 1;
1533                 thi->t_state = Running;
1534                 spin_unlock_irqrestore(&thi->t_lock, flags);
1535                 flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
1536
1537                 nt = kthread_create(drbd_thread_setup, (void *) thi,
1538                                     "drbd%d_%s", mdev_to_minor(mdev), me);
1539
1540                 if (IS_ERR(nt)) {
1541                         dev_err(DEV, "Couldn't start thread\n");
1542
1543                         module_put(THIS_MODULE);
1544                         return FALSE;
1545                 }
1546                 spin_lock_irqsave(&thi->t_lock, flags);
1547                 thi->task = nt;
1548                 thi->t_state = Running;
1549                 spin_unlock_irqrestore(&thi->t_lock, flags);
1550                 wake_up_process(nt);
1551                 break;
1552         case Exiting:
1553                 thi->t_state = Restarting;
1554                 dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1555                                 me, current->comm, current->pid);
1556                 /* fall through */
1557         case Running:
1558         case Restarting:
1559         default:
1560                 spin_unlock_irqrestore(&thi->t_lock, flags);
1561                 break;
1562         }
1563
1564         return TRUE;
1565 }
1566
1567
1568 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1569 {
1570         unsigned long flags;
1571
1572         enum drbd_thread_state ns = restart ? Restarting : Exiting;
1573
1574         /* may be called from state engine, holding the req lock irqsave */
1575         spin_lock_irqsave(&thi->t_lock, flags);
1576
1577         if (thi->t_state == None) {
1578                 spin_unlock_irqrestore(&thi->t_lock, flags);
1579                 if (restart)
1580                         drbd_thread_start(thi);
1581                 return;
1582         }
1583
1584         if (thi->t_state != ns) {
1585                 if (thi->task == NULL) {
1586                         spin_unlock_irqrestore(&thi->t_lock, flags);
1587                         return;
1588                 }
1589
1590                 thi->t_state = ns;
1591                 smp_mb();
1592                 init_completion(&thi->stop);
1593                 if (thi->task != current)
1594                         force_sig(DRBD_SIGKILL, thi->task);
1595
1596         }
1597
1598         spin_unlock_irqrestore(&thi->t_lock, flags);
1599
1600         if (wait)
1601                 wait_for_completion(&thi->stop);
1602 }
1603
1604 #ifdef CONFIG_SMP
1605 /**
1606  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1607  * @mdev:       DRBD device.
1608  *
1609  * Forces all threads of a device onto the same CPU. This is beneficial for
1610  * DRBD's performance. May be overridden by the user's configuration.
1611  */
1612 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1613 {
1614         int ord, cpu;
1615
1616         /* user override. */
1617         if (cpumask_weight(mdev->cpu_mask))
1618                 return;
1619
1620         ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1621         for_each_online_cpu(cpu) {
1622                 if (ord-- == 0) {
1623                         cpumask_set_cpu(cpu, mdev->cpu_mask);
1624                         return;
1625                 }
1626         }
1627         /* should not be reached */
1628         cpumask_setall(mdev->cpu_mask);
1629 }
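
/*
 * Editor's illustration (not part of the original file): with four online
 * CPUs and no user-supplied cpu-mask, the code above pins all threads of
 * minor 5 to the second online CPU, because ord = 5 % 4 = 1 and the loop
 * picks the ord-th CPU from cpu_online_mask.  Different minors are thus
 * spread round-robin over the online CPUs, while the three threads of one
 * device always share a CPU.
 */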
1630
1631 /**
1632  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1633  * @mdev:       DRBD device.
1634  *
1635  * Call this in the "main loop" of _all_ threads; no mutex is needed, current won't die
1636  * prematurely.
1637  */
1638 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1639 {
1640         struct task_struct *p = current;
1641         struct drbd_thread *thi =
1642                 p == mdev->asender.task  ? &mdev->asender  :
1643                 p == mdev->receiver.task ? &mdev->receiver :
1644                 p == mdev->worker.task   ? &mdev->worker   :
1645                 NULL;
1646         ERR_IF(thi == NULL)
1647                 return;
1648         if (!thi->reset_cpu_mask)
1649                 return;
1650         thi->reset_cpu_mask = 0;
1651         set_cpus_allowed_ptr(p, mdev->cpu_mask);
1652 }
1653 #endif
1654
1655 /* the appropriate socket mutex must be held already */
1656 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1657                           enum drbd_packets cmd, struct p_header80 *h,
1658                           size_t size, unsigned msg_flags)
1659 {
1660         int sent, ok;
1661
1662         ERR_IF(!h) return FALSE;
1663         ERR_IF(!size) return FALSE;
1664
1665         h->magic   = BE_DRBD_MAGIC;
1666         h->command = cpu_to_be16(cmd);
1667         h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1668
1669         sent = drbd_send(mdev, sock, h, size, msg_flags);
1670
1671         ok = (sent == size);
1672         if (!ok)
1673                 dev_err(DEV, "short sent %s size=%d sent=%d\n",
1674                     cmdname(cmd), (int)size, sent);
1675         return ok;
1676 }
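
/*
 * Editor's sketch of what the code above puts on the wire (not part of the
 * original file): for a packet of size sizeof(struct p_header80) + N payload
 * bytes, the header is filled as
 *
 *      h->magic   = BE_DRBD_MAGIC;          already in big-endian form
 *      h->command = cpu_to_be16(cmd);
 *      h->length  = cpu_to_be16(N);         payload only, header excluded
 *
 * i.e. "length" counts only the bytes that follow the header, never the
 * header itself.
 */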
1677
1678 /* don't pass the socket. we may only look at it
1679  * when we hold the appropriate socket mutex.
1680  */
1681 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1682                   enum drbd_packets cmd, struct p_header80 *h, size_t size)
1683 {
1684         int ok = 0;
1685         struct socket *sock;
1686
1687         if (use_data_socket) {
1688                 mutex_lock(&mdev->data.mutex);
1689                 sock = mdev->data.socket;
1690         } else {
1691                 mutex_lock(&mdev->meta.mutex);
1692                 sock = mdev->meta.socket;
1693         }
1694
1695         /* drbd_disconnect() could have called drbd_free_sock()
1696          * while we were waiting for the mutex... */
1697         if (likely(sock != NULL))
1698                 ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1699
1700         if (use_data_socket)
1701                 mutex_unlock(&mdev->data.mutex);
1702         else
1703                 mutex_unlock(&mdev->meta.mutex);
1704         return ok;
1705 }
1706
1707 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1708                    size_t size)
1709 {
1710         struct p_header80 h;
1711         int ok;
1712
1713         h.magic   = BE_DRBD_MAGIC;
1714         h.command = cpu_to_be16(cmd);
1715         h.length  = cpu_to_be16(size);
1716
1717         if (!drbd_get_data_sock(mdev))
1718                 return 0;
1719
1720         ok = (sizeof(h) ==
1721                 drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1722         ok = ok && (size ==
1723                 drbd_send(mdev, mdev->data.socket, data, size, 0));
1724
1725         drbd_put_data_sock(mdev);
1726
1727         return ok;
1728 }
1729
1730 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1731 {
1732         struct p_rs_param_95 *p;
1733         struct socket *sock;
1734         int size, rv;
1735         const int apv = mdev->agreed_pro_version;
1736
1737         size = apv <= 87 ? sizeof(struct p_rs_param)
1738                 : apv == 88 ? sizeof(struct p_rs_param)
1739                         + strlen(mdev->sync_conf.verify_alg) + 1
1740                 : apv <= 94 ? sizeof(struct p_rs_param_89)
1741                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
1742
1743         /* used from admin command context and receiver/worker context.
1744          * to avoid kmalloc, grab the socket right here,
1745          * then use the pre-allocated sbuf there */
1746         mutex_lock(&mdev->data.mutex);
1747         sock = mdev->data.socket;
1748
1749         if (likely(sock != NULL)) {
1750                 enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1751
1752                 p = &mdev->data.sbuf.rs_param_95;
1753
1754                 /* initialize verify_alg and csums_alg */
1755                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1756
1757                 p->rate = cpu_to_be32(sc->rate);
1758                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
1759                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
1760                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
1761                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
1762
1763                 if (apv >= 88)
1764                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1765                 if (apv >= 89)
1766                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1767
1768                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1769         } else
1770                 rv = 0; /* not ok */
1771
1772         mutex_unlock(&mdev->data.mutex);
1773
1774         return rv;
1775 }
1776
1777 int drbd_send_protocol(struct drbd_conf *mdev)
1778 {
1779         struct p_protocol *p;
1780         int size, cf, rv;
1781
1782         size = sizeof(struct p_protocol);
1783
1784         if (mdev->agreed_pro_version >= 87)
1785                 size += strlen(mdev->net_conf->integrity_alg) + 1;
1786
1787         /* we must not recurse into our own queue,
1788          * as that is blocked during handshake */
1789         p = kmalloc(size, GFP_NOIO);
1790         if (p == NULL)
1791                 return 0;
1792
1793         p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1794         p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1795         p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1796         p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1797         p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1798
1799         cf = 0;
1800         if (mdev->net_conf->want_lose)
1801                 cf |= CF_WANT_LOSE;
1802         if (mdev->net_conf->dry_run) {
1803                 if (mdev->agreed_pro_version >= 92)
1804                         cf |= CF_DRY_RUN;
1805                 else {
1806                         dev_err(DEV, "--dry-run is not supported by peer");
1807                         kfree(p);
1808                         return 0;
1809                 }
1810         }
1811         p->conn_flags    = cpu_to_be32(cf);
1812
1813         if (mdev->agreed_pro_version >= 87)
1814                 strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1815
1816         rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1817                            (struct p_header80 *)p, size);
1818         kfree(p);
1819         return rv;
1820 }
1821
1822 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1823 {
1824         struct p_uuids p;
1825         int i;
1826
1827         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1828                 return 1;
1829
1830         for (i = UI_CURRENT; i < UI_SIZE; i++)
1831                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1832
1833         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1834         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1835         uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1836         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1837         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1838         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1839
1840         put_ldev(mdev);
1841
1842         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1843                              (struct p_header80 *)&p, sizeof(p));
1844 }
1845
1846 int drbd_send_uuids(struct drbd_conf *mdev)
1847 {
1848         return _drbd_send_uuids(mdev, 0);
1849 }
1850
1851 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1852 {
1853         return _drbd_send_uuids(mdev, 8);
1854 }
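
/*
 * Editor's note (not part of the original file): the uuid_flags word sent
 * in UI_FLAGS above is a small bit field, as set by the code in this file:
 *      1  -- net_conf->want_lose is set
 *      2  -- the CRASHED_PRIMARY flag was set
 *      4  -- new_state_tmp.disk is D_INCONSISTENT
 *      8  -- passed in by drbd_send_uuids_skip_initial_sync()
 */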
1855
1856
1857 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1858 {
1859         struct p_rs_uuid p;
1860
1861         p.uuid = cpu_to_be64(val);
1862
1863         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1864                              (struct p_header80 *)&p, sizeof(p));
1865 }
1866
1867 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1868 {
1869         struct p_sizes p;
1870         sector_t d_size, u_size;
1871         int q_order_type;
1872         int ok;
1873
1874         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1875                 D_ASSERT(mdev->ldev->backing_bdev);
1876                 d_size = drbd_get_max_capacity(mdev->ldev);
1877                 u_size = mdev->ldev->dc.disk_size;
1878                 q_order_type = drbd_queue_order_type(mdev);
1879                 put_ldev(mdev);
1880         } else {
1881                 d_size = 0;
1882                 u_size = 0;
1883                 q_order_type = QUEUE_ORDERED_NONE;
1884         }
1885
1886         p.d_size = cpu_to_be64(d_size);
1887         p.u_size = cpu_to_be64(u_size);
1888         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1889         p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1890         p.queue_order_type = cpu_to_be16(q_order_type);
1891         p.dds_flags = cpu_to_be16(flags);
1892
1893         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1894                            (struct p_header80 *)&p, sizeof(p));
1895         return ok;
1896 }
1897
1898 /**
1899  * drbd_send_state() - Sends the drbd state to the peer
1900  * @mdev:       DRBD device.
1901  */
1902 int drbd_send_state(struct drbd_conf *mdev)
1903 {
1904         struct socket *sock;
1905         struct p_state p;
1906         int ok = 0;
1907
1908         /* Grab state lock so we won't send state if we're in the middle
1909          * of a cluster wide state change on another thread */
1910         drbd_state_lock(mdev);
1911
1912         mutex_lock(&mdev->data.mutex);
1913
1914         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1915         sock = mdev->data.socket;
1916
1917         if (likely(sock != NULL)) {
1918                 ok = _drbd_send_cmd(mdev, sock, P_STATE,
1919                                     (struct p_header80 *)&p, sizeof(p), 0);
1920         }
1921
1922         mutex_unlock(&mdev->data.mutex);
1923
1924         drbd_state_unlock(mdev);
1925         return ok;
1926 }
1927
1928 int drbd_send_state_req(struct drbd_conf *mdev,
1929         union drbd_state mask, union drbd_state val)
1930 {
1931         struct p_req_state p;
1932
1933         p.mask    = cpu_to_be32(mask.i);
1934         p.val     = cpu_to_be32(val.i);
1935
1936         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1937                              (struct p_header80 *)&p, sizeof(p));
1938 }
1939
1940 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1941 {
1942         struct p_req_state_reply p;
1943
1944         p.retcode    = cpu_to_be32(retcode);
1945
1946         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1947                              (struct p_header80 *)&p, sizeof(p));
1948 }
1949
1950 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1951         struct p_compressed_bm *p,
1952         struct bm_xfer_ctx *c)
1953 {
1954         struct bitstream bs;
1955         unsigned long plain_bits;
1956         unsigned long tmp;
1957         unsigned long rl;
1958         unsigned len;
1959         unsigned toggle;
1960         int bits;
1961
1962         /* may we use this feature? */
1963         if ((mdev->sync_conf.use_rle == 0) ||
1964                 (mdev->agreed_pro_version < 90))
1965                         return 0;
1966
1967         if (c->bit_offset >= c->bm_bits)
1968                 return 0; /* nothing to do. */
1969
1970         /* use at most this many bytes */
1971         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1972         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1973         /* plain bits covered in this code string */
1974         plain_bits = 0;
1975
1976         /* p->encoding & 0x80 stores whether the first run length is set.
1977          * bit offset is implicit.
1978          * start with toggle == 2 to be able to tell the first iteration */
1979         toggle = 2;
1980
1981         /* see how many plain bits we can stuff into one packet
1982          * using RLE and VLI. */
1983         do {
1984                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1985                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1986                 if (tmp == -1UL)
1987                         tmp = c->bm_bits;
1988                 rl = tmp - c->bit_offset;
1989
1990                 if (toggle == 2) { /* first iteration */
1991                         if (rl == 0) {
1992                                 /* the first checked bit was set,
1993                                  * store start value, */
1994                                 DCBP_set_start(p, 1);
1995                                 /* but skip encoding of zero run length */
1996                                 toggle = !toggle;
1997                                 continue;
1998                         }
1999                         DCBP_set_start(p, 0);
2000                 }
2001
2002                 /* paranoia: catch zero runlength.
2003                  * can only happen if bitmap is modified while we scan it. */
2004                 if (rl == 0) {
2005                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2006                             "t:%u bo:%lu\n", toggle, c->bit_offset);
2007                         return -1;
2008                 }
2009
2010                 bits = vli_encode_bits(&bs, rl);
2011                 if (bits == -ENOBUFS) /* buffer full */
2012                         break;
2013                 if (bits <= 0) {
2014                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2015                         return 0;
2016                 }
2017
2018                 toggle = !toggle;
2019                 plain_bits += rl;
2020                 c->bit_offset = tmp;
2021         } while (c->bit_offset < c->bm_bits);
2022
2023         len = bs.cur.b - p->code + !!bs.cur.bit;
2024
2025         if (plain_bits < (len << 3)) {
2026                 /* incompressible with this method.
2027                  * we need to rewind both word and bit position. */
2028                 c->bit_offset -= plain_bits;
2029                 bm_xfer_ctx_bit_to_word_offset(c);
2030                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2031                 return 0;
2032         }
2033
2034         /* RLE + VLI was able to compress it just fine.
2035          * update c->word_offset. */
2036         bm_xfer_ctx_bit_to_word_offset(c);
2037
2038         /* store pad_bits */
2039         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2040
2041         return len;
2042 }
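
/*
 * Editor's worked example (not part of the original file): for a bitmap that
 * starts with 0001111001... the loop above first finds a run of 3 clear
 * bits, so DCBP_set_start(p, 0) is used and the alternating run lengths
 * 3, 4, 2, ... are VLI-encoded.  Had the bitmap started with a set bit, only
 * the start flag would record that and the zero-length leading run would be
 * skipped.  Return value, as consumed by send_bitmap_rle_or_plain() below:
 * -1 on error, 0 if this chunk is better sent as plain bitmap words, and
 * otherwise the number of code bytes for a P_COMPRESSED_BITMAP packet.
 */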
2043
2044 enum { OK, FAILED, DONE }
2045 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2046         struct p_header80 *h, struct bm_xfer_ctx *c)
2047 {
2048         struct p_compressed_bm *p = (void*)h;
2049         unsigned long num_words;
2050         int len;
2051         int ok;
2052
2053         len = fill_bitmap_rle_bits(mdev, p, c);
2054
2055         if (len < 0)
2056                 return FAILED;
2057
2058         if (len) {
2059                 DCBP_set_code(p, RLE_VLI_Bits);
2060                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2061                         sizeof(*p) + len, 0);
2062
2063                 c->packets[0]++;
2064                 c->bytes[0] += sizeof(*p) + len;
2065
2066                 if (c->bit_offset >= c->bm_bits)
2067                         len = 0; /* DONE */
2068         } else {
2069                 /* was not compressible.
2070                  * send a buffer full of plain text bits instead. */
2071                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2072                 len = num_words * sizeof(long);
2073                 if (len)
2074                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2075                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2076                                    h, sizeof(struct p_header80) + len, 0);
2077                 c->word_offset += num_words;
2078                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2079
2080                 c->packets[1]++;
2081                 c->bytes[1] += sizeof(struct p_header80) + len;
2082
2083                 if (c->bit_offset > c->bm_bits)
2084                         c->bit_offset = c->bm_bits;
2085         }
2086         ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
2087
2088         if (ok == DONE)
2089                 INFO_bm_xfer_stats(mdev, "send", c);
2090         return ok;
2091 }
2092
2093 /* See the comment at receive_bitmap() */
2094 int _drbd_send_bitmap(struct drbd_conf *mdev)
2095 {
2096         struct bm_xfer_ctx c;
2097         struct p_header80 *p;
2098         int ret;
2099
2100         ERR_IF(!mdev->bitmap) return FALSE;
2101
2102         /* maybe we should use some per thread scratch page,
2103          * and allocate that during initial device creation? */
2104         p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2105         if (!p) {
2106                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2107                 return FALSE;
2108         }
2109
2110         if (get_ldev(mdev)) {
2111                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2112                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2113                         drbd_bm_set_all(mdev);
2114                         if (drbd_bm_write(mdev)) {
2115                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
2116                                  * but otherwise process as per normal - need to tell other
2117                                  * side that a full resync is required! */
2118                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
2119                         } else {
2120                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2121                                 drbd_md_sync(mdev);
2122                         }
2123                 }
2124                 put_ldev(mdev);
2125         }
2126
2127         c = (struct bm_xfer_ctx) {
2128                 .bm_bits = drbd_bm_bits(mdev),
2129                 .bm_words = drbd_bm_words(mdev),
2130         };
2131
2132         do {
2133                 ret = send_bitmap_rle_or_plain(mdev, p, &c);
2134         } while (ret == OK);
2135
2136         free_page((unsigned long) p);
2137         return (ret == DONE);
2138 }
2139
2140 int drbd_send_bitmap(struct drbd_conf *mdev)
2141 {
2142         int err;
2143
2144         if (!drbd_get_data_sock(mdev))
2145                 return -1;
2146         err = !_drbd_send_bitmap(mdev);
2147         drbd_put_data_sock(mdev);
2148         return err;
2149 }
2150
2151 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2152 {
2153         int ok;
2154         struct p_barrier_ack p;
2155
2156         p.barrier  = barrier_nr;
2157         p.set_size = cpu_to_be32(set_size);
2158
2159         if (mdev->state.conn < C_CONNECTED)
2160                 return FALSE;
2161         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2162                         (struct p_header80 *)&p, sizeof(p));
2163         return ok;
2164 }
2165
2166 /**
2167  * _drbd_send_ack() - Sends an ack packet
2168  * @mdev:       DRBD device.
2169  * @cmd:        Packet command code.
2170  * @sector:     sector, needs to be in big endian byte order
2171  * @blksize:    size in bytes, needs to be in big endian byte order
2172  * @block_id:   Id, big endian byte order
2173  */
2174 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2175                           u64 sector,
2176                           u32 blksize,
2177                           u64 block_id)
2178 {
2179         int ok;
2180         struct p_block_ack p;
2181
2182         p.sector   = sector;
2183         p.block_id = block_id;
2184         p.blksize  = blksize;
2185         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2186
2187         if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2188                 return FALSE;
2189         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2190                                 (struct p_header80 *)&p, sizeof(p));
2191         return ok;
2192 }
2193
2194 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2195                      struct p_data *dp)
2196 {
2197         const int header_size = sizeof(struct p_data)
2198                               - sizeof(struct p_header80);
2199         int data_size  = ((struct p_header80 *)dp)->length - header_size;
2200
2201         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2202                               dp->block_id);
2203 }
2204
2205 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2206                      struct p_block_req *rp)
2207 {
2208         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2209 }
2210
2211 /**
2212  * drbd_send_ack() - Sends an ack packet
2213  * @mdev:       DRBD device.
2214  * @cmd:        Packet command code.
2215  * @e:          Epoch entry.
2216  */
2217 int drbd_send_ack(struct drbd_conf *mdev,
2218         enum drbd_packets cmd, struct drbd_epoch_entry *e)
2219 {
2220         return _drbd_send_ack(mdev, cmd,
2221                               cpu_to_be64(e->sector),
2222                               cpu_to_be32(e->size),
2223                               e->block_id);
2224 }
2225
2226 /* This function misuses the block_id field to signal if the blocks
2227  * are in sync or not. */
2228 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2229                      sector_t sector, int blksize, u64 block_id)
2230 {
2231         return _drbd_send_ack(mdev, cmd,
2232                               cpu_to_be64(sector),
2233                               cpu_to_be32(blksize),
2234                               cpu_to_be64(block_id));
2235 }
2236
2237 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2238                        sector_t sector, int size, u64 block_id)
2239 {
2240         int ok;
2241         struct p_block_req p;
2242
2243         p.sector   = cpu_to_be64(sector);
2244         p.block_id = block_id;
2245         p.blksize  = cpu_to_be32(size);
2246
2247         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2248                                 (struct p_header80 *)&p, sizeof(p));
2249         return ok;
2250 }
2251
2252 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2253                             sector_t sector, int size,
2254                             void *digest, int digest_size,
2255                             enum drbd_packets cmd)
2256 {
2257         int ok;
2258         struct p_block_req p;
2259
2260         p.sector   = cpu_to_be64(sector);
2261         p.block_id = BE_DRBD_MAGIC + 0xbeef;
2262         p.blksize  = cpu_to_be32(size);
2263
2264         p.head.magic   = BE_DRBD_MAGIC;
2265         p.head.command = cpu_to_be16(cmd);
2266         p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2267
2268         mutex_lock(&mdev->data.mutex);
2269
2270         ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2271         ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2272
2273         mutex_unlock(&mdev->data.mutex);
2274
2275         return ok;
2276 }
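
/*
 * Editor's note (not part of the original file): unlike the plain requests
 * above, the digest is not part of struct p_block_req, so head.length is
 * enlarged by digest_size and the digest bytes are pushed with a second
 * drbd_send() immediately after the fixed-size part, under the same
 * data.mutex so that no other packet can be interleaved.
 */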
2277
2278 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2279 {
2280         int ok;
2281         struct p_block_req p;
2282
2283         p.sector   = cpu_to_be64(sector);
2284         p.block_id = BE_DRBD_MAGIC + 0xbabe;
2285         p.blksize  = cpu_to_be32(size);
2286
2287         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2288                            (struct p_header80 *)&p, sizeof(p));
2289         return ok;
2290 }
2291
2292 /* called on sndtimeo
2293  * returns FALSE if we should retry,
2294  * TRUE if we think the connection is dead
2295  */
2296 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2297 {
2298         int drop_it;
2299         /* long elapsed = (long)(jiffies - mdev->last_received); */
2300
2301         drop_it =   mdev->meta.socket == sock
2302                 || !mdev->asender.task
2303                 || get_t_state(&mdev->asender) != Running
2304                 || mdev->state.conn < C_CONNECTED;
2305
2306         if (drop_it)
2307                 return TRUE;
2308
2309         drop_it = !--mdev->ko_count;
2310         if (!drop_it) {
2311                 dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2312                        current->comm, current->pid, mdev->ko_count);
2313                 request_ping(mdev);
2314         }
2315
2316         return drop_it; /* && (mdev->state == R_PRIMARY) */;
2317 }
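
/*
 * Editor's illustration (not part of the original file): drbd_send() below
 * loads mdev->ko_count from net_conf->ko_count before it starts pushing on
 * the data socket.  Each time sendmsg returns -EAGAIN (send timeout), this
 * helper decrements ko_count and, while it is still nonzero, logs the
 * situation and asks the asender for a ping; once it reaches zero the send
 * loop gives up and drbd_send() forces the connection to C_TIMEOUT.  A
 * stalled peer is therefore tolerated for roughly ko_count send timeouts.
 */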
2318
2319 /* The idea of sendpage seems to be to put some kind of reference
2320  * to the page into the skb, and to hand it over to the NIC. In
2321  * this process get_page() gets called.
2322  *
2323  * As soon as the page was really sent over the network put_page()
2324  * gets called by some part of the network layer. [ NIC driver? ]
2325  *
2326  * [ get_page() / put_page() increment/decrement the count. If count
2327  *   reaches 0 the page will be freed. ]
2328  *
2329  * This works nicely with pages from FSs.
2330  * But this means that in protocol A we might signal IO completion too early!
2331  *
2332  * In order not to corrupt data during a resync we must make sure
2333  * that we do not reuse our own buffer pages (EEs) too early, therefore
2334  * we have the net_ee list.
2335  *
2336  * XFS seems to have problems, still, it submits pages with page_count == 0!
2337  * As a workaround, we disable sendpage on pages
2338  * with page_count == 0 or PageSlab.
2339  */
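
/*
 * Editor's summary (not part of the original file): two send paths follow.
 * _drbd_no_send_page() kmap()s the page and copies it through drbd_send(),
 * which is always safe; _drbd_send_page() hands the page to the socket's
 * ->sendpage() for zero-copy transmission and falls back to the copying
 * variant for slab pages, pages with page_count == 0, or when the
 * disable_sendpage module parameter is set.
 */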
2340 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2341                    int offset, size_t size, unsigned msg_flags)
2342 {
2343         int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2344         kunmap(page);
2345         if (sent == size)
2346                 mdev->send_cnt += size>>9;
2347         return sent == size;
2348 }
2349
2350 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2351                     int offset, size_t size, unsigned msg_flags)
2352 {
2353         mm_segment_t oldfs = get_fs();
2354         int sent, ok;
2355         int len = size;
2356
2357         /* e.g. XFS meta- & log-data is in slab pages, which have a
2358          * page_count of 0 and/or have PageSlab() set.
2359          * we cannot use send_page for those, as that does get_page();
2360          * put_page(); and would cause either a VM_BUG directly, or
2361  * __page_cache_release of a page that would actually still be referenced
2362          * by someone, leading to some obscure delayed Oops somewhere else. */
2363         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2364                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2365
2366         msg_flags |= MSG_NOSIGNAL;
2367         drbd_update_congested(mdev);
2368         set_fs(KERNEL_DS);
2369         do {
2370                 sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2371                                                         offset, len,
2372                                                         msg_flags);
2373                 if (sent == -EAGAIN) {
2374                         if (we_should_drop_the_connection(mdev,
2375                                                           mdev->data.socket))
2376                                 break;
2377                         else
2378                                 continue;
2379                 }
2380                 if (sent <= 0) {
2381                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2382                              __func__, (int)size, len, sent);
2383                         break;
2384                 }
2385                 len    -= sent;
2386                 offset += sent;
2387         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2388         set_fs(oldfs);
2389         clear_bit(NET_CONGESTED, &mdev->flags);
2390
2391         ok = (len == 0);
2392         if (likely(ok))
2393                 mdev->send_cnt += size>>9;
2394         return ok;
2395 }
2396
2397 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2398 {
2399         struct bio_vec *bvec;
2400         int i;
2401         /* hint all but last page with MSG_MORE */
2402         __bio_for_each_segment(bvec, bio, i, 0) {
2403                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
2404                                      bvec->bv_offset, bvec->bv_len,
2405                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2406                         return 0;
2407         }
2408         return 1;
2409 }
2410
2411 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2412 {
2413         struct bio_vec *bvec;
2414         int i;
2415         /* hint all but last page with MSG_MORE */
2416         __bio_for_each_segment(bvec, bio, i, 0) {
2417                 if (!_drbd_send_page(mdev, bvec->bv_page,
2418                                      bvec->bv_offset, bvec->bv_len,
2419                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2420                         return 0;
2421         }
2422         return 1;
2423 }
2424
2425 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2426 {
2427         struct page *page = e->pages;
2428         unsigned len = e->size;
2429         /* hint all but last page with MSG_MORE */
2430         page_chain_for_each(page) {
2431                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2432                 if (!_drbd_send_page(mdev, page, 0, l,
2433                                 page_chain_next(page) ? MSG_MORE : 0))
2434                         return 0;
2435                 len -= l;
2436         }
2437         return 1;
2438 }
2439
2440 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2441 {
2442         if (mdev->agreed_pro_version >= 95)
2443                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2444                         (bi_rw & REQ_UNPLUG ? DP_UNPLUG : 0) |
2445                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
2446                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2447                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2448         else
2449                 return bi_rw & (REQ_SYNC | REQ_UNPLUG) ? DP_RW_SYNC : 0;
2450 }
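
/*
 * Editor's example (not part of the original file): for a peer speaking
 * protocol 95 or newer, a bio submitted with REQ_FLUSH | REQ_FUA is sent
 * with dp_flags = DP_FLUSH | DP_FUA, so the peer can apply the same
 * ordering/durability semantics on its backing device.  Older peers only
 * understand DP_RW_SYNC, which is set when the bio carried REQ_SYNC or
 * REQ_UNPLUG.
 */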
2451
2452 /* Used to send write requests
2453  * R_PRIMARY -> Peer    (P_DATA)
2454  */
2455 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2456 {
2457         int ok = 1;
2458         struct p_data p;
2459         unsigned int dp_flags = 0;
2460         void *dgb;
2461         int dgs;
2462
2463         if (!drbd_get_data_sock(mdev))
2464                 return 0;
2465
2466         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2467                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2468
2469         if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
2470                 p.head.h80.magic   = BE_DRBD_MAGIC;
2471                 p.head.h80.command = cpu_to_be16(P_DATA);
2472                 p.head.h80.length  =
2473                         cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2474         } else {
2475                 p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2476                 p.head.h95.command = cpu_to_be16(P_DATA);
2477                 p.head.h95.length  =
2478                         cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2479         }
2480
2481         p.sector   = cpu_to_be64(req->sector);
2482         p.block_id = (unsigned long)req;
2483         p.seq_num  = cpu_to_be32(req->seq_num =
2484                                  atomic_add_return(1, &mdev->packet_seq));
2485
2486         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2487
2488         if (mdev->state.conn >= C_SYNC_SOURCE &&
2489             mdev->state.conn <= C_PAUSED_SYNC_T)
2490                 dp_flags |= DP_MAY_SET_IN_SYNC;
2491
2492         p.dp_flags = cpu_to_be32(dp_flags);
2493         set_bit(UNPLUG_REMOTE, &mdev->flags);
2494         ok = (sizeof(p) ==
2495                 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2496         if (ok && dgs) {
2497                 dgb = mdev->int_dig_out;
2498                 drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2499                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2500         }
2501         if (ok) {
2502                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2503                         ok = _drbd_send_bio(mdev, req->master_bio);
2504                 else
2505                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
2506         }
2507
2508         drbd_put_data_sock(mdev);
2509
2510         return ok;
2511 }
2512
2513 /* answer packet, used to send data back for read requests:
2514  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2515  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2516  */
2517 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2518                     struct drbd_epoch_entry *e)
2519 {
2520         int ok;
2521         struct p_data p;
2522         void *dgb;
2523         int dgs;
2524
2525         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2526                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2527
2528         if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
2529                 p.head.h80.magic   = BE_DRBD_MAGIC;
2530                 p.head.h80.command = cpu_to_be16(cmd);
2531                 p.head.h80.length  =
2532                         cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2533         } else {
2534                 p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2535                 p.head.h95.command = cpu_to_be16(cmd);
2536                 p.head.h95.length  =
2537                         cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2538         }
2539
2540         p.sector   = cpu_to_be64(e->sector);
2541         p.block_id = e->block_id;
2542         /* p.seq_num  = 0;    No sequence numbers here.. */
2543
2544         /* Only called by our kernel thread.
2545          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2546          * in response to admin command or module unload.
2547          */
2548         if (!drbd_get_data_sock(mdev))
2549                 return 0;
2550
2551         ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2552         if (ok && dgs) {
2553                 dgb = mdev->int_dig_out;
2554                 drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2555                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2556         }
2557         if (ok)
2558                 ok = _drbd_send_zc_ee(mdev, e);
2559
2560         drbd_put_data_sock(mdev);
2561
2562         return ok;
2563 }
2564
2565 /*
2566   drbd_send distinguishes two cases:
2567
2568   Packets sent via the data socket "sock"
2569   and packets sent via the meta data socket "msock"
2570
2571                     sock                      msock
2572   -----------------+-------------------------+------------------------------
2573   timeout           conf.timeout / 2          conf.timeout / 2
2574   timeout action    send a ping via msock     Abort communication
2575                                               and close all sockets
2576 */
2577
2578 /*
2579  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2580  */
2581 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2582               void *buf, size_t size, unsigned msg_flags)
2583 {
2584         struct kvec iov;
2585         struct msghdr msg;
2586         int rv, sent = 0;
2587
2588         if (!sock)
2589                 return -1000;
2590
2591         /* THINK  if (signal_pending) return ... ? */
2592
2593         iov.iov_base = buf;
2594         iov.iov_len  = size;
2595
2596         msg.msg_name       = NULL;
2597         msg.msg_namelen    = 0;
2598         msg.msg_control    = NULL;
2599         msg.msg_controllen = 0;
2600         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2601
2602         if (sock == mdev->data.socket) {
2603                 mdev->ko_count = mdev->net_conf->ko_count;
2604                 drbd_update_congested(mdev);
2605         }
2606         do {
2607                 /* STRANGE
2608                  * tcp_sendmsg does _not_ use its size parameter at all ?
2609                  *
2610                  * -EAGAIN on timeout, -EINTR on signal.
2611                  */
2612 /* THINK
2613  * do we need to block DRBD_SIG if sock == &meta.socket ??
2614  * otherwise wake_asender() might interrupt some send_*Ack !
2615  */
2616                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2617                 if (rv == -EAGAIN) {
2618                         if (we_should_drop_the_connection(mdev, sock))
2619                                 break;
2620                         else
2621                                 continue;
2622                 }
2623                 D_ASSERT(rv != 0);
2624                 if (rv == -EINTR) {
2625                         flush_signals(current);
2626                         rv = 0;
2627                 }
2628                 if (rv < 0)
2629                         break;
2630                 sent += rv;
2631                 iov.iov_base += rv;
2632                 iov.iov_len  -= rv;
2633         } while (sent < size);
2634
2635         if (sock == mdev->data.socket)
2636                 clear_bit(NET_CONGESTED, &mdev->flags);
2637
2638         if (rv <= 0) {
2639                 if (rv != -EAGAIN) {
2640                         dev_err(DEV, "%s_sendmsg returned %d\n",
2641                             sock == mdev->meta.socket ? "msock" : "sock",
2642                             rv);
2643                         drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2644                 } else
2645                         drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2646         }
2647
2648         return sent;
2649 }
2650
2651 static int drbd_open(struct block_device *bdev, fmode_t mode)
2652 {
2653         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2654         unsigned long flags;
2655         int rv = 0;
2656
2657         lock_kernel();
2658         spin_lock_irqsave(&mdev->req_lock, flags);
2659         /* to have a stable mdev->state.role
2660          * and no race with updating open_cnt */
2661
2662         if (mdev->state.role != R_PRIMARY) {
2663                 if (mode & FMODE_WRITE)
2664                         rv = -EROFS;
2665                 else if (!allow_oos)
2666                         rv = -EMEDIUMTYPE;
2667         }
2668
2669         if (!rv)
2670                 mdev->open_cnt++;
2671         spin_unlock_irqrestore(&mdev->req_lock, flags);
2672         unlock_kernel();
2673
2674         return rv;
2675 }
2676
2677 static int drbd_release(struct gendisk *gd, fmode_t mode)
2678 {
2679         struct drbd_conf *mdev = gd->private_data;
2680         lock_kernel();
2681         mdev->open_cnt--;
2682         unlock_kernel();
2683         return 0;
2684 }
2685
2686 static void drbd_unplug_fn(struct request_queue *q)
2687 {
2688         struct drbd_conf *mdev = q->queuedata;
2689
2690         /* unplug FIRST */
2691         spin_lock_irq(q->queue_lock);
2692         blk_remove_plug(q);
2693         spin_unlock_irq(q->queue_lock);
2694
2695         /* only if connected */
2696         spin_lock_irq(&mdev->req_lock);
2697         if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2698                 D_ASSERT(mdev->state.role == R_PRIMARY);
2699                 if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2700                         /* add to the data.work queue,
2701                          * unless already queued.
2702                          * XXX this might be a good addition to drbd_queue_work
2703                          * anyways, to detect "double queuing" ... */
2704                         if (list_empty(&mdev->unplug_work.list))
2705                                 drbd_queue_work(&mdev->data.work,
2706                                                 &mdev->unplug_work);
2707                 }
2708         }
2709         spin_unlock_irq(&mdev->req_lock);
2710
2711         if (mdev->state.disk >= D_INCONSISTENT)
2712                 drbd_kick_lo(mdev);
2713 }
2714
2715 static void drbd_set_defaults(struct drbd_conf *mdev)
2716 {
2717         /* This way we get a compile error when sync_conf grows,
2718            and we forget to initialize it here */
2719         mdev->sync_conf = (struct syncer_conf) {
2720                 /* .rate = */           DRBD_RATE_DEF,
2721                 /* .after = */          DRBD_AFTER_DEF,
2722                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
2723                 /* .verify_alg = */     {}, 0,
2724                 /* .cpu_mask = */       {}, 0,
2725                 /* .csums_alg = */      {}, 0,
2726                 /* .use_rle = */        0,
2727                 /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
2728                 /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
2729                 /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
2730                 /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
2731                 /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
2732                 /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
2733         };
2734
2735         /* Have to do it this way, because the layout differs between
2736            big endian and little endian */
2737         mdev->state = (union drbd_state) {
2738                 { .role = R_SECONDARY,
2739                   .peer = R_UNKNOWN,
2740                   .conn = C_STANDALONE,
2741                   .disk = D_DISKLESS,
2742                   .pdsk = D_UNKNOWN,
2743                   .susp = 0,
2744                   .susp_nod = 0,
2745                   .susp_fen = 0
2746                 } };
2747 }
2748
2749 void drbd_init_set_defaults(struct drbd_conf *mdev)
2750 {
2751         /* the memset(,0,) did most of this.
2752          * note: only assignments, no allocation in here */
2753
2754         drbd_set_defaults(mdev);
2755
2756         /* for now, we do NOT yet support it,
2757          * even though we start some framework
2758          * to eventually support barriers */
2759         set_bit(NO_BARRIER_SUPP, &mdev->flags);
2760
2761         atomic_set(&mdev->ap_bio_cnt, 0);
2762         atomic_set(&mdev->ap_pending_cnt, 0);
2763         atomic_set(&mdev->rs_pending_cnt, 0);
2764         atomic_set(&mdev->unacked_cnt, 0);
2765         atomic_set(&mdev->local_cnt, 0);
2766         atomic_set(&mdev->net_cnt, 0);
2767         atomic_set(&mdev->packet_seq, 0);
2768         atomic_set(&mdev->pp_in_use, 0);
2769         atomic_set(&mdev->pp_in_use_by_net, 0);
2770         atomic_set(&mdev->rs_sect_in, 0);
2771         atomic_set(&mdev->rs_sect_ev, 0);
2772
2773         mutex_init(&mdev->md_io_mutex);
2774         mutex_init(&mdev->data.mutex);
2775         mutex_init(&mdev->meta.mutex);
2776         sema_init(&mdev->data.work.s, 0);
2777         sema_init(&mdev->meta.work.s, 0);
2778         mutex_init(&mdev->state_mutex);
2779
2780         spin_lock_init(&mdev->data.work.q_lock);
2781         spin_lock_init(&mdev->meta.work.q_lock);
2782
2783         spin_lock_init(&mdev->al_lock);
2784         spin_lock_init(&mdev->req_lock);
2785         spin_lock_init(&mdev->peer_seq_lock);
2786         spin_lock_init(&mdev->epoch_lock);
2787
2788         INIT_LIST_HEAD(&mdev->active_ee);
2789         INIT_LIST_HEAD(&mdev->sync_ee);
2790         INIT_LIST_HEAD(&mdev->done_ee);
2791         INIT_LIST_HEAD(&mdev->read_ee);
2792         INIT_LIST_HEAD(&mdev->net_ee);
2793         INIT_LIST_HEAD(&mdev->resync_reads);
2794         INIT_LIST_HEAD(&mdev->data.work.q);
2795         INIT_LIST_HEAD(&mdev->meta.work.q);
2796         INIT_LIST_HEAD(&mdev->resync_work.list);
2797         INIT_LIST_HEAD(&mdev->unplug_work.list);
2798         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2799         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2800
2801         mdev->resync_work.cb  = w_resync_inactive;
2802         mdev->unplug_work.cb  = w_send_write_hint;
2803         mdev->md_sync_work.cb = w_md_sync;
2804         mdev->bm_io_work.w.cb = w_bitmap_io;
2805         init_timer(&mdev->resync_timer);
2806         init_timer(&mdev->md_sync_timer);
2807         mdev->resync_timer.function = resync_timer_fn;
2808         mdev->resync_timer.data = (unsigned long) mdev;
2809         mdev->md_sync_timer.function = md_sync_timer_fn;
2810         mdev->md_sync_timer.data = (unsigned long) mdev;
2811
2812         init_waitqueue_head(&mdev->misc_wait);
2813         init_waitqueue_head(&mdev->state_wait);
2814         init_waitqueue_head(&mdev->net_cnt_wait);
2815         init_waitqueue_head(&mdev->ee_wait);
2816         init_waitqueue_head(&mdev->al_wait);
2817         init_waitqueue_head(&mdev->seq_wait);
2818
2819         drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2820         drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2821         drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2822
2823         mdev->agreed_pro_version = PRO_VERSION_MAX;
2824         mdev->write_ordering = WO_bio_barrier;
2825         mdev->resync_wenr = LC_FREE;
2826 }
2827
2828 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2829 {
2830         int i;
2831         if (mdev->receiver.t_state != None)
2832                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2833                                 mdev->receiver.t_state);
2834
2835         /* no need to lock it, I'm the only thread alive */
2836         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2837                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2838         mdev->al_writ_cnt  =
2839         mdev->bm_writ_cnt  =
2840         mdev->read_cnt     =
2841         mdev->recv_cnt     =
2842         mdev->send_cnt     =
2843         mdev->writ_cnt     =
2844         mdev->p_size       =
2845         mdev->rs_start     =
2846         mdev->rs_total     =
2847         mdev->rs_failed    = 0;
2848         mdev->rs_last_events = 0;
2849         mdev->rs_last_sect_ev = 0;
2850         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2851                 mdev->rs_mark_left[i] = 0;
2852                 mdev->rs_mark_time[i] = 0;
2853         }
2854         D_ASSERT(mdev->net_conf == NULL);
2855
2856         drbd_set_my_capacity(mdev, 0);
2857         if (mdev->bitmap) {
2858                 /* maybe never allocated. */
2859                 drbd_bm_resize(mdev, 0, 1);
2860                 drbd_bm_cleanup(mdev);
2861         }
2862
2863         drbd_free_resources(mdev);
2864         clear_bit(AL_SUSPENDED, &mdev->flags);
2865
2866         /*
2867          * currently we call drbd_init_ee only on module load, so
2868          * we may call drbd_release_ee only on module unload!
2869          */
2870         D_ASSERT(list_empty(&mdev->active_ee));
2871         D_ASSERT(list_empty(&mdev->sync_ee));
2872         D_ASSERT(list_empty(&mdev->done_ee));
2873         D_ASSERT(list_empty(&mdev->read_ee));
2874         D_ASSERT(list_empty(&mdev->net_ee));
2875         D_ASSERT(list_empty(&mdev->resync_reads));
2876         D_ASSERT(list_empty(&mdev->data.work.q));
2877         D_ASSERT(list_empty(&mdev->meta.work.q));
2878         D_ASSERT(list_empty(&mdev->resync_work.list));
2879         D_ASSERT(list_empty(&mdev->unplug_work.list));
2880
2881 }
2882
2883
2884 static void drbd_destroy_mempools(void)
2885 {
2886         struct page *page;
2887
2888         while (drbd_pp_pool) {
2889                 page = drbd_pp_pool;
2890                 drbd_pp_pool = (struct page *)page_private(page);
2891                 __free_page(page);
2892                 drbd_pp_vacant--;
2893         }
2894
2895         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2896
2897         if (drbd_ee_mempool)
2898                 mempool_destroy(drbd_ee_mempool);
2899         if (drbd_request_mempool)
2900                 mempool_destroy(drbd_request_mempool);
2901         if (drbd_ee_cache)
2902                 kmem_cache_destroy(drbd_ee_cache);
2903         if (drbd_request_cache)
2904                 kmem_cache_destroy(drbd_request_cache);
2905         if (drbd_bm_ext_cache)
2906                 kmem_cache_destroy(drbd_bm_ext_cache);
2907         if (drbd_al_ext_cache)
2908                 kmem_cache_destroy(drbd_al_ext_cache);
2909
2910         drbd_ee_mempool      = NULL;
2911         drbd_request_mempool = NULL;
2912         drbd_ee_cache        = NULL;
2913         drbd_request_cache   = NULL;
2914         drbd_bm_ext_cache    = NULL;
2915         drbd_al_ext_cache    = NULL;
2916
2917         return;
2918 }
2919
2920 static int drbd_create_mempools(void)
2921 {
2922         struct page *page;
2923         const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2924         int i;
2925
2926         /* prepare our caches and mempools */
2927         drbd_request_mempool = NULL;
2928         drbd_ee_cache        = NULL;
2929         drbd_request_cache   = NULL;
2930         drbd_bm_ext_cache    = NULL;
2931         drbd_al_ext_cache    = NULL;
2932         drbd_pp_pool         = NULL;
2933
2934         /* caches */
2935         drbd_request_cache = kmem_cache_create(
2936                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2937         if (drbd_request_cache == NULL)
2938                 goto Enomem;
2939
2940         drbd_ee_cache = kmem_cache_create(
2941                 "drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2942         if (drbd_ee_cache == NULL)
2943                 goto Enomem;
2944
2945         drbd_bm_ext_cache = kmem_cache_create(
2946                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2947         if (drbd_bm_ext_cache == NULL)
2948                 goto Enomem;
2949
2950         drbd_al_ext_cache = kmem_cache_create(
2951                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2952         if (drbd_al_ext_cache == NULL)
2953                 goto Enomem;
2954
2955         /* mempools */
2956         drbd_request_mempool = mempool_create(number,
2957                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2958         if (drbd_request_mempool == NULL)
2959                 goto Enomem;
2960
2961         drbd_ee_mempool = mempool_create(number,
2962                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2963         if (drbd_ee_mempool == NULL)
2964                 goto Enomem;
2965
2966         /* drbd's page pool */
2967         spin_lock_init(&drbd_pp_lock);
2968
2969         for (i = 0; i < number; i++) {
2970                 page = alloc_page(GFP_HIGHUSER);
2971                 if (!page)
2972                         goto Enomem;
2973                 set_page_private(page, (unsigned long)drbd_pp_pool);
2974                 drbd_pp_pool = page;
2975         }
2976         drbd_pp_vacant = number;
2977
2978         return 0;
2979
2980 Enomem:
2981         drbd_destroy_mempools(); /* in case we allocated some */
2982         return -ENOMEM;
2983 }
2984
2985 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2986         void *unused)
2987 {
2988         /* just so we have it.  you never know what interesting things we
2989          * might want to do here some day...
2990          */
2991
2992         return NOTIFY_DONE;
2993 }
2994
2995 static struct notifier_block drbd_notifier = {
2996         .notifier_call = drbd_notify_sys,
2997 };
2998
2999 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3000 {
3001         int rr;
3002
3003         rr = drbd_release_ee(mdev, &mdev->active_ee);
3004         if (rr)
3005                 dev_err(DEV, "%d EEs in active list found!\n", rr);
3006
3007         rr = drbd_release_ee(mdev, &mdev->sync_ee);
3008         if (rr)
3009                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
3010
3011         rr = drbd_release_ee(mdev, &mdev->read_ee);
3012         if (rr)
3013                 dev_err(DEV, "%d EEs in read list found!\n", rr);
3014
3015         rr = drbd_release_ee(mdev, &mdev->done_ee);
3016         if (rr)
3017                 dev_err(DEV, "%d EEs in done list found!\n", rr);
3018
3019         rr = drbd_release_ee(mdev, &mdev->net_ee);
3020         if (rr)
3021                 dev_err(DEV, "%d EEs in net list found!\n", rr);
3022 }
3023
3024 /* caution. no locking.
3025  * currently only used from module cleanup code. */
3026 static void drbd_delete_device(unsigned int minor)
3027 {
3028         struct drbd_conf *mdev = minor_to_mdev(minor);
3029
3030         if (!mdev)
3031                 return;
3032
3033         /* paranoia asserts */
3034         if (mdev->open_cnt != 0)
3035                 dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
3036                                 __FILE__ , __LINE__);
3037
3038         ERR_IF (!list_empty(&mdev->data.work.q)) {
3039                 struct list_head *lp;
3040                 list_for_each(lp, &mdev->data.work.q) {
3041                         dev_err(DEV, "lp = %p\n", lp);
3042                 }
3043         };
3044         /* end paranoia asserts */
3045
3046         del_gendisk(mdev->vdisk);
3047
3048         /* cleanup stuff that may have been allocated during
3049          * device (re-)configuration or state changes */
3050
3051         if (mdev->this_bdev)
3052                 bdput(mdev->this_bdev);
3053
3054         drbd_free_resources(mdev);
3055
3056         drbd_release_ee_lists(mdev);
3057
3058         /* should be free'd on disconnect? */
3059         kfree(mdev->ee_hash);
3060         /*
3061         mdev->ee_hash_s = 0;
3062         mdev->ee_hash = NULL;
3063         */
3064
3065         lc_destroy(mdev->act_log);
3066         lc_destroy(mdev->resync);
3067
3068         kfree(mdev->p_uuid);
3069         /* mdev->p_uuid = NULL; */
3070
3071         kfree(mdev->int_dig_out);
3072         kfree(mdev->int_dig_in);
3073         kfree(mdev->int_dig_vv);
3074
3075         /* cleanup the rest that has been
3076          * allocated from drbd_new_device
3077          * and actually free the mdev itself */
3078         drbd_free_mdev(mdev);
3079 }
3080
3081 static void drbd_cleanup(void)
3082 {
3083         unsigned int i;
3084
3085         unregister_reboot_notifier(&drbd_notifier);
3086
3087         drbd_nl_cleanup();
3088
3089         if (minor_table) {
3090                 if (drbd_proc)
3091                         remove_proc_entry("drbd", NULL);
3092                 i = minor_count;
3093                 while (i--)
3094                         drbd_delete_device(i);
3095                 drbd_destroy_mempools();
3096         }
3097
3098         kfree(minor_table);
3099
3100         unregister_blkdev(DRBD_MAJOR, "drbd");
3101
3102         printk(KERN_INFO "drbd: module cleanup done.\n");
3103 }
3104
3105 /**
3106  * drbd_congested() - Callback for pdflush
3107  * @congested_data:     User data
3108  * @bdi_bits:           Bits pdflush is currently interested in
3109  *
3110  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3111  */
3112 static int drbd_congested(void *congested_data, int bdi_bits)
3113 {
3114         struct drbd_conf *mdev = congested_data;
3115         struct request_queue *q;
3116         char reason = '-';
3117         int r = 0;
3118
3119         if (!__inc_ap_bio_cond(mdev)) {
3120                 /* DRBD has frozen IO */
3121                 r = bdi_bits;
3122                 reason = 'd';
3123                 goto out;
3124         }
3125
3126         if (get_ldev(mdev)) {
3127                 q = bdev_get_queue(mdev->ldev->backing_bdev);
3128                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
3129                 put_ldev(mdev);
3130                 if (r)
3131                         reason = 'b';
3132         }
3133
3134         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3135                 r |= (1 << BDI_async_congested);
3136                 reason = reason == 'b' ? 'a' : 'n';
3137         }
3138
3139 out:
3140         mdev->congestion_reason = reason;
3141         return r;
3142 }
3143
3144 struct drbd_conf *drbd_new_device(unsigned int minor)
3145 {
3146         struct drbd_conf *mdev;
3147         struct gendisk *disk;
3148         struct request_queue *q;
3149
3150         /* GFP_KERNEL, we are outside of all write-out paths */
3151         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3152         if (!mdev)
3153                 return NULL;
3154         if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3155                 goto out_no_cpumask;
3156
3157         mdev->minor = minor;
3158
3159         drbd_init_set_defaults(mdev);
3160
3161         q = blk_alloc_queue(GFP_KERNEL);
3162         if (!q)
3163                 goto out_no_q;
3164         mdev->rq_queue = q;
3165         q->queuedata   = mdev;
3166
3167         disk = alloc_disk(1);
3168         if (!disk)
3169                 goto out_no_disk;
3170         mdev->vdisk = disk;
3171
3172         set_disk_ro(disk, TRUE);
3173
3174         disk->queue = q;
3175         disk->major = DRBD_MAJOR;
3176         disk->first_minor = minor;
3177         disk->fops = &drbd_ops;
3178         sprintf(disk->disk_name, "drbd%d", minor);
3179         disk->private_data = mdev;
3180
3181         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3182         /* we have no partitions. we contain only ourselves. */
3183         mdev->this_bdev->bd_contains = mdev->this_bdev;
3184
3185         q->backing_dev_info.congested_fn = drbd_congested;
3186         q->backing_dev_info.congested_data = mdev;
3187
3188         blk_queue_make_request(q, drbd_make_request_26);
3189         blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3190         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3191         blk_queue_merge_bvec(q, drbd_merge_bvec);
3192         q->queue_lock = &mdev->req_lock; /* needed since we use plugging
3193                  * on a queue that actually has no requests! */
3194         q->unplug_fn = drbd_unplug_fn;
3195
3196         mdev->md_io_page = alloc_page(GFP_KERNEL);
3197         if (!mdev->md_io_page)
3198                 goto out_no_io_page;
3199
3200         if (drbd_bm_init(mdev))
3201                 goto out_no_bitmap;
3202         /* no need to lock access, we are still initializing this minor device. */
3203         if (!tl_init(mdev))
3204                 goto out_no_tl;
3205
3206         mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3207         if (!mdev->app_reads_hash)
3208                 goto out_no_app_reads;
3209
3210         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3211         if (!mdev->current_epoch)
3212                 goto out_no_epoch;
3213
3214         INIT_LIST_HEAD(&mdev->current_epoch->list);
3215         mdev->epochs = 1;
3216
3217         return mdev;
3218
3219 /* out_whatever_else:
3220         kfree(mdev->current_epoch); */
3221 out_no_epoch:
3222         kfree(mdev->app_reads_hash);
3223 out_no_app_reads:
3224         tl_cleanup(mdev);
3225 out_no_tl:
3226         drbd_bm_cleanup(mdev);
3227 out_no_bitmap:
3228         __free_page(mdev->md_io_page);
3229 out_no_io_page:
3230         put_disk(disk);
3231 out_no_disk:
3232         blk_cleanup_queue(q);
3233 out_no_q:
3234         free_cpumask_var(mdev->cpu_mask);
3235 out_no_cpumask:
3236         kfree(mdev);
3237         return NULL;
3238 }
3239
3240 /* counterpart of drbd_new_device.
3241  * last part of drbd_delete_device. */
3242 void drbd_free_mdev(struct drbd_conf *mdev)
3243 {
3244         kfree(mdev->current_epoch);
3245         kfree(mdev->app_reads_hash);
3246         tl_cleanup(mdev);
3247         if (mdev->bitmap) /* should no longer be there. */
3248                 drbd_bm_cleanup(mdev);
3249         __free_page(mdev->md_io_page);
3250         put_disk(mdev->vdisk);
3251         blk_cleanup_queue(mdev->rq_queue);
3252         free_cpumask_var(mdev->cpu_mask);
3253         kfree(mdev);
3254 }
3255
3256
3257 int __init drbd_init(void)
3258 {
3259         int err;
3260
3261         if (sizeof(struct p_handshake) != 80) {
3262                 printk(KERN_ERR
3263                        "drbd: never change the size or layout "
3264                        "of the HandShake packet.\n");
3265                 return -EINVAL;
3266         }
3267
3268         if (1 > minor_count || minor_count > 255) {
3269                 printk(KERN_ERR
3270                         "drbd: invalid minor_count (%d)\n", minor_count);
3271 #ifdef MODULE
3272                 return -EINVAL;
3273 #else
3274                 minor_count = 8;
3275 #endif
3276         }
3277
3278         err = drbd_nl_init();
3279         if (err)
3280                 return err;
3281
3282         err = register_blkdev(DRBD_MAJOR, "drbd");
3283         if (err) {
3284                 printk(KERN_ERR
3285                        "drbd: unable to register block device major %d\n",
3286                        DRBD_MAJOR);
3287                 return err;
3288         }
3289
3290         register_reboot_notifier(&drbd_notifier);
3291
3292         /*
3293          * allocate all necessary structs
3294          */
3295         err = -ENOMEM;
3296
3297         init_waitqueue_head(&drbd_pp_wait);
3298
3299         drbd_proc = NULL; /* play safe for drbd_cleanup */
3300         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3301                                 GFP_KERNEL);
3302         if (!minor_table)
3303                 goto Enomem;
3304
3305         err = drbd_create_mempools();
3306         if (err)
3307                 goto Enomem;
3308
3309         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3310         if (!drbd_proc) {
3311                 printk(KERN_ERR "drbd: unable to register proc file\n");
3312                 goto Enomem;
3313         }
3314
3315         rwlock_init(&global_state_lock);
3316
3317         printk(KERN_INFO "drbd: initialized. "
3318                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3319                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3320         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3321         printk(KERN_INFO "drbd: registered as block device major %d\n",
3322                 DRBD_MAJOR);
3323         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3324
3325         return 0; /* Success! */
3326
3327 Enomem:
3328         drbd_cleanup();
3329         if (err == -ENOMEM)
3330                 /* currently always the case */
3331                 printk(KERN_ERR "drbd: ran out of memory\n");
3332         else
3333                 printk(KERN_ERR "drbd: initialization failure\n");
3334         return err;
3335 }
3336
3337 void drbd_free_bc(struct drbd_backing_dev *ldev)
3338 {
3339         if (ldev == NULL)
3340                 return;
3341
3342         bd_release(ldev->backing_bdev);
3343         bd_release(ldev->md_bdev);
3344
3345         fput(ldev->lo_file);
3346         fput(ldev->md_file);
3347
3348         kfree(ldev);
3349 }
3350
3351 void drbd_free_sock(struct drbd_conf *mdev)
3352 {
3353         if (mdev->data.socket) {
3354                 mutex_lock(&mdev->data.mutex);
3355                 kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3356                 sock_release(mdev->data.socket);
3357                 mdev->data.socket = NULL;
3358                 mutex_unlock(&mdev->data.mutex);
3359         }
3360         if (mdev->meta.socket) {
3361                 mutex_lock(&mdev->meta.mutex);
3362                 kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3363                 sock_release(mdev->meta.socket);
3364                 mdev->meta.socket = NULL;
3365                 mutex_unlock(&mdev->meta.mutex);
3366         }
3367 }
3368
3369
3370 void drbd_free_resources(struct drbd_conf *mdev)
3371 {
3372         crypto_free_hash(mdev->csums_tfm);
3373         mdev->csums_tfm = NULL;
3374         crypto_free_hash(mdev->verify_tfm);
3375         mdev->verify_tfm = NULL;
3376         crypto_free_hash(mdev->cram_hmac_tfm);
3377         mdev->cram_hmac_tfm = NULL;
3378         crypto_free_hash(mdev->integrity_w_tfm);
3379         mdev->integrity_w_tfm = NULL;
3380         crypto_free_hash(mdev->integrity_r_tfm);
3381         mdev->integrity_r_tfm = NULL;
3382
3383         drbd_free_sock(mdev);
3384
3385         __no_warn(local,
3386                   drbd_free_bc(mdev->ldev);
3387                   mdev->ldev = NULL;);
3388 }
3389
3390 /* meta data management */
3391
3392 struct meta_data_on_disk {
3393         u64 la_size;           /* last agreed size. */
3394         u64 uuid[UI_SIZE];   /* UUIDs. */
3395         u64 device_uuid;
3396         u64 reserved_u64_1;
3397         u32 flags;             /* MDF */
3398         u32 magic;
3399         u32 md_size_sect;
3400         u32 al_offset;         /* offset to this block */
3401         u32 al_nr_extents;     /* important for restoring the AL */
3402               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3403         u32 bm_offset;         /* offset to the bitmap, from here */
3404         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3405         u32 reserved_u32[4];
3406
3407 } __packed;
3408
3409 /**
3410  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3411  * @mdev:       DRBD device.
3412  */
3413 void drbd_md_sync(struct drbd_conf *mdev)
3414 {
3415         struct meta_data_on_disk *buffer;
3416         sector_t sector;
3417         int i;
3418
3419         del_timer(&mdev->md_sync_timer);
3420         /* timer may be rearmed by drbd_md_mark_dirty() now. */
3421         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3422                 return;
3423
3424         /* We use D_FAILED here, and not D_ATTACHING, because we try to write
3425          * metadata even if we detach due to a disk failure! */
3426         if (!get_ldev_if_state(mdev, D_FAILED))
3427                 return;
3428
3429         mutex_lock(&mdev->md_io_mutex);
3430         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3431         memset(buffer, 0, 512);
3432
3433         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3434         for (i = UI_CURRENT; i < UI_SIZE; i++)
3435                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3436         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3437         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3438
3439         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3440         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3441         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3442         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3443         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3444
3445         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3446
3447         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3448         sector = mdev->ldev->md.md_offset;
3449
3450         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3451                 /* this was only a try anyway ... */
3452                 dev_err(DEV, "meta data update failed!\n");
3453                 drbd_chk_io_error(mdev, 1, TRUE);
3454         }
3455
3456         /* Update mdev->ldev->md.la_size_sect,
3457          * since we just wrote it to the metadata. */
3458         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3459
3460         mutex_unlock(&mdev->md_io_mutex);
3461         put_ldev(mdev);
3462 }
3463
3464 /**
3465  * drbd_md_read() - Reads in the meta data super block
3466  * @mdev:       DRBD device.
3467  * @bdev:       Device from which the meta data should be read in.
3468  *
3469  * Return NO_ERROR on success, and an enum drbd_ret_codes in case
3470  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3471  */
3472 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3473 {
3474         struct meta_data_on_disk *buffer;
3475         int i, rv = NO_ERROR;
3476
3477         if (!get_ldev_if_state(mdev, D_ATTACHING))
3478                 return ERR_IO_MD_DISK;
3479
3480         mutex_lock(&mdev->md_io_mutex);
3481         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3482
3483         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3484                 /* NOTE: can't do normal error processing here, as this is
3485                    called BEFORE the disk is attached */
3486                 dev_err(DEV, "Error while reading metadata.\n");
3487                 rv = ERR_IO_MD_DISK;
3488                 goto err;
3489         }
3490
3491         if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3492                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
3493                 rv = ERR_MD_INVALID;
3494                 goto err;
3495         }
3496         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3497                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3498                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3499                 rv = ERR_MD_INVALID;
3500                 goto err;
3501         }
3502         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3503                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3504                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3505                 rv = ERR_MD_INVALID;
3506                 goto err;
3507         }
3508         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3509                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3510                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3511                 rv = ERR_MD_INVALID;
3512                 goto err;
3513         }
3514
3515         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3516                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3517                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3518                 rv = ERR_MD_INVALID;
3519                 goto err;
3520         }
3521
3522         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3523         for (i = UI_CURRENT; i < UI_SIZE; i++)
3524                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3525         bdev->md.flags = be32_to_cpu(buffer->flags);
3526         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3527         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3528
3529         if (mdev->sync_conf.al_extents < 7)
3530                 mdev->sync_conf.al_extents = 127;
3531
3532  err:
3533         mutex_unlock(&mdev->md_io_mutex);
3534         put_ldev(mdev);
3535
3536         return rv;
3537 }
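
/*
 * Editor's note, illustrative only: a hedged sketch of how drbd_md_read()
 * might be consumed while attaching a backing device.  The surrounding error
 * handling is an assumption for illustration, not code from this file.
 *
 *	retcode = drbd_md_read(mdev, bdev);
 *	if (retcode != NO_ERROR)
 *		goto fail;	// e.g. ERR_IO_MD_DISK or ERR_MD_INVALID
 */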
3538
3539 /**
3540  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3541  * @mdev:       DRBD device.
3542  *
3543  * Call this function if you change anything that should be written to
3544  * the meta-data super block. This function sets MD_DIRTY, and arms a
3545  * timer that ensures drbd_md_sync() gets called within one second (jiffies + HZ).
3546  */
3547 #ifdef DRBD_DEBUG_MD_SYNC
3548 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3549 {
3550         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3551                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3552                 mdev->last_md_mark_dirty.line = line;
3553                 mdev->last_md_mark_dirty.func = func;
3554         }
3555 }
3556 #else
3557 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3558 {
3559         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3560                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3561 }
3562 #endif
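
/*
 * Editor's note, illustrative only: the typical dirty/flush pattern built on
 * the helpers above.  A caller changes some on-disk meta data field, marks
 * the super block dirty, and relies on either an explicit drbd_md_sync() or
 * the md_sync_timer to get it written out:
 *
 *	drbd_md_set_flag(mdev, MDF_CRASHED_PRIMARY);	// any MD change; marks dirty
 *	...
 *	drbd_md_sync(mdev);	// writes the super block iff MD_DIRTY is still set
 */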
3563
3564 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3565 {
3566         int i;
3567
3568         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3569                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3570 }
3571
3572 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3573 {
3574         if (idx == UI_CURRENT) {
3575                 if (mdev->state.role == R_PRIMARY)
3576                         val |= 1;
3577                 else
3578                         val &= ~((u64)1);
3579
3580                 drbd_set_ed_uuid(mdev, val);
3581         }
3582
3583         mdev->ldev->md.uuid[idx] = val;
3584         drbd_md_mark_dirty(mdev);
3585 }
3586
3587
3588 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3589 {
3590         if (mdev->ldev->md.uuid[idx]) {
3591                 drbd_uuid_move_history(mdev);
3592                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3593         }
3594         _drbd_uuid_set(mdev, idx, val);
3595 }
3596
3597 /**
3598  * drbd_uuid_new_current() - Creates a new current UUID
3599  * @mdev:       DRBD device.
3600  *
3601  * Creates a new current UUID, and rotates the old current UUID into
3602  * the bitmap slot. Causes an incremental resync upon next connect.
3603  */
3604 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3605 {
3606         u64 val;
3607
3608         dev_info(DEV, "Creating new current UUID\n");
3609         D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3610         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3611
3612         get_random_bytes(&val, sizeof(u64));
3613         _drbd_uuid_set(mdev, UI_CURRENT, val);
3614 }
3615
3616 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3617 {
3618         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3619                 return;
3620
3621         if (val == 0) {
3622                 drbd_uuid_move_history(mdev);
3623                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3624                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3625         } else {
3626                 if (mdev->ldev->md.uuid[UI_BITMAP])
3627                         dev_warn(DEV, "bm UUID already set");
3628
3629                 mdev->ldev->md.uuid[UI_BITMAP] = val;
3630                 mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3631
3632         }
3633         drbd_md_mark_dirty(mdev);
3634 }
3635
3636 /**
3637  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3638  * @mdev:       DRBD device.
3639  *
3640  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3641  */
3642 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3643 {
3644         int rv = -EIO;
3645
3646         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3647                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3648                 drbd_md_sync(mdev);
3649                 drbd_bm_set_all(mdev);
3650
3651                 rv = drbd_bm_write(mdev);
3652
3653                 if (!rv) {
3654                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3655                         drbd_md_sync(mdev);
3656                 }
3657
3658                 put_ldev(mdev);
3659         }
3660
3661         return rv;
3662 }
3663
3664 /**
3665  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3666  * @mdev:       DRBD device.
3667  *
3668  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3669  */
3670 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3671 {
3672         int rv = -EIO;
3673
3674         drbd_resume_al(mdev);
3675         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3676                 drbd_bm_clear_all(mdev);
3677                 rv = drbd_bm_write(mdev);
3678                 put_ldev(mdev);
3679         }
3680
3681         return rv;
3682 }
3683
3684 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3685 {
3686         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3687         int rv;
3688
3689         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3690
3691         drbd_bm_lock(mdev, work->why);
3692         rv = work->io_fn(mdev);
3693         drbd_bm_unlock(mdev);
3694
3695         clear_bit(BITMAP_IO, &mdev->flags);
3696         wake_up(&mdev->misc_wait);
3697
3698         if (work->done)
3699                 work->done(mdev, rv);
3700
3701         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3702         work->why = NULL;
3703
3704         return 1;
3705 }
3706
3707 /**
3708  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3709  * @mdev:       DRBD device.
3710  * @io_fn:      IO callback to be called when bitmap IO is possible
3711  * @done:       callback to be called after the bitmap IO was performed
3712  * @why:        Descriptive text of the reason for doing the IO
3713  *
3714  * While IO on the bitmap is in progress, application IO is frozen; this ensures
3715  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3716  * called from worker context. It MUST NOT be used while a previous such
3717  * work is still pending!
3718  */
3719 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3720                           int (*io_fn)(struct drbd_conf *),
3721                           void (*done)(struct drbd_conf *, int),
3722                           char *why)
3723 {
3724         D_ASSERT(current == mdev->worker.task);
3725
3726         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3727         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3728         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3729         if (mdev->bm_io_work.why)
3730                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3731                         why, mdev->bm_io_work.why);
3732
3733         mdev->bm_io_work.io_fn = io_fn;
3734         mdev->bm_io_work.done = done;
3735         mdev->bm_io_work.why = why;
3736
3737         set_bit(BITMAP_IO, &mdev->flags);
3738         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3739                 if (list_empty(&mdev->bm_io_work.w.list)) {
3740                         set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3741                         drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3742                 } else
3743                         dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3744         }
3745 }
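
/*
 * Editor's note, illustrative only: queueing a full-bitmap write from worker
 * context.  The completion callback below is a hypothetical example, not part
 * of this file.
 *
 *	static void example_bm_write_done(struct drbd_conf *mdev, int rv)
 *	{
 *		if (rv)
 *			dev_err(DEV, "bitmap write failed: %d\n", rv);
 *	}
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *			     example_bm_write_done, "example full sync");
 */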
3746
3747 /**
3748  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3749  * @mdev:       DRBD device.
3750  * @io_fn:      IO callback to be called when bitmap IO is possible
3751  * @why:        Descriptive text of the reason for doing the IO
3752  *
3753  * Freezes application IO while the actual bitmap IO operation runs. This
3754  * function MAY NOT be called from worker context.
3755  */
3756 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3757 {
3758         int rv;
3759
3760         D_ASSERT(current != mdev->worker.task);
3761
3762         drbd_suspend_io(mdev);
3763
3764         drbd_bm_lock(mdev, why);
3765         rv = io_fn(mdev);
3766         drbd_bm_unlock(mdev);
3767
3768         drbd_resume_io(mdev);
3769
3770         return rv;
3771 }
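
/*
 * Editor's note, illustrative only: the synchronous variant, callable from
 * any context except the worker, e.g. to clear the whole bitmap:
 *
 *	rv = drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write, "example clear");
 *	if (rv)
 *		dev_err(DEV, "bitmap clear failed: %d\n", rv);
 */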
3772
3773 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3774 {
3775         if ((mdev->ldev->md.flags & flag) != flag) {
3776                 drbd_md_mark_dirty(mdev);
3777                 mdev->ldev->md.flags |= flag;
3778         }
3779 }
3780
3781 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3782 {
3783         if ((mdev->ldev->md.flags & flag) != 0) {
3784                 drbd_md_mark_dirty(mdev);
3785                 mdev->ldev->md.flags &= ~flag;
3786         }
3787 }
3788 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3789 {
3790         return (bdev->md.flags & flag) != 0;
3791 }
3792
3793 static void md_sync_timer_fn(unsigned long data)
3794 {
3795         struct drbd_conf *mdev = (struct drbd_conf *) data;
3796
3797         drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3798 }
3799
3800 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3801 {
3802         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3803 #ifdef DEBUG
3804         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3805                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3806 #endif
3807         drbd_md_sync(mdev);
3808         return 1;
3809 }
3810
3811 #ifdef CONFIG_DRBD_FAULT_INJECTION
3812 /* Fault insertion support including random number generator shamelessly
3813  * stolen from kernel/rcutorture.c */
3814 struct fault_random_state {
3815         unsigned long state;
3816         unsigned long count;
3817 };
3818
3819 #define FAULT_RANDOM_MULT 39916801  /* prime */
3820 #define FAULT_RANDOM_ADD        479001701 /* prime */
3821 #define FAULT_RANDOM_REFRESH 10000
3822
3823 /*
3824  * Crude but fast random-number generator.  Uses a linear congruential
3825  * generator, with occasional help from get_random_bytes().
3826  */
3827 static unsigned long
3828 _drbd_fault_random(struct fault_random_state *rsp)
3829 {
3830         long refresh;
3831
3832         if (!rsp->count--) {
3833                 get_random_bytes(&refresh, sizeof(refresh));
3834                 rsp->state += refresh;
3835                 rsp->count = FAULT_RANDOM_REFRESH;
3836         }
3837         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3838         return swahw32(rsp->state);
3839 }
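
/*
 * Editor's note: the generator above is the standard linear congruential
 * recurrence  state(n+1) = state(n) * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD
 * (mod 2^BITS_PER_LONG), reseeded from get_random_bytes() every
 * FAULT_RANDOM_REFRESH calls.  swahw32() swaps the 16-bit half-words of the
 * (truncated) 32-bit state, presumably so callers do not see the LCG's weak
 * low-order bits directly when they reduce the result modulo 100 below.
 */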
3840
3841 static char *
3842 _drbd_fault_str(unsigned int type) {
3843         static char *_faults[] = {
3844                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3845                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3846                 [DRBD_FAULT_RS_WR] = "Resync write",
3847                 [DRBD_FAULT_RS_RD] = "Resync read",
3848                 [DRBD_FAULT_DT_WR] = "Data write",
3849                 [DRBD_FAULT_DT_RD] = "Data read",
3850                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3851                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3852                 [DRBD_FAULT_AL_EE] = "EE allocation",
3853                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3854         };
3855
3856         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3857 }
3858
3859 unsigned int
3860 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3861 {
3862         static struct fault_random_state rrs = {0, 0};
3863
3864         unsigned int ret = (
3865                 (fault_devs == 0 ||
3866                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3867                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3868
3869         if (ret) {
3870                 fault_count++;
3871
3872                 if (__ratelimit(&drbd_ratelimit_state))
3873                         dev_warn(DEV, "***Simulating %s failure\n",
3874                                 _drbd_fault_str(type));
3875         }
3876
3877         return ret;
3878 }
3879 #endif
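
/*
 * Editor's note, illustrative only (CONFIG_DRBD_FAULT_INJECTION): a sketch of
 * how a fault check can wrap an IO submission.  Real call sites go through a
 * small wrapper around _drbd_insert_fault() defined in drbd_int.h; this is a
 * hedged example, not code from this file.
 *
 *	if (fault_rate && _drbd_insert_fault(mdev, DRBD_FAULT_DT_WR)) {
 *		bio_endio(bio, -EIO);	// pretend the data write failed
 *		return;
 *	}
 *	submit_bio(WRITE, bio);
 */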
3880
3881 const char *drbd_buildtag(void)
3882 {
3883         /* When DRBD is built from external sources, this holds a reference
3884            to the git hash of the source code. */
3885
3886         static char buildtag[38] = "\0uilt-in";
3887
3888         if (buildtag[0] == 0) {
3889 #ifdef CONFIG_MODULES
3890                 if (THIS_MODULE != NULL)
3891                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3892                 else
3893 #endif
3894                         buildtag[0] = 'b';
3895         }
3896
3897         return buildtag;
3898 }
3899
3900 module_init(drbd_init)
3901 module_exit(drbd_cleanup)
3902
3903 EXPORT_SYMBOL(drbd_conn_str);
3904 EXPORT_SYMBOL(drbd_role_str);
3905 EXPORT_SYMBOL(drbd_disk_str);
3906 EXPORT_SYMBOL(drbd_set_st_err_str);