1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not as a module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* detail level in /proc/drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache;       /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list; the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
143
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154         bio_free(bio, drbd_md_io_bio_set);
155 }
156
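/* Allocate a bio for DRBD meta data IO.  Prefer the dedicated
 * drbd_md_io_bio_set so meta data IO can make progress even under memory
 * pressure; fall back to a plain bio_alloc() while the bio_set is not set up. */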
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159         struct bio *bio;
160
161         if (!drbd_md_io_bio_set)
162                 return bio_alloc(gfp_mask, 1);
163
164         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165         if (!bio)
166                 return NULL;
167         bio->bi_destructor = bio_destructor_drbd;
168         return bio;
169 }
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, and this is an inline function, sparse will
173    give tons of false positives. When this is a real function, sparse works.
174  */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177         int io_allowed;
178
179         atomic_inc(&mdev->local_cnt);
180         io_allowed = (mdev->state.disk >= mins);
181         if (!io_allowed) {
182                 if (atomic_dec_and_test(&mdev->local_cnt))
183                         wake_up(&mdev->misc_wait);
184         }
185         return io_allowed;
186 }
187
188 #endif
189
190 /**
191  * DOC: The transfer log
192  *
193  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195  * of the list. There is always at least one &struct drbd_tl_epoch object.
196  *
197  * Each &struct drbd_tl_epoch has a circular double linked list of requests
198  * attached.
199  */
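/**
 * tl_init() - Allocate and set up the initial transfer log epoch
 * @tconn:      DRBD connection.
 *
 * Returns 1 on success, 0 if the initial &struct drbd_tl_epoch could not
 * be allocated.
 */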
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202         struct drbd_tl_epoch *b;
203
204         /* during device minor initialization, we may well use GFP_KERNEL */
205         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206         if (!b)
207                 return 0;
208         INIT_LIST_HEAD(&b->requests);
209         INIT_LIST_HEAD(&b->w.list);
210         b->next = NULL;
211         b->br_number = 4711;
212         b->n_writes = 0;
213         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215         tconn->oldest_tle = b;
216         tconn->newest_tle = b;
217         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218
219         return 1;
220 }
221
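/**
 * tl_cleanup() - Free what tl_init() allocated
 * @tconn:      DRBD connection.
 *
 * Expects the transfer log to be down to its single initial epoch and the
 * out_of_sequence_requests list to be empty, and complains otherwise.
 */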
222 static void tl_cleanup(struct drbd_tconn *tconn)
223 {
224         if (tconn->oldest_tle != tconn->newest_tle)
225                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
226         if (!list_empty(&tconn->out_of_sequence_requests))
227                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
228         kfree(tconn->oldest_tle);
229         tconn->oldest_tle = NULL;
230         kfree(tconn->unused_spare_tle);
231         tconn->unused_spare_tle = NULL;
232 }
233
234 /**
235  * _tl_add_barrier() - Adds a barrier to the transfer log
236  * @tconn:      DRBD connection.
237  * @new:        Barrier to be added before the current head of the TL.
238  *
239  * The caller must hold the req_lock.
240  */
241 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
242 {
243         struct drbd_tl_epoch *newest_before;
244
245         INIT_LIST_HEAD(&new->requests);
246         INIT_LIST_HEAD(&new->w.list);
247         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
248         new->next = NULL;
249         new->n_writes = 0;
250
251         newest_before = tconn->newest_tle;
252         /* never send a barrier number == 0, because that is special-cased
253          * when using TCQ for our write ordering code */
254         new->br_number = (newest_before->br_number+1) ?: 1;
255         if (tconn->newest_tle != new) {
256                 tconn->newest_tle->next = new;
257                 tconn->newest_tle = new;
258         }
259 }
260
261 /**
262  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
263  * @tconn:      DRBD connection.
264  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
265  * @set_size:   Expected number of requests before that barrier.
266  *
267  * In case the passed barrier_nr or set_size does not match that of the oldest
268  * &struct drbd_tl_epoch object, this function will cause a termination
269  * of the connection.
270  */
271 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
272                 unsigned int set_size)
273 {
274         struct drbd_conf *mdev;
275         struct drbd_tl_epoch *b, *nob; /* next old barrier */
276         struct list_head *le, *tle;
277         struct drbd_request *r;
278
279         spin_lock_irq(&tconn->req_lock);
280
281         b = tconn->oldest_tle;
282
283         /* first some paranoia code */
284         if (b == NULL) {
285                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
286                          barrier_nr);
287                 goto bail;
288         }
289         if (b->br_number != barrier_nr) {
290                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
291                          barrier_nr, b->br_number);
292                 goto bail;
293         }
294         if (b->n_writes != set_size) {
295                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
296                          barrier_nr, set_size, b->n_writes);
297                 goto bail;
298         }
299
300         /* Clean up list of requests processed during current epoch */
301         list_for_each_safe(le, tle, &b->requests) {
302                 r = list_entry(le, struct drbd_request, tl_requests);
303                 _req_mod(r, BARRIER_ACKED);
304         }
305         /* There could be requests on the list waiting for completion
306            of the write to the local disk. To avoid corruption of the
307            slab's data structures we have to remove the list's head.
308
309            Also there could have been a barrier ack out of sequence, overtaking
310            the write acks - which would be a bug and a violation of write ordering.
311            To avoid deadlocking in case we lose the connection while such requests
312            are still pending, we need some way to find them for the
313            _req_mod(CONNECTION_LOST_WHILE_PENDING).
314
315            These have been list_move'd to the out_of_sequence_requests list in
316            _req_mod(, BARRIER_ACKED) above.
317            */
318         list_del_init(&b->requests);
319         mdev = b->w.mdev;
320
321         nob = b->next;
322         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
323                 _tl_add_barrier(tconn, b);
324                 if (nob)
325                         tconn->oldest_tle = nob;
326                 /* if nob == NULL, b was the only barrier and becomes the new
327                    barrier. Therefore tconn->oldest_tle already points to b */
328         } else {
329                 D_ASSERT(nob != NULL);
330                 tconn->oldest_tle = nob;
331                 kfree(b);
332         }
333
334         spin_unlock_irq(&tconn->req_lock);
335         dec_ap_pending(mdev);
336
337         return;
338
339 bail:
340         spin_unlock_irq(&tconn->req_lock);
341         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
342 }
343
344
345 /**
346  * _tl_restart() - Walks the transfer log, and applies an action to all requests
347  * @tconn:      DRBD connection.
348  * @what:       The action/event to perform with all request objects
349  *
350  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
351  * RESTART_FROZEN_DISK_IO.
352  */
353 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
354 {
355         struct drbd_tl_epoch *b, *tmp, **pn;
356         struct list_head *le, *tle, carry_reads;
357         struct drbd_request *req;
358         int rv, n_writes, n_reads;
359
360         b = tconn->oldest_tle;
361         pn = &tconn->oldest_tle;
362         while (b) {
363                 n_writes = 0;
364                 n_reads = 0;
365                 INIT_LIST_HEAD(&carry_reads);
366                 list_for_each_safe(le, tle, &b->requests) {
367                         req = list_entry(le, struct drbd_request, tl_requests);
368                         rv = _req_mod(req, what);
369
370                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
371                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
372                 }
373                 tmp = b->next;
374
375                 if (n_writes) {
376                         if (what == RESEND) {
377                                 b->n_writes = n_writes;
378                                 if (b->w.cb == NULL) {
379                                         b->w.cb = w_send_barrier;
380                                         inc_ap_pending(b->w.mdev);
381                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
382                                 }
383
384                                 drbd_queue_work(&tconn->data.work, &b->w);
385                         }
386                         pn = &b->next;
387                 } else {
388                         if (n_reads)
389                                 list_add(&carry_reads, &b->requests);
390                         /* there could still be requests on that ring list,
391                          * in case local io is still pending */
392                         list_del(&b->requests);
393
394                         /* dec_ap_pending corresponding to queue_barrier.
395                          * the newest barrier may not have been queued yet,
396                          * in which case w.cb is still NULL. */
397                         if (b->w.cb != NULL)
398                                 dec_ap_pending(b->w.mdev);
399
400                         if (b == tconn->newest_tle) {
401                                 /* recycle, but reinit! */
402                                 if (tmp != NULL)
403                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
404                                 INIT_LIST_HEAD(&b->requests);
405                                 list_splice(&carry_reads, &b->requests);
406                                 INIT_LIST_HEAD(&b->w.list);
407                                 b->w.cb = NULL;
408                                 b->br_number = net_random();
409                                 b->n_writes = 0;
410
411                                 *pn = b;
412                                 break;
413                         }
414                         *pn = tmp;
415                         kfree(b);
416                 }
417                 b = tmp;
418                 list_splice(&carry_reads, &b->requests);
419         }
420 }
421
422
423 /**
424  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
425  * @tconn:      DRBD connection.
426  *
427  * This is called after the connection to the peer was lost. The storage covered
428  * by the requests on the transfer log gets marked as out of sync. Called from the
429  * receiver thread and the worker thread.
430  */
431 void tl_clear(struct drbd_tconn *tconn)
432 {
433         struct drbd_conf *mdev;
434         struct list_head *le, *tle;
435         struct drbd_request *r;
436         int minor;
437
438         spin_lock_irq(&tconn->req_lock);
439
440         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
441
442         /* we expect this list to be empty. */
443         if (!list_empty(&tconn->out_of_sequence_requests))
444                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
445
446         /* but just in case, clean it up anyway! */
447         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
448                 r = list_entry(le, struct drbd_request, tl_requests);
449                 /* It would be nice to complete outside of the spinlock,
450                  * but this is easier for now. */
451                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
452         }
453
454         /* ensure bit indicating barrier is required is clear */
455         idr_for_each_entry(&tconn->volumes, mdev, minor)
456                 clear_bit(CREATE_BARRIER, &mdev->flags);
457
458         spin_unlock_irq(&tconn->req_lock);
459 }
460
461 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
462 {
463         spin_lock_irq(&tconn->req_lock);
464         _tl_restart(tconn, what);
465         spin_unlock_irq(&tconn->req_lock);
466 }
467
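/* Common kthread entry point for all DRBD threads (worker, receiver, asender).
 * Runs thi->function(), handles the RESTARTING case, and finally drops the
 * module reference taken in drbd_thread_start(). */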
468 static int drbd_thread_setup(void *arg)
469 {
470         struct drbd_thread *thi = (struct drbd_thread *) arg;
471         struct drbd_tconn *tconn = thi->tconn;
472         unsigned long flags;
473         int retval;
474
475         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
476                  thi->name[0], thi->tconn->name);
477
478 restart:
479         retval = thi->function(thi);
480
481         spin_lock_irqsave(&thi->t_lock, flags);
482
483         /* if the receiver has been "EXITING", the last thing it did
484          * was set the conn state to "StandAlone",
485          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
486          * and receiver thread will be "started".
487          * drbd_thread_start needs to set "RESTARTING" in that case.
488          * t_state check and assignment needs to be within the same spinlock,
489          * so either thread_start sees EXITING, and can remap to RESTARTING,
490          * or thread_start sees NONE, and can proceed as normal.
491          */
492
493         if (thi->t_state == RESTARTING) {
494                 conn_info(tconn, "Restarting %s thread\n", thi->name);
495                 thi->t_state = RUNNING;
496                 spin_unlock_irqrestore(&thi->t_lock, flags);
497                 goto restart;
498         }
499
500         thi->task = NULL;
501         thi->t_state = NONE;
502         smp_mb();
503         complete(&thi->stop);
504         spin_unlock_irqrestore(&thi->t_lock, flags);
505
506         conn_info(tconn, "Terminating %s\n", current->comm);
507
508         /* Release mod reference taken when thread was started */
509         module_put(THIS_MODULE);
510         return retval;
511 }
512
513 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
514                              int (*func) (struct drbd_thread *), char *name)
515 {
516         spin_lock_init(&thi->t_lock);
517         thi->task    = NULL;
518         thi->t_state = NONE;
519         thi->function = func;
520         thi->tconn = tconn;
521         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
522 }
523
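/**
 * drbd_thread_start() - Start a DRBD thread, or arrange for an exiting one to restart
 * @thi:        drbd_thread object to start.
 *
 * Takes a module reference for the new kthread; it is dropped again when the
 * thread terminates in drbd_thread_setup().  Returns true on success (or if
 * the thread is already running), false if the kthread could not be created.
 */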
524 int drbd_thread_start(struct drbd_thread *thi)
525 {
526         struct drbd_tconn *tconn = thi->tconn;
527         struct task_struct *nt;
528         unsigned long flags;
529
530         /* is used from state engine doing drbd_thread_stop_nowait,
531          * while holding the req lock irqsave */
532         spin_lock_irqsave(&thi->t_lock, flags);
533
534         switch (thi->t_state) {
535         case NONE:
536                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
537                          thi->name, current->comm, current->pid);
538
539                 /* Get ref on module for thread - this is released when thread exits */
540                 if (!try_module_get(THIS_MODULE)) {
541                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
542                         spin_unlock_irqrestore(&thi->t_lock, flags);
543                         return false;
544                 }
545
546                 init_completion(&thi->stop);
547                 thi->reset_cpu_mask = 1;
548                 thi->t_state = RUNNING;
549                 spin_unlock_irqrestore(&thi->t_lock, flags);
550                 flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
551
552                 nt = kthread_create(drbd_thread_setup, (void *) thi,
553                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
554
555                 if (IS_ERR(nt)) {
556                         conn_err(tconn, "Couldn't start thread\n");
557
558                         module_put(THIS_MODULE);
559                         return false;
560                 }
561                 spin_lock_irqsave(&thi->t_lock, flags);
562                 thi->task = nt;
563                 thi->t_state = RUNNING;
564                 spin_unlock_irqrestore(&thi->t_lock, flags);
565                 wake_up_process(nt);
566                 break;
567         case EXITING:
568                 thi->t_state = RESTARTING;
569                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
570                                 thi->name, current->comm, current->pid);
571                 /* fall through */
572         case RUNNING:
573         case RESTARTING:
574         default:
575                 spin_unlock_irqrestore(&thi->t_lock, flags);
576                 break;
577         }
578
579         return true;
580 }
581
582
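/**
 * _drbd_thread_stop() - Ask a DRBD thread to terminate or restart
 * @thi:        drbd_thread object to stop.
 * @restart:    restart the thread instead of leaving it stopped.
 * @wait:       block until the thread has actually terminated.
 */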
583 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
584 {
585         unsigned long flags;
586
587         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
588
589         /* may be called from state engine, holding the req lock irqsave */
590         spin_lock_irqsave(&thi->t_lock, flags);
591
592         if (thi->t_state == NONE) {
593                 spin_unlock_irqrestore(&thi->t_lock, flags);
594                 if (restart)
595                         drbd_thread_start(thi);
596                 return;
597         }
598
599         if (thi->t_state != ns) {
600                 if (thi->task == NULL) {
601                         spin_unlock_irqrestore(&thi->t_lock, flags);
602                         return;
603                 }
604
605                 thi->t_state = ns;
606                 smp_mb();
607                 init_completion(&thi->stop);
608                 if (thi->task != current)
609                         force_sig(DRBD_SIGKILL, thi->task);
610         }
611
612         spin_unlock_irqrestore(&thi->t_lock, flags);
613
614         if (wait)
615                 wait_for_completion(&thi->stop);
616 }
617
618 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
619 {
620         struct drbd_thread *thi =
621                 task == tconn->receiver.task ? &tconn->receiver :
622                 task == tconn->asender.task  ? &tconn->asender :
623                 task == tconn->worker.task   ? &tconn->worker : NULL;
624
625         return thi;
626 }
627
628 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
629 {
630         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
631         return thi ? thi->name : task->comm;
632 }
633
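/* Lowest index present in this connection's volumes idr,
 * or -1 if no volume is currently assigned to it. */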
634 int conn_lowest_minor(struct drbd_tconn *tconn)
635 {
636         int minor = 0;
637
638         if (!idr_get_next(&tconn->volumes, &minor))
639                 return -1;
640         return minor;
641 }
642
643 #ifdef CONFIG_SMP
644 /**
645  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
646  * @tconn:      DRBD connection.
647  *
648  * Forces all threads of a connection onto the same CPU. This is beneficial for
649  * DRBD's performance. May be overridden by the user's configuration.
650  */
651 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
652 {
653         int ord, cpu;
654
655         /* user override. */
656         if (cpumask_weight(tconn->cpu_mask))
657                 return;
658
659         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
660         for_each_online_cpu(cpu) {
661                 if (ord-- == 0) {
662                         cpumask_set_cpu(cpu, tconn->cpu_mask);
663                         return;
664                 }
665         }
666         /* should not be reached */
667         cpumask_setall(tconn->cpu_mask);
668 }
669
670 /**
671  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
672  * @thi:        drbd_thread object
674  *
675  * Call this in the "main loop" of _all_ threads; no need for any mutex, current won't die
676  * prematurely.
677  */
678 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
679 {
680         struct task_struct *p = current;
681
682         if (!thi->reset_cpu_mask)
683                 return;
684         thi->reset_cpu_mask = 0;
685         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
686 }
687 #endif
688
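/* Packet header helpers: the compact h80 layout (16 bit length field) is used
 * for peers speaking a protocol before 100, as long as the payload fits;
 * otherwise _prepare_header() switches to the h95 layout with a 32 bit
 * length field. */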
689 static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
690 {
691         h->magic   = cpu_to_be32(DRBD_MAGIC);
692         h->command = cpu_to_be16(cmd);
693         h->length  = cpu_to_be16(size);
694 }
695
696 static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
697 {
698         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
699         h->command = cpu_to_be16(cmd);
700         h->length  = cpu_to_be32(size);
701 }
702
703 static void _prepare_header(struct drbd_tconn *tconn, int vnr, struct p_header *h,
704                             enum drbd_packet cmd, int size)
705 {
706         if (tconn->agreed_pro_version >= 100 || size > DRBD_MAX_SIZE_H80_PACKET)
707                 prepare_header95(&h->h95, cmd, size);
708         else
709                 prepare_header80(&h->h80, cmd, size);
710 }
711
712 static void prepare_header(struct drbd_conf *mdev, struct p_header *h,
713                            enum drbd_packet cmd, int size)
714 {
715         _prepare_header(mdev->tconn, mdev->vnr, h, cmd, size);
716 }
717
718 /* the appropriate socket mutex must be held already */
719 int _conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct socket *sock,
720                    enum drbd_packet cmd, struct p_header *h, size_t size,
721                    unsigned msg_flags)
722 {
723         int sent, ok;
724
725         _prepare_header(tconn, vnr, h, cmd, size - sizeof(struct p_header));
726
727         sent = drbd_send(tconn, sock, h, size, msg_flags);
728
729         ok = (sent == size);
730         if (!ok && !signal_pending(current))
731                 conn_warn(tconn, "short sent %s size=%d sent=%d\n",
732                           cmdname(cmd), (int)size, sent);
733         return ok;
734 }
735
736 /* don't pass the socket. we may only look at it
737  * when we hold the appropriate socket mutex.
738  */
739 int conn_send_cmd(struct drbd_tconn *tconn, int vnr, int use_data_socket,
740                   enum drbd_packet cmd, struct p_header *h, size_t size)
741 {
742         int ok = 0;
743         struct socket *sock;
744
745         if (use_data_socket) {
746                 mutex_lock(&tconn->data.mutex);
747                 sock = tconn->data.socket;
748         } else {
749                 mutex_lock(&tconn->meta.mutex);
750                 sock = tconn->meta.socket;
751         }
752
753         /* drbd_disconnect() could have called drbd_free_sock()
754          * while we were waiting in down()... */
755         if (likely(sock != NULL))
756                 ok = _conn_send_cmd(tconn, vnr, sock, cmd, h, size, 0);
757
758         if (use_data_socket)
759                 mutex_unlock(&tconn->data.mutex);
760         else
761                 mutex_unlock(&tconn->meta.mutex);
762         return ok;
763 }
764
765 int conn_send_cmd2(struct drbd_tconn *tconn, enum drbd_packet cmd, char *data,
766                    size_t size)
767 {
768         struct p_header80 h;
769         int ok;
770
771         prepare_header80(&h, cmd, size);
772
773         if (!drbd_get_data_sock(tconn))
774                 return 0;
775
776         ok = (sizeof(h) ==
777                 drbd_send(tconn, tconn->data.socket, &h, sizeof(h), 0));
778         ok = ok && (size ==
779                 drbd_send(tconn, tconn->data.socket, data, size, 0));
780
781         drbd_put_data_sock(tconn);
782
783         return ok;
784 }
785
786 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
787 {
788         struct p_rs_param_95 *p;
789         struct socket *sock;
790         int size, rv;
791         const int apv = mdev->tconn->agreed_pro_version;
792
793         size = apv <= 87 ? sizeof(struct p_rs_param)
794                 : apv == 88 ? sizeof(struct p_rs_param)
795                         + strlen(mdev->sync_conf.verify_alg) + 1
796                 : apv <= 94 ? sizeof(struct p_rs_param_89)
797                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
798
799         /* used from admin command context and receiver/worker context.
800          * to avoid kmalloc, grab the socket right here,
801          * then use the pre-allocated sbuf there */
802         mutex_lock(&mdev->tconn->data.mutex);
803         sock = mdev->tconn->data.socket;
804
805         if (likely(sock != NULL)) {
806                 enum drbd_packet cmd =
807                         apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
808
809                 p = &mdev->tconn->data.sbuf.rs_param_95;
810
811                 /* initialize verify_alg and csums_alg */
812                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
813
814                 p->rate = cpu_to_be32(sc->rate);
815                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
816                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
817                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
818                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
819
820                 if (apv >= 88)
821                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
822                 if (apv >= 89)
823                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
824
825                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
826         } else
827                 rv = 0; /* not ok */
828
829         mutex_unlock(&mdev->tconn->data.mutex);
830
831         return rv;
832 }
833
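/**
 * drbd_send_protocol() - Send our protocol settings (P_PROTOCOL) to the peer
 * @tconn:      DRBD connection.
 *
 * Returns the result of conn_send_cmd2(), 0 if the packet could not be
 * allocated, and -1 if --dry-run was requested but the peer does not
 * support it.
 */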
834 int drbd_send_protocol(struct drbd_tconn *tconn)
835 {
836         struct p_protocol *p;
837         int size, cf, rv;
838
839         size = sizeof(struct p_protocol);
840
841         if (tconn->agreed_pro_version >= 87)
842                 size += strlen(tconn->net_conf->integrity_alg) + 1;
843
844         /* we must not recurse into our own queue,
845          * as that is blocked during handshake */
846         p = kmalloc(size, GFP_NOIO);
847         if (p == NULL)
848                 return 0;
849
850         p->protocol      = cpu_to_be32(tconn->net_conf->wire_protocol);
851         p->after_sb_0p   = cpu_to_be32(tconn->net_conf->after_sb_0p);
852         p->after_sb_1p   = cpu_to_be32(tconn->net_conf->after_sb_1p);
853         p->after_sb_2p   = cpu_to_be32(tconn->net_conf->after_sb_2p);
854         p->two_primaries = cpu_to_be32(tconn->net_conf->two_primaries);
855
856         cf = 0;
857         if (tconn->net_conf->want_lose)
858                 cf |= CF_WANT_LOSE;
859         if (tconn->net_conf->dry_run) {
860                 if (tconn->agreed_pro_version >= 92)
861                         cf |= CF_DRY_RUN;
862                 else {
863                         conn_err(tconn, "--dry-run is not supported by peer");
864                         kfree(p);
865                         return -1;
866                 }
867         }
868         p->conn_flags    = cpu_to_be32(cf);
869
870         if (tconn->agreed_pro_version >= 87)
871                 strcpy(p->integrity_alg, tconn->net_conf->integrity_alg);
872
873         rv = conn_send_cmd2(tconn, P_PROTOCOL, p->head.payload, size - sizeof(struct p_header));
874         kfree(p);
875         return rv;
876 }
877
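/* Send our UUID set (P_UUIDS), the current number of bits set in the bitmap,
 * and the given uuid_flags.  Returns 1 without sending anything if the disk
 * state is below D_NEGOTIATING. */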
878 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
879 {
880         struct p_uuids p;
881         int i;
882
883         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
884                 return 1;
885
886         for (i = UI_CURRENT; i < UI_SIZE; i++)
887                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
888
889         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
890         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
891         uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
892         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
893         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
894         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
895
896         put_ldev(mdev);
897
898         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS, &p.head, sizeof(p));
899 }
900
901 int drbd_send_uuids(struct drbd_conf *mdev)
902 {
903         return _drbd_send_uuids(mdev, 0);
904 }
905
906 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
907 {
908         return _drbd_send_uuids(mdev, 8);
909 }
910
911 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
912 {
913         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
914                 u64 *uuid = mdev->ldev->md.uuid;
915                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
916                      text,
917                      (unsigned long long)uuid[UI_CURRENT],
918                      (unsigned long long)uuid[UI_BITMAP],
919                      (unsigned long long)uuid[UI_HISTORY_START],
920                      (unsigned long long)uuid[UI_HISTORY_END]);
921                 put_ldev(mdev);
922         } else {
923                 dev_info(DEV, "%s effective data uuid: %016llX\n",
924                                 text,
925                                 (unsigned long long)mdev->ed_uuid);
926         }
927 }
928
929 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
930 {
931         struct p_rs_uuid p;
932         u64 uuid;
933
934         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
935
936         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
937         drbd_uuid_set(mdev, UI_BITMAP, uuid);
938         drbd_print_uuids(mdev, "updated sync UUID");
939         drbd_md_sync(mdev);
940         p.uuid = cpu_to_be64(uuid);
941
942         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID, &p.head, sizeof(p));
943 }
944
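/* Send our disk geometry (P_SIZES): backing device capacity, user requested
 * size, current device capacity, queue order type and maximum bio size.
 * With trigger_reply set, a current size of 0 is reported instead. */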
945 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
946 {
947         struct p_sizes p;
948         sector_t d_size, u_size;
949         int q_order_type, max_bio_size;
950         int ok;
951
952         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
953                 D_ASSERT(mdev->ldev->backing_bdev);
954                 d_size = drbd_get_max_capacity(mdev->ldev);
955                 u_size = mdev->ldev->dc.disk_size;
956                 q_order_type = drbd_queue_order_type(mdev);
957                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
958                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
959                 put_ldev(mdev);
960         } else {
961                 d_size = 0;
962                 u_size = 0;
963                 q_order_type = QUEUE_ORDERED_NONE;
964                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
965         }
966
967         p.d_size = cpu_to_be64(d_size);
968         p.u_size = cpu_to_be64(u_size);
969         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
970         p.max_bio_size = cpu_to_be32(max_bio_size);
971         p.queue_order_type = cpu_to_be16(q_order_type);
972         p.dds_flags = cpu_to_be16(flags);
973
974         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES, &p.head, sizeof(p));
975         return ok;
976 }
977
978 /**
979  * drbd_send_state() - Sends the drbd state to the peer
980  * @mdev:       DRBD device.
981  */
982 int drbd_send_state(struct drbd_conf *mdev)
983 {
984         struct socket *sock;
985         struct p_state p;
986         int ok = 0;
987
988         mutex_lock(&mdev->tconn->data.mutex);
989
990         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
991         sock = mdev->tconn->data.socket;
992
993         if (likely(sock != NULL)) {
994                 ok = _drbd_send_cmd(mdev, sock, P_STATE, &p.head, sizeof(p), 0);
995         }
996
997         mutex_unlock(&mdev->tconn->data.mutex);
998
999         return ok;
1000 }
1001
1002 int _conn_send_state_req(struct drbd_tconn *tconn, int vnr, enum drbd_packet cmd,
1003                          union drbd_state mask, union drbd_state val)
1004 {
1005         struct p_req_state p;
1006
1007         p.mask    = cpu_to_be32(mask.i);
1008         p.val     = cpu_to_be32(val.i);
1009
1010         return conn_send_cmd(tconn, vnr, USE_DATA_SOCKET, cmd, &p.head, sizeof(p));
1011 }
1012
1013 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1014 {
1015         struct p_req_state_reply p;
1016
1017         p.retcode    = cpu_to_be32(retcode);
1018
1019         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY, &p.head, sizeof(p));
1020 }
1021
1022 int conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1023 {
1024         struct p_req_state_reply p;
1025         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1026
1027         p.retcode    = cpu_to_be32(retcode);
1028
1029         return conn_send_cmd(tconn, 0, USE_META_SOCKET, cmd, &p.head, sizeof(p));
1030 }
1031
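/* Run-length encode a chunk of the bitmap into @p using VLI codes.
 * Returns the number of code bytes produced, 0 if plain transfer should be
 * used instead (RLE disabled, old peer, nothing to do, or not compressible
 * enough), and -1 if an unexpected zero run length is encountered. */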
1032 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1033         struct p_compressed_bm *p,
1034         struct bm_xfer_ctx *c)
1035 {
1036         struct bitstream bs;
1037         unsigned long plain_bits;
1038         unsigned long tmp;
1039         unsigned long rl;
1040         unsigned len;
1041         unsigned toggle;
1042         int bits;
1043
1044         /* may we use this feature? */
1045         if ((mdev->sync_conf.use_rle == 0) ||
1046                 (mdev->tconn->agreed_pro_version < 90))
1047                         return 0;
1048
1049         if (c->bit_offset >= c->bm_bits)
1050                 return 0; /* nothing to do. */
1051
1052         /* use at most this many bytes */
1053         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1054         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1055         /* plain bits covered in this code string */
1056         plain_bits = 0;
1057
1058         /* p->encoding & 0x80 stores whether the first run length is set.
1059          * bit offset is implicit.
1060          * start with toggle == 2 to be able to tell the first iteration */
1061         toggle = 2;
1062
1063         /* see how many plain bits we can stuff into one packet
1064          * using RLE and VLI. */
1065         do {
1066                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1067                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1068                 if (tmp == -1UL)
1069                         tmp = c->bm_bits;
1070                 rl = tmp - c->bit_offset;
1071
1072                 if (toggle == 2) { /* first iteration */
1073                         if (rl == 0) {
1074                                 /* the first checked bit was set,
1075                                  * store start value, */
1076                                 DCBP_set_start(p, 1);
1077                                 /* but skip encoding of zero run length */
1078                                 toggle = !toggle;
1079                                 continue;
1080                         }
1081                         DCBP_set_start(p, 0);
1082                 }
1083
1084                 /* paranoia: catch zero runlength.
1085                  * can only happen if bitmap is modified while we scan it. */
1086                 if (rl == 0) {
1087                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1088                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1089                         return -1;
1090                 }
1091
1092                 bits = vli_encode_bits(&bs, rl);
1093                 if (bits == -ENOBUFS) /* buffer full */
1094                         break;
1095                 if (bits <= 0) {
1096                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1097                         return 0;
1098                 }
1099
1100                 toggle = !toggle;
1101                 plain_bits += rl;
1102                 c->bit_offset = tmp;
1103         } while (c->bit_offset < c->bm_bits);
1104
1105         len = bs.cur.b - p->code + !!bs.cur.bit;
1106
1107         if (plain_bits < (len << 3)) {
1108                 /* incompressible with this method.
1109                  * we need to rewind both word and bit position. */
1110                 c->bit_offset -= plain_bits;
1111                 bm_xfer_ctx_bit_to_word_offset(c);
1112                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1113                 return 0;
1114         }
1115
1116         /* RLE + VLI was able to compress it just fine.
1117          * update c->word_offset. */
1118         bm_xfer_ctx_bit_to_word_offset(c);
1119
1120         /* store pad_bits */
1121         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1122
1123         return len;
1124 }
1125
1126 /**
1127  * send_bitmap_rle_or_plain
1128  *
1129  * Return 0 when done, 1 when another iteration is needed, and a negative error
1130  * code upon failure.
1131  */
1132 static int
1133 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1134                          struct p_header *h, struct bm_xfer_ctx *c)
1135 {
1136         struct p_compressed_bm *p = (void*)h;
1137         unsigned long num_words;
1138         int len;
1139         int ok;
1140
1141         len = fill_bitmap_rle_bits(mdev, p, c);
1142
1143         if (len < 0)
1144                 return -EIO;
1145
1146         if (len) {
1147                 DCBP_set_code(p, RLE_VLI_Bits);
1148                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_COMPRESSED_BITMAP, h,
1149                         sizeof(*p) + len, 0);
1150
1151                 c->packets[0]++;
1152                 c->bytes[0] += sizeof(*p) + len;
1153
1154                 if (c->bit_offset >= c->bm_bits)
1155                         len = 0; /* DONE */
1156         } else {
1157                 /* was not compressible.
1158                  * send a buffer full of plain text bits instead. */
1159                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1160                 len = num_words * sizeof(long);
1161                 if (len)
1162                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1163                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BITMAP,
1164                                    h, sizeof(struct p_header80) + len, 0);
1165                 c->word_offset += num_words;
1166                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1167
1168                 c->packets[1]++;
1169                 c->bytes[1] += sizeof(struct p_header80) + len;
1170
1171                 if (c->bit_offset > c->bm_bits)
1172                         c->bit_offset = c->bm_bits;
1173         }
1174         if (ok) {
1175                 if (len == 0) {
1176                         INFO_bm_xfer_stats(mdev, "send", c);
1177                         return 0;
1178                 } else
1179                         return 1;
1180         }
1181         return -EIO;
1182 }
1183
1184 /* See the comment at receive_bitmap() */
1185 int _drbd_send_bitmap(struct drbd_conf *mdev)
1186 {
1187         struct bm_xfer_ctx c;
1188         struct p_header *p;
1189         int err;
1190
1191         if (!expect(mdev->bitmap))
1192                 return false;
1193
1194         /* maybe we should use some per thread scratch page,
1195          * and allocate that during initial device creation? */
1196         p = (struct p_header *) __get_free_page(GFP_NOIO);
1197         if (!p) {
1198                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
1199                 return false;
1200         }
1201
1202         if (get_ldev(mdev)) {
1203                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1204                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1205                         drbd_bm_set_all(mdev);
1206                         if (drbd_bm_write(mdev)) {
1207                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1208                                  * but otherwise process as per normal - need to tell other
1209                                  * side that a full resync is required! */
1210                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1211                         } else {
1212                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1213                                 drbd_md_sync(mdev);
1214                         }
1215                 }
1216                 put_ldev(mdev);
1217         }
1218
1219         c = (struct bm_xfer_ctx) {
1220                 .bm_bits = drbd_bm_bits(mdev),
1221                 .bm_words = drbd_bm_words(mdev),
1222         };
1223
1224         do {
1225                 err = send_bitmap_rle_or_plain(mdev, p, &c);
1226         } while (err > 0);
1227
1228         free_page((unsigned long) p);
1229         return err == 0;
1230 }
1231
1232 int drbd_send_bitmap(struct drbd_conf *mdev)
1233 {
1234         int err;
1235
1236         if (!drbd_get_data_sock(mdev->tconn))
1237                 return -1;
1238         err = !_drbd_send_bitmap(mdev);
1239         drbd_put_data_sock(mdev->tconn);
1240         return err;
1241 }
1242
1243 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1244 {
1245         int ok;
1246         struct p_barrier_ack p;
1247
1248         p.barrier  = barrier_nr;
1249         p.set_size = cpu_to_be32(set_size);
1250
1251         if (mdev->state.conn < C_CONNECTED)
1252                 return false;
1253         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK, &p.head, sizeof(p));
1254         return ok;
1255 }
1256
1257 /**
1258  * _drbd_send_ack() - Sends an ack packet
1259  * @mdev:       DRBD device.
1260  * @cmd:        Packet command code.
1261  * @sector:     sector, needs to be in big endian byte order
1262  * @blksize:    size in bytes, needs to be in big endian byte order
1263  * @block_id:   Id, big endian byte order
1264  */
1265 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1266                           u64 sector, u32 blksize, u64 block_id)
1267 {
1268         int ok;
1269         struct p_block_ack p;
1270
1271         p.sector   = sector;
1272         p.block_id = block_id;
1273         p.blksize  = blksize;
1274         p.seq_num  = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1275
1276         if (!mdev->tconn->meta.socket || mdev->state.conn < C_CONNECTED)
1277                 return false;
1278         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd, &p.head, sizeof(p));
1279         return ok;
1280 }
1281
1282 /* dp->sector and dp->block_id already/still in network byte order,
1283  * data_size is payload size according to dp->head,
1284  * and may need to be corrected for digest size. */
1285 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1286                      struct p_data *dp, int data_size)
1287 {
1288         data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1289                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1290         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1291                               dp->block_id);
1292 }
1293
1294 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1295                      struct p_block_req *rp)
1296 {
1297         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1298 }
1299
1300 /**
1301  * drbd_send_ack() - Sends an ack packet
1302  * @mdev:       DRBD device
1303  * @cmd:        packet command code
1304  * @peer_req:   peer request
1305  */
1306 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1307                   struct drbd_peer_request *peer_req)
1308 {
1309         return _drbd_send_ack(mdev, cmd,
1310                               cpu_to_be64(peer_req->i.sector),
1311                               cpu_to_be32(peer_req->i.size),
1312                               peer_req->block_id);
1313 }
1314
1315 /* This function misuses the block_id field to signal if the blocks
1316  * are in sync or not. */
1317 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1318                      sector_t sector, int blksize, u64 block_id)
1319 {
1320         return _drbd_send_ack(mdev, cmd,
1321                               cpu_to_be64(sector),
1322                               cpu_to_be32(blksize),
1323                               cpu_to_be64(block_id));
1324 }
1325
1326 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1327                        sector_t sector, int size, u64 block_id)
1328 {
1329         int ok;
1330         struct p_block_req p;
1331
1332         p.sector   = cpu_to_be64(sector);
1333         p.block_id = block_id;
1334         p.blksize  = cpu_to_be32(size);
1335
1336         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd, &p.head, sizeof(p));
1337         return ok;
1338 }
1339
1340 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1341                             void *digest, int digest_size, enum drbd_packet cmd)
1342 {
1343         int ok;
1344         struct p_block_req p;
1345
1346         prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header) + digest_size);
1347         p.sector   = cpu_to_be64(sector);
1348         p.block_id = ID_SYNCER /* unused */;
1349         p.blksize  = cpu_to_be32(size);
1350
1351         mutex_lock(&mdev->tconn->data.mutex);
1352
1353         ok = (sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), 0));
1354         ok = ok && (digest_size == drbd_send(mdev->tconn, mdev->tconn->data.socket, digest, digest_size, 0));
1355
1356         mutex_unlock(&mdev->tconn->data.mutex);
1357
1358         return ok;
1359 }
1360
1361 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1362 {
1363         int ok;
1364         struct p_block_req p;
1365
1366         p.sector   = cpu_to_be64(sector);
1367         p.block_id = ID_SYNCER /* unused */;
1368         p.blksize  = cpu_to_be32(size);
1369
1370         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST, &p.head, sizeof(p));
1371         return ok;
1372 }
1373
1374 /* called on sndtimeo
1375  * returns false if we should retry,
1376  * true if we think connection is dead
1377  */
1378 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1379 {
1380         int drop_it;
1381         /* long elapsed = (long)(jiffies - mdev->last_received); */
1382
1383         drop_it =   tconn->meta.socket == sock
1384                 || !tconn->asender.task
1385                 || get_t_state(&tconn->asender) != RUNNING
1386                 || tconn->cstate < C_WF_REPORT_PARAMS;
1387
1388         if (drop_it)
1389                 return true;
1390
1391         drop_it = !--tconn->ko_count;
1392         if (!drop_it) {
1393                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1394                          current->comm, current->pid, tconn->ko_count);
1395                 request_ping(tconn);
1396         }
1397
1398         return drop_it; /* && (mdev->state == R_PRIMARY) */;
1399 }
1400
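/* Flag the connection as congested once more than 4/5 of the socket send
 * buffer is queued up; the bit is cleared again in _drbd_send_page(). */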
1401 static void drbd_update_congested(struct drbd_tconn *tconn)
1402 {
1403         struct sock *sk = tconn->data.socket->sk;
1404         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1405                 set_bit(NET_CONGESTED, &tconn->flags);
1406 }
1407
1408 /* The idea of sendpage seems to be to put some kind of reference
1409  * to the page into the skb, and to hand it over to the NIC. In
1410  * this process get_page() gets called.
1411  *
1412  * As soon as the page was really sent over the network put_page()
1413  * gets called by some part of the network layer. [ NIC driver? ]
1414  *
1415  * [ get_page() / put_page() increment/decrement the count. If count
1416  *   reaches 0 the page will be freed. ]
1417  *
1418  * This works nicely with pages from FSs.
1419  * But this means that in protocol A we might signal IO completion too early!
1420  *
1421  * In order not to corrupt data during a resync we must make sure
1422  * that we do not reuse our own buffer pages (EEs) too early, therefore
1423  * we have the net_ee list.
1424  *
1425  * XFS still seems to have problems; it submits pages with page_count == 0!
1426  * As a workaround, we disable sendpage on pages
1427  * with page_count == 0 or PageSlab.
1428  */
1429 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1430                    int offset, size_t size, unsigned msg_flags)
1431 {
1432         int sent = drbd_send(mdev->tconn, mdev->tconn->data.socket, kmap(page) + offset, size, msg_flags);
1433         kunmap(page);
1434         if (sent == size)
1435                 mdev->send_cnt += size>>9;
1436         return sent == size;
1437 }
1438
1439 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1440                     int offset, size_t size, unsigned msg_flags)
1441 {
1442         mm_segment_t oldfs = get_fs();
1443         int sent, ok;
1444         int len = size;
1445
1446         /* e.g. XFS meta- & log-data is in slab pages, which have a
1447          * page_count of 0 and/or have PageSlab() set.
1448          * we cannot use send_page for those, as that does get_page();
1449          * put_page(); and would cause either a VM_BUG directly, or
1450          * __page_cache_release a page that would actually still be referenced
1451          * by someone, leading to some obscure delayed Oops somewhere else. */
1452         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1453                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1454
1455         msg_flags |= MSG_NOSIGNAL;
1456         drbd_update_congested(mdev->tconn);
1457         set_fs(KERNEL_DS);
1458         do {
1459                 sent = mdev->tconn->data.socket->ops->sendpage(mdev->tconn->data.socket, page,
1460                                                         offset, len,
1461                                                         msg_flags);
1462                 if (sent == -EAGAIN) {
1463                         if (we_should_drop_the_connection(mdev->tconn,
1464                                                           mdev->tconn->data.socket))
1465                                 break;
1466                         else
1467                                 continue;
1468                 }
1469                 if (sent <= 0) {
1470                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1471                              __func__, (int)size, len, sent);
1472                         break;
1473                 }
1474                 len    -= sent;
1475                 offset += sent;
1476         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1477         set_fs(oldfs);
1478         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1479
1480         ok = (len == 0);
1481         if (likely(ok))
1482                 mdev->send_cnt += size>>9;
1483         return ok;
1484 }
1485
1486 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1487 {
1488         struct bio_vec *bvec;
1489         int i;
1490         /* hint all but last page with MSG_MORE */
1491         __bio_for_each_segment(bvec, bio, i, 0) {
1492                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
1493                                      bvec->bv_offset, bvec->bv_len,
1494                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
1495                         return 0;
1496         }
1497         return 1;
1498 }
1499
1500 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1501 {
1502         struct bio_vec *bvec;
1503         int i;
1504         /* hint all but last page with MSG_MORE */
1505         __bio_for_each_segment(bvec, bio, i, 0) {
1506                 if (!_drbd_send_page(mdev, bvec->bv_page,
1507                                      bvec->bv_offset, bvec->bv_len,
1508                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
1509                         return 0;
1510         }
1511         return 1;
1512 }
1513
1514 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1515                             struct drbd_peer_request *peer_req)
1516 {
1517         struct page *page = peer_req->pages;
1518         unsigned len = peer_req->i.size;
1519
1520         /* hint all but last page with MSG_MORE */
1521         page_chain_for_each(page) {
1522                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1523                 if (!_drbd_send_page(mdev, page, 0, l,
1524                                 page_chain_next(page) ? MSG_MORE : 0))
1525                         return 0;
1526                 len -= l;
1527         }
1528         return 1;
1529 }
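/* Worked example for the chunking above (assuming 4 KiB pages): a 12 KiB
 * peer request spanning three pages is pushed as three 4096-byte chunks,
 * the first two hinted with MSG_MORE and the last one without, so the
 * network layer may coalesce them into fewer TCP segments. */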
1530
1531 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1532 {
1533         if (mdev->tconn->agreed_pro_version >= 95)
1534                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1535                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1536                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1537                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1538         else
1539                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1540 }
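/* Example of the mapping above: on a connection that agreed on protocol 95
 * or newer, a write bio carrying REQ_SYNC|REQ_FUA|REQ_FLUSH is announced to
 * the peer as DP_RW_SYNC|DP_FUA|DP_FLUSH; against an older peer only the
 * REQ_SYNC bit survives (as DP_RW_SYNC), the other flags are dropped. */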
1541
1542 /* Used to send write requests
1543  * R_PRIMARY -> Peer    (P_DATA)
1544  */
1545 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1546 {
1547         int ok = 1;
1548         struct p_data p;
1549         unsigned int dp_flags = 0;
1550         void *dgb;
1551         int dgs;
1552
1553         if (!drbd_get_data_sock(mdev->tconn))
1554                 return 0;
1555
1556         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1557                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1558
1559         prepare_header(mdev, &p.head, P_DATA, sizeof(p) - sizeof(struct p_header) + dgs + req->i.size);
1560         p.sector   = cpu_to_be64(req->i.sector);
1561         p.block_id = (unsigned long)req;
1562         p.seq_num  = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1563
1564         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1565
1566         if (mdev->state.conn >= C_SYNC_SOURCE &&
1567             mdev->state.conn <= C_PAUSED_SYNC_T)
1568                 dp_flags |= DP_MAY_SET_IN_SYNC;
1569
1570         p.dp_flags = cpu_to_be32(dp_flags);
1571         set_bit(UNPLUG_REMOTE, &mdev->flags);
1572         ok = (sizeof(p) ==
1573                 drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
1574         if (ok && dgs) {
1575                 dgb = mdev->tconn->int_dig_out;
1576                 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, dgb);
1577                 ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1578         }
1579         if (ok) {
1580                 /* For protocol A, we have to memcpy the payload into
1581                  * socket buffers, as we may complete right away
1582                  * as soon as we handed it over to tcp, at which point the data
1583                  * pages may become invalid.
1584                  *
1585                  * For data-integrity enabled, we copy it as well, so we can be
1586                  * sure that even if the bio pages may still be modified, it
1587                  * won't change the data on the wire, thus if the digest checks
1588                  * out ok after sending on this side, but does not fit on the
1589                  * receiving side, we sure have detected corruption elsewhere.
1590                  */
1591                 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
1592                         ok = _drbd_send_bio(mdev, req->master_bio);
1593                 else
1594                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
1595
1596                 /* double check digest, sometimes buffers have been modified in flight. */
1597                 if (dgs > 0 && dgs <= 64) {
1598                         /* 64 byte, 512 bit, is the largest digest size
1599                          * currently supported in kernel crypto. */
1600                         unsigned char digest[64];
1601                         drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
1602                         if (memcmp(mdev->tconn->int_dig_out, digest, dgs)) {
1603                                 dev_warn(DEV,
1604                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1605                                         (unsigned long long)req->i.sector, req->i.size);
1606                         }
1607                 } /* else if (dgs > 64) {
1608                      ... Be noisy about digest too large ...
1609                 } */
1610         }
1611
1612         drbd_put_data_sock(mdev->tconn);
1613
1614         return ok;
1615 }
1616
1617 /* answer packet, used to send data back for read requests:
1618  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1619  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1620  */
1621 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1622                     struct drbd_peer_request *peer_req)
1623 {
1624         int ok;
1625         struct p_data p;
1626         void *dgb;
1627         int dgs;
1628
1629         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1630                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1631
1632         prepare_header(mdev, &p.head, cmd, sizeof(p) -
1633                                            sizeof(struct p_header80) +
1634                                            dgs + peer_req->i.size);
1635         p.sector   = cpu_to_be64(peer_req->i.sector);
1636         p.block_id = peer_req->block_id;
1637         p.seq_num = 0;  /* unused */
1638
1639         /* Only called by our kernel thread.
1640          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
1641          * in response to an admin command or module unload.
1642          */
1643         if (!drbd_get_data_sock(mdev->tconn))
1644                 return 0;
1645
1646         ok = sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
1647         if (ok && dgs) {
1648                 dgb = mdev->tconn->int_dig_out;
1649                 drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, dgb);
1650                 ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1651         }
1652         if (ok)
1653                 ok = _drbd_send_zc_ee(mdev, peer_req);
1654
1655         drbd_put_data_sock(mdev->tconn);
1656
1657         return ok;
1658 }
1659
1660 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
1661 {
1662         struct p_block_desc p;
1663
1664         p.sector  = cpu_to_be64(req->i.sector);
1665         p.blksize = cpu_to_be32(req->i.size);
1666
1667         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
1668 }
1669
1670 /*
1671   drbd_send distinguishes two cases:
1672
1673   Packets sent via the data socket "sock"
1674   and packets sent via the meta data socket "msock"
1675
1676                     sock                      msock
1677   -----------------+-------------------------+------------------------------
1678   timeout           conf.timeout / 2          conf.timeout / 2
1679   timeout action    send a ping via msock     Abort communication
1680                                               and close all sockets
1681 */
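/* Rough numbers (assuming the usual 0.1 s unit of the timeout setting):
 * with timeout=60 and ko-count=7, a stalled peer makes kernel_sendmsg() on
 * the data socket return -EAGAIN about every 3 seconds; each such timeout
 * sends a ping and decrements ko_count, so the connection is declared
 * broken after roughly 7 * 3 s = 21 s without progress. */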
1682
1683 /*
1684  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1685  */
1686 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1687               void *buf, size_t size, unsigned msg_flags)
1688 {
1689         struct kvec iov;
1690         struct msghdr msg;
1691         int rv, sent = 0;
1692
1693         if (!sock)
1694                 return -1000;
1695
1696         /* THINK  if (signal_pending) return ... ? */
1697
1698         iov.iov_base = buf;
1699         iov.iov_len  = size;
1700
1701         msg.msg_name       = NULL;
1702         msg.msg_namelen    = 0;
1703         msg.msg_control    = NULL;
1704         msg.msg_controllen = 0;
1705         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1706
1707         if (sock == tconn->data.socket) {
1708                 tconn->ko_count = tconn->net_conf->ko_count;
1709                 drbd_update_congested(tconn);
1710         }
1711         do {
1712                 /* STRANGE
1713                  * tcp_sendmsg does _not_ use its size parameter at all ?
1714                  *
1715                  * -EAGAIN on timeout, -EINTR on signal.
1716                  */
1717 /* THINK
1718  * do we need to block DRBD_SIG if sock == &meta.socket ??
1719  * otherwise wake_asender() might interrupt some send_*Ack !
1720  */
1721                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1722                 if (rv == -EAGAIN) {
1723                         if (we_should_drop_the_connection(tconn, sock))
1724                                 break;
1725                         else
1726                                 continue;
1727                 }
1728                 if (rv == -EINTR) {
1729                         flush_signals(current);
1730                         rv = 0;
1731                 }
1732                 if (rv < 0)
1733                         break;
1734                 sent += rv;
1735                 iov.iov_base += rv;
1736                 iov.iov_len  -= rv;
1737         } while (sent < size);
1738
1739         if (sock == tconn->data.socket)
1740                 clear_bit(NET_CONGESTED, &tconn->flags);
1741
1742         if (rv <= 0) {
1743                 if (rv != -EAGAIN) {
1744                         conn_err(tconn, "%s_sendmsg returned %d\n",
1745                                  sock == tconn->meta.socket ? "msock" : "sock",
1746                                  rv);
1747                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1748                 } else
1749                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1750         }
1751
1752         return sent;
1753 }
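/* Typical caller pattern (see drbd_send_dblock() above): take the data
 * socket via drbd_get_data_sock(), send the packet header (with MSG_MORE
 * if a digest or payload follows), then the payload, and treat any return
 * value other than the requested size as a failed send. */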
1754
1755 static int drbd_open(struct block_device *bdev, fmode_t mode)
1756 {
1757         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1758         unsigned long flags;
1759         int rv = 0;
1760
1761         mutex_lock(&drbd_main_mutex);
1762         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1763         /* to have a stable mdev->state.role
1764          * and no race with updating open_cnt */
1765
1766         if (mdev->state.role != R_PRIMARY) {
1767                 if (mode & FMODE_WRITE)
1768                         rv = -EROFS;
1769                 else if (!allow_oos)
1770                         rv = -EMEDIUMTYPE;
1771         }
1772
1773         if (!rv)
1774                 mdev->open_cnt++;
1775         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1776         mutex_unlock(&drbd_main_mutex);
1777
1778         return rv;
1779 }
1780
1781 static int drbd_release(struct gendisk *gd, fmode_t mode)
1782 {
1783         struct drbd_conf *mdev = gd->private_data;
1784         mutex_lock(&drbd_main_mutex);
1785         mdev->open_cnt--;
1786         mutex_unlock(&drbd_main_mutex);
1787         return 0;
1788 }
1789
1790 static void drbd_set_defaults(struct drbd_conf *mdev)
1791 {
1792         /* This way we get a compile error when sync_conf grows,
1793            and we forget to initialize it here */
1794         mdev->sync_conf = (struct syncer_conf) {
1795                 /* .rate = */           DRBD_RATE_DEF,
1796                 /* .after = */          DRBD_AFTER_DEF,
1797                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
1798                 /* .verify_alg = */     {}, 0,
1799                 /* .cpu_mask = */       {}, 0,
1800                 /* .csums_alg = */      {}, 0,
1801                 /* .use_rle = */        0,
1802                 /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
1803                 /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
1804                 /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
1805                 /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
1806                 /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
1807                 /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
1808         };
1809
1810         /* Have to do it this way, because the layout differs between
1811            big endian and little endian */
1812         mdev->state = (union drbd_state) {
1813                 { .role = R_SECONDARY,
1814                   .peer = R_UNKNOWN,
1815                   .conn = C_STANDALONE,
1816                   .disk = D_DISKLESS,
1817                   .pdsk = D_UNKNOWN,
1818                   .susp = 0,
1819                   .susp_nod = 0,
1820                   .susp_fen = 0
1821                 } };
1822 }
1823
1824 void drbd_init_set_defaults(struct drbd_conf *mdev)
1825 {
1826         /* the memset(,0,) did most of this.
1827          * note: only assignments, no allocation in here */
1828
1829         drbd_set_defaults(mdev);
1830
1831         atomic_set(&mdev->ap_bio_cnt, 0);
1832         atomic_set(&mdev->ap_pending_cnt, 0);
1833         atomic_set(&mdev->rs_pending_cnt, 0);
1834         atomic_set(&mdev->unacked_cnt, 0);
1835         atomic_set(&mdev->local_cnt, 0);
1836         atomic_set(&mdev->pp_in_use, 0);
1837         atomic_set(&mdev->pp_in_use_by_net, 0);
1838         atomic_set(&mdev->rs_sect_in, 0);
1839         atomic_set(&mdev->rs_sect_ev, 0);
1840         atomic_set(&mdev->ap_in_flight, 0);
1841
1842         mutex_init(&mdev->md_io_mutex);
1843         mutex_init(&mdev->own_state_mutex);
1844         mdev->state_mutex = &mdev->own_state_mutex;
1845
1846         spin_lock_init(&mdev->al_lock);
1847         spin_lock_init(&mdev->peer_seq_lock);
1848         spin_lock_init(&mdev->epoch_lock);
1849
1850         INIT_LIST_HEAD(&mdev->active_ee);
1851         INIT_LIST_HEAD(&mdev->sync_ee);
1852         INIT_LIST_HEAD(&mdev->done_ee);
1853         INIT_LIST_HEAD(&mdev->read_ee);
1854         INIT_LIST_HEAD(&mdev->net_ee);
1855         INIT_LIST_HEAD(&mdev->resync_reads);
1856         INIT_LIST_HEAD(&mdev->resync_work.list);
1857         INIT_LIST_HEAD(&mdev->unplug_work.list);
1858         INIT_LIST_HEAD(&mdev->go_diskless.list);
1859         INIT_LIST_HEAD(&mdev->md_sync_work.list);
1860         INIT_LIST_HEAD(&mdev->start_resync_work.list);
1861         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
1862
1863         mdev->resync_work.cb  = w_resync_timer;
1864         mdev->unplug_work.cb  = w_send_write_hint;
1865         mdev->go_diskless.cb  = w_go_diskless;
1866         mdev->md_sync_work.cb = w_md_sync;
1867         mdev->bm_io_work.w.cb = w_bitmap_io;
1868         mdev->start_resync_work.cb = w_start_resync;
1869
1870         mdev->resync_work.mdev  = mdev;
1871         mdev->unplug_work.mdev  = mdev;
1872         mdev->go_diskless.mdev  = mdev;
1873         mdev->md_sync_work.mdev = mdev;
1874         mdev->bm_io_work.w.mdev = mdev;
1875         mdev->start_resync_work.mdev = mdev;
1876
1877         init_timer(&mdev->resync_timer);
1878         init_timer(&mdev->md_sync_timer);
1879         init_timer(&mdev->start_resync_timer);
1880         init_timer(&mdev->request_timer);
1881         mdev->resync_timer.function = resync_timer_fn;
1882         mdev->resync_timer.data = (unsigned long) mdev;
1883         mdev->md_sync_timer.function = md_sync_timer_fn;
1884         mdev->md_sync_timer.data = (unsigned long) mdev;
1885         mdev->start_resync_timer.function = start_resync_timer_fn;
1886         mdev->start_resync_timer.data = (unsigned long) mdev;
1887         mdev->request_timer.function = request_timer_fn;
1888         mdev->request_timer.data = (unsigned long) mdev;
1889
1890         init_waitqueue_head(&mdev->misc_wait);
1891         init_waitqueue_head(&mdev->state_wait);
1892         init_waitqueue_head(&mdev->ee_wait);
1893         init_waitqueue_head(&mdev->al_wait);
1894         init_waitqueue_head(&mdev->seq_wait);
1895
1896         /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
1897         mdev->write_ordering = WO_bdev_flush;
1898         mdev->resync_wenr = LC_FREE;
1899         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1900         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1901 }
1902
1903 void drbd_mdev_cleanup(struct drbd_conf *mdev)
1904 {
1905         int i;
1906         if (mdev->tconn->receiver.t_state != NONE)
1907                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
1908                                 mdev->tconn->receiver.t_state);
1909
1910         /* no need to lock it, I'm the only thread alive */
1911         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
1912                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
1913         mdev->al_writ_cnt  =
1914         mdev->bm_writ_cnt  =
1915         mdev->read_cnt     =
1916         mdev->recv_cnt     =
1917         mdev->send_cnt     =
1918         mdev->writ_cnt     =
1919         mdev->p_size       =
1920         mdev->rs_start     =
1921         mdev->rs_total     =
1922         mdev->rs_failed    = 0;
1923         mdev->rs_last_events = 0;
1924         mdev->rs_last_sect_ev = 0;
1925         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1926                 mdev->rs_mark_left[i] = 0;
1927                 mdev->rs_mark_time[i] = 0;
1928         }
1929         D_ASSERT(mdev->tconn->net_conf == NULL);
1930
1931         drbd_set_my_capacity(mdev, 0);
1932         if (mdev->bitmap) {
1933                 /* maybe never allocated. */
1934                 drbd_bm_resize(mdev, 0, 1);
1935                 drbd_bm_cleanup(mdev);
1936         }
1937
1938         drbd_free_resources(mdev);
1939         clear_bit(AL_SUSPENDED, &mdev->flags);
1940
1941         /*
1942          * currently we drbd_init_ee only on module load, so
1943          * we may do drbd_release_ee only on module unload!
1944          */
1945         D_ASSERT(list_empty(&mdev->active_ee));
1946         D_ASSERT(list_empty(&mdev->sync_ee));
1947         D_ASSERT(list_empty(&mdev->done_ee));
1948         D_ASSERT(list_empty(&mdev->read_ee));
1949         D_ASSERT(list_empty(&mdev->net_ee));
1950         D_ASSERT(list_empty(&mdev->resync_reads));
1951         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
1952         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
1953         D_ASSERT(list_empty(&mdev->resync_work.list));
1954         D_ASSERT(list_empty(&mdev->unplug_work.list));
1955         D_ASSERT(list_empty(&mdev->go_diskless.list));
1956
1957         drbd_set_defaults(mdev);
1958 }
1959
1960
1961 static void drbd_destroy_mempools(void)
1962 {
1963         struct page *page;
1964
1965         while (drbd_pp_pool) {
1966                 page = drbd_pp_pool;
1967                 drbd_pp_pool = (struct page *)page_private(page);
1968                 __free_page(page);
1969                 drbd_pp_vacant--;
1970         }
1971
1972         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
1973
1974         if (drbd_md_io_bio_set)
1975                 bioset_free(drbd_md_io_bio_set);
1976         if (drbd_md_io_page_pool)
1977                 mempool_destroy(drbd_md_io_page_pool);
1978         if (drbd_ee_mempool)
1979                 mempool_destroy(drbd_ee_mempool);
1980         if (drbd_request_mempool)
1981                 mempool_destroy(drbd_request_mempool);
1982         if (drbd_ee_cache)
1983                 kmem_cache_destroy(drbd_ee_cache);
1984         if (drbd_request_cache)
1985                 kmem_cache_destroy(drbd_request_cache);
1986         if (drbd_bm_ext_cache)
1987                 kmem_cache_destroy(drbd_bm_ext_cache);
1988         if (drbd_al_ext_cache)
1989                 kmem_cache_destroy(drbd_al_ext_cache);
1990
1991         drbd_md_io_bio_set   = NULL;
1992         drbd_md_io_page_pool = NULL;
1993         drbd_ee_mempool      = NULL;
1994         drbd_request_mempool = NULL;
1995         drbd_ee_cache        = NULL;
1996         drbd_request_cache   = NULL;
1997         drbd_bm_ext_cache    = NULL;
1998         drbd_al_ext_cache    = NULL;
1999
2000         return;
2001 }
2002
2003 static int drbd_create_mempools(void)
2004 {
2005         struct page *page;
2006         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
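        /* Sizing example (illustrative, assuming DRBD_MAX_BIO_SIZE is 1 MiB
         * and pages are 4 KiB): that is 256 pages per allowed minor, so a
         * module loaded with minor_count=32 preallocates 8192 pages (32 MiB)
         * for the page pool filled in below. */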
2007         int i;
2008
2009         /* prepare our caches and mempools */
2010         drbd_request_mempool = NULL;
2011         drbd_ee_cache        = NULL;
2012         drbd_request_cache   = NULL;
2013         drbd_bm_ext_cache    = NULL;
2014         drbd_al_ext_cache    = NULL;
2015         drbd_pp_pool         = NULL;
2016         drbd_md_io_page_pool = NULL;
2017         drbd_md_io_bio_set   = NULL;
2018
2019         /* caches */
2020         drbd_request_cache = kmem_cache_create(
2021                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2022         if (drbd_request_cache == NULL)
2023                 goto Enomem;
2024
2025         drbd_ee_cache = kmem_cache_create(
2026                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2027         if (drbd_ee_cache == NULL)
2028                 goto Enomem;
2029
2030         drbd_bm_ext_cache = kmem_cache_create(
2031                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2032         if (drbd_bm_ext_cache == NULL)
2033                 goto Enomem;
2034
2035         drbd_al_ext_cache = kmem_cache_create(
2036                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2037         if (drbd_al_ext_cache == NULL)
2038                 goto Enomem;
2039
2040         /* mempools */
2041         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2042         if (drbd_md_io_bio_set == NULL)
2043                 goto Enomem;
2044
2045         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2046         if (drbd_md_io_page_pool == NULL)
2047                 goto Enomem;
2048
2049         drbd_request_mempool = mempool_create(number,
2050                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2051         if (drbd_request_mempool == NULL)
2052                 goto Enomem;
2053
2054         drbd_ee_mempool = mempool_create(number,
2055                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2056         if (drbd_ee_mempool == NULL)
2057                 goto Enomem;
2058
2059         /* drbd's page pool */
2060         spin_lock_init(&drbd_pp_lock);
2061
2062         for (i = 0; i < number; i++) {
2063                 page = alloc_page(GFP_HIGHUSER);
2064                 if (!page)
2065                         goto Enomem;
2066                 set_page_private(page, (unsigned long)drbd_pp_pool);
2067                 drbd_pp_pool = page;
2068         }
2069         drbd_pp_vacant = number;
2070
2071         return 0;
2072
2073 Enomem:
2074         drbd_destroy_mempools(); /* in case we allocated some */
2075         return -ENOMEM;
2076 }
2077
2078 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2079         void *unused)
2080 {
2081         /* just so we have it.  you never know what interesting things we
2082          * might want to do here some day...
2083          */
2084
2085         return NOTIFY_DONE;
2086 }
2087
2088 static struct notifier_block drbd_notifier = {
2089         .notifier_call = drbd_notify_sys,
2090 };
2091
2092 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2093 {
2094         int rr;
2095
2096         rr = drbd_release_ee(mdev, &mdev->active_ee);
2097         if (rr)
2098                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2099
2100         rr = drbd_release_ee(mdev, &mdev->sync_ee);
2101         if (rr)
2102                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2103
2104         rr = drbd_release_ee(mdev, &mdev->read_ee);
2105         if (rr)
2106                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2107
2108         rr = drbd_release_ee(mdev, &mdev->done_ee);
2109         if (rr)
2110                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2111
2112         rr = drbd_release_ee(mdev, &mdev->net_ee);
2113         if (rr)
2114                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2115 }
2116
2117 /* caution. no locking. */
2118 void drbd_delete_device(unsigned int minor)
2119 {
2120         struct drbd_conf *mdev = minor_to_mdev(minor);
2121
2122         if (!mdev)
2123                 return;
2124
2125         idr_remove(&mdev->tconn->volumes, mdev->vnr);
2126         idr_remove(&minors, minor);
2127         synchronize_rcu();
2128
2129         /* paranoia asserts */
2130         D_ASSERT(mdev->open_cnt == 0);
2131         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2132         /* end paranoia asserts */
2133
2134         del_gendisk(mdev->vdisk);
2135
2136         /* cleanup stuff that may have been allocated during
2137          * device (re-)configuration or state changes */
2138
2139         if (mdev->this_bdev)
2140                 bdput(mdev->this_bdev);
2141
2142         drbd_free_resources(mdev);
2143
2144         drbd_release_ee_lists(mdev);
2145
2146         lc_destroy(mdev->act_log);
2147         lc_destroy(mdev->resync);
2148
2149         kfree(mdev->p_uuid);
2150         /* mdev->p_uuid = NULL; */
2151
2152         /* cleanup the rest that has been
2153          * allocated from drbd_new_device
2154          * and actually free the mdev itself */
2155         drbd_free_mdev(mdev);
2156 }
2157
2158 static void drbd_cleanup(void)
2159 {
2160         unsigned int i;
2161         struct drbd_conf *mdev;
2162
2163         unregister_reboot_notifier(&drbd_notifier);
2164
2165         /* first remove proc,
2166          * drbdsetup uses its presence to detect
2167          * whether DRBD is loaded.
2168          * If we get stuck in proc removal,
2169          * but have netlink already deregistered,
2170          * some drbdsetup commands may wait forever
2171          * for an answer.
2172          */
2173         if (drbd_proc)
2174                 remove_proc_entry("drbd", NULL);
2175
2176         drbd_genl_unregister();
2177
2178         idr_for_each_entry(&minors, mdev, i)
2179                 drbd_delete_device(i);
2180         drbd_destroy_mempools();
2181         unregister_blkdev(DRBD_MAJOR, "drbd");
2182
2183         idr_destroy(&minors);
2184
2185         printk(KERN_INFO "drbd: module cleanup done.\n");
2186 }
2187
2188 /**
2189  * drbd_congested() - Callback for pdflush
2190  * @congested_data:     User data
2191  * @bdi_bits:           Bits pdflush is currently interested in
2192  *
2193  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2194  */
2195 static int drbd_congested(void *congested_data, int bdi_bits)
2196 {
2197         struct drbd_conf *mdev = congested_data;
2198         struct request_queue *q;
2199         char reason = '-';
2200         int r = 0;
2201
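        /* Legend for congestion_reason, summarizing the branches below:
         *  'd' = DRBD itself has frozen IO, 'b' = backing device congested,
         *  'n' = network send path congested, 'a' = backing device and
         *  network both congested, '-' = not congested. */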
2202         if (!may_inc_ap_bio(mdev)) {
2203                 /* DRBD has frozen IO */
2204                 r = bdi_bits;
2205                 reason = 'd';
2206                 goto out;
2207         }
2208
2209         if (get_ldev(mdev)) {
2210                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2211                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2212                 put_ldev(mdev);
2213                 if (r)
2214                         reason = 'b';
2215         }
2216
2217         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2218                 r |= (1 << BDI_async_congested);
2219                 reason = reason == 'b' ? 'a' : 'n';
2220         }
2221
2222 out:
2223         mdev->congestion_reason = reason;
2224         return r;
2225 }
2226
2227 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2228 {
2229         sema_init(&wq->s, 0);
2230         spin_lock_init(&wq->q_lock);
2231         INIT_LIST_HEAD(&wq->q);
2232 }
2233
2234 struct drbd_tconn *conn_by_name(const char *name)
2235 {
2236         struct drbd_tconn *tconn;
2237
2238         if (!name || !name[0])
2239                 return NULL;
2240
2241         write_lock_irq(&global_state_lock);
2242         list_for_each_entry(tconn, &drbd_tconns, all_tconn) {
2243                 if (!strcmp(tconn->name, name))
2244                         goto found;
2245         }
2246         tconn = NULL;
2247 found:
2248         write_unlock_irq(&global_state_lock);
2249         return tconn;
2250 }
2251
2252 struct drbd_tconn *drbd_new_tconn(const char *name)
2253 {
2254         struct drbd_tconn *tconn;
2255
2256         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2257         if (!tconn)
2258                 return NULL;
2259
2260         tconn->name = kstrdup(name, GFP_KERNEL);
2261         if (!tconn->name)
2262                 goto fail;
2263
2264         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2265                 goto fail;
2266
2267         if (!tl_init(tconn))
2268                 goto fail;
2269
2270         tconn->cstate = C_STANDALONE;
2271         mutex_init(&tconn->cstate_mutex);
2272         spin_lock_init(&tconn->req_lock);
2273         atomic_set(&tconn->net_cnt, 0);
2274         init_waitqueue_head(&tconn->net_cnt_wait);
2275         init_waitqueue_head(&tconn->ping_wait);
2276         idr_init(&tconn->volumes);
2277
2278         drbd_init_workqueue(&tconn->data.work);
2279         mutex_init(&tconn->data.mutex);
2280
2281         drbd_init_workqueue(&tconn->meta.work);
2282         mutex_init(&tconn->meta.mutex);
2283
2284         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2285         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2286         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2287
2288         write_lock_irq(&global_state_lock);
2289         list_add(&tconn->all_tconn, &drbd_tconns);
2290         write_unlock_irq(&global_state_lock);
2291
2292         return tconn;
2293
2294 fail:
2295         tl_cleanup(tconn);
2296         free_cpumask_var(tconn->cpu_mask);
2297         kfree(tconn->name);
2298         kfree(tconn);
2299
2300         return NULL;
2301 }
2302
2303 void drbd_free_tconn(struct drbd_tconn *tconn)
2304 {
2305         write_lock_irq(&global_state_lock);
2306         list_del(&tconn->all_tconn);
2307         write_unlock_irq(&global_state_lock);
2308         idr_destroy(&tconn->volumes);
2309
2310         free_cpumask_var(tconn->cpu_mask);
2311         kfree(tconn->name);
2312         kfree(tconn->int_dig_out);
2313         kfree(tconn->int_dig_in);
2314         kfree(tconn->int_dig_vv);
2315         kfree(tconn);
2316 }
2317
2318 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2319 {
2320         struct drbd_conf *mdev;
2321         struct gendisk *disk;
2322         struct request_queue *q;
2323         int vnr_got = vnr;
2324         int minor_got = minor;
2325         enum drbd_ret_code err = ERR_NOMEM;
2326
2327         mdev = minor_to_mdev(minor);
2328         if (mdev)
2329                 return ERR_MINOR_EXISTS;
2330
2331         /* GFP_KERNEL, we are outside of all write-out paths */
2332         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2333         if (!mdev)
2334                 return ERR_NOMEM;
2335
2336         mdev->tconn = tconn;
2337         mdev->minor = minor;
2338         mdev->vnr = vnr;
2339
2340         drbd_init_set_defaults(mdev);
2341
2342         q = blk_alloc_queue(GFP_KERNEL);
2343         if (!q)
2344                 goto out_no_q;
2345         mdev->rq_queue = q;
2346         q->queuedata   = mdev;
2347
2348         disk = alloc_disk(1);
2349         if (!disk)
2350                 goto out_no_disk;
2351         mdev->vdisk = disk;
2352
2353         set_disk_ro(disk, true);
2354
2355         disk->queue = q;
2356         disk->major = DRBD_MAJOR;
2357         disk->first_minor = minor;
2358         disk->fops = &drbd_ops;
2359         sprintf(disk->disk_name, "drbd%d", minor);
2360         disk->private_data = mdev;
2361
2362         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2363         /* we have no partitions. we contain only ourselves. */
2364         mdev->this_bdev->bd_contains = mdev->this_bdev;
2365
2366         q->backing_dev_info.congested_fn = drbd_congested;
2367         q->backing_dev_info.congested_data = mdev;
2368
2369         blk_queue_make_request(q, drbd_make_request);
2370         /* Setting the max_hw_sectors to an odd value of 8 KiB here
2371            triggers a max_bio_size message upon first attach or connect */
2372         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2373         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2374         blk_queue_merge_bvec(q, drbd_merge_bvec);
2375         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2376
2377         mdev->md_io_page = alloc_page(GFP_KERNEL);
2378         if (!mdev->md_io_page)
2379                 goto out_no_io_page;
2380
2381         if (drbd_bm_init(mdev))
2382                 goto out_no_bitmap;
2383         mdev->read_requests = RB_ROOT;
2384         mdev->write_requests = RB_ROOT;
2385
2386         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2387         if (!mdev->current_epoch)
2388                 goto out_no_epoch;
2389
2390         INIT_LIST_HEAD(&mdev->current_epoch->list);
2391         mdev->epochs = 1;
2392
2393         if (!idr_pre_get(&minors, GFP_KERNEL))
2394                 goto out_no_minor_idr;
2395         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2396                 goto out_no_minor_idr;
2397         if (minor_got != minor) {
2398                 err = ERR_MINOR_EXISTS;
2399                 drbd_msg_put_info("requested minor exists already");
2400                 goto out_idr_remove_minor;
2401         }
2402
2403         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2404                 goto out_idr_remove_minor;
2405         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2406                 goto out_idr_remove_minor;
2407         if (vnr_got != vnr) {
2408                 err = ERR_INVALID_REQUEST;
2409                 drbd_msg_put_info("requested volume exists already");
2410                 goto out_idr_remove_vol;
2411         }
2412         add_disk(disk);
2413
2414         return NO_ERROR;
2415
2416 out_idr_remove_vol:
2417         idr_remove(&tconn->volumes, vnr_got);
2418 out_idr_remove_minor:
2419         idr_remove(&minors, minor_got);
2420         synchronize_rcu();
2421 out_no_minor_idr:
2422         kfree(mdev->current_epoch);
2423 out_no_epoch:
2424         drbd_bm_cleanup(mdev);
2425 out_no_bitmap:
2426         __free_page(mdev->md_io_page);
2427 out_no_io_page:
2428         put_disk(disk);
2429 out_no_disk:
2430         blk_cleanup_queue(q);
2431 out_no_q:
2432         kfree(mdev);
2433         return err;
2434 }
2435
2436 /* counterpart of conn_new_minor().
2437  * last part of drbd_delete_device. */
2438 void drbd_free_mdev(struct drbd_conf *mdev)
2439 {
2440         kfree(mdev->current_epoch);
2441         if (mdev->bitmap) /* should no longer be there. */
2442                 drbd_bm_cleanup(mdev);
2443         __free_page(mdev->md_io_page);
2444         put_disk(mdev->vdisk);
2445         blk_cleanup_queue(mdev->rq_queue);
2446         kfree(mdev);
2447 }
2448
2449
2450 int __init drbd_init(void)
2451 {
2452         int err;
2453
2454         BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
2455         BUILD_BUG_ON(sizeof(struct p_handshake) != 80);
2456
2457         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2458                 printk(KERN_ERR
2459                        "drbd: invalid minor_count (%d)\n", minor_count);
2460 #ifdef MODULE
2461                 return -EINVAL;
2462 #else
2463                 minor_count = 8;
2464 #endif
2465         }
2466
2467         err = register_blkdev(DRBD_MAJOR, "drbd");
2468         if (err) {
2469                 printk(KERN_ERR
2470                        "drbd: unable to register block device major %d\n",
2471                        DRBD_MAJOR);
2472                 return err;
2473         }
2474
2475         err = drbd_genl_register();
2476         if (err) {
2477                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2478                 goto fail;
2479         }
2480
2481
2482         register_reboot_notifier(&drbd_notifier);
2483
2484         /*
2485          * allocate all necessary structs
2486          */
2487         err = -ENOMEM;
2488
2489         init_waitqueue_head(&drbd_pp_wait);
2490
2491         drbd_proc = NULL; /* play safe for drbd_cleanup */
2492         idr_init(&minors);
2493
2494         err = drbd_create_mempools();
2495         if (err)
2496                 goto fail;
2497
2498         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2499         if (!drbd_proc) {
2500                 printk(KERN_ERR "drbd: unable to register proc file\n");
2501                 goto fail;
2502         }
2503
2504         rwlock_init(&global_state_lock);
2505         INIT_LIST_HEAD(&drbd_tconns);
2506
2507         printk(KERN_INFO "drbd: initialized. "
2508                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2509                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2510         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2511         printk(KERN_INFO "drbd: registered as block device major %d\n",
2512                 DRBD_MAJOR);
2513
2514         return 0; /* Success! */
2515
2516 fail:
2517         drbd_cleanup();
2518         if (err == -ENOMEM)
2519                 /* currently always the case */
2520                 printk(KERN_ERR "drbd: ran out of memory\n");
2521         else
2522                 printk(KERN_ERR "drbd: initialization failure\n");
2523         return err;
2524 }
2525
2526 void drbd_free_bc(struct drbd_backing_dev *ldev)
2527 {
2528         if (ldev == NULL)
2529                 return;
2530
2531         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2532         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2533
2534         kfree(ldev);
2535 }
2536
2537 void drbd_free_sock(struct drbd_tconn *tconn)
2538 {
2539         if (tconn->data.socket) {
2540                 mutex_lock(&tconn->data.mutex);
2541                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2542                 sock_release(tconn->data.socket);
2543                 tconn->data.socket = NULL;
2544                 mutex_unlock(&tconn->data.mutex);
2545         }
2546         if (tconn->meta.socket) {
2547                 mutex_lock(&tconn->meta.mutex);
2548                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2549                 sock_release(tconn->meta.socket);
2550                 tconn->meta.socket = NULL;
2551                 mutex_unlock(&tconn->meta.mutex);
2552         }
2553 }
2554
2555
2556 void drbd_free_resources(struct drbd_conf *mdev)
2557 {
2558         crypto_free_hash(mdev->csums_tfm);
2559         mdev->csums_tfm = NULL;
2560         crypto_free_hash(mdev->verify_tfm);
2561         mdev->verify_tfm = NULL;
2562         crypto_free_hash(mdev->tconn->cram_hmac_tfm);
2563         mdev->tconn->cram_hmac_tfm = NULL;
2564         crypto_free_hash(mdev->tconn->integrity_w_tfm);
2565         mdev->tconn->integrity_w_tfm = NULL;
2566         crypto_free_hash(mdev->tconn->integrity_r_tfm);
2567         mdev->tconn->integrity_r_tfm = NULL;
2568
2569         drbd_free_sock(mdev->tconn);
2570
2571         __no_warn(local,
2572                   drbd_free_bc(mdev->ldev);
2573                   mdev->ldev = NULL;);
2574 }
2575
2576 /* meta data management */
2577
2578 struct meta_data_on_disk {
2579         u64 la_size;           /* last agreed size. */
2580         u64 uuid[UI_SIZE];   /* UUIDs. */
2581         u64 device_uuid;
2582         u64 reserved_u64_1;
2583         u32 flags;             /* MDF */
2584         u32 magic;
2585         u32 md_size_sect;
2586         u32 al_offset;         /* offset to this block */
2587         u32 al_nr_extents;     /* important for restoring the AL */
2588               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
2589         u32 bm_offset;         /* offset to the bitmap, from here */
2590         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2591         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2592         u32 reserved_u32[3];
2593
2594 } __packed;
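/* Note: the superblock fields above fit into the first 512 bytes of the
 * meta-data block; drbd_md_sync() below zeroes a 512-byte buffer before
 * filling in the fields, so the reserved members end up as zero on disk. */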
2595
2596 /**
2597  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2598  * @mdev:       DRBD device.
2599  */
2600 void drbd_md_sync(struct drbd_conf *mdev)
2601 {
2602         struct meta_data_on_disk *buffer;
2603         sector_t sector;
2604         int i;
2605
2606         del_timer(&mdev->md_sync_timer);
2607         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2608         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2609                 return;
2610
2611         /* We use D_FAILED here and not D_ATTACHING, because we try to write
2612          * metadata even if we detach due to a disk failure! */
2613         if (!get_ldev_if_state(mdev, D_FAILED))
2614                 return;
2615
2616         mutex_lock(&mdev->md_io_mutex);
2617         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2618         memset(buffer, 0, 512);
2619
2620         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2621         for (i = UI_CURRENT; i < UI_SIZE; i++)
2622                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2623         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2624         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2625
2626         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2627         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2628         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2629         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2630         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2631
2632         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2633         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2634
2635         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2636         sector = mdev->ldev->md.md_offset;
2637
2638         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2639                 /* this was a try anyways ... */
2640                 dev_err(DEV, "meta data update failed!\n");
2641                 drbd_chk_io_error(mdev, 1, true);
2642         }
2643
2644         /* Update mdev->ldev->md.la_size_sect,
2645          * since we updated it on metadata. */
2646         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2647
2648         mutex_unlock(&mdev->md_io_mutex);
2649         put_ldev(mdev);
2650 }
2651
2652 /**
2653  * drbd_md_read() - Reads in the meta data super block
2654  * @mdev:       DRBD device.
2655  * @bdev:       Device from which the meta data should be read in.
2656  *
2657  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2658  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2659  */
2660 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2661 {
2662         struct meta_data_on_disk *buffer;
2663         int i, rv = NO_ERROR;
2664
2665         if (!get_ldev_if_state(mdev, D_ATTACHING))
2666                 return ERR_IO_MD_DISK;
2667
2668         mutex_lock(&mdev->md_io_mutex);
2669         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2670
2671         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2672                 /* NOTE: can't do normal error processing here as this is
2673                    called BEFORE disk is attached */
2674                 dev_err(DEV, "Error while reading metadata.\n");
2675                 rv = ERR_IO_MD_DISK;
2676                 goto err;
2677         }
2678
2679         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2680                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2681                 rv = ERR_MD_INVALID;
2682                 goto err;
2683         }
2684         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2685                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2686                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2687                 rv = ERR_MD_INVALID;
2688                 goto err;
2689         }
2690         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2691                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2692                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2693                 rv = ERR_MD_INVALID;
2694                 goto err;
2695         }
2696         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2697                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2698                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2699                 rv = ERR_MD_INVALID;
2700                 goto err;
2701         }
2702
2703         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2704                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2705                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2706                 rv = ERR_MD_INVALID;
2707                 goto err;
2708         }
2709
2710         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2711         for (i = UI_CURRENT; i < UI_SIZE; i++)
2712                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2713         bdev->md.flags = be32_to_cpu(buffer->flags);
2714         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
2715         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2716
2717         spin_lock_irq(&mdev->tconn->req_lock);
2718         if (mdev->state.conn < C_CONNECTED) {
2719                 int peer;
2720                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2721                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2722                 mdev->peer_max_bio_size = peer;
2723         }
2724         spin_unlock_irq(&mdev->tconn->req_lock);
2725
2726         if (mdev->sync_conf.al_extents < 7)
2727                 mdev->sync_conf.al_extents = 127;
2728
2729  err:
2730         mutex_unlock(&mdev->md_io_mutex);
2731         put_ldev(mdev);
2732
2733         return rv;
2734 }
2735
2736 /**
2737  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2738  * @mdev:       DRBD device.
2739  *
2740  * Call this function if you change anything that should be written to
2741  * the meta-data super block. This function sets MD_DIRTY, and starts a
2742  * timer that ensures drbd_md_sync() is called within five seconds.
2743  */
2744 #ifdef DEBUG
2745 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2746 {
2747         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2748                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2749                 mdev->last_md_mark_dirty.line = line;
2750                 mdev->last_md_mark_dirty.func = func;
2751         }
2752 }
2753 #else
2754 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2755 {
2756         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2757                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2758 }
2759 #endif
2760
2761 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2762 {
2763         int i;
2764
2765         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2766                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2767 }
2768
2769 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2770 {
2771         if (idx == UI_CURRENT) {
2772                 if (mdev->state.role == R_PRIMARY)
2773                         val |= 1;
2774                 else
2775                         val &= ~((u64)1);
2776
2777                 drbd_set_ed_uuid(mdev, val);
2778         }
2779
2780         mdev->ldev->md.uuid[idx] = val;
2781         drbd_md_mark_dirty(mdev);
2782 }
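/* Example of the role encoding above: when a new current UUID is set while
 * this node is R_PRIMARY, bit 0 is forced to 1; on a secondary it is forced
 * to 0. The stored UUID therefore also records the role the node had when
 * the UUID was generated. */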
2783
2784
2785 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2786 {
2787         if (mdev->ldev->md.uuid[idx]) {
2788                 drbd_uuid_move_history(mdev);
2789                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2790         }
2791         _drbd_uuid_set(mdev, idx, val);
2792 }
2793
2794 /**
2795  * drbd_uuid_new_current() - Creates a new current UUID
2796  * @mdev:       DRBD device.
2797  *
2798  * Creates a new current UUID, and rotates the old current UUID into
2799  * the bitmap slot. Causes an incremental resync upon next connect.
2800  */
2801 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2802 {
2803         u64 val;
2804         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2805
2806         if (bm_uuid)
2807                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2808
2809         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2810
2811         get_random_bytes(&val, sizeof(u64));
2812         _drbd_uuid_set(mdev, UI_CURRENT, val);
2813         drbd_print_uuids(mdev, "new current UUID");
2814         /* get it to stable storage _now_ */
2815         drbd_md_sync(mdev);
2816 }
2817
2818 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
2819 {
2820         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
2821                 return;
2822
2823         if (val == 0) {
2824                 drbd_uuid_move_history(mdev);
2825                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2826                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2827         } else {
2828                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2829                 if (bm_uuid)
2830                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2831
2832                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
2833         }
2834         drbd_md_mark_dirty(mdev);
2835 }
2836
2837 /**
2838  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2839  * @mdev:       DRBD device.
2840  *
2841  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
2842  */
2843 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
2844 {
2845         int rv = -EIO;
2846
2847         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2848                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
2849                 drbd_md_sync(mdev);
2850                 drbd_bm_set_all(mdev);
2851
2852                 rv = drbd_bm_write(mdev);
2853
2854                 if (!rv) {
2855                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2856                         drbd_md_sync(mdev);
2857                 }
2858
2859                 put_ldev(mdev);
2860         }
2861
2862         return rv;
2863 }
2864
2865 /**
2866  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2867  * @mdev:       DRBD device.
2868  *
2869  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
2870  */
2871 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
2872 {
2873         int rv = -EIO;
2874
2875         drbd_resume_al(mdev);
2876         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2877                 drbd_bm_clear_all(mdev);
2878                 rv = drbd_bm_write(mdev);
2879                 put_ldev(mdev);
2880         }
2881
2882         return rv;
2883 }
2884
2885 static int w_bitmap_io(struct drbd_work *w, int unused)
2886 {
2887         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
2888         struct drbd_conf *mdev = w->mdev;
2889         int rv = -EIO;
2890
2891         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
2892
2893         if (get_ldev(mdev)) {
2894                 drbd_bm_lock(mdev, work->why, work->flags);
2895                 rv = work->io_fn(mdev);
2896                 drbd_bm_unlock(mdev);
2897                 put_ldev(mdev);
2898         }
2899
2900         clear_bit_unlock(BITMAP_IO, &mdev->flags);
2901         wake_up(&mdev->misc_wait);
2902
2903         if (work->done)
2904                 work->done(mdev, rv);
2905
2906         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
2907         work->why = NULL;
2908         work->flags = 0;
2909
2910         return 1;
2911 }
2912
2913 void drbd_ldev_destroy(struct drbd_conf *mdev)
2914 {
2915         lc_destroy(mdev->resync);
2916         mdev->resync = NULL;
2917         lc_destroy(mdev->act_log);
2918         mdev->act_log = NULL;
2919         __no_warn(local,
2920                 drbd_free_bc(mdev->ldev);
2921                 mdev->ldev = NULL;);
2922
2923         clear_bit(GO_DISKLESS, &mdev->flags);
2924 }
2925
2926 static int w_go_diskless(struct drbd_work *w, int unused)
2927 {
2928         struct drbd_conf *mdev = w->mdev;
2929
2930         D_ASSERT(mdev->state.disk == D_FAILED);
2931         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
2932          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
2933          * the protected members anymore, though, so once put_ldev reaches zero
2934          * again, it will be safe to free them. */
2935         drbd_force_state(mdev, NS(disk, D_DISKLESS));
2936         return 1;
2937 }
2938
2939 void drbd_go_diskless(struct drbd_conf *mdev)
2940 {
2941         D_ASSERT(mdev->state.disk == D_FAILED);
2942         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
2943                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
2944 }
2945
2946 /**
2947  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
2948  * @mdev:       DRBD device.
2949  * @io_fn:      IO callback to be called when bitmap IO is possible
2950  * @done:       callback to be called after the bitmap IO was performed
2951  * @why:        Descriptive text of the reason for doing the IO
2952  *
2953  * While IO on the bitmap happens we freeze application IO, thus ensuring
2954  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
2955  * called from worker context. It MUST NOT be used while a previous such
2956  * work is still pending!
2957  */
2958 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
2959                           int (*io_fn)(struct drbd_conf *),
2960                           void (*done)(struct drbd_conf *, int),
2961                           char *why, enum bm_flag flags)
2962 {
2963         D_ASSERT(current == mdev->tconn->worker.task);
2964
2965         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
2966         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
2967         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
2968         if (mdev->bm_io_work.why)
2969                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
2970                         why, mdev->bm_io_work.why);
2971
2972         mdev->bm_io_work.io_fn = io_fn;
2973         mdev->bm_io_work.done = done;
2974         mdev->bm_io_work.why = why;
2975         mdev->bm_io_work.flags = flags;
2976
2977         spin_lock_irq(&mdev->tconn->req_lock);
2978         set_bit(BITMAP_IO, &mdev->flags);
2979         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
2980                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
2981                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
2982         }
2983         spin_unlock_irq(&mdev->tconn->req_lock);
2984 }
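/*
 * Illustrative sketch of a call site, not taken from the driver: per the
 * comment above this must run in worker context with no other bitmap IO
 * pending.  The done-callback after_clear_n_write() and the reason string
 * are made up; drbd_bmio_clear_n_write() and BM_LOCKED_SET_ALLOWED appear
 * elsewhere in this file, and the flag choice is illustrative only.
 */
#if 0	/* not compiled, example only */
static void after_clear_n_write(struct drbd_conf *mdev, int rv)
{
        if (rv)
                dev_err(DEV, "bitmap clear+write failed: %d\n", rv);
}

        /* ... later, from worker context ... */
        drbd_queue_bitmap_io(mdev, &drbd_bmio_clear_n_write,
                             &after_clear_n_write,
                             "example: clear_n_write", BM_LOCKED_SET_ALLOWED);
#endif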
2985
2986 /**
2987  * drbd_bitmap_io() - Does an IO operation on the whole bitmap
2988  * @mdev:       DRBD device.
2989  * @io_fn:      IO callback to be called when bitmap IO is possible
2990  * @why:        Descriptive text of the reason for doing the IO
2991  *
2992  * Freezes application IO while the actual IO operation runs. This
2993  * function MAY NOT be called from worker context.
2994  */
2995 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
2996                 char *why, enum bm_flag flags)
2997 {
2998         int rv;
2999
3000         D_ASSERT(current != mdev->tconn->worker.task);
3001
3002         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3003                 drbd_suspend_io(mdev);
3004
3005         drbd_bm_lock(mdev, why, flags);
3006         rv = io_fn(mdev);
3007         drbd_bm_unlock(mdev);
3008
3009         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3010                 drbd_resume_io(mdev);
3011
3012         return rv;
3013 }
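/*
 * Illustrative sketch, not a real call site: drbd_bitmap_io() is the
 * synchronous counterpart of drbd_queue_bitmap_io() for non-worker
 * context.  Note that the drbd_suspend_io()/drbd_resume_io() pair above
 * is skipped when the flags overlap BM_LOCKED_SET_ALLOWED, as with the
 * flag passed in this example; the reason string is arbitrary.
 */
#if 0	/* not compiled, example only */
        int rv;

        /* must not run on mdev->tconn->worker.task */
        rv = drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
                            "example: clear bitmap", BM_LOCKED_SET_ALLOWED);
        if (rv)
                dev_err(DEV, "synchronous bitmap IO failed: %d\n", rv);
#endif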
3014
3015 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3016 {
3017         if ((mdev->ldev->md.flags & flag) != flag) {
3018                 drbd_md_mark_dirty(mdev);
3019                 mdev->ldev->md.flags |= flag;
3020         }
3021 }
3022
3023 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3024 {
3025         if ((mdev->ldev->md.flags & flag) != 0) {
3026                 drbd_md_mark_dirty(mdev);
3027                 mdev->ldev->md.flags &= ~flag;
3028         }
3029 }
3030 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3031 {
3032         return (bdev->md.flags & flag) != 0;
3033 }
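/*
 * Illustrative sketch, not a real call site: the flag helpers above are
 * typically paired with drbd_md_sync(), and they only mark the meta-data
 * dirty when the flag word actually changes.  This mirrors the
 * MDF_FULL_SYNC handling near the top of this excerpt; the placement of
 * the calls here is for illustration only.
 */
#if 0	/* not compiled, example only */
        if (get_ldev(mdev)) {
                drbd_md_set_flag(mdev, MDF_FULL_SYNC);  /* record the intent */
                drbd_md_sync(mdev);                     /* persist it now */

                /* ... perform the full sync ... */

                drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
                drbd_md_sync(mdev);
                put_ldev(mdev);
        }
#endif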
3034
3035 static void md_sync_timer_fn(unsigned long data)
3036 {
3037         struct drbd_conf *mdev = (struct drbd_conf *) data;
3038
3039         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3040 }
3041
3042 static int w_md_sync(struct drbd_work *w, int unused)
3043 {
3044         struct drbd_conf *mdev = w->mdev;
3045
3046         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3047 #ifdef DEBUG
3048         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3049                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3050 #endif
3051         drbd_md_sync(mdev);
3052         return 1;
3053 }
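/*
 * Illustrative fragment, not part of the driver: md_sync_timer_fn() runs
 * in timer context and therefore only queues md_sync_work; the sleepable
 * drbd_md_sync() call happens later in the worker via w_md_sync().  The
 * interval below is made up; the timer itself is assumed to be the
 * md_sync_timer field set up elsewhere in this file.
 */
#if 0	/* not compiled, sketch of arming the deferral */
        /* from process context, e.g. after marking the meta-data dirty: */
        mod_timer(&mdev->md_sync_timer, jiffies + 5 * HZ);
#endif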
3054
3055 const char *cmdname(enum drbd_packet cmd)
3056 {
3057         /* THINK may need to become several global tables
3058          * when we want to support more than
3059          * one PRO_VERSION */
3060         static const char *cmdnames[] = {
3061                 [P_DATA]                = "Data",
3062                 [P_DATA_REPLY]          = "DataReply",
3063                 [P_RS_DATA_REPLY]       = "RSDataReply",
3064                 [P_BARRIER]             = "Barrier",
3065                 [P_BITMAP]              = "ReportBitMap",
3066                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3067                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3068                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3069                 [P_DATA_REQUEST]        = "DataRequest",
3070                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3071                 [P_SYNC_PARAM]          = "SyncParam",
3072                 [P_SYNC_PARAM89]        = "SyncParam89",
3073                 [P_PROTOCOL]            = "ReportProtocol",
3074                 [P_UUIDS]               = "ReportUUIDs",
3075                 [P_SIZES]               = "ReportSizes",
3076                 [P_STATE]               = "ReportState",
3077                 [P_SYNC_UUID]           = "ReportSyncUUID",
3078                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3079                 [P_AUTH_RESPONSE]       = "AuthResponse",
3080                 [P_PING]                = "Ping",
3081                 [P_PING_ACK]            = "PingAck",
3082                 [P_RECV_ACK]            = "RecvAck",
3083                 [P_WRITE_ACK]           = "WriteAck",
3084                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3085                 [P_DISCARD_WRITE]       = "DiscardWrite",
3086                 [P_NEG_ACK]             = "NegAck",
3087                 [P_NEG_DREPLY]          = "NegDReply",
3088                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3089                 [P_BARRIER_ACK]         = "BarrierAck",
3090                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3091                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3092                 [P_OV_REQUEST]          = "OVRequest",
3093                 [P_OV_REPLY]            = "OVReply",
3094                 [P_OV_RESULT]           = "OVResult",
3095                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3096                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3097                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3098                 [P_DELAY_PROBE]         = "DelayProbe",
3099                 [P_OUT_OF_SYNC]         = "OutOfSync",
3100                 [P_RETRY_WRITE]         = "RetryWrite",
3101         };
3102
3103         if (cmd == P_HAND_SHAKE_M)
3104                 return "HandShakeM";
3105         if (cmd == P_HAND_SHAKE_S)
3106                 return "HandShakeS";
3107         if (cmd == P_HAND_SHAKE)
3108                 return "HandShake";
3109         if (cmd >= ARRAY_SIZE(cmdnames))
3110                 return "Unknown";
3111         return cmdnames[cmd];
3112 }
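/*
 * Illustrative sketch, not part of the driver: cmdname() uses a
 * designated-initializer array indexed by the packet enum, with a bounds
 * check before the table access.  The standalone demo below uses a
 * made-up enum; the extra NULL check additionally guards against holes
 * in a sparse table.
 */
#if 0	/* not compiled, standalone illustration */
#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))

enum demo_packet { DP_PING, DP_DATA, DP_ACK, DP_MAX };

static const char *demo_cmdname(enum demo_packet cmd)
{
        static const char *names[] = {
                [DP_PING] = "Ping",
                [DP_DATA] = "Data",
                [DP_ACK]  = "Ack",
        };

        if (cmd >= ARRAY_SIZE(names) || !names[cmd])
                return "Unknown";
        return names[cmd];
}
#endif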
3113
3114 /**
3115  * drbd_wait_misc  -  wait for a request to make progress
3116  * @mdev:       device associated with the request
3117  * @i:          the struct drbd_interval embedded in struct drbd_request or
3118  *              struct drbd_peer_request
3119  */
3120 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3121 {
3122         struct net_conf *net_conf = mdev->tconn->net_conf;
3123         DEFINE_WAIT(wait);
3124         long timeout;
3125
3126         if (!net_conf)
3127                 return -ETIMEDOUT;
3128         timeout = MAX_SCHEDULE_TIMEOUT;
3129         if (net_conf->ko_count)
3130                 timeout = net_conf->timeout * HZ / 10 * net_conf->ko_count;
3131
3132         /* Indicate that mdev->misc_wait should be woken up on progress. */
3133         i->waiting = true;
3134         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3135         spin_unlock_irq(&mdev->tconn->req_lock);
3136         timeout = schedule_timeout(timeout);
3137         finish_wait(&mdev->misc_wait, &wait);
3138         spin_lock_irq(&mdev->tconn->req_lock);
3139         if (!timeout || mdev->state.conn < C_CONNECTED)
3140                 return -ETIMEDOUT;
3141         if (signal_pending(current))
3142                 return -ERESTARTSYS;
3143         return 0;
3144 }
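/*
 * Note on the timeout computation above: net_conf->timeout is configured
 * in units of 0.1 seconds, so timeout * HZ / 10 converts it to jiffies,
 * which is then scaled by ko_count.  Worked example with arbitrary values:
 * timeout = 60 (6.0 s), ko_count = 7, HZ = 1000 gives
 * 60 * 1000 / 10 * 7 = 42000 jiffies, i.e. a 42 second wait.  With
 * ko_count == 0 the wait keeps MAX_SCHEDULE_TIMEOUT, i.e. no timeout.
 */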
3145
3146 #ifdef CONFIG_DRBD_FAULT_INJECTION
3147 /* Fault insertion support including random number generator shamelessly
3148  * stolen from kernel/rcutorture.c */
3149 struct fault_random_state {
3150         unsigned long state;
3151         unsigned long count;
3152 };
3153
3154 #define FAULT_RANDOM_MULT 39916801  /* prime */
3155 #define FAULT_RANDOM_ADD        479001701 /* prime */
3156 #define FAULT_RANDOM_REFRESH 10000
3157
3158 /*
3159  * Crude but fast random-number generator.  Uses a linear congruential
3160  * generator, with occasional help from get_random_bytes().
3161  */
3162 static unsigned long
3163 _drbd_fault_random(struct fault_random_state *rsp)
3164 {
3165         long refresh;
3166
3167         if (!rsp->count--) {
3168                 get_random_bytes(&refresh, sizeof(refresh));
3169                 rsp->state += refresh;
3170                 rsp->count = FAULT_RANDOM_REFRESH;
3171         }
3172         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3173         return swahw32(rsp->state);
3174 }
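/*
 * Illustrative sketch, not part of the driver: the generator above is a
 * plain linear congruential generator (state = state * mult + add),
 * reseeded from get_random_bytes() every FAULT_RANDOM_REFRESH draws, with
 * swahw32() swapping the two 16-bit halves of the low 32 bits so that the
 * better-mixed high bits end up where the % 100 below samples them.  The
 * standalone demo replaces the reseed with a fixed constant.
 */
#if 0	/* not compiled, standalone illustration */
struct demo_rand { unsigned long state; unsigned long count; };

static unsigned long demo_fault_random(struct demo_rand *r)
{
        if (!r->count--) {
                r->state += 0xdeadbeefUL;       /* stand-in for get_random_bytes() */
                r->count = FAULT_RANDOM_REFRESH;
        }
        r->state = r->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
        /* equivalent of swahw32() on the low 32 bits of the state */
        return ((r->state & 0xffff) << 16) | ((r->state >> 16) & 0xffff);
}
#endif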
3175
3176 static char *
3177 _drbd_fault_str(unsigned int type) {
3178         static char *_faults[] = {
3179                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3180                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3181                 [DRBD_FAULT_RS_WR] = "Resync write",
3182                 [DRBD_FAULT_RS_RD] = "Resync read",
3183                 [DRBD_FAULT_DT_WR] = "Data write",
3184                 [DRBD_FAULT_DT_RD] = "Data read",
3185                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3186                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3187                 [DRBD_FAULT_AL_EE] = "EE allocation",
3188                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3189         };
3190
3191         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3192 }
3193
3194 unsigned int
3195 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3196 {
3197         static struct fault_random_state rrs = {0, 0};
3198
3199         unsigned int ret = (
3200                 (fault_devs == 0 ||
3201                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3202                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3203
3204         if (ret) {
3205                 fault_count++;
3206
3207                 if (__ratelimit(&drbd_ratelimit_state))
3208                         dev_warn(DEV, "***Simulating %s failure\n",
3209                                 _drbd_fault_str(type));
3210         }
3211
3212         return ret;
3213 }
3214 #endif
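/*
 * Note on the gating in _drbd_insert_fault(): fault_devs is a bitmask of
 * device minors (0 selects all devices) and fault_rate is a percentage,
 * since the random draw above lands in 1..100.  For example, fault_devs =
 * 0x5 restricts faults to minors 0 and 2, and fault_rate = 25 triggers on
 * roughly one in four requests of the selected fault type.  The fault_*
 * knobs themselves are declared earlier in this file (not shown in this
 * excerpt).
 */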
3215
3216 const char *drbd_buildtag(void)
3217 {
3218         /* DRBD built from external sources carries a reference to the
3219            git hash of the source code here. */
3220
3221         static char buildtag[38] = "\0uilt-in";
3222
3223         if (buildtag[0] == 0) {
3224 #ifdef CONFIG_MODULES
3225                 if (THIS_MODULE != NULL)
3226                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3227                 else
3228 #endif
3229                         buildtag[0] = 'b';
3230         }
3231
3232         return buildtag;
3233 }
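/*
 * Note on the "\0uilt-in" initializer above: buildtag starts with a NUL,
 * so it initially reads as an empty string.  On first use the code either
 * formats the module srcversion into the buffer or, for a non-modular
 * build, overwrites the leading NUL with 'b', which turns the remaining
 * characters into the string "built-in".  A standalone fragment of the
 * same trick (userspace, printf assumed):
 */
#if 0	/* not compiled, standalone illustration */
        static char tag[] = "\0uilt-in";        /* '\0','u','i','l','t','-','i','n','\0' */

        printf("%s\n", tag);    /* prints an empty line: tag[0] is NUL */
        tag[0] = 'b';
        printf("%s\n", tag);    /* now prints "built-in" */
#endif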
3234
3235 module_init(drbd_init)
3236 module_exit(drbd_cleanup)
3237
3238 EXPORT_SYMBOL(drbd_conn_str);
3239 EXPORT_SYMBOL(drbd_role_str);
3240 EXPORT_SYMBOL(drbd_disk_str);
3241 EXPORT_SYMBOL(drbd_set_st_err_str);