]> git.karo-electronics.de Git - mv-sheeva.git/blob - drivers/block/drbd/drbd_main.c
drbd: Do not send two barriers without any writes between them
[mv-sheeva.git] / drivers / block / drbd / drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 struct after_state_chg_work {
60         struct drbd_work w;
61         union drbd_state os;
62         union drbd_state ns;
63         enum chg_state_flags flags;
64         struct completion *done;
65 };
66
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76                            union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80
81 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
82               "Lars Ellenberg <lars@linbit.com>");
83 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
84 MODULE_VERSION(REL_VERSION);
85 MODULE_LICENSE("GPL");
86 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
87 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
88
89 #include <linux/moduleparam.h>
90 /* allow_open_on_secondary */
91 MODULE_PARM_DESC(allow_oos, "DONT USE!");
92 /* thanks to these macros, if compiled into the kernel (not-module),
93  * this becomes the boot parameter drbd.minor_count */
94 module_param(minor_count, uint, 0444);
95 module_param(disable_sendpage, bool, 0644);
96 module_param(allow_oos, bool, 0);
97 module_param(cn_idx, uint, 0444);
98 module_param(proc_details, int, 0644);
99
#ifdef CONFIG_DRBD_FAULT_INJECTION
/* Fault injection knobs, only built with CONFIG_DRBD_FAULT_INJECTION. */
int enable_faults;	/* bitmap of enabled faults */
int fault_rate;		/* fault rate % value - applies to all enabled faults */
static int fault_count;	/* count of faults inserted */
int fault_devs;		/* bitmap of devices to insert faults on */

module_param(enable_faults, int, 0664);
module_param(fault_rate, int, 0664);
module_param(fault_count, int, 0664);
module_param(fault_devs, int, 0644);
#endif
114
115 /* module parameter, defined */
116 unsigned int minor_count = 32;
117 int disable_sendpage;
118 int allow_oos;
119 unsigned int cn_idx = CN_IDX_DRBD;
120 int proc_details;       /* Detail level in proc drbd*/
121
122 /* Module parameter for setting the user mode helper program
123  * to run. Default is /sbin/drbdadm */
124 char usermode_helper[80] = "/sbin/drbdadm";
125
126 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
127
128 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
129  * as member "struct gendisk *vdisk;"
130  */
131 struct drbd_conf **minor_table;
132
133 struct kmem_cache *drbd_request_cache;
134 struct kmem_cache *drbd_ee_cache;       /* epoch entries */
135 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
136 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
137 mempool_t *drbd_request_mempool;
138 mempool_t *drbd_ee_mempool;
139
140 /* I do not use a standard mempool, because:
141    1) I want to hand out the pre-allocated objects first.
142    2) I want to be able to interrupt sleeping allocation with a signal.
143    Note: This is a single linked list, the next pointer is the private
144          member of struct page.
145  */
146 struct page *drbd_pp_pool;
147 spinlock_t   drbd_pp_lock;
148 int          drbd_pp_vacant;
149 wait_queue_head_t drbd_pp_wait;
150
151 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
152
153 static const struct block_device_operations drbd_ops = {
154         .owner =   THIS_MODULE,
155         .open =    drbd_open,
156         .release = drbd_release,
157 };
158
/* Number of elements in array A. Only valid for real arrays, not for
 * pointers. The argument is parenthesized so that expression arguments
 * expand safely. */
#define ARRY_SIZE(A) (sizeof(A) / sizeof((A)[0]))
160
#ifdef __CHECKER__
/* Out-of-line variant used only when checking with sparse: as an inline
   function this produces tons of false positives, as a real function
   sparse copes with it.

   Takes a reference on local_cnt and returns 1 if the disk state is at
   least @mins. Otherwise the reference is dropped again (waking misc_wait
   if the count reached zero) and 0 is returned.
 */
int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
{
	atomic_inc(&mdev->local_cnt);
	if (mdev->state.disk >= mins)
		return 1;

	if (atomic_dec_and_test(&mdev->local_cnt))
		wake_up(&mdev->misc_wait);
	return 0;
}

#endif
179
180 /**
181  * DOC: The transfer log
182  *
183  * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
184  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
185  * of the list. There is always at least one &struct drbd_tl_epoch object.
186  *
187  * Each &struct drbd_tl_epoch has a circular double linked list of requests
188  * attached.
189  */
190 static int tl_init(struct drbd_conf *mdev)
191 {
192         struct drbd_tl_epoch *b;
193
194         /* during device minor initialization, we may well use GFP_KERNEL */
195         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
196         if (!b)
197                 return 0;
198         INIT_LIST_HEAD(&b->requests);
199         INIT_LIST_HEAD(&b->w.list);
200         b->next = NULL;
201         b->br_number = 4711;
202         b->n_writes = 0;
203         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
204
205         mdev->oldest_tle = b;
206         mdev->newest_tle = b;
207         INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
208
209         mdev->tl_hash = NULL;
210         mdev->tl_hash_s = 0;
211
212         return 1;
213 }
214
215 static void tl_cleanup(struct drbd_conf *mdev)
216 {
217         D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
218         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
219         kfree(mdev->oldest_tle);
220         mdev->oldest_tle = NULL;
221         kfree(mdev->unused_spare_tle);
222         mdev->unused_spare_tle = NULL;
223         kfree(mdev->tl_hash);
224         mdev->tl_hash = NULL;
225         mdev->tl_hash_s = 0;
226 }
227
228 /**
229  * _tl_add_barrier() - Adds a barrier to the transfer log
230  * @mdev:       DRBD device.
231  * @new:        Barrier to be added before the current head of the TL.
232  *
233  * The caller must hold the req_lock.
234  */
235 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
236 {
237         struct drbd_tl_epoch *newest_before;
238
239         INIT_LIST_HEAD(&new->requests);
240         INIT_LIST_HEAD(&new->w.list);
241         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
242         new->next = NULL;
243         new->n_writes = 0;
244
245         newest_before = mdev->newest_tle;
246         /* never send a barrier number == 0, because that is special-cased
247          * when using TCQ for our write ordering code */
248         new->br_number = (newest_before->br_number+1) ?: 1;
249         if (mdev->newest_tle != new) {
250                 mdev->newest_tle->next = new;
251                 mdev->newest_tle = new;
252         }
253 }
254
255 /**
256  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
257  * @mdev:       DRBD device.
258  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
259  * @set_size:   Expected number of requests before that barrier.
260  *
261  * In case the passed barrier_nr or set_size does not match the oldest
262  * &struct drbd_tl_epoch objects this function will cause a termination
263  * of the connection.
264  */
265 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
266                        unsigned int set_size)
267 {
268         struct drbd_tl_epoch *b, *nob; /* next old barrier */
269         struct list_head *le, *tle;
270         struct drbd_request *r;
271
272         spin_lock_irq(&mdev->req_lock);
273
274         b = mdev->oldest_tle;
275
276         /* first some paranoia code */
277         if (b == NULL) {
278                 dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
279                         barrier_nr);
280                 goto bail;
281         }
282         if (b->br_number != barrier_nr) {
283                 dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
284                         barrier_nr, b->br_number);
285                 goto bail;
286         }
287         if (b->n_writes != set_size) {
288                 dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
289                         barrier_nr, set_size, b->n_writes);
290                 goto bail;
291         }
292
293         /* Clean up list of requests processed during current epoch */
294         list_for_each_safe(le, tle, &b->requests) {
295                 r = list_entry(le, struct drbd_request, tl_requests);
296                 _req_mod(r, barrier_acked);
297         }
298         /* There could be requests on the list waiting for completion
299            of the write to the local disk. To avoid corruptions of
300            slab's data structures we have to remove the lists head.
301
302            Also there could have been a barrier ack out of sequence, overtaking
303            the write acks - which would be a bug and violating write ordering.
304            To not deadlock in case we lose connection while such requests are
305            still pending, we need some way to find them for the
306            _req_mode(connection_lost_while_pending).
307
308            These have been list_move'd to the out_of_sequence_requests list in
309            _req_mod(, barrier_acked) above.
310            */
311         list_del_init(&b->requests);
312
313         nob = b->next;
314         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
315                 _tl_add_barrier(mdev, b);
316                 if (nob)
317                         mdev->oldest_tle = nob;
318                 /* if nob == NULL b was the only barrier, and becomes the new
319                    barrier. Therefore mdev->oldest_tle points already to b */
320         } else {
321                 D_ASSERT(nob != NULL);
322                 mdev->oldest_tle = nob;
323                 kfree(b);
324         }
325
326         spin_unlock_irq(&mdev->req_lock);
327         dec_ap_pending(mdev);
328
329         return;
330
331 bail:
332         spin_unlock_irq(&mdev->req_lock);
333         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
334 }
335
336 /**
337  * _tl_restart() - Walks the transfer log, and applies an action to all requests
338  * @mdev:       DRBD device.
339  * @what:       The action/event to perform with all request objects
340  *
341  * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
342  * restart_frozen_disk_io.
343  */
344 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
345 {
346         struct drbd_tl_epoch *b, *tmp, **pn;
347         struct list_head *le, *tle, carry_reads;
348         struct drbd_request *req;
349         int rv, n_writes, n_reads;
350
351         b = mdev->oldest_tle;
352         pn = &mdev->oldest_tle;
353         while (b) {
354                 n_writes = 0;
355                 n_reads = 0;
356                 INIT_LIST_HEAD(&carry_reads);
357                 list_for_each_safe(le, tle, &b->requests) {
358                         req = list_entry(le, struct drbd_request, tl_requests);
359                         rv = _req_mod(req, what);
360
361                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
362                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
363                 }
364                 tmp = b->next;
365
366                 if (n_writes) {
367                         if (what == resend) {
368                                 b->n_writes = n_writes;
369                                 if (b->w.cb == NULL) {
370                                         b->w.cb = w_send_barrier;
371                                         inc_ap_pending(mdev);
372                                         set_bit(CREATE_BARRIER, &mdev->flags);
373                                 }
374
375                                 drbd_queue_work(&mdev->data.work, &b->w);
376                         }
377                         pn = &b->next;
378                 } else {
379                         if (n_reads)
380                                 list_add(&carry_reads, &b->requests);
381                         /* there could still be requests on that ring list,
382                          * in case local io is still pending */
383                         list_del(&b->requests);
384
385                         /* dec_ap_pending corresponding to queue_barrier.
386                          * the newest barrier may not have been queued yet,
387                          * in which case w.cb is still NULL. */
388                         if (b->w.cb != NULL)
389                                 dec_ap_pending(mdev);
390
391                         if (b == mdev->newest_tle) {
392                                 /* recycle, but reinit! */
393                                 D_ASSERT(tmp == NULL);
394                                 INIT_LIST_HEAD(&b->requests);
395                                 list_splice(&carry_reads, &b->requests);
396                                 INIT_LIST_HEAD(&b->w.list);
397                                 b->w.cb = NULL;
398                                 b->br_number = net_random();
399                                 b->n_writes = 0;
400
401                                 *pn = b;
402                                 break;
403                         }
404                         *pn = tmp;
405                         kfree(b);
406                 }
407                 b = tmp;
408                 list_splice(&carry_reads, &b->requests);
409         }
410 }
411
412
413 /**
414  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
415  * @mdev:       DRBD device.
416  *
417  * This is called after the connection to the peer was lost. The storage covered
418  * by the requests on the transfer gets marked as our of sync. Called from the
419  * receiver thread and the worker thread.
420  */
421 void tl_clear(struct drbd_conf *mdev)
422 {
423         struct list_head *le, *tle;
424         struct drbd_request *r;
425
426         spin_lock_irq(&mdev->req_lock);
427
428         _tl_restart(mdev, connection_lost_while_pending);
429
430         /* we expect this list to be empty. */
431         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
432
433         /* but just in case, clean it up anyways! */
434         list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
435                 r = list_entry(le, struct drbd_request, tl_requests);
436                 /* It would be nice to complete outside of spinlock.
437                  * But this is easier for now. */
438                 _req_mod(r, connection_lost_while_pending);
439         }
440
441         /* ensure bit indicating barrier is required is clear */
442         clear_bit(CREATE_BARRIER, &mdev->flags);
443
444         memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
445
446         spin_unlock_irq(&mdev->req_lock);
447 }
448
449 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
450 {
451         spin_lock_irq(&mdev->req_lock);
452         _tl_restart(mdev, what);
453         spin_unlock_irq(&mdev->req_lock);
454 }
455
456 /**
457  * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
458  * @mdev:       DRBD device.
459  * @os:         old (current) state.
460  * @ns:         new (wanted) state.
461  */
462 static int cl_wide_st_chg(struct drbd_conf *mdev,
463                           union drbd_state os, union drbd_state ns)
464 {
465         return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
466                  ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
467                   (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
468                   (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
469                   (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
470                 (os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
471                 (os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
472 }
473
474 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
475                       union drbd_state mask, union drbd_state val)
476 {
477         unsigned long flags;
478         union drbd_state os, ns;
479         int rv;
480
481         spin_lock_irqsave(&mdev->req_lock, flags);
482         os = mdev->state;
483         ns.i = (os.i & ~mask.i) | val.i;
484         rv = _drbd_set_state(mdev, ns, f, NULL);
485         ns = mdev->state;
486         spin_unlock_irqrestore(&mdev->req_lock, flags);
487
488         return rv;
489 }
490
491 /**
492  * drbd_force_state() - Impose a change which happens outside our control on our state
493  * @mdev:       DRBD device.
494  * @mask:       mask of state bits to change.
495  * @val:        value of new state bits.
496  */
497 void drbd_force_state(struct drbd_conf *mdev,
498         union drbd_state mask, union drbd_state val)
499 {
500         drbd_change_state(mdev, CS_HARD, mask, val);
501 }
502
/* state checking helpers, defined below */
static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
static int is_valid_state_transition(struct drbd_conf *,
				     union drbd_state, union drbd_state);
static union drbd_state sanitize_state(struct drbd_conf *mdev,
				       union drbd_state os,
				       union drbd_state ns,
				       int *warn_sync_abort);

/* not defined in the part of the file seen so far */
int drbd_send_state_req(struct drbd_conf *,
			union drbd_state, union drbd_state);
510
511 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
512                                     union drbd_state mask, union drbd_state val)
513 {
514         union drbd_state os, ns;
515         unsigned long flags;
516         int rv;
517
518         if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
519                 return SS_CW_SUCCESS;
520
521         if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
522                 return SS_CW_FAILED_BY_PEER;
523
524         rv = 0;
525         spin_lock_irqsave(&mdev->req_lock, flags);
526         os = mdev->state;
527         ns.i = (os.i & ~mask.i) | val.i;
528         ns = sanitize_state(mdev, os, ns, NULL);
529
530         if (!cl_wide_st_chg(mdev, os, ns))
531                 rv = SS_CW_NO_NEED;
532         if (!rv) {
533                 rv = is_valid_state(mdev, ns);
534                 if (rv == SS_SUCCESS) {
535                         rv = is_valid_state_transition(mdev, ns, os);
536                         if (rv == SS_SUCCESS)
537                                 rv = 0; /* cont waiting, otherwise fail. */
538                 }
539         }
540         spin_unlock_irqrestore(&mdev->req_lock, flags);
541
542         return rv;
543 }
544
545 /**
546  * drbd_req_state() - Perform an eventually cluster wide state change
547  * @mdev:       DRBD device.
548  * @mask:       mask of state bits to change.
549  * @val:        value of new state bits.
550  * @f:          flags
551  *
552  * Should not be called directly, use drbd_request_state() or
553  * _drbd_request_state().
554  */
555 static int drbd_req_state(struct drbd_conf *mdev,
556                           union drbd_state mask, union drbd_state val,
557                           enum chg_state_flags f)
558 {
559         struct completion done;
560         unsigned long flags;
561         union drbd_state os, ns;
562         int rv;
563
564         init_completion(&done);
565
566         if (f & CS_SERIALIZE)
567                 mutex_lock(&mdev->state_mutex);
568
569         spin_lock_irqsave(&mdev->req_lock, flags);
570         os = mdev->state;
571         ns.i = (os.i & ~mask.i) | val.i;
572         ns = sanitize_state(mdev, os, ns, NULL);
573
574         if (cl_wide_st_chg(mdev, os, ns)) {
575                 rv = is_valid_state(mdev, ns);
576                 if (rv == SS_SUCCESS)
577                         rv = is_valid_state_transition(mdev, ns, os);
578                 spin_unlock_irqrestore(&mdev->req_lock, flags);
579
580                 if (rv < SS_SUCCESS) {
581                         if (f & CS_VERBOSE)
582                                 print_st_err(mdev, os, ns, rv);
583                         goto abort;
584                 }
585
586                 drbd_state_lock(mdev);
587                 if (!drbd_send_state_req(mdev, mask, val)) {
588                         drbd_state_unlock(mdev);
589                         rv = SS_CW_FAILED_BY_PEER;
590                         if (f & CS_VERBOSE)
591                                 print_st_err(mdev, os, ns, rv);
592                         goto abort;
593                 }
594
595                 wait_event(mdev->state_wait,
596                         (rv = _req_st_cond(mdev, mask, val)));
597
598                 if (rv < SS_SUCCESS) {
599                         drbd_state_unlock(mdev);
600                         if (f & CS_VERBOSE)
601                                 print_st_err(mdev, os, ns, rv);
602                         goto abort;
603                 }
604                 spin_lock_irqsave(&mdev->req_lock, flags);
605                 os = mdev->state;
606                 ns.i = (os.i & ~mask.i) | val.i;
607                 rv = _drbd_set_state(mdev, ns, f, &done);
608                 drbd_state_unlock(mdev);
609         } else {
610                 rv = _drbd_set_state(mdev, ns, f, &done);
611         }
612
613         spin_unlock_irqrestore(&mdev->req_lock, flags);
614
615         if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
616                 D_ASSERT(current != mdev->worker.task);
617                 wait_for_completion(&done);
618         }
619
620 abort:
621         if (f & CS_SERIALIZE)
622                 mutex_unlock(&mdev->state_mutex);
623
624         return rv;
625 }
626
627 /**
628  * _drbd_request_state() - Request a state change (with flags)
629  * @mdev:       DRBD device.
630  * @mask:       mask of state bits to change.
631  * @val:        value of new state bits.
632  * @f:          flags
633  *
634  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
635  * flag, or when logging of failed state change requests is not desired.
636  */
637 int _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
638                         union drbd_state val,   enum chg_state_flags f)
639 {
640         int rv;
641
642         wait_event(mdev->state_wait,
643                    (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
644
645         return rv;
646 }
647
648 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
649 {
650         dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
651             name,
652             drbd_conn_str(ns.conn),
653             drbd_role_str(ns.role),
654             drbd_role_str(ns.peer),
655             drbd_disk_str(ns.disk),
656             drbd_disk_str(ns.pdsk),
657             ns.susp ? 's' : 'r',
658             ns.aftr_isp ? 'a' : '-',
659             ns.peer_isp ? 'p' : '-',
660             ns.user_isp ? 'u' : '-'
661             );
662 }
663
664 void print_st_err(struct drbd_conf *mdev,
665         union drbd_state os, union drbd_state ns, int err)
666 {
667         if (err == SS_IN_TRANSIENT_STATE)
668                 return;
669         dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
670         print_st(mdev, " state", os);
671         print_st(mdev, "wanted", ns);
672 }
673
674
/* Name-to-string helpers so that PSC() can build drbd_<field>_str for
 * every field of union drbd_state it is used with. */
#define drbd_peer_str drbd_role_str
#define drbd_pdsk_str drbd_disk_str

#define drbd_susp_str(A)     ((A) ? "1" : "0")
#define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
#define drbd_peer_isp_str(A) ((A) ? "1" : "0")
#define drbd_user_isp_str(A) ((A) ? "1" : "0")

/* PSC(field): if the field differs between os and ns, append
 * "field( old -> new ) " at pbp and advance pbp. Expects os, ns and pbp
 * to be in scope at the point of use. */
#define PSC(A) \
	({ if (ns.A != os.A) { \
		pbp += sprintf(pbp, #A "( %s -> %s ) ", \
			      drbd_##A##_str(os.A), \
			      drbd_##A##_str(ns.A)); \
	} })
689
690 /**
691  * is_valid_state() - Returns an SS_ error code if ns is not valid
692  * @mdev:       DRBD device.
693  * @ns:         State to consider.
694  */
695 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
696 {
697         /* See drbd_state_sw_errors in drbd_strings.c */
698
699         enum drbd_fencing_p fp;
700         int rv = SS_SUCCESS;
701
702         fp = FP_DONT_CARE;
703         if (get_ldev(mdev)) {
704                 fp = mdev->ldev->dc.fencing;
705                 put_ldev(mdev);
706         }
707
708         if (get_net_conf(mdev)) {
709                 if (!mdev->net_conf->two_primaries &&
710                     ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
711                         rv = SS_TWO_PRIMARIES;
712                 put_net_conf(mdev);
713         }
714
715         if (rv <= 0)
716                 /* already found a reason to abort */;
717         else if (ns.role == R_SECONDARY && mdev->open_cnt)
718                 rv = SS_DEVICE_IN_USE;
719
720         else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
721                 rv = SS_NO_UP_TO_DATE_DISK;
722
723         else if (fp >= FP_RESOURCE &&
724                  ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
725                 rv = SS_PRIMARY_NOP;
726
727         else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
728                 rv = SS_NO_UP_TO_DATE_DISK;
729
730         else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
731                 rv = SS_NO_LOCAL_DISK;
732
733         else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
734                 rv = SS_NO_REMOTE_DISK;
735
736         else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
737                 rv = SS_NO_UP_TO_DATE_DISK;
738
739         else if ((ns.conn == C_CONNECTED ||
740                   ns.conn == C_WF_BITMAP_S ||
741                   ns.conn == C_SYNC_SOURCE ||
742                   ns.conn == C_PAUSED_SYNC_S) &&
743                   ns.disk == D_OUTDATED)
744                 rv = SS_CONNECTED_OUTDATES;
745
746         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
747                  (mdev->sync_conf.verify_alg[0] == 0))
748                 rv = SS_NO_VERIFY_ALG;
749
750         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
751                   mdev->agreed_pro_version < 88)
752                 rv = SS_NOT_SUPPORTED;
753
754         return rv;
755 }
756
757 /**
758  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
759  * @mdev:       DRBD device.
760  * @ns:         new state.
761  * @os:         old state.
762  */
763 static int is_valid_state_transition(struct drbd_conf *mdev,
764                                      union drbd_state ns, union drbd_state os)
765 {
766         int rv = SS_SUCCESS;
767
768         if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
769             os.conn > C_CONNECTED)
770                 rv = SS_RESYNC_RUNNING;
771
772         if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
773                 rv = SS_ALREADY_STANDALONE;
774
775         if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
776                 rv = SS_IS_DISKLESS;
777
778         if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
779                 rv = SS_NO_NET_CONFIG;
780
781         if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
782                 rv = SS_LOWER_THAN_OUTDATED;
783
784         if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
785                 rv = SS_IN_TRANSIENT_STATE;
786
787         if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
788                 rv = SS_IN_TRANSIENT_STATE;
789
790         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
791                 rv = SS_NEED_CONNECTION;
792
793         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
794             ns.conn != os.conn && os.conn > C_CONNECTED)
795                 rv = SS_RESYNC_RUNNING;
796
797         if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
798             os.conn < C_CONNECTED)
799                 rv = SS_NEED_CONNECTION;
800
801         return rv;
802 }
803
804 /**
805  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
806  * @mdev:       DRBD device.
807  * @os:         old state.
808  * @ns:         new state.
809  * @warn_sync_abort:
810  *
811  * When we lose connection, we have to set the state of the peer's disk (pdsk)
812  * to D_UNKNOWN. This rule and many more along those lines are in this function.
813  */
814 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
815                                        union drbd_state ns, int *warn_sync_abort)
816 {
817         enum drbd_fencing_p fp;
818
819         fp = FP_DONT_CARE;
820         if (get_ldev(mdev)) {
821                 fp = mdev->ldev->dc.fencing;
822                 put_ldev(mdev);
823         }
824
825         /* Disallow Network errors to configure a device's network part */
826         if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
827             os.conn <= C_DISCONNECTING)
828                 ns.conn = os.conn;
829
830         /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
831         if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
832             ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
833                 ns.conn = os.conn;
834
835         /* After C_DISCONNECTING only C_STANDALONE may follow */
836         if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
837                 ns.conn = os.conn;
838
839         if (ns.conn < C_CONNECTED) {
840                 ns.peer_isp = 0;
841                 ns.peer = R_UNKNOWN;
842                 if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
843                         ns.pdsk = D_UNKNOWN;
844         }
845
846         /* Clear the aftr_isp when becoming unconfigured */
847         if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
848                 ns.aftr_isp = 0;
849
850         if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
851                 ns.pdsk = D_UNKNOWN;
852
853         /* Abort resync if a disk fails/detaches */
854         if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
855             (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
856                 if (warn_sync_abort)
857                         *warn_sync_abort = 1;
858                 ns.conn = C_CONNECTED;
859         }
860
861         if (ns.conn >= C_CONNECTED &&
862             ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
863              (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
864                 switch (ns.conn) {
865                 case C_WF_BITMAP_T:
866                 case C_PAUSED_SYNC_T:
867                         ns.disk = D_OUTDATED;
868                         break;
869                 case C_CONNECTED:
870                 case C_WF_BITMAP_S:
871                 case C_SYNC_SOURCE:
872                 case C_PAUSED_SYNC_S:
873                         ns.disk = D_UP_TO_DATE;
874                         break;
875                 case C_SYNC_TARGET:
876                         ns.disk = D_INCONSISTENT;
877                         dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
878                         break;
879                 }
880                 if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
881                         dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
882         }
883
884         if (ns.conn >= C_CONNECTED &&
885             (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
886                 switch (ns.conn) {
887                 case C_CONNECTED:
888                 case C_WF_BITMAP_T:
889                 case C_PAUSED_SYNC_T:
890                 case C_SYNC_TARGET:
891                         ns.pdsk = D_UP_TO_DATE;
892                         break;
893                 case C_WF_BITMAP_S:
894                 case C_PAUSED_SYNC_S:
895                         /* remap any consistent state to D_OUTDATED,
896                          * but disallow "upgrade" of not even consistent states.
897                          */
898                         ns.pdsk =
899                                 (D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
900                                 ? os.pdsk : D_OUTDATED;
901                         break;
902                 case C_SYNC_SOURCE:
903                         ns.pdsk = D_INCONSISTENT;
904                         dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
905                         break;
906                 }
907                 if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
908                         dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
909         }
910
911         /* Connection breaks down before we finished "Negotiating" */
912         if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
913             get_ldev_if_state(mdev, D_NEGOTIATING)) {
914                 if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
915                         ns.disk = mdev->new_state_tmp.disk;
916                         ns.pdsk = mdev->new_state_tmp.pdsk;
917                 } else {
918                         dev_alert(DEV, "Connection lost while negotiating, no data!\n");
919                         ns.disk = D_DISKLESS;
920                         ns.pdsk = D_UNKNOWN;
921                 }
922                 put_ldev(mdev);
923         }
924
925         if (fp == FP_STONITH &&
926             (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
927             !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
928                 ns.susp = 1;
929
930         if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
931                 if (ns.conn == C_SYNC_SOURCE)
932                         ns.conn = C_PAUSED_SYNC_S;
933                 if (ns.conn == C_SYNC_TARGET)
934                         ns.conn = C_PAUSED_SYNC_T;
935         } else {
936                 if (ns.conn == C_PAUSED_SYNC_S)
937                         ns.conn = C_SYNC_SOURCE;
938                 if (ns.conn == C_PAUSED_SYNC_T)
939                         ns.conn = C_SYNC_TARGET;
940         }
941
942         return ns;
943 }
944
945 /* helper for __drbd_set_state */
    /* Sets up the online-verify start position for connection state @cs.
     * For C_VERIFY_T only ov_start_sector is set (to an all-ones marker);
     * for any other @cs, ov_start_sector is clamped to the last bit covered by
     * rs_total and ov_position is set to it. */
946 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
947 {
948         if (cs == C_VERIFY_T) {
949                 /* starting online verify from an arbitrary position
950                  * does not fit well into the existing protocol.
951                  * on C_VERIFY_T, we initialize ov_left and friends
952                  * implicitly in receive_DataRequest once the
953                  * first P_OV_REQUEST is received */
954                 mdev->ov_start_sector = ~(sector_t)0;
955         } else {
956                 unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
                    /* NOTE(review): rs_total - 1 wraps around if rs_total is 0;
                     * confirm the caller never gets here with an empty bitmap. */
957                 if (bit >= mdev->rs_total)
958                         mdev->ov_start_sector =
959                                 BM_BIT_TO_SECT(mdev->rs_total - 1);
960                 mdev->ov_position = mdev->ov_start_sector;
961         }
962 }
963
964 /**
965  * __drbd_set_state() - Set a new DRBD state
966  * @mdev:       DRBD device.
967  * @ns:         new state.
968  * @flags:      Flags; CS_HARD skips the validity checks, CS_VERBOSE logs a rejected change
969  * @done:       Optional completion, that will get completed after the after_state_ch() finished
970  *
971  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
     *
     * The requested state is first passed through sanitize_state(). On success
     * the new state is stored in mdev->state, waiters are woken, the non-sleeping
     * follow-up actions are done here, and a work item is queued so that
     * after_state_ch() runs later with the old and new state.
     *
     * Returns SS_NOTHING_TO_DO if the sanitized state equals the current one,
     * a value below SS_SUCCESS if the change was rejected, and otherwise the
     * (non-error) result of the validity checks, initially SS_SUCCESS.
972  */
973 int __drbd_set_state(struct drbd_conf *mdev,
974                     union drbd_state ns, enum chg_state_flags flags,
975                     struct completion *done)
976 {
977         union drbd_state os;
978         int rv = SS_SUCCESS;
979         int warn_sync_abort = 0;
980         struct after_state_chg_work *ascw;
981
982         os = mdev->state;
983
984         ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
985
986         if (ns.i == os.i)
987                 return SS_NOTHING_TO_DO;
988
989         if (!(flags & CS_HARD)) {
990                 /*  pre-state-change checks ; only look at ns  */
991                 /* See drbd_state_sw_errors in drbd_strings.c */
992
993                 rv = is_valid_state(mdev, ns);
994                 if (rv < SS_SUCCESS) {
995                         /* If the old state was illegal as well, then let
996                            this happen...*/
997
                            /* Only when the old state fails with the very same
                             * error does the transition check decide; otherwise
                             * rv keeps the error and the change is rejected below. */
998                         if (is_valid_state(mdev, os) == rv) {
999                                 dev_err(DEV, "Considering state change from bad state. "
1000                                     "Error would be: '%s'\n",
1001                                     drbd_set_st_err_str(rv));
1002                                 print_st(mdev, "old", os);
1003                                 print_st(mdev, "new", ns);
1004                                 rv = is_valid_state_transition(mdev, ns, os);
1005                         }
1006                 } else
1007                         rv = is_valid_state_transition(mdev, ns, os);
1008         }
1009
1010         if (rv < SS_SUCCESS) {
1011                 if (flags & CS_VERBOSE)
1012                         print_st_err(mdev, os, ns, rv);
1013                 return rv;
1014         }
1015
1016         if (warn_sync_abort)
1017                 dev_warn(DEV, "Resync aborted.\n");
1018
             /* Log the state change. PSC() is defined outside this chunk;
              * presumably it appends one changed field to pb via pbp - TODO confirm. */
1019         {
1020                 char *pbp, pb[300];
1021                 pbp = pb;
1022                 *pbp = 0;
1023                 PSC(role);
1024                 PSC(peer);
1025                 PSC(conn);
1026                 PSC(disk);
1027                 PSC(pdsk);
1028                 PSC(susp);
1029                 PSC(aftr_isp);
1030                 PSC(peer_isp);
1031                 PSC(user_isp);
1032                 dev_info(DEV, "%s\n", pb);
1033         }
1034
1035         /* solve the race between becoming unconfigured,
1036          * worker doing the cleanup, and
1037          * admin reconfiguring us:
1038          * on (re)configure, first set CONFIG_PENDING,
1039          * then wait for a potentially exiting worker,
1040          * start the worker, and schedule one no_op.
1041          * then proceed with configuration.
1042          */
1043         if (ns.disk == D_DISKLESS &&
1044             ns.conn == C_STANDALONE &&
1045             ns.role == R_SECONDARY &&
1046             !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1047                 set_bit(DEVICE_DYING, &mdev->flags);
1048
             /* Commit the new state and wake everyone waiting on it. */
1049         mdev->state.i = ns.i;
1050         wake_up(&mdev->misc_wait);
1051         wake_up(&mdev->state_wait);
1052
1053         /*   post-state-change actions   */
1054         if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
1055                 set_bit(STOP_SYNC_TIMER, &mdev->flags);
1056                 mod_timer(&mdev->resync_timer, jiffies);
1057         }
1058
1059         /* aborted verify run. log the last position */
1060         if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1061             ns.conn < C_CONNECTED) {
1062                 mdev->ov_start_sector =
1063                         BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1064                 dev_info(DEV, "Online Verify reached sector %llu\n",
1065                         (unsigned long long)mdev->ov_start_sector);
1066         }
1067
             /* Paused -> running: add the time since rs_mark_time to rs_paused. */
1068         if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1069             (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1070                 dev_info(DEV, "Syncer continues.\n");
1071                 mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
1072                 if (ns.conn == C_SYNC_TARGET) {
1073                         if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
1074                                 mod_timer(&mdev->resync_timer, jiffies);
1075                         /* This if (!test_bit) is only needed for the case
1076                            that a device that has ceased to use its timer,
1077                            i.e. it is already in drbd_resync_finished() gets
1078                            paused and resumed. */
1079                 }
1080         }
1081
             /* Running -> paused: remember when the pause started. */
1082         if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1083             (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1084                 dev_info(DEV, "Resync suspended\n");
1085                 mdev->rs_mark_time = jiffies;
1086                 if (ns.conn == C_PAUSED_SYNC_T)
1087                         set_bit(STOP_SYNC_TIMER, &mdev->flags);
1088         }
1089
             /* Entering online verify from C_CONNECTED: reset the verify counters.
              * Only with agreed_pro_version >= 90 is set_ov_position() used to
              * pick the start position; otherwise ov_start_sector is reset to 0. */
1090         if (os.conn == C_CONNECTED &&
1091             (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1092                 mdev->ov_position = 0;
1093                 mdev->rs_total =
1094                 mdev->rs_mark_left = drbd_bm_bits(mdev);
1095                 if (mdev->agreed_pro_version >= 90)
1096                         set_ov_position(mdev, ns.conn);
1097                 else
1098                         mdev->ov_start_sector = 0;
1099                 mdev->ov_left = mdev->rs_total
1100                               - BM_SECT_TO_BIT(mdev->ov_position);
1101                 mdev->rs_start     =
1102                 mdev->rs_mark_time = jiffies;
1103                 mdev->ov_last_oos_size = 0;
1104                 mdev->ov_last_oos_start = 0;
1105
1106                 if (ns.conn == C_VERIFY_S) {
1107                         dev_info(DEV, "Starting Online Verify from sector %llu\n",
1108                                         (unsigned long long)mdev->ov_position);
1109                         mod_timer(&mdev->resync_timer, jiffies);
1110                 }
1111         }
1112
             /* Recompute the meta-data flags from the state just committed and
              * mark the meta data dirty only if they actually changed. */
1113         if (get_ldev(mdev)) {
1114                 u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1115                                                  MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1116                                                  MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1117
1118                 if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1119                         mdf |= MDF_CRASHED_PRIMARY;
1120                 if (mdev->state.role == R_PRIMARY ||
1121                     (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1122                         mdf |= MDF_PRIMARY_IND;
1123                 if (mdev->state.conn > C_WF_REPORT_PARAMS)
1124                         mdf |= MDF_CONNECTED_IND;
1125                 if (mdev->state.disk > D_INCONSISTENT)
1126                         mdf |= MDF_CONSISTENT;
1127                 if (mdev->state.disk > D_OUTDATED)
1128                         mdf |= MDF_WAS_UP_TO_DATE;
1129                 if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1130                         mdf |= MDF_PEER_OUT_DATED;
1131                 if (mdf != mdev->ldev->md.flags) {
1132                         mdev->ldev->md.flags = mdf;
1133                         drbd_md_mark_dirty(mdev);
1134                 }
1135                 if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1136                         drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1137                 put_ldev(mdev);
1138         }
1139
1140         /* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider to resync */
1141         if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1142             os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1143                 set_bit(CONSIDER_RESYNC, &mdev->flags);
1144
1145         /* Receiver should clean up itself */
1146         if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1147                 drbd_thread_stop_nowait(&mdev->receiver);
1148
1149         /* Now the receiver finished cleaning up itself, it should die */
1150         if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1151                 drbd_thread_stop_nowait(&mdev->receiver);
1152
1153         /* Upon network failure, we need to restart the receiver. */
1154         if (os.conn > C_TEAR_DOWN &&
1155             ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1156                 drbd_thread_restart_nowait(&mdev->receiver);
1157
             /* Queue the follow-up work; GFP_ATOMIC because the caller holds
              * req_lock (see above).
              * NOTE(review): if this allocation fails, after_state_ch() never
              * runs for this change and @done is never completed - confirm that
              * callers waiting on @done cannot block forever. */
1158         ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1159         if (ascw) {
1160                 ascw->os = os;
1161                 ascw->ns = ns;
1162                 ascw->flags = flags;
1163                 ascw->w.cb = w_after_state_ch;
1164                 ascw->done = done;
1165                 drbd_queue_work(&mdev->data.work, &ascw->w);
1166         } else {
1167                 dev_warn(DEV, "Could not kmalloc an ascw\n");
1168         }
1169
1170         return rv;
1171 }
1172
1173 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1174 {
             /* Work callback installed by __drbd_set_state(): @w is embedded in
              * an after_state_chg_work holding the old/new state and flags.
              * Runs after_state_ch(), completes ->done when CS_WAIT_COMPLETE is
              * set, then frees the work item. @unused is ignored. Returns 1. */
1175         struct after_state_chg_work *ascw =
1176                 container_of(w, struct after_state_chg_work, w);
1177         after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1178         if (ascw->flags & CS_WAIT_COMPLETE) {
1179                 D_ASSERT(ascw->done != NULL);
1180                 complete(ascw->done);
1181         }
1182         kfree(ascw);
1183
1184         return 1;
1185 }
1186
1187 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1188 {
             /* Passed to drbd_queue_bitmap_io() together with
              * drbd_bmio_set_n_write when a StartingSync state is entered.
              * A non-zero @rv is treated as failure: request C_CONNECTED
              * again instead of starting the resync. */
1189         if (rv) {
1190                 dev_err(DEV, "Writing the bitmap failed not starting resync.\n");
1191                 _drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1192                 return;
1193         }
1194
             /* Continue according to the current connection state; any state
              * other than the two StartingSync states is left alone. */
1195         switch (mdev->state.conn) {
1196         case C_STARTING_SYNC_T:
1197                 _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1198                 break;
1199         case C_STARTING_SYNC_S:
1200                 drbd_start_resync(mdev, C_SYNC_SOURCE);
1201                 break;
1202         }
1203 }
1204
1205 /**
1206  * after_state_ch() - Perform after state change actions that may sleep
1207  * @mdev:       DRBD device.
1208  * @os:         old state.
1209  * @ns:         new state.
1210  * @flags:      Flags
      *
      * Called from w_after_state_ch() with the old and new state recorded by
      * __drbd_set_state(). Each section below compares @os and @ns and reacts
      * to one particular transition. Ends by calling drbd_md_sync().
1211  */
1212 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1213                            union drbd_state ns, enum chg_state_flags flags)
1214 {
1215         enum drbd_fencing_p fp;
1216
             /* Just became C_CONNECTED: drop the CRASHED_PRIMARY flag and clear
              * bit value 2 in the peer's UI_FLAGS, if peer UUIDs are present. */
1217         if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1218                 clear_bit(CRASHED_PRIMARY, &mdev->flags);
1219                 if (mdev->p_uuid)
1220                         mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1221         }
1222
             /* Fencing policy from the local disk config, FP_DONT_CARE if no
              * reference to ldev can be taken. */
1223         fp = FP_DONT_CARE;
1224         if (get_ldev(mdev)) {
1225                 fp = mdev->ldev->dc.fencing;
1226                 put_ldev(mdev);
1227         }
1228
1229         /* Inform userspace about the change... */
1230         drbd_bcast_state(mdev, ns);
1231
             /* Newly Primary with neither local nor peer disk D_UP_TO_DATE:
              * run the "pri-on-incon-degr" helper. */
1232         if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1233             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1234                 drbd_khelper(mdev, "pri-on-incon-degr");
1235
1236         /* Here we have the actions that are performed after a
1237            state change. This function might sleep */
1238
1239         if (fp == FP_STONITH && ns.susp) {
1240                 /* case1: The outdate peer handler is successful:
1241                  * case2: The connection was established again: */
1242                 if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
1243                     (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
1244                         tl_clear(mdev);
1245                         spin_lock_irq(&mdev->req_lock);
1246                         _drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
1247                         spin_unlock_irq(&mdev->req_lock);
1248                 }
1249         }
1250         /* Do not change the order of the if above and the two below... */
1251         if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1252                 drbd_send_uuids(mdev);
1253                 drbd_send_state(mdev);
1254         }
1255         if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1256                 drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1257
1258         /* Lost contact to peer's copy of the data */
1259         if ((os.pdsk >= D_INCONSISTENT &&
1260              os.pdsk != D_UNKNOWN &&
1261              os.pdsk != D_OUTDATED)
1262         &&  (ns.pdsk < D_INCONSISTENT ||
1263              ns.pdsk == D_UNKNOWN ||
1264              ns.pdsk == D_OUTDATED)) {
1265                 if (get_ldev(mdev)) {
1266                         if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1267                             mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1268                                 drbd_uuid_new_current(mdev);
1269                                 drbd_send_uuids(mdev);
1270                         }
1271                         put_ldev(mdev);
1272                 }
1273         }
1274
1275         if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1276                 if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0)
1277                         drbd_uuid_new_current(mdev);
1278
1279                 /* D_DISKLESS Peer becomes secondary */
1280                 if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1281                         drbd_al_to_on_disk_bm(mdev);
1282                 put_ldev(mdev);
1283         }
1284
1285         /* Last part of the attaching process ... */
1286         if (ns.conn >= C_CONNECTED &&
1287             os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1288                 drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1289                 drbd_send_uuids(mdev);
1290                 drbd_send_state(mdev);
1291         }
1292
1293         /* We want to pause/continue resync, tell peer. */
1294         if (ns.conn >= C_CONNECTED &&
1295              ((os.aftr_isp != ns.aftr_isp) ||
1296               (os.user_isp != ns.user_isp)))
1297                 drbd_send_state(mdev);
1298
1299         /* In case one of the isp bits got set, suspend other devices. */
1300         if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1301             (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1302                 suspend_other_sg(mdev);
1303
1304         /* Make sure the peer gets informed about eventual state
1305            changes (ISP bits) while we were in WFReportParams. */
1306         if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1307                 drbd_send_state(mdev);
1308
1309         /* We are in the process of starting a full sync... */
1310         if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1311             (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1312                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1313
1314         /* We are invalidating our self... */
1315         if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1316             os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1317                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1318
             /* Local disk just failed: read the configured on_io_error policy,
              * cancel resync bookkeeping, wait for all local references to be
              * dropped, then force the disk state to D_DISKLESS (CS_HARD). */
1319         if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1320                 enum drbd_io_error_p eh;
1321
1322                 eh = EP_PASS_ON;
1323                 if (get_ldev_if_state(mdev, D_FAILED)) {
1324                         eh = mdev->ldev->dc.on_io_error;
1325                         put_ldev(mdev);
1326                 }
1327
1328                 drbd_rs_cancel_all(mdev);
1329                 /* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1330                    and it is D_DISKLESS here, local_cnt can only go down, it can
1331                    not increase... It will reach zero */
1332                 wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1333                 mdev->rs_total = 0;
1334                 mdev->rs_failed = 0;
1335                 atomic_set(&mdev->rs_pending_cnt, 0);
1336
1337                 spin_lock_irq(&mdev->req_lock);
1338                 _drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1339                 spin_unlock_irq(&mdev->req_lock);
1340
1341                 if (eh == EP_CALL_HELPER)
1342                         drbd_khelper(mdev, "local-io-error");
1343         }
1344
             /* Became D_DISKLESS: tell the peer if we came from D_FAILED, wait
              * until no local references remain, then tear down the resync and
              * activity-log caches and the backing device. */
1345         if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1346
1347                 if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
1348                         if (drbd_send_state(mdev))
1349                                 dev_warn(DEV, "Notified peer that my disk is broken.\n");
1350                         else
1351                                 dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1352                 }
1353
1354                 wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1355                 lc_destroy(mdev->resync);
1356                 mdev->resync = NULL;
1357                 lc_destroy(mdev->act_log);
1358                 mdev->act_log = NULL;
1359                 __no_warn(local,
1360                         drbd_free_bc(mdev->ldev);
1361                         mdev->ldev = NULL;);
1362
                     /* NOTE(review): unlike resync/act_log/ldev above, md_io_tmpp
                      * is freed but not reset to NULL here - confirm it is cleared
                      * or reallocated before this path can run again, otherwise
                      * the page would be freed twice. */
1363                 if (mdev->md_io_tmpp)
1364                         __free_page(mdev->md_io_tmpp);
1365         }
1366
1367         /* Disks got bigger while they were detached */
1368         if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1369             test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1370                 if (ns.conn == C_CONNECTED)
1371                         resync_after_online_grow(mdev);
1372         }
1373
1374         /* A resync finished or aborted, wake paused devices... */
1375         if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1376             (os.peer_isp && !ns.peer_isp) ||
1377             (os.user_isp && !ns.user_isp))
1378                 resume_next_sg(mdev);
1379
1380         /* Upon network connection, we need to start the receiver */
1381         if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1382                 drbd_thread_start(&mdev->receiver);
1383
1384         /* Terminate worker thread if we are unconfigured - it will be
1385            restarted as needed... */
1386         if (ns.disk == D_DISKLESS &&
1387             ns.conn == C_STANDALONE &&
1388             ns.role == R_SECONDARY) {
1389                 if (os.aftr_isp != ns.aftr_isp)
1390                         resume_next_sg(mdev);
1391                 /* set in __drbd_set_state, unless CONFIG_PENDING was set */
1392                 if (test_bit(DEVICE_DYING, &mdev->flags))
1393                         drbd_thread_stop_nowait(&mdev->worker);
1394         }
1395
1396         drbd_md_sync(mdev);
1397 }
1398
1399
1400 static int drbd_thread_setup(void *arg)
1401 {
             /* Thread entry point handed to kthread_create() by
              * drbd_thread_start(); @arg is the struct drbd_thread. Runs
              * thi->function, loops back to it while t_state is Restarting,
              * and otherwise marks the thread as gone, signals ->stop and
              * drops the module reference. Returns the function's last
              * return value. */
1402         struct drbd_thread *thi = (struct drbd_thread *) arg;
1403         struct drbd_conf *mdev = thi->mdev;
1404         unsigned long flags;
1405         int retval;
1406
1407 restart:
1408         retval = thi->function(thi);
1409
1410         spin_lock_irqsave(&thi->t_lock, flags);
1411
1412         /* if the receiver has been "Exiting", the last thing it did
1413          * was set the conn state to "StandAlone",
1414          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1415          * and receiver thread will be "started".
1416          * drbd_thread_start needs to set "Restarting" in that case.
1417          * t_state check and assignment needs to be within the same spinlock,
1418          * so either thread_start sees Exiting, and can remap to Restarting,
1419          * or thread_start sees None, and can proceed as normal.
1420          */
1421
1422         if (thi->t_state == Restarting) {
1423                 dev_info(DEV, "Restarting %s\n", current->comm);
1424                 thi->t_state = Running;
1425                 spin_unlock_irqrestore(&thi->t_lock, flags);
1426                 goto restart;
1427         }
1428
             /* Not restarting: publish "no task, state None" and complete
              * ->stop while still holding t_lock. */
1429         thi->task = NULL;
1430         thi->t_state = None;
1431         smp_mb();
1432         complete(&thi->stop);
1433         spin_unlock_irqrestore(&thi->t_lock, flags);
1434
1435         dev_info(DEV, "Terminating %s\n", current->comm);
1436
1437         /* Release mod reference taken when thread was started */
1438         module_put(THIS_MODULE);
1439         return retval;
1440 }
1441
1442 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1443                       int (*func) (struct drbd_thread *))
1444 {
             /* Prepare @thi for a later start: initialise its lock, mark it as
              * having no task and state None, and record the thread function
              * @func and the owning device @mdev. No thread is created here. */
1445         spin_lock_init(&thi->t_lock);
1446         thi->task    = NULL;
1447         thi->t_state = None;
1448         thi->function = func;
1449         thi->mdev = mdev;
1450 }
1451
1452 int drbd_thread_start(struct drbd_thread *thi)
1453 {
             /* Start the thread described by @thi, depending on its t_state:
              *   None       - take a module reference, create and wake a kthread
              *                running drbd_thread_setup();
              *   Exiting    - flip to Restarting so the exiting thread loops again;
              *   otherwise  - nothing to do.
              * Returns FALSE if the module reference or the kthread could not be
              * obtained, TRUE in all other cases. */
1454         struct drbd_conf *mdev = thi->mdev;
1455         struct task_struct *nt;
1456         unsigned long flags;
1457
             /* Name used in log messages and in the kthread name. */
1458         const char *me =
1459                 thi == &mdev->receiver ? "receiver" :
1460                 thi == &mdev->asender  ? "asender"  :
1461                 thi == &mdev->worker   ? "worker"   : "NONSENSE";
1462
1463         /* is used from state engine doing drbd_thread_stop_nowait,
1464          * while holding the req lock irqsave */
1465         spin_lock_irqsave(&thi->t_lock, flags);
1466
1467         switch (thi->t_state) {
1468         case None:
1469                 dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1470                                 me, current->comm, current->pid);
1471
1472                 /* Get ref on module for thread - this is released when thread exits */
1473                 if (!try_module_get(THIS_MODULE)) {
1474                         dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1475                         spin_unlock_irqrestore(&thi->t_lock, flags);
1476                         return FALSE;
1477                 }
1478
1479                 init_completion(&thi->stop);
1480                 D_ASSERT(thi->task == NULL);
1481                 thi->reset_cpu_mask = 1;
1482                 thi->t_state = Running;
1483                 spin_unlock_irqrestore(&thi->t_lock, flags);
1484                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1485
                     /* t_lock is dropped around kthread_create(). */
1486                 nt = kthread_create(drbd_thread_setup, (void *) thi,
1487                                     "drbd%d_%s", mdev_to_minor(mdev), me);
1488
                     /* NOTE(review): on this failure path t_state was already set
                      * to Running above and is not reset - confirm that a later
                      * start/stop copes with Running and task == NULL. */
1489                 if (IS_ERR(nt)) {
1490                         dev_err(DEV, "Couldn't start thread\n");
1491
1492                         module_put(THIS_MODULE);
1493                         return FALSE;
1494                 }
1495                 spin_lock_irqsave(&thi->t_lock, flags);
1496                 thi->task = nt;
1497                 thi->t_state = Running;
1498                 spin_unlock_irqrestore(&thi->t_lock, flags);
1499                 wake_up_process(nt);
1500                 break;
1501         case Exiting:
1502                 thi->t_state = Restarting;
1503                 dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1504                                 me, current->comm, current->pid);
1505                 /* fall through */
1506         case Running:
1507         case Restarting:
1508         default:
1509                 spin_unlock_irqrestore(&thi->t_lock, flags);
1510                 break;
1511         }
1512
1513         return TRUE;
1514 }
1515
1516
1517 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1518 {
             /* Ask the thread @thi to stop, or to restart if @restart is
              * non-zero. If @wait is non-zero, block until ->stop is completed.
              * A thread in state None is simply started when @restart is set. */
1519         unsigned long flags;
1520
             /* Target thread state for this request. */
1521         enum drbd_thread_state ns = restart ? Restarting : Exiting;
1522
1523         /* may be called from state engine, holding the req lock irqsave */
1524         spin_lock_irqsave(&thi->t_lock, flags);
1525
1526         if (thi->t_state == None) {
1527                 spin_unlock_irqrestore(&thi->t_lock, flags);
1528                 if (restart)
1529                         drbd_thread_start(thi);
1530                 return;
1531         }
1532
             /* Only act if the thread is not already in the requested state.
              * Without a task there is nobody to signal, so return without
              * waiting. The thread is signalled unless it is the caller itself. */
1533         if (thi->t_state != ns) {
1534                 if (thi->task == NULL) {
1535                         spin_unlock_irqrestore(&thi->t_lock, flags);
1536                         return;
1537                 }
1538
1539                 thi->t_state = ns;
1540                 smp_mb();
1541                 init_completion(&thi->stop);
1542                 if (thi->task != current)
1543                         force_sig(DRBD_SIGKILL, thi->task);
1544
1545         }
1546
1547         spin_unlock_irqrestore(&thi->t_lock, flags);
1548
1549         if (wait)
1550                 wait_for_completion(&thi->stop);
1551 }
1552
1553 #ifdef CONFIG_SMP
1554 /**
1555  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1556  * @mdev:       DRBD device.
1557  *
1558  * Forces all threads of a device onto the same CPU. This is beneficial for
1559  * DRBD's performance. May be overwritten by user's configuration.
1560  */
1561 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1562 {
1563         int ord, cpu;
1564
1565         /* user override. */
1566         if (cpumask_weight(mdev->cpu_mask))
1567                 return;
1568
1569         ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1570         for_each_online_cpu(cpu) {
1571                 if (ord-- == 0) {
1572                         cpumask_set_cpu(cpu, mdev->cpu_mask);
1573                         return;
1574                 }
1575         }
1576         /* should not be reached */
1577         cpumask_setall(mdev->cpu_mask);
1578 }
1579
1580 /**
1581  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1582  * @mdev:       DRBD device.
1583  *
1584  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
1585  * prematurely.
1586  */
1587 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1588 {
1589         struct task_struct *p = current;
1590         struct drbd_thread *thi =
1591                 p == mdev->asender.task  ? &mdev->asender  :
1592                 p == mdev->receiver.task ? &mdev->receiver :
1593                 p == mdev->worker.task   ? &mdev->worker   :
1594                 NULL;
1595         ERR_IF(thi == NULL)
1596                 return;
1597         if (!thi->reset_cpu_mask)
1598                 return;
1599         thi->reset_cpu_mask = 0;
1600         set_cpus_allowed_ptr(p, mdev->cpu_mask);
1601 }
1602 #endif
1603
1604 /* the appropriate socket mutex must be held already */
1605 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1606                           enum drbd_packets cmd, struct p_header *h,
1607                           size_t size, unsigned msg_flags)
1608 {
1609         int sent, ok;
1610
1611         ERR_IF(!h) return FALSE;
1612         ERR_IF(!size) return FALSE;
1613
1614         h->magic   = BE_DRBD_MAGIC;
1615         h->command = cpu_to_be16(cmd);
1616         h->length  = cpu_to_be16(size-sizeof(struct p_header));
1617
1618         sent = drbd_send(mdev, sock, h, size, msg_flags);
1619
1620         ok = (sent == size);
1621         if (!ok)
1622                 dev_err(DEV, "short sent %s size=%d sent=%d\n",
1623                     cmdname(cmd), (int)size, sent);
1624         return ok;
1625 }
1626
1627 /* don't pass the socket. we may only look at it
1628  * when we hold the appropriate socket mutex.
1629  */
1630 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1631                   enum drbd_packets cmd, struct p_header *h, size_t size)
1632 {
1633         int ok = 0;
1634         struct socket *sock;
1635
1636         if (use_data_socket) {
1637                 mutex_lock(&mdev->data.mutex);
1638                 sock = mdev->data.socket;
1639         } else {
1640                 mutex_lock(&mdev->meta.mutex);
1641                 sock = mdev->meta.socket;
1642         }
1643
1644         /* drbd_disconnect() could have called drbd_free_sock()
1645          * while we were waiting in down()... */
1646         if (likely(sock != NULL))
1647                 ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1648
1649         if (use_data_socket)
1650                 mutex_unlock(&mdev->data.mutex);
1651         else
1652                 mutex_unlock(&mdev->meta.mutex);
1653         return ok;
1654 }
1655
1656 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1657                    size_t size)
1658 {
1659         struct p_header h;
1660         int ok;
1661
1662         h.magic   = BE_DRBD_MAGIC;
1663         h.command = cpu_to_be16(cmd);
1664         h.length  = cpu_to_be16(size);
1665
1666         if (!drbd_get_data_sock(mdev))
1667                 return 0;
1668
1669         ok = (sizeof(h) ==
1670                 drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1671         ok = ok && (size ==
1672                 drbd_send(mdev, mdev->data.socket, data, size, 0));
1673
1674         drbd_put_data_sock(mdev);
1675
1676         return ok;
1677 }
1678
1679 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1680 {
1681         struct p_rs_param_89 *p;
1682         struct socket *sock;
1683         int size, rv;
1684         const int apv = mdev->agreed_pro_version;
1685
1686         size = apv <= 87 ? sizeof(struct p_rs_param)
1687                 : apv == 88 ? sizeof(struct p_rs_param)
1688                         + strlen(mdev->sync_conf.verify_alg) + 1
1689                 : /* 89 */    sizeof(struct p_rs_param_89);
1690
1691         /* used from admin command context and receiver/worker context.
1692          * to avoid kmalloc, grab the socket right here,
1693          * then use the pre-allocated sbuf there */
1694         mutex_lock(&mdev->data.mutex);
1695         sock = mdev->data.socket;
1696
1697         if (likely(sock != NULL)) {
1698                 enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1699
1700                 p = &mdev->data.sbuf.rs_param_89;
1701
1702                 /* initialize verify_alg and csums_alg */
1703                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1704
1705                 p->rate = cpu_to_be32(sc->rate);
1706
1707                 if (apv >= 88)
1708                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1709                 if (apv >= 89)
1710                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1711
1712                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1713         } else
1714                 rv = 0; /* not ok */
1715
1716         mutex_unlock(&mdev->data.mutex);
1717
1718         return rv;
1719 }
1720
1721 int drbd_send_protocol(struct drbd_conf *mdev)
1722 {
1723         struct p_protocol *p;
1724         int size, cf, rv;
1725
1726         size = sizeof(struct p_protocol);
1727
1728         if (mdev->agreed_pro_version >= 87)
1729                 size += strlen(mdev->net_conf->integrity_alg) + 1;
1730
1731         /* we must not recurse into our own queue,
1732          * as that is blocked during handshake */
1733         p = kmalloc(size, GFP_NOIO);
1734         if (p == NULL)
1735                 return 0;
1736
1737         p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1738         p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1739         p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1740         p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1741         p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1742
1743         cf = 0;
1744         if (mdev->net_conf->want_lose)
1745                 cf |= CF_WANT_LOSE;
1746         if (mdev->net_conf->dry_run) {
1747                 if (mdev->agreed_pro_version >= 92)
1748                         cf |= CF_DRY_RUN;
1749                 else {
1750                         dev_err(DEV, "--dry-run is not supported by peer");
1751                         kfree(p);
1752                         return 0;
1753                 }
1754         }
1755         p->conn_flags    = cpu_to_be32(cf);
1756
1757         if (mdev->agreed_pro_version >= 87)
1758                 strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1759
1760         rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1761                            (struct p_header *)p, size);
1762         kfree(p);
1763         return rv;
1764 }
1765
1766 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1767 {
1768         struct p_uuids p;
1769         int i;
1770
1771         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1772                 return 1;
1773
1774         for (i = UI_CURRENT; i < UI_SIZE; i++)
1775                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1776
1777         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1778         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1779         uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1780         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1781         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1782         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1783
1784         put_ldev(mdev);
1785
1786         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1787                              (struct p_header *)&p, sizeof(p));
1788 }
1789
1790 int drbd_send_uuids(struct drbd_conf *mdev)
1791 {
1792         return _drbd_send_uuids(mdev, 0);
1793 }
1794
1795 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1796 {
1797         return _drbd_send_uuids(mdev, 8);
1798 }
1799
1800
1801 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1802 {
1803         struct p_rs_uuid p;
1804
1805         p.uuid = cpu_to_be64(val);
1806
1807         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1808                              (struct p_header *)&p, sizeof(p));
1809 }
1810
1811 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1812 {
1813         struct p_sizes p;
1814         sector_t d_size, u_size;
1815         int q_order_type;
1816         int ok;
1817
1818         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1819                 D_ASSERT(mdev->ldev->backing_bdev);
1820                 d_size = drbd_get_max_capacity(mdev->ldev);
1821                 u_size = mdev->ldev->dc.disk_size;
1822                 q_order_type = drbd_queue_order_type(mdev);
1823                 put_ldev(mdev);
1824         } else {
1825                 d_size = 0;
1826                 u_size = 0;
1827                 q_order_type = QUEUE_ORDERED_NONE;
1828         }
1829
1830         p.d_size = cpu_to_be64(d_size);
1831         p.u_size = cpu_to_be64(u_size);
1832         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1833         p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1834         p.queue_order_type = cpu_to_be16(q_order_type);
1835         p.dds_flags = cpu_to_be16(flags);
1836
1837         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1838                            (struct p_header *)&p, sizeof(p));
1839         return ok;
1840 }
1841
1842 /**
1843  * drbd_send_state() - Sends the drbd state to the peer
1844  * @mdev:       DRBD device.
1845  */
1846 int drbd_send_state(struct drbd_conf *mdev)
1847 {
1848         struct socket *sock;
1849         struct p_state p;
1850         int ok = 0;
1851
1852         /* Grab state lock so we wont send state if we're in the middle
1853          * of a cluster wide state change on another thread */
1854         drbd_state_lock(mdev);
1855
1856         mutex_lock(&mdev->data.mutex);
1857
1858         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1859         sock = mdev->data.socket;
1860
1861         if (likely(sock != NULL)) {
1862                 ok = _drbd_send_cmd(mdev, sock, P_STATE,
1863                                     (struct p_header *)&p, sizeof(p), 0);
1864         }
1865
1866         mutex_unlock(&mdev->data.mutex);
1867
1868         drbd_state_unlock(mdev);
1869         return ok;
1870 }
1871
1872 int drbd_send_state_req(struct drbd_conf *mdev,
1873         union drbd_state mask, union drbd_state val)
1874 {
1875         struct p_req_state p;
1876
1877         p.mask    = cpu_to_be32(mask.i);
1878         p.val     = cpu_to_be32(val.i);
1879
1880         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1881                              (struct p_header *)&p, sizeof(p));
1882 }
1883
1884 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1885 {
1886         struct p_req_state_reply p;
1887
1888         p.retcode    = cpu_to_be32(retcode);
1889
1890         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1891                              (struct p_header *)&p, sizeof(p));
1892 }
1893
1894 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1895         struct p_compressed_bm *p,
1896         struct bm_xfer_ctx *c)
1897 {
1898         struct bitstream bs;
1899         unsigned long plain_bits;
1900         unsigned long tmp;
1901         unsigned long rl;
1902         unsigned len;
1903         unsigned toggle;
1904         int bits;
1905
1906         /* may we use this feature? */
1907         if ((mdev->sync_conf.use_rle == 0) ||
1908                 (mdev->agreed_pro_version < 90))
1909                         return 0;
1910
1911         if (c->bit_offset >= c->bm_bits)
1912                 return 0; /* nothing to do. */
1913
1914         /* use at most thus many bytes */
1915         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1916         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1917         /* plain bits covered in this code string */
1918         plain_bits = 0;
1919
1920         /* p->encoding & 0x80 stores whether the first run length is set.
1921          * bit offset is implicit.
1922          * start with toggle == 2 to be able to tell the first iteration */
1923         toggle = 2;
1924
1925         /* see how much plain bits we can stuff into one packet
1926          * using RLE and VLI. */
1927         do {
1928                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1929                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1930                 if (tmp == -1UL)
1931                         tmp = c->bm_bits;
1932                 rl = tmp - c->bit_offset;
1933
1934                 if (toggle == 2) { /* first iteration */
1935                         if (rl == 0) {
1936                                 /* the first checked bit was set,
1937                                  * store start value, */
1938                                 DCBP_set_start(p, 1);
1939                                 /* but skip encoding of zero run length */
1940                                 toggle = !toggle;
1941                                 continue;
1942                         }
1943                         DCBP_set_start(p, 0);
1944                 }
1945
1946                 /* paranoia: catch zero runlength.
1947                  * can only happen if bitmap is modified while we scan it. */
1948                 if (rl == 0) {
1949                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1950                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1951                         return -1;
1952                 }
1953
1954                 bits = vli_encode_bits(&bs, rl);
1955                 if (bits == -ENOBUFS) /* buffer full */
1956                         break;
1957                 if (bits <= 0) {
1958                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1959                         return 0;
1960                 }
1961
1962                 toggle = !toggle;
1963                 plain_bits += rl;
1964                 c->bit_offset = tmp;
1965         } while (c->bit_offset < c->bm_bits);
1966
1967         len = bs.cur.b - p->code + !!bs.cur.bit;
1968
1969         if (plain_bits < (len << 3)) {
1970                 /* incompressible with this method.
1971                  * we need to rewind both word and bit position. */
1972                 c->bit_offset -= plain_bits;
1973                 bm_xfer_ctx_bit_to_word_offset(c);
1974                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1975                 return 0;
1976         }
1977
1978         /* RLE + VLI was able to compress it just fine.
1979          * update c->word_offset. */
1980         bm_xfer_ctx_bit_to_word_offset(c);
1981
1982         /* store pad_bits */
1983         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1984
1985         return len;
1986 }
1987
1988 enum { OK, FAILED, DONE }
1989 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1990         struct p_header *h, struct bm_xfer_ctx *c)
1991 {
1992         struct p_compressed_bm *p = (void*)h;
1993         unsigned long num_words;
1994         int len;
1995         int ok;
1996
1997         len = fill_bitmap_rle_bits(mdev, p, c);
1998
1999         if (len < 0)
2000                 return FAILED;
2001
2002         if (len) {
2003                 DCBP_set_code(p, RLE_VLI_Bits);
2004                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2005                         sizeof(*p) + len, 0);
2006
2007                 c->packets[0]++;
2008                 c->bytes[0] += sizeof(*p) + len;
2009
2010                 if (c->bit_offset >= c->bm_bits)
2011                         len = 0; /* DONE */
2012         } else {
2013                 /* was not compressible.
2014                  * send a buffer full of plain text bits instead. */
2015                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2016                 len = num_words * sizeof(long);
2017                 if (len)
2018                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2019                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2020                                    h, sizeof(struct p_header) + len, 0);
2021                 c->word_offset += num_words;
2022                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2023
2024                 c->packets[1]++;
2025                 c->bytes[1] += sizeof(struct p_header) + len;
2026
2027                 if (c->bit_offset > c->bm_bits)
2028                         c->bit_offset = c->bm_bits;
2029         }
2030         ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
2031
2032         if (ok == DONE)
2033                 INFO_bm_xfer_stats(mdev, "send", c);
2034         return ok;
2035 }
2036
2037 /* See the comment at receive_bitmap() */
2038 int _drbd_send_bitmap(struct drbd_conf *mdev)
2039 {
2040         struct bm_xfer_ctx c;
2041         struct p_header *p;
2042         int ret;
2043
2044         ERR_IF(!mdev->bitmap) return FALSE;
2045
2046         /* maybe we should use some per thread scratch page,
2047          * and allocate that during initial device creation? */
2048         p = (struct p_header *) __get_free_page(GFP_NOIO);
2049         if (!p) {
2050                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2051                 return FALSE;
2052         }
2053
2054         if (get_ldev(mdev)) {
2055                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2056                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2057                         drbd_bm_set_all(mdev);
2058                         if (drbd_bm_write(mdev)) {
2059                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
2060                                  * but otherwise process as per normal - need to tell other
2061                                  * side that a full resync is required! */
2062                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
2063                         } else {
2064                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2065                                 drbd_md_sync(mdev);
2066                         }
2067                 }
2068                 put_ldev(mdev);
2069         }
2070
2071         c = (struct bm_xfer_ctx) {
2072                 .bm_bits = drbd_bm_bits(mdev),
2073                 .bm_words = drbd_bm_words(mdev),
2074         };
2075
2076         do {
2077                 ret = send_bitmap_rle_or_plain(mdev, p, &c);
2078         } while (ret == OK);
2079
2080         free_page((unsigned long) p);
2081         return (ret == DONE);
2082 }
2083
/*
 * Take the data socket and send the bitmap over it.
 * Returns -1 if the data socket could not be obtained, 0 if the bitmap was
 * sent, and 1 if sending failed.
 */
int drbd_send_bitmap(struct drbd_conf *mdev)
{
        int sent_ok;

        if (!drbd_get_data_sock(mdev))
                return -1;

        sent_ok = _drbd_send_bitmap(mdev);
        drbd_put_data_sock(mdev);

        return sent_ok ? 0 : 1;
}
2094
2095 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2096 {
2097         int ok;
2098         struct p_barrier_ack p;
2099
2100         p.barrier  = barrier_nr;
2101         p.set_size = cpu_to_be32(set_size);
2102
2103         if (mdev->state.conn < C_CONNECTED)
2104                 return FALSE;
2105         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2106                         (struct p_header *)&p, sizeof(p));
2107         return ok;
2108 }
2109
2110 /**
2111  * _drbd_send_ack() - Sends an ack packet
2112  * @mdev:       DRBD device.
2113  * @cmd:        Packet command code.
2114  * @sector:     sector, needs to be in big endian byte order
2115  * @blksize:    size in byte, needs to be in big endian byte order
2116  * @block_id:   Id, big endian byte order
2117  */
2118 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2119                           u64 sector,
2120                           u32 blksize,
2121                           u64 block_id)
2122 {
2123         int ok;
2124         struct p_block_ack p;
2125
2126         p.sector   = sector;
2127         p.block_id = block_id;
2128         p.blksize  = blksize;
2129         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2130
2131         if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2132                 return FALSE;
2133         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2134                                 (struct p_header *)&p, sizeof(p));
2135         return ok;
2136 }
2137
2138 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2139                      struct p_data *dp)
2140 {
2141         const int header_size = sizeof(struct p_data)
2142                               - sizeof(struct p_header);
2143         int data_size  = ((struct p_header *)dp)->length - header_size;
2144
2145         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2146                               dp->block_id);
2147 }
2148
2149 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2150                      struct p_block_req *rp)
2151 {
2152         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2153 }
2154
2155 /**
2156  * drbd_send_ack() - Sends an ack packet
2157  * @mdev:       DRBD device.
2158  * @cmd:        Packet command code.
2159  * @e:          Epoch entry.
2160  */
2161 int drbd_send_ack(struct drbd_conf *mdev,
2162         enum drbd_packets cmd, struct drbd_epoch_entry *e)
2163 {
2164         return _drbd_send_ack(mdev, cmd,
2165                               cpu_to_be64(e->sector),
2166                               cpu_to_be32(e->size),
2167                               e->block_id);
2168 }
2169
2170 /* This function misuses the block_id field to signal if the blocks
2171  * are is sync or not. */
2172 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2173                      sector_t sector, int blksize, u64 block_id)
2174 {
2175         return _drbd_send_ack(mdev, cmd,
2176                               cpu_to_be64(sector),
2177                               cpu_to_be32(blksize),
2178                               cpu_to_be64(block_id));
2179 }
2180
2181 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2182                        sector_t sector, int size, u64 block_id)
2183 {
2184         int ok;
2185         struct p_block_req p;
2186
2187         p.sector   = cpu_to_be64(sector);
2188         p.block_id = block_id;
2189         p.blksize  = cpu_to_be32(size);
2190
2191         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2192                                 (struct p_header *)&p, sizeof(p));
2193         return ok;
2194 }
2195
2196 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2197                             sector_t sector, int size,
2198                             void *digest, int digest_size,
2199                             enum drbd_packets cmd)
2200 {
2201         int ok;
2202         struct p_block_req p;
2203
2204         p.sector   = cpu_to_be64(sector);
2205         p.block_id = BE_DRBD_MAGIC + 0xbeef;
2206         p.blksize  = cpu_to_be32(size);
2207
2208         p.head.magic   = BE_DRBD_MAGIC;
2209         p.head.command = cpu_to_be16(cmd);
2210         p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2211
2212         mutex_lock(&mdev->data.mutex);
2213
2214         ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2215         ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2216
2217         mutex_unlock(&mdev->data.mutex);
2218
2219         return ok;
2220 }
2221
2222 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2223 {
2224         int ok;
2225         struct p_block_req p;
2226
2227         p.sector   = cpu_to_be64(sector);
2228         p.block_id = BE_DRBD_MAGIC + 0xbabe;
2229         p.blksize  = cpu_to_be32(size);
2230
2231         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2232                            (struct p_header *)&p, sizeof(p));
2233         return ok;
2234 }
2235
2236 /* called on sndtimeo
2237  * returns FALSE if we should retry,
2238  * TRUE if we think connection is dead
2239  */
2240 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2241 {
2242         int drop_it;
2243         /* long elapsed = (long)(jiffies - mdev->last_received); */
2244
2245         drop_it =   mdev->meta.socket == sock
2246                 || !mdev->asender.task
2247                 || get_t_state(&mdev->asender) != Running
2248                 || mdev->state.conn < C_CONNECTED;
2249
2250         if (drop_it)
2251                 return TRUE;
2252
2253         drop_it = !--mdev->ko_count;
2254         if (!drop_it) {
2255                 dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2256                        current->comm, current->pid, mdev->ko_count);
2257                 request_ping(mdev);
2258         }
2259
2260         return drop_it; /* && (mdev->state == R_PRIMARY) */;
2261 }
2262
2263 /* The idea of sendpage seems to be to put some kind of reference
2264  * to the page into the skb, and to hand it over to the NIC. In
2265  * this process get_page() gets called.
2266  *
2267  * As soon as the page was really sent over the network put_page()
2268  * gets called by some part of the network layer. [ NIC driver? ]
2269  *
2270  * [ get_page() / put_page() increment/decrement the count. If count
2271  *   reaches 0 the page will be freed. ]
2272  *
2273  * This works nicely with pages from FSs.
2274  * But this means that in protocol A we might signal IO completion too early!
2275  *
2276  * In order not to corrupt data during a resync we must make sure
2277  * that we do not reuse our own buffer pages (EEs) too early, therefore
2278  * we have the net_ee list.
2279  *
2280  * XFS seems to have problems, still, it submits pages with page_count == 0!
2281  * As a workaround, we disable sendpage on pages
2282  * with page_count == 0 or PageSlab.
2283  */
2284 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2285                    int offset, size_t size, unsigned msg_flags)
2286 {
2287         int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2288         kunmap(page);
2289         if (sent == size)
2290                 mdev->send_cnt += size>>9;
2291         return sent == size;
2292 }
2293
2294 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2295                     int offset, size_t size, unsigned msg_flags)
2296 {
2297         mm_segment_t oldfs = get_fs();
2298         int sent, ok;
2299         int len = size;
2300
2301         /* e.g. XFS meta- & log-data is in slab pages, which have a
2302          * page_count of 0 and/or have PageSlab() set.
2303          * we cannot use send_page for those, as that does get_page();
2304          * put_page(); and would cause either a VM_BUG directly, or
2305          * __page_cache_release a page that would actually still be referenced
2306          * by someone, leading to some obscure delayed Oops somewhere else. */
2307         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2308                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2309
2310         msg_flags |= MSG_NOSIGNAL;
2311         drbd_update_congested(mdev);
2312         set_fs(KERNEL_DS);
2313         do {
2314                 sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2315                                                         offset, len,
2316                                                         msg_flags);
2317                 if (sent == -EAGAIN) {
2318                         if (we_should_drop_the_connection(mdev,
2319                                                           mdev->data.socket))
2320                                 break;
2321                         else
2322                                 continue;
2323                 }
2324                 if (sent <= 0) {
2325                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2326                              __func__, (int)size, len, sent);
2327                         break;
2328                 }
2329                 len    -= sent;
2330                 offset += sent;
2331         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2332         set_fs(oldfs);
2333         clear_bit(NET_CONGESTED, &mdev->flags);
2334
2335         ok = (len == 0);
2336         if (likely(ok))
2337                 mdev->send_cnt += size>>9;
2338         return ok;
2339 }
2340
2341 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2342 {
2343         struct bio_vec *bvec;
2344         int i;
2345         /* hint all but last page with MSG_MORE */
2346         __bio_for_each_segment(bvec, bio, i, 0) {
2347                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
2348                                      bvec->bv_offset, bvec->bv_len,
2349                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2350                         return 0;
2351         }
2352         return 1;
2353 }
2354
2355 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2356 {
2357         struct bio_vec *bvec;
2358         int i;
2359         /* hint all but last page with MSG_MORE */
2360         __bio_for_each_segment(bvec, bio, i, 0) {
2361                 if (!_drbd_send_page(mdev, bvec->bv_page,
2362                                      bvec->bv_offset, bvec->bv_len,
2363                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2364                         return 0;
2365         }
2366         return 1;
2367 }
2368
2369 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2370 {
2371         struct page *page = e->pages;
2372         unsigned len = e->size;
2373         /* hint all but last page with MSG_MORE */
2374         page_chain_for_each(page) {
2375                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2376                 if (!_drbd_send_page(mdev, page, 0, l,
2377                                 page_chain_next(page) ? MSG_MORE : 0))
2378                         return 0;
2379                 len -= l;
2380         }
2381         return 1;
2382 }
2383
2384 /* Used to send write requests
2385  * R_PRIMARY -> Peer    (P_DATA)
2386  */
2387 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2388 {
2389         int ok = 1;
2390         struct p_data p;
2391         unsigned int dp_flags = 0;
2392         void *dgb;
2393         int dgs;
2394
2395         if (!drbd_get_data_sock(mdev))
2396                 return 0;
2397
2398         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2399                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2400
2401         p.head.magic   = BE_DRBD_MAGIC;
2402         p.head.command = cpu_to_be16(P_DATA);
2403         p.head.length  =
2404                 cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2405
2406         p.sector   = cpu_to_be64(req->sector);
2407         p.block_id = (unsigned long)req;
2408         p.seq_num  = cpu_to_be32(req->seq_num =
2409                                  atomic_add_return(1, &mdev->packet_seq));
2410         dp_flags = 0;
2411
2412         /* NOTE: no need to check if barriers supported here as we would
2413          *       not pass the test in make_request_common in that case
2414          */
2415         if (req->master_bio->bi_rw & REQ_HARDBARRIER) {
2416                 dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2417                 /* dp_flags |= DP_HARDBARRIER; */
2418         }
2419         if (req->master_bio->bi_rw & REQ_SYNC)
2420                 dp_flags |= DP_RW_SYNC;
2421         /* for now handle SYNCIO and UNPLUG
2422          * as if they still were one and the same flag */
2423         if (req->master_bio->bi_rw & REQ_UNPLUG)
2424                 dp_flags |= DP_RW_SYNC;
2425         if (mdev->state.conn >= C_SYNC_SOURCE &&
2426             mdev->state.conn <= C_PAUSED_SYNC_T)
2427                 dp_flags |= DP_MAY_SET_IN_SYNC;
2428
2429         p.dp_flags = cpu_to_be32(dp_flags);
2430         set_bit(UNPLUG_REMOTE, &mdev->flags);
2431         ok = (sizeof(p) ==
2432                 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2433         if (ok && dgs) {
2434                 dgb = mdev->int_dig_out;
2435                 drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2436                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2437         }
2438         if (ok) {
2439                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2440                         ok = _drbd_send_bio(mdev, req->master_bio);
2441                 else
2442                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
2443         }
2444
2445         drbd_put_data_sock(mdev);
2446
2447         return ok;
2448 }
2449
2450 /* answer packet, used to send data back for read requests:
2451  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2452  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2453  */
2454 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2455                     struct drbd_epoch_entry *e)
2456 {
2457         int ok;
2458         struct p_data p;
2459         void *dgb;
2460         int dgs;
2461
2462         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2463                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2464
2465         p.head.magic   = BE_DRBD_MAGIC;
2466         p.head.command = cpu_to_be16(cmd);
2467         p.head.length  =
2468                 cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2469
2470         p.sector   = cpu_to_be64(e->sector);
2471         p.block_id = e->block_id;
2472         /* p.seq_num  = 0;    No sequence numbers here.. */
2473
2474         /* Only called by our kernel thread.
2475          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2476          * in response to admin command or module unload.
2477          */
2478         if (!drbd_get_data_sock(mdev))
2479                 return 0;
2480
2481         ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2482                                         sizeof(p), dgs ? MSG_MORE : 0);
2483         if (ok && dgs) {
2484                 dgb = mdev->int_dig_out;
2485                 drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2486                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2487         }
2488         if (ok)
2489                 ok = _drbd_send_zc_ee(mdev, e);
2490
2491         drbd_put_data_sock(mdev);
2492
2493         return ok;
2494 }
2495
2496 /*
2497   drbd_send distinguishes two cases:
2498
2499   Packets sent via the data socket "sock"
2500   and packets sent via the meta data socket "msock"
2501
2502                     sock                      msock
2503   -----------------+-------------------------+------------------------------
2504   timeout           conf.timeout / 2          conf.timeout / 2
2505   timeout action    send a ping via msock     Abort communication
2506                                               and close all sockets
2507 */
2508
2509 /*
2510  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2511  */
2512 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2513               void *buf, size_t size, unsigned msg_flags)
2514 {
2515         struct kvec iov;
2516         struct msghdr msg;
2517         int rv, sent = 0;
2518
2519         if (!sock)
2520                 return -1000;
2521
2522         /* THINK  if (signal_pending) return ... ? */
2523
2524         iov.iov_base = buf;
2525         iov.iov_len  = size;
2526
2527         msg.msg_name       = NULL;
2528         msg.msg_namelen    = 0;
2529         msg.msg_control    = NULL;
2530         msg.msg_controllen = 0;
2531         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2532
2533         if (sock == mdev->data.socket) {
2534                 mdev->ko_count = mdev->net_conf->ko_count;
2535                 drbd_update_congested(mdev);
2536         }
2537         do {
2538                 /* STRANGE
2539                  * tcp_sendmsg does _not_ use its size parameter at all ?
2540                  *
2541                  * -EAGAIN on timeout, -EINTR on signal.
2542                  */
2543 /* THINK
2544  * do we need to block DRBD_SIG if sock == &meta.socket ??
2545  * otherwise wake_asender() might interrupt some send_*Ack !
2546  */
2547                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2548                 if (rv == -EAGAIN) {
2549                         if (we_should_drop_the_connection(mdev, sock))
2550                                 break;
2551                         else
2552                                 continue;
2553                 }
2554                 D_ASSERT(rv != 0);
2555                 if (rv == -EINTR) {
2556                         flush_signals(current);
2557                         rv = 0;
2558                 }
2559                 if (rv < 0)
2560                         break;
2561                 sent += rv;
2562                 iov.iov_base += rv;
2563                 iov.iov_len  -= rv;
2564         } while (sent < size);
2565
2566         if (sock == mdev->data.socket)
2567                 clear_bit(NET_CONGESTED, &mdev->flags);
2568
2569         if (rv <= 0) {
2570                 if (rv != -EAGAIN) {
2571                         dev_err(DEV, "%s_sendmsg returned %d\n",
2572                             sock == mdev->meta.socket ? "msock" : "sock",
2573                             rv);
2574                         drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2575                 } else
2576                         drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2577         }
2578
2579         return sent;
2580 }
2581
2582 static int drbd_open(struct block_device *bdev, fmode_t mode)
2583 {
2584         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2585         unsigned long flags;
2586         int rv = 0;
2587
2588         lock_kernel();
2589         spin_lock_irqsave(&mdev->req_lock, flags);
2590         /* to have a stable mdev->state.role
2591          * and no race with updating open_cnt */
2592
2593         if (mdev->state.role != R_PRIMARY) {
2594                 if (mode & FMODE_WRITE)
2595                         rv = -EROFS;
2596                 else if (!allow_oos)
2597                         rv = -EMEDIUMTYPE;
2598         }
2599
2600         if (!rv)
2601                 mdev->open_cnt++;
2602         spin_unlock_irqrestore(&mdev->req_lock, flags);
2603         unlock_kernel();
2604
2605         return rv;
2606 }
2607
2608 static int drbd_release(struct gendisk *gd, fmode_t mode)
2609 {
2610         struct drbd_conf *mdev = gd->private_data;
2611         lock_kernel();
2612         mdev->open_cnt--;
2613         unlock_kernel();
2614         return 0;
2615 }
2616
2617 static void drbd_unplug_fn(struct request_queue *q)
2618 {
2619         struct drbd_conf *mdev = q->queuedata;
2620
2621         /* unplug FIRST */
2622         spin_lock_irq(q->queue_lock);
2623         blk_remove_plug(q);
2624         spin_unlock_irq(q->queue_lock);
2625
2626         /* only if connected */
2627         spin_lock_irq(&mdev->req_lock);
2628         if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2629                 D_ASSERT(mdev->state.role == R_PRIMARY);
2630                 if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2631                         /* add to the data.work queue,
2632                          * unless already queued.
2633                          * XXX this might be a good addition to drbd_queue_work
2634                          * anyways, to detect "double queuing" ... */
2635                         if (list_empty(&mdev->unplug_work.list))
2636                                 drbd_queue_work(&mdev->data.work,
2637                                                 &mdev->unplug_work);
2638                 }
2639         }
2640         spin_unlock_irq(&mdev->req_lock);
2641
2642         if (mdev->state.disk >= D_INCONSISTENT)
2643                 drbd_kick_lo(mdev);
2644 }
2645
2646 static void drbd_set_defaults(struct drbd_conf *mdev)
2647 {
2648         /* This way we get a compile error when sync_conf grows,
2649            and we forgot to initialize it here */
2650         mdev->sync_conf = (struct syncer_conf) {
2651                 /* .rate = */           DRBD_RATE_DEF,
2652                 /* .after = */          DRBD_AFTER_DEF,
2653                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
2654                 /* .verify_alg = */     {}, 0,
2655                 /* .cpu_mask = */       {}, 0,
2656                 /* .csums_alg = */      {}, 0,
2657                 /* .use_rle = */        0
2658         };
2659
2660         /* Have to use that way, because the layout differs between
2661            big endian and little endian */
2662         mdev->state = (union drbd_state) {
2663                 { .role = R_SECONDARY,
2664                   .peer = R_UNKNOWN,
2665                   .conn = C_STANDALONE,
2666                   .disk = D_DISKLESS,
2667                   .pdsk = D_UNKNOWN,
2668                   .susp = 0
2669                 } };
2670 }
2671
2672 void drbd_init_set_defaults(struct drbd_conf *mdev)
2673 {
2674         /* the memset(,0,) did most of this.
2675          * note: only assignments, no allocation in here */
2676
2677         drbd_set_defaults(mdev);
2678
2679         /* for now, we do NOT yet support it,
2680          * even though we start some framework
2681          * to eventually support barriers */
2682         set_bit(NO_BARRIER_SUPP, &mdev->flags);
2683
2684         atomic_set(&mdev->ap_bio_cnt, 0);
2685         atomic_set(&mdev->ap_pending_cnt, 0);
2686         atomic_set(&mdev->rs_pending_cnt, 0);
2687         atomic_set(&mdev->unacked_cnt, 0);
2688         atomic_set(&mdev->local_cnt, 0);
2689         atomic_set(&mdev->net_cnt, 0);
2690         atomic_set(&mdev->packet_seq, 0);
2691         atomic_set(&mdev->pp_in_use, 0);
2692
2693         mutex_init(&mdev->md_io_mutex);
2694         mutex_init(&mdev->data.mutex);
2695         mutex_init(&mdev->meta.mutex);
2696         sema_init(&mdev->data.work.s, 0);
2697         sema_init(&mdev->meta.work.s, 0);
2698         mutex_init(&mdev->state_mutex);
2699
2700         spin_lock_init(&mdev->data.work.q_lock);
2701         spin_lock_init(&mdev->meta.work.q_lock);
2702
2703         spin_lock_init(&mdev->al_lock);
2704         spin_lock_init(&mdev->req_lock);
2705         spin_lock_init(&mdev->peer_seq_lock);
2706         spin_lock_init(&mdev->epoch_lock);
2707
2708         INIT_LIST_HEAD(&mdev->active_ee);
2709         INIT_LIST_HEAD(&mdev->sync_ee);
2710         INIT_LIST_HEAD(&mdev->done_ee);
2711         INIT_LIST_HEAD(&mdev->read_ee);
2712         INIT_LIST_HEAD(&mdev->net_ee);
2713         INIT_LIST_HEAD(&mdev->resync_reads);
2714         INIT_LIST_HEAD(&mdev->data.work.q);
2715         INIT_LIST_HEAD(&mdev->meta.work.q);
2716         INIT_LIST_HEAD(&mdev->resync_work.list);
2717         INIT_LIST_HEAD(&mdev->unplug_work.list);
2718         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2719         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2720
2721         mdev->resync_work.cb  = w_resync_inactive;
2722         mdev->unplug_work.cb  = w_send_write_hint;
2723         mdev->md_sync_work.cb = w_md_sync;
2724         mdev->bm_io_work.w.cb = w_bitmap_io;
2725         init_timer(&mdev->resync_timer);
2726         init_timer(&mdev->md_sync_timer);
2727         mdev->resync_timer.function = resync_timer_fn;
2728         mdev->resync_timer.data = (unsigned long) mdev;
2729         mdev->md_sync_timer.function = md_sync_timer_fn;
2730         mdev->md_sync_timer.data = (unsigned long) mdev;
2731
2732         init_waitqueue_head(&mdev->misc_wait);
2733         init_waitqueue_head(&mdev->state_wait);
2734         init_waitqueue_head(&mdev->ee_wait);
2735         init_waitqueue_head(&mdev->al_wait);
2736         init_waitqueue_head(&mdev->seq_wait);
2737
2738         drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2739         drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2740         drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2741
2742         mdev->agreed_pro_version = PRO_VERSION_MAX;
2743         mdev->write_ordering = WO_bio_barrier;
2744         mdev->resync_wenr = LC_FREE;
2745 }
2746
2747 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2748 {
2749         if (mdev->receiver.t_state != None)
2750                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2751                                 mdev->receiver.t_state);
2752
2753         /* no need to lock it, I'm the only thread alive */
2754         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2755                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2756         mdev->al_writ_cnt  =
2757         mdev->bm_writ_cnt  =
2758         mdev->read_cnt     =
2759         mdev->recv_cnt     =
2760         mdev->send_cnt     =
2761         mdev->writ_cnt     =
2762         mdev->p_size       =
2763         mdev->rs_start     =
2764         mdev->rs_total     =
2765         mdev->rs_failed    =
2766         mdev->rs_mark_left =
2767         mdev->rs_mark_time = 0;
2768         D_ASSERT(mdev->net_conf == NULL);
2769
2770         drbd_set_my_capacity(mdev, 0);
2771         if (mdev->bitmap) {
2772                 /* maybe never allocated. */
2773                 drbd_bm_resize(mdev, 0, 1);
2774                 drbd_bm_cleanup(mdev);
2775         }
2776
2777         drbd_free_resources(mdev);
2778
2779         /*
2780          * currently we drbd_init_ee only on module load, so
2781          * we may do drbd_release_ee only on module unload!
2782          */
2783         D_ASSERT(list_empty(&mdev->active_ee));
2784         D_ASSERT(list_empty(&mdev->sync_ee));
2785         D_ASSERT(list_empty(&mdev->done_ee));
2786         D_ASSERT(list_empty(&mdev->read_ee));
2787         D_ASSERT(list_empty(&mdev->net_ee));
2788         D_ASSERT(list_empty(&mdev->resync_reads));
2789         D_ASSERT(list_empty(&mdev->data.work.q));
2790         D_ASSERT(list_empty(&mdev->meta.work.q));
2791         D_ASSERT(list_empty(&mdev->resync_work.list));
2792         D_ASSERT(list_empty(&mdev->unplug_work.list));
2793
2794 }
2795
2796
2797 static void drbd_destroy_mempools(void)
2798 {
2799         struct page *page;
2800
2801         while (drbd_pp_pool) {
2802                 page = drbd_pp_pool;
2803                 drbd_pp_pool = (struct page *)page_private(page);
2804                 __free_page(page);
2805                 drbd_pp_vacant--;
2806         }
2807
2808         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2809
2810         if (drbd_ee_mempool)
2811                 mempool_destroy(drbd_ee_mempool);
2812         if (drbd_request_mempool)
2813                 mempool_destroy(drbd_request_mempool);
2814         if (drbd_ee_cache)
2815                 kmem_cache_destroy(drbd_ee_cache);
2816         if (drbd_request_cache)
2817                 kmem_cache_destroy(drbd_request_cache);
2818         if (drbd_bm_ext_cache)
2819                 kmem_cache_destroy(drbd_bm_ext_cache);
2820         if (drbd_al_ext_cache)
2821                 kmem_cache_destroy(drbd_al_ext_cache);
2822
2823         drbd_ee_mempool      = NULL;
2824         drbd_request_mempool = NULL;
2825         drbd_ee_cache        = NULL;
2826         drbd_request_cache   = NULL;
2827         drbd_bm_ext_cache    = NULL;
2828         drbd_al_ext_cache    = NULL;
2829
2830         return;
2831 }
2832
2833 static int drbd_create_mempools(void)
2834 {
2835         struct page *page;
2836         const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2837         int i;
2838
2839         /* prepare our caches and mempools */
2840         drbd_request_mempool = NULL;
2841         drbd_ee_cache        = NULL;
2842         drbd_request_cache   = NULL;
2843         drbd_bm_ext_cache    = NULL;
2844         drbd_al_ext_cache    = NULL;
2845         drbd_pp_pool         = NULL;
2846
2847         /* caches */
2848         drbd_request_cache = kmem_cache_create(
2849                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2850         if (drbd_request_cache == NULL)
2851                 goto Enomem;
2852
2853         drbd_ee_cache = kmem_cache_create(
2854                 "drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2855         if (drbd_ee_cache == NULL)
2856                 goto Enomem;
2857
2858         drbd_bm_ext_cache = kmem_cache_create(
2859                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2860         if (drbd_bm_ext_cache == NULL)
2861                 goto Enomem;
2862
2863         drbd_al_ext_cache = kmem_cache_create(
2864                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2865         if (drbd_al_ext_cache == NULL)
2866                 goto Enomem;
2867
2868         /* mempools */
2869         drbd_request_mempool = mempool_create(number,
2870                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2871         if (drbd_request_mempool == NULL)
2872                 goto Enomem;
2873
2874         drbd_ee_mempool = mempool_create(number,
2875                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2876         if (drbd_request_mempool == NULL)
2877                 goto Enomem;
2878
2879         /* drbd's page pool */
2880         spin_lock_init(&drbd_pp_lock);
2881
2882         for (i = 0; i < number; i++) {
2883                 page = alloc_page(GFP_HIGHUSER);
2884                 if (!page)
2885                         goto Enomem;
2886                 set_page_private(page, (unsigned long)drbd_pp_pool);
2887                 drbd_pp_pool = page;
2888         }
2889         drbd_pp_vacant = number;
2890
2891         return 0;
2892
2893 Enomem:
2894         drbd_destroy_mempools(); /* in case we allocated some */
2895         return -ENOMEM;
2896 }
2897
2898 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2899         void *unused)
2900 {
2901         /* just so we have it.  you never know what interesting things we
2902          * might want to do here some day...
2903          */
2904
2905         return NOTIFY_DONE;
2906 }
2907
2908 static struct notifier_block drbd_notifier = {
2909         .notifier_call = drbd_notify_sys,
2910 };
2911
2912 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2913 {
2914         int rr;
2915
2916         rr = drbd_release_ee(mdev, &mdev->active_ee);
2917         if (rr)
2918                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2919
2920         rr = drbd_release_ee(mdev, &mdev->sync_ee);
2921         if (rr)
2922                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2923
2924         rr = drbd_release_ee(mdev, &mdev->read_ee);
2925         if (rr)
2926                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2927
2928         rr = drbd_release_ee(mdev, &mdev->done_ee);
2929         if (rr)
2930                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2931
2932         rr = drbd_release_ee(mdev, &mdev->net_ee);
2933         if (rr)
2934                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2935 }
2936
2937 /* caution. no locking.
2938  * currently only used from module cleanup code. */
2939 static void drbd_delete_device(unsigned int minor)
2940 {
2941         struct drbd_conf *mdev = minor_to_mdev(minor);
2942
2943         if (!mdev)
2944                 return;
2945
2946         /* paranoia asserts */
2947         if (mdev->open_cnt != 0)
2948                 dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
2949                                 __FILE__ , __LINE__);
2950
2951         ERR_IF (!list_empty(&mdev->data.work.q)) {
2952                 struct list_head *lp;
2953                 list_for_each(lp, &mdev->data.work.q) {
2954                         dev_err(DEV, "lp = %p\n", lp);
2955                 }
2956         };
2957         /* end paranoia asserts */
2958
2959         del_gendisk(mdev->vdisk);
2960
2961         /* cleanup stuff that may have been allocated during
2962          * device (re-)configuration or state changes */
2963
2964         if (mdev->this_bdev)
2965                 bdput(mdev->this_bdev);
2966
2967         drbd_free_resources(mdev);
2968
2969         drbd_release_ee_lists(mdev);
2970
2971         /* should be free'd on disconnect? */
2972         kfree(mdev->ee_hash);
2973         /*
2974         mdev->ee_hash_s = 0;
2975         mdev->ee_hash = NULL;
2976         */
2977
2978         lc_destroy(mdev->act_log);
2979         lc_destroy(mdev->resync);
2980
2981         kfree(mdev->p_uuid);
2982         /* mdev->p_uuid = NULL; */
2983
2984         kfree(mdev->int_dig_out);
2985         kfree(mdev->int_dig_in);
2986         kfree(mdev->int_dig_vv);
2987
2988         /* cleanup the rest that has been
2989          * allocated from drbd_new_device
2990          * and actually free the mdev itself */
2991         drbd_free_mdev(mdev);
2992 }
2993
2994 static void drbd_cleanup(void)
2995 {
2996         unsigned int i;
2997
2998         unregister_reboot_notifier(&drbd_notifier);
2999
3000         drbd_nl_cleanup();
3001
3002         if (minor_table) {
3003                 if (drbd_proc)
3004                         remove_proc_entry("drbd", NULL);
3005                 i = minor_count;
3006                 while (i--)
3007                         drbd_delete_device(i);
3008                 drbd_destroy_mempools();
3009         }
3010
3011         kfree(minor_table);
3012
3013         unregister_blkdev(DRBD_MAJOR, "drbd");
3014
3015         printk(KERN_INFO "drbd: module cleanup done.\n");
3016 }
3017
3018 /**
3019  * drbd_congested() - Callback for pdflush
3020  * @congested_data:     User data
3021  * @bdi_bits:           Bits pdflush is currently interested in
3022  *
3023  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3024  */
3025 static int drbd_congested(void *congested_data, int bdi_bits)
3026 {
3027         struct drbd_conf *mdev = congested_data;
3028         struct request_queue *q;
3029         char reason = '-';
3030         int r = 0;
3031
3032         if (!__inc_ap_bio_cond(mdev)) {
3033                 /* DRBD has frozen IO */
3034                 r = bdi_bits;
3035                 reason = 'd';
3036                 goto out;
3037         }
3038
3039         if (get_ldev(mdev)) {
3040                 q = bdev_get_queue(mdev->ldev->backing_bdev);
3041                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
3042                 put_ldev(mdev);
3043                 if (r)
3044                         reason = 'b';
3045         }
3046
3047         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3048                 r |= (1 << BDI_async_congested);
3049                 reason = reason == 'b' ? 'a' : 'n';
3050         }
3051
3052 out:
3053         mdev->congestion_reason = reason;
3054         return r;
3055 }
3056
3057 struct drbd_conf *drbd_new_device(unsigned int minor)
3058 {
3059         struct drbd_conf *mdev;
3060         struct gendisk *disk;
3061         struct request_queue *q;
3062
3063         /* GFP_KERNEL, we are outside of all write-out paths */
3064         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3065         if (!mdev)
3066                 return NULL;
3067         if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3068                 goto out_no_cpumask;
3069
3070         mdev->minor = minor;
3071
3072         drbd_init_set_defaults(mdev);
3073
3074         q = blk_alloc_queue(GFP_KERNEL);
3075         if (!q)
3076                 goto out_no_q;
3077         mdev->rq_queue = q;
3078         q->queuedata   = mdev;
3079
3080         disk = alloc_disk(1);
3081         if (!disk)
3082                 goto out_no_disk;
3083         mdev->vdisk = disk;
3084
3085         set_disk_ro(disk, TRUE);
3086
3087         disk->queue = q;
3088         disk->major = DRBD_MAJOR;
3089         disk->first_minor = minor;
3090         disk->fops = &drbd_ops;
3091         sprintf(disk->disk_name, "drbd%d", minor);
3092         disk->private_data = mdev;
3093
3094         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3095         /* we have no partitions. we contain only ourselves. */
3096         mdev->this_bdev->bd_contains = mdev->this_bdev;
3097
3098         q->backing_dev_info.congested_fn = drbd_congested;
3099         q->backing_dev_info.congested_data = mdev;
3100
3101         blk_queue_make_request(q, drbd_make_request_26);
3102         blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3103         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3104         blk_queue_merge_bvec(q, drbd_merge_bvec);
3105         q->queue_lock = &mdev->req_lock; /* needed since we use */
3106                 /* plugging on a queue, that actually has no requests! */
3107         q->unplug_fn = drbd_unplug_fn;
3108
3109         mdev->md_io_page = alloc_page(GFP_KERNEL);
3110         if (!mdev->md_io_page)
3111                 goto out_no_io_page;
3112
3113         if (drbd_bm_init(mdev))
3114                 goto out_no_bitmap;
3115         /* no need to lock access, we are still initializing this minor device. */
3116         if (!tl_init(mdev))
3117                 goto out_no_tl;
3118
3119         mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3120         if (!mdev->app_reads_hash)
3121                 goto out_no_app_reads;
3122
3123         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3124         if (!mdev->current_epoch)
3125                 goto out_no_epoch;
3126
3127         INIT_LIST_HEAD(&mdev->current_epoch->list);
3128         mdev->epochs = 1;
3129
3130         return mdev;
3131
3132 /* out_whatever_else:
3133         kfree(mdev->current_epoch); */
3134 out_no_epoch:
3135         kfree(mdev->app_reads_hash);
3136 out_no_app_reads:
3137         tl_cleanup(mdev);
3138 out_no_tl:
3139         drbd_bm_cleanup(mdev);
3140 out_no_bitmap:
3141         __free_page(mdev->md_io_page);
3142 out_no_io_page:
3143         put_disk(disk);
3144 out_no_disk:
3145         blk_cleanup_queue(q);
3146 out_no_q:
3147         free_cpumask_var(mdev->cpu_mask);
3148 out_no_cpumask:
3149         kfree(mdev);
3150         return NULL;
3151 }
3152
3153 /* counterpart of drbd_new_device.
3154  * last part of drbd_delete_device. */
3155 void drbd_free_mdev(struct drbd_conf *mdev)
3156 {
3157         kfree(mdev->current_epoch);
3158         kfree(mdev->app_reads_hash);
3159         tl_cleanup(mdev);
3160         if (mdev->bitmap) /* should no longer be there. */
3161                 drbd_bm_cleanup(mdev);
3162         __free_page(mdev->md_io_page);
3163         put_disk(mdev->vdisk);
3164         blk_cleanup_queue(mdev->rq_queue);
3165         free_cpumask_var(mdev->cpu_mask);
3166         kfree(mdev);
3167 }
3168
3169
3170 int __init drbd_init(void)
3171 {
3172         int err;
3173
3174         if (sizeof(struct p_handshake) != 80) {
3175                 printk(KERN_ERR
3176                        "drbd: never change the size or layout "
3177                        "of the HandShake packet.\n");
3178                 return -EINVAL;
3179         }
3180
3181         if (1 > minor_count || minor_count > 255) {
3182                 printk(KERN_ERR
3183                         "drbd: invalid minor_count (%d)\n", minor_count);
3184 #ifdef MODULE
3185                 return -EINVAL;
3186 #else
3187                 minor_count = 8;
3188 #endif
3189         }
3190
3191         err = drbd_nl_init();
3192         if (err)
3193                 return err;
3194
3195         err = register_blkdev(DRBD_MAJOR, "drbd");
3196         if (err) {
3197                 printk(KERN_ERR
3198                        "drbd: unable to register block device major %d\n",
3199                        DRBD_MAJOR);
3200                 return err;
3201         }
3202
3203         register_reboot_notifier(&drbd_notifier);
3204
3205         /*
3206          * allocate all necessary structs
3207          */
3208         err = -ENOMEM;
3209
3210         init_waitqueue_head(&drbd_pp_wait);
3211
3212         drbd_proc = NULL; /* play safe for drbd_cleanup */
3213         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3214                                 GFP_KERNEL);
3215         if (!minor_table)
3216                 goto Enomem;
3217
3218         err = drbd_create_mempools();
3219         if (err)
3220                 goto Enomem;
3221
3222         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3223         if (!drbd_proc) {
3224                 printk(KERN_ERR "drbd: unable to register proc file\n");
3225                 goto Enomem;
3226         }
3227
3228         rwlock_init(&global_state_lock);
3229
3230         printk(KERN_INFO "drbd: initialized. "
3231                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3232                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3233         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3234         printk(KERN_INFO "drbd: registered as block device major %d\n",
3235                 DRBD_MAJOR);
3236         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3237
3238         return 0; /* Success! */
3239
3240 Enomem:
3241         drbd_cleanup();
3242         if (err == -ENOMEM)
3243                 /* currently always the case */
3244                 printk(KERN_ERR "drbd: ran out of memory\n");
3245         else
3246                 printk(KERN_ERR "drbd: initialization failure\n");
3247         return err;
3248 }
3249
3250 void drbd_free_bc(struct drbd_backing_dev *ldev)
3251 {
3252         if (ldev == NULL)
3253                 return;
3254
3255         bd_release(ldev->backing_bdev);
3256         bd_release(ldev->md_bdev);
3257
3258         fput(ldev->lo_file);
3259         fput(ldev->md_file);
3260
3261         kfree(ldev);
3262 }
3263
3264 void drbd_free_sock(struct drbd_conf *mdev)
3265 {
3266         if (mdev->data.socket) {
3267                 mutex_lock(&mdev->data.mutex);
3268                 kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3269                 sock_release(mdev->data.socket);
3270                 mdev->data.socket = NULL;
3271                 mutex_unlock(&mdev->data.mutex);
3272         }
3273         if (mdev->meta.socket) {
3274                 mutex_lock(&mdev->meta.mutex);
3275                 kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3276                 sock_release(mdev->meta.socket);
3277                 mdev->meta.socket = NULL;
3278                 mutex_unlock(&mdev->meta.mutex);
3279         }
3280 }
3281
3282
3283 void drbd_free_resources(struct drbd_conf *mdev)
3284 {
3285         crypto_free_hash(mdev->csums_tfm);
3286         mdev->csums_tfm = NULL;
3287         crypto_free_hash(mdev->verify_tfm);
3288         mdev->verify_tfm = NULL;
3289         crypto_free_hash(mdev->cram_hmac_tfm);
3290         mdev->cram_hmac_tfm = NULL;
3291         crypto_free_hash(mdev->integrity_w_tfm);
3292         mdev->integrity_w_tfm = NULL;
3293         crypto_free_hash(mdev->integrity_r_tfm);
3294         mdev->integrity_r_tfm = NULL;
3295
3296         drbd_free_sock(mdev);
3297
3298         __no_warn(local,
3299                   drbd_free_bc(mdev->ldev);
3300                   mdev->ldev = NULL;);
3301 }
3302
3303 /* meta data management */
3304
3305 struct meta_data_on_disk {
3306         u64 la_size;           /* last agreed size. */
3307         u64 uuid[UI_SIZE];   /* UUIDs. */
3308         u64 device_uuid;
3309         u64 reserved_u64_1;
3310         u32 flags;             /* MDF */
3311         u32 magic;
3312         u32 md_size_sect;
3313         u32 al_offset;         /* offset to this block */
3314         u32 al_nr_extents;     /* important for restoring the AL */
3315               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3316         u32 bm_offset;         /* offset to the bitmap, from here */
3317         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3318         u32 reserved_u32[4];
3319
3320 } __packed;
3321
3322 /**
3323  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3324  * @mdev:       DRBD device.
3325  */
3326 void drbd_md_sync(struct drbd_conf *mdev)
3327 {
3328         struct meta_data_on_disk *buffer;
3329         sector_t sector;
3330         int i;
3331
3332         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3333                 return;
3334         del_timer(&mdev->md_sync_timer);
3335
3336         /* We use here D_FAILED and not D_ATTACHING because we try to write
3337          * metadata even if we detach due to a disk failure! */
3338         if (!get_ldev_if_state(mdev, D_FAILED))
3339                 return;
3340
3341         mutex_lock(&mdev->md_io_mutex);
3342         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3343         memset(buffer, 0, 512);
3344
3345         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3346         for (i = UI_CURRENT; i < UI_SIZE; i++)
3347                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3348         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3349         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3350
3351         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3352         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3353         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3354         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3355         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3356
3357         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3358
3359         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3360         sector = mdev->ldev->md.md_offset;
3361
3362         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3363                 clear_bit(MD_DIRTY, &mdev->flags);
3364         } else {
3365                 /* this was a try anyways ... */
3366                 dev_err(DEV, "meta data update failed!\n");
3367
3368                 drbd_chk_io_error(mdev, 1, TRUE);
3369         }
3370
3371         /* Update mdev->ldev->md.la_size_sect,
3372          * since we updated it on metadata. */
3373         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3374
3375         mutex_unlock(&mdev->md_io_mutex);
3376         put_ldev(mdev);
3377 }
3378
3379 /**
3380  * drbd_md_read() - Reads in the meta data super block
3381  * @mdev:       DRBD device.
3382  * @bdev:       Device from which the meta data should be read in.
3383  *
3384  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3385  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3386  */
3387 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3388 {
3389         struct meta_data_on_disk *buffer;
3390         int i, rv = NO_ERROR;
3391
3392         if (!get_ldev_if_state(mdev, D_ATTACHING))
3393                 return ERR_IO_MD_DISK;
3394
3395         mutex_lock(&mdev->md_io_mutex);
3396         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3397
3398         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3399                 /* NOTE: cant do normal error processing here as this is
3400                    called BEFORE disk is attached */
3401                 dev_err(DEV, "Error while reading metadata.\n");
3402                 rv = ERR_IO_MD_DISK;
3403                 goto err;
3404         }
3405
3406         if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3407                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
3408                 rv = ERR_MD_INVALID;
3409                 goto err;
3410         }
3411         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3412                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3413                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3414                 rv = ERR_MD_INVALID;
3415                 goto err;
3416         }
3417         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3418                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3419                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3420                 rv = ERR_MD_INVALID;
3421                 goto err;
3422         }
3423         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3424                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3425                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3426                 rv = ERR_MD_INVALID;
3427                 goto err;
3428         }
3429
3430         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3431                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3432                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3433                 rv = ERR_MD_INVALID;
3434                 goto err;
3435         }
3436
3437         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3438         for (i = UI_CURRENT; i < UI_SIZE; i++)
3439                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3440         bdev->md.flags = be32_to_cpu(buffer->flags);
3441         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3442         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3443
3444         if (mdev->sync_conf.al_extents < 7)
3445                 mdev->sync_conf.al_extents = 127;
3446
3447  err:
3448         mutex_unlock(&mdev->md_io_mutex);
3449         put_ldev(mdev);
3450
3451         return rv;
3452 }
3453
3454 /**
3455  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3456  * @mdev:       DRBD device.
3457  *
3458  * Call this function if you change anything that should be written to
3459  * the meta-data super block. This function sets MD_DIRTY, and starts a
3460  * timer that ensures that within five seconds you have to call drbd_md_sync().
3461  */
3462 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3463 {
3464         set_bit(MD_DIRTY, &mdev->flags);
3465         mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3466 }
3467
3468
3469 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3470 {
3471         int i;
3472
3473         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3474                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3475 }
3476
3477 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3478 {
3479         if (idx == UI_CURRENT) {
3480                 if (mdev->state.role == R_PRIMARY)
3481                         val |= 1;
3482                 else
3483                         val &= ~((u64)1);
3484
3485                 drbd_set_ed_uuid(mdev, val);
3486         }
3487
3488         mdev->ldev->md.uuid[idx] = val;
3489         drbd_md_mark_dirty(mdev);
3490 }
3491
3492
3493 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3494 {
3495         if (mdev->ldev->md.uuid[idx]) {
3496                 drbd_uuid_move_history(mdev);
3497                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3498         }
3499         _drbd_uuid_set(mdev, idx, val);
3500 }
3501
3502 /**
3503  * drbd_uuid_new_current() - Creates a new current UUID
3504  * @mdev:       DRBD device.
3505  *
3506  * Creates a new current UUID, and rotates the old current UUID into
3507  * the bitmap slot. Causes an incremental resync upon next connect.
3508  */
3509 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3510 {
3511         u64 val;
3512
3513         dev_info(DEV, "Creating new current UUID\n");
3514         D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3515         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3516
3517         get_random_bytes(&val, sizeof(u64));
3518         _drbd_uuid_set(mdev, UI_CURRENT, val);
3519 }
3520
3521 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3522 {
3523         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3524                 return;
3525
3526         if (val == 0) {
3527                 drbd_uuid_move_history(mdev);
3528                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3529                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3530         } else {
3531                 if (mdev->ldev->md.uuid[UI_BITMAP])
3532                         dev_warn(DEV, "bm UUID already set");
3533
3534                 mdev->ldev->md.uuid[UI_BITMAP] = val;
3535                 mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3536
3537         }
3538         drbd_md_mark_dirty(mdev);
3539 }
3540
3541 /**
3542  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3543  * @mdev:       DRBD device.
3544  *
3545  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3546  */
3547 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3548 {
3549         int rv = -EIO;
3550
3551         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3552                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3553                 drbd_md_sync(mdev);
3554                 drbd_bm_set_all(mdev);
3555
3556                 rv = drbd_bm_write(mdev);
3557
3558                 if (!rv) {
3559                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3560                         drbd_md_sync(mdev);
3561                 }
3562
3563                 put_ldev(mdev);
3564         }
3565
3566         return rv;
3567 }
3568
3569 /**
3570  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3571  * @mdev:       DRBD device.
3572  *
3573  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3574  */
3575 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3576 {
3577         int rv = -EIO;
3578
3579         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3580                 drbd_bm_clear_all(mdev);
3581                 rv = drbd_bm_write(mdev);
3582                 put_ldev(mdev);
3583         }
3584
3585         return rv;
3586 }
3587
3588 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3589 {
3590         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3591         int rv;
3592
3593         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3594
3595         drbd_bm_lock(mdev, work->why);
3596         rv = work->io_fn(mdev);
3597         drbd_bm_unlock(mdev);
3598
3599         clear_bit(BITMAP_IO, &mdev->flags);
3600         wake_up(&mdev->misc_wait);
3601
3602         if (work->done)
3603                 work->done(mdev, rv);
3604
3605         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3606         work->why = NULL;
3607
3608         return 1;
3609 }
3610
3611 /**
3612  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3613  * @mdev:       DRBD device.
3614  * @io_fn:      IO callback to be called when bitmap IO is possible
3615  * @done:       callback to be called after the bitmap IO was performed
3616  * @why:        Descriptive text of the reason for doing the IO
3617  *
3618  * While IO on the bitmap happens we freeze application IO thus we ensure
3619  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
3620  * called from worker context. It MUST NOT be used while a previous such
3621  * work is still pending!
3622  */
3623 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3624                           int (*io_fn)(struct drbd_conf *),
3625                           void (*done)(struct drbd_conf *, int),
3626                           char *why)
3627 {
3628         D_ASSERT(current == mdev->worker.task);
3629
3630         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3631         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3632         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3633         if (mdev->bm_io_work.why)
3634                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3635                         why, mdev->bm_io_work.why);
3636
3637         mdev->bm_io_work.io_fn = io_fn;
3638         mdev->bm_io_work.done = done;
3639         mdev->bm_io_work.why = why;
3640
3641         set_bit(BITMAP_IO, &mdev->flags);
3642         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3643                 if (list_empty(&mdev->bm_io_work.w.list)) {
3644                         set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3645                         drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3646                 } else
3647                         dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3648         }
3649 }
3650
3651 /**
3652  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3653  * @mdev:       DRBD device.
3654  * @io_fn:      IO callback to be called when bitmap IO is possible
3655  * @why:        Descriptive text of the reason for doing the IO
3656  *
3657  * freezes application IO while that the actual IO operations runs. This
3658  * functions MAY NOT be called from worker context.
3659  */
3660 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3661 {
3662         int rv;
3663
3664         D_ASSERT(current != mdev->worker.task);
3665
3666         drbd_suspend_io(mdev);
3667
3668         drbd_bm_lock(mdev, why);
3669         rv = io_fn(mdev);
3670         drbd_bm_unlock(mdev);
3671
3672         drbd_resume_io(mdev);
3673
3674         return rv;
3675 }
3676
3677 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3678 {
3679         if ((mdev->ldev->md.flags & flag) != flag) {
3680                 drbd_md_mark_dirty(mdev);
3681                 mdev->ldev->md.flags |= flag;
3682         }
3683 }
3684
3685 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3686 {
3687         if ((mdev->ldev->md.flags & flag) != 0) {
3688                 drbd_md_mark_dirty(mdev);
3689                 mdev->ldev->md.flags &= ~flag;
3690         }
3691 }
3692 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3693 {
3694         return (bdev->md.flags & flag) != 0;
3695 }
3696
3697 static void md_sync_timer_fn(unsigned long data)
3698 {
3699         struct drbd_conf *mdev = (struct drbd_conf *) data;
3700
3701         drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3702 }
3703
3704 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3705 {
3706         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3707         drbd_md_sync(mdev);
3708
3709         return 1;
3710 }
3711
3712 #ifdef CONFIG_DRBD_FAULT_INJECTION
/*
 * Fault insertion support.  The random number generator below was
 * shamelessly taken over from kernel/rcutorture.c.
 */
struct fault_random_state {
	unsigned long state;	/* current generator state */
	unsigned long count;	/* calls left until the next reseed */
};

/* multiplier and increment of the generator */
#define FAULT_RANDOM_MULT 39916801  /* prime */
#define FAULT_RANDOM_ADD	479001701 /* prime */
/* value the call counter is reset to after each reseed */
#define FAULT_RANDOM_REFRESH 10000
3723
3724 /*
3725  * Crude but fast random-number generator.  Uses a linear congruential
3726  * generator, with occasional help from get_random_bytes().
3727  */
3728 static unsigned long
3729 _drbd_fault_random(struct fault_random_state *rsp)
3730 {
3731         long refresh;
3732
3733         if (!rsp->count--) {
3734                 get_random_bytes(&refresh, sizeof(refresh));
3735                 rsp->state += refresh;
3736                 rsp->count = FAULT_RANDOM_REFRESH;
3737         }
3738         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3739         return swahw32(rsp->state);
3740 }
3741
3742 static char *
3743 _drbd_fault_str(unsigned int type) {
3744         static char *_faults[] = {
3745                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3746                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3747                 [DRBD_FAULT_RS_WR] = "Resync write",
3748                 [DRBD_FAULT_RS_RD] = "Resync read",
3749                 [DRBD_FAULT_DT_WR] = "Data write",
3750                 [DRBD_FAULT_DT_RD] = "Data read",
3751                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3752                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3753                 [DRBD_FAULT_AL_EE] = "EE allocation",
3754                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3755         };
3756
3757         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3758 }
3759
3760 unsigned int
3761 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3762 {
3763         static struct fault_random_state rrs = {0, 0};
3764
3765         unsigned int ret = (
3766                 (fault_devs == 0 ||
3767                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3768                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3769
3770         if (ret) {
3771                 fault_count++;
3772
3773                 if (__ratelimit(&drbd_ratelimit_state))
3774                         dev_warn(DEV, "***Simulating %s failure\n",
3775                                 _drbd_fault_str(type));
3776         }
3777
3778         return ret;
3779 }
3780 #endif
3781
/*
 * drbd_buildtag() - Describe where this DRBD build came from.
 *
 * The text is produced on the first call and cached in a static buffer:
 * "srcversion: ..." when running as a module, "built-in" otherwise.
 * (DRBD built from external sources has a reference to the git hash of
 * the source code here.)
 *
 * Return: pointer to the static, NUL terminated tag.
 */
const char *drbd_buildtag(void)
{
	/* A leading NUL marks the tag as "not generated yet"; replacing
	 * it by 'b' turns the initializer into "built-in". */
	static char buildtag[38] = "\0uilt-in";

	if (buildtag[0] == 0) {
#ifdef CONFIG_MODULES
		if (THIS_MODULE != NULL)
			/* bounded: an unexpectedly long srcversion must
			 * not run past the end of the buffer */
			snprintf(buildtag, sizeof(buildtag),
				 "srcversion: %-24s", THIS_MODULE->srcversion);
		else
#endif
			buildtag[0] = 'b';
	}

	return buildtag;
}
3800
3801 module_init(drbd_init)
3802 module_exit(drbd_cleanup)
3803
3804 EXPORT_SYMBOL(drbd_conn_str);
3805 EXPORT_SYMBOL(drbd_role_str);
3806 EXPORT_SYMBOL(drbd_disk_str);
3807 EXPORT_SYMBOL(drbd_set_st_err_str);