1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not as a module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(cn_idx, uint, 0444);
90 module_param(proc_details, int, 0644);
91
92 #ifdef CONFIG_DRBD_FAULT_INJECTION
93 int enable_faults;
94 int fault_rate;
95 static int fault_count;
96 int fault_devs;
97 /* bitmap of enabled faults */
98 module_param(enable_faults, int, 0664);
99 /* fault rate % value - applies to all enabled faults */
100 module_param(fault_rate, int, 0664);
101 /* count of faults inserted */
102 module_param(fault_count, int, 0664);
103 /* bitmap of devices to insert faults on */
104 module_param(fault_devs, int, 0644);
105 #endif
106
107 /* module parameters, defined here */
108 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
109 int disable_sendpage;
110 int allow_oos;
111 unsigned int cn_idx = CN_IDX_DRBD;
112 int proc_details;       /* Detail level in proc drbd */
113
114 /* Module parameter for setting the user mode helper program
115  * to run. Default is /sbin/drbdadm */
116 char usermode_helper[80] = "/sbin/drbdadm";
117
118 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
119
120 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
121  * as member "struct gendisk *vdisk;"
122  */
123 struct drbd_conf **minor_table;
124 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
125
126 struct kmem_cache *drbd_request_cache;
127 struct kmem_cache *drbd_ee_cache;       /* peer requests */
128 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
129 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
130 mempool_t *drbd_request_mempool;
131 mempool_t *drbd_ee_mempool;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list; the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
143
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
153
154 #ifdef __CHECKER__
155 /* When checking with sparse, and this is an inline function, sparse will
156    give tons of false positives. When this is a real function, sparse works.
157  */
158 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
159 {
160         int io_allowed;
161
162         atomic_inc(&mdev->local_cnt);
163         io_allowed = (mdev->state.disk >= mins);
164         if (!io_allowed) {
165                 if (atomic_dec_and_test(&mdev->local_cnt))
166                         wake_up(&mdev->misc_wait);
167         }
168         return io_allowed;
169 }
170
171 #endif
172
173 /**
174  * DOC: The transfer log
175  *
176  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
177  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
178  * of the list. There is always at least one &struct drbd_tl_epoch object.
179  *
180  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
181  * attached.
182  */
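
/*
 * An illustrative sketch of that layout (an assumed picture drawn from the
 * description above, not code the driver uses): with two epochs in flight,
 *
 *   oldest_tle -> [epoch A] --next--> [epoch B] <- newest_tle
 *                   |                   |
 *                   +-- circular list   +-- circular list
 *                       of requests         of requests
 *
 * tl_release() frees or recycles the oldest epoch once the matching barrier
 * ack arrives, while _tl_add_barrier() links a fresh epoch in at the newest
 * end.
 */
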
183 static int tl_init(struct drbd_conf *mdev)
184 {
185         struct drbd_tl_epoch *b;
186
187         /* during device minor initialization, we may well use GFP_KERNEL */
188         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
189         if (!b)
190                 return 0;
191         INIT_LIST_HEAD(&b->requests);
192         INIT_LIST_HEAD(&b->w.list);
193         b->next = NULL;
194         b->br_number = 4711;
195         b->n_writes = 0;
196         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
197
198         mdev->tconn->oldest_tle = b;
199         mdev->tconn->newest_tle = b;
200         INIT_LIST_HEAD(&mdev->tconn->out_of_sequence_requests);
201
202         return 1;
203 }
204
205 static void tl_cleanup(struct drbd_conf *mdev)
206 {
207         D_ASSERT(mdev->tconn->oldest_tle == mdev->tconn->newest_tle);
208         D_ASSERT(list_empty(&mdev->tconn->out_of_sequence_requests));
209         kfree(mdev->tconn->oldest_tle);
210         mdev->tconn->oldest_tle = NULL;
211         kfree(mdev->tconn->unused_spare_tle);
212         mdev->tconn->unused_spare_tle = NULL;
213 }
214
215 /**
216  * _tl_add_barrier() - Adds a barrier to the transfer log
217  * @mdev:       DRBD device.
218  * @new:        Barrier to be added before the current head of the TL.
219  *
220  * The caller must hold the req_lock.
221  */
222 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
223 {
224         struct drbd_tl_epoch *newest_before;
225
226         INIT_LIST_HEAD(&new->requests);
227         INIT_LIST_HEAD(&new->w.list);
228         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
229         new->next = NULL;
230         new->n_writes = 0;
231
232         newest_before = mdev->tconn->newest_tle;
233         /* never send a barrier number == 0, because that is special-cased
234          * when using TCQ for our write ordering code */
235         new->br_number = (newest_before->br_number+1) ?: 1;
236         if (mdev->tconn->newest_tle != new) {
237                 mdev->tconn->newest_tle->next = new;
238                 mdev->tconn->newest_tle = new;
239         }
240 }
241
242 /**
243  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
244  * @mdev:       DRBD device.
245  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
246  * @set_size:   Expected number of requests before that barrier.
247  *
248  * In case the passed barrier_nr or set_size does not match that of the oldest
249  * &struct drbd_tl_epoch object, this function will cause a termination
250  * of the connection.
251  */
252 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
253                        unsigned int set_size)
254 {
255         struct drbd_tl_epoch *b, *nob; /* next old barrier */
256         struct list_head *le, *tle;
257         struct drbd_request *r;
258
259         spin_lock_irq(&mdev->tconn->req_lock);
260
261         b = mdev->tconn->oldest_tle;
262
263         /* first some paranoia code */
264         if (b == NULL) {
265                 dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
266                         barrier_nr);
267                 goto bail;
268         }
269         if (b->br_number != barrier_nr) {
270                 dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
271                         barrier_nr, b->br_number);
272                 goto bail;
273         }
274         if (b->n_writes != set_size) {
275                 dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
276                         barrier_nr, set_size, b->n_writes);
277                 goto bail;
278         }
279
280         /* Clean up list of requests processed during current epoch */
281         list_for_each_safe(le, tle, &b->requests) {
282                 r = list_entry(le, struct drbd_request, tl_requests);
283                 _req_mod(r, BARRIER_ACKED);
284         }
285         /* There could be requests on the list waiting for completion
286            of the write to the local disk. To avoid corruption of the
287            slab's data structures we have to remove the list's head.
288
289            Also there could have been a barrier ack out of sequence, overtaking
290            the write acks - which would be a bug and would violate write ordering.
291            To not deadlock in case we lose the connection while such requests are
292            still pending, we need some way to find them for the
293            _req_mod(CONNECTION_LOST_WHILE_PENDING).
294
295            These have been list_move'd to the out_of_sequence_requests list in
296            _req_mod(, BARRIER_ACKED) above.
297            */
298         list_del_init(&b->requests);
299
300         nob = b->next;
301         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
302                 _tl_add_barrier(mdev, b);
303                 if (nob)
304                         mdev->tconn->oldest_tle = nob;
305                 /* if nob == NULL, b was the only barrier and becomes the new
306                    barrier. Therefore mdev->tconn->oldest_tle already points to b */
307         } else {
308                 D_ASSERT(nob != NULL);
309                 mdev->tconn->oldest_tle = nob;
310                 kfree(b);
311         }
312
313         spin_unlock_irq(&mdev->tconn->req_lock);
314         dec_ap_pending(mdev);
315
316         return;
317
318 bail:
319         spin_unlock_irq(&mdev->tconn->req_lock);
320         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
321 }
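
/*
 * A concrete example of the consistency checks above (illustrative numbers):
 * if the peer acks barrier #42 with set_size 7, but our oldest epoch is #42
 * with n_writes 6, a write ack was reordered or lost, so the connection is
 * torn down via C_PROTOCOL_ERROR rather than risking silent corruption.
 */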
322
323
324 /**
325  * _tl_restart() - Walks the transfer log, and applies an action to all requests
326  * @mdev:       DRBD device.
327  * @what:       The action/event to perform with all request objects
328  *
329  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
330  * RESTART_FROZEN_DISK_IO.
331  */
332 void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
333 {
334         struct drbd_tl_epoch *b, *tmp, **pn;
335         struct list_head *le, *tle, carry_reads;
336         struct drbd_request *req;
337         int rv, n_writes, n_reads;
338
339         b = mdev->tconn->oldest_tle;
340         pn = &mdev->tconn->oldest_tle;
341         while (b) {
342                 n_writes = 0;
343                 n_reads = 0;
344                 INIT_LIST_HEAD(&carry_reads);
345                 list_for_each_safe(le, tle, &b->requests) {
346                         req = list_entry(le, struct drbd_request, tl_requests);
347                         rv = _req_mod(req, what);
348
349                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
350                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
351                 }
352                 tmp = b->next;
353
354                 if (n_writes) {
355                         if (what == RESEND) {
356                                 b->n_writes = n_writes;
357                                 if (b->w.cb == NULL) {
358                                         b->w.cb = w_send_barrier;
359                                         inc_ap_pending(mdev);
360                                         set_bit(CREATE_BARRIER, &mdev->flags);
361                                 }
362
363                                 drbd_queue_work(&mdev->tconn->data.work, &b->w);
364                         }
365                         pn = &b->next;
366                 } else {
367                         if (n_reads)
368                                 list_add(&carry_reads, &b->requests);
369                         /* there could still be requests on that ring list,
370                          * in case local io is still pending */
371                         list_del(&b->requests);
372
373                         /* dec_ap_pending corresponding to queue_barrier.
374                          * the newest barrier may not have been queued yet,
375                          * in which case w.cb is still NULL. */
376                         if (b->w.cb != NULL)
377                                 dec_ap_pending(mdev);
378
379                         if (b == mdev->tconn->newest_tle) {
380                                 /* recycle, but reinit! */
381                                 D_ASSERT(tmp == NULL);
382                                 INIT_LIST_HEAD(&b->requests);
383                                 list_splice(&carry_reads, &b->requests);
384                                 INIT_LIST_HEAD(&b->w.list);
385                                 b->w.cb = NULL;
386                                 b->br_number = net_random();
387                                 b->n_writes = 0;
388
389                                 *pn = b;
390                                 break;
391                         }
392                         *pn = tmp;
393                         kfree(b);
394                 }
395                 b = tmp;
396                 list_splice(&carry_reads, &b->requests);
397         }
398 }
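
/*
 * Rough summary of _tl_restart() per event, drawn from the code above as a
 * reading aid: RESEND keeps epochs that still contain writes and re-queues
 * their w_send_barrier work, while the other events (for example
 * CONNECTION_LOST_WHILE_PENDING or FAIL_FROZEN_DISK_IO) strip emptied epochs,
 * carry still-pending reads over into the following epoch via carry_reads,
 * and finally recycle the newest epoch with a fresh br_number.
 */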
399
400
401 /**
402  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
403  * @mdev:       DRBD device.
404  *
405  * This is called after the connection to the peer was lost. The storage covered
406  * by the requests on the transfer log gets marked as out of sync. Called from the
407  * receiver thread and the worker thread.
408  */
409 void tl_clear(struct drbd_conf *mdev)
410 {
411         struct list_head *le, *tle;
412         struct drbd_request *r;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415
416         _tl_restart(mdev, CONNECTION_LOST_WHILE_PENDING);
417
418         /* we expect this list to be empty. */
419         D_ASSERT(list_empty(&mdev->tconn->out_of_sequence_requests));
420
421         /* but just in case, clean it up anyways! */
422         list_for_each_safe(le, tle, &mdev->tconn->out_of_sequence_requests) {
423                 r = list_entry(le, struct drbd_request, tl_requests);
424                 /* It would be nice to complete outside of spinlock.
425                  * But this is easier for now. */
426                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
427         }
428
429         /* ensure bit indicating barrier is required is clear */
430         clear_bit(CREATE_BARRIER, &mdev->flags);
431
432         spin_unlock_irq(&mdev->tconn->req_lock);
433 }
434
435 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
436 {
437         spin_lock_irq(&mdev->tconn->req_lock);
438         _tl_restart(mdev, what);
439         spin_unlock_irq(&mdev->tconn->req_lock);
440 }
441
442 static int drbd_thread_setup(void *arg)
443 {
444         struct drbd_thread *thi = (struct drbd_thread *) arg;
445         struct drbd_tconn *tconn = thi->tconn;
446         unsigned long flags;
447         int retval;
448
449         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
450                  thi->name[0], thi->tconn->name);
451
452 restart:
453         retval = thi->function(thi);
454
455         spin_lock_irqsave(&thi->t_lock, flags);
456
457         /* if the receiver has been "EXITING", the last thing it did
458          * was set the conn state to "StandAlone",
459          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
460          * and receiver thread will be "started".
461          * drbd_thread_start needs to set "RESTARTING" in that case.
462          * The t_state check and assignment need to be within the same spinlock,
463          * so either thread_start sees EXITING, and can remap to RESTARTING,
464          * or thread_start sees NONE, and can proceed as normal.
465          */
466
467         if (thi->t_state == RESTARTING) {
468                 conn_info(tconn, "Restarting %s thread\n", thi->name);
469                 thi->t_state = RUNNING;
470                 spin_unlock_irqrestore(&thi->t_lock, flags);
471                 goto restart;
472         }
473
474         thi->task = NULL;
475         thi->t_state = NONE;
476         smp_mb();
477         complete(&thi->stop);
478         spin_unlock_irqrestore(&thi->t_lock, flags);
479
480         conn_info(tconn, "Terminating %s\n", current->comm);
481
482         /* Release mod reference taken when thread was started */
483         module_put(THIS_MODULE);
484         return retval;
485 }
486
487 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
488                              int (*func) (struct drbd_thread *), char *name)
489 {
490         spin_lock_init(&thi->t_lock);
491         thi->task    = NULL;
492         thi->t_state = NONE;
493         thi->function = func;
494         thi->tconn = tconn;
495         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
496 }
497
498 int drbd_thread_start(struct drbd_thread *thi)
499 {
500         struct drbd_tconn *tconn = thi->tconn;
501         struct task_struct *nt;
502         unsigned long flags;
503
504         /* is used from state engine doing drbd_thread_stop_nowait,
505          * while holding the req lock irqsave */
506         spin_lock_irqsave(&thi->t_lock, flags);
507
508         switch (thi->t_state) {
509         case NONE:
510                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
511                          thi->name, current->comm, current->pid);
512
513                 /* Get ref on module for thread - this is released when thread exits */
514                 if (!try_module_get(THIS_MODULE)) {
515                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
516                         spin_unlock_irqrestore(&thi->t_lock, flags);
517                         return false;
518                 }
519
520                 init_completion(&thi->stop);
521                 thi->reset_cpu_mask = 1;
522                 thi->t_state = RUNNING;
523                 spin_unlock_irqrestore(&thi->t_lock, flags);
524                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
525
526                 nt = kthread_create(drbd_thread_setup, (void *) thi,
527                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
528
529                 if (IS_ERR(nt)) {
530                         conn_err(tconn, "Couldn't start thread\n");
531
532                         module_put(THIS_MODULE);
533                         return false;
534                 }
535                 spin_lock_irqsave(&thi->t_lock, flags);
536                 thi->task = nt;
537                 thi->t_state = RUNNING;
538                 spin_unlock_irqrestore(&thi->t_lock, flags);
539                 wake_up_process(nt);
540                 break;
541         case EXITING:
542                 thi->t_state = RESTARTING;
543                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
544                                 thi->name, current->comm, current->pid);
545                 /* fall through */
546         case RUNNING:
547         case RESTARTING:
548         default:
549                 spin_unlock_irqrestore(&thi->t_lock, flags);
550                 break;
551         }
552
553         return true;
554 }
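
/*
 * Illustrative pairing of the two helpers above (a hypothetical caller, not
 * code from this file):
 *
 *	drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
 *	if (!drbd_thread_start(&tconn->worker))
 *		conn_err(tconn, "worker thread did not start\n");
 *
 * drbd_thread_start() takes the module reference that drbd_thread_setup()
 * drops again once the thread function returns without a RESTARTING request.
 */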
555
556
557 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
558 {
559         unsigned long flags;
560
561         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
562
563         /* may be called from state engine, holding the req lock irqsave */
564         spin_lock_irqsave(&thi->t_lock, flags);
565
566         if (thi->t_state == NONE) {
567                 spin_unlock_irqrestore(&thi->t_lock, flags);
568                 if (restart)
569                         drbd_thread_start(thi);
570                 return;
571         }
572
573         if (thi->t_state != ns) {
574                 if (thi->task == NULL) {
575                         spin_unlock_irqrestore(&thi->t_lock, flags);
576                         return;
577                 }
578
579                 thi->t_state = ns;
580                 smp_mb();
581                 init_completion(&thi->stop);
582                 if (thi->task != current)
583                         force_sig(DRBD_SIGKILL, thi->task);
584         }
585
586         spin_unlock_irqrestore(&thi->t_lock, flags);
587
588         if (wait)
589                 wait_for_completion(&thi->stop);
590 }
591
592 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
593 {
594         struct drbd_thread *thi =
595                 task == tconn->receiver.task ? &tconn->receiver :
596                 task == tconn->asender.task  ? &tconn->asender :
597                 task == tconn->worker.task   ? &tconn->worker : NULL;
598
599         return thi;
600 }
601
602 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
603 {
604         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
605         return thi ? thi->name : task->comm;
606 }
607
608 #ifdef CONFIG_SMP
609 static int conn_lowest_minor(struct drbd_tconn *tconn)
610 {
611         int minor = 0;
612         idr_get_next(&tconn->volumes, &minor);
613         return minor;
614 }
615 /**
616  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
617  * @tconn:      DRBD connection.
618  *
619  * Forces all threads of a connection onto the same CPU. This is beneficial for
620  * DRBD's performance. May be overridden by the user's configuration.
621  */
622 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
623 {
624         int ord, cpu;
625
626         /* user override. */
627         if (cpumask_weight(tconn->cpu_mask))
628                 return;
629
630         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
631         for_each_online_cpu(cpu) {
632                 if (ord-- == 0) {
633                         cpumask_set_cpu(cpu, tconn->cpu_mask);
634                         return;
635                 }
636         }
637         /* should not be reached */
638         cpumask_setall(tconn->cpu_mask);
639 }
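
/*
 * Example of the spreading rule above (illustrative numbers): with four
 * online CPUs, a connection whose lowest minor is 5 gets ord = 5 % 4 = 1 and
 * is pinned to the second online CPU, so different connections tend to land
 * on different CPUs unless the user configured an explicit cpu_mask.
 */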
640
641 /**
642  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
643  * @mdev:       DRBD device.
644  * @thi:        drbd_thread object
645  *
646  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
647  * prematurely.
648  */
649 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
650 {
651         struct task_struct *p = current;
652
653         if (!thi->reset_cpu_mask)
654                 return;
655         thi->reset_cpu_mask = 0;
656         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
657 }
658 #endif
659
660 static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
661 {
662         h->magic   = cpu_to_be32(DRBD_MAGIC);
663         h->command = cpu_to_be16(cmd);
664         h->length  = cpu_to_be16(size);
665 }
666
667 static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
668 {
669         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
670         h->command = cpu_to_be16(cmd);
671         h->length  = cpu_to_be32(size);
672 }
673
674 static void _prepare_header(struct drbd_tconn *tconn, int vnr, struct p_header *h,
675                             enum drbd_packet cmd, int size)
676 {
677         if (tconn->agreed_pro_version >= 100 || size > DRBD_MAX_SIZE_H80_PACKET)
678                 prepare_header95(&h->h95, cmd, size);
679         else
680                 prepare_header80(&h->h80, cmd, size);
681 }
682
683 static void prepare_header(struct drbd_conf *mdev, struct p_header *h,
684                            enum drbd_packet cmd, int size)
685 {
686         _prepare_header(mdev->tconn, mdev->vnr, h, cmd, size);
687 }
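
/*
 * For orientation (derived from the two helpers above): the old "h80" header
 * carries a 32 bit magic, 16 bit command and 16 bit length, while the "h95"
 * header uses a 16 bit magic, 16 bit command and 32 bit length.  That is why
 * payloads exceeding DRBD_MAX_SIZE_H80_PACKET, or peers with protocol
 * version >= 100, are always sent with the 95 style header.
 */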
688
689 /* the appropriate socket mutex must be held already */
690 int _conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct socket *sock,
691                    enum drbd_packet cmd, struct p_header *h, size_t size,
692                    unsigned msg_flags)
693 {
694         int sent, ok;
695
696         _prepare_header(tconn, vnr, h, cmd, size - sizeof(struct p_header));
697
698         sent = drbd_send(tconn, sock, h, size, msg_flags);
699
700         ok = (sent == size);
701         if (!ok && !signal_pending(current))
702                 conn_warn(tconn, "short sent %s size=%d sent=%d\n",
703                           cmdname(cmd), (int)size, sent);
704         return ok;
705 }
706
707 /* don't pass the socket. we may only look at it
708  * when we hold the appropriate socket mutex.
709  */
710 int conn_send_cmd(struct drbd_tconn *tconn, int vnr, int use_data_socket,
711                   enum drbd_packet cmd, struct p_header *h, size_t size)
712 {
713         int ok = 0;
714         struct socket *sock;
715
716         if (use_data_socket) {
717                 mutex_lock(&tconn->data.mutex);
718                 sock = tconn->data.socket;
719         } else {
720                 mutex_lock(&tconn->meta.mutex);
721                 sock = tconn->meta.socket;
722         }
723
724         /* drbd_disconnect() could have called drbd_free_sock()
725          * while we were waiting in down()... */
726         if (likely(sock != NULL))
727                 ok = _conn_send_cmd(tconn, vnr, sock, cmd, h, size, 0);
728
729         if (use_data_socket)
730                 mutex_unlock(&tconn->data.mutex);
731         else
732                 mutex_unlock(&tconn->meta.mutex);
733         return ok;
734 }
735
736 int conn_send_cmd2(struct drbd_tconn *tconn, enum drbd_packet cmd, char *data,
737                    size_t size)
738 {
739         struct p_header80 h;
740         int ok;
741
742         prepare_header80(&h, cmd, size);
743
744         if (!drbd_get_data_sock(tconn))
745                 return 0;
746
747         ok = (sizeof(h) ==
748                 drbd_send(tconn, tconn->data.socket, &h, sizeof(h), 0));
749         ok = ok && (size ==
750                 drbd_send(tconn, tconn->data.socket, data, size, 0));
751
752         drbd_put_data_sock(tconn);
753
754         return ok;
755 }
756
757 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
758 {
759         struct p_rs_param_95 *p;
760         struct socket *sock;
761         int size, rv;
762         const int apv = mdev->tconn->agreed_pro_version;
763
764         size = apv <= 87 ? sizeof(struct p_rs_param)
765                 : apv == 88 ? sizeof(struct p_rs_param)
766                         + strlen(mdev->sync_conf.verify_alg) + 1
767                 : apv <= 94 ? sizeof(struct p_rs_param_89)
768                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
769
770         /* used from admin command context and receiver/worker context.
771          * to avoid kmalloc, grab the socket right here,
772          * then use the pre-allocated sbuf there */
773         mutex_lock(&mdev->tconn->data.mutex);
774         sock = mdev->tconn->data.socket;
775
776         if (likely(sock != NULL)) {
777                 enum drbd_packet cmd =
778                         apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
779
780                 p = &mdev->tconn->data.sbuf.rs_param_95;
781
782                 /* initialize verify_alg and csums_alg */
783                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
784
785                 p->rate = cpu_to_be32(sc->rate);
786                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
787                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
788                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
789                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
790
791                 if (apv >= 88)
792                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
793                 if (apv >= 89)
794                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
795
796                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
797         } else
798                 rv = 0; /* not ok */
799
800         mutex_unlock(&mdev->tconn->data.mutex);
801
802         return rv;
803 }
804
805 int drbd_send_protocol(struct drbd_tconn *tconn)
806 {
807         struct p_protocol *p;
808         int size, cf, rv;
809
810         size = sizeof(struct p_protocol);
811
812         if (tconn->agreed_pro_version >= 87)
813                 size += strlen(tconn->net_conf->integrity_alg) + 1;
814
815         /* we must not recurse into our own queue,
816          * as that is blocked during handshake */
817         p = kmalloc(size, GFP_NOIO);
818         if (p == NULL)
819                 return 0;
820
821         p->protocol      = cpu_to_be32(tconn->net_conf->wire_protocol);
822         p->after_sb_0p   = cpu_to_be32(tconn->net_conf->after_sb_0p);
823         p->after_sb_1p   = cpu_to_be32(tconn->net_conf->after_sb_1p);
824         p->after_sb_2p   = cpu_to_be32(tconn->net_conf->after_sb_2p);
825         p->two_primaries = cpu_to_be32(tconn->net_conf->two_primaries);
826
827         cf = 0;
828         if (tconn->net_conf->want_lose)
829                 cf |= CF_WANT_LOSE;
830         if (tconn->net_conf->dry_run) {
831                 if (tconn->agreed_pro_version >= 92)
832                         cf |= CF_DRY_RUN;
833                 else {
834                         conn_err(tconn, "--dry-run is not supported by peer");
835                         kfree(p);
836                         return -1;
837                 }
838         }
839         p->conn_flags    = cpu_to_be32(cf);
840
841         if (tconn->agreed_pro_version >= 87)
842                 strcpy(p->integrity_alg, tconn->net_conf->integrity_alg);
843
844         rv = conn_send_cmd2(tconn, P_PROTOCOL, p->head.payload, size - sizeof(struct p_header));
845         kfree(p);
846         return rv;
847 }
848
849 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
850 {
851         struct p_uuids p;
852         int i;
853
854         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
855                 return 1;
856
857         for (i = UI_CURRENT; i < UI_SIZE; i++)
858                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
859
860         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
861         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
862         uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
863         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
864         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
865         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
866
867         put_ldev(mdev);
868
869         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS, &p.head, sizeof(p));
870 }
871
872 int drbd_send_uuids(struct drbd_conf *mdev)
873 {
874         return _drbd_send_uuids(mdev, 0);
875 }
876
877 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
878 {
879         return _drbd_send_uuids(mdev, 8);
880 }
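
/*
 * The uuid_flags bits assembled in _drbd_send_uuids() above, for reference:
 * 1 = the want_lose net option is set, 2 = we were a crashed primary,
 * 4 = the disk state we are about to enter is D_INCONSISTENT, and 8 is the
 * value passed in by drbd_send_uuids_skip_initial_sync(), presumably telling
 * the peer that the initial sync may be skipped.  The remaining bits are
 * unused in this function.
 */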
881
882 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
883 {
884         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
885                 u64 *uuid = mdev->ldev->md.uuid;
886                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
887                      text,
888                      (unsigned long long)uuid[UI_CURRENT],
889                      (unsigned long long)uuid[UI_BITMAP],
890                      (unsigned long long)uuid[UI_HISTORY_START],
891                      (unsigned long long)uuid[UI_HISTORY_END]);
892                 put_ldev(mdev);
893         } else {
894                 dev_info(DEV, "%s effective data uuid: %016llX\n",
895                                 text,
896                                 (unsigned long long)mdev->ed_uuid);
897         }
898 }
899
900 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
901 {
902         struct p_rs_uuid p;
903         u64 uuid;
904
905         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
906
907         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
908         drbd_uuid_set(mdev, UI_BITMAP, uuid);
909         drbd_print_uuids(mdev, "updated sync UUID");
910         drbd_md_sync(mdev);
911         p.uuid = cpu_to_be64(uuid);
912
913         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID, &p.head, sizeof(p));
914 }
915
916 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
917 {
918         struct p_sizes p;
919         sector_t d_size, u_size;
920         int q_order_type, max_bio_size;
921         int ok;
922
923         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
924                 D_ASSERT(mdev->ldev->backing_bdev);
925                 d_size = drbd_get_max_capacity(mdev->ldev);
926                 u_size = mdev->ldev->dc.disk_size;
927                 q_order_type = drbd_queue_order_type(mdev);
928                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
929                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
930                 put_ldev(mdev);
931         } else {
932                 d_size = 0;
933                 u_size = 0;
934                 q_order_type = QUEUE_ORDERED_NONE;
935                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
936         }
937
938         p.d_size = cpu_to_be64(d_size);
939         p.u_size = cpu_to_be64(u_size);
940         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
941         p.max_bio_size = cpu_to_be32(max_bio_size);
942         p.queue_order_type = cpu_to_be16(q_order_type);
943         p.dds_flags = cpu_to_be16(flags);
944
945         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES, &p.head, sizeof(p));
946         return ok;
947 }
948
949 /**
950  * drbd_send_state() - Sends the drbd state to the peer
951  * @mdev:       DRBD device.
952  */
953 int drbd_send_state(struct drbd_conf *mdev)
954 {
955         struct socket *sock;
956         struct p_state p;
957         int ok = 0;
958
959         mutex_lock(&mdev->tconn->data.mutex);
960
961         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
962         sock = mdev->tconn->data.socket;
963
964         if (likely(sock != NULL)) {
965                 ok = _drbd_send_cmd(mdev, sock, P_STATE, &p.head, sizeof(p), 0);
966         }
967
968         mutex_unlock(&mdev->tconn->data.mutex);
969
970         return ok;
971 }
972
973 int _conn_send_state_req(struct drbd_tconn *tconn, int vnr, enum drbd_packet cmd,
974                          union drbd_state mask, union drbd_state val)
975 {
976         struct p_req_state p;
977
978         p.mask    = cpu_to_be32(mask.i);
979         p.val     = cpu_to_be32(val.i);
980
981         return conn_send_cmd(tconn, vnr, USE_DATA_SOCKET, cmd, &p.head, sizeof(p));
982 }
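
/*
 * Illustrative use of _conn_send_state_req() (a hypothetical caller with an
 * assumed packet type and vnr, not code from this file): request that the
 * peer evaluate a connection state change to C_DISCONNECTING.
 *
 *	union drbd_state mask = { }, val = { };
 *
 *	mask.conn = ~0;
 *	val.conn  = C_DISCONNECTING;
 *	_conn_send_state_req(tconn, 0, P_STATE_CHG_REQ, mask, val);
 *
 * The mask selects which state fields the peer should look at, val carries
 * the requested new values, and the answer comes back as an
 * enum drbd_state_rv (compare drbd_send_sr_reply() below).
 */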
983
984 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
985 {
986         struct p_req_state_reply p;
987
988         p.retcode    = cpu_to_be32(retcode);
989
990         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY, &p.head, sizeof(p));
991 }
992
993 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
994         struct p_compressed_bm *p,
995         struct bm_xfer_ctx *c)
996 {
997         struct bitstream bs;
998         unsigned long plain_bits;
999         unsigned long tmp;
1000         unsigned long rl;
1001         unsigned len;
1002         unsigned toggle;
1003         int bits;
1004
1005         /* may we use this feature? */
1006         if ((mdev->sync_conf.use_rle == 0) ||
1007                 (mdev->tconn->agreed_pro_version < 90))
1008                         return 0;
1009
1010         if (c->bit_offset >= c->bm_bits)
1011                 return 0; /* nothing to do. */
1012
1013         /* use at most thus many bytes */
1014         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1015         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1016         /* plain bits covered in this code string */
1017         plain_bits = 0;
1018
1019         /* p->encoding & 0x80 stores whether the first run length is set.
1020          * bit offset is implicit.
1021          * start with toggle == 2 to be able to tell the first iteration */
1022         toggle = 2;
1023
1024         /* see how much plain bits we can stuff into one packet
1025          * using RLE and VLI. */
1026         do {
1027                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1028                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1029                 if (tmp == -1UL)
1030                         tmp = c->bm_bits;
1031                 rl = tmp - c->bit_offset;
1032
1033                 if (toggle == 2) { /* first iteration */
1034                         if (rl == 0) {
1035                                 /* the first checked bit was set,
1036                                  * store start value, */
1037                                 DCBP_set_start(p, 1);
1038                                 /* but skip encoding of zero run length */
1039                                 toggle = !toggle;
1040                                 continue;
1041                         }
1042                         DCBP_set_start(p, 0);
1043                 }
1044
1045                 /* paranoia: catch zero runlength.
1046                  * can only happen if bitmap is modified while we scan it. */
1047                 if (rl == 0) {
1048                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1049                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1050                         return -1;
1051                 }
1052
1053                 bits = vli_encode_bits(&bs, rl);
1054                 if (bits == -ENOBUFS) /* buffer full */
1055                         break;
1056                 if (bits <= 0) {
1057                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1058                         return 0;
1059                 }
1060
1061                 toggle = !toggle;
1062                 plain_bits += rl;
1063                 c->bit_offset = tmp;
1064         } while (c->bit_offset < c->bm_bits);
1065
1066         len = bs.cur.b - p->code + !!bs.cur.bit;
1067
1068         if (plain_bits < (len << 3)) {
1069                 /* incompressible with this method.
1070                  * we need to rewind both word and bit position. */
1071                 c->bit_offset -= plain_bits;
1072                 bm_xfer_ctx_bit_to_word_offset(c);
1073                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1074                 return 0;
1075         }
1076
1077         /* RLE + VLI was able to compress it just fine.
1078          * update c->word_offset. */
1079         bm_xfer_ctx_bit_to_word_offset(c);
1080
1081         /* store pad_bits */
1082         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1083
1084         return len;
1085 }
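
/*
 * Worked example of the run-length encoding above (illustrative bit pattern):
 * for a bitmap chunk starting 0 0 0 1 1 0 ..., the first run is three clear
 * bits, so the start flag is cleared via DCBP_set_start(p, 0) and
 * vli_encode_bits() emits 3, then 2 for the set run, then the length of the
 * next clear run, and so on, toggling between clear and set.  If the very
 * first bit were set instead, the zero-length clear run is skipped and
 * DCBP_set_start(p, 1) records that the runs begin with set bits.
 */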
1086
1087 /**
1088  * send_bitmap_rle_or_plain
1089  *
1090  * Return 0 when done, 1 when another iteration is needed, and a negative error
1091  * code upon failure.
1092  */
1093 static int
1094 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1095                          struct p_header *h, struct bm_xfer_ctx *c)
1096 {
1097         struct p_compressed_bm *p = (void*)h;
1098         unsigned long num_words;
1099         int len;
1100         int ok;
1101
1102         len = fill_bitmap_rle_bits(mdev, p, c);
1103
1104         if (len < 0)
1105                 return -EIO;
1106
1107         if (len) {
1108                 DCBP_set_code(p, RLE_VLI_Bits);
1109                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_COMPRESSED_BITMAP, h,
1110                         sizeof(*p) + len, 0);
1111
1112                 c->packets[0]++;
1113                 c->bytes[0] += sizeof(*p) + len;
1114
1115                 if (c->bit_offset >= c->bm_bits)
1116                         len = 0; /* DONE */
1117         } else {
1118                 /* was not compressible.
1119                  * send a buffer full of plain text bits instead. */
1120                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1121                 len = num_words * sizeof(long);
1122                 if (len)
1123                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1124                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BITMAP,
1125                                    h, sizeof(struct p_header80) + len, 0);
1126                 c->word_offset += num_words;
1127                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1128
1129                 c->packets[1]++;
1130                 c->bytes[1] += sizeof(struct p_header80) + len;
1131
1132                 if (c->bit_offset > c->bm_bits)
1133                         c->bit_offset = c->bm_bits;
1134         }
1135         if (ok) {
1136                 if (len == 0) {
1137                         INFO_bm_xfer_stats(mdev, "send", c);
1138                         return 0;
1139                 } else
1140                         return 1;
1141         }
1142         return -EIO;
1143 }
1144
1145 /* See the comment at receive_bitmap() */
1146 int _drbd_send_bitmap(struct drbd_conf *mdev)
1147 {
1148         struct bm_xfer_ctx c;
1149         struct p_header *p;
1150         int err;
1151
1152         if (!expect(mdev->bitmap))
1153                 return false;
1154
1155         /* maybe we should use some per thread scratch page,
1156          * and allocate that during initial device creation? */
1157         p = (struct p_header *) __get_free_page(GFP_NOIO);
1158         if (!p) {
1159                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
1160                 return false;
1161         }
1162
1163         if (get_ldev(mdev)) {
1164                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1165                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1166                         drbd_bm_set_all(mdev);
1167                         if (drbd_bm_write(mdev)) {
1168                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1169                                  * but otherwise process as per normal - need to tell other
1170                                  * side that a full resync is required! */
1171                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1172                         } else {
1173                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1174                                 drbd_md_sync(mdev);
1175                         }
1176                 }
1177                 put_ldev(mdev);
1178         }
1179
1180         c = (struct bm_xfer_ctx) {
1181                 .bm_bits = drbd_bm_bits(mdev),
1182                 .bm_words = drbd_bm_words(mdev),
1183         };
1184
1185         do {
1186                 err = send_bitmap_rle_or_plain(mdev, p, &c);
1187         } while (err > 0);
1188
1189         free_page((unsigned long) p);
1190         return err == 0;
1191 }
1192
1193 int drbd_send_bitmap(struct drbd_conf *mdev)
1194 {
1195         int err;
1196
1197         if (!drbd_get_data_sock(mdev->tconn))
1198                 return -1;
1199         err = !_drbd_send_bitmap(mdev);
1200         drbd_put_data_sock(mdev->tconn);
1201         return err;
1202 }
1203
1204 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1205 {
1206         int ok;
1207         struct p_barrier_ack p;
1208
1209         p.barrier  = barrier_nr;
1210         p.set_size = cpu_to_be32(set_size);
1211
1212         if (mdev->state.conn < C_CONNECTED)
1213                 return false;
1214         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK, &p.head, sizeof(p));
1215         return ok;
1216 }
1217
1218 /**
1219  * _drbd_send_ack() - Sends an ack packet
1220  * @mdev:       DRBD device.
1221  * @cmd:        Packet command code.
1222  * @sector:     sector, needs to be in big endian byte order
1223  * @blksize:    size in bytes, needs to be in big endian byte order
1224  * @block_id:   Id, big endian byte order
1225  */
1226 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1227                           u64 sector, u32 blksize, u64 block_id)
1228 {
1229         int ok;
1230         struct p_block_ack p;
1231
1232         p.sector   = sector;
1233         p.block_id = block_id;
1234         p.blksize  = blksize;
1235         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
1236
1237         if (!mdev->tconn->meta.socket || mdev->state.conn < C_CONNECTED)
1238                 return false;
1239         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd, &p.head, sizeof(p));
1240         return ok;
1241 }
1242
1243 /* dp->sector and dp->block_id already/still in network byte order,
1244  * data_size is payload size according to dp->head,
1245  * and may need to be corrected for digest size. */
1246 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1247                      struct p_data *dp, int data_size)
1248 {
1249         data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1250                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1251         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1252                               dp->block_id);
1253 }
1254
1255 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1256                      struct p_block_req *rp)
1257 {
1258         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1259 }
1260
1261 /**
1262  * drbd_send_ack() - Sends an ack packet
1263  * @mdev:       DRBD device
1264  * @cmd:        packet command code
1265  * @peer_req:   peer request
1266  */
1267 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1268                   struct drbd_peer_request *peer_req)
1269 {
1270         return _drbd_send_ack(mdev, cmd,
1271                               cpu_to_be64(peer_req->i.sector),
1272                               cpu_to_be32(peer_req->i.size),
1273                               peer_req->block_id);
1274 }
1275
1276 /* This function misuses the block_id field to signal if the blocks
1277  * are in sync or not. */
1278 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1279                      sector_t sector, int blksize, u64 block_id)
1280 {
1281         return _drbd_send_ack(mdev, cmd,
1282                               cpu_to_be64(sector),
1283                               cpu_to_be32(blksize),
1284                               cpu_to_be64(block_id));
1285 }
1286
1287 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1288                        sector_t sector, int size, u64 block_id)
1289 {
1290         int ok;
1291         struct p_block_req p;
1292
1293         p.sector   = cpu_to_be64(sector);
1294         p.block_id = block_id;
1295         p.blksize  = cpu_to_be32(size);
1296
1297         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd, &p.head, sizeof(p));
1298         return ok;
1299 }
1300
1301 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1302                             void *digest, int digest_size, enum drbd_packet cmd)
1303 {
1304         int ok;
1305         struct p_block_req p;
1306
1307         prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header) + digest_size);
1308         p.sector   = cpu_to_be64(sector);
1309         p.block_id = ID_SYNCER /* unused */;
1310         p.blksize  = cpu_to_be32(size);
1311
1312         mutex_lock(&mdev->tconn->data.mutex);
1313
1314         ok = (sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), 0));
1315         ok = ok && (digest_size == drbd_send(mdev->tconn, mdev->tconn->data.socket, digest, digest_size, 0));
1316
1317         mutex_unlock(&mdev->tconn->data.mutex);
1318
1319         return ok;
1320 }
1321
1322 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1323 {
1324         int ok;
1325         struct p_block_req p;
1326
1327         p.sector   = cpu_to_be64(sector);
1328         p.block_id = ID_SYNCER /* unused */;
1329         p.blksize  = cpu_to_be32(size);
1330
1331         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST, &p.head, sizeof(p));
1332         return ok;
1333 }
1334
1335 /* called on sndtimeo
1336  * returns false if we should retry,
1337  * true if we think the connection is dead
1338  */
1339 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1340 {
1341         int drop_it;
1342         /* long elapsed = (long)(jiffies - mdev->last_received); */
1343
1344         drop_it =   tconn->meta.socket == sock
1345                 || !tconn->asender.task
1346                 || get_t_state(&tconn->asender) != RUNNING
1347                 || tconn->cstate < C_WF_REPORT_PARAMS;
1348
1349         if (drop_it)
1350                 return true;
1351
1352         drop_it = !--tconn->ko_count;
1353         if (!drop_it) {
1354                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1355                          current->comm, current->pid, tconn->ko_count);
1356                 request_ping(tconn);
1357         }
1358
1359         return drop_it; /* && (mdev->state == R_PRIMARY) */;
1360 }
1361
1362 static void drbd_update_congested(struct drbd_tconn *tconn)
1363 {
1364         struct sock *sk = tconn->data.socket->sk;
1365         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1366                 set_bit(NET_CONGESTED, &tconn->flags);
1367 }
1368
1369 /* The idea of sendpage seems to be to put some kind of reference
1370  * to the page into the skb, and to hand it over to the NIC. In
1371  * this process get_page() gets called.
1372  *
1373  * As soon as the page was really sent over the network put_page()
1374  * gets called by some part of the network layer. [ NIC driver? ]
1375  *
1376  * [ get_page() / put_page() increment/decrement the count. If count
1377  *   reaches 0 the page will be freed. ]
1378  *
1379  * This works nicely with pages from FSs.
1380  * But this means that in protocol A we might signal IO completion too early!
1381  *
1382  * In order not to corrupt data during a resync we must make sure
1383  * that we do not reuse our own buffer pages (EEs) too early, therefore
1384  * we have the net_ee list.
1385  *
1386  * XFS seems to have problems, still, it submits pages with page_count == 0!
1387  * As a workaround, we disable sendpage on pages
1388  * with page_count == 0 or PageSlab.
1389  */
1390 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1391                    int offset, size_t size, unsigned msg_flags)
1392 {
1393         int sent = drbd_send(mdev->tconn, mdev->tconn->data.socket, kmap(page) + offset, size, msg_flags);
1394         kunmap(page);
1395         if (sent == size)
1396                 mdev->send_cnt += size>>9;
1397         return sent == size;
1398 }
1399
1400 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1401                     int offset, size_t size, unsigned msg_flags)
1402 {
1403         mm_segment_t oldfs = get_fs();
1404         int sent, ok;
1405         int len = size;
1406
1407         /* e.g. XFS meta- & log-data is in slab pages, which have a
1408          * page_count of 0 and/or have PageSlab() set.
1409          * we cannot use send_page for those, as that does get_page();
1410          * put_page(); and would cause either a VM_BUG directly, or
1411          * __page_cache_release a page that would actually still be referenced
1412          * by someone, leading to some obscure delayed Oops somewhere else. */
1413         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1414                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1415
1416         msg_flags |= MSG_NOSIGNAL;
1417         drbd_update_congested(mdev->tconn);
1418         set_fs(KERNEL_DS);
1419         do {
1420                 sent = mdev->tconn->data.socket->ops->sendpage(mdev->tconn->data.socket, page,
1421                                                         offset, len,
1422                                                         msg_flags);
1423                 if (sent == -EAGAIN) {
1424                         if (we_should_drop_the_connection(mdev->tconn,
1425                                                           mdev->tconn->data.socket))
1426                                 break;
1427                         else
1428                                 continue;
1429                 }
1430                 if (sent <= 0) {
1431                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1432                              __func__, (int)size, len, sent);
1433                         break;
1434                 }
1435                 len    -= sent;
1436                 offset += sent;
1437         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1438         set_fs(oldfs);
1439         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1440
1441         ok = (len == 0);
1442         if (likely(ok))
1443                 mdev->send_cnt += size>>9;
1444         return ok;
1445 }
1446
1447 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1448 {
1449         struct bio_vec *bvec;
1450         int i;
1451         /* hint all but last page with MSG_MORE */
1452         __bio_for_each_segment(bvec, bio, i, 0) {
1453                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
1454                                      bvec->bv_offset, bvec->bv_len,
1455                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
1456                         return 0;
1457         }
1458         return 1;
1459 }
1460
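/* Same walk as _drbd_send_bio(), but through the zero-copy
 * _drbd_send_page() path. */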
1461 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1462 {
1463         struct bio_vec *bvec;
1464         int i;
1465         /* hint all but last page with MSG_MORE */
1466         __bio_for_each_segment(bvec, bio, i, 0) {
1467                 if (!_drbd_send_page(mdev, bvec->bv_page,
1468                                      bvec->bv_offset, bvec->bv_len,
1469                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
1470                         return 0;
1471         }
1472         return 1;
1473 }
1474
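/* Send the page chain of a peer request (EE); the last chunk may be
 * shorter than PAGE_SIZE, hence the min_t() below. */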
1475 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1476                             struct drbd_peer_request *peer_req)
1477 {
1478         struct page *page = peer_req->pages;
1479         unsigned len = peer_req->i.size;
1480
1481         /* hint all but last page with MSG_MORE */
1482         page_chain_for_each(page) {
1483                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1484                 if (!_drbd_send_page(mdev, page, 0, l,
1485                                 page_chain_next(page) ? MSG_MORE : 0))
1486                         return 0;
1487                 len -= l;
1488         }
1489         return 1;
1490 }
1491
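/* Translate bio rw flags into the DP_* flags sent to the peer.  Peers
 * with an agreed protocol version < 95 only understand DP_RW_SYNC. */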
1492 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1493 {
1494         if (mdev->tconn->agreed_pro_version >= 95)
1495                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1496                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1497                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1498                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1499         else
1500                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1501 }
1502
1503 /* Used to send write requests
1504  * R_PRIMARY -> Peer    (P_DATA)
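 *
 * On the wire this becomes: a header (P_DATA, with a length covering
 * everything after the header), the p_data fields set up below (sector,
 * block_id, seq_num, dp_flags), an optional integrity digest of dgs
 * bytes, and finally the payload taken from the bio pages.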
1505  */
1506 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1507 {
1508         int ok = 1;
1509         struct p_data p;
1510         unsigned int dp_flags = 0;
1511         void *dgb;
1512         int dgs;
1513
1514         if (!drbd_get_data_sock(mdev->tconn))
1515                 return 0;
1516
1517         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1518                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1519
1520         prepare_header(mdev, &p.head, P_DATA, sizeof(p) - sizeof(struct p_header) + dgs + req->i.size);
1521         p.sector   = cpu_to_be64(req->i.sector);
1522         p.block_id = (unsigned long)req;
1523         p.seq_num  = cpu_to_be32(req->seq_num = atomic_add_return(1, &mdev->packet_seq));
1524
1525         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1526
1527         if (mdev->state.conn >= C_SYNC_SOURCE &&
1528             mdev->state.conn <= C_PAUSED_SYNC_T)
1529                 dp_flags |= DP_MAY_SET_IN_SYNC;
1530
1531         p.dp_flags = cpu_to_be32(dp_flags);
1532         set_bit(UNPLUG_REMOTE, &mdev->flags);
1533         ok = (sizeof(p) ==
1534                 drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
1535         if (ok && dgs) {
1536                 dgb = mdev->tconn->int_dig_out;
1537                 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, dgb);
1538                 ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1539         }
1540         if (ok) {
1541                 /* For protocol A, we have to memcpy the payload into
1542                  * socket buffers, as we may complete right away
1543                  * as soon as we handed it over to tcp, at which point the data
1544                  * pages may become invalid.
1545                  *
1546                  * With data integrity enabled, we copy it as well, so we can be
1547                  * sure that even if the bio pages are still being modified, that
1548                  * won't change the data on the wire; thus if the digest checks
1549                  * out ok after sending on this side, but does not match on the
1550                  * receiving side, we have certainly detected corruption elsewhere.
1551                  */
1552                 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
1553                         ok = _drbd_send_bio(mdev, req->master_bio);
1554                 else
1555                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
1556
1557                 /* double check digest, sometimes buffers have been modified in flight. */
1558                 if (dgs > 0 && dgs <= 64) {
1559                         /* 64 byte, 512 bit, is the largest digest size
1560                          * currently supported in kernel crypto. */
1561                         unsigned char digest[64];
1562                         drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
1563                         if (memcmp(mdev->tconn->int_dig_out, digest, dgs)) {
1564                                 dev_warn(DEV,
1565                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1566                                         (unsigned long long)req->i.sector, req->i.size);
1567                         }
1568                 } /* else if (dgs > 64) {
1569                      ... Be noisy about digest too large ...
1570                 } */
1571         }
1572
1573         drbd_put_data_sock(mdev->tconn);
1574
1575         return ok;
1576 }
1577
1578 /* answer packet, used to send data back for read requests:
1579  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1580  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1581  */
1582 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1583                     struct drbd_peer_request *peer_req)
1584 {
1585         int ok;
1586         struct p_data p;
1587         void *dgb;
1588         int dgs;
1589
1590         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1591                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1592
1593         prepare_header(mdev, &p.head, cmd, sizeof(p) -
1594                                            sizeof(struct p_header80) +
1595                                            dgs + peer_req->i.size);
1596         p.sector   = cpu_to_be64(peer_req->i.sector);
1597         p.block_id = peer_req->block_id;
1598         p.seq_num = 0;  /* unused */
1599
1600         /* Only called by our kernel thread.
1601          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
1602          * in response to an admin command or module unload.
1603          */
1604         if (!drbd_get_data_sock(mdev->tconn))
1605                 return 0;
1606
1607         ok = sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
1608         if (ok && dgs) {
1609                 dgb = mdev->tconn->int_dig_out;
1610                 drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, dgb);
1611                 ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1612         }
1613         if (ok)
1614                 ok = _drbd_send_zc_ee(mdev, peer_req);
1615
1616         drbd_put_data_sock(mdev->tconn);
1617
1618         return ok;
1619 }
1620
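/* Tell the peer that a request's range is out of sync (P_OUT_OF_SYNC)
 * without shipping the data itself. */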
1621 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
1622 {
1623         struct p_block_desc p;
1624
1625         p.sector  = cpu_to_be64(req->i.sector);
1626         p.blksize = cpu_to_be32(req->i.size);
1627
1628         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
1629 }
1630
1631 /*
1632   drbd_send distinguishes two cases:
1633
1634   Packets sent via the data socket "sock"
1635   and packets sent via the meta data socket "msock"
1636
1637                     sock                      msock
1638   -----------------+-------------------------+------------------------------
1639   timeout           conf.timeout / 2          conf.timeout / 2
1640   timeout action    send a ping via msock     Abort communication
1641                                               and close all sockets
1642 */
1643
1644 /*
1645  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
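 *
 * Returns the number of bytes handed to the socket (-1000 if there is no
 * socket at all).  If the last sendmsg failed, a hard state change to
 * C_BROKEN_PIPE (or C_TIMEOUT for -EAGAIN) is requested.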
1646  */
1647 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1648               void *buf, size_t size, unsigned msg_flags)
1649 {
1650         struct kvec iov;
1651         struct msghdr msg;
1652         int rv, sent = 0;
1653
1654         if (!sock)
1655                 return -1000;
1656
1657         /* THINK  if (signal_pending) return ... ? */
1658
1659         iov.iov_base = buf;
1660         iov.iov_len  = size;
1661
1662         msg.msg_name       = NULL;
1663         msg.msg_namelen    = 0;
1664         msg.msg_control    = NULL;
1665         msg.msg_controllen = 0;
1666         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1667
1668         if (sock == tconn->data.socket) {
1669                 tconn->ko_count = tconn->net_conf->ko_count;
1670                 drbd_update_congested(tconn);
1671         }
1672         do {
1673                 /* STRANGE
1674                  * tcp_sendmsg does _not_ use its size parameter at all ?
1675                  *
1676                  * -EAGAIN on timeout, -EINTR on signal.
1677                  */
1678 /* THINK
1679  * do we need to block DRBD_SIG if sock == &meta.socket ??
1680  * otherwise wake_asender() might interrupt some send_*Ack !
1681  */
1682                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1683                 if (rv == -EAGAIN) {
1684                         if (we_should_drop_the_connection(tconn, sock))
1685                                 break;
1686                         else
1687                                 continue;
1688                 }
1689                 if (rv == -EINTR) {
1690                         flush_signals(current);
1691                         rv = 0;
1692                 }
1693                 if (rv < 0)
1694                         break;
1695                 sent += rv;
1696                 iov.iov_base += rv;
1697                 iov.iov_len  -= rv;
1698         } while (sent < size);
1699
1700         if (sock == tconn->data.socket)
1701                 clear_bit(NET_CONGESTED, &tconn->flags);
1702
1703         if (rv <= 0) {
1704                 if (rv != -EAGAIN) {
1705                         conn_err(tconn, "%s_sendmsg returned %d\n",
1706                                  sock == tconn->meta.socket ? "msock" : "sock",
1707                                  rv);
1708                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1709                 } else
1710                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1711         }
1712
1713         return sent;
1714 }
1715
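/* block_device open: only a Primary may be opened for writing (-EROFS
 * otherwise); read-only opens on a Secondary are refused with
 * -EMEDIUMTYPE unless the allow_oos module parameter is set. */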
1716 static int drbd_open(struct block_device *bdev, fmode_t mode)
1717 {
1718         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1719         unsigned long flags;
1720         int rv = 0;
1721
1722         mutex_lock(&drbd_main_mutex);
1723         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1724         /* to have a stable mdev->state.role
1725          * and no race with updating open_cnt */
1726
1727         if (mdev->state.role != R_PRIMARY) {
1728                 if (mode & FMODE_WRITE)
1729                         rv = -EROFS;
1730                 else if (!allow_oos)
1731                         rv = -EMEDIUMTYPE;
1732         }
1733
1734         if (!rv)
1735                 mdev->open_cnt++;
1736         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1737         mutex_unlock(&drbd_main_mutex);
1738
1739         return rv;
1740 }
1741
1742 static int drbd_release(struct gendisk *gd, fmode_t mode)
1743 {
1744         struct drbd_conf *mdev = gd->private_data;
1745         mutex_lock(&drbd_main_mutex);
1746         mdev->open_cnt--;
1747         mutex_unlock(&drbd_main_mutex);
1748         return 0;
1749 }
1750
1751 static void drbd_set_defaults(struct drbd_conf *mdev)
1752 {
1753         /* This way we get a compile error when sync_conf grows
1754            and we forget to initialize it here */
1755         mdev->sync_conf = (struct syncer_conf) {
1756                 /* .rate = */           DRBD_RATE_DEF,
1757                 /* .after = */          DRBD_AFTER_DEF,
1758                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
1759                 /* .verify_alg = */     {}, 0,
1760                 /* .cpu_mask = */       {}, 0,
1761                 /* .csums_alg = */      {}, 0,
1762                 /* .use_rle = */        0,
1763                 /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
1764                 /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
1765                 /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
1766                 /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
1767                 /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
1768                 /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
1769         };
1770
1771         /* Have to do it this way, because the bitfield layout differs between
1772            big endian and little endian */
1773         mdev->state = (union drbd_state) {
1774                 { .role = R_SECONDARY,
1775                   .peer = R_UNKNOWN,
1776                   .conn = C_STANDALONE,
1777                   .disk = D_DISKLESS,
1778                   .pdsk = D_UNKNOWN,
1779                   .susp = 0,
1780                   .susp_nod = 0,
1781                   .susp_fen = 0
1782                 } };
1783 }
1784
1785 void drbd_init_set_defaults(struct drbd_conf *mdev)
1786 {
1787         /* the memset(,0,) did most of this.
1788          * note: only assignments, no allocation in here */
1789
1790         drbd_set_defaults(mdev);
1791
1792         atomic_set(&mdev->ap_bio_cnt, 0);
1793         atomic_set(&mdev->ap_pending_cnt, 0);
1794         atomic_set(&mdev->rs_pending_cnt, 0);
1795         atomic_set(&mdev->unacked_cnt, 0);
1796         atomic_set(&mdev->local_cnt, 0);
1797         atomic_set(&mdev->pp_in_use, 0);
1798         atomic_set(&mdev->pp_in_use_by_net, 0);
1799         atomic_set(&mdev->rs_sect_in, 0);
1800         atomic_set(&mdev->rs_sect_ev, 0);
1801         atomic_set(&mdev->ap_in_flight, 0);
1802
1803         mutex_init(&mdev->md_io_mutex);
1804         mutex_init(&mdev->own_state_mutex);
1805         mdev->state_mutex = &mdev->own_state_mutex;
1806
1807         spin_lock_init(&mdev->al_lock);
1808         spin_lock_init(&mdev->peer_seq_lock);
1809         spin_lock_init(&mdev->epoch_lock);
1810
1811         INIT_LIST_HEAD(&mdev->active_ee);
1812         INIT_LIST_HEAD(&mdev->sync_ee);
1813         INIT_LIST_HEAD(&mdev->done_ee);
1814         INIT_LIST_HEAD(&mdev->read_ee);
1815         INIT_LIST_HEAD(&mdev->net_ee);
1816         INIT_LIST_HEAD(&mdev->resync_reads);
1817         INIT_LIST_HEAD(&mdev->resync_work.list);
1818         INIT_LIST_HEAD(&mdev->unplug_work.list);
1819         INIT_LIST_HEAD(&mdev->go_diskless.list);
1820         INIT_LIST_HEAD(&mdev->md_sync_work.list);
1821         INIT_LIST_HEAD(&mdev->start_resync_work.list);
1822         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
1823
1824         mdev->resync_work.cb  = w_resync_timer;
1825         mdev->unplug_work.cb  = w_send_write_hint;
1826         mdev->go_diskless.cb  = w_go_diskless;
1827         mdev->md_sync_work.cb = w_md_sync;
1828         mdev->bm_io_work.w.cb = w_bitmap_io;
1829         mdev->start_resync_work.cb = w_start_resync;
1830
1831         mdev->resync_work.mdev  = mdev;
1832         mdev->unplug_work.mdev  = mdev;
1833         mdev->go_diskless.mdev  = mdev;
1834         mdev->md_sync_work.mdev = mdev;
1835         mdev->bm_io_work.w.mdev = mdev;
1836         mdev->start_resync_work.mdev = mdev;
1837
1838         init_timer(&mdev->resync_timer);
1839         init_timer(&mdev->md_sync_timer);
1840         init_timer(&mdev->start_resync_timer);
1841         init_timer(&mdev->request_timer);
1842         mdev->resync_timer.function = resync_timer_fn;
1843         mdev->resync_timer.data = (unsigned long) mdev;
1844         mdev->md_sync_timer.function = md_sync_timer_fn;
1845         mdev->md_sync_timer.data = (unsigned long) mdev;
1846         mdev->start_resync_timer.function = start_resync_timer_fn;
1847         mdev->start_resync_timer.data = (unsigned long) mdev;
1848         mdev->request_timer.function = request_timer_fn;
1849         mdev->request_timer.data = (unsigned long) mdev;
1850
1851         init_waitqueue_head(&mdev->misc_wait);
1852         init_waitqueue_head(&mdev->state_wait);
1853         init_waitqueue_head(&mdev->ee_wait);
1854         init_waitqueue_head(&mdev->al_wait);
1855         init_waitqueue_head(&mdev->seq_wait);
1856
1857         /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
1858         mdev->write_ordering = WO_bdev_flush;
1859         mdev->resync_wenr = LC_FREE;
1860         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1861         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1862 }
1863
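/* Reset a minor device back to a pristine state once connection and disk
 * are gone: clear the transfer counters and resync marks, shrink and
 * clean up the bitmap, free remaining resources and re-apply the
 * defaults.  The D_ASSERT()s document that all EE lists and work queues
 * must already be empty at this point. */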
1864 void drbd_mdev_cleanup(struct drbd_conf *mdev)
1865 {
1866         int i;
1867         if (mdev->tconn->receiver.t_state != NONE)
1868                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
1869                                 mdev->tconn->receiver.t_state);
1870
1871         /* no need to lock it, I'm the only thread alive */
1872         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
1873                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
1874         mdev->al_writ_cnt  =
1875         mdev->bm_writ_cnt  =
1876         mdev->read_cnt     =
1877         mdev->recv_cnt     =
1878         mdev->send_cnt     =
1879         mdev->writ_cnt     =
1880         mdev->p_size       =
1881         mdev->rs_start     =
1882         mdev->rs_total     =
1883         mdev->rs_failed    = 0;
1884         mdev->rs_last_events = 0;
1885         mdev->rs_last_sect_ev = 0;
1886         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1887                 mdev->rs_mark_left[i] = 0;
1888                 mdev->rs_mark_time[i] = 0;
1889         }
1890         D_ASSERT(mdev->tconn->net_conf == NULL);
1891
1892         drbd_set_my_capacity(mdev, 0);
1893         if (mdev->bitmap) {
1894                 /* maybe never allocated. */
1895                 drbd_bm_resize(mdev, 0, 1);
1896                 drbd_bm_cleanup(mdev);
1897         }
1898
1899         drbd_free_resources(mdev);
1900         clear_bit(AL_SUSPENDED, &mdev->flags);
1901
1902         /*
1903          * currently we call drbd_init_ee only on module load, so
1904          * we may call drbd_release_ee only on module unload!
1905          */
1906         D_ASSERT(list_empty(&mdev->active_ee));
1907         D_ASSERT(list_empty(&mdev->sync_ee));
1908         D_ASSERT(list_empty(&mdev->done_ee));
1909         D_ASSERT(list_empty(&mdev->read_ee));
1910         D_ASSERT(list_empty(&mdev->net_ee));
1911         D_ASSERT(list_empty(&mdev->resync_reads));
1912         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
1913         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
1914         D_ASSERT(list_empty(&mdev->resync_work.list));
1915         D_ASSERT(list_empty(&mdev->unplug_work.list));
1916         D_ASSERT(list_empty(&mdev->go_diskless.list));
1917
1918         drbd_set_defaults(mdev);
1919 }
1920
1921
1922 static void drbd_destroy_mempools(void)
1923 {
1924         struct page *page;
1925
1926         while (drbd_pp_pool) {
1927                 page = drbd_pp_pool;
1928                 drbd_pp_pool = (struct page *)page_private(page);
1929                 __free_page(page);
1930                 drbd_pp_vacant--;
1931         }
1932
1933         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
1934
1935         if (drbd_ee_mempool)
1936                 mempool_destroy(drbd_ee_mempool);
1937         if (drbd_request_mempool)
1938                 mempool_destroy(drbd_request_mempool);
1939         if (drbd_ee_cache)
1940                 kmem_cache_destroy(drbd_ee_cache);
1941         if (drbd_request_cache)
1942                 kmem_cache_destroy(drbd_request_cache);
1943         if (drbd_bm_ext_cache)
1944                 kmem_cache_destroy(drbd_bm_ext_cache);
1945         if (drbd_al_ext_cache)
1946                 kmem_cache_destroy(drbd_al_ext_cache);
1947
1948         drbd_ee_mempool      = NULL;
1949         drbd_request_mempool = NULL;
1950         drbd_ee_cache        = NULL;
1951         drbd_request_cache   = NULL;
1952         drbd_bm_ext_cache    = NULL;
1953         drbd_al_ext_cache    = NULL;
1954
1955         return;
1956 }
1957
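/* Allocate the slab caches, the mempools and DRBD's own page pool.  The
 * page pool is sized to hold one maximum-sized bio worth of pages per
 * configured minor: (DRBD_MAX_BIO_SIZE / PAGE_SIZE) * minor_count. */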
1958 static int drbd_create_mempools(void)
1959 {
1960         struct page *page;
1961         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
1962         int i;
1963
1964         /* prepare our caches and mempools */
1965         drbd_request_mempool = NULL;
1966         drbd_ee_cache        = NULL;
1967         drbd_request_cache   = NULL;
1968         drbd_bm_ext_cache    = NULL;
1969         drbd_al_ext_cache    = NULL;
1970         drbd_pp_pool         = NULL;
1971
1972         /* caches */
1973         drbd_request_cache = kmem_cache_create(
1974                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
1975         if (drbd_request_cache == NULL)
1976                 goto Enomem;
1977
1978         drbd_ee_cache = kmem_cache_create(
1979                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
1980         if (drbd_ee_cache == NULL)
1981                 goto Enomem;
1982
1983         drbd_bm_ext_cache = kmem_cache_create(
1984                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
1985         if (drbd_bm_ext_cache == NULL)
1986                 goto Enomem;
1987
1988         drbd_al_ext_cache = kmem_cache_create(
1989                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
1990         if (drbd_al_ext_cache == NULL)
1991                 goto Enomem;
1992
1993         /* mempools */
1994         drbd_request_mempool = mempool_create(number,
1995                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
1996         if (drbd_request_mempool == NULL)
1997                 goto Enomem;
1998
1999         drbd_ee_mempool = mempool_create(number,
2000                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2001         if (drbd_ee_mempool == NULL)
2002                 goto Enomem;
2003
2004         /* drbd's page pool */
2005         spin_lock_init(&drbd_pp_lock);
2006
2007         for (i = 0; i < number; i++) {
2008                 page = alloc_page(GFP_HIGHUSER);
2009                 if (!page)
2010                         goto Enomem;
2011                 set_page_private(page, (unsigned long)drbd_pp_pool);
2012                 drbd_pp_pool = page;
2013         }
2014         drbd_pp_vacant = number;
2015
2016         return 0;
2017
2018 Enomem:
2019         drbd_destroy_mempools(); /* in case we allocated some */
2020         return -ENOMEM;
2021 }
2022
2023 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2024         void *unused)
2025 {
2026         /* just so we have it.  you never know what interesting things we
2027          * might want to do here some day...
2028          */
2029
2030         return NOTIFY_DONE;
2031 }
2032
2033 static struct notifier_block drbd_notifier = {
2034         .notifier_call = drbd_notify_sys,
2035 };
2036
2037 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2038 {
2039         int rr;
2040
2041         rr = drbd_release_ee(mdev, &mdev->active_ee);
2042         if (rr)
2043                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2044
2045         rr = drbd_release_ee(mdev, &mdev->sync_ee);
2046         if (rr)
2047                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2048
2049         rr = drbd_release_ee(mdev, &mdev->read_ee);
2050         if (rr)
2051                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2052
2053         rr = drbd_release_ee(mdev, &mdev->done_ee);
2054         if (rr)
2055                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2056
2057         rr = drbd_release_ee(mdev, &mdev->net_ee);
2058         if (rr)
2059                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2060 }
2061
2062 /* caution. no locking.
2063  * currently only used from module cleanup code. */
2064 static void drbd_delete_device(unsigned int minor)
2065 {
2066         struct drbd_conf *mdev = minor_to_mdev(minor);
2067
2068         if (!mdev)
2069                 return;
2070
2071         /* paranoia asserts */
2072         D_ASSERT(mdev->open_cnt == 0);
2073         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2074         /* end paranoia asserts */
2075
2076         del_gendisk(mdev->vdisk);
2077
2078         /* cleanup stuff that may have been allocated during
2079          * device (re-)configuration or state changes */
2080
2081         if (mdev->this_bdev)
2082                 bdput(mdev->this_bdev);
2083
2084         drbd_free_resources(mdev);
2085         drbd_free_tconn(mdev->tconn);
2086
2087         drbd_release_ee_lists(mdev);
2088
2089         lc_destroy(mdev->act_log);
2090         lc_destroy(mdev->resync);
2091
2092         kfree(mdev->p_uuid);
2093         /* mdev->p_uuid = NULL; */
2094
2095         /* cleanup the rest that has been
2096          * allocated from drbd_new_device
2097          * and actually free the mdev itself */
2098         drbd_free_mdev(mdev);
2099 }
2100
2101 static void drbd_cleanup(void)
2102 {
2103         unsigned int i;
2104
2105         unregister_reboot_notifier(&drbd_notifier);
2106
2107         /* first remove proc,
2108          * drbdsetup uses its presence to detect
2109          * whether DRBD is loaded.
2110          * If we got stuck in proc removal,
2111          * but have netlink already deregistered,
2112          * some drbdsetup commands may wait forever
2113          * for an answer.
2114          */
2115         if (drbd_proc)
2116                 remove_proc_entry("drbd", NULL);
2117
2118         drbd_nl_cleanup();
2119
2120         if (minor_table) {
2121                 i = minor_count;
2122                 while (i--)
2123                         drbd_delete_device(i);
2124                 drbd_destroy_mempools();
2125         }
2126
2127         kfree(minor_table);
2128
2129         unregister_blkdev(DRBD_MAJOR, "drbd");
2130
2131         printk(KERN_INFO "drbd: module cleanup done.\n");
2132 }
2133
2134 /**
2135  * drbd_congested() - Callback for pdflush
2136  * @congested_data:     User data
2137  * @bdi_bits:           Bits pdflush is currently interested in
2138  *
2139  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2140  */
2141 static int drbd_congested(void *congested_data, int bdi_bits)
2142 {
2143         struct drbd_conf *mdev = congested_data;
2144         struct request_queue *q;
2145         char reason = '-';
2146         int r = 0;
2147
2148         if (!may_inc_ap_bio(mdev)) {
2149                 /* DRBD has frozen IO */
2150                 r = bdi_bits;
2151                 reason = 'd';
2152                 goto out;
2153         }
2154
2155         if (get_ldev(mdev)) {
2156                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2157                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2158                 put_ldev(mdev);
2159                 if (r)
2160                         reason = 'b';
2161         }
2162
2163         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2164                 r |= (1 << BDI_async_congested);
2165                 reason = reason == 'b' ? 'a' : 'n';
2166         }
2167
2168 out:
2169         mdev->congestion_reason = reason;
2170         return r;
2171 }
2172
2173 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2174 {
2175         sema_init(&wq->s, 0);
2176         spin_lock_init(&wq->q_lock);
2177         INIT_LIST_HEAD(&wq->q);
2178 }
2179
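/* Allocate and initialize a connection object: connection state, locks,
 * the work queues of the data and meta sockets, and the receiver, worker
 * and asender threads.  The new tconn is linked into the global
 * drbd_tconns list under global_state_lock. */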
2180 struct drbd_tconn *drbd_new_tconn(char *name)
2181 {
2182         struct drbd_tconn *tconn;
2183
2184         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2185         if (!tconn)
2186                 return NULL;
2187
2188         tconn->name = kstrdup(name, GFP_KERNEL);
2189         if (!tconn->name)
2190                 goto fail;
2191
2192         tconn->cstate = C_STANDALONE;
2193         mutex_init(&tconn->cstate_mutex);
2194         spin_lock_init(&tconn->req_lock);
2195         atomic_set(&tconn->net_cnt, 0);
2196         init_waitqueue_head(&tconn->net_cnt_wait);
2197         init_waitqueue_head(&tconn->ping_wait);
2198         idr_init(&tconn->volumes);
2199
2200         drbd_init_workqueue(&tconn->data.work);
2201         mutex_init(&tconn->data.mutex);
2202
2203         drbd_init_workqueue(&tconn->meta.work);
2204         mutex_init(&tconn->meta.mutex);
2205
2206         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2207         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2208         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2209
2210         write_lock_irq(&global_state_lock);
2211         list_add(&tconn->all_tconn, &drbd_tconns);
2212         write_unlock_irq(&global_state_lock);
2213
2214         return tconn;
2215
2216 fail:
2217         kfree(tconn->name);
2218         kfree(tconn);
2219
2220         return NULL;
2221 }
2222
2223 void drbd_free_tconn(struct drbd_tconn *tconn)
2224 {
2225         write_lock_irq(&global_state_lock);
2226         list_del(&tconn->all_tconn);
2227         write_unlock_irq(&global_state_lock);
2228         idr_destroy(&tconn->volumes);
2229
2230         kfree(tconn->name);
2231         kfree(tconn->int_dig_out);
2232         kfree(tconn->int_dig_in);
2233         kfree(tconn->int_dig_vv);
2234         kfree(tconn);
2235 }
2236
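/* Allocate a new minor: the drbd_conf itself, a connection named
 * "drbd<minor>" with this device registered as volume 0, the request
 * queue and gendisk, the meta data IO page, bitmap, transfer log and the
 * initial epoch.  On failure, everything allocated so far is torn down
 * again via the out_* labels. */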
2237 struct drbd_conf *drbd_new_device(unsigned int minor)
2238 {
2239         struct drbd_conf *mdev;
2240         struct gendisk *disk;
2241         struct request_queue *q;
2242         char conn_name[9]; /* drbd1234N */
2243         int vnr;
2244
2245         /* GFP_KERNEL, we are outside of all write-out paths */
2246         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2247         if (!mdev)
2248                 return NULL;
2249         sprintf(conn_name, "drbd%d", minor);
2250         mdev->tconn = drbd_new_tconn(conn_name);
2251         if (!mdev->tconn)
2252                 goto out_no_tconn;
2253         if (!idr_pre_get(&mdev->tconn->volumes, GFP_KERNEL))
2254                 goto out_no_cpumask;
2255         if (idr_get_new(&mdev->tconn->volumes, mdev, &vnr))
2256                 goto out_no_cpumask;
2257         if (vnr != 0) {
2258                 dev_err(DEV, "vnr = %d\n", vnr);
2259                 goto out_no_cpumask;
2260         }
2261         if (!zalloc_cpumask_var(&mdev->tconn->cpu_mask, GFP_KERNEL))
2262                 goto out_no_cpumask;
2263
2264         mdev->minor = minor;
2265
2266         drbd_init_set_defaults(mdev);
2267
2268         q = blk_alloc_queue(GFP_KERNEL);
2269         if (!q)
2270                 goto out_no_q;
2271         mdev->rq_queue = q;
2272         q->queuedata   = mdev;
2273
2274         disk = alloc_disk(1);
2275         if (!disk)
2276                 goto out_no_disk;
2277         mdev->vdisk = disk;
2278
2279         set_disk_ro(disk, true);
2280
2281         disk->queue = q;
2282         disk->major = DRBD_MAJOR;
2283         disk->first_minor = minor;
2284         disk->fops = &drbd_ops;
2285         sprintf(disk->disk_name, "drbd%d", minor);
2286         disk->private_data = mdev;
2287
2288         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2289         /* we have no partitions. we contain only ourselves. */
2290         mdev->this_bdev->bd_contains = mdev->this_bdev;
2291
2292         q->backing_dev_info.congested_fn = drbd_congested;
2293         q->backing_dev_info.congested_data = mdev;
2294
2295         blk_queue_make_request(q, drbd_make_request);
2296         /* Setting the max_hw_sectors to an odd value of 8 KiB here;
2297            this triggers a max_bio_size message upon first attach or connect */
2298         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2299         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2300         blk_queue_merge_bvec(q, drbd_merge_bvec);
2301         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2302
2303         mdev->md_io_page = alloc_page(GFP_KERNEL);
2304         if (!mdev->md_io_page)
2305                 goto out_no_io_page;
2306
2307         if (drbd_bm_init(mdev))
2308                 goto out_no_bitmap;
2309         /* no need to lock access, we are still initializing this minor device. */
2310         if (!tl_init(mdev))
2311                 goto out_no_tl;
2312         mdev->read_requests = RB_ROOT;
2313         mdev->write_requests = RB_ROOT;
2314
2315         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2316         if (!mdev->current_epoch)
2317                 goto out_no_epoch;
2318
2319         INIT_LIST_HEAD(&mdev->current_epoch->list);
2320         mdev->epochs = 1;
2321
2322         return mdev;
2323
2324 /* out_whatever_else:
2325         kfree(mdev->current_epoch); */
2326 out_no_epoch:
2327         tl_cleanup(mdev);
2328 out_no_tl:
2329         drbd_bm_cleanup(mdev);
2330 out_no_bitmap:
2331         __free_page(mdev->md_io_page);
2332 out_no_io_page:
2333         put_disk(disk);
2334 out_no_disk:
2335         blk_cleanup_queue(q);
2336 out_no_q:
2337         free_cpumask_var(mdev->tconn->cpu_mask);
2338 out_no_cpumask:
2339         drbd_free_tconn(mdev->tconn);
2340 out_no_tconn:
2341         kfree(mdev);
2342         return NULL;
2343 }
2344
2345 /* counterpart of drbd_new_device.
2346  * last part of drbd_delete_device. */
2347 void drbd_free_mdev(struct drbd_conf *mdev)
2348 {
2349         kfree(mdev->current_epoch);
2350         tl_cleanup(mdev);
2351         if (mdev->bitmap) /* should no longer be there. */
2352                 drbd_bm_cleanup(mdev);
2353         __free_page(mdev->md_io_page);
2354         put_disk(mdev->vdisk);
2355         blk_cleanup_queue(mdev->rq_queue);
2356         kfree(mdev);
2357 }
2358
2359
2360 int __init drbd_init(void)
2361 {
2362         int err;
2363
2364         BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
2365         BUILD_BUG_ON(sizeof(struct p_handshake) != 80);
2366
2367         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2368                 printk(KERN_ERR
2369                         "drbd: invalid minor_count (%d)\n", minor_count);
2370 #ifdef MODULE
2371                 return -EINVAL;
2372 #else
2373                 minor_count = 8;
2374 #endif
2375         }
2376
2377         err = drbd_nl_init();
2378         if (err)
2379                 return err;
2380
2381         err = register_blkdev(DRBD_MAJOR, "drbd");
2382         if (err) {
2383                 printk(KERN_ERR
2384                        "drbd: unable to register block device major %d\n",
2385                        DRBD_MAJOR);
2386                 return err;
2387         }
2388
2389         register_reboot_notifier(&drbd_notifier);
2390
2391         /*
2392          * allocate all necessary structs
2393          */
2394         err = -ENOMEM;
2395
2396         init_waitqueue_head(&drbd_pp_wait);
2397
2398         drbd_proc = NULL; /* play safe for drbd_cleanup */
2399         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
2400                                 GFP_KERNEL);
2401         if (!minor_table)
2402                 goto Enomem;
2403
2404         err = drbd_create_mempools();
2405         if (err)
2406                 goto Enomem;
2407
2408         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2409         if (!drbd_proc) {
2410                 printk(KERN_ERR "drbd: unable to register proc file\n");
2411                 goto Enomem;
2412         }
2413
2414         rwlock_init(&global_state_lock);
2415         INIT_LIST_HEAD(&drbd_tconns);
2416
2417         printk(KERN_INFO "drbd: initialized. "
2418                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2419                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2420         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2421         printk(KERN_INFO "drbd: registered as block device major %d\n",
2422                 DRBD_MAJOR);
2423         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
2424
2425         return 0; /* Success! */
2426
2427 Enomem:
2428         drbd_cleanup();
2429         if (err == -ENOMEM)
2430                 /* currently always the case */
2431                 printk(KERN_ERR "drbd: ran out of memory\n");
2432         else
2433                 printk(KERN_ERR "drbd: initialization failure\n");
2434         return err;
2435 }
2436
2437 void drbd_free_bc(struct drbd_backing_dev *ldev)
2438 {
2439         if (ldev == NULL)
2440                 return;
2441
2442         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2443         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2444
2445         kfree(ldev);
2446 }
2447
2448 void drbd_free_sock(struct drbd_tconn *tconn)
2449 {
2450         if (tconn->data.socket) {
2451                 mutex_lock(&tconn->data.mutex);
2452                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2453                 sock_release(tconn->data.socket);
2454                 tconn->data.socket = NULL;
2455                 mutex_unlock(&tconn->data.mutex);
2456         }
2457         if (tconn->meta.socket) {
2458                 mutex_lock(&tconn->meta.mutex);
2459                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2460                 sock_release(tconn->meta.socket);
2461                 tconn->meta.socket = NULL;
2462                 mutex_unlock(&tconn->meta.mutex);
2463         }
2464 }
2465
2466
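/* Release per-device resources set up during configuration: the csums,
 * verify, integrity and cram-hmac crypto transforms, both sockets of the
 * connection, and finally the backing device. */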
2467 void drbd_free_resources(struct drbd_conf *mdev)
2468 {
2469         crypto_free_hash(mdev->csums_tfm);
2470         mdev->csums_tfm = NULL;
2471         crypto_free_hash(mdev->verify_tfm);
2472         mdev->verify_tfm = NULL;
2473         crypto_free_hash(mdev->tconn->cram_hmac_tfm);
2474         mdev->tconn->cram_hmac_tfm = NULL;
2475         crypto_free_hash(mdev->tconn->integrity_w_tfm);
2476         mdev->tconn->integrity_w_tfm = NULL;
2477         crypto_free_hash(mdev->tconn->integrity_r_tfm);
2478         mdev->tconn->integrity_r_tfm = NULL;
2479
2480         drbd_free_sock(mdev->tconn);
2481
2482         __no_warn(local,
2483                   drbd_free_bc(mdev->ldev);
2484                   mdev->ldev = NULL;);
2485 }
2486
2487 /* meta data management */
2488
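/* On-disk layout of the meta data super block.  All multi-byte fields
 * are stored big-endian (see the endianness conversions in drbd_md_sync()
 * and drbd_md_read() below); drbd_md_sync() zeroes the first 512 bytes of
 * the buffer before filling in the fields. */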
2489 struct meta_data_on_disk {
2490         u64 la_size;           /* last agreed size. */
2491         u64 uuid[UI_SIZE];   /* UUIDs. */
2492         u64 device_uuid;
2493         u64 reserved_u64_1;
2494         u32 flags;             /* MDF */
2495         u32 magic;
2496         u32 md_size_sect;
2497         u32 al_offset;         /* offset to this block */
2498         u32 al_nr_extents;     /* important for restoring the AL */
2499               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
2500         u32 bm_offset;         /* offset to the bitmap, from here */
2501         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2502         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2503         u32 reserved_u32[3];
2504
2505 } __packed;
2506
2507 /**
2508  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2509  * @mdev:       DRBD device.
2510  */
2511 void drbd_md_sync(struct drbd_conf *mdev)
2512 {
2513         struct meta_data_on_disk *buffer;
2514         sector_t sector;
2515         int i;
2516
2517         del_timer(&mdev->md_sync_timer);
2518         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2519         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2520                 return;
2521
2522         /* We use here D_FAILED and not D_ATTACHING because we try to write
2523          * metadata even if we detach due to a disk failure! */
2524         if (!get_ldev_if_state(mdev, D_FAILED))
2525                 return;
2526
2527         mutex_lock(&mdev->md_io_mutex);
2528         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2529         memset(buffer, 0, 512);
2530
2531         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2532         for (i = UI_CURRENT; i < UI_SIZE; i++)
2533                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2534         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2535         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2536
2537         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2538         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2539         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2540         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2541         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2542
2543         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2544         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2545
2546         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2547         sector = mdev->ldev->md.md_offset;
2548
2549         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2550                 /* this was a try anyways ... */
2551                 dev_err(DEV, "meta data update failed!\n");
2552                 drbd_chk_io_error(mdev, 1, true);
2553         }
2554
2555         /* Update mdev->ldev->md.la_size_sect,
2556          * since we updated it on metadata. */
2557         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2558
2559         mutex_unlock(&mdev->md_io_mutex);
2560         put_ldev(mdev);
2561 }
2562
2563 /**
2564  * drbd_md_read() - Reads in the meta data super block
2565  * @mdev:       DRBD device.
2566  * @bdev:       Device from which the meta data should be read in.
2567  *
2568  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2569  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2570  */
2571 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2572 {
2573         struct meta_data_on_disk *buffer;
2574         int i, rv = NO_ERROR;
2575
2576         if (!get_ldev_if_state(mdev, D_ATTACHING))
2577                 return ERR_IO_MD_DISK;
2578
2579         mutex_lock(&mdev->md_io_mutex);
2580         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2581
2582         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2583                 /* NOTE: can't do normal error processing here as this is
2584                    called BEFORE disk is attached */
2585                 dev_err(DEV, "Error while reading metadata.\n");
2586                 rv = ERR_IO_MD_DISK;
2587                 goto err;
2588         }
2589
2590         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2591                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2592                 rv = ERR_MD_INVALID;
2593                 goto err;
2594         }
2595         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2596                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2597                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2598                 rv = ERR_MD_INVALID;
2599                 goto err;
2600         }
2601         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2602                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2603                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2604                 rv = ERR_MD_INVALID;
2605                 goto err;
2606         }
2607         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2608                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2609                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2610                 rv = ERR_MD_INVALID;
2611                 goto err;
2612         }
2613
2614         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2615                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2616                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2617                 rv = ERR_MD_INVALID;
2618                 goto err;
2619         }
2620
2621         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2622         for (i = UI_CURRENT; i < UI_SIZE; i++)
2623                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2624         bdev->md.flags = be32_to_cpu(buffer->flags);
2625         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
2626         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2627
2628         spin_lock_irq(&mdev->tconn->req_lock);
2629         if (mdev->state.conn < C_CONNECTED) {
2630                 int peer;
2631                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2632                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2633                 mdev->peer_max_bio_size = peer;
2634         }
2635         spin_unlock_irq(&mdev->tconn->req_lock);
2636
2637         if (mdev->sync_conf.al_extents < 7)
2638                 mdev->sync_conf.al_extents = 127;
2639
2640  err:
2641         mutex_unlock(&mdev->md_io_mutex);
2642         put_ldev(mdev);
2643
2644         return rv;
2645 }
2646
2647 /**
2648  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2649  * @mdev:       DRBD device.
2650  *
2651  * Call this function if you change anything that should be written to
2652  * the meta-data super block. This function sets MD_DIRTY, and starts a
2653  * timer that ensures drbd_md_sync() gets called within five seconds.
2654  */
2655 #ifdef DEBUG
2656 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2657 {
2658         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2659                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2660                 mdev->last_md_mark_dirty.line = line;
2661                 mdev->last_md_mark_dirty.func = func;
2662         }
2663 }
2664 #else
2665 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2666 {
2667         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2668                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2669 }
2670 #endif
2671
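/* UUID handling.  drbd_uuid_move_history() shifts the history slots down
 * by one so that UI_HISTORY_START can take a new entry; _drbd_uuid_set()
 * additionally encodes the current role in bit 0 of the current UUID
 * (set while Primary, cleared while Secondary). */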
2672 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2673 {
2674         int i;
2675
2676         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2677                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2678 }
2679
2680 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2681 {
2682         if (idx == UI_CURRENT) {
2683                 if (mdev->state.role == R_PRIMARY)
2684                         val |= 1;
2685                 else
2686                         val &= ~((u64)1);
2687
2688                 drbd_set_ed_uuid(mdev, val);
2689         }
2690
2691         mdev->ldev->md.uuid[idx] = val;
2692         drbd_md_mark_dirty(mdev);
2693 }
2694
2695
2696 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2697 {
2698         if (mdev->ldev->md.uuid[idx]) {
2699                 drbd_uuid_move_history(mdev);
2700                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2701         }
2702         _drbd_uuid_set(mdev, idx, val);
2703 }
2704
2705 /**
2706  * drbd_uuid_new_current() - Creates a new current UUID
2707  * @mdev:       DRBD device.
2708  *
2709  * Creates a new current UUID, and rotates the old current UUID into
2710  * the bitmap slot. Causes an incremental resync upon next connect.
2711  */
2712 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2713 {
2714         u64 val;
2715         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2716
2717         if (bm_uuid)
2718                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2719
2720         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2721
2722         get_random_bytes(&val, sizeof(u64));
2723         _drbd_uuid_set(mdev, UI_CURRENT, val);
2724         drbd_print_uuids(mdev, "new current UUID");
2725         /* get it to stable storage _now_ */
2726         drbd_md_sync(mdev);
2727 }
2728
2729 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
2730 {
2731         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
2732                 return;
2733
2734         if (val == 0) {
2735                 drbd_uuid_move_history(mdev);
2736                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2737                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2738         } else {
2739                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2740                 if (bm_uuid)
2741                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2742
2743                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
2744         }
2745         drbd_md_mark_dirty(mdev);
2746 }
2747
2748 /**
2749  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2750  * @mdev:       DRBD device.
2751  *
2752  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
2753  */
2754 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
2755 {
2756         int rv = -EIO;
2757
2758         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2759                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
2760                 drbd_md_sync(mdev);
2761                 drbd_bm_set_all(mdev);
2762
2763                 rv = drbd_bm_write(mdev);
2764
2765                 if (!rv) {
2766                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2767                         drbd_md_sync(mdev);
2768                 }
2769
2770                 put_ldev(mdev);
2771         }
2772
2773         return rv;
2774 }
2775
2776 /**
2777  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2778  * @mdev:       DRBD device.
2779  *
2780  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
2781  */
2782 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
2783 {
2784         int rv = -EIO;
2785
2786         drbd_resume_al(mdev);
2787         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2788                 drbd_bm_clear_all(mdev);
2789                 rv = drbd_bm_write(mdev);
2790                 put_ldev(mdev);
2791         }
2792
2793         return rv;
2794 }
2795
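/* Worker callback for drbd_queue_bitmap_io(): run the queued io_fn under
 * the bitmap lock, clear BITMAP_IO, wake up any waiters, and finally
 * invoke the done callback with the result. */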
2796 static int w_bitmap_io(struct drbd_work *w, int unused)
2797 {
2798         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
2799         struct drbd_conf *mdev = w->mdev;
2800         int rv = -EIO;
2801
2802         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
2803
2804         if (get_ldev(mdev)) {
2805                 drbd_bm_lock(mdev, work->why, work->flags);
2806                 rv = work->io_fn(mdev);
2807                 drbd_bm_unlock(mdev);
2808                 put_ldev(mdev);
2809         }
2810
2811         clear_bit(BITMAP_IO, &mdev->flags);
2812         smp_mb__after_clear_bit();
2813         wake_up(&mdev->misc_wait);
2814
2815         if (work->done)
2816                 work->done(mdev, rv);
2817
2818         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
2819         work->why = NULL;
2820         work->flags = 0;
2821
2822         return 1;
2823 }
2824
2825 void drbd_ldev_destroy(struct drbd_conf *mdev)
2826 {
2827         lc_destroy(mdev->resync);
2828         mdev->resync = NULL;
2829         lc_destroy(mdev->act_log);
2830         mdev->act_log = NULL;
2831         __no_warn(local,
2832                 drbd_free_bc(mdev->ldev);
2833                 mdev->ldev = NULL;);
2834
2835         if (mdev->md_io_tmpp) {
2836                 __free_page(mdev->md_io_tmpp);
2837                 mdev->md_io_tmpp = NULL;
2838         }
2839         clear_bit(GO_DISKLESS, &mdev->flags);
2840 }
2841
2842 static int w_go_diskless(struct drbd_work *w, int unused)
2843 {
2844         struct drbd_conf *mdev = w->mdev;
2845
2846         D_ASSERT(mdev->state.disk == D_FAILED);
2847         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
2848          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
2849          * the protected members anymore, though, so once put_ldev reaches zero
2850          * again, it will be safe to free them. */
2851         drbd_force_state(mdev, NS(disk, D_DISKLESS));
2852         return 1;
2853 }
2854
2855 void drbd_go_diskless(struct drbd_conf *mdev)
2856 {
2857         D_ASSERT(mdev->state.disk == D_FAILED);
2858         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
2859                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
2860 }
2861
2862 /**
2863  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
2864  * @mdev:       DRBD device.
2865  * @io_fn:      IO callback to be called when bitmap IO is possible
2866  * @done:       callback to be called after the bitmap IO was performed
2867  * @why:        Descriptive text of the reason for doing the IO
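 * @flags:      Bitmap locking flags (enum bm_flag), passed on to drbd_bm_lock()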
2868  *
2869  * While IO on the bitmap happens we freeze application IO, thus ensuring
2870  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
2871  * called from worker context. It MUST NOT be used while a previous such
2872  * work is still pending!
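 *
 * Illustrative call (drbd_bmio_set_n_write() above is written as an io_fn
 * for this interface; the "why" text and flags used here are only an
 * example):
 *
 *   drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
 *                        "set_n_write", BM_LOCKED_SET_ALLOWED);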
2873  */
2874 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
2875                           int (*io_fn)(struct drbd_conf *),
2876                           void (*done)(struct drbd_conf *, int),
2877                           char *why, enum bm_flag flags)
2878 {
2879         D_ASSERT(current == mdev->tconn->worker.task);
2880
2881         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
2882         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
2883         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
2884         if (mdev->bm_io_work.why)
2885                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
2886                         why, mdev->bm_io_work.why);
2887
2888         mdev->bm_io_work.io_fn = io_fn;
2889         mdev->bm_io_work.done = done;
2890         mdev->bm_io_work.why = why;
2891         mdev->bm_io_work.flags = flags;
2892
2893         spin_lock_irq(&mdev->tconn->req_lock);
2894         set_bit(BITMAP_IO, &mdev->flags);
2895         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
2896                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
2897                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
2898         }
2899         spin_unlock_irq(&mdev->tconn->req_lock);
2900 }
2901
2902 /**
2903  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
2904  * @mdev:       DRBD device.
2905  * @io_fn:      IO callback to be called when bitmap IO is possible
2906  * @why:        Descriptive text of the reason for doing the IO
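 * @flags:      Bitmap locking flags (enum bm_flag)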
2907  *
2908  * Freezes application IO while the actual IO operation runs. This
2909  * function MAY NOT be called from worker context.
2910  */
2911 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
2912                 char *why, enum bm_flag flags)
2913 {
2914         int rv;
2915
2916         D_ASSERT(current != mdev->tconn->worker.task);
2917
2918         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
2919                 drbd_suspend_io(mdev);
2920
2921         drbd_bm_lock(mdev, why, flags);
2922         rv = io_fn(mdev);
2923         drbd_bm_unlock(mdev);
2924
2925         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
2926                 drbd_resume_io(mdev);
2927
2928         return rv;
2929 }
2930
2931 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
2932 {
2933         if ((mdev->ldev->md.flags & flag) != flag) {
2934                 drbd_md_mark_dirty(mdev);
2935                 mdev->ldev->md.flags |= flag;
2936         }
2937 }
2938
2939 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
2940 {
2941         if ((mdev->ldev->md.flags & flag) != 0) {
2942                 drbd_md_mark_dirty(mdev);
2943                 mdev->ldev->md.flags &= ~flag;
2944         }
2945 }
2946 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
2947 {
2948         return (bdev->md.flags & flag) != 0;
2949 }
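/*
 * Usage sketch (illustrative only; example_md_flags() is hypothetical):
 * the three helpers above implement a "mark meta-data dirty only on actual
 * change" pattern.  MDF_FULL_SYNC is one of the MDF_* on-disk flags from
 * drbd.h; a local device reference must be held around the calls.
 */
static void example_md_flags(struct drbd_conf *mdev)
{
        if (!get_ldev(mdev))
                return;
        if (!drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC))
                drbd_md_set_flag(mdev, MDF_FULL_SYNC); /* also marks md dirty */
        drbd_md_sync(mdev);                            /* write it out */
        put_ldev(mdev);
}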
2950
2951 static void md_sync_timer_fn(unsigned long data)
2952 {
2953         struct drbd_conf *mdev = (struct drbd_conf *) data;
2954
2955         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
2956 }
2957
2958 static int w_md_sync(struct drbd_work *w, int unused)
2959 {
2960         struct drbd_conf *mdev = w->mdev;
2961
2962         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
2963 #ifdef DEBUG
2964         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
2965                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
2966 #endif
2967         drbd_md_sync(mdev);
2968         return 1;
2969 }
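/*
 * Sketch (illustrative; mdev->md_sync_timer and the 5*HZ timeout are
 * assumptions, not taken from this section): md_sync_timer_fn() runs in
 * timer/softirq context and therefore must not do the possibly blocking
 * meta-data write itself.  It only pushes md_sync_work to the front of the
 * worker queue; w_md_sync() then calls drbd_md_sync() in process context.
 * Arming the one-shot timer would look roughly like this:
 */
static void example_arm_md_sync_timer(struct drbd_conf *mdev)
{
        mod_timer(&mdev->md_sync_timer, jiffies + 5 * HZ);
}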
2970
2971 const char *cmdname(enum drbd_packet cmd)
2972 {
2973         /* THINK may need to become several global tables
2974          * when we want to support more than
2975          * one PRO_VERSION */
2976         static const char *cmdnames[] = {
2977                 [P_DATA]                = "Data",
2978                 [P_DATA_REPLY]          = "DataReply",
2979                 [P_RS_DATA_REPLY]       = "RSDataReply",
2980                 [P_BARRIER]             = "Barrier",
2981                 [P_BITMAP]              = "ReportBitMap",
2982                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
2983                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
2984                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
2985                 [P_DATA_REQUEST]        = "DataRequest",
2986                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
2987                 [P_SYNC_PARAM]          = "SyncParam",
2988                 [P_SYNC_PARAM89]        = "SyncParam89",
2989                 [P_PROTOCOL]            = "ReportProtocol",
2990                 [P_UUIDS]               = "ReportUUIDs",
2991                 [P_SIZES]               = "ReportSizes",
2992                 [P_STATE]               = "ReportState",
2993                 [P_SYNC_UUID]           = "ReportSyncUUID",
2994                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
2995                 [P_AUTH_RESPONSE]       = "AuthResponse",
2996                 [P_PING]                = "Ping",
2997                 [P_PING_ACK]            = "PingAck",
2998                 [P_RECV_ACK]            = "RecvAck",
2999                 [P_WRITE_ACK]           = "WriteAck",
3000                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3001                 [P_DISCARD_ACK]         = "DiscardAck",
3002                 [P_NEG_ACK]             = "NegAck",
3003                 [P_NEG_DREPLY]          = "NegDReply",
3004                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3005                 [P_BARRIER_ACK]         = "BarrierAck",
3006                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3007                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3008                 [P_OV_REQUEST]          = "OVRequest",
3009                 [P_OV_REPLY]            = "OVReply",
3010                 [P_OV_RESULT]           = "OVResult",
3011                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3012                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3013                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3014                 [P_DELAY_PROBE]         = "DelayProbe",
3015                 [P_OUT_OF_SYNC]         = "OutOfSync",
3016                 [P_MAX_CMD]             = NULL,
3017         };
3018
3019         if (cmd == P_HAND_SHAKE_M)
3020                 return "HandShakeM";
3021         if (cmd == P_HAND_SHAKE_S)
3022                 return "HandShakeS";
3023         if (cmd == P_HAND_SHAKE)
3024                 return "HandShake";
3025         if (cmd >= P_MAX_CMD)
3026                 return "Unknown";
3027         return cmdnames[cmd];
3028 }
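/*
 * Usage sketch (illustrative only; example_log_packet() is hypothetical):
 * cmdname() maps a wire-protocol command to a human readable string and
 * returns "Unknown" for values >= P_MAX_CMD, so it can be used directly in
 * log messages.
 */
static void example_log_packet(struct drbd_conf *mdev, enum drbd_packet cmd,
                               unsigned int size)
{
        dev_info(DEV, "received %s packet, %u bytes payload\n",
                 cmdname(cmd), size);
}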
3029
3030 #ifdef CONFIG_DRBD_FAULT_INJECTION
3031 /* Fault insertion support including random number generator shamelessly
3032  * stolen from kernel/rcutorture.c */
3033 struct fault_random_state {
3034         unsigned long state;
3035         unsigned long count;
3036 };
3037
3038 #define FAULT_RANDOM_MULT 39916801  /* prime */
3039 #define FAULT_RANDOM_ADD        479001701 /* prime */
3040 #define FAULT_RANDOM_REFRESH 10000
3041
3042 /*
3043  * Crude but fast random-number generator.  Uses a linear congruential
3044  * generator, with occasional help from get_random_bytes().
3045  */
3046 static unsigned long
3047 _drbd_fault_random(struct fault_random_state *rsp)
3048 {
3049         long refresh;
3050
3051         if (!rsp->count--) {
3052                 get_random_bytes(&refresh, sizeof(refresh));
3053                 rsp->state += refresh;
3054                 rsp->count = FAULT_RANDOM_REFRESH;
3055         }
3056         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3057         return swahw32(rsp->state);
3058 }
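/*
 * Note: the low-order bits of a linear congruential generator are the
 * least random, which is presumably why the state is folded through
 * swahw32() (swapping the 16-bit halfwords of the low 32 bits) before
 * callers such as _drbd_insert_fault() below reduce it modulo 100.
 */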
3059
3060 static char *_drbd_fault_str(unsigned int type)
3061 {
3062         static char *_faults[] = {
3063                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3064                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3065                 [DRBD_FAULT_RS_WR] = "Resync write",
3066                 [DRBD_FAULT_RS_RD] = "Resync read",
3067                 [DRBD_FAULT_DT_WR] = "Data write",
3068                 [DRBD_FAULT_DT_RD] = "Data read",
3069                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3070                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3071                 [DRBD_FAULT_AL_EE] = "EE allocation",
3072                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3073         };
3074
3075         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3076 }
3077
3078 unsigned int
3079 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3080 {
3081         static struct fault_random_state rrs = {0, 0};
3082
3083         unsigned int ret = (
3084                 (fault_devs == 0 ||
3085                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3086                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3087
3088         if (ret) {
3089                 fault_count++;
3090
3091                 if (__ratelimit(&drbd_ratelimit_state))
3092                         dev_warn(DEV, "***Simulating %s failure\n",
3093                                 _drbd_fault_str(type));
3094         }
3095
3096         return ret;
3097 }
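/*
 * Usage sketch (illustrative only): a fault injection site checks
 * fault_rate first, so the common case costs a single comparison, and
 * fails the request instead of submitting it.  fault_rate is the
 * percentage of IOs to fail, fault_devs is a bitmask of minor numbers
 * (0 meaning all devices).  The real code wraps this in a
 * drbd_insert_fault() helper in drbd_int.h, which may filter further by
 * fault type; the open-coded form here is only an approximation.
 */
static bool example_should_fail_md_write(struct drbd_conf *mdev)
{
        return fault_rate && _drbd_insert_fault(mdev, DRBD_FAULT_MD_WR);
}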
3098 #endif
3099
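/*
 * Note: buildtag starts out as "\0uilt-in".  A modular build overwrites the
 * whole buffer with the module's srcversion; a built-in driver merely
 * replaces the leading NUL with 'b', turning the buffer into "built-in"
 * without needing a second string constant.
 */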
3100 const char *drbd_buildtag(void)
3101 {
3102         /* When DRBD is built from external sources, this holds a reference
3103            to the git hash of the source code. */
3104
3105         static char buildtag[38] = "\0uilt-in";
3106
3107         if (buildtag[0] == 0) {
3108 #ifdef CONFIG_MODULES
3109                 if (THIS_MODULE != NULL)
3110                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3111                 else
3112 #endif
3113                         buildtag[0] = 'b';
3114         }
3115
3116         return buildtag;
3117 }
3118
3119 module_init(drbd_init)
3120 module_exit(drbd_cleanup)
3121
3122 EXPORT_SYMBOL(drbd_conn_str);
3123 EXPORT_SYMBOL(drbd_role_str);
3124 EXPORT_SYMBOL(drbd_disk_str);
3125 EXPORT_SYMBOL(drbd_set_st_err_str);