1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(cn_idx, uint, 0444);
90 module_param(proc_details, int, 0644);
91
92 #ifdef CONFIG_DRBD_FAULT_INJECTION
93 int enable_faults;
94 int fault_rate;
95 static int fault_count;
96 int fault_devs;
97 /* bitmap of enabled faults */
98 module_param(enable_faults, int, 0664);
99 /* fault rate % value - applies to all enabled faults */
100 module_param(fault_rate, int, 0664);
101 /* count of faults inserted */
102 module_param(fault_count, int, 0664);
103 /* bitmap of devices to insert faults on */
104 module_param(fault_devs, int, 0644);
105 #endif
106
107 /* module parameter, defined */
108 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
109 int disable_sendpage;
110 int allow_oos;
111 unsigned int cn_idx = CN_IDX_DRBD;
112 int proc_details;       /* Detail level in proc drbd */
113
114 /* Module parameter for setting the user mode helper program
115  * to run. Default is /sbin/drbdadm */
116 char usermode_helper[80] = "/sbin/drbdadm";
117
118 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
119
120 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
121  * as member "struct gendisk *vdisk;"
122  */
123 struct drbd_conf **minor_table;
124 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
125
126 struct kmem_cache *drbd_request_cache;
127 struct kmem_cache *drbd_ee_cache;       /* peer requests */
128 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
129 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
130 mempool_t *drbd_request_mempool;
131 mempool_t *drbd_ee_mempool;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list; the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
143
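/* rate limit state: allow bursts of at most 5 events per 5 second interval */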
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
153
154 #ifdef __CHECKER__
155 /* When checking with sparse, and this is an inline function, sparse will
156    give tons of false positives. When this is a real function, sparse works.
157  */
158 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
159 {
160         int io_allowed;
161
162         atomic_inc(&mdev->local_cnt);
163         io_allowed = (mdev->state.disk >= mins);
164         if (!io_allowed) {
165                 if (atomic_dec_and_test(&mdev->local_cnt))
166                         wake_up(&mdev->misc_wait);
167         }
168         return io_allowed;
169 }
170
171 #endif
172
173 /**
174  * DOC: The transfer log
175  *
176  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
177  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
178  * of the list. There is always at least one &struct drbd_tl_epoch object.
179  *
180  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
181  * attached.
182  */
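/*
 * Illustration of the layout described above:
 *
 *   tconn->oldest_tle -> epoch -> epoch -> ... -> epoch <- tconn->newest_tle
 *                          |        |               |
 *                       requests requests        requests   (circular lists)
 *
 * tl_init() creates the first epoch, _tl_add_barrier() appends new epochs at
 * newest_tle, and tl_release() frees or recycles them from the oldest end.
 */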
183 static int tl_init(struct drbd_tconn *tconn)
184 {
185         struct drbd_tl_epoch *b;
186
187         /* during device minor initialization, we may well use GFP_KERNEL */
188         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
189         if (!b)
190                 return 0;
191         INIT_LIST_HEAD(&b->requests);
192         INIT_LIST_HEAD(&b->w.list);
193         b->next = NULL;
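        /* arbitrary non-zero initial barrier number; 0 is special-cased by
         * the write ordering code, see _tl_add_barrier() */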
194         b->br_number = 4711;
195         b->n_writes = 0;
196         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
197
198         tconn->oldest_tle = b;
199         tconn->newest_tle = b;
200         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
201
202         return 1;
203 }
204
205 static void tl_cleanup(struct drbd_tconn *tconn)
206 {
207         if (tconn->oldest_tle != tconn->newest_tle)
208                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
209         if (!list_empty(&tconn->out_of_sequence_requests))
210                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
211         kfree(tconn->oldest_tle);
212         tconn->oldest_tle = NULL;
213         kfree(tconn->unused_spare_tle);
214         tconn->unused_spare_tle = NULL;
215 }
216
217 /**
218  * _tl_add_barrier() - Adds a barrier to the transfer log
219  * @tconn:      DRBD connection.
220  * @new:        Barrier to be added before the current head of the TL.
221  *
222  * The caller must hold the req_lock.
223  */
224 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
225 {
226         struct drbd_tl_epoch *newest_before;
227
228         INIT_LIST_HEAD(&new->requests);
229         INIT_LIST_HEAD(&new->w.list);
230         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
231         new->next = NULL;
232         new->n_writes = 0;
233
234         newest_before = tconn->newest_tle;
235         /* never send a barrier number == 0, because that is special-cased
236          * when using TCQ for our write ordering code */
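        /* i.e. if the increment wraps around to 0, use 1 instead */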
237         new->br_number = (newest_before->br_number+1) ?: 1;
238         if (tconn->newest_tle != new) {
239                 tconn->newest_tle->next = new;
240                 tconn->newest_tle = new;
241         }
242 }
243
244 /**
245  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
246  * @tconn:      DRBD connection.
247  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
248  * @set_size:   Expected number of requests before that barrier.
249  *
250  * In case the passed barrier_nr or set_size does not match the oldest
251  * &struct drbd_tl_epoch object, this function will cause a termination
252  * of the connection.
253  */
254 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
255                 unsigned int set_size)
256 {
257         struct drbd_conf *mdev;
258         struct drbd_tl_epoch *b, *nob; /* next old barrier */
259         struct list_head *le, *tle;
260         struct drbd_request *r;
261
262         spin_lock_irq(&tconn->req_lock);
263
264         b = tconn->oldest_tle;
265
266         /* first some paranoia code */
267         if (b == NULL) {
268                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
269                          barrier_nr);
270                 goto bail;
271         }
272         if (b->br_number != barrier_nr) {
273                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
274                          barrier_nr, b->br_number);
275                 goto bail;
276         }
277         if (b->n_writes != set_size) {
278                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
279                          barrier_nr, set_size, b->n_writes);
280                 goto bail;
281         }
282
283         /* Clean up list of requests processed during current epoch */
284         list_for_each_safe(le, tle, &b->requests) {
285                 r = list_entry(le, struct drbd_request, tl_requests);
286                 _req_mod(r, BARRIER_ACKED);
287         }
288         /* There could be requests on the list waiting for completion
289            of the write to the local disk. To avoid corruptions of
290            slab's data structures we have to remove the list's head.
291
292            Also there could have been a barrier ack out of sequence, overtaking
293            the write acks - which would be a bug and violating write ordering.
294            To not deadlock in case we lose connection while such requests are
295            still pending, we need some way to find them for the
296            _req_mod(CONNECTION_LOST_WHILE_PENDING).
297
298            These have been list_move'd to the out_of_sequence_requests list in
299            _req_mod(, BARRIER_ACKED) above.
300            */
301         list_del_init(&b->requests);
302         mdev = b->w.mdev;
303
304         nob = b->next;
305         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
306                 _tl_add_barrier(tconn, b);
307                 if (nob)
308                         tconn->oldest_tle = nob;
309                 /* if nob == NULL b was the only barrier, and becomes the new
310                    barrier. Therefore tconn->oldest_tle already points to b */
311         } else {
312                 D_ASSERT(nob != NULL);
313                 tconn->oldest_tle = nob;
314                 kfree(b);
315         }
316
317         spin_unlock_irq(&tconn->req_lock);
318         dec_ap_pending(mdev);
319
320         return;
321
322 bail:
323         spin_unlock_irq(&tconn->req_lock);
324         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
325 }
326
327
328 /**
329  * _tl_restart() - Walks the transfer log, and applies an action to all requests
330  * @tconn:      DRBD connection.
331  * @what:       The action/event to perform with all request objects
332  *
333  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
334  * RESTART_FROZEN_DISK_IO.
335  */
336 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
337 {
338         struct drbd_tl_epoch *b, *tmp, **pn;
339         struct list_head *le, *tle, carry_reads;
340         struct drbd_request *req;
341         int rv, n_writes, n_reads;
342
343         b = tconn->oldest_tle;
344         pn = &tconn->oldest_tle;
345         while (b) {
346                 n_writes = 0;
347                 n_reads = 0;
348                 INIT_LIST_HEAD(&carry_reads);
349                 list_for_each_safe(le, tle, &b->requests) {
350                         req = list_entry(le, struct drbd_request, tl_requests);
351                         rv = _req_mod(req, what);
352
353                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
354                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
355                 }
356                 tmp = b->next;
357
358                 if (n_writes) {
359                         if (what == RESEND) {
360                                 b->n_writes = n_writes;
361                                 if (b->w.cb == NULL) {
362                                         b->w.cb = w_send_barrier;
363                                         inc_ap_pending(b->w.mdev);
364                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
365                                 }
366
367                                 drbd_queue_work(&tconn->data.work, &b->w);
368                         }
369                         pn = &b->next;
370                 } else {
371                         if (n_reads)
372                                 list_add(&carry_reads, &b->requests);
373                         /* there could still be requests on that ring list,
374                          * in case local io is still pending */
375                         list_del(&b->requests);
376
377                         /* dec_ap_pending corresponding to queue_barrier.
378                          * the newest barrier may not have been queued yet,
379                          * in which case w.cb is still NULL. */
380                         if (b->w.cb != NULL)
381                                 dec_ap_pending(b->w.mdev);
382
383                         if (b == tconn->newest_tle) {
384                                 /* recycle, but reinit! */
385                                 if (tmp != NULL)
386                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
387                                 INIT_LIST_HEAD(&b->requests);
388                                 list_splice(&carry_reads, &b->requests);
389                                 INIT_LIST_HEAD(&b->w.list);
390                                 b->w.cb = NULL;
391                                 b->br_number = net_random();
392                                 b->n_writes = 0;
393
394                                 *pn = b;
395                                 break;
396                         }
397                         *pn = tmp;
398                         kfree(b);
399                 }
400                 b = tmp;
401                 list_splice(&carry_reads, &b->requests);
402         }
403 }
404
405
406 /**
407  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
408  * @tconn:      DRBD connection.
409  *
410  * This is called after the connection to the peer was lost. The storage covered
411  * by the requests on the transfer log gets marked as out of sync. Called from the
412  * receiver thread and the worker thread.
413  */
414 void tl_clear(struct drbd_tconn *tconn)
415 {
416         struct drbd_conf *mdev;
417         struct list_head *le, *tle;
418         struct drbd_request *r;
419         int minor;
420
421         spin_lock_irq(&tconn->req_lock);
422
423         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
424
425         /* we expect this list to be empty. */
426         if (!list_empty(&tconn->out_of_sequence_requests))
427                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
428
429         /* but just in case, clean it up anyways! */
430         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
431                 r = list_entry(le, struct drbd_request, tl_requests);
432                 /* It would be nice to complete outside of spinlock.
433                  * But this is easier for now. */
434                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
435         }
436
437         /* ensure bit indicating barrier is required is clear */
438         idr_for_each_entry(&tconn->volumes, mdev, minor)
439                 clear_bit(CREATE_BARRIER, &mdev->flags);
440
441         spin_unlock_irq(&tconn->req_lock);
442 }
443
444 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
445 {
446         spin_lock_irq(&tconn->req_lock);
447         _tl_restart(tconn, what);
448         spin_unlock_irq(&tconn->req_lock);
449 }
450
451 static int drbd_thread_setup(void *arg)
452 {
453         struct drbd_thread *thi = (struct drbd_thread *) arg;
454         struct drbd_tconn *tconn = thi->tconn;
455         unsigned long flags;
456         int retval;
457
458         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
459                  thi->name[0], thi->tconn->name);
460
461 restart:
462         retval = thi->function(thi);
463
464         spin_lock_irqsave(&thi->t_lock, flags);
465
466         /* if the receiver has been "EXITING", the last thing it did
467          * was set the conn state to "StandAlone",
468          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
469          * and receiver thread will be "started".
470          * drbd_thread_start needs to set "RESTARTING" in that case.
471          * t_state check and assignment needs to be within the same spinlock,
472          * so either thread_start sees EXITING, and can remap to RESTARTING,
473          * or thread_start sees NONE, and can proceed as normal.
474          */
475
476         if (thi->t_state == RESTARTING) {
477                 conn_info(tconn, "Restarting %s thread\n", thi->name);
478                 thi->t_state = RUNNING;
479                 spin_unlock_irqrestore(&thi->t_lock, flags);
480                 goto restart;
481         }
482
483         thi->task = NULL;
484         thi->t_state = NONE;
485         smp_mb();
486         complete(&thi->stop);
487         spin_unlock_irqrestore(&thi->t_lock, flags);
488
489         conn_info(tconn, "Terminating %s\n", current->comm);
490
491         /* Release mod reference taken when thread was started */
492         module_put(THIS_MODULE);
493         return retval;
494 }
495
496 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
497                              int (*func) (struct drbd_thread *), char *name)
498 {
499         spin_lock_init(&thi->t_lock);
500         thi->task    = NULL;
501         thi->t_state = NONE;
502         thi->function = func;
503         thi->tconn = tconn;
504         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
505 }
506
507 int drbd_thread_start(struct drbd_thread *thi)
508 {
509         struct drbd_tconn *tconn = thi->tconn;
510         struct task_struct *nt;
511         unsigned long flags;
512
513         /* is used from state engine doing drbd_thread_stop_nowait,
514          * while holding the req lock irqsave */
515         spin_lock_irqsave(&thi->t_lock, flags);
516
517         switch (thi->t_state) {
518         case NONE:
519                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
520                          thi->name, current->comm, current->pid);
521
522                 /* Get ref on module for thread - this is released when thread exits */
523                 if (!try_module_get(THIS_MODULE)) {
524                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
525                         spin_unlock_irqrestore(&thi->t_lock, flags);
526                         return false;
527                 }
528
529                 init_completion(&thi->stop);
530                 thi->reset_cpu_mask = 1;
531                 thi->t_state = RUNNING;
532                 spin_unlock_irqrestore(&thi->t_lock, flags);
533                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
534
535                 nt = kthread_create(drbd_thread_setup, (void *) thi,
536                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
537
538                 if (IS_ERR(nt)) {
539                         conn_err(tconn, "Couldn't start thread\n");
540
541                         module_put(THIS_MODULE);
542                         return false;
543                 }
544                 spin_lock_irqsave(&thi->t_lock, flags);
545                 thi->task = nt;
546                 thi->t_state = RUNNING;
547                 spin_unlock_irqrestore(&thi->t_lock, flags);
548                 wake_up_process(nt);
549                 break;
550         case EXITING:
551                 thi->t_state = RESTARTING;
552                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
553                                 thi->name, current->comm, current->pid);
554                 /* fall through */
555         case RUNNING:
556         case RESTARTING:
557         default:
558                 spin_unlock_irqrestore(&thi->t_lock, flags);
559                 break;
560         }
561
562         return true;
563 }
564
565
566 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
567 {
568         unsigned long flags;
569
570         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
571
572         /* may be called from state engine, holding the req lock irqsave */
573         spin_lock_irqsave(&thi->t_lock, flags);
574
575         if (thi->t_state == NONE) {
576                 spin_unlock_irqrestore(&thi->t_lock, flags);
577                 if (restart)
578                         drbd_thread_start(thi);
579                 return;
580         }
581
582         if (thi->t_state != ns) {
583                 if (thi->task == NULL) {
584                         spin_unlock_irqrestore(&thi->t_lock, flags);
585                         return;
586                 }
587
588                 thi->t_state = ns;
589                 smp_mb();
590                 init_completion(&thi->stop);
591                 if (thi->task != current)
592                         force_sig(DRBD_SIGKILL, thi->task);
593         }
594
595         spin_unlock_irqrestore(&thi->t_lock, flags);
596
597         if (wait)
598                 wait_for_completion(&thi->stop);
599 }
600
601 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
602 {
603         struct drbd_thread *thi =
604                 task == tconn->receiver.task ? &tconn->receiver :
605                 task == tconn->asender.task  ? &tconn->asender :
606                 task == tconn->worker.task   ? &tconn->worker : NULL;
607
608         return thi;
609 }
610
611 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
612 {
613         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
614         return thi ? thi->name : task->comm;
615 }
616
617 #ifdef CONFIG_SMP
618 int conn_lowest_minor(struct drbd_tconn *tconn)
619 {
620         int minor = 0;
621         idr_get_next(&tconn->volumes, &minor);
622         return minor;
623 }
624 /**
625  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
626  * @tconn:      DRBD connection.
627  *
628  * Forces all threads of a connection onto the same CPU. This is beneficial for
629  * DRBD's performance. May be overridden by the user's configuration.
630  */
631 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
632 {
633         int ord, cpu;
634
635         /* user override. */
636         if (cpumask_weight(tconn->cpu_mask))
637                 return;
638
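        /* e.g. with 4 CPUs online, the connection whose lowest minor is 6
         * ends up pinned to the third online CPU (6 % 4 == 2) */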
639         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
640         for_each_online_cpu(cpu) {
641                 if (ord-- == 0) {
642                         cpumask_set_cpu(cpu, tconn->cpu_mask);
643                         return;
644                 }
645         }
646         /* should not be reached */
647         cpumask_setall(tconn->cpu_mask);
648 }
649
650 /**
651  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
652  * @thi:        drbd_thread object
654  *
655  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
656  * prematurely.
657  */
658 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
659 {
660         struct task_struct *p = current;
661
662         if (!thi->reset_cpu_mask)
663                 return;
664         thi->reset_cpu_mask = 0;
665         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
666 }
667 #endif
668
669 static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
670 {
671         h->magic   = cpu_to_be32(DRBD_MAGIC);
672         h->command = cpu_to_be16(cmd);
673         h->length  = cpu_to_be16(size);
674 }
675
676 static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
677 {
678         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
679         h->command = cpu_to_be16(cmd);
680         h->length  = cpu_to_be32(size);
681 }
682
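/* p_header80 carries a 16 bit length field, p_header95 a 32 bit one; use the
 * larger header for peers speaking protocol >= 100 or for payloads that do
 * not fit into an h80 packet. */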
683 static void _prepare_header(struct drbd_tconn *tconn, int vnr, struct p_header *h,
684                             enum drbd_packet cmd, int size)
685 {
686         if (tconn->agreed_pro_version >= 100 || size > DRBD_MAX_SIZE_H80_PACKET)
687                 prepare_header95(&h->h95, cmd, size);
688         else
689                 prepare_header80(&h->h80, cmd, size);
690 }
691
692 static void prepare_header(struct drbd_conf *mdev, struct p_header *h,
693                            enum drbd_packet cmd, int size)
694 {
695         _prepare_header(mdev->tconn, mdev->vnr, h, cmd, size);
696 }
697
698 /* the appropriate socket mutex must be held already */
699 int _conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct socket *sock,
700                    enum drbd_packet cmd, struct p_header *h, size_t size,
701                    unsigned msg_flags)
702 {
703         int sent, ok;
704
705         _prepare_header(tconn, vnr, h, cmd, size - sizeof(struct p_header));
706
707         sent = drbd_send(tconn, sock, h, size, msg_flags);
708
709         ok = (sent == size);
710         if (!ok && !signal_pending(current))
711                 conn_warn(tconn, "short sent %s size=%d sent=%d\n",
712                           cmdname(cmd), (int)size, sent);
713         return ok;
714 }
715
716 /* don't pass the socket. we may only look at it
717  * when we hold the appropriate socket mutex.
718  */
719 int conn_send_cmd(struct drbd_tconn *tconn, int vnr, int use_data_socket,
720                   enum drbd_packet cmd, struct p_header *h, size_t size)
721 {
722         int ok = 0;
723         struct socket *sock;
724
725         if (use_data_socket) {
726                 mutex_lock(&tconn->data.mutex);
727                 sock = tconn->data.socket;
728         } else {
729                 mutex_lock(&tconn->meta.mutex);
730                 sock = tconn->meta.socket;
731         }
732
733         /* drbd_disconnect() could have called drbd_free_sock()
734          * while we were waiting for the mutex... */
735         if (likely(sock != NULL))
736                 ok = _conn_send_cmd(tconn, vnr, sock, cmd, h, size, 0);
737
738         if (use_data_socket)
739                 mutex_unlock(&tconn->data.mutex);
740         else
741                 mutex_unlock(&tconn->meta.mutex);
742         return ok;
743 }
744
745 int conn_send_cmd2(struct drbd_tconn *tconn, enum drbd_packet cmd, char *data,
746                    size_t size)
747 {
748         struct p_header80 h;
749         int ok;
750
751         prepare_header80(&h, cmd, size);
752
753         if (!drbd_get_data_sock(tconn))
754                 return 0;
755
756         ok = (sizeof(h) ==
757                 drbd_send(tconn, tconn->data.socket, &h, sizeof(h), 0));
758         ok = ok && (size ==
759                 drbd_send(tconn, tconn->data.socket, data, size, 0));
760
761         drbd_put_data_sock(tconn);
762
763         return ok;
764 }
765
766 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
767 {
768         struct p_rs_param_95 *p;
769         struct socket *sock;
770         int size, rv;
771         const int apv = mdev->tconn->agreed_pro_version;
772
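        /* the sync parameter packet grew over time: apv <= 87 plain
         * p_rs_param, apv 88 appends the verify_alg string, apv 89..94
         * p_rs_param_89, apv >= 95 p_rs_param_95 with the c_* fields */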
773         size = apv <= 87 ? sizeof(struct p_rs_param)
774                 : apv == 88 ? sizeof(struct p_rs_param)
775                         + strlen(mdev->sync_conf.verify_alg) + 1
776                 : apv <= 94 ? sizeof(struct p_rs_param_89)
777                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
778
779         /* used from admin command context and receiver/worker context.
780          * to avoid kmalloc, grab the socket right here,
781          * then use the pre-allocated sbuf there */
782         mutex_lock(&mdev->tconn->data.mutex);
783         sock = mdev->tconn->data.socket;
784
785         if (likely(sock != NULL)) {
786                 enum drbd_packet cmd =
787                         apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
788
789                 p = &mdev->tconn->data.sbuf.rs_param_95;
790
791                 /* initialize verify_alg and csums_alg */
792                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
793
794                 p->rate = cpu_to_be32(sc->rate);
795                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
796                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
797                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
798                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
799
800                 if (apv >= 88)
801                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
802                 if (apv >= 89)
803                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
804
805                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
806         } else
807                 rv = 0; /* not ok */
808
809         mutex_unlock(&mdev->tconn->data.mutex);
810
811         return rv;
812 }
813
814 int drbd_send_protocol(struct drbd_tconn *tconn)
815 {
816         struct p_protocol *p;
817         int size, cf, rv;
818
819         size = sizeof(struct p_protocol);
820
821         if (tconn->agreed_pro_version >= 87)
822                 size += strlen(tconn->net_conf->integrity_alg) + 1;
823
824         /* we must not recurse into our own queue,
825          * as that is blocked during handshake */
826         p = kmalloc(size, GFP_NOIO);
827         if (p == NULL)
828                 return 0;
829
830         p->protocol      = cpu_to_be32(tconn->net_conf->wire_protocol);
831         p->after_sb_0p   = cpu_to_be32(tconn->net_conf->after_sb_0p);
832         p->after_sb_1p   = cpu_to_be32(tconn->net_conf->after_sb_1p);
833         p->after_sb_2p   = cpu_to_be32(tconn->net_conf->after_sb_2p);
834         p->two_primaries = cpu_to_be32(tconn->net_conf->two_primaries);
835
836         cf = 0;
837         if (tconn->net_conf->want_lose)
838                 cf |= CF_WANT_LOSE;
839         if (tconn->net_conf->dry_run) {
840                 if (tconn->agreed_pro_version >= 92)
841                         cf |= CF_DRY_RUN;
842                 else {
843                         conn_err(tconn, "--dry-run is not supported by peer");
844                         kfree(p);
845                         return -1;
846                 }
847         }
848         p->conn_flags    = cpu_to_be32(cf);
849
850         if (tconn->agreed_pro_version >= 87)
851                 strcpy(p->integrity_alg, tconn->net_conf->integrity_alg);
852
853         rv = conn_send_cmd2(tconn, P_PROTOCOL, p->head.payload, size - sizeof(struct p_header));
854         kfree(p);
855         return rv;
856 }
857
858 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
859 {
860         struct p_uuids p;
861         int i;
862
863         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
864                 return 1;
865
866         for (i = UI_CURRENT; i < UI_SIZE; i++)
867                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
868
869         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
870         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
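        /* uuid_flags: 1 = want_lose (discard my data), 2 = crashed primary,
         * 4 = disk is D_INCONSISTENT in the state we are negotiating */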
871         uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
872         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
873         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
874         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
875
876         put_ldev(mdev);
877
878         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS, &p.head, sizeof(p));
879 }
880
881 int drbd_send_uuids(struct drbd_conf *mdev)
882 {
883         return _drbd_send_uuids(mdev, 0);
884 }
885
886 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
887 {
888         return _drbd_send_uuids(mdev, 8);
889 }
890
891 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
892 {
893         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
894                 u64 *uuid = mdev->ldev->md.uuid;
895                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
896                      text,
897                      (unsigned long long)uuid[UI_CURRENT],
898                      (unsigned long long)uuid[UI_BITMAP],
899                      (unsigned long long)uuid[UI_HISTORY_START],
900                      (unsigned long long)uuid[UI_HISTORY_END]);
901                 put_ldev(mdev);
902         } else {
903                 dev_info(DEV, "%s effective data uuid: %016llX\n",
904                                 text,
905                                 (unsigned long long)mdev->ed_uuid);
906         }
907 }
908
909 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
910 {
911         struct p_rs_uuid p;
912         u64 uuid;
913
914         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
915
916         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
917         drbd_uuid_set(mdev, UI_BITMAP, uuid);
918         drbd_print_uuids(mdev, "updated sync UUID");
919         drbd_md_sync(mdev);
920         p.uuid = cpu_to_be64(uuid);
921
922         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID, &p.head, sizeof(p));
923 }
924
925 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
926 {
927         struct p_sizes p;
928         sector_t d_size, u_size;
929         int q_order_type, max_bio_size;
930         int ok;
931
932         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
933                 D_ASSERT(mdev->ldev->backing_bdev);
934                 d_size = drbd_get_max_capacity(mdev->ldev);
935                 u_size = mdev->ldev->dc.disk_size;
936                 q_order_type = drbd_queue_order_type(mdev);
937                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
938                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
939                 put_ldev(mdev);
940         } else {
941                 d_size = 0;
942                 u_size = 0;
943                 q_order_type = QUEUE_ORDERED_NONE;
944                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
945         }
946
947         p.d_size = cpu_to_be64(d_size);
948         p.u_size = cpu_to_be64(u_size);
949         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
950         p.max_bio_size = cpu_to_be32(max_bio_size);
951         p.queue_order_type = cpu_to_be16(q_order_type);
952         p.dds_flags = cpu_to_be16(flags);
953
954         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES, &p.head, sizeof(p));
955         return ok;
956 }
957
958 /**
959  * drbd_send_state() - Sends the drbd state to the peer
960  * @mdev:       DRBD device.
961  */
962 int drbd_send_state(struct drbd_conf *mdev)
963 {
964         struct socket *sock;
965         struct p_state p;
966         int ok = 0;
967
968         mutex_lock(&mdev->tconn->data.mutex);
969
970         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
971         sock = mdev->tconn->data.socket;
972
973         if (likely(sock != NULL)) {
974                 ok = _drbd_send_cmd(mdev, sock, P_STATE, &p.head, sizeof(p), 0);
975         }
976
977         mutex_unlock(&mdev->tconn->data.mutex);
978
979         return ok;
980 }
981
982 int _conn_send_state_req(struct drbd_tconn *tconn, int vnr, enum drbd_packet cmd,
983                          union drbd_state mask, union drbd_state val)
984 {
985         struct p_req_state p;
986
987         p.mask    = cpu_to_be32(mask.i);
988         p.val     = cpu_to_be32(val.i);
989
990         return conn_send_cmd(tconn, vnr, USE_DATA_SOCKET, cmd, &p.head, sizeof(p));
991 }
992
993 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
994 {
995         struct p_req_state_reply p;
996
997         p.retcode    = cpu_to_be32(retcode);
998
999         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY, &p.head, sizeof(p));
1000 }
1001
1002 int conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1003 {
1004         struct p_req_state_reply p;
1005         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1006
1007         p.retcode    = cpu_to_be32(retcode);
1008
1009         return conn_send_cmd(tconn, 0, USE_META_SOCKET, cmd, &p.head, sizeof(p));
1010 }
1011
1012 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1013         struct p_compressed_bm *p,
1014         struct bm_xfer_ctx *c)
1015 {
1016         struct bitstream bs;
1017         unsigned long plain_bits;
1018         unsigned long tmp;
1019         unsigned long rl;
1020         unsigned len;
1021         unsigned toggle;
1022         int bits;
1023
1024         /* may we use this feature? */
1025         if ((mdev->sync_conf.use_rle == 0) ||
1026                 (mdev->tconn->agreed_pro_version < 90))
1027                         return 0;
1028
1029         if (c->bit_offset >= c->bm_bits)
1030                 return 0; /* nothing to do. */
1031
1032         /* use at most thus many bytes */
1033         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1034         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1035         /* plain bits covered in this code string */
1036         plain_bits = 0;
1037
1038         /* p->encoding & 0x80 stores whether the first run length is set.
1039          * bit offset is implicit.
1040          * start with toggle == 2 to be able to tell the first iteration */
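        /* e.g. a bitmap starting with 0 0 0 1 1 0 ... is encoded as start=0
         * followed by the VLI-encoded run lengths 3, 2, ... */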
1041         toggle = 2;
1042
1043         /* see how much plain bits we can stuff into one packet
1044          * using RLE and VLI. */
1045         do {
1046                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1047                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1048                 if (tmp == -1UL)
1049                         tmp = c->bm_bits;
1050                 rl = tmp - c->bit_offset;
1051
1052                 if (toggle == 2) { /* first iteration */
1053                         if (rl == 0) {
1054                                 /* the first checked bit was set,
1055                                  * store start value, */
1056                                 DCBP_set_start(p, 1);
1057                                 /* but skip encoding of zero run length */
1058                                 toggle = !toggle;
1059                                 continue;
1060                         }
1061                         DCBP_set_start(p, 0);
1062                 }
1063
1064                 /* paranoia: catch zero runlength.
1065                  * can only happen if bitmap is modified while we scan it. */
1066                 if (rl == 0) {
1067                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1068                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1069                         return -1;
1070                 }
1071
1072                 bits = vli_encode_bits(&bs, rl);
1073                 if (bits == -ENOBUFS) /* buffer full */
1074                         break;
1075                 if (bits <= 0) {
1076                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1077                         return 0;
1078                 }
1079
1080                 toggle = !toggle;
1081                 plain_bits += rl;
1082                 c->bit_offset = tmp;
1083         } while (c->bit_offset < c->bm_bits);
1084
1085         len = bs.cur.b - p->code + !!bs.cur.bit;
1086
1087         if (plain_bits < (len << 3)) {
1088                 /* incompressible with this method.
1089                  * we need to rewind both word and bit position. */
1090                 c->bit_offset -= plain_bits;
1091                 bm_xfer_ctx_bit_to_word_offset(c);
1092                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1093                 return 0;
1094         }
1095
1096         /* RLE + VLI was able to compress it just fine.
1097          * update c->word_offset. */
1098         bm_xfer_ctx_bit_to_word_offset(c);
1099
1100         /* store pad_bits */
1101         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1102
1103         return len;
1104 }
1105
1106 /**
1107  * send_bitmap_rle_or_plain
1108  *
1109  * Return 0 when done, 1 when another iteration is needed, and a negative error
1110  * code upon failure.
1111  */
1112 static int
1113 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1114                          struct p_header *h, struct bm_xfer_ctx *c)
1115 {
1116         struct p_compressed_bm *p = (void*)h;
1117         unsigned long num_words;
1118         int len;
1119         int ok;
1120
1121         len = fill_bitmap_rle_bits(mdev, p, c);
1122
1123         if (len < 0)
1124                 return -EIO;
1125
1126         if (len) {
1127                 DCBP_set_code(p, RLE_VLI_Bits);
1128                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_COMPRESSED_BITMAP, h,
1129                         sizeof(*p) + len, 0);
1130
1131                 c->packets[0]++;
1132                 c->bytes[0] += sizeof(*p) + len;
1133
1134                 if (c->bit_offset >= c->bm_bits)
1135                         len = 0; /* DONE */
1136         } else {
1137                 /* was not compressible.
1138                  * send a buffer full of plain text bits instead. */
1139                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1140                 len = num_words * sizeof(long);
1141                 if (len)
1142                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1143                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BITMAP,
1144                                    h, sizeof(struct p_header80) + len, 0);
1145                 c->word_offset += num_words;
1146                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1147
1148                 c->packets[1]++;
1149                 c->bytes[1] += sizeof(struct p_header80) + len;
1150
1151                 if (c->bit_offset > c->bm_bits)
1152                         c->bit_offset = c->bm_bits;
1153         }
1154         if (ok) {
1155                 if (len == 0) {
1156                         INFO_bm_xfer_stats(mdev, "send", c);
1157                         return 0;
1158                 } else
1159                         return 1;
1160         }
1161         return -EIO;
1162 }
1163
1164 /* See the comment at receive_bitmap() */
1165 int _drbd_send_bitmap(struct drbd_conf *mdev)
1166 {
1167         struct bm_xfer_ctx c;
1168         struct p_header *p;
1169         int err;
1170
1171         if (!expect(mdev->bitmap))
1172                 return false;
1173
1174         /* maybe we should use some per thread scratch page,
1175          * and allocate that during initial device creation? */
1176         p = (struct p_header *) __get_free_page(GFP_NOIO);
1177         if (!p) {
1178                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
1179                 return false;
1180         }
1181
1182         if (get_ldev(mdev)) {
1183                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1184                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1185                         drbd_bm_set_all(mdev);
1186                         if (drbd_bm_write(mdev)) {
1187                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1188                                  * but otherwise process as per normal - need to tell other
1189                                  * side that a full resync is required! */
1190                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1191                         } else {
1192                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1193                                 drbd_md_sync(mdev);
1194                         }
1195                 }
1196                 put_ldev(mdev);
1197         }
1198
1199         c = (struct bm_xfer_ctx) {
1200                 .bm_bits = drbd_bm_bits(mdev),
1201                 .bm_words = drbd_bm_words(mdev),
1202         };
1203
1204         do {
1205                 err = send_bitmap_rle_or_plain(mdev, p, &c);
1206         } while (err > 0);
1207
1208         free_page((unsigned long) p);
1209         return err == 0;
1210 }
1211
1212 int drbd_send_bitmap(struct drbd_conf *mdev)
1213 {
1214         int err;
1215
1216         if (!drbd_get_data_sock(mdev->tconn))
1217                 return -1;
1218         err = !_drbd_send_bitmap(mdev);
1219         drbd_put_data_sock(mdev->tconn);
1220         return err;
1221 }
1222
1223 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1224 {
1225         int ok;
1226         struct p_barrier_ack p;
1227
1228         p.barrier  = barrier_nr;
1229         p.set_size = cpu_to_be32(set_size);
1230
1231         if (mdev->state.conn < C_CONNECTED)
1232                 return false;
1233         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK, &p.head, sizeof(p));
1234         return ok;
1235 }
1236
1237 /**
1238  * _drbd_send_ack() - Sends an ack packet
1239  * @mdev:       DRBD device.
1240  * @cmd:        Packet command code.
1241  * @sector:     sector, needs to be in big endian byte order
1242  * @blksize:    size in byte, needs to be in big endian byte order
1243  * @block_id:   Id, big endian byte order
1244  */
1245 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1246                           u64 sector, u32 blksize, u64 block_id)
1247 {
1248         int ok;
1249         struct p_block_ack p;
1250
1251         p.sector   = sector;
1252         p.block_id = block_id;
1253         p.blksize  = blksize;
1254         p.seq_num  = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1255
1256         if (!mdev->tconn->meta.socket || mdev->state.conn < C_CONNECTED)
1257                 return false;
1258         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd, &p.head, sizeof(p));
1259         return ok;
1260 }
1261
1262 /* dp->sector and dp->block_id already/still in network byte order,
1263  * data_size is payload size according to dp->head,
1264  * and may need to be corrected for digest size. */
1265 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1266                      struct p_data *dp, int data_size)
1267 {
1268         data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1269                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1270         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1271                               dp->block_id);
1272 }
1273
1274 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1275                      struct p_block_req *rp)
1276 {
1277         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1278 }
1279
1280 /**
1281  * drbd_send_ack() - Sends an ack packet
1282  * @mdev:       DRBD device
1283  * @cmd:        packet command code
1284  * @peer_req:   peer request
1285  */
1286 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1287                   struct drbd_peer_request *peer_req)
1288 {
1289         return _drbd_send_ack(mdev, cmd,
1290                               cpu_to_be64(peer_req->i.sector),
1291                               cpu_to_be32(peer_req->i.size),
1292                               peer_req->block_id);
1293 }
1294
1295 /* This function misuses the block_id field to signal if the blocks
1296  * are in sync or not. */
1297 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1298                      sector_t sector, int blksize, u64 block_id)
1299 {
1300         return _drbd_send_ack(mdev, cmd,
1301                               cpu_to_be64(sector),
1302                               cpu_to_be32(blksize),
1303                               cpu_to_be64(block_id));
1304 }
1305
1306 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1307                        sector_t sector, int size, u64 block_id)
1308 {
1309         int ok;
1310         struct p_block_req p;
1311
1312         p.sector   = cpu_to_be64(sector);
1313         p.block_id = block_id;
1314         p.blksize  = cpu_to_be32(size);
1315
1316         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd, &p.head, sizeof(p));
1317         return ok;
1318 }
1319
1320 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1321                             void *digest, int digest_size, enum drbd_packet cmd)
1322 {
1323         int ok;
1324         struct p_block_req p;
1325
1326         prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header) + digest_size);
1327         p.sector   = cpu_to_be64(sector);
1328         p.block_id = ID_SYNCER /* unused */;
1329         p.blksize  = cpu_to_be32(size);
1330
1331         mutex_lock(&mdev->tconn->data.mutex);
1332
1333         ok = (sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), 0));
1334         ok = ok && (digest_size == drbd_send(mdev->tconn, mdev->tconn->data.socket, digest, digest_size, 0));
1335
1336         mutex_unlock(&mdev->tconn->data.mutex);
1337
1338         return ok;
1339 }
1340
1341 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1342 {
1343         int ok;
1344         struct p_block_req p;
1345
1346         p.sector   = cpu_to_be64(sector);
1347         p.block_id = ID_SYNCER /* unused */;
1348         p.blksize  = cpu_to_be32(size);
1349
1350         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST, &p.head, sizeof(p));
1351         return ok;
1352 }
1353
1354 /* called on sndtimeo
1355  * returns false if we should retry,
1356  * true if we think connection is dead
1357  */
1358 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1359 {
1360         int drop_it;
1361         /* long elapsed = (long)(jiffies - mdev->last_received); */
1362
1363         drop_it =   tconn->meta.socket == sock
1364                 || !tconn->asender.task
1365                 || get_t_state(&tconn->asender) != RUNNING
1366                 || tconn->cstate < C_WF_REPORT_PARAMS;
1367
1368         if (drop_it)
1369                 return true;
1370
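        /* otherwise count down ko_count; give up once it reaches zero,
         * until then just complain and request a ping */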
1371         drop_it = !--tconn->ko_count;
1372         if (!drop_it) {
1373                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1374                          current->comm, current->pid, tconn->ko_count);
1375                 request_ping(tconn);
1376         }
1377
1378         return drop_it; /* && (mdev->state == R_PRIMARY) */;
1379 }
1380
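/* consider the connection congested once more than 4/5 of the socket send
 * buffer is queued; the bit is cleared again at the end of _drbd_send_page() */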
1381 static void drbd_update_congested(struct drbd_tconn *tconn)
1382 {
1383         struct sock *sk = tconn->data.socket->sk;
1384         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1385                 set_bit(NET_CONGESTED, &tconn->flags);
1386 }
1387
1388 /* The idea of sendpage seems to be to put some kind of reference
1389  * to the page into the skb, and to hand it over to the NIC. In
1390  * this process get_page() gets called.
1391  *
1392  * As soon as the page was really sent over the network put_page()
1393  * gets called by some part of the network layer. [ NIC driver? ]
1394  *
1395  * [ get_page() / put_page() increment/decrement the count. If count
1396  *   reaches 0 the page will be freed. ]
1397  *
1398  * This works nicely with pages from FSs.
1399  * But this means that in protocol A we might signal IO completion too early!
1400  *
1401  * In order not to corrupt data during a resync we must make sure
1402  * that we do not reuse our own buffer pages (EEs) too early, therefore
1403  * we have the net_ee list.
1404  *
1405  * XFS seems to have problems, still, it submits pages with page_count == 0!
1406  * As a workaround, we disable sendpage on pages
1407  * with page_count == 0 or PageSlab.
1408  */
1409 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1410                    int offset, size_t size, unsigned msg_flags)
1411 {
1412         int sent = drbd_send(mdev->tconn, mdev->tconn->data.socket, kmap(page) + offset, size, msg_flags);
1413         kunmap(page);
1414         if (sent == size)
1415                 mdev->send_cnt += size>>9;
1416         return sent == size;
1417 }
1418
1419 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1420                     int offset, size_t size, unsigned msg_flags)
1421 {
1422         mm_segment_t oldfs = get_fs();
1423         int sent, ok;
1424         int len = size;
1425
1426         /* e.g. XFS meta- & log-data is in slab pages, which have a
1427          * page_count of 0 and/or have PageSlab() set.
1428          * we cannot use send_page for those, as that does get_page();
1429          * put_page(); and would cause either a VM_BUG directly, or
1430          * __page_cache_release a page that would actually still be referenced
1431          * by someone, leading to some obscure delayed Oops somewhere else. */
1432         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1433                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1434
1435         msg_flags |= MSG_NOSIGNAL;
1436         drbd_update_congested(mdev->tconn);
1437         set_fs(KERNEL_DS);
1438         do {
1439                 sent = mdev->tconn->data.socket->ops->sendpage(mdev->tconn->data.socket, page,
1440                                                         offset, len,
1441                                                         msg_flags);
1442                 if (sent == -EAGAIN) {
1443                         if (we_should_drop_the_connection(mdev->tconn,
1444                                                           mdev->tconn->data.socket))
1445                                 break;
1446                         else
1447                                 continue;
1448                 }
1449                 if (sent <= 0) {
1450                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1451                              __func__, (int)size, len, sent);
1452                         break;
1453                 }
1454                 len    -= sent;
1455                 offset += sent;
1456         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1457         set_fs(oldfs);
1458         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1459
1460         ok = (len == 0);
1461         if (likely(ok))
1462                 mdev->send_cnt += size>>9;
1463         return ok;
1464 }
1465
1466 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1467 {
1468         struct bio_vec *bvec;
1469         int i;
1470         /* hint all but last page with MSG_MORE */
1471         __bio_for_each_segment(bvec, bio, i, 0) {
1472                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
1473                                      bvec->bv_offset, bvec->bv_len,
1474                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
1475                         return 0;
1476         }
1477         return 1;
1478 }
1479
1480 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1481 {
1482         struct bio_vec *bvec;
1483         int i;
1484         /* hint all but last page with MSG_MORE */
1485         __bio_for_each_segment(bvec, bio, i, 0) {
1486                 if (!_drbd_send_page(mdev, bvec->bv_page,
1487                                      bvec->bv_offset, bvec->bv_len,
1488                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
1489                         return 0;
1490         }
1491         return 1;
1492 }
1493
1494 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1495                             struct drbd_peer_request *peer_req)
1496 {
1497         struct page *page = peer_req->pages;
1498         unsigned len = peer_req->i.size;
1499
1500         /* hint all but last page with MSG_MORE */
1501         page_chain_for_each(page) {
1502                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1503                 if (!_drbd_send_page(mdev, page, 0, l,
1504                                 page_chain_next(page) ? MSG_MORE : 0))
1505                         return 0;
1506                 len -= l;
1507         }
1508         return 1;
1509 }
1510
1511 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1512 {
1513         if (mdev->tconn->agreed_pro_version >= 95)
1514                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1515                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1516                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1517                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1518         else
1519                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1520 }
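/*
 * Editor's note, illustrative only (not part of the driver): with a peer
 * that agreed on protocol version 95 or newer, a bio carrying
 * REQ_SYNC|REQ_FUA is announced as DP_RW_SYNC|DP_FUA on the wire; an older
 * peer only ever gets DP_RW_SYNC, i.e. FUA/FLUSH/DISCARD hints are dropped.
 */
#if 0	/* hedged sketch, never compiled */
static void example_wire_flags(struct drbd_conf *mdev)
{
	u32 wire = bio_flags_to_wire(mdev, REQ_SYNC | REQ_FUA);

	/* wire == (DP_RW_SYNC | DP_FUA) iff agreed_pro_version >= 95,
	 * otherwise just DP_RW_SYNC */
	(void)wire;
}
#endif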
1521
1522 /* Used to send write requests
1523  * R_PRIMARY -> Peer    (P_DATA)
1524  */
1525 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1526 {
1527         int ok = 1;
1528         struct p_data p;
1529         unsigned int dp_flags = 0;
1530         void *dgb;
1531         int dgs;
1532
1533         if (!drbd_get_data_sock(mdev->tconn))
1534                 return 0;
1535
1536         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1537                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1538
1539         prepare_header(mdev, &p.head, P_DATA, sizeof(p) - sizeof(struct p_header) + dgs + req->i.size);
1540         p.sector   = cpu_to_be64(req->i.sector);
1541         p.block_id = (unsigned long)req;
1542         p.seq_num  = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1543
1544         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1545
1546         if (mdev->state.conn >= C_SYNC_SOURCE &&
1547             mdev->state.conn <= C_PAUSED_SYNC_T)
1548                 dp_flags |= DP_MAY_SET_IN_SYNC;
1549
1550         p.dp_flags = cpu_to_be32(dp_flags);
1551         set_bit(UNPLUG_REMOTE, &mdev->flags);
1552         ok = (sizeof(p) ==
1553                 drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
1554         if (ok && dgs) {
1555                 dgb = mdev->tconn->int_dig_out;
1556                 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, dgb);
1557                 ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1558         }
1559         if (ok) {
1560                 /* For protocol A, we have to memcpy the payload into
1561                  * socket buffers, as we may complete right away
1562                  * as soon as we handed it over to tcp, at which point the data
1563                  * pages may become invalid.
1564                  *
1565                  * With data integrity enabled, we copy it as well, so we can be
1566                  * sure that even if the bio pages are still being modified, the
1567                  * data on the wire won't change; thus if the digest checks
1568                  * out ok after sending on this side, but does not match on the
1569                  * receiving side, we have certainly detected corruption elsewhere.
1570                  */
1571                 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
1572                         ok = _drbd_send_bio(mdev, req->master_bio);
1573                 else
1574                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
1575
1576                 /* double check digest, sometimes buffers have been modified in flight. */
1577                 if (dgs > 0 && dgs <= 64) {
1578                         /* 64 byte, 512 bit, is the largest digest size
1579                          * currently supported in kernel crypto. */
1580                         unsigned char digest[64];
1581                         drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
1582                         if (memcmp(mdev->tconn->int_dig_out, digest, dgs)) {
1583                                 dev_warn(DEV,
1584                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1585                                         (unsigned long long)req->i.sector, req->i.size);
1586                         }
1587                 } /* else if (dgs > 64) {
1588                      ... Be noisy about digest too large ...
1589                 } */
1590         }
1591
1592         drbd_put_data_sock(mdev->tconn);
1593
1594         return ok;
1595 }
1596
1597 /* answer packet, used to send data back for read requests:
1598  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1599  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1600  */
1601 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1602                     struct drbd_peer_request *peer_req)
1603 {
1604         int ok;
1605         struct p_data p;
1606         void *dgb;
1607         int dgs;
1608
1609         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1610                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1611
1612         prepare_header(mdev, &p.head, cmd, sizeof(p) -
1613                                            sizeof(struct p_header80) +
1614                                            dgs + peer_req->i.size);
1615         p.sector   = cpu_to_be64(peer_req->i.sector);
1616         p.block_id = peer_req->block_id;
1617         p.seq_num = 0;  /* unused */
1618
1619         /* Only called by our kernel thread.
1620          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
1621          * in response to admin command or module unload.
1622          */
1623         if (!drbd_get_data_sock(mdev->tconn))
1624                 return 0;
1625
1626         ok = sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
1627         if (ok && dgs) {
1628                 dgb = mdev->tconn->int_dig_out;
1629                 drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, dgb);
1630                 ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1631         }
1632         if (ok)
1633                 ok = _drbd_send_zc_ee(mdev, peer_req);
1634
1635         drbd_put_data_sock(mdev->tconn);
1636
1637         return ok;
1638 }
1639
1640 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
1641 {
1642         struct p_block_desc p;
1643
1644         p.sector  = cpu_to_be64(req->i.sector);
1645         p.blksize = cpu_to_be32(req->i.size);
1646
1647         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
1648 }
1649
1650 /*
1651   drbd_send distinguishes two cases:
1652
1653   Packets sent via the data socket "sock"
1654   and packets sent via the meta data socket "msock"
1655
1656                     sock                      msock
1657   -----------------+-------------------------+------------------------------
1658   timeout           conf.timeout / 2          conf.timeout / 2
1659   timeout action    send a ping via msock     Abort communication
1660                                               and close all sockets
1661 */
1662
1663 /*
1664  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1665  */
1666 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1667               void *buf, size_t size, unsigned msg_flags)
1668 {
1669         struct kvec iov;
1670         struct msghdr msg;
1671         int rv, sent = 0;
1672
1673         if (!sock)
1674                 return -1000;
1675
1676         /* THINK  if (signal_pending) return ... ? */
1677
1678         iov.iov_base = buf;
1679         iov.iov_len  = size;
1680
1681         msg.msg_name       = NULL;
1682         msg.msg_namelen    = 0;
1683         msg.msg_control    = NULL;
1684         msg.msg_controllen = 0;
1685         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1686
1687         if (sock == tconn->data.socket) {
1688                 tconn->ko_count = tconn->net_conf->ko_count;
1689                 drbd_update_congested(tconn);
1690         }
1691         do {
1692                 /* STRANGE
1693                  * tcp_sendmsg does _not_ use its size parameter at all ?
1694                  *
1695                  * -EAGAIN on timeout, -EINTR on signal.
1696                  */
1697 /* THINK
1698  * do we need to block DRBD_SIG if sock == &meta.socket ??
1699  * otherwise wake_asender() might interrupt some send_*Ack !
1700  */
1701                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1702                 if (rv == -EAGAIN) {
1703                         if (we_should_drop_the_connection(tconn, sock))
1704                                 break;
1705                         else
1706                                 continue;
1707                 }
1708                 if (rv == -EINTR) {
1709                         flush_signals(current);
1710                         rv = 0;
1711                 }
1712                 if (rv < 0)
1713                         break;
1714                 sent += rv;
1715                 iov.iov_base += rv;
1716                 iov.iov_len  -= rv;
1717         } while (sent < size);
1718
1719         if (sock == tconn->data.socket)
1720                 clear_bit(NET_CONGESTED, &tconn->flags);
1721
1722         if (rv <= 0) {
1723                 if (rv != -EAGAIN) {
1724                         conn_err(tconn, "%s_sendmsg returned %d\n",
1725                                  sock == tconn->meta.socket ? "msock" : "sock",
1726                                  rv);
1727                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1728                 } else
1729                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1730         }
1731
1732         return sent;
1733 }
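/*
 * Editor's sketch, illustrative only (not part of the driver): callers
 * typically push a packet header with MSG_MORE and the payload without it,
 * so the TCP stack may coalesce both into one segment -- drbd_send_dblock()
 * above does exactly that.  A minimal hypothetical two-part send:
 */
#if 0	/* hedged sketch, never compiled */
static int example_send_two_parts(struct drbd_tconn *tconn,
				  void *hdr, size_t hlen,
				  void *payload, size_t plen)
{
	/* caller is assumed to hold the data socket, see drbd_get_data_sock() */
	if (drbd_send(tconn, tconn->data.socket, hdr, hlen, MSG_MORE) != (int)hlen)
		return 0;
	return drbd_send(tconn, tconn->data.socket, payload, plen, 0) == (int)plen;
}
#endif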
1734
1735 static int drbd_open(struct block_device *bdev, fmode_t mode)
1736 {
1737         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1738         unsigned long flags;
1739         int rv = 0;
1740
1741         mutex_lock(&drbd_main_mutex);
1742         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1743         /* to have a stable mdev->state.role
1744          * and no race with updating open_cnt */
1745
1746         if (mdev->state.role != R_PRIMARY) {
1747                 if (mode & FMODE_WRITE)
1748                         rv = -EROFS;
1749                 else if (!allow_oos)
1750                         rv = -EMEDIUMTYPE;
1751         }
1752
1753         if (!rv)
1754                 mdev->open_cnt++;
1755         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1756         mutex_unlock(&drbd_main_mutex);
1757
1758         return rv;
1759 }
1760
1761 static int drbd_release(struct gendisk *gd, fmode_t mode)
1762 {
1763         struct drbd_conf *mdev = gd->private_data;
1764         mutex_lock(&drbd_main_mutex);
1765         mdev->open_cnt--;
1766         mutex_unlock(&drbd_main_mutex);
1767         return 0;
1768 }
1769
1770 static void drbd_set_defaults(struct drbd_conf *mdev)
1771 {
1772         /* This way we get a compile error when sync_conf grows,
1773            and we forget to initialize it here */
1774         mdev->sync_conf = (struct syncer_conf) {
1775                 /* .rate = */           DRBD_RATE_DEF,
1776                 /* .after = */          DRBD_AFTER_DEF,
1777                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
1778                 /* .verify_alg = */     {}, 0,
1779                 /* .cpu_mask = */       {}, 0,
1780                 /* .csums_alg = */      {}, 0,
1781                 /* .use_rle = */        0,
1782                 /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
1783                 /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
1784                 /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
1785                 /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
1786                 /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
1787                 /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
1788         };
1789
1790         /* Have to do it this way, because the layout differs between
1791            big endian and little endian */
1792         mdev->state = (union drbd_state) {
1793                 { .role = R_SECONDARY,
1794                   .peer = R_UNKNOWN,
1795                   .conn = C_STANDALONE,
1796                   .disk = D_DISKLESS,
1797                   .pdsk = D_UNKNOWN,
1798                   .susp = 0,
1799                   .susp_nod = 0,
1800                   .susp_fen = 0
1801                 } };
1802 }
1803
1804 void drbd_init_set_defaults(struct drbd_conf *mdev)
1805 {
1806         /* the memset(,0,) did most of this.
1807          * note: only assignments, no allocation in here */
1808
1809         drbd_set_defaults(mdev);
1810
1811         atomic_set(&mdev->ap_bio_cnt, 0);
1812         atomic_set(&mdev->ap_pending_cnt, 0);
1813         atomic_set(&mdev->rs_pending_cnt, 0);
1814         atomic_set(&mdev->unacked_cnt, 0);
1815         atomic_set(&mdev->local_cnt, 0);
1816         atomic_set(&mdev->pp_in_use, 0);
1817         atomic_set(&mdev->pp_in_use_by_net, 0);
1818         atomic_set(&mdev->rs_sect_in, 0);
1819         atomic_set(&mdev->rs_sect_ev, 0);
1820         atomic_set(&mdev->ap_in_flight, 0);
1821
1822         mutex_init(&mdev->md_io_mutex);
1823         mutex_init(&mdev->own_state_mutex);
1824         mdev->state_mutex = &mdev->own_state_mutex;
1825
1826         spin_lock_init(&mdev->al_lock);
1827         spin_lock_init(&mdev->peer_seq_lock);
1828         spin_lock_init(&mdev->epoch_lock);
1829
1830         INIT_LIST_HEAD(&mdev->active_ee);
1831         INIT_LIST_HEAD(&mdev->sync_ee);
1832         INIT_LIST_HEAD(&mdev->done_ee);
1833         INIT_LIST_HEAD(&mdev->read_ee);
1834         INIT_LIST_HEAD(&mdev->net_ee);
1835         INIT_LIST_HEAD(&mdev->resync_reads);
1836         INIT_LIST_HEAD(&mdev->resync_work.list);
1837         INIT_LIST_HEAD(&mdev->unplug_work.list);
1838         INIT_LIST_HEAD(&mdev->go_diskless.list);
1839         INIT_LIST_HEAD(&mdev->md_sync_work.list);
1840         INIT_LIST_HEAD(&mdev->start_resync_work.list);
1841         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
1842
1843         mdev->resync_work.cb  = w_resync_timer;
1844         mdev->unplug_work.cb  = w_send_write_hint;
1845         mdev->go_diskless.cb  = w_go_diskless;
1846         mdev->md_sync_work.cb = w_md_sync;
1847         mdev->bm_io_work.w.cb = w_bitmap_io;
1848         mdev->start_resync_work.cb = w_start_resync;
1849
1850         mdev->resync_work.mdev  = mdev;
1851         mdev->unplug_work.mdev  = mdev;
1852         mdev->go_diskless.mdev  = mdev;
1853         mdev->md_sync_work.mdev = mdev;
1854         mdev->bm_io_work.w.mdev = mdev;
1855         mdev->start_resync_work.mdev = mdev;
1856
1857         init_timer(&mdev->resync_timer);
1858         init_timer(&mdev->md_sync_timer);
1859         init_timer(&mdev->start_resync_timer);
1860         init_timer(&mdev->request_timer);
1861         mdev->resync_timer.function = resync_timer_fn;
1862         mdev->resync_timer.data = (unsigned long) mdev;
1863         mdev->md_sync_timer.function = md_sync_timer_fn;
1864         mdev->md_sync_timer.data = (unsigned long) mdev;
1865         mdev->start_resync_timer.function = start_resync_timer_fn;
1866         mdev->start_resync_timer.data = (unsigned long) mdev;
1867         mdev->request_timer.function = request_timer_fn;
1868         mdev->request_timer.data = (unsigned long) mdev;
1869
1870         init_waitqueue_head(&mdev->misc_wait);
1871         init_waitqueue_head(&mdev->state_wait);
1872         init_waitqueue_head(&mdev->ee_wait);
1873         init_waitqueue_head(&mdev->al_wait);
1874         init_waitqueue_head(&mdev->seq_wait);
1875
1876         /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
1877         mdev->write_ordering = WO_bdev_flush;
1878         mdev->resync_wenr = LC_FREE;
1879         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1880         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1881 }
1882
1883 void drbd_mdev_cleanup(struct drbd_conf *mdev)
1884 {
1885         int i;
1886         if (mdev->tconn->receiver.t_state != NONE)
1887                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
1888                                 mdev->tconn->receiver.t_state);
1889
1890         /* no need to lock it, I'm the only thread alive */
1891         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
1892                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
1893         mdev->al_writ_cnt  =
1894         mdev->bm_writ_cnt  =
1895         mdev->read_cnt     =
1896         mdev->recv_cnt     =
1897         mdev->send_cnt     =
1898         mdev->writ_cnt     =
1899         mdev->p_size       =
1900         mdev->rs_start     =
1901         mdev->rs_total     =
1902         mdev->rs_failed    = 0;
1903         mdev->rs_last_events = 0;
1904         mdev->rs_last_sect_ev = 0;
1905         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1906                 mdev->rs_mark_left[i] = 0;
1907                 mdev->rs_mark_time[i] = 0;
1908         }
1909         D_ASSERT(mdev->tconn->net_conf == NULL);
1910
1911         drbd_set_my_capacity(mdev, 0);
1912         if (mdev->bitmap) {
1913                 /* maybe never allocated. */
1914                 drbd_bm_resize(mdev, 0, 1);
1915                 drbd_bm_cleanup(mdev);
1916         }
1917
1918         drbd_free_resources(mdev);
1919         clear_bit(AL_SUSPENDED, &mdev->flags);
1920
1921         /*
1922          * currently we drbd_init_ee only on module load, so
1923          * we may do drbd_release_ee only on module unload!
1924          */
1925         D_ASSERT(list_empty(&mdev->active_ee));
1926         D_ASSERT(list_empty(&mdev->sync_ee));
1927         D_ASSERT(list_empty(&mdev->done_ee));
1928         D_ASSERT(list_empty(&mdev->read_ee));
1929         D_ASSERT(list_empty(&mdev->net_ee));
1930         D_ASSERT(list_empty(&mdev->resync_reads));
1931         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
1932         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
1933         D_ASSERT(list_empty(&mdev->resync_work.list));
1934         D_ASSERT(list_empty(&mdev->unplug_work.list));
1935         D_ASSERT(list_empty(&mdev->go_diskless.list));
1936
1937         drbd_set_defaults(mdev);
1938 }
1939
1940
1941 static void drbd_destroy_mempools(void)
1942 {
1943         struct page *page;
1944
1945         while (drbd_pp_pool) {
1946                 page = drbd_pp_pool;
1947                 drbd_pp_pool = (struct page *)page_private(page);
1948                 __free_page(page);
1949                 drbd_pp_vacant--;
1950         }
1951
1952         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
1953
1954         if (drbd_ee_mempool)
1955                 mempool_destroy(drbd_ee_mempool);
1956         if (drbd_request_mempool)
1957                 mempool_destroy(drbd_request_mempool);
1958         if (drbd_ee_cache)
1959                 kmem_cache_destroy(drbd_ee_cache);
1960         if (drbd_request_cache)
1961                 kmem_cache_destroy(drbd_request_cache);
1962         if (drbd_bm_ext_cache)
1963                 kmem_cache_destroy(drbd_bm_ext_cache);
1964         if (drbd_al_ext_cache)
1965                 kmem_cache_destroy(drbd_al_ext_cache);
1966
1967         drbd_ee_mempool      = NULL;
1968         drbd_request_mempool = NULL;
1969         drbd_ee_cache        = NULL;
1970         drbd_request_cache   = NULL;
1971         drbd_bm_ext_cache    = NULL;
1972         drbd_al_ext_cache    = NULL;
1973
1974         return;
1975 }
1976
1977 static int drbd_create_mempools(void)
1978 {
1979         struct page *page;
1980         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
1981         int i;
1982
1983         /* prepare our caches and mempools */
1984         drbd_request_mempool = NULL;
1985         drbd_ee_cache        = NULL;
1986         drbd_request_cache   = NULL;
1987         drbd_bm_ext_cache    = NULL;
1988         drbd_al_ext_cache    = NULL;
1989         drbd_pp_pool         = NULL;
1990
1991         /* caches */
1992         drbd_request_cache = kmem_cache_create(
1993                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
1994         if (drbd_request_cache == NULL)
1995                 goto Enomem;
1996
1997         drbd_ee_cache = kmem_cache_create(
1998                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
1999         if (drbd_ee_cache == NULL)
2000                 goto Enomem;
2001
2002         drbd_bm_ext_cache = kmem_cache_create(
2003                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2004         if (drbd_bm_ext_cache == NULL)
2005                 goto Enomem;
2006
2007         drbd_al_ext_cache = kmem_cache_create(
2008                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2009         if (drbd_al_ext_cache == NULL)
2010                 goto Enomem;
2011
2012         /* mempools */
2013         drbd_request_mempool = mempool_create(number,
2014                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2015         if (drbd_request_mempool == NULL)
2016                 goto Enomem;
2017
2018         drbd_ee_mempool = mempool_create(number,
2019                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2020         if (drbd_ee_mempool == NULL)
2021                 goto Enomem;
2022
2023         /* drbd's page pool */
2024         spin_lock_init(&drbd_pp_lock);
2025
2026         for (i = 0; i < number; i++) {
2027                 page = alloc_page(GFP_HIGHUSER);
2028                 if (!page)
2029                         goto Enomem;
2030                 set_page_private(page, (unsigned long)drbd_pp_pool);
2031                 drbd_pp_pool = page;
2032         }
2033         drbd_pp_vacant = number;
2034
2035         return 0;
2036
2037 Enomem:
2038         drbd_destroy_mempools(); /* in case we allocated some */
2039         return -ENOMEM;
2040 }
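/*
 * Editor's note, illustrative only (not part of the driver): the private
 * page pool built above is a singly linked list threaded through
 * page_private(); drbd_destroy_mempools() shows the pop operation.  A
 * hypothetical pop helper could look like this; a real consumer would
 * additionally take drbd_pp_lock and handle the empty case.
 */
#if 0	/* hedged sketch, never compiled */
static struct page *example_pp_pop(void)
{
	struct page *page = drbd_pp_pool;

	if (page) {
		drbd_pp_pool = (struct page *)page_private(page);
		set_page_private(page, 0);
		drbd_pp_vacant--;
	}
	return page;
}
#endif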
2041
2042 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2043         void *unused)
2044 {
2045         /* just so we have it.  you never know what interesting things we
2046          * might want to do here some day...
2047          */
2048
2049         return NOTIFY_DONE;
2050 }
2051
2052 static struct notifier_block drbd_notifier = {
2053         .notifier_call = drbd_notify_sys,
2054 };
2055
2056 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2057 {
2058         int rr;
2059
2060         rr = drbd_release_ee(mdev, &mdev->active_ee);
2061         if (rr)
2062                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2063
2064         rr = drbd_release_ee(mdev, &mdev->sync_ee);
2065         if (rr)
2066                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2067
2068         rr = drbd_release_ee(mdev, &mdev->read_ee);
2069         if (rr)
2070                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2071
2072         rr = drbd_release_ee(mdev, &mdev->done_ee);
2073         if (rr)
2074                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2075
2076         rr = drbd_release_ee(mdev, &mdev->net_ee);
2077         if (rr)
2078                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2079 }
2080
2081 /* caution. no locking.
2082  * currently only used from module cleanup code. */
2083 static void drbd_delete_device(unsigned int minor)
2084 {
2085         struct drbd_conf *mdev = minor_to_mdev(minor);
2086
2087         if (!mdev)
2088                 return;
2089
2090         /* paranoia asserts */
2091         D_ASSERT(mdev->open_cnt == 0);
2092         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2093         /* end paranoia asserts */
2094
2095         del_gendisk(mdev->vdisk);
2096
2097         /* cleanup stuff that may have been allocated during
2098          * device (re-)configuration or state changes */
2099
2100         if (mdev->this_bdev)
2101                 bdput(mdev->this_bdev);
2102
2103         drbd_free_resources(mdev);
2104         drbd_free_tconn(mdev->tconn);
2105
2106         drbd_release_ee_lists(mdev);
2107
2108         lc_destroy(mdev->act_log);
2109         lc_destroy(mdev->resync);
2110
2111         kfree(mdev->p_uuid);
2112         /* mdev->p_uuid = NULL; */
2113
2114         /* cleanup the rest that has been
2115          * allocated from drbd_new_device
2116          * and actually free the mdev itself */
2117         drbd_free_mdev(mdev);
2118 }
2119
2120 static void drbd_cleanup(void)
2121 {
2122         unsigned int i;
2123
2124         unregister_reboot_notifier(&drbd_notifier);
2125
2126         /* first remove proc,
2127          * drbdsetup uses its presence to detect
2128          * whether DRBD is loaded.
2129          * If we got stuck in proc removal,
2130          * but have netlink already deregistered,
2131          * some drbdsetup commands may wait forever
2132          * for an answer.
2133          */
2134         if (drbd_proc)
2135                 remove_proc_entry("drbd", NULL);
2136
2137         drbd_nl_cleanup();
2138
2139         if (minor_table) {
2140                 i = minor_count;
2141                 while (i--)
2142                         drbd_delete_device(i);
2143                 drbd_destroy_mempools();
2144         }
2145
2146         kfree(minor_table);
2147
2148         unregister_blkdev(DRBD_MAJOR, "drbd");
2149
2150         printk(KERN_INFO "drbd: module cleanup done.\n");
2151 }
2152
2153 /**
2154  * drbd_congested() - Callback for pdflush
2155  * @congested_data:     User data
2156  * @bdi_bits:           Bits pdflush is currently interested in
2157  *
2158  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2159  */
2160 static int drbd_congested(void *congested_data, int bdi_bits)
2161 {
2162         struct drbd_conf *mdev = congested_data;
2163         struct request_queue *q;
2164         char reason = '-';
2165         int r = 0;
2166
2167         if (!may_inc_ap_bio(mdev)) {
2168                 /* DRBD has frozen IO */
2169                 r = bdi_bits;
2170                 reason = 'd';
2171                 goto out;
2172         }
2173
2174         if (get_ldev(mdev)) {
2175                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2176                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2177                 put_ldev(mdev);
2178                 if (r)
2179                         reason = 'b';
2180         }
2181
2182         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2183                 r |= (1 << BDI_async_congested);
2184                 reason = reason == 'b' ? 'a' : 'n';
2185         }
2186
2187 out:
2188         mdev->congestion_reason = reason;
2189         return r;
2190 }
2191
2192 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2193 {
2194         sema_init(&wq->s, 0);
2195         spin_lock_init(&wq->q_lock);
2196         INIT_LIST_HEAD(&wq->q);
2197 }
2198
2199 struct drbd_tconn *conn_by_name(const char *name)
2200 {
2201         struct drbd_tconn *tconn;
2202
2203         write_lock_irq(&global_state_lock);
2204         list_for_each_entry(tconn, &drbd_tconns, all_tconn) {
2205                 if (!strcmp(tconn->name, name))
2206                         goto found;
2207         }
2208         tconn = NULL;
2209 found:
2210         write_unlock_irq(&global_state_lock);
2211         return tconn;
2212 }
2213
2214 struct drbd_tconn *drbd_new_tconn(char *name)
2215 {
2216         struct drbd_tconn *tconn;
2217
2218         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2219         if (!tconn)
2220                 return NULL;
2221
2222         tconn->name = kstrdup(name, GFP_KERNEL);
2223         if (!tconn->name)
2224                 goto fail;
2225
2226         if (!tl_init(tconn))
2227                 goto fail;
2228
2229         tconn->cstate = C_STANDALONE;
2230         mutex_init(&tconn->cstate_mutex);
2231         spin_lock_init(&tconn->req_lock);
2232         atomic_set(&tconn->net_cnt, 0);
2233         init_waitqueue_head(&tconn->net_cnt_wait);
2234         init_waitqueue_head(&tconn->ping_wait);
2235         idr_init(&tconn->volumes);
2236
2237         drbd_init_workqueue(&tconn->data.work);
2238         mutex_init(&tconn->data.mutex);
2239
2240         drbd_init_workqueue(&tconn->meta.work);
2241         mutex_init(&tconn->meta.mutex);
2242
2243         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2244         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2245         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2246
2247         write_lock_irq(&global_state_lock);
2248         list_add(&tconn->all_tconn, &drbd_tconns);
2249         write_unlock_irq(&global_state_lock);
2250
2251         return tconn;
2252
2253 fail:
2254         tl_cleanup(tconn);
2255         kfree(tconn->name);
2256         kfree(tconn);
2257
2258         return NULL;
2259 }
2260
2261 void drbd_free_tconn(struct drbd_tconn *tconn)
2262 {
2263         write_lock_irq(&global_state_lock);
2264         list_del(&tconn->all_tconn);
2265         write_unlock_irq(&global_state_lock);
2266         idr_destroy(&tconn->volumes);
2267
2268         kfree(tconn->name);
2269         kfree(tconn->int_dig_out);
2270         kfree(tconn->int_dig_in);
2271         kfree(tconn->int_dig_vv);
2272         kfree(tconn);
2273 }
2274
2275 struct drbd_conf *drbd_new_device(unsigned int minor)
2276 {
2277         struct drbd_conf *mdev;
2278         struct gendisk *disk;
2279         struct request_queue *q;
2280         char conn_name[9]; /* drbd1234N */
2281         int vnr;
2282
2283         /* GFP_KERNEL, we are outside of all write-out paths */
2284         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2285         if (!mdev)
2286                 return NULL;
2287         sprintf(conn_name, "drbd%d", minor);
2288         mdev->tconn = drbd_new_tconn(conn_name);
2289         if (!mdev->tconn)
2290                 goto out_no_tconn;
2291         if (!idr_pre_get(&mdev->tconn->volumes, GFP_KERNEL))
2292                 goto out_no_cpumask;
2293         if (idr_get_new(&mdev->tconn->volumes, mdev, &vnr))
2294                 goto out_no_cpumask;
2295         if (vnr != 0) {
2296                 dev_err(DEV, "vnr = %d\n", vnr);
2297                 goto out_no_cpumask;
2298         }
2299         if (!zalloc_cpumask_var(&mdev->tconn->cpu_mask, GFP_KERNEL))
2300                 goto out_no_cpumask;
2301
2302         mdev->minor = minor;
2303
2304         drbd_init_set_defaults(mdev);
2305
2306         q = blk_alloc_queue(GFP_KERNEL);
2307         if (!q)
2308                 goto out_no_q;
2309         mdev->rq_queue = q;
2310         q->queuedata   = mdev;
2311
2312         disk = alloc_disk(1);
2313         if (!disk)
2314                 goto out_no_disk;
2315         mdev->vdisk = disk;
2316
2317         set_disk_ro(disk, true);
2318
2319         disk->queue = q;
2320         disk->major = DRBD_MAJOR;
2321         disk->first_minor = minor;
2322         disk->fops = &drbd_ops;
2323         sprintf(disk->disk_name, "drbd%d", minor);
2324         disk->private_data = mdev;
2325
2326         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2327         /* we have no partitions. we contain only ourselves. */
2328         mdev->this_bdev->bd_contains = mdev->this_bdev;
2329
2330         q->backing_dev_info.congested_fn = drbd_congested;
2331         q->backing_dev_info.congested_data = mdev;
2332
2333         blk_queue_make_request(q, drbd_make_request);
2334         /* Setting the max_hw_sectors to an odd value of 8 KiB here
2335            triggers a max_bio_size message upon first attach or connect */
2336         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2337         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2338         blk_queue_merge_bvec(q, drbd_merge_bvec);
2339         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2340
2341         mdev->md_io_page = alloc_page(GFP_KERNEL);
2342         if (!mdev->md_io_page)
2343                 goto out_no_io_page;
2344
2345         if (drbd_bm_init(mdev))
2346                 goto out_no_bitmap;
2347         mdev->read_requests = RB_ROOT;
2348         mdev->write_requests = RB_ROOT;
2349
2350         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2351         if (!mdev->current_epoch)
2352                 goto out_no_epoch;
2353
2354         INIT_LIST_HEAD(&mdev->current_epoch->list);
2355         mdev->epochs = 1;
2356
2357         return mdev;
2358
2359 /* out_whatever_else:
2360         kfree(mdev->current_epoch); */
2361 out_no_epoch:
2362         drbd_bm_cleanup(mdev);
2363 out_no_bitmap:
2364         __free_page(mdev->md_io_page);
2365 out_no_io_page:
2366         put_disk(disk);
2367 out_no_disk:
2368         blk_cleanup_queue(q);
2369 out_no_q:
2370         free_cpumask_var(mdev->tconn->cpu_mask);
2371 out_no_cpumask:
2372         drbd_free_tconn(mdev->tconn);
2373 out_no_tconn:
2374         kfree(mdev);
2375         return NULL;
2376 }
2377
2378 /* counterpart of drbd_new_device.
2379  * last part of drbd_delete_device. */
2380 void drbd_free_mdev(struct drbd_conf *mdev)
2381 {
2382         kfree(mdev->current_epoch);
2383         if (mdev->bitmap) /* should no longer be there. */
2384                 drbd_bm_cleanup(mdev);
2385         __free_page(mdev->md_io_page);
2386         put_disk(mdev->vdisk);
2387         blk_cleanup_queue(mdev->rq_queue);
2388         kfree(mdev);
2389 }
2390
2391
2392 int __init drbd_init(void)
2393 {
2394         int err;
2395
2396         BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
2397         BUILD_BUG_ON(sizeof(struct p_handshake) != 80);
2398
2399         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2400                 printk(KERN_ERR
2401                         "drbd: invalid minor_count (%d)\n", minor_count);
2402 #ifdef MODULE
2403                 return -EINVAL;
2404 #else
2405                 minor_count = 8;
2406 #endif
2407         }
2408
2409         err = drbd_nl_init();
2410         if (err)
2411                 return err;
2412
2413         err = register_blkdev(DRBD_MAJOR, "drbd");
2414         if (err) {
2415                 printk(KERN_ERR
2416                        "drbd: unable to register block device major %d\n",
2417                        DRBD_MAJOR);
2418                 return err;
2419         }
2420
2421         register_reboot_notifier(&drbd_notifier);
2422
2423         /*
2424          * allocate all necessary structs
2425          */
2426         err = -ENOMEM;
2427
2428         init_waitqueue_head(&drbd_pp_wait);
2429
2430         drbd_proc = NULL; /* play safe for drbd_cleanup */
2431         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
2432                                 GFP_KERNEL);
2433         if (!minor_table)
2434                 goto Enomem;
2435
2436         err = drbd_create_mempools();
2437         if (err)
2438                 goto Enomem;
2439
2440         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2441         if (!drbd_proc) {
2442                 printk(KERN_ERR "drbd: unable to register proc file\n");
2443                 goto Enomem;
2444         }
2445
2446         rwlock_init(&global_state_lock);
2447         INIT_LIST_HEAD(&drbd_tconns);
2448
2449         printk(KERN_INFO "drbd: initialized. "
2450                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2451                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2452         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2453         printk(KERN_INFO "drbd: registered as block device major %d\n",
2454                 DRBD_MAJOR);
2455         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
2456
2457         return 0; /* Success! */
2458
2459 Enomem:
2460         drbd_cleanup();
2461         if (err == -ENOMEM)
2462                 /* currently always the case */
2463                 printk(KERN_ERR "drbd: ran out of memory\n");
2464         else
2465                 printk(KERN_ERR "drbd: initialization failure\n");
2466         return err;
2467 }
2468
2469 void drbd_free_bc(struct drbd_backing_dev *ldev)
2470 {
2471         if (ldev == NULL)
2472                 return;
2473
2474         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2475         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2476
2477         kfree(ldev);
2478 }
2479
2480 void drbd_free_sock(struct drbd_tconn *tconn)
2481 {
2482         if (tconn->data.socket) {
2483                 mutex_lock(&tconn->data.mutex);
2484                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2485                 sock_release(tconn->data.socket);
2486                 tconn->data.socket = NULL;
2487                 mutex_unlock(&tconn->data.mutex);
2488         }
2489         if (tconn->meta.socket) {
2490                 mutex_lock(&tconn->meta.mutex);
2491                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2492                 sock_release(tconn->meta.socket);
2493                 tconn->meta.socket = NULL;
2494                 mutex_unlock(&tconn->meta.mutex);
2495         }
2496 }
2497
2498
2499 void drbd_free_resources(struct drbd_conf *mdev)
2500 {
2501         crypto_free_hash(mdev->csums_tfm);
2502         mdev->csums_tfm = NULL;
2503         crypto_free_hash(mdev->verify_tfm);
2504         mdev->verify_tfm = NULL;
2505         crypto_free_hash(mdev->tconn->cram_hmac_tfm);
2506         mdev->tconn->cram_hmac_tfm = NULL;
2507         crypto_free_hash(mdev->tconn->integrity_w_tfm);
2508         mdev->tconn->integrity_w_tfm = NULL;
2509         crypto_free_hash(mdev->tconn->integrity_r_tfm);
2510         mdev->tconn->integrity_r_tfm = NULL;
2511
2512         drbd_free_sock(mdev->tconn);
2513
2514         __no_warn(local,
2515                   drbd_free_bc(mdev->ldev);
2516                   mdev->ldev = NULL;);
2517 }
2518
2519 /* meta data management */
2520
2521 struct meta_data_on_disk {
2522         u64 la_size;           /* last agreed size. */
2523         u64 uuid[UI_SIZE];   /* UUIDs. */
2524         u64 device_uuid;
2525         u64 reserved_u64_1;
2526         u32 flags;             /* MDF */
2527         u32 magic;
2528         u32 md_size_sect;
2529         u32 al_offset;         /* offset to this block */
2530         u32 al_nr_extents;     /* important for restoring the AL */
2531               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
2532         u32 bm_offset;         /* offset to the bitmap, from here */
2533         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2534         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2535         u32 reserved_u32[3];
2536
2537 } __packed;
2538
2539 /**
2540  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2541  * @mdev:       DRBD device.
2542  */
2543 void drbd_md_sync(struct drbd_conf *mdev)
2544 {
2545         struct meta_data_on_disk *buffer;
2546         sector_t sector;
2547         int i;
2548
2549         del_timer(&mdev->md_sync_timer);
2550         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2551         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2552                 return;
2553
2554         /* Here we use D_FAILED and not D_ATTACHING because we try to write
2555          * metadata even if we detach due to a disk failure! */
2556         if (!get_ldev_if_state(mdev, D_FAILED))
2557                 return;
2558
2559         mutex_lock(&mdev->md_io_mutex);
2560         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2561         memset(buffer, 0, 512);
2562
2563         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2564         for (i = UI_CURRENT; i < UI_SIZE; i++)
2565                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2566         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2567         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2568
2569         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2570         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2571         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2572         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2573         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2574
2575         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2576         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2577
2578         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2579         sector = mdev->ldev->md.md_offset;
2580
2581         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2582                 /* this was a try anyways ... */
2583                 dev_err(DEV, "meta data update failed!\n");
2584                 drbd_chk_io_error(mdev, 1, true);
2585         }
2586
2587         /* Update mdev->ldev->md.la_size_sect,
2588          * since we updated it on metadata. */
2589         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2590
2591         mutex_unlock(&mdev->md_io_mutex);
2592         put_ldev(mdev);
2593 }
2594
2595 /**
2596  * drbd_md_read() - Reads in the meta data super block
2597  * @mdev:       DRBD device.
2598  * @bdev:       Device from which the meta data should be read in.
2599  *
2600  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2601  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2602  */
2603 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2604 {
2605         struct meta_data_on_disk *buffer;
2606         int i, rv = NO_ERROR;
2607
2608         if (!get_ldev_if_state(mdev, D_ATTACHING))
2609                 return ERR_IO_MD_DISK;
2610
2611         mutex_lock(&mdev->md_io_mutex);
2612         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2613
2614         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2615                 /* NOTE: can't do normal error processing here as this is
2616                    called BEFORE disk is attached */
2617                 dev_err(DEV, "Error while reading metadata.\n");
2618                 rv = ERR_IO_MD_DISK;
2619                 goto err;
2620         }
2621
2622         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2623                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2624                 rv = ERR_MD_INVALID;
2625                 goto err;
2626         }
2627         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2628                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2629                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2630                 rv = ERR_MD_INVALID;
2631                 goto err;
2632         }
2633         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2634                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2635                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2636                 rv = ERR_MD_INVALID;
2637                 goto err;
2638         }
2639         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2640                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2641                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2642                 rv = ERR_MD_INVALID;
2643                 goto err;
2644         }
2645
2646         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2647                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2648                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2649                 rv = ERR_MD_INVALID;
2650                 goto err;
2651         }
2652
2653         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2654         for (i = UI_CURRENT; i < UI_SIZE; i++)
2655                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2656         bdev->md.flags = be32_to_cpu(buffer->flags);
2657         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
2658         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2659
2660         spin_lock_irq(&mdev->tconn->req_lock);
2661         if (mdev->state.conn < C_CONNECTED) {
2662                 int peer;
2663                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2664                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2665                 mdev->peer_max_bio_size = peer;
2666         }
2667         spin_unlock_irq(&mdev->tconn->req_lock);
2668
2669         if (mdev->sync_conf.al_extents < 7)
2670                 mdev->sync_conf.al_extents = 127;
2671
2672  err:
2673         mutex_unlock(&mdev->md_io_mutex);
2674         put_ldev(mdev);
2675
2676         return rv;
2677 }
2678
2679 /**
2680  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2681  * @mdev:       DRBD device.
2682  *
2683  * Call this function if you change anything that should be written to
2684  * the meta-data super block. This function sets MD_DIRTY and arms a
2685  * timer that ensures drbd_md_sync() gets called within five seconds.
2686  */
2687 #ifdef DEBUG
2688 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2689 {
2690         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2691                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2692                 mdev->last_md_mark_dirty.line = line;
2693                 mdev->last_md_mark_dirty.func = func;
2694         }
2695 }
2696 #else
2697 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2698 {
2699         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2700                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2701 }
2702 #endif
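/*
 * Editor's sketch, illustrative only (not part of the driver): the intended
 * protocol is "change something in the on-disk metadata, mark it dirty, and
 * either call drbd_md_sync() yourself or let the timer hand it to the
 * worker".  The field touched below is purely hypothetical.
 */
#if 0	/* hedged sketch, never compiled */
static void example_md_update(struct drbd_conf *mdev)
{
	mdev->ldev->md.device_uuid = 0x1234;	/* some metadata change */
	drbd_md_mark_dirty(mdev);		/* arms md_sync_timer */
	drbd_md_sync(mdev);			/* or wait for w_md_sync() */
}
#endif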
2703
2704 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2705 {
2706         int i;
2707
2708         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2709                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2710 }
2711
2712 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2713 {
2714         if (idx == UI_CURRENT) {
2715                 if (mdev->state.role == R_PRIMARY)
2716                         val |= 1;
2717                 else
2718                         val &= ~((u64)1);
2719
2720                 drbd_set_ed_uuid(mdev, val);
2721         }
2722
2723         mdev->ldev->md.uuid[idx] = val;
2724         drbd_md_mark_dirty(mdev);
2725 }
2726
2727
2728 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2729 {
2730         if (mdev->ldev->md.uuid[idx]) {
2731                 drbd_uuid_move_history(mdev);
2732                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2733         }
2734         _drbd_uuid_set(mdev, idx, val);
2735 }
2736
2737 /**
2738  * drbd_uuid_new_current() - Creates a new current UUID
2739  * @mdev:       DRBD device.
2740  *
2741  * Creates a new current UUID, and rotates the old current UUID into
2742  * the bitmap slot. Causes an incremental resync upon next connect.
2743  */
2744 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2745 {
2746         u64 val;
2747         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2748
2749         if (bm_uuid)
2750                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2751
2752         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2753
2754         get_random_bytes(&val, sizeof(u64));
2755         _drbd_uuid_set(mdev, UI_CURRENT, val);
2756         drbd_print_uuids(mdev, "new current UUID");
2757         /* get it to stable storage _now_ */
2758         drbd_md_sync(mdev);
2759 }
2760
2761 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
2762 {
2763         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
2764                 return;
2765
2766         if (val == 0) {
2767                 drbd_uuid_move_history(mdev);
2768                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2769                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2770         } else {
2771                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2772                 if (bm_uuid)
2773                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2774
2775                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
2776         }
2777         drbd_md_mark_dirty(mdev);
2778 }
2779
2780 /**
2781  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2782  * @mdev:       DRBD device.
2783  *
2784  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
2785  */
2786 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
2787 {
2788         int rv = -EIO;
2789
2790         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2791                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
2792                 drbd_md_sync(mdev);
2793                 drbd_bm_set_all(mdev);
2794
2795                 rv = drbd_bm_write(mdev);
2796
2797                 if (!rv) {
2798                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2799                         drbd_md_sync(mdev);
2800                 }
2801
2802                 put_ldev(mdev);
2803         }
2804
2805         return rv;
2806 }
2807
2808 /**
2809  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2810  * @mdev:       DRBD device.
2811  *
2812  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
2813  */
2814 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
2815 {
2816         int rv = -EIO;
2817
2818         drbd_resume_al(mdev);
2819         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2820                 drbd_bm_clear_all(mdev);
2821                 rv = drbd_bm_write(mdev);
2822                 put_ldev(mdev);
2823         }
2824
2825         return rv;
2826 }
2827
2828 static int w_bitmap_io(struct drbd_work *w, int unused)
2829 {
2830         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
2831         struct drbd_conf *mdev = w->mdev;
2832         int rv = -EIO;
2833
2834         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
2835
2836         if (get_ldev(mdev)) {
2837                 drbd_bm_lock(mdev, work->why, work->flags);
2838                 rv = work->io_fn(mdev);
2839                 drbd_bm_unlock(mdev);
2840                 put_ldev(mdev);
2841         }
2842
2843         clear_bit_unlock(BITMAP_IO, &mdev->flags);
2844         wake_up(&mdev->misc_wait);
2845
2846         if (work->done)
2847                 work->done(mdev, rv);
2848
2849         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
2850         work->why = NULL;
2851         work->flags = 0;
2852
2853         return 1;
2854 }
2855
2856 void drbd_ldev_destroy(struct drbd_conf *mdev)
2857 {
2858         lc_destroy(mdev->resync);
2859         mdev->resync = NULL;
2860         lc_destroy(mdev->act_log);
2861         mdev->act_log = NULL;
2862         __no_warn(local,
2863                 drbd_free_bc(mdev->ldev);
2864                 mdev->ldev = NULL;);
2865
2866         clear_bit(GO_DISKLESS, &mdev->flags);
2867 }
2868
2869 static int w_go_diskless(struct drbd_work *w, int unused)
2870 {
2871         struct drbd_conf *mdev = w->mdev;
2872
2873         D_ASSERT(mdev->state.disk == D_FAILED);
2874         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
2875          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
2876          * the protected members anymore, though, so once put_ldev reaches zero
2877          * again, it will be safe to free them. */
2878         drbd_force_state(mdev, NS(disk, D_DISKLESS));
2879         return 1;
2880 }
2881
2882 void drbd_go_diskless(struct drbd_conf *mdev)
2883 {
2884         D_ASSERT(mdev->state.disk == D_FAILED);
2885         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
2886                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
2887 }
2888
2889 /**
2890  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
2891  * @mdev:       DRBD device.
2892  * @io_fn:      IO callback to be called when bitmap IO is possible
2893  * @done:       callback to be called after the bitmap IO was performed
2894  * @why:        Descriptive text of the reason for doing the IO
2895  *
2896  * While IO on the bitmap happens we freeze application IO, thus ensuring
2897  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
2898  * called from worker context. It MUST NOT be used while a previous such
2899  * work is still pending!
2900  */
2901 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
2902                           int (*io_fn)(struct drbd_conf *),
2903                           void (*done)(struct drbd_conf *, int),
2904                           char *why, enum bm_flag flags)
2905 {
2906         D_ASSERT(current == mdev->tconn->worker.task);
2907
2908         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
2909         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
2910         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
2911         if (mdev->bm_io_work.why)
2912                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
2913                         why, mdev->bm_io_work.why);
2914
2915         mdev->bm_io_work.io_fn = io_fn;
2916         mdev->bm_io_work.done = done;
2917         mdev->bm_io_work.why = why;
2918         mdev->bm_io_work.flags = flags;
2919
2920         spin_lock_irq(&mdev->tconn->req_lock);
2921         set_bit(BITMAP_IO, &mdev->flags);
2922         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
2923                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
2924                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
2925         }
2926         spin_unlock_irq(&mdev->tconn->req_lock);
2927 }
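/*
 * Editor's sketch, illustrative only (not part of the driver): from worker
 * context, queueing a full "set all bits and write out" pass could look
 * roughly like this, reusing drbd_bmio_set_n_write() from above.  The done
 * callback, the reason string and the flags parameter are hypothetical.
 */
#if 0	/* hedged sketch, never compiled */
static void example_bitmap_done(struct drbd_conf *mdev, int rv)
{
	dev_warn(DEV, "example bitmap IO finished: %d\n", rv);
}

static void example_queue_set_n_write(struct drbd_conf *mdev, enum bm_flag flags)
{
	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, example_bitmap_done,
			     "example: set_n_write", flags);
}
#endif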
2928
2929 /**
2930  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
2931  * @mdev:       DRBD device.
2932  * @io_fn:      IO callback to be called when bitmap IO is possible
2933  * @why:        Descriptive text of the reason for doing the IO
2934  *
2935  * Freezes application IO while the actual IO operation runs. This
2936  * function MAY NOT be called from worker context.
2937  */
2938 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
2939                 char *why, enum bm_flag flags)
2940 {
2941         int rv;
2942
2943         D_ASSERT(current != mdev->tconn->worker.task);
2944
2945         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
2946                 drbd_suspend_io(mdev);
2947
2948         drbd_bm_lock(mdev, why, flags);
2949         rv = io_fn(mdev);
2950         drbd_bm_unlock(mdev);
2951
2952         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
2953                 drbd_resume_io(mdev);
2954
2955         return rv;
2956 }
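
/*
 * Editor's hedged sketch (not driver code): the synchronous variant is
 * meant for process context outside the worker (e.g. a configuration
 * request); it suspends application IO itself and returns the io_fn
 * result.  drbd_bmio_set_n_write() is assumed to be the "set all bits,
 * then write" helper declared elsewhere in the driver.
 */
#if 0
static int start_full_sync_example(struct drbd_conf *mdev)
{
	return drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
			      "example full sync", BM_LOCKED_MASK);
}
#endif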
2957
2958 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
2959 {
2960         if ((mdev->ldev->md.flags & flag) != flag) {
2961                 drbd_md_mark_dirty(mdev);
2962                 mdev->ldev->md.flags |= flag;
2963         }
2964 }
2965
2966 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
2967 {
2968         if ((mdev->ldev->md.flags & flag) != 0) {
2969                 drbd_md_mark_dirty(mdev);
2970                 mdev->ldev->md.flags &= ~flag;
2971         }
2972 }
2973 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
2974 {
2975         return (bdev->md.flags & flag) != 0;
2976 }
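
/*
 * Editor's hedged sketch (not driver code): the usual pattern around the
 * flag helpers above.  The caller holds a local reference, which satisfies
 * __must_hold(local), flips an MDF_* bit and flushes the meta-data.
 * MDF_FULL_SYNC is assumed to be one of the on-disk flags from drbd_int.h;
 * the function name is hypothetical.
 */
#if 0
static void mark_full_sync_example(struct drbd_conf *mdev)
{
	if (get_ldev(mdev)) {
		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
		drbd_md_sync(mdev);	/* write out the dirtied meta-data */
		put_ldev(mdev);
	}
}
#endif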
2977
2978 static void md_sync_timer_fn(unsigned long data)
2979 {
2980         struct drbd_conf *mdev = (struct drbd_conf *) data;
2981
2982         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
2983 }
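
/*
 * Editor's hedged sketch (not driver code): how the timer above is
 * presumably armed at device init time, assuming struct drbd_conf carries
 * an md_sync_timer member as the callback name suggests; shown only to
 * make the (unsigned long)mdev cast explicit.  The real setup lives in the
 * device initialisation code, not here.
 */
#if 0
static void md_sync_timer_setup_example(struct drbd_conf *mdev)
{
	init_timer(&mdev->md_sync_timer);
	mdev->md_sync_timer.function = md_sync_timer_fn;
	mdev->md_sync_timer.data = (unsigned long)mdev;
}
#endif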
2984
2985 static int w_md_sync(struct drbd_work *w, int unused)
2986 {
2987         struct drbd_conf *mdev = w->mdev;
2988
2989         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
2990 #ifdef DEBUG
2991         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
2992                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
2993 #endif
2994         drbd_md_sync(mdev);
2995         return 1;
2996 }
2997
2998 const char *cmdname(enum drbd_packet cmd)
2999 {
3000         /* THINK may need to become several global tables
3001          * when we want to support more than
3002          * one PRO_VERSION */
3003         static const char *cmdnames[] = {
3004                 [P_DATA]                = "Data",
3005                 [P_DATA_REPLY]          = "DataReply",
3006                 [P_RS_DATA_REPLY]       = "RSDataReply",
3007                 [P_BARRIER]             = "Barrier",
3008                 [P_BITMAP]              = "ReportBitMap",
3009                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3010                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3011                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3012                 [P_DATA_REQUEST]        = "DataRequest",
3013                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3014                 [P_SYNC_PARAM]          = "SyncParam",
3015                 [P_SYNC_PARAM89]        = "SyncParam89",
3016                 [P_PROTOCOL]            = "ReportProtocol",
3017                 [P_UUIDS]               = "ReportUUIDs",
3018                 [P_SIZES]               = "ReportSizes",
3019                 [P_STATE]               = "ReportState",
3020                 [P_SYNC_UUID]           = "ReportSyncUUID",
3021                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3022                 [P_AUTH_RESPONSE]       = "AuthResponse",
3023                 [P_PING]                = "Ping",
3024                 [P_PING_ACK]            = "PingAck",
3025                 [P_RECV_ACK]            = "RecvAck",
3026                 [P_WRITE_ACK]           = "WriteAck",
3027                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3028                 [P_DISCARD_WRITE]       = "DiscardWrite",
3029                 [P_NEG_ACK]             = "NegAck",
3030                 [P_NEG_DREPLY]          = "NegDReply",
3031                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3032                 [P_BARRIER_ACK]         = "BarrierAck",
3033                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3034                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3035                 [P_OV_REQUEST]          = "OVRequest",
3036                 [P_OV_REPLY]            = "OVReply",
3037                 [P_OV_RESULT]           = "OVResult",
3038                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3039                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3040                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3041                 [P_DELAY_PROBE]         = "DelayProbe",
3042                 [P_OUT_OF_SYNC]         = "OutOfSync",
3043                 [P_RETRY_WRITE]         = "RetryWrite",
3044         };
3045
3046         if (cmd == P_HAND_SHAKE_M)
3047                 return "HandShakeM";
3048         if (cmd == P_HAND_SHAKE_S)
3049                 return "HandShakeS";
3050         if (cmd == P_HAND_SHAKE)
3051                 return "HandShake";
3052         if (cmd >= ARRAY_SIZE(cmdnames))
3053                 return "Unknown";
3054         return cmdnames[cmd];
3055 }
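
/*
 * Editor's hedged sketch (not driver code): cmdname() is only meant for
 * diagnostics, e.g. logging a packet type the receiver cannot handle.
 * The function name below is hypothetical.
 */
#if 0
static void log_unexpected_packet_example(struct drbd_conf *mdev,
					  enum drbd_packet cmd)
{
	dev_err(DEV, "unexpected packet %s\n", cmdname(cmd));
}
#endif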
3056
3057 /**
3058  * drbd_wait_misc  -  wait for a request to make progress
3059  * @mdev:       device associated with the request
3060  * @i:          the struct drbd_interval embedded in struct drbd_request or
3061  *              struct drbd_peer_request
3062  */
3063 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3064 {
3065         struct net_conf *net_conf = mdev->tconn->net_conf;
3066         DEFINE_WAIT(wait);
3067         long timeout;
3068
3069         if (!net_conf)
3070                 return -ETIMEDOUT;
3071         timeout = MAX_SCHEDULE_TIMEOUT;
3072         if (net_conf->ko_count)
3073                 timeout = net_conf->timeout * HZ / 10 * net_conf->ko_count;
3074
3075         /* Indicate to wake up mdev->misc_wait on progress.  */
3076         i->waiting = true;
3077         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3078         spin_unlock_irq(&mdev->tconn->req_lock);
3079         timeout = schedule_timeout(timeout);
3080         finish_wait(&mdev->misc_wait, &wait);
3081         spin_lock_irq(&mdev->tconn->req_lock);
3082         if (!timeout || mdev->state.conn < C_CONNECTED)
3083                 return -ETIMEDOUT;
3084         if (signal_pending(current))
3085                 return -ERESTARTSYS;
3086         return 0;
3087 }
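
/*
 * Editor's hedged sketch (not driver code) of the calling convention:
 * req_lock is held on entry, drbd_wait_misc() drops and re-acquires it
 * around the sleep, so the caller must re-check its condition after every
 * wakeup.  conflict_pending() is a hypothetical stand-in for the real
 * overlap test.
 */
#if 0
static int wait_for_interval_example(struct drbd_conf *mdev,
				     struct drbd_interval *i)
{
	int err = 0;

	spin_lock_irq(&mdev->tconn->req_lock);
	while (conflict_pending(mdev, i)) {
		err = drbd_wait_misc(mdev, i);
		if (err)
			break;
	}
	spin_unlock_irq(&mdev->tconn->req_lock);
	return err;
}
#endif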
3088
3089 #ifdef CONFIG_DRBD_FAULT_INJECTION
3090 /* Fault insertion support including random number generator shamelessly
3091  * stolen from kernel/rcutorture.c */
3092 struct fault_random_state {
3093         unsigned long state;
3094         unsigned long count;
3095 };
3096
3097 #define FAULT_RANDOM_MULT 39916801  /* prime */
3098 #define FAULT_RANDOM_ADD        479001701 /* prime */
3099 #define FAULT_RANDOM_REFRESH 10000
3100
3101 /*
3102  * Crude but fast random-number generator.  Uses a linear congruential
3103  * generator, with occasional help from get_random_bytes().
3104  */
3105 static unsigned long
3106 _drbd_fault_random(struct fault_random_state *rsp)
3107 {
3108         long refresh;
3109
3110         if (!rsp->count--) {
3111                 get_random_bytes(&refresh, sizeof(refresh));
3112                 rsp->state += refresh;
3113                 rsp->count = FAULT_RANDOM_REFRESH;
3114         }
3115         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3116         return swahw32(rsp->state);
3117 }
3118
3119 static char *
3120 _drbd_fault_str(unsigned int type) {
3121         static char *_faults[] = {
3122                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3123                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3124                 [DRBD_FAULT_RS_WR] = "Resync write",
3125                 [DRBD_FAULT_RS_RD] = "Resync read",
3126                 [DRBD_FAULT_DT_WR] = "Data write",
3127                 [DRBD_FAULT_DT_RD] = "Data read",
3128                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3129                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3130                 [DRBD_FAULT_AL_EE] = "EE allocation",
3131                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3132         };
3133
3134         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3135 }
3136
3137 unsigned int
3138 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3139 {
3140         static struct fault_random_state rrs = {0, 0};
3141
3142         unsigned int ret = (
3143                 (fault_devs == 0 ||
3144                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3145                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3146
3147         if (ret) {
3148                 fault_count++;
3149
3150                 if (__ratelimit(&drbd_ratelimit_state))
3151                         dev_warn(DEV, "***Simulating %s failure\n",
3152                                 _drbd_fault_str(type));
3153         }
3154
3155         return ret;
3156 }
3157 #endif
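
/*
 * Editor's hedged sketch (not driver code): how a submission path is
 * expected to consult the fault-injection hook before issuing real IO.
 * drbd_insert_fault() is assumed to be the wrapper that compiles away when
 * CONFIG_DRBD_FAULT_INJECTION is off; the function name below is
 * hypothetical.
 */
#if 0
static void submit_or_fail_example(struct drbd_conf *mdev, struct bio *bio)
{
	if (drbd_insert_fault(mdev, DRBD_FAULT_DT_WR))
		bio_endio(bio, -EIO);	/* simulate a data-write failure */
	else
		generic_make_request(bio);
}
#endif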
3158
3159 const char *drbd_buildtag(void)
3160 {
3161         /* DRBD built from external sources has a reference here to the
3162            git hash of the source code.  When built into the kernel, the
3163            leading '\0' below is overwritten with 'b', yielding "built-in". */
3164         static char buildtag[38] = "\0uilt-in";
3165
3166         if (buildtag[0] == 0) {
3167 #ifdef CONFIG_MODULES
3168                 if (THIS_MODULE != NULL)
3169                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3170                 else
3171 #endif
3172                         buildtag[0] = 'b';
3173         }
3174
3175         return buildtag;
3176 }
3177
3178 module_init(drbd_init)
3179 module_exit(drbd_cleanup)
3180
3181 EXPORT_SYMBOL(drbd_conn_str);
3182 EXPORT_SYMBOL(drbd_role_str);
3183 EXPORT_SYMBOL(drbd_disk_str);
3184 EXPORT_SYMBOL(drbd_set_st_err_str);